From: Ronnie sahlberg Date: Thu, 12 Apr 2007 05:46:50 +0000 (+1000) Subject: initial support for two new pdus for the domain socket to do fetch_lock X-Git-Tag: tevent-0.9.20~348^2~2931^2~1 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=35ffefb01f19448545c7273536006270a44919a2;p=thirdparty%2Fsamba.git initial support for two new pdus for the domain socket to do fetch_lock no locking is yet done and the store_unlock call is still missing the ./tests/fetch.sh --daemon test fails with parent process dying which needs to be investigated. (This used to be ctdb commit 7d7141c968950a8856f1be79871932b688bfb07f) --- diff --git a/ctdb/common/ctdb_call.c b/ctdb/common/ctdb_call.c index 35ccc4bf0ec..5938cacab2a 100644 --- a/ctdb/common/ctdb_call.c +++ b/ctdb/common/ctdb_call.c @@ -28,6 +28,22 @@ #include "system/filesys.h" #include "../include/ctdb_private.h" +/* + find the ctdb_db from a db index + */ + struct ctdb_db_context *find_ctdb_db(struct ctdb_context *ctdb, uint32_t id) +{ + struct ctdb_db_context *ctdb_db; + + for (ctdb_db=ctdb->db_list; ctdb_db; ctdb_db=ctdb_db->next) { + if (ctdb_db->db_id == id) { + break; + } + } + return ctdb_db; +} + + /* local version of ctdb_call */ @@ -38,7 +54,7 @@ static int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *ca struct ctdb_call_info *c; struct ctdb_registered_call *fn; struct ctdb_context *ctdb = ctdb_db->ctdb; - + c = talloc(ctdb, struct ctdb_call_info); CTDB_NO_MEMORY(ctdb, c); @@ -242,11 +258,7 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr data.dptr = c->data + c->keylen; data.dsize = c->datalen; - for (ctdb_db=ctdb->db_list; ctdb_db; ctdb_db=ctdb_db->next) { - if (ctdb_db->db_id == c->db_id) { - break; - } - } + ctdb_db = find_ctdb_db(ctdb, c->db_id); if (!ctdb_db) { ctdb_send_error(ctdb, hdr, -1, "Unknown database in request. db_id==0x%08x", @@ -311,11 +323,7 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) struct ctdb_call call; struct ctdb_db_context *ctdb_db; - for (ctdb_db=ctdb->db_list; ctdb_db; ctdb_db=ctdb_db->next) { - if (ctdb_db->db_id == c->db_id) { - break; - } - } + ctdb_db = find_ctdb_db(ctdb, c->db_id); if (!ctdb_db) { ctdb_send_error(ctdb, hdr, -1, "Unknown database in request. db_id==0x%08x", @@ -409,6 +417,9 @@ void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) (void)talloc_reference(state, c); state->state = CTDB_CALL_DONE; + if (state->async.fn) { + state->async.fn(state); + } } /* @@ -448,6 +459,9 @@ void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) ctdb_call_local(ctdb_db, &state->call, &state->header, &data, ctdb->vnn); state->state = CTDB_CALL_DONE; + if (state->async.fn) { + state->async.fn(state); + } } @@ -466,6 +480,9 @@ void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) state->state = CTDB_CALL_ERROR; state->errmsg = (char *)c->msg; + if (state->async.fn) { + state->async.fn(state); + } } @@ -517,8 +534,24 @@ void ctdb_call_timeout(struct event_context *ev, struct timed_event *te, state->state = CTDB_CALL_ERROR; ctdb_set_error(state->node->ctdb, "ctdb_call %u timed out", state->c->hdr.reqid); + if (state->async.fn) { + state->async.fn(state); + } } +/* + this allows the caller to setup a async.fn +*/ +static void call_local_trigger(struct event_context *ev, struct timed_event *te, + struct timeval t, void *private) +{ + struct ctdb_call_state *state = talloc_get_type(private, struct ctdb_call_state); + if (state->async.fn) { + state->async.fn(state); + } +} + + /* construct an event driven local ctdb_call @@ -546,6 +579,8 @@ struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db, ret = ctdb_call_local(ctdb_db, &state->call, header, data, ctdb->vnn); + event_add_timed(ctdb->ev, state, timeval_zero(), call_local_trigger, state); + return state; } @@ -717,6 +752,10 @@ struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALL struct ctdb_call_state *state; int ret; + if (ctdb_db->ctdb->flags & CTDB_FLAG_DAEMON_MODE) { + return ctdb_client_fetch_lock(ctdb_db, mem_ctx, key, data); + } + ZERO_STRUCT(call); call.call_id = CTDB_FETCH_FUNC; call.key = key; @@ -743,7 +782,7 @@ struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALL } -int ctdb_record_store(struct ctdb_record_handle *rec, TDB_DATA data) +int ctdb_store_unlock(struct ctdb_record_handle *rec, TDB_DATA data) { int ret; struct ctdb_ltdb_header header; diff --git a/ctdb/common/ctdb_client.c b/ctdb/common/ctdb_client.c index 30049b76d8d..74e41d0d5b3 100644 --- a/ctdb/common/ctdb_client.c +++ b/ctdb/common/ctdb_client.c @@ -48,6 +48,36 @@ static void ctdb_reply_connect_wait(struct ctdb_context *ctdb, ctdb->num_connected = r->num_connected; } +/* + called in the client when we receive a CTDB_REPLY_FETCH_LOCK from the daemon + + This packet comes in response to a CTDB_REQ_FETCH_LOCK request packet. It + contains any reply data from the call +*/ +void ctdb_reply_fetch_lock(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) +{ + struct ctdb_reply_fetch_lock *c = (struct ctdb_reply_fetch_lock *)hdr; + struct ctdb_call_state *state; + + state = idr_find(ctdb->idr, hdr->reqid); + if (state == NULL) return; + + state->call.reply_data.dptr = c->data; + state->call.reply_data.dsize = c->datalen; + state->call.status = c->state; + + talloc_steal(state, c); + + /* get an extra reference here - this prevents the free in ctdb_recv_pkt() + from freeing the data */ + (void)talloc_reference(state, c); + + state->state = CTDB_CALL_DONE; + if (state->async.fn) { + state->async.fn(state); + } +} + /* this is called in the client, when data comes in from the daemon */ @@ -89,6 +119,13 @@ static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args) case CTDB_REPLY_CONNECT_WAIT: ctdb_reply_connect_wait(ctdb, hdr); break; + + case CTDB_REPLY_FETCH_LOCK: + ctdb_reply_fetch_lock(ctdb, hdr); + break; + + default: + printf("bogus operation code:%d\n",hdr->operation); } } @@ -403,3 +440,101 @@ void ctdb_connect_wait(struct ctdb_context *ctdb) ctdb_client_connect_wait(ctdb); } + + +struct ctdb_call_state *ctdb_client_fetch_lock_send(struct ctdb_db_context *ctdb_db, + TALLOC_CTX *mem_ctx, + TDB_DATA key) +{ + struct ctdb_call_state *state; + struct ctdb_context *ctdb = ctdb_db->ctdb; + struct ctdb_req_fetch_lock *req; + int len, res; + + /* if the domain socket is not yet open, open it */ + if (ctdb->daemon.sd==-1) { + ux_socket_connect(ctdb); + } + + state = talloc_zero(ctdb_db, struct ctdb_call_state); + if (state == NULL) { + printf("failed to allocate state\n"); + return NULL; + } + state->state = CTDB_CALL_WAIT; + state->ctdb_db = ctdb_db; + len = offsetof(struct ctdb_req_fetch_lock, key) + key.dsize; + state->c = ctdbd_allocate_pkt(ctdb, len); + if (state->c == NULL) { + printf("failed to allocate packet\n"); + return NULL; + } + ZERO_STRUCT(*state->c); + talloc_set_name_const(state->c, "ctdbd req_fetch_lock packet"); + talloc_steal(state, state->c); + + req = (struct ctdb_req_fetch_lock *)state->c; + req->hdr.length = len; + req->hdr.ctdb_magic = CTDB_MAGIC; + req->hdr.ctdb_version = CTDB_VERSION; + req->hdr.operation = CTDB_REQ_FETCH_LOCK; + req->hdr.reqid = idr_get_new(ctdb->idr, state, 0xFFFF); + req->db_id = ctdb_db->db_id; + req->keylen = key.dsize; + memcpy(&req->key[0], key.dptr, key.dsize); + + res = ctdb_client_queue_pkt(ctdb, &req->hdr); + if (res != 0) { + return NULL; + } + + talloc_free(req); + + return state; +} + + +/* + make a recv call to the local ctdb daemon - called from client context + + This is called when the program wants to wait for a ctdb_fetch_lock to complete and get the + results. This call will block unless the call has already completed. +*/ +struct ctdb_record_handle *ctdb_client_fetch_lock_recv(struct ctdb_call_state *state, TALLOC_CTX *mem_ctx, TDB_DATA key) +{ + struct ctdb_record_handle *rec; + + while (state->state < CTDB_CALL_DONE) { + event_loop_once(state->ctdb_db->ctdb->ev); + } + if (state->state != CTDB_CALL_DONE) { + ctdb_set_error(state->node->ctdb, "%s", state->errmsg); + talloc_free(state); + return NULL; + } + + rec = talloc(mem_ctx, struct ctdb_record_handle); + CTDB_NO_MEMORY_NULL(state->ctdb_db->ctdb, rec); + + rec->ctdb_db = state->ctdb_db; + rec->key = key; + rec->key.dptr = talloc_memdup(rec, key.dptr, key.dsize); + rec->data = talloc(rec, TDB_DATA); + rec->data->dsize = state->call.reply_data.dsize; + rec->data->dptr = talloc_memdup(rec, state->call.reply_data.dptr, rec->data->dsize); + + return rec; +} + +struct ctdb_record_handle *ctdb_client_fetch_lock(struct ctdb_db_context *ctdb_db, + TALLOC_CTX *mem_ctx, + TDB_DATA key, TDB_DATA *data) +{ + struct ctdb_call_state *state; + struct ctdb_record_handle *rec; + + state = ctdb_client_fetch_lock_send(ctdb_db, mem_ctx, key); + rec = ctdb_client_fetch_lock_recv(state, mem_ctx, key); + + return rec; +} diff --git a/ctdb/common/ctdb_daemon.c b/ctdb/common/ctdb_daemon.c index 72ab7ed0b10..ad9bb325e10 100644 --- a/ctdb/common/ctdb_daemon.c +++ b/ctdb/common/ctdb_daemon.c @@ -110,6 +110,91 @@ static void daemon_request_register_message_handler(struct ctdb_client *client, } +static struct ctdb_call_state *ctdb_fetch_lock_send(struct ctdb_db_context *ctdb_db, + TALLOC_CTX *mem_ctx, + TDB_DATA key, TDB_DATA *data) +{ + struct ctdb_call *call; + struct ctdb_record_handle *rec; + struct ctdb_call_state *state; + + rec = talloc(mem_ctx, struct ctdb_record_handle); + CTDB_NO_MEMORY_NULL(ctdb_db->ctdb, rec); + + + call = talloc(rec, struct ctdb_call); + ZERO_STRUCT(*call); + call->call_id = CTDB_FETCH_FUNC; + call->key = key; + call->flags = CTDB_IMMEDIATE_MIGRATION; + + + rec->ctdb_db = ctdb_db; + rec->key = key; + rec->key.dptr = talloc_memdup(rec, key.dptr, key.dsize); + rec->data = data; + + state = ctdb_call_send(ctdb_db, call); + state->fetch_private = rec; + + return state; +} + +static void daemon_fetch_lock_complete(struct ctdb_call_state *state) +{ + struct ctdb_reply_fetch_lock *r; + struct ctdb_client *client = talloc_get_type(state->async.private, struct ctdb_client); + int length, res; + + length = offsetof(struct ctdb_reply_fetch_lock, data) + state->call.reply_data.dsize; + r = ctdbd_allocate_pkt(client->ctdb, length); + if (r == NULL) { + printf("Failed to allocate reply_call in ctdb daemon\n"); + return; + } + ZERO_STRUCT(*r); + r->hdr.length = length; + r->hdr.ctdb_magic = CTDB_MAGIC; + r->hdr.ctdb_version = CTDB_VERSION; + r->hdr.operation = CTDB_REPLY_FETCH_LOCK; + r->hdr.reqid = state->c->hdr.reqid; + r->state = state->state; + r->datalen = state->call.reply_data.dsize; + memcpy(&r->data[0], state->call.reply_data.dptr, r->datalen); + + res = ctdb_queue_send(client->queue, (uint8_t *)&r->hdr, r->hdr.length); + if (res != 0) { + printf("Failed to queue packet from daemon to client\n"); + } + talloc_free(r); +} + +/* + called when the daemon gets a fetch lock request from a client + */ +static void daemon_request_fetch_lock(struct ctdb_client *client, + struct ctdb_req_fetch_lock *f) +{ + struct ctdb_call_state *state; + TDB_DATA key, *data; + struct ctdb_db_context *ctdb_db; + + ctdb_db = find_ctdb_db(client->ctdb, f->db_id); + + key.dsize = f->keylen; + key.dptr = &f->key[0]; + + data = talloc(client, TDB_DATA); + data->dptr = NULL; + data->dsize = 0; + + state = ctdb_fetch_lock_send(ctdb_db, client, key, data); + talloc_steal(state, data); + + state->async.fn = daemon_fetch_lock_complete; + state->async.private = client; +} + /* called when the daemon gets a connect wait request from a client */ @@ -191,11 +276,7 @@ static void daemon_request_call_from_client(struct ctdb_client *client, int res; uint32_t length; - for (ctdb_db=client->ctdb->db_list; ctdb_db; ctdb_db=ctdb_db->next) { - if (ctdb_db->db_id == c->db_id) { - break; - } - } + ctdb_db = find_ctdb_db(client->ctdb, c->db_id); if (!ctdb_db) { printf("Unknown database in request. db_id==0x%08x",c->db_id); return; @@ -271,6 +352,9 @@ static void client_incoming_packet(struct ctdb_client *client, void *data, size_ case CTDB_REQ_CONNECT_WAIT: daemon_request_connect_wait(client, (struct ctdb_req_connect_wait *)hdr); break; + case CTDB_REQ_FETCH_LOCK: + daemon_request_fetch_lock(client, (struct ctdb_req_fetch_lock *)hdr); + break; } done: diff --git a/ctdb/common/ctdb_message.c b/ctdb/common/ctdb_message.c index ebba1d8faa2..0ee77f252b6 100644 --- a/ctdb/common/ctdb_message.c +++ b/ctdb/common/ctdb_message.c @@ -42,7 +42,7 @@ static int ctdb_dispatch_message(struct ctdb_context *ctdb, uint32_t srvid, TDB_ if (ml->srvid == srvid) break; } if (ml == NULL) { - printf("no msg handler for srvid=%u\n", srvid); + printf("daemon vnn:%d no msg handler for srvid=%u\n", ctdb_get_vnn(ctdb), srvid); /* no registered message handler */ return -1; } diff --git a/ctdb/include/ctdb.h b/ctdb/include/ctdb.h index a3bbb1c9978..d5a1b581e54 100644 --- a/ctdb/include/ctdb.h +++ b/ctdb/include/ctdb.h @@ -179,7 +179,7 @@ struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALL change the data in a record held with a ctdb_record_handle if the new data is zero length, this implies a delete of the record */ -int ctdb_record_store(struct ctdb_record_handle *rec, TDB_DATA data); +int ctdb_store_unlock(struct ctdb_record_handle *rec, TDB_DATA data); int ctdb_register_message_handler(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, @@ -187,4 +187,6 @@ int ctdb_register_message_handler(struct ctdb_context *ctdb, ctdb_message_fn_t handler, void *private_data); +struct ctdb_db_context *find_ctdb_db(struct ctdb_context *ctdb, uint32_t id); + #endif diff --git a/ctdb/include/ctdb_private.h b/ctdb/include/ctdb_private.h index 6a96009f7dc..4d4fa0b562d 100644 --- a/ctdb/include/ctdb_private.h +++ b/ctdb/include/ctdb_private.h @@ -193,6 +193,10 @@ struct ctdb_call_state { int redirect_count; struct ctdb_ltdb_header header; void *fetch_private; + struct { + void (*fn)(struct ctdb_call_state *); + void *private; + } async; }; @@ -206,10 +210,14 @@ enum ctdb_operation { CTDB_REQ_DMASTER = 3, CTDB_REPLY_DMASTER = 4, CTDB_REPLY_ERROR = 5, - CTDB_REQ_REGISTER = 6, - CTDB_REQ_MESSAGE = 7, - CTDB_REQ_CONNECT_WAIT = 8, - CTDB_REPLY_CONNECT_WAIT = 9 + CTDB_REQ_MESSAGE = 6, + + /* only used on the domain socket */ + CTDB_REQ_REGISTER = 1000, + CTDB_REQ_CONNECT_WAIT = 1001, + CTDB_REPLY_CONNECT_WAIT = 1002, + CTDB_REQ_FETCH_LOCK = 1003, + CTDB_REPLY_FETCH_LOCK = 1004 }; #define CTDB_MAGIC 0x43544442 /* CTDB */ @@ -294,6 +302,20 @@ struct ctdb_reply_connect_wait { uint32_t num_connected; }; +struct ctdb_req_fetch_lock { + struct ctdb_req_header hdr; + uint32_t db_id; + uint32_t keylen; + uint8_t key[1]; /* key[] */ +}; + +struct ctdb_reply_fetch_lock { + struct ctdb_req_header hdr; + uint32_t state; + uint32_t datalen; + uint8_t data[1]; /* data[] */ +}; + /* internal prototypes */ void ctdb_set_error(struct ctdb_context *ctdb, const char *fmt, ...) PRINTF_ATTRIBUTE(2,3); void ctdb_fatal(struct ctdb_context *ctdb, const char *msg); @@ -400,4 +422,12 @@ int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t vnn, */ void ctdb_daemon_connect_wait(struct ctdb_context *ctdb); + +/* + do a fetch lock from a client to the local daemon +*/ +struct ctdb_record_handle *ctdb_client_fetch_lock(struct ctdb_db_context *ctdb_db, + TALLOC_CTX *mem_ctx, + TDB_DATA key, TDB_DATA *data); + #endif diff --git a/ctdb/tests/ctdb_fetch.c b/ctdb/tests/ctdb_fetch.c index da7eeda7ced..eb2f25b9c4b 100644 --- a/ctdb/tests/ctdb_fetch.c +++ b/ctdb/tests/ctdb_fetch.c @@ -87,7 +87,7 @@ static void bench_fetch_1node(struct ctdb_context *ctdb) msg_count, ctdb_get_vnn(ctdb)); data.dsize = strlen((const char *)data.dptr)+1; - ret = ctdb_record_store(rec, data); + ret = ctdb_store_unlock(rec, data); if (ret != 0) { printf("Failed to store record\n"); } diff --git a/ctdb/tests/ctdb_messaging.c b/ctdb/tests/ctdb_messaging.c index 95cfd0c0a22..0fa6bf66d2d 100644 --- a/ctdb/tests/ctdb_messaging.c +++ b/ctdb/tests/ctdb_messaging.c @@ -158,10 +158,10 @@ int main(int argc, const char *argv[]) srvid = num_clients-1; } - /* wait until all nodes are connected (should not be needed - outside of test code) */ ctdb_set_message_handler(ctdb, srvid, message_handler, NULL); + /* wait until all nodes are connected (should not be needed + outside of test code) */ ctdb_connect_wait(ctdb); sleep(3); diff --git a/ctdb/tests/fetch.sh b/ctdb/tests/fetch.sh index 42d5cc17170..acce5fbc65b 100755 --- a/ctdb/tests/fetch.sh +++ b/ctdb/tests/fetch.sh @@ -4,6 +4,6 @@ killall -q ctdb_fetch echo "Trying 2 nodes" bin/ctdb_fetch --nlist tests/nodes.txt --listen 127.0.0.2:9001 $* & -bin/ctdb_fetch --nlist tests/nodes.txt --listen 127.0.0.1:9001 $* +gdb --args bin/ctdb_fetch --nlist tests/nodes.txt --listen 127.0.0.1:9001 $* killall -q ctdb_fetch