From: Andrew Tridgell Date: Tue, 10 Apr 2007 09:33:21 +0000 (+1000) Subject: made all sockets handle partial IO X-Git-Tag: tevent-0.9.20~348^2~2950 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=f1e0174e8314143408c1aa6637bafafdd445c9f9;p=thirdparty%2Fsamba.git made all sockets handle partial IO abstract IO via ctdb_queue_*() functions (This used to be ctdb commit 636ae76f4632b29231db87be32c9114f58b37840) --- diff --git a/ctdb/common/ctdb_daemon.c b/ctdb/common/ctdb_daemon.c index dc1e1477163..f2a8e8462ad 100644 --- a/ctdb/common/ctdb_daemon.c +++ b/ctdb/common/ctdb_daemon.c @@ -29,6 +29,7 @@ #include "../include/ctdb_private.h" #define CTDB_PATH "/tmp/ctdb.socket" +#define CTDB_DS_ALIGNMENT 8 static void ctdb_main_loop(struct ctdb_context *ctdb) @@ -51,12 +52,13 @@ static void set_non_blocking(int fd) } - +/* + structure describing a connected client in the daemon + */ struct ctdb_client { struct ctdb_context *ctdb; - struct fd_event *fde; int fd; - struct ctdb_partial partial; + struct ctdb_queue *queue; }; @@ -71,7 +73,10 @@ static int ctdb_client_destructor(struct ctdb_client *client) } - +/* + this is called when the ctdb daemon received a ctdb request call + from a local client over the unix domain socket + */ static void client_request_call(struct ctdb_client *client, struct ctdb_req_call *c) { struct ctdb_call_state *state; @@ -159,7 +164,7 @@ static void client_incoming_packet(struct ctdb_client *client, void *data, size_ } -static void ctdb_client_read_cb(uint8_t *data, int cnt, void *args) +static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args) { struct ctdb_client *client = talloc_get_type(args, struct ctdb_client); struct ctdb_req_header *hdr; @@ -189,15 +194,6 @@ static void ctdb_client_read_cb(uint8_t *data, int cnt, void *args) client_incoming_packet(client, data, cnt); } -static void ctdb_client_read(struct event_context *ev, struct fd_event *fde, - uint16_t flags, void *private) -{ - struct ctdb_client *client = talloc_get_type(private, struct ctdb_client); - - ctdb_read_pdu(client->fd, client, &client->partial, ctdb_client_read_cb, client); -} - - static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, uint16_t flags, void *private) { @@ -219,8 +215,8 @@ static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, client->ctdb = ctdb; client->fd = fd; - event_add_fd(ctdb->ev, client, client->fd, EVENT_FD_READ, - ctdb_client_read, client); + client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, + ctdb_client_read_cb, client); talloc_set_destructor(client, ctdb_client_destructor); } @@ -340,7 +336,7 @@ int ctdbd_start(struct ctdb_context *ctdb) } -static void ctdb_daemon_read_cb(uint8_t *data, int cnt, void *args) +static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args) { struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context); struct ctdb_req_header *hdr; @@ -370,22 +366,6 @@ static void ctdb_daemon_read_cb(uint8_t *data, int cnt, void *args) } - -static void ctdb_daemon_io(struct event_context *ev, struct fd_event *fde, - uint16_t flags, void *private) -{ - struct ctdb_context *ctdb = talloc_get_type(private, struct ctdb_context); - - - if (flags&EVENT_FD_READ) { - ctdb_read_pdu(ctdb->daemon.sd, ctdb, &ctdb->daemon.partial, ctdb_daemon_read_cb, ctdb); - } - if (flags&EVENT_FD_WRITE) { - printf("socket is filled. fix this see tcp_io/ctdb_tcp_node_write how to do this\n"); -/* ctdb_daemon_write(ctdb);*/ - } -} - /* connect to a unix domain socket */ @@ -408,8 +388,9 @@ static int ux_socket_connect(struct ctdb_context *ctdb) return -1; } - ctdb->daemon.fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, EVENT_FD_READ, - ctdb_daemon_io, ctdb); + ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, + CTDB_DS_ALIGNMENT, + ctdb_daemon_read_cb, ctdb); return 0; } @@ -427,7 +408,6 @@ static int ctdb_ltdb_unlock(struct ctdb_db_context *ctdb_db, TDB_DATA key) } -#define CTDB_DS_ALIGNMENT 8 static void *ctdbd_allocate_pkt(struct ctdb_context *ctdb, size_t len) { int size; @@ -448,49 +428,7 @@ struct ctdbd_queue_packet { */ int ctdbd_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) { - uint8_t *data = (uint8_t *)hdr; - uint32_t length = hdr->length; - struct ctdbd_queue_packet *pkt; - uint32_t length2; - - /* enforce the length and alignment rules from the tcp packet allocator */ - length2 = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1); - *(uint32_t *)data = length2; - - if (length2 != length) { - memset(data+length, 0, length2-length); - } - - /* if the queue is empty then try an immediate write, avoiding - queue overhead. This relies on non-blocking sockets */ - if (ctdb->daemon.queue == NULL) { - ssize_t n = write(ctdb->daemon.sd, data, length2); - if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) { - printf("socket to ctdb daemon has died\n"); - return -1; - } - if (n > 0) { - data += n; - length2 -= n; - } - if (length2 == 0) return 0; - } - - pkt = talloc(ctdb, struct ctdbd_queue_packet); - CTDB_NO_MEMORY(ctdb, pkt); - - pkt->data = talloc_memdup(pkt, data, length2); - CTDB_NO_MEMORY(ctdb, pkt->data); - - pkt->length = length2; - - if (ctdb->daemon.queue == NULL) { - EVENT_FD_WRITEABLE(ctdb->daemon.fde); - } - - DLIST_ADD_END(ctdb->daemon.queue, pkt, struct ctdbd_queue_packet *); - - return 0; + return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length); } diff --git a/ctdb/common/ctdb_io.c b/ctdb/common/ctdb_io.c index f627fedc38a..e6269b18ab9 100644 --- a/ctdb/common/ctdb_io.c +++ b/ctdb/common/ctdb_io.c @@ -21,9 +21,6 @@ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ -#ifndef _CTDB_PARTIAL_H -#define _CTDB_PARTIAL_H - #include "includes.h" #include "lib/tdb/include/tdb.h" #include "lib/events/events.h" @@ -33,48 +30,70 @@ #include "../include/ctdb_private.h" #include "ctdb.h" +/* structures for packet queueing - see common/ctdb_io.c */ +struct ctdb_partial { + uint8_t *data; + uint32_t length; +}; -/* read a record from the file descriptor. - if the file descriptor has been closed the user specifies ctx will be destryoed. - */ -void ctdb_read_pdu(int fd, TALLOC_CTX *ctx, struct ctdb_partial *partial, partial_cb_fn_t func, void *args) +struct ctdb_queue_pkt { + struct ctdb_queue_pkt *next, *prev; + uint8_t *data; + uint32_t length; +}; + +struct ctdb_queue { + struct ctdb_context *ctdb; + struct ctdb_partial partial; /* partial input packet */ + struct ctdb_queue_pkt *out_queue; + struct fd_event *fde; + int fd; + size_t alignment; + void *private; + ctdb_queue_cb_fn_t callback; +}; + + + +/* + called when an incoming connection is readable +*/ +static void queue_io_read(struct ctdb_queue *queue) { int num_ready = 0; ssize_t nread; uint8_t *data, *data_base; - if (ioctl(fd, FIONREAD, &num_ready) != 0 || + if (ioctl(queue->fd, FIONREAD, &num_ready) != 0 || num_ready == 0) { /* the descriptor has been closed */ - func(NULL, 0, args); - return; + goto failed; } - partial->data = talloc_realloc_size(ctx, partial->data, - num_ready + partial->length); + queue->partial.data = talloc_realloc_size(queue, queue->partial.data, + num_ready + queue->partial.length); - if (partial->data == NULL) { - func(NULL, 0, args); - return; + if (queue->partial.data == NULL) { + goto failed; } - nread = read(fd, partial->data+partial->length, num_ready); + nread = read(queue->fd, queue->partial.data + queue->partial.length, num_ready); if (nread <= 0) { - func(NULL, 0, args); - return; + goto failed; } - data = partial->data; - nread += partial->length; + data = queue->partial.data; + nread += queue->partial.length; - partial->data = NULL; - partial->length = 0; + queue->partial.data = NULL; + queue->partial.length = 0; if (nread >= 4 && *(uint32_t *)data == nread) { - /* it is the responsibility of the incoming packet function to free 'data' */ - func(data, nread, args); + /* it is the responsibility of the incoming packet + function to free 'data' */ + queue->callback(data, nread, queue->private); return; } @@ -85,13 +104,12 @@ void ctdb_read_pdu(int fd, TALLOC_CTX *ctx, struct ctdb_partial *partial, partia uint8_t *d2; uint32_t len; len = *(uint32_t *)data; - d2 = talloc_memdup(ctx, data, len); + d2 = talloc_memdup(queue, data, len); if (d2 == NULL) { /* sigh */ - func(NULL, 0, args); - return; + goto failed; } - func(d2, len, args); + queue->callback(d2, len, queue->private); data += len; nread -= len; } @@ -99,21 +117,189 @@ void ctdb_read_pdu(int fd, TALLOC_CTX *ctx, struct ctdb_partial *partial, partia if (nread > 0) { /* we have only part of a packet */ if (data_base == data) { - partial->data = data; - partial->length = nread; + queue->partial.data = data; + queue->partial.length = nread; } else { - partial->data = talloc_memdup(ctx, data, nread); - if (partial->data == NULL) { - func(NULL, 0, args); - return; + queue->partial.data = talloc_memdup(queue, data, nread); + if (queue->partial.data == NULL) { + goto failed; } - partial->length = nread; + queue->partial.length = nread; talloc_free(data_base); } return; } talloc_free(data_base); + return; + +failed: + queue->callback(NULL, 0, queue->private); +} + + +/* used when an event triggers a dead queue */ +static void queue_dead(struct event_context *ev, struct timed_event *te, + struct timeval t, void *private) +{ + struct ctdb_queue *queue = talloc_get_type(private, struct ctdb_queue); + queue->callback(NULL, 0, queue->private); } -#endif + +/* + called when an incoming connection is writeable +*/ +static void queue_io_write(struct ctdb_queue *queue) +{ + while (queue->out_queue) { + struct ctdb_queue_pkt *pkt = queue->out_queue; + ssize_t n; + + n = write(queue->fd, pkt->data, pkt->length); + + if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) { + event_add_timed(queue->ctdb->ev, queue, timeval_zero(), + queue_dead, queue); + EVENT_FD_NOT_WRITEABLE(queue->fde); + return; + } + if (n <= 0) return; + + if (n != pkt->length) { + pkt->length -= n; + pkt->data += n; + return; + } + + DLIST_REMOVE(queue->out_queue, pkt); + talloc_free(pkt); + } + + EVENT_FD_NOT_WRITEABLE(queue->fde); +} + +/* + called when an incoming connection is readable or writeable +*/ +static void queue_io_handler(struct event_context *ev, struct fd_event *fde, + uint16_t flags, void *private) +{ + struct ctdb_queue *queue = talloc_get_type(private, struct ctdb_queue); + + if (flags & EVENT_FD_READ) { + queue_io_read(queue); + } else { + queue_io_write(queue); + } +} + + +/* + queue a packet for sending +*/ +int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length) +{ + struct ctdb_queue_pkt *pkt; + uint32_t length2; + + /* enforce the length and alignment rules from the tcp packet allocator */ + length2 = (length+(queue->alignment-1)) & ~(queue->alignment-1); + *(uint32_t *)data = length2; + + if (length2 != length) { + memset(data+length, 0, length2-length); + } + + /* if the queue is empty then try an immediate write, avoiding + queue overhead. This relies on non-blocking sockets */ + if (queue->out_queue == NULL && queue->fd != -1) { + ssize_t n = write(queue->fd, data, length2); + if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) { + event_add_timed(queue->ctdb->ev, queue, timeval_zero(), + queue_dead, queue); + /* yes, we report success, as the dead node is + handled via a separate event */ + return 0; + } + if (n > 0) { + data += n; + length2 -= n; + } + if (length2 == 0) return 0; + } + + pkt = talloc(queue, struct ctdb_queue_pkt); + CTDB_NO_MEMORY(queue->ctdb, pkt); + + pkt->data = talloc_memdup(pkt, data, length2); + CTDB_NO_MEMORY(queue->ctdb, pkt->data); + + pkt->length = length2; + + if (queue->out_queue == NULL && queue->fd != -1) { + EVENT_FD_WRITEABLE(queue->fde); + } + + DLIST_ADD_END(queue->out_queue, pkt, struct ctdb_queue_pkt *); + + return 0; +} + + +/* + setup the fd used by the queue + */ +int ctdb_queue_set_fd(struct ctdb_queue *queue, int fd) +{ + queue->fd = fd; + talloc_free(queue->fde); + queue->fde = NULL; + + if (fd != -1) { + queue->fde = event_add_fd(queue->ctdb->ev, queue, fd, EVENT_FD_READ, + queue_io_handler, queue); + if (queue->fde == NULL) { + return -1; + } + + if (queue->out_queue) { + EVENT_FD_WRITEABLE(queue->fde); + } + } + + return 0; +} + + + +/* + setup a packet queue on a socket + */ +struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb, + TALLOC_CTX *mem_ctx, int fd, int alignment, + + ctdb_queue_cb_fn_t callback, + void *private) +{ + struct ctdb_queue *queue; + + queue = talloc_zero(mem_ctx, struct ctdb_queue); + CTDB_NO_MEMORY_NULL(ctdb, queue); + + queue->ctdb = ctdb; + queue->fd = fd; + queue->alignment = alignment; + queue->private = private; + queue->callback = callback; + if (fd != -1) { + if (ctdb_queue_set_fd(queue, fd) != 0) { + talloc_free(queue); + return NULL; + } + } + + return queue; +} + + diff --git a/ctdb/include/ctdb.h b/ctdb/include/ctdb.h index e1966af6b7f..e3d5b1f5aa7 100644 --- a/ctdb/include/ctdb.h +++ b/ctdb/include/ctdb.h @@ -181,12 +181,4 @@ struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALL int ctdb_record_store(struct ctdb_record_handle *rec, TDB_DATA data); -struct ctdb_partial { - uint8_t *data; - uint32_t length; -}; -/* callback is called with data==NULL for fauilures. the callback must test for this and do cleanup appropriately */ -typedef void (*partial_cb_fn_t)(uint8_t *data, int cnt, void *args); -void ctdb_read_pdu(int fd, TALLOC_CTX *ctx, struct ctdb_partial *partial, partial_cb_fn_t func, void *args); - #endif diff --git a/ctdb/include/ctdb_private.h b/ctdb/include/ctdb_private.h index 22b1d32f298..792fb75ba48 100644 --- a/ctdb/include/ctdb_private.h +++ b/ctdb/include/ctdb_private.h @@ -43,6 +43,12 @@ struct ctdb_address { int port; }; + +/* called from the queue code when a packet comes in. Called with data==NULL + on error */ +typedef void (*ctdb_queue_cb_fn_t)(uint8_t *data, size_t length, void *private); + + /* state associated with one node */ @@ -88,9 +94,7 @@ struct ctdb_upcalls { struct ctdb_daemon_data { int sd; char *name; - struct ctdbd_queue_packet *queue; - struct fd_event *fde; - struct ctdb_partial partial; + struct ctdb_queue *queue; }; /* main state of the ctdb daemon */ @@ -289,5 +293,25 @@ int ctdbd_start(struct ctdb_context *ctdb); struct ctdb_call_state *ctdbd_call_send(struct ctdb_db_context *ctdb_db, struct ctdb_call *call); int ctdbd_call_recv(struct ctdb_call_state *state, struct ctdb_call *call); +/* + queue a packet for sending +*/ +int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length); + +/* + setup the fd used by the queue + */ +int ctdb_queue_set_fd(struct ctdb_queue *queue, int fd); + +/* + setup a packet queue on a socket + */ +struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb, + TALLOC_CTX *mem_ctx, int fd, int alignment, + + ctdb_queue_cb_fn_t callback, + void *private); + + #endif diff --git a/ctdb/tcp/ctdb_tcp.h b/ctdb/tcp/ctdb_tcp.h index 5b6cd299b92..4fa496cd5ac 100644 --- a/ctdb/tcp/ctdb_tcp.h +++ b/ctdb/tcp/ctdb_tcp.h @@ -30,17 +30,7 @@ struct ctdb_tcp { struct ctdb_incoming { struct ctdb_context *ctdb; int fd; - struct ctdb_partial partial; -}; - -/* - outgoing packet structure - only allocated when we can't write immediately - to the socket -*/ -struct ctdb_tcp_packet { - struct ctdb_tcp_packet *next, *prev; - uint8_t *data; - uint32_t length; + struct ctdb_queue *queue; }; /* @@ -48,19 +38,15 @@ struct ctdb_tcp_packet { */ struct ctdb_tcp_node { int fd; - struct fd_event *fde; - struct ctdb_tcp_packet *queue; + struct ctdb_queue *queue; }; /* prototypes internal to tcp transport */ -void ctdb_tcp_node_write(struct event_context *ev, struct fd_event *fde, - uint16_t flags, void *private); -void ctdb_tcp_incoming_read(struct event_context *ev, struct fd_event *fde, - uint16_t flags, void *private); int ctdb_tcp_queue_pkt(struct ctdb_node *node, uint8_t *data, uint32_t length); int ctdb_tcp_listen(struct ctdb_context *ctdb); void ctdb_tcp_node_connect(struct event_context *ev, struct timed_event *te, struct timeval t, void *private); +void ctdb_tcp_read_cb(uint8_t *data, size_t cnt, void *args); #define CTDB_TCP_ALIGNMENT 8 diff --git a/ctdb/tcp/tcp_connect.c b/ctdb/tcp/tcp_connect.c index 85fffc2f703..bccc8a63aa0 100644 --- a/ctdb/tcp/tcp_connect.c +++ b/ctdb/tcp/tcp_connect.c @@ -34,6 +34,24 @@ static void set_nonblocking(int fd) } +/* + called when a complete packet has come in - should not happen on this socket + */ +void ctdb_tcp_tnode_cb(uint8_t *data, size_t cnt, void *private) +{ + struct ctdb_node *node = talloc_get_type(private, struct ctdb_node); + struct ctdb_tcp_node *tnode = talloc_get_type(node->private, + struct ctdb_tcp_node); + + /* start a new connect cycle to try to re-establish the + link */ + close(tnode->fd); + ctdb_queue_set_fd(tnode->queue, -1); + tnode->fd = -1; + event_add_timed(node->ctdb->ev, node, timeval_zero(), + ctdb_tcp_node_connect, node); +} + /* called when socket becomes writeable on connect */ @@ -59,17 +77,14 @@ static void ctdb_node_connect_write(struct event_context *ev, struct fd_event *f } talloc_free(fde); - tnode->fde = event_add_fd(node->ctdb->ev, node, tnode->fd, EVENT_FD_READ, - ctdb_tcp_node_write, node); + + setsockopt(tnode->fd,IPPROTO_TCP,TCP_NODELAY,(char *)&one,sizeof(one)); + + tnode->queue = ctdb_queue_setup(node->ctdb, node, tnode->fd, CTDB_TCP_ALIGNMENT, + ctdb_tcp_tnode_cb, node); /* tell the ctdb layer we are connected */ node->ctdb->upcalls->node_connected(node); - - setsockopt(tnode->fd,IPPROTO_TCP,TCP_NODELAY,(char *)&one,sizeof(one)); - - if (tnode->queue) { - EVENT_FD_WRITEABLE(tnode->fde); - } } @@ -177,8 +192,8 @@ static void ctdb_listen_event(struct event_context *ev, struct fd_event *fde, set_nonblocking(in->fd); - event_add_fd(ctdb->ev, in, in->fd, EVENT_FD_READ, - ctdb_tcp_incoming_read, in); + in->queue = ctdb_queue_setup(ctdb, in, in->fd, CTDB_TCP_ALIGNMENT, + ctdb_tcp_read_cb, in); talloc_set_destructor(in, ctdb_incoming_destructor); } diff --git a/ctdb/tcp/tcp_io.c b/ctdb/tcp/tcp_io.c index 22771669869..8ec0a1e538c 100644 --- a/ctdb/tcp/tcp_io.c +++ b/ctdb/tcp/tcp_io.c @@ -29,81 +29,19 @@ /* - called when we fail to send a message to a node -*/ -static void ctdb_tcp_node_dead(struct event_context *ev, struct timed_event *te, - struct timeval t, void *private) + called when a complete packet has come in + */ +void ctdb_tcp_read_cb(uint8_t *data, size_t cnt, void *args) { - struct ctdb_node *node = talloc_get_type(private, struct ctdb_node); - struct ctdb_tcp_node *tnode = talloc_get_type(node->private, - struct ctdb_tcp_node); - - /* start a new connect cycle to try to re-establish the - link */ - talloc_free(tnode->fde); - close(tnode->fd); - tnode->fd = -1; - event_add_timed(node->ctdb->ev, node, timeval_zero(), - ctdb_tcp_node_connect, node); -} + struct ctdb_incoming *in = talloc_get_type(args, struct ctdb_incoming); + struct ctdb_req_header *hdr; -/* - called when socket becomes readable -*/ -void ctdb_tcp_node_write(struct event_context *ev, struct fd_event *fde, - uint16_t flags, void *private) -{ - struct ctdb_node *node = talloc_get_type(private, struct ctdb_node); - struct ctdb_tcp_node *tnode = talloc_get_type(node->private, - struct ctdb_tcp_node); - if (flags & EVENT_FD_READ) { - /* getting a read event on this fd in the current tcp model is - always an error, as we have separate read and write - sockets. In future we may combine them, but for now it must - mean that the socket is dead, so we try to reconnect */ - node->ctdb->upcalls->node_dead(node); - talloc_free(tnode->fde); - close(tnode->fd); - tnode->fd = -1; - event_add_timed(node->ctdb->ev, node, timeval_zero(), - ctdb_tcp_node_connect, node); + if (data == NULL) { + /* incoming socket has died */ + talloc_free(in); return; } - while (tnode->queue) { - struct ctdb_tcp_packet *pkt = tnode->queue; - ssize_t n; - - n = write(tnode->fd, pkt->data, pkt->length); - - if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) { - event_add_timed(node->ctdb->ev, node, timeval_zero(), - ctdb_tcp_node_dead, node); - EVENT_FD_NOT_WRITEABLE(tnode->fde); - return; - } - if (n <= 0) return; - - if (n != pkt->length) { - pkt->length -= n; - pkt->data += n; - return; - } - - DLIST_REMOVE(tnode->queue, pkt); - talloc_free(pkt); - } - - EVENT_FD_NOT_WRITEABLE(tnode->fde); -} - - - -static void tcp_read_cb(uint8_t *data, int cnt, void *args) -{ - struct ctdb_incoming *in = talloc_get_type(args, struct ctdb_incoming); - struct ctdb_req_header *hdr; - if (cnt < sizeof(*hdr)) { ctdb_set_error(in->ctdb, "Bad packet length %d\n", cnt); return; @@ -130,66 +68,12 @@ static void tcp_read_cb(uint8_t *data, int cnt, void *args) in->ctdb->upcalls->recv_pkt(in->ctdb, data, cnt); } -/* - called when an incoming connection is readable -*/ -void ctdb_tcp_incoming_read(struct event_context *ev, struct fd_event *fde, - uint16_t flags, void *private) -{ - struct ctdb_incoming *in = talloc_get_type(private, struct ctdb_incoming); - - ctdb_read_pdu(in->fd, in, &in->partial, tcp_read_cb, in); -} - /* queue a packet for sending */ int ctdb_tcp_queue_pkt(struct ctdb_node *node, uint8_t *data, uint32_t length) { - struct ctdb_tcp_node *tnode = talloc_get_type(node->private, + struct ctdb_tcp_node *tnode = talloc_get_type(node->private, struct ctdb_tcp_node); - struct ctdb_tcp_packet *pkt; - uint32_t length2; - - /* enforce the length and alignment rules from the tcp packet allocator */ - length2 = (length+(CTDB_TCP_ALIGNMENT-1)) & ~(CTDB_TCP_ALIGNMENT-1); - *(uint32_t *)data = length2; - - if (length2 != length) { - memset(data+length, 0, length2-length); - } - - /* if the queue is empty then try an immediate write, avoiding - queue overhead. This relies on non-blocking sockets */ - if (tnode->queue == NULL && tnode->fd != -1) { - ssize_t n = write(tnode->fd, data, length2); - if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) { - event_add_timed(node->ctdb->ev, node, timeval_zero(), - ctdb_tcp_node_dead, node); - /* yes, we report success, as the dead node is - handled via a separate event */ - return 0; - } - if (n > 0) { - data += n; - length2 -= n; - } - if (length2 == 0) return 0; - } - - pkt = talloc(tnode, struct ctdb_tcp_packet); - CTDB_NO_MEMORY(node->ctdb, pkt); - - pkt->data = talloc_memdup(pkt, data, length2); - CTDB_NO_MEMORY(node->ctdb, pkt->data); - - pkt->length = length2; - - if (tnode->queue == NULL && tnode->fd != -1) { - EVENT_FD_WRITEABLE(tnode->fde); - } - - DLIST_ADD_END(tnode->queue, pkt, struct ctdb_tcp_packet *); - - return 0; + return ctdb_queue_send(tnode->queue, data, length); }