#include "util/netevent.h"
#include "util/fptr_wlist.h"
+/*#ifndef USE_WINSOCK TODO */
+#if 1
+/* on unix */
+
struct tube* tube_create(void)
{
struct tube* tube = (struct tube*)calloc(1, sizeof(*tube));
}
return 1;
}
+
+
+#else /* USE_WINSOCK */
+/* on windows */
+
+
+struct tube* tube_create(void)
+{
+ /* windows does not have forks like unix, so we only support
+ * threads on windows. And thus the pipe need only connect
+ * threads. We use a mutex and a list of datagrams. */
+ struct tube* tube = (struct tube*)calloc(1, sizeof(*tube));
+ if(!tube) {
+ int err = errno;
+ log_err("tube_create: out of memory");
+ errno = err;
+ return NULL;
+ }
+ tube->event = WSACreateEvent();
+ if(tube->event == WSA_INVALID_EVENT) {
+ free(tube);
+ log_err("WSACreateEvent: %s", wsa_strerror(WSAGetLastError()));
+ }
+ lock_basic_init(&tube->res_lock);
+ return tube;
+}
+
+void tube_delete(struct tube* tube)
+{
+ if(!tube) return;
+ tube_remove_bg_listen(tube);
+ tube_remove_bg_write(tube);
+ tube_close_read(tube);
+ tube_close_write(tube);
+ if(!WSACloseEvent(tube->event))
+ log_err("WSACloseEvent: %s", wsa_strerror(WSAGetLastError()));
+ lock_basic_destroy(&tube->res_lock);
+ free(tube);
+}
+
+void tube_close_read(struct tube* ATTR_UNUSED(tube))
+{
+}
+
+void tube_close_write(struct tube* ATTR_UNUSED(tube))
+{
+ /* wake up waiting reader with an empty queue */
+ if(!WSASetEvent(tube->event)) {
+ log_err("WSASetEvent: %s", wsa_strerror(WSAGetLastError()));
+ }
+}
+
+void tube_remove_bg_listen(struct tube* tube)
+{
+}
+
+void tube_remove_bg_write(struct tube* tube)
+{
+ if(tube->res_list) {
+ struct tube_res_list* np, *p = tube->res_list;
+ tube->res_list = NULL;
+ tube->res_last = NULL;
+ while(p) {
+ np = p->next;
+ free(p->buf);
+ free(p);
+ p = np;
+ }
+ }
+}
+
+int tube_write_msg(struct tube* tube, uint8_t* buf, uint32_t len,
+ int ATTR_UNUSED(nonblock))
+{
+ /* always nonblocking, this pipe cannot get full */
+ return tube_queue_item(tube, buf, len);
+}
+
+int tube_read_msg(struct tube* tube, uint8_t** buf, uint32_t* len,
+ int nonblock)
+{
+ struct tube_res_list* item = NULL;
+ *buf = NULL;
+ if(!tube_poll(tube)) {
+ /* nothing ready right now, wait if we want to */
+ if(nonblock)
+ return -1; /* would block waiting for items */
+ if(!tube_wait(tube))
+ return 0;
+ }
+ lock_basic_lock(&tube->res_lock);
+ if(tube->res_list) {
+ item = tube->res_list;
+ tube->res_list = item->next;
+ if(tube->res_last == item) {
+ /* the list is now empty */
+ tube->res_last = NULL;
+ if(!WSAResetEvent(&tube->event)) {
+ log_err("WSAResetEvent: %s",
+ wsa_strerror(errno));
+ }
+ }
+ }
+ lock_basic_unlock(&tube->res_lock);
+ if(!item)
+ return 0; /* would block waiting for items */
+ *buf = item->buf;
+ *len = item->len;
+ free(item);
+ return 1;
+}
+
+int tube_poll(struct tube* tube)
+{
+ struct tube_res_list* item = NULL;
+ lock_basic_lock(&tube->res_lock);
+ item = tube->res_list;
+ lock_basic_unlock(&tube->res_lock);
+ if(item)
+ return 1;
+ return 0;
+}
+
+int tube_wait(struct tube* tube)
+{
+ /* block on eventhandle */
+ DWORD res = WSAWaitForMultipleEvents(
+ 1 /* one event in array */,
+ &tube->event /* the event to wait for, our pipe signal */,
+ 0 /* wait for all events is false */,
+ WSA_INFINITE /* wait, no timeout */,
+ 0 /* we are not alertable for IO completion routines */
+ );
+ if(res == WSA_WAIT_TIMEOUT) {
+ return 0;
+ }
+ if(res == WSA_WAIT_IO_COMPLETION) {
+ /* a bit unexpected, since we were not alertable */
+ return 0;
+ }
+ return 1;
+}
+
+int tube_read_fd(struct tube* ATTR_UNUSED(tube))
+{
+ /* nothing sensible on Windows */
+ return -1;
+}
+
+int
+tube_handle_listen(struct comm_point* c, void* arg, int error,
+ struct comm_reply* ATTR_UNUSED(reply_info))
+{
+ /* TODO */
+}
+
+int
+tube_handle_write(struct comm_point* ATTR_UNUSED(c), void* ATTR_UNUSED(arg),
+ int ATTR_UNUSED(error), struct comm_reply* ATTR_UNUSED(reply_info))
+{
+ log_assert(0);
+ return 0;
+}
+
+int tube_setup_bg_listen(struct tube* tube, struct comm_base* base,
+ tube_callback_t* cb, void* arg)
+{
+ /* TODO register with event base */
+}
+
+int tube_setup_bg_write(struct tube* ATTR_UNUSED(tube),
+ struct comm_base* ATTR_UNUSED(base))
+{
+ /* the queue item routine performs the signaling */
+ return 1;
+}
+
+int tube_queue_item(struct tube* tube, uint8_t* msg, size_t len)
+{
+ struct tube_res_list* item =
+ (struct tube_res_list*)malloc(sizeof(*item));
+ if(!item) {
+ free(msg);
+ log_err("out of memory for async answer");
+ return 0;
+ }
+ item->buf = msg;
+ item->len = len;
+ item->next = NULL;
+ lock_basic_lock(&tube->res_lock);
+ /* add at back of list, since the first one may be partially written */
+ if(tube->res_last)
+ tube->res_last->next = item;
+ else tube->res_list = item;
+ tube->res_last = item;
+ /* signal the eventhandle */
+ if(!WSASetEvent(tube->event)) {
+ log_err("WSASetEvent: %s", wsa_strerror(WSAGetLastError()));
+ }
+ lock_basic_unlock(&tube->res_lock);
+ return 1;
+}
+
+#endif /* USE_WINSOCK */