]> git.ipfire.org Git - thirdparty/samba.git/commitdiff
added handling of partial packet reads
authorAndrew Tridgell <tridge@samba.org>
Tue, 19 Dec 2006 01:03:10 +0000 (12:03 +1100)
committerAndrew Tridgell <tridge@samba.org>
Tue, 19 Dec 2006 01:03:10 +0000 (12:03 +1100)
added transport level packet allocator, allowing the transport to
enforce alignment or special memory rules

(This used to be ctdb commit 50304a5c4d8d640732678eeed793857334ca5ec1)

ctdb/common/ctdb_call.c
ctdb/include/ctdb_private.h
ctdb/tcp/ctdb_tcp.h
ctdb/tcp/tcp_connect.c
ctdb/tcp/tcp_init.c
ctdb/tcp/tcp_io.c

index ca111b7e21f0988bece2e2ca27227ae07c7c84b9..81f3cbdea10212c27bb6d24f71e87925a8be6567 100644 (file)
@@ -130,7 +130,7 @@ static void ctdb_send_error(struct ctdb_context *ctdb,
        va_end(ap);
 
        len = strlen(msg)+1;
-       r = talloc_size(ctdb, sizeof(*r) + len);
+       r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r) + len);
        CTDB_NO_MEMORY_FATAL(ctdb, r);
        r->hdr.length = sizeof(*r) + len;
        r->hdr.operation = CTDB_REPLY_ERROR;
@@ -158,7 +158,7 @@ static void ctdb_call_send_redirect(struct ctdb_context *ctdb,
 {
        struct ctdb_reply_redirect *r;
 
-       r = talloc_size(ctdb, sizeof(*r));
+       r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r));
        CTDB_NO_MEMORY_FATAL(ctdb, r);
        r->hdr.length = sizeof(*r);
        r->hdr.operation = CTDB_REPLY_REDIRECT;
@@ -188,7 +188,7 @@ static void ctdb_call_send_dmaster(struct ctdb_context *ctdb,
        int len;
        
        len = sizeof(*r) + key->dsize + data->dsize;
-       r = talloc_size(ctdb, len);
+       r = ctdb->methods->allocate_pkt(ctdb, len);
        CTDB_NO_MEMORY_FATAL(ctdb, r);
        r->hdr.length    = len;
        r->hdr.operation = CTDB_REQ_DMASTER;
@@ -255,7 +255,7 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr
        }
 
        /* send the CTDB_REPLY_DMASTER */
-       r = talloc_size(ctdb, sizeof(*r) + data.dsize);
+       r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r) + data.dsize);
        CTDB_NO_MEMORY_FATAL(ctdb, r);
        r->hdr.length = sizeof(*r) + data.dsize;
        r->hdr.operation = CTDB_REPLY_DMASTER;
@@ -317,7 +317,7 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
                        call_data.dsize?&call_data:NULL,
                        &reply_data, c->hdr.srcnode);
 
-       r = talloc_size(ctdb, sizeof(*r) + reply_data.dsize);
+       r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r) + reply_data.dsize);
        CTDB_NO_MEMORY_FATAL(ctdb, r);
        r->hdr.length = sizeof(*r) + reply_data.dsize;
        r->hdr.operation = CTDB_REPLY_CALL;
