native_messaging_reader.cc revision 010d83a9304c5a91596085d917d248abff47903a
1// Copyright 2013 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "remoting/host/native_messaging/native_messaging_reader.h"
6
7#include <string>
8
9#include "base/bind.h"
10#include "base/files/file.h"
11#include "base/json/json_reader.h"
12#include "base/location.h"
13#include "base/sequenced_task_runner.h"
14#include "base/single_thread_task_runner.h"
15#include "base/stl_util.h"
16#include "base/thread_task_runner_handle.h"
17#include "base/threading/sequenced_worker_pool.h"
18#include "base/values.h"
19
20namespace {
21
22// uint32 is specified in the protocol as the type for the message header.
23typedef uint32 MessageLengthType;
24
25const int kMessageHeaderSize = sizeof(MessageLengthType);
26
27// Limit the size of received messages, to avoid excessive memory-allocation in
28// this process, and potential overflow issues when casting to a signed 32-bit
29// int.
30const MessageLengthType kMaximumMessageSize = 1024 * 1024;
31
32}  // namespace
33
34namespace remoting {
35
36class NativeMessagingReader::Core {
37 public:
38  Core(base::File file,
39       scoped_refptr<base::SingleThreadTaskRunner> caller_task_runner,
40       scoped_refptr<base::SequencedTaskRunner> read_task_runner,
41       base::WeakPtr<NativeMessagingReader> reader_);
42  ~Core();
43
44  // Reads a message from the Native Messaging client and passes it to
45  // |message_callback_| on the originating thread. Called on the reader thread.
46  void ReadMessage();
47
48 private:
49  // Notify the reader's EOF callback when an error occurs or EOF is reached.
50  void NotifyEof();
51
52  base::File read_stream_;
53
54  base::WeakPtr<NativeMessagingReader> reader_;
55
56  // Used to post the caller-supplied reader callbacks on the caller thread.
57  scoped_refptr<base::SingleThreadTaskRunner> caller_task_runner_;
58
59  // Used to DCHECK that the reader code executes on the correct thread.
60  scoped_refptr<base::SequencedTaskRunner> read_task_runner_;
61
62  DISALLOW_COPY_AND_ASSIGN(Core);
63};
64
65NativeMessagingReader::Core::Core(
66    base::File file,
67    scoped_refptr<base::SingleThreadTaskRunner> caller_task_runner,
68    scoped_refptr<base::SequencedTaskRunner> read_task_runner,
69    base::WeakPtr<NativeMessagingReader> reader)
70    : read_stream_(file.Pass()),
71      reader_(reader),
72      caller_task_runner_(caller_task_runner),
73      read_task_runner_(read_task_runner) {
74}
75
76NativeMessagingReader::Core::~Core() {}
77
78void NativeMessagingReader::Core::ReadMessage() {
79  DCHECK(read_task_runner_->RunsTasksOnCurrentThread());
80
81  // Keep reading messages until the stream is closed or an error occurs.
82  while (true) {
83    MessageLengthType message_length;
84    int read_result = read_stream_.ReadAtCurrentPos(
85        reinterpret_cast<char*>(&message_length), kMessageHeaderSize);
86    if (read_result != kMessageHeaderSize) {
87      // 0 means EOF which is normal and should not be logged as an error.
88      if (read_result != 0) {
89        LOG(ERROR) << "Failed to read message header, read returned "
90                   << read_result;
91      }
92      NotifyEof();
93      return;
94    }
95
96    if (message_length > kMaximumMessageSize) {
97      LOG(ERROR) << "Message size too large: " << message_length;
98      NotifyEof();
99      return;
100    }
101
102    std::string message_json(message_length, '\0');
103    read_result = read_stream_.ReadAtCurrentPos(string_as_array(&message_json),
104                                                message_length);
105    if (read_result != static_cast<int>(message_length)) {
106      LOG(ERROR) << "Failed to read message body, read returned "
107                 << read_result;
108      NotifyEof();
109      return;
110    }
111
112    scoped_ptr<base::Value> message(base::JSONReader::Read(message_json));
113    if (!message) {
114      LOG(ERROR) << "Failed to parse JSON message: " << message;
115      NotifyEof();
116      return;
117    }
118
119    // Notify callback of new message.
120    caller_task_runner_->PostTask(
121        FROM_HERE, base::Bind(&NativeMessagingReader::InvokeMessageCallback,
122                              reader_, base::Passed(&message)));
123  }
124}
125
126void NativeMessagingReader::Core::NotifyEof() {
127  DCHECK(read_task_runner_->RunsTasksOnCurrentThread());
128  caller_task_runner_->PostTask(
129      FROM_HERE,
130      base::Bind(&NativeMessagingReader::InvokeEofCallback, reader_));
131}
132
133NativeMessagingReader::NativeMessagingReader(base::File file)
134    : reader_thread_("Reader"),
135      weak_factory_(this) {
136  reader_thread_.Start();
137  read_task_runner_ = reader_thread_.message_loop_proxy();
138  core_.reset(new Core(file.Pass(), base::ThreadTaskRunnerHandle::Get(),
139                       read_task_runner_, weak_factory_.GetWeakPtr()));
140}
141
142NativeMessagingReader::~NativeMessagingReader() {
143  read_task_runner_->DeleteSoon(FROM_HERE, core_.release());
144}
145
146void NativeMessagingReader::Start(MessageCallback message_callback,
147                                  base::Closure eof_callback) {
148  message_callback_ = message_callback;
149  eof_callback_ = eof_callback;
150
151  // base::Unretained is safe since |core_| is only deleted via the
152  // DeleteSoon task which is posted from this class's dtor.
153  read_task_runner_->PostTask(
154      FROM_HERE, base::Bind(&NativeMessagingReader::Core::ReadMessage,
155                            base::Unretained(core_.get())));
156}
157
158void NativeMessagingReader::InvokeMessageCallback(
159    scoped_ptr<base::Value> message) {
160  message_callback_.Run(message.Pass());
161}
162
163void NativeMessagingReader::InvokeEofCallback() {
164  eof_callback_.Run();
165}
166
167}  // namespace remoting
168