typedef struct fr_event_fd_t {
int fd;
+
+#ifdef HAVE_KQUEUE
+ bool want_read;
+ bool want_write;
+#endif
+
fr_event_fd_handler_t handler;
fr_event_fd_handler_t write_handler;
void *ctx;
int max_readers;
int max_fd;
+ fd_set want_read_fds;
+ fd_set want_write_fds;
+
fd_set read_fds;
fd_set write_fds;
#else
int kq;
struct kevent events[FR_EV_MAX_FDS]; /* so it doesn't go on the stack every time */
#endif
+
+ /*
+ * Hacks for want_read / want_write
+ */
+ bool want_read;
+ bool want_write;
+ pthread_mutex_t mutex; /* for want_read / want_write */
+
fr_event_fd_t readers[FR_EV_MAX_FDS];
};
#ifdef HAVE_KQUEUE
close(el->kq);
#endif
+ pthread_mutex_destroy(&el->mutex);
return 0;
}
el->max_fd = 0;
FD_ZERO(&el->read_fds);
FD_ZERO(&el->write_fds);
+
+ FD_ZERO(&el->want_read_fds);
+ FD_ZERO(&el->want_write_fds);
#else
el->kq = kqueue();
if (el->kq < 0) {
}
#endif
+ pthread_mutex_init(&el->mutex, NULL);
+
el->status = status;
return el;
return 1;
}
+void fr_event_fd_want_read(fr_event_list_t *el, int fd)
+{
+ pthread_mutex_lock(&el->mutex);
+ el->want_read = true;
+
+#ifndef HAVE_KQUEUE
+ FD_SET(fd, &el->want_read_fds);
+#else
+ for (int i = 0; i < FR_EV_MAX_FDS; i++) {
+ int j;
+
+ j = (i + fd) & (FR_EV_MAX_FDS - 1);
+
+ if (el->readers[j].fd != fd) continue;
+ el->readers[j].want_read = true;
+ break;
+ }
+#endif
+ pthread_mutex_unlock(&el->mutex);
+}
+
+void fr_event_fd_want_write(fr_event_list_t *el, int fd)
+{
+ pthread_mutex_lock(&el->mutex);
+ el->want_write = true;
+#ifndef HAVE_KQUEUE
+
+ FD_SET(fd, &el->want_write_fds);
+#else
+ for (int i = 0; i < FR_EV_MAX_FDS; i++) {
+ int j;
+
+ j = (i + fd) & (FR_EV_MAX_FDS - 1);
+
+ if (el->readers[j].fd != fd) continue;
+ el->readers[j].want_write = true;
+ break;
+ }
+#endif
+ pthread_mutex_unlock(&el->mutex);
+}
+
int fr_event_fd_write_handler(fr_event_list_t *el, int type, int fd,
fr_event_fd_handler_t write_handler, void *ctx)
{
FD_CLR(fd, &el->read_fds);
FD_CLR(fd, &el->write_fds);
+ FD_CLR(fd, &el->want_read_fds);
+ FD_CLR(fd, &el->want_write_fds);
+
/*
* @todo - update el->max_fd, too.
*/
return -1;
}
+ /*
+ * Merge the flags together for FDs where we need
+ * to call the read or write function.
+ */
+ pthread_mutex_lock(&el->mutex);
+ if (el->want_read || el->want_write) {
+ when.tv_sec = 0;
+ when.tv_usec = 0;
+ wake = &when;
+
+ for (i = 0; i <= el->max_fd; i++) {
+ if (FD_ISSET(i, &el->want_read_fds)) FD_SET(i, &read_fds);
+ if (FD_ISSET(i, &el->want_write_fds)) FD_SET(i, &write_fds);
+ }
+
+ el->want_read = el->want_write = 0;
+
+ FD_ZERO(&el->want_read_fds);
+ FD_ZERO(&el->want_write_fds);
+ }
+ pthread_mutex_unlock(&el->mutex);
+
#else /* HAVE_KQUEUE */
- if (wake) {
+ pthread_mutex_lock(&el->mutex);
+ if (el->want_read || el->want_write) {
+ ts_wake = &ts_when;
+ ts_when.tv_sec = 0;
+ ts_when.tv_nsec = 0;
+
+ } else if (wake) {
ts_wake = &ts_when;
ts_when.tv_sec = when.tv_sec;
ts_when.tv_nsec = when.tv_usec * 1000;
+
} else {
ts_wake = NULL;
}
+ pthread_mutex_unlock(&el->mutex);
rcode = kevent(el->kq, NULL, 0, el->events, FR_EV_MAX_FDS, ts_wake);
#endif /* HAVE_KQUEUE */
* Call the handler, which SHOULD
* delete the connection.
*/
+ ef->want_read = false;
ef->handler(el, ef->fd, ef->ctx);
continue;
}
if (el->events[i].filter == EVFILT_WRITE) {
+ ef->want_write = false;
ef->write_handler(el, ef->fd, ef->ctx);
continue;
}
* EVFILT_READ, so it must be a read
* event.
*/
+ ef->want_read = false;
ef->handler(el, ef->fd, ef->ctx);
}
+
+ /*
+ * Loop over all of the sockets (again) if we have want_read or want_write
+ */
+ pthread_mutex_lock(&el->mutex);
+ if (el->want_read || el->want_write) {
+ for (i = 0; i < el->num_readers; i++) {
+ fr_event_fd_t *ef = &el->readers[i];
+
+ if (ef->fd < 0) continue;
+
+ /*
+ * Check if the socket is available for writing.
+ */
+ if (ef->want_write) {
+ ef->want_write = false;
+
+ pthread_mutex_unlock(&el->mutex);
+ ef->write_handler(el, ef->fd, ef->ctx);
+ pthread_mutex_lock(&el->mutex);
+ }
+
+ if (ef->want_read) {
+ ef->want_read = false;
+
+ pthread_mutex_unlock(&el->mutex);
+ ef->handler(el, ef->fd, ef->ctx);
+ pthread_mutex_lock(&el->mutex);
+ }
+
+ }
+
+ el->want_read = el->want_write = false;
+ }
+ pthread_mutex_unlock(&el->mutex);
#endif /* HAVE_KQUEUE */
}