OMXClient.cpp revision 20111aa043c5f404472bc63b90bc5aad906b1101
1/*
2 * Copyright (C) 2009 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17//#define LOG_NDEBUG 0
18#define LOG_TAG "OMXClient"
19#include <utils/Log.h>
20
21#include <sys/socket.h>
22
23#undef NDEBUG
24#include <assert.h>
25
26#include <binder/IServiceManager.h>
27#include <media/IMediaPlayerService.h>
28#include <media/IOMX.h>
29#include <media/stagefright/OMXClient.h>
30
31namespace android {
32
33OMXClient::OMXClient()
34    : mSock(-1) {
35}
36
37OMXClient::~OMXClient() {
38    disconnect();
39}
40
41status_t OMXClient::connect() {
42    Mutex::Autolock autoLock(mLock);
43
44    if (mSock >= 0) {
45        return UNKNOWN_ERROR;
46    }
47
48    sp<IServiceManager> sm = defaultServiceManager();
49    sp<IBinder> binder = sm->getService(String16("media.player"));
50    sp<IMediaPlayerService> service = interface_cast<IMediaPlayerService>(binder);
51
52    assert(service.get() != NULL);
53
54    mOMX = service->createOMX();
55    assert(mOMX.get() != NULL);
56
57#if IOMX_USES_SOCKETS
58    status_t result = mOMX->connect(&mSock);
59    if (result != OK) {
60        mSock = -1;
61
62        mOMX = NULL;
63        return result;
64    }
65
66    pthread_attr_t attr;
67    pthread_attr_init(&attr);
68    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
69
70    int err = pthread_create(&mThread, &attr, ThreadWrapper, this);
71    assert(err == 0);
72
73    pthread_attr_destroy(&attr);
74#else
75    mReflector = new OMXClientReflector(this);
76#endif
77
78    return OK;
79}
80
81void OMXClient::disconnect() {
82    {
83        Mutex::Autolock autoLock(mLock);
84
85        if (mSock < 0) {
86            return;
87        }
88
89        assert(mObservers.isEmpty());
90    }
91
92#if IOMX_USES_SOCKETS
93    omx_message msg;
94    msg.type = omx_message::DISCONNECT;
95    ssize_t n = send(mSock, &msg, sizeof(msg), 0);
96    assert(n > 0 && static_cast<size_t>(n) == sizeof(msg));
97
98    void *dummy;
99    pthread_join(mThread, &dummy);
100#else
101    mReflector->reset();
102    mReflector.clear();
103#endif
104}
105
106#if IOMX_USES_SOCKETS
107// static
108void *OMXClient::ThreadWrapper(void *me) {
109    ((OMXClient *)me)->threadEntry();
110
111    return NULL;
112}
113
114void OMXClient::threadEntry() {
115    bool done = false;
116    while (!done) {
117        omx_message msg;
118        ssize_t n = recv(mSock, &msg, sizeof(msg), 0);
119
120        if (n <= 0) {
121            break;
122        }
123
124        done = onOMXMessage(msg);
125    }
126
127    Mutex::Autolock autoLock(mLock);
128    close(mSock);
129    mSock = -1;
130}
131#endif
132
133status_t OMXClient::fillBuffer(IOMX::node_id node, IOMX::buffer_id buffer) {
134#if !IOMX_USES_SOCKETS
135    mOMX->fill_buffer(node, buffer);
136#else
137    if (mSock < 0) {
138        return UNKNOWN_ERROR;
139    }
140
141    omx_message msg;
142    msg.type = omx_message::FILL_BUFFER;
143    msg.u.buffer_data.node = node;
144    msg.u.buffer_data.buffer = buffer;
145
146    ssize_t n = send(mSock, &msg, sizeof(msg), 0);
147    assert(n > 0 && static_cast<size_t>(n) == sizeof(msg));
148#endif
149
150    return OK;
151}
152
153status_t OMXClient::emptyBuffer(
154        IOMX::node_id node, IOMX::buffer_id buffer,
155        OMX_U32 range_offset, OMX_U32 range_length,
156        OMX_U32 flags, OMX_TICKS timestamp) {
157#if !IOMX_USES_SOCKETS
158    mOMX->empty_buffer(
159            node, buffer, range_offset, range_length, flags, timestamp);
160#else
161    if (mSock < 0) {
162        return UNKNOWN_ERROR;
163    }
164
165    // XXX I don't like all this copying...
166
167    omx_message msg;
168    msg.type = omx_message::EMPTY_BUFFER;
169    msg.u.extended_buffer_data.node = node;
170    msg.u.extended_buffer_data.buffer = buffer;
171    msg.u.extended_buffer_data.range_offset = range_offset;
172    msg.u.extended_buffer_data.range_length = range_length;
173    msg.u.extended_buffer_data.flags = flags;
174    msg.u.extended_buffer_data.timestamp = timestamp;
175
176    ssize_t n = send(mSock, &msg, sizeof(msg), 0);
177    assert(n > 0 && static_cast<size_t>(n) == sizeof(msg));
178#endif
179
180    return OK;
181}
182
183status_t OMXClient::send_command(
184        IOMX::node_id node, OMX_COMMANDTYPE cmd, OMX_S32 param) {
185#if !IOMX_USES_SOCKETS
186    return mOMX->send_command(node, cmd, param);
187#else
188    if (mSock < 0) {
189        return UNKNOWN_ERROR;
190    }
191
192    omx_message msg;
193    msg.type = omx_message::SEND_COMMAND;
194    msg.u.send_command_data.node = node;
195    msg.u.send_command_data.cmd = cmd;
196    msg.u.send_command_data.param = param;
197
198    ssize_t n = send(mSock, &msg, sizeof(msg), 0);
199    assert(n > 0 && static_cast<size_t>(n) == sizeof(msg));
200#endif
201
202    return OK;
203}
204
205status_t OMXClient::registerObserver(
206        IOMX::node_id node, OMXObserver *observer) {
207    Mutex::Autolock autoLock(&mLock);
208
209    ssize_t index = mObservers.indexOfKey(node);
210    if (index >= 0) {
211        return UNKNOWN_ERROR;
212    }
213
214    mObservers.add(node, observer);
215    observer->start();
216
217#if !IOMX_USES_SOCKETS
218    mOMX->observe_node(node, mReflector);
219#endif
220
221    return OK;
222}
223
224void OMXClient::unregisterObserver(IOMX::node_id node) {
225    Mutex::Autolock autoLock(mLock);
226
227    ssize_t index = mObservers.indexOfKey(node);
228    assert(index >= 0);
229
230    if (index < 0) {
231        return;
232    }
233
234    OMXObserver *observer = mObservers.valueAt(index);
235    observer->stop();
236    mObservers.removeItemsAt(index);
237}
238
239bool OMXClient::onOMXMessage(const omx_message &msg) {
240    bool done = false;
241
242    switch (msg.type) {
243        case omx_message::EVENT:
244        {
245            LOGV("OnEvent node:%p event:%d data1:%ld data2:%ld",
246                 msg.u.event_data.node,
247                 msg.u.event_data.event,
248                 msg.u.event_data.data1,
249                 msg.u.event_data.data2);
250
251            break;
252        }
253
254        case omx_message::FILL_BUFFER_DONE:
255        {
256            LOGV("FillBufferDone %p", msg.u.extended_buffer_data.buffer);
257            break;
258        }
259
260        case omx_message::EMPTY_BUFFER_DONE:
261        {
262            LOGV("EmptyBufferDone %p", msg.u.buffer_data.buffer);
263            break;
264        }
265
266#if IOMX_USES_SOCKETS
267        case omx_message::DISCONNECTED:
268        {
269            LOGV("Disconnected");
270            done = true;
271            break;
272        }
273#endif
274
275        default:
276            LOGE("received unknown omx_message type %d", msg.type);
277            break;
278    }
279
280    Mutex::Autolock autoLock(mLock);
281    ssize_t index = mObservers.indexOfKey(msg.u.buffer_data.node);
282
283    if (index >= 0) {
284        mObservers.editValueAt(index)->postMessage(msg);
285    }
286
287    return done;
288}
289
290////////////////////////////////////////////////////////////////////////////////
291
292OMXObserver::OMXObserver() {
293}
294
295OMXObserver::~OMXObserver() {
296}
297
298void OMXObserver::start() {
299    pthread_attr_t attr;
300    pthread_attr_init(&attr);
301    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
302
303    int err = pthread_create(&mThread, &attr, ThreadWrapper, this);
304    assert(err == 0);
305
306    pthread_attr_destroy(&attr);
307}
308
309void OMXObserver::stop() {
310    omx_message msg;
311    msg.type = omx_message::QUIT_OBSERVER;
312    postMessage(msg);
313
314    void *dummy;
315    pthread_join(mThread, &dummy);
316}
317
318void OMXObserver::postMessage(const omx_message &msg) {
319    Mutex::Autolock autoLock(mLock);
320    mQueue.push_back(msg);
321    mQueueNotEmpty.signal();
322}
323
324// static
325void *OMXObserver::ThreadWrapper(void *me) {
326    static_cast<OMXObserver *>(me)->threadEntry();
327
328    return NULL;
329}
330
331void OMXObserver::threadEntry() {
332    for (;;) {
333        omx_message msg;
334
335        {
336            Mutex::Autolock autoLock(mLock);
337            while (mQueue.empty()) {
338                mQueueNotEmpty.wait(mLock);
339            }
340
341            msg = *mQueue.begin();
342            mQueue.erase(mQueue.begin());
343        }
344
345        if (msg.type == omx_message::QUIT_OBSERVER) {
346            break;
347        }
348
349        onOMXMessage(msg);
350    }
351}
352
353////////////////////////////////////////////////////////////////////////////////
354
355OMXClientReflector::OMXClientReflector(OMXClient *client)
356    : mClient(client) {
357}
358
359void OMXClientReflector::on_message(const omx_message &msg) {
360    if (mClient != NULL) {
361        mClient->onOMXMessage(msg);
362    }
363}
364
365void OMXClientReflector::reset() {
366    mClient = NULL;
367}
368
369}  // namespace android
370