1// Copyright (c) 2012 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "remoting/host/linux/audio_pipe_reader.h"
6
7#include <fcntl.h>
8#include <sys/stat.h>
9#include <sys/types.h>
10#include <unistd.h>
11
12#include "base/logging.h"
13#include "base/posix/eintr_wrapper.h"
14#include "base/stl_util.h"
15
16namespace remoting {
17
18namespace {
19
20const int kSampleBytesPerSecond = AudioPipeReader::kSamplingRate *
21                                  AudioPipeReader::kChannels *
22                                  AudioPipeReader::kBytesPerSample;
23
24// Read data from the pipe every 40ms.
25const int kCapturingPeriodMs = 40;
26
27// Size of the pipe buffer in milliseconds.
28const int kPipeBufferSizeMs = kCapturingPeriodMs * 2;
29
30// Size of the pipe buffer in bytes.
31const int kPipeBufferSizeBytes = kPipeBufferSizeMs * kSampleBytesPerSecond /
32    base::Time::kMillisecondsPerSecond;
33
34#if !defined(F_SETPIPE_SZ)
35// F_SETPIPE_SZ is supported only starting linux 2.6.35, but we want to be able
36// to compile this code on machines with older kernel.
37#define F_SETPIPE_SZ 1031
38#endif  // defined(F_SETPIPE_SZ)
39
40}  // namespace
41
42// static
43scoped_refptr<AudioPipeReader> AudioPipeReader::Create(
44    scoped_refptr<base::SingleThreadTaskRunner> task_runner,
45    const base::FilePath& pipe_path) {
46  // Create a reference to the new AudioPipeReader before posting the
47  // StartOnAudioThread task, otherwise it may be deleted on the audio
48  // thread before we return.
49  scoped_refptr<AudioPipeReader> pipe_reader =
50      new AudioPipeReader(task_runner, pipe_path);
51  task_runner->PostTask(
52      FROM_HERE, base::Bind(&AudioPipeReader::StartOnAudioThread, pipe_reader));
53  return pipe_reader;
54}
55
56AudioPipeReader::AudioPipeReader(
57    scoped_refptr<base::SingleThreadTaskRunner> task_runner,
58    const base::FilePath& pipe_path)
59    : task_runner_(task_runner),
60      pipe_path_(pipe_path),
61      observers_(new ObserverListThreadSafe<StreamObserver>()) {
62}
63
64AudioPipeReader::~AudioPipeReader() {}
65
66void AudioPipeReader::AddObserver(StreamObserver* observer) {
67  observers_->AddObserver(observer);
68}
69void AudioPipeReader::RemoveObserver(StreamObserver* observer) {
70  observers_->RemoveObserver(observer);
71}
72
73void AudioPipeReader::OnFileCanReadWithoutBlocking(int fd) {
74  DCHECK_EQ(fd, pipe_.GetPlatformFile());
75  StartTimer();
76}
77
78void AudioPipeReader::OnFileCanWriteWithoutBlocking(int fd) {
79  NOTREACHED();
80}
81
82void AudioPipeReader::StartOnAudioThread() {
83  DCHECK(task_runner_->BelongsToCurrentThread());
84
85  if (!file_watcher_.Watch(pipe_path_.DirName(), true,
86                           base::Bind(&AudioPipeReader::OnDirectoryChanged,
87                                      base::Unretained(this)))) {
88    LOG(ERROR) << "Failed to watch pulseaudio directory "
89               << pipe_path_.DirName().value();
90  }
91
92  TryOpenPipe();
93}
94
95void AudioPipeReader::OnDirectoryChanged(const base::FilePath& path,
96                                         bool error) {
97  DCHECK(task_runner_->BelongsToCurrentThread());
98
99  if (error) {
100    LOG(ERROR) << "File watcher returned an error.";
101    return;
102  }
103
104  TryOpenPipe();
105}
106
107void AudioPipeReader::TryOpenPipe() {
108  DCHECK(task_runner_->BelongsToCurrentThread());
109
110  base::File new_pipe;
111  new_pipe.Initialize(
112      pipe_path_,
113      base::File::FLAG_OPEN | base::File::FLAG_READ | base::File::FLAG_ASYNC);
114
115  // If both |pipe_| and |new_pipe| are valid then compare inodes for the two
116  // file descriptors. Don't need to do anything if inode hasn't changed.
117  if (new_pipe.IsValid() && pipe_.IsValid()) {
118    struct stat old_stat;
119    struct stat new_stat;
120    if (fstat(pipe_.GetPlatformFile(), &old_stat) == 0 &&
121        fstat(new_pipe.GetPlatformFile(), &new_stat) == 0 &&
122        old_stat.st_ino == new_stat.st_ino) {
123      return;
124    }
125  }
126
127  file_descriptor_watcher_.StopWatchingFileDescriptor();
128  timer_.Stop();
129
130  pipe_ = new_pipe.Pass();
131
132  if (pipe_.IsValid()) {
133    // Set O_NONBLOCK flag.
134    if (HANDLE_EINTR(fcntl(pipe_.GetPlatformFile(), F_SETFL, O_NONBLOCK)) < 0) {
135      PLOG(ERROR) << "fcntl";
136      pipe_.Close();
137      return;
138    }
139
140    // Set buffer size for the pipe.
141    if (HANDLE_EINTR(fcntl(
142            pipe_.GetPlatformFile(), F_SETPIPE_SZ, kPipeBufferSizeBytes)) < 0) {
143      PLOG(ERROR) << "fcntl";
144    }
145
146    WaitForPipeReadable();
147  }
148}
149
150void AudioPipeReader::StartTimer() {
151  DCHECK(task_runner_->BelongsToCurrentThread());
152  started_time_ = base::TimeTicks::Now();
153  last_capture_position_ = 0;
154  timer_.Start(FROM_HERE, base::TimeDelta::FromMilliseconds(kCapturingPeriodMs),
155               this, &AudioPipeReader::DoCapture);
156}
157
158void AudioPipeReader::DoCapture() {
159  DCHECK(task_runner_->BelongsToCurrentThread());
160  DCHECK(pipe_.IsValid());
161
162  // Calculate how much we need read from the pipe. Pulseaudio doesn't control
163  // how much data it writes to the pipe, so we need to pace the stream.
164  base::TimeDelta stream_position = base::TimeTicks::Now() - started_time_;
165  int64 stream_position_bytes = stream_position.InMilliseconds() *
166      kSampleBytesPerSecond / base::Time::kMillisecondsPerSecond;
167  int64 bytes_to_read = stream_position_bytes - last_capture_position_;
168
169  std::string data = left_over_bytes_;
170  size_t pos = data.size();
171  left_over_bytes_.clear();
172  data.resize(pos + bytes_to_read);
173
174  while (pos < data.size()) {
175    int read_result =
176        pipe_.ReadAtCurrentPos(string_as_array(&data) + pos, data.size() - pos);
177    if (read_result > 0) {
178      pos += read_result;
179    } else {
180      if (read_result < 0 && errno != EWOULDBLOCK && errno != EAGAIN)
181        PLOG(ERROR) << "read";
182      break;
183    }
184  }
185
186  // Stop reading from the pipe if PulseAudio isn't writing anything.
187  if (pos == 0) {
188    WaitForPipeReadable();
189    return;
190  }
191
192  // Save any incomplete samples we've read for later. Each packet should
193  // contain integer number of samples.
194  int incomplete_samples_bytes = pos % (kChannels * kBytesPerSample);
195  left_over_bytes_.assign(data, pos - incomplete_samples_bytes,
196                          incomplete_samples_bytes);
197  data.resize(pos - incomplete_samples_bytes);
198
199  last_capture_position_ += data.size();
200  // Normally PulseAudio will keep pipe buffer full, so we should always be able
201  // to read |bytes_to_read| bytes, but in case it's misbehaving we need to make
202  // sure that |stream_position_bytes| doesn't go out of sync with the current
203  // stream position.
204  if (stream_position_bytes - last_capture_position_ > kPipeBufferSizeBytes)
205    last_capture_position_ = stream_position_bytes - kPipeBufferSizeBytes;
206  DCHECK_LE(last_capture_position_, stream_position_bytes);
207
208  // Dispatch asynchronous notification to the stream observers.
209  scoped_refptr<base::RefCountedString> data_ref =
210      base::RefCountedString::TakeString(&data);
211  observers_->Notify(&StreamObserver::OnDataRead, data_ref);
212}
213
214void AudioPipeReader::WaitForPipeReadable() {
215  timer_.Stop();
216  base::MessageLoopForIO::current()->WatchFileDescriptor(
217      pipe_.GetPlatformFile(), false, base::MessageLoopForIO::WATCH_READ,
218      &file_descriptor_watcher_, this);
219}
220
221// static
222void AudioPipeReaderTraits::Destruct(const AudioPipeReader* audio_pipe_reader) {
223  audio_pipe_reader->task_runner_->DeleteSoon(FROM_HERE, audio_pipe_reader);
224}
225
226}  // namespace remoting
227