AudioGroup.cpp revision a54e36694b3909ef367867eb7516b2d6f6031261
1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include <stdio.h>
18#include <stdint.h>
19#include <string.h>
20#include <errno.h>
21#include <fcntl.h>
22#include <sys/epoll.h>
23#include <sys/types.h>
24#include <sys/socket.h>
25#include <sys/stat.h>
26#include <sys/time.h>
27#include <time.h>
28#include <arpa/inet.h>
29#include <netinet/in.h>
30
31#define LOG_TAG "AudioGroup"
32#include <cutils/atomic.h>
33#include <utils/Log.h>
34#include <utils/Errors.h>
35#include <utils/RefBase.h>
36#include <utils/threads.h>
37#include <utils/SystemClock.h>
38#include <media/AudioSystem.h>
39#include <media/AudioRecord.h>
40#include <media/AudioTrack.h>
41#include <media/mediarecorder.h>
42
43#include "jni.h"
44#include "JNIHelp.h"
45
46#include "AudioCodec.h"
47#include "EchoSuppressor.h"
48
// Presumably parses jAddress/port into *ss. It is declared here but defined
// outside this file's visible portion (likely another source file of this JNI
// library). add() below treats a negative return as failure with a Java
// exception already pending.
extern int parse(JNIEnv *env, jstring jAddress, int port, sockaddr_storage *ss);

