]> git.ipfire.org Git - thirdparty/samba.git/commitdiff
initial support for two new pdus for the domain socket to do fetch_lock
authorRonnie sahlberg <ronniesahlberg@gmail.com>
Thu, 12 Apr 2007 05:46:50 +0000 (15:46 +1000)
committerRonnie sahlberg <ronniesahlberg@gmail.com>
Thu, 12 Apr 2007 05:46:50 +0000 (15:46 +1000)
no locking is yet done and the store_unlock call is still missing

the ./tests/fetch.sh --daemon  test fails with parent process dying which needs to be investigated.

(This used to be ctdb commit 7d7141c968950a8856f1be79871932b688bfb07f)

ctdb/common/ctdb_call.c
ctdb/common/ctdb_client.c
ctdb/common/ctdb_daemon.c
ctdb/common/ctdb_message.c
ctdb/include/ctdb.h
ctdb/include/ctdb_private.h
ctdb/tests/ctdb_fetch.c
ctdb/tests/ctdb_messaging.c
ctdb/tests/fetch.sh

index 35ccc4bf0ec9c8e1f7538dbb0cde007149a9395a..5938cacab2a6bf8f59c022db5539db7602ca3b79 100644 (file)
 #include "system/filesys.h"
 #include "../include/ctdb_private.h"
 
+/*
+  find the ctdb_db from a db index
+ */
+ struct ctdb_db_context *find_ctdb_db(struct ctdb_context *ctdb, uint32_t id)
+{
+       struct ctdb_db_context *ctdb_db;
+
+       for (ctdb_db=ctdb->db_list; ctdb_db; ctdb_db=ctdb_db->next) {
+               if (ctdb_db->db_id == id) {
+                       break;
+               }
+       }
+       return ctdb_db;
+}
+
+
 /*
   local version of ctdb_call
 */
@@ -38,7 +54,7 @@ static int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *ca
        struct ctdb_call_info *c;
        struct ctdb_registered_call *fn;
        struct ctdb_context *ctdb = ctdb_db->ctdb;
-
+       
        c = talloc(ctdb, struct ctdb_call_info);
        CTDB_NO_MEMORY(ctdb, c);
 
@@ -242,11 +258,7 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr
        data.dptr = c->data + c->keylen;
        data.dsize = c->datalen;
 
