1/*
2 *  Copyright (c) 2011 The WebRTC project authors. All Rights Reserved.
3 *
4 *  Use of this source code is governed by a BSD-style license
5 *  that can be found in the LICENSE file in the root of the source
6 *  tree. An additional intellectual property rights grant can be found
7 *  in the file PATENTS.  All contributing project authors may
8 *  be found in the AUTHORS file in the root of the source tree.
9 */
10
11#include "data_log.h"
12
13#include <assert.h>
14
15#include <algorithm>
16#include <list>
17
18#include "critical_section_wrapper.h"
19#include "event_wrapper.h"
20#include "file_wrapper.h"
21#include "rw_lock_wrapper.h"
22#include "thread_wrapper.h"
23
24namespace webrtc {
25
26DataLogImpl::CritSectScopedPtr DataLogImpl::crit_sect_(
27  CriticalSectionWrapper::CreateCriticalSection());
28
29DataLogImpl* DataLogImpl::instance_ = NULL;
30
31// A Row contains cells, which are indexed by the column names as std::string.
32// The string index is treated in a case sensitive way.
33class Row {
34 public:
35  Row();
36  ~Row();
37
38  // Inserts a Container into the cell of the column specified with
39  // column_name.
40  // column_name is treated in a case sensitive way.
41  int InsertCell(const std::string& column_name,
42                 const Container* value_container);
43
44  // Converts the value at the column specified by column_name to a string
45  // stored in value_string.
46  // column_name is treated in a case sensitive way.
47  void ToString(const std::string& column_name, std::string* value_string);
48
49 private:
50  // Collection of containers indexed by column name as std::string
51  typedef std::map<std::string, const Container*> CellMap;
52
53  CellMap                   cells_;
54  CriticalSectionWrapper*   cells_lock_;
55};
56
57// A LogTable contains multiple rows, where only the latest row is active for
58// editing. The rows are defined by the ColumnMap, which contains the name of
59// each column and the length of the column (1 for one-value-columns and greater
60// than 1 for multi-value-columns).
61class LogTable {
62 public:
63  LogTable();
64  ~LogTable();
65
66  // Adds the column with name column_name to the table. The column will be a
67  // multi-value-column if multi_value_length is greater than 1.
68  // column_name is treated in a case sensitive way.
69  int AddColumn(const std::string& column_name, int multi_value_length);
70
71  // Buffers the current row while it is waiting to be written to file,
72  // which is done by a call to Flush(). A new row is available when the
73  // function returns
74  void NextRow();
75
76  // Inserts a Container into the cell of the column specified with
77  // column_name.
78  // column_name is treated in a case sensitive way.
79  int InsertCell(const std::string& column_name,
80                 const Container* value_container);
81
82  // Creates a log file, named as specified in the string file_name, to
83  // where the table will be written when calling Flush().
84  int CreateLogFile(const std::string& file_name);
85
86  // Write all complete rows to file.
87  // May not be called by two threads simultaneously (doing so may result in
88  // a race condition). Will be called by the file_writer_thread_ when that
89  // thread is running.
90  void Flush();
91
92 private:
93  // Collection of multi_value_lengths indexed by column name as std::string
94  typedef std::map<std::string, int> ColumnMap;
95  typedef std::list<Row*> RowList;
96
97  ColumnMap               columns_;
98  RowList                 rows_[2];
99  RowList*                rows_history_;
100  RowList*                rows_flush_;
101  Row*                    current_row_;
102  FileWrapper*            file_;
103  bool                    write_header_;
104  CriticalSectionWrapper* table_lock_;
105};
106
107Row::Row()
108  : cells_(),
109    cells_lock_(CriticalSectionWrapper::CreateCriticalSection()) {
110}
111
112Row::~Row() {
113  for (CellMap::iterator it = cells_.begin(); it != cells_.end();) {
114    delete it->second;
115    // For maps all iterators (except the erased) are valid after an erase
116    cells_.erase(it++);
117  }
118  delete cells_lock_;
119}
120
121int Row::InsertCell(const std::string& column_name,
122                    const Container* value_container) {
123  CriticalSectionScoped synchronize(cells_lock_);
124  assert(cells_.count(column_name) == 0);
125  if (cells_.count(column_name) > 0)
126    return -1;
127  cells_[column_name] = value_container;
128  return 0;
129}
130
131void Row::ToString(const std::string& column_name,
132                   std::string* value_string) {
133  CriticalSectionScoped synchronize(cells_lock_);
134  const Container* container = cells_[column_name];
135  if (container == NULL) {
136    *value_string = "NaN,";
137    return;
138  }
139  container->ToString(value_string);
140}
141
142LogTable::LogTable()
143  : columns_(),
144    rows_(),
145    rows_history_(&rows_[0]),
146    rows_flush_(&rows_[1]),
147    current_row_(new Row),
148    file_(FileWrapper::Create()),
149    write_header_(true),
150    table_lock_(CriticalSectionWrapper::CreateCriticalSection()) {
151}
152
153LogTable::~LogTable() {
154  for (RowList::iterator row_it = rows_history_->begin();
155       row_it != rows_history_->end();) {
156    delete *row_it;
157    row_it = rows_history_->erase(row_it);
158  }
159  for (ColumnMap::iterator col_it = columns_.begin();
160       col_it != columns_.end();) {
161    // For maps all iterators (except the erased) are valid after an erase
162    columns_.erase(col_it++);
163  }
164  if (file_ != NULL) {
165    file_->Flush();
166    file_->CloseFile();
167    delete file_;
168  }
169  delete current_row_;
170  delete table_lock_;
171}
172
173int LogTable::AddColumn(const std::string& column_name,
174                        int multi_value_length) {
175  assert(multi_value_length > 0);
176  if (!write_header_) {
177    // It's not allowed to add new columns after the header
178    // has been written.
179    assert(false);
180    return -1;
181  } else {
182    CriticalSectionScoped synchronize(table_lock_);
183    if (write_header_)
184      columns_[column_name] = multi_value_length;
185    else
186      return -1;
187  }
188  return 0;
189}
190
191void LogTable::NextRow() {
192  CriticalSectionScoped sync_rows(table_lock_);
193  rows_history_->push_back(current_row_);
194  current_row_ = new Row;
195}
196
197int LogTable::InsertCell(const std::string& column_name,
198                         const Container* value_container) {
199  CriticalSectionScoped synchronize(table_lock_);
200  assert(columns_.count(column_name) > 0);
201  if (columns_.count(column_name) == 0)
202    return -1;
203  return current_row_->InsertCell(column_name, value_container);
204}
205
206int LogTable::CreateLogFile(const std::string& file_name) {
207  if (file_name.length() == 0)
208    return -1;
209  if (file_->Open())
210    return -1;
211  file_->OpenFile(file_name.c_str(),
212                  false,  // Open with read/write permissions
213                  false,  // Don't wraparound and write at the beginning when
214                          // the file is full
215                  true);  // Open as a text file
216  if (file_ == NULL)
217    return -1;
218  return 0;
219}
220
221void LogTable::Flush() {
222  ColumnMap::iterator column_it;
223  bool commit_header = false;
224  if (write_header_) {
225    CriticalSectionScoped synchronize(table_lock_);
226    if (write_header_) {
227      commit_header = true;
228      write_header_ = false;
229    }
230  }
231  if (commit_header) {
232    for (column_it = columns_.begin();
233         column_it != columns_.end(); ++column_it) {
234      if (column_it->second > 1) {
235        file_->WriteText("%s[%u],", column_it->first.c_str(),
236                         column_it->second);
237        for (int i = 1; i < column_it->second; ++i)
238          file_->WriteText(",");
239      } else {
240        file_->WriteText("%s,", column_it->first.c_str());
241      }
242    }
243    if (columns_.size() > 0)
244      file_->WriteText("\n");
245  }
246
247  // Swap the list used for flushing with the list containing the row history
248  // and clear the history. We also create a local pointer to the new
249  // list used for flushing to avoid race conditions if another thread
250  // calls this function while we are writing.
251  // We don't want to block the list while we're writing to file.
252  {
253    CriticalSectionScoped synchronize(table_lock_);
254    RowList* tmp = rows_flush_;
255    rows_flush_ = rows_history_;
256    rows_history_ = tmp;
257    rows_history_->clear();
258  }
259
260  // Write all complete rows to file and delete them
261  for (RowList::iterator row_it = rows_flush_->begin();
262       row_it != rows_flush_->end();) {
263    for (column_it = columns_.begin();
264         column_it != columns_.end(); ++column_it) {
265      std::string row_string;
266      (*row_it)->ToString(column_it->first, &row_string);
267      file_->WriteText("%s", row_string.c_str());
268    }
269    if (columns_.size() > 0)
270      file_->WriteText("\n");
271    delete *row_it;
272    row_it = rows_flush_->erase(row_it);
273  }
274}
275
276int DataLog::CreateLog() {
277  return DataLogImpl::CreateLog();
278}
279
280void DataLog::ReturnLog() {
281  return DataLogImpl::ReturnLog();
282}
283
284std::string DataLog::Combine(const std::string& table_name, int table_id) {
285  std::stringstream ss;
286  std::string combined_id = table_name;
287  std::string number_suffix;
288  ss << "_" << table_id;
289  ss >> number_suffix;
290  combined_id += number_suffix;
291  std::transform(combined_id.begin(), combined_id.end(), combined_id.begin(),
292                 ::tolower);
293  return combined_id;
294}
295
296int DataLog::AddTable(const std::string& table_name) {
297  DataLogImpl* data_log = DataLogImpl::StaticInstance();
298  if (data_log == NULL)
299    return -1;
300  return data_log->AddTable(table_name);
301}
302
303int DataLog::AddColumn(const std::string& table_name,
304                       const std::string& column_name,
305                       int multi_value_length) {
306  DataLogImpl* data_log = DataLogImpl::StaticInstance();
307  if (data_log == NULL)
308    return -1;
309  return data_log->DataLogImpl::StaticInstance()->AddColumn(table_name,
310                                                            column_name,
311                                                            multi_value_length);
312}
313
314int DataLog::NextRow(const std::string& table_name) {
315  DataLogImpl* data_log = DataLogImpl::StaticInstance();
316  if (data_log == NULL)
317    return -1;
318  return data_log->DataLogImpl::StaticInstance()->NextRow(table_name);
319}
320
321DataLogImpl::DataLogImpl()
322  : counter_(1),
323    tables_(),
324    flush_event_(EventWrapper::Create()),
325    file_writer_thread_(NULL),
326    tables_lock_(RWLockWrapper::CreateRWLock()) {
327}
328
329DataLogImpl::~DataLogImpl() {
330  StopThread();
331  Flush();  // Write any remaining rows
332  delete file_writer_thread_;
333  delete flush_event_;
334  for (TableMap::iterator it = tables_.begin(); it != tables_.end();) {
335    delete static_cast<LogTable*>(it->second);
336    // For maps all iterators (except the erased) are valid after an erase
337    tables_.erase(it++);
338  }
339  delete tables_lock_;
340}
341
342int DataLogImpl::CreateLog() {
343  CriticalSectionScoped synchronize(crit_sect_.get());
344  if (instance_ == NULL) {
345    instance_ = new DataLogImpl();
346    return instance_->Init();
347  } else {
348    ++instance_->counter_;
349  }
350  return 0;
351}
352
353int DataLogImpl::Init() {
354  file_writer_thread_ = ThreadWrapper::CreateThread(
355                          DataLogImpl::Run,
356                          instance_,
357                          kHighestPriority,
358                          "DataLog");
359  if (file_writer_thread_ == NULL)
360    return -1;
361  unsigned int thread_id = 0;
362  bool success = file_writer_thread_->Start(thread_id);
363  if (!success)
364    return -1;
365  return 0;
366}
367
368DataLogImpl* DataLogImpl::StaticInstance() {
369  return instance_;
370}
371
372void DataLogImpl::ReturnLog() {
373  CriticalSectionScoped synchronize(crit_sect_.get());
374  if (instance_ && instance_->counter_ > 1) {
375    --instance_->counter_;
376    return;
377  }
378  delete instance_;
379  instance_ = NULL;
380}
381
382int DataLogImpl::AddTable(const std::string& table_name) {
383  WriteLockScoped synchronize(*tables_lock_);
384  // Make sure we don't add a table which already exists
385  if (tables_.count(table_name) > 0)
386    return -1;
387  tables_[table_name] = new LogTable();
388  if (tables_[table_name]->CreateLogFile(table_name + ".txt") == -1)
389    return -1;
390  return 0;
391}
392
393int DataLogImpl::AddColumn(const std::string& table_name,
394                           const std::string& column_name,
395                           int multi_value_length) {
396  ReadLockScoped synchronize(*tables_lock_);
397  if (tables_.count(table_name) == 0)
398    return -1;
399  return tables_[table_name]->AddColumn(column_name, multi_value_length);
400}
401
402int DataLogImpl::InsertCell(const std::string& table_name,
403                            const std::string& column_name,
404                            const Container* value_container) {
405  ReadLockScoped synchronize(*tables_lock_);
406  assert(tables_.count(table_name) > 0);
407  if (tables_.count(table_name) == 0)
408    return -1;
409  return tables_[table_name]->InsertCell(column_name, value_container);
410}
411
412int DataLogImpl::NextRow(const std::string& table_name) {
413  ReadLockScoped synchronize(*tables_lock_);
414  if (tables_.count(table_name) == 0)
415    return -1;
416  tables_[table_name]->NextRow();
417  if (file_writer_thread_ == NULL) {
418    // Write every row to file as they get complete.
419    tables_[table_name]->Flush();
420  } else {
421    // Signal a complete row
422    flush_event_->Set();
423  }
424  return 0;
425}
426
427void DataLogImpl::Flush() {
428  ReadLockScoped synchronize(*tables_lock_);
429  for (TableMap::iterator it = tables_.begin(); it != tables_.end(); ++it) {
430    it->second->Flush();
431  }
432}
433
434bool DataLogImpl::Run(void* obj) {
435  static_cast<DataLogImpl*>(obj)->Process();
436  return true;
437}
438
439void DataLogImpl::Process() {
440  // Wait for a row to be complete
441  flush_event_->Wait(WEBRTC_EVENT_INFINITE);
442  Flush();
443}
444
445void DataLogImpl::StopThread() {
446  if (file_writer_thread_ != NULL) {
447    file_writer_thread_->SetNotAlive();
448    flush_event_->Set();
449    // Call Stop() repeatedly, waiting for the Flush() call in Process() to
450    // finish.
451    while (!file_writer_thread_->Stop()) continue;
452  }
453}
454
455}  // namespace webrtc
456