/*
 * Copyright (c) 2009-2012 Niels Provos and Nick Mathewson
 *
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. The name of the author may not be used to endorse or promote products
 *    derived from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 */ 28 29#include "event2/event-config.h" 30#include "evconfig-private.h" 31 32#ifdef EVENT__HAVE_SYS_TIME_H 33#include <sys/time.h> 34#endif 35 36#include <errno.h> 37#include <stdio.h> 38#include <stdlib.h> 39#include <string.h> 40#ifdef EVENT__HAVE_STDARG_H 41#include <stdarg.h> 42#endif 43#ifdef EVENT__HAVE_UNISTD_H 44#include <unistd.h> 45#endif 46 47#ifdef _WIN32 48#include <winsock2.h> 49#include <ws2tcpip.h> 50#endif 51 52#include <sys/queue.h> 53 54#include "event2/util.h" 55#include "event2/bufferevent.h" 56#include "event2/buffer.h" 57#include "event2/bufferevent_struct.h" 58#include "event2/event.h" 59#include "event2/util.h" 60#include "event-internal.h" 61#include "log-internal.h" 62#include "mm-internal.h" 63#include "bufferevent-internal.h" 64#include "util-internal.h" 65#include "iocp-internal.h" 66 67#ifndef SO_UPDATE_CONNECT_CONTEXT 68/* Mingw is sometimes missing this */ 69#define SO_UPDATE_CONNECT_CONTEXT 0x7010 70#endif 71 72/* prototypes */ 73static int be_async_enable(struct bufferevent *, short); 74static int be_async_disable(struct bufferevent *, short); 75static void be_async_destruct(struct bufferevent *); 76static int be_async_flush(struct bufferevent *, short, enum bufferevent_flush_mode); 77static int be_async_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *); 78 79struct bufferevent_async { 80 struct bufferevent_private bev; 81 struct event_overlapped connect_overlapped; 82 struct event_overlapped read_overlapped; 83 struct event_overlapped write_overlapped; 84 size_t read_in_progress; 85 size_t write_in_progress; 86 unsigned ok : 1; 87 unsigned read_added : 1; 88 unsigned write_added : 1; 89}; 90 91const struct bufferevent_ops bufferevent_ops_async = { 92 "socket_async", 93 evutil_offsetof(struct bufferevent_async, bev.bev), 94 be_async_enable, 95 be_async_disable, 96 NULL, /* Unlink */ 97 be_async_destruct, 98 bufferevent_generic_adj_timeouts_, 99 be_async_flush, 100 be_async_ctrl, 101}; 102 
