]> git.ipfire.org Git - thirdparty/samba.git/commitdiff
made all sockets handle partial IO
authorAndrew Tridgell <tridge@samba.org>
Tue, 10 Apr 2007 09:33:21 +0000 (19:33 +1000)
committerAndrew Tridgell <tridge@samba.org>
Tue, 10 Apr 2007 09:33:21 +0000 (19:33 +1000)
abstract IO via ctdb_queue_*() functions

(This used to be ctdb commit 636ae76f4632b29231db87be32c9114f58b37840)

ctdb/common/ctdb_daemon.c
ctdb/common/ctdb_io.c
ctdb/include/ctdb.h
ctdb/include/ctdb_private.h
ctdb/tcp/ctdb_tcp.h
ctdb/tcp/tcp_connect.c
ctdb/tcp/tcp_io.c

index dc1e1477163100be74ba143fed6aec176e7c4b11..f2a8e8462ad31d28a868e21ae8bf5b20dac95ac3 100644 (file)
@@ -29,6 +29,7 @@
 #include "../include/ctdb_private.h"
 
 #define CTDB_PATH      "/tmp/ctdb.socket"
+#define CTDB_DS_ALIGNMENT 8
 
 
 static void ctdb_main_loop(struct ctdb_context *ctdb)
@@ -51,12 +52,13 @@ static void set_non_blocking(int fd)
 }
 
 
-
+/*
+  structure describing a connected client in the daemon
+ */
 struct ctdb_client {
        struct ctdb_context *ctdb;
-       struct fd_event *fde;
        int fd;
-       struct ctdb_partial partial;
+       struct ctdb_queue *queue;
 };
 
 
@@ -71,7 +73,10 @@ static int ctdb_client_destructor(struct ctdb_client *client)
 }
 
 
-
+/*
+  this is called when the ctdb daemon received a ctdb request call
+  from a local client over the unix domain socket
+ */
 static void client_request_call(struct ctdb_client *client, struct ctdb_req_call *c)
 {
        struct ctdb_call_state *state;
@@ -159,7 +164,7 @@ static void client_incoming_packet(struct ctdb_client *client, void *data, size_
 }
 
 
-static void ctdb_client_read_cb(uint8_t *data, int cnt, void *args)
+static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
 {
        struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
        struct ctdb_req_header *hdr;
@@ -189,15 +194,6 @@ static void ctdb_client_read_cb(uint8_t *data, int cnt, void *args)
        client_incoming_packet(client, data, cnt);
 }
 
-static void ctdb_client_read(struct event_context *ev, struct fd_event *fde, 
-                        uint16_t flags, void *private)
-{
-       struct ctdb_client *client = talloc_get_type(private, struct ctdb_client);
-
-       ctdb_read_pdu(client->fd, client, &client->partial, ctdb_client_read_cb, client);
-}
-
-
 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, 
                         uint16_t flags, void *private)
 {
@@ -219,8 +215,8 @@ static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde,
        client->ctdb = ctdb;
        client->fd = fd;
 
-       event_add_fd(ctdb->ev, client, client->fd, EVENT_FD_READ
-                    ctdb_client_read, client); 
+       client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT
+                                        ctdb_client_read_cb, client);
 
        talloc_set_destructor(client, ctdb_client_destructor);
 }
@@ -340,7 +336,7 @@ int ctdbd_start(struct ctdb_context *ctdb)
 }
 
 
-static void ctdb_daemon_read_cb(uint8_t *data, int cnt, void *args)
+static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
 {
        struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
        struct ctdb_req_header *hdr;
@@ -370,22 +366,6 @@ static void ctdb_daemon_read_cb(uint8_t *data, int cnt, void *args)
 }
 
 
-
-static void ctdb_daemon_io(struct event_context *ev, struct fd_event *fde, 
-                        uint16_t flags, void *private)
-{
-       struct ctdb_context *ctdb = talloc_get_type(private, struct ctdb_context);
-
-
-       if (flags&EVENT_FD_READ) {
-               ctdb_read_pdu(ctdb->daemon.sd, ctdb, &ctdb->daemon.partial, ctdb_daemon_read_cb, ctdb);
-       }
-       if (flags&EVENT_FD_WRITE) {
-               printf("socket is filled.   fix this   see tcp_io/ctdb_tcp_node_write how to do this\n");
-/*             ctdb_daemon_write(ctdb);*/
-       }
-}
-
 /*
   connect to a unix domain socket
 */
