1// Copyright (c) 2013 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5
6#include "nacl_io/ossocket.h"
7#ifdef PROVIDES_SOCKET_API
8
9#include <assert.h>
10#include <errno.h>
11#include <string.h>
12#include <algorithm>
13
14#include "nacl_io/kernel_handle.h"
15#include "nacl_io/mount_node_tcp.h"
16#include "nacl_io/mount_stream.h"
17#include "nacl_io/pepper_interface.h"
18
namespace {

// Upper bound, in bytes, on the amount of data handed to a single PPAPI
// Read or Write call by the work items below.
const size_t kMaxPacketSize = 64 * 1024;

// Size passed for both FIFO arguments when a MountNodeTCP constructs its
// EventEmitterTCP.
const size_t kDefaultFifoSize = 8 * kMaxPacketSize;

}  // namespace
23
24namespace nacl_io {
25
26class TCPWork : public MountStream::Work {
27 public:
28  explicit TCPWork(const ScopedEventEmitterTCP& emitter)
29      : MountStream::Work(emitter->stream()->mount_stream()),
30        emitter_(emitter),
31        data_(NULL) {
32  }
33
34  ~TCPWork() {
35    delete[] data_;
36  }
37
38  TCPSocketInterface* TCPInterface() {
39    return mount()->ppapi()->GetTCPSocketInterface();
40  }
41
42 protected:
43  ScopedEventEmitterTCP emitter_;
44  char* data_;
45};
46
47class TCPSendWork : public TCPWork {
48 public:
49  explicit TCPSendWork(const ScopedEventEmitterTCP& emitter,
50                       const ScopedMountNodeSocket& stream)
51      : TCPWork(emitter), node_(stream) {}
52
53  virtual bool Start(int32_t val) {
54    AUTO_LOCK(emitter_->GetLock());
55
56    // Does the stream exist, and can it send?
57    if (!node_->TestStreamFlags(SSF_CAN_SEND))
58      return false;
59
60    // Check if we are already sending.
61    if (node_->TestStreamFlags(SSF_SENDING))
62      return false;
63
64    size_t tx_data_avail = emitter_->BytesInOutputFIFO();
65    int capped_len = std::min(tx_data_avail, kMaxPacketSize);
66    if (capped_len == 0)
67      return false;
68
69    data_ = new char[capped_len];
70    emitter_->ReadOut_Locked(data_, capped_len);
71
72    int err = TCPInterface()->Write(node_->socket_resource(),
73                                    data_,
74                                    capped_len,
75                                    mount()->GetRunCompletion(this));
76
77    if (err != PP_OK_COMPLETIONPENDING) {
78      // Anything else, we should assume the socket has gone bad.
79      node_->SetError_Locked(err);
80      return false;
81    }
82
83    node_->SetStreamFlags(SSF_SENDING);
84    return true;
85  }
86
87  virtual void Run(int32_t length_error) {
88    AUTO_LOCK(emitter_->GetLock());
89
90    if (length_error < 0) {
91      // Send failed, mark the socket as bad
92      node_->SetError_Locked(length_error);
93      return;
94    }
95
96    // If we did send, then Q more work.
97    node_->ClearStreamFlags(SSF_SENDING);
98    node_->QueueOutput();
99  }
100
101 private:
102  // We assume that transmits will always complete.  If the upstream
103  // actually back pressures, enough to prevent the Send callback
104  // from triggering, this resource may never go away.
105  ScopedMountNodeSocket node_;
106};
107
108class TCPRecvWork : public TCPWork {
109 public:
110  explicit TCPRecvWork(const ScopedEventEmitterTCP& emitter)
111      : TCPWork(emitter) {}
112
113  virtual bool Start(int32_t val) {
114    AUTO_LOCK(emitter_->GetLock());
115    MountNodeTCP* stream = static_cast<MountNodeTCP*>(emitter_->stream());
116
117    // Does the stream exist, and can it recv?
118    if (NULL == stream || !stream->TestStreamFlags(SSF_CAN_RECV))
119      return false;
120
121    // If we are not currently receiving
122    if (stream->TestStreamFlags(SSF_RECVING))
123      return false;
124
125    size_t rx_space_avail = emitter_->SpaceInInputFIFO();
126    int capped_len =
127        static_cast<int32_t>(std::min(rx_space_avail, kMaxPacketSize));
128
129    if (capped_len == 0)
130      return false;
131
132    data_ = new char[capped_len];
133    int err = TCPInterface()->Read(stream->socket_resource(),
134                                   data_,
135                                   capped_len,
136                                   mount()->GetRunCompletion(this));
137    if (err != PP_OK_COMPLETIONPENDING) {
138      // Anything else, we should assume the socket has gone bad.
139      stream->SetError_Locked(err);
140      return false;
141    }
142
143    stream->SetStreamFlags(SSF_RECVING);
144    return true;
145  }
146
147  virtual void Run(int32_t length_error) {
148    AUTO_LOCK(emitter_->GetLock());
149    MountNodeTCP* stream = static_cast<MountNodeTCP*>(emitter_->stream());
150
151    if (!stream)
152      return;
153
154    if (length_error <= 0) {
155      stream->SetError_Locked(length_error);
156      return;
157    }
158
159    // If we successfully received, queue more input
160    emitter_->WriteIn_Locked(data_, length_error);
161    stream->ClearStreamFlags(SSF_RECVING);
162    stream->QueueInput();
163  }
164};
165
166class TCPAcceptWork : public MountStream::Work {
167 public:
168  explicit TCPAcceptWork(MountStream* stream,
169                         const ScopedEventEmitterTCP& emitter)
170      : MountStream::Work(stream),
171        emitter_(emitter) {}
172
173  TCPSocketInterface* TCPInterface() {
174    return mount()->ppapi()->GetTCPSocketInterface();
175  }
176
177  virtual bool Start(int32_t val) {
178    AUTO_LOCK(emitter_->GetLock());
179    MountNodeTCP* node = static_cast<MountNodeTCP*>(emitter_->stream());
180
181    // Does the stream exist, and can it accept?
182    if (NULL == node)
183      return false;
184
185    // If we are not currently accepting
186    if (!node->TestStreamFlags(SSF_LISTENING))
187      return false;
188
189    int err = TCPInterface()->Accept(node->socket_resource(),
190                                     &new_socket_,
191                                     mount()->GetRunCompletion(this));
192
193    if (err != PP_OK_COMPLETIONPENDING) {
194      // Anything else, we should assume the socket has gone bad.
195      node->SetError_Locked(err);
196      return false;
197    }
198
199    return true;
200  }
201
202  virtual void Run(int32_t error) {
203    AUTO_LOCK(emitter_->GetLock());
204    MountNodeTCP* node = static_cast<MountNodeTCP*>(emitter_->stream());
205
206    if (node == NULL)
207      return;
208
209    if (error != PP_OK) {
210      node->SetError_Locked(error);
211      return;
212    }
213
214    emitter_->SetAcceptedSocket_Locked(new_socket_);
215  }
216
217 protected:
218  PP_Resource new_socket_;
219  ScopedEventEmitterTCP emitter_;
220};
221
222class TCPConnectWork : public MountStream::Work {
223 public:
224  explicit TCPConnectWork(MountStream* stream,
225                          const ScopedEventEmitterTCP& emitter)
226      : MountStream::Work(stream),
227        emitter_(emitter) {}
228
229  TCPSocketInterface* TCPInterface() {
230    return mount()->ppapi()->GetTCPSocketInterface();
231  }
232
233  virtual bool Start(int32_t val) {
234    AUTO_LOCK(emitter_->GetLock());
235    MountNodeTCP* node = static_cast<MountNodeTCP*>(emitter_->stream());
236
237    // Does the stream exist, and can it connect?
238    if (NULL == node)
239      return false;
240
241    int err = TCPInterface()->Connect(node->socket_resource(),
242                                      node->remote_addr(),
243                                      mount()->GetRunCompletion(this));
244    if (err != PP_OK_COMPLETIONPENDING) {
245      // Anything else, we should assume the socket has gone bad.
246      node->SetError_Locked(err);
247      return false;
248    }
249
250    return true;
251  }
252
253  virtual void Run(int32_t error) {
254    AUTO_LOCK(emitter_->GetLock());
255    MountNodeTCP* node = static_cast<MountNodeTCP*>(emitter_->stream());
256
257    if (node == NULL)
258      return;
259
260    if (error != PP_OK) {
261      node->ConnectFailed_Locked();
262      node->SetError_Locked(error);
263      return;
264    }
265
266    node->ConnectDone_Locked();
267  }
268
269 protected:
270  ScopedEventEmitterTCP emitter_;
271};
272
273MountNodeTCP::MountNodeTCP(Mount* mount)
274    : MountNodeSocket(mount),
275      emitter_(new EventEmitterTCP(kDefaultFifoSize, kDefaultFifoSize)),
276      tcp_nodelay_(false) {
277  emitter_->AttachStream(this);
278}
279
280MountNodeTCP::MountNodeTCP(Mount* mount, PP_Resource socket)
281    : MountNodeSocket(mount, socket),
282      emitter_(new EventEmitterTCP(kDefaultFifoSize, kDefaultFifoSize)),
283      tcp_nodelay_(false) {
284  emitter_->AttachStream(this);
285}
286
287void MountNodeTCP::Destroy() {
288  emitter_->DetachStream();
289  MountNodeSocket::Destroy();
290}
291
292Error MountNodeTCP::Init(int open_flags) {
293  Error err = MountNodeSocket::Init(open_flags);
294  if (err != 0)
295    return err;
296
297  if (TCPInterface() == NULL)
298    return EACCES;
299
300  if (socket_resource_ != 0) {
301    // TCP sockets that are contructed with an existing socket_resource_
302    // are those that generated from calls to Accept() and therefore are
303    // already connected.
304    remote_addr_ = TCPInterface()->GetRemoteAddress(socket_resource_);
305    ConnectDone_Locked();
306  } else {
307    socket_resource_ = TCPInterface()->Create(mount_->ppapi()->GetInstance());
308    if (0 == socket_resource_)
309      return EACCES;
310    SetStreamFlags(SSF_CAN_CONNECT);
311  }
312
313  return 0;
314}
315
316EventEmitter* MountNodeTCP::GetEventEmitter() {
317  return emitter_.get();
318}
319
320void MountNodeTCP::SetError_Locked(int pp_error_num) {
321  MountNodeSocket::SetError_Locked(pp_error_num);
322  emitter_->SetError_Locked();
323}
324
325Error MountNodeTCP::GetSockOpt(int lvl,
326                               int optname,
327                               void* optval,
328                               socklen_t* len) {
329  if (lvl == IPPROTO_TCP && optname == TCP_NODELAY) {
330    AUTO_LOCK(node_lock_);
331    int value = tcp_nodelay_;
332    socklen_t value_len = sizeof(value);
333    int copy_bytes = std::min(value_len, *len);
334    memcpy(optval, &value, copy_bytes);
335    *len = value_len;
336    return 0;
337  }
338
339  return MountNodeSocket::GetSockOpt(lvl, optname, optval, len);
340}
341
342
343Error MountNodeTCP::SetNoDelay_Locked() {
344  if (!IsConnected())
345    return 0;
346
347  int32_t error = TCPInterface()->SetOption(socket_resource_,
348      PP_TCPSOCKET_OPTION_NO_DELAY,
349      PP_MakeBool(tcp_nodelay_ ? PP_TRUE : PP_FALSE),
350      PP_BlockUntilComplete());
351  return PPErrorToErrno(error);
352}
353
354Error MountNodeTCP::SetSockOpt(int lvl,
355                               int optname,
356                               const void* optval,
357                               socklen_t len) {
358  if (lvl == IPPROTO_TCP && optname == TCP_NODELAY) {
359    if (len < sizeof(int))
360      return EINVAL;
361    AUTO_LOCK(node_lock_);
362    tcp_nodelay_ = *static_cast<const int*>(optval) != 0;
363    return SetNoDelay_Locked();
364  }
365
366  return MountNodeSocket::SetSockOpt(lvl, optname, optval, len);
367}
368
369void MountNodeTCP::QueueAccept() {
370  MountStream::Work* work = new TCPAcceptWork(mount_stream(), emitter_);
371  mount_stream()->EnqueueWork(work);
372}
373
374void MountNodeTCP::QueueConnect() {
375  MountStream::Work* work = new TCPConnectWork(mount_stream(), emitter_);
376  mount_stream()->EnqueueWork(work);
377}
378
379void MountNodeTCP::QueueInput() {
380  MountStream::Work* work = new TCPRecvWork(emitter_);
381  mount_stream()->EnqueueWork(work);
382}
383
384void MountNodeTCP::QueueOutput() {
385  if (TestStreamFlags(SSF_SENDING))
386    return;
387
388  if (!TestStreamFlags(SSF_CAN_SEND))
389    return;
390
391  if (0 == emitter_->BytesInOutputFIFO())
392    return;
393
394  MountStream::Work* work = new TCPSendWork(emitter_,
395                                            ScopedMountNodeSocket(this));
396  mount_stream()->EnqueueWork(work);
397}
398
399Error MountNodeTCP::Accept(const HandleAttr& attr,
400                           PP_Resource* out_sock,
401                           struct sockaddr* addr,
402                           socklen_t* len) {
403  EventListenerLock wait(GetEventEmitter());
404
405  if (!TestStreamFlags(SSF_LISTENING))
406    return EINVAL;
407
408  // Either block forever or not at all
409  int ms = attr.IsBlocking() ? -1 : 0;
410
411  Error err = wait.WaitOnEvent(POLLIN, ms);
412  if (ETIMEDOUT == err)
413    return EWOULDBLOCK;
414
415  int s = emitter_->GetAcceptedSocket_Locked();
416  // Non-blocking case.
417  if (s == 0)
418    return EAGAIN;
419
420  // Consume the new socket and start listening for the next one
421  *out_sock = s;
422  emitter_->ClearEvents_Locked(POLLIN);
423
424  // Set the out paramaters
425  PP_Resource remote_addr = TCPInterface()->GetRemoteAddress(*out_sock);
426  *len = ResourceToSockAddr(remote_addr, *len, addr);
427  mount_->ppapi()->ReleaseResource(remote_addr);
428
429  QueueAccept();
430  return 0;
431}
432
433// We can not bind a client socket with PPAPI.  For now we ignore the
434// bind but report the correct address later, just in case someone is
435// binding without really caring what the address is (for example to
436// select a more optimized interface/route.)
437Error MountNodeTCP::Bind(const struct sockaddr* addr, socklen_t len) {
438  AUTO_LOCK(node_lock_);
439
440  /* Only bind once. */
441  if (IsBound())
442    return EINVAL;
443
444  local_addr_ = SockAddrToResource(addr, len);
445  int err = TCPInterface()->Bind(socket_resource_,
446                                 local_addr_,
447                                 PP_BlockUntilComplete());
448
449  // If we fail, release the local addr resource
450  if (err != PP_OK) {
451    mount_->ppapi()->ReleaseResource(local_addr_);
452    local_addr_ = 0;
453    return PPErrorToErrno(err);
454  }
455
456  return 0;
457}
458
459Error MountNodeTCP::Connect(const HandleAttr& attr,
460                            const struct sockaddr* addr,
461                            socklen_t len) {
462  EventListenerLock wait(GetEventEmitter());
463
464  if (TestStreamFlags(SSF_CONNECTING))
465    return EALREADY;
466
467  if (IsConnected())
468    return EISCONN;
469
470  remote_addr_ = SockAddrToResource(addr, len);
471  if (0 == remote_addr_)
472    return EINVAL;
473
474  int ms = attr.IsBlocking() ? -1 : 0;
475
476  SetStreamFlags(SSF_CONNECTING);
477  QueueConnect();
478
479  Error err = wait.WaitOnEvent(POLLOUT, ms);
480  if (ETIMEDOUT == err)
481    return EINPROGRESS;
482
483  // If we fail, release the dest addr resource
484  if (err != 0) {
485    ConnectFailed_Locked();
486    return err;
487  }
488
489  ConnectDone_Locked();
490  return 0;
491}
492
493Error MountNodeTCP::Shutdown(int how) {
494  AUTO_LOCK(node_lock_);
495  if (!IsConnected())
496    return ENOTCONN;
497  {
498    AUTO_LOCK(emitter_->GetLock());
499    emitter_->SetError_Locked();
500  }
501  return 0;
502}
503
504void MountNodeTCP::ConnectDone_Locked() {
505  local_addr_ = TCPInterface()->GetLocalAddress(socket_resource_);
506
507  // Now that we are connected, we can start sending and receiving.
508  ClearStreamFlags(SSF_CONNECTING | SSF_CAN_CONNECT);
509  SetStreamFlags(SSF_CAN_SEND | SSF_CAN_RECV);
510
511  emitter_->ConnectDone_Locked();
512
513  // The NODELAY option cannot be set in PPAPI before the socket
514  // is connected, but setsockopt() might have already set it.
515  SetNoDelay_Locked();
516
517  // Begin the input pump
518  QueueInput();
519}
520
521void MountNodeTCP::ConnectFailed_Locked() {
522  mount_->ppapi()->ReleaseResource(remote_addr_);
523  remote_addr_ = 0;
524}
525
526Error MountNodeTCP::Listen(int backlog) {
527  AUTO_LOCK(node_lock_);
528  if (!IsBound())
529    return EINVAL;
530
531  int err = TCPInterface()->Listen(socket_resource_,
532                                   backlog,
533                                   PP_BlockUntilComplete());
534  if (err != PP_OK)
535    return PPErrorToErrno(err);
536
537  ClearStreamFlags(SSF_CAN_CONNECT);
538  SetStreamFlags(SSF_LISTENING);
539  emitter_->SetListening_Locked();
540  QueueAccept();
541  return 0;
542}
543
544Error MountNodeTCP::Recv_Locked(void* buf,
545                                size_t len,
546                                PP_Resource* out_addr,
547                                int* out_len) {
548  assert(emitter_.get());
549  *out_len = emitter_->ReadIn_Locked((char*)buf, len);
550  *out_addr = remote_addr_;
551
552  // Ref the address copy we pass back.
553  mount_->ppapi()->AddRefResource(remote_addr_);
554  return 0;
555}
556
557// TCP ignores dst addr passed to send_to, and always uses bound address
558Error MountNodeTCP::Send_Locked(const void* buf,
559                                size_t len,
560                                PP_Resource,
561                                int* out_len) {
562  assert(emitter_.get());
563  if (emitter_->GetError_Locked())
564    return EPIPE;
565  *out_len = emitter_->WriteOut_Locked((char*)buf, len);
566  return 0;
567}
568
569
570}  // namespace nacl_io
571
572#endif  // PROVIDES_SOCKET_API
573