namespace {

using namespace android;

// Descriptor for /dev/urandom, opened by registerAudioGroup(); -1 until then.
// AudioStream::set() reads the initial RTP sequence number, timestamp and SSRC
// from it.
int gRandom = -1;
56
// We use a circular array to implement the jitter buffer. The simplest way is
// to apply a modulo operation to the index while accessing the array. However,
// modulo can be expensive on some platforms, such as ARM. Thus we round up the
// size of the array to the nearest power of 2 and then use bitwise-and instead
// of modulo. Currently we make it 512ms long and assume the packet interval is
// 40ms or less. The first 80ms is where samples get mixed; the remaining 432ms
// is the real jitter buffer. For a stream at 8000Hz it takes 8192 bytes. These
// numbers were chosen by experiment, and each of them can be adjusted as needed.
65
// Other notes:
// + We use elapsedRealtime() to get the time. Since we use 32-bit variables
//   instead of 64-bit ones, comparisons must be done by subtraction.
// + The sampling rate must be a multiple of 1000Hz, and the packet length must
//   be a whole number of milliseconds. No floating point is used.
// + If we cannot get enough CPU, we drop samples and simulate packet loss.
// + Resampling is not done yet, so streams in one group must use the same rate.
//   For the first release only 8000Hz is supported.
74
// Jitter-buffer timing constants, all in milliseconds:
//   BUFFER_SIZE    - total length of each stream's circular jitter buffer.
//   HISTORY_SIZE   - how far behind the current time samples are kept for
//                    mixing before being discarded.
//   MEASURE_PERIOD - how long excess latency must persist before the buffer
//                    tail is pulled back.
enum {
    BUFFER_SIZE = 512,
    HISTORY_SIZE = 80,
    MEASURE_PERIOD = 2000
};
78
79class AudioStream
80{
81public:
82    AudioStream();
83    ~AudioStream();
84    bool set(int mode, int socket, sockaddr_storage *remote,
85        AudioCodec *codec, int sampleRate, int sampleCount,
86        int codecType, int dtmfType);
87
88    void sendDtmf(int event);
89    bool mix(int32_t *output, int head, int tail, int sampleRate);
90    void encode(int tick, AudioStream *chain);
91    void decode(int tick);
92
93private:
94    enum {
95        NORMAL = 0,
96        SEND_ONLY = 1,
97        RECEIVE_ONLY = 2,
98        LAST_MODE = 2,
99    };
100
101    int mMode;
102    int mSocket;
103    sockaddr_storage mRemote;
104    AudioCodec *mCodec;
105    uint32_t mCodecMagic;
106    uint32_t mDtmfMagic;
107    bool mFixRemote;
108
109    int mTick;
110    int mSampleRate;
111    int mSampleCount;
112    int mInterval;
113    int mLogThrottle;
114
115    int16_t *mBuffer;
116    int mBufferMask;
117    int mBufferHead;
118    int mBufferTail;
119    int mLatencyTimer;
120    int mLatencyScore;
121
122    uint16_t mSequence;
123    uint32_t mTimestamp;
124    uint32_t mSsrc;
125
126    int mDtmfEvent;
127    int mDtmfStart;
128
129    AudioStream *mNext;
130
131    friend class AudioGroup;
132};
133
134AudioStream::AudioStream()
135{
136    mSocket = -1;
137    mCodec = NULL;
138    mBuffer = NULL;
139    mNext = NULL;
140}
141
142AudioStream::~AudioStream()
143{
144    close(mSocket);
145    delete mCodec;
146    delete [] mBuffer;
147    LOGD("stream[%d] is dead", mSocket);
148}
149
150bool AudioStream::set(int mode, int socket, sockaddr_storage *remote,
151    AudioCodec *codec, int sampleRate, int sampleCount,
152    int codecType, int dtmfType)
153{
154    if (mode < 0 || mode > LAST_MODE) {
155        return false;
156    }
157    mMode = mode;
158
159    mCodecMagic = (0x8000 | codecType) << 16;
160    mDtmfMagic = (dtmfType == -1) ? 0 : (0x8000 | dtmfType) << 16;
161
162    mTick = elapsedRealtime();
163    mSampleRate = sampleRate / 1000;
164    mSampleCount = sampleCount;
165    mInterval = mSampleCount / mSampleRate;
166
167    // Allocate jitter buffer.
168    for (mBufferMask = 8; mBufferMask < mSampleRate; mBufferMask <<= 1);
169    mBufferMask *= BUFFER_SIZE;
170    mBuffer = new int16_t[mBufferMask];
171    --mBufferMask;
172    mBufferHead = 0;
173    mBufferTail = 0;
174    mLatencyTimer = 0;
175    mLatencyScore = 0;
176
177    // Initialize random bits.
178    read(gRandom, &mSequence, sizeof(mSequence));
179    read(gRandom, &mTimestamp, sizeof(mTimestamp));
180    read(gRandom, &mSsrc, sizeof(mSsrc));
181
182    mDtmfEvent = -1;
183    mDtmfStart = 0;
184
185    // Only take over these things when succeeded.
186    mSocket = socket;
187    if (codec) {
188        mRemote = *remote;
189        mCodec = codec;
190
191        // Here we should never get an private address, but some buggy proxy
192        // servers do give us one. To solve this, we replace the address when
193        // the first time we successfully decode an incoming packet.
194        mFixRemote = false;
195        if (remote->ss_family == AF_INET) {
196            unsigned char *address =
197                (unsigned char *)&((sockaddr_in *)remote)->sin_addr;
198            if (address[0] == 10 ||
199                (address[0] == 172 && (address[1] >> 4) == 1) ||
200                (address[0] == 192 && address[1] == 168)) {
201                mFixRemote = true;
202            }
203        }
204    }
205
206    LOGD("stream[%d] is configured as %s %dkHz %dms mode %d", mSocket,
207        (codec ? codec->name : "RAW"), mSampleRate, mInterval, mMode);
208    return true;
209}
210
211void AudioStream::sendDtmf(int event)
212{
213    if (mDtmfMagic != 0) {
214        mDtmfEvent = event << 24;
215        mDtmfStart = mTimestamp + mSampleCount;
216    }
217}
218
219bool AudioStream::mix(int32_t *output, int head, int tail, int sampleRate)
220{
221    if (mMode == SEND_ONLY) {
222        return false;
223    }
224
225    if (head - mBufferHead < 0) {
226        head = mBufferHead;
227    }
228    if (tail - mBufferTail > 0) {
229        tail = mBufferTail;
230    }
231    if (tail - head <= 0) {
232        return false;
233    }
234
235    head *= mSampleRate;
236    tail *= mSampleRate;
237
238    if (sampleRate == mSampleRate) {
239        for (int i = head; i - tail < 0; ++i) {
240            output[i - head] += mBuffer[i & mBufferMask];
241        }
242    } else {
243        // TODO: implement resampling.
244        return false;
245    }
246    return true;
247}
248
249void AudioStream::encode(int tick, AudioStream *chain)
250{
251    if (tick - mTick >= mInterval) {
252        // We just missed the train. Pretend that packets in between are lost.
253        int skipped = (tick - mTick) / mInterval;
254        mTick += skipped * mInterval;
255        mSequence += skipped;
256        mTimestamp += skipped * mSampleCount;
257        LOGV("stream[%d] skips %d packets", mSocket, skipped);
258    }
259
260    tick = mTick;
261    mTick += mInterval;
262    ++mSequence;
263    mTimestamp += mSampleCount;
264
265    if (mMode == RECEIVE_ONLY) {
266        return;
267    }
268
269    // If there is an ongoing DTMF event, send it now.
270    if (mDtmfEvent != -1) {
271        int duration = mTimestamp - mDtmfStart;
272        // Make sure duration is reasonable.
273        if (duration >= 0 && duration < mSampleRate * 100) {
274            duration += mSampleCount;
275            int32_t buffer[4] = {
276                htonl(mDtmfMagic | mSequence),
277                htonl(mDtmfStart),
278                mSsrc,
279                htonl(mDtmfEvent | duration),
280            };
281            if (duration >= mSampleRate * 100) {
282                buffer[3] |= htonl(1 << 23);
283                mDtmfEvent = -1;
284            }
285            sendto(mSocket, buffer, sizeof(buffer), MSG_DONTWAIT,
286                (sockaddr *)&mRemote, sizeof(mRemote));
287            return;
288        }
289        mDtmfEvent = -1;
290    }
291
292    // It is time to mix streams.
293    bool mixed = false;
294    int32_t buffer[mSampleCount + 3];
295    memset(buffer, 0, sizeof(buffer));
296    while (chain) {
297        if (chain != this &&
298            chain->mix(buffer, tick - mInterval, tick, mSampleRate)) {
299            mixed = true;
300        }
301        chain = chain->mNext;
302    }
303    if (!mixed) {
304        if ((mTick ^ mLogThrottle) >> 10) {
305            mLogThrottle = mTick;
306            LOGV("stream[%d] no data", mSocket);
307        }
308        return;
309    }
310
311    // Cook the packet and send it out.
312    int16_t samples[mSampleCount];
313    for (int i = 0; i < mSampleCount; ++i) {
314        int32_t sample = buffer[i];
315        if (sample < -32768) {
316            sample = -32768;
317        }
318        if (sample > 32767) {
319            sample = 32767;
320        }
321        samples[i] = sample;
322    }
323    if (!mCodec) {
324        // Special case for device stream.
325        send(mSocket, samples, sizeof(samples), MSG_DONTWAIT);
326        return;
327    }
328
329    buffer[0] = htonl(mCodecMagic | mSequence);
330    buffer[1] = htonl(mTimestamp);
331    buffer[2] = mSsrc;
332    int length = mCodec->encode(&buffer[3], samples);
333    if (length <= 0) {
334        LOGV("stream[%d] encoder error", mSocket);
335        return;
336    }
337    sendto(mSocket, buffer, length + 12, MSG_DONTWAIT, (sockaddr *)&mRemote,
338        sizeof(mRemote));
339}
340
341void AudioStream::decode(int tick)
342{
343    char c;
344    if (mMode == SEND_ONLY) {
345        recv(mSocket, &c, 1, MSG_DONTWAIT);
346        return;
347    }
348
349    // Make sure mBufferHead and mBufferTail are reasonable.
350    if ((unsigned int)(tick + BUFFER_SIZE - mBufferHead) > BUFFER_SIZE * 2) {
351        mBufferHead = tick - HISTORY_SIZE;
352        mBufferTail = mBufferHead;
353    }
354
355    if (tick - mBufferHead > HISTORY_SIZE) {
356        // Throw away outdated samples.
357        mBufferHead = tick - HISTORY_SIZE;
358        if (mBufferTail - mBufferHead < 0) {
359            mBufferTail = mBufferHead;
360        }
361    }
362
363    // Adjust the jitter buffer if the latency keeps larger than two times of the
364    // packet interval in the past two seconds.
365    int score = mBufferTail - tick - mInterval * 2;
366    if (mLatencyScore > score) {
367        mLatencyScore = score;
368    }
369    if (mLatencyScore <= 0) {
370        mLatencyTimer = tick;
371        mLatencyScore = score;
372    } else if (tick - mLatencyTimer >= MEASURE_PERIOD) {
373        LOGV("stream[%d] reduces latency of %dms", mSocket, mLatencyScore);
374        mBufferTail -= mLatencyScore;
375        mLatencyTimer = tick;
376    }
377
378    if (mBufferTail - mBufferHead > BUFFER_SIZE - mInterval) {
379        // Buffer overflow. Drop the packet.
380        LOGV("stream[%d] buffer overflow", mSocket);
381        recv(mSocket, &c, 1, MSG_DONTWAIT);
382        return;
383    }
384
385    // Receive the packet and decode it.
386    int16_t samples[mSampleCount];
387    int length = 0;
388    if (!mCodec) {
389        // Special case for device stream.
390        length = recv(mSocket, samples, sizeof(samples),
391            MSG_TRUNC | MSG_DONTWAIT) >> 1;
392    } else {
393        __attribute__((aligned(4))) uint8_t buffer[2048];
394        sockaddr_storage remote;
395        socklen_t len = sizeof(remote);
396
397        length = recvfrom(mSocket, buffer, sizeof(buffer),
398            MSG_TRUNC | MSG_DONTWAIT, (sockaddr *)&remote, &len);
399
400        // Do we need to check SSRC, sequence, and timestamp? They are not
401        // reliable but at least they can be used to identify duplicates?
402        if (length < 12 || length > (int)sizeof(buffer) ||
403            (ntohl(*(uint32_t *)buffer) & 0xC07F0000) != mCodecMagic) {
404            LOGV("stream[%d] malformed packet", mSocket);
405            return;
406        }
407        int offset = 12 + ((buffer[0] & 0x0F) << 2);
408        if ((buffer[0] & 0x10) != 0) {
409            offset += 4 + (ntohs(*(uint16_t *)&buffer[offset + 2]) << 2);
410        }
411        if ((buffer[0] & 0x20) != 0) {
412            length -= buffer[length - 1];
413        }
414        length -= offset;
415        if (length >= 0) {
416            length = mCodec->decode(samples, &buffer[offset], length);
417        }
418        if (length > 0 && mFixRemote) {
419            mRemote = remote;
420            mFixRemote = false;
421        }
422    }
423    if (length <= 0) {
424        LOGV("stream[%d] decoder error", mSocket);
425        return;
426    }
427
428    if (tick - mBufferTail > 0) {
429        // Buffer underrun. Reset the jitter buffer.
430        LOGV("stream[%d] buffer underrun", mSocket);
431        if (mBufferTail - mBufferHead <= 0) {
432            mBufferHead = tick + mInterval;
433            mBufferTail = mBufferHead;
434        } else {
435            int tail = (tick + mInterval) * mSampleRate;
436            for (int i = mBufferTail * mSampleRate; i - tail < 0; ++i) {
437                mBuffer[i & mBufferMask] = 0;
438            }
439            mBufferTail = tick + mInterval;
440        }
441    }
442
443    // Append to the jitter buffer.
444    int tail = mBufferTail * mSampleRate;
445    for (int i = 0; i < mSampleCount; ++i) {
446        mBuffer[tail & mBufferMask] = samples[i];
447        ++tail;
448    }
449    mBufferTail += mInterval;
450}
451
452//------------------------------------------------------------------------------
453
454class AudioGroup
455{
456public:
457    AudioGroup();
458    ~AudioGroup();
459    bool set(int sampleRate, int sampleCount);
460
461    bool setMode(int mode);
462    bool sendDtmf(int event);
463    bool add(AudioStream *stream);
464    bool remove(int socket);
465
466private:
467    enum {
468        ON_HOLD = 0,
469        MUTED = 1,
470        NORMAL = 2,
471        EC_ENABLED = 3,
472        LAST_MODE = 3,
473    };
474
475    AudioStream *mChain;
476    int mEventQueue;
477    volatile int mDtmfEvent;
478
479    int mMode;
480    int mSampleRate;
481    int mSampleCount;
482    int mDeviceSocket;
483
484    class NetworkThread : public Thread
485    {
486    public:
487        NetworkThread(AudioGroup *group) : Thread(false), mGroup(group) {}
488
489        bool start()
490        {
491            if (run("Network", ANDROID_PRIORITY_AUDIO) != NO_ERROR) {
492                LOGE("cannot start network thread");
493                return false;
494            }
495            return true;
496        }
497
498    private:
499        AudioGroup *mGroup;
500        bool threadLoop();
501    };
502    sp<NetworkThread> mNetworkThread;
503
504    class DeviceThread : public Thread
505    {
506    public:
507        DeviceThread(AudioGroup *group) : Thread(false), mGroup(group) {}
508
509        bool start()
510        {
511            if (run("Device", ANDROID_PRIORITY_AUDIO) != NO_ERROR) {
512                LOGE("cannot start device thread");
513                return false;
514            }
515            return true;
516        }
517
518    private:
519        AudioGroup *mGroup;
520        bool threadLoop();
521    };
522    sp<DeviceThread> mDeviceThread;
523};
524
525AudioGroup::AudioGroup()
526{
527    mMode = ON_HOLD;
528    mChain = NULL;
529    mEventQueue = -1;
530    mDtmfEvent = -1;
531    mDeviceSocket = -1;
532    mNetworkThread = new NetworkThread(this);
533    mDeviceThread = new DeviceThread(this);
534}
535
536AudioGroup::~AudioGroup()
537{
538    mNetworkThread->requestExitAndWait();
539    mDeviceThread->requestExitAndWait();
540    close(mEventQueue);
541    close(mDeviceSocket);
542    while (mChain) {
543        AudioStream *next = mChain->mNext;
544        delete mChain;
545        mChain = next;
546    }
547    LOGD("group[%d] is dead", mDeviceSocket);
548}
549
550bool AudioGroup::set(int sampleRate, int sampleCount)
551{
552    mEventQueue = epoll_create(2);
553    if (mEventQueue == -1) {
554        LOGE("epoll_create: %s", strerror(errno));
555        return false;
556    }
557
558    mSampleRate = sampleRate;
559    mSampleCount = sampleCount;
560
561    // Create device socket.
562    int pair[2];
563    if (socketpair(AF_UNIX, SOCK_DGRAM, 0, pair)) {
564        LOGE("socketpair: %s", strerror(errno));
565        return false;
566    }
567    mDeviceSocket = pair[0];
568
569    // Create device stream.
570    mChain = new AudioStream;
571    if (!mChain->set(AudioStream::NORMAL, pair[1], NULL, NULL,
572        sampleRate, sampleCount, -1, -1)) {
573        close(pair[1]);
574        LOGE("cannot initialize device stream");
575        return false;
576    }
577
578    // Give device socket a reasonable timeout.
579    timeval tv;
580    tv.tv_sec = 0;
581    tv.tv_usec = 1000 * sampleCount / sampleRate * 500;
582    if (setsockopt(pair[0], SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv))) {
583        LOGE("setsockopt: %s", strerror(errno));
584        return false;
585    }
586
587    // Add device stream into event queue.
588    epoll_event event;
589    event.events = EPOLLIN;
590    event.data.ptr = mChain;
591    if (epoll_ctl(mEventQueue, EPOLL_CTL_ADD, pair[1], &event)) {
592        LOGE("epoll_ctl: %s", strerror(errno));
593        return false;
594    }
595
596    // Anything else?
597    LOGD("stream[%d] joins group[%d]", pair[1], pair[0]);
598    return true;
599}
600
601bool AudioGroup::setMode(int mode)
602{
603    if (mode < 0 || mode > LAST_MODE) {
604        return false;
605    }
606    if (mMode == mode) {
607        return true;
608    }
609
610    mDeviceThread->requestExitAndWait();
611    LOGD("group[%d] switches from mode %d to %d", mDeviceSocket, mMode, mode);
612    mMode = mode;
613    return (mode == ON_HOLD) || mDeviceThread->start();
614}
615
616bool AudioGroup::sendDtmf(int event)
617{
618    if (event < 0 || event > 15) {
619        return false;
620    }
621
622    // DTMF is rarely used, so we try to make it as lightweight as possible.
623    // Using volatile might be dodgy, but using a pipe or pthread primitives
624    // or stop-set-restart threads seems too heavy. Will investigate later.
625    timespec ts;
626    ts.tv_sec = 0;
627    ts.tv_nsec = 100000000;
628    for (int i = 0; mDtmfEvent != -1 && i < 20; ++i) {
629        nanosleep(&ts, NULL);
630    }
631    if (mDtmfEvent != -1) {
632        return false;
633    }
634    mDtmfEvent = event;
635    nanosleep(&ts, NULL);
636    return true;
637}
638
639bool AudioGroup::add(AudioStream *stream)
640{
641    mNetworkThread->requestExitAndWait();
642
643    epoll_event event;
644    event.events = EPOLLIN;
645    event.data.ptr = stream;
646    if (epoll_ctl(mEventQueue, EPOLL_CTL_ADD, stream->mSocket, &event)) {
647        LOGE("epoll_ctl: %s", strerror(errno));
648        return false;
649    }
650
651    stream->mNext = mChain->mNext;
652    mChain->mNext = stream;
653    if (!mNetworkThread->start()) {
654        // Only take over the stream when succeeded.
655        mChain->mNext = stream->mNext;
656        return false;
657    }
658
659    LOGD("stream[%d] joins group[%d]", stream->mSocket, mDeviceSocket);
660    return true;
661}
662
663bool AudioGroup::remove(int socket)
664{
665    mNetworkThread->requestExitAndWait();
666
667    for (AudioStream *stream = mChain; stream->mNext; stream = stream->mNext) {
668        AudioStream *target = stream->mNext;
669        if (target->mSocket == socket) {
670            if (epoll_ctl(mEventQueue, EPOLL_CTL_DEL, socket, NULL)) {
671                LOGE("epoll_ctl: %s", strerror(errno));
672                return false;
673            }
674            stream->mNext = target->mNext;
675            LOGD("stream[%d] leaves group[%d]", socket, mDeviceSocket);
676            delete target;
677            break;
678        }
679    }
680
681    // Do not start network thread if there is only one stream.
682    if (!mChain->mNext || !mNetworkThread->start()) {
683        return false;
684    }
685    return true;
686}
687
688bool AudioGroup::NetworkThread::threadLoop()
689{
690    AudioStream *chain = mGroup->mChain;
691    int tick = elapsedRealtime();
692    int deadline = tick + 10;
693    int count = 0;
694
695    for (AudioStream *stream = chain; stream; stream = stream->mNext) {
696        if (tick - stream->mTick >= 0) {
697            stream->encode(tick, chain);
698        }
699        if (deadline - stream->mTick > 0) {
700            deadline = stream->mTick;
701        }
702        ++count;
703    }
704
705    int event = mGroup->mDtmfEvent;
706    if (event != -1) {
707        for (AudioStream *stream = chain; stream; stream = stream->mNext) {
708            stream->sendDtmf(event);
709        }
710        mGroup->mDtmfEvent = -1;
711    }
712
713    deadline -= tick;
714    if (deadline < 1) {
715        deadline = 1;
716    }
717
718    epoll_event events[count];
719    count = epoll_wait(mGroup->mEventQueue, events, count, deadline);
720    if (count == -1) {
721        LOGE("epoll_wait: %s", strerror(errno));
722        return false;
723    }
724    for (int i = 0; i < count; ++i) {
725        ((AudioStream *)events[i].data.ptr)->decode(tick);
726    }
727
728    return true;
729}
730
// Device thread body.
// It snapshots the group's mode and format, opens an AudioTrack (playback)
// and an AudioRecord (capture), then repeatedly:
//   - receives one packet of mixed samples from the device socket and plays it,
//   - records one packet of samples (unless MUTED), and
//   - sends the recording back over the device socket, echo-suppressed in
//     EC_ENABLED mode.
// It loops until exit is requested. Setup failures return false; mid-loop
// device errors return true so that the Thread framework calls threadLoop()
// again and the devices are re-initialized.
bool AudioGroup::DeviceThread::threadLoop()
{
    // setMode() stops this thread before changing mMode, so a snapshot is safe.
    int mode = mGroup->mMode;
    int sampleRate = mGroup->mSampleRate;
    int sampleCount = mGroup->mSampleCount;
    int deviceSocket = mGroup->mDeviceSocket;

    // Find out the frame count for AudioTrack and AudioRecord.
    int output = 0;
    int input = 0;
    if (AudioTrack::getMinFrameCount(&output, AudioSystem::VOICE_CALL,
        sampleRate) != NO_ERROR || output <= 0 ||
        AudioRecord::getMinFrameCount(&input, sampleRate,
        AudioSystem::PCM_16_BIT, 1) != NO_ERROR || input <= 0) {
        LOGE("cannot compute frame count");
        return false;
    }
    LOGD("reported frame count: output %d, input %d", output, input);

    // Use at least two packets' worth of frames on each side.
    if (output < sampleCount * 2) {
        output = sampleCount * 2;
    }
    if (input < sampleCount * 2) {
        input = sampleCount * 2;
    }
    LOGD("adjusted frame count: output %d, input %d", output, input);

    // Initialize AudioTrack and AudioRecord.
    AudioTrack track;
    AudioRecord record;
    if (track.set(AudioSystem::VOICE_CALL, sampleRate, AudioSystem::PCM_16_BIT,
        AudioSystem::CHANNEL_OUT_MONO, output) != NO_ERROR ||
        record.set(AUDIO_SOURCE_MIC, sampleRate, AudioSystem::PCM_16_BIT,
        AudioSystem::CHANNEL_IN_MONO, input) != NO_ERROR) {
        LOGE("cannot initialize audio device");
        return false;
    }
    LOGD("latency: output %d, input %d", track.latency(), record.latency());

    // Initialize echo canceler. The second argument converts the combined
    // track + record latency (presumably in ms) into a sample count.
    EchoSuppressor echo(sampleCount,
        (track.latency() + record.latency()) * sampleRate / 1000);

    // Give device socket a reasonable buffer size.
    // NOTE(review): output is a frame count but is passed as a byte size here.
    setsockopt(deviceSocket, SOL_SOCKET, SO_RCVBUF, &output, sizeof(output));
    setsockopt(deviceSocket, SOL_SOCKET, SO_SNDBUF, &output, sizeof(output));

    // Drain device socket of anything queued while the thread was stopped.
    char c;
    while (recv(deviceSocket, &c, 1, MSG_DONTWAIT) == 1);

    // Start AudioRecord before AudioTrack. This prevents AudioTrack from being
    // disabled due to buffer underrun while waiting for AudioRecord.
    if (mode != MUTED) {
        record.start();
        int16_t one;
        record.read(&one, sizeof(one));
    }
    track.start();

    while (!exitPending()) {
        // From here on output/input are sample arrays that shadow the frame
        // counts above. A failed or timed-out recv plays silence.
        int16_t output[sampleCount];
        if (recv(deviceSocket, output, sizeof(output), 0) <= 0) {
            memset(output, 0, sizeof(output));
        }

        int16_t input[sampleCount];
        int toWrite = sampleCount;
        int toRead = (mode == MUTED) ? 0 : sampleCount;
        int chances = 100;

        // Poll both devices (at most 99 rounds) until one packet has been
        // played and, unless MUTED, one packet has been recorded.
        while (--chances > 0 && (toWrite > 0 || toRead > 0)) {
            if (toWrite > 0) {
                AudioTrack::Buffer buffer;
                buffer.frameCount = toWrite;

                status_t status = track.obtainBuffer(&buffer, 1);
                if (status == NO_ERROR) {
                    int offset = sampleCount - toWrite;
                    memcpy(buffer.i8, &output[offset], buffer.size);
                    toWrite -= buffer.frameCount;
                    track.releaseBuffer(&buffer);
                } else if (status != TIMED_OUT && status != WOULD_BLOCK) {
                    LOGE("cannot write to AudioTrack");
                    return true;
                }
            }

            if (toRead > 0) {
                AudioRecord::Buffer buffer;
                buffer.frameCount = toRead;

                status_t status = record.obtainBuffer(&buffer, 1);
                if (status == NO_ERROR) {
                    int offset = sampleCount - toRead;
                    memcpy(&input[offset], buffer.i8, buffer.size);
                    toRead -= buffer.frameCount;
                    record.releaseBuffer(&buffer);
                } else if (status != TIMED_OUT && status != WOULD_BLOCK) {
                    LOGE("cannot read from AudioRecord");
                    return true;
                }
            }
        }

        // Out of chances: drop whatever is queued on the device socket.
        if (chances <= 0) {
            LOGW("device loop timeout");
            while (recv(deviceSocket, &c, 1, MSG_DONTWAIT) == 1);
        }

        // Any mode other than MUTED and NORMAL reaching here is EC_ENABLED
        // (ON_HOLD never starts this thread, see setMode()).
        if (mode != MUTED) {
            if (mode == NORMAL) {
                send(deviceSocket, input, sizeof(input), MSG_DONTWAIT);
            } else {
                echo.run(output, input);
                send(deviceSocket, input, sizeof(input), MSG_DONTWAIT);
            }
        }
    }
    return false;
}
852
853//------------------------------------------------------------------------------
854
// Field IDs of android.net.rtp.AudioGroup, resolved in registerAudioGroup().
// mNative ("I") holds the native AudioGroup pointer cast to int. mMode ("I")
// is the mode applied when add() creates the native group.
static jfieldID gNative;
static jfieldID gMode;
857
858void add(JNIEnv *env, jobject thiz, jint mode,
859    jint socket, jstring jRemoteAddress, jint remotePort,
860    jstring jCodecSpec, jint dtmfType)
861{
862    AudioCodec *codec = NULL;
863    AudioStream *stream = NULL;
864    AudioGroup *group = NULL;
865
866    // Sanity check.
867    sockaddr_storage remote;
868    if (parse(env, jRemoteAddress, remotePort, &remote) < 0) {
869        // Exception already thrown.
870        return;
871    }
872    if (!jCodecSpec) {
873        jniThrowNullPointerException(env, "codecSpec");
874        return;
875    }
876    const char *codecSpec = env->GetStringUTFChars(jCodecSpec, NULL);
877    if (!codecSpec) {
878        // Exception already thrown.
879        return;
880    }
881
882    // Create audio codec.
883    int codecType = -1;
884    char codecName[16];
885    int sampleRate = -1;
886    sscanf(codecSpec, "%d %[^/]%*c%d", &codecType, codecName, &sampleRate);
887    codec = newAudioCodec(codecName);
888    int sampleCount = (codec ? codec->set(sampleRate, codecSpec) : -1);
889    env->ReleaseStringUTFChars(jCodecSpec, codecSpec);
890    if (sampleCount <= 0) {
891        jniThrowException(env, "java/lang/IllegalStateException",
892            "cannot initialize audio codec");
893        goto error;
894    }
895
896    // Create audio stream.
897    stream = new AudioStream;
898    if (!stream->set(mode, socket, &remote, codec, sampleRate, sampleCount,
899        codecType, dtmfType)) {
900        jniThrowException(env, "java/lang/IllegalStateException",
901            "cannot initialize audio stream");
902        goto error;
903    }
904    socket = -1;
905    codec = NULL;
906
907    // Create audio group.
908    group = (AudioGroup *)env->GetIntField(thiz, gNative);
909    if (!group) {
910        int mode = env->GetIntField(thiz, gMode);
911        group = new AudioGroup;
912        if (!group->set(8000, 256) || !group->setMode(mode)) {
913            jniThrowException(env, "java/lang/IllegalStateException",
914                "cannot initialize audio group");
915            goto error;
916        }
917    }
918
919    // Add audio stream into audio group.
920    if (!group->add(stream)) {
921        jniThrowException(env, "java/lang/IllegalStateException",
922            "cannot add audio stream");
923        goto error;
924    }
925
926    // Succeed.
927    env->SetIntField(thiz, gNative, (int)group);
928    return;
929
930error:
931    delete group;
932    delete stream;
933    delete codec;
934    close(socket);
935    env->SetIntField(thiz, gNative, NULL);
936}
937
938void remove(JNIEnv *env, jobject thiz, jint socket)
939{
940    AudioGroup *group = (AudioGroup *)env->GetIntField(thiz, gNative);
941    if (group) {
942        if (socket == -1 || !group->remove(socket)) {
943            delete group;
944            env->SetIntField(thiz, gNative, NULL);
945        }
946    }
947}
948
949void setMode(JNIEnv *env, jobject thiz, jint mode)
950{
951    AudioGroup *group = (AudioGroup *)env->GetIntField(thiz, gNative);
952    if (group && !group->setMode(mode)) {
953        jniThrowException(env, "java/lang/IllegalArgumentException", NULL);
954    }
955}
956
957void sendDtmf(JNIEnv *env, jobject thiz, jint event)
958{
959    AudioGroup *group = (AudioGroup *)env->GetIntField(thiz, gNative);
960    if (group && !group->sendDtmf(event)) {
961        jniThrowException(env, "java/lang/IllegalArgumentException", NULL);
962    }
963}
964
// Native method table for android.net.rtp.AudioGroup, registered by
// registerAudioGroup(). Each JNI signature must match the Java declaration.
JNINativeMethod gMethods[] = {
    {"nativeAdd", "(IILjava/lang/String;ILjava/lang/String;I)V", (void *)add},
    {"nativeRemove", "(I)V", (void *)remove},
    {"nativeSetMode", "(I)V", (void *)setMode},
    {"nativeSendDtmf", "(I)V", (void *)sendDtmf},
};

} // namespace
973
974int registerAudioGroup(JNIEnv *env)
975{
976    gRandom = open("/dev/urandom", O_RDONLY);
977    if (gRandom == -1) {
978        LOGE("urandom: %s", strerror(errno));
979        return -1;
980    }
981
982    jclass clazz;
983    if ((clazz = env->FindClass("android/net/rtp/AudioGroup")) == NULL ||
984        (gNative = env->GetFieldID(clazz, "mNative", "I")) == NULL ||
985        (gMode = env->GetFieldID(clazz, "mMode", "I")) == NULL ||
986        env->RegisterNatives(clazz, gMethods, NELEM(gMethods)) < 0) {
987        LOGE("JNI registration failed");
988        return -1;
989    }
990    return 0;
991}
992