1// Copyright 2013 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "google_apis/gcm/engine/mcs_client.h"
6
7#include <set>
8
9#include "base/basictypes.h"
10#include "base/message_loop/message_loop.h"
11#include "base/metrics/histogram.h"
12#include "base/strings/string_number_conversions.h"
13#include "base/time/clock.h"
14#include "base/time/time.h"
15#include "google_apis/gcm/base/mcs_util.h"
16#include "google_apis/gcm/base/socket_stream.h"
17#include "google_apis/gcm/engine/connection_factory.h"
18#include "google_apis/gcm/monitoring/gcm_stats_recorder.h"
19
20using namespace google::protobuf::io;
21
22namespace gcm {
23
24namespace {
25
26typedef scoped_ptr<google::protobuf::MessageLite> MCSProto;
27
28// The category of messages intended for the GCM client itself from MCS.
29const char kMCSCategory[] = "com.google.android.gsf.gtalkservice";
30
31// The from field for messages originating in the GCM client.
32const char kGCMFromField[] = "gcm@android.com";
33
34// MCS status message types.
35// TODO(zea): handle these at the GCMClient layer.
36const char kIdleNotification[] = "IdleNotification";
37// const char kAlwaysShowOnIdle[] = "ShowAwayOnIdle";
38// const char kPowerNotification[] = "PowerNotification";
39// const char kDataActiveNotification[] = "DataActiveNotification";
40
41// The number of unacked messages to allow before sending a stream ack.
42// Applies to both incoming and outgoing messages.
43// TODO(zea): make this server configurable.
44const int kUnackedMessageBeforeStreamAck = 10;
45
46// The global maximum number of pending messages to have in the send queue.
47const size_t kMaxSendQueueSize = 10 * 1024;
48
49// The maximum message size that can be sent to the server.
50const int kMaxMessageBytes = 4 * 1024;  // 4KB, like the server.
51
52// Helper for converting a proto persistent id list to a vector of strings.
53bool BuildPersistentIdListFromProto(const google::protobuf::string& bytes,
54                                    std::vector<std::string>* id_list) {
55  mcs_proto::SelectiveAck selective_ack;
56  if (!selective_ack.ParseFromString(bytes))
57    return false;
58  std::vector<std::string> new_list;
59  for (int i = 0; i < selective_ack.id_size(); ++i) {
60    DCHECK(!selective_ack.id(i).empty());
61    new_list.push_back(selective_ack.id(i));
62  }
63  id_list->swap(new_list);
64  return true;
65}
66
67}  // namespace
68
69class CollapseKey {
70 public:
71  explicit CollapseKey(const mcs_proto::DataMessageStanza& message);
72  ~CollapseKey();
73
74  // Comparison operator for use in maps.
75  bool operator<(const CollapseKey& right) const;
76
77  // Whether the message had a valid collapse key.
78  bool IsValid() const;
79
80  std::string token() const { return token_; }
81  std::string app_id() const { return app_id_; }
82  int64 device_user_id() const { return device_user_id_; }
83
84 private:
85  const std::string token_;
86  const std::string app_id_;
87  const int64 device_user_id_;
88};
89
90CollapseKey::CollapseKey(const mcs_proto::DataMessageStanza& message)
91    : token_(message.token()),
92      app_id_(message.category()),
93      device_user_id_(message.device_user_id()) {}
94
95CollapseKey::~CollapseKey() {}
96
97bool CollapseKey::IsValid() const {
98  // Device user id is optional, but the application id and token are not.
99  return !token_.empty() && !app_id_.empty();
100}
101
102bool CollapseKey::operator<(const CollapseKey& right) const {
103  if (device_user_id_ != right.device_user_id())
104    return device_user_id_ < right.device_user_id();
105  if (app_id_ != right.app_id())
106    return app_id_ < right.app_id();
107  return token_ < right.token();
108}
109
110struct ReliablePacketInfo {
111  ReliablePacketInfo();
112  ~ReliablePacketInfo();
113
114  // The stream id with which the message was sent.
115  uint32 stream_id;
116
117  // If reliable delivery was requested, the persistent id of the message.
118  std::string persistent_id;
119
120  // The type of message itself (for easier lookup).
121  uint8 tag;
122
123  // The protobuf of the message itself.
124  MCSProto protobuf;
125};
126
127ReliablePacketInfo::ReliablePacketInfo()
128  : stream_id(0), tag(0) {
129}
130ReliablePacketInfo::~ReliablePacketInfo() {}
131
132int MCSClient::GetSendQueueSize() const {
133  return to_send_.size();
134}
135
136int MCSClient::GetResendQueueSize() const {
137  return to_resend_.size();
138}
139
140std::string MCSClient::GetStateString() const {
141  switch(state_) {
142    case UNINITIALIZED:
143      return "UNINITIALIZED";
144    case LOADED:
145      return "LOADED";
146    case CONNECTING:
147      return "CONNECTING";
148    case CONNECTED:
149      return "CONNECTED";
150    default:
151      NOTREACHED();
152      return std::string();
153  }
154}
155
156MCSClient::MCSClient(const std::string& version_string,
157                     base::Clock* clock,
158                     ConnectionFactory* connection_factory,
159                     GCMStore* gcm_store,
160                     GCMStatsRecorder* recorder)
161    : version_string_(version_string),
162      clock_(clock),
163      state_(UNINITIALIZED),
164      android_id_(0),
165      security_token_(0),
166      connection_factory_(connection_factory),
167      connection_handler_(NULL),
168      last_device_to_server_stream_id_received_(0),
169      last_server_to_device_stream_id_received_(0),
170      stream_id_out_(0),
171      stream_id_in_(0),
172      gcm_store_(gcm_store),
173      recorder_(recorder),
174      weak_ptr_factory_(this) {
175}
176
177MCSClient::~MCSClient() {
178}
179
180void MCSClient::Initialize(
181    const ErrorCallback& error_callback,
182    const OnMessageReceivedCallback& message_received_callback,
183    const OnMessageSentCallback& message_sent_callback,
184    scoped_ptr<GCMStore::LoadResult> load_result) {
185  DCHECK_EQ(state_, UNINITIALIZED);
186
187  state_ = LOADED;
188  mcs_error_callback_ = error_callback;
189  message_received_callback_ = message_received_callback;
190  message_sent_callback_ = message_sent_callback;
191
192  connection_factory_->Initialize(
193      base::Bind(&MCSClient::ResetStateAndBuildLoginRequest,
194                 weak_ptr_factory_.GetWeakPtr()),
195      base::Bind(&MCSClient::HandlePacketFromWire,
196                 weak_ptr_factory_.GetWeakPtr()),
197      base::Bind(&MCSClient::MaybeSendMessage,
198                 weak_ptr_factory_.GetWeakPtr()));
199  connection_handler_ = connection_factory_->GetConnectionHandler();
200
201  stream_id_out_ = 1;  // Login request is hardcoded to id 1.
202
203  android_id_ = load_result->device_android_id;
204  security_token_ = load_result->device_security_token;
205
206  if (android_id_ == 0) {
207    DVLOG(1) << "No device credentials found, assuming new client.";
208    // No need to try and load RMQ data in that case.
209    return;
210  }
211
212  // |android_id_| is non-zero, so should |security_token_|.
213  DCHECK_NE(0u, security_token_) << "Security token invalid, while android id"
214                                 << " is non-zero.";
215
216  DVLOG(1) << "RMQ Load finished with " << load_result->incoming_messages.size()
217           << " incoming acks pending and "
218           << load_result->outgoing_messages.size()
219           << " outgoing messages pending.";
220
221  restored_unackeds_server_ids_ = load_result->incoming_messages;
222
223  // First go through and order the outgoing messages by recency.
224  std::map<uint64, google::protobuf::MessageLite*> ordered_messages;
225  std::vector<PersistentId> expired_ttl_ids;
226  for (GCMStore::OutgoingMessageMap::iterator iter =
227           load_result->outgoing_messages.begin();
228       iter != load_result->outgoing_messages.end(); ++iter) {
229    uint64 timestamp = 0;
230    if (!base::StringToUint64(iter->first, &timestamp)) {
231      LOG(ERROR) << "Invalid restored message.";
232      // TODO(fgorski): Error: data unreadable
233      mcs_error_callback_.Run();
234      return;
235    }
236
237    // Check if the TTL has expired for this message.
238    if (HasTTLExpired(*iter->second, clock_)) {
239      expired_ttl_ids.push_back(iter->first);
240      NotifyMessageSendStatus(*iter->second, TTL_EXCEEDED);
241      continue;
242    }
243
244    ordered_messages[timestamp] = iter->second.release();
245  }
246
247  if (!expired_ttl_ids.empty()) {
248    gcm_store_->RemoveOutgoingMessages(
249        expired_ttl_ids,
250        base::Bind(&MCSClient::OnGCMUpdateFinished,
251                   weak_ptr_factory_.GetWeakPtr()));
252  }
253
254  // Now go through and add the outgoing messages to the send queue in their
255  // appropriate order (oldest at front, most recent at back).
256  for (std::map<uint64, google::protobuf::MessageLite*>::iterator
257           iter = ordered_messages.begin();
258       iter != ordered_messages.end(); ++iter) {
259    ReliablePacketInfo* packet_info = new ReliablePacketInfo();
260    packet_info->protobuf.reset(iter->second);
261    packet_info->tag = GetMCSProtoTag(*iter->second);
262    packet_info->persistent_id = base::Uint64ToString(iter->first);
263    to_send_.push_back(make_linked_ptr(packet_info));
264
265    if (packet_info->tag == kDataMessageStanzaTag) {
266      mcs_proto::DataMessageStanza* data_message =
267          reinterpret_cast<mcs_proto::DataMessageStanza*>(
268              packet_info->protobuf.get());
269      CollapseKey collapse_key(*data_message);
270      if (collapse_key.IsValid())
271        collapse_key_map_[collapse_key] = packet_info;
272    }
273  }
274}
275
276void MCSClient::Login(uint64 android_id, uint64 security_token) {
277  DCHECK_EQ(state_, LOADED);
278  DCHECK(android_id_ == 0 || android_id_ == android_id);
279  DCHECK(security_token_ == 0 || security_token_ == security_token);
280
281  if (android_id != android_id_ && security_token != security_token_) {
282    DCHECK(android_id);
283    DCHECK(security_token);
284    android_id_ = android_id;
285    security_token_ = security_token;
286  }
287
288  DCHECK(android_id_ != 0 || restored_unackeds_server_ids_.empty());
289
290  state_ = CONNECTING;
291  connection_factory_->Connect();
292}
293
294void MCSClient::SendMessage(const MCSMessage& message) {
295  int ttl = GetTTL(message.GetProtobuf());
296  DCHECK_GE(ttl, 0);
297  if (to_send_.size() > kMaxSendQueueSize) {
298    NotifyMessageSendStatus(message.GetProtobuf(), QUEUE_SIZE_LIMIT_REACHED);
299    return;
300  }
301  if (message.size() > kMaxMessageBytes) {
302    NotifyMessageSendStatus(message.GetProtobuf(), MESSAGE_TOO_LARGE);
303    return;
304  }
305
306  scoped_ptr<ReliablePacketInfo> packet_info(new ReliablePacketInfo());
307  packet_info->tag = message.tag();
308  packet_info->protobuf = message.CloneProtobuf();
309
310  if (ttl > 0) {
311    DCHECK_EQ(message.tag(), kDataMessageStanzaTag);
312
313    // First check if this message should replace a pending message with the
314    // same collapse key.
315    mcs_proto::DataMessageStanza* data_message =
316        reinterpret_cast<mcs_proto::DataMessageStanza*>(
317            packet_info->protobuf.get());
318    CollapseKey collapse_key(*data_message);
319    if (collapse_key.IsValid() && collapse_key_map_.count(collapse_key) > 0) {
320      ReliablePacketInfo* original_packet = collapse_key_map_[collapse_key];
321      DVLOG(1) << "Found matching collapse key, Reusing persistent id of "
322               << original_packet->persistent_id;
323      original_packet->protobuf = packet_info->protobuf.Pass();
324      SetPersistentId(original_packet->persistent_id,
325                      original_packet->protobuf.get());
326      gcm_store_->OverwriteOutgoingMessage(
327          original_packet->persistent_id,
328          message,
329          base::Bind(&MCSClient::OnGCMUpdateFinished,
330                     weak_ptr_factory_.GetWeakPtr()));
331
332      // The message is already queued, return.
333      return;
334    } else {
335      PersistentId persistent_id = GetNextPersistentId();
336      DVLOG(1) << "Setting persistent id to " << persistent_id;
337      packet_info->persistent_id = persistent_id;
338      SetPersistentId(persistent_id, packet_info->protobuf.get());
339      if (!gcm_store_->AddOutgoingMessage(
340               persistent_id,
341               MCSMessage(message.tag(), *(packet_info->protobuf)),
342               base::Bind(&MCSClient::OnGCMUpdateFinished,
343                          weak_ptr_factory_.GetWeakPtr()))) {
344        NotifyMessageSendStatus(message.GetProtobuf(),
345                                APP_QUEUE_SIZE_LIMIT_REACHED);
346        return;
347      }
348    }
349
350    if (collapse_key.IsValid())
351      collapse_key_map_[collapse_key] = packet_info.get();
352  } else if (!connection_factory_->IsEndpointReachable()) {
353    DVLOG(1) << "No active connection, dropping message.";
354    NotifyMessageSendStatus(message.GetProtobuf(), NO_CONNECTION_ON_ZERO_TTL);
355    return;
356  }
357
358  to_send_.push_back(make_linked_ptr(packet_info.release()));
359
360  // Notify that the messages has been succsfully queued for sending.
361  // TODO(jianli): We should report QUEUED after writing to GCM store succeeds.
362  NotifyMessageSendStatus(message.GetProtobuf(), QUEUED);
363
364  MaybeSendMessage();
365}
366
367void MCSClient::ResetStateAndBuildLoginRequest(
368    mcs_proto::LoginRequest* request) {
369  DCHECK(android_id_);
370  DCHECK(security_token_);
371  stream_id_in_ = 0;
372  stream_id_out_ = 1;
373  last_device_to_server_stream_id_received_ = 0;
374  last_server_to_device_stream_id_received_ = 0;
375
376  heartbeat_manager_.Stop();
377
378  // Add any pending acknowledgments to the list of ids.
379  for (StreamIdToPersistentIdMap::const_iterator iter =
380           unacked_server_ids_.begin();
381       iter != unacked_server_ids_.end(); ++iter) {
382    restored_unackeds_server_ids_.push_back(iter->second);
383  }
384  unacked_server_ids_.clear();
385
386  // Any acknowledged server ids which have not been confirmed by the server
387  // are treated like unacknowledged ids.
388  for (std::map<StreamId, PersistentIdList>::const_iterator iter =
389           acked_server_ids_.begin();
390       iter != acked_server_ids_.end(); ++iter) {
391    restored_unackeds_server_ids_.insert(restored_unackeds_server_ids_.end(),
392                                         iter->second.begin(),
393                                         iter->second.end());
394  }
395  acked_server_ids_.clear();
396
397  // Then build the request, consuming all pending acknowledgments.
398  request->Swap(BuildLoginRequest(android_id_,
399                                  security_token_,
400                                  version_string_).get());
401  for (PersistentIdList::const_iterator iter =
402           restored_unackeds_server_ids_.begin();
403       iter != restored_unackeds_server_ids_.end(); ++iter) {
404    request->add_received_persistent_id(*iter);
405  }
406  acked_server_ids_[stream_id_out_] = restored_unackeds_server_ids_;
407  restored_unackeds_server_ids_.clear();
408
409  // Push all unacknowledged messages to front of send queue. No need to save
410  // to RMQ, as all messages that reach this point should already have been
411  // saved as necessary.
412  while (!to_resend_.empty()) {
413    to_send_.push_front(to_resend_.back());
414    to_resend_.pop_back();
415  }
416
417  // Drop all TTL == 0 or expired TTL messages from the queue.
418  std::deque<MCSPacketInternal> new_to_send;
419  std::vector<PersistentId> expired_ttl_ids;
420  while (!to_send_.empty()) {
421    MCSPacketInternal packet = PopMessageForSend();
422    if (GetTTL(*packet->protobuf) > 0 &&
423        !HasTTLExpired(*packet->protobuf, clock_)) {
424      new_to_send.push_back(packet);
425    } else {
426      // If the TTL was 0 there is no persistent id, so no need to remove the
427      // message from the persistent store.
428      if (!packet->persistent_id.empty())
429        expired_ttl_ids.push_back(packet->persistent_id);
430      NotifyMessageSendStatus(*packet->protobuf, TTL_EXCEEDED);
431    }
432  }
433
434  if (!expired_ttl_ids.empty()) {
435    DVLOG(1) << "Connection reset, " << expired_ttl_ids.size()
436             << " messages expired.";
437    gcm_store_->RemoveOutgoingMessages(
438        expired_ttl_ids,
439        base::Bind(&MCSClient::OnGCMUpdateFinished,
440                   weak_ptr_factory_.GetWeakPtr()));
441  }
442
443  to_send_.swap(new_to_send);
444
445  DVLOG(1) << "Resetting state, with " << request->received_persistent_id_size()
446           << " incoming acks pending, and " << to_send_.size()
447           << " pending outgoing messages.";
448
449  state_ = CONNECTING;
450}
451
452void MCSClient::SendHeartbeat() {
453  SendMessage(MCSMessage(kHeartbeatPingTag, mcs_proto::HeartbeatPing()));
454}
455
456void MCSClient::OnGCMUpdateFinished(bool success) {
457  LOG_IF(ERROR, !success) << "GCM Update failed!";
458  UMA_HISTOGRAM_BOOLEAN("GCM.StoreUpdateSucceeded", success);
459  // TODO(zea): Rebuild the store from scratch in case of persistence failure?
460}
461
462void MCSClient::MaybeSendMessage() {
463  if (to_send_.empty())
464    return;
465
466  // If the connection has been reset, do nothing. On reconnection
467  // MaybeSendMessage will be automatically invoked again.
468  // TODO(zea): consider doing TTL expiration at connection reset time, rather
469  // than reconnect time.
470  if (!connection_factory_->IsEndpointReachable())
471    return;
472
473  MCSPacketInternal packet = PopMessageForSend();
474  if (HasTTLExpired(*packet->protobuf, clock_)) {
475    DCHECK(!packet->persistent_id.empty());
476    DVLOG(1) << "Dropping expired message " << packet->persistent_id << ".";
477    NotifyMessageSendStatus(*packet->protobuf, TTL_EXCEEDED);
478    gcm_store_->RemoveOutgoingMessage(
479        packet->persistent_id,
480        base::Bind(&MCSClient::OnGCMUpdateFinished,
481                   weak_ptr_factory_.GetWeakPtr()));
482    base::MessageLoop::current()->PostTask(
483            FROM_HERE,
484            base::Bind(&MCSClient::MaybeSendMessage,
485                       weak_ptr_factory_.GetWeakPtr()));
486    return;
487  }
488  DVLOG(1) << "Pending output message found, sending.";
489  if (!packet->persistent_id.empty())
490    to_resend_.push_back(packet);
491  SendPacketToWire(packet.get());
492}
493
494void MCSClient::SendPacketToWire(ReliablePacketInfo* packet_info) {
495  packet_info->stream_id = ++stream_id_out_;
496  DVLOG(1) << "Sending packet of type " << packet_info->protobuf->GetTypeName();
497
498  // Set the queued time as necessary.
499  if (packet_info->tag == kDataMessageStanzaTag) {
500    mcs_proto::DataMessageStanza* data_message =
501        reinterpret_cast<mcs_proto::DataMessageStanza*>(
502            packet_info->protobuf.get());
503    uint64 sent = data_message->sent();
504    DCHECK_GT(sent, 0U);
505    int queued = (clock_->Now().ToInternalValue() /
506        base::Time::kMicrosecondsPerSecond) - sent;
507    DVLOG(1) << "Message was queued for " << queued << " seconds.";
508    data_message->set_queued(queued);
509    recorder_->RecordDataSentToWire(
510        data_message->category(),
511        data_message->to(),
512        data_message->id(),
513        queued);
514  }
515
516  // Set the proper last received stream id to acknowledge received server
517  // packets.
518  DVLOG(1) << "Setting last stream id received to "
519           << stream_id_in_;
520  SetLastStreamIdReceived(stream_id_in_,
521                          packet_info->protobuf.get());
522  if (stream_id_in_ != last_server_to_device_stream_id_received_) {
523    last_server_to_device_stream_id_received_ = stream_id_in_;
524    // Mark all acknowledged server messages as such. Note: they're not dropped,
525    // as it may be that they'll need to be re-acked if this message doesn't
526    // make it.
527    PersistentIdList persistent_id_list;
528    for (StreamIdToPersistentIdMap::const_iterator iter =
529             unacked_server_ids_.begin();
530         iter != unacked_server_ids_.end(); ++iter) {
531      DCHECK_LE(iter->first, last_server_to_device_stream_id_received_);
532      persistent_id_list.push_back(iter->second);
533    }
534    unacked_server_ids_.clear();
535    acked_server_ids_[stream_id_out_] = persistent_id_list;
536  }
537
538  connection_handler_->SendMessage(*packet_info->protobuf);
539}
540
541void MCSClient::HandleMCSDataMesssage(
542    scoped_ptr<google::protobuf::MessageLite> protobuf) {
543  mcs_proto::DataMessageStanza* data_message =
544      reinterpret_cast<mcs_proto::DataMessageStanza*>(protobuf.get());
545  // TODO(zea): implement a proper status manager rather than hardcoding these
546  // values.
547  scoped_ptr<mcs_proto::DataMessageStanza> response(
548      new mcs_proto::DataMessageStanza());
549  response->set_from(kGCMFromField);
550  response->set_sent(clock_->Now().ToInternalValue() /
551                         base::Time::kMicrosecondsPerSecond);
552  response->set_ttl(0);
553  bool send = false;
554  for (int i = 0; i < data_message->app_data_size(); ++i) {
555    const mcs_proto::AppData& app_data = data_message->app_data(i);
556    if (app_data.key() == kIdleNotification) {
557      // Tell the MCS server the client is not idle.
558      send = true;
559      mcs_proto::AppData data;
560      data.set_key(kIdleNotification);
561      data.set_value("false");
562      response->add_app_data()->CopyFrom(data);
563      response->set_category(kMCSCategory);
564    }
565  }
566
567  if (send) {
568    SendMessage(
569        MCSMessage(kDataMessageStanzaTag,
570                   response.PassAs<const google::protobuf::MessageLite>()));
571  }
572}
573
574void MCSClient::HandlePacketFromWire(
575    scoped_ptr<google::protobuf::MessageLite> protobuf) {
576  if (!protobuf.get())
577    return;
578  uint8 tag = GetMCSProtoTag(*protobuf);
579  PersistentId persistent_id = GetPersistentId(*protobuf);
580  StreamId last_stream_id_received = GetLastStreamIdReceived(*protobuf);
581
582  if (last_stream_id_received != 0) {
583    last_device_to_server_stream_id_received_ = last_stream_id_received;
584
585    // Process device to server messages that have now been acknowledged by the
586    // server. Because messages are stored in order, just pop off all that have
587    // a stream id lower than server's last received stream id.
588    HandleStreamAck(last_stream_id_received);
589
590    // Process server_to_device_messages that the server now knows were
591    // acknowledged. Again, they're in order, so just keep going until the
592    // stream id is reached.
593    StreamIdList acked_stream_ids_to_remove;
594    for (std::map<StreamId, PersistentIdList>::iterator iter =
595             acked_server_ids_.begin();
596         iter != acked_server_ids_.end() &&
597             iter->first <= last_stream_id_received; ++iter) {
598      acked_stream_ids_to_remove.push_back(iter->first);
599    }
600    for (StreamIdList::iterator iter = acked_stream_ids_to_remove.begin();
601         iter != acked_stream_ids_to_remove.end(); ++iter) {
602      acked_server_ids_.erase(*iter);
603    }
604  }
605
606  ++stream_id_in_;
607  if (!persistent_id.empty()) {
608    unacked_server_ids_[stream_id_in_] = persistent_id;
609    gcm_store_->AddIncomingMessage(persistent_id,
610                                   base::Bind(&MCSClient::OnGCMUpdateFinished,
611                                              weak_ptr_factory_.GetWeakPtr()));
612  }
613
614  DVLOG(1) << "Received message of type " << protobuf->GetTypeName()
615           << " with persistent id "
616           << (persistent_id.empty() ? "NULL" : persistent_id)
617           << ", stream id " << stream_id_in_ << " and last stream id received "
618           << last_stream_id_received;
619
620  if (unacked_server_ids_.size() > 0 &&
621      unacked_server_ids_.size() % kUnackedMessageBeforeStreamAck == 0) {
622    SendMessage(MCSMessage(kIqStanzaTag,
623                           BuildStreamAck().
624                               PassAs<const google::protobuf::MessageLite>()));
625  }
626
627  // The connection is alive, treat this message as a heartbeat ack.
628  heartbeat_manager_.OnHeartbeatAcked();
629
630  switch (tag) {
631    case kLoginResponseTag: {
632      DCHECK_EQ(CONNECTING, state_);
633      mcs_proto::LoginResponse* login_response =
634          reinterpret_cast<mcs_proto::LoginResponse*>(protobuf.get());
635      DVLOG(1) << "Received login response:";
636      DVLOG(1) << "  Id: " << login_response->id();
637      DVLOG(1) << "  Timestamp: " << login_response->server_timestamp();
638      if (login_response->has_error() && login_response->error().code() != 0) {
639        state_ = UNINITIALIZED;
640        DVLOG(1) << "  Error code: " << login_response->error().code();
641        DVLOG(1) << "  Error message: " << login_response->error().message();
642        LOG(ERROR) << "Failed to log in to GCM, resetting connection.";
643        connection_factory_->SignalConnectionReset(
644            ConnectionFactory::LOGIN_FAILURE);
645        mcs_error_callback_.Run();
646        return;
647      }
648
649      if (login_response->has_heartbeat_config()) {
650        heartbeat_manager_.UpdateHeartbeatConfig(
651            login_response->heartbeat_config());
652      }
653
654      state_ = CONNECTED;
655      stream_id_in_ = 1;  // To account for the login response.
656      DCHECK_EQ(1U, stream_id_out_);
657
658      // Pass the login response on up.
659      base::MessageLoop::current()->PostTask(
660          FROM_HERE,
661          base::Bind(message_received_callback_,
662                     MCSMessage(tag,
663                                protobuf.PassAs<
664                                    const google::protobuf::MessageLite>())));
665
666      // If there are pending messages, attempt to send one.
667      if (!to_send_.empty()) {
668        base::MessageLoop::current()->PostTask(
669            FROM_HERE,
670            base::Bind(&MCSClient::MaybeSendMessage,
671                       weak_ptr_factory_.GetWeakPtr()));
672      }
673
674      heartbeat_manager_.Start(
675          base::Bind(&MCSClient::SendHeartbeat,
676                     weak_ptr_factory_.GetWeakPtr()),
677          base::Bind(&MCSClient::OnConnectionResetByHeartbeat,
678                     weak_ptr_factory_.GetWeakPtr()));
679      return;
680    }
681    case kHeartbeatPingTag:
682      DCHECK_GE(stream_id_in_, 1U);
683      DVLOG(1) << "Received heartbeat ping, sending ack.";
684      SendMessage(
685          MCSMessage(kHeartbeatAckTag, mcs_proto::HeartbeatAck()));
686      return;
687    case kHeartbeatAckTag:
688      DCHECK_GE(stream_id_in_, 1U);
689      DVLOG(1) << "Received heartbeat ack.";
690      // Do nothing else, all messages act as heartbeat acks.
691      return;
692    case kCloseTag:
693      LOG(ERROR) << "Received close command, resetting connection.";
694      state_ = LOADED;
695      connection_factory_->SignalConnectionReset(
696          ConnectionFactory::CLOSE_COMMAND);
697      return;
698    case kIqStanzaTag: {
699      DCHECK_GE(stream_id_in_, 1U);
700      mcs_proto::IqStanza* iq_stanza =
701          reinterpret_cast<mcs_proto::IqStanza*>(protobuf.get());
702      const mcs_proto::Extension& iq_extension = iq_stanza->extension();
703      switch (iq_extension.id()) {
704        case kSelectiveAck: {
705          PersistentIdList acked_ids;
706          if (BuildPersistentIdListFromProto(iq_extension.data(),
707                                             &acked_ids)) {
708            HandleSelectiveAck(acked_ids);
709          }
710          return;
711        }
712        case kStreamAck:
713          // Do nothing. The last received stream id is always processed if it's
714          // present.
715          return;
716        default:
717          LOG(WARNING) << "Received invalid iq stanza extension "
718                       << iq_extension.id();
719          return;
720      }
721    }
722    case kDataMessageStanzaTag: {
723      DCHECK_GE(stream_id_in_, 1U);
724      mcs_proto::DataMessageStanza* data_message =
725          reinterpret_cast<mcs_proto::DataMessageStanza*>(protobuf.get());
726      if (data_message->category() == kMCSCategory) {
727        HandleMCSDataMesssage(protobuf.Pass());
728        return;
729      }
730
731      DCHECK(protobuf.get());
732      base::MessageLoop::current()->PostTask(
733          FROM_HERE,
734          base::Bind(message_received_callback_,
735                     MCSMessage(tag,
736                                protobuf.PassAs<
737                                    const google::protobuf::MessageLite>())));
738      return;
739    }
740    default:
741      LOG(ERROR) << "Received unexpected message of type "
742                 << static_cast<int>(tag);
743      return;
744  }
745}
746
747void MCSClient::HandleStreamAck(StreamId last_stream_id_received) {
748  PersistentIdList acked_outgoing_persistent_ids;
749  StreamIdList acked_outgoing_stream_ids;
750  while (!to_resend_.empty() &&
751         to_resend_.front()->stream_id <= last_stream_id_received) {
752    const MCSPacketInternal& outgoing_packet = to_resend_.front();
753    acked_outgoing_persistent_ids.push_back(outgoing_packet->persistent_id);
754    acked_outgoing_stream_ids.push_back(outgoing_packet->stream_id);
755    NotifyMessageSendStatus(*outgoing_packet->protobuf, SENT);
756    to_resend_.pop_front();
757  }
758
759  DVLOG(1) << "Server acked " << acked_outgoing_persistent_ids.size()
760           << " outgoing messages, " << to_resend_.size()
761           << " remaining unacked";
762  gcm_store_->RemoveOutgoingMessages(
763      acked_outgoing_persistent_ids,
764      base::Bind(&MCSClient::OnGCMUpdateFinished,
765                 weak_ptr_factory_.GetWeakPtr()));
766
767  HandleServerConfirmedReceipt(last_stream_id_received);
768}
769
770void MCSClient::HandleSelectiveAck(const PersistentIdList& id_list) {
771  std::set<PersistentId> remaining_ids(id_list.begin(), id_list.end());
772
773  StreamId last_stream_id_received = 0;
774
775  // First check the to_resend_ queue. Acknowledgments are always contiguous,
776  // so if there's a pending message that hasn't been acked, all newer messages
777  // must also be unacked.
778  while(!to_resend_.empty() && !remaining_ids.empty()) {
779    const MCSPacketInternal& outgoing_packet = to_resend_.front();
780    if (remaining_ids.count(outgoing_packet->persistent_id) == 0)
781      break;  // Newer message must be unacked too.
782    remaining_ids.erase(outgoing_packet->persistent_id);
783    NotifyMessageSendStatus(*outgoing_packet->protobuf, SENT);
784
785    // No need to re-acknowledge any server messages this message already
786    // acknowledged.
787    StreamId device_stream_id = outgoing_packet->stream_id;
788    if (device_stream_id > last_stream_id_received)
789      last_stream_id_received = device_stream_id;
790    to_resend_.pop_front();
791  }
792
793  // If the acknowledged ids aren't all there, they might be in the to_send_
794  // queue (typically when a SelectiveAck confirms messages as part of a login
795  // response).
796  while (!to_send_.empty() && !remaining_ids.empty()) {
797    const MCSPacketInternal& outgoing_packet = to_send_.front();
798    if (remaining_ids.count(outgoing_packet->persistent_id) == 0)
799      break;  // Newer messages must be unacked too.
800    remaining_ids.erase(outgoing_packet->persistent_id);
801    NotifyMessageSendStatus(*outgoing_packet->protobuf, SENT);
802
803    // No need to re-acknowledge any server messages this message already
804    // acknowledged.
805    StreamId device_stream_id = outgoing_packet->stream_id;
806    if (device_stream_id > last_stream_id_received)
807      last_stream_id_received = device_stream_id;
808    PopMessageForSend();
809  }
810
811  // Only handle the largest stream id value. All other stream ids are
812  // implicitly handled.
813  if (last_stream_id_received > 0)
814    HandleServerConfirmedReceipt(last_stream_id_received);
815
816  // At this point, all remaining acked ids are redundant.
817  PersistentIdList acked_ids;
818  if (remaining_ids.size() > 0) {
819    for (size_t i = 0; i < id_list.size(); ++i) {
820      if (remaining_ids.count(id_list[i]) > 0)
821        continue;
822      acked_ids.push_back(id_list[i]);
823    }
824  } else {
825    acked_ids = id_list;
826  }
827
828  DVLOG(1) << "Server acked " << acked_ids.size()
829           << " messages, " << to_resend_.size() << " remaining unacked.";
830  gcm_store_->RemoveOutgoingMessages(
831      acked_ids,
832      base::Bind(&MCSClient::OnGCMUpdateFinished,
833                 weak_ptr_factory_.GetWeakPtr()));
834
835  // Resend any remaining outgoing messages, as they were not received by the
836  // server.
837  DVLOG(1) << "Resending " << to_resend_.size() << " messages.";
838  while (!to_resend_.empty()) {
839    to_send_.push_front(to_resend_.back());
840    to_resend_.pop_back();
841  }
842  base::MessageLoop::current()->PostTask(
843      FROM_HERE,
844      base::Bind(&MCSClient::MaybeSendMessage,
845                 weak_ptr_factory_.GetWeakPtr()));
846}
847
848void MCSClient::HandleServerConfirmedReceipt(StreamId device_stream_id) {
849  PersistentIdList acked_incoming_ids;
850  for (std::map<StreamId, PersistentIdList>::iterator iter =
851           acked_server_ids_.begin();
852       iter != acked_server_ids_.end() &&
853           iter->first <= device_stream_id;) {
854    acked_incoming_ids.insert(acked_incoming_ids.end(),
855                              iter->second.begin(),
856                              iter->second.end());
857    acked_server_ids_.erase(iter++);
858  }
859
860  DVLOG(1) << "Server confirmed receipt of " << acked_incoming_ids.size()
861           << " acknowledged server messages.";
862  gcm_store_->RemoveIncomingMessages(
863      acked_incoming_ids,
864      base::Bind(&MCSClient::OnGCMUpdateFinished,
865                 weak_ptr_factory_.GetWeakPtr()));
866}
867
868MCSClient::PersistentId MCSClient::GetNextPersistentId() {
869  return base::Uint64ToString(base::TimeTicks::Now().ToInternalValue());
870}
871
872void MCSClient::OnConnectionResetByHeartbeat() {
873  connection_factory_->SignalConnectionReset(
874      ConnectionFactory::HEARTBEAT_FAILURE);
875}
876
877void MCSClient::NotifyMessageSendStatus(
878    const google::protobuf::MessageLite& protobuf,
879    MessageSendStatus status) {
880  if (GetMCSProtoTag(protobuf) != kDataMessageStanzaTag)
881    return;
882
883  const mcs_proto::DataMessageStanza* data_message_stanza =
884      reinterpret_cast<const mcs_proto::DataMessageStanza*>(&protobuf);
885  recorder_->RecordNotifySendStatus(
886      data_message_stanza->category(),
887      data_message_stanza->to(),
888      data_message_stanza->id(),
889      status,
890      protobuf.ByteSize(),
891      data_message_stanza->ttl());
892  message_sent_callback_.Run(
893      data_message_stanza->device_user_id(),
894      data_message_stanza->category(),
895      data_message_stanza->id(),
896      status);
897}
898
899MCSClient::MCSPacketInternal MCSClient::PopMessageForSend() {
900  MCSPacketInternal packet = to_send_.front();
901  to_send_.pop_front();
902
903  if (packet->tag == kDataMessageStanzaTag) {
904    mcs_proto::DataMessageStanza* data_message =
905        reinterpret_cast<mcs_proto::DataMessageStanza*>(packet->protobuf.get());
906    CollapseKey collapse_key(*data_message);
907    if (collapse_key.IsValid())
908      collapse_key_map_.erase(collapse_key);
909  }
910
911  return packet;
912}
913
914} // namespace gcm
915