]> git.ipfire.org Git - thirdparty/samba.git/commitdiff
changed the way set_call and attach are done so that you can safely
authorAndrew Tridgell <tridge@samba.org>
Mon, 30 Apr 2007 13:31:40 +0000 (15:31 +0200)
committerAndrew Tridgell <tridge@samba.org>
Mon, 30 Apr 2007 13:31:40 +0000 (15:31 +0200)
attach to databases after the protocol has started. The daemon
broadcasts information on new databases to the other daemons.

This also eliminates the need for the client to know about the hash
between db name and db_id.

(This used to be ctdb commit 3bad91a9d987d4c09fe3322eac23c2733660ad08)

15 files changed:
ctdb/common/ctdb.c
ctdb/common/ctdb_client.c
ctdb/common/ctdb_control.c
ctdb/common/ctdb_daemon.c
ctdb/common/ctdb_ltdb.c
ctdb/common/ctdb_message.c
ctdb/common/ctdb_util.c
ctdb/direct/ctdbd.c
ctdb/include/ctdb.h
ctdb/include/ctdb_private.h
ctdb/tests/ctdb_bench.c
ctdb/tests/ctdb_fetch.c
ctdb/tests/ctdb_fetch1.c
ctdb/tests/ctdb_messaging.c
ctdb/tests/ctdb_test.c

index f2cc65b52babcd70f1ed994ff97209c1130bbc0b..69e69ec8ca2d9cc82b82d76f45a322f1204aac74 100644 (file)
@@ -184,22 +184,6 @@ int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
        ctdb->daemon.name = talloc_strdup(ctdb, socketname);
        return 0;
 }
-
-/*
-  add a node to the list of active nodes
-*/
-int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, int id)
-{
-       struct ctdb_registered_call *call;
-
-       call = talloc(ctdb_db, struct ctdb_registered_call);
-       call->fn = fn;
-       call->id = id;
-
-       DLIST_ADD(ctdb_db->calls, call);        
-       return 0;
-}
-
 /*
   return the vnn of this node
 */
@@ -420,12 +404,31 @@ static void ctdb_defer_packet(struct ctdb_context *ctdb, struct ctdb_req_header
 #endif
 }
 