-       for (ctdb_db=ctdb->db_list; ctdb_db; ctdb_db=ctdb_db->next) {
-               if (ctdb_db->db_id == c->db_id) {
-                       break;
-               }
-       }
+       ctdb_db = find_ctdb_db(ctdb, c->db_id);
        if (!ctdb_db) {
                ctdb_send_error(ctdb, hdr, -1,
                                "Unknown database in request. db_id==0x%08x",
@@ -311,11 +323,7 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
        struct ctdb_call call;
        struct ctdb_db_context *ctdb_db;
 
-       for (ctdb_db=ctdb->db_list; ctdb_db; ctdb_db=ctdb_db->next) {
-               if (ctdb_db->db_id == c->db_id) {
-                       break;
-               }
-       }
+       ctdb_db = find_ctdb_db(ctdb, c->db_id);
        if (!ctdb_db) {
                ctdb_send_error(ctdb, hdr, -1,
                                "Unknown database in request. db_id==0x%08x",
@@ -409,6 +417,9 @@ void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
        (void)talloc_reference(state, c);
 
        state->state = CTDB_CALL_DONE;
+       if (state->async.fn) {
+               state->async.fn(state);
+       }
 }
 
 /*
@@ -448,6 +459,9 @@ void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
        ctdb_call_local(ctdb_db, &state->call, &state->header, &data, ctdb->vnn);
 
        state->state = CTDB_CALL_DONE;
+       if (state->async.fn) {
+               state->async.fn(state);
+       }
 }
 
 
@@ -466,6 +480,9 @@ void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
 
        state->state  = CTDB_CALL_ERROR;
        state->errmsg = (char *)c->msg;
+       if (state->async.fn) {
+               state->async.fn(state);
+       }
 }
 
 
@@ -517,8 +534,24 @@ void ctdb_call_timeout(struct event_context *ev, struct timed_event *te,
        state->state = CTDB_CALL_ERROR;
        ctdb_set_error(state->node->ctdb, "ctdb_call %u timed out",
                       state->c->hdr.reqid);
+       if (state->async.fn) {
+               state->async.fn(state);
+       }
 }
 
+/*
+  this allows the caller to setup a async.fn 
+*/
+static void call_local_trigger(struct event_context *ev, struct timed_event *te, 
+                      struct timeval t, void *private)
+{
+       struct ctdb_call_state *state = talloc_get_type(private, struct ctdb_call_state);
+       if (state->async.fn) {
+               state->async.fn(state);
+       }
+}      
+
+
 /*
   construct an event driven local ctdb_call
 
@@ -546,6 +579,8 @@ struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db,
 
        ret = ctdb_call_local(ctdb_db, &state->call, header, data, ctdb->vnn);
 
+       event_add_timed(ctdb->ev, state, timeval_zero(), call_local_trigger, state);
+
        return state;
 }
 
@@ -717,6 +752,10 @@ struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALL
        struct ctdb_call_state *state;
        int ret;
 
+       if (ctdb_db->ctdb->flags & CTDB_FLAG_DAEMON_MODE) {
+               return ctdb_client_fetch_lock(ctdb_db, mem_ctx, key, data);
+       }
+
        ZERO_STRUCT(call);
        call.call_id = CTDB_FETCH_FUNC;
        call.key = key;
@@ -743,7 +782,7 @@ struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALL
 }
 
 
-int ctdb_record_store(struct ctdb_record_handle *rec, TDB_DATA data)
+int ctdb_store_unlock(struct ctdb_record_handle *rec, TDB_DATA data)
 {
        int ret;
        struct ctdb_ltdb_header header;
index 30049b76d8d2db22e99f6becb9c27c112fb43f4d..74e41d0d5b3102b32bfd1e66112fee765f76d905 100644 (file)
@@ -48,6 +48,36 @@ static void ctdb_reply_connect_wait(struct ctdb_context *ctdb,
        ctdb->num_connected = r->num_connected;
 }
 
+/*
+  called in the client when we receive a CTDB_REPLY_FETCH_LOCK from the daemon
+
+  This packet comes in response to a CTDB_REQ_FETCH_LOCK request packet. It
+  contains any reply data from the call
+*/
+void ctdb_reply_fetch_lock(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
+{
+       struct ctdb_reply_fetch_lock *c = (struct ctdb_reply_fetch_lock *)hdr;
+       struct ctdb_call_state *state;
+
+       state = idr_find(ctdb->idr, hdr->reqid);
+       if (state == NULL) return;
+
+       state->call.reply_data.dptr = c->data;
+       state->call.reply_data.dsize = c->datalen;
+       state->call.status = c->state;
+
+       talloc_steal(state, c);
+
+       /* get an extra reference here - this prevents the free in ctdb_recv_pkt()
+          from freeing the data */
+       (void)talloc_reference(state, c);
+
+       state->state = CTDB_CALL_DONE;
+       if (state->async.fn) {
+               state->async.fn(state);
+       }
+}
+
 /*
   this is called in the client, when data comes in from the daemon
  */
@@ -89,6 +119,13 @@ static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
        case CTDB_REPLY_CONNECT_WAIT:
                ctdb_reply_connect_wait(ctdb, hdr);
                break;
+
+       case CTDB_REPLY_FETCH_LOCK:
+               ctdb_reply_fetch_lock(ctdb, hdr);
+               break;
+
+       default:
+               printf("bogus operation code:%d\n",hdr->operation);
        }
 }
 
@@ -403,3 +440,101 @@ void ctdb_connect_wait(struct ctdb_context *ctdb)
 
        ctdb_client_connect_wait(ctdb);
 }
+
+
+struct ctdb_call_state *ctdb_client_fetch_lock_send(struct ctdb_db_context *ctdb_db, 
+                                                 TALLOC_CTX *mem_ctx, 
+                                                 TDB_DATA key)
+{
+       struct ctdb_call_state *state;
+       struct ctdb_context *ctdb = ctdb_db->ctdb;
+       struct ctdb_req_fetch_lock *req;
+       int len, res;
+
+       /* if the domain socket is not yet open, open it */
+       if (ctdb->daemon.sd==-1) {
+               ux_socket_connect(ctdb);
+       }
+
+       state = talloc_zero(ctdb_db, struct ctdb_call_state);
+       if (state == NULL) {
+               printf("failed to allocate state\n");
+               return NULL;
+       }
+       state->state   = CTDB_CALL_WAIT;
+       state->ctdb_db = ctdb_db;
+       len = offsetof(struct ctdb_req_fetch_lock, key) + key.dsize;
+       state->c = ctdbd_allocate_pkt(ctdb, len);
+       if (state->c == NULL) {
+               printf("failed to allocate packet\n");
+               return NULL;
+       }
+       ZERO_STRUCT(*state->c);
+       talloc_set_name_const(state->c, "ctdbd req_fetch_lock packet");
+       talloc_steal(state, state->c);
+
+       req = (struct ctdb_req_fetch_lock *)state->c;
+       req->hdr.length      = len;
+       req->hdr.ctdb_magic  = CTDB_MAGIC;
+       req->hdr.ctdb_version = CTDB_VERSION;
+       req->hdr.operation   = CTDB_REQ_FETCH_LOCK;
+       req->hdr.reqid       = idr_get_new(ctdb->idr, state, 0xFFFF);
+       req->db_id           = ctdb_db->db_id;
+       req->keylen          = key.dsize;
+       memcpy(&req->key[0], key.dptr, key.dsize);
+       
+       res = ctdb_client_queue_pkt(ctdb, &req->hdr);
+       if (res != 0) {
+               return NULL;
+       }
+
+       talloc_free(req);
+
+       return state;
+}
+
+
+/*
+  make a recv call to the local ctdb daemon - called from client context
+
+  This is called when the program wants to wait for a ctdb_fetch_lock to complete and get the 
+  results. This call will block unless the call has already completed.
+*/
+struct ctdb_record_handle *ctdb_client_fetch_lock_recv(struct ctdb_call_state *state, TALLOC_CTX *mem_ctx, TDB_DATA key)
+{
+       struct ctdb_record_handle *rec;
+
+       while (state->state < CTDB_CALL_DONE) {
+               event_loop_once(state->ctdb_db->ctdb->ev);
+       }
+       if (state->state != CTDB_CALL_DONE) {
+               ctdb_set_error(state->node->ctdb, "%s", state->errmsg);
+               talloc_free(state);
+               return NULL;
+       }
+
+       rec = talloc(mem_ctx, struct ctdb_record_handle);
+       CTDB_NO_MEMORY_NULL(state->ctdb_db->ctdb, rec);
+
+       rec->ctdb_db     = state->ctdb_db;
+       rec->key         = key;
+       rec->key.dptr    = talloc_memdup(rec, key.dptr, key.dsize);
+       rec->data        = talloc(rec, TDB_DATA);
+       rec->data->dsize = state->call.reply_data.dsize;
+       rec->data->dptr  = talloc_memdup(rec, state->call.reply_data.dptr, rec->data->dsize);
+
+       return rec;
+}
+
+struct ctdb_record_handle *ctdb_client_fetch_lock(struct ctdb_db_context *ctdb_db, 
+                                                 TALLOC_CTX *mem_ctx, 
+                                                 TDB_DATA key, TDB_DATA *data)
+{
+       struct ctdb_call_state *state;
+       struct ctdb_record_handle *rec;
+
+       state = ctdb_client_fetch_lock_send(ctdb_db, mem_ctx, key);
+       rec = ctdb_client_fetch_lock_recv(state, mem_ctx, key);
+
+       return rec;
+}
index 72ab7ed0b10a154ed84af340a04ba8937ef55c0c..ad9bb325e10ea109a33a986a8d355e31718ae576 100644 (file)
@@ -110,6 +110,91 @@ static void daemon_request_register_message_handler(struct ctdb_client *client,
 }
 
 
+static struct ctdb_call_state *ctdb_fetch_lock_send(struct ctdb_db_context *ctdb_db, 
+                                                   TALLOC_CTX *mem_ctx, 
+                                                   TDB_DATA key, TDB_DATA *data)
+{
+       struct ctdb_call *call;
+       struct ctdb_record_handle *rec;
+       struct ctdb_call_state *state;
+
+       rec = talloc(mem_ctx, struct ctdb_record_handle);
+       CTDB_NO_MEMORY_NULL(ctdb_db->ctdb, rec);
+
+       
+       call = talloc(rec, struct ctdb_call);
+       ZERO_STRUCT(*call);
+       call->call_id = CTDB_FETCH_FUNC;
+       call->key = key;
+       call->flags = CTDB_IMMEDIATE_MIGRATION;
+
+
+       rec->ctdb_db = ctdb_db;
+       rec->key = key;
+       rec->key.dptr = talloc_memdup(rec, key.dptr, key.dsize);
+       rec->data = data;
+
+       state = ctdb_call_send(ctdb_db, call);
+       state->fetch_private = rec;
+
+       return state;
+}
+
+static void daemon_fetch_lock_complete(struct ctdb_call_state *state)
+{
+       struct ctdb_reply_fetch_lock *r;
+       struct ctdb_client *client = talloc_get_type(state->async.private, struct ctdb_client);
+       int length, res;
+
+       length = offsetof(struct ctdb_reply_fetch_lock, data) + state->call.reply_data.dsize;
+       r = ctdbd_allocate_pkt(client->ctdb, length);
+       if (r == NULL) {
+               printf("Failed to allocate reply_call in ctdb daemon\n");
+               return;
+       }
+       ZERO_STRUCT(*r);
+       r->hdr.length       = length;
+       r->hdr.ctdb_magic   = CTDB_MAGIC;
+       r->hdr.ctdb_version = CTDB_VERSION;
+       r->hdr.operation    = CTDB_REPLY_FETCH_LOCK;
+       r->hdr.reqid        = state->c->hdr.reqid;
+       r->state            = state->state;
+       r->datalen          = state->call.reply_data.dsize;
+       memcpy(&r->data[0], state->call.reply_data.dptr, r->datalen);
+
+       res = ctdb_queue_send(client->queue, (uint8_t *)&r->hdr, r->hdr.length);
+       if (res != 0) {
+               printf("Failed to queue packet from daemon to client\n");
+       }
+       talloc_free(r);
+}
+
+/*
+  called when the daemon gets a fetch lock request from a client
+ */
+static void daemon_request_fetch_lock(struct ctdb_client *client, 
+                                       struct ctdb_req_fetch_lock *f)
+{
+       struct ctdb_call_state *state;
+       TDB_DATA key, *data;
+       struct ctdb_db_context *ctdb_db;
+
+       ctdb_db = find_ctdb_db(client->ctdb, f->db_id);
+
+       key.dsize = f->keylen;
+       key.dptr = &f->key[0];
+
+       data        = talloc(client, TDB_DATA);
+       data->dptr  = NULL;
+       data->dsize = 0;
+
+       state = ctdb_fetch_lock_send(ctdb_db, client, key, data);
+       talloc_steal(state, data);
+
+       state->async.fn = daemon_fetch_lock_complete;
+       state->async.private = client;
+}
+
 /*
   called when the daemon gets a connect wait request from a client
  */
@@ -191,11 +276,7 @@ static void daemon_request_call_from_client(struct ctdb_client *client,
        int res;
        uint32_t length;
 
-       for (ctdb_db=client->ctdb->db_list; ctdb_db; ctdb_db=ctdb_db->next) {
-               if (ctdb_db->db_id == c->db_id) {
-                       break;
-               }
-       }
+       ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
        if (!ctdb_db) {
                printf("Unknown database in request. db_id==0x%08x",c->db_id);
                return;
@@ -271,6 +352,9 @@ static void client_incoming_packet(struct ctdb_client *client, void *data, size_
        case CTDB_REQ_CONNECT_WAIT:
                daemon_request_connect_wait(client, (struct ctdb_req_connect_wait *)hdr);
                break;
+       case CTDB_REQ_FETCH_LOCK:
+               daemon_request_fetch_lock(client, (struct ctdb_req_fetch_lock *)hdr);
+               break;
        }
 
 done:
index ebba1d8faa24bf7418218547ae8ef0ffe0bcb14b..0ee77f252b65acd576a40b9c6dd92d470f1ea762 100644 (file)
@@ -42,7 +42,7 @@ static int ctdb_dispatch_message(struct ctdb_context *ctdb, uint32_t srvid, TDB_
                if (ml->srvid == srvid) break;
        }
        if (ml == NULL) {
-               printf("no msg handler for srvid=%u\n", srvid);
+               printf("daemon vnn:%d  no msg handler for srvid=%u\n", ctdb_get_vnn(ctdb), srvid);
                /* no registered message handler */
                return -1;
        }
index a3bbb1c99782e70587b68d7fa6aab5ab4a99a3f9..d5a1b581e54bd266d48f3a7ae88c918b1ea6a75e 100644 (file)
@@ -179,7 +179,7 @@ struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALL
   change the data in a record held with a ctdb_record_handle
   if the new data is zero length, this implies a delete of the record
  */
-int ctdb_record_store(struct ctdb_record_handle *rec, TDB_DATA data);
+int ctdb_store_unlock(struct ctdb_record_handle *rec, TDB_DATA data);
 
 int ctdb_register_message_handler(struct ctdb_context *ctdb, 
                                  TALLOC_CTX *mem_ctx,
@@ -187,4 +187,6 @@ int ctdb_register_message_handler(struct ctdb_context *ctdb,
                                  ctdb_message_fn_t handler,
                                  void *private_data);
 
+struct ctdb_db_context *find_ctdb_db(struct ctdb_context *ctdb, uint32_t id);
+
 #endif
index 6a96009f7dc2787deeea17422b16a753aa8ccbd3..4d4fa0b562dfa60e0f7e417ff61ce660c078fd0a 100644 (file)
@@ -193,6 +193,10 @@ struct ctdb_call_state {
        int redirect_count;
        struct ctdb_ltdb_header header;
        void *fetch_private;
+       struct {
+               void (*fn)(struct ctdb_call_state *);
+               void *private;
+       } async;
 };
 
 
@@ -206,10 +210,14 @@ enum ctdb_operation {
        CTDB_REQ_DMASTER        = 3,
        CTDB_REPLY_DMASTER      = 4,
        CTDB_REPLY_ERROR        = 5,
-       CTDB_REQ_REGISTER       = 6,
-       CTDB_REQ_MESSAGE        = 7,
-       CTDB_REQ_CONNECT_WAIT   = 8,
-       CTDB_REPLY_CONNECT_WAIT = 9
+       CTDB_REQ_MESSAGE        = 6,
+       
+       /* only used on the domain socket */
+       CTDB_REQ_REGISTER       = 1000,     
+       CTDB_REQ_CONNECT_WAIT   = 1001,
+       CTDB_REPLY_CONNECT_WAIT = 1002,
+       CTDB_REQ_FETCH_LOCK     = 1003,
+       CTDB_REPLY_FETCH_LOCK   = 1004
 };
 
 #define CTDB_MAGIC 0x43544442 /* CTDB */
@@ -294,6 +302,20 @@ struct ctdb_reply_connect_wait {
        uint32_t num_connected;
 };
 
+struct ctdb_req_fetch_lock {
+       struct ctdb_req_header hdr;
+       uint32_t db_id;
+       uint32_t keylen;
+       uint8_t key[1]; /* key[] */
+};
+
+struct ctdb_reply_fetch_lock {
+       struct ctdb_req_header hdr;
+       uint32_t state;
+       uint32_t datalen;
+       uint8_t data[1]; /* data[] */
+};
+
 /* internal prototypes */
 void ctdb_set_error(struct ctdb_context *ctdb, const char *fmt, ...) PRINTF_ATTRIBUTE(2,3);
 void ctdb_fatal(struct ctdb_context *ctdb, const char *msg);
@@ -400,4 +422,12 @@ int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t vnn,
 */
 void ctdb_daemon_connect_wait(struct ctdb_context *ctdb);
 
+
+/*
+  do a fetch lock from a client to the local daemon
+*/
+struct ctdb_record_handle *ctdb_client_fetch_lock(struct ctdb_db_context *ctdb_db, 
+                                                 TALLOC_CTX *mem_ctx, 
+                                                 TDB_DATA key, TDB_DATA *data);
+
 #endif
index da7eeda7cedf32a62f4f334eead22bb19bc5ed66..eb2f25b9c4b67e40bd3df6a2faca38766e074005 100644 (file)
@@ -87,7 +87,7 @@ static void bench_fetch_1node(struct ctdb_context *ctdb)
                                                      msg_count, ctdb_get_vnn(ctdb));
        data.dsize = strlen((const char *)data.dptr)+1;
 
-       ret = ctdb_record_store(rec, data);
+       ret = ctdb_store_unlock(rec, data);
        if (ret != 0) {
                printf("Failed to store record\n");
        }
index 95cfd0c0a22e820dd61f81238f685cda2a2093bd..0fa6bf66d2d1470ab60f603075507991d2691c8a 100644 (file)
@@ -158,10 +158,10 @@ int main(int argc, const char *argv[])
                srvid = num_clients-1;
        }
 
-       /* wait until all nodes are connected (should not be needed
-          outside of test code) */
        ctdb_set_message_handler(ctdb, srvid, message_handler, NULL);
 
+       /* wait until all nodes are connected (should not be needed
+          outside of test code) */
        ctdb_connect_wait(ctdb);
 
        sleep(3);
index 42d5cc1717051fbb3a5ecefd68230c02c681fc22..acce5fbc65bfbeba0ed5d56cdc119cd41d8ce74a 100755 (executable)
@@ -4,6 +4,6 @@ killall -q ctdb_fetch
 
 echo "Trying 2 nodes"
 bin/ctdb_fetch --nlist tests/nodes.txt --listen 127.0.0.2:9001 $* &
-bin/ctdb_fetch --nlist tests/nodes.txt --listen 127.0.0.1:9001 $*
+gdb --args bin/ctdb_fetch --nlist tests/nodes.txt --listen 127.0.0.1:9001 $* 
 
 killall -q ctdb_fetch