void *private_data)
{
- struct ctdb_req_register *c;
int res;
-
- /* if the domain socket is not yet open, open it */
- if (ctdb->daemon.sd==-1) {
- ctdb_socket_connect(ctdb);
+ int32_t status;
+
+ res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0,
+ tdb_null, NULL, NULL, &status, NULL);
+ if (res != 0 || status != 0) {
+ DEBUG(0,("Failed to register srvid %llu\n", (unsigned long long)srvid));
+ return -1;
}
- c = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_REGISTER, sizeof(*c),
- struct ctdb_req_register);
- CTDB_NO_MEMORY(ctdb, c);
- c->srvid = srvid;
+ /* also need to register the handler with our own ctdb structure */
+ return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
+}
- res = ctdb_client_queue_pkt(ctdb, &c->hdr);
- talloc_free(c);
- if (res != 0) {
- return res;
+/*
+ tell the daemon we no longer want a srvid
+*/
+int ctdb_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
+{
+ int res;
+ int32_t status;
+
+ res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0,
+ tdb_null, NULL, NULL, &status, NULL);
+ if (res != 0 || status != 0) {
+ DEBUG(0,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
+ return -1;
}
- /* also need to register the handler with our ctdb structure */
- return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
+ /* also need to register the handler with our own ctdb structure */
+ ctdb_deregister_message_handler(ctdb, srvid, private_data);
+ return 0;
}
c->hdr.destnode = destnode;
c->hdr.reqid = state->reqid;
c->opcode = opcode;
+ c->client_id = 0;
c->flags = flags;
c->srvid = srvid;
c->datalen = data.dsize;
event_loop_once(ctdb_db->ctdb->ev);
}
+ ret = ctdb_remove_message_handler(ctdb_db->ctdb, srvid, &state);
+ if (ret != 0) {
+ DEBUG(0,("Failed to remove list keys handler\n"));
+ return -1;
+ }
+
return state.count;
}
process a control request
*/
static int32_t ctdb_control_dispatch(struct ctdb_context *ctdb,
- uint32_t opcode, TDB_DATA indata,
+ uint32_t opcode,
+ uint64_t srvid, uint32_t client_id,
+ TDB_DATA indata,
TDB_DATA *outdata, uint32_t srcnode)
{
switch (opcode) {
case CTDB_CONTROL_TRAVERSE_DATA:
return ctdb_control_traverse_data(ctdb, indata, outdata);
+ case CTDB_CONTROL_REGISTER_SRVID:
+ return daemon_register_message_handler(ctdb, client_id, srvid);
+
+ case CTDB_CONTROL_DEREGISTER_SRVID:
+ return daemon_deregister_message_handler(ctdb, client_id, srvid);
+
default:
DEBUG(0,(__location__ " Unknown CTDB control opcode %u\n", opcode));
return -1;
data.dsize = c->datalen;
outdata = talloc_zero(c, TDB_DATA);
- status = ctdb_control_dispatch(ctdb, c->opcode, data, outdata, hdr->srcnode);
+ status = ctdb_control_dispatch(ctdb, c->opcode, c->srvid, c->client_id,
+ data, outdata, hdr->srcnode);
/* some controls send no reply */
if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
send a control message to a node
*/
int ctdb_daemon_send_control(struct ctdb_context *ctdb, uint32_t destnode,
- uint64_t srvid, uint32_t opcode, uint32_t flags,
+ uint64_t srvid, uint32_t opcode, uint32_t client_id,
+ uint32_t flags,
TDB_DATA data,
ctdb_control_callback_fn_t callback,
void *private_data)
c->hdr.destnode = destnode;
c->hdr.reqid = state->reqid;
c->opcode = opcode;
+ c->client_id = client_id;
c->flags = flags;
c->srvid = srvid;
c->datalen = data.dsize;
struct ctdb_context *ctdb;
int fd;
struct ctdb_queue *queue;
+ uint32_t client_id;
};
-
static void daemon_incoming_packet(void *, uint8_t *, uint32_t );
static void ctdb_main_loop(struct ctdb_context *ctdb)
this is called when the ctdb daemon received a ctdb request to
set the srvid from the client
*/
-static void daemon_request_register_message_handler(struct ctdb_client *client,
- struct ctdb_req_register *c)
+int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
{
+ struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
int res;
- res = ctdb_register_message_handler(client->ctdb, client,
- c->srvid, daemon_message_handler,
- client);
+ if (client == NULL) {
+ DEBUG(0,("Bad client_id in daemon_request_register_message_handler\n"));
+ return -1;
+ }
+ res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
if (res != 0) {
- DEBUG(0,(__location__ " Failed to register handler %llu in daemon\n",
- c->srvid));
+ DEBUG(0,(__location__ " Failed to register handler %llu in daemon\n", srvid));
} else {
- DEBUG(2,(__location__ " Registered message handler for srvid=%llu\n",
- c->srvid));
+ DEBUG(2,(__location__ " Registered message handler for srvid=%llu\n", srvid));
}
+ return res;
+}
+
+/*
+ this is called when the ctdb daemon received a ctdb request to
+ remove a srvid from the client
+ */
+int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
+{
+ struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
+ if (client == NULL) {
+ DEBUG(0,("Bad client_id in daemon_request_deregister_message_handler\n"));
+ return -1;
+ }
+ return ctdb_deregister_message_handler(ctdb, srvid, client);
}
*/
static int ctdb_client_destructor(struct ctdb_client *client)
{
+ ctdb_reqid_remove(client->ctdb, client->client_id);
client->ctdb->num_clients--;
close(client->fd);
client->fd = -1;
daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
break;
- case CTDB_REQ_REGISTER:
- ctdb->status.client.req_register++;
- daemon_request_register_message_handler(client,
- (struct ctdb_req_register *)hdr);
- break;
-
case CTDB_REQ_MESSAGE:
ctdb->status.client.req_message++;
daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
client = talloc_zero(ctdb, struct ctdb_client);
client->ctdb = ctdb;
client->fd = fd;
+ client->client_id = ctdb_reqid_new(ctdb, client);
ctdb->num_clients++;
client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT,
data.dptr = &c->data[0];
data.dsize = c->datalen;
res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
- c->srvid, c->opcode, c->flags,
+ c->srvid, c->opcode, client->client_id,
+ c->flags,
data, daemon_control_callback,
state);
if (res != 0) {
/* tell all the other nodes about this database */
ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNN, 0,
- CTDB_CONTROL_DB_ATTACH, CTDB_CTRL_FLAG_NOREPLY,
+ CTDB_CONTROL_DB_ATTACH, 0, CTDB_CTRL_FLAG_NOREPLY,
indata, NULL, NULL);
DEBUG(1,("Attached to database '%s'\n", ctdb_db->db_path));
return 0;
}
-
/*
called when a CTDB_REQ_MESSAGE packet comes in
*/
return 0;
}
+
+
+/*
+ setup handler for receipt of ctdb messages from ctdb_send_message()
+*/
+int ctdb_deregister_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
+{
+ struct ctdb_message_list *m;
+
+ for (m=ctdb->message_list;m;m=m->next) {
+ if (m->srvid == srvid && m->message_private == private_data) {
+ talloc_free(m);
+ return 0;
+ }
+ }
+ return -1;
+}
/* tell all the nodes in the cluster to start sending records to this node */
ret = ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNN, 0, CTDB_CONTROL_TRAVERSE_ALL,
- CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL);
+ 0, CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL);
if (ret != 0) {
talloc_free(state);
return NULL;
data.dsize = d->length;
ret = ctdb_daemon_send_control(state->ctdb, state->srcnode, 0, CTDB_CONTROL_TRAVERSE_DATA,
- CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL);
+ 0, CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL);
if (ret != 0) {
DEBUG(0,("Failed to send traverse data\n"));
}
return fd;
}
-void register_pid_with_daemon(int fd, int pid)
-{
- struct ctdb_req_register r;
-
- bzero(&r, sizeof(r));
- r.hdr.length = sizeof(r);
- r.hdr.ctdb_magic = CTDB_MAGIC;
- r.hdr.ctdb_version = CTDB_VERSION;
- r.hdr.generation = 1;
- r.hdr.operation = CTDB_REQ_REGISTER;
- r.srvid = pid;
-
- /* XXX must deal with partial writes here */
- write(fd, &r, sizeof(r));
-}
-
/* send a command to the cluster to wait until all nodes are connected
and the cluster is fully operational
*/
}
- /* register our local server id with the daemon so that it knows
- where to send messages addressed to our local pid.
- */
- pid=getpid();
- register_pid_with_daemon(fd, pid);
-
-
/* do a connect wait to ensure that all nodes in the cluster are up
and operational.
this also tells us the vnn of the local cluster.
uint32_t req_call;
uint32_t req_message;
uint32_t req_finished;
- uint32_t req_register;
uint32_t req_connect_wait;
uint32_t req_shutdown;
uint32_t req_control;
CTDB_CONTROL_TRAVERSE_START,
CTDB_CONTROL_TRAVERSE_ALL,
CTDB_CONTROL_TRAVERSE_DATA,
+ CTDB_CONTROL_REGISTER_SRVID,
+ CTDB_CONTROL_DEREGISTER_SRVID,
};
CTDB_REPLY_CONTROL,
/* only used on the domain socket */
- CTDB_REQ_REGISTER = 1000,
- CTDB_REQ_CONNECT_WAIT,
+ CTDB_REQ_CONNECT_WAIT = 1000,
CTDB_REPLY_CONNECT_WAIT,
CTDB_REQ_SHUTDOWN
};
uint8_t data[1];
};
-struct ctdb_req_register {
- struct ctdb_req_header hdr;
- uint64_t srvid;
-};
-
struct ctdb_req_message {
struct ctdb_req_header hdr;
uint64_t srvid;
struct ctdb_req_header hdr;
uint32_t opcode;
uint64_t srvid;
+ uint32_t client_id;
#define CTDB_CTRL_FLAG_NOREPLY 1
uint32_t flags;
uint32_t datalen;
void ctdb_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
int ctdb_daemon_send_control(struct ctdb_context *ctdb, uint32_t destnode,
- uint64_t srvid, uint32_t opcode, uint32_t flags,
+ uint64_t srvid, uint32_t opcode, uint32_t client_id, uint32_t flags,
TDB_DATA data,
ctdb_control_callback_fn_t callback,
void *private_data);
int ctdb_dispatch_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data);
+int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid);
+int ctdb_deregister_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data);
+int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid);
#endif
echo "Testing ping"
$VALGRIND bin/ctdb_control ping || exit 1
+exit 0
echo "Testing status"
$VALGRIND bin/ctdb_control status all || exit 1
printf(" req_call %u\n", s->client.req_call);
printf(" req_message %u\n", s->client.req_message);
printf(" req_finished %u\n", s->client.req_finished);
- printf(" req_register %u\n", s->client.req_register);
printf(" req_connect_wait %u\n", s->client.req_connect_wait);
printf(" req_shutdown %u\n", s->client.req_shutdown);
printf(" req_control %u\n", s->client.req_control);