@@ -539,7 +539,7 @@ struct ctdb_call_state *ctdb_call_send(struct ctdb_context *ctdb,
        CTDB_NO_MEMORY_NULL(ctdb, state);
 
        len = sizeof(*state->c) + key.dsize + (call_data?call_data->dsize:0);
-       state->c = talloc_size(ctdb, len);
+       state->c = ctdb->methods->allocate_pkt(ctdb, len);
        CTDB_NO_MEMORY_NULL(ctdb, state->c);
 
        state->c->hdr.length    = len;
index f21139eab2cab36b3f4d5449588d3106c8ae1776..a024147c63e39002d044b06a0dff0977e6b3760d 100644 (file)
@@ -55,6 +55,7 @@ struct ctdb_methods {
        int (*start)(struct ctdb_context *); /* start protocol processing */    
        int (*add_node)(struct ctdb_node *); /* setup a new node */     
        int (*queue_pkt)(struct ctdb_node *, uint8_t *data, uint32_t length);
+       void *(*allocate_pkt)(struct ctdb_context *, size_t );
 };
 
 /*
index 14e96ea89795917b8006e9783d0a1f94c5d0c1ce..039343a99858d29b1f78e54c574ca055d0248c6f 100644 (file)
@@ -24,14 +24,29 @@ struct ctdb_tcp {
        int listen_fd;
 };
 
+/*
+  incoming packet structure - only used when we get a partial packet
+  on read
+*/
+struct ctdb_tcp_partial {
+       uint8_t *data;
+       uint32_t length;
+};
+
+
 /*
   state associated with an incoming connection
 */
 struct ctdb_incoming {
        struct ctdb_context *ctdb;
        int fd;
+       struct ctdb_tcp_partial partial;
 };
 
+/*
+  outgoing packet structure - only allocated when we can't write immediately
+  to the socket
+*/
 struct ctdb_tcp_packet {
        struct ctdb_tcp_packet *next, *prev;
        uint8_t *data;
index 6074b646aa4e9776474f1d52209b8ff89bd97379..bdcab2d5cac6abe9872a38fb3bfd574e90ba21fd 100644 (file)
@@ -135,7 +135,7 @@ static void ctdb_listen_event(struct event_context *ev, struct fd_event *fde,
        fd = accept(ctcp->listen_fd, (struct sockaddr *)&addr, &len);
        if (fd == -1) return;
 
-       in = talloc(ctdb, struct ctdb_incoming);
+       in = talloc_zero(ctdb, struct ctdb_incoming);
        in->fd = fd;
        in->ctdb = ctdb;
 
index b3378677035fd4e10b9ccf6433aeeb6cd3869ea4..fca6506676f8215534f0d4162b47ad7858be9b19 100644 (file)
@@ -64,10 +64,24 @@ int ctdb_tcp_add_node(struct ctdb_node *node)
 }
 
 
+/*
+  transport packet allocator - allows transport to control memory for packets
+*/
+void *ctdb_tcp_allocate_pkt(struct ctdb_context *ctdb, size_t size)
+{
+       /* tcp transport needs to round to 8 byte alignment to ensure
+          that we can use a length header and 64 bit elements in
+          structures */
+       size = (size+7) & ~7;
+       return talloc_size(ctdb, size);
+}
+
+
 static const struct ctdb_methods ctdb_tcp_methods = {
        .start     = ctdb_tcp_start,
        .add_node  = ctdb_tcp_add_node,
-       .queue_pkt = ctdb_tcp_queue_pkt
+       .queue_pkt = ctdb_tcp_queue_pkt,
+       .allocate_pkt = ctdb_tcp_allocate_pkt
 };
 
 /*
index d572ba533fda7f6baf05d398288b0885971b4c13..167e3a2ca7d6b66b5fbb1d714315f19d33b9c4c5 100644 (file)
@@ -25,6 +25,7 @@
 #include "ctdb_private.h"
 #include "ctdb_tcp.h"
 
+
 /*
   called when we fail to send a message to a node
 */
@@ -109,11 +110,8 @@ void ctdb_tcp_incoming_read(struct event_context *ev, struct fd_event *fde,
 {
        struct ctdb_incoming *in = talloc_get_type(private, struct ctdb_incoming);
        int num_ready = 0;
-       uint8_t *data;
-
-       /* NOTE: we don't yet handle combined packets or partial
-          packets. Obviously that needed fixing, using a similar
-          scheme to the Samba4 packet layer */
+       ssize_t nread;
+       uint8_t *data, *data_base;
 
        if (ioctl(in->fd, FIONREAD, &num_ready) != 0 ||
            num_ready == 0) {
@@ -126,20 +124,71 @@ void ctdb_tcp_incoming_read(struct event_context *ev, struct fd_event *fde,
                return;
        }
 
-       data = talloc_size(in, num_ready);
-       if (data == NULL) {
+       in->partial.data = talloc_realloc_size(in, in->partial.data, 
+                                              num_ready + in->partial.length);
+       if (in->partial.data == NULL) {
                /* not much we can do except drop the socket */
                talloc_free(in);
                return;
        }
 
-       if (read(in->fd, data, num_ready) != num_ready) {
+       nread = read(in->fd, in->partial.data+in->partial.length, num_ready);
+       if (nread <= 0) {
+               /* the connection must be dead */
                talloc_free(in);
                return;
        }
 
-       /* tell the ctdb layer above that we have a packet */
-       in->ctdb->upcalls->recv_pkt(in->ctdb, data, num_ready);
+       data = in->partial.data;
+       nread += in->partial.length;
+
+       in->partial.data = NULL;
+       in->partial.length = 0;
+
+       if (nread >= 4 && *(uint32_t *)data == nread) {
+               /* most common case - we got a whole packet in one go
+                  tell the ctdb layer above that we have a packet */
+               in->ctdb->upcalls->recv_pkt(in->ctdb, data, nread);
+               return;
+       }
+
+       data_base = data;
+
+       while (nread >= 4 && *(uint32_t *)data <= nread) {
+               /* we have at least one packet */
+               uint8_t *d2;
+               uint32_t len;
+               len = *(uint32_t *)data;
+               d2 = talloc_memdup(in, data, len);
+               if (d2 == NULL) {
+                       /* sigh */
+                       talloc_free(in);
+                       return;
+               }
+               in->ctdb->upcalls->recv_pkt(in->ctdb, d2, len);
+               data += len;
+               nread -= len;           
+               return;
+       }
+
+       if (nread < 4 || *(uint32_t *)data > nread) {
+               /* we have only part of a packet */
+               if (data_base == data) {
+                       in->partial.data = data;
+                       in->partial.length = nread;
+               } else {
+                       in->partial.data = talloc_memdup(in, data, nread);
+                       if (in->partial.data == NULL) {
+                               talloc_free(in);
+                               return;
+                       }
+                       in->partial.length = nread;
+                       talloc_free(data_base);
+               }
+               return;
+       }
+
+       talloc_free(data_base);
 }
 
 /*