#include "../include/ctdb_private.h"
#define CTDB_PATH "/tmp/ctdb.socket"
+#define CTDB_DS_ALIGNMENT 8
static void ctdb_main_loop(struct ctdb_context *ctdb)
}
-
+/*
+ structure describing a connected client in the daemon
+ */
struct ctdb_client {
struct ctdb_context *ctdb;
- struct fd_event *fde;
int fd;
- struct ctdb_partial partial;
+ struct ctdb_queue *queue;
};
}
-
+/*
+ this is called when the ctdb daemon received a ctdb request call
+ from a local client over the unix domain socket
+ */
static void client_request_call(struct ctdb_client *client, struct ctdb_req_call *c)
{
struct ctdb_call_state *state;
}
-static void ctdb_client_read_cb(uint8_t *data, int cnt, void *args)
+static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
{
struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
struct ctdb_req_header *hdr;
client_incoming_packet(client, data, cnt);
}
-static void ctdb_client_read(struct event_context *ev, struct fd_event *fde,
- uint16_t flags, void *private)
-{
- struct ctdb_client *client = talloc_get_type(private, struct ctdb_client);
-
- ctdb_read_pdu(client->fd, client, &client->partial, ctdb_client_read_cb, client);
-}
-
-
static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde,
uint16_t flags, void *private)
{
client->ctdb = ctdb;
client->fd = fd;
- event_add_fd(ctdb->ev, client, client->fd, EVENT_FD_READ,
- ctdb_client_read, client);
+ client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT,
+ ctdb_client_read_cb, client);
talloc_set_destructor(client, ctdb_client_destructor);
}
}
-static void ctdb_daemon_read_cb(uint8_t *data, int cnt, void *args)
+static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
{
struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
struct ctdb_req_header *hdr;
}
-
-static void ctdb_daemon_io(struct event_context *ev, struct fd_event *fde,
- uint16_t flags, void *private)
-{
- struct ctdb_context *ctdb = talloc_get_type(private, struct ctdb_context);
-
-
- if (flags&EVENT_FD_READ) {
- ctdb_read_pdu(ctdb->daemon.sd, ctdb, &ctdb->daemon.partial, ctdb_daemon_read_cb, ctdb);
- }
- if (flags&EVENT_FD_WRITE) {
- printf("socket is filled. fix this see tcp_io/ctdb_tcp_node_write how to do this\n");
-/* ctdb_daemon_write(ctdb);*/
- }
-}
-
/*
connect to a unix domain socket
*/
return -1;
}
- ctdb->daemon.fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, EVENT_FD_READ,
- ctdb_daemon_io, ctdb);
+ ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd,
+ CTDB_DS_ALIGNMENT,
+ ctdb_daemon_read_cb, ctdb);
return 0;
}
}
-#define CTDB_DS_ALIGNMENT 8
static void *ctdbd_allocate_pkt(struct ctdb_context *ctdb, size_t len)
{
int size;
*/
int ctdbd_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
{
- uint8_t *data = (uint8_t *)hdr;
- uint32_t length = hdr->length;
- struct ctdbd_queue_packet *pkt;
- uint32_t length2;
-
- /* enforce the length and alignment rules from the tcp packet allocator */
- length2 = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
- *(uint32_t *)data = length2;
-
- if (length2 != length) {
- memset(data+length, 0, length2-length);
- }
-
- /* if the queue is empty then try an immediate write, avoiding
- queue overhead. This relies on non-blocking sockets */
- if (ctdb->daemon.queue == NULL) {
- ssize_t n = write(ctdb->daemon.sd, data, length2);
- if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
- printf("socket to ctdb daemon has died\n");
- return -1;
- }
- if (n > 0) {
- data += n;
- length2 -= n;
- }
- if (length2 == 0) return 0;
- }
-
- pkt = talloc(ctdb, struct ctdbd_queue_packet);
- CTDB_NO_MEMORY(ctdb, pkt);
-
- pkt->data = talloc_memdup(pkt, data, length2);
- CTDB_NO_MEMORY(ctdb, pkt->data);
-
- pkt->length = length2;
-
- if (ctdb->daemon.queue == NULL) {
- EVENT_FD_WRITEABLE(ctdb->daemon.fde);
- }
-
- DLIST_ADD_END(ctdb->daemon.queue, pkt, struct ctdbd_queue_packet *);
-
- return 0;
+ return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
}
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
-#ifndef _CTDB_PARTIAL_H
-#define _CTDB_PARTIAL_H
-
#include "includes.h"
#include "lib/tdb/include/tdb.h"
#include "lib/events/events.h"
#include "../include/ctdb_private.h"
#include "ctdb.h"
+/* structures for packet queueing - see common/ctdb_io.c */
+struct ctdb_partial {
+ /* bytes read so far that do not yet make up a complete packet */
+ uint8_t *data;
+ /* number of bytes currently held in data */
+ uint32_t length;
+};
-/* read a record from the file descriptor.
- if the file descriptor has been closed the user specifies ctx will be destryoed.
- */
-void ctdb_read_pdu(int fd, TALLOC_CTX *ctx, struct ctdb_partial *partial, partial_cb_fn_t func, void *args)
+/*
+ one outgoing packet held on a ctdb_queue's out_queue list until it
+ has been completely written to the queue's fd
+*/
+struct ctdb_queue_pkt {
+ /* links for the out_queue list */
+ struct ctdb_queue_pkt *next, *prev;
+ /* next byte still to be written; advanced after a partial write */
+ uint8_t *data;
+ /* number of bytes still to be written starting at data */
+ uint32_t length;
+};
+
+/*
+ state for one packet queue attached to a file descriptor: buffers
+ partial input, holds unsent output, and delivers complete packets
+ (or a failure notification) to the owner's callback
+*/
+struct ctdb_queue {
+ /* owning context; its event context (ctdb->ev) is used for fd and timed events */
+ struct ctdb_context *ctdb;
+ struct ctdb_partial partial; /* partial input packet */
+ /* packets that could not be written immediately, oldest first */
+ struct ctdb_queue_pkt *out_queue;
+ /* fd event registered by ctdb_queue_set_fd(); NULL while fd is -1 */
+ struct fd_event *fde;
+ /* descriptor the queue reads from and writes to, or -1 when detached */
+ int fd;
+ /* outgoing packet lengths are rounded up to a multiple of this */
+ size_t alignment;
+ /* opaque pointer handed back to callback */
+ void *private;
+ /* called with each complete packet, or with data==NULL on failure */
+ ctdb_queue_cb_fn_t callback;
+};
+
+
+
+/*
+ called when an incoming connection is readable
+*/
+static void queue_io_read(struct ctdb_queue *queue)
{
int num_ready = 0;
ssize_t nread;
uint8_t *data, *data_base;
- if (ioctl(fd, FIONREAD, &num_ready) != 0 ||
+ if (ioctl(queue->fd, FIONREAD, &num_ready) != 0 ||
num_ready == 0) {
/* the descriptor has been closed */
- func(NULL, 0, args);
- return;
+ goto failed;
}
- partial->data = talloc_realloc_size(ctx, partial->data,
- num_ready + partial->length);
+ queue->partial.data = talloc_realloc_size(queue, queue->partial.data,
+ num_ready + queue->partial.length);
- if (partial->data == NULL) {
- func(NULL, 0, args);
- return;
+ if (queue->partial.data == NULL) {
+ goto failed;
}
- nread = read(fd, partial->data+partial->length, num_ready);
+ nread = read(queue->fd, queue->partial.data + queue->partial.length, num_ready);
if (nread <= 0) {
- func(NULL, 0, args);
- return;
+ goto failed;
}
- data = partial->data;
- nread += partial->length;
+ data = queue->partial.data;
+ nread += queue->partial.length;
- partial->data = NULL;
- partial->length = 0;
+ queue->partial.data = NULL;
+ queue->partial.length = 0;
if (nread >= 4 && *(uint32_t *)data == nread) {
- /* it is the responsibility of the incoming packet function to free 'data' */
- func(data, nread, args);
+ /* it is the responsibility of the incoming packet
+ function to free 'data' */
+ queue->callback(data, nread, queue->private);
return;
}
uint8_t *d2;
uint32_t len;
len = *(uint32_t *)data;
- d2 = talloc_memdup(ctx, data, len);
+ d2 = talloc_memdup(queue, data, len);
if (d2 == NULL) {
/* sigh */
- func(NULL, 0, args);
- return;
+ goto failed;
}
- func(d2, len, args);
+ queue->callback(d2, len, queue->private);
data += len;
nread -= len;
}
if (nread > 0) {
/* we have only part of a packet */
if (data_base == data) {
- partial->data = data;
- partial->length = nread;
+ queue->partial.data = data;
+ queue->partial.length = nread;
} else {
- partial->data = talloc_memdup(ctx, data, nread);
- if (partial->data == NULL) {
- func(NULL, 0, args);
- return;
+ queue->partial.data = talloc_memdup(queue, data, nread);
+ if (queue->partial.data == NULL) {
+ goto failed;
}
- partial->length = nread;
+ queue->partial.length = nread;
talloc_free(data_base);
}
return;
}
talloc_free(data_base);
+ return;
+
+failed:
+ queue->callback(NULL, 0, queue->private);
+}
+
+
+/*
+ used when an event triggers a dead queue
+
+ timed-event handler scheduled by ctdb_queue_send() and
+ queue_io_write() after a fatal write() error. It notifies the
+ queue's owner by invoking the callback with data==NULL and a
+ length of 0. The ev, te and t arguments are not used.
+*/
+static void queue_dead(struct event_context *ev, struct timed_event *te,
+ struct timeval t, void *private)
+{
+ /* private is the queue that was passed to event_add_timed() */
+ struct ctdb_queue *queue = talloc_get_type(private, struct ctdb_queue);
+ queue->callback(NULL, 0, queue->private);
}
-#endif
+
+/*
+ called when an incoming connection is writeable
+
+ writes queued packets from the head of out_queue until the queue is
+ empty, the socket would block, or a write is only partially
+ completed. A fatal write error schedules queue_dead() so that the
+ owner is told about the failure from a separate event.
+*/
+static void queue_io_write(struct ctdb_queue *queue)
+{
+ while (queue->out_queue) {
+ struct ctdb_queue_pkt *pkt = queue->out_queue;
+ ssize_t n;
+
+ n = write(queue->fd, pkt->data, pkt->length);
+
+ /* any error other than "would block" means the fd is dead */
+ if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
+ event_add_timed(queue->ctdb->ev, queue, timeval_zero(),
+ queue_dead, queue);
+ EVENT_FD_NOT_WRITEABLE(queue->fde);
+ return;
+ }
+ /* nothing was written; leave the packet queued and wait for the next event */
+ if (n <= 0) return;
+
+ /* partial write: remember how much of this packet is left and
+ wait for the next event */
+ if (n != pkt->length) {
+ pkt->length -= n;
+ pkt->data += n;
+ return;
+ }
+
+ /* whole packet written; drop it and try the next one */
+ DLIST_REMOVE(queue->out_queue, pkt);
+ talloc_free(pkt);
+ }
+
+ /* out_queue is empty here, so the loop has nothing further to send */
+ EVENT_FD_NOT_WRITEABLE(queue->fde);
+}
+
+/*
+ called when an incoming connection is readable or writeable
+
+ fd-event handler registered by ctdb_queue_set_fd(). private is the
+ ctdb_queue. When EVENT_FD_READ is set only the read side is
+ serviced in this call; the write side is serviced only when
+ EVENT_FD_READ is not set.
+ NOTE(review): presumably read is exclusive because the read
+ callback may tear down the queue - confirm.
+*/
+static void queue_io_handler(struct event_context *ev, struct fd_event *fde,
+ uint16_t flags, void *private)
+{
+ struct ctdb_queue *queue = talloc_get_type(private, struct ctdb_queue);
+
+ if (flags & EVENT_FD_READ) {
+ queue_io_read(queue);
+ } else {
+ queue_io_write(queue);
+ }
+}
+
+
+/*
+ queue a packet for sending
+
+ queue: the queue to send on
+ data: the packet. It is modified in place: its first 4 bytes are
+ overwritten with the padded length, and any padding bytes
+ after 'length' are zeroed, so the buffer must have room for
+ 'length' rounded up to queue->alignment
+ length: the unpadded packet length in bytes
+
+ The rounding expression assumes queue->alignment is a power of two.
+
+ Returns 0 when the packet was written or queued. A fatal write()
+ error also returns 0; it is reported through the queue callback by
+ a timed event scheduled with timeval_zero().
+ NOTE(review): on allocation failure CTDB_NO_MEMORY presumably
+ returns an error to the caller - the macro is not visible here.
+*/
+int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length)
+{
+ struct ctdb_queue_pkt *pkt;
+ uint32_t length2;
+
+ /* enforce the length and alignment rules from the tcp packet allocator */
+ length2 = (length+(queue->alignment-1)) & ~(queue->alignment-1);
+ *(uint32_t *)data = length2;
+
+ /* zero the padding so no stale bytes are sent */
+ if (length2 != length) {
+ memset(data+length, 0, length2-length);
+ }
+
+ /* if the queue is empty then try an immediate write, avoiding
+ queue overhead. This relies on non-blocking sockets */
+ if (queue->out_queue == NULL && queue->fd != -1) {
+ ssize_t n = write(queue->fd, data, length2);
+ if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
+ event_add_timed(queue->ctdb->ev, queue, timeval_zero(),
+ queue_dead, queue);
+ /* yes, we report success, as the dead node is
+ handled via a separate event */
+ return 0;
+ }
+ /* skip past whatever was written; only the remainder is queued */
+ if (n > 0) {
+ data += n;
+ length2 -= n;
+ }
+ if (length2 == 0) return 0;
+ }
+
+ pkt = talloc(queue, struct ctdb_queue_pkt);
+ CTDB_NO_MEMORY(queue->ctdb, pkt);
+
+ /* copy the unsent bytes; the caller's buffer is not referenced afterwards */
+ /* NOTE(review): if this copy fails, pkt appears to stay allocated
+ under queue - confirm what CTDB_NO_MEMORY does */
+ pkt->data = talloc_memdup(pkt, data, length2);
+ CTDB_NO_MEMORY(queue->ctdb, pkt->data);
+
+ pkt->length = length2;
+
+ /* first queued packet on a live fd: request write events so that
+ queue_io_write() gets called */
+ if (queue->out_queue == NULL && queue->fd != -1) {
+ EVENT_FD_WRITEABLE(queue->fde);
+ }
+
+ DLIST_ADD_END(queue->out_queue, pkt, struct ctdb_queue_pkt *);
+
+ return 0;
+}
+
+
+/*
+ setup the fd used by the queue
+
+ records fd on the queue and frees any previously registered fd
+ event. When fd is not -1 a new read event is registered with
+ queue_io_handler(), and write events are requested as well if
+ packets are already waiting in out_queue. Passing -1 detaches the
+ queue from its descriptor; the caller remains responsible for
+ closing the old descriptor, as this function does not close it.
+
+ Returns 0 on success, -1 if the fd event could not be added.
+*/
+int ctdb_queue_set_fd(struct ctdb_queue *queue, int fd)
+{
+ queue->fd = fd;
+ talloc_free(queue->fde);
+ queue->fde = NULL;
+
+ if (fd != -1) {
+ queue->fde = event_add_fd(queue->ctdb->ev, queue, fd, EVENT_FD_READ,
+ queue_io_handler, queue);
+ if (queue->fde == NULL) {
+ return -1;
+ }
+
+ /* packets queued while there was no usable fd can now be flushed */
+ if (queue->out_queue) {
+ EVENT_FD_WRITEABLE(queue->fde);
+ }
+ }
+
+ return 0;
+}
+
+
+
+/*
+ setup a packet queue on a socket
+
+ ctdb: context stored on the queue and used for its events
+ mem_ctx: talloc parent of the new queue
+ fd: descriptor to attach, or -1 to create a detached queue
+ that can be attached later with ctdb_queue_set_fd()
+ alignment: outgoing packet lengths are rounded up to a multiple of
+ this value
+ callback: called with each complete incoming packet, or with
+ data==NULL on failure
+ private: opaque pointer passed to callback
+
+ Returns the new queue, or NULL if attaching the fd fails.
+ NOTE(review): CTDB_NO_MEMORY_NULL presumably returns NULL when the
+ allocation fails - the macro is not visible here.
+*/
+struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb,
+ TALLOC_CTX *mem_ctx, int fd, int alignment,
+
+ ctdb_queue_cb_fn_t callback,
+ void *private)
+{
+ struct ctdb_queue *queue;
+
+ /* zeroed allocation: partial, out_queue and fde all start out empty */
+ queue = talloc_zero(mem_ctx, struct ctdb_queue);
+ CTDB_NO_MEMORY_NULL(ctdb, queue);
+
+ queue->ctdb = ctdb;
+ queue->fd = fd;
+ queue->alignment = alignment;
+ queue->private = private;
+ queue->callback = callback;
+ if (fd != -1) {
+ if (ctdb_queue_set_fd(queue, fd) != 0) {
+ talloc_free(queue);
+ return NULL;
+ }
+ }
+
+ return queue;
+}
+
+
int ctdb_record_store(struct ctdb_record_handle *rec, TDB_DATA data);
-struct ctdb_partial {
- uint8_t *data;
- uint32_t length;
-};
-/* callback is called with data==NULL for fauilures. the callback must test for this and do cleanup appropriately */
-typedef void (*partial_cb_fn_t)(uint8_t *data, int cnt, void *args);
-void ctdb_read_pdu(int fd, TALLOC_CTX *ctx, struct ctdb_partial *partial, partial_cb_fn_t func, void *args);
-
#endif
int port;
};
+
+/* called from the queue code when a packet comes in. Called with data==NULL
+ on error */
+typedef void (*ctdb_queue_cb_fn_t)(uint8_t *data, size_t length, void *private);
+
+
/*
state associated with one node
*/
struct ctdb_daemon_data {
int sd;
char *name;
- struct ctdbd_queue_packet *queue;
- struct fd_event *fde;
- struct ctdb_partial partial;
+ struct ctdb_queue *queue;
};
/* main state of the ctdb daemon */
struct ctdb_call_state *ctdbd_call_send(struct ctdb_db_context *ctdb_db, struct ctdb_call *call);
int ctdbd_call_recv(struct ctdb_call_state *state, struct ctdb_call *call);
+/*
+ queue a packet for sending
+*/
+int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length);
+
+/*
+ setup the fd used by the queue
+ */
+int ctdb_queue_set_fd(struct ctdb_queue *queue, int fd);
+
+/*
+ setup a packet queue on a socket
+ */
+struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb,
+ TALLOC_CTX *mem_ctx, int fd, int alignment,
+
+ ctdb_queue_cb_fn_t callback,
+ void *private);
+
+
#endif
struct ctdb_incoming {
struct ctdb_context *ctdb;
int fd;
- struct ctdb_partial partial;
-};
-
-/*
- outgoing packet structure - only allocated when we can't write immediately
- to the socket
-*/
-struct ctdb_tcp_packet {
- struct ctdb_tcp_packet *next, *prev;
- uint8_t *data;
- uint32_t length;
+ struct ctdb_queue *queue;
};
/*
*/
struct ctdb_tcp_node {
int fd;
- struct fd_event *fde;
- struct ctdb_tcp_packet *queue;
+ struct ctdb_queue *queue;
};
/* prototypes internal to tcp transport */
-void ctdb_tcp_node_write(struct event_context *ev, struct fd_event *fde,
- uint16_t flags, void *private);
-void ctdb_tcp_incoming_read(struct event_context *ev, struct fd_event *fde,
- uint16_t flags, void *private);
int ctdb_tcp_queue_pkt(struct ctdb_node *node, uint8_t *data, uint32_t length);
int ctdb_tcp_listen(struct ctdb_context *ctdb);
void ctdb_tcp_node_connect(struct event_context *ev, struct timed_event *te,
struct timeval t, void *private);
+void ctdb_tcp_read_cb(uint8_t *data, size_t cnt, void *args);
#define CTDB_TCP_ALIGNMENT 8
}
+/*
+ called when a complete packet has come in - should not happen on this socket
+
+ this is the queue callback registered for the outgoing node
+ socket, so it is also what the queue code calls with data==NULL
+ when the socket fails. Both cases are handled the same way: data
+ and cnt are ignored, the socket is closed, the queue is detached
+ from it and a new connect attempt is scheduled.
+ private is the ctdb_node that was passed to ctdb_queue_setup().
+ */
+void ctdb_tcp_tnode_cb(uint8_t *data, size_t cnt, void *private)
+{
+ struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
+ struct ctdb_tcp_node *tnode = talloc_get_type(node->private,
+ struct ctdb_tcp_node);
+
+ /* start a new connect cycle to try to re-establish the
+ link */
+ close(tnode->fd);
+ /* detach the queue so that no further events fire on the closed fd */
+ ctdb_queue_set_fd(tnode->queue, -1);
+ tnode->fd = -1;
+ event_add_timed(node->ctdb->ev, node, timeval_zero(),
+ ctdb_tcp_node_connect, node);
+}
+
/*
called when socket becomes writeable on connect
*/
}
talloc_free(fde);
- tnode->fde = event_add_fd(node->ctdb->ev, node, tnode->fd, EVENT_FD_READ,
- ctdb_tcp_node_write, node);
+
+ setsockopt(tnode->fd,IPPROTO_TCP,TCP_NODELAY,(char *)&one,sizeof(one));
+
+ tnode->queue = ctdb_queue_setup(node->ctdb, node, tnode->fd, CTDB_TCP_ALIGNMENT,
+ ctdb_tcp_tnode_cb, node);
/* tell the ctdb layer we are connected */
node->ctdb->upcalls->node_connected(node);
-
- setsockopt(tnode->fd,IPPROTO_TCP,TCP_NODELAY,(char *)&one,sizeof(one));
-
- if (tnode->queue) {
- EVENT_FD_WRITEABLE(tnode->fde);
- }
}
set_nonblocking(in->fd);
- event_add_fd(ctdb->ev, in, in->fd, EVENT_FD_READ,
- ctdb_tcp_incoming_read, in);
+ in->queue = ctdb_queue_setup(ctdb, in, in->fd, CTDB_TCP_ALIGNMENT,
+ ctdb_tcp_read_cb, in);
talloc_set_destructor(in, ctdb_incoming_destructor);
}
/*
- called when we fail to send a message to a node
-*/
-static void ctdb_tcp_node_dead(struct event_context *ev, struct timed_event *te,
- struct timeval t, void *private)
+ called when a complete packet has come in
+ */
+void ctdb_tcp_read_cb(uint8_t *data, size_t cnt, void *args)
{
- struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
- struct ctdb_tcp_node *tnode = talloc_get_type(node->private,
- struct ctdb_tcp_node);
-
- /* start a new connect cycle to try to re-establish the
- link */
- talloc_free(tnode->fde);
- close(tnode->fd);
- tnode->fd = -1;
- event_add_timed(node->ctdb->ev, node, timeval_zero(),
- ctdb_tcp_node_connect, node);
-}
+ struct ctdb_incoming *in = talloc_get_type(args, struct ctdb_incoming);
+ struct ctdb_req_header *hdr;
-/*
- called when socket becomes readable
-*/
-void ctdb_tcp_node_write(struct event_context *ev, struct fd_event *fde,
- uint16_t flags, void *private)
-{
- struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
- struct ctdb_tcp_node *tnode = talloc_get_type(node->private,
- struct ctdb_tcp_node);
- if (flags & EVENT_FD_READ) {
- /* getting a read event on this fd in the current tcp model is
- always an error, as we have separate read and write
- sockets. In future we may combine them, but for now it must
- mean that the socket is dead, so we try to reconnect */
- node->ctdb->upcalls->node_dead(node);
- talloc_free(tnode->fde);
- close(tnode->fd);
- tnode->fd = -1;
- event_add_timed(node->ctdb->ev, node, timeval_zero(),
- ctdb_tcp_node_connect, node);
+ if (data == NULL) {
+ /* incoming socket has died */
+ talloc_free(in);
return;
}
- while (tnode->queue) {
- struct ctdb_tcp_packet *pkt = tnode->queue;
- ssize_t n;
-
- n = write(tnode->fd, pkt->data, pkt->length);
-
- if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
- event_add_timed(node->ctdb->ev, node, timeval_zero(),
- ctdb_tcp_node_dead, node);
- EVENT_FD_NOT_WRITEABLE(tnode->fde);
- return;
- }
- if (n <= 0) return;
-
- if (n != pkt->length) {
- pkt->length -= n;
- pkt->data += n;
- return;
- }
-
- DLIST_REMOVE(tnode->queue, pkt);
- talloc_free(pkt);
- }
-
- EVENT_FD_NOT_WRITEABLE(tnode->fde);
-}
-
-
-
-static void tcp_read_cb(uint8_t *data, int cnt, void *args)
-{
- struct ctdb_incoming *in = talloc_get_type(args, struct ctdb_incoming);
- struct ctdb_req_header *hdr;
-
if (cnt < sizeof(*hdr)) {
ctdb_set_error(in->ctdb, "Bad packet length %d\n", cnt);
return;
in->ctdb->upcalls->recv_pkt(in->ctdb, data, cnt);
}
-/*
- called when an incoming connection is readable
-*/
-void ctdb_tcp_incoming_read(struct event_context *ev, struct fd_event *fde,
- uint16_t flags, void *private)
-{
- struct ctdb_incoming *in = talloc_get_type(private, struct ctdb_incoming);
-
- ctdb_read_pdu(in->fd, in, &in->partial, tcp_read_cb, in);
-}
-
/*
queue a packet for sending
*/
int ctdb_tcp_queue_pkt(struct ctdb_node *node, uint8_t *data, uint32_t length)
{
- struct ctdb_tcp_node *tnode = talloc_get_type(node->private,
+ struct ctdb_tcp_node *tnode = talloc_get_type(node->private,
struct ctdb_tcp_node);
- struct ctdb_tcp_packet *pkt;
- uint32_t length2;
-
- /* enforce the length and alignment rules from the tcp packet allocator */
- length2 = (length+(CTDB_TCP_ALIGNMENT-1)) & ~(CTDB_TCP_ALIGNMENT-1);
- *(uint32_t *)data = length2;
-
- if (length2 != length) {
- memset(data+length, 0, length2-length);
- }
-
- /* if the queue is empty then try an immediate write, avoiding
- queue overhead. This relies on non-blocking sockets */
- if (tnode->queue == NULL && tnode->fd != -1) {
- ssize_t n = write(tnode->fd, data, length2);
- if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
- event_add_timed(node->ctdb->ev, node, timeval_zero(),
- ctdb_tcp_node_dead, node);
- /* yes, we report success, as the dead node is
- handled via a separate event */
- return 0;
- }
- if (n > 0) {
- data += n;
- length2 -= n;
- }
- if (length2 == 0) return 0;
- }
-
- pkt = talloc(tnode, struct ctdb_tcp_packet);
- CTDB_NO_MEMORY(node->ctdb, pkt);
-
- pkt->data = talloc_memdup(pkt, data, length2);
- CTDB_NO_MEMORY(node->ctdb, pkt->data);
-
- pkt->length = length2;
-
- if (tnode->queue == NULL && tnode->fd != -1) {
- EVENT_FD_WRITEABLE(tnode->fde);
- }
-
- DLIST_ADD_END(tnode->queue, pkt, struct ctdb_tcp_packet *);
-
- return 0;
+ return ctdb_queue_send(tnode->queue, data, length);
}