#include "system/filesys.h"
#include "../include/ctdb_private.h"
+/*
+ find the ctdb_db from a db index
+ */
+ struct ctdb_db_context *find_ctdb_db(struct ctdb_context *ctdb, uint32_t id)
+{
+ struct ctdb_db_context *ctdb_db;
+
+ for (ctdb_db=ctdb->db_list; ctdb_db; ctdb_db=ctdb_db->next) {
+ if (ctdb_db->db_id == id) {
+ break;
+ }
+ }
+ return ctdb_db;
+}
+
+
/*
local version of ctdb_call
*/
struct ctdb_call_info *c;
struct ctdb_registered_call *fn;
struct ctdb_context *ctdb = ctdb_db->ctdb;
-
+
c = talloc(ctdb, struct ctdb_call_info);
CTDB_NO_MEMORY(ctdb, c);
data.dptr = c->data + c->keylen;
data.dsize = c->datalen;
- for (ctdb_db=ctdb->db_list; ctdb_db; ctdb_db=ctdb_db->next) {
- if (ctdb_db->db_id == c->db_id) {
- break;
- }
- }
+ ctdb_db = find_ctdb_db(ctdb, c->db_id);
if (!ctdb_db) {
ctdb_send_error(ctdb, hdr, -1,
"Unknown database in request. db_id==0x%08x",
struct ctdb_call call;
struct ctdb_db_context *ctdb_db;
- for (ctdb_db=ctdb->db_list; ctdb_db; ctdb_db=ctdb_db->next) {
- if (ctdb_db->db_id == c->db_id) {
- break;
- }
- }
+ ctdb_db = find_ctdb_db(ctdb, c->db_id);
if (!ctdb_db) {
ctdb_send_error(ctdb, hdr, -1,
"Unknown database in request. db_id==0x%08x",
(void)talloc_reference(state, c);
state->state = CTDB_CALL_DONE;
+ if (state->async.fn) {
+ state->async.fn(state);
+ }
}
/*
ctdb_call_local(ctdb_db, &state->call, &state->header, &data, ctdb->vnn);
state->state = CTDB_CALL_DONE;
+ if (state->async.fn) {
+ state->async.fn(state);
+ }
}
state->state = CTDB_CALL_ERROR;
state->errmsg = (char *)c->msg;
+ if (state->async.fn) {
+ state->async.fn(state);
+ }
}
state->state = CTDB_CALL_ERROR;
ctdb_set_error(state->node->ctdb, "ctdb_call %u timed out",
state->c->hdr.reqid);
+ if (state->async.fn) {
+ state->async.fn(state);
+ }
}
+/*
+ this allows the caller to setup a async.fn
+*/
+static void call_local_trigger(struct event_context *ev, struct timed_event *te,
+ struct timeval t, void *private)
+{
+ struct ctdb_call_state *state = talloc_get_type(private, struct ctdb_call_state);
+ if (state->async.fn) {
+ state->async.fn(state);
+ }
+}
+
+
/*
construct an event driven local ctdb_call
ret = ctdb_call_local(ctdb_db, &state->call, header, data, ctdb->vnn);
+ event_add_timed(ctdb->ev, state, timeval_zero(), call_local_trigger, state);
+
return state;
}
struct ctdb_call_state *state;
int ret;
+ if (ctdb_db->ctdb->flags & CTDB_FLAG_DAEMON_MODE) {
+ return ctdb_client_fetch_lock(ctdb_db, mem_ctx, key, data);
+ }
+
ZERO_STRUCT(call);
call.call_id = CTDB_FETCH_FUNC;
call.key = key;
}
-int ctdb_record_store(struct ctdb_record_handle *rec, TDB_DATA data)
+int ctdb_store_unlock(struct ctdb_record_handle *rec, TDB_DATA data)
{
int ret;
struct ctdb_ltdb_header header;
ctdb->num_connected = r->num_connected;
}
+/*
+ called in the client when we receive a CTDB_REPLY_FETCH_LOCK from the daemon
+
+ This packet comes in response to a CTDB_REQ_FETCH_LOCK request packet. It
+ contains any reply data from the call
+*/
+void ctdb_reply_fetch_lock(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
+{
+ struct ctdb_reply_fetch_lock *c = (struct ctdb_reply_fetch_lock *)hdr;
+ struct ctdb_call_state *state;
+
+ state = idr_find(ctdb->idr, hdr->reqid);
+ if (state == NULL) return;
+
+ state->call.reply_data.dptr = c->data;
+ state->call.reply_data.dsize = c->datalen;
+ state->call.status = c->state;
+
+ talloc_steal(state, c);
+
+ /* get an extra reference here - this prevents the free in ctdb_recv_pkt()
+ from freeing the data */
+ (void)talloc_reference(state, c);
+
+ state->state = CTDB_CALL_DONE;
+ if (state->async.fn) {
+ state->async.fn(state);
+ }
+}
+
/*
this is called in the client, when data comes in from the daemon
*/
case CTDB_REPLY_CONNECT_WAIT:
ctdb_reply_connect_wait(ctdb, hdr);
break;
+
+ case CTDB_REPLY_FETCH_LOCK:
+ ctdb_reply_fetch_lock(ctdb, hdr);
+ break;
+
+ default:
+ printf("bogus operation code:%d\n",hdr->operation);
}
}
ctdb_client_connect_wait(ctdb);
}
+
+
+struct ctdb_call_state *ctdb_client_fetch_lock_send(struct ctdb_db_context *ctdb_db,
+ TALLOC_CTX *mem_ctx,
+ TDB_DATA key)
+{
+ struct ctdb_call_state *state;
+ struct ctdb_context *ctdb = ctdb_db->ctdb;
+ struct ctdb_req_fetch_lock *req;
+ int len, res;
+
+ /* if the domain socket is not yet open, open it */
+ if (ctdb->daemon.sd==-1) {
+ ux_socket_connect(ctdb);
+ }
+
+ state = talloc_zero(ctdb_db, struct ctdb_call_state);
+ if (state == NULL) {
+ printf("failed to allocate state\n");
+ return NULL;
+ }
+ state->state = CTDB_CALL_WAIT;
+ state->ctdb_db = ctdb_db;
+ len = offsetof(struct ctdb_req_fetch_lock, key) + key.dsize;
+ state->c = ctdbd_allocate_pkt(ctdb, len);
+ if (state->c == NULL) {
+ printf("failed to allocate packet\n");
+ return NULL;
+ }
+ ZERO_STRUCT(*state->c);
+ talloc_set_name_const(state->c, "ctdbd req_fetch_lock packet");
+ talloc_steal(state, state->c);
+
+ req = (struct ctdb_req_fetch_lock *)state->c;
+ req->hdr.length = len;
+ req->hdr.ctdb_magic = CTDB_MAGIC;
+ req->hdr.ctdb_version = CTDB_VERSION;
+ req->hdr.operation = CTDB_REQ_FETCH_LOCK;
+ req->hdr.reqid = idr_get_new(ctdb->idr, state, 0xFFFF);
+ req->db_id = ctdb_db->db_id;
+ req->keylen = key.dsize;
+ memcpy(&req->key[0], key.dptr, key.dsize);
+
+ res = ctdb_client_queue_pkt(ctdb, &req->hdr);
+ if (res != 0) {
+ return NULL;
+ }
+
+ talloc_free(req);
+
+ return state;
+}
+
+
+/*
+ make a recv call to the local ctdb daemon - called from client context
+
+ This is called when the program wants to wait for a ctdb_fetch_lock to complete and get the
+ results. This call will block unless the call has already completed.
+*/
+struct ctdb_record_handle *ctdb_client_fetch_lock_recv(struct ctdb_call_state *state, TALLOC_CTX *mem_ctx, TDB_DATA key)
+{
+ struct ctdb_record_handle *rec;
+
+ while (state->state < CTDB_CALL_DONE) {
+ event_loop_once(state->ctdb_db->ctdb->ev);
+ }
+ if (state->state != CTDB_CALL_DONE) {
+ ctdb_set_error(state->node->ctdb, "%s", state->errmsg);
+ talloc_free(state);
+ return NULL;
+ }
+
+ rec = talloc(mem_ctx, struct ctdb_record_handle);
+ CTDB_NO_MEMORY_NULL(state->ctdb_db->ctdb, rec);
+
+ rec->ctdb_db = state->ctdb_db;
+ rec->key = key;
+ rec->key.dptr = talloc_memdup(rec, key.dptr, key.dsize);
+ rec->data = talloc(rec, TDB_DATA);
+ rec->data->dsize = state->call.reply_data.dsize;
+ rec->data->dptr = talloc_memdup(rec, state->call.reply_data.dptr, rec->data->dsize);
+
+ return rec;
+}
+
+struct ctdb_record_handle *ctdb_client_fetch_lock(struct ctdb_db_context *ctdb_db,
+ TALLOC_CTX *mem_ctx,
+ TDB_DATA key, TDB_DATA *data)
+{
+ struct ctdb_call_state *state;
+ struct ctdb_record_handle *rec;
+
+ state = ctdb_client_fetch_lock_send(ctdb_db, mem_ctx, key);
+ rec = ctdb_client_fetch_lock_recv(state, mem_ctx, key);
+
+ return rec;
+}
}
+static struct ctdb_call_state *ctdb_fetch_lock_send(struct ctdb_db_context *ctdb_db,
+ TALLOC_CTX *mem_ctx,
+ TDB_DATA key, TDB_DATA *data)
+{
+ struct ctdb_call *call;
+ struct ctdb_record_handle *rec;
+ struct ctdb_call_state *state;
+
+ rec = talloc(mem_ctx, struct ctdb_record_handle);
+ CTDB_NO_MEMORY_NULL(ctdb_db->ctdb, rec);
+
+
+ call = talloc(rec, struct ctdb_call);
+ ZERO_STRUCT(*call);
+ call->call_id = CTDB_FETCH_FUNC;
+ call->key = key;
+ call->flags = CTDB_IMMEDIATE_MIGRATION;
+
+
+ rec->ctdb_db = ctdb_db;
+ rec->key = key;
+ rec->key.dptr = talloc_memdup(rec, key.dptr, key.dsize);
+ rec->data = data;
+
+ state = ctdb_call_send(ctdb_db, call);
+ state->fetch_private = rec;
+
+ return state;
+}
+
+static void daemon_fetch_lock_complete(struct ctdb_call_state *state)
+{
+ struct ctdb_reply_fetch_lock *r;
+ struct ctdb_client *client = talloc_get_type(state->async.private, struct ctdb_client);
+ int length, res;
+
+ length = offsetof(struct ctdb_reply_fetch_lock, data) + state->call.reply_data.dsize;
+ r = ctdbd_allocate_pkt(client->ctdb, length);
+ if (r == NULL) {
+ printf("Failed to allocate reply_call in ctdb daemon\n");
+ return;
+ }
+ ZERO_STRUCT(*r);
+ r->hdr.length = length;
+ r->hdr.ctdb_magic = CTDB_MAGIC;
+ r->hdr.ctdb_version = CTDB_VERSION;
+ r->hdr.operation = CTDB_REPLY_FETCH_LOCK;
+ r->hdr.reqid = state->c->hdr.reqid;
+ r->state = state->state;
+ r->datalen = state->call.reply_data.dsize;
+ memcpy(&r->data[0], state->call.reply_data.dptr, r->datalen);
+
+ res = ctdb_queue_send(client->queue, (uint8_t *)&r->hdr, r->hdr.length);
+ if (res != 0) {
+ printf("Failed to queue packet from daemon to client\n");
+ }
+ talloc_free(r);
+}
+
+/*
+ called when the daemon gets a fetch lock request from a client
+ */
+static void daemon_request_fetch_lock(struct ctdb_client *client,
+ struct ctdb_req_fetch_lock *f)
+{
+ struct ctdb_call_state *state;
+ TDB_DATA key, *data;
+ struct ctdb_db_context *ctdb_db;
+
+ ctdb_db = find_ctdb_db(client->ctdb, f->db_id);
+
+ key.dsize = f->keylen;
+ key.dptr = &f->key[0];
+
+ data = talloc(client, TDB_DATA);
+ data->dptr = NULL;
+ data->dsize = 0;
+
+ state = ctdb_fetch_lock_send(ctdb_db, client, key, data);
+ talloc_steal(state, data);
+
+ state->async.fn = daemon_fetch_lock_complete;
+ state->async.private = client;
+}
+
/*
called when the daemon gets a connect wait request from a client
*/
int res;
uint32_t length;
- for (ctdb_db=client->ctdb->db_list; ctdb_db; ctdb_db=ctdb_db->next) {
- if (ctdb_db->db_id == c->db_id) {
- break;
- }
- }
+ ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
if (!ctdb_db) {
printf("Unknown database in request. db_id==0x%08x",c->db_id);
return;
case CTDB_REQ_CONNECT_WAIT:
daemon_request_connect_wait(client, (struct ctdb_req_connect_wait *)hdr);
break;
+ case CTDB_REQ_FETCH_LOCK:
+ daemon_request_fetch_lock(client, (struct ctdb_req_fetch_lock *)hdr);
+ break;
}
done:
if (ml->srvid == srvid) break;
}
if (ml == NULL) {
- printf("no msg handler for srvid=%u\n", srvid);
+ printf("daemon vnn:%d no msg handler for srvid=%u\n", ctdb_get_vnn(ctdb), srvid);
/* no registered message handler */
return -1;
}
change the data in a record held with a ctdb_record_handle
if the new data is zero length, this implies a delete of the record
*/
-int ctdb_record_store(struct ctdb_record_handle *rec, TDB_DATA data);
+int ctdb_store_unlock(struct ctdb_record_handle *rec, TDB_DATA data);
int ctdb_register_message_handler(struct ctdb_context *ctdb,
TALLOC_CTX *mem_ctx,
ctdb_message_fn_t handler,
void *private_data);
+struct ctdb_db_context *find_ctdb_db(struct ctdb_context *ctdb, uint32_t id);
+
#endif
int redirect_count;
struct ctdb_ltdb_header header;
void *fetch_private;
+ struct {
+ void (*fn)(struct ctdb_call_state *);
+ void *private;
+ } async;
};
CTDB_REQ_DMASTER = 3,
CTDB_REPLY_DMASTER = 4,
CTDB_REPLY_ERROR = 5,
- CTDB_REQ_REGISTER = 6,
- CTDB_REQ_MESSAGE = 7,
- CTDB_REQ_CONNECT_WAIT = 8,
- CTDB_REPLY_CONNECT_WAIT = 9
+ CTDB_REQ_MESSAGE = 6,
+
+ /* only used on the domain socket */
+ CTDB_REQ_REGISTER = 1000,
+ CTDB_REQ_CONNECT_WAIT = 1001,
+ CTDB_REPLY_CONNECT_WAIT = 1002,
+ CTDB_REQ_FETCH_LOCK = 1003,
+ CTDB_REPLY_FETCH_LOCK = 1004
};
#define CTDB_MAGIC 0x43544442 /* CTDB */
uint32_t num_connected;
};
+struct ctdb_req_fetch_lock {
+ struct ctdb_req_header hdr;
+ uint32_t db_id;
+ uint32_t keylen;
+ uint8_t key[1]; /* key[] */
+};
+
+struct ctdb_reply_fetch_lock {
+ struct ctdb_req_header hdr;
+ uint32_t state;
+ uint32_t datalen;
+ uint8_t data[1]; /* data[] */
+};
+
/* internal prototypes */
void ctdb_set_error(struct ctdb_context *ctdb, const char *fmt, ...) PRINTF_ATTRIBUTE(2,3);
void ctdb_fatal(struct ctdb_context *ctdb, const char *msg);
*/
void ctdb_daemon_connect_wait(struct ctdb_context *ctdb);
+
+/*
+ do a fetch lock from a client to the local daemon
+*/
+struct ctdb_record_handle *ctdb_client_fetch_lock(struct ctdb_db_context *ctdb_db,
+ TALLOC_CTX *mem_ctx,
+ TDB_DATA key, TDB_DATA *data);
+
#endif
msg_count, ctdb_get_vnn(ctdb));
data.dsize = strlen((const char *)data.dptr)+1;
- ret = ctdb_record_store(rec, data);
+ ret = ctdb_store_unlock(rec, data);
if (ret != 0) {
printf("Failed to store record\n");
}
srvid = num_clients-1;
}
- /* wait until all nodes are connected (should not be needed
- outside of test code) */
ctdb_set_message_handler(ctdb, srvid, message_handler, NULL);
+ /* wait until all nodes are connected (should not be needed
+ outside of test code) */
ctdb_connect_wait(ctdb);
sleep(3);
echo "Trying 2 nodes"
bin/ctdb_fetch --nlist tests/nodes.txt --listen 127.0.0.2:9001 $* &
-bin/ctdb_fetch --nlist tests/nodes.txt --listen 127.0.0.1:9001 $*
+gdb --args bin/ctdb_fetch --nlist tests/nodes.txt --listen 127.0.0.1:9001 $*
killall -q ctdb_fetch