+
+/*
+  broadcast a packet to all nodes
+*/
+static void ctdb_broadcast_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
+{
+       int i;
+       for (i=0;i<ctdb_get_num_nodes(ctdb);i++) {
+               hdr->destnode = ctdb->nodes[i]->vnn;
+               ctdb_queue_packet(ctdb, hdr);
+       }
+}
+
 /*
   queue a packet or die
 */
 void ctdb_queue_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
 {
        struct ctdb_node *node;
+
+       if (hdr->destnode == CTDB_BROADCAST_VNN) {
+               ctdb_broadcast_packet(ctdb, hdr);
+               return;
+       }
+
        ctdb->status.node_packets_sent++;
 
        if (!ctdb_validate_vnn(ctdb, hdr->destnode)) {
index 7257dd5d2996acc23fe7224ef86f4e71f47807dd..53302c3bfdb94180029fff05151894e88efb390d 100644 (file)
@@ -647,7 +647,7 @@ static void ctdb_client_reply_control(struct ctdb_context *ctdb,
   send a ctdb control message
  */
 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
-                uint32_t opcode, TDB_DATA data, 
+                uint32_t opcode, uint32_t flags, TDB_DATA data, 
                 TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status)
 {
        struct ctdb_client_control_state *state;
@@ -675,6 +675,7 @@ int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid,
        c->hdr.destnode     = destnode;
        c->hdr.reqid        = state->reqid;
        c->opcode           = opcode;
+       c->flags            = flags;
        c->srvid            = srvid;
        c->datalen          = data.dsize;
        if (data.dsize) {
@@ -687,6 +688,11 @@ int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid,
                return -1;
        }
 
+       if (flags & CTDB_CTRL_FLAG_NOREPLY) {
+               talloc_free(state);
+               return 0;
+       }
+
        /* semi-async operation */
        while (state->state == CTDB_CALL_WAIT) {
                event_loop_once(ctdb->ev);
@@ -719,7 +725,7 @@ int ctdb_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
        data.dsize = sizeof(pid);
 
        ret = ctdb_control(ctdb, destnode, 0, 
-                          CTDB_CONTROL_PROCESS_EXISTS, data, 
+                          CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
                           NULL, NULL, &status);
        if (ret != 0) {
                DEBUG(0,(__location__ " ctdb_control for process_exists failed\n"));
@@ -740,7 +746,7 @@ int ctdb_status(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_status
 
        ZERO_STRUCT(data);
        ret = ctdb_control(ctdb, destnode, 0, 
-                          CTDB_CONTROL_STATUS, data, 
+                          CTDB_CONTROL_STATUS, 0, data, 
                           ctdb, &data, &res);
        if (ret != 0 || res != 0) {
                DEBUG(0,(__location__ " ctdb_control for status failed\n"));
@@ -770,7 +776,7 @@ int ctdb_getvnnmap(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_vnn
 
        ZERO_STRUCT(data);
        ret = ctdb_control(ctdb, destnode, 0, 
-                          CTDB_CONTROL_GETVNNMAP, data, 
+                          CTDB_CONTROL_GETVNNMAP, 0, data, 
                           ctdb, &outdata, &res);
        if (ret != 0 || res != 0) {
                DEBUG(0,(__location__ " ctdb_control for getvnnmap failed\n"));
@@ -802,7 +808,7 @@ int ctdb_getdbmap(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_dbid
 
        ZERO_STRUCT(data);
        ret = ctdb_control(ctdb, destnode, 0, 
-                          CTDB_CONTROL_GET_DBMAP, data, 
+                          CTDB_CONTROL_GET_DBMAP, 0, data, 
                           ctdb, &outdata, &res);
        if (ret != 0 || res != 0) {
                DEBUG(0,(__location__ " ctdb_control for getvnnmap failed\n"));
@@ -840,7 +846,7 @@ int ctdb_getnodemap(struct ctdb_context *ctdb, uint32_t destnode,
        ZERO_STRUCT(data);
        ZERO_STRUCT(*nodemap);
        ret = ctdb_control(ctdb, destnode, 0, 
-                          CTDB_CONTROL_GET_NODEMAP, data, 
+                          CTDB_CONTROL_GET_NODEMAP, 0, data, 
                           ctdb, &outdata, &res);
        if (ret != 0 || res != 0) {
                DEBUG(0,(__location__ " ctdb_control for getnodes failed\n"));
@@ -879,7 +885,7 @@ int ctdb_setvnnmap(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_vnn
        }
 
        ret = ctdb_control(ctdb, destnode, 0, 
-                          CTDB_CONTROL_SETVNNMAP, *data, 
+                          CTDB_CONTROL_SETVNNMAP, 0, *data, 
                           ctdb, &outdata, &res);
        if (ret != 0 || res != 0) {
                DEBUG(0,(__location__ " ctdb_control for setvnnmap failed\n"));
@@ -900,7 +906,8 @@ int ctdb_ping(struct ctdb_context *ctdb, uint32_t destnode)
        TDB_DATA data;
 
        ZERO_STRUCT(data);
-       ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, data, NULL, NULL, &res);
+       ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
+                          data, NULL, NULL, &res);
        if (ret != 0) {
                return -1;
        }
@@ -918,8 +925,8 @@ int ctdb_get_config(struct ctdb_context *ctdb)
        struct ctdb_context c;
 
        ZERO_STRUCT(data);
-       ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_CONFIG, data, 
-                          ctdb, &data, &res);
+       ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_CONFIG, 0,
+                          data, ctdb, &data, &res);
        if (ret != 0 || res != 0) {
                return -1;
        }
@@ -953,7 +960,7 @@ int ctdb_getdbpath(struct ctdb_context *ctdb, uint32_t dbid, TALLOC_CTX *mem_ctx
        data.dsize = sizeof(dbid);
 
        ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
-                          CTDB_CONTROL_GETDBPATH, data, 
+                          CTDB_CONTROL_GETDBPATH, 0, data, 
                           mem_ctx, &data, &res);
        if (ret != 0 || res != 0) {
                return -1;
@@ -979,7 +986,7 @@ int ctdb_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, uint32_t *
        TDB_DATA data;
 
        ZERO_STRUCT(data);
-       ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, data, 
+       ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, data, 
                           ctdb, &data, &res);
        if (ret != 0 || res != 0) {
                return -1;
@@ -1006,7 +1013,7 @@ int ctdb_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, uint32_t l
        data.dptr = (uint8_t *)&level;
        data.dsize = sizeof(level);
 
-       ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, data, 
+       ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
                           NULL, NULL, &res);
        if (ret != 0 || res != 0) {
                return -1;
@@ -1065,7 +1072,7 @@ int ctdb_status_reset(struct ctdb_context *ctdb, uint32_t destnode)
 
        ZERO_STRUCT(data);
        ret = ctdb_control(ctdb, destnode, 0, 
-                          CTDB_CONTROL_STATUS_RESET, data, 
+                          CTDB_CONTROL_STATUS_RESET, 0, data, 
                           NULL, NULL, &res);
        if (ret != 0 || res != 0) {
                DEBUG(0,(__location__ " ctdb_control for reset status failed\n"));
@@ -1073,3 +1080,89 @@ int ctdb_status_reset(struct ctdb_context *ctdb, uint32_t destnode)
        }
        return 0;
 }
+
+
+/*
+  attach to a specific database - client call
+*/
+struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name)
+{
+       struct ctdb_db_context *ctdb_db;
+       TDB_DATA data;
+       int ret;
+       int32_t res;
+
+       ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
+       CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
+
+       ctdb_db->ctdb = ctdb;
+       ctdb_db->db_name = talloc_strdup(ctdb_db, name);
+       CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
+
+       data.dptr = discard_const(name);
+       data.dsize = strlen(name)+1;
+
+       /* tell ctdb daemon to attach */
+       ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_DB_ATTACH,
+                          0, data, ctdb_db, &data, &res);
+       if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
+               DEBUG(0,("Failed to attach to database '%s'\n", name));
+               talloc_free(ctdb_db);
+               return NULL;
+       }
+       
+       ctdb_db->db_id = *(uint32_t *)data.dptr;
+
+       ret = ctdb_getdbpath(ctdb, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
+       if (ret != 0) {
+               DEBUG(0,("Failed to get dbpath for database '%s'\n", name));
+               talloc_free(ctdb_db);
+               return NULL;
+       }
+
+       ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, 0, O_RDWR, 0);
+       if (ctdb_db->ltdb == NULL) {
+               ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
+               talloc_free(ctdb_db);
+               return NULL;
+       }
+
+       DLIST_ADD(ctdb->db_list, ctdb_db);
+
+       return ctdb_db;
+}
+
+
+/*
+  setup a call for a database
+ */
+int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
+{
+       TDB_DATA data;
+       int32_t status;
+       struct ctdb_control_set_call c;
+       int ret;
+       struct ctdb_registered_call *call;
+
+       c.db_id = ctdb_db->db_id;
+       c.fn    = fn;
+       c.id    = id;
+
+       data.dptr = (uint8_t *)&c;
+       data.dsize = sizeof(c);
+
+       ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
+                          data, NULL, NULL, &status);
+       if (ret != 0 || status != 0) {
+               DEBUG(0,("ctdb_set_call failed for call %u\n", id));
+               return -1;
+       }
+
+       /* also register locally */
+       call = talloc(ctdb_db, struct ctdb_registered_call);
+       call->fn = fn;
+       call->id = id;
+
+       DLIST_ADD(ctdb_db->calls, call);        
+       return 0;
+}
index b24c2686cc470a6d3995fb7f96a6d61f8940ffa8..7bc705d60a1664f9f8f63017679833a67b127b47 100644 (file)
@@ -41,6 +41,8 @@ struct ctdb_control_state {
  } \
  } while (0)
 
+
+
 /*
   process a control request
  */
@@ -197,6 +199,16 @@ static int32_t ctdb_control_dispatch(struct ctdb_context *ctdb,
                return 0;
        }
 
+       case CTDB_CONTROL_DB_ATTACH:
+               return ctdb_control_db_attach(ctdb, indata, outdata);
+
+       case CTDB_CONTROL_SET_CALL: {
+               struct ctdb_control_set_call *c = 
+                       (struct ctdb_control_set_call *)indata.dptr;
+               CHECK_CONTROL_DATA_SIZE(sizeof(struct ctdb_control_set_call));
+               return ctdb_daemon_set_call(ctdb, c->db_id, c->fn, c->id);
+       }
+
        default:
                DEBUG(0,(__location__ " Unknown CTDB control opcode %u\n", opcode));
                return -1;
@@ -220,6 +232,11 @@ void ctdb_request_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr
        outdata = talloc_zero(c, TDB_DATA);
        status = ctdb_control_dispatch(ctdb, c->opcode, data, outdata);
 
+       /* some controls send no reply */
+       if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
+               return;
+       }
+
        len = offsetof(struct ctdb_reply_control, data) + outdata->dsize;
        r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CONTROL, len, struct ctdb_reply_control);
        CTDB_NO_MEMORY_VOID(ctdb, r);
@@ -276,7 +293,8 @@ static int ctdb_control_destructor(struct ctdb_control_state *state)
   send a control message to a node
  */
 int ctdb_daemon_send_control(struct ctdb_context *ctdb, uint32_t destnode,
-                            uint64_t srvid, uint32_t opcode, TDB_DATA data,
+                            uint64_t srvid, uint32_t opcode, uint32_t flags,
+                            TDB_DATA data,
                             ctdb_control_callback_fn_t callback,
                             void *private_data)
 {
@@ -284,6 +302,11 @@ int ctdb_daemon_send_control(struct ctdb_context *ctdb, uint32_t destnode,
        struct ctdb_control_state *state;
        size_t len;
 
+       if (destnode == CTDB_BROADCAST_VNN && !(flags & CTDB_CTRL_FLAG_NOREPLY)) {
+               DEBUG(0,("Attempt to broadcast control without NOREPLY\n"));
+               return -1;
+       }
+
        state = talloc(ctdb, struct ctdb_control_state);
        CTDB_NO_MEMORY(ctdb, state);
 
@@ -303,14 +326,20 @@ int ctdb_daemon_send_control(struct ctdb_context *ctdb, uint32_t destnode,
        c->hdr.destnode     = destnode;
        c->hdr.reqid        = state->reqid;
        c->opcode           = opcode;
+       c->flags            = flags;
        c->srvid            = srvid;
        c->datalen          = data.dsize;
        if (data.dsize) {
                memcpy(&c->data[0], data.dptr, data.dsize);
        }
-       
+
        ctdb_queue_packet(ctdb, &c->hdr);       
 
+       if (flags & CTDB_CTRL_FLAG_NOREPLY) {
+               talloc_free(state);
+               return 0;
+       }
+
 #if CTDB_REQ_TIMEOUT
        event_add_timed(ctdb->ev, state, timeval_current_ofs(CTDB_REQ_TIMEOUT, 0), 
                        ctdb_control_timeout, state);
index a6d60fc00e557ba6e527a0666cb3d33eb909fd9b..e994e76147eafa7c22d062550d9cd8a42fc52028 100644 (file)
@@ -848,10 +848,33 @@ static void daemon_request_control_from_client(struct ctdb_client *client,
        data.dptr = &c->data[0];
        data.dsize = c->datalen;
        res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
-                                      c->srvid, c->opcode, data, daemon_control_callback,
+                                      c->srvid, c->opcode, c->flags,
+                                      data, daemon_control_callback,
                                       state);
        if (res != 0) {
                DEBUG(0,(__location__ " Failed to send control to remote node %u\n",
                         c->hdr.destnode));
        }
 }
+
+/*
+  register a call function
+*/
+int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
+                        ctdb_fn_t fn, int id)
+{
+       struct ctdb_registered_call *call;
+       struct ctdb_db_context *ctdb_db;
+
+       ctdb_db = find_ctdb_db(ctdb, db_id);
+       if (ctdb_db == NULL) {
+               return -1;
+       }
+
+       call = talloc(ctdb_db, struct ctdb_registered_call);
+       call->fn = fn;
+       call->id = id;
+
+       DLIST_ADD(ctdb_db->calls, call);        
+       return 0;
+}
index 6606ea1f31f08e8311f4a04a413d73dafa00909a..2cf90a177c86fed9b0bd988c386fa6c414c2d394 100644 (file)
@@ -51,74 +51,6 @@ static int ctdb_null_func(struct ctdb_call_info *call)
 }
 
 
-/*
-  attach to a specific database
-*/
-struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name, int tdb_flags, 
-                                   int open_flags, mode_t mode)
-{
-       struct ctdb_db_context *ctdb_db, *tmp_db;
-       TDB_DATA data;
-       int ret;
-
-       ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
-       CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
-
-       ctdb_db->ctdb = ctdb;
-       ctdb_db->db_name = talloc_strdup(ctdb_db, name);
-       CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
-
-       data.dptr = discard_const(name);
-       data.dsize = strlen(name);
-       ctdb_db->db_id = ctdb_hash(&data);
-
-       for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
-               if (tmp_db->db_id == ctdb_db->db_id) {
-                       ctdb_set_error(ctdb, "CTDB database hash collission '%s' : '%s'",
-                                       name, tmp_db->db_name);
-                       talloc_free(ctdb_db);
-                       return NULL;
-               }
-       }
-
-       if (mkdir(ctdb->db_directory, 0700) == -1 && errno != EEXIST) {
-               DEBUG(0,(__location__ " Unable to create ctdb directory '%s'\n", 
-                        ctdb->db_directory));
-               talloc_free(ctdb_db);
-               return NULL;
-       }
-
-       /* add the node id to the database name, so when we run on loopback
-          we don't conflict in the local filesystem */
-       ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s", ctdb->db_directory, name);
-
-       /* when we have a separate daemon this will need to be a real
-          file, not a TDB_INTERNAL, so the parent can access it to
-          for ltdb bypass */
-       ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, 
-                                     TDB_CLEAR_IF_FIRST, open_flags, mode);
-       if (ctdb_db->ltdb == NULL) {
-               ctdb_set_error(ctdb, "Failed to open tdb %s\n", name);
-               talloc_free(ctdb_db);
-               return NULL;
-       }
-
-
-       /* 
-          all databases support the "null" function. we need this in
-          order to do forced migration of records
-        */
-       ret = ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
-       if (ret != 0) {
-               talloc_free(ctdb_db);
-               return NULL;
-       }
-
-       DLIST_ADD(ctdb->db_list, ctdb_db);
-
-       return ctdb_db;
-}
-
 /*
   return the lmaster given a key
 */
