1/*
2 * Copyright 2012, The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *     http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "TunnelRenderer"
19#include <utils/Log.h>
20
21#include "TunnelRenderer.h"
22
23#include "ATSParser.h"
24
25#include <binder/IMemory.h>
26#include <binder/IServiceManager.h>
27#include <gui/SurfaceComposerClient.h>
28#include <media/IMediaPlayerService.h>
29#include <media/IStreamSource.h>
30#include <media/stagefright/foundation/ABuffer.h>
31#include <media/stagefright/foundation/ADebug.h>
32#include <media/stagefright/foundation/AMessage.h>
33#include <ui/DisplayInfo.h>
34
35namespace android {
36
37struct TunnelRenderer::PlayerClient : public BnMediaPlayerClient {
38    PlayerClient() {}
39
40    virtual void notify(int msg, int ext1, int ext2, const Parcel *obj) {
41        ALOGI("notify %d, %d, %d", msg, ext1, ext2);
42    }
43
44protected:
45    virtual ~PlayerClient() {}
46
47private:
48    DISALLOW_EVIL_CONSTRUCTORS(PlayerClient);
49};
50
51struct TunnelRenderer::StreamSource : public BnStreamSource {
52    StreamSource(TunnelRenderer *owner);
53
54    virtual void setListener(const sp<IStreamListener> &listener);
55    virtual void setBuffers(const Vector<sp<IMemory> > &buffers);
56
57    virtual void onBufferAvailable(size_t index);
58
59    virtual uint32_t flags() const;
60
61    void doSomeWork();
62
63protected:
64    virtual ~StreamSource();
65
66private:
67    mutable Mutex mLock;
68
69    TunnelRenderer *mOwner;
70
71    sp<IStreamListener> mListener;
72
73    Vector<sp<IMemory> > mBuffers;
74    List<size_t> mIndicesAvailable;
75
76    size_t mNumDeqeued;
77
78    DISALLOW_EVIL_CONSTRUCTORS(StreamSource);
79};
80
81////////////////////////////////////////////////////////////////////////////////
82
83TunnelRenderer::StreamSource::StreamSource(TunnelRenderer *owner)
84    : mOwner(owner),
85      mNumDeqeued(0) {
86}
87
88TunnelRenderer::StreamSource::~StreamSource() {
89}
90
91void TunnelRenderer::StreamSource::setListener(
92        const sp<IStreamListener> &listener) {
93    mListener = listener;
94}
95
96void TunnelRenderer::StreamSource::setBuffers(
97        const Vector<sp<IMemory> > &buffers) {
98    mBuffers = buffers;
99}
100
101void TunnelRenderer::StreamSource::onBufferAvailable(size_t index) {
102    CHECK_LT(index, mBuffers.size());
103
104    {
105        Mutex::Autolock autoLock(mLock);
106        mIndicesAvailable.push_back(index);
107    }
108
109    doSomeWork();
110}
111
112uint32_t TunnelRenderer::StreamSource::flags() const {
113    return kFlagAlignedVideoData;
114}
115
116void TunnelRenderer::StreamSource::doSomeWork() {
117    Mutex::Autolock autoLock(mLock);
118
119    while (!mIndicesAvailable.empty()) {
120        sp<ABuffer> srcBuffer = mOwner->dequeueBuffer();
121        if (srcBuffer == NULL) {
122            break;
123        }
124
125        ++mNumDeqeued;
126
127        if (mNumDeqeued == 1) {
128            ALOGI("fixing real time now.");
129
130            sp<AMessage> extra = new AMessage;
131
132            extra->setInt32(
133                    IStreamListener::kKeyDiscontinuityMask,
134                    ATSParser::DISCONTINUITY_ABSOLUTE_TIME);
135
136            extra->setInt64("timeUs", ALooper::GetNowUs());
137
138            mListener->issueCommand(
139                    IStreamListener::DISCONTINUITY,
140                    false /* synchronous */,
141                    extra);
142        }
143
144        ALOGV("dequeue TS packet of size %d", srcBuffer->size());
145
146        size_t index = *mIndicesAvailable.begin();
147        mIndicesAvailable.erase(mIndicesAvailable.begin());
148
149        sp<IMemory> mem = mBuffers.itemAt(index);
150        CHECK_LE(srcBuffer->size(), mem->size());
151        CHECK_EQ((srcBuffer->size() % 188), 0u);
152
153        memcpy(mem->pointer(), srcBuffer->data(), srcBuffer->size());
154        mListener->queueBuffer(index, srcBuffer->size());
155    }
156}
157
158////////////////////////////////////////////////////////////////////////////////
159
160TunnelRenderer::TunnelRenderer(
161        const sp<AMessage> &notifyLost,
162        const sp<ISurfaceTexture> &surfaceTex)
163    : mNotifyLost(notifyLost),
164      mSurfaceTex(surfaceTex),
165      mTotalBytesQueued(0ll),
166      mLastDequeuedExtSeqNo(-1),
167      mFirstFailedAttemptUs(-1ll),
168      mRequestedRetransmission(false) {
169}
170
171TunnelRenderer::~TunnelRenderer() {
172    destroyPlayer();
173}
174
175void TunnelRenderer::queueBuffer(const sp<ABuffer> &buffer) {
176    Mutex::Autolock autoLock(mLock);
177
178    mTotalBytesQueued += buffer->size();
179
180    if (mPackets.empty()) {
181        mPackets.push_back(buffer);
182        return;
183    }
184
185    int32_t newExtendedSeqNo = buffer->int32Data();
186
187    List<sp<ABuffer> >::iterator firstIt = mPackets.begin();
188    List<sp<ABuffer> >::iterator it = --mPackets.end();
189    for (;;) {
190        int32_t extendedSeqNo = (*it)->int32Data();
191
192        if (extendedSeqNo == newExtendedSeqNo) {
193            // Duplicate packet.
194            return;
195        }
196
197        if (extendedSeqNo < newExtendedSeqNo) {
198            // Insert new packet after the one at "it".
199            mPackets.insert(++it, buffer);
200            return;
201        }
202
203        if (it == firstIt) {
204            // Insert new packet before the first existing one.
205            mPackets.insert(it, buffer);
206            return;
207        }
208
209        --it;
210    }
211}
212
213sp<ABuffer> TunnelRenderer::dequeueBuffer() {
214    Mutex::Autolock autoLock(mLock);
215
216    sp<ABuffer> buffer;
217    int32_t extSeqNo;
218    while (!mPackets.empty()) {
219        buffer = *mPackets.begin();
220        extSeqNo = buffer->int32Data();
221
222        if (mLastDequeuedExtSeqNo < 0 || extSeqNo > mLastDequeuedExtSeqNo) {
223            break;
224        }
225
226        // This is a retransmission of a packet we've already returned.
227
228        mTotalBytesQueued -= buffer->size();
229        buffer.clear();
230        extSeqNo = -1;
231
232        mPackets.erase(mPackets.begin());
233    }
234
235    if (mPackets.empty()) {
236        if (mFirstFailedAttemptUs < 0ll) {
237            mFirstFailedAttemptUs = ALooper::GetNowUs();
238            mRequestedRetransmission = false;
239        } else {
240            ALOGV("no packets available for %.2f secs",
241                    (ALooper::GetNowUs() - mFirstFailedAttemptUs) / 1E6);
242        }
243
244        return NULL;
245    }
246
247    if (mLastDequeuedExtSeqNo < 0 || extSeqNo == mLastDequeuedExtSeqNo + 1) {
248        if (mRequestedRetransmission) {
249            ALOGI("Recovered after requesting retransmission of %d",
250                  extSeqNo);
251        }
252
253        mLastDequeuedExtSeqNo = extSeqNo;
254        mFirstFailedAttemptUs = -1ll;
255        mRequestedRetransmission = false;
256
257        mPackets.erase(mPackets.begin());
258
259        mTotalBytesQueued -= buffer->size();
260
261        return buffer;
262    }
263
264    if (mFirstFailedAttemptUs < 0ll) {
265        mFirstFailedAttemptUs = ALooper::GetNowUs();
266
267        ALOGI("failed to get the correct packet the first time.");
268        return NULL;
269    }
270
271    if (mFirstFailedAttemptUs + 50000ll > ALooper::GetNowUs()) {
272        // We're willing to wait a little while to get the right packet.
273
274        if (!mRequestedRetransmission) {
275            ALOGI("requesting retransmission of seqNo %d",
276                  (mLastDequeuedExtSeqNo + 1) & 0xffff);
277
278            sp<AMessage> notify = mNotifyLost->dup();
279            notify->setInt32("seqNo", (mLastDequeuedExtSeqNo + 1) & 0xffff);
280            notify->post();
281
282            mRequestedRetransmission = true;
283        } else {
284            ALOGI("still waiting for the correct packet to arrive.");
285        }
286
287        return NULL;
288    }
289
290    ALOGI("dropping packet. extSeqNo %d didn't arrive in time",
291            mLastDequeuedExtSeqNo + 1);
292
293    // Permanent failure, we never received the packet.
294    mLastDequeuedExtSeqNo = extSeqNo;
295    mFirstFailedAttemptUs = -1ll;
296    mRequestedRetransmission = false;
297
298    mTotalBytesQueued -= buffer->size();
299
300    mPackets.erase(mPackets.begin());
301
302    return buffer;
303}
304
305void TunnelRenderer::onMessageReceived(const sp<AMessage> &msg) {
306    switch (msg->what()) {
307        case kWhatQueueBuffer:
308        {
309            sp<ABuffer> buffer;
310            CHECK(msg->findBuffer("buffer", &buffer));
311
312            queueBuffer(buffer);
313
314            if (mStreamSource == NULL) {
315                if (mTotalBytesQueued > 0ll) {
316                    initPlayer();
317                } else {
318                    ALOGI("Have %lld bytes queued...", mTotalBytesQueued);
319                }
320            } else {
321                mStreamSource->doSomeWork();
322            }
323            break;
324        }
325
326        default:
327            TRESPASS();
328    }
329}
330
331void TunnelRenderer::initPlayer() {
332    if (mSurfaceTex == NULL) {
333        mComposerClient = new SurfaceComposerClient;
334        CHECK_EQ(mComposerClient->initCheck(), (status_t)OK);
335
336        DisplayInfo info;
337        SurfaceComposerClient::getDisplayInfo(0, &info);
338        ssize_t displayWidth = info.w;
339        ssize_t displayHeight = info.h;
340
341        mSurfaceControl =
342            mComposerClient->createSurface(
343                    String8("A Surface"),
344                    displayWidth,
345                    displayHeight,
346                    PIXEL_FORMAT_RGB_565,
347                    0);
348
349        CHECK(mSurfaceControl != NULL);
350        CHECK(mSurfaceControl->isValid());
351
352        SurfaceComposerClient::openGlobalTransaction();
353        CHECK_EQ(mSurfaceControl->setLayer(INT_MAX), (status_t)OK);
354        CHECK_EQ(mSurfaceControl->show(), (status_t)OK);
355        SurfaceComposerClient::closeGlobalTransaction();
356
357        mSurface = mSurfaceControl->getSurface();
358        CHECK(mSurface != NULL);
359    }
360
361    sp<IServiceManager> sm = defaultServiceManager();
362    sp<IBinder> binder = sm->getService(String16("media.player"));
363    sp<IMediaPlayerService> service = interface_cast<IMediaPlayerService>(binder);
364    CHECK(service.get() != NULL);
365
366    mStreamSource = new StreamSource(this);
367
368    mPlayerClient = new PlayerClient;
369
370    mPlayer = service->create(getpid(), mPlayerClient, 0);
371    CHECK(mPlayer != NULL);
372    CHECK_EQ(mPlayer->setDataSource(mStreamSource), (status_t)OK);
373
374    mPlayer->setVideoSurfaceTexture(
375            mSurfaceTex != NULL ? mSurfaceTex : mSurface->getSurfaceTexture());
376
377    mPlayer->start();
378}
379
380void TunnelRenderer::destroyPlayer() {
381    mStreamSource.clear();
382
383    mPlayer->stop();
384    mPlayer.clear();
385
386    if (mSurfaceTex == NULL) {
387        mSurface.clear();
388        mSurfaceControl.clear();
389
390        mComposerClient->dispose();
391        mComposerClient.clear();
392    }
393}
394
395}  // namespace android
396
397