1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "remoting/host/linux/audio_pipe_reader.h"
6
7#include <fcntl.h>
8#include <sys/stat.h>
9#include <sys/types.h>
10#include <unistd.h>
11
12#include "base/files/file_path.h"
13#include "base/logging.h"
14#include "base/posix/eintr_wrapper.h"
15#include "base/stl_util.h"
16
17namespace remoting {
18
19namespace {
20
21// PulseAudio's module-pipe-sink must be configured to use the following
22// parameters for the sink we read from.
23const int kSamplesPerSecond = 48000;
24const int kChannels = 2;
25const int kBytesPerSample = 2;
26const int kSampleBytesPerSecond =
27    kSamplesPerSecond * kChannels * kBytesPerSample;
28
29// Read data from the pipe every 40ms.
30const int kCapturingPeriodMs = 40;
31
32// Size of the pipe buffer in milliseconds.
33const int kPipeBufferSizeMs = kCapturingPeriodMs * 2;
34
35// Size of the pipe buffer in bytes.
36const int kPipeBufferSizeBytes = kPipeBufferSizeMs * kSampleBytesPerSecond /
37    base::Time::kMillisecondsPerSecond;
38
39#if !defined(F_SETPIPE_SZ)
40// F_SETPIPE_SZ is supported only starting linux 2.6.35, but we want to be able
41// to compile this code on machines with older kernel.
42#define F_SETPIPE_SZ 1031
43#endif  // defined(F_SETPIPE_SZ)
44
45}  // namespace
46
47// static
48scoped_refptr<AudioPipeReader> AudioPipeReader::Create(
49    scoped_refptr<base::SingleThreadTaskRunner> task_runner,
50    const base::FilePath& pipe_name) {
51  // Create a reference to the new AudioPipeReader before posting the
52  // StartOnAudioThread task, otherwise it may be deleted on the audio
53  // thread before we return.
54  scoped_refptr<AudioPipeReader> pipe_reader =
55      new AudioPipeReader(task_runner);
56  task_runner->PostTask(FROM_HERE, base::Bind(
57      &AudioPipeReader::StartOnAudioThread, pipe_reader, pipe_name));
58  return pipe_reader;
59}
60
61void AudioPipeReader::StartOnAudioThread(const base::FilePath& pipe_name) {
62  DCHECK(task_runner_->BelongsToCurrentThread());
63
64  pipe_fd_ = HANDLE_EINTR(open(
65      pipe_name.value().c_str(), O_RDONLY | O_NONBLOCK));
66  if (pipe_fd_ < 0) {
67    LOG(ERROR) << "Failed to open " << pipe_name.value();
68    return;
69  }
70
71  // Set buffer size for the pipe.
72  int result = HANDLE_EINTR(
73      fcntl(pipe_fd_, F_SETPIPE_SZ, kPipeBufferSizeBytes));
74  if (result < 0) {
75    PLOG(ERROR) << "fcntl";
76  }
77
78  WaitForPipeReadable();
79}
80
81AudioPipeReader::AudioPipeReader(
82    scoped_refptr<base::SingleThreadTaskRunner> task_runner)
83    : task_runner_(task_runner),
84      observers_(new ObserverListThreadSafe<StreamObserver>()) {
85}
86
87AudioPipeReader::~AudioPipeReader() {
88}
89
90void AudioPipeReader::AddObserver(StreamObserver* observer) {
91  observers_->AddObserver(observer);
92}
93void AudioPipeReader::RemoveObserver(StreamObserver* observer) {
94  observers_->RemoveObserver(observer);
95}
96
97void AudioPipeReader::OnFileCanReadWithoutBlocking(int fd) {
98  DCHECK_EQ(fd, pipe_fd_);
99  StartTimer();
100}
101
102void AudioPipeReader::OnFileCanWriteWithoutBlocking(int fd) {
103  NOTREACHED();
104}
105
106void AudioPipeReader::StartTimer() {
107  DCHECK(task_runner_->BelongsToCurrentThread());
108  started_time_ = base::TimeTicks::Now();
109  last_capture_position_ = 0;
110  timer_.Start(FROM_HERE, base::TimeDelta::FromMilliseconds(kCapturingPeriodMs),
111               this, &AudioPipeReader::DoCapture);
112}
113
114void AudioPipeReader::DoCapture() {
115  DCHECK(task_runner_->BelongsToCurrentThread());
116  DCHECK_GT(pipe_fd_, 0);
117
118  // Calculate how much we need read from the pipe. Pulseaudio doesn't control
119  // how much data it writes to the pipe, so we need to pace the stream, so
120  // that we read the exact number of the samples per second we need.
121  base::TimeDelta stream_position = base::TimeTicks::Now() - started_time_;
122  int64 stream_position_bytes = stream_position.InMilliseconds() *
123      kSampleBytesPerSecond / base::Time::kMillisecondsPerSecond;
124  int64 bytes_to_read = stream_position_bytes - last_capture_position_;
125
126  std::string data = left_over_bytes_;
127  size_t pos = data.size();
128  left_over_bytes_.clear();
129  data.resize(pos + bytes_to_read);
130
131  while (pos < data.size()) {
132    int read_result = HANDLE_EINTR(
133       read(pipe_fd_, string_as_array(&data) + pos, data.size() - pos));
134    if (read_result > 0) {
135      pos += read_result;
136    } else {
137      if (read_result < 0 && errno != EWOULDBLOCK && errno != EAGAIN)
138        PLOG(ERROR) << "read";
139      break;
140    }
141  }
142
143  // Stop reading from the pipe if PulseAudio isn't writing anything.
144  if (pos == 0) {
145    WaitForPipeReadable();
146    return;
147  }
148
149  // Save any incomplete samples we've read for later. Each packet should
150  // contain integer number of samples.
151  int incomplete_samples_bytes = pos % (kChannels * kBytesPerSample);
152  left_over_bytes_.assign(data, pos - incomplete_samples_bytes,
153                          incomplete_samples_bytes);
154  data.resize(pos - incomplete_samples_bytes);
155
156  last_capture_position_ += data.size();
157  // Normally PulseAudio will keep pipe buffer full, so we should always be able
158  // to read |bytes_to_read| bytes, but in case it's misbehaving we need to make
159  // sure that |stream_position_bytes| doesn't go out of sync with the current
160  // stream position.
161  if (stream_position_bytes - last_capture_position_ > kPipeBufferSizeBytes)
162    last_capture_position_ = stream_position_bytes - kPipeBufferSizeBytes;
163  DCHECK_LE(last_capture_position_, stream_position_bytes);
164
165  // Dispatch asynchronous notification to the stream observers.
166  scoped_refptr<base::RefCountedString> data_ref =
167      base::RefCountedString::TakeString(&data);
168  observers_->Notify(&StreamObserver::OnDataRead, data_ref);
169}
170
171void AudioPipeReader::WaitForPipeReadable() {
172  timer_.Stop();
173  base::MessageLoopForIO::current()->WatchFileDescriptor(
174      pipe_fd_,
175      false,
176      base::MessageLoopForIO::WATCH_READ,
177      &file_descriptor_watcher_,
178      this);
179}
180
181// static
182void AudioPipeReaderTraits::Destruct(const AudioPipeReader* audio_pipe_reader) {
183  audio_pipe_reader->task_runner_->DeleteSoon(FROM_HERE, audio_pipe_reader);
184}
185
186}  // namespace remoting
187