@@ -353,3 +285,87 @@ int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
        }
        return ret;
 }
+
+
+/*
+  a client has asked to attach a new database
+ */
+int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
+                              TDB_DATA *outdata)
+{
+       const char *db_name = (const char *)indata.dptr;
+       struct ctdb_db_context *ctdb_db, *tmp_db;
+       int ret;
+
+       /* see if we already have this name */
+       for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
+               if (strcmp(db_name, tmp_db->db_name) == 0) {
+                       /* this is not an error */
+                       return 0;
+               }
+       }
+
+       ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
+       CTDB_NO_MEMORY(ctdb, ctdb_db);
+
+       ctdb_db->ctdb = ctdb;
+       ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
+       CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
+
+       ctdb_db->db_id = ctdb_hash(&indata);
+
+       outdata->dptr  = (uint8_t *)&ctdb_db->db_id;
+       outdata->dsize = sizeof(ctdb_db->db_id);
+
+       /* check for hash collisions */
+       for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
+               if (tmp_db->db_id == ctdb_db->db_id) {
+                       DEBUG(0,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
+                                db_name, tmp_db->db_name));
+                       talloc_free(ctdb_db);
+                       return -1;
+               }
+       }
+
+       /* make sure the db directory exists */
+       if (mkdir(ctdb->db_directory, 0700) == -1 && errno != EEXIST) {
+               DEBUG(0,(__location__ " Unable to create ctdb directory '%s'\n", 
+                        ctdb->db_directory));
+               talloc_free(ctdb_db);
+               return -1;
+       }
+
+       /* open the database */
+       ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s", 
+                                          ctdb->db_directory, db_name);
+
+       ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, 
+                                     TDB_CLEAR_IF_FIRST, O_CREAT|O_RDWR, 0666);
+       if (ctdb_db->ltdb == NULL) {
+               DEBUG(0,("Failed to open tdb '%s'\n", ctdb_db->db_path));
+               talloc_free(ctdb_db);
+               return -1;
+       }
+       
+       DLIST_ADD(ctdb->db_list, ctdb_db);
+
+       /* 
+          all databases support the "null" function. we need this in
+          order to do forced migration of records
+       */
+       ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
+       if (ret != 0) {
+               DEBUG(0,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
+               talloc_free(ctdb_db);
+               return -1;
+       }
+       
+       /* tell all the other nodes about this database */
+       ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNN, 0,
+                                CTDB_CONTROL_DB_ATTACH, CTDB_CTRL_FLAG_NOREPLY,
+                                indata, NULL, NULL);
+
+       /* success */
+       return 0;
+}
+
index 11d87b09e7e40cd8854274352f4d3caa571c9c61..abc1fb9dd89ba90e7cc97a58cd36c33809d321e9 100644 (file)
@@ -129,27 +129,7 @@ int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t vnn,
        r->datalen       = data.dsize;
        memcpy(&r->data[0], data.dptr, data.dsize);
 
