1/*
2 * Copyright (c) 2002-2004 Niels Provos <provos@citi.umich.edu>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 *    notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 *    notice, this list of conditions and the following disclaimer in the
12 *    documentation and/or other materials provided with the distribution.
13 * 3. The name of the author may not be used to endorse or promote products
14 *    derived from this software without specific prior written permission.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
17 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
18 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
19 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
20 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
21 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
22 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
23 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
25 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28#include <sys/types.h>
29
30#ifdef HAVE_CONFIG_H
31#include "config.h"
32#endif
33
34#ifdef HAVE_SYS_TIME_H
35#include <sys/time.h>
36#endif
37
38#include <errno.h>
39#include <stdio.h>
40#include <stdlib.h>
41#include <string.h>
42#ifdef HAVE_STDARG_H
43#include <stdarg.h>
44#endif
45
46#ifdef WIN32
47#include <winsock2.h>
48#endif
49
50#include "evutil.h"
51#include "event.h"
52
53/* prototypes */
54
55void bufferevent_read_pressure_cb(struct evbuffer *, size_t, size_t, void *);
56
57static int
58bufferevent_add(struct event *ev, int timeout)
59{
60	struct timeval tv, *ptv = NULL;
61
62	if (timeout) {
63		evutil_timerclear(&tv);
64		tv.tv_sec = timeout;
65		ptv = &tv;
66	}
67
68	return (event_add(ev, ptv));
69}
70
71/*
72 * This callback is executed when the size of the input buffer changes.
73 * We use it to apply back pressure on the reading side.
74 */
75
76void
77bufferevent_read_pressure_cb(struct evbuffer *buf, size_t old, size_t now,
78    void *arg) {
79	struct bufferevent *bufev = arg;
80	/*
81	 * If we are below the watermark then reschedule reading if it's
82	 * still enabled.
83	 */
84	if (bufev->wm_read.high == 0 || now < bufev->wm_read.high) {
85		evbuffer_setcb(buf, NULL, NULL);
86
87		if (bufev->enabled & EV_READ)
88			bufferevent_add(&bufev->ev_read, bufev->timeout_read);
89	}
90}
91
92static void
93bufferevent_readcb(int fd, short event, void *arg)
94{
95	struct bufferevent *bufev = arg;
96	int res = 0;
97	short what = EVBUFFER_READ;
98	size_t len;
99	int howmuch = -1;
100
101	if (event == EV_TIMEOUT) {
102		what |= EVBUFFER_TIMEOUT;
103		goto error;
104	}
105
106	/*
107	 * If we have a high watermark configured then we don't want to
108	 * read more data than would make us reach the watermark.
109	 */
110	if (bufev->wm_read.high != 0) {
111		howmuch = bufev->wm_read.high - EVBUFFER_LENGTH(bufev->input);
112		/* we might have lowered the watermark, stop reading */
113		if (howmuch <= 0) {
114			struct evbuffer *buf = bufev->input;
115			event_del(&bufev->ev_read);
116			evbuffer_setcb(buf,
117			    bufferevent_read_pressure_cb, bufev);
118			return;
119		}
120	}
121
122	res = evbuffer_read(bufev->input, fd, howmuch);
123	if (res == -1) {
124		if (errno == EAGAIN || errno == EINTR)
125			goto reschedule;
126		/* error case */
127		what |= EVBUFFER_ERROR;
128	} else if (res == 0) {
129		/* eof case */
130		what |= EVBUFFER_EOF;
131	}
132
133	if (res <= 0)
134		goto error;
135
136	bufferevent_add(&bufev->ev_read, bufev->timeout_read);
137
138	/* See if this callbacks meets the water marks */
139	len = EVBUFFER_LENGTH(bufev->input);
140	if (bufev->wm_read.low != 0 && len < bufev->wm_read.low)
141		return;
142	if (bufev->wm_read.high != 0 && len >= bufev->wm_read.high) {
143		struct evbuffer *buf = bufev->input;
144		event_del(&bufev->ev_read);
145
146		/* Now schedule a callback for us when the buffer changes */
147		evbuffer_setcb(buf, bufferevent_read_pressure_cb, bufev);
148	}
149
150	/* Invoke the user callback - must always be called last */
151	if (bufev->readcb != NULL)
152		(*bufev->readcb)(bufev, bufev->cbarg);
153	return;
154
155 reschedule:
156	bufferevent_add(&bufev->ev_read, bufev->timeout_read);
157	return;
158
159 error:
160	(*bufev->errorcb)(bufev, what, bufev->cbarg);
161}
162
163static void
164bufferevent_writecb(int fd, short event, void *arg)
165{
166	struct bufferevent *bufev = arg;
167	int res = 0;
168	short what = EVBUFFER_WRITE;
169
170	if (event == EV_TIMEOUT) {
171		what |= EVBUFFER_TIMEOUT;
172		goto error;
173	}
174
175	if (EVBUFFER_LENGTH(bufev->output)) {
176	    res = evbuffer_write(bufev->output, fd);
177	    if (res == -1) {
178#ifndef WIN32
179/*todo. evbuffer uses WriteFile when WIN32 is set. WIN32 system calls do not
180 *set errno. thus this error checking is not portable*/
181		    if (errno == EAGAIN ||
182			errno == EINTR ||
183			errno == EINPROGRESS)
184			    goto reschedule;
185		    /* error case */
186		    what |= EVBUFFER_ERROR;
187
188#else
189				goto reschedule;
190#endif
191
192	    } else if (res == 0) {
193		    /* eof case */
194		    what |= EVBUFFER_EOF;
195	    }
196	    if (res <= 0)
197		    goto error;
198	}
199
200	if (EVBUFFER_LENGTH(bufev->output) != 0)
201		bufferevent_add(&bufev->ev_write, bufev->timeout_write);
202
203	/*
204	 * Invoke the user callback if our buffer is drained or below the
205	 * low watermark.
206	 */
207	if (bufev->writecb != NULL &&
208	    EVBUFFER_LENGTH(bufev->output) <= bufev->wm_write.low)
209		(*bufev->writecb)(bufev, bufev->cbarg);
210
211	return;
212
213 reschedule:
214	if (EVBUFFER_LENGTH(bufev->output) != 0)
215		bufferevent_add(&bufev->ev_write, bufev->timeout_write);
216	return;
217
218 error:
219	(*bufev->errorcb)(bufev, what, bufev->cbarg);
220}
221
222/*
223 * Create a new buffered event object.
224 *
225 * The read callback is invoked whenever we read new data.
226 * The write callback is invoked whenever the output buffer is drained.
227 * The error callback is invoked on a write/read error or on EOF.
228 *
229 * Both read and write callbacks maybe NULL.  The error callback is not
230 * allowed to be NULL and have to be provided always.
231 */
232
233struct bufferevent *
234bufferevent_new(int fd, evbuffercb readcb, evbuffercb writecb,
235    everrorcb errorcb, void *cbarg)
236{
237	struct bufferevent *bufev;
238
239	if ((bufev = calloc(1, sizeof(struct bufferevent))) == NULL)
240		return (NULL);
241
242	if ((bufev->input = evbuffer_new()) == NULL) {
243		free(bufev);
244		return (NULL);
245	}
246
247	if ((bufev->output = evbuffer_new()) == NULL) {
248		evbuffer_free(bufev->input);
249		free(bufev);
250		return (NULL);
251	}
252
253	event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev);
254	event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev);
255
256	bufferevent_setcb(bufev, readcb, writecb, errorcb, cbarg);
257
258	/*
259	 * Set to EV_WRITE so that using bufferevent_write is going to
260	 * trigger a callback.  Reading needs to be explicitly enabled
261	 * because otherwise no data will be available.
262	 */
263	bufev->enabled = EV_WRITE;
264
265	return (bufev);
266}
267
268void
269bufferevent_setcb(struct bufferevent *bufev,
270    evbuffercb readcb, evbuffercb writecb, everrorcb errorcb, void *cbarg)
271{
272	bufev->readcb = readcb;
273	bufev->writecb = writecb;
274	bufev->errorcb = errorcb;
275
276	bufev->cbarg = cbarg;
277}
278
279void
280bufferevent_setfd(struct bufferevent *bufev, int fd)
281{
282	event_del(&bufev->ev_read);
283	event_del(&bufev->ev_write);
284
285	event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev);
286	event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev);
287	if (bufev->ev_base != NULL) {
288		event_base_set(bufev->ev_base, &bufev->ev_read);
289		event_base_set(bufev->ev_base, &bufev->ev_write);
290	}
291
292	/* might have to manually trigger event registration */
293}
294
295int
296bufferevent_priority_set(struct bufferevent *bufev, int priority)
297{
298	if (event_priority_set(&bufev->ev_read, priority) == -1)
299		return (-1);
300	if (event_priority_set(&bufev->ev_write, priority) == -1)
301		return (-1);
302
303	return (0);
304}
305
306/* Closing the file descriptor is the responsibility of the caller */
307
308void
309bufferevent_free(struct bufferevent *bufev)
310{
311	event_del(&bufev->ev_read);
312	event_del(&bufev->ev_write);
313
314	evbuffer_free(bufev->input);
315	evbuffer_free(bufev->output);
316
317	free(bufev);
318}
319
320/*
321 * Returns 0 on success;
322 *        -1 on failure.
323 */
324
325int
326bufferevent_write(struct bufferevent *bufev, const void *data, size_t size)
327{
328	int res;
329
330	res = evbuffer_add(bufev->output, data, size);
331
332	if (res == -1)
333		return (res);
334
335	/* If everything is okay, we need to schedule a write */
336	if (size > 0 && (bufev->enabled & EV_WRITE))
337		bufferevent_add(&bufev->ev_write, bufev->timeout_write);
338
339	return (res);
340}
341
342int
343bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf)
344{
345	int res;
346
347	res = bufferevent_write(bufev, buf->buffer, buf->off);
348	if (res != -1)
349		evbuffer_drain(buf, buf->off);
350
351	return (res);
352}
353
354size_t
355bufferevent_read(struct bufferevent *bufev, void *data, size_t size)
356{
357	struct evbuffer *buf = bufev->input;
358
359	if (buf->off < size)
360		size = buf->off;
361
362	/* Copy the available data to the user buffer */
363	memcpy(data, buf->buffer, size);
364
365	if (size)
366		evbuffer_drain(buf, size);
367
368	return (size);
369}
370
371int
372bufferevent_enable(struct bufferevent *bufev, short event)
373{
374	if (event & EV_READ) {
375		if (bufferevent_add(&bufev->ev_read, bufev->timeout_read) == -1)
376			return (-1);
377	}
378	if (event & EV_WRITE) {
379		if (bufferevent_add(&bufev->ev_write, bufev->timeout_write) == -1)
380			return (-1);
381	}
382
383	bufev->enabled |= event;
384	return (0);
385}
386
387int
388bufferevent_disable(struct bufferevent *bufev, short event)
389{
390	if (event & EV_READ) {
391		if (event_del(&bufev->ev_read) == -1)
392			return (-1);
393	}
394	if (event & EV_WRITE) {
395		if (event_del(&bufev->ev_write) == -1)
396			return (-1);
397	}
398
399	bufev->enabled &= ~event;
400	return (0);
401}
402
403/*
404 * Sets the read and write timeout for a buffered event.
405 */
406
407void
408bufferevent_settimeout(struct bufferevent *bufev,
409    int timeout_read, int timeout_write) {
410	bufev->timeout_read = timeout_read;
411	bufev->timeout_write = timeout_write;
412
413	if (event_pending(&bufev->ev_read, EV_READ, NULL))
414		bufferevent_add(&bufev->ev_read, timeout_read);
415	if (event_pending(&bufev->ev_write, EV_WRITE, NULL))
416		bufferevent_add(&bufev->ev_write, timeout_write);
417}
418
419/*
420 * Sets the water marks
421 */
422
423void
424bufferevent_setwatermark(struct bufferevent *bufev, short events,
425    size_t lowmark, size_t highmark)
426{
427	if (events & EV_READ) {
428		bufev->wm_read.low = lowmark;
429		bufev->wm_read.high = highmark;
430	}
431
432	if (events & EV_WRITE) {
433		bufev->wm_write.low = lowmark;
434		bufev->wm_write.high = highmark;
435	}
436
437	/* If the watermarks changed then see if we should call read again */
438	bufferevent_read_pressure_cb(bufev->input,
439	    0, EVBUFFER_LENGTH(bufev->input), bufev);
440}
441
442int
443bufferevent_base_set(struct event_base *base, struct bufferevent *bufev)
444{
445	int res;
446
447	bufev->ev_base = base;
448
449	res = event_base_set(base, &bufev->ev_read);
450	if (res == -1)
451		return (res);
452
453	res = event_base_set(base, &bufev->ev_write);
454	return (res);
455}
456