@@ -408,8 +388,9 @@ static int ux_socket_connect(struct ctdb_context *ctdb)
                return -1;
        }
 
-       ctdb->daemon.fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, EVENT_FD_READ, 
-                    ctdb_daemon_io, ctdb);     
+       ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
+                                             CTDB_DS_ALIGNMENT, 
+                                             ctdb_daemon_read_cb, ctdb);
        return 0;
 }
 
@@ -427,7 +408,6 @@ static int ctdb_ltdb_unlock(struct ctdb_db_context *ctdb_db, TDB_DATA key)
 }
 
 
-#define CTDB_DS_ALIGNMENT 8
 static void *ctdbd_allocate_pkt(struct ctdb_context *ctdb, size_t len)
 {
        int size;
@@ -448,49 +428,7 @@ struct ctdbd_queue_packet {
 */
 int ctdbd_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
 {
-       uint8_t *data = (uint8_t *)hdr;
-       uint32_t length = hdr->length;
-       struct ctdbd_queue_packet *pkt;
-       uint32_t length2;
-
-       /* enforce the length and alignment rules from the tcp packet allocator */
-       length2 = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
-       *(uint32_t *)data = length2;
-
-       if (length2 != length) {
-               memset(data+length, 0, length2-length);
-       }
-       
-       /* if the queue is empty then try an immediate write, avoiding
-          queue overhead. This relies on non-blocking sockets */
-       if (ctdb->daemon.queue == NULL) {
-               ssize_t n = write(ctdb->daemon.sd, data, length2);
-               if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
-                       printf("socket to ctdb daemon has died\n");
-                       return -1;
-               }
-               if (n > 0) {
-                       data += n;
-                       length2 -= n;
-               }
-               if (length2 == 0) return 0;
-       }
-
-       pkt = talloc(ctdb, struct ctdbd_queue_packet);
-       CTDB_NO_MEMORY(ctdb, pkt);
-
-       pkt->data = talloc_memdup(pkt, data, length2);
-       CTDB_NO_MEMORY(ctdb, pkt->data);
-
-       pkt->length = length2;
-
-       if (ctdb->daemon.queue == NULL) {
-               EVENT_FD_WRITEABLE(ctdb->daemon.fde);
-       }
-
-       DLIST_ADD_END(ctdb->daemon.queue, pkt, struct ctdbd_queue_packet *);
-
-       return 0;
+       return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
 }
 
 
index f627fedc38a57fd53d0c8cdc439e384d19bfb6c1..e6269b18ab945ef508d532bc72fc9e30b877babd 100644 (file)
@@ -21,9 +21,6 @@
    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 */
 
-#ifndef _CTDB_PARTIAL_H
-#define _CTDB_PARTIAL_H
-
 #include "includes.h"
 #include "lib/tdb/include/tdb.h"
 #include "lib/events/events.h"
 #include "../include/ctdb_private.h"
 #include "ctdb.h"
 
+/* structures for packet queueing - see common/ctdb_io.c */
+struct ctdb_partial {
+       uint8_t *data;
+       uint32_t length;
+};
 
-/* read a record from the file descriptor.
-   if the file descriptor has been closed  the user specifies ctx will be destryoed.
- */
-void ctdb_read_pdu(int fd, TALLOC_CTX *ctx, struct ctdb_partial *partial, partial_cb_fn_t func, void *args)
+struct ctdb_queue_pkt {
+       struct ctdb_queue_pkt *next, *prev;
+       uint8_t *data;
+       uint32_t length;
+};
+
+struct ctdb_queue {
+       struct ctdb_context *ctdb;
+       struct ctdb_partial partial; /* partial input packet */
+       struct ctdb_queue_pkt *out_queue;
+       struct fd_event *fde;
+       int fd;
+       size_t alignment;
+       void *private;
+       ctdb_queue_cb_fn_t callback;
+};
+
+
+
+/*
+  called when an incoming connection is readable
+*/
+static void queue_io_read(struct ctdb_queue *queue)
 {
        int num_ready = 0;
        ssize_t nread;
        uint8_t *data, *data_base;
 
-       if (ioctl(fd, FIONREAD, &num_ready) != 0 ||
+       if (ioctl(queue->fd, FIONREAD, &num_ready) != 0 ||
            num_ready == 0) {
                /* the descriptor has been closed */
-               func(NULL, 0, args);
-               return;
+               goto failed;
        }
 
 
-       partial->data = talloc_realloc_size(ctx, partial->data, 
-                                              num_ready + partial->length);
+       queue->partial.data = talloc_realloc_size(queue, queue->partial.data, 
+                                                 num_ready + queue->partial.length);
 
-       if (partial->data == NULL) {
-               func(NULL, 0, args);
-               return;
+       if (queue->partial.data == NULL) {
+               goto failed;
        }
 
-       nread = read(fd, partial->data+partial->length, num_ready);
+       nread = read(queue->fd, queue->partial.data + queue->partial.length, num_ready);
        if (nread <= 0) {
-               func(NULL, 0, args);
-               return;
+               goto failed;
        }
 
 
-       data = partial->data;
-       nread += partial->length;
+       data = queue->partial.data;
+       nread += queue->partial.length;
 
-       partial->data = NULL;
-       partial->length = 0;
+       queue->partial.data = NULL;
+       queue->partial.length = 0;
 
        if (nread >= 4 && *(uint32_t *)data == nread) {
-               /* it is the responsibility of the incoming packet function to free 'data' */
-               func(data, nread, args);
+               /* it is the responsibility of the incoming packet
+                function to free 'data' */
+               queue->callback(data, nread, queue->private);
                return;
        }
 
@@ -85,13 +104,12 @@ void ctdb_read_pdu(int fd, TALLOC_CTX *ctx, struct ctdb_partial *partial, partia
                uint8_t *d2;
                uint32_t len;
                len = *(uint32_t *)data;
-               d2 = talloc_memdup(ctx, data, len);
+               d2 = talloc_memdup(queue, data, len);
                if (d2 == NULL) {
                        /* sigh */
-                       func(NULL, 0, args);
-                       return;
+                       goto failed;
                }
-               func(d2, len, args);
+               queue->callback(d2, len, queue->private);
                data += len;
                nread -= len;           
        }
@@ -99,21 +117,189 @@ void ctdb_read_pdu(int fd, TALLOC_CTX *ctx, struct ctdb_partial *partial, partia
        if (nread > 0) {
                /* we have only part of a packet */
                if (data_base == data) {
-                       partial->data = data;
-                       partial->length = nread;
+                       queue->partial.data = data;
+                       queue->partial.length = nread;
                } else {
-                       partial->data = talloc_memdup(ctx, data, nread);
-                       if (partial->data == NULL) {
-                               func(NULL, 0, args);
-                               return;
+                       queue->partial.data = talloc_memdup(queue, data, nread);
+                       if (queue->partial.data == NULL) {
+                               goto failed;
                        }
-                       partial->length = nread;
+                       queue->partial.length = nread;
                        talloc_free(data_base);
                }
                return;
        }
 
        talloc_free(data_base);
+       return;
+
+failed:
+       queue->callback(NULL, 0, queue->private);
+}
+
+
+/* used when an event triggers a dead queue */
+static void queue_dead(struct event_context *ev, struct timed_event *te, 
+                      struct timeval t, void *private)
+{
+       struct ctdb_queue *queue = talloc_get_type(private, struct ctdb_queue);
+       queue->callback(NULL, 0, queue->private);
 }
 
-#endif
+
+/*
+  called when an incoming connection is writeable
+*/
+static void queue_io_write(struct ctdb_queue *queue)
+{
+       while (queue->out_queue) {
+               struct ctdb_queue_pkt *pkt = queue->out_queue;
+               ssize_t n;
+
+               n = write(queue->fd, pkt->data, pkt->length);
+
+               if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
+                       event_add_timed(queue->ctdb->ev, queue, timeval_zero(), 
+                                       queue_dead, queue);
+                       EVENT_FD_NOT_WRITEABLE(queue->fde);
+                       return;
+               }
+               if (n <= 0) return;
+               
+               if (n != pkt->length) {
+                       pkt->length -= n;
+                       pkt->data += n;
+                       return;
+               }
+
+               DLIST_REMOVE(queue->out_queue, pkt);
+               talloc_free(pkt);
+       }
+
+       EVENT_FD_NOT_WRITEABLE(queue->fde);
+}
+
+/*
+  called when an incoming connection is readable or writeable
+*/
+static void queue_io_handler(struct event_context *ev, struct fd_event *fde, 
+                            uint16_t flags, void *private)
+{
+       struct ctdb_queue *queue = talloc_get_type(private, struct ctdb_queue);
+
+       if (flags & EVENT_FD_READ) {
+               queue_io_read(queue);
+       } else {
+               queue_io_write(queue);
+       }
+}
+
+
+/*
+  queue a packet for sending
+*/
+int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length)
+{
+       struct ctdb_queue_pkt *pkt;
+       uint32_t length2;
+
+       /* enforce the length and alignment rules from the tcp packet allocator */
+       length2 = (length+(queue->alignment-1)) & ~(queue->alignment-1);
+       *(uint32_t *)data = length2;
+
+       if (length2 != length) {
+               memset(data+length, 0, length2-length);
+       }
+       
+       /* if the queue is empty then try an immediate write, avoiding
+          queue overhead. This relies on non-blocking sockets */
+       if (queue->out_queue == NULL && queue->fd != -1) {
+               ssize_t n = write(queue->fd, data, length2);
+               if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
+                       event_add_timed(queue->ctdb->ev, queue, timeval_zero(), 
+                                       queue_dead, queue);
+                       /* yes, we report success, as the dead node is 
+                          handled via a separate event */
+                       return 0;
+               }
+               if (n > 0) {
+                       data += n;
+                       length2 -= n;
+               }
+               if (length2 == 0) return 0;
+       }
+
+       pkt = talloc(queue, struct ctdb_queue_pkt);
+       CTDB_NO_MEMORY(queue->ctdb, pkt);
+
+       pkt->data = talloc_memdup(pkt, data, length2);
+       CTDB_NO_MEMORY(queue->ctdb, pkt->data);
+
+       pkt->length = length2;
+
+       if (queue->out_queue == NULL && queue->fd != -1) {
+               EVENT_FD_WRITEABLE(queue->fde);
+       }
+
+       DLIST_ADD_END(queue->out_queue, pkt, struct ctdb_queue_pkt *);
+
+       return 0;
+}
+
+
+/*
+  setup the fd used by the queue
+ */
+int ctdb_queue_set_fd(struct ctdb_queue *queue, int fd)
+{
+       queue->fd = fd;
+       talloc_free(queue->fde);
+       queue->fde = NULL;
+
+       if (fd != -1) {
+               queue->fde = event_add_fd(queue->ctdb->ev, queue, fd, EVENT_FD_READ, 
+                                         queue_io_handler, queue);
+               if (queue->fde == NULL) {
+                       return -1;
+               }
+
+               if (queue->out_queue) {
+                       EVENT_FD_WRITEABLE(queue->fde);         
+               }
+       }
+
+       return 0;
+}
+
+
+
+/*
+  setup a packet queue on a socket
+ */
+struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb,
+                                   TALLOC_CTX *mem_ctx, int fd, int alignment,
+                                   
+                                   ctdb_queue_cb_fn_t callback,
+                                   void *private)
+{
+       struct ctdb_queue *queue;
+
+       queue = talloc_zero(mem_ctx, struct ctdb_queue);
+       CTDB_NO_MEMORY_NULL(ctdb, queue);
+
+       queue->ctdb = ctdb;
+       queue->fd = fd;
+       queue->alignment = alignment;
+       queue->private = private;
+       queue->callback = callback;
+       if (fd != -1) {
+               if (ctdb_queue_set_fd(queue, fd) != 0) {
+                       talloc_free(queue);
+                       return NULL;
+               }
+       }
+
+       return queue;
+}
+
+
index e1966af6b7f3c6175f0fc8ae6c1baddac6d636eb..e3d5b1f5aa70dd647cc6b46be167d15c6e94e2d8 100644 (file)
@@ -181,12 +181,4 @@ struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALL
 int ctdb_record_store(struct ctdb_record_handle *rec, TDB_DATA data);
 
 
-struct ctdb_partial {
-       uint8_t *data;
-       uint32_t length;
-};
-/* callback is called with data==NULL for fauilures.    the callback must test for this and do cleanup appropriately */
-typedef void (*partial_cb_fn_t)(uint8_t *data, int cnt, void *args);
-void ctdb_read_pdu(int fd, TALLOC_CTX *ctx, struct ctdb_partial *partial, partial_cb_fn_t func, void *args);
-
 #endif
index 22b1d32f298e409462bba223d27c65f766aaae88..792fb75ba4897eb7643838a70320f36912ccd4bd 100644 (file)
@@ -43,6 +43,12 @@ struct ctdb_address {
        int port;
 };
 
+
+/* called from the queue code when a packet comes in. Called with data==NULL
+   on error */
+typedef void (*ctdb_queue_cb_fn_t)(uint8_t *data, size_t length, void *private);
+
+
 /*
   state associated with one node
 */
@@ -88,9 +94,7 @@ struct ctdb_upcalls {
 struct ctdb_daemon_data {
        int sd;
        char *name;
-       struct ctdbd_queue_packet *queue;
-       struct fd_event *fde;
-       struct ctdb_partial partial;
+       struct ctdb_queue *queue;
 };
 
 /* main state of the ctdb daemon */
@@ -289,5 +293,25 @@ int ctdbd_start(struct ctdb_context *ctdb);
 struct ctdb_call_state *ctdbd_call_send(struct ctdb_db_context *ctdb_db, struct ctdb_call *call);
 int ctdbd_call_recv(struct ctdb_call_state *state, struct ctdb_call *call);
 
+/*
+  queue a packet for sending
+*/
+int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length);
+
+/*
+  setup the fd used by the queue
+ */
+int ctdb_queue_set_fd(struct ctdb_queue *queue, int fd);
+
+/*
+  setup a packet queue on a socket
+ */
+struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb,
+                                   TALLOC_CTX *mem_ctx, int fd, int alignment,
+                                   
+                                   ctdb_queue_cb_fn_t callback,
+                                   void *private);
+
+
 
 #endif
