/*
- make a remote ctdb call - async send
+ make a remote ctdb call - async send. Called in daemon context.
This constructs a ctdb_call request and queues it for processing.
This call never blocks.
*/
-struct ctdb_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
+static struct ctdb_call_state *ctdb_daemon_call_send(struct ctdb_db_context *ctdb_db,
+ struct ctdb_call *call)
{
uint32_t len;
struct ctdb_call_state *state;
TDB_DATA data;
struct ctdb_context *ctdb = ctdb_db->ctdb;
- if (ctdb_db->ctdb->flags&CTDB_FLAG_DAEMON_MODE) {
- return ctdbd_call_send(ctdb_db, call);
- }
-
/*
if we are the dmaster for this key then we don't need to
send it off at all, we can bypass the network and handle it
return state;
}
+/*
+ make a remote ctdb call - async send
+
+ This constructs a ctdb_call request and queues it for processing.
+ This call never blocks.
+*/
+struct ctdb_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
+{
+ if (ctdb_db->ctdb->flags & CTDB_FLAG_DAEMON_MODE) {
+ return ctdb_client_call_send(ctdb_db, call);
+ } else {
+ return ctdb_daemon_call_send(ctdb_db, call);
+ }
+}
/*
- make a remote ctdb call - async recv.
+ make a remote ctdb call - async recv - called in daemon context
This is called when the program wants to wait for a ctdb_call to complete and get the
results. This call will block unless the call has already completed.
*/
-int ctdb_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
+static int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
{
struct ctdb_record_handle *rec;
- if (state->ctdb_db->ctdb->flags&CTDB_FLAG_DAEMON_MODE) {
- return ctdbd_call_recv(state, call);
- }
-
while (state->state < CTDB_CALL_DONE) {
event_loop_once(state->node->ctdb->ev);
}
return 0;
}
+
+/*
+ make a remote ctdb call - async recv.
+
+ This is called when the program wants to wait for a ctdb_call to complete and get the
+ results. This call will block unless the call has already completed.
+*/
+int ctdb_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
+{
+ if (state->ctdb_db->ctdb->flags & CTDB_FLAG_DAEMON_MODE) {
+ return ctdb_client_call_recv(state, call);
+ } else {
+ return ctdb_daemon_call_recv(state, call);
+ }
+}
+
/*
full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
*/
#include "../include/ctdb.h"
#include "../include/ctdb_private.h"
-#define CTDB_PATH "/tmp/ctdb.socket"
-#define CTDB_DS_ALIGNMENT 8
-
-
static void ctdb_main_loop(struct ctdb_context *ctdb)
{
ctdb->methods->start(ctdb);
this is called when the ctdb daemon received a ctdb request call
from a local client over the unix domain socket
*/
-static void client_request_call(struct ctdb_client *client, struct ctdb_req_call *c)
+static void daemon_request_call_from_client(struct ctdb_client *client,
+ struct ctdb_req_call *c)
{
struct ctdb_call_state *state;
struct ctdb_db_context *ctdb_db;
struct ctdb_call call;
- struct ctdb_reply_call r;
+ struct ctdb_reply_call *r;
int res;
+ uint32_t length;
for (ctdb_db=client->ctdb->db_list; ctdb_db; ctdb_db=ctdb_db->next) {
if (ctdb_db->db_id == c->db_id) {
return;
}
-
-
ZERO_STRUCT(call);
call.call_id = c->callid;
call.key.dptr = c->data;
exit(1);
}
- ZERO_STRUCT(r);
-#if 0
- r.status =
-#endif
- r.datalen = call.reply_data.dsize;
-
- r.hdr.length = offsetof(struct ctdb_reply_call, data) + r.datalen;
- r.hdr.ctdb_magic = c->hdr.ctdb_magic;
- r.hdr.ctdb_version = c->hdr.ctdb_version;
- r.hdr.operation = CTDB_REPLY_CALL;
-#if 0
- r.hdr.destnode =
- r.hdr.srcnode =
-#endif
- r.hdr.reqid = c->hdr.reqid;
-
-
-/*XXX need to handle the case of partial writes logic for partial writes in tcp/ctdb_tcp_node_write */
- res = write(client->fd, &r, offsetof(struct ctdb_reply_call, data));
- if (r.datalen) {
- res = write(client->fd, call.reply_data.dptr, r.datalen);
+ length = offsetof(struct ctdb_reply_call, data) + call.reply_data.dsize;
+ r = ctdbd_allocate_pkt(client->ctdb, length);
+ if (r == NULL) {
+ printf("Failed to allocate reply_call in ctdb daemon\n");
+ return;
+ }
+ ZERO_STRUCT(*r);
+ r->hdr.length = length;
+ r->hdr.ctdb_magic = CTDB_MAGIC;
+ r->hdr.ctdb_version = CTDB_VERSION;
+ r->hdr.operation = CTDB_REPLY_CALL;
+ r->hdr.reqid = c->hdr.reqid;
+ r->datalen = call.reply_data.dsize;
+ memcpy(&r->data[0], call.reply_data.dptr, r->datalen);
+
+ res = ctdb_queue_send(client->queue, (uint8_t *)&r, r->hdr.length);
+ if (res != 0) {
+ printf("Failed to queue packet from daemon to client\n");
}
+ talloc_free(r);
}
switch (hdr->operation) {
case CTDB_REQ_CALL:
- client_request_call(client, (struct ctdb_req_call *)hdr);
+ daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
break;
}
return 0;
}
-static char *domain_socket_name=NULL;
-static void unlink_domain_socket(void)
+/*
+ delete the socket on exit - called on destruction of autofree context
+ */
+static int unlink_destructor(const char *name)
{
- if (domain_socket_name) {
- unlink(domain_socket_name);
- }
+ unlink(name);
+ return 0;
}
/*
static int fd[2];
int res;
struct fd_event *fde;
+ const char *domain_socket_name;
/* generate a name to use for our local socket */
ctdb->daemon.name = talloc_asprintf(ctdb, "%s.%s", CTDB_PATH, ctdb->address.address);
/* get rid of any old sockets */
unlink(ctdb->daemon.name);
- domain_socket_name = ctdb->daemon.name;
- atexit(unlink_domain_socket);
-
/* create a unix domain stream socket to listen to */
res = ux_socket_bind(ctdb);
if (res!=0) {
return 0;
}
+ /* ensure the socket is deleted on exit of the daemon */
+ domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
+ talloc_set_destructor(domain_socket_name, unlink_destructor);
close(fd[1]);
ctdb_clear_flags(ctdb, CTDB_FLAG_DAEMON_MODE);
return 0;
}
-
-static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
-{
- struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
- struct ctdb_req_header *hdr;
-
- if (cnt < sizeof(*hdr)) {
- ctdb_set_error(ctdb, "Bad packet length %d\n", cnt);
- return;
- }
- hdr = (struct ctdb_req_header *)data;
- if (cnt != hdr->length) {
- ctdb_set_error(ctdb, "Bad header length %d expected %d\n",
- hdr->length, cnt);
- return;
- }
-
- if (hdr->ctdb_magic != CTDB_MAGIC) {
- ctdb_set_error(ctdb, "Non CTDB packet rejected\n");
- return;
- }
-
- if (hdr->ctdb_version != CTDB_VERSION) {
- ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected\n", hdr->ctdb_version);
- return;
- }
-
- ctdb_reply_call(ctdb, hdr);
-}
-
-
/*
- connect to a unix domain socket
-*/
-static int ux_socket_connect(struct ctdb_context *ctdb)
-{
- struct sockaddr_un addr;
-
- memset(&addr, 0, sizeof(addr));
- addr.sun_family = AF_UNIX;
- strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
-
- ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
- if (ctdb->daemon.sd == -1) {
- return -1;
- }
-
- if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
- close(ctdb->daemon.sd);
- ctdb->daemon.sd = -1;
- return -1;
- }
-
- ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd,
- CTDB_DS_ALIGNMENT,
- ctdb_daemon_read_cb, ctdb);
- return 0;
-}
-
-
-
-
-static int ctdb_ltdb_lock(struct ctdb_db_context *ctdb_db, TDB_DATA key)
-{
- return tdb_chainlock(ctdb_db->ltdb->tdb, key);
-}
-
-static int ctdb_ltdb_unlock(struct ctdb_db_context *ctdb_db, TDB_DATA key)
-{
- return tdb_chainunlock(ctdb_db->ltdb->tdb, key);
-}
-
-
-static void *ctdbd_allocate_pkt(struct ctdb_context *ctdb, size_t len)
+ allocate a packet for use in client<->daemon communication
+ */
+void *ctdbd_allocate_pkt(struct ctdb_context *ctdb, size_t len)
{
int size;
return talloc_size(ctdb, size);
}
-
-/*
- queue a packet for sending
-*/
-static int ctdbd_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
-{
- return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
-}
-
-
-/*
- destroy a ctdb_call
-*/
-static int ctdbd_call_destructor(struct ctdb_call_state *state)
-{
- idr_remove(state->node->ctdb->idr, state->c->hdr.reqid);
- return 0;
-}
-
-/*
- make a recv call to the local ctdb daemon
-
- This is called when the program wants to wait for a ctdb_call to complete and get the
- results. This call will block unless the call has already completed.
-*/
-int ctdbd_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
-{
- struct ctdb_record_handle *rec;
-
- while (state->state < CTDB_CALL_DONE) {
- event_loop_once(state->node->ctdb->ev);
- }
- if (state->state != CTDB_CALL_DONE) {
- ctdb_set_error(state->node->ctdb, "%s", state->errmsg);
- talloc_free(state);
- return -1;
- }
-
- rec = state->fetch_private;
-
- /* ugly hack to manage forced migration */
- if (rec != NULL) {
- rec->data->dptr = talloc_steal(rec, state->call.reply_data.dptr);
- rec->data->dsize = state->call.reply_data.dsize;
- talloc_free(state);
- return 0;
- }
-
- if (state->call.reply_data.dsize) {
- call->reply_data.dptr = talloc_memdup(state->node->ctdb,
- state->call.reply_data.dptr,
- state->call.reply_data.dsize);
- call->reply_data.dsize = state->call.reply_data.dsize;
- } else {
- call->reply_data.dptr = NULL;
- call->reply_data.dsize = 0;
- }
- call->status = state->call.status;
- talloc_free(state);
-
- return 0;
-}
-
-/*
- make a ctdb call to the local daemon - async send
-
- This constructs a ctdb_call request and queues it for processing.
- This call never blocks.
-*/
-struct ctdb_call_state *ctdbd_call_send(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
-{
- struct ctdb_call_state *state;
- struct ctdb_context *ctdb = ctdb_db->ctdb;
- struct ctdb_ltdb_header header;
- TDB_DATA data;
- int ret;
- size_t len;
-
- /* if the domain socket is not yet open, open it */
- if (ctdb->daemon.sd==-1) {
- ux_socket_connect(ctdb);
- }
-
- ret = ctdb_ltdb_lock(ctdb_db, call->key);
- if (ret != 0) {
- printf("failed to lock ltdb record\n");
- return NULL;
- }
-
- ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
- if (ret != 0) {
- ctdb_ltdb_unlock(ctdb_db, call->key);
- return NULL;
- }
-
-#if 0
- if (header.dmaster == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) {
- state = ctdb_call_local_send(ctdb_db, call, &header, &data);
- ctdb_ltdb_unlock(ctdb_db, call->key);
- return state;
- }
-#endif
-
- state = talloc_zero(ctdb_db, struct ctdb_call_state);
- if (state == NULL) {
- printf("failed to allocate state\n");
- ctdb_ltdb_unlock(ctdb_db, call->key);
- return NULL;
- }
-
- talloc_steal(state, data.dptr);
-
- len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
- state->c = ctdbd_allocate_pkt(ctdb, len);
- if (state->c == NULL) {
- printf("failed to allocate packet\n");
- ctdb_ltdb_unlock(ctdb_db, call->key);
- return NULL;
- }
- talloc_set_name_const(state->c, "ctdbd req_call packet");
- talloc_steal(state, state->c);
-
- state->c->hdr.length = len;
- state->c->hdr.ctdb_magic = CTDB_MAGIC;
- state->c->hdr.ctdb_version = CTDB_VERSION;
- state->c->hdr.operation = CTDB_REQ_CALL;
- state->c->hdr.destnode = header.dmaster;
- state->c->hdr.srcnode = ctdb->vnn;
- /* this limits us to 16k outstanding messages - not unreasonable */
- state->c->hdr.reqid = idr_get_new(ctdb->idr, state, 0xFFFF);
- state->c->flags = call->flags;
- state->c->db_id = ctdb_db->db_id;
- state->c->callid = call->call_id;
- state->c->keylen = call->key.dsize;
- state->c->calldatalen = call->call_data.dsize;
- memcpy(&state->c->data[0], call->key.dptr, call->key.dsize);
- memcpy(&state->c->data[call->key.dsize],
- call->call_data.dptr, call->call_data.dsize);
- state->call = *call;
- state->call.call_data.dptr = &state->c->data[call->key.dsize];
- state->call.key.dptr = &state->c->data[0];
-
- state->node = ctdb->nodes[header.dmaster];
- state->state = CTDB_CALL_WAIT;
- state->header = header;
- state->ctdb_db = ctdb_db;
-
- talloc_set_destructor(state, ctdbd_call_destructor);
-
- ctdbd_queue_pkt(ctdb, &state->c->hdr);
-
-/*XXX set up timeout to cleanup if server doesnt respond
- event_add_timed(ctdb->ev, state, timeval_current_ofs(CTDB_REQ_TIMEOUT, 0),
- ctdb_call_timeout, state);
-*/
-
- ctdb_ltdb_unlock(ctdb_db, call->key);
- return state;
-}
-
-
-
const char *transport = "tcp";
const char *myaddress = NULL;
int self_connect=0;
+ int daemon_mode=0;
struct poptOption popt_options[] = {
POPT_AUTOHELP
{ "listen", 0, POPT_ARG_STRING, &myaddress, 0, "address to listen on", "address" },
{ "transport", 0, POPT_ARG_STRING, &transport, 0, "protocol transport", NULL },
{ "self-connect", 0, POPT_ARG_NONE, &self_connect, 0, "enable self connect", "boolean" },
+ { "daemon", 0, POPT_ARG_NONE, &daemon_mode, 0, "spawn a ctdb daemon", "boolean" },
{ "timelimit", 't', POPT_ARG_INT, &timelimit, 0, "timelimit", "integer" },
{ "num-records", 'r', POPT_ARG_INT, &num_records, 0, "num_records", "integer" },
{ "num-msgs", 'n', POPT_ARG_INT, &num_msgs, 0, "num_msgs", "integer" },
if (self_connect) {
ctdb_set_flags(ctdb, CTDB_FLAG_SELF_CONNECT);
}
+ if (daemon_mode) {
+ ctdb_set_flags(ctdb, CTDB_FLAG_DAEMON_MODE);
+ }
ret = ctdb_set_transport(ctdb, transport);
if (ret == -1) {
ret = ctdb_set_call(ctdb_db, fetch_func, FUNC_FETCH);
- ctdb_set_message_handler(ctdb, message_handler, &msg_count);
-
/* start the protocol running */
ret = ctdb_start(ctdb);
+ ctdb_set_message_handler(ctdb, message_handler, 0, &msg_count);
+
/* wait until all nodes are connected (should not be needed
outside of test code) */
ctdb_connect_wait(ctdb);