1/*
2 * Copyright (C) 2016 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *      http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#define LOG_TAG "StreamHalHidl"
18//#define LOG_NDEBUG 0
19
20#include <android/hardware/audio/2.0/IStreamOutCallback.h>
21#include <hwbinder/IPCThreadState.h>
22#include <mediautils/SchedulingPolicyService.h>
23#include <utils/Log.h>
24
25#include "DeviceHalHidl.h"
26#include "EffectHalHidl.h"
27#include "StreamHalHidl.h"
28
29using ::android::hardware::audio::common::V2_0::AudioChannelMask;
30using ::android::hardware::audio::common::V2_0::AudioFormat;
31using ::android::hardware::audio::common::V2_0::ThreadInfo;
32using ::android::hardware::audio::V2_0::AudioDrain;
33using ::android::hardware::audio::V2_0::IStreamOutCallback;
34using ::android::hardware::audio::V2_0::MessageQueueFlagBits;
35using ::android::hardware::audio::V2_0::MmapBufferInfo;
36using ::android::hardware::audio::V2_0::MmapPosition;
37using ::android::hardware::audio::V2_0::ParameterValue;
38using ::android::hardware::audio::V2_0::Result;
39using ::android::hardware::audio::V2_0::TimeSpec;
40using ::android::hardware::MQDescriptorSync;
41using ::android::hardware::Return;
42using ::android::hardware::Void;
43using ReadCommand = ::android::hardware::audio::V2_0::IStreamIn::ReadCommand;
44
45namespace android {
46
47StreamHalHidl::StreamHalHidl(IStream *stream)
48        : ConversionHelperHidl("Stream"),
49          mStream(stream),
50          mHalThreadPriority(HAL_THREAD_PRIORITY_DEFAULT),
51          mCachedBufferSize(0){
52
53    // Instrument audio signal power logging.
54    // Note: This assumes channel mask, format, and sample rate do not change after creation.
55    if (mStream != nullptr && mStreamPowerLog.isUserDebugOrEngBuild()) {
56        // Obtain audio properties (see StreamHalHidl::getAudioProperties() below).
57        Return<void> ret = mStream->getAudioProperties(
58                [&](uint32_t sr, AudioChannelMask m, AudioFormat f) {
59                mStreamPowerLog.init(sr,
60                        static_cast<audio_channel_mask_t>(m),
61                        static_cast<audio_format_t>(f));
62            });
63    }
64}
65
66StreamHalHidl::~StreamHalHidl() {
67    mStream = nullptr;
68}
69
70status_t StreamHalHidl::getSampleRate(uint32_t *rate) {
71    if (!mStream) return NO_INIT;
72    return processReturn("getSampleRate", mStream->getSampleRate(), rate);
73}
74
75status_t StreamHalHidl::getBufferSize(size_t *size) {
76    if (!mStream) return NO_INIT;
77    status_t status = processReturn("getBufferSize", mStream->getBufferSize(), size);
78    if (status == OK) {
79        mCachedBufferSize = *size;
80    }
81    return status;
82}
83
84status_t StreamHalHidl::getChannelMask(audio_channel_mask_t *mask) {
85    if (!mStream) return NO_INIT;
86    return processReturn("getChannelMask", mStream->getChannelMask(), mask);
87}
88
89status_t StreamHalHidl::getFormat(audio_format_t *format) {
90    if (!mStream) return NO_INIT;
91    return processReturn("getFormat", mStream->getFormat(), format);
92}
93
94status_t StreamHalHidl::getAudioProperties(
95        uint32_t *sampleRate, audio_channel_mask_t *mask, audio_format_t *format) {
96    if (!mStream) return NO_INIT;
97    Return<void> ret = mStream->getAudioProperties(
98            [&](uint32_t sr, AudioChannelMask m, AudioFormat f) {
99                *sampleRate = sr;
100                *mask = static_cast<audio_channel_mask_t>(m);
101                *format = static_cast<audio_format_t>(f);
102            });
103    return processReturn("getAudioProperties", ret);
104}
105
106status_t StreamHalHidl::setParameters(const String8& kvPairs) {
107    if (!mStream) return NO_INIT;
108    hidl_vec<ParameterValue> hidlParams;
109    status_t status = parametersFromHal(kvPairs, &hidlParams);
110    if (status != OK) return status;
111    return processReturn("setParameters", mStream->setParameters(hidlParams));
112}
113
114status_t StreamHalHidl::getParameters(const String8& keys, String8 *values) {
115    values->clear();
116    if (!mStream) return NO_INIT;
117    hidl_vec<hidl_string> hidlKeys;
118    status_t status = keysFromHal(keys, &hidlKeys);
119    if (status != OK) return status;
120    Result retval;
121    Return<void> ret = mStream->getParameters(
122            hidlKeys,
123            [&](Result r, const hidl_vec<ParameterValue>& parameters) {
124                retval = r;
125                if (retval == Result::OK) {
126                    parametersToHal(parameters, values);
127                }
128            });
129    return processReturn("getParameters", ret, retval);
130}
131
132status_t StreamHalHidl::addEffect(sp<EffectHalInterface> effect) {
133    if (!mStream) return NO_INIT;
134    return processReturn("addEffect", mStream->addEffect(
135                    static_cast<EffectHalHidl*>(effect.get())->effectId()));
136}
137
138status_t StreamHalHidl::removeEffect(sp<EffectHalInterface> effect) {
139    if (!mStream) return NO_INIT;
140    return processReturn("removeEffect", mStream->removeEffect(
141                    static_cast<EffectHalHidl*>(effect.get())->effectId()));
142}
143
144status_t StreamHalHidl::standby() {
145    if (!mStream) return NO_INIT;
146    return processReturn("standby", mStream->standby());
147}
148
149status_t StreamHalHidl::dump(int fd) {
150    if (!mStream) return NO_INIT;
151    native_handle_t* hidlHandle = native_handle_create(1, 0);
152    hidlHandle->data[0] = fd;
153    Return<void> ret = mStream->debugDump(hidlHandle);
154    native_handle_delete(hidlHandle);
155    mStreamPowerLog.dump(fd);
156    return processReturn("dump", ret);
157}
158
159status_t StreamHalHidl::start() {
160    if (!mStream) return NO_INIT;
161    return processReturn("start", mStream->start());
162}
163
164status_t StreamHalHidl::stop() {
165    if (!mStream) return NO_INIT;
166    return processReturn("stop", mStream->stop());
167}
168
169status_t StreamHalHidl::createMmapBuffer(int32_t minSizeFrames,
170                                  struct audio_mmap_buffer_info *info) {
171    Result retval;
172    Return<void> ret = mStream->createMmapBuffer(
173            minSizeFrames,
174            [&](Result r, const MmapBufferInfo& hidlInfo) {
175                retval = r;
176                if (retval == Result::OK) {
177                    const native_handle *handle = hidlInfo.sharedMemory.handle();
178                    if (handle->numFds > 0) {
179                        info->shared_memory_fd = handle->data[0];
180                        info->buffer_size_frames = hidlInfo.bufferSizeFrames;
181                        info->burst_size_frames = hidlInfo.burstSizeFrames;
182                        // info->shared_memory_address is not needed in HIDL context
183                        info->shared_memory_address = NULL;
184                    } else {
185                        retval = Result::NOT_INITIALIZED;
186                    }
187                }
188            });
189    return processReturn("createMmapBuffer", ret, retval);
190}
191
192status_t StreamHalHidl::getMmapPosition(struct audio_mmap_position *position) {
193    Result retval;
194    Return<void> ret = mStream->getMmapPosition(
195            [&](Result r, const MmapPosition& hidlPosition) {
196                retval = r;
197                if (retval == Result::OK) {
198                    position->time_nanoseconds = hidlPosition.timeNanoseconds;
199                    position->position_frames = hidlPosition.positionFrames;
200                }
201            });
202    return processReturn("getMmapPosition", ret, retval);
203}
204
205status_t StreamHalHidl::setHalThreadPriority(int priority) {
206    mHalThreadPriority = priority;
207    return OK;
208}
209
210status_t StreamHalHidl::getCachedBufferSize(size_t *size) {
211    if (mCachedBufferSize != 0) {
212        *size = mCachedBufferSize;
213        return OK;
214    }
215    return getBufferSize(size);
216}
217
218bool StreamHalHidl::requestHalThreadPriority(pid_t threadPid, pid_t threadId) {
219    if (mHalThreadPriority == HAL_THREAD_PRIORITY_DEFAULT) {
220        return true;
221    }
222    int err = requestPriority(
223            threadPid, threadId,
224            mHalThreadPriority, false /*isForApp*/, true /*asynchronous*/);
225    ALOGE_IF(err, "failed to set priority %d for pid %d tid %d; error %d",
226            mHalThreadPriority, threadPid, threadId, err);
227    // Audio will still work, but latency will be higher and sometimes unacceptable.
228    return err == 0;
229}
230
231namespace {
232
233/* Notes on callback ownership.
234
This is what the (Hw)Binder ownership model looks like. The server implementation
is owned by the Binder framework (via sp<>). Proxies are owned by clients.
When the last proxy disappears, the Binder framework releases the server impl.
238
239Thus, it is not needed to keep any references to StreamOutCallback (this is
240the server impl) -- it will live as long as HAL server holds a strong ref to
241IStreamOutCallback proxy. We clear that reference by calling 'clearCallback'
242from the destructor of StreamOutHalHidl.
243
244The callback only keeps a weak reference to the stream. The stream is owned
245by AudioFlinger.
246
247*/
248
249struct StreamOutCallback : public IStreamOutCallback {
250    StreamOutCallback(const wp<StreamOutHalHidl>& stream) : mStream(stream) {}
251
252    // IStreamOutCallback implementation
253    Return<void> onWriteReady()  override {
254        sp<StreamOutHalHidl> stream = mStream.promote();
255        if (stream != 0) {
256            stream->onWriteReady();
257        }
258        return Void();
259    }
260
261    Return<void> onDrainReady()  override {
262        sp<StreamOutHalHidl> stream = mStream.promote();
263        if (stream != 0) {
264            stream->onDrainReady();
265        }
266        return Void();
267    }
268
269    Return<void> onError()  override {
270        sp<StreamOutHalHidl> stream = mStream.promote();
271        if (stream != 0) {
272            stream->onError();
273        }
274        return Void();
275    }
276
277  private:
278    wp<StreamOutHalHidl> mStream;
279};
280
281}  // namespace
282
283StreamOutHalHidl::StreamOutHalHidl(const sp<IStreamOut>& stream)
284        : StreamHalHidl(stream.get()), mStream(stream), mWriterClient(0), mEfGroup(nullptr) {
285}
286
287StreamOutHalHidl::~StreamOutHalHidl() {
288    if (mStream != 0) {
289        if (mCallback.unsafe_get()) {
290            processReturn("clearCallback", mStream->clearCallback());
291        }
292        processReturn("close", mStream->close());
293        mStream.clear();
294    }
295    mCallback.clear();
296    hardware::IPCThreadState::self()->flushCommands();
297    if (mEfGroup) {
298        EventFlag::deleteEventFlag(&mEfGroup);
299    }
300}
301
302status_t StreamOutHalHidl::getFrameSize(size_t *size) {
303    if (mStream == 0) return NO_INIT;
304    return processReturn("getFrameSize", mStream->getFrameSize(), size);
305}
306
307status_t StreamOutHalHidl::getLatency(uint32_t *latency) {
308    if (mStream == 0) return NO_INIT;
309    if (mWriterClient == gettid() && mCommandMQ) {
310        return callWriterThread(
311                WriteCommand::GET_LATENCY, "getLatency", nullptr, 0,
312                [&](const WriteStatus& writeStatus) {
313                    *latency = writeStatus.reply.latencyMs;
314                });
315    } else {
316        return processReturn("getLatency", mStream->getLatency(), latency);
317    }
318}
319
320status_t StreamOutHalHidl::setVolume(float left, float right) {
321    if (mStream == 0) return NO_INIT;
322    return processReturn("setVolume", mStream->setVolume(left, right));
323}
324
325status_t StreamOutHalHidl::write(const void *buffer, size_t bytes, size_t *written) {
326    if (mStream == 0) return NO_INIT;
327    *written = 0;
328
329    if (bytes == 0 && !mDataMQ) {
330        // Can't determine the size for the MQ buffer. Wait for a non-empty write request.
331        ALOGW_IF(mCallback.unsafe_get(), "First call to async write with 0 bytes");
332        return OK;
333    }
334
335    status_t status;
336    if (!mDataMQ) {
337        // In case if playback starts close to the end of a compressed track, the bytes
338        // that need to be written is less than the actual buffer size. Need to use
339        // full buffer size for the MQ since otherwise after seeking back to the middle
340        // data will be truncated.
341        size_t bufferSize;
342        if ((status = getCachedBufferSize(&bufferSize)) != OK) {
343            return status;
344        }
345        if (bytes > bufferSize) bufferSize = bytes;
346        if ((status = prepareForWriting(bufferSize)) != OK) {
347            return status;
348        }
349    }
350
351    status = callWriterThread(
352            WriteCommand::WRITE, "write", static_cast<const uint8_t*>(buffer), bytes,
353            [&] (const WriteStatus& writeStatus) {
354                *written = writeStatus.reply.written;
355                // Diagnostics of the cause of b/35813113.
356                ALOGE_IF(*written > bytes,
357                        "hal reports more bytes written than asked for: %lld > %lld",
358                        (long long)*written, (long long)bytes);
359            });
360    mStreamPowerLog.log(buffer, *written);
361    return status;
362}
363
364status_t StreamOutHalHidl::callWriterThread(
365        WriteCommand cmd, const char* cmdName,
366        const uint8_t* data, size_t dataSize, StreamOutHalHidl::WriterCallback callback) {
367    if (!mCommandMQ->write(&cmd)) {
368        ALOGE("command message queue write failed for \"%s\"", cmdName);
369        return -EAGAIN;
370    }
371    if (data != nullptr) {
372        size_t availableToWrite = mDataMQ->availableToWrite();
373        if (dataSize > availableToWrite) {
374            ALOGW("truncating write data from %lld to %lld due to insufficient data queue space",
375                    (long long)dataSize, (long long)availableToWrite);
376            dataSize = availableToWrite;
377        }
378        if (!mDataMQ->write(data, dataSize)) {
379            ALOGE("data message queue write failed for \"%s\"", cmdName);
380        }
381    }
382    mEfGroup->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY));
383
384    // TODO: Remove manual event flag handling once blocking MQ is implemented. b/33815422
385    uint32_t efState = 0;
386retry:
387    status_t ret = mEfGroup->wait(static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL), &efState);
388    if (efState & static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL)) {
389        WriteStatus writeStatus;
390        writeStatus.retval = Result::NOT_INITIALIZED;
391        if (!mStatusMQ->read(&writeStatus)) {
392            ALOGE("status message read failed for \"%s\"", cmdName);
393        }
394        if (writeStatus.retval == Result::OK) {
395            ret = OK;
396            callback(writeStatus);
397        } else {
398            ret = processReturn(cmdName, writeStatus.retval);
399        }
400        return ret;
401    }
402    if (ret == -EAGAIN || ret == -EINTR) {
403        // Spurious wakeup. This normally retries no more than once.
404        goto retry;
405    }
406    return ret;
407}
408
409status_t StreamOutHalHidl::prepareForWriting(size_t bufferSize) {
410    std::unique_ptr<CommandMQ> tempCommandMQ;
411    std::unique_ptr<DataMQ> tempDataMQ;
412    std::unique_ptr<StatusMQ> tempStatusMQ;
413    Result retval;
414    pid_t halThreadPid, halThreadTid;
415    Return<void> ret = mStream->prepareForWriting(
416            1, bufferSize,
417            [&](Result r,
418                    const CommandMQ::Descriptor& commandMQ,
419                    const DataMQ::Descriptor& dataMQ,
420                    const StatusMQ::Descriptor& statusMQ,
421                    const ThreadInfo& halThreadInfo) {
422                retval = r;
423                if (retval == Result::OK) {
424                    tempCommandMQ.reset(new CommandMQ(commandMQ));
425                    tempDataMQ.reset(new DataMQ(dataMQ));
426                    tempStatusMQ.reset(new StatusMQ(statusMQ));
427                    if (tempDataMQ->isValid() && tempDataMQ->getEventFlagWord()) {
428                        EventFlag::createEventFlag(tempDataMQ->getEventFlagWord(), &mEfGroup);
429                    }
430                    halThreadPid = halThreadInfo.pid;
431                    halThreadTid = halThreadInfo.tid;
432                }
433            });
434    if (!ret.isOk() || retval != Result::OK) {
435        return processReturn("prepareForWriting", ret, retval);
436    }
437    if (!tempCommandMQ || !tempCommandMQ->isValid() ||
438            !tempDataMQ || !tempDataMQ->isValid() ||
439            !tempStatusMQ || !tempStatusMQ->isValid() ||
440            !mEfGroup) {
441        ALOGE_IF(!tempCommandMQ, "Failed to obtain command message queue for writing");
442        ALOGE_IF(tempCommandMQ && !tempCommandMQ->isValid(),
443                "Command message queue for writing is invalid");
444        ALOGE_IF(!tempDataMQ, "Failed to obtain data message queue for writing");
445        ALOGE_IF(tempDataMQ && !tempDataMQ->isValid(), "Data message queue for writing is invalid");
446        ALOGE_IF(!tempStatusMQ, "Failed to obtain status message queue for writing");
447        ALOGE_IF(tempStatusMQ && !tempStatusMQ->isValid(),
448                "Status message queue for writing is invalid");
449        ALOGE_IF(!mEfGroup, "Event flag creation for writing failed");
450        return NO_INIT;
451    }
452    requestHalThreadPriority(halThreadPid, halThreadTid);
453
454    mCommandMQ = std::move(tempCommandMQ);
455    mDataMQ = std::move(tempDataMQ);
456    mStatusMQ = std::move(tempStatusMQ);
457    mWriterClient = gettid();
458    return OK;
459}
460
461status_t StreamOutHalHidl::getRenderPosition(uint32_t *dspFrames) {
462    if (mStream == 0) return NO_INIT;
463    Result retval;
464    Return<void> ret = mStream->getRenderPosition(
465            [&](Result r, uint32_t d) {
466                retval = r;
467                if (retval == Result::OK) {
468                    *dspFrames = d;
469                }
470            });
471    return processReturn("getRenderPosition", ret, retval);
472}
473
474status_t StreamOutHalHidl::getNextWriteTimestamp(int64_t *timestamp) {
475    if (mStream == 0) return NO_INIT;
476    Result retval;
477    Return<void> ret = mStream->getNextWriteTimestamp(
478            [&](Result r, int64_t t) {
479                retval = r;
480                if (retval == Result::OK) {
481                    *timestamp = t;
482                }
483            });
484    return processReturn("getRenderPosition", ret, retval);
485}
486
487status_t StreamOutHalHidl::setCallback(wp<StreamOutHalInterfaceCallback> callback) {
488    if (mStream == 0) return NO_INIT;
489    status_t status = processReturn(
490            "setCallback", mStream->setCallback(new StreamOutCallback(this)));
491    if (status == OK) {
492        mCallback = callback;
493    }
494    return status;
495}
496
497status_t StreamOutHalHidl::supportsPauseAndResume(bool *supportsPause, bool *supportsResume) {
498    if (mStream == 0) return NO_INIT;
499    Return<void> ret = mStream->supportsPauseAndResume(
500            [&](bool p, bool r) {
501                *supportsPause = p;
502                *supportsResume = r;
503            });
504    return processReturn("supportsPauseAndResume", ret);
505}
506
507status_t StreamOutHalHidl::pause() {
508    if (mStream == 0) return NO_INIT;
509    return processReturn("pause", mStream->pause());
510}
511
512status_t StreamOutHalHidl::resume() {
513    if (mStream == 0) return NO_INIT;
514    return processReturn("pause", mStream->resume());
515}
516
517status_t StreamOutHalHidl::supportsDrain(bool *supportsDrain) {
518    if (mStream == 0) return NO_INIT;
519    return processReturn("supportsDrain", mStream->supportsDrain(), supportsDrain);
520}
521
522status_t StreamOutHalHidl::drain(bool earlyNotify) {
523    if (mStream == 0) return NO_INIT;
524    return processReturn(
525            "drain", mStream->drain(earlyNotify ? AudioDrain::EARLY_NOTIFY : AudioDrain::ALL));
526}
527
528status_t StreamOutHalHidl::flush() {
529    if (mStream == 0) return NO_INIT;
530    return processReturn("pause", mStream->flush());
531}
532
533status_t StreamOutHalHidl::getPresentationPosition(uint64_t *frames, struct timespec *timestamp) {
534    if (mStream == 0) return NO_INIT;
535    if (mWriterClient == gettid() && mCommandMQ) {
536        return callWriterThread(
537                WriteCommand::GET_PRESENTATION_POSITION, "getPresentationPosition", nullptr, 0,
538                [&](const WriteStatus& writeStatus) {
539                    *frames = writeStatus.reply.presentationPosition.frames;
540                    timestamp->tv_sec = writeStatus.reply.presentationPosition.timeStamp.tvSec;
541                    timestamp->tv_nsec = writeStatus.reply.presentationPosition.timeStamp.tvNSec;
542                });
543    } else {
544        Result retval;
545        Return<void> ret = mStream->getPresentationPosition(
546                [&](Result r, uint64_t hidlFrames, const TimeSpec& hidlTimeStamp) {
547                    retval = r;
548                    if (retval == Result::OK) {
549                        *frames = hidlFrames;
550                        timestamp->tv_sec = hidlTimeStamp.tvSec;
551                        timestamp->tv_nsec = hidlTimeStamp.tvNSec;
552                    }
553                });
554        return processReturn("getPresentationPosition", ret, retval);
555    }
556}
557
558status_t StreamOutHalHidl::updateSourceMetadata(const SourceMetadata& /* sourceMetadata */) {
559    // Audio HAL V2.0 does not support propagating source metadata
560    return INVALID_OPERATION;
561}
562
563void StreamOutHalHidl::onWriteReady() {
564    sp<StreamOutHalInterfaceCallback> callback = mCallback.promote();
565    if (callback == 0) return;
566    ALOGV("asyncCallback onWriteReady");
567    callback->onWriteReady();
568}
569
570void StreamOutHalHidl::onDrainReady() {
571    sp<StreamOutHalInterfaceCallback> callback = mCallback.promote();
572    if (callback == 0) return;
573    ALOGV("asyncCallback onDrainReady");
574    callback->onDrainReady();
575}
576
577void StreamOutHalHidl::onError() {
578    sp<StreamOutHalInterfaceCallback> callback = mCallback.promote();
579    if (callback == 0) return;
580    ALOGV("asyncCallback onError");
581    callback->onError();
582}
583
584
585StreamInHalHidl::StreamInHalHidl(const sp<IStreamIn>& stream)
586        : StreamHalHidl(stream.get()), mStream(stream), mReaderClient(0), mEfGroup(nullptr) {
587}
588
589StreamInHalHidl::~StreamInHalHidl() {
590    if (mStream != 0) {
591        processReturn("close", mStream->close());
592        mStream.clear();
593        hardware::IPCThreadState::self()->flushCommands();
594    }
595    if (mEfGroup) {
596        EventFlag::deleteEventFlag(&mEfGroup);
597    }
598}
599
600status_t StreamInHalHidl::getFrameSize(size_t *size) {
601    if (mStream == 0) return NO_INIT;
602    return processReturn("getFrameSize", mStream->getFrameSize(), size);
603}
604
605status_t StreamInHalHidl::setGain(float gain) {
606    if (mStream == 0) return NO_INIT;
607    return processReturn("setGain", mStream->setGain(gain));
608}
609
610status_t StreamInHalHidl::read(void *buffer, size_t bytes, size_t *read) {
611    if (mStream == 0) return NO_INIT;
612    *read = 0;
613
614    if (bytes == 0 && !mDataMQ) {
615        // Can't determine the size for the MQ buffer. Wait for a non-empty read request.
616        return OK;
617    }
618
619    status_t status;
620    if (!mDataMQ && (status = prepareForReading(bytes)) != OK) {
621        return status;
622    }
623
624    ReadParameters params;
625    params.command = ReadCommand::READ;
626    params.params.read = bytes;
627    status = callReaderThread(params, "read",
628            [&](const ReadStatus& readStatus) {
629                const size_t availToRead = mDataMQ->availableToRead();
630                if (!mDataMQ->read(static_cast<uint8_t*>(buffer), std::min(bytes, availToRead))) {
631                    ALOGE("data message queue read failed for \"read\"");
632                }
633                ALOGW_IF(availToRead != readStatus.reply.read,
634                        "HAL read report inconsistent: mq = %d, status = %d",
635                        (int32_t)availToRead, (int32_t)readStatus.reply.read);
636                *read = readStatus.reply.read;
637            });
638    mStreamPowerLog.log(buffer, *read);
639    return status;
640}
641
642status_t StreamInHalHidl::callReaderThread(
643        const ReadParameters& params, const char* cmdName,
644        StreamInHalHidl::ReaderCallback callback) {
645    if (!mCommandMQ->write(&params)) {
646        ALOGW("command message queue write failed");
647        return -EAGAIN;
648    }
649    mEfGroup->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL));
650
651    // TODO: Remove manual event flag handling once blocking MQ is implemented. b/33815422
652    uint32_t efState = 0;
653retry:
654    status_t ret = mEfGroup->wait(static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY), &efState);
655    if (efState & static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY)) {
656        ReadStatus readStatus;
657        readStatus.retval = Result::NOT_INITIALIZED;
658        if (!mStatusMQ->read(&readStatus)) {
659            ALOGE("status message read failed for \"%s\"", cmdName);
660        }
661         if (readStatus.retval == Result::OK) {
662            ret = OK;
663            callback(readStatus);
664        } else {
665            ret = processReturn(cmdName, readStatus.retval);
666        }
667        return ret;
668    }
669    if (ret == -EAGAIN || ret == -EINTR) {
670        // Spurious wakeup. This normally retries no more than once.
671        goto retry;
672    }
673    return ret;
674}
675
676status_t StreamInHalHidl::prepareForReading(size_t bufferSize) {
677    std::unique_ptr<CommandMQ> tempCommandMQ;
678    std::unique_ptr<DataMQ> tempDataMQ;
679    std::unique_ptr<StatusMQ> tempStatusMQ;
680    Result retval;
681    pid_t halThreadPid, halThreadTid;
682    Return<void> ret = mStream->prepareForReading(
683            1, bufferSize,
684            [&](Result r,
685                    const CommandMQ::Descriptor& commandMQ,
686                    const DataMQ::Descriptor& dataMQ,
687                    const StatusMQ::Descriptor& statusMQ,
688                    const ThreadInfo& halThreadInfo) {
689                retval = r;
690                if (retval == Result::OK) {
691                    tempCommandMQ.reset(new CommandMQ(commandMQ));
692                    tempDataMQ.reset(new DataMQ(dataMQ));
693                    tempStatusMQ.reset(new StatusMQ(statusMQ));
694                    if (tempDataMQ->isValid() && tempDataMQ->getEventFlagWord()) {
695                        EventFlag::createEventFlag(tempDataMQ->getEventFlagWord(), &mEfGroup);
696                    }
697                    halThreadPid = halThreadInfo.pid;
698                    halThreadTid = halThreadInfo.tid;
699                }
700            });
701    if (!ret.isOk() || retval != Result::OK) {
702        return processReturn("prepareForReading", ret, retval);
703    }
704    if (!tempCommandMQ || !tempCommandMQ->isValid() ||
705            !tempDataMQ || !tempDataMQ->isValid() ||
706            !tempStatusMQ || !tempStatusMQ->isValid() ||
707            !mEfGroup) {
708        ALOGE_IF(!tempCommandMQ, "Failed to obtain command message queue for writing");
709        ALOGE_IF(tempCommandMQ && !tempCommandMQ->isValid(),
710                "Command message queue for writing is invalid");
711        ALOGE_IF(!tempDataMQ, "Failed to obtain data message queue for reading");
712        ALOGE_IF(tempDataMQ && !tempDataMQ->isValid(), "Data message queue for reading is invalid");
713        ALOGE_IF(!tempStatusMQ, "Failed to obtain status message queue for reading");
714        ALOGE_IF(tempStatusMQ && !tempStatusMQ->isValid(),
715                "Status message queue for reading is invalid");
716        ALOGE_IF(!mEfGroup, "Event flag creation for reading failed");
717        return NO_INIT;
718    }
719    requestHalThreadPriority(halThreadPid, halThreadTid);
720
721    mCommandMQ = std::move(tempCommandMQ);
722    mDataMQ = std::move(tempDataMQ);
723    mStatusMQ = std::move(tempStatusMQ);
724    mReaderClient = gettid();
725    return OK;
726}
727
728status_t StreamInHalHidl::getInputFramesLost(uint32_t *framesLost) {
729    if (mStream == 0) return NO_INIT;
730    return processReturn("getInputFramesLost", mStream->getInputFramesLost(), framesLost);
731}
732
733status_t StreamInHalHidl::getCapturePosition(int64_t *frames, int64_t *time) {
734    if (mStream == 0) return NO_INIT;
735    if (mReaderClient == gettid() && mCommandMQ) {
736        ReadParameters params;
737        params.command = ReadCommand::GET_CAPTURE_POSITION;
738        return callReaderThread(params, "getCapturePosition",
739                [&](const ReadStatus& readStatus) {
740                    *frames = readStatus.reply.capturePosition.frames;
741                    *time = readStatus.reply.capturePosition.time;
742                });
743    } else {
744        Result retval;
745        Return<void> ret = mStream->getCapturePosition(
746                [&](Result r, uint64_t hidlFrames, uint64_t hidlTime) {
747                    retval = r;
748                    if (retval == Result::OK) {
749                        *frames = hidlFrames;
750                        *time = hidlTime;
751                    }
752                });
753        return processReturn("getCapturePosition", ret, retval);
754    }
755}
756
757status_t StreamInHalHidl::getActiveMicrophones(
758        std::vector<media::MicrophoneInfo> *microphones __unused) {
759    if (mStream == 0) return NO_INIT;
760    return INVALID_OPERATION;
761}
762
763status_t StreamInHalHidl::updateSinkMetadata(const SinkMetadata& /* sinkMetadata */) {
764    // Audio HAL V2.0 does not support propagating sink metadata
765    return INVALID_OPERATION;
766}
767
768} // namespace android
769