rpc_handler.cc revision 1320f92c476a1ad9d19dba2a48c72b75566198e9
1// Copyright 2014 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "components/copresence/rpc/rpc_handler.h"
6
7#include <map>
8
9#include "base/bind.h"
10#include "base/command_line.h"
11#include "base/guid.h"
12#include "base/logging.h"
13#include "base/strings/string_util.h"
14
15// TODO(ckehoe): time.h includes windows.h, which #defines DeviceCapabilities
16// to DeviceCapabilitiesW. This breaks the pb.h headers below. For now,
17// we fix this with an #undef.
18#include "base/time/time.h"
19#if defined(OS_WIN)
20#undef DeviceCapabilities
21#endif
22
23#include "components/copresence/copresence_switches.h"
24#include "components/copresence/handlers/directive_handler.h"
25#include "components/copresence/proto/codes.pb.h"
26#include "components/copresence/proto/data.pb.h"
27#include "components/copresence/proto/rpcs.pb.h"
28#include "components/copresence/public/copresence_delegate.h"
29#include "net/http/http_status_code.h"
30
31// TODO(ckehoe): Return error messages for bad requests.
32
33namespace copresence {
34
35using google::protobuf::MessageLite;
36using google::protobuf::RepeatedPtrField;
37
38const char RpcHandler::kReportRequestRpcName[] = "report";
39
40namespace {
41
42// UrlSafe is defined as:
43// '/' represented by a '_' and '+' represented by a '-'
44// TODO(rkc): Move this to the wrapper.
45std::string ToUrlSafe(std::string token) {
46  base::ReplaceChars(token, "+", "-", &token);
47  base::ReplaceChars(token, "/", "_", &token);
48  return token;
49}
50
51const int kInvalidTokenExpiryTimeMs = 10 * 60 * 1000;  // 10 minutes.
52const int kMaxInvalidTokens = 10000;
53const char kRegisterDeviceRpcName[] = "registerdevice";
54const char kDefaultCopresenceServer[] =
55    "https://www.googleapis.com/copresence/v2/copresence";
56
57// Logging
58
59// Checks for a copresence error. If there is one, logs it and returns true.
60bool CopresenceErrorLogged(const Status& status) {
61  if (status.code() != OK) {
62    LOG(ERROR) << "Copresence error code " << status.code()
63               << (status.message().empty() ? std::string() :
64                  ": " + status.message());
65  }
66  return status.code() != OK;
67}
68
69void LogIfErrorStatus(const util::error::Code& code,
70                      const std::string& context) {
71  LOG_IF(ERROR, code != util::error::OK)
72      << context << " error " << code << ". See "
73      << "cs/google3/util/task/codes.proto for more info.";
74}
75
76// If any errors occurred, logs them and returns true.
77bool ReportErrorLogged(const ReportResponse& response) {
78  bool result = CopresenceErrorLogged(response.header().status());
79
80  // The Report fails or succeeds as a unit. If any responses had errors,
81  // the header will too. Thus we don't need to propagate individual errors.
82  if (response.has_update_signals_response())
83    LogIfErrorStatus(response.update_signals_response().status(), "Update");
84  if (response.has_manage_messages_response())
85    LogIfErrorStatus(response.manage_messages_response().status(), "Publish");
86  if (response.has_manage_subscriptions_response()) {
87    LogIfErrorStatus(response.manage_subscriptions_response().status(),
88                     "Subscribe");
89  }
90
91  return result;
92}
93
94// Request construction
95// TODO(ckehoe): Move these into a separate file?
96
97template <typename T>
98BroadcastScanConfiguration GetBroadcastScanConfig(const T& msg) {
99  if (msg.has_token_exchange_strategy() &&
100      msg.token_exchange_strategy().has_broadcast_scan_configuration()) {
101    return msg.token_exchange_strategy().broadcast_scan_configuration();
102  }
103  return BROADCAST_SCAN_CONFIGURATION_UNKNOWN;
104}
105
106scoped_ptr<DeviceState> GetDeviceCapabilities(const ReportRequest& request) {
107  scoped_ptr<DeviceState> state(new DeviceState);
108
109  TokenTechnology* ultrasound =
110      state->mutable_capabilities()->add_token_technology();
111  ultrasound->set_medium(AUDIO_ULTRASOUND_PASSBAND);
112  ultrasound->add_instruction_type(TRANSMIT);
113  ultrasound->add_instruction_type(RECEIVE);
114
115  TokenTechnology* audible =
116      state->mutable_capabilities()->add_token_technology();
117  audible->set_medium(AUDIO_AUDIBLE_DTMF);
118  audible->add_instruction_type(TRANSMIT);
119  audible->add_instruction_type(RECEIVE);
120
121  return state.Pass();
122}
123
124// TODO(ckehoe): We're keeping this code in a separate function for now
125// because we get a version string from Chrome, but the proto expects
126// an int64 version. We should probably change the version proto
127// to handle a more detailed version.
128ClientVersion* CreateVersion(const std::string& client,
129                             const std::string& version_name) {
130  ClientVersion* version = new ClientVersion;
131
132  version->set_client(client);
133  version->set_version_name(version_name);
134
135  return version;
136}
137
138void AddTokenToRequest(ReportRequest* request, const AudioToken& token) {
139  TokenObservation* token_observation =
140      request->mutable_update_signals_request()->add_token_observation();
141  token_observation->set_token_id(ToUrlSafe(token.token));
142
143  TokenSignals* signals = token_observation->add_signals();
144  signals->set_medium(token.audible ? AUDIO_AUDIBLE_DTMF
145                                    : AUDIO_ULTRASOUND_PASSBAND);
146  signals->set_observed_time_millis(base::Time::Now().ToJsTime());
147}
148
149}  // namespace
150
151// Public methods
152
153RpcHandler::RpcHandler(CopresenceDelegate* delegate)
154    : delegate_(delegate),
155      invalid_audio_token_cache_(
156          base::TimeDelta::FromMilliseconds(kInvalidTokenExpiryTimeMs),
157          kMaxInvalidTokens),
158      server_post_callback_(base::Bind(&RpcHandler::SendHttpPost,
159                                       base::Unretained(this))) {}
160
161RpcHandler::~RpcHandler() {
162  for (std::set<HttpPost*>::iterator post = pending_posts_.begin();
163       post != pending_posts_.end(); ++post) {
164    delete *post;
165  }
166
167  if (delegate_ && delegate_->GetWhispernetClient()) {
168    delegate_->GetWhispernetClient()->RegisterTokensCallback(
169        WhispernetClient::TokensCallback());
170  }
171}
172
173void RpcHandler::Initialize(const SuccessCallback& init_done_callback) {
174  scoped_ptr<RegisterDeviceRequest> request(new RegisterDeviceRequest);
175  DCHECK(device_id_.empty());
176
177  request->mutable_push_service()->set_service(PUSH_SERVICE_NONE);
178  Identity* identity =
179      request->mutable_device_identifiers()->mutable_registrant();
180  identity->set_type(CHROME);
181  identity->set_chrome_id(base::GenerateGUID());
182  SendServerRequest(
183      kRegisterDeviceRpcName,
184      std::string(),
185      request.Pass(),
186      base::Bind(&RpcHandler::RegisterResponseHandler,
187                 // On destruction, this request will be cancelled.
188                 base::Unretained(this),
189                 init_done_callback));
190}
191
192void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request) {
193  SendReportRequest(request.Pass(), std::string(), StatusCallback());
194}
195
196void RpcHandler::SendReportRequest(scoped_ptr<ReportRequest> request,
197                                   const std::string& app_id,
198                                   const StatusCallback& status_callback) {
199  DCHECK(request.get());
200  DCHECK(!device_id_.empty())
201      << "RpcHandler::Initialize() must complete successfully "
202      << "before other RpcHandler methods are called.";
203
204  DVLOG(3) << "Sending report request to server.";
205
206  // If we are unpublishing or unsubscribing, we need to stop those publish or
207  // subscribes right away, we don't need to wait for the server to tell us.
208  ProcessRemovedOperations(*request);
209
210  request->mutable_update_signals_request()->set_allocated_state(
211      GetDeviceCapabilities(*request).release());
212
213  AddPlayingTokens(request.get());
214
215  SendServerRequest(kReportRequestRpcName,
216                    app_id,
217                    request.Pass(),
218                    // On destruction, this request will be cancelled.
219                    base::Bind(&RpcHandler::ReportResponseHandler,
220                               base::Unretained(this),
221                               status_callback));
222}
223
224void RpcHandler::ReportTokens(const std::vector<AudioToken>& tokens) {
225  DCHECK(!tokens.empty());
226
227  scoped_ptr<ReportRequest> request(new ReportRequest);
228  for (size_t i = 0; i < tokens.size(); ++i) {
229    if (invalid_audio_token_cache_.HasKey(ToUrlSafe(tokens[i].token)))
230      continue;
231    DVLOG(3) << "Sending token " << tokens[i].token << " to server.";
232    AddTokenToRequest(request.get(), tokens[i]);
233  }
234  SendReportRequest(request.Pass());
235}
236
237void RpcHandler::ConnectToWhispernet() {
238  WhispernetClient* whispernet_client = delegate_->GetWhispernetClient();
239
240  // |directive_handler_| will be destructed with us, so unretained is safe.
241  directive_handler_.reset(new DirectiveHandler);
242  directive_handler_->Initialize(
243      base::Bind(&WhispernetClient::DecodeSamples,
244                 base::Unretained(whispernet_client)),
245      base::Bind(&RpcHandler::AudioDirectiveListToWhispernetConnector,
246                 base::Unretained(this)));
247
248  whispernet_client->RegisterTokensCallback(
249      base::Bind(&RpcHandler::ReportTokens,
250                 // On destruction, this callback will be disconnected.
251                 base::Unretained(this)));
252}
253
254// Private methods
255
256void RpcHandler::RegisterResponseHandler(
257    const SuccessCallback& init_done_callback,
258    HttpPost* completed_post,
259    int http_status_code,
260    const std::string& response_data) {
261  if (completed_post) {
262    int elements_erased = pending_posts_.erase(completed_post);
263    DCHECK(elements_erased);
264    delete completed_post;
265  }
266
267  if (http_status_code != net::HTTP_OK) {
268    init_done_callback.Run(false);
269    return;
270  }
271
272  RegisterDeviceResponse response;
273  if (!response.ParseFromString(response_data)) {
274    LOG(ERROR) << "Invalid RegisterDeviceResponse:\n" << response_data;
275    init_done_callback.Run(false);
276    return;
277  }
278
279  if (CopresenceErrorLogged(response.header().status()))
280    return;
281  device_id_ = response.registered_device_id();
282  DCHECK(!device_id_.empty());
283  DVLOG(2) << "Device registration successful: id " << device_id_;
284  init_done_callback.Run(true);
285}
286
287void RpcHandler::ReportResponseHandler(const StatusCallback& status_callback,
288                                       HttpPost* completed_post,
289                                       int http_status_code,
290                                       const std::string& response_data) {
291  if (completed_post) {
292    int elements_erased = pending_posts_.erase(completed_post);
293    DCHECK(elements_erased);
294    delete completed_post;
295  }
296
297  if (http_status_code != net::HTTP_OK) {
298    if (!status_callback.is_null())
299      status_callback.Run(FAIL);
300    return;
301  }
302
303  DVLOG(3) << "Received ReportResponse.";
304  ReportResponse response;
305  if (!response.ParseFromString(response_data)) {
306    LOG(ERROR) << "Invalid ReportResponse";
307    if (!status_callback.is_null())
308      status_callback.Run(FAIL);
309    return;
310  }
311
312  if (ReportErrorLogged(response)) {
313    if (!status_callback.is_null())
314      status_callback.Run(FAIL);
315    return;
316  }
317
318  const RepeatedPtrField<MessageResult>& message_results =
319      response.manage_messages_response().published_message_result();
320  for (int i = 0; i < message_results.size(); ++i) {
321    DVLOG(2) << "Published message with id "
322             << message_results.Get(i).published_message_id();
323  }
324
325  const RepeatedPtrField<SubscriptionResult>& subscription_results =
326      response.manage_subscriptions_response().subscription_result();
327  for (int i = 0; i < subscription_results.size(); ++i) {
328    DVLOG(2) << "Created subscription with id "
329             << subscription_results.Get(i).subscription_id();
330  }
331
332  if (response.has_update_signals_response()) {
333    const UpdateSignalsResponse& update_response =
334        response.update_signals_response();
335    DispatchMessages(update_response.message());
336
337    if (directive_handler_.get()) {
338      for (int i = 0; i < update_response.directive_size(); ++i)
339        directive_handler_->AddDirective(update_response.directive(i));
340    } else {
341      DVLOG(1) << "No directive handler.";
342    }
343
344    const RepeatedPtrField<Token>& tokens = update_response.token();
345    for (int i = 0; i < tokens.size(); ++i) {
346      switch (tokens.Get(i).status()) {
347        case VALID:
348          // TODO(rkc/ckehoe): Store the token in a |valid_token_cache_| with a
349          // short TTL (like 10s) and send it up with every report request.
350          // Then we'll still get messages while we're waiting to hear it again.
351          VLOG(1) << "Got valid token " << tokens.Get(i).id();
352          break;
353        case INVALID:
354          DVLOG(3) << "Discarding invalid token " << tokens.Get(i).id();
355          invalid_audio_token_cache_.Add(tokens.Get(i).id(), true);
356          break;
357        default:
358          DVLOG(2) << "Token " << tokens.Get(i).id() << " has status code "
359                   << tokens.Get(i).status();
360      }
361    }
362  }
363
364  // TODO(ckehoe): Return a more detailed status response.
365  if (!status_callback.is_null())
366    status_callback.Run(SUCCESS);
367}
368
369void RpcHandler::ProcessRemovedOperations(const ReportRequest& request) {
370  // Remove unpublishes.
371  if (request.has_manage_messages_request()) {
372    const RepeatedPtrField<std::string>& unpublishes =
373        request.manage_messages_request().id_to_unpublish();
374    for (int i = 0; i < unpublishes.size(); ++i)
375      directive_handler_->RemoveDirectives(unpublishes.Get(i));
376  }
377
378  // Remove unsubscribes.
379  if (request.has_manage_subscriptions_request()) {
380    const RepeatedPtrField<std::string>& unsubscribes =
381        request.manage_subscriptions_request().id_to_unsubscribe();
382    for (int i = 0; i < unsubscribes.size(); ++i)
383      directive_handler_->RemoveDirectives(unsubscribes.Get(i));
384  }
385}
386
387void RpcHandler::AddPlayingTokens(ReportRequest* request) {
388  if (!directive_handler_)
389    return;
390
391  const std::string& audible_token = directive_handler_->CurrentAudibleToken();
392  const std::string& inaudible_token =
393      directive_handler_->CurrentInaudibleToken();
394
395  if (!audible_token.empty())
396    AddTokenToRequest(request, AudioToken(audible_token, true));
397  if (!inaudible_token.empty())
398    AddTokenToRequest(request, AudioToken(inaudible_token, false));
399}
400
401void RpcHandler::DispatchMessages(
402    const RepeatedPtrField<SubscribedMessage>& messages) {
403  if (messages.size() == 0)
404    return;
405
406  // Index the messages by subscription id.
407  std::map<std::string, std::vector<Message>> messages_by_subscription;
408  DVLOG(3) << "Dispatching " << messages.size() << " messages";
409  for (int m = 0; m < messages.size(); ++m) {
410    const RepeatedPtrField<std::string>& subscription_ids =
411        messages.Get(m).subscription_id();
412    for (int s = 0; s < subscription_ids.size(); ++s) {
413      messages_by_subscription[subscription_ids.Get(s)].push_back(
414          messages.Get(m).published_message());
415    }
416  }
417
418  // Send the messages for each subscription.
419  for (std::map<std::string, std::vector<Message>>::const_iterator
420           subscription = messages_by_subscription.begin();
421       subscription != messages_by_subscription.end();
422       ++subscription) {
423    // TODO(ckehoe): Once we have the app ID from the server, we need to pass
424    // it in here and get rid of the app id registry from the main API class.
425    delegate_->HandleMessages("", subscription->first, subscription->second);
426  }
427}
428
429RequestHeader* RpcHandler::CreateRequestHeader(
430    const std::string& client_name) const {
431  RequestHeader* header = new RequestHeader;
432
433  header->set_allocated_framework_version(CreateVersion(
434      "Chrome", delegate_->GetPlatformVersionString()));
435  if (!client_name.empty()) {
436    header->set_allocated_client_version(
437        CreateVersion(client_name, std::string()));
438  }
439  header->set_current_time_millis(base::Time::Now().ToJsTime());
440  header->set_registered_device_id(device_id_);
441
442  DeviceFingerprint* fingerprint = new DeviceFingerprint;
443  fingerprint->set_platform_version(delegate_->GetPlatformVersionString());
444  fingerprint->set_type(CHROME_PLATFORM_TYPE);
445  header->set_allocated_device_fingerprint(fingerprint);
446
447  return header;
448}
449
450template <class T>
451void RpcHandler::SendServerRequest(
452    const std::string& rpc_name,
453    const std::string& app_id,
454    scoped_ptr<T> request,
455    const PostCleanupCallback& response_handler) {
456  request->set_allocated_header(CreateRequestHeader(app_id));
457  server_post_callback_.Run(delegate_->GetRequestContext(),
458                            rpc_name,
459                            make_scoped_ptr<MessageLite>(request.release()),
460                            response_handler);
461}
462
463void RpcHandler::SendHttpPost(net::URLRequestContextGetter* url_context_getter,
464                              const std::string& rpc_name,
465                              scoped_ptr<MessageLite> request_proto,
466                              const PostCleanupCallback& callback) {
467  // Create the base URL to call.
468  CommandLine* command_line = CommandLine::ForCurrentProcess();
469  const std::string copresence_server_host =
470      command_line->HasSwitch(switches::kCopresenceServer) ?
471      command_line->GetSwitchValueASCII(switches::kCopresenceServer) :
472      kDefaultCopresenceServer;
473
474  // Create the request and keep a pointer until it completes.
475  HttpPost* http_post = new HttpPost(
476      url_context_getter,
477      copresence_server_host,
478      rpc_name,
479      command_line->GetSwitchValueASCII(switches::kCopresenceTracingToken),
480      delegate_->GetAPIKey(),
481      *request_proto);
482
483  http_post->Start(base::Bind(callback, http_post));
484  pending_posts_.insert(http_post);
485}
486
487void RpcHandler::AudioDirectiveListToWhispernetConnector(
488    const std::string& token,
489    bool audible,
490    const WhispernetClient::SamplesCallback& samples_callback) {
491  WhispernetClient* whispernet_client = delegate_->GetWhispernetClient();
492  if (whispernet_client) {
493    whispernet_client->RegisterSamplesCallback(samples_callback);
494    whispernet_client->EncodeToken(token, audible);
495  }
496}
497
498}  // namespace copresence
499