1/*
2 * Copyright (C) 2010 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "MediaSourceSplitter"
19#include <utils/Log.h>
20
21#include <media/stagefright/MediaSourceSplitter.h>
22#include <media/stagefright/MediaDebug.h>
23#include <media/stagefright/MediaBuffer.h>
24#include <media/stagefright/MetaData.h>
25
26namespace android {
27
28MediaSourceSplitter::MediaSourceSplitter(sp<MediaSource> mediaSource) {
29    mNumberOfClients = 0;
30    mSource = mediaSource;
31    mSourceStarted = false;
32
33    mNumberOfClientsStarted = 0;
34    mNumberOfCurrentReads = 0;
35    mCurrentReadBit = 0;
36    mLastReadCompleted = true;
37}
38
39MediaSourceSplitter::~MediaSourceSplitter() {
40}
41
42sp<MediaSource> MediaSourceSplitter::createClient() {
43    Mutex::Autolock autoLock(mLock);
44
45    sp<MediaSource> client = new Client(this, mNumberOfClients++);
46    mClientsStarted.push(false);
47    mClientsDesiredReadBit.push(0);
48    return client;
49}
50
51status_t MediaSourceSplitter::start(int clientId, MetaData *params) {
52    Mutex::Autolock autoLock(mLock);
53
54    LOGV("start client (%d)", clientId);
55    if (mClientsStarted[clientId]) {
56        return OK;
57    }
58
59    mNumberOfClientsStarted++;
60
61    if (!mSourceStarted) {
62        LOGV("Starting real source from client (%d)", clientId);
63        status_t err = mSource->start(params);
64
65        if (err == OK) {
66            mSourceStarted = true;
67            mClientsStarted.editItemAt(clientId) = true;
68            mClientsDesiredReadBit.editItemAt(clientId) = !mCurrentReadBit;
69        }
70
71        return err;
72    } else {
73        mClientsStarted.editItemAt(clientId) = true;
74        if (mLastReadCompleted) {
75            // Last read was completed. So join in the threads for the next read.
76            mClientsDesiredReadBit.editItemAt(clientId) = !mCurrentReadBit;
77        } else {
78            // Last read is ongoing. So join in the threads for the current read.
79            mClientsDesiredReadBit.editItemAt(clientId) = mCurrentReadBit;
80        }
81        return OK;
82    }
83}
84
85status_t MediaSourceSplitter::stop(int clientId) {
86    Mutex::Autolock autoLock(mLock);
87
88    LOGV("stop client (%d)", clientId);
89    CHECK(clientId >= 0 && clientId < mNumberOfClients);
90    CHECK(mClientsStarted[clientId]);
91
92    if (--mNumberOfClientsStarted == 0) {
93        LOGV("Stopping real source from client (%d)", clientId);
94        status_t err = mSource->stop();
95        mSourceStarted = false;
96        mClientsStarted.editItemAt(clientId) = false;
97        return err;
98    } else {
99        mClientsStarted.editItemAt(clientId) = false;
100        if (!mLastReadCompleted && (mClientsDesiredReadBit[clientId] == mCurrentReadBit)) {
101            // !mLastReadCompleted implies that buffer has been read from source, but all
102            // clients haven't read it.
103            // mClientsDesiredReadBit[clientId] == mCurrentReadBit implies that this
104            // client would have wanted to read from this buffer. (i.e. it has not yet
105            // called read() for the current read buffer.)
106            // Since other threads may be waiting for all the clients' reads to complete,
107            // signal that this read has been aborted.
108            signalReadComplete_lock(true);
109        }
110        return OK;
111    }
112}
113
114sp<MetaData> MediaSourceSplitter::getFormat(int clientId) {
115    Mutex::Autolock autoLock(mLock);
116
117    LOGV("getFormat client (%d)", clientId);
118    return mSource->getFormat();
119}
120
121status_t MediaSourceSplitter::read(int clientId,
122        MediaBuffer **buffer, const MediaSource::ReadOptions *options) {
123    Mutex::Autolock autoLock(mLock);
124
125    CHECK(clientId >= 0 && clientId < mNumberOfClients);
126
127    LOGV("read client (%d)", clientId);
128    *buffer = NULL;
129
130    if (!mClientsStarted[clientId]) {
131        return OK;
132    }
133
134    if (mCurrentReadBit != mClientsDesiredReadBit[clientId]) {
135        // Desired buffer has not been read from source yet.
136
137        // If the current client is the special client with clientId = 0
138        // then read from source, else wait until the client 0 has finished
139        // reading from source.
140        if (clientId == 0) {
141            // Wait for all client's last read to complete first so as to not
142            // corrupt the buffer at mLastReadMediaBuffer.
143            waitForAllClientsLastRead_lock(clientId);
144
145            readFromSource_lock(options);
146            *buffer = mLastReadMediaBuffer;
147        } else {
148            waitForReadFromSource_lock(clientId);
149
150            *buffer = mLastReadMediaBuffer;
151            (*buffer)->add_ref();
152        }
153        CHECK(mCurrentReadBit == mClientsDesiredReadBit[clientId]);
154    } else {
155        // Desired buffer has already been read from source. Use the cached data.
156        CHECK(clientId != 0);
157
158        *buffer = mLastReadMediaBuffer;
159        (*buffer)->add_ref();
160    }
161
162    mClientsDesiredReadBit.editItemAt(clientId) = !mClientsDesiredReadBit[clientId];
163    signalReadComplete_lock(false);
164
165    return mLastReadStatus;
166}
167
168void MediaSourceSplitter::readFromSource_lock(const MediaSource::ReadOptions *options) {
169    mLastReadStatus = mSource->read(&mLastReadMediaBuffer , options);
170
171    mCurrentReadBit = !mCurrentReadBit;
172    mLastReadCompleted = false;
173    mReadFromSourceCondition.broadcast();
174}
175
176void MediaSourceSplitter::waitForReadFromSource_lock(int32_t clientId) {
177    mReadFromSourceCondition.wait(mLock);
178}
179
180void MediaSourceSplitter::waitForAllClientsLastRead_lock(int32_t clientId) {
181    if (mLastReadCompleted) {
182        return;
183    }
184    mAllReadsCompleteCondition.wait(mLock);
185    CHECK(mLastReadCompleted);
186}
187
188void MediaSourceSplitter::signalReadComplete_lock(bool readAborted) {
189    if (!readAborted) {
190        mNumberOfCurrentReads++;
191    }
192
193    if (mNumberOfCurrentReads == mNumberOfClientsStarted) {
194        mLastReadCompleted = true;
195        mNumberOfCurrentReads = 0;
196        mAllReadsCompleteCondition.broadcast();
197    }
198}
199
200status_t MediaSourceSplitter::pause(int clientId) {
201    return ERROR_UNSUPPORTED;
202}
203
204// Client
205
206MediaSourceSplitter::Client::Client(
207        sp<MediaSourceSplitter> splitter,
208        int32_t clientId) {
209    mSplitter = splitter;
210    mClientId = clientId;
211}
212
213status_t MediaSourceSplitter::Client::start(MetaData *params) {
214    return mSplitter->start(mClientId, params);
215}
216
217status_t MediaSourceSplitter::Client::stop() {
218    return mSplitter->stop(mClientId);
219}
220
221sp<MetaData> MediaSourceSplitter::Client::getFormat() {
222    return mSplitter->getFormat(mClientId);
223}
224
225status_t MediaSourceSplitter::Client::read(
226        MediaBuffer **buffer, const ReadOptions *options) {
227    return mSplitter->read(mClientId, buffer, options);
228}
229
230status_t MediaSourceSplitter::Client::pause() {
231    return mSplitter->pause(mClientId);
232}
233
234}  // namespace android
235