1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include <stdio.h>
18#include <stdint.h>
19#include <string.h>
20#include <errno.h>
21#include <fcntl.h>
22#include <sys/epoll.h>
23#include <sys/types.h>
24#include <sys/socket.h>
25#include <sys/stat.h>
26#include <sys/time.h>
27#include <time.h>
28#include <arpa/inet.h>
29#include <netinet/in.h>
30
31// #define LOG_NDEBUG 0
32#define LOG_TAG "AudioGroup"
33#include <cutils/atomic.h>
34#include <cutils/properties.h>
35#include <utils/Log.h>
36#include <utils/Errors.h>
37#include <utils/RefBase.h>
38#include <utils/threads.h>
39#include <utils/SystemClock.h>
40#include <media/AudioSystem.h>
41#include <media/AudioRecord.h>
42#include <media/AudioTrack.h>
43#include <media/mediarecorder.h>
44#include <media/AudioEffect.h>
45#include <audio_effects/effect_aec.h>
46#include <system/audio.h>
47
48#include "jni.h"
49#include "JNIHelp.h"
50
51#include "AudioCodec.h"
52#include "EchoSuppressor.h"
53
// Defined in another translation unit. As used by add() below, it fills *ss
// from a Java address string and a port, and a negative return means failure
// with a Java exception already pending.
// NOTE(review): contract inferred from the call site — confirm against the
// definition.
extern int parse(JNIEnv *env, jstring jAddress, int port, sockaddr_storage *ss);
55
56namespace {
57
58using namespace android;
59
// Descriptor for /dev/urandom, opened by registerAudioGroup(); -1 until then.
// AudioStream::set() reads each stream's initial RTP sequence number,
// timestamp and SSRC from it.
int gRandom = -1;
61
62// We use a circular array to implement jitter buffer. The simplest way is doing
63// a modulo operation on the index while accessing the array. However modulo can
64// be expensive on some platforms, such as ARM. Thus we round up the size of the
65// array to the nearest power of 2 and then use bitwise-and instead of modulo.
66// Currently we make it 2048ms long and assume packet interval is 50ms or less.
67// The first 100ms is the place where samples get mixed. The rest is the real
68// jitter buffer. For a stream at 8000Hz it takes 32 kilobytes. These numbers
69// are chosen by experiments and each of them can be adjusted as needed.
70
// Originally a stream did not send packets when it was receive-only or there
// was nothing to mix. However, this caused problems with certain firewalls and
// proxies: a firewall might remove a port mapping when there is no outgoing
// packet for a period of time, and a proxy might wait for incoming packets from
// both sides before it starts forwarding. To solve these problems, we send a
// silence packet on the stream roughly every second. This should be enough to
// keep the stream alive at a relatively low cost.
78
79// Other notes:
80// + We use elapsedRealtime() to get the time. Since we use 32bit variables
81//   instead of 64bit ones, comparison must be done by subtraction.
// + The sampling rate must be a multiple of 1000Hz, and the packet length must
//   be a whole number of milliseconds. No floating point is used.
84// + If we cannot get enough CPU, we drop samples and simulate packet loss.
85// + Resampling is not done yet, so streams in one group must use the same rate.
86//   For the first release only 8000Hz is supported.
87
// Jitter buffer length in milliseconds. AudioStream::set() multiplies it by
// the sample rate rounded up to a power of two (kHz) to size the ring buffer.
#define BUFFER_SIZE     2048
// Milliseconds of samples kept behind the current tick; older samples are
// discarded by AudioStream::decode().
#define HISTORY_SIZE    100
// Latency threshold (ms beyond the current tick) used when measuring whether
// the jitter buffer holds too much data.
#define MEASURE_BASE    100
// How long (ms) the excess latency must persist before the tail is trimmed.
#define MEASURE_PERIOD  5000
// Length of a DTMF event in ms (multiplied by the kHz rate to get samples).
#define DTMF_PERIOD     200
93
94class AudioStream
95{
96public:
97    AudioStream();
98    ~AudioStream();
99    bool set(int mode, int socket, sockaddr_storage *remote,
100        AudioCodec *codec, int sampleRate, int sampleCount,
101        int codecType, int dtmfType);
102
103    void sendDtmf(int event);
104    bool mix(int32_t *output, int head, int tail, int sampleRate);
105    void encode(int tick, AudioStream *chain);
106    void decode(int tick);
107
108private:
109    enum {
110        NORMAL = 0,
111        SEND_ONLY = 1,
112        RECEIVE_ONLY = 2,
113        LAST_MODE = 2,
114    };
115
116    int mMode;
117    int mSocket;
118    sockaddr_storage mRemote;
119    AudioCodec *mCodec;
120    uint32_t mCodecMagic;
121    uint32_t mDtmfMagic;
122    bool mFixRemote;
123
124    int mTick;
125    int mSampleRate;
126    int mSampleCount;
127    int mInterval;
128    int mKeepAlive;
129
130    int16_t *mBuffer;
131    int mBufferMask;
132    int mBufferHead;
133    int mBufferTail;
134    int mLatencyTimer;
135    int mLatencyScore;
136
137    uint16_t mSequence;
138    uint32_t mTimestamp;
139    uint32_t mSsrc;
140
141    int mDtmfEvent;
142    int mDtmfStart;
143
144    AudioStream *mNext;
145
146    friend class AudioGroup;
147};
148
149AudioStream::AudioStream()
150{
151    mSocket = -1;
152    mCodec = NULL;
153    mBuffer = NULL;
154    mNext = NULL;
155}
156
157AudioStream::~AudioStream()
158{
159    close(mSocket);
160    delete mCodec;
161    delete [] mBuffer;
162    ALOGD("stream[%d] is dead", mSocket);
163}
164
165bool AudioStream::set(int mode, int socket, sockaddr_storage *remote,
166    AudioCodec *codec, int sampleRate, int sampleCount,
167    int codecType, int dtmfType)
168{
169    if (mode < 0 || mode > LAST_MODE) {
170        return false;
171    }
172    mMode = mode;
173
174    mCodecMagic = (0x8000 | codecType) << 16;
175    mDtmfMagic = (dtmfType == -1) ? 0 : (0x8000 | dtmfType) << 16;
176
177    mTick = elapsedRealtime();
178    mSampleRate = sampleRate / 1000;
179    mSampleCount = sampleCount;
180    mInterval = mSampleCount / mSampleRate;
181
182    // Allocate jitter buffer.
183    for (mBufferMask = 8; mBufferMask < mSampleRate; mBufferMask <<= 1);
184    mBufferMask *= BUFFER_SIZE;
185    mBuffer = new int16_t[mBufferMask];
186    --mBufferMask;
187    mBufferHead = 0;
188    mBufferTail = 0;
189    mLatencyTimer = 0;
190    mLatencyScore = 0;
191
192    // Initialize random bits.
193    read(gRandom, &mSequence, sizeof(mSequence));
194    read(gRandom, &mTimestamp, sizeof(mTimestamp));
195    read(gRandom, &mSsrc, sizeof(mSsrc));
196
197    mDtmfEvent = -1;
198    mDtmfStart = 0;
199
200    // Only take over these things when succeeded.
201    mSocket = socket;
202    if (codec) {
203        mRemote = *remote;
204        mCodec = codec;
205
206        // Here we should never get an private address, but some buggy proxy
207        // servers do give us one. To solve this, we replace the address when
208        // the first time we successfully decode an incoming packet.
209        mFixRemote = false;
210        if (remote->ss_family == AF_INET) {
211            unsigned char *address =
212                (unsigned char *)&((sockaddr_in *)remote)->sin_addr;
213            if (address[0] == 10 ||
214                (address[0] == 172 && (address[1] >> 4) == 1) ||
215                (address[0] == 192 && address[1] == 168)) {
216                mFixRemote = true;
217            }
218        }
219    }
220
221    ALOGD("stream[%d] is configured as %s %dkHz %dms mode %d", mSocket,
222        (codec ? codec->name : "RAW"), mSampleRate, mInterval, mMode);
223    return true;
224}
225
226void AudioStream::sendDtmf(int event)
227{
228    if (mDtmfMagic != 0) {
229        mDtmfEvent = event << 24;
230        mDtmfStart = mTimestamp + mSampleCount;
231    }
232}
233
234bool AudioStream::mix(int32_t *output, int head, int tail, int sampleRate)
235{
236    if (mMode == SEND_ONLY) {
237        return false;
238    }
239
240    if (head - mBufferHead < 0) {
241        head = mBufferHead;
242    }
243    if (tail - mBufferTail > 0) {
244        tail = mBufferTail;
245    }
246    if (tail - head <= 0) {
247        return false;
248    }
249
250    head *= mSampleRate;
251    tail *= mSampleRate;
252
253    if (sampleRate == mSampleRate) {
254        for (int i = head; i - tail < 0; ++i) {
255            output[i - head] += mBuffer[i & mBufferMask];
256        }
257    } else {
258        // TODO: implement resampling.
259        return false;
260    }
261    return true;
262}
263
// Produces this stream's next outgoing packet. The network thread calls it
// once the packet is due (tick - mTick >= 0). `tick` is the current
// elapsedRealtime() in ms and `chain` is the head of the group's stream list.
// After the schedule is advanced by one interval, one of these happens:
//   1. If a DTMF event is pending, the stream is not RECEIVE_ONLY and the
//      event is still within DTMF_PERIOD, a 4-word DTMF packet is sent and
//      nothing else.
//   2. Otherwise, unless the stream is RECEIVE_ONLY, the other streams in
//      `chain` are mixed over the previous interval. If anything was mixed,
//      the result is sent: encoded with mCodec, or as raw PCM on the device
//      stream, which has no codec.
//   3. If nothing was mixed, a silence packet is sent only when mTick has
//      entered a new 1024 ms window since the last one (bits 10 and up
//      differ). This acts as a keep-alive; otherwise nothing is sent.
void AudioStream::encode(int tick, AudioStream *chain)
{
    if (tick - mTick >= mInterval) {
        // We just missed the train. Pretend that packets in between are lost.
        int skipped = (tick - mTick) / mInterval;
        mTick += skipped * mInterval;
        mSequence += skipped;
        mTimestamp += skipped * mSampleCount;
        ALOGV("stream[%d] skips %d packets", mSocket, skipped);
    }

    // From here on `tick` is this packet's scheduled time. Advance the
    // schedule and the RTP sequence number and timestamp for this packet.
    tick = mTick;
    mTick += mInterval;
    ++mSequence;
    mTimestamp += mSampleCount;

    // If there is an ongoing DTMF event, send it now.
    if (mMode != RECEIVE_ONLY && mDtmfEvent != -1) {
        // Event duration so far, in RTP timestamp units (samples).
        int duration = mTimestamp - mDtmfStart;
        // Make sure duration is reasonable.
        if (duration >= 0 && duration < mSampleRate * DTMF_PERIOD) {
            duration += mSampleCount;
            // Three RTP header words, then one event word: the event number in
            // the top byte (see sendDtmf()) OR-ed with the duration. mSsrc is
            // left in host order; set() fills it with random bytes.
            int32_t buffer[4] = {
                static_cast<int32_t>(htonl(mDtmfMagic | mSequence)),
                static_cast<int32_t>(htonl(mDtmfStart)),
                static_cast<int32_t>(mSsrc),
                static_cast<int32_t>(htonl(mDtmfEvent | duration)),
            };
            // Final packet of the event: set bit 23 of the event word and
            // clear the pending event.
            if (duration >= mSampleRate * DTMF_PERIOD) {
                buffer[3] |= htonl(1 << 23);
                mDtmfEvent = -1;
            }
            sendto(mSocket, buffer, sizeof(buffer), MSG_DONTWAIT,
                (sockaddr *)&mRemote, sizeof(mRemote));
            return;
        }
        mDtmfEvent = -1;
    }

    // The first mSampleCount words accumulate the mix. The three extra words
    // let the same array be reused below as the outgoing packet: a 12-byte
    // RTP header in buffer[0..2], followed by the encoded payload.
    int32_t buffer[mSampleCount + 3];
    bool data = false;
    if (mMode != RECEIVE_ONLY) {
        // Mix all other streams.
        memset(buffer, 0, sizeof(buffer));
        while (chain) {
            if (chain != this) {
                // Add the other stream's samples for [tick - mInterval, tick).
                data |= chain->mix(buffer, tick - mInterval, tick, mSampleRate);
            }
            chain = chain->mNext;
        }
    }

    int16_t samples[mSampleCount];
    if (data) {
        // Saturate into 16 bits.
        for (int i = 0; i < mSampleCount; ++i) {
            int32_t sample = buffer[i];
            if (sample < -32768) {
                sample = -32768;
            }
            if (sample > 32767) {
                sample = 32767;
            }
            samples[i] = sample;
        }
    } else {
        // Nothing to send. Emit silence only when mTick has entered a new
        // 1024 ms window since the last keep-alive.
        if ((mTick ^ mKeepAlive) >> 10 == 0) {
            return;
        }
        mKeepAlive = mTick;
        memset(samples, 0, sizeof(samples));

        if (mMode != RECEIVE_ONLY) {
            ALOGV("stream[%d] no data", mSocket);
        }
    }

    if (!mCodec) {
        // Special case for device stream.
        send(mSocket, samples, sizeof(samples), MSG_DONTWAIT);
        return;
    }

    // Cook the packet and send it out.
    buffer[0] = htonl(mCodecMagic | mSequence);
    buffer[1] = htonl(mTimestamp);
    buffer[2] = mSsrc;
    // The encoder returns the payload length in bytes; the header adds 12.
    int length = mCodec->encode(&buffer[3], samples);
    if (length <= 0) {
        ALOGV("stream[%d] encoder error", mSocket);
        return;
    }
    sendto(mSocket, buffer, length + 12, MSG_DONTWAIT, (sockaddr *)&mRemote,
        sizeof(mRemote));
}
359
360void AudioStream::decode(int tick)
361{
362    char c;
363    if (mMode == SEND_ONLY) {
364        recv(mSocket, &c, 1, MSG_DONTWAIT);
365        return;
366    }
367
368    // Make sure mBufferHead and mBufferTail are reasonable.
369    if ((unsigned int)(tick + BUFFER_SIZE - mBufferHead) > BUFFER_SIZE * 2) {
370        mBufferHead = tick - HISTORY_SIZE;
371        mBufferTail = mBufferHead;
372    }
373
374    if (tick - mBufferHead > HISTORY_SIZE) {
375        // Throw away outdated samples.
376        mBufferHead = tick - HISTORY_SIZE;
377        if (mBufferTail - mBufferHead < 0) {
378            mBufferTail = mBufferHead;
379        }
380    }
381
382    // Adjust the jitter buffer if the latency keeps larger than the threshold
383    // in the measurement period.
384    int score = mBufferTail - tick - MEASURE_BASE;
385    if (mLatencyScore > score || mLatencyScore <= 0) {
386        mLatencyScore = score;
387        mLatencyTimer = tick;
388    } else if (tick - mLatencyTimer >= MEASURE_PERIOD) {
389        ALOGV("stream[%d] reduces latency of %dms", mSocket, mLatencyScore);
390        mBufferTail -= mLatencyScore;
391        mLatencyScore = -1;
392    }
393
394    int count = (BUFFER_SIZE - (mBufferTail - mBufferHead)) * mSampleRate;
395    if (count < mSampleCount) {
396        // Buffer overflow. Drop the packet.
397        ALOGV("stream[%d] buffer overflow", mSocket);
398        recv(mSocket, &c, 1, MSG_DONTWAIT);
399        return;
400    }
401
402    // Receive the packet and decode it.
403    int16_t samples[count];
404    if (!mCodec) {
405        // Special case for device stream.
406        count = recv(mSocket, samples, sizeof(samples),
407            MSG_TRUNC | MSG_DONTWAIT) >> 1;
408    } else {
409        __attribute__((aligned(4))) uint8_t buffer[2048];
410        sockaddr_storage remote;
411        socklen_t addrlen = sizeof(remote);
412
413        int length = recvfrom(mSocket, buffer, sizeof(buffer),
414            MSG_TRUNC | MSG_DONTWAIT, (sockaddr *)&remote, &addrlen);
415
416        // Do we need to check SSRC, sequence, and timestamp? They are not
417        // reliable but at least they can be used to identify duplicates?
418        if (length < 12 || length > (int)sizeof(buffer) ||
419            (ntohl(*(uint32_t *)buffer) & 0xC07F0000) != mCodecMagic) {
420            ALOGV("stream[%d] malformed packet", mSocket);
421            return;
422        }
423        int offset = 12 + ((buffer[0] & 0x0F) << 2);
424        if ((buffer[0] & 0x10) != 0) {
425            offset += 4 + (ntohs(*(uint16_t *)&buffer[offset + 2]) << 2);
426        }
427        if ((buffer[0] & 0x20) != 0) {
428            length -= buffer[length - 1];
429        }
430        length -= offset;
431        if (length >= 0) {
432            length = mCodec->decode(samples, count, &buffer[offset], length);
433        }
434        if (length > 0 && mFixRemote) {
435            mRemote = remote;
436            mFixRemote = false;
437        }
438        count = length;
439    }
440    if (count <= 0) {
441        ALOGV("stream[%d] decoder error", mSocket);
442        return;
443    }
444
445    if (tick - mBufferTail > 0) {
446        // Buffer underrun. Reset the jitter buffer.
447        ALOGV("stream[%d] buffer underrun", mSocket);
448        if (mBufferTail - mBufferHead <= 0) {
449            mBufferHead = tick + mInterval;
450            mBufferTail = mBufferHead;
451        } else {
452            int tail = (tick + mInterval) * mSampleRate;
453            for (int i = mBufferTail * mSampleRate; i - tail < 0; ++i) {
454                mBuffer[i & mBufferMask] = 0;
455            }
456            mBufferTail = tick + mInterval;
457        }
458    }
459
460    // Append to the jitter buffer.
461    int tail = mBufferTail * mSampleRate;
462    for (int i = 0; i < count; ++i) {
463        mBuffer[tail & mBufferMask] = samples[i];
464        ++tail;
465    }
466    mBufferTail += mInterval;
467}
468
469//------------------------------------------------------------------------------
470
471class AudioGroup
472{
473public:
474    AudioGroup();
475    ~AudioGroup();
476    bool set(int sampleRate, int sampleCount);
477
478    bool setMode(int mode);
479    bool sendDtmf(int event);
480    bool add(AudioStream *stream);
481    bool remove(AudioStream *stream);
482    bool platformHasAec() { return mPlatformHasAec; }
483
484private:
485    enum {
486        ON_HOLD = 0,
487        MUTED = 1,
488        NORMAL = 2,
489        ECHO_SUPPRESSION = 3,
490        LAST_MODE = 3,
491    };
492
493    bool checkPlatformAec();
494
495    AudioStream *mChain;
496    int mEventQueue;
497    volatile int mDtmfEvent;
498
499    int mMode;
500    int mSampleRate;
501    int mSampleCount;
502    int mDeviceSocket;
503    bool mPlatformHasAec;
504
505    class NetworkThread : public Thread
506    {
507    public:
508        NetworkThread(AudioGroup *group) : Thread(false), mGroup(group) {}
509
510        bool start()
511        {
512            if (run("Network", ANDROID_PRIORITY_AUDIO) != NO_ERROR) {
513                ALOGE("cannot start network thread");
514                return false;
515            }
516            return true;
517        }
518
519    private:
520        AudioGroup *mGroup;
521        bool threadLoop();
522    };
523    sp<NetworkThread> mNetworkThread;
524
525    class DeviceThread : public Thread
526    {
527    public:
528        DeviceThread(AudioGroup *group) : Thread(false), mGroup(group) {}
529
530        bool start()
531        {
532            if (run("Device", ANDROID_PRIORITY_AUDIO) != NO_ERROR) {
533                ALOGE("cannot start device thread");
534                return false;
535            }
536            return true;
537        }
538
539    private:
540        AudioGroup *mGroup;
541        bool threadLoop();
542    };
543    sp<DeviceThread> mDeviceThread;
544};
545
546AudioGroup::AudioGroup()
547{
548    mMode = ON_HOLD;
549    mChain = NULL;
550    mEventQueue = -1;
551    mDtmfEvent = -1;
552    mDeviceSocket = -1;
553    mNetworkThread = new NetworkThread(this);
554    mDeviceThread = new DeviceThread(this);
555    mPlatformHasAec = checkPlatformAec();
556}
557
558AudioGroup::~AudioGroup()
559{
560    mNetworkThread->requestExitAndWait();
561    mDeviceThread->requestExitAndWait();
562    close(mEventQueue);
563    close(mDeviceSocket);
564    while (mChain) {
565        AudioStream *next = mChain->mNext;
566        delete mChain;
567        mChain = next;
568    }
569    ALOGD("group[%d] is dead", mDeviceSocket);
570}
571
572bool AudioGroup::set(int sampleRate, int sampleCount)
573{
574    mEventQueue = epoll_create(2);
575    if (mEventQueue == -1) {
576        ALOGE("epoll_create: %s", strerror(errno));
577        return false;
578    }
579
580    mSampleRate = sampleRate;
581    mSampleCount = sampleCount;
582
583    // Create device socket.
584    int pair[2];
585    if (socketpair(AF_UNIX, SOCK_DGRAM, 0, pair)) {
586        ALOGE("socketpair: %s", strerror(errno));
587        return false;
588    }
589    mDeviceSocket = pair[0];
590
591    // Create device stream.
592    mChain = new AudioStream;
593    if (!mChain->set(AudioStream::NORMAL, pair[1], NULL, NULL,
594        sampleRate, sampleCount, -1, -1)) {
595        close(pair[1]);
596        ALOGE("cannot initialize device stream");
597        return false;
598    }
599
600    // Give device socket a reasonable timeout.
601    timeval tv;
602    tv.tv_sec = 0;
603    tv.tv_usec = 1000 * sampleCount / sampleRate * 500;
604    if (setsockopt(pair[0], SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv))) {
605        ALOGE("setsockopt: %s", strerror(errno));
606        return false;
607    }
608
609    // Add device stream into event queue.
610    epoll_event event;
611    event.events = EPOLLIN;
612    event.data.ptr = mChain;
613    if (epoll_ctl(mEventQueue, EPOLL_CTL_ADD, pair[1], &event)) {
614        ALOGE("epoll_ctl: %s", strerror(errno));
615        return false;
616    }
617
618    // Anything else?
619    ALOGD("stream[%d] joins group[%d]", pair[1], pair[0]);
620    return true;
621}
622
623bool AudioGroup::setMode(int mode)
624{
625    if (mode < 0 || mode > LAST_MODE) {
626        return false;
627    }
628    // FIXME: temporary code to overcome echo and mic gain issues on herring and tuna boards.
629    // Must be modified/removed when the root cause of the issue is fixed in the hardware or
630    // driver
631    char value[PROPERTY_VALUE_MAX];
632    property_get("ro.product.board", value, "");
633    if (mode == NORMAL &&
634            (!strcmp(value, "herring") || !strcmp(value, "tuna"))) {
635        mode = ECHO_SUPPRESSION;
636    }
637    if (mMode == mode) {
638        return true;
639    }
640
641    mDeviceThread->requestExitAndWait();
642    ALOGD("group[%d] switches from mode %d to %d", mDeviceSocket, mMode, mode);
643    mMode = mode;
644    return (mode == ON_HOLD) || mDeviceThread->start();
645}
646
647bool AudioGroup::sendDtmf(int event)
648{
649    if (event < 0 || event > 15) {
650        return false;
651    }
652
653    // DTMF is rarely used, so we try to make it as lightweight as possible.
654    // Using volatile might be dodgy, but using a pipe or pthread primitives
655    // or stop-set-restart threads seems too heavy. Will investigate later.
656    timespec ts;
657    ts.tv_sec = 0;
658    ts.tv_nsec = 100000000;
659    for (int i = 0; mDtmfEvent != -1 && i < 20; ++i) {
660        nanosleep(&ts, NULL);
661    }
662    if (mDtmfEvent != -1) {
663        return false;
664    }
665    mDtmfEvent = event;
666    nanosleep(&ts, NULL);
667    return true;
668}
669
670bool AudioGroup::add(AudioStream *stream)
671{
672    mNetworkThread->requestExitAndWait();
673
674    epoll_event event;
675    event.events = EPOLLIN;
676    event.data.ptr = stream;
677    if (epoll_ctl(mEventQueue, EPOLL_CTL_ADD, stream->mSocket, &event)) {
678        ALOGE("epoll_ctl: %s", strerror(errno));
679        return false;
680    }
681
682    stream->mNext = mChain->mNext;
683    mChain->mNext = stream;
684    if (!mNetworkThread->start()) {
685        // Only take over the stream when succeeded.
686        mChain->mNext = stream->mNext;
687        return false;
688    }
689
690    ALOGD("stream[%d] joins group[%d]", stream->mSocket, mDeviceSocket);
691    return true;
692}
693
694bool AudioGroup::remove(AudioStream *stream)
695{
696    mNetworkThread->requestExitAndWait();
697
698    for (AudioStream *chain = mChain; chain->mNext; chain = chain->mNext) {
699        if (chain->mNext == stream) {
700            if (epoll_ctl(mEventQueue, EPOLL_CTL_DEL, stream->mSocket, NULL)) {
701                ALOGE("epoll_ctl: %s", strerror(errno));
702                return false;
703            }
704            chain->mNext = stream->mNext;
705            ALOGD("stream[%d] leaves group[%d]", stream->mSocket, mDeviceSocket);
706            delete stream;
707            break;
708        }
709    }
710
711    // Do not start network thread if there is only one stream.
712    if (!mChain->mNext || !mNetworkThread->start()) {
713        return false;
714    }
715    return true;
716}
717
718bool AudioGroup::NetworkThread::threadLoop()
719{
720    AudioStream *chain = mGroup->mChain;
721    int tick = elapsedRealtime();
722    int deadline = tick + 10;
723    int count = 0;
724
725    for (AudioStream *stream = chain; stream; stream = stream->mNext) {
726        if (tick - stream->mTick >= 0) {
727            stream->encode(tick, chain);
728        }
729        if (deadline - stream->mTick > 0) {
730            deadline = stream->mTick;
731        }
732        ++count;
733    }
734
735    int event = mGroup->mDtmfEvent;
736    if (event != -1) {
737        for (AudioStream *stream = chain; stream; stream = stream->mNext) {
738            stream->sendDtmf(event);
739        }
740        mGroup->mDtmfEvent = -1;
741    }
742
743    deadline -= tick;
744    if (deadline < 1) {
745        deadline = 1;
746    }
747
748    epoll_event events[count];
749    count = epoll_wait(mGroup->mEventQueue, events, count, deadline);
750    if (count == -1) {
751        ALOGE("epoll_wait: %s", strerror(errno));
752        return false;
753    }
754    for (int i = 0; i < count; ++i) {
755        ((AudioStream *)events[i].data.ptr)->decode(tick);
756    }
757
758    return true;
759}
760
761bool AudioGroup::checkPlatformAec()
762{
763    effect_descriptor_t fxDesc;
764    uint32_t numFx;
765
766    if (AudioEffect::queryNumberEffects(&numFx) != NO_ERROR) {
767        return false;
768    }
769    for (uint32_t i = 0; i < numFx; i++) {
770        if (AudioEffect::queryEffect(i, &fxDesc) != NO_ERROR) {
771            continue;
772        }
773        if (memcmp(&fxDesc.type, FX_IID_AEC, sizeof(effect_uuid_t)) == 0) {
774            return true;
775        }
776    }
777    return false;
778}
779
780bool AudioGroup::DeviceThread::threadLoop()
781{
782    int mode = mGroup->mMode;
783    int sampleRate = mGroup->mSampleRate;
784    int sampleCount = mGroup->mSampleCount;
785    int deviceSocket = mGroup->mDeviceSocket;
786
787    // Find out the frame count for AudioTrack and AudioRecord.
788    size_t output = 0;
789    size_t input = 0;
790    if (AudioTrack::getMinFrameCount(&output, AUDIO_STREAM_VOICE_CALL,
791        sampleRate) != NO_ERROR || output <= 0 ||
792        AudioRecord::getMinFrameCount(&input, sampleRate,
793        AUDIO_FORMAT_PCM_16_BIT, AUDIO_CHANNEL_IN_MONO) != NO_ERROR || input <= 0) {
794        ALOGE("cannot compute frame count");
795        return false;
796    }
797    ALOGD("reported frame count: output %d, input %d", output, input);
798
799    if (output < sampleCount * 2) {
800        output = sampleCount * 2;
801    }
802    if (input < sampleCount * 2) {
803        input = sampleCount * 2;
804    }
805    ALOGD("adjusted frame count: output %d, input %d", output, input);
806
807    // Initialize AudioTrack and AudioRecord.
808    sp<AudioTrack> track = new AudioTrack();
809    sp<AudioRecord> record = new AudioRecord();
810    if (track->set(AUDIO_STREAM_VOICE_CALL, sampleRate, AUDIO_FORMAT_PCM_16_BIT,
811                AUDIO_CHANNEL_OUT_MONO, output, AUDIO_OUTPUT_FLAG_NONE, NULL /*callback_t*/,
812                NULL /*user*/, 0 /*notificationFrames*/, 0 /*sharedBuffer*/,
813                false /*threadCanCallJava*/, 0 /*sessionId*/,
814                AudioTrack::TRANSFER_OBTAIN) != NO_ERROR ||
815            record->set(AUDIO_SOURCE_VOICE_COMMUNICATION, sampleRate, AUDIO_FORMAT_PCM_16_BIT,
816                AUDIO_CHANNEL_IN_MONO, input, NULL /*callback_t*/, NULL /*user*/,
817                0 /*notificationFrames*/, false /*threadCanCallJava*/, 0 /*sessionId*/,
818                AudioRecord::TRANSFER_OBTAIN) != NO_ERROR) {
819        ALOGE("cannot initialize audio device");
820        return false;
821    }
822    ALOGD("latency: output %d, input %d", track->latency(), record->latency());
823
824    // Give device socket a reasonable buffer size.
825    setsockopt(deviceSocket, SOL_SOCKET, SO_RCVBUF, &output, sizeof(output));
826    setsockopt(deviceSocket, SOL_SOCKET, SO_SNDBUF, &output, sizeof(output));
827
828    // Drain device socket.
829    char c;
830    while (recv(deviceSocket, &c, 1, MSG_DONTWAIT) == 1);
831
832    // check if platform supports echo cancellation and do not active local echo suppression in
833    // this case
834    EchoSuppressor *echo = NULL;
835    AudioEffect *aec = NULL;
836    if (mode == ECHO_SUPPRESSION) {
837        if (mGroup->platformHasAec()) {
838            aec = new AudioEffect(FX_IID_AEC,
839                                    NULL,
840                                    0,
841                                    0,
842                                    0,
843                                    record->getSessionId(),
844                                    record->getInput());
845            status_t status = aec->initCheck();
846            if (status == NO_ERROR || status == ALREADY_EXISTS) {
847                aec->setEnabled(true);
848            } else {
849                delete aec;
850                aec = NULL;
851            }
852        }
853        // Create local echo suppressor if platform AEC cannot be used.
854        if (aec == NULL) {
855             echo = new EchoSuppressor(sampleCount,
856                                       (track->latency() + record->latency()) * sampleRate / 1000);
857        }
858    }
859    // Start AudioRecord before AudioTrack. This prevents AudioTrack from being
860    // disabled due to buffer underrun while waiting for AudioRecord.
861    if (mode != MUTED) {
862        record->start();
863        int16_t one;
864        // FIXME this may not work any more
865        record->read(&one, sizeof(one));
866    }
867    track->start();
868
869    while (!exitPending()) {
870        int16_t output[sampleCount];
871        if (recv(deviceSocket, output, sizeof(output), 0) <= 0) {
872            memset(output, 0, sizeof(output));
873        }
874
875        int16_t input[sampleCount];
876        int toWrite = sampleCount;
877        int toRead = (mode == MUTED) ? 0 : sampleCount;
878        int chances = 100;
879
880        while (--chances > 0 && (toWrite > 0 || toRead > 0)) {
881            if (toWrite > 0) {
882                AudioTrack::Buffer buffer;
883                buffer.frameCount = toWrite;
884
885                status_t status = track->obtainBuffer(&buffer, 1);
886                if (status == NO_ERROR) {
887                    int offset = sampleCount - toWrite;
888                    memcpy(buffer.i8, &output[offset], buffer.size);
889                    toWrite -= buffer.frameCount;
890                    track->releaseBuffer(&buffer);
891                } else if (status != TIMED_OUT && status != WOULD_BLOCK) {
892                    ALOGE("cannot write to AudioTrack");
893                    goto exit;
894                }
895            }
896
897            if (toRead > 0) {
898                AudioRecord::Buffer buffer;
899                buffer.frameCount = toRead;
900
901                status_t status = record->obtainBuffer(&buffer, 1);
902                if (status == NO_ERROR) {
903                    int offset = sampleCount - toRead;
904                    memcpy(&input[offset], buffer.i8, buffer.size);
905                    toRead -= buffer.frameCount;
906                    record->releaseBuffer(&buffer);
907                } else if (status != TIMED_OUT && status != WOULD_BLOCK) {
908                    ALOGE("cannot read from AudioRecord");
909                    goto exit;
910                }
911            }
912        }
913
914        if (chances <= 0) {
915            ALOGW("device loop timeout");
916            while (recv(deviceSocket, &c, 1, MSG_DONTWAIT) == 1);
917        }
918
919        if (mode != MUTED) {
920            if (echo != NULL) {
921                ALOGV("echo->run()");
922                echo->run(output, input);
923            }
924            send(deviceSocket, input, sizeof(input), MSG_DONTWAIT);
925        }
926    }
927
928exit:
929    delete echo;
930    delete aec;
931    return true;
932}
933
934//------------------------------------------------------------------------------
935
// Field IDs of android.net.rtp.AudioGroup, resolved by registerAudioGroup().
// gNative is the long field mNative, which holds the native AudioGroup
// pointer (0 when none exists). gMode is the int field mMode, which is read
// when the native group is created.
static jfieldID gNative;
static jfieldID gMode;
938
939jlong add(JNIEnv *env, jobject thiz, jint mode,
940    jint socket, jstring jRemoteAddress, jint remotePort,
941    jstring jCodecSpec, jint dtmfType)
942{
943    AudioCodec *codec = NULL;
944    AudioStream *stream = NULL;
945    AudioGroup *group = NULL;
946
947    // Sanity check.
948    sockaddr_storage remote;
949    if (parse(env, jRemoteAddress, remotePort, &remote) < 0) {
950        // Exception already thrown.
951        return 0;
952    }
953    if (!jCodecSpec) {
954        jniThrowNullPointerException(env, "codecSpec");
955        return 0;
956    }
957    const char *codecSpec = env->GetStringUTFChars(jCodecSpec, NULL);
958    if (!codecSpec) {
959        // Exception already thrown.
960        return 0;
961    }
962    socket = dup(socket);
963    if (socket == -1) {
964        jniThrowException(env, "java/lang/IllegalStateException",
965            "cannot get stream socket");
966        return 0;
967    }
968
969    // Create audio codec.
970    int codecType = -1;
971    char codecName[16];
972    int sampleRate = -1;
973    sscanf(codecSpec, "%d %15[^/]%*c%d", &codecType, codecName, &sampleRate);
974    codec = newAudioCodec(codecName);
975    int sampleCount = (codec ? codec->set(sampleRate, codecSpec) : -1);
976    env->ReleaseStringUTFChars(jCodecSpec, codecSpec);
977    if (sampleCount <= 0) {
978        jniThrowException(env, "java/lang/IllegalStateException",
979            "cannot initialize audio codec");
980        goto error;
981    }
982
983    // Create audio stream.
984    stream = new AudioStream;
985    if (!stream->set(mode, socket, &remote, codec, sampleRate, sampleCount,
986        codecType, dtmfType)) {
987        jniThrowException(env, "java/lang/IllegalStateException",
988            "cannot initialize audio stream");
989        goto error;
990    }
991    socket = -1;
992    codec = NULL;
993
994    // Create audio group.
995    group = (AudioGroup *)env->GetLongField(thiz, gNative);
996    if (!group) {
997        int mode = env->GetIntField(thiz, gMode);
998        group = new AudioGroup;
999        if (!group->set(8000, 256) || !group->setMode(mode)) {
1000            jniThrowException(env, "java/lang/IllegalStateException",
1001                "cannot initialize audio group");
1002            goto error;
1003        }
1004    }
1005
1006    // Add audio stream into audio group.
1007    if (!group->add(stream)) {
1008        jniThrowException(env, "java/lang/IllegalStateException",
1009            "cannot add audio stream");
1010        goto error;
1011    }
1012
1013    // Succeed.
1014    env->SetLongField(thiz, gNative, (jlong)group);
1015    return (jlong)stream;
1016
1017error:
1018    delete group;
1019    delete stream;
1020    delete codec;
1021    close(socket);
1022    env->SetLongField(thiz, gNative, 0);
1023    return 0;
1024}
1025
1026void remove(JNIEnv *env, jobject thiz, jlong stream)
1027{
1028    AudioGroup *group = (AudioGroup *)env->GetLongField(thiz, gNative);
1029    if (group) {
1030        if (!stream || !group->remove((AudioStream *)stream)) {
1031            delete group;
1032            env->SetLongField(thiz, gNative, 0);
1033        }
1034    }
1035}
1036
1037void setMode(JNIEnv *env, jobject thiz, jint mode)
1038{
1039    AudioGroup *group = (AudioGroup *)env->GetLongField(thiz, gNative);
1040    if (group && !group->setMode(mode)) {
1041        jniThrowException(env, "java/lang/IllegalArgumentException", NULL);
1042    }
1043}
1044
1045void sendDtmf(JNIEnv *env, jobject thiz, jint event)
1046{
1047    AudioGroup *group = (AudioGroup *)env->GetLongField(thiz, gNative);
1048    if (group && !group->sendDtmf(event)) {
1049        jniThrowException(env, "java/lang/IllegalArgumentException", NULL);
1050    }
1051}
1052
// Native methods of android.net.rtp.AudioGroup, registered by
// registerAudioGroup(). Names and signatures must match the Java
// declarations. nativeAdd returns the native AudioStream pointer as a long,
// and nativeRemove takes such a pointer back (remove() casts it to
// AudioStream*).
JNINativeMethod gMethods[] = {
    {"nativeAdd", "(IILjava/lang/String;ILjava/lang/String;I)J", (void *)add},
    {"nativeRemove", "(J)V", (void *)remove},
    {"nativeSetMode", "(I)V", (void *)setMode},
    {"nativeSendDtmf", "(I)V", (void *)sendDtmf},
};
1059
1060} // namespace
1061
1062int registerAudioGroup(JNIEnv *env)
1063{
1064    gRandom = open("/dev/urandom", O_RDONLY);
1065    if (gRandom == -1) {
1066        ALOGE("urandom: %s", strerror(errno));
1067        return -1;
1068    }
1069
1070    jclass clazz;
1071    if ((clazz = env->FindClass("android/net/rtp/AudioGroup")) == NULL ||
1072        (gNative = env->GetFieldID(clazz, "mNative", "J")) == NULL ||
1073        (gMode = env->GetFieldID(clazz, "mMode", "I")) == NULL ||
1074        env->RegisterNatives(clazz, gMethods, NELEM(gMethods)) < 0) {
1075        ALOGE("JNI registration failed");
1076        return -1;
1077    }
1078    return 0;
1079}
1080