/*
local version of ctdb_call
*/
-static int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
- struct ctdb_ltdb_header *header, TDB_DATA *data,
- uint32_t caller)
+int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
+ struct ctdb_ltdb_header *header, TDB_DATA *data,
+ uint32_t caller)
{
struct ctdb_call_info *c;
struct ctdb_registered_call *fn;
struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
struct ctdb_call_state *state;
- state = idr_find(ctdb->idr, hdr->reqid);
+ state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_call_state);
if (state == NULL) {
DEBUG(0, ("reqid %d not found\n", hdr->reqid));
return;
}
- if (!talloc_get_type(state, struct ctdb_call_state)) {
- DEBUG(0,("ctdb idr type error at %s\n", __location__));
- return;
- }
-
state->call.reply_data.dptr = c->data;
state->call.reply_data.dsize = c->datalen;
state->call.status = c->status;
struct ctdb_db_context *ctdb_db;
TDB_DATA data;
- state = idr_find(ctdb->idr, hdr->reqid);
+ state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_call_state);
if (state == NULL) {
return;
}
- if (!talloc_get_type(state, struct ctdb_call_state)) {
- DEBUG(0,("ctdb idr type error at %s\n", __location__));
- return;
- }
-
ctdb_db = state->ctdb_db;
data.dptr = c->data;
struct ctdb_reply_error *c = (struct ctdb_reply_error *)hdr;
struct ctdb_call_state *state;
- state = idr_find(ctdb->idr, hdr->reqid);
+ state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_call_state);
if (state == NULL) return;
- if (!talloc_get_type(state, struct ctdb_call_state)) {
- DEBUG(0,("ctdb idr type error at %s\n", __location__));
- return;
- }
-
talloc_steal(state, c);
state->state = CTDB_CALL_ERROR;
struct ctdb_reply_redirect *c = (struct ctdb_reply_redirect *)hdr;
struct ctdb_call_state *state;
- state = idr_find(ctdb->idr, hdr->reqid);
+ state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_call_state);
if (state == NULL) return;
- if (!talloc_get_type(state, struct ctdb_call_state)) {
- DEBUG(0,("ctdb idr type error at %s\n", __location__));
- return;
- }
-
talloc_steal(state, c);
/* don't allow for too many redirects */
struct ctdb_reply_fetch_lock *r = (struct ctdb_reply_fetch_lock *)hdr;
struct ctdb_fetch_lock_state *state;
- state = idr_find(ctdb->idr, hdr->reqid);
+ state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_fetch_lock_state);
if (state == NULL) {
DEBUG(0, ("reqid %d not found at %s\n", hdr->reqid,
__location__));
return;
}
- if (!talloc_get_type(state, struct ctdb_fetch_lock_state)) {
- DEBUG(0, ("ctdb idr type error at %s, it's a %s\n",
- __location__, talloc_get_name(state)));
+ state->r = talloc_steal(state, r);
+
+ state->state = CTDB_FETCH_LOCK_DONE;
+}
+
+/*
+ state of a in-progress ctdb call in client
+*/
+struct ctdb_client_call_state {
+ enum call_state state;
+ uint32_t reqid;
+ struct ctdb_db_context *ctdb_db;
+ struct ctdb_call call;
+};
+
+/*
+ called when a CTDB_REPLY_CALL packet comes in in the client
+
+ This packet comes in response to a CTDB_REQ_CALL request packet. It
+ contains any reply data from the call
+*/
+static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
+{
+ struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
+ struct ctdb_client_call_state *state;
+
+ state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_client_call_state);
+ if (state == NULL) {
+ DEBUG(0, ("reqid %d not found\n", hdr->reqid));
return;
}
- state->r = talloc_steal(state, r);
+ state->call.reply_data.dptr = c->data;
+ state->call.reply_data.dsize = c->datalen;
+ state->call.status = c->status;
- state->state = CTDB_FETCH_LOCK_DONE;
+ talloc_steal(state, c);
+
+ state->state = CTDB_CALL_DONE;
}
/*
switch (hdr->operation) {
case CTDB_REPLY_CALL:
- ctdb_reply_call(ctdb, hdr);
+ ctdb_client_reply_call(ctdb, hdr);
break;
case CTDB_REQ_MESSAGE:
struct ctdb_ltdb_header header;
};
+
/*
make a recv call to the local ctdb daemon - called from client context
This is called when the program wants to wait for a ctdb_call to complete and get the
results. This call will block unless the call has already completed.
*/
-int ctdb_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
+int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
{
- struct ctdb_record_handle *rec;
-
while (state->state < CTDB_CALL_DONE) {
- event_loop_once(state->node->ctdb->ev);
+ event_loop_once(state->ctdb_db->ctdb->ev);
}
if (state->state != CTDB_CALL_DONE) {
- ctdb_set_error(state->node->ctdb, "%s", state->errmsg);
+ DEBUG(0,(__location__ " ctdb_call_recv failed\n"));
talloc_free(state);
return -1;
}
- rec = state->fetch_private;
-
- /* ugly hack to manage forced migration */
- if (rec != NULL) {
- rec->data->dptr = talloc_steal(rec, state->call.reply_data.dptr);
- rec->data->dsize = state->call.reply_data.dsize;
- talloc_free(state);
- return 0;
- }
-
if (state->call.reply_data.dsize) {
- call->reply_data.dptr = talloc_memdup(state->node->ctdb,
+ call->reply_data.dptr = talloc_memdup(state->ctdb_db,
state->call.reply_data.dptr,
state->call.reply_data.dsize);
call->reply_data.dsize = state->call.reply_data.dsize;
/*
destroy a ctdb_call in client
*/
-static int ctdb_client_call_destructor(struct ctdb_call_state *state)
+static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)
{
- idr_remove(state->node->ctdb->idr, state->c->hdr.reqid);
+ idr_remove(state->ctdb_db->ctdb->idr, state->reqid);
return 0;
}
+/*
+ construct an event driven local ctdb_call
+
+ this is used so that locally processed ctdb_call requests are processed
+ in an event driven manner
+*/
+static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db,
+ struct ctdb_call *call,
+ struct ctdb_ltdb_header *header,
+ TDB_DATA *data)
+{
+ struct ctdb_client_call_state *state;
+ struct ctdb_context *ctdb = ctdb_db->ctdb;
+ int ret;
+
+ state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
+ CTDB_NO_MEMORY_NULL(ctdb, state);
+
+ talloc_steal(state, data->dptr);
+
+ state->state = CTDB_CALL_DONE;
+ state->call = *call;
+ state->ctdb_db = ctdb_db;
+
+ ret = ctdb_call_local(ctdb_db, &state->call, header, data, ctdb->vnn);
+ talloc_steal(state, state->call.reply_data.dptr);
+ return state;
+}
/*
make a ctdb call to the local daemon - async send. Called from client context.
This constructs a ctdb_call request and queues it for processing.
This call never blocks.
*/
-struct ctdb_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db,
+struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db,
struct ctdb_call *call)
{
- struct ctdb_call_state *state;
+ struct ctdb_client_call_state *state;
struct ctdb_context *ctdb = ctdb_db->ctdb;
struct ctdb_ltdb_header header;
TDB_DATA data;
int ret;
size_t len;
+ struct ctdb_req_call *c;
/* if the domain socket is not yet open, open it */
if (ctdb->daemon.sd==-1) {
ux_socket_connect(ctdb);
}
+ ret = ctdb_ltdb_lock(ctdb_db, call->key);
+ if (ret != 0) {
+ DEBUG(0,(__location__ " Failed to get chainlock\n"));
+ return NULL;
+ }
+
ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
if (ret != 0) {
+ ctdb_ltdb_unlock(ctdb_db, call->key);
+ DEBUG(0,(__location__ " Failed to fetch record\n"));
return NULL;
}
-#if 0
if (header.dmaster == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) {
- state = ctdb_call_local_send(ctdb_db, call, &header, &data);
+ state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
talloc_free(data.dptr);
+ ctdb_ltdb_unlock(ctdb_db, call->key);
return state;
}
-#endif
- state = talloc_zero(ctdb_db, struct ctdb_call_state);
+ ctdb_ltdb_unlock(ctdb_db, call->key);
+ talloc_free(data.dptr);
+
+ state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
if (state == NULL) {
DEBUG(0, (__location__ " failed to allocate state\n"));
return NULL;
}
- talloc_steal(state, data.dptr);
-
len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
- state->c = ctdbd_allocate_pkt(state, len);
- if (state->c == NULL) {
+ c = ctdbd_allocate_pkt(state, len);
+ if (c == NULL) {
DEBUG(0, (__location__ " failed to allocate packet\n"));
return NULL;
}
- talloc_set_name_const(state->c, "ctdbd req_call packet");
+ talloc_set_name_const(c, "ctdb client req_call packet");
+ memset(c, 0, offsetof(struct ctdb_req_call, data));
- state->c->hdr.length = len;
- state->c->hdr.ctdb_magic = CTDB_MAGIC;
- state->c->hdr.ctdb_version = CTDB_VERSION;
- state->c->hdr.operation = CTDB_REQ_CALL;
- state->c->hdr.destnode = header.dmaster;
- state->c->hdr.srcnode = ctdb->vnn;
+ c->hdr.length = len;
+ c->hdr.ctdb_magic = CTDB_MAGIC;
+ c->hdr.ctdb_version = CTDB_VERSION;
+ c->hdr.operation = CTDB_REQ_CALL;
/* this limits us to 16k outstanding messages - not unreasonable */
- state->c->hdr.reqid = idr_get_new(ctdb->idr, state, 0xFFFF);
- state->c->flags = call->flags;
- state->c->db_id = ctdb_db->db_id;
- state->c->callid = call->call_id;
- state->c->keylen = call->key.dsize;
- state->c->calldatalen = call->call_data.dsize;
- memcpy(&state->c->data[0], call->key.dptr, call->key.dsize);
- memcpy(&state->c->data[call->key.dsize],
+ c->hdr.reqid = idr_get_new(ctdb->idr, state, 0xFFFF);
+ c->flags = call->flags;
+ c->db_id = ctdb_db->db_id;
+ c->callid = call->call_id;
+ c->keylen = call->key.dsize;
+ c->calldatalen = call->call_data.dsize;
+ memcpy(&c->data[0], call->key.dptr, call->key.dsize);
+ memcpy(&c->data[call->key.dsize],
call->call_data.dptr, call->call_data.dsize);
state->call = *call;
- state->call.call_data.dptr = &state->c->data[call->key.dsize];
- state->call.key.dptr = &state->c->data[0];
+ state->call.call_data.dptr = &c->data[call->key.dsize];
+ state->call.key.dptr = &c->data[0];
- state->node = ctdb->nodes[header.dmaster];
state->state = CTDB_CALL_WAIT;
- state->header = header;
state->ctdb_db = ctdb_db;
+ state->reqid = c->hdr.reqid;
talloc_set_destructor(state, ctdb_client_call_destructor);
- ctdb_client_queue_pkt(ctdb, &state->c->hdr);
-
-/*XXX set up timeout to cleanup if server doesnt respond
- event_add_timed(ctdb->ev, state, timeval_current_ofs(CTDB_REQ_TIMEOUT, 0),
- ctdb_call_timeout, state);
-*/
+ ctdb_client_queue_pkt(ctdb, &c->hdr);
return state;
}
*/
int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
{
- struct ctdb_call_state *state;
+ struct ctdb_client_call_state *state;
state = ctdb_call_send(ctdb_db, call);
return ctdb_call_recv(state, call);