index 5b6cd299b9295b6154804f101adced848b7b30ce..4fa496cd5ac1508b28749a29de4e9bc784df7778 100644 (file)
@@ -30,17 +30,7 @@ struct ctdb_tcp {
 struct ctdb_incoming {
        struct ctdb_context *ctdb;
        int fd;
-       struct ctdb_partial partial;
-};
-
-/*
-  outgoing packet structure - only allocated when we can't write immediately
-  to the socket
-*/
-struct ctdb_tcp_packet {
-       struct ctdb_tcp_packet *next, *prev;
-       uint8_t *data;
-       uint32_t length;
+       struct ctdb_queue *queue;
 };
 
 /*
@@ -48,19 +38,15 @@ struct ctdb_tcp_packet {
 */
 struct ctdb_tcp_node {
        int fd;
-       struct fd_event *fde;
-       struct ctdb_tcp_packet *queue;
+       struct ctdb_queue *queue;
 };
 
 
 /* prototypes internal to tcp transport */
-void ctdb_tcp_node_write(struct event_context *ev, struct fd_event *fde, 
-                        uint16_t flags, void *private);
-void ctdb_tcp_incoming_read(struct event_context *ev, struct fd_event *fde, 
-                           uint16_t flags, void *private);
 int ctdb_tcp_queue_pkt(struct ctdb_node *node, uint8_t *data, uint32_t length);
 int ctdb_tcp_listen(struct ctdb_context *ctdb);
 void ctdb_tcp_node_connect(struct event_context *ev, struct timed_event *te, 
                           struct timeval t, void *private);
+void ctdb_tcp_read_cb(uint8_t *data, size_t cnt, void *args);
 
 #define CTDB_TCP_ALIGNMENT 8
index 85fffc2f7030ea40e452938effd3565f60275e57..bccc8a63aa0572e4596fbe5526f26c73ad143215 100644 (file)
@@ -34,6 +34,24 @@ static void set_nonblocking(int fd)
 }
 
 
