1// Copyright 2013 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "media/audio/pulse/pulse_unified.h"
6
7#include "base/message_loop/message_loop.h"
8#include "base/time/time.h"
9#include "media/audio/audio_manager_base.h"
10#include "media/audio/audio_parameters.h"
11#include "media/audio/pulse/pulse_util.h"
12#include "media/base/seekable_buffer.h"
13
14namespace media {
15
16using pulse::AutoPulseLock;
17using pulse::WaitForOperationCompletion;
18
namespace {

// Capacity of the recording FIFO, expressed as a number of audio buffers; it
// is multiplied by the byte size of one buffer when the FIFO is allocated.
const int kFifoSizeInPackets = 10;

}  // namespace
20
21// static, pa_stream_notify_cb
22void PulseAudioUnifiedStream::StreamNotifyCallback(pa_stream* s,
23                                                   void* user_data) {
24  PulseAudioUnifiedStream* stream =
25      static_cast<PulseAudioUnifiedStream*>(user_data);
26
27  // Forward unexpected failures to the AudioSourceCallback if available.  All
28  // these variables are only modified under pa_threaded_mainloop_lock() so this
29  // should be thread safe.
30  if (s && stream->source_callback_ &&
31      pa_stream_get_state(s) == PA_STREAM_FAILED) {
32    stream->source_callback_->OnError(stream);
33  }
34
35  pa_threaded_mainloop_signal(stream->pa_mainloop_, 0);
36}
37
38// static, used by pa_stream_set_read_callback.
39void PulseAudioUnifiedStream::ReadCallback(pa_stream* handle, size_t length,
40                                           void* user_data) {
41  static_cast<PulseAudioUnifiedStream*>(user_data)->ReadData();
42}
43
// Constructs a unified (input + output) PulseAudio stream.
//   |params|          - audio format; must be valid (enforced with CHECK).
//   |input_device_id| - stored and handed to pulse::CreateInputStream() by
//                       Open().
//   |manager|         - AudioManagerBase that is notified in Close(); the
//                       constructor must run on its message loop thread.
// No PulseAudio objects are created here: all handles start out NULL and are
// set up in Open().
PulseAudioUnifiedStream::PulseAudioUnifiedStream(
    const AudioParameters& params,
    const std::string& input_device_id,
    AudioManagerBase* manager)
    : params_(params),
      input_device_id_(input_device_id),
      manager_(manager),
      pa_context_(NULL),
      pa_mainloop_(NULL),
      input_stream_(NULL),
      output_stream_(NULL),
      volume_(1.0f),
      source_callback_(NULL) {
  DCHECK(manager_->GetMessageLoop()->BelongsToCurrentThread());
  CHECK(params_.IsValid());
  // Both directions use audio buses created from the same |params_|.
  input_bus_ = AudioBus::Create(params_);
  output_bus_ = AudioBus::Create(params_);
}
62
// Destructor.  Performs no cleanup of its own; it only verifies (in debug
// builds) that every PulseAudio handle has already been released.
PulseAudioUnifiedStream::~PulseAudioUnifiedStream() {
  // All internal structures should already have been freed in Close(), which
  // calls AudioManagerBase::ReleaseOutputStream() which deletes this object.
  DCHECK(!input_stream_);
  DCHECK(!output_stream_);
  DCHECK(!pa_context_);
  DCHECK(!pa_mainloop_);
}
71
72bool PulseAudioUnifiedStream::Open() {
73  DCHECK(manager_->GetMessageLoop()->BelongsToCurrentThread());
74  // Prepare the recording buffers for the callbacks.
75  fifo_.reset(new media::SeekableBuffer(
76      0, kFifoSizeInPackets * params_.GetBytesPerBuffer()));
77  input_data_buffer_.reset(new uint8[params_.GetBytesPerBuffer()]);
78
79  if (!pulse::CreateOutputStream(&pa_mainloop_, &pa_context_, &output_stream_,
80                                 params_, &StreamNotifyCallback, NULL, this))
81    return false;
82
83  if (!pulse::CreateInputStream(pa_mainloop_, pa_context_, &input_stream_,
84                                params_, input_device_id_,
85                                &StreamNotifyCallback, this))
86    return false;
87
88  DCHECK(pa_mainloop_);
89  DCHECK(pa_context_);
90  DCHECK(input_stream_);
91  DCHECK(output_stream_);
92  return true;
93}
94
// Tears down everything created by Open(): both streams, the context and the
// threaded main loop.  Each handle is set to NULL once released, and the
// function returns early when there is no main loop, so calling it again is
// harmless.
void PulseAudioUnifiedStream::Reset() {
  // Without a main loop nothing else can exist; just sanity-check that.
  if (!pa_mainloop_) {
    DCHECK(!input_stream_);
    DCHECK(!output_stream_);
    DCHECK(!pa_context_);
    return;
  }

  {
    // Streams and context are released while holding the main loop lock; the
    // lock goes out of scope before the loop itself is stopped and freed.
    AutoPulseLock auto_lock(pa_mainloop_);

    // Close the input stream.
    if (input_stream_) {
      // Disable all the callbacks before disconnecting.
      pa_stream_set_state_callback(input_stream_, NULL, NULL);
      pa_stream_flush(input_stream_, NULL, NULL);
      pa_stream_disconnect(input_stream_);

      // Release PulseAudio structures.
      pa_stream_unref(input_stream_);
      input_stream_ = NULL;
    }

    // Close the output stream.
    if (output_stream_) {
      // Release PulseAudio output stream structures.
      pa_stream_set_state_callback(output_stream_, NULL, NULL);
      pa_stream_disconnect(output_stream_);
      pa_stream_unref(output_stream_);
      output_stream_ = NULL;
    }

    // Disconnect and drop our reference to the context.
    if (pa_context_) {
      pa_context_disconnect(pa_context_);
      pa_context_set_state_callback(pa_context_, NULL, NULL);
      pa_context_unref(pa_context_);
      pa_context_ = NULL;
    }
  }

  // Finally stop and free the threaded main loop itself.
  pa_threaded_mainloop_stop(pa_mainloop_);
  pa_threaded_mainloop_free(pa_mainloop_);
  pa_mainloop_ = NULL;
}
139
// Releases all PulseAudio resources via Reset() and then hands this object
// back to the audio manager.  Must be called on the audio manager's message
// loop thread.  The object must not be touched after this call returns.
void PulseAudioUnifiedStream::Close() {
  DCHECK(manager_->GetMessageLoop()->BelongsToCurrentThread());
  Reset();

  // Signal to the manager that we're closed and can be removed.
  // This should be the last call in the function as it deletes "this".
  manager_->ReleaseOutputStream(this);
}
148
149void PulseAudioUnifiedStream::WriteData(size_t requested_bytes) {
150  CHECK_EQ(requested_bytes, static_cast<size_t>(params_.GetBytesPerBuffer()));
151
152  void* buffer = NULL;
153  int frames_filled = 0;
154  if (source_callback_) {
155    CHECK_GE(pa_stream_begin_write(
156        output_stream_, &buffer, &requested_bytes), 0);
157    uint32 hardware_delay = pulse::GetHardwareLatencyInBytes(
158        output_stream_, params_.sample_rate(),
159        params_.GetBytesPerFrame());
160    fifo_->Read(input_data_buffer_.get(), requested_bytes);
161    input_bus_->FromInterleaved(
162        input_data_buffer_.get(), params_.frames_per_buffer(), 2);
163
164    frames_filled = source_callback_->OnMoreIOData(
165        input_bus_.get(),
166        output_bus_.get(),
167        AudioBuffersState(0, hardware_delay));
168  }
169
170  // Zero the unfilled data so it plays back as silence.
171  if (frames_filled < output_bus_->frames()) {
172    output_bus_->ZeroFramesPartial(
173        frames_filled, output_bus_->frames() - frames_filled);
174  }
175
176  // Note: If this ever changes to output raw float the data must be clipped
177  // and sanitized since it may come from an untrusted source such as NaCl.
178  output_bus_->Scale(volume_);
179  output_bus_->ToInterleaved(
180      output_bus_->frames(), params_.bits_per_sample() / 8, buffer);
181
182  if (pa_stream_write(output_stream_, buffer, requested_bytes, NULL, 0LL,
183                      PA_SEEK_RELATIVE) < 0) {
184    if (source_callback_) {
185      source_callback_->OnError(this);
186    }
187  }
188}
189
190void PulseAudioUnifiedStream::ReadData() {
191  do {
192    size_t length = 0;
193    const void* data = NULL;
194    pa_stream_peek(input_stream_, &data, &length);
195    if (!data || length == 0)
196      break;
197
198    fifo_->Append(reinterpret_cast<const uint8*>(data), length);
199
200    // Deliver the recording data to the renderer and drive the playout.
201    int packet_size = params_.GetBytesPerBuffer();
202    while (fifo_->forward_bytes() >= packet_size) {
203      WriteData(packet_size);
204    }
205
206    // Checks if we still have data.
207    pa_stream_drop(input_stream_);
208  } while (pa_stream_readable_size(input_stream_) > 0);
209
210  pa_threaded_mainloop_signal(pa_mainloop_, 0);
211}
212
213void PulseAudioUnifiedStream::Start(AudioSourceCallback* callback) {
214  DCHECK(manager_->GetMessageLoop()->BelongsToCurrentThread());
215  CHECK(callback);
216  CHECK(input_stream_);
217  CHECK(output_stream_);
218  AutoPulseLock auto_lock(pa_mainloop_);
219
220  // Ensure the context and stream are ready.
221  if (pa_context_get_state(pa_context_) != PA_CONTEXT_READY &&
222      pa_stream_get_state(output_stream_) != PA_STREAM_READY &&
223      pa_stream_get_state(input_stream_) != PA_STREAM_READY) {
224    callback->OnError(this);
225    return;
226  }
227
228  source_callback_ = callback;
229
230  fifo_->Clear();
231
232  // Uncork (resume) the input stream.
233  pa_stream_set_read_callback(input_stream_, &ReadCallback, this);
234  pa_stream_readable_size(input_stream_);
235  pa_operation* operation = pa_stream_cork(input_stream_, 0, NULL, NULL);
236  WaitForOperationCompletion(pa_mainloop_, operation);
237
238  // Uncork (resume) the output stream.
239  // We use the recording stream to drive the playback, so we do not need to
240  // register the write callback using pa_stream_set_write_callback().
241  operation = pa_stream_cork(output_stream_, 0,
242                             &pulse::StreamSuccessCallback, pa_mainloop_);
243  WaitForOperationCompletion(pa_mainloop_, operation);
244}
245
// Stops streaming: detaches the client callback and the read callback, then
// flushes and corks (pauses) first the input and then the output stream,
// waiting for each operation to complete.  Must be called on the audio
// manager's message loop thread.
void PulseAudioUnifiedStream::Stop() {
  DCHECK(manager_->GetMessageLoop()->BelongsToCurrentThread());

  // Cork (pause) the stream.  Waiting for the main loop lock will ensure
  // outstanding callbacks have completed.
  AutoPulseLock auto_lock(pa_mainloop_);

  // Set |source_callback_| to NULL so that callbacks which may still run
  // while waiting on the flush and cork operations below no longer reach the
  // client; StreamNotifyCallback() and WriteData() both test it before use.
  source_callback_ = NULL;

  // Set the read callback to NULL before flushing the stream, otherwise it
  // will cause deadlock on the operation.
  pa_stream_set_read_callback(input_stream_, NULL, NULL);
  pa_operation* operation = pa_stream_flush(
      input_stream_, &pulse::StreamSuccessCallback, pa_mainloop_);
  WaitForOperationCompletion(pa_mainloop_, operation);

  // Cork (pause) the input stream.
  operation = pa_stream_cork(input_stream_, 1, &pulse::StreamSuccessCallback,
                             pa_mainloop_);
  WaitForOperationCompletion(pa_mainloop_, operation);

  // Flush the output stream prior to cork, doing so after will cause hangs.
  // NOTE(review): callbacks are presumably held off while this thread owns
  // pa_threaded_mainloop_lock(), which is what makes this sequence thread
  // safe -- confirm against the PulseAudio threaded main loop documentation.
  operation = pa_stream_flush(
      output_stream_, &pulse::StreamSuccessCallback, pa_mainloop_);
  WaitForOperationCompletion(pa_mainloop_, operation);

  // Cork (pause) the output stream.
  operation = pa_stream_cork(output_stream_, 1, &pulse::StreamSuccessCallback,
                             pa_mainloop_);
  WaitForOperationCompletion(pa_mainloop_, operation);
}
279
280void PulseAudioUnifiedStream::SetVolume(double volume) {
281  DCHECK(manager_->GetMessageLoop()->BelongsToCurrentThread());
282
283  volume_ = static_cast<float>(volume);
284}
285
286void PulseAudioUnifiedStream::GetVolume(double* volume) {
287  DCHECK(manager_->GetMessageLoop()->BelongsToCurrentThread());
288
289  *volume = volume_;
290}
291
292}  // namespace media
293