1// Copyright (c) 2012 The Chromium Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style license that can be 3// found in the LICENSE file. 4 5#include "remoting/host/linux/audio_pipe_reader.h" 6 7#include <fcntl.h> 8#include <sys/stat.h> 9#include <sys/types.h> 10#include <unistd.h> 11 12#include "base/logging.h" 13#include "base/posix/eintr_wrapper.h" 14#include "base/stl_util.h" 15 16namespace remoting { 17 18namespace { 19 20const int kSampleBytesPerSecond = AudioPipeReader::kSamplingRate * 21 AudioPipeReader::kChannels * 22 AudioPipeReader::kBytesPerSample; 23 24// Read data from the pipe every 40ms. 25const int kCapturingPeriodMs = 40; 26 27// Size of the pipe buffer in milliseconds. 28const int kPipeBufferSizeMs = kCapturingPeriodMs * 2; 29 30// Size of the pipe buffer in bytes. 31const int kPipeBufferSizeBytes = kPipeBufferSizeMs * kSampleBytesPerSecond / 32 base::Time::kMillisecondsPerSecond; 33 34#if !defined(F_SETPIPE_SZ) 35// F_SETPIPE_SZ is supported only starting linux 2.6.35, but we want to be able 36// to compile this code on machines with older kernel. 37#define F_SETPIPE_SZ 1031 38#endif // defined(F_SETPIPE_SZ) 39 40} // namespace 41 42// static 43scoped_refptr<AudioPipeReader> AudioPipeReader::Create( 44 scoped_refptr<base::SingleThreadTaskRunner> task_runner, 45 const base::FilePath& pipe_path) { 46 // Create a reference to the new AudioPipeReader before posting the 47 // StartOnAudioThread task, otherwise it may be deleted on the audio 48 // thread before we return. 49 scoped_refptr<AudioPipeReader> pipe_reader = 50 new AudioPipeReader(task_runner, pipe_path); 51 task_runner->PostTask( 52 FROM_HERE, base::Bind(&AudioPipeReader::StartOnAudioThread, pipe_reader)); 53 return pipe_reader; 54} 55 56AudioPipeReader::AudioPipeReader( 57 scoped_refptr<base::SingleThreadTaskRunner> task_runner, 58 const base::FilePath& pipe_path) 59 : task_runner_(task_runner), 60 pipe_path_(pipe_path), 61 observers_(new ObserverListThreadSafe<StreamObserver>()) { 62} 63 64AudioPipeReader::~AudioPipeReader() {} 65 66void AudioPipeReader::AddObserver(StreamObserver* observer) { 67 observers_->AddObserver(observer); 68} 69void AudioPipeReader::RemoveObserver(StreamObserver* observer) { 70 observers_->RemoveObserver(observer); 71} 72 73void AudioPipeReader::OnFileCanReadWithoutBlocking(int fd) { 74 DCHECK_EQ(fd, pipe_.GetPlatformFile()); 75 StartTimer(); 76} 77 78void AudioPipeReader::OnFileCanWriteWithoutBlocking(int fd) { 79 NOTREACHED(); 80} 81 82void AudioPipeReader::StartOnAudioThread() { 83 DCHECK(task_runner_->BelongsToCurrentThread()); 84 85 if (!file_watcher_.Watch(pipe_path_.DirName(), true, 86 base::Bind(&AudioPipeReader::OnDirectoryChanged, 87 base::Unretained(this)))) { 88 LOG(ERROR) << "Failed to watch pulseaudio directory " 89 << pipe_path_.DirName().value(); 90 } 91 92 TryOpenPipe(); 93} 94 95void AudioPipeReader::OnDirectoryChanged(const base::FilePath& path, 96 bool error) { 97 DCHECK(task_runner_->BelongsToCurrentThread()); 98 99 if (error) { 100 LOG(ERROR) << "File watcher returned an error."; 101 return; 102 } 103 104 TryOpenPipe(); 105} 106 107void AudioPipeReader::TryOpenPipe() { 108 DCHECK(task_runner_->BelongsToCurrentThread()); 109 110 base::File new_pipe; 111 new_pipe.Initialize( 112 pipe_path_, 113 base::File::FLAG_OPEN | base::File::FLAG_READ | base::File::FLAG_ASYNC); 114 115 // If both |pipe_| and |new_pipe| are valid then compare inodes for the two 116 // file descriptors. Don't need to do anything if inode hasn't changed. 117 if (new_pipe.IsValid() && pipe_.IsValid()) { 118 struct stat old_stat; 119 struct stat new_stat; 120 if (fstat(pipe_.GetPlatformFile(), &old_stat) == 0 && 121 fstat(new_pipe.GetPlatformFile(), &new_stat) == 0 && 122 old_stat.st_ino == new_stat.st_ino) { 123 return; 124 } 125 } 126 127 file_descriptor_watcher_.StopWatchingFileDescriptor(); 128 timer_.Stop(); 129 130 pipe_ = new_pipe.Pass(); 131 132 if (pipe_.IsValid()) { 133 // Set O_NONBLOCK flag. 134 if (HANDLE_EINTR(fcntl(pipe_.GetPlatformFile(), F_SETFL, O_NONBLOCK)) < 0) { 135 PLOG(ERROR) << "fcntl"; 136 pipe_.Close(); 137 return; 138 } 139 140 // Set buffer size for the pipe. 141 if (HANDLE_EINTR(fcntl( 142 pipe_.GetPlatformFile(), F_SETPIPE_SZ, kPipeBufferSizeBytes)) < 0) { 143 PLOG(ERROR) << "fcntl"; 144 } 145 146 WaitForPipeReadable(); 147 } 148} 149 150void AudioPipeReader::StartTimer() { 151 DCHECK(task_runner_->BelongsToCurrentThread()); 152 started_time_ = base::TimeTicks::Now(); 153 last_capture_position_ = 0; 154 timer_.Start(FROM_HERE, base::TimeDelta::FromMilliseconds(kCapturingPeriodMs), 155 this, &AudioPipeReader::DoCapture); 156} 157 158void AudioPipeReader::DoCapture() { 159 DCHECK(task_runner_->BelongsToCurrentThread()); 160 DCHECK(pipe_.IsValid()); 161 162 // Calculate how much we need read from the pipe. Pulseaudio doesn't control 163 // how much data it writes to the pipe, so we need to pace the stream. 164 base::TimeDelta stream_position = base::TimeTicks::Now() - started_time_; 165 int64 stream_position_bytes = stream_position.InMilliseconds() * 166 kSampleBytesPerSecond / base::Time::kMillisecondsPerSecond; 167 int64 bytes_to_read = stream_position_bytes - last_capture_position_; 168 169 std::string data = left_over_bytes_; 170 size_t pos = data.size(); 171 left_over_bytes_.clear(); 172 data.resize(pos + bytes_to_read); 173 174 while (pos < data.size()) { 175 int read_result = 176 pipe_.ReadAtCurrentPos(string_as_array(&data) + pos, data.size() - pos); 177 if (read_result > 0) { 178 pos += read_result; 179 } else { 180 if (read_result < 0 && errno != EWOULDBLOCK && errno != EAGAIN) 181 PLOG(ERROR) << "read"; 182 break; 183 } 184 } 185 186 // Stop reading from the pipe if PulseAudio isn't writing anything. 187 if (pos == 0) { 188 WaitForPipeReadable(); 189 return; 190 } 191 192 // Save any incomplete samples we've read for later. Each packet should 193 // contain integer number of samples. 194 int incomplete_samples_bytes = pos % (kChannels * kBytesPerSample); 195 left_over_bytes_.assign(data, pos - incomplete_samples_bytes, 196 incomplete_samples_bytes); 197 data.resize(pos - incomplete_samples_bytes); 198 199 last_capture_position_ += data.size(); 200 // Normally PulseAudio will keep pipe buffer full, so we should always be able 201 // to read |bytes_to_read| bytes, but in case it's misbehaving we need to make 202 // sure that |stream_position_bytes| doesn't go out of sync with the current 203 // stream position. 204 if (stream_position_bytes - last_capture_position_ > kPipeBufferSizeBytes) 205 last_capture_position_ = stream_position_bytes - kPipeBufferSizeBytes; 206 DCHECK_LE(last_capture_position_, stream_position_bytes); 207 208 // Dispatch asynchronous notification to the stream observers. 209 scoped_refptr<base::RefCountedString> data_ref = 210 base::RefCountedString::TakeString(&data); 211 observers_->Notify(&StreamObserver::OnDataRead, data_ref); 212} 213 214void AudioPipeReader::WaitForPipeReadable() { 215 timer_.Stop(); 216 base::MessageLoopForIO::current()->WatchFileDescriptor( 217 pipe_.GetPlatformFile(), false, base::MessageLoopForIO::WATCH_READ, 218 &file_descriptor_watcher_, this); 219} 220 221// static 222void AudioPipeReaderTraits::Destruct(const AudioPipeReader* audio_pipe_reader) { 223 audio_pipe_reader->task_runner_->DeleteSoon(FROM_HERE, audio_pipe_reader); 224} 225 226} // namespace remoting 227