-       if (vnn != CTDB_BROADCAST_VNN) {
-               ctdb_queue_packet(ctdb, &r->hdr);
-       } else {
-               struct ctdb_node *node;
-               int i;
-
-               /* this was a broadcast message
-                  loop over all other nodes and send them each a copy
-               */
-               for (i=0; i<ctdb_get_num_nodes(ctdb); i++) {
-                       node=ctdb->nodes[i];
-
-                       /* we do not send the message to ourself */
-                       if (node && node->vnn!=ctdb->vnn) {
-                               r->hdr.destnode = node->vnn;
-                               ctdb_queue_packet(ctdb, &r->hdr);
-                       }
-               }
-               /* also make sure to dispatch the message locally */
-               ctdb_dispatch_message(ctdb, srvid, data);
-       }
+       ctdb_queue_packet(ctdb, &r->hdr);
 
        talloc_free(r);
        return 0;
index b74162bb3f5da60a421d5931117e3437f35ac26e..41d4fe5de415fb001f1b5b54166739f71b15aa83 100644 (file)
@@ -102,6 +102,17 @@ uint32_t ctdb_hash(const TDB_DATA *key)
        return (1103515243 * value + 12345);  
 }
 
+/*
+  hash function for a string
+*/
+uint32_t ctdb_hash_string(const char *str)
+{
+       TDB_DATA data;
+       data.dptr = (uint8_t *)discard_const(str);
+       data.dsize = strlen(str)+1;
+       return ctdb_hash(&data);
+}
+
 /*
   a type checking varient of idr_find
  */
