1// Copyright 2014 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "chrome/browser/media/webrtc_rtp_dump_writer.h"
6
7#include "base/big_endian.h"
8#include "base/files/file_util.h"
9#include "base/logging.h"
10#include "content/public/browser/browser_thread.h"
11#include "third_party/zlib/zlib.h"
12
13using content::BrowserThread;
14
15namespace {
16
17static const size_t kMinimumGzipOutputBufferSize = 256;  // In bytes.
18
19const unsigned char kRtpDumpFileHeaderFirstLine[] = "#!rtpplay1.0 0.0.0.0/0\n";
20static const size_t kRtpDumpFileHeaderSize = 16;  // In bytes.
21
22// A helper for writing the header of the dump file.
23void WriteRtpDumpFileHeaderBigEndian(base::TimeTicks start,
24                                     std::vector<uint8>* output) {
25  size_t buffer_start_pos = output->size();
26  output->resize(output->size() + kRtpDumpFileHeaderSize);
27
28  char* buffer = reinterpret_cast<char*>(&(*output)[buffer_start_pos]);
29
30  base::TimeDelta delta = start - base::TimeTicks();
31  uint32 start_sec = delta.InSeconds();
32  base::WriteBigEndian(buffer, start_sec);
33  buffer += sizeof(start_sec);
34
35  uint32 start_usec =
36      delta.InMilliseconds() * base::Time::kMicrosecondsPerMillisecond;
37  base::WriteBigEndian(buffer, start_usec);
38  buffer += sizeof(start_usec);
39
40  // Network source, always 0.
41  base::WriteBigEndian(buffer, uint32(0));
42  buffer += sizeof(uint32);
43
44  // UDP port, always 0.
45  base::WriteBigEndian(buffer, uint16(0));
46  buffer += sizeof(uint16);
47
48  // 2 bytes padding.
49  base::WriteBigEndian(buffer, uint16(0));
50}
51
52// The header size for each packet dump.
53static const size_t kPacketDumpHeaderSize = 8;  // In bytes.
54
55// A helper for writing the header for each packet dump.
56// |start| is the time when the recording is started.
57// |dump_length| is the length of the packet dump including this header.
58// |packet_length| is the length of the RTP packet header.
59void WritePacketDumpHeaderBigEndian(const base::TimeTicks& start,
60                                    uint16 dump_length,
61                                    uint16 packet_length,
62                                    std::vector<uint8>* output) {
63  size_t buffer_start_pos = output->size();
64  output->resize(output->size() + kPacketDumpHeaderSize);
65
66  char* buffer = reinterpret_cast<char*>(&(*output)[buffer_start_pos]);
67
68  base::WriteBigEndian(buffer, dump_length);
69  buffer += sizeof(dump_length);
70
71  base::WriteBigEndian(buffer, packet_length);
72  buffer += sizeof(packet_length);
73
74  uint32 elapsed =
75      static_cast<uint32>((base::TimeTicks::Now() - start).InMilliseconds());
76  base::WriteBigEndian(buffer, elapsed);
77}
78
79// Append |src_len| bytes from |src| to |dest|.
80void AppendToBuffer(const uint8* src,
81                    size_t src_len,
82                    std::vector<uint8>* dest) {
83  size_t old_dest_size = dest->size();
84  dest->resize(old_dest_size + src_len);
85  memcpy(&(*dest)[old_dest_size], src, src_len);
86}
87
88}  // namespace
89
90// This class is running on the FILE thread for compressing and writing the
91// dump buffer to disk.
92class WebRtcRtpDumpWriter::FileThreadWorker {
93 public:
94  explicit FileThreadWorker(const base::FilePath& dump_path)
95      : dump_path_(dump_path) {
96    thread_checker_.DetachFromThread();
97
98    memset(&stream_, 0, sizeof(stream_));
99    int result = deflateInit2(&stream_,
100                              Z_DEFAULT_COMPRESSION,
101                              Z_DEFLATED,
102                              // windowBits = 15 is default, 16 is added to
103                              // produce a gzip header + trailer.
104                              15 + 16,
105                              8,  // memLevel = 8 is default.
106                              Z_DEFAULT_STRATEGY);
107    DCHECK_EQ(Z_OK, result);
108  }
109
110  ~FileThreadWorker() {
111    DCHECK(thread_checker_.CalledOnValidThread());
112
113    // Makes sure all allocations are freed.
114    deflateEnd(&stream_);
115  }
116
117  // Compresses the data in |buffer| and write to the dump file. If |end_stream|
118  // is true, the compression stream will be ended and the dump file cannot be
119  // written to any more.
120  void CompressAndWriteToFileOnFileThread(
121      scoped_ptr<std::vector<uint8> > buffer,
122      bool end_stream,
123      FlushResult* result,
124      size_t* bytes_written) {
125    DCHECK(thread_checker_.CalledOnValidThread());
126
127    // This is called either when the in-memory buffer is full or the dump
128    // should be ended.
129    DCHECK(!buffer->empty() || end_stream);
130
131    *result = FLUSH_RESULT_SUCCESS;
132    *bytes_written = 0;
133
134    // There may be nothing to compress/write if there is no RTP packet since
135    // the last flush.
136    if (!buffer->empty()) {
137      *bytes_written = CompressAndWriteBufferToFile(buffer.get(), result);
138    } else if (!base::PathExists(dump_path_)) {
139      // If the dump does not exist, it means there is no RTP packet recorded.
140      // Return FLUSH_RESULT_NO_DATA to indicate no dump file created.
141      *result = FLUSH_RESULT_NO_DATA;
142    }
143
144    if (end_stream && !EndDumpFile())
145      *result = FLUSH_RESULT_FAILURE;
146  }
147
148 private:
149  // Helper for CompressAndWriteToFileOnFileThread to compress and write one
150  // dump.
151  size_t CompressAndWriteBufferToFile(std::vector<uint8>* buffer,
152                                      FlushResult* result) {
153    DCHECK(thread_checker_.CalledOnValidThread());
154    DCHECK(buffer->size());
155
156    *result = FLUSH_RESULT_SUCCESS;
157
158    std::vector<uint8> compressed_buffer;
159    if (!Compress(buffer, &compressed_buffer)) {
160      DVLOG(2) << "Compressing buffer failed.";
161      *result = FLUSH_RESULT_FAILURE;
162      return 0;
163    }
164
165    int bytes_written = -1;
166
167    if (base::PathExists(dump_path_)) {
168      bytes_written = base::AppendToFile(
169          dump_path_,
170          reinterpret_cast<const char*>(&compressed_buffer[0]),
171          compressed_buffer.size());
172    } else {
173      bytes_written = base::WriteFile(
174          dump_path_,
175          reinterpret_cast<const char*>(&compressed_buffer[0]),
176          compressed_buffer.size());
177    }
178
179    if (bytes_written == -1) {
180      DVLOG(2) << "Writing file failed: " << dump_path_.value();
181      *result = FLUSH_RESULT_FAILURE;
182      return 0;
183    }
184
185    DCHECK_EQ(static_cast<size_t>(bytes_written), compressed_buffer.size());
186    return bytes_written;
187  }
188
189  // Compresses |input| into |output|.
190  bool Compress(std::vector<uint8>* input, std::vector<uint8>* output) {
191    DCHECK(thread_checker_.CalledOnValidThread());
192    int result = Z_OK;
193
194    output->resize(std::max(kMinimumGzipOutputBufferSize, input->size()));
195
196    stream_.next_in = &(*input)[0];
197    stream_.avail_in = input->size();
198    stream_.next_out = &(*output)[0];
199    stream_.avail_out = output->size();
200
201    result = deflate(&stream_, Z_SYNC_FLUSH);
202    DCHECK_EQ(Z_OK, result);
203    DCHECK_EQ(0U, stream_.avail_in);
204
205    output->resize(output->size() - stream_.avail_out);
206
207    stream_.next_in = NULL;
208    stream_.next_out = NULL;
209    stream_.avail_out = 0;
210    return true;
211  }
212
213  // Ends the compression stream and completes the dump file.
214  bool EndDumpFile() {
215    DCHECK(thread_checker_.CalledOnValidThread());
216
217    std::vector<uint8> output_buffer;
218    output_buffer.resize(kMinimumGzipOutputBufferSize);
219
220    stream_.next_in = NULL;
221    stream_.avail_in = 0;
222    stream_.next_out = &output_buffer[0];
223    stream_.avail_out = output_buffer.size();
224
225    int result = deflate(&stream_, Z_FINISH);
226    DCHECK_EQ(Z_STREAM_END, result);
227
228    result = deflateEnd(&stream_);
229    DCHECK_EQ(Z_OK, result);
230
231    output_buffer.resize(output_buffer.size() - stream_.avail_out);
232
233    memset(&stream_, 0, sizeof(z_stream));
234
235    DCHECK(!output_buffer.empty());
236    int bytes_written =
237        base::AppendToFile(dump_path_,
238                           reinterpret_cast<const char*>(&output_buffer[0]),
239                           output_buffer.size());
240
241    return bytes_written > 0;
242  }
243
244  const base::FilePath dump_path_;
245
246  z_stream stream_;
247
248  base::ThreadChecker thread_checker_;
249
250  DISALLOW_COPY_AND_ASSIGN(FileThreadWorker);
251};
252
253WebRtcRtpDumpWriter::WebRtcRtpDumpWriter(
254    const base::FilePath& incoming_dump_path,
255    const base::FilePath& outgoing_dump_path,
256    size_t max_dump_size,
257    const base::Closure& max_dump_size_reached_callback)
258    : max_dump_size_(max_dump_size),
259      max_dump_size_reached_callback_(max_dump_size_reached_callback),
260      total_dump_size_on_disk_(0),
261      incoming_file_thread_worker_(new FileThreadWorker(incoming_dump_path)),
262      outgoing_file_thread_worker_(new FileThreadWorker(outgoing_dump_path)),
263      weak_ptr_factory_(this) {
264}
265
266WebRtcRtpDumpWriter::~WebRtcRtpDumpWriter() {
267  DCHECK(thread_checker_.CalledOnValidThread());
268
269  bool success = BrowserThread::DeleteSoon(
270      BrowserThread::FILE, FROM_HERE, incoming_file_thread_worker_.release());
271  DCHECK(success);
272
273  success = BrowserThread::DeleteSoon(
274      BrowserThread::FILE, FROM_HERE, outgoing_file_thread_worker_.release());
275  DCHECK(success);
276}
277
278void WebRtcRtpDumpWriter::WriteRtpPacket(const uint8* packet_header,
279                                         size_t header_length,
280                                         size_t packet_length,
281                                         bool incoming) {
282  DCHECK(thread_checker_.CalledOnValidThread());
283
284  static const size_t kMaxInMemoryBufferSize = 65536;
285
286  std::vector<uint8>* dest_buffer =
287      incoming ? &incoming_buffer_ : &outgoing_buffer_;
288
289  // We use the capacity of the buffer to indicate if the buffer has been
290  // initialized and if the dump file header has been created.
291  if (!dest_buffer->capacity()) {
292    dest_buffer->reserve(std::min(kMaxInMemoryBufferSize, max_dump_size_));
293
294    start_time_ = base::TimeTicks::Now();
295
296    // Writes the dump file header.
297    AppendToBuffer(kRtpDumpFileHeaderFirstLine,
298                   arraysize(kRtpDumpFileHeaderFirstLine) - 1,
299                   dest_buffer);
300    WriteRtpDumpFileHeaderBigEndian(start_time_, dest_buffer);
301  }
302
303  size_t packet_dump_length = kPacketDumpHeaderSize + header_length;
304
305  // Flushes the buffer to disk if the buffer is full.
306  if (dest_buffer->size() + packet_dump_length > dest_buffer->capacity())
307    FlushBuffer(incoming, false, FlushDoneCallback());
308
309  WritePacketDumpHeaderBigEndian(
310      start_time_, packet_dump_length, packet_length, dest_buffer);
311
312  // Writes the actual RTP packet header.
313  AppendToBuffer(packet_header, header_length, dest_buffer);
314}
315
316void WebRtcRtpDumpWriter::EndDump(RtpDumpType type,
317                                  const EndDumpCallback& finished_callback) {
318  DCHECK(thread_checker_.CalledOnValidThread());
319  DCHECK(type == RTP_DUMP_OUTGOING || incoming_file_thread_worker_ != NULL);
320  DCHECK(type == RTP_DUMP_INCOMING || outgoing_file_thread_worker_ != NULL);
321
322  bool incoming = (type == RTP_DUMP_BOTH || type == RTP_DUMP_INCOMING);
323  EndDumpContext context(type, finished_callback);
324
325  // End the incoming dump first if required. OnDumpEnded will continue to end
326  // the outgoing dump if necessary.
327  FlushBuffer(incoming,
328              true,
329              base::Bind(&WebRtcRtpDumpWriter::OnDumpEnded,
330                         weak_ptr_factory_.GetWeakPtr(),
331                         context,
332                         incoming));
333}
334
335size_t WebRtcRtpDumpWriter::max_dump_size() const {
336  DCHECK(thread_checker_.CalledOnValidThread());
337  return max_dump_size_;
338}
339
340WebRtcRtpDumpWriter::EndDumpContext::EndDumpContext(
341    RtpDumpType type,
342    const EndDumpCallback& callback)
343    : type(type),
344      incoming_succeeded(false),
345      outgoing_succeeded(false),
346      callback(callback) {
347}
348
349WebRtcRtpDumpWriter::EndDumpContext::~EndDumpContext() {
350}
351
352void WebRtcRtpDumpWriter::FlushBuffer(bool incoming,
353                                      bool end_stream,
354                                      const FlushDoneCallback& callback) {
355  DCHECK(thread_checker_.CalledOnValidThread());
356
357  scoped_ptr<std::vector<uint8> > new_buffer(new std::vector<uint8>());
358
359  if (incoming) {
360    new_buffer->reserve(incoming_buffer_.capacity());
361    new_buffer->swap(incoming_buffer_);
362  } else {
363    new_buffer->reserve(outgoing_buffer_.capacity());
364    new_buffer->swap(outgoing_buffer_);
365  }
366
367  scoped_ptr<FlushResult> result(new FlushResult(FLUSH_RESULT_FAILURE));
368
369  scoped_ptr<size_t> bytes_written(new size_t(0));
370
371  FileThreadWorker* worker = incoming ? incoming_file_thread_worker_.get()
372                                      : outgoing_file_thread_worker_.get();
373
374  // Using "Unretained(worker)" because |worker| is owner by this object and it
375  // guaranteed to be deleted on the FILE thread before this object goes away.
376  base::Closure task =
377      base::Bind(&FileThreadWorker::CompressAndWriteToFileOnFileThread,
378                 base::Unretained(worker),
379                 Passed(&new_buffer),
380                 end_stream,
381                 result.get(),
382                 bytes_written.get());
383
384  // OnFlushDone is necessary to avoid running the callback after this
385  // object is gone.
386  base::Closure reply = base::Bind(&WebRtcRtpDumpWriter::OnFlushDone,
387                                   weak_ptr_factory_.GetWeakPtr(),
388                                   callback,
389                                   Passed(&result),
390                                   Passed(&bytes_written));
391
392  // Define the task and reply outside the method call so that getting and
393  // passing the scoped_ptr does not depend on the argument evaluation order.
394  BrowserThread::PostTaskAndReply(BrowserThread::FILE, FROM_HERE, task, reply);
395
396  if (end_stream) {
397    bool success = BrowserThread::DeleteSoon(
398        BrowserThread::FILE,
399        FROM_HERE,
400        incoming ? incoming_file_thread_worker_.release()
401                 : outgoing_file_thread_worker_.release());
402    DCHECK(success);
403  }
404}
405
406void WebRtcRtpDumpWriter::OnFlushDone(const FlushDoneCallback& callback,
407                                      const scoped_ptr<FlushResult>& result,
408                                      const scoped_ptr<size_t>& bytes_written) {
409  DCHECK(thread_checker_.CalledOnValidThread());
410
411  total_dump_size_on_disk_ += *bytes_written;
412
413  if (total_dump_size_on_disk_ >= max_dump_size_ &&
414      !max_dump_size_reached_callback_.is_null()) {
415    max_dump_size_reached_callback_.Run();
416  }
417
418  // Returns success for FLUSH_RESULT_MAX_SIZE_REACHED since the dump is still
419  // valid.
420  if (!callback.is_null()) {
421    callback.Run(*result != FLUSH_RESULT_FAILURE &&
422                 *result != FLUSH_RESULT_NO_DATA);
423  }
424}
425
426void WebRtcRtpDumpWriter::OnDumpEnded(EndDumpContext context,
427                                      bool incoming,
428                                      bool success) {
429  DCHECK(thread_checker_.CalledOnValidThread());
430
431  DVLOG(2) << "Dump ended, incoming = " << incoming
432           << ", succeeded = " << success;
433
434  if (incoming)
435    context.incoming_succeeded = success;
436  else
437    context.outgoing_succeeded = success;
438
439  // End the outgoing dump if needed.
440  if (incoming && context.type == RTP_DUMP_BOTH) {
441    FlushBuffer(false,
442                true,
443                base::Bind(&WebRtcRtpDumpWriter::OnDumpEnded,
444                           weak_ptr_factory_.GetWeakPtr(),
445                           context,
446                           false));
447    return;
448  }
449
450  // This object might be deleted after running the callback.
451  context.callback.Run(context.incoming_succeeded, context.outgoing_succeeded);
452}
453