tcp_node.cc revision 5d1f7b1de12d16ceb2c938c56701a3e8bfa558f7
1// Copyright 2013 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "nacl_io/ossocket.h"
6#ifdef PROVIDES_SOCKET_API
7
8#include <assert.h>
9#include <errno.h>
10#include <string.h>
11#include <algorithm>
12
13#include "nacl_io/kernel_handle.h"
14#include "nacl_io/pepper_interface.h"
15#include "nacl_io/socket/tcp_node.h"
16#include "nacl_io/stream/stream_fs.h"
17
18namespace {
19const size_t kMaxPacketSize = 65536;
20const size_t kDefaultFifoSize = kMaxPacketSize * 8;
21}
22
23namespace nacl_io {
24
25class TcpWork : public StreamFs::Work {
26 public:
27  explicit TcpWork(const ScopedTcpEventEmitter& emitter)
28      : StreamFs::Work(emitter->stream()->stream()),
29        emitter_(emitter),
30        data_(NULL) {}
31
32  ~TcpWork() { delete[] data_; }
33
34  TCPSocketInterface* TCPInterface() {
35    return filesystem()->ppapi()->GetTCPSocketInterface();
36  }
37
38 protected:
39  ScopedTcpEventEmitter emitter_;
40  char* data_;
41};
42
43class TcpSendWork : public TcpWork {
44 public:
45  explicit TcpSendWork(const ScopedTcpEventEmitter& emitter,
46                       const ScopedSocketNode& stream)
47      : TcpWork(emitter), node_(stream) {}
48
49  virtual bool Start(int32_t val) {
50    AUTO_LOCK(emitter_->GetLock());
51
52    // Does the stream exist, and can it send?
53    if (!node_->TestStreamFlags(SSF_CAN_SEND))
54      return false;
55
56    // Check if we are already sending.
57    if (node_->TestStreamFlags(SSF_SENDING))
58      return false;
59
60    size_t tx_data_avail = emitter_->BytesInOutputFIFO();
61    int capped_len = std::min(tx_data_avail, kMaxPacketSize);
62    if (capped_len == 0)
63      return false;
64
65    data_ = new char[capped_len];
66    emitter_->ReadOut_Locked(data_, capped_len);
67
68    int err = TCPInterface()->Write(node_->socket_resource(),
69                                    data_,
70                                    capped_len,
71                                    filesystem()->GetRunCompletion(this));
72
73    if (err != PP_OK_COMPLETIONPENDING) {
74      // Anything else, we should assume the socket has gone bad.
75      node_->SetError_Locked(err);
76      return false;
77    }
78
79    node_->SetStreamFlags(SSF_SENDING);
80    return true;
81  }
82
83  virtual void Run(int32_t length_error) {
84    AUTO_LOCK(emitter_->GetLock());
85
86    if (length_error < 0) {
87      // Send failed, mark the socket as bad
88      node_->SetError_Locked(length_error);
89      return;
90    }
91
92    // If we did send, then Q more work.
93    node_->ClearStreamFlags(SSF_SENDING);
94    node_->QueueOutput();
95  }
96
97 private:
98  // We assume that transmits will always complete.  If the upstream
99  // actually back pressures, enough to prevent the Send callback
100  // from triggering, this resource may never go away.
101  ScopedSocketNode node_;
102};
103
104class TcpRecvWork : public TcpWork {
105 public:
106  explicit TcpRecvWork(const ScopedTcpEventEmitter& emitter)
107      : TcpWork(emitter) {}
108
109  virtual bool Start(int32_t val) {
110    AUTO_LOCK(emitter_->GetLock());
111    TcpNode* stream = static_cast<TcpNode*>(emitter_->stream());
112
113    // Does the stream exist, and can it recv?
114    if (NULL == stream || !stream->TestStreamFlags(SSF_CAN_RECV))
115      return false;
116
117    // If we are not currently receiving
118    if (stream->TestStreamFlags(SSF_RECVING))
119      return false;
120
121    size_t rx_space_avail = emitter_->SpaceInInputFIFO();
122    int capped_len =
123        static_cast<int32_t>(std::min(rx_space_avail, kMaxPacketSize));
124
125    if (capped_len == 0)
126      return false;
127
128    data_ = new char[capped_len];
129    int err = TCPInterface()->Read(stream->socket_resource(),
130                                   data_,
131                                   capped_len,
132                                   filesystem()->GetRunCompletion(this));
133    if (err != PP_OK_COMPLETIONPENDING) {
134      // Anything else, we should assume the socket has gone bad.
135      stream->SetError_Locked(err);
136      return false;
137    }
138
139    stream->SetStreamFlags(SSF_RECVING);
140    return true;
141  }
142
143  virtual void Run(int32_t length_error) {
144    AUTO_LOCK(emitter_->GetLock());
145    TcpNode* stream = static_cast<TcpNode*>(emitter_->stream());
146
147    if (!stream)
148      return;
149
150    if (length_error <= 0) {
151      stream->SetError_Locked(length_error);
152      return;
153    }
154
155    // If we successfully received, queue more input
156    emitter_->WriteIn_Locked(data_, length_error);
157    stream->ClearStreamFlags(SSF_RECVING);
158    stream->QueueInput();
159  }
160};
161
162class TCPAcceptWork : public StreamFs::Work {
163 public:
164  explicit TCPAcceptWork(StreamFs* stream, const ScopedTcpEventEmitter& emitter)
165      : StreamFs::Work(stream), emitter_(emitter) {}
166
167  TCPSocketInterface* TCPInterface() {
168    return filesystem()->ppapi()->GetTCPSocketInterface();
169  }
170
171  virtual bool Start(int32_t val) {
172    AUTO_LOCK(emitter_->GetLock());
173    TcpNode* node = static_cast<TcpNode*>(emitter_->stream());
174
175    // Does the stream exist, and can it accept?
176    if (NULL == node)
177      return false;
178
179    // If we are not currently accepting
180    if (!node->TestStreamFlags(SSF_LISTENING))
181      return false;
182
183    int err = TCPInterface()->Accept(node->socket_resource(),
184                                     &new_socket_,
185                                     filesystem()->GetRunCompletion(this));
186
187    if (err != PP_OK_COMPLETIONPENDING) {
188      // Anything else, we should assume the socket has gone bad.
189      node->SetError_Locked(err);
190      return false;
191    }
192
193    return true;
194  }
195
196  virtual void Run(int32_t error) {
197    AUTO_LOCK(emitter_->GetLock());
198    TcpNode* node = static_cast<TcpNode*>(emitter_->stream());
199
200    if (node == NULL)
201      return;
202
203    if (error != PP_OK) {
204      node->SetError_Locked(error);
205      return;
206    }
207
208    emitter_->SetAcceptedSocket_Locked(new_socket_);
209  }
210
211 protected:
212  PP_Resource new_socket_;
213  ScopedTcpEventEmitter emitter_;
214};
215
216class TCPConnectWork : public StreamFs::Work {
217 public:
218  explicit TCPConnectWork(StreamFs* stream,
219                          const ScopedTcpEventEmitter& emitter)
220      : StreamFs::Work(stream), emitter_(emitter) {}
221
222  TCPSocketInterface* TCPInterface() {
223    return filesystem()->ppapi()->GetTCPSocketInterface();
224  }
225
226  virtual bool Start(int32_t val) {
227    AUTO_LOCK(emitter_->GetLock());
228    TcpNode* node = static_cast<TcpNode*>(emitter_->stream());
229
230    // Does the stream exist, and can it connect?
231    if (NULL == node)
232      return false;
233
234    int err = TCPInterface()->Connect(node->socket_resource(),
235                                      node->remote_addr(),
236                                      filesystem()->GetRunCompletion(this));
237    if (err != PP_OK_COMPLETIONPENDING) {
238      // Anything else, we should assume the socket has gone bad.
239      node->SetError_Locked(err);
240      return false;
241    }
242
243    return true;
244  }
245
246  virtual void Run(int32_t error) {
247    AUTO_LOCK(emitter_->GetLock());
248    TcpNode* node = static_cast<TcpNode*>(emitter_->stream());
249
250    if (node == NULL)
251      return;
252
253    if (error != PP_OK) {
254      node->ConnectFailed_Locked();
255      node->SetError_Locked(error);
256      return;
257    }
258
259    node->ConnectDone_Locked();
260  }
261
262 protected:
263  ScopedTcpEventEmitter emitter_;
264};
265
266TcpNode::TcpNode(Filesystem* filesystem)
267    : SocketNode(filesystem),
268      emitter_(new TcpEventEmitter(kDefaultFifoSize, kDefaultFifoSize)),
269      tcp_nodelay_(false) {
270  emitter_->AttachStream(this);
271}
272
273TcpNode::TcpNode(Filesystem* filesystem, PP_Resource socket)
274    : SocketNode(filesystem, socket),
275      emitter_(new TcpEventEmitter(kDefaultFifoSize, kDefaultFifoSize)),
276      tcp_nodelay_(false) {
277  emitter_->AttachStream(this);
278}
279
280void TcpNode::Destroy() {
281  emitter_->DetachStream();
282  SocketNode::Destroy();
283}
284
285Error TcpNode::Init(int open_flags) {
286  Error err = SocketNode::Init(open_flags);
287  if (err != 0)
288    return err;
289
290  if (TCPInterface() == NULL)
291    return EACCES;
292
293  if (socket_resource_ != 0) {
294    // TCP sockets that are contructed with an existing socket_resource_
295    // are those that generated from calls to Accept() and therefore are
296    // already connected.
297    remote_addr_ = TCPInterface()->GetRemoteAddress(socket_resource_);
298    ConnectDone_Locked();
299  } else {
300    socket_resource_ =
301        TCPInterface()->Create(filesystem_->ppapi()->GetInstance());
302    if (0 == socket_resource_)
303      return EACCES;
304    SetStreamFlags(SSF_CAN_CONNECT);
305  }
306
307  return 0;
308}
309
310EventEmitter* TcpNode::GetEventEmitter() { return emitter_.get(); }
311
312void TcpNode::SetError_Locked(int pp_error_num) {
313  SocketNode::SetError_Locked(pp_error_num);
314  emitter_->SetError_Locked();
315}
316
317Error TcpNode::GetSockOpt(int lvl, int optname, void* optval, socklen_t* len) {
318  if (lvl == IPPROTO_TCP && optname == TCP_NODELAY) {
319    AUTO_LOCK(node_lock_);
320    int value = tcp_nodelay_;
321    socklen_t value_len = sizeof(value);
322    int copy_bytes = std::min(value_len, *len);
323    memcpy(optval, &value, copy_bytes);
324    *len = value_len;
325    return 0;
326  }
327
328  return SocketNode::GetSockOpt(lvl, optname, optval, len);
329}
330
331Error TcpNode::SetNoDelay_Locked() {
332  if (!IsConnected())
333    return 0;
334
335  int32_t error =
336      TCPInterface()->SetOption(socket_resource_,
337                                PP_TCPSOCKET_OPTION_NO_DELAY,
338                                PP_MakeBool(tcp_nodelay_ ? PP_TRUE : PP_FALSE),
339                                PP_BlockUntilComplete());
340  return PPErrorToErrno(error);
341}
342
343Error TcpNode::SetSockOpt(int lvl,
344                          int optname,
345                          const void* optval,
346                          socklen_t len) {
347  if (lvl == IPPROTO_TCP && optname == TCP_NODELAY) {
348    if (len < sizeof(int))
349      return EINVAL;
350    AUTO_LOCK(node_lock_);
351    tcp_nodelay_ = *static_cast<const int*>(optval) != 0;
352    return SetNoDelay_Locked();
353  }
354
355  return SocketNode::SetSockOpt(lvl, optname, optval, len);
356}
357
358void TcpNode::QueueAccept() {
359  StreamFs::Work* work = new TCPAcceptWork(stream(), emitter_);
360  stream()->EnqueueWork(work);
361}
362
363void TcpNode::QueueConnect() {
364  StreamFs::Work* work = new TCPConnectWork(stream(), emitter_);
365  stream()->EnqueueWork(work);
366}
367
368void TcpNode::QueueInput() {
369  StreamFs::Work* work = new TcpRecvWork(emitter_);
370  stream()->EnqueueWork(work);
371}
372
373void TcpNode::QueueOutput() {
374  if (TestStreamFlags(SSF_SENDING))
375    return;
376
377  if (!TestStreamFlags(SSF_CAN_SEND))
378    return;
379
380  if (0 == emitter_->BytesInOutputFIFO())
381    return;
382
383  StreamFs::Work* work = new TcpSendWork(emitter_, ScopedSocketNode(this));
384  stream()->EnqueueWork(work);
385}
386
387Error TcpNode::Accept(const HandleAttr& attr,
388                      PP_Resource* out_sock,
389                      struct sockaddr* addr,
390                      socklen_t* len) {
391  EventListenerLock wait(GetEventEmitter());
392
393  if (!TestStreamFlags(SSF_LISTENING))
394    return EINVAL;
395
396  // Either block forever or not at all
397  int ms = attr.IsBlocking() ? -1 : 0;
398
399  Error err = wait.WaitOnEvent(POLLIN, ms);
400  if (ETIMEDOUT == err)
401    return EWOULDBLOCK;
402
403  int s = emitter_->GetAcceptedSocket_Locked();
404  // Non-blocking case.
405  if (s == 0)
406    return EAGAIN;
407
408  // Consume the new socket and start listening for the next one
409  *out_sock = s;
410  emitter_->ClearEvents_Locked(POLLIN);
411
412  // Set the out paramaters
413  PP_Resource remote_addr = TCPInterface()->GetRemoteAddress(*out_sock);
414  *len = ResourceToSockAddr(remote_addr, *len, addr);
415  filesystem_->ppapi()->ReleaseResource(remote_addr);
416
417  QueueAccept();
418  return 0;
419}
420
421// We can not bind a client socket with PPAPI.  For now we ignore the
422// bind but report the correct address later, just in case someone is
423// binding without really caring what the address is (for example to
424// select a more optimized interface/route.)
425Error TcpNode::Bind(const struct sockaddr* addr, socklen_t len) {
426  AUTO_LOCK(node_lock_);
427
428  /* Only bind once. */
429  if (IsBound())
430    return EINVAL;
431
432  local_addr_ = SockAddrToResource(addr, len);
433  int err = TCPInterface()->Bind(
434      socket_resource_, local_addr_, PP_BlockUntilComplete());
435
436  // If we fail, release the local addr resource
437  if (err != PP_OK) {
438    filesystem_->ppapi()->ReleaseResource(local_addr_);
439    local_addr_ = 0;
440    return PPErrorToErrno(err);
441  }
442
443  return 0;
444}
445
446Error TcpNode::Connect(const HandleAttr& attr,
447                       const struct sockaddr* addr,
448                       socklen_t len) {
449  EventListenerLock wait(GetEventEmitter());
450
451  if (TestStreamFlags(SSF_CONNECTING))
452    return EALREADY;
453
454  if (IsConnected())
455    return EISCONN;
456
457  remote_addr_ = SockAddrToResource(addr, len);
458  if (0 == remote_addr_)
459    return EINVAL;
460
461  int ms = attr.IsBlocking() ? -1 : 0;
462
463  SetStreamFlags(SSF_CONNECTING);
464  QueueConnect();
465
466  Error err = wait.WaitOnEvent(POLLOUT, ms);
467  if (ETIMEDOUT == err)
468    return EINPROGRESS;
469
470  // If we fail, release the dest addr resource
471  if (err != 0) {
472    ConnectFailed_Locked();
473    return err;
474  }
475
476  ConnectDone_Locked();
477  return 0;
478}
479
480Error TcpNode::Shutdown(int how) {
481  AUTO_LOCK(node_lock_);
482  if (!IsConnected())
483    return ENOTCONN;
484  {
485    AUTO_LOCK(emitter_->GetLock());
486    emitter_->SetError_Locked();
487  }
488  return 0;
489}
490
491void TcpNode::ConnectDone_Locked() {
492  local_addr_ = TCPInterface()->GetLocalAddress(socket_resource_);
493
494  // Now that we are connected, we can start sending and receiving.
495  ClearStreamFlags(SSF_CONNECTING | SSF_CAN_CONNECT);
496  SetStreamFlags(SSF_CAN_SEND | SSF_CAN_RECV);
497
498  emitter_->ConnectDone_Locked();
499
500  // The NODELAY option cannot be set in PPAPI before the socket
501  // is connected, but setsockopt() might have already set it.
502  SetNoDelay_Locked();
503
504  // Begin the input pump
505  QueueInput();
506}
507
508void TcpNode::ConnectFailed_Locked() {
509  filesystem_->ppapi()->ReleaseResource(remote_addr_);
510  remote_addr_ = 0;
511}
512
513Error TcpNode::Listen(int backlog) {
514  AUTO_LOCK(node_lock_);
515  if (!IsBound())
516    return EINVAL;
517
518  int err = TCPInterface()->Listen(
519      socket_resource_, backlog, PP_BlockUntilComplete());
520  if (err != PP_OK)
521    return PPErrorToErrno(err);
522
523  ClearStreamFlags(SSF_CAN_CONNECT);
524  SetStreamFlags(SSF_LISTENING);
525  emitter_->SetListening_Locked();
526  QueueAccept();
527  return 0;
528}
529
530Error TcpNode::Recv_Locked(void* buf,
531                           size_t len,
532                           PP_Resource* out_addr,
533                           int* out_len) {
534  assert(emitter_.get());
535  *out_len = emitter_->ReadIn_Locked((char*)buf, len);
536  *out_addr = remote_addr_;
537
538  // Ref the address copy we pass back.
539  filesystem_->ppapi()->AddRefResource(remote_addr_);
540  return 0;
541}
542
543// TCP ignores dst addr passed to send_to, and always uses bound address
544Error TcpNode::Send_Locked(const void* buf,
545                           size_t len,
546                           PP_Resource,
547                           int* out_len) {
548  assert(emitter_.get());
549  if (emitter_->GetError_Locked())
550    return EPIPE;
551  *out_len = emitter_->WriteOut_Locked((char*)buf, len);
552  return 0;
553}
554
555}  // namespace nacl_io
556
557#endif  // PROVIDES_SOCKET_API
558