index bf30ad86d6d2f73a0f25ccb853ebe9b9dd7d53a3..2ed1ce5ae6a8e9dfdd656363576d492d187635e4 100644 (file)
@@ -45,13 +45,10 @@ static void block_signal(int signum)
 int main(int argc, const char *argv[])
 {
        struct ctdb_context *ctdb;
-       const char *db_list = "test.tdb";
-       char *s, *tok;
 
        struct poptOption popt_options[] = {
                POPT_AUTOHELP
                POPT_CTDB_CMDLINE
-               { "dblist", 0, POPT_ARG_STRING, &db_list, 0, "list of databases", NULL },
                POPT_TABLEEND
        };
        int opt;
@@ -92,20 +89,6 @@ int main(int argc, const char *argv[])
                talloc_free(name);
        }
 
-       /* attach to the list of databases */
-       s = talloc_strdup(ctdb, db_list);
-       for (tok=strtok(s, ", "); tok; tok=strtok(NULL, ", ")) {
-               struct ctdb_db_context *ctdb_db;
-               ctdb_db = ctdb_attach(ctdb, tok, TDB_DEFAULT, 
-                                     O_RDWR|O_CREAT|O_TRUNC, 0666);
-               if (!ctdb_db) {
-                       DEBUG(0,("ctdb_attach to '%s'failed - %s\n", tok, 
-                                ctdb_errstr(ctdb)));
-                       exit(1);
-               }
-               DEBUG(1, ("Attached to database '%s'\n", tok));
-       }
-
        /* start the protocol running (as a child) */
        return ctdb_start_daemon(ctdb);
 }