+/*
+  called when a complete packet has come in - should not happen on this socket
+ */
+void ctdb_tcp_tnode_cb(uint8_t *data, size_t cnt, void *private)
+{
+       struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
+       struct ctdb_tcp_node *tnode = talloc_get_type(node->private, 
+                                                     struct ctdb_tcp_node);
+
+       /* start a new connect cycle to try to re-establish the
+          link */
+       close(tnode->fd);
+       ctdb_queue_set_fd(tnode->queue, -1);
+       tnode->fd = -1;
+       event_add_timed(node->ctdb->ev, node, timeval_zero(), 
+                       ctdb_tcp_node_connect, node);
+}
+
 /*
   called when socket becomes writeable on connect
 */
@@ -59,17 +77,14 @@ static void ctdb_node_connect_write(struct event_context *ev, struct fd_event *f
        }
 
        talloc_free(fde);
-       tnode->fde = event_add_fd(node->ctdb->ev, node, tnode->fd, EVENT_FD_READ, 
-                                 ctdb_tcp_node_write, node);
+       
+        setsockopt(tnode->fd,IPPROTO_TCP,TCP_NODELAY,(char *)&one,sizeof(one));
+
+       tnode->queue = ctdb_queue_setup(node->ctdb, node, tnode->fd, CTDB_TCP_ALIGNMENT,
+                                       ctdb_tcp_tnode_cb, node);
 
        /* tell the ctdb layer we are connected */
        node->ctdb->upcalls->node_connected(node);
