1/*
2 * libjingle
3 * Copyright 2006, Google Inc.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 *  1. Redistributions of source code must retain the above copyright notice,
9 *     this list of conditions and the following disclaimer.
10 *  2. Redistributions in binary form must reproduce the above copyright notice,
11 *     this list of conditions and the following disclaimer in the documentation
12 *     and/or other materials provided with the distribution.
13 *  3. The name of the author may not be used to endorse or promote products
14 *     derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28#include <iostream>
29#include "libjingleplus.h"
30#ifdef WIN32
31#include "talk/base/win32socketserver.h"
32#endif
33#include "talk/base/physicalsocketserver.h"
34#include "talk/base/logging.h"
35#include "talk/examples/login/xmppauth.h"
36#include "talk/examples/login/xmppsocket.h"
37#include "talk/examples/login/xmpppump.h"
38#include "presencepushtask.h"
39#include "talk/app/status.h"
40#include "talk/app/message.h"
41#include "rostertask.h"
42#include "talk/app/iqtask.h"
43#include "talk/app/presenceouttask.h"
44#include "talk/app/receivemessagetask.h"
45#include "talk/app/rostersettask.h"
46#include "talk/app/sendmessagetask.h"
47
// Message identifiers exchanged between the application ("main") thread and
// the XMPP worker thread through talk_base message queues.
//
// Kept in an anonymous namespace so these generic MSG_* names stay private to
// this translation unit. The enumerators keep their implicit values
// (MSG_START == 0, increasing by one in declaration order).
namespace {

enum {
  // Sent to the worker thread by Login(); handled by running the login.
  MSG_START,

  // ---- main thread -> worker thread ----
  MSG_LOGIN,  // Declared but never posted or handled in this file.
  MSG_DISCONNECT,
  MSG_SEND_PRESENCE,
  MSG_SEND_DIRECTED_PRESENCE,
  MSG_SEND_DIRECTED_MUC_PRESENCE,
  MSG_SEND_XMPP_MESSAGE,
  MSG_SEND_XMPP_IQ,
  MSG_UPDATE_ROSTER_ITEM,
  MSG_REMOVE_ROSTER_ITEM,

  // ---- worker thread -> main thread ----
  MSG_STATE_CHANGE,
  MSG_STATUS_UPDATE,
  MSG_STATUS_ERROR,
  MSG_ROSTER_REFRESH_STARTED,
  MSG_ROSTER_REFRESH_FINISHED,
  MSG_ROSTER_ITEM_UPDATED,
  MSG_ROSTER_ITEM_REMOVED,
  MSG_ROSTER_SUBSCRIBE,
  MSG_ROSTER_UNSUBSCRIBE,
  MSG_ROSTER_SUBSCRIBED,
  MSG_ROSTER_UNSUBSCRIBED,
  MSG_INCOMING_MESSAGE,
  MSG_IQ_COMPLETE,
  MSG_XMPP_INPUT,
  MSG_XMPP_OUTPUT
};

}  // namespace
79
80class LibjinglePlusWorker : public talk_base::MessageHandler,
81			    public XmppPumpNotify,
82                            public sigslot::has_slots<> {
83 public:
84  LibjinglePlusWorker(LibjinglePlus *ljp, LibjinglePlusNotify *notify) :
85    worker_thread_(NULL), ljp_(ljp), notify_(notify),
86    ppt_(NULL), rmt_(NULL), rt_(NULL), is_test_login_(false) {
87
88    main_thread_.reset(new talk_base::AutoThread());
89#ifdef WIN32
90    ss_.reset(new talk_base::Win32SocketServer(main_thread_.get()));
91    main_thread_->set_socketserver(ss_.get());
92#endif
93
94    pump_.reset(new XmppPump(this));
95
96    pump_->client()->SignalLogInput.connect(this, &LibjinglePlusWorker::OnInputDebug);
97    pump_->client()->SignalLogOutput.connect(this, &LibjinglePlusWorker::OnOutputDebug);
98    //pump_->client()->SignalStateChange.connect(this, &LibjinglePlusWorker::OnStateChange);
99    }
100
101  ~LibjinglePlusWorker() {
102    if (worker_thread_) {
103      worker_thread_->Send(this, MSG_DISCONNECT);
104      delete worker_thread_;
105    }
106  }
107
108  virtual void OnMessage(talk_base::Message *msg) {
109    switch (msg->message_id) {
110    case MSG_START:
111      LoginW();
112      break;
113    case MSG_DISCONNECT:
114      DisconnectW();
115      break;
116    case MSG_SEND_XMPP_MESSAGE:
117      SendXmppMessageW(static_cast<SendMessageData*>(msg->pdata)->m_);
118      delete msg->pdata;
119      break;
120    case MSG_SEND_XMPP_IQ:
121      SendXmppIqW(static_cast<SendIqData*>(msg->pdata)->to_jid_,
122                  static_cast<SendIqData*>(msg->pdata)->is_get_,
123                  static_cast<SendIqData*>(msg->pdata)->xml_element_);
124      delete msg->pdata;
125      break;
126    case MSG_SEND_PRESENCE:
127      SendPresenceW(static_cast<SendPresenceData*>(msg->pdata)->s_);
128      delete msg->pdata;
129      break;
130    case MSG_SEND_DIRECTED_PRESENCE:
131      SendDirectedPresenceW(static_cast<SendDirectedPresenceData*>(msg->pdata)->j_,
132			    static_cast<SendDirectedPresenceData*>(msg->pdata)->s_);
133      delete msg->pdata;
134      break;
135    case MSG_SEND_DIRECTED_MUC_PRESENCE:
136      SendDirectedMUCPresenceW(static_cast<SendDirectedMUCPresenceData*>(msg->pdata)->j_,
137			       static_cast<SendDirectedMUCPresenceData*>(msg->pdata)->s_,
138			       static_cast<SendDirectedMUCPresenceData*>(msg->pdata)->un_,
139			       static_cast<SendDirectedMUCPresenceData*>(msg->pdata)->ac_,
140			       static_cast<SendDirectedMUCPresenceData*>(msg->pdata)->am_,
141			       static_cast<SendDirectedMUCPresenceData*>(msg->pdata)->role_);
142      delete msg->pdata;
143      break;
144    case MSG_UPDATE_ROSTER_ITEM:
145      UpdateRosterItemW(static_cast<UpdateRosterItemData*>(msg->pdata)->jid_,
146			static_cast<UpdateRosterItemData*>(msg->pdata)->n_,
147			static_cast<UpdateRosterItemData*>(msg->pdata)->g_,
148			static_cast<UpdateRosterItemData*>(msg->pdata)->grt_);
149      delete msg->pdata;
150      break;
151    case MSG_REMOVE_ROSTER_ITEM:
152      RemoveRosterItemW(static_cast<JidData*>(msg->pdata)->jid_);
153      delete msg->pdata;
154      break;
155
156
157
158
159    case MSG_STATUS_UPDATE:
160      OnStatusUpdateW(static_cast<SendPresenceData*>(msg->pdata)->s_);
161      delete msg->pdata;
162      break;
163    case MSG_STATUS_ERROR:
164      OnStatusErrorW(static_cast<StatusErrorData*>(msg->pdata)->stanza_);
165      delete msg->pdata;
166      break;
167    case MSG_STATE_CHANGE:
168      OnStateChangeW(static_cast<StateChangeData*>(msg->pdata)->s_);
169      delete msg->pdata;
170      break;
171    case MSG_ROSTER_REFRESH_STARTED:
172      OnRosterRefreshStartedW();
173      break;
174    case MSG_ROSTER_REFRESH_FINISHED:
175      OnRosterRefreshFinishedW();
176      break;
177    case MSG_ROSTER_ITEM_UPDATED:
178      OnRosterItemUpdatedW(static_cast<RosterItemData*>(msg->pdata)->ri_);
179      delete msg->pdata;
180      break;
181    case MSG_ROSTER_ITEM_REMOVED:
182      OnRosterItemRemovedW(static_cast<RosterItemData*>(msg->pdata)->ri_);
183      delete msg->pdata;
184      break;
185    case MSG_ROSTER_SUBSCRIBE:
186      OnRosterSubscribeW(static_cast<JidData*>(msg->pdata)->jid_);
187      delete msg->pdata;
188      break;
189    case MSG_ROSTER_UNSUBSCRIBE:
190      OnRosterUnsubscribeW(static_cast<JidData*>(msg->pdata)->jid_);
191      delete msg->pdata;
192      break;
193    case MSG_ROSTER_SUBSCRIBED:
194      OnRosterSubscribedW(static_cast<JidData*>(msg->pdata)->jid_);
195      delete msg->pdata;
196      break;
197    case MSG_ROSTER_UNSUBSCRIBED:
198      OnRosterUnsubscribedW(static_cast<JidData*>(msg->pdata)->jid_);
199      delete msg->pdata;
200      break;
201    case MSG_INCOMING_MESSAGE:
202      OnIncomingMessageW(static_cast<XmppMessageData*>(msg->pdata)->m_);
203      delete msg->pdata;
204      break;
205    case MSG_IQ_COMPLETE:
206      OnIqCompleteW(static_cast<IqCompleteData*>(msg->pdata)->success_,
207                    static_cast<IqCompleteData*>(msg->pdata)->stanza_);
208      delete msg->pdata;
209      break;
210    case MSG_XMPP_OUTPUT:
211      OnOutputDebugW(static_cast<StringData*>(msg->pdata)->s_);
212      delete msg->pdata;
213      break;
214    case MSG_XMPP_INPUT:
215      OnInputDebugW(static_cast<StringData*>(msg->pdata)->s_);
216      delete msg->pdata;
217      break;
218    }
219  }
220
221  void Login(const std::string &jid, const std::string &password,
222	     const std::string &machine_address, bool is_test, bool cookie_auth) {
223    is_test_login_ = is_test;
224
225    xcs_.set_user(jid);
226    if (cookie_auth) {
227	    xcs_.set_auth_cookie(password);
228    } else {
229	    talk_base::InsecureCryptStringImpl pass;
230	    pass.password() = password;
231	    xcs_.set_pass(talk_base::CryptString(pass));
232    }
233    xcs_.set_host(is_test ? "google.com" : "gmail.com");
234    xcs_.set_resource("libjingleplus");
235    xcs_.set_server(talk_base::SocketAddress(machine_address, 5222));
236    xcs_.set_use_tls(!is_test);
237    if (is_test) {
238      xcs_.set_allow_plain(true);
239    }
240
241    worker_thread_ = new talk_base::Thread(&pss_);
242    worker_thread_->Start();
243    worker_thread_->Send(this, MSG_START);
244  }
245
246  void SendXmppMessage(const buzz::XmppMessage &m) {
247    assert(talk_base::ThreadManager::CurrentThread() != worker_thread_);
248    worker_thread_->Post(this, MSG_SEND_XMPP_MESSAGE, new SendMessageData(m));
249  }
250
251  void SendXmppIq(const buzz::Jid &to_jid, bool is_get,
252                  const buzz::XmlElement *xml_element) {
253    assert(talk_base::ThreadManager::CurrentThread() != worker_thread_);
254    worker_thread_->Post(this, MSG_SEND_XMPP_IQ,
255                         new SendIqData(to_jid, is_get, xml_element));
256  }
257
258  void SendPresence(const buzz::Status & s) {
259    assert(talk_base::ThreadManager::CurrentThread() != worker_thread_);
260    worker_thread_->Post(this, MSG_SEND_PRESENCE, new SendPresenceData(s));
261  }
262
263  void SendDirectedPresence (const buzz::Jid &j, const buzz::Status &s) {
264    assert(talk_base::ThreadManager::CurrentThread() != worker_thread_);
265    worker_thread_->Post(this, MSG_SEND_DIRECTED_PRESENCE, new SendDirectedPresenceData(j,s));
266  }
267
268  void SendDirectedMUCPresence(const buzz::Jid &j, const buzz::Status &s,
269			       const std::string &un, const std::string &ac,
270			       const std::string &am, const std::string &role) {
271    assert(talk_base::ThreadManager::CurrentThread() != worker_thread_);
272    worker_thread_->Post(this, MSG_SEND_DIRECTED_MUC_PRESENCE, new SendDirectedMUCPresenceData(j,s,un,ac,am, role));
273  }
274
275  void UpdateRosterItem(const buzz::Jid & jid, const std::string & name,
276			const std::vector<std::string> & groups, buzz::GrType grt) {
277    assert(talk_base::ThreadManager::CurrentThread() != worker_thread_);
278    worker_thread_->Post(this, MSG_UPDATE_ROSTER_ITEM, new UpdateRosterItemData(jid,name,groups,grt));
279  }
280
281  void RemoveRosterItemW(const buzz::Jid &jid) {
282    buzz::RosterSetTask *rst = new buzz::RosterSetTask(pump_.get()->client());
283    rst->Remove(jid);
284    rst->Start();
285  }
286
287  void RemoveRosterItem(const buzz::Jid &jid) {
288    assert(talk_base::ThreadManager::CurrentThread() != worker_thread_);
289    worker_thread_->Post(this, MSG_REMOVE_ROSTER_ITEM, new JidData(jid));
290  }
291
292  void DoCallbacks() {
293    assert(talk_base::ThreadManager::CurrentThread() != worker_thread_);
294    talk_base::Message m;
295    while (main_thread_->Get(&m, 0)) {
296      main_thread_->Dispatch(&m);
297    }
298  }
299
300 private:
301
302  struct UpdateRosterItemData : public talk_base::MessageData {
303    UpdateRosterItemData(const buzz::Jid &jid, const std::string &name,
304			 const std::vector<std::string> &groups, buzz::GrType grt) :
305      jid_(jid), n_(name), g_(groups), grt_(grt) {}
306    buzz::Jid jid_;
307    std::string n_;
308    std::vector<std::string> g_;
309    buzz::GrType grt_;
310  };
311
312  void UpdateRosterItemW(const buzz::Jid &jid, const std::string &name,
313			 const std::vector<std::string> &groups, buzz::GrType grt) {
314    assert (talk_base::ThreadManager::CurrentThread() == worker_thread_);
315    buzz::RosterSetTask *rst = new buzz::RosterSetTask(pump_.get()->client());
316    rst->Update(jid, name, groups, grt);
317    rst->Start();
318  }
319
320  struct StringData : public talk_base::MessageData {
321    StringData(std::string s) : s_(s) {}
322    std::string s_;
323  };
324
325  void OnInputDebugW(const std::string &data) {
326    assert(talk_base::ThreadManager::CurrentThread() != worker_thread_);
327    if (notify_)
328      notify_->OnXmppInput(data);
329  }
330
331  void OnInputDebug(const char *data, int len) {
332    assert (talk_base::ThreadManager::CurrentThread() == worker_thread_);
333    main_thread_->Post(this, MSG_XMPP_INPUT, new StringData(std::string(data,len)));
334    if (notify_)
335      notify_->WakeupMainThread();
336  }
337
338  void OnOutputDebugW(const std::string &data) {
339    assert(talk_base::ThreadManager::CurrentThread() != worker_thread_);
340    if (notify_)
341      notify_->OnXmppOutput(data);
342  }
343
344  void OnOutputDebug(const char *data, int len) {
345    assert (talk_base::ThreadManager::CurrentThread() == worker_thread_);
346    main_thread_->Post(this, MSG_XMPP_OUTPUT, new StringData(std::string(data,len)));
347    if (notify_)
348      notify_->WakeupMainThread();
349  }
350
351  struct StateChangeData : public talk_base::MessageData {
352    StateChangeData(buzz::XmppEngine::State state) : s_(state) {}
353    buzz::XmppEngine::State s_;
354  };
355
356  void OnStateChange(buzz::XmppEngine::State state) {
357    assert (talk_base::ThreadManager::CurrentThread() == worker_thread_);
358    switch (state) {
359    case buzz::XmppEngine::STATE_OPEN:
360      ppt_ = new buzz::PresencePushTask(pump_.get()->client());
361      ppt_->SignalStatusUpdate.connect(this,
362                                       &LibjinglePlusWorker::OnStatusUpdate);
363      ppt_->SignalStatusError.connect(this,
364                                      &LibjinglePlusWorker::OnStatusError);
365      ppt_->Start();
366
367      rmt_ = new buzz::ReceiveMessageTask(pump_.get()->client(), buzz::XmppEngine::HL_ALL);
368      rmt_->SignalIncomingMessage.connect(this, &LibjinglePlusWorker::OnIncomingMessage);
369      rmt_->Start();
370
371      rt_ = new buzz::RosterTask(pump_.get()->client());
372      rt_->SignalRosterItemUpdated.connect(this, &LibjinglePlusWorker::OnRosterItemUpdated);
373      rt_->SignalRosterItemRemoved.connect(this, &LibjinglePlusWorker::OnRosterItemRemoved);
374      rt_->SignalSubscribe.connect(this, &LibjinglePlusWorker::OnRosterSubscribe);
375      rt_->SignalUnsubscribe.connect(this, &LibjinglePlusWorker::OnRosterUnsubscribe);
376      rt_->SignalSubscribed.connect(this, &LibjinglePlusWorker::OnRosterSubscribed);
377      rt_->SignalUnsubscribed.connect(this, &LibjinglePlusWorker::OnRosterUnsubscribed);
378      rt_->SignalRosterRefreshStarted.connect(this, &LibjinglePlusWorker::OnRosterRefreshStarted);
379      rt_->SignalRosterRefreshFinished.connect(this, &LibjinglePlusWorker::OnRosterRefreshFinished);
380      rt_->Start();
381      rt_->RefreshRosterNow();
382
383      break;
384    }
385    main_thread_->Post(this, MSG_STATE_CHANGE, new StateChangeData(state));
386    if (notify_)
387      notify_->WakeupMainThread();
388  }
389
390  void OnStateChangeW(buzz::XmppEngine::State state) {
391    assert(talk_base::ThreadManager::CurrentThread() != worker_thread_);
392    if (notify_)
393      notify_->OnStateChange(state);
394  }
395
396  struct RosterItemData : public talk_base::MessageData {
397    RosterItemData(const buzz::RosterItem &ri) : ri_(ri) {}
398    buzz::RosterItem ri_;
399  };
400
401  void OnRosterItemUpdatedW(const buzz::RosterItem &ri) {
402    assert(talk_base::ThreadManager::CurrentThread() != worker_thread_);
403    if (notify_)
404      notify_->OnRosterItemUpdated(ri);
405  }
406
407  void OnRosterItemUpdated(const buzz::RosterItem &ri, bool huh) {
408    assert (talk_base::ThreadManager::CurrentThread() == worker_thread_);
409    main_thread_->Post(this, MSG_ROSTER_ITEM_UPDATED, new RosterItemData(ri));
410    if (notify_)
411      notify_->WakeupMainThread();
412  }
413
414  void OnRosterItemRemovedW(const buzz::RosterItem &ri) {
415    assert(talk_base::ThreadManager::CurrentThread() != worker_thread_);
416    if (notify_)
417      notify_->OnRosterItemRemoved(ri);
418  }
419
420  void OnRosterItemRemoved(const buzz::RosterItem &ri) {
421    assert (talk_base::ThreadManager::CurrentThread() == worker_thread_);
422    main_thread_->Post(this, MSG_ROSTER_ITEM_REMOVED, new RosterItemData(ri));
423    if (notify_)
424      notify_->WakeupMainThread();
425  }
426
427  struct JidData : public talk_base::MessageData {
428    JidData(const buzz::Jid& jid) : jid_(jid) {}
429    const buzz::Jid jid_;
430  };
431
432  void OnRosterSubscribeW(const buzz::Jid& jid) {
433    assert(talk_base::ThreadManager::CurrentThread() != worker_thread_);
434    if (notify_)
435      notify_->OnRosterSubscribe(jid);
436  }
437
438  void OnRosterSubscribe(const buzz::Jid& jid) {
439    assert (talk_base::ThreadManager::CurrentThread() == worker_thread_);
440    main_thread_->Post(this, MSG_ROSTER_SUBSCRIBE, new JidData(jid));
441    if (notify_)
442      notify_->WakeupMainThread();
443  }
444
445  void OnRosterUnsubscribeW(const buzz::Jid &jid) {
446    assert(talk_base::ThreadManager::CurrentThread() != worker_thread_);
447    if (notify_)
448      notify_->OnRosterUnsubscribe(jid);
449  }
450
451  void OnRosterUnsubscribe(const buzz::Jid &jid) {
452    assert (talk_base::ThreadManager::CurrentThread() == worker_thread_);
453    main_thread_->Post(this, MSG_ROSTER_UNSUBSCRIBE, new JidData(jid));
454    if (notify_)
455      notify_->WakeupMainThread();
456  }
457
458  void OnRosterSubscribedW(const buzz::Jid &jid) {
459    assert(talk_base::ThreadManager::CurrentThread() != worker_thread_);
460    if (notify_)
461      notify_->OnRosterSubscribed(jid);
462  }
463
464  void OnRosterSubscribed(const buzz::Jid &jid) {
465    assert (talk_base::ThreadManager::CurrentThread() == worker_thread_);
466    main_thread_->Post(this, MSG_ROSTER_SUBSCRIBED, new JidData(jid));
467    if (notify_)
468      notify_->WakeupMainThread();
469  }
470
471  void OnRosterUnsubscribedW(const buzz::Jid &jid) {
472    assert(talk_base::ThreadManager::CurrentThread() != worker_thread_);
473    if (notify_)
474      notify_->OnRosterUnsubscribed(jid);
475  }
476
477  void OnRosterUnsubscribed(const buzz::Jid &jid) {
478    assert (talk_base::ThreadManager::CurrentThread() == worker_thread_);
479    main_thread_->Post(this, MSG_ROSTER_UNSUBSCRIBED, new JidData(jid));
480    if (notify_)
481      notify_->WakeupMainThread();
482  }
483
484  void OnRosterRefreshStartedW() {
485    assert(talk_base::ThreadManager::CurrentThread() != worker_thread_);
486    if (notify_)
487      notify_->OnRosterRefreshStarted();
488  }
489
490  void OnRosterRefreshStarted() {
491    assert (talk_base::ThreadManager::CurrentThread() == worker_thread_);
492    main_thread_->Post(this, MSG_ROSTER_REFRESH_STARTED);
493    if (notify_)
494      notify_->WakeupMainThread();
495  }
496
497  void OnRosterRefreshFinishedW() {
498    assert(talk_base::ThreadManager::CurrentThread() != worker_thread_);
499    if (notify_)
500      notify_->OnRosterRefreshFinished();
501  }
502
503  void OnRosterRefreshFinished() {
504    assert (talk_base::ThreadManager::CurrentThread() == worker_thread_);
505    main_thread_->Post(this, MSG_ROSTER_REFRESH_FINISHED);
506    if (notify_)
507      notify_->WakeupMainThread();
508  }
509
510  struct XmppMessageData : talk_base::MessageData {
511    XmppMessageData(const buzz::XmppMessage &m) : m_(m) {}
512    buzz::XmppMessage m_;
513  };
514
515  void OnIncomingMessageW(const buzz::XmppMessage &msg) {
516    assert(talk_base::ThreadManager::CurrentThread() != worker_thread_);
517    if (notify_)
518      notify_->OnMessage(msg);
519  }
520
521  void OnIncomingMessage(const buzz::XmppMessage &msg) {
522    assert (talk_base::ThreadManager::CurrentThread() == worker_thread_);
523    main_thread_->Post(this, MSG_INCOMING_MESSAGE, new XmppMessageData(msg));
524    if (notify_)
525      notify_->WakeupMainThread();
526  }
527
528  void OnStatusUpdateW (const buzz::Status &status) {
529    assert(talk_base::ThreadManager::CurrentThread() != worker_thread_);
530    if (notify_)
531      notify_->OnStatusUpdate(status);
532  }
533
534  void OnStatusUpdate (const buzz::Status &status) {
535    assert (talk_base::ThreadManager::CurrentThread() == worker_thread_);
536    main_thread_->Post(this, MSG_STATUS_UPDATE, new SendPresenceData(status));
537    if (notify_)
538      notify_->WakeupMainThread();
539  }
540
541  struct StatusErrorData : talk_base::MessageData {
542    StatusErrorData(const buzz::XmlElement &stanza) : stanza_(stanza) {}
543    buzz::XmlElement stanza_;
544  };
545
546  void OnStatusErrorW (const buzz::XmlElement &stanza) {
547    assert(talk_base::ThreadManager::CurrentThread() != worker_thread_);
548    if (notify_)
549      notify_->OnStatusError(stanza);
550  }
551
552  void OnStatusError (const buzz::XmlElement &stanza) {
553    assert (talk_base::ThreadManager::CurrentThread() == worker_thread_);
554    main_thread_->Post(this, MSG_STATUS_ERROR, new StatusErrorData(stanza));
555    if (notify_)
556      notify_->WakeupMainThread();
557  }
558
559  void LoginW() {
560    assert (talk_base::ThreadManager::CurrentThread() == worker_thread_);
561    XmppSocket* socket = new XmppSocket(true);
562    pump_->DoLogin(xcs_, socket, is_test_login_ ?  NULL : new XmppAuth());
563    socket->SignalCloseEvent.connect(this,
564        &LibjinglePlusWorker::OnXmppSocketClose);
565  }
566
567  void DisconnectW() {
568    assert(talk_base::ThreadManager::CurrentThread() == worker_thread_);
569    pump_->DoDisconnect();
570  }
571
572  void SendXmppMessageW(const buzz::XmppMessage &m) {
573    assert (talk_base::ThreadManager::CurrentThread() == worker_thread_);
574    buzz::SendMessageTask * smt = new buzz::SendMessageTask(pump_.get()->client());
575    smt->Send(m);
576    smt->Start();
577  }
578
579  void SendXmppIqW(const buzz::Jid &to_jid, bool is_get,
580                   const buzz::XmlElement *xml_element) {
581    assert (talk_base::ThreadManager::CurrentThread() == worker_thread_);
582    buzz::IqTask *iq_task = new buzz::IqTask(pump_.get()->client(),
583        is_get, to_jid, const_cast<buzz::XmlElement *>(xml_element));
584    iq_task->SignalDone.connect(this, &LibjinglePlusWorker::OnIqComplete);
585    iq_task->Start();
586  }
587
588 struct IqCompleteData : public talk_base::MessageData {
589   IqCompleteData(bool success, const buzz::XmlElement *stanza) :
590     success_(success), stanza_(*stanza) {}
591   bool success_;
592   buzz::XmlElement stanza_;
593 };
594
595  void OnIqCompleteW(bool success, const buzz::XmlElement& stanza) {
596    assert(talk_base::ThreadManager::CurrentThread() != worker_thread_);
597    if (notify_)
598      notify_->OnIqDone(success, stanza);
599  }
600
601  void OnIqComplete(bool success, const buzz::XmlElement *stanza) {
602    assert(talk_base::ThreadManager::CurrentThread() == worker_thread_);
603    main_thread_->Post(this, MSG_IQ_COMPLETE,
604       new IqCompleteData(success, stanza));
605    if (notify_)
606      notify_->WakeupMainThread();
607  }
608
609  void SendPresenceW(const buzz::Status & s) {
610    assert (talk_base::ThreadManager::CurrentThread() == worker_thread_);
611    buzz::PresenceOutTask *pot = new buzz::PresenceOutTask(pump_.get()->client());
612    pot->Send(s);
613    pot->Start();
614  }
615
616
617  void SendDirectedMUCPresenceW(const buzz::Jid & j, const buzz::Status & s,
618			       const std::string &user_nick, const std::string &api_capability,
619			       const std::string &api_message, const std::string &role) {
620    assert (talk_base::ThreadManager::CurrentThread() == worker_thread_);
621    buzz::PresenceOutTask *pot = new buzz::PresenceOutTask(pump_.get()->client());
622    pot->SendDirectedMUC(j,s,user_nick,api_capability,api_message, role);
623    pot->Start();
624  }
625
626  void SendDirectedPresenceW(const buzz::Jid & j, const buzz::Status & s) {
627    assert (talk_base::ThreadManager::CurrentThread() == worker_thread_);
628    buzz::PresenceOutTask *pot = new buzz::PresenceOutTask(pump_.get()->client());
629    pot->SendDirected(j,s);
630    pot->Start();
631  }
632
633  void OnXmppSocketClose(int error) {
634    notify_->OnSocketClose(error);
635  }
636
637 struct SendMessageData : public talk_base::MessageData {
638   SendMessageData(const buzz::XmppMessage &m) : m_(m) {}
639   buzz::XmppMessage m_;
640  };
641
642 struct SendIqData : public talk_base::MessageData {
643   SendIqData(const buzz::Jid &jid, bool is_get, const buzz::XmlElement *m)
644     : to_jid_(jid), is_get_(is_get), xml_element_(m) {}
645   buzz::Jid to_jid_;
646   bool is_get_;
647   const buzz::XmlElement *xml_element_;
648  };
649
650 struct SendPresenceData : public talk_base::MessageData {
651   SendPresenceData(const buzz::Status &s) : s_(s) {}
652   buzz::Status s_;
653  };
654
655 struct SendDirectedPresenceData : public talk_base::MessageData {
656   SendDirectedPresenceData(const buzz::Jid &j, const buzz::Status &s) : j_(j), s_(s) {}
657   buzz::Jid j_;
658   buzz::Status s_;
659 };
660
661  struct SendDirectedMUCPresenceData : public talk_base::MessageData {
662    SendDirectedMUCPresenceData(const buzz::Jid &j, const buzz::Status &s,
663				const std::string &un, const std::string &ac,
664				const std::string &am, const std::string &role)
665      : j_(j), s_(s), un_(un), ac_(ac), am_(am), role_(role) {}
666    buzz::Jid j_;
667    buzz::Status s_;
668    std::string un_;
669    std::string ac_;
670    std::string am_;
671    std::string role_;
672  };
673
674  talk_base::scoped_ptr<talk_base::Win32SocketServer> ss_;
675  talk_base::scoped_ptr<talk_base::Thread> main_thread_;
676  talk_base::Thread *worker_thread_;
677
678  LibjinglePlus *ljp_;
679  LibjinglePlusNotify *notify_;
680  buzz::XmppClientSettings xcs_;
681  talk_base::PhysicalSocketServer pss_;
682
683  talk_base::scoped_ptr<XmppPump> pump_;
684  buzz::PresencePushTask * ppt_;
685  buzz::ReceiveMessageTask * rmt_;
686  buzz::RosterTask * rt_;
687
688  bool is_test_login_;
689};
690
691LibjinglePlus::LibjinglePlus(LibjinglePlusNotify *notify)
692{
693  worker_ = new LibjinglePlusWorker(this, notify);
694}
695
696LibjinglePlus::~LibjinglePlus()
697{
698 delete worker_;
699  worker_ = NULL;
700}
701
702void LibjinglePlus::Login(const std::string &jid,
703		          const std::string &password,
704		          const std::string &machine_address,
705			  bool is_test, bool cookie_auth) {
706  worker_->Login(jid, password, machine_address, is_test, cookie_auth);
707}
708
709void LibjinglePlus::SendPresence(const buzz::Status & s) {
710  worker_->SendPresence(s);
711}
712
713void LibjinglePlus::SendDirectedPresence(const buzz::Jid & j, const buzz::Status & s) {
714  worker_->SendDirectedPresence(j,s);
715}
716
717void LibjinglePlus::SendDirectedMUCPresence(const buzz::Jid & j,
718    const buzz::Status & s, const std::string &user_nick,
719    const std::string &api_capability, const std::string &api_message,
720    const std::string &role) {
721  worker_->SendDirectedMUCPresence(j,s,user_nick,api_capability,api_message,
722      role);
723}
724
725void LibjinglePlus::SendXmppMessage(const buzz::XmppMessage & m) {
726  worker_->SendXmppMessage(m);
727}
728
729void LibjinglePlus::SendXmppIq(const buzz::Jid &to_jid, bool is_get,
730                               const buzz::XmlElement *iq_element) {
731  worker_->SendXmppIq(to_jid, is_get, iq_element);
732}
733
734void LibjinglePlus::DoCallbacks() {
735  worker_->DoCallbacks();
736}
737