1/*
2 *  Copyright 2004 The WebRTC Project Authors. All rights reserved.
3 *
4 *  Use of this source code is governed by a BSD-style license
5 *  that can be found in the LICENSE file in the root of the source
6 *  tree. An additional intellectual property rights grant can be found
7 *  in the file PATENTS.  All contributing project authors may
8 *  be found in the AUTHORS file in the root of the source tree.
9 */
10
11
12#if defined(WEBRTC_WIN)
13#include "webrtc/base/win32.h"
14#else  // !WEBRTC_WIN
15#define SEC_E_CERT_EXPIRED (-2146893016)
16#endif  // !WEBRTC_WIN
17
18#include "webrtc/base/common.h"
19#include "webrtc/base/httpbase.h"
20#include "webrtc/base/logging.h"
21#include "webrtc/base/socket.h"
22#include "webrtc/base/stringutils.h"
23#include "webrtc/base/thread.h"
24
25namespace rtc {
26
27//////////////////////////////////////////////////////////////////////
28// Helpers
29//////////////////////////////////////////////////////////////////////
30
31bool MatchHeader(const char* str, size_t len, HttpHeader header) {
32  const char* const header_str = ToString(header);
33  const size_t header_len = strlen(header_str);
34  return (len == header_len) && (_strnicmp(str, header_str, header_len) == 0);
35}
36
// File-local constant.
// NOTE(review): MSG_READ is not referenced in the visible part of this file;
// confirm whether it is still needed.
enum { MSG_READ };
40
41//////////////////////////////////////////////////////////////////////
42// HttpParser
43//////////////////////////////////////////////////////////////////////
44
45HttpParser::HttpParser() {
46  reset();
47}
48
49HttpParser::~HttpParser() {
50}
51
52void
53HttpParser::reset() {
54  state_ = ST_LEADER;
55  chunked_ = false;
56  data_size_ = SIZE_UNKNOWN;
57}
58
59HttpParser::ProcessResult
60HttpParser::Process(const char* buffer, size_t len, size_t* processed,
61                    HttpError* error) {
62  *processed = 0;
63  *error = HE_NONE;
64
65  if (state_ >= ST_COMPLETE) {
66    ASSERT(false);
67    return PR_COMPLETE;
68  }
69
70  while (true) {
71    if (state_ < ST_DATA) {
72      size_t pos = *processed;
73      while ((pos < len) && (buffer[pos] != '\n')) {
74        pos += 1;
75      }
76      if (pos >= len) {
77        break;  // don't have a full header
78      }
79      const char* line = buffer + *processed;
80      size_t len = (pos - *processed);
81      *processed = pos + 1;
82      while ((len > 0) && isspace(static_cast<unsigned char>(line[len-1]))) {
83        len -= 1;
84      }
85      ProcessResult result = ProcessLine(line, len, error);
86      LOG(LS_VERBOSE) << "Processed line, result=" << result;
87
88      if (PR_CONTINUE != result) {
89        return result;
90      }
91    } else if (data_size_ == 0) {
92      if (chunked_) {
93        state_ = ST_CHUNKTERM;
94      } else {
95        return PR_COMPLETE;
96      }
97    } else {
98      size_t available = len - *processed;
99      if (available <= 0) {
100        break; // no more data
101      }
102      if ((data_size_ != SIZE_UNKNOWN) && (available > data_size_)) {
103        available = data_size_;
104      }
105      size_t read = 0;
106      ProcessResult result = ProcessData(buffer + *processed, available, read,
107                                         error);
108      LOG(LS_VERBOSE) << "Processed data, result: " << result << " read: "
109                      << read << " err: " << error;
110
111      if (PR_CONTINUE != result) {
112        return result;
113      }
114      *processed += read;
115      if (data_size_ != SIZE_UNKNOWN) {
116        data_size_ -= read;
117      }
118    }
119  }
120
121  return PR_CONTINUE;
122}
123
124HttpParser::ProcessResult
125HttpParser::ProcessLine(const char* line, size_t len, HttpError* error) {
126  LOG_F(LS_VERBOSE) << " state: " << state_ << " line: "
127                    << std::string(line, len) << " len: " << len << " err: "
128                    << error;
129
130  switch (state_) {
131  case ST_LEADER:
132    state_ = ST_HEADERS;
133    return ProcessLeader(line, len, error);
134
135  case ST_HEADERS:
136    if (len > 0) {
137      const char* value = strchrn(line, len, ':');
138      if (!value) {
139        *error = HE_PROTOCOL;
140        return PR_COMPLETE;
141      }
142      size_t nlen = (value - line);
143      const char* eol = line + len;
144      do {
145        value += 1;
146      } while ((value < eol) && isspace(static_cast<unsigned char>(*value)));
147      size_t vlen = eol - value;
148      if (MatchHeader(line, nlen, HH_CONTENT_LENGTH)) {
149        // sscanf isn't safe with strings that aren't null-terminated, and there
150        // is no guarantee that |value| is.
151        // Create a local copy that is null-terminated.
152        std::string value_str(value, vlen);
153        unsigned int temp_size;
154        if (sscanf(value_str.c_str(), "%u", &temp_size) != 1) {
155          *error = HE_PROTOCOL;
156          return PR_COMPLETE;
157        }
158        data_size_ = static_cast<size_t>(temp_size);
159      } else if (MatchHeader(line, nlen, HH_TRANSFER_ENCODING)) {
160        if ((vlen == 7) && (_strnicmp(value, "chunked", 7) == 0)) {
161          chunked_ = true;
162        } else if ((vlen == 8) && (_strnicmp(value, "identity", 8) == 0)) {
163          chunked_ = false;
164        } else {
165          *error = HE_PROTOCOL;
166          return PR_COMPLETE;
167        }
168      }
169      return ProcessHeader(line, nlen, value, vlen, error);
170    } else {
171      state_ = chunked_ ? ST_CHUNKSIZE : ST_DATA;
172      return ProcessHeaderComplete(chunked_, data_size_, error);
173    }
174    break;
175
176  case ST_CHUNKSIZE:
177    if (len > 0) {
178      char* ptr = NULL;
179      data_size_ = strtoul(line, &ptr, 16);
180      if (ptr != line + len) {
181        *error = HE_PROTOCOL;
182        return PR_COMPLETE;
183      }
184      state_ = (data_size_ == 0) ? ST_TRAILERS : ST_DATA;
185    } else {
186      *error = HE_PROTOCOL;
187      return PR_COMPLETE;
188    }
189    break;
190
191  case ST_CHUNKTERM:
192    if (len > 0) {
193      *error = HE_PROTOCOL;
194      return PR_COMPLETE;
195    } else {
196      state_ = chunked_ ? ST_CHUNKSIZE : ST_DATA;
197    }
198    break;
199
200  case ST_TRAILERS:
201    if (len == 0) {
202      return PR_COMPLETE;
203    }
204    // *error = onHttpRecvTrailer();
205    break;
206
207  default:
208    ASSERT(false);
209    break;
210  }
211
212  return PR_CONTINUE;
213}
214
215bool
216HttpParser::is_valid_end_of_input() const {
217  return (state_ == ST_DATA) && (data_size_ == SIZE_UNKNOWN);
218}
219
220void
221HttpParser::complete(HttpError error) {
222  if (state_ < ST_COMPLETE) {
223    state_ = ST_COMPLETE;
224    OnComplete(error);
225  }
226}
227
228//////////////////////////////////////////////////////////////////////
229// HttpBase::DocumentStream
230//////////////////////////////////////////////////////////////////////
231
232class BlockingMemoryStream : public ExternalMemoryStream {
233public:
234  BlockingMemoryStream(char* buffer, size_t size)
235  : ExternalMemoryStream(buffer, size) { }
236
237  StreamResult DoReserve(size_t size, int* error) override {
238    return (buffer_length_ >= size) ? SR_SUCCESS : SR_BLOCK;
239  }
240};
241
242class HttpBase::DocumentStream : public StreamInterface {
243public:
244  DocumentStream(HttpBase* base) : base_(base), error_(HE_DEFAULT) { }
245
246  StreamState GetState() const override {
247    if (NULL == base_)
248      return SS_CLOSED;
249    if (HM_RECV == base_->mode_)
250      return SS_OPEN;
251    return SS_OPENING;
252  }
253
254  StreamResult Read(void* buffer,
255                    size_t buffer_len,
256                    size_t* read,
257                    int* error) override {
258    if (!base_) {
259      if (error) *error = error_;
260      return (HE_NONE == error_) ? SR_EOS : SR_ERROR;
261    }
262
263    if (HM_RECV != base_->mode_) {
264      return SR_BLOCK;
265    }
266
267    // DoReceiveLoop writes http document data to the StreamInterface* document
268    // member of HttpData.  In this case, we want this data to be written
269    // directly to our buffer.  To accomplish this, we wrap our buffer with a
270    // StreamInterface, and replace the existing document with our wrapper.
271    // When the method returns, we restore the old document.  Ideally, we would
272    // pass our StreamInterface* to DoReceiveLoop, but due to the callbacks
273    // of HttpParser, we would still need to store the pointer temporarily.
274    scoped_ptr<StreamInterface>
275        stream(new BlockingMemoryStream(reinterpret_cast<char*>(buffer),
276                                        buffer_len));
277
278    // Replace the existing document with our wrapped buffer.
279    base_->data_->document.swap(stream);
280
281    // Pump the I/O loop.  DoReceiveLoop is guaranteed not to attempt to
282    // complete the I/O process, which means that our wrapper is not in danger
283    // of being deleted.  To ensure this, DoReceiveLoop returns true when it
284    // wants complete to be called.  We make sure to uninstall our wrapper
285    // before calling complete().
286    HttpError http_error;
287    bool complete = base_->DoReceiveLoop(&http_error);
288
289    // Reinstall the original output document.
290    base_->data_->document.swap(stream);
291
292    // If we reach the end of the receive stream, we disconnect our stream
293    // adapter from the HttpBase, and further calls to read will either return
294    // EOS or ERROR, appropriately.  Finally, we call complete().
295    StreamResult result = SR_BLOCK;
296    if (complete) {
297      HttpBase* base = Disconnect(http_error);
298      if (error) *error = error_;
299      result = (HE_NONE == error_) ? SR_EOS : SR_ERROR;
300      base->complete(http_error);
301    }
302
303    // Even if we are complete, if some data was read we must return SUCCESS.
304    // Future Reads will return EOS or ERROR based on the error_ variable.
305    size_t position;
306    stream->GetPosition(&position);
307    if (position > 0) {
308      if (read) *read = position;
309      result = SR_SUCCESS;
310    }
311    return result;
312  }
313
314  StreamResult Write(const void* data,
315                     size_t data_len,
316                     size_t* written,
317                     int* error) override {
318    if (error) *error = -1;
319    return SR_ERROR;
320  }
321
322  void Close() override {
323    if (base_) {
324      HttpBase* base = Disconnect(HE_NONE);
325      if (HM_RECV == base->mode_ && base->http_stream_) {
326        // Read I/O could have been stalled on the user of this DocumentStream,
327        // so restart the I/O process now that we've removed ourselves.
328        base->http_stream_->PostEvent(SE_READ, 0);
329      }
330    }
331  }
332
333  bool GetAvailable(size_t* size) const override {
334    if (!base_ || HM_RECV != base_->mode_)
335      return false;
336    size_t data_size = base_->GetDataRemaining();
337    if (SIZE_UNKNOWN == data_size)
338      return false;
339    if (size)
340      *size = data_size;
341    return true;
342  }
343
344  HttpBase* Disconnect(HttpError error) {
345    ASSERT(NULL != base_);
346    ASSERT(NULL != base_->doc_stream_);
347    HttpBase* base = base_;
348    base_->doc_stream_ = NULL;
349    base_ = NULL;
350    error_ = error;
351    return base;
352  }
353
354private:
355  HttpBase* base_;
356  HttpError error_;
357};
358
359//////////////////////////////////////////////////////////////////////
360// HttpBase
361//////////////////////////////////////////////////////////////////////
362
363HttpBase::HttpBase() : mode_(HM_NONE), data_(NULL), notify_(NULL),
364                       http_stream_(NULL), doc_stream_(NULL) {
365}
366
367HttpBase::~HttpBase() {
368  ASSERT(HM_NONE == mode_);
369}
370
371bool
372HttpBase::isConnected() const {
373  return (http_stream_ != NULL) && (http_stream_->GetState() == SS_OPEN);
374}
375
376bool
377HttpBase::attach(StreamInterface* stream) {
378  if ((mode_ != HM_NONE) || (http_stream_ != NULL) || (stream == NULL)) {
379    ASSERT(false);
380    return false;
381  }
382  http_stream_ = stream;
383  http_stream_->SignalEvent.connect(this, &HttpBase::OnHttpStreamEvent);
384  mode_ = (http_stream_->GetState() == SS_OPENING) ? HM_CONNECT : HM_NONE;
385  return true;
386}
387
388StreamInterface*
389HttpBase::detach() {
390  ASSERT(HM_NONE == mode_);
391  if (mode_ != HM_NONE) {
392    return NULL;
393  }
394  StreamInterface* stream = http_stream_;
395  http_stream_ = NULL;
396  if (stream) {
397    stream->SignalEvent.disconnect(this);
398  }
399  return stream;
400}
401
402void
403HttpBase::send(HttpData* data) {
404  ASSERT(HM_NONE == mode_);
405  if (mode_ != HM_NONE) {
406    return;
407  } else if (!isConnected()) {
408    OnHttpStreamEvent(http_stream_, SE_CLOSE, HE_DISCONNECTED);
409    return;
410  }
411
412  mode_ = HM_SEND;
413  data_ = data;
414  len_ = 0;
415  ignore_data_ = chunk_data_ = false;
416
417  if (data_->document) {
418    data_->document->SignalEvent.connect(this, &HttpBase::OnDocumentEvent);
419  }
420
421  std::string encoding;
422  if (data_->hasHeader(HH_TRANSFER_ENCODING, &encoding)
423      && (encoding == "chunked")) {
424    chunk_data_ = true;
425  }
426
427  len_ = data_->formatLeader(buffer_, sizeof(buffer_));
428  len_ += strcpyn(buffer_ + len_, sizeof(buffer_) - len_, "\r\n");
429
430  header_ = data_->begin();
431  if (header_ == data_->end()) {
432    // We must call this at least once, in the case where there are no headers.
433    queue_headers();
434  }
435
436  flush_data();
437}
438
439void
440HttpBase::recv(HttpData* data) {
441  ASSERT(HM_NONE == mode_);
442  if (mode_ != HM_NONE) {
443    return;
444  } else if (!isConnected()) {
445    OnHttpStreamEvent(http_stream_, SE_CLOSE, HE_DISCONNECTED);
446    return;
447  }
448
449  mode_ = HM_RECV;
450  data_ = data;
451  len_ = 0;
452  ignore_data_ = chunk_data_ = false;
453
454  reset();
455  if (doc_stream_) {
456    doc_stream_->SignalEvent(doc_stream_, SE_OPEN | SE_READ, 0);
457  } else {
458    read_and_process_data();
459  }
460}
461
462void
463HttpBase::abort(HttpError err) {
464  if (mode_ != HM_NONE) {
465    if (http_stream_ != NULL) {
466      http_stream_->Close();
467    }
468    do_complete(err);
469  }
470}
471
472StreamInterface* HttpBase::GetDocumentStream() {
473  if (doc_stream_)
474    return NULL;
475  doc_stream_ = new DocumentStream(this);
476  return doc_stream_;
477}
478
479HttpError HttpBase::HandleStreamClose(int error) {
480  if (http_stream_ != NULL) {
481    http_stream_->Close();
482  }
483  if (error == 0) {
484    if ((mode_ == HM_RECV) && is_valid_end_of_input()) {
485      return HE_NONE;
486    } else {
487      return HE_DISCONNECTED;
488    }
489  } else if (error == SOCKET_EACCES) {
490    return HE_AUTH;
491  } else if (error == SEC_E_CERT_EXPIRED) {
492    return HE_CERTIFICATE_EXPIRED;
493  }
494  LOG_F(LS_ERROR) << "(" << error << ")";
495  return (HM_CONNECT == mode_) ? HE_CONNECT_FAILED : HE_SOCKET_ERROR;
496}
497
498bool HttpBase::DoReceiveLoop(HttpError* error) {
499  ASSERT(HM_RECV == mode_);
500  ASSERT(NULL != error);
501
502  // Do to the latency between receiving read notifications from
503  // pseudotcpchannel, we rely on repeated calls to read in order to acheive
504  // ideal throughput.  The number of reads is limited to prevent starving
505  // the caller.
506
507  size_t loop_count = 0;
508  const size_t kMaxReadCount = 20;
509  bool process_requires_more_data = false;
510  do {
511    // The most frequent use of this function is response to new data available
512    // on http_stream_.  Therefore, we optimize by attempting to read from the
513    // network first (as opposed to processing existing data first).
514
515    if (len_ < sizeof(buffer_)) {
516      // Attempt to buffer more data.
517      size_t read;
518      int read_error;
519      StreamResult read_result = http_stream_->Read(buffer_ + len_,
520                                                    sizeof(buffer_) - len_,
521                                                    &read, &read_error);
522      switch (read_result) {
523      case SR_SUCCESS:
524        ASSERT(len_ + read <= sizeof(buffer_));
525        len_ += read;
526        break;
527      case SR_BLOCK:
528        if (process_requires_more_data) {
529          // We're can't make progress until more data is available.
530          return false;
531        }
532        // Attempt to process the data already in our buffer.
533        break;
534      case SR_EOS:
535        // Clean close, with no error.
536        read_error = 0;
537        FALLTHROUGH();  // Fall through to HandleStreamClose.
538      case SR_ERROR:
539        *error = HandleStreamClose(read_error);
540        return true;
541      }
542    } else if (process_requires_more_data) {
543      // We have too much unprocessed data in our buffer.  This should only
544      // occur when a single HTTP header is longer than the buffer size (32K).
545      // Anything longer than that is almost certainly an error.
546      *error = HE_OVERFLOW;
547      return true;
548    }
549
550    // Process data in our buffer.  Process is not guaranteed to process all
551    // the buffered data.  In particular, it will wait until a complete
552    // protocol element (such as http header, or chunk size) is available,
553    // before processing it in its entirety.  Also, it is valid and sometimes
554    // necessary to call Process with an empty buffer, since the state machine
555    // may have interrupted state transitions to complete.
556    size_t processed;
557    ProcessResult process_result = Process(buffer_, len_, &processed,
558                                            error);
559    ASSERT(processed <= len_);
560    len_ -= processed;
561    memmove(buffer_, buffer_ + processed, len_);
562    switch (process_result) {
563    case PR_CONTINUE:
564      // We need more data to make progress.
565      process_requires_more_data = true;
566      break;
567    case PR_BLOCK:
568      // We're stalled on writing the processed data.
569      return false;
570    case PR_COMPLETE:
571      // *error already contains the correct code.
572      return true;
573    }
574  } while (++loop_count <= kMaxReadCount);
575
576  LOG_F(LS_WARNING) << "danger of starvation";
577  return false;
578}
579
580void
581HttpBase::read_and_process_data() {
582  HttpError error;
583  if (DoReceiveLoop(&error)) {
584    complete(error);
585  }
586}
587
588void
589HttpBase::flush_data() {
590  ASSERT(HM_SEND == mode_);
591
592  // When send_required is true, no more buffering can occur without a network
593  // write.
594  bool send_required = (len_ >= sizeof(buffer_));
595
596  while (true) {
597    ASSERT(len_ <= sizeof(buffer_));
598
599    // HTTP is inherently sensitive to round trip latency, since a frequent use
600    // case is for small requests and responses to be sent back and forth, and
601    // the lack of pipelining forces a single request to take a minimum of the
602    // round trip time.  As a result, it is to our benefit to pack as much data
603    // into each packet as possible.  Thus, we defer network writes until we've
604    // buffered as much data as possible.
605
606    if (!send_required && (header_ != data_->end())) {
607      // First, attempt to queue more header data.
608      send_required = queue_headers();
609    }
610
611    if (!send_required && data_->document) {
612      // Next, attempt to queue document data.
613
614      const size_t kChunkDigits = 8;
615      size_t offset, reserve;
616      if (chunk_data_) {
617        // Reserve characters at the start for X-byte hex value and \r\n
618        offset = len_ + kChunkDigits + 2;
619        // ... and 2 characters at the end for \r\n
620        reserve = offset + 2;
621      } else {
622        offset = len_;
623        reserve = offset;
624      }
625
626      if (reserve >= sizeof(buffer_)) {
627        send_required = true;
628      } else {
629        size_t read;
630        int error;
631        StreamResult result = data_->document->Read(buffer_ + offset,
632                                                    sizeof(buffer_) - reserve,
633                                                    &read, &error);
634        if (result == SR_SUCCESS) {
635          ASSERT(reserve + read <= sizeof(buffer_));
636          if (chunk_data_) {
637            // Prepend the chunk length in hex.
638            // Note: sprintfn appends a null terminator, which is why we can't
639            // combine it with the line terminator.
640            sprintfn(buffer_ + len_, kChunkDigits + 1, "%.*x",
641                     kChunkDigits, read);
642            // Add line terminator to the chunk length.
643            memcpy(buffer_ + len_ + kChunkDigits, "\r\n", 2);
644            // Add line terminator to the end of the chunk.
645            memcpy(buffer_ + offset + read, "\r\n", 2);
646          }
647          len_ = reserve + read;
648        } else if (result == SR_BLOCK) {
649          // Nothing to do but flush data to the network.
650          send_required = true;
651        } else if (result == SR_EOS) {
652          if (chunk_data_) {
653            // Append the empty chunk and empty trailers, then turn off
654            // chunking.
655            ASSERT(len_ + 5 <= sizeof(buffer_));
656            memcpy(buffer_ + len_, "0\r\n\r\n", 5);
657            len_ += 5;
658            chunk_data_ = false;
659          } else if (0 == len_) {
660            // No more data to read, and no more data to write.
661            do_complete();
662            return;
663          }
664          // Although we are done reading data, there is still data which needs
665          // to be flushed to the network.
666          send_required = true;
667        } else {
668          LOG_F(LS_ERROR) << "Read error: " << error;
669          do_complete(HE_STREAM);
670          return;
671        }
672      }
673    }
674
675    if (0 == len_) {
676      // No data currently available to send.
677      if (!data_->document) {
678        // If there is no source document, that means we're done.
679        do_complete();
680      }
681      return;
682    }
683
684    size_t written;
685    int error;
686    StreamResult result = http_stream_->Write(buffer_, len_, &written, &error);
687    if (result == SR_SUCCESS) {
688      ASSERT(written <= len_);
689      len_ -= written;
690      memmove(buffer_, buffer_ + written, len_);
691      send_required = false;
692    } else if (result == SR_BLOCK) {
693      if (send_required) {
694        // Nothing more we can do until network is writeable.
695        return;
696      }
697    } else {
698      ASSERT(result == SR_ERROR);
699      LOG_F(LS_ERROR) << "error";
700      OnHttpStreamEvent(http_stream_, SE_CLOSE, error);
701      return;
702    }
703  }
704
705  ASSERT(false);
706}
707
708bool
709HttpBase::queue_headers() {
710  ASSERT(HM_SEND == mode_);
711  while (header_ != data_->end()) {
712    size_t len = sprintfn(buffer_ + len_, sizeof(buffer_) - len_,
713                          "%.*s: %.*s\r\n",
714                          header_->first.size(), header_->first.data(),
715                          header_->second.size(), header_->second.data());
716    if (len_ + len < sizeof(buffer_) - 3) {
717      len_ += len;
718      ++header_;
719    } else if (len_ == 0) {
720      LOG(WARNING) << "discarding header that is too long: " << header_->first;
721      ++header_;
722    } else {
723      // Not enough room for the next header, write to network first.
724      return true;
725    }
726  }
727  // End of headers
728  len_ += strcpyn(buffer_ + len_, sizeof(buffer_) - len_, "\r\n");
729  return false;
730}
731
732void
733HttpBase::do_complete(HttpError err) {
734  ASSERT(mode_ != HM_NONE);
735  HttpMode mode = mode_;
736  mode_ = HM_NONE;
737  if (data_ && data_->document) {
738    data_->document->SignalEvent.disconnect(this);
739  }
740  data_ = NULL;
741  if ((HM_RECV == mode) && doc_stream_) {
742    ASSERT(HE_NONE != err);  // We should have Disconnected doc_stream_ already.
743    DocumentStream* ds = doc_stream_;
744    ds->Disconnect(err);
745    ds->SignalEvent(ds, SE_CLOSE, err);
746  }
747  if (notify_) {
748    notify_->onHttpComplete(mode, err);
749  }
750}
751
752//
753// Stream Signals
754//
755
756void
757HttpBase::OnHttpStreamEvent(StreamInterface* stream, int events, int error) {
758  ASSERT(stream == http_stream_);
759  if ((events & SE_OPEN) && (mode_ == HM_CONNECT)) {
760    do_complete();
761    return;
762  }
763
764  if ((events & SE_WRITE) && (mode_ == HM_SEND)) {
765    flush_data();
766    return;
767  }
768
769  if ((events & SE_READ) && (mode_ == HM_RECV)) {
770    if (doc_stream_) {
771      doc_stream_->SignalEvent(doc_stream_, SE_READ, 0);
772    } else {
773      read_and_process_data();
774    }
775    return;
776  }
777
778  if ((events & SE_CLOSE) == 0)
779    return;
780
781  HttpError http_error = HandleStreamClose(error);
782  if (mode_ == HM_RECV) {
783    complete(http_error);
784  } else if (mode_ != HM_NONE) {
785    do_complete(http_error);
786  } else if (notify_) {
787    notify_->onHttpClosed(http_error);
788  }
789}
790
791void
792HttpBase::OnDocumentEvent(StreamInterface* stream, int events, int error) {
793  ASSERT(stream == data_->document.get());
794  if ((events & SE_WRITE) && (mode_ == HM_RECV)) {
795    read_and_process_data();
796    return;
797  }
798
799  if ((events & SE_READ) && (mode_ == HM_SEND)) {
800    flush_data();
801    return;
802  }
803
804  if (events & SE_CLOSE) {
805    LOG_F(LS_ERROR) << "Read error: " << error;
806    do_complete(HE_STREAM);
807    return;
808  }
809}
810
811//
812// HttpParser Implementation
813//
814
815HttpParser::ProcessResult
816HttpBase::ProcessLeader(const char* line, size_t len, HttpError* error) {
817  *error = data_->parseLeader(line, len);
818  return (HE_NONE == *error) ? PR_CONTINUE : PR_COMPLETE;
819}
820
821HttpParser::ProcessResult
822HttpBase::ProcessHeader(const char* name, size_t nlen, const char* value,
823                        size_t vlen, HttpError* error) {
824  std::string sname(name, nlen), svalue(value, vlen);
825  data_->addHeader(sname, svalue);
826  return PR_CONTINUE;
827}
828
829HttpParser::ProcessResult
830HttpBase::ProcessHeaderComplete(bool chunked, size_t& data_size,
831                                HttpError* error) {
832  StreamInterface* old_docstream = doc_stream_;
833  if (notify_) {
834    *error = notify_->onHttpHeaderComplete(chunked, data_size);
835    // The request must not be aborted as a result of this callback.
836    ASSERT(NULL != data_);
837  }
838  if ((HE_NONE == *error) && data_->document) {
839    data_->document->SignalEvent.connect(this, &HttpBase::OnDocumentEvent);
840  }
841  if (HE_NONE != *error) {
842    return PR_COMPLETE;
843  }
844  if (old_docstream != doc_stream_) {
845    // Break out of Process loop, since our I/O model just changed.
846    return PR_BLOCK;
847  }
848  return PR_CONTINUE;
849}
850
851HttpParser::ProcessResult
852HttpBase::ProcessData(const char* data, size_t len, size_t& read,
853                      HttpError* error) {
854  if (ignore_data_ || !data_->document) {
855    read = len;
856    return PR_CONTINUE;
857  }
858  int write_error = 0;
859  switch (data_->document->Write(data, len, &read, &write_error)) {
860  case SR_SUCCESS:
861    return PR_CONTINUE;
862  case SR_BLOCK:
863    return PR_BLOCK;
864  case SR_EOS:
865    LOG_F(LS_ERROR) << "Unexpected EOS";
866    *error = HE_STREAM;
867    return PR_COMPLETE;
868  case SR_ERROR:
869  default:
870    LOG_F(LS_ERROR) << "Write error: " << write_error;
871    *error = HE_STREAM;
872    return PR_COMPLETE;
873  }
874}
875
876void
877HttpBase::OnComplete(HttpError err) {
878  LOG_F(LS_VERBOSE);
879  do_complete(err);
880}
881
882} // namespace rtc
883