index d0af61843170aef47a8422c29bd7d4f7ed20bc21..ce2b8bdf27bf9d7aeaa984b847e2bd9b600b5e27 100644 (file)
@@ -112,8 +112,7 @@ int ctdb_start_daemon(struct ctdb_context *ctdb);
 /*
   attach to a ctdb database
 */
-struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name, int tdb_flags, 
-                                   int open_flags, mode_t mode);
+struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name);
 
 /*
   find an attached ctdb_db handle given a name
@@ -131,7 +130,7 @@ typedef int (*ctdb_fn_t)(struct ctdb_call_info *);
 /*
   setup a ctdb call function
 */
-int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, int id);
+int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id);
 
 
 
index 4850003542739282d6a17612f871c1326dbd2579..1f56776c9a59aa9ca0cd77cb6391714b543c0549 100644 (file)
@@ -254,7 +254,19 @@ enum ctdb_controls {CTDB_CONTROL_PROCESS_EXISTS,
                    CTDB_CONTROL_SET_DEBUG,
                    CTDB_CONTROL_GET_DBMAP,
                    CTDB_CONTROL_GET_NODEMAP,
-                   CTDB_CONTROL_STATUS_RESET};
+                   CTDB_CONTROL_STATUS_RESET,
+                   CTDB_CONTROL_DB_ATTACH,
+                   CTDB_CONTROL_SET_CALL};
+
+/*
+  structure passed in set_call control
+ */
+struct ctdb_control_set_call {
+       uint32_t db_id;
+       ctdb_fn_t fn;
+       uint32_t id;
+};
+
 
 enum call_state {CTDB_CALL_WAIT, CTDB_CALL_DONE, CTDB_CALL_ERROR};
 
