*/
int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
{
- struct ctdb_fetch_handle *rec;
-
while (state->state < CTDB_CALL_DONE) {
event_loop_once(state->node->ctdb->ev);
}
return -1;
}
- rec = state->fetch_private;
-
- /* ugly hack to manage forced migration */
- if (rec != NULL) {
- rec->header = state->header;
- rec->data->dptr = talloc_steal(rec, state->call.reply_data.dptr);
- rec->data->dsize = state->call.reply_data.dsize;
- talloc_set_name_const(rec->data->dptr, __location__);
- talloc_free(state);
- return 0;
- }
-
if (state->call.reply_data.dsize) {
call->reply_data.dptr = talloc_memdup(state->node->ctdb,
state->call.reply_data.dptr,
ctdb->num_connected = r->num_connected;
}
-enum fetch_lock_state { CTDB_FETCH_LOCK_WAIT, CTDB_FETCH_LOCK_DONE, CTDB_FETCH_LOCK_ERROR };
-
-/*
- state of a in-progress ctdb call
-*/
-struct ctdb_fetch_lock_state {
- enum fetch_lock_state state;
- struct ctdb_db_context *ctdb_db;
- struct ctdb_reply_fetch_lock *r;
- struct ctdb_req_fetch_lock *req;
- struct ctdb_ltdb_header header;
-};
-
-
-
-/*
- called in the client when we receive a CTDB_REPLY_FETCH_LOCK from the daemon
-
- This packet comes in response to a CTDB_REQ_FETCH_LOCK request packet. It
- contains any reply data from the call
-*/
-void ctdb_reply_fetch_lock(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
-{
- struct ctdb_reply_fetch_lock *r = (struct ctdb_reply_fetch_lock *)hdr;
- struct ctdb_fetch_lock_state *state;
-
- state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_fetch_lock_state);
- if (state == NULL) {
- DEBUG(0, ("reqid %d not found at %s\n", hdr->reqid,
- __location__));
- return;
- }
-
- state->r = talloc_steal(state, r);
-
- state->state = CTDB_FETCH_LOCK_DONE;
-}
-
/*
state of a in-progress ctdb call in client
*/
ctdb_reply_connect_wait(ctdb, hdr);
break;
- case CTDB_REPLY_FETCH_LOCK:
- ctdb_reply_fetch_lock(ctdb, hdr);
- break;
-
default:
DEBUG(0,("bogus operation code:%d\n",hdr->operation));
}
ctdb_daemon_connect_wait(ctdb);
}
-static int ctdb_fetch_lock_destructor(struct ctdb_fetch_lock_state *state)
-{
- idr_remove(state->ctdb_db->ctdb->idr, state->req->hdr.reqid);
- return 0;
-}
-
/*
- send a fetch lock request from the client to the daemon. This just asks for
- a record migration to the current node
+ cancel a ctdb_fetch_lock operation, releasing the lock
*/
-static struct ctdb_fetch_lock_state *ctdb_client_fetch_lock_send(struct ctdb_db_context *ctdb_db,
- TALLOC_CTX *mem_ctx,
- TDB_DATA key)
-{
- struct ctdb_fetch_lock_state *state;
- struct ctdb_context *ctdb = ctdb_db->ctdb;
- struct ctdb_req_fetch_lock *req;
- int len, res;
-
- /* if the domain socket is not yet open, open it */
- if (ctdb->daemon.sd==-1) {
- ux_socket_connect(ctdb);
- }
-
- state = talloc_zero(ctdb_db, struct ctdb_fetch_lock_state);
- if (state == NULL) {
- DEBUG(0, (__location__ " failed to allocate state\n"));
- return NULL;
- }
- state->state = CTDB_FETCH_LOCK_WAIT;
- state->ctdb_db = ctdb_db;
- len = offsetof(struct ctdb_req_fetch_lock, key) + key.dsize;
- state->req = req = ctdbd_allocate_pkt(state, len);
- if (req == NULL) {
- DEBUG(0, (__location__ " failed to allocate packet\n"));
- return NULL;
- }
- ZERO_STRUCT(*req);
- talloc_set_name_const(req, "ctdbd req_fetch_lock packet");
-
- req->hdr.length = len;
- req->hdr.ctdb_magic = CTDB_MAGIC;
- req->hdr.ctdb_version = CTDB_VERSION;
- req->hdr.operation = CTDB_REQ_FETCH_LOCK;
- req->hdr.reqid = idr_get_new(ctdb->idr, state, 0xFFFF);
- req->db_id = ctdb_db->db_id;
- req->keylen = key.dsize;
- memcpy(&req->key[0], key.dptr, key.dsize);
-
- talloc_set_destructor(state, ctdb_fetch_lock_destructor);
-
- res = ctdb_client_queue_pkt(ctdb, &req->hdr);
- if (res != 0) {
- return NULL;
- }
-
- return state;
-}
-
-
-/*
- make a recv call to the local ctdb daemon - called from client context
-
- This is called when the program wants to wait for a ctdb_fetch_lock to complete and get the
- results. This call will block unless the call has already completed.
-*/
-int ctdb_client_fetch_lock_recv(struct ctdb_fetch_lock_state *state)
+static int fetch_lock_destructor(struct ctdb_record_handle *h)
{
- while (state->state < CTDB_FETCH_LOCK_DONE) {
- event_loop_once(state->ctdb_db->ctdb->ev);
- }
- if (state->state != CTDB_FETCH_LOCK_DONE) {
- talloc_free(state);
- return -1;
- }
- talloc_free(state);
+ ctdb_ltdb_unlock(h->ctdb_db, h->key);
return 0;
}
/*
- cancel a ctdb_fetch_lock operation, releasing the lock
+ force the migration of a record to this node
*/
-static int fetch_lock_destructor(struct ctdb_record_handle *h)
+static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
{
- ctdb_ltdb_unlock(h->ctdb_db, h->key);
- return 0;
+ struct ctdb_call call;
+ ZERO_STRUCT(call);
+ call.call_id = CTDB_NULL_FUNC;
+ call.key = key;
+ call.flags = CTDB_IMMEDIATE_MIGRATION;
+ return ctdb_call(ctdb_db, &call);
}
/*
{
int ret;
struct ctdb_record_handle *h;
- struct ctdb_fetch_lock_state *state;
/*
procedure is as follows:
DEBUG(4,("ctdb_fetch_lock: done local fetch\n"));
if (h->header.dmaster != ctdb_db->ctdb->vnn) {
- /* we're not the dmaster - ask the ctdb daemon to make us dmaster */
- state = ctdb_client_fetch_lock_send(ctdb_db, mem_ctx, key);
ctdb_ltdb_unlock(ctdb_db, key);
- DEBUG(4,("ctdb_fetch_lock: done fetch_lock_send\n"));
- ret = ctdb_client_fetch_lock_recv(state);
+ ret = ctdb_client_force_migration(ctdb_db, key);
if (ret != 0) {
- DEBUG(4,("ctdb_fetch_lock: fetch_lock_recv failed\n"));
+ DEBUG(4,("ctdb_fetch_lock: force_migration failed\n"));
talloc_free(h);
return NULL;
}
}
-/*
- send a fetch lock request (actually a ctdb_call()) to a remote node
- */
-static struct ctdb_call_state *ctdb_daemon_fetch_lock_send(struct ctdb_db_context *ctdb_db,
- TALLOC_CTX *mem_ctx,
- TDB_DATA key, struct ctdb_ltdb_header *header,
- TDB_DATA *data)
-{
- struct ctdb_call *call;
- struct ctdb_fetch_handle *rec;
- struct ctdb_call_state *state;
-
- rec = talloc(mem_ctx, struct ctdb_fetch_handle);
- CTDB_NO_MEMORY_NULL(ctdb_db->ctdb, rec);
-
-
- call = talloc(rec, struct ctdb_call);
- ZERO_STRUCT(*call);
- call->call_id = CTDB_FETCH_FUNC;
- call->key = key;
- call->flags = CTDB_IMMEDIATE_MIGRATION;
-
- rec->ctdb_db = ctdb_db;
- rec->key = key;
- rec->key.dptr = talloc_memdup(rec, key.dptr, key.dsize);
- rec->data = data;
-
- state = ctdb_daemon_call_send_remote(ctdb_db, call, header);
- state->fetch_private = rec;
- talloc_steal(state, rec);
- talloc_steal(mem_ctx, state);
-
- return state;
-}
-
/*
called when the daemon gets a shutdown request from a client
*/
}
-struct client_fetch_lock_data {
- struct ctdb_client *client;
- struct ctdb_req_fetch_lock *f;
-};
-
-/*
- send a fetch lock error reply to the client
- */
-static void daemon_fetch_lock_reply(struct ctdb_client *client,
- struct ctdb_req_fetch_lock *f,
- int state)
-{
- struct ctdb_reply_fetch_lock r;
-
- ZERO_STRUCT(r);
- r.hdr.length = sizeof(r);
- r.hdr.ctdb_magic = CTDB_MAGIC;
- r.hdr.ctdb_version = CTDB_VERSION;
- r.hdr.operation = CTDB_REPLY_FETCH_LOCK;
- r.hdr.reqid = f->hdr.reqid;
- r.state = state;
-
- ctdb_queue_send(client->queue, (uint8_t *)&r.hdr, r.hdr.length);
-}
-
-/*
- called when a remote fetch lock finishes
- */
-static void daemon_fetch_lock_complete(struct ctdb_call_state *state)
-{
- struct client_fetch_lock_data *data = talloc_get_type(state->async.private_data,
- struct client_fetch_lock_data);
- struct ctdb_client *client = talloc_get_type(data->client, struct ctdb_client);
-
- daemon_fetch_lock_reply(client, data->f, 0);
- talloc_free(state);
-}
-
-
-
-/*
- called when the daemon gets a fetch lock request from a client
- */
-static void daemon_request_fetch_lock(struct ctdb_client *client,
- struct ctdb_req_fetch_lock *f)
-{
- struct ctdb_call_state *state;
- TDB_DATA key, *data;
- struct ctdb_db_context *ctdb_db;
- struct client_fetch_lock_data *fl_data;
- struct ctdb_ltdb_header header;
- int ret;
-
- ctdb_db = find_ctdb_db(client->ctdb, f->db_id);
- if (ctdb_db == NULL) {
- daemon_fetch_lock_reply(client, f, -1);
- return;
- }
-
- key.dsize = f->keylen;
- key.dptr = &f->key[0];
-
- /* XXX - needs non-blocking lock here */
-
- ret = ctdb_ltdb_fetch(ctdb_db, key, &header, ctdb_db, NULL);
- if (ret != 0) {
- daemon_fetch_lock_reply(client, f, -1);
- return;
- }
-
- if (header.dmaster == ctdb_db->ctdb->vnn) {
- /* we already are the dmaster */
- daemon_fetch_lock_reply(client, f, 0);
- return;
- }
-
- data = talloc(client, TDB_DATA);
- data->dptr = NULL;
- data->dsize = 0;
-
- state = ctdb_daemon_fetch_lock_send(ctdb_db, client, key, &header, data);
- talloc_steal(state, data);
-
- fl_data = talloc(state, struct client_fetch_lock_data);
- fl_data->client = client;
- fl_data->f = talloc_steal(fl_data, f);
-
- state->async.fn = daemon_fetch_lock_complete;
- state->async.private_data = fl_data;
-}
/*
called when the daemon gets a connect wait request from a client
daemon_request_connect_wait(client, (struct ctdb_req_connect_wait *)hdr);
break;
- case CTDB_REQ_FETCH_LOCK:
- daemon_request_fetch_lock(client, (struct ctdb_req_fetch_lock *)hdr);
- break;
-
case CTDB_REQ_SHUTDOWN:
daemon_request_shutdown(client, (struct ctdb_req_shutdown *)hdr);
break;
/*
this is the dummy null procedure that all databases support
*/
-static int ctdb_fetch_func(struct ctdb_call_info *call)
+static int ctdb_null_func(struct ctdb_call_info *call)
{
- call->reply_data = &call->record_data;
return 0;
}
/*
- all databases support the "fetch" function. we need this in order to do forced migration of records
+ all databases support the "null" function. we need this in
+ order to do forced migration of records
*/
- ret = ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
+ ret = ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
if (ret != 0) {
talloc_free(ctdb_db);
return NULL;
#define CTDB_DS_ALIGNMENT 8
+#define CTDB_NULL_FUNC 0xf0000001
-#define CTDB_FETCH_FUNC 0xf0000001
/*
an installed ctdb remote call
*/
struct ctdb_call call;
int redirect_count;
struct ctdb_ltdb_header header;
- void *fetch_private;
struct {
void (*fn)(struct ctdb_call_state *);
void *private_data;
CTDB_REQ_REGISTER = 1000,
CTDB_REQ_CONNECT_WAIT = 1001,
CTDB_REPLY_CONNECT_WAIT = 1002,
- CTDB_REQ_FETCH_LOCK = 1003,
- CTDB_REPLY_FETCH_LOCK = 1004,
- CTDB_REQ_SHUTDOWN = 1005
+ CTDB_REQ_SHUTDOWN = 1003
};
#define CTDB_MAGIC 0x43544442 /* CTDB */