va_end(ap);
len = strlen(msg)+1;
- r = talloc_size(ctdb, sizeof(*r) + len);
+ r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r) + len);
CTDB_NO_MEMORY_FATAL(ctdb, r);
r->hdr.length = sizeof(*r) + len;
r->hdr.operation = CTDB_REPLY_ERROR;
{
struct ctdb_reply_redirect *r;
- r = talloc_size(ctdb, sizeof(*r));
+ r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r));
CTDB_NO_MEMORY_FATAL(ctdb, r);
r->hdr.length = sizeof(*r);
r->hdr.operation = CTDB_REPLY_REDIRECT;
int len;
len = sizeof(*r) + key->dsize + data->dsize;
- r = talloc_size(ctdb, len);
+ r = ctdb->methods->allocate_pkt(ctdb, len);
CTDB_NO_MEMORY_FATAL(ctdb, r);
r->hdr.length = len;
r->hdr.operation = CTDB_REQ_DMASTER;
}
/* send the CTDB_REPLY_DMASTER */
- r = talloc_size(ctdb, sizeof(*r) + data.dsize);
+ r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r) + data.dsize);
CTDB_NO_MEMORY_FATAL(ctdb, r);
r->hdr.length = sizeof(*r) + data.dsize;
r->hdr.operation = CTDB_REPLY_DMASTER;
call_data.dsize?&call_data:NULL,
&reply_data, c->hdr.srcnode);
- r = talloc_size(ctdb, sizeof(*r) + reply_data.dsize);
+ r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r) + reply_data.dsize);
CTDB_NO_MEMORY_FATAL(ctdb, r);
r->hdr.length = sizeof(*r) + reply_data.dsize;
r->hdr.operation = CTDB_REPLY_CALL;
CTDB_NO_MEMORY_NULL(ctdb, state);
len = sizeof(*state->c) + key.dsize + (call_data?call_data->dsize:0);
- state->c = talloc_size(ctdb, len);
+ state->c = ctdb->methods->allocate_pkt(ctdb, len);
CTDB_NO_MEMORY_NULL(ctdb, state->c);
state->c->hdr.length = len;
int (*start)(struct ctdb_context *); /* start protocol processing */
int (*add_node)(struct ctdb_node *); /* setup a new node */
int (*queue_pkt)(struct ctdb_node *, uint8_t *data, uint32_t length);
+ void *(*allocate_pkt)(struct ctdb_context *, size_t );
};
/*
int listen_fd;
};
+/*
+ incoming packet structure - only used when we get a partial packet
+ on read
+
+ the bytes received so far are kept here between reads; each new
+ read is appended and the fields are reset once the data has been
+ taken for processing
+*/
+struct ctdb_tcp_partial {
+ uint8_t *data; /* bytes of the incomplete packet; NULL when nothing is pending */
+ uint32_t length; /* number of valid bytes currently held in data */
+};
+
+
/*
state associated with an incoming connection
*/
struct ctdb_incoming {
struct ctdb_context *ctdb; /* context this connection was accepted for */
int fd; /* socket returned by accept() for this connection */
+ struct ctdb_tcp_partial partial; /* incomplete packet carried over between reads */
};
+/*
+ outgoing packet structure - only allocated when we can't write immediately
+ to the socket
+*/
struct ctdb_tcp_packet {
struct ctdb_tcp_packet *next, *prev;
uint8_t *data;
fd = accept(ctcp->listen_fd, (struct sockaddr *)&addr, &len);
if (fd == -1) return;
- in = talloc(ctdb, struct ctdb_incoming);
+ in = talloc_zero(ctdb, struct ctdb_incoming);
in->fd = fd;
in->ctdb = ctdb;
}
+/*
+ transport packet allocator - allows transport to control memory for packets
+
+ returns a buffer of at least 'size' bytes allocated with talloc_size()
+ on the ctdb context, or NULL if the allocation fails or if rounding
+ 'size' up would overflow
+*/
+void *ctdb_tcp_allocate_pkt(struct ctdb_context *ctdb, size_t size)
+{
+ /* tcp transport needs to round to 8 byte alignment to ensure
+ that we can use a length header and 64 bit elements in
+ structures */
+ if (size + 7 < size) {
+ /* rounding up would wrap around and hand back a buffer far
+ smaller than the caller asked for - fail instead */
+ return NULL;
+ }
+ size = (size+7) & ~(size_t)7;
+ return talloc_size(ctdb, size);
+}
+
+
static const struct ctdb_methods ctdb_tcp_methods = { /* method table for the tcp transport */
.start = ctdb_tcp_start, /* start protocol processing */
.add_node = ctdb_tcp_add_node, /* setup a new node */
- .queue_pkt = ctdb_tcp_queue_pkt
+ .queue_pkt = ctdb_tcp_queue_pkt, /* presumably queues a packet for sending to a node - confirm */
+ .allocate_pkt = ctdb_tcp_allocate_pkt /* transport packet allocator (rounds size up to 8 bytes) */
};
/*
#include "ctdb_private.h"
#include "ctdb_tcp.h"
+
/*
called when we fail to send a message to a node
*/
{
struct ctdb_incoming *in = talloc_get_type(private, struct ctdb_incoming);
int num_ready = 0;
- uint8_t *data;
-
- /* NOTE: we don't yet handle combined packets or partial
- packets. Obviously that needed fixing, using a similar
- scheme to the Samba4 packet layer */
+ ssize_t nread;
+ uint8_t *data, *data_base;
+
+ /* Read whatever the socket has ready, append it to any partial
+ packet saved from an earlier read, and hand every complete
+ packet to the recv_pkt upcall. Each packet starts with a
+ uint32_t holding the total packet length in bytes. A trailing
+ incomplete packet is kept in in->partial for the next call.
+ NOTE(review): the uint32_t reads below assume packet lengths
+ keep 'data' suitably aligned - confirm the sender pads packets */
if (ioctl(in->fd, FIONREAD, &num_ready) != 0 ||
num_ready == 0) {
return;
}
- data = talloc_size(in, num_ready);
- if (data == NULL) {
+ in->partial.data = talloc_realloc_size(in, in->partial.data,
+ num_ready + in->partial.length);
+ if (in->partial.data == NULL) {
/* not much we can do except drop the socket */
talloc_free(in);
return;
}
- if (read(in->fd, data, num_ready) != num_ready) {
+ nread = read(in->fd, in->partial.data+in->partial.length, num_ready);
+ if (nread <= 0) {
+ /* the connection must be dead */
talloc_free(in);
return;
}
- /* tell the ctdb layer above that we have a packet */
- in->ctdb->upcalls->recv_pkt(in->ctdb, data, num_ready);
+ /* from here on 'data'/'nread' cover the saved partial bytes plus
+ the bytes just read; the partial slot is cleared until we know
+ whether anything needs to be saved again */
+ data = in->partial.data;
+ nread += in->partial.length;
+
+ in->partial.data = NULL;
+ in->partial.length = 0;
+
+ if (nread >= 4 && *(uint32_t *)data == nread) {
+ /* most common case - we got a whole packet in one go
+ tell the ctdb layer above that we have a packet */
+ in->ctdb->upcalls->recv_pkt(in->ctdb, data, nread);
+ return;
+ }
+
+ data_base = data;
+
+ while (nread >= 4 && *(uint32_t *)data <= nread) {
+ /* we have at least one packet */
+ uint8_t *d2;
+ uint32_t len;
+ len = *(uint32_t *)data;
+ if (len < sizeof(len)) {
+ /* a packet can never be shorter than its own length
+ header, and a zero length would spin this loop
+ forever - treat it as a broken connection */
+ talloc_free(in);
+ return;
+ }
+ d2 = talloc_memdup(in, data, len);
+ if (d2 == NULL) {
+ /* sigh */
+ talloc_free(in);
+ return;
+ }
+ in->ctdb->upcalls->recv_pkt(in->ctdb, d2, len);
+ /* keep looping - one read may carry several packets */
+ data += len;
+ nread -= len;
+ }
+
+ if (nread == 0) {
+ /* everything we read was consumed as whole packets */
+ talloc_free(data_base);
+ return;
+ }
+
+ /* what is left is only part of a packet - save it so that the
+ next read can complete it */
+ if (data_base == data) {
+ /* nothing was consumed, so the buffer can be kept as it is */
+ in->partial.data = data;
+ in->partial.length = nread;
+ return;
+ }
+
+ /* copy the unconsumed tail into its own buffer before releasing
+ the one that also held the packets already delivered */
+ in->partial.data = talloc_memdup(in, data, nread);
+ if (in->partial.data == NULL) {
+ talloc_free(in);
+ return;
+ }
+ in->partial.length = nread;
+ talloc_free(data_base);
}
/*