1/*
2 * Copyright 2012, The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *     http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "MediaPuller"
19#include <utils/Log.h>
20
21#include "MediaPuller.h"
22
23#include <media/stagefright/foundation/ABuffer.h>
24#include <media/stagefright/foundation/ADebug.h>
25#include <media/stagefright/foundation/AMessage.h>
26#include <media/stagefright/MediaBuffer.h>
27#include <media/stagefright/MediaSource.h>
28#include <media/stagefright/MetaData.h>
29
30namespace android {
31
32MediaPuller::MediaPuller(
33        const sp<MediaSource> &source, const sp<AMessage> &notify)
34    : mSource(source),
35      mNotify(notify),
36      mPullGeneration(0),
37      mIsAudio(false) {
38    sp<MetaData> meta = source->getFormat();
39    const char *mime;
40    CHECK(meta->findCString(kKeyMIMEType, &mime));
41
42    mIsAudio = !strncasecmp(mime, "audio/", 6);
43}
44
45MediaPuller::~MediaPuller() {
46}
47
48status_t MediaPuller::postSynchronouslyAndReturnError(
49        const sp<AMessage> &msg) {
50    sp<AMessage> response;
51    status_t err = msg->postAndAwaitResponse(&response);
52
53    if (err != OK) {
54        return err;
55    }
56
57    if (!response->findInt32("err", &err)) {
58        err = OK;
59    }
60
61    return err;
62}
63
64status_t MediaPuller::start() {
65    return postSynchronouslyAndReturnError(new AMessage(kWhatStart, id()));
66}
67
68void MediaPuller::stopAsync(const sp<AMessage> &notify) {
69    sp<AMessage> msg = new AMessage(kWhatStop, id());
70    msg->setMessage("notify", notify);
71    msg->post();
72}
73
74void MediaPuller::onMessageReceived(const sp<AMessage> &msg) {
75    switch (msg->what()) {
76        case kWhatStart:
77        {
78            status_t err;
79            if (mIsAudio) {
80                // This atrocity causes AudioSource to deliver absolute
81                // systemTime() based timestamps (off by 1 us).
82                sp<MetaData> params = new MetaData;
83                params->setInt64(kKeyTime, 1ll);
84                err = mSource->start(params.get());
85            } else {
86                err = mSource->start();
87            }
88
89            if (err == OK) {
90                schedulePull();
91            }
92
93            sp<AMessage> response = new AMessage;
94            response->setInt32("err", err);
95
96            uint32_t replyID;
97            CHECK(msg->senderAwaitsResponse(&replyID));
98
99            response->postReply(replyID);
100            break;
101        }
102
103        case kWhatStop:
104        {
105            sp<MetaData> meta = mSource->getFormat();
106            const char *tmp;
107            CHECK(meta->findCString(kKeyMIMEType, &tmp));
108            AString mime = tmp;
109
110            ALOGI("MediaPuller(%s) stopping.", mime.c_str());
111            mSource->stop();
112            ALOGI("MediaPuller(%s) stopped.", mime.c_str());
113            ++mPullGeneration;
114
115            sp<AMessage> notify;
116            CHECK(msg->findMessage("notify", &notify));
117            notify->post();
118            break;
119        }
120
121        case kWhatPull:
122        {
123            int32_t generation;
124            CHECK(msg->findInt32("generation", &generation));
125
126            if (generation != mPullGeneration) {
127                break;
128            }
129
130            MediaBuffer *mbuf;
131            status_t err = mSource->read(&mbuf);
132
133            if (err != OK) {
134                if (err == ERROR_END_OF_STREAM) {
135                    ALOGI("stream ended.");
136                } else {
137                    ALOGE("error %d reading stream.", err);
138                }
139
140                sp<AMessage> notify = mNotify->dup();
141                notify->setInt32("what", kWhatEOS);
142                notify->post();
143            } else {
144                int64_t timeUs;
145                CHECK(mbuf->meta_data()->findInt64(kKeyTime, &timeUs));
146
147                sp<ABuffer> accessUnit = new ABuffer(mbuf->range_length());
148
149                memcpy(accessUnit->data(),
150                       (const uint8_t *)mbuf->data() + mbuf->range_offset(),
151                       mbuf->range_length());
152
153                accessUnit->meta()->setInt64("timeUs", timeUs);
154
155                if (mIsAudio) {
156                    mbuf->release();
157                    mbuf = NULL;
158                } else {
159                    // video encoder will release MediaBuffer when done
160                    // with underlying data.
161                    accessUnit->meta()->setPointer("mediaBuffer", mbuf);
162                }
163
164                sp<AMessage> notify = mNotify->dup();
165
166                notify->setInt32("what", kWhatAccessUnit);
167                notify->setBuffer("accessUnit", accessUnit);
168                notify->post();
169
170                if (mbuf != NULL) {
171                    ALOGV("posted mbuf %p", mbuf);
172                }
173
174                schedulePull();
175            }
176            break;
177        }
178
179        default:
180            TRESPASS();
181    }
182}
183
184void MediaPuller::schedulePull() {
185    sp<AMessage> msg = new AMessage(kWhatPull, id());
186    msg->setInt32("generation", mPullGeneration);
187    msg->post();
188}
189
190}  // namespace android
191
192