-
-        setsockopt(tnode->fd,IPPROTO_TCP,TCP_NODELAY,(char *)&one,sizeof(one));
-
-       if (tnode->queue) {
-               EVENT_FD_WRITEABLE(tnode->fde);         
-       }
 }
 
 
@@ -177,8 +192,8 @@ static void ctdb_listen_event(struct event_context *ev, struct fd_event *fde,
 
        set_nonblocking(in->fd);
 
-       event_add_fd(ctdb->ev, in, in->fd, EVENT_FD_READ
-                    ctdb_tcp_incoming_read, in);       
+       in->queue = ctdb_queue_setup(ctdb, in, in->fd, CTDB_TCP_ALIGNMENT
+                                    ctdb_tcp_read_cb, in);
 
        talloc_set_destructor(in, ctdb_incoming_destructor);
 }
index 22771669869b3dc5380a735d4bab8ec0f69b3da9..8ec0a1e538cc88dc3a7e4010bf2a68673e58e376 100644 (file)
 
 
 /*
-  called when we fail to send a message to a node
-*/
-static void ctdb_tcp_node_dead(struct event_context *ev, struct timed_event *te, 
-                              struct timeval t, void *private)
+  called when a complete packet has come in
+ */
+void ctdb_tcp_read_cb(uint8_t *data, size_t cnt, void *args)
 {
-       struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
-       struct ctdb_tcp_node *tnode = talloc_get_type(node->private, 
-                                                     struct ctdb_tcp_node);
-
-       /* start a new connect cycle to try to re-establish the
-          link */
-       talloc_free(tnode->fde);
-       close(tnode->fd);
-       tnode->fd = -1;
-       event_add_timed(node->ctdb->ev, node, timeval_zero(), 
-                       ctdb_tcp_node_connect, node);
-}
+       struct ctdb_incoming *in = talloc_get_type(args, struct ctdb_incoming);
+       struct ctdb_req_header *hdr;
 
