+Changes in version 2.0.14-stable (?? ??? 2011)
+BUGFIXES (bufferevents and evbuffers):
+ o Propagate errors on the underlying bufferevent to the user. (4a34394 Joachim Bauch)
+ o Ignore OpenSSL deprecation warnings on OS X (5d1b255 Sebastian Hahn)
+ o Fix handling of group rate limits under 64 bytes of burst (6d5440e)
+ o Solaris sendfile: correctly detect amount of data sent (643922e Michael Herf)
+ o Make rate limiting work with common_timeout logic (5b18f13)
+
+BUGFIXES (IOCP):
+ o IOCP: don't launch reads or writes on an unconnected socket (495c227)
+ o Make IOCP rate-limiting group support stricter and less surprising. (a98da7b)
+ o Have test-ratelim.c support IOCP (0ff2c5a)
+ o Make overlapped reads result in evbuffer callbacks getting invoked (6acfbdd)
+ o Correctly terminate IO on an async bufferevent on bufferevent_free (e6af35d)
+
+BUGFIXES (other):
+ o Fix evsig_dealloc memory leak with debugging turned on. (9b724b2 Leonid Evdokimov)
+ o Fix request_finished memory leak with debugging turned on. (aff6ba1 Leonid Evdokimov)
+
+BUILD AND TESTING FIXES:
+ o Allow OS-neutral builds for platforms where some versions have arc4random_buf (b442302 Mitchell Livingston)
+ o Try to fix 'make distcheck' errors when building out-of-tree (04656ea Dave Hart)
+ o Clean up some problems identified by Coverity. (7c11e51 Harlan Stenn)
+
+
Changes in version 2.0.13-stable (18 Jul 2011)
BUGFIXES
o Avoid race-condition when initializing global locks (b683cae)
libevent_openssl_la_SOURCES = bufferevent_openssl.c
libevent_openssl_la_LIBADD = $(MAYBE_CORE) $(OPENSSL_LIBS)
libevent_openssl_la_LDFLAGS = $(GENERIC_LDFLAGS)
+libevent_openssl_la_CPPFLAGS = $(AM_CPPFLAGS) $(OPENSSL_INCS)
endif
noinst_HEADERS = util-internal.h mm-internal.h ipv6-internal.h \
}
}
-static inline void
+void
evbuffer_invoke_callbacks(struct evbuffer *buffer)
{
if (TAILQ_EMPTY(&buffer->callbacks)) {
}
return (res);
#elif defined(SENDFILE_IS_SOLARIS)
- res = sendfile(dest_fd, source_fd, &offset, chain->off);
- if (res == -1 && EVUTIL_ERR_RW_RETRIABLE(errno)) {
- /* if this is EAGAIN or EINTR return 0; otherwise, -1 */
- return (0);
+ {
+ const off_t offset_orig = offset;
+ res = sendfile(dest_fd, source_fd, &offset, chain->off);
+ if (res == -1 && EVUTIL_ERR_RW_RETRIABLE(errno)) {
+ if (offset - offset_orig)
+ return offset - offset_orig;
+ /* if this is EAGAIN or EINTR and no bytes were
+ * written, return 0 */
+ return (0);
+ }
+ return (res);
}
- return (res);
#endif
}
#endif
buf->read_in_progress = 0;
evbuf->total_len += nBytes;
+ evbuf->n_add_for_cb += nBytes;
+
+ evbuffer_invoke_callbacks(evbuf);
_evbuffer_decref_and_unlock(evbuf);
}
/** The smallest number of bytes that any member of the group should
* be limited to read or write at a time. */
ev_ssize_t min_share;
+ ev_ssize_t configured_min_share;
+
/** Timeout event that goes off once a tick, when the bucket is ready
* to refill. */
struct event master_refill_event;
enum bufferevent_ctrl_op {
BEV_CTRL_SET_FD,
BEV_CTRL_GET_FD,
- BEV_CTRL_GET_UNDERLYING
+ BEV_CTRL_GET_UNDERLYING,
+ BEV_CTRL_CANCEL_ALL
};
/** Possible data types for a control callback */
#include "evbuffer-internal.h"
#include "util-internal.h"
+static void _bufferevent_cancel_all(struct bufferevent *bev);
+
+
void
bufferevent_suspend_read(struct bufferevent *bufev, bufferevent_suspend_flags what)
{
{
BEV_LOCK(bufev);
bufferevent_setcb(bufev, NULL, NULL, NULL, NULL);
+ _bufferevent_cancel_all(bufev);
_bufferevent_decref_and_unlock(bufev);
}
return (res<0) ? -1 : d.fd;
}
+static void
+_bufferevent_cancel_all(struct bufferevent *bev)
+{
+ union bufferevent_ctrl_data d;
+ memset(&d, 0, sizeof(d));
+ BEV_LOCK(bev);
+ if (bev->be_ops->ctrl)
+ bev->be_ops->ctrl(bev, BEV_CTRL_CANCEL_ALL, &d);
+ BEV_UNLOCK(bev);
+}
+
short
bufferevent_get_enabled(struct bufferevent *bufev)
{
struct event_overlapped connect_overlapped;
struct event_overlapped read_overlapped;
struct event_overlapped write_overlapped;
- unsigned read_in_progress : 1;
- unsigned write_in_progress : 1;
+ size_t read_in_progress;
+ size_t write_in_progress;
unsigned ok : 1;
unsigned read_added : 1;
unsigned write_added : 1;
/* Don't write if there's a write in progress, or we do not
* want to write, or when there's nothing left to write. */
- if (beva->write_in_progress)
+ if (beva->write_in_progress || beva->bev.connecting)
return;
if (!beva->ok || !(bev->enabled&EV_WRITE) ||
!evbuffer_get_length(bev->output)) {
at_most = evbuffer_get_length(bev->output);
- /* XXXX This over-commits. */
/* This is safe so long as bufferevent_get_write_max never returns
* more than INT_MAX. That's true for now. XXXX */
limit = (int)_bufferevent_get_write_max(&beva->bev);
beva->ok = 0;
_bufferevent_run_eventcb(bev, BEV_EVENT_ERROR);
} else {
- beva->write_in_progress = 1;
+ beva->write_in_progress = at_most;
+ _bufferevent_decrement_write_buckets(&beva->bev, at_most);
bev_async_add_write(beva);
}
}
/* Don't read if there is a read in progress, or we do not
* want to read. */
- if (beva->read_in_progress)
+ if (beva->read_in_progress || beva->bev.connecting)
return;
if (!beva->ok || !(bev->enabled&EV_READ)) {
bev_async_del_read(beva);
bufferevent_incref(bev);
if (evbuffer_launch_read(bev->input, at_most, &beva->read_overlapped)) {
beva->ok = 0;
- bufferevent_decref(bev);
_bufferevent_run_eventcb(bev, BEV_EVENT_ERROR);
+ bufferevent_decref(bev);
} else {
- beva->read_in_progress = 1;
+ beva->read_in_progress = at_most;
+ _bufferevent_decrement_read_buckets(&beva->bev, at_most);
bev_async_add_read(beva);
}
if (!bev_async->ok)
return -1;
- /* NOTE: This interferes with non-blocking connect */
+ if (bev_async->bev.connecting) {
+ /* Don't launch anything during connection attempts. */
+ return 0;
+ }
+
if (what & EV_READ)
BEV_RESET_GENERIC_READ_TIMEOUT(buf);
if (what & EV_WRITE)
bev_async_del_write(bev_async);
fd = _evbuffer_overlapped_get_fd(bev->input);
- if (bev_p->options & BEV_OPT_CLOSE_ON_FREE)
+ if (bev_p->options & BEV_OPT_CLOSE_ON_FREE) {
+ /* XXXX possible double-close */
evutil_closesocket(fd);
+ }
/* delete this in case non-blocking connect was used */
if (event_initialized(&bev->ev_write)) {
event_del(&bev->ev_write);
struct bufferevent_async *bev_a = upcast_read(eo);
struct bufferevent *bev = &bev_a->bev.bev;
short what = BEV_EVENT_READING;
-
+ ev_ssize_t amount_unread;
BEV_LOCK(bev);
EVUTIL_ASSERT(bev_a->read_in_progress);
+ amount_unread = bev_a->read_in_progress - nbytes;
evbuffer_commit_read(bev->input, nbytes);
bev_a->read_in_progress = 0;
+ if (amount_unread)
+ _bufferevent_decrement_read_buckets(&bev_a->bev, -amount_unread);
if (!ok)
bev_async_set_wsa_error(bev, eo);
if (bev_a->ok) {
if (ok && nbytes) {
BEV_RESET_GENERIC_READ_TIMEOUT(bev);
- _bufferevent_decrement_read_buckets(&bev_a->bev,
- nbytes);
if (evbuffer_get_length(bev->input) >= bev->wm_read.low)
_bufferevent_run_readcb(bev);
bev_async_consider_reading(bev_a);
struct bufferevent_async *bev_a = upcast_write(eo);
struct bufferevent *bev = &bev_a->bev.bev;
short what = BEV_EVENT_WRITING;
+ ev_ssize_t amount_unwritten;
BEV_LOCK(bev);
EVUTIL_ASSERT(bev_a->write_in_progress);
+
+ amount_unwritten = bev_a->write_in_progress - nbytes;
evbuffer_commit_write(bev->output, nbytes);
bev_a->write_in_progress = 0;
+ if (amount_unwritten)
+ _bufferevent_decrement_write_buckets(&bev_a->bev,
+ -amount_unwritten);
+
+
if (!ok)
bev_async_set_wsa_error(bev, eo);
if (bev_a->ok) {
if (ok && nbytes) {
BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
- _bufferevent_decrement_write_buckets(&bev_a->bev,
- nbytes);
if (evbuffer_get_length(bev->output) <=
bev->wm_write.low)
_bufferevent_run_writecb(bev);
_evbuffer_overlapped_set_fd(bev->output, data->fd);
return 0;
}
+ case BEV_CTRL_CANCEL_ALL: {
+ struct bufferevent_async *bev_a = upcast(bev);
+ evutil_socket_t fd = _evbuffer_overlapped_get_fd(bev->input);
+ if (fd != (evutil_socket_t)INVALID_SOCKET &&
+ (bev_a->bev.options & BEV_OPT_CLOSE_ON_FREE)) {
+ closesocket(fd);
+ }
+ bev_a->ok = 0;
+ return 0;
+ }
case BEV_CTRL_GET_UNDERLYING:
default:
return -1;
}
}
+
+
return 0;
case BEV_CTRL_GET_FD:
case BEV_CTRL_SET_FD:
+ case BEV_CTRL_CANCEL_ALL:
default:
return -1;
}
} else if (what & BEV_EVENT_TIMEOUT) {
/* We sure didn't set this. Propagate it to the user. */
event = what;
+ } else if (what & BEV_EVENT_ERROR) {
+ /* An error occurred on the connection. Propagate it to the user. */
+ event = what;
} else if (what & BEV_EVENT_CONNECTED) {
/* Ignore it. We're saying SSL_connect() already, which will
eat it. */
return -1;
data->ptr = bev_ssl->underlying;
return 0;
+ case BEV_CTRL_CANCEL_ALL:
default:
return -1;
}
}
if (underlying) {
+ bufferevent_setwatermark(underlying, EV_READ, 0, 0);
bufferevent_enable(underlying, EV_READ|EV_WRITE);
if (state == BUFFEREVENT_SSL_OPEN)
bufferevent_suspend_read(underlying,
#include "bufferevent-internal.h"
#include "mm-internal.h"
#include "util-internal.h"
+#include "event-internal.h"
int
ev_token_bucket_init(struct ev_token_bucket *bucket,
r->read_maximum = read_burst;
r->write_maximum = write_burst;
memcpy(&r->tick_timeout, tick_len, sizeof(struct timeval));
- r->msec_per_tick = (tick_len->tv_sec * 1000) + tick_len->tv_usec/1000;
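+	/* tick_len may come from the common-timeout logic, which encodes
+	 * flag bits in the upper bits of tv_usec; mask those off so the
+	 * millisecond computation sees only real microseconds. */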
+ r->msec_per_tick = (tick_len->tv_sec * 1000) +
+ (tick_len->tv_usec & COMMON_TIMEOUT_MICROSECONDS_MASK)/1000;
return r;
}
static int _bev_group_suspend_reading(struct bufferevent_rate_limit_group *g);
static int _bev_group_suspend_writing(struct bufferevent_rate_limit_group *g);
+static void _bev_group_unsuspend_reading(struct bufferevent_rate_limit_group *g);
+static void _bev_group_unsuspend_writing(struct bufferevent_rate_limit_group *g);
/** Helper: figure out the maximum amount we should write if is_write, or
the maximum amount we should read if is_read. Return that maximum, or
if (event_add(&bev->rate_limiting->refill_bucket_event,
&bev->rate_limiting->cfg->tick_timeout) < 0)
r = -1;
+ } else if (bev->read_suspended & BEV_SUSPEND_BW) {
+ if (!(bev->write_suspended & BEV_SUSPEND_BW))
+ event_del(&bev->rate_limiting->refill_bucket_event);
+ bufferevent_unsuspend_read(&bev->bev, BEV_SUSPEND_BW);
}
}
bev->rate_limiting->group->total_read += bytes;
if (bev->rate_limiting->group->rate_limit.read_limit <= 0) {
_bev_group_suspend_reading(bev->rate_limiting->group);
+ } else if (bev->rate_limiting->group->read_suspended) {
+ _bev_group_unsuspend_reading(bev->rate_limiting->group);
}
UNLOCK_GROUP(bev->rate_limiting->group);
}
if (event_add(&bev->rate_limiting->refill_bucket_event,
&bev->rate_limiting->cfg->tick_timeout) < 0)
r = -1;
+ } else if (bev->write_suspended & BEV_SUSPEND_BW) {
+ if (!(bev->read_suspended & BEV_SUSPEND_BW))
+ event_del(&bev->rate_limiting->refill_bucket_event);
+ bufferevent_unsuspend_write(&bev->bev, BEV_SUSPEND_BW);
}
}
bev->rate_limiting->group->total_written += bytes;
if (bev->rate_limiting->group->rate_limit.write_limit <= 0) {
_bev_group_suspend_writing(bev->rate_limiting->group);
+ } else if (bev->rate_limiting->group->write_suspended) {
+ _bev_group_unsuspend_writing(bev->rate_limiting->group);
}
UNLOCK_GROUP(bev->rate_limiting->group);
}
event_base_gettimeofday_cached(event_get_base(&g->master_refill_event), &now);
LOCK_GROUP(g);
+
tick = ev_token_bucket_get_tick(&now, &g->rate_limit_cfg);
ev_token_bucket_update(&g->rate_limit, &g->rate_limit_cfg, tick);
ev_token_bucket_init(&g->rate_limit, cfg, tick, 0);
- g->min_share = 64;
event_assign(&g->master_refill_event, base, -1, EV_PERSIST,
_bev_group_refill_callback, g);
/*XXXX handle event_add failure */
event_add(&g->master_refill_event, &cfg->tick_timeout);
EVTHREAD_ALLOC_LOCK(g->lock, EVTHREAD_LOCKTYPE_RECURSIVE);
+
+ bufferevent_rate_limit_group_set_min_share(g, 64);
+
return g;
}
event_add(&g->master_refill_event, &cfg->tick_timeout);
}
+ /* The new limits might force us to adjust min_share differently. */
+ bufferevent_rate_limit_group_set_min_share(g, g->configured_min_share);
+
UNLOCK_GROUP(g);
return 0;
}
if (share > EV_SSIZE_MAX)
return -1;
+ g->configured_min_share = share;
+
+	/* Don't let the effective share exceed the bucket's one-tick refill
+	 * rate; that way, at steady state, at least one member can make
+	 * progress every tick. */
+ if (share > g->rate_limit_cfg.read_rate)
+ share = g->rate_limit_cfg.read_rate;
+ if (share > g->rate_limit_cfg.write_rate)
+ share = g->rate_limit_cfg.write_rate;
+
g->min_share = share;
return 0;
}
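A minimal sketch of how the clamped min_share is meant to be used with the
existing public rate-limit API (the rates, the 256-byte share, and the
base/bev variables here are illustrative assumptions, not part of the patch):

    /* Group refills 4096 bytes of read and write every 500-msec tick. */
    struct timeval tick = { 0, 500*1000 };
    struct ev_token_bucket_cfg *cfg =
        ev_token_bucket_cfg_new(4096, 4096, 4096, 4096, &tick);
    struct bufferevent_rate_limit_group *grp =
        bufferevent_rate_limit_group_new(base, cfg);

    /* Ask for 256 bytes per member per tick; if the group is later
     * reconfigured below 256 bytes/tick, the effective share is clamped
     * down so at least one member can still transfer each tick. */
    bufferevent_rate_limit_group_set_min_share(grp, 256);
    bufferevent_add_to_rate_limit_group(bev, grp);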
!bufev_p->write_suspended) {
/* Somebody added data to the buffer, and we would like to
* write, and we were not writing. So, start writing. */
- be_socket_add(&bufev->ev_write, &bufev->timeout_write);
- /* XXXX handle failure from be_socket_add */
+ if (be_socket_add(&bufev->ev_write, &bufev->timeout_write) == -1) {
+			/* XXXX Should we log this? */
+ }
}
}
data->fd = event_get_fd(&bev->ev_read);
return 0;
case BEV_CTRL_GET_UNDERLYING:
+ case BEV_CTRL_CANCEL_ALL:
default:
return -1;
}
CFLAGS="$CFLAGS -fno-strict-aliasing"
fi
+# OS X Lion started deprecating the system openssl. Let's just disable
+# all deprecation warnings on OS X.
+case "$host_os" in
+
+ darwin*)
+ CFLAGS="$CFLAGS -Wno-deprecated-declarations"
+ ;;
+esac
+
AC_ARG_ENABLE(gcc-warnings,
AS_HELP_STRING(--enable-gcc-warnings, enable verbose warnings with GCC))
AC_ARG_ENABLE(thread-support,
AC_ARG_ENABLE([function-sections],
AS_HELP_STRING([--enable-function-sections, make static library allow smaller binaries with --gc-sections]),
[], [enable_function_sections=no])
+AC_ARG_ENABLE(verbose-debug,
+	AS_HELP_STRING([--enable-verbose-debug], [verbose debug logging]),
+ [], [enable_verbose_debug=no])
AC_PROG_LIBTOOL
AC_SUBST(EV_LIB_WS32)
AC_SUBST(EV_LIB_GDI)
-AC_CHECK_HEADERS([openssl/bio.h])
-
-if test "$enable_openssl" = "yes"; then
-save_LIBS="$LIBS"
-LIBS=""
-OPENSSL_LIBS=""
-have_openssl=no
-AC_SEARCH_LIBS([SSL_new], [ssl],
- [have_openssl=yes
- OPENSSL_LIBS="$LIBS -lcrypto $EV_LIB_GDI $EV_LIB_WS32"
- AC_DEFINE(HAVE_OPENSSL, 1, [Define if the system has openssl])],
- [have_openssl=no],
- [-lcrypto $EV_LIB_GDI $EV_LIB_WS32])
-LIBS="$save_LIBS"
-AC_SUBST(OPENSSL_LIBS)
-fi
+LIBEVENT_OPENSSL
dnl Checks for header files.
AC_HEADER_STDC
[Define if libevent should build without support for a debug mode])
fi
+# check if we should enable verbose debugging
+if test x$enable_verbose_debug = xyes; then
+ CFLAGS="$CFLAGS -DUSE_DEBUG"
+fi
+
# check if we have and should use openssl
AM_CONDITIONAL(OPENSSL, [test "$enable_openssl" != "no" && test "$have_openssl" = "yes"])
/** Lock used to protect the queue. */
void *lock;
+ /** Which event_base does this queue associate itself with?
+ * (Used for timing) */
+ struct event_base *base;
+
/** How many entries are in the queue? */
int active_count;
/** Set the parent bufferevent object for buf to bev */
void evbuffer_set_parent(struct evbuffer *buf, struct bufferevent *bev);
+void evbuffer_invoke_callbacks(struct evbuffer *buf);
+
#ifdef __cplusplus
}
#endif
} else {
base->global_requests_waiting--;
}
+ /* it was initialized during request_new / evtimer_assign */
+ event_debug_unassign(&req->timeout_event);
if (!req->request_appended) {
		/* need to free the request data on its own */
struct event_base *base;
};
+/** Mask used to get the real tv_usec value from a common timeout. */
+#define COMMON_TIMEOUT_MICROSECONDS_MASK 0x000fffff
+
struct event_change;
/* List of 'changes' since the last call to eventop.dispatch. Only maintained
/** Flags that this base was configured with */
enum event_base_config_flag flags;
+ struct timeval max_dispatch_time;
+ int max_dispatch_callbacks;
+ int limit_callbacks_after_prio;
+
/* Notify main thread to wake up break, etc. */
/** True if the base already has a pending notify, and we don't need
* to add any more. */
TAILQ_HEAD(event_configq, event_config_entry) entries;
int n_cpus_hint;
+ struct timeval max_dispatch_interval;
+ int max_dispatch_callbacks;
+ int limit_callbacks_after_prio;
enum event_method_feature require_features;
enum event_base_config_flag flags;
};
#include <signal.h>
#include <string.h>
#include <time.h>
+#include <limits.h>
#include "event2/event.h"
#include "event2/event_struct.h"
const struct timeval *tv, int tv_is_absolute);
static inline int event_del_internal(struct event *ev);
-static void event_queue_insert(struct event_base *, struct event *, int);
-static void event_queue_remove(struct event_base *, struct event *, int);
+static void event_queue_insert_active(struct event_base *, struct event *);
+static void event_queue_insert_timeout(struct event_base *, struct event *);
+static void event_queue_insert_inserted(struct event_base *, struct event *);
+static void event_queue_remove_active(struct event_base *, struct event *);
+static void event_queue_remove_timeout(struct event_base *, struct event *);
+static void event_queue_remove_inserted(struct event_base *, struct event *);
+static void event_queue_reinsert_timeout(struct event_base *,struct event *);
+
static int event_haveevents(struct event_base *);
static int event_process_active(struct event_base *);
static int evthread_notify_base(struct event_base *base);
+static void insert_common_timeout_inorder(struct common_timeout_list *ctl,
+ struct event *ev);
+
#ifndef _EVENT_DISABLE_DEBUG_MODE
/* These functions implement a hashtable of which 'struct event *' structures
* have been setup or added. We don't want to trust the content of the struct
base->th_notify_fd[1] = -1;
event_deferred_cb_queue_init(&base->defer_queue);
+ base->defer_queue.base = base;
base->defer_queue.notify_fn = notify_base_cbq_callback;
base->defer_queue.notify_arg = base;
if (cfg)
should_check_environment =
!(cfg && (cfg->flags & EVENT_BASE_FLAG_IGNORE_ENV));
+ if (cfg) {
+ memcpy(&base->max_dispatch_time,
+ &cfg->max_dispatch_interval, sizeof(struct timeval));
+ base->limit_callbacks_after_prio =
+ cfg->limit_callbacks_after_prio;
+ } else {
+ base->max_dispatch_time.tv_sec = -1;
+ base->limit_callbacks_after_prio = 1;
+ }
+ if (cfg && cfg->max_dispatch_callbacks >= 0) {
+ base->max_dispatch_callbacks = cfg->max_dispatch_callbacks;
+ } else {
+ base->max_dispatch_callbacks = INT_MAX;
+ }
+ if (base->max_dispatch_callbacks == INT_MAX &&
+ base->max_dispatch_time.tv_sec == -1)
+ base->limit_callbacks_after_prio = INT_MAX;
+
for (i = 0; eventops[i] && !base->evbase; i++) {
if (cfg != NULL) {
/* determine if this backend should be avoided */
if (base->sig.ev_signal_added) {
/* we cannot call event_del here because the base has
* not been reinitialized yet. */
- event_queue_remove(base, &base->sig.ev_signal,
- EVLIST_INSERTED);
+ event_queue_remove_inserted(base, &base->sig.ev_signal);
if (base->sig.ev_signal.ev_flags & EVLIST_ACTIVE)
- event_queue_remove(base, &base->sig.ev_signal,
- EVLIST_ACTIVE);
+ event_queue_remove_active(base, &base->sig.ev_signal);
base->sig.ev_signal_added = 0;
}
if (base->th_notify_fd[0] != -1) {
/* we cannot call event_del here because the base has
* not been reinitialized yet. */
was_notifiable = 1;
- event_queue_remove(base, &base->th_notify,
- EVLIST_INSERTED);
+ event_queue_remove_inserted(base, &base->th_notify);
if (base->th_notify.ev_flags & EVLIST_ACTIVE)
- event_queue_remove(base, &base->th_notify,
- EVLIST_ACTIVE);
+ event_queue_remove_active(base, &base->th_notify);
base->sig.ev_signal_added = 0;
EVUTIL_CLOSESOCKET(base->th_notify_fd[0]);
if (base->th_notify_fd[1] != -1)
return (NULL);
TAILQ_INIT(&cfg->entries);
+ cfg->max_dispatch_interval.tv_sec = -1;
+ cfg->max_dispatch_callbacks = INT_MAX;
+ cfg->limit_callbacks_after_prio = 1;
return (cfg);
}
return (0);
}
+int
+event_config_set_max_dispatch_interval(struct event_config *cfg,
+ const struct timeval *max_interval, int max_callbacks, int min_priority)
+{
+ if (max_interval)
+ memcpy(&cfg->max_dispatch_interval, max_interval,
+ sizeof(struct timeval));
+ else
+ cfg->max_dispatch_interval.tv_sec = -1;
+ cfg->max_dispatch_callbacks =
+ max_callbacks >= 0 ? max_callbacks : INT_MAX;
+	if (min_priority < 0)
+		min_priority = 0;
+ cfg->limit_callbacks_after_prio = min_priority;
+ return (0);
+}
+
int
event_priority_init(int npriorities)
{
 * of index into the event_base's array of common timeouts.
*/
-#define MICROSECONDS_MASK 0x000fffff
+#define MICROSECONDS_MASK COMMON_TIMEOUT_MICROSECONDS_MASK
#define COMMON_TIMEOUT_IDX_MASK 0x0ff00000
#define COMMON_TIMEOUT_IDX_SHIFT 20
#define COMMON_TIMEOUT_MASK 0xf0000000
*/
static int
event_process_active_single_queue(struct event_base *base,
- struct event_list *activeq)
+ struct event_list *activeq,
+ int max_to_process, const struct timeval *endtime)
{
struct event *ev;
int count = 0;
for (ev = TAILQ_FIRST(activeq); ev; ev = TAILQ_FIRST(activeq)) {
if (ev->ev_events & EV_PERSIST)
- event_queue_remove(base, ev, EVLIST_ACTIVE);
+ event_queue_remove_active(base, ev);
else
event_del_internal(ev);
if (!(ev->ev_flags & EVLIST_INTERNAL))
if (base->event_break)
return -1;
+ if (count >= max_to_process)
+ return count;
+ if (count && endtime) {
+ struct timeval now;
+ update_time_cache(base);
+ gettime(base, &now);
+ if (evutil_timercmp(&now, endtime, >=))
+ return count;
+ }
}
return count;
}
we process.
*/
static int
-event_process_deferred_callbacks(struct deferred_cb_queue *queue, int *breakptr)
+event_process_deferred_callbacks(struct deferred_cb_queue *queue, int *breakptr,
+ int max_to_process, const struct timeval *endtime)
{
int count = 0;
struct deferred_cb *cb;
-
#define MAX_DEFERRED 16
+ if (max_to_process > MAX_DEFERRED)
+ max_to_process = MAX_DEFERRED;
+#undef MAX_DEFERRED
+
while ((cb = TAILQ_FIRST(&queue->deferred_cb_list))) {
cb->queued = 0;
TAILQ_REMOVE(&queue->deferred_cb_list, cb, cb_next);
LOCK_DEFERRED_QUEUE(queue);
if (*breakptr)
return -1;
- if (++count == MAX_DEFERRED)
+ if (++count >= max_to_process)
break;
+ if (endtime) {
+ struct timeval now;
+ update_time_cache(queue->base);
+ gettime(queue->base, &now);
+ if (evutil_timercmp(&now, endtime, >=))
+ return count;
+ }
}
-#undef MAX_DEFERRED
return count;
}
/* Caller must hold th_base_lock */
struct event_list *activeq = NULL;
int i, c = 0;
+ const struct timeval *endtime;
+ struct timeval tv;
+ const int maxcb = base->max_dispatch_callbacks;
+ const int limit_after_prio = base->limit_callbacks_after_prio;
+ if (base->max_dispatch_time.tv_sec >= 0) {
+ update_time_cache(base);
+ gettime(base, &tv);
+ evutil_timeradd(&base->max_dispatch_time, &tv, &tv);
+ endtime = &tv;
+ } else {
+ endtime = NULL;
+ }
for (i = 0; i < base->nactivequeues; ++i) {
if (TAILQ_FIRST(&base->activequeues[i]) != NULL) {
activeq = &base->activequeues[i];
- c = event_process_active_single_queue(base, activeq);
+ if (i < limit_after_prio)
+ c = event_process_active_single_queue(base, activeq,
+ INT_MAX, NULL);
+ else
+ c = event_process_active_single_queue(base, activeq,
+ maxcb, endtime);
if (c < 0)
return -1;
else if (c > 0)
}
}
- event_process_deferred_callbacks(&base->defer_queue,&base->event_break);
+ event_process_deferred_callbacks(&base->defer_queue,&base->event_break,
+ maxcb-c, endtime);
return c;
}
else if (ev->ev_events & EV_SIGNAL)
res = evmap_signal_add(base, (int)ev->ev_fd, ev);
if (res != -1)
- event_queue_insert(base, ev, EVLIST_INSERTED);
+ event_queue_insert_inserted(base, ev);
if (res == 1) {
/* evmap says we need to notify the main thread. */
notify = 1;
if (ev->ev_closure == EV_CLOSURE_PERSIST && !tv_is_absolute)
ev->ev_io_timeout = *tv;
- /*
- * we already reserved memory above for the case where we
- * are not replacing an existing timeout.
- */
- if (ev->ev_flags & EVLIST_TIMEOUT) {
- /* XXX I believe this is needless. */
- if (min_heap_elt_is_top(ev))
- notify = 1;
- event_queue_remove(base, ev, EVLIST_TIMEOUT);
- }
-
/* Check if it is active due to a timeout. Rescheduling
* this timeout before the callback can be executed
* removes it from the active list. */
}
}
- event_queue_remove(base, ev, EVLIST_ACTIVE);
+ event_queue_remove_active(base, ev);
}
gettime(base, &now);
}
event_debug((
- "event_add: timeout in %d seconds, call %p",
- (int)tv->tv_sec, ev->ev_callback));
+ "event_add: event %p, timeout in %d seconds %d useconds, call %p",
+ ev, (int)tv->tv_sec, (int)tv->tv_usec, ev->ev_callback));
+
+ event_queue_reinsert_timeout(base, ev);
- event_queue_insert(base, ev, EVLIST_TIMEOUT);
if (common_timeout) {
struct common_timeout_list *ctl =
get_common_timeout_list(base, &ev->ev_timeout);
* dispatch loop early anyway, so we wouldn't gain anything by
* doing it.
*/
- event_queue_remove(base, ev, EVLIST_TIMEOUT);
+ event_queue_remove_timeout(base, ev);
}
if (ev->ev_flags & EVLIST_ACTIVE)
- event_queue_remove(base, ev, EVLIST_ACTIVE);
+ event_queue_remove_active(base, ev);
if (ev->ev_flags & EVLIST_INSERTED) {
- event_queue_remove(base, ev, EVLIST_INSERTED);
+ event_queue_remove_inserted(base, ev);
if (ev->ev_events & (EV_READ|EV_WRITE))
res = evmap_io_del(base, ev->ev_fd, ev);
else
ev->ev_pncalls = NULL;
}
- event_queue_insert(base, ev, EVLIST_ACTIVE);
+ event_queue_insert_active(base, ev);
if (EVBASE_NEED_NOTIFY(base))
evthread_notify_base(base);
EVUTIL_ASSERT(tv->tv_sec >= 0);
EVUTIL_ASSERT(tv->tv_usec >= 0);
- event_debug(("timeout_next: in %d seconds", (int)tv->tv_sec));
+ event_debug(("timeout_next: event: %p, in %d seconds, %d useconds", ev, (int)tv->tv_sec, (int)tv->tv_usec));
out:
return (res);
/* delete this event from the I/O queues */
event_del_internal(ev);
- event_debug(("timeout_process: call %p",
- ev->ev_callback));
+ event_debug(("timeout_process: event: %p, call %p",
+ ev, ev->ev_callback));
event_active_nolock(ev, EV_TIMEOUT, 1);
}
}
-/* Remove 'ev' from 'queue' (EVLIST_...) in base. */
+#if (EVLIST_INTERNAL >> 4) != 1
+#error "Mismatch for value of EVLIST_INTERNAL"
+#endif
+/* These are a fancy way to spell
+ if (~ev->ev_flags & EVLIST_INTERNAL)
+ base->event_count--/++;
+*/
+#define DECR_EVENT_COUNT(base,ev) \
+ ((base)->event_count -= (~((ev)->ev_flags >> 4) & 1))
+#define INCR_EVENT_COUNT(base, ev) \
+ ((base)->event_count += (~((ev)->ev_flags >> 4) & 1))
+
static void
-event_queue_remove(struct event_base *base, struct event *ev, int queue)
+event_queue_remove_inserted(struct event_base *base, struct event *ev)
{
EVENT_BASE_ASSERT_LOCKED(base);
-
- if (!(ev->ev_flags & queue)) {
+ if (EVUTIL_FAILURE_CHECK(!(ev->ev_flags & EVLIST_INSERTED))) {
+ event_errx(1, "%s: %p(fd %d) not on queue %x", __func__,
+ ev, ev->ev_fd, EVLIST_INSERTED);
+ return;
+ }
+ DECR_EVENT_COUNT(base, ev);
+ ev->ev_flags &= ~EVLIST_INSERTED;
+ TAILQ_REMOVE(&base->eventqueue, ev, ev_next);
+}
+static void
+event_queue_remove_active(struct event_base *base, struct event *ev)
+{
+ EVENT_BASE_ASSERT_LOCKED(base);
+ if (EVUTIL_FAILURE_CHECK(!(ev->ev_flags & EVLIST_ACTIVE))) {
+ event_errx(1, "%s: %p(fd %d) not on queue %x", __func__,
+ ev, ev->ev_fd, EVLIST_ACTIVE);
+ return;
+ }
+ DECR_EVENT_COUNT(base, ev);
+ ev->ev_flags &= ~EVLIST_ACTIVE;
+ base->event_count_active--;
+ TAILQ_REMOVE(&base->activequeues[ev->ev_pri],
+ ev, ev_active_next);
+}
+static void
+event_queue_remove_timeout(struct event_base *base, struct event *ev)
+{
+ EVENT_BASE_ASSERT_LOCKED(base);
+ if (EVUTIL_FAILURE_CHECK(!(ev->ev_flags & EVLIST_TIMEOUT))) {
event_errx(1, "%s: %p(fd %d) not on queue %x", __func__,
- ev, ev->ev_fd, queue);
+ ev, ev->ev_fd, EVLIST_TIMEOUT);
return;
}
+ DECR_EVENT_COUNT(base, ev);
+ ev->ev_flags &= ~EVLIST_TIMEOUT;
- if (~ev->ev_flags & EVLIST_INTERNAL)
- base->event_count--;
-
- ev->ev_flags &= ~queue;
- switch (queue) {
- case EVLIST_INSERTED:
- TAILQ_REMOVE(&base->eventqueue, ev, ev_next);
- break;
- case EVLIST_ACTIVE:
- base->event_count_active--;
- TAILQ_REMOVE(&base->activequeues[ev->ev_pri],
- ev, ev_active_next);
- break;
- case EVLIST_TIMEOUT:
- if (is_common_timeout(&ev->ev_timeout, base)) {
- struct common_timeout_list *ctl =
- get_common_timeout_list(base, &ev->ev_timeout);
- TAILQ_REMOVE(&ctl->events, ev,
- ev_timeout_pos.ev_next_with_common_timeout);
- } else {
- min_heap_erase(&base->timeheap, ev);
- }
- break;
- default:
- event_errx(1, "%s: unknown queue %x", __func__, queue);
+ if (is_common_timeout(&ev->ev_timeout, base)) {
+ struct common_timeout_list *ctl =
+ get_common_timeout_list(base, &ev->ev_timeout);
+ TAILQ_REMOVE(&ctl->events, ev,
+ ev_timeout_pos.ev_next_with_common_timeout);
+ } else {
+ min_heap_erase(&base->timeheap, ev);
+ }
+}
+
+/* Remove and reinsert 'ev' into the timeout queue. */
+static void
+event_queue_reinsert_timeout(struct event_base *base, struct event *ev)
+{
+ if (!(ev->ev_flags & EVLIST_TIMEOUT)) {
+ event_queue_insert_timeout(base, ev);
+ return;
+ }
+
+ if (is_common_timeout(&ev->ev_timeout, base)) {
+ struct common_timeout_list *ctl =
+ get_common_timeout_list(base, &ev->ev_timeout);
+ TAILQ_REMOVE(&ctl->events, ev,
+ ev_timeout_pos.ev_next_with_common_timeout);
+ insert_common_timeout_inorder(ctl, ev);
+ } else {
+ min_heap_adjust(&base->timeheap, ev);
}
}
}
static void
-event_queue_insert(struct event_base *base, struct event *ev, int queue)
+event_queue_insert_inserted(struct event_base *base, struct event *ev)
{
EVENT_BASE_ASSERT_LOCKED(base);
- if (ev->ev_flags & queue) {
- /* Double insertion is possible for active events */
- if (queue & EVLIST_ACTIVE)
- return;
+ if (EVUTIL_FAILURE_CHECK(ev->ev_flags & EVLIST_INSERTED)) {
+ event_errx(1, "%s: %p(fd %d) already inserted", __func__,
+ ev, ev->ev_fd);
+ return;
+ }
- event_errx(1, "%s: %p(fd %d) already on queue %x", __func__,
- ev, ev->ev_fd, queue);
+ INCR_EVENT_COUNT(base, ev);
+
+ ev->ev_flags |= EVLIST_INSERTED;
+
+ TAILQ_INSERT_TAIL(&base->eventqueue, ev, ev_next);
+}
+
+static void
+event_queue_insert_active(struct event_base *base, struct event *ev)
+{
+ EVENT_BASE_ASSERT_LOCKED(base);
+
+ if (ev->ev_flags & EVLIST_ACTIVE) {
+ /* Double insertion is possible for active events */
return;
}
- if (~ev->ev_flags & EVLIST_INTERNAL)
- base->event_count++;
-
- ev->ev_flags |= queue;
- switch (queue) {
- case EVLIST_INSERTED:
- TAILQ_INSERT_TAIL(&base->eventqueue, ev, ev_next);
- break;
- case EVLIST_ACTIVE:
- base->event_count_active++;
- TAILQ_INSERT_TAIL(&base->activequeues[ev->ev_pri],
- ev,ev_active_next);
- break;
- case EVLIST_TIMEOUT: {
- if (is_common_timeout(&ev->ev_timeout, base)) {
- struct common_timeout_list *ctl =
- get_common_timeout_list(base, &ev->ev_timeout);
- insert_common_timeout_inorder(ctl, ev);
- } else
- min_heap_push(&base->timeheap, ev);
- break;
+ INCR_EVENT_COUNT(base, ev);
+
+ ev->ev_flags |= EVLIST_ACTIVE;
+
+ base->event_count_active++;
+ TAILQ_INSERT_TAIL(&base->activequeues[ev->ev_pri],
+ ev,ev_active_next);
+}
+
+static void
+event_queue_insert_timeout(struct event_base *base, struct event *ev)
+{
+ EVENT_BASE_ASSERT_LOCKED(base);
+
+ if (EVUTIL_FAILURE_CHECK(ev->ev_flags & EVLIST_TIMEOUT)) {
+ event_errx(1, "%s: %p(fd %d) already on timeout", __func__,
+ ev, ev->ev_fd);
+ return;
}
- default:
- event_errx(1, "%s: unknown queue %x", __func__, queue);
+
+ INCR_EVENT_COUNT(base, ev);
+
+ ev->ev_flags |= EVLIST_TIMEOUT;
+
+ if (is_common_timeout(&ev->ev_timeout, base)) {
+ struct common_timeout_list *ctl =
+ get_common_timeout_list(base, &ev->ev_timeout);
+ insert_common_timeout_inorder(ctl, ev);
+ } else {
+ min_heap_push(&base->timeheap, ev);
}
}
evrpc_request_cb_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res)
{
struct evrpc_req_generic *rpc_state = arg;
- struct evrpc *rpc = rpc_state->rpc;
- struct evhttp_request *req = rpc_state->http_req;
+ struct evrpc *rpc;
+ struct evhttp_request *req;
+
+ EVUTIL_ASSERT(rpc_state);
+ rpc = rpc_state->rpc;
+ req = rpc_state->http_req;
if (hook_res == EVRPC_TERMINATE)
goto error;
void
evrpc_request_done(struct evrpc_req_generic *rpc_state)
{
- struct evhttp_request *req = rpc_state->http_req;
- struct evrpc *rpc = rpc_state->rpc;
+ struct evhttp_request *req;
+ struct evrpc *rpc;
+
+ EVUTIL_ASSERT(rpc_state);
+
+ req = rpc_state->http_req;
+ rpc = rpc_state->rpc;
if (rpc->reply_complete(rpc_state->reply) == -1) {
/* the reply was not completely filled in. error out */
evrpc_request_done_closure(void *arg, enum EVRPC_HOOK_RESULT hook_res)
{
struct evrpc_req_generic *rpc_state = arg;
- struct evhttp_request *req = rpc_state->http_req;
+ struct evhttp_request *req;
+ EVUTIL_ASSERT(rpc_state);
+ req = rpc_state->http_req;
if (hook_res == EVRPC_TERMINATE)
goto error;
int made_fd = 0;
if (*fd_ptr < 0) {
- made_fd = 1;
if ((*fd_ptr = socket(sa->sa_family, SOCK_STREAM, 0)) < 0)
goto err;
+ made_fd = 1;
if (evutil_make_socket_nonblocking(*fd_ptr) < 0) {
goto err;
}
return 0;
}
-#ifndef _EVENT_HAVE_ARC4RANDOM_BUF
static void
-arc4random_buf(void *buf, size_t n)
+ev_arc4random_buf(void *buf, size_t n)
{
+#ifdef _EVENT_HAVE_ARC4RANDOM_BUF
+	arc4random_buf(buf, n);
+#else
unsigned char *b = buf;
/* Make sure that we start out with b at a 4-byte alignment; plenty
* of CPUs care about this for 32-bit access. */
ev_uint32_t u = arc4random();
memcpy(b, &u, n);
}
-}
#endif
+}
#else /* !_EVENT_HAVE_ARC4RANDOM { */
return val;
}
+static void
+ev_arc4random_buf(void *buf, size_t n)
+{
+ arc4random_buf(buf, n);
+}
+
#endif /* } !_EVENT_HAVE_ARC4RANDOM */
void
evutil_secure_rng_get_bytes(void *buf, size_t n)
{
- arc4random_buf(buf, n);
+ ev_arc4random_buf(buf, n);
}
void
default:
break;
}
+ break;
case 5:
/* Method length is 5 bytes, which can only encompass PATCH and TRACE */
switch (*method) {
@param base the evdns_base that was used to make the request
@param req the evdns_request that was returned by calling a resolve function
- @see evdns_base_resolve_ip4(), evdns_base_resolve_ipv6, evdns_base_resolve_reverse
+  @see evdns_base_resolve_ipv4(), evdns_base_resolve_ipv6(), evdns_base_resolve_reverse()
*/
void evdns_cancel_request(struct evdns_base *base, struct evdns_request *req);
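A minimal cancellation sketch (assuming an existing evdns_base named
dns_base; DNS_ERR_CANCEL is the result code the callback receives when a
request is cancelled):

    static void dns_cb(int result, char type, int count, int ttl,
        void *addresses, void *arg)
    {
        if (result == DNS_ERR_CANCEL)
            return; /* the request was cancelled below */
        /* ... use the answer ... */
    }

    struct evdns_request *req = evdns_base_resolve_ipv4(
        dns_base, "www.example.com", 0, dns_cb, NULL);
    if (req)
        evdns_cancel_request(dns_base, req);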
*/
int event_config_set_num_cpus_hint(struct event_config *cfg, int cpus);
+/**
+ * Record an interval and/or a number of callbacks after which the event base
+ * should check for new events. By default, the event base will run as many
+ * events are as activated at the higest activated priority before checking
+ * for new events. If you configure it by setting max_interval, it will check
+ * the time after each callback, and not allow more than max_interval to
+ * elapse before checking for new events. If you configure it by setting
+ * max_callbacks to a value >= 0, it will run no more than max_callbacks
+ * callbacks before checking for new events.
+ *
+ * This option can decrease the latency of high-priority events, and
+ * avoid priority inversions where multiple low-priority events keep us from
+ * polling for high-priority events, but at the expense of slightly decreasing
+ * the throughput. Use it with caution!
+ *
+ * @param cfg The event_base configuration object.
+ * @param max_interval An interval after which Libevent should stop running
+ * callbacks and check for more events, or NULL if there should be
+ * no such interval.
+ * @param max_callbacks A number of callbacks after which Libevent should
+ * stop running callbacks and check for more events, or -1 if there
+ * should be no such limit.
+ * @param min_priority A priority below which max_interval and max_callbacks
+ * should not be enforced. If this is set to 0, they are enforced
+ * for events of every priority; if it's set to 1, they're enforced
+ * for events of priority 1 and above, and so on.
+ * @return 0 on success, -1 on failure.
+ **/
+int event_config_set_max_dispatch_interval(struct event_config *cfg,
+ const struct timeval *max_interval, int max_callbacks,
+ int min_priority);
+
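A minimal sketch of configuring these limits (the 100-msec interval, the
16-callback cap, and the priority cutoff of 1 are illustrative values):

    struct event_config *cfg = event_config_new();
    struct timeval max_interval = { 0, 100*1000 }; /* 100 msec */
    struct event_base *base;

    /* For callbacks at priority 1 and above, stop dispatching after
     * 100 msec or 16 callbacks (whichever comes first) and poll for
     * new events; priority-0 callbacks remain unrestricted. */
    event_config_set_max_dispatch_interval(cfg, &max_interval, 16, 1);
    base = event_base_new_with_config(cfg);
    event_config_free(cfg);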
/**
Initialize the event API.
/**
@name timeout_* macros
- @deprecated These macros are deprecated because their naming is inconsisten
+ @deprecated These macros are deprecated because their naming is inconsistent
with the rest of Libevent. Use the evtimer_* macros instead.
@{
*/
/**
@name signal_* macros
- @deprecated These macros are deprecated because their naming is inconsisten
+ @deprecated These macros are deprecated because their naming is inconsistent
with the rest of Libevent. Use the evsignal_* macros instead.
@{
*/
return (NULL);
}
-static void
-kq_sighandler(int sig)
-{
- /* Do nothing here */
-}
-
#define ADD_UDATA 0x30303
static void
if (kevent(kqop->kq, &kev, 1, NULL, 0, &timeout) == -1)
return (-1);
- /* XXXX The manpage suggest we could use SIG_IGN instead of a
- * do-nothing handler */
- if (_evsig_set_handler(base, nsignal, kq_sighandler) == -1)
+ /* We can set the handler for most signals to SIG_IGN and
+ * still have them reported to us in the queue. However,
+ * if the handler for SIGCHLD is SIG_IGN, the system reaps
+ * zombie processes for us, and we don't get any notification.
+ * This appears to be the only signal with this quirk. */
+ if (_evsig_set_handler(base, nsignal,
+ nsignal == SIGCHLD ? SIG_DFL : SIG_IGN) == -1)
return (-1);
return (0);
--- /dev/null
+dnl ######################################################################
+dnl OpenSSL support
+AC_DEFUN([LIBEVENT_OPENSSL], [
+AC_REQUIRE([NTP_PKG_CONFIG])dnl
+
+case "$enable_openssl" in
+ yes)
+ have_openssl=no
+ case "$PKG_CONFIG" in
+ '')
+ ;;
+ *)
+ OPENSSL_LIBS=`$PKG_CONFIG --libs openssl 2>/dev/null`
+ case "$OPENSSL_LIBS" in
+ '') ;;
+ *) OPENSSL_LIBS="$OPENSSL_LIBS $EV_LIB_GDI $EV_LIB_WS32"
+ have_openssl=yes
+ ;;
+ esac
+ OPENSSL_INCS=`$PKG_CONFIG --cflags openssl 2>/dev/null`
+ ;;
+ esac
+ case "$have_openssl" in
+ yes) ;;
+ *)
+ save_LIBS="$LIBS"
+ LIBS=""
+ OPENSSL_LIBS=""
+ AC_SEARCH_LIBS([SSL_new], [ssl],
+ [have_openssl=yes
+ OPENSSL_LIBS="$LIBS -lcrypto $EV_LIB_GDI $EV_LIB_WS32"],
+ [have_openssl=no],
+ [-lcrypto $EV_LIB_GDI $EV_LIB_WS32])
+ LIBS="$save_LIBS"
+ ;;
+ esac
+ AC_SUBST(OPENSSL_INCS)
+ AC_SUBST(OPENSSL_LIBS)
+ case "$have_openssl" in
+ yes) AC_DEFINE(HAVE_OPENSSL, 1, [Define if the system has openssl]) ;;
+ esac
+ ;;
+esac
+
+# check if we have and should use openssl
+AM_CONDITIONAL(OPENSSL, [test "$enable_openssl" != "no" && test "$have_openssl" = "yes"])
+])
--- /dev/null
+dnl NTP_PKG_CONFIG -*- Autoconf -*-
+dnl
+dnl Look for pkg-config, which must be at least
+dnl $ntp_pkgconfig_min_version.
+dnl
+AC_DEFUN([NTP_PKG_CONFIG], [
+
+dnl lower the minimum version if you find an earlier one works
+ntp_pkgconfig_min_version='0.15.0'
+AC_PATH_TOOL([PKG_CONFIG], [pkg-config])
+AS_UNSET([ac_cv_path_PKG_CONFIG])
+AS_UNSET([ac_cv_path_ac_pt_PKG_CONFIG])
+
+case "$PKG_CONFIG" in
+ /*)
+ AC_MSG_CHECKING([if pkg-config is at least version $ntp_pkgconfig_min_version])
+ if $PKG_CONFIG --atleast-pkgconfig-version $ntp_pkgconfig_min_version; then
+ AC_MSG_RESULT([yes])
+ else
+ AC_MSG_RESULT([no])
+ PKG_CONFIG=""
+ fi
+ ;;
+esac
+
+]) dnl NTP_PKG_CONFIG
+
static inline void min_heap_dtor(min_heap_t* s);
static inline void min_heap_elem_init(struct event* e);
static inline int min_heap_elt_is_top(const struct event *e);
-static inline int min_heap_elem_greater(struct event *a, struct event *b);
static inline int min_heap_empty(min_heap_t* s);
static inline unsigned min_heap_size(min_heap_t* s);
static inline struct event* min_heap_top(min_heap_t* s);
static inline int min_heap_reserve(min_heap_t* s, unsigned n);
static inline int min_heap_push(min_heap_t* s, struct event* e);
static inline struct event* min_heap_pop(min_heap_t* s);
+static inline int min_heap_adjust(min_heap_t *s, struct event* e);
static inline int min_heap_erase(min_heap_t* s, struct event* e);
static inline void min_heap_shift_up_(min_heap_t* s, unsigned hole_index, struct event* e);
+static inline void min_heap_shift_up_unconditional_(min_heap_t* s, unsigned hole_index, struct event* e);
static inline void min_heap_shift_down_(min_heap_t* s, unsigned hole_index, struct event* e);
-int min_heap_elem_greater(struct event *a, struct event *b)
-{
- return evutil_timercmp(&a->ev_timeout, &b->ev_timeout, >);
-}
+#define min_heap_elem_greater(a, b) \
+ (evutil_timercmp(&(a)->ev_timeout, &(b)->ev_timeout, >))
void min_heap_ctor(min_heap_t* s) { s->p = 0; s->n = 0; s->a = 0; }
void min_heap_dtor(min_heap_t* s) { if (s->p) mm_free(s->p); }
to be less than the parent, it can't need to shift both up and
down. */
if (e->ev_timeout_pos.min_heap_idx > 0 && min_heap_elem_greater(s->p[parent], last))
- min_heap_shift_up_(s, e->ev_timeout_pos.min_heap_idx, last);
+ min_heap_shift_up_unconditional_(s, e->ev_timeout_pos.min_heap_idx, last);
else
min_heap_shift_down_(s, e->ev_timeout_pos.min_heap_idx, last);
e->ev_timeout_pos.min_heap_idx = -1;
return -1;
}
+int min_heap_adjust(min_heap_t *s, struct event *e)
+{
+ if (-1 == e->ev_timeout_pos.min_heap_idx) {
+ return min_heap_push(s, e);
+ } else {
+ unsigned parent = (e->ev_timeout_pos.min_heap_idx - 1) / 2;
+ /* The position of e has changed; we shift it up or down
+ * as needed. We can't need to do both. */
+ if (e->ev_timeout_pos.min_heap_idx > 0 && min_heap_elem_greater(s->p[parent], e))
+ min_heap_shift_up_unconditional_(s, e->ev_timeout_pos.min_heap_idx, e);
+ else
+ min_heap_shift_down_(s, e->ev_timeout_pos.min_heap_idx, e);
+ return 0;
+ }
+}
+
int min_heap_reserve(min_heap_t* s, unsigned n)
{
if (s->a < n)
return 0;
}
+void min_heap_shift_up_unconditional_(min_heap_t* s, unsigned hole_index, struct event* e)
+{
+ unsigned parent = (hole_index - 1) / 2;
+ do
+ {
+ (s->p[hole_index] = s->p[parent])->ev_timeout_pos.min_heap_idx = hole_index;
+ hole_index = parent;
+ parent = (hole_index - 1) / 2;
+ } while (hole_index && min_heap_elem_greater(s->p[parent], e));
+ (s->p[hole_index] = e)->ev_timeout_pos.min_heap_idx = hole_index;
+}
+
void min_heap_shift_up_(min_heap_t* s, unsigned hole_index, struct event* e)
{
unsigned parent = (hole_index - 1) / 2;
http_server_sources = http-server.c
if OPENSSL
+AM_CPPFLAGS += $(OPENSSL_INCS)
noinst_PROGRAMS += le-proxy
le_proxy_sources = le-proxy.c
le_proxy_LDADD = $(LDADD) ../libevent_openssl.la
evutil_socket_t sock;
struct sockaddr_in my_addr;
sock = socket(PF_INET, SOCK_DGRAM, 0);
+ if (sock == -1) {
+ perror("socket");
+ exit(1);
+ }
evutil_make_socket_nonblocking(sock);
my_addr.sin_family = AF_INET;
my_addr.sin_port = htons(10053);
int n;
char cbuf[128];
	n = evbuffer_remove(buf, cbuf, sizeof(cbuf)-1);
- fwrite(cbuf, 1, n, stdout);
+ if (n > 0)
+ fwrite(cbuf, 1, n, stdout);
}
puts(">>>");
/* We need to decode it, to see what path the user really wanted. */
decoded_path = evhttp_uridecode(path, 0, NULL);
+ if (decoded_path == NULL)
+ goto err;
/* Don't allow any ".."s in the path, to avoid exposing stuff outside
* of the docroot. This test is both overzealous and underzealous:
 * it forbids acceptable paths like "/this/one..here", but it doesn't
int i = 0;
if (base->sig.ev_signal_added) {
event_del(&base->sig.ev_signal);
- event_debug_unassign(&base->sig.ev_signal);
base->sig.ev_signal_added = 0;
}
+ /* debug event is created in evsig_init/event_assign even when
+ * ev_signal_added == 0, so unassign is required */
+ event_debug_unassign(&base->sig.ev_signal);
for (i = 0; i < NSIG; ++i) {
if (i < base->sig.sh_old_max && base->sig.sh_old[i] != NULL)
AM_CPPFLAGS = -I$(top_srcdir) -I$(top_srcdir)/compat -I$(top_srcdir)/include -I../include -DTINYTEST_LOCAL
-EXTRA_DIST = regress.rpc regress.gen.h regress.gen.c test.sh
+EXTRA_DIST = regress.rpc regress.gen.h regress.gen.c rpcgen_wrapper.sh test.sh
noinst_PROGRAMS = test-init test-eof test-weof test-time \
bench bench_cascade bench_http bench_httpclient test-ratelim \
bench_httpclient_SOURCES = bench_httpclient.c
bench_httpclient_LDADD = $(LIBEVENT_GC_SECTIONS) ../libevent_core.la
-regress.gen.c regress.gen.h: regress.rpc $(top_srcdir)/event_rpcgen.py
- if $(top_srcdir)/event_rpcgen.py $(srcdir)/regress.rpc ; then \
- echo "HI"; \
+regress.gen.c regress.gen.h: rpcgen-attempted
+
+rpcgen-attempted: $(srcdir)/regress.rpc $(srcdir)/../event_rpcgen.py $(srcdir)/rpcgen_wrapper.sh
+ date -u > $@
+ if $(srcdir)/rpcgen_wrapper.sh $(srcdir); then \
+ echo "rpcgen okay"; \
else \
- echo "No Python installed; can't test RPC."; \
+ echo "No Python installed; stubbing out RPC test." >&2; \
echo " "> regress.gen.c; \
echo "#define NO_PYTHON_EXISTS" > regress.gen.h; \
fi
+CLEANFILES = rpcgen-attempted
+
DISTCLEANFILES = *~
verify: check
int c;
int use_iocp = 0;
unsigned short port = 8080;
+ char *endptr = NULL;
#ifdef _WIN32
WSADATA WSAData;
switch (c) {
case 'p':
- port = atoi(argv[i+1]);
+ if (i+1 >= argc || !argv[i+1]) {
+ fprintf(stderr, "Missing port\n");
+ exit(1);
+ }
+ port = (int)strtol(argv[i+1], &endptr, 10);
+ if (*endptr != '\0') {
+ fprintf(stderr, "Bad port\n");
+ exit(1);
+ }
break;
case 'l':
- content_len = atol(argv[i+1]);
- if (content_len == 0) {
+ if (i+1 >= argc || !argv[i+1]) {
+ fprintf(stderr, "Missing content length\n");
+ exit(1);
+ }
+ content_len = (size_t)strtol(argv[i+1], &endptr, 10);
+ if (*endptr != '\0' || content_len == 0) {
fprintf(stderr, "Bad content length\n");
exit(1);
}
--- /dev/null
+#!/bin/sh
+# libevent rpcgen_wrapper.sh
+# Transforms event_rpcgen.py failure into success for make, only if
+# regress.gen.c and regress.gen.h already exist in $srcdir. This
+# is needed for "make distcheck" to pass the read-only $srcdir build,
+# as with read-only sources fresh from tarball, regress.gen.[ch] will
+# be correct in $srcdir but unwritable. This previously triggered
+# Makefile.am to create stub regress.gen.c and regress.gen.h in the
+# distcheck _build directory, which were then detected as leftover
+# files in the build tree after distclean, breaking distcheck.
+# Note that regress.gen.[ch] are not in fresh git clones, making
+# working Python a requirement for make distcheck of a git tree.
+
+exit_updated() {
+    echo "Updated ${srcdir}/regress.gen.c and ${srcdir}/regress.gen.h"
+ exit 0
+}
+
+exit_reuse() {
+    echo "event_rpcgen.py failed, ${srcdir}/regress.gen.[ch] will be reused." >&2
+ exit 0
+}
+
+exit_failed() {
+    echo "Could not generate regress.gen.[ch] using event_rpcgen.py" >&2
+ exit 1
+}
+
+srcdir=$1
+srcdir=${srcdir:-.}
+${srcdir}/../event_rpcgen.py ${srcdir}/regress.rpc
+case "$?" in
+ 0)
+ exit_updated
+ ;;
+ *)
+ test -r ${srcdir}/regress.gen.c -a -r ${srcdir}/regress.gen.h && \
+ exit_reuse
+ exit_failed
+ ;;
+esac
static int cfg_grouplimit_tolerance = -1;
static int cfg_stddev_tolerance = -1;
+#ifdef _WIN32
+static int cfg_enable_iocp = 0;
+#endif
+
static struct timeval cfg_tick = { 0, 500*1000 };
static struct ev_token_bucket_cfg *conn_bucket_cfg = NULL;
double variance;
double expected_total_persec = -1.0, expected_avg_persec = -1.0;
int ok = 1;
+ struct event_config *base_cfg;
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
if (0)
event_enable_debug_mode();
- base = event_base_new();
+ base_cfg = event_config_new();
+
+#ifdef _WIN32
+ if (cfg_enable_iocp) {
+ evthread_use_windows_threads();
+ event_config_set_flag(base_cfg, EVENT_BASE_FLAG_STARTUP_IOCP);
+ }
+#endif
+
+ base = event_base_new_with_config(base_cfg);
listener = evconnlistener_new_bind(base, echo_listenercb, base,
LEV_OPT_CLOSE_ON_FREE|LEV_OPT_REUSEABLE, -1,
{ "--check-connlimit", &cfg_connlimit_tolerance, 0, 0 },
{ "--check-grouplimit", &cfg_grouplimit_tolerance, 0, 0 },
{ "--check-stddev", &cfg_stddev_tolerance, 0, 0 },
+#ifdef _WIN32
+ { "--iocp", &cfg_enable_iocp, 0, 1 },
+#endif
{ NULL, NULL, -1, 0 },
};