@@ -409,6 +421,8 @@ struct ctdb_req_control {
        struct ctdb_req_header hdr;
        uint32_t opcode;
        uint64_t srvid;
+#define CTDB_CTRL_FLAG_NOREPLY 1
+       uint32_t flags;
        uint32_t datalen;
        uint8_t data[1];
 };
@@ -429,6 +443,7 @@ int ctdb_parse_address(struct ctdb_context *ctdb,
                       TALLOC_CTX *mem_ctx, const char *str,
                       struct ctdb_address *address);
 uint32_t ctdb_hash(const TDB_DATA *key);
+uint32_t ctdb_hash_string(const char *str);
 void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
 void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
 void ctdb_request_message(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
@@ -586,8 +601,15 @@ void ctdb_request_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr
 void ctdb_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
 
 int ctdb_daemon_send_control(struct ctdb_context *ctdb, uint32_t destnode,
-                            uint64_t srvid, uint32_t opcode, TDB_DATA data,
+                            uint64_t srvid, uint32_t opcode, uint32_t flags,
+                            TDB_DATA data,
                             ctdb_control_callback_fn_t callback,
                             void *private_data);
 
+int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata, 
+                              TDB_DATA *outdata);
+
+int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
+                        ctdb_fn_t fn, int id);
+
 #endif
index 296bfc48cd3d820429f6ac9e61cdb20ea21cb49e..78f1ea2373e4ea489fee8e9d69ebdaa7b3d4c541 100644 (file)
@@ -190,8 +190,11 @@ int main(int argc, const char *argv[])
        /* initialise ctdb */
        ctdb = ctdb_cmdline_init(ev);
 
+       /* start the protocol running */
+       ret = ctdb_start(ctdb);
+
        /* attach to a specific database */