-/*
-  called when socket becomes readable
-*/
-void ctdb_tcp_node_write(struct event_context *ev, struct fd_event *fde, 
-                        uint16_t flags, void *private)
-{
-       struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
-       struct ctdb_tcp_node *tnode = talloc_get_type(node->private, 
-                                                     struct ctdb_tcp_node);
-       if (flags & EVENT_FD_READ) {
-               /* getting a read event on this fd in the current tcp model is
-                  always an error, as we have separate read and write
-                  sockets. In future we may combine them, but for now it must
-                  mean that the socket is dead, so we try to reconnect */
-               node->ctdb->upcalls->node_dead(node);
-               talloc_free(tnode->fde);
-               close(tnode->fd);
-               tnode->fd = -1;
-               event_add_timed(node->ctdb->ev, node, timeval_zero(), 
-                               ctdb_tcp_node_connect, node);
+       if (data == NULL) {
+               /* incoming socket has died */
+               talloc_free(in);
                return;
        }
 
-       while (tnode->queue) {
-               struct ctdb_tcp_packet *pkt = tnode->queue;
-               ssize_t n;
-
-               n = write(tnode->fd, pkt->data, pkt->length);
-
-               if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
-                       event_add_timed(node->ctdb->ev, node, timeval_zero(), 
-                                       ctdb_tcp_node_dead, node);
-                       EVENT_FD_NOT_WRITEABLE(tnode->fde);
-                       return;
-               }
-               if (n <= 0) return;
-               
-               if (n != pkt->length) {
-                       pkt->length -= n;
-                       pkt->data += n;
-                       return;
-               }
-
-               DLIST_REMOVE(tnode->queue, pkt);
-               talloc_free(pkt);
-       }
-
-       EVENT_FD_NOT_WRITEABLE(tnode->fde);
-}
-
-
-
-static void tcp_read_cb(uint8_t *data, int cnt, void *args)
-{
-       struct ctdb_incoming *in = talloc_get_type(args, struct ctdb_incoming);
-       struct ctdb_req_header *hdr;
-
        if (cnt < sizeof(*hdr)) {
                ctdb_set_error(in->ctdb, "Bad packet length %d\n", cnt);
                return;
@@ -130,66 +68,12 @@ static void tcp_read_cb(uint8_t *data, int cnt, void *args)
        in->ctdb->upcalls->recv_pkt(in->ctdb, data, cnt);
 }
 
-/*
-  called when an incoming connection is readable
-*/
-void ctdb_tcp_incoming_read(struct event_context *ev, struct fd_event *fde, 
-                           uint16_t flags, void *private)
-{
-       struct ctdb_incoming *in = talloc_get_type(private, struct ctdb_incoming);
-
-       ctdb_read_pdu(in->fd, in, &in->partial, tcp_read_cb, in);
-}
-
 /*
   queue a packet for sending
 */
 int ctdb_tcp_queue_pkt(struct ctdb_node *node, uint8_t *data, uint32_t length)
 {
-       struct ctdb_tcp_node *tnode = talloc_get_type(node->private, 
+       struct ctdb_tcp_node *tnode = talloc_get_type(node->private,
                                                      struct ctdb_tcp_node);
-       struct ctdb_tcp_packet *pkt;
-       uint32_t length2;
-
-       /* enforce the length and alignment rules from the tcp packet allocator */
-       length2 = (length+(CTDB_TCP_ALIGNMENT-1)) & ~(CTDB_TCP_ALIGNMENT-1);
-       *(uint32_t *)data = length2;
-
-       if (length2 != length) {
-               memset(data+length, 0, length2-length);
-       }
-       
-       /* if the queue is empty then try an immediate write, avoiding
-          queue overhead. This relies on non-blocking sockets */
-       if (tnode->queue == NULL && tnode->fd != -1) {
-               ssize_t n = write(tnode->fd, data, length2);
-               if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
-                       event_add_timed(node->ctdb->ev, node, timeval_zero(), 
-                                       ctdb_tcp_node_dead, node);
-                       /* yes, we report success, as the dead node is 
-                          handled via a separate event */
-                       return 0;
-               }
-               if (n > 0) {
-                       data += n;
-                       length2 -= n;
-               }
-               if (length2 == 0) return 0;
-       }
-
-       pkt = talloc(tnode, struct ctdb_tcp_packet);
-       CTDB_NO_MEMORY(node->ctdb, pkt);
-
-       pkt->data = talloc_memdup(pkt, data, length2);
-       CTDB_NO_MEMORY(node->ctdb, pkt->data);
-
-       pkt->length = length2;
-
-       if (tnode->queue == NULL && tnode->fd != -1) {
-               EVENT_FD_WRITEABLE(tnode->fde);
-       }
-
-       DLIST_ADD_END(tnode->queue, pkt, struct ctdb_tcp_packet *);
-
-       return 0;
+       return ctdb_queue_send(tnode->queue, data, length);
 }