103static inline struct bufferevent_async * 104upcast(struct bufferevent *bev) 105{ 106 struct bufferevent_async *bev_a; 107 if (bev->be_ops != &bufferevent_ops_async) 108 return NULL; 109 bev_a = EVUTIL_UPCAST(bev, struct bufferevent_async, bev.bev); 110 return bev_a; 111} 112 113static inline struct bufferevent_async * 114upcast_connect(struct event_overlapped *eo) 115{ 116 struct bufferevent_async *bev_a; 117 bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, connect_overlapped); 118 EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev)); 119 return bev_a; 120} 121 122static inline struct bufferevent_async * 123upcast_read(struct event_overlapped *eo) 124{ 125 struct bufferevent_async *bev_a; 126 bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, read_overlapped); 127 EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev)); 128 return bev_a; 129} 130 131static inline struct bufferevent_async * 132upcast_write(struct event_overlapped *eo) 133{ 134 struct bufferevent_async *bev_a; 135 bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, write_overlapped); 136 EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev)); 137 return bev_a; 138} 139 140static void 141bev_async_del_write(struct bufferevent_async *beva) 142{ 143 struct bufferevent *bev = &beva->bev.bev; 144 145 if (beva->write_added) { 146 beva->write_added = 0; 147 event_base_del_virtual_(bev->ev_base); 148 } 149} 150 151static void 152bev_async_del_read(struct bufferevent_async *beva) 153{ 154 struct bufferevent *bev = &beva->bev.bev; 155 156 if (beva->read_added) { 157 beva->read_added = 0; 158 event_base_del_virtual_(bev->ev_base); 159 } 160} 161 162static void 163bev_async_add_write(struct bufferevent_async *beva) 164{ 165 struct bufferevent *bev = &beva->bev.bev; 166 167 if (!beva->write_added) { 168 beva->write_added = 1; 169 event_base_add_virtual_(bev->ev_base); 170 } 171} 172 173static void 174bev_async_add_read(struct bufferevent_async *beva) 175{ 176 struct bufferevent *bev = &beva->bev.bev; 177 178 if (!beva->read_added) { 
179 beva->read_added = 1; 180 event_base_add_virtual_(bev->ev_base); 181 } 182} 183 184static void 185bev_async_consider_writing(struct bufferevent_async *beva) 186{ 187 size_t at_most; 188 int limit; 189 struct bufferevent *bev = &beva->bev.bev; 190 191 /* Don't write if there's a write in progress, or we do not 192 * want to write, or when there's nothing left to write. */ 193 if (beva->write_in_progress || beva->bev.connecting) 194 return; 195 if (!beva->ok || !(bev->enabled&EV_WRITE) || 196 !evbuffer_get_length(bev->output)) { 197 bev_async_del_write(beva); 198 return; 199 } 200 201 at_most = evbuffer_get_length(bev->output); 202 203 /* This is safe so long as bufferevent_get_write_max never returns 204 * more than INT_MAX. That's true for now. XXXX */ 205 limit = (int)bufferevent_get_write_max_(&beva->bev); 206 if (at_most >= (size_t)limit && limit >= 0) 207 at_most = limit; 208 209 if (beva->bev.write_suspended) { 210 bev_async_del_write(beva); 211 return; 212 } 213 214 /* XXXX doesn't respect low-water mark very well. */ 215 bufferevent_incref_(bev); 216 if (evbuffer_launch_write_(bev->output, at_most, 217 &beva->write_overlapped)) { 218 bufferevent_decref_(bev); 219 beva->ok = 0; 220 bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR, 0); 221 } else { 222 beva->write_in_progress = at_most; 223 bufferevent_decrement_write_buckets_(&beva->bev, at_most); 224 bev_async_add_write(beva); 225 } 226} 227 228static void 229bev_async_consider_reading(struct bufferevent_async *beva) 230{ 231 size_t cur_size; 232 size_t read_high; 233 size_t at_most; 234 int limit; 235 struct bufferevent *bev = &beva->bev.bev; 236 237 /* Don't read if there is a read in progress, or we do not 238 * want to read. 
*/ 239 if (beva->read_in_progress || beva->bev.connecting) 240 return; 241 if (!beva->ok || !(bev->enabled&EV_READ)) { 242 bev_async_del_read(beva); 243 return; 244 } 245 246 /* Don't read if we're full */ 247 cur_size = evbuffer_get_length(bev->input); 248 read_high = bev->wm_read.high; 249 if (read_high) { 250 if (cur_size >= read_high) { 251 bev_async_del_read(beva); 252 return; 253 } 254 at_most = read_high - cur_size; 255 } else { 256 at_most = 16384; /* FIXME totally magic. */ 257 } 258 259 /* XXXX This over-commits. */ 260 /* XXXX see also not above on cast on bufferevent_get_write_max_() */ 261 limit = (int)bufferevent_get_read_max_(&beva->bev); 262 if (at_most >= (size_t)limit && limit >= 0) 263 at_most = limit; 264 265 if (beva->bev.read_suspended) { 266 bev_async_del_read(beva); 267 return; 268 } 269 270 bufferevent_incref_(bev); 271 if (evbuffer_launch_read_(bev->input, at_most, &beva->read_overlapped)) { 272 beva->ok = 0; 273 bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR, 0); 274 bufferevent_decref_(bev); 275 } else { 276 beva->read_in_progress = at_most; 277 bufferevent_decrement_read_buckets_(&beva->bev, at_most); 278 bev_async_add_read(beva); 279 } 280 281 return; 282} 283 284static void 285be_async_outbuf_callback(struct evbuffer *buf, 286 const struct evbuffer_cb_info *cbinfo, 287 void *arg) 288{ 289 struct bufferevent *bev = arg; 290 struct bufferevent_async *bev_async = upcast(bev); 291 292 /* If we added data to the outbuf and were not writing before, 293 * we may want to write now. 
*/ 294 295 bufferevent_incref_and_lock_(bev); 296 297 if (cbinfo->n_added) 298 bev_async_consider_writing(bev_async); 299 300 bufferevent_decref_and_unlock_(bev); 301} 302 303static void 304be_async_inbuf_callback(struct evbuffer *buf, 305 const struct evbuffer_cb_info *cbinfo, 306 void *arg) 307{ 308 struct bufferevent *bev = arg; 309 struct bufferevent_async *bev_async = upcast(bev); 310 311 /* If we drained data from the inbuf and were not reading before, 312 * we may want to read now */ 313 314 bufferevent_incref_and_lock_(bev); 315 316 if (cbinfo->n_deleted) 317 bev_async_consider_reading(bev_async); 318 319 bufferevent_decref_and_unlock_(bev); 320} 321 322static int 323be_async_enable(struct bufferevent *buf, short what) 324{ 325 struct bufferevent_async *bev_async = upcast(buf); 326 327 if (!bev_async->ok) 328 return -1; 329 330 if (bev_async->bev.connecting) { 331 /* Don't launch anything during connection attempts. */ 332 return 0; 333 } 334 335 if (what & EV_READ) 336 BEV_RESET_GENERIC_READ_TIMEOUT(buf); 337 if (what & EV_WRITE) 338 BEV_RESET_GENERIC_WRITE_TIMEOUT(buf); 339 340 /* If we newly enable reading or writing, and we aren't reading or 341 writing already, consider launching a new read or write. */ 342 343 if (what & EV_READ) 344 bev_async_consider_reading(bev_async); 345 if (what & EV_WRITE) 346 bev_async_consider_writing(bev_async); 347 return 0; 348} 349 350static int 351be_async_disable(struct bufferevent *bev, short what) 352{ 353 struct bufferevent_async *bev_async = upcast(bev); 354 /* XXXX If we disable reading or writing, we may want to consider 355 * canceling any in-progress read or write operation, though it might 356 * not work. 
*/ 357 358 if (what & EV_READ) { 359 BEV_DEL_GENERIC_READ_TIMEOUT(bev); 360 bev_async_del_read(bev_async); 361 } 362 if (what & EV_WRITE) { 363 BEV_DEL_GENERIC_WRITE_TIMEOUT(bev); 364 bev_async_del_write(bev_async); 365 } 366 367 return 0; 368} 369 370static void 371be_async_destruct(struct bufferevent *bev) 372{ 373 struct bufferevent_async *bev_async = upcast(bev); 374 struct bufferevent_private *bev_p = BEV_UPCAST(bev); 375 evutil_socket_t fd; 376 377 EVUTIL_ASSERT(!upcast(bev)->write_in_progress && 378 !upcast(bev)->read_in_progress); 379 380 bev_async_del_read(bev_async); 381 bev_async_del_write(bev_async); 382 383 fd = evbuffer_overlapped_get_fd_(bev->input); 384 if (fd != (evutil_socket_t)INVALID_SOCKET && 385 (bev_p->options & BEV_OPT_CLOSE_ON_FREE)) { 386 evutil_closesocket(fd); 387 evbuffer_overlapped_set_fd_(bev->input, INVALID_SOCKET); 388 } 389} 390 391/* GetQueuedCompletionStatus doesn't reliably yield WSA error codes, so 392 * we use WSAGetOverlappedResult to translate. */ 393static void 394bev_async_set_wsa_error(struct bufferevent *bev, struct event_overlapped *eo) 395{ 396 DWORD bytes, flags; 397 evutil_socket_t fd; 398 399 fd = evbuffer_overlapped_get_fd_(bev->input); 400 WSAGetOverlappedResult(fd, &eo->overlapped, &bytes, FALSE, &flags); 401} 402 403static int 404be_async_flush(struct bufferevent *bev, short what, 405 enum bufferevent_flush_mode mode) 406{ 407 return 0; 408} 409 410static void 411connect_complete(struct event_overlapped *eo, ev_uintptr_t key, 412 ev_ssize_t nbytes, int ok) 413{ 414 struct bufferevent_async *bev_a = upcast_connect(eo); 415 struct bufferevent *bev = &bev_a->bev.bev; 416 evutil_socket_t sock; 417 418 BEV_LOCK(bev); 419 420 EVUTIL_ASSERT(bev_a->bev.connecting); 421 bev_a->bev.connecting = 0; 422 sock = evbuffer_overlapped_get_fd_(bev_a->bev.bev.input); 423 /* XXXX Handle error? 
*/ 424 setsockopt(sock, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0); 425 426 if (ok) 427 bufferevent_async_set_connected_(bev); 428 else 429 bev_async_set_wsa_error(bev, eo); 430 431 bufferevent_run_eventcb_(bev, 432 ok? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR, 0); 433 434 event_base_del_virtual_(bev->ev_base); 435 436 bufferevent_decref_and_unlock_(bev); 437} 438 439static void 440read_complete(struct event_overlapped *eo, ev_uintptr_t key, 441 ev_ssize_t nbytes, int ok) 442{ 443 struct bufferevent_async *bev_a = upcast_read(eo); 444 struct bufferevent *bev = &bev_a->bev.bev; 445 short what = BEV_EVENT_READING; 446 ev_ssize_t amount_unread; 447 BEV_LOCK(bev); 448 EVUTIL_ASSERT(bev_a->read_in_progress); 449 450 amount_unread = bev_a->read_in_progress - nbytes; 451 evbuffer_commit_read_(bev->input, nbytes); 452 bev_a->read_in_progress = 0; 453 if (amount_unread) 454 bufferevent_decrement_read_buckets_(&bev_a->bev, -amount_unread); 455 456 if (!ok) 457 bev_async_set_wsa_error(bev, eo); 458 459 if (bev_a->ok) { 460 if (ok && nbytes) { 461 BEV_RESET_GENERIC_READ_TIMEOUT(bev); 462 bufferevent_trigger_nolock_(bev, EV_READ, 0); 463 bev_async_consider_reading(bev_a); 464 } else if (!ok) { 465 what |= BEV_EVENT_ERROR; 466 bev_a->ok = 0; 467 bufferevent_run_eventcb_(bev, what, 0); 468 } else if (!nbytes) { 469 what |= BEV_EVENT_EOF; 470 bev_a->ok = 0; 471 bufferevent_run_eventcb_(bev, what, 0); 472 } 473 } 474 475 bufferevent_decref_and_unlock_(bev); 476} 477 478static void 479write_complete(struct event_overlapped *eo, ev_uintptr_t key, 480 ev_ssize_t nbytes, int ok) 481{ 482 struct bufferevent_async *bev_a = upcast_write(eo); 483 struct bufferevent *bev = &bev_a->bev.bev; 484 short what = BEV_EVENT_WRITING; 485 ev_ssize_t amount_unwritten; 486 487 BEV_LOCK(bev); 488 EVUTIL_ASSERT(bev_a->write_in_progress); 489 490 amount_unwritten = bev_a->write_in_progress - nbytes; 491 evbuffer_commit_write_(bev->output, nbytes); 492 bev_a->write_in_progress = 0; 493 494 if 
(amount_unwritten) 495 bufferevent_decrement_write_buckets_(&bev_a->bev, 496 -amount_unwritten); 497 498 499 if (!ok) 500 bev_async_set_wsa_error(bev, eo); 501 502 if (bev_a->ok) { 503 if (ok && nbytes) { 504 BEV_RESET_GENERIC_WRITE_TIMEOUT(bev); 505 bufferevent_trigger_nolock_(bev, EV_WRITE, 0); 506 bev_async_consider_writing(bev_a); 507 } else if (!ok) { 508 what |= BEV_EVENT_ERROR; 509 bev_a->ok = 0; 510 bufferevent_run_eventcb_(bev, what, 0); 511 } else if (!nbytes) { 512 what |= BEV_EVENT_EOF; 513 bev_a->ok = 0; 514 bufferevent_run_eventcb_(bev, what, 0); 515 } 516 } 517 518 bufferevent_decref_and_unlock_(bev); 519} 520 521struct bufferevent * 522bufferevent_async_new_(struct event_base *base, 523 evutil_socket_t fd, int options) 524{ 525 struct bufferevent_async *bev_a; 526 struct bufferevent *bev; 527 struct event_iocp_port *iocp; 528 529 options |= BEV_OPT_THREADSAFE; 530 531 if (!(iocp = event_base_get_iocp_(base))) 532 return NULL; 533 534 if (fd >= 0 && event_iocp_port_associate_(iocp, fd, 1)<0) { 535 int err = GetLastError(); 536 /* We may have alrady associated this fd with a port. 537 * Let's hope it's this port, and that the error code 538 * for doing this neer changes. 
*/ 539 if (err != ERROR_INVALID_PARAMETER) 540 return NULL; 541 } 542 543 if (!(bev_a = mm_calloc(1, sizeof(struct bufferevent_async)))) 544 return NULL; 545 546 bev = &bev_a->bev.bev; 547 if (!(bev->input = evbuffer_overlapped_new_(fd))) { 548 mm_free(bev_a); 549 return NULL; 550 } 551 if (!(bev->output = evbuffer_overlapped_new_(fd))) { 552 evbuffer_free(bev->input); 553 mm_free(bev_a); 554 return NULL; 555 } 556 557 if (bufferevent_init_common_(&bev_a->bev, base, &bufferevent_ops_async, 558 options)<0) 559 goto err; 560 561 evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev); 562 evbuffer_add_cb(bev->output, be_async_outbuf_callback, bev); 563 564 event_overlapped_init_(&bev_a->connect_overlapped, connect_complete); 565 event_overlapped_init_(&bev_a->read_overlapped, read_complete); 566 event_overlapped_init_(&bev_a->write_overlapped, write_complete); 567 568 bufferevent_init_generic_timeout_cbs_(bev); 569 570 bev_a->ok = fd >= 0; 571 572 return bev; 573err: 574 bufferevent_free(&bev_a->bev.bev); 575 return NULL; 576} 577 578void 579bufferevent_async_set_connected_(struct bufferevent *bev) 580{ 581 struct bufferevent_async *bev_async = upcast(bev); 582 bev_async->ok = 1; 583 bufferevent_init_generic_timeout_cbs_(bev); 584 /* Now's a good time to consider reading/writing */ 585 be_async_enable(bev, bev->enabled); 586} 587 588int 589bufferevent_async_can_connect_(struct bufferevent *bev) 590{ 591 const struct win32_extension_fns *ext = 592 event_get_win32_extension_fns_(); 593 594 if (BEV_IS_ASYNC(bev) && 595 event_base_get_iocp_(bev->ev_base) && 596 ext && ext->ConnectEx) 597 return 1; 598 599 return 0; 600} 601 602int 603bufferevent_async_connect_(struct bufferevent *bev, evutil_socket_t fd, 604 const struct sockaddr *sa, int socklen) 605{ 606 BOOL rc; 607 struct bufferevent_async *bev_async = upcast(bev); 608 struct sockaddr_storage ss; 609 const struct win32_extension_fns *ext = 610 event_get_win32_extension_fns_(); 611 612 EVUTIL_ASSERT(ext && 
ext->ConnectEx && fd >= 0 && sa != NULL); 613 614 /* ConnectEx() requires that the socket be bound to an address 615 * with bind() before using, otherwise it will fail. We attempt 616 * to issue a bind() here, taking into account that the error 617 * code is set to WSAEINVAL when the socket is already bound. */ 618 memset(&ss, 0, sizeof(ss)); 619 if (sa->sa_family == AF_INET) { 620 struct sockaddr_in *sin = (struct sockaddr_in *)&ss; 621 sin->sin_family = AF_INET; 622 sin->sin_addr.s_addr = INADDR_ANY; 623 } else if (sa->sa_family == AF_INET6) { 624 struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&ss; 625 sin6->sin6_family = AF_INET6; 626 sin6->sin6_addr = in6addr_any; 627 } else { 628 /* Well, the user will have to bind() */ 629 return -1; 630 } 631 if (bind(fd, (struct sockaddr *)&ss, sizeof(ss)) < 0 && 632 WSAGetLastError() != WSAEINVAL) 633 return -1; 634 635 event_base_add_virtual_(bev->ev_base); 636 bufferevent_incref_(bev); 637 rc = ext->ConnectEx(fd, sa, socklen, NULL, 0, NULL, 638 &bev_async->connect_overlapped.overlapped); 639 if (rc || WSAGetLastError() == ERROR_IO_PENDING) 640 return 0; 641 642 event_base_del_virtual_(bev->ev_base); 643 bufferevent_decref_(bev); 644 645 return -1; 646} 647 648static int 649be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op, 650 union bufferevent_ctrl_data *data) 651{ 652 switch (op) { 653 case BEV_CTRL_GET_FD: 654 data->fd = evbuffer_overlapped_get_fd_(bev->input); 655 return 0; 656 case BEV_CTRL_SET_FD: { 657 struct event_iocp_port *iocp; 658 659 if (data->fd == evbuffer_overlapped_get_fd_(bev->input)) 660 return 0; 661 if (!(iocp = event_base_get_iocp_(bev->ev_base))) 662 return -1; 663 if (event_iocp_port_associate_(iocp, data->fd, 1) < 0) 664 return -1; 665 evbuffer_overlapped_set_fd_(bev->input, data->fd); 666 evbuffer_overlapped_set_fd_(bev->output, data->fd); 667 return 0; 668 } 669 case BEV_CTRL_CANCEL_ALL: { 670 struct bufferevent_async *bev_a = upcast(bev); 671 evutil_socket_t fd = 
evbuffer_overlapped_get_fd_(bev->input); 672 if (fd != (evutil_socket_t)INVALID_SOCKET && 673 (bev_a->bev.options & BEV_OPT_CLOSE_ON_FREE)) { 674 closesocket(fd); 675 evbuffer_overlapped_set_fd_(bev->input, INVALID_SOCKET); 676 } 677 bev_a->ok = 0; 678 return 0; 679 } 680 case BEV_CTRL_GET_UNDERLYING: 681 default: 682 return -1; 683 } 684} 685 686 687