1/*
2 *  Copyright (c) 2011 The WebRTC project authors. All Rights Reserved.
3 *
4 *  Use of this source code is governed by a BSD-style license
5 *  that can be found in the LICENSE file in the root of the source
6 *  tree. An additional intellectual property rights grant can be found
7 *  in the file PATENTS.  All contributing project authors may
8 *  be found in the AUTHORS file in the root of the source tree.
9 */
10
11#include "webrtc/system_wrappers/include/data_log.h"
12
13#include <assert.h>
14
15#include <algorithm>
16#include <list>
17
18#include "webrtc/system_wrappers/include/critical_section_wrapper.h"
19#include "webrtc/system_wrappers/include/event_wrapper.h"
20#include "webrtc/system_wrappers/include/file_wrapper.h"
21#include "webrtc/system_wrappers/include/rw_lock_wrapper.h"
22
23namespace webrtc {
24
// Lock that serializes creation and destruction of the singleton; it is held
// by DataLogImpl::CreateLog() and DataLogImpl::ReturnLog().
DataLogImpl::CritSectScopedPtr DataLogImpl::crit_sect_(
  CriticalSectionWrapper::CreateCriticalSection());

// The singleton instance. NULL until CreateLog() allocates it, and reset to
// NULL again when ReturnLog() deletes it.
DataLogImpl* DataLogImpl::instance_ = NULL;
29
// A Row contains cells, which are indexed by the column names as std::string.
// The string index is treated in a case sensitive way.
// A Row deletes the Containers stored in it when it is destroyed. Every Row
// creates its own lock, which InsertCell() and ToString() hold while they
// access the cells.
class Row {
 public:
  Row();
  ~Row();

  // Inserts a Container into the cell of the column specified with
  // column_name.
  // column_name is treated in a case sensitive way.
  // Returns 0 on success and -1 (after asserting in debug builds) if the
  // column already has an entry in this row.
  int InsertCell(const std::string& column_name,
                 const Container* value_container);

  // Converts the value at the column specified by column_name to a string
  // stored in value_string.
  // column_name is treated in a case sensitive way.
  // If no Container is stored for the column, value_string is set to "NaN,".
  void ToString(const std::string& column_name, std::string* value_string);

 private:
  // Collection of containers indexed by column name as std::string
  typedef std::map<std::string, const Container*> CellMap;

  // The cells of this row; the mapped Containers are deleted in ~Row().
  CellMap                   cells_;
  // Guards cells_; created in the constructor and deleted in the destructor.
  CriticalSectionWrapper*   cells_lock_;
};
55
// A LogTable contains multiple rows, where only the latest row is active for
// editing. The rows are defined by the ColumnMap, which contains the name of
// each column and the length of the column (1 for one-value-columns and greater
// than 1 for multi-value-columns).
// Completed rows are buffered in one of two lists until Flush() writes them to
// the log file and deletes them.
class LogTable {
 public:
  LogTable();
  ~LogTable();

  // Adds the column with name column_name to the table. The column will be a
  // multi-value-column if multi_value_length is greater than 1.
  // column_name is treated in a case sensitive way.
  // Returns 0 on success and -1 if the header has already been written, after
  // which no more columns may be added.
  int AddColumn(const std::string& column_name, int multi_value_length);

  // Buffers the current row while it is waiting to be written to file,
  // which is done by a call to Flush(). A new row is available when the
  // function returns
  void NextRow();

  // Inserts a Container into the cell of the column specified with
  // column_name.
  // column_name is treated in a case sensitive way.
  // Returns -1 if the column has not been added to the table; otherwise
  // returns the result of inserting into the current row.
  int InsertCell(const std::string& column_name,
                 const Container* value_container);

  // Creates a log file, named as specified in the string file_name, to
  // where the table will be written when calling Flush().
  // Returns 0 on success and -1 on failure.
  int CreateLogFile(const std::string& file_name);

  // Write all complete rows to file.
  // May not be called by two threads simultaneously (doing so may result in
  // a race condition). Will be called by the file_writer_thread_ when that
  // thread is running.
  void Flush();

 private:
  // Collection of multi_value_lengths indexed by column name as std::string
  typedef std::map<std::string, int> ColumnMap;
  typedef std::list<Row*> RowList;

  // Column names and their multi_value_lengths.
  ColumnMap               columns_;
  // Storage for the two row lists that rows_history_ and rows_flush_ point to.
  RowList                 rows_[2];
  // List that NextRow() appends completed rows to.
  RowList*                rows_history_;
  // List that Flush() writes out; Flush() swaps it with rows_history_.
  RowList*                rows_flush_;
  // The row currently being filled in by InsertCell().
  Row*                    current_row_;
  // The log file; created in the constructor, closed and deleted in the
  // destructor.
  FileWrapper*            file_;
  // True until Flush() has claimed the header; columns can only be added
  // while it is true.
  bool                    write_header_;
  // Lock taken by AddColumn(), NextRow(), InsertCell() and Flush().
  CriticalSectionWrapper* table_lock_;
};
105
106Row::Row()
107  : cells_(),
108    cells_lock_(CriticalSectionWrapper::CreateCriticalSection()) {
109}
110
111Row::~Row() {
112  for (CellMap::iterator it = cells_.begin(); it != cells_.end();) {
113    delete it->second;
114    // For maps all iterators (except the erased) are valid after an erase
115    cells_.erase(it++);
116  }
117  delete cells_lock_;
118}
119
120int Row::InsertCell(const std::string& column_name,
121                    const Container* value_container) {
122  CriticalSectionScoped synchronize(cells_lock_);
123  assert(cells_.count(column_name) == 0);
124  if (cells_.count(column_name) > 0)
125    return -1;
126  cells_[column_name] = value_container;
127  return 0;
128}
129
130void Row::ToString(const std::string& column_name,
131                   std::string* value_string) {
132  CriticalSectionScoped synchronize(cells_lock_);
133  const Container* container = cells_[column_name];
134  if (container == NULL) {
135    *value_string = "NaN,";
136    return;
137  }
138  container->ToString(value_string);
139}
140
141LogTable::LogTable()
142  : columns_(),
143    rows_(),
144    rows_history_(&rows_[0]),
145    rows_flush_(&rows_[1]),
146    current_row_(new Row),
147    file_(FileWrapper::Create()),
148    write_header_(true),
149    table_lock_(CriticalSectionWrapper::CreateCriticalSection()) {
150}
151
152LogTable::~LogTable() {
153  for (RowList::iterator row_it = rows_history_->begin();
154       row_it != rows_history_->end();) {
155    delete *row_it;
156    row_it = rows_history_->erase(row_it);
157  }
158  for (ColumnMap::iterator col_it = columns_.begin();
159       col_it != columns_.end();) {
160    // For maps all iterators (except the erased) are valid after an erase
161    columns_.erase(col_it++);
162  }
163  if (file_ != NULL) {
164    file_->Flush();
165    file_->CloseFile();
166    delete file_;
167  }
168  delete current_row_;
169  delete table_lock_;
170}
171
172int LogTable::AddColumn(const std::string& column_name,
173                        int multi_value_length) {
174  assert(multi_value_length > 0);
175  if (!write_header_) {
176    // It's not allowed to add new columns after the header
177    // has been written.
178    assert(false);
179    return -1;
180  } else {
181    CriticalSectionScoped synchronize(table_lock_);
182    if (write_header_)
183      columns_[column_name] = multi_value_length;
184    else
185      return -1;
186  }
187  return 0;
188}
189
190void LogTable::NextRow() {
191  CriticalSectionScoped sync_rows(table_lock_);
192  rows_history_->push_back(current_row_);
193  current_row_ = new Row;
194}
195
196int LogTable::InsertCell(const std::string& column_name,
197                         const Container* value_container) {
198  CriticalSectionScoped synchronize(table_lock_);
199  assert(columns_.count(column_name) > 0);
200  if (columns_.count(column_name) == 0)
201    return -1;
202  return current_row_->InsertCell(column_name, value_container);
203}
204
205int LogTable::CreateLogFile(const std::string& file_name) {
206  if (file_name.length() == 0)
207    return -1;
208  if (file_->Open())
209    return -1;
210  file_->OpenFile(file_name.c_str(),
211                  false,  // Open with read/write permissions
212                  false,  // Don't wraparound and write at the beginning when
213                          // the file is full
214                  true);  // Open as a text file
215  if (file_ == NULL)
216    return -1;
217  return 0;
218}
219
// Writes the header line (first call only) followed by every completed row to
// the log file, deleting each row once it has been written.
// The file is written without holding table_lock_; the lock is only taken to
// claim the header and to swap the row lists.
void LogTable::Flush() {
  ColumnMap::iterator column_it;
  bool commit_header = false;
  // Claim the header exactly once: write_header_ is re-checked under the lock
  // and cleared, so only the call that flipped it writes the header.
  if (write_header_) {
    CriticalSectionScoped synchronize(table_lock_);
    if (write_header_) {
      commit_header = true;
      write_header_ = false;
    }
  }
  if (commit_header) {
    for (column_it = columns_.begin();
         column_it != columns_.end(); ++column_it) {
      if (column_it->second > 1) {
        // Multi-value column: "name[length]," followed by length - 1 extra
        // commas so that the header spans as many fields as the values do.
        // NOTE(review): "%u" is used with an int argument; the value is > 1
        // in this branch.
        file_->WriteText("%s[%u],", column_it->first.c_str(),
                         column_it->second);
        for (int i = 1; i < column_it->second; ++i)
          file_->WriteText(",");
      } else {
        file_->WriteText("%s,", column_it->first.c_str());
      }
    }
    if (columns_.size() > 0)
      file_->WriteText("\n");
  }

  // Swap the list used for flushing with the list containing the row history,
  // so that the rows completed so far become the flush list, and clear the
  // list that now serves as the history.
  // We don't want to block the list while we're writing to file, so the lock
  // is released before the rows are written.
  // NOTE(review): the loop below reads rows_flush_ directly (no local copy of
  // the pointer is made), and clear() does not delete Row objects, so this
  // relies on Flush() never running on two threads at once.
  {
    CriticalSectionScoped synchronize(table_lock_);
    RowList* tmp = rows_flush_;
    rows_flush_ = rows_history_;
    rows_history_ = tmp;
    rows_history_->clear();
  }

  // Write all complete rows to file and delete them
  for (RowList::iterator row_it = rows_flush_->begin();
       row_it != rows_flush_->end();) {
    // One field per column, in the iteration order of columns_.
    for (column_it = columns_.begin();
         column_it != columns_.end(); ++column_it) {
      std::string row_string;
      (*row_it)->ToString(column_it->first, &row_string);
      file_->WriteText("%s", row_string.c_str());
    }
    if (columns_.size() > 0)
      file_->WriteText("\n");
    delete *row_it;
    // erase() returns the iterator following the removed element.
    row_it = rows_flush_->erase(row_it);
  }
}
274
// Forwards to DataLogImpl::CreateLog() and returns its result.
int DataLog::CreateLog() {
  return DataLogImpl::CreateLog();
}
278
// Forwards to DataLogImpl::ReturnLog().
void DataLog::ReturnLog() {
  return DataLogImpl::ReturnLog();
}
282
283std::string DataLog::Combine(const std::string& table_name, int table_id) {
284  std::stringstream ss;
285  std::string combined_id = table_name;
286  std::string number_suffix;
287  ss << "_" << table_id;
288  ss >> number_suffix;
289  combined_id += number_suffix;
290  std::transform(combined_id.begin(), combined_id.end(), combined_id.begin(),
291                 ::tolower);
292  return combined_id;
293}
294
295int DataLog::AddTable(const std::string& table_name) {
296  DataLogImpl* data_log = DataLogImpl::StaticInstance();
297  if (data_log == NULL)
298    return -1;
299  return data_log->AddTable(table_name);
300}
301
302int DataLog::AddColumn(const std::string& table_name,
303                       const std::string& column_name,
304                       int multi_value_length) {
305  DataLogImpl* data_log = DataLogImpl::StaticInstance();
306  if (data_log == NULL)
307    return -1;
308  return data_log->DataLogImpl::StaticInstance()->AddColumn(table_name,
309                                                            column_name,
310                                                            multi_value_length);
311}
312
313int DataLog::NextRow(const std::string& table_name) {
314  DataLogImpl* data_log = DataLogImpl::StaticInstance();
315  if (data_log == NULL)
316    return -1;
317  return data_log->DataLogImpl::StaticInstance()->NextRow(table_name);
318}
319
320DataLogImpl::DataLogImpl()
321    : counter_(1),
322      tables_(),
323      flush_event_(EventWrapper::Create()),
324      file_writer_thread_(
325          new rtc::PlatformThread(DataLogImpl::Run, instance_, "DataLog")),
326      tables_lock_(RWLockWrapper::CreateRWLock()) {}
327
328DataLogImpl::~DataLogImpl() {
329  StopThread();
330  Flush();  // Write any remaining rows
331  delete flush_event_;
332  for (TableMap::iterator it = tables_.begin(); it != tables_.end();) {
333    delete static_cast<LogTable*>(it->second);
334    // For maps all iterators (except the erased) are valid after an erase
335    tables_.erase(it++);
336  }
337  delete tables_lock_;
338}
339
340int DataLogImpl::CreateLog() {
341  CriticalSectionScoped synchronize(crit_sect_.get());
342  if (instance_ == NULL) {
343    instance_ = new DataLogImpl();
344    return instance_->Init();
345  } else {
346    ++instance_->counter_;
347  }
348  return 0;
349}
350
// Starts the file writer thread and then sets its priority to
// rtc::kHighestPriority. Always returns 0; the results of Start() and
// SetPriority(), if any, are not checked.
int DataLogImpl::Init() {
  file_writer_thread_->Start();
  file_writer_thread_->SetPriority(rtc::kHighestPriority);
  return 0;
}
356
// Returns the singleton instance, or NULL if none currently exists.
// The pointer is read without taking crit_sect_.
DataLogImpl* DataLogImpl::StaticInstance() {
  return instance_;
}
360
361void DataLogImpl::ReturnLog() {
362  CriticalSectionScoped synchronize(crit_sect_.get());
363  if (instance_ && instance_->counter_ > 1) {
364    --instance_->counter_;
365    return;
366  }
367  delete instance_;
368  instance_ = NULL;
369}
370
371int DataLogImpl::AddTable(const std::string& table_name) {
372  WriteLockScoped synchronize(*tables_lock_);
373  // Make sure we don't add a table which already exists
374  if (tables_.count(table_name) > 0)
375    return -1;
376  tables_[table_name] = new LogTable();
377  if (tables_[table_name]->CreateLogFile(table_name + ".txt") == -1)
378    return -1;
379  return 0;
380}
381
382int DataLogImpl::AddColumn(const std::string& table_name,
383                           const std::string& column_name,
384                           int multi_value_length) {
385  ReadLockScoped synchronize(*tables_lock_);
386  if (tables_.count(table_name) == 0)
387    return -1;
388  return tables_[table_name]->AddColumn(column_name, multi_value_length);
389}
390
391int DataLogImpl::InsertCell(const std::string& table_name,
392                            const std::string& column_name,
393                            const Container* value_container) {
394  ReadLockScoped synchronize(*tables_lock_);
395  assert(tables_.count(table_name) > 0);
396  if (tables_.count(table_name) == 0)
397    return -1;
398  return tables_[table_name]->InsertCell(column_name, value_container);
399}
400
401int DataLogImpl::NextRow(const std::string& table_name) {
402  ReadLockScoped synchronize(*tables_lock_);
403  if (tables_.count(table_name) == 0)
404    return -1;
405  tables_[table_name]->NextRow();
406  // Signal a complete row
407  flush_event_->Set();
408  return 0;
409}
410
411void DataLogImpl::Flush() {
412  ReadLockScoped synchronize(*tables_lock_);
413  for (TableMap::iterator it = tables_.begin(); it != tables_.end(); ++it) {
414    it->second->Flush();
415  }
416}
417
418bool DataLogImpl::Run(void* obj) {
419  static_cast<DataLogImpl*>(obj)->Process();
420  return true;
421}
422
// One pass of the file writer: waits (without timeout) for flush_event_ to be
// signaled and then flushes all tables.
void DataLogImpl::Process() {
  // Wait for a row to be complete
  flush_event_->Wait(WEBRTC_EVENT_INFINITE);
  Flush();
}
428
// Signals flush_event_ and then stops the file writer thread.
// NOTE(review): the event is presumably set first so that a Process() call
// blocked in Wait() wakes up and the thread can be stopped - confirm.
void DataLogImpl::StopThread() {
  flush_event_->Set();
  file_writer_thread_->Stop();
}
433
434}  // namespace webrtc
435