-       ctdb_db = ctdb_attach(ctdb, "test.tdb", TDB_DEFAULT, O_RDWR|O_CREAT|O_TRUNC, 0666);
+       ctdb_db = ctdb_attach(ctdb, "test.tdb");
        if (!ctdb_db) {
                printf("ctdb_attach failed - %s\n", ctdb_errstr(ctdb));
                exit(1);
@@ -201,9 +204,6 @@ int main(int argc, const char *argv[])
        ret = ctdb_set_call(ctdb_db, incr_func,  FUNC_INCR);
        ret = ctdb_set_call(ctdb_db, fetch_func, FUNC_FETCH);
 
-       /* start the protocol running */
-       ret = ctdb_start(ctdb);
-
        if (ctdb_set_message_handler(ctdb, 0, ring_message_handler,&msg_count))
                goto error;
 
index 2af25ed2c18449c750f8976492cb3391d4e14197..7738b19cae70da2ad8bb7e6b1e4d4d67a4c23cec 100644 (file)
@@ -210,8 +210,11 @@ int main(int argc, const char *argv[])
 
        ctdb = ctdb_cmdline_init(ev);
 
+       /* start the protocol running */
+       ret = ctdb_start(ctdb);
+
        /* attach to a specific database */
-       ctdb_db = ctdb_attach(ctdb, "test.tdb", TDB_DEFAULT, O_RDWR|O_CREAT|O_TRUNC, 0666);
+       ctdb_db = ctdb_attach(ctdb, "test.tdb");
        if (!ctdb_db) {
                printf("ctdb_attach failed - %s\n", ctdb_errstr(ctdb));
                exit(1);
@@ -219,9 +222,6 @@ int main(int argc, const char *argv[])
 
        ret = ctdb_set_call(ctdb_db, fetch_func, FUNC_FETCH);
 
-       /* start the protocol running */
-       ret = ctdb_start(ctdb);
-
        ctdb_set_message_handler(ctdb, 0, message_handler, &msg_count);
 
        /* wait until all nodes are connected (should not be needed
index bfe340ea72578f670c3f7b2ce134dea28ce8d868..ef3b197e500fa4bfd3edb15cf3011c0a95039f75 100644 (file)
@@ -195,22 +195,16 @@ int main(int argc, const char *argv[])
                exit(1);
        }
 
+       /* start the protocol running */
+       ret = ctdb_start(ctdb);
+
        /* attach to a specific database */
-       ctdb_db = ctdb_attach(ctdb, "test.tdb", TDB_DEFAULT, O_RDWR|O_CREAT|O_TRUNC, 0666);
+       ctdb_db = ctdb_attach(ctdb, "test.tdb");
        if (!ctdb_db) {
                printf("ctdb_attach failed - %s\n", ctdb_errstr(ctdb));
                exit(1);
        }
 
-       /* start the protocol running */
-       ret = ctdb_start(ctdb);
-
-#if 0
-       /* wait until all nodes are connected (should not be needed
-          outside of test code) */
-       ctdb_connect_wait(ctdb);
-#endif
-
        /*
           start two child processes
         */
index 84f187e512afe228966ad77aa2a634a372d233fc..04e6ae39e3ae80f81876420a0cb2b397928a8cc9 100644 (file)
@@ -96,16 +96,16 @@ int main(int argc, const char *argv[])
                exit(1);
        }
 
+       /* start the protocol running */
+       ret = ctdb_start(ctdb);
+
        /* attach to a specific database */
-       ctdb_db = ctdb_attach(ctdb, "test.tdb", TDB_DEFAULT, O_RDWR|O_CREAT|O_TRUNC, 0666);
+       ctdb_db = ctdb_attach(ctdb, "test.tdb");
        if (!ctdb_db) {
                printf("ctdb_attach failed - %s\n", ctdb_errstr(ctdb));
                exit(1);
        }
 
-       /* start the protocol running */
-       ret = ctdb_start(ctdb);
-
        srvid = -1;
        for (i=0;i<num_clients-1;i++) {
                pid=fork();
index 5e86041b744b95c7c6d3f84e736714f1982bc468..aa19213f5690490781cd24b0f2a93908c5c880ff 100644 (file)
@@ -121,8 +121,11 @@ int main(int argc, const char *argv[])
                exit(1);
        }
 
+       /* start the protocol running */
+       ret = ctdb_start(ctdb);
+
        /* attach to a specific database */
-       ctdb_db = ctdb_attach(ctdb, "test.tdb", TDB_DEFAULT, O_RDWR|O_CREAT|O_TRUNC, 0666);
+       ctdb_db = ctdb_attach(ctdb, "test.tdb");
        if (!ctdb_db) {
                printf("ctdb_attach failed - %s\n", ctdb_errstr(ctdb));
                exit(1);
@@ -132,9 +135,6 @@ int main(int argc, const char *argv[])
        ret = ctdb_set_call(ctdb_db, sort_func,  FUNC_SORT);
        ret = ctdb_set_call(ctdb_db, fetch_func, FUNC_FETCH);
 
-       /* start the protocol running */
-       ret = ctdb_start(ctdb);
-
        ctdb_connect_wait(ctdb);
 
        /* find the full path to the database file */