]> git.ipfire.org Git - thirdparty/samba.git/commitdiff
added support for persistent databases in ctdbd
authorAndrew Tridgell <tridge@samba.org>
Fri, 21 Sep 2007 02:24:02 +0000 (12:24 +1000)
committerAndrew Tridgell <tridge@samba.org>
Fri, 21 Sep 2007 02:24:02 +0000 (12:24 +1000)
(This used to be ctdb commit 3115090a0d882beca9d70761130b74bb0821f201)

21 files changed:
ctdb/Makefile.in
ctdb/client/ctdb_client.c
ctdb/common/ctdb_util.c
ctdb/include/ctdb.h
ctdb/include/ctdb_private.h
ctdb/server/ctdb_control.c
ctdb/server/ctdb_daemon.c
ctdb/server/ctdb_ltdb_server.c
ctdb/server/ctdb_persistent.c [new file with mode: 0644]
ctdb/server/ctdb_recover.c
ctdb/server/ctdb_recoverd.c
ctdb/server/ctdb_server.c
ctdb/server/ctdb_traverse.c
ctdb/server/ctdbd.c
ctdb/tests/ctdb_bench.c
ctdb/tests/ctdb_fetch.c
ctdb/tests/ctdb_persistent.c [new file with mode: 0644]
ctdb/tests/ctdb_store.c
ctdb/tests/persistent.sh [new file with mode: 0755]
ctdb/tests/start_daemons.sh
ctdb/tools/ctdb.c

index a92f7c8124a486bdb75c99b6c9dc9e3e06920491..05ab99a7ef25baaaebdbd54874be7d8a7b2dc432 100644 (file)
@@ -48,10 +48,10 @@ CTDB_SERVER_OBJ = server/ctdbd.o server/ctdb_daemon.o server/ctdb_lockwait.o \
        server/ctdb_tunables.o server/ctdb_monitor.o server/ctdb_server.o \
        server/ctdb_control.o server/ctdb_call.o server/ctdb_ltdb_server.o \
        server/ctdb_traverse.o server/eventscript.o server/ctdb_takeover.o \
-       server/ctdb_serverids.o \
+       server/ctdb_serverids.o server/ctdb_persistent.o \
        $(CTDB_CLIENT_OBJ) $(CTDB_TCP_OBJ) @INFINIBAND_WRAPPER_OBJ@
 
-TEST_BINS=bin/ctdb_bench bin/ctdb_fetch bin/ctdb_store bin/rb_test \
+TEST_BINS=bin/ctdb_bench bin/ctdb_fetch bin/ctdb_store bin/ctdb_persistent bin/rb_test \
        @INFINIBAND_BINS@
 
 BINS = bin/ctdb @CTDB_SCSI_IO@ bin/smnotify
@@ -120,6 +120,10 @@ bin/ctdb_store: $(CTDB_CLIENT_OBJ) tests/ctdb_store.o
        @echo Linking $@
        @$(CC) $(CFLAGS) -o $@ tests/ctdb_store.o $(CTDB_CLIENT_OBJ) $(LIB_FLAGS)
 
+bin/ctdb_persistent: $(CTDB_CLIENT_OBJ) tests/ctdb_persistent.o 
+       @echo Linking $@
+       @$(CC) $(CFLAGS) -o $@ tests/ctdb_persistent.o $(CTDB_CLIENT_OBJ) $(LIB_FLAGS)
+
 bin/ibwrapper_test: $(CTDB_CLIENT_OBJ) ib/ibwrapper_test.o
        @echo Linking $@
        @$(CC) $(CFLAGS) -o $@ ib/ibwrapper_test.o $(CTDB_CLIENT_OBJ) $(LIB_FLAGS)
index 6463c2d8fdb6a5298395921f0398bc41f7defd37..b816d2d720c382e4ec4b74fc1cc46422411b7ab6 100644 (file)
@@ -638,7 +638,46 @@ again:
 */
 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
 {
-       return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
+       int ret;
+       int32_t status;
+       struct ctdb_rec_data *rec;
+       TDB_DATA recdata;
+
+       if (h->ctdb_db->persistent) {
+               h->header.rsn++;
+       }
+
+       ret = ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
+       if (ret != 0) {
+               return ret;
+       }
+
+       /* don't need the persistent_store control for non-persistent databases */
+       if (!h->ctdb_db->persistent) {
+               return 0;
+       }
+
+       rec = ctdb_marshall_record(h, h->ctdb_db->db_id, h->key, &h->header, data);
+       if (rec == NULL) {
+               DEBUG(0,("Unable to marshall record in ctdb_record_store\n"));
+               return -1;
+       }
+
+       recdata.dptr = (uint8_t *)rec;
+       recdata.dsize = rec->length;
+
+       ret = ctdb_control(h->ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, 
+                          CTDB_CONTROL_PERSISTENT_STORE, 0,
+                          recdata, NULL, NULL, &status, NULL, NULL);
+
+       talloc_free(rec);
+
+       if (ret != 0 || status != 0) {
+               DEBUG(0,("Failed persistent store in ctdb_record_store\n"));
+               return -1;
+       }
+
+       return 0;
 }
 
 /*
@@ -1449,7 +1488,8 @@ int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint3
 /*
   create a database
  */
-int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, const char *name)
+int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
+                      TALLOC_CTX *mem_ctx, const char *name, bool persistent)
 {
        int ret;
        int32_t res;
@@ -1459,7 +1499,8 @@ int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32
        data.dsize = strlen(name)+1;
 
        ret = ctdb_control(ctdb, destnode, 0, 
-                          CTDB_CONTROL_DB_ATTACH, 0, data, 
+                          persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
+                          0, data, 
                           mem_ctx, &data, &res, &timeout, NULL);
 
        if (ret != 0 || res != 0) {
@@ -1571,7 +1612,7 @@ int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
 /*
   attach to a specific database - client call
 */
-struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name)
+struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name, bool persistent)
 {
        struct ctdb_db_context *ctdb_db;
        TDB_DATA data;
@@ -1594,7 +1635,8 @@ struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name)
        data.dsize = strlen(name)+1;
 
        /* tell ctdb daemon to attach */
-       ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_DB_ATTACH,
+       ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
+                          persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
                           0, data, ctdb_db, &data, &res, NULL, NULL);
        if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
                DEBUG(0,("Failed to attach to database '%s'\n", name));
@@ -1619,6 +1661,8 @@ struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name)
                return NULL;
        }
 
+       ctdb_db->persistent = persistent;
+
        DLIST_ADD(ctdb->db_list, ctdb_db);
 
        return ctdb_db;
index dc8128aa20fb62d1d902c8b05bd1622b256aaa31..90489a7a716c44209c782e788af4533d9e86d7a1 100644 (file)
@@ -165,13 +165,20 @@ void ctdb_reqid_remove(struct ctdb_context *ctdb, uint32_t reqid)
 
 /*
   form a ctdb_rec_data record from a key/data pair
+  
+  note that header may be NULL. If not NULL then it is included in the data portion
+  of the record
  */
-struct ctdb_rec_data *ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,        TDB_DATA key, TDB_DATA data)
+struct ctdb_rec_data *ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,        
+                                          TDB_DATA key, 
+                                          struct ctdb_ltdb_header *header,
+                                          TDB_DATA data)
 {
        size_t length;
        struct ctdb_rec_data *d;
 
-       length = offsetof(struct ctdb_rec_data, data) + key.dsize + data.dsize;
+       length = offsetof(struct ctdb_rec_data, data) + key.dsize + 
+               data.dsize + (header?sizeof(*header):0);
        d = (struct ctdb_rec_data *)talloc_size(mem_ctx, length);
        if (d == NULL) {
                return NULL;
@@ -179,9 +186,15 @@ struct ctdb_rec_data *ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,
        d->length = length;
        d->reqid = reqid;
        d->keylen = key.dsize;
-       d->datalen = data.dsize;
        memcpy(&d->data[0], key.dptr, key.dsize);
-       memcpy(&d->data[key.dsize], data.dptr, data.dsize);
+       if (header) {
+               d->datalen = data.dsize + sizeof(*header);
+               memcpy(&d->data[key.dsize], header, sizeof(*header));
+               memcpy(&d->data[key.dsize+sizeof(*header)], data.dptr, data.dsize);
+       } else {
+               d->datalen = data.dsize;
+               memcpy(&d->data[key.dsize], data.dptr, data.dsize);
+       }
        return d;
 }
 
index 5248883ae7fc577410e72244c3abe023f3e0bd29..010efb5cfc95141cb95099ca8b9e59d460d31eb2 100644 (file)
@@ -134,6 +134,7 @@ int ctdb_set_transport(struct ctdb_context *ctdb, const char *transport);
   set the directory for the local databases
 */
 int ctdb_set_tdb_dir(struct ctdb_context *ctdb, const char *dir);
+int ctdb_set_tdb_dir_persistent(struct ctdb_context *ctdb, const char *dir);
 
 /*
   set some flags
@@ -167,7 +168,7 @@ int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork);
 /*
   attach to a ctdb database
 */
-struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name);
+struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name, bool persistent);
 
 /*
   find an attached ctdb_db handle given a name
@@ -267,7 +268,10 @@ int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb,
  */
 struct ctdb_dbid_map {
        uint32_t num;
-       uint32_t dbids[1];
+       struct ctdb_dbid {
+               uint32_t dbid;
+               bool persistent;
+       } dbs[1];
 };
 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, 
        struct timeval timeout, uint32_t destnode, 
@@ -295,7 +299,7 @@ int ctdb_ctrl_copydb(struct ctdb_context *ctdb,
 
 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, const char **path);
 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, const char **name);
-int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, const char *name);
+int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, const char *name, bool persistent);
 
 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid);
 
index e2be8eda709e04be4ad346c771e7f16496031a86..7aa3431ba5103582d10a3cdf126fb5830137cdfa 100644 (file)
@@ -328,6 +328,7 @@ struct ctdb_context {
        struct ctdb_address address;
        const char *name;
        const char *db_directory;
+       const char *db_directory_persistent;
        const char *transport;
        const char *logfile;
        char *node_list_file;
@@ -365,6 +366,7 @@ struct ctdb_db_context {
        struct ctdb_db_context *next, *prev;
        struct ctdb_context *ctdb;
        uint32_t db_id;
+       bool persistent;
        const char *db_name;
        const char *db_path;
        struct tdb_wrap *ltdb;
@@ -465,6 +467,9 @@ enum ctdb_controls {CTDB_CONTROL_PROCESS_EXISTS          = 0,
                    CTDB_CONTROL_UNREGISTER_SERVER_ID    = 58,
                    CTDB_CONTROL_CHECK_SERVER_ID         = 59,
                    CTDB_CONTROL_GET_SERVER_ID_LIST      = 60,
+                   CTDB_CONTROL_DB_ATTACH_PERSISTENT    = 61,
+                   CTDB_CONTROL_PERSISTENT_STORE        = 62,
+                   CTDB_CONTROL_UPDATE_RECORD           = 63,
 };     
 
 /*
@@ -516,6 +521,15 @@ struct ctdb_control_tcp_vnn {
        struct sockaddr_in dest;
 };
 
+/*
+  persistent store control - update this record on all other nodes
+ */
+struct ctdb_control_persistent_store {
+       uint32_t db_id;
+       uint32_t len;
+       uint8_t  data[1];
+};
+
 /*
   structure used for CTDB_SRVID_NODE_FLAGS_CHANGED
  */
@@ -857,7 +871,7 @@ int ctdb_daemon_send_control(struct ctdb_context *ctdb, uint32_t destnode,
                             void *private_data);
 
 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata, 
-                              TDB_DATA *outdata);
+                              TDB_DATA *outdata, bool persistent);
 
 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
                         ctdb_fn_t fn, int id);
@@ -991,7 +1005,8 @@ int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id);
 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode);
 int32_t ctdb_ltdb_set_seqnum_frequency(struct ctdb_context *ctdb, uint32_t frequency);
 
-struct ctdb_rec_data *ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,        TDB_DATA key, TDB_DATA data);
+struct ctdb_rec_data *ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,        
+                                          TDB_DATA key, struct ctdb_ltdb_header *, TDB_DATA data);
 
 int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata);
 int32_t ctdb_control_push_db(struct ctdb_context *ctdb, TDB_DATA indata);
@@ -1146,4 +1161,14 @@ int32_t ctdb_control_unregister_server_id(struct ctdb_context *ctdb,
 int32_t ctdb_control_get_server_id_list(struct ctdb_context *ctdb, 
                      TDB_DATA *outdata);
 
+int ctdb_attach_persistent(struct ctdb_context *ctdb);
+
+int32_t ctdb_control_persistent_store(struct ctdb_context *ctdb, 
+                                     struct ctdb_req_control *c, 
+                                     TDB_DATA recdata, bool *async_reply);
+int32_t ctdb_control_update_record(struct ctdb_context *ctdb, 
+                                  struct ctdb_req_control *c, TDB_DATA recdata, 
+                                  bool *async_reply);
+
+
 #endif
index d831e019b5cf37aa4962bc898da1a424ae5b5ec6..cd6c9c6e52a8fd93c7b133baaa944f021c770a10 100644 (file)
@@ -175,7 +175,10 @@ static int32_t ctdb_control_dispatch(struct ctdb_context *ctdb,
        }
 
        case CTDB_CONTROL_DB_ATTACH:
-               return ctdb_control_db_attach(ctdb, indata, outdata);
+               return ctdb_control_db_attach(ctdb, indata, outdata, false);
+
+       case CTDB_CONTROL_DB_ATTACH_PERSISTENT:
+               return ctdb_control_db_attach(ctdb, indata, outdata, true);
 
        case CTDB_CONTROL_SET_CALL: {
                struct ctdb_control_set_call *sc = 
@@ -312,6 +315,12 @@ static int32_t ctdb_control_dispatch(struct ctdb_context *ctdb,
                CHECK_CONTROL_DATA_SIZE(0);
                return ctdb_control_get_server_id_list(ctdb, outdata);
 
+       case CTDB_CONTROL_PERSISTENT_STORE:
+               return ctdb_control_persistent_store(ctdb, c, indata, async_reply);
+
+       case CTDB_CONTROL_UPDATE_RECORD:
+               return ctdb_control_update_record(ctdb, c, indata, async_reply);
+
        default:
                DEBUG(0,(__location__ " Unknown CTDB control opcode %u\n", opcode));
                return -1;
index 22b13455e821a2dd085956ae4c630bc94f175b44..27b18ccba1155a60b9ac10e89dbcdd67ae96c130 100644 (file)
@@ -92,56 +92,6 @@ static void ctdb_start_transport(struct ctdb_context *ctdb, int status, void *p)
                ctdb_start_tcp_tickle_update(ctdb);
 }
 
-/* go into main ctdb loop */
-static void ctdb_main_loop(struct ctdb_context *ctdb)
-{
-       int ret = -1;
-
-       if (strcmp(ctdb->transport, "tcp") == 0) {
-               int ctdb_tcp_init(struct ctdb_context *);
-               ret = ctdb_tcp_init(ctdb);
-       }
-#ifdef USE_INFINIBAND
-       if (strcmp(ctdb->transport, "ib") == 0) {
-               int ctdb_ibw_init(struct ctdb_context *);
-               ret = ctdb_ibw_init(ctdb);
-       }
-#endif
-       if (ret != 0) {
-               DEBUG(0,("Failed to initialise transport '%s'\n", ctdb->transport));
-               return;
-       }
-
-       /* initialise the transport  */
-       if (ctdb->methods->initialise(ctdb) != 0) {
-               DEBUG(0,("transport failed to initialise!\n"));
-               ctdb_fatal(ctdb, "transport failed to initialise");
-       }
-
-       /* tell all other nodes we've just started up */
-       ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
-                                0, CTDB_CONTROL_STARTUP, 0,
-                                CTDB_CTRL_FLAG_NOREPLY,
-                                tdb_null, NULL, NULL);
-
-       /* release any IPs we hold from previous runs of the daemon */
-       ctdb_release_all_ips(ctdb);
-
-       ret = ctdb_event_script_callback(ctdb, timeval_zero(), ctdb, 
-                                        ctdb_start_transport, NULL, "startup");
-       if (ret != 0) {
-               DEBUG(0,("Failed startup event script\n"));
-               return;
-       }
-
-       /* go into a wait loop to allow other nodes to complete */
-       event_loop_wait(ctdb->ev);
-
-       DEBUG(0,("event_loop_wait() returned. this should not happen\n"));
-       exit(1);
-}
-
-
 static void block_signal(int signum)
 {
        struct sigaction act;
@@ -626,7 +576,7 @@ static void print_exit_message(void)
 */
 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork)
 {
-       int res;
+       int res, ret = -1;
        struct fd_event *fde;
        const char *domain_socket_name;
 
@@ -665,23 +615,66 @@ int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork)
 
        ctdb->ev = event_context_init(NULL);
 
+       /* force initial recovery for election */
+       ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
+
+       if (strcmp(ctdb->transport, "tcp") == 0) {
+               int ctdb_tcp_init(struct ctdb_context *);
+               ret = ctdb_tcp_init(ctdb);
+       }
+#ifdef USE_INFINIBAND
+       if (strcmp(ctdb->transport, "ib") == 0) {
+               int ctdb_ibw_init(struct ctdb_context *);
+               ret = ctdb_ibw_init(ctdb);
+       }
+#endif
+       if (ret != 0) {
+               DEBUG(0,("Failed to initialise transport '%s'\n", ctdb->transport));
+               return -1;
+       }
+
+       /* initialise the transport  */
+       if (ctdb->methods->initialise(ctdb) != 0) {
+               DEBUG(0,("transport failed to initialise!\n"));
+               ctdb_fatal(ctdb, "transport failed to initialise");
+       }
+
+       /* attach to any existing persistent databases */
+       if (ctdb_attach_persistent(ctdb) != 0) {
+               ctdb_fatal(ctdb, "Failed to attach to persistent databases\n");         
+       }
+
        /* start frozen, then let the first election sort things out */
        if (!ctdb_blocking_freeze(ctdb)) {
-               DEBUG(0,("Failed to get initial freeze\n"));
-               exit(12);
+               ctdb_fatal(ctdb, "Failed to get initial freeze\n");
        }
 
-       /* force initial recovery for election */
-       ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
-
        /* now start accepting clients, only can do this once frozen */
        fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, 
                           EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
                           ctdb_accept_client, ctdb);
 
-       ctdb_main_loop(ctdb);
+       /* tell all other nodes we've just started up */
+       ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
+                                0, CTDB_CONTROL_STARTUP, 0,
+                                CTDB_CTRL_FLAG_NOREPLY,
+                                tdb_null, NULL, NULL);
+
+       /* release any IPs we hold from previous runs of the daemon */
+       ctdb_release_all_ips(ctdb);
 
-       return 0;
+       ret = ctdb_event_script_callback(ctdb, timeval_zero(), ctdb, 
+                                        ctdb_start_transport, NULL, "startup");
+       if (ret != 0) {
+               DEBUG(0,("Failed startup event script\n"));
+               return -1;
+       }
+
+       /* go into a wait loop to allow other nodes to complete */
+       event_loop_wait(ctdb->ev);
+
+       DEBUG(0,("event_loop_wait() returned. this should not happen\n"));
+       exit(1);
 }
 
 /*
index 5097481bd0ae700e6686b8f25b8c2ee6e3a892af..6232fbcae1a5b1b195bdac44cbdbf8473c2e735e 100644 (file)
@@ -22,6 +22,7 @@
 #include "lib/tdb/include/tdb.h"
 #include "system/network.h"
 #include "system/filesys.h"
+#include "system/dir.h"
 #include "../include/ctdb_private.h"
 #include "db_wrap.h"
 #include "lib/util/dlinklist.h"
@@ -186,36 +187,16 @@ static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
        }
 }
 
+
 /*
-  a client has asked to attach a new database
+  attach to a database, handling both persistent and non-persistent databases
+  return 0 on success, -1 on failure
  */
-int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
-                              TDB_DATA *outdata)
+static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name, bool persistent)
 {
-       const char *db_name = (const char *)indata.dptr;
        struct ctdb_db_context *ctdb_db, *tmp_db;
-       struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
        int ret;
-
-       /* If the node is inactive it is not part of the cluster
-          and we should not allow clients to attach to any
-          databases
-       */
-       if (node->flags & NODE_FLAGS_INACTIVE) {
-               DEBUG(0,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
-               return -1;
-       }
-
-
-       /* see if we already have this name */
-       for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
-               if (strcmp(db_name, tmp_db->db_name) == 0) {
-                       /* this is not an error */
-                       outdata->dptr  = (uint8_t *)&tmp_db->db_id;
-                       outdata->dsize = sizeof(tmp_db->db_id);
-                       return 0;
-               }
-       }
+       struct TDB_DATA key;
 
        ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
        CTDB_NO_MEMORY(ctdb, ctdb_db);
@@ -224,10 +205,10 @@ int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
        ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
        CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
 
-       ctdb_db->db_id = ctdb_hash(&indata);
-
-       outdata->dptr  = (uint8_t *)&ctdb_db->db_id;
-       outdata->dsize = sizeof(ctdb_db->db_id);
+       key.dsize = strlen(db_name)+1;
+       key.dptr  = discard_const(db_name);
+       ctdb_db->db_id = ctdb_hash(&key);
+       ctdb_db->persistent = persistent;
 
        /* check for hash collisions */
        for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
@@ -251,21 +232,31 @@ int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
                return -1;
        }
 
+       if (persistent && mkdir(ctdb->db_directory_persistent, 0700) == -1 && errno != EEXIST) {
+               DEBUG(0,(__location__ " Unable to create ctdb persistent directory '%s'\n", 
+                        ctdb->db_directory_persistent));
+               talloc_free(ctdb_db);
+               return -1;
+       }
+
        /* open the database */
        ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u", 
-                                          ctdb->db_directory, 
+                                          persistent?ctdb->db_directory_persistent:ctdb->db_directory, 
                                           db_name, ctdb->pnn);
 
        ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 
                                      ctdb->tunable.database_hash_size, 
-                                     TDB_CLEAR_IF_FIRST, O_CREAT|O_RDWR, 0666);
+                                     persistent?TDB_DEFAULT:TDB_CLEAR_IF_FIRST, 
+                                     O_CREAT|O_RDWR, 0666);
        if (ctdb_db->ltdb == NULL) {
                DEBUG(0,("Failed to open tdb '%s'\n", ctdb_db->db_path));
                talloc_free(ctdb_db);
                return -1;
        }
 
-       ctdb_check_db_empty(ctdb_db);
+       if (!persistent) {
+               ctdb_check_db_empty(ctdb_db);
+       }
 
        DLIST_ADD(ctdb->db_list, ctdb_db);
 
@@ -290,18 +281,113 @@ int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
                talloc_free(ctdb_db);
                return -1;
        }
+
+       DEBUG(1,("Attached to database '%s'\n", ctdb_db->db_path));
        
+       /* success */
+       return 0;
+}
+
+
+/*
+  a client has asked to attach a new database
+ */
+int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
+                              TDB_DATA *outdata, bool persistent)
+{
+       const char *db_name = (const char *)indata.dptr;
+       struct ctdb_db_context *db;
+       struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
+
+       /* If the node is inactive it is not part of the cluster
+          and we should not allow clients to attach to any
+          databases
+       */
+       if (node->flags & NODE_FLAGS_INACTIVE) {
+               DEBUG(0,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
+               return -1;
+       }
+
+
+       /* see if we already have this name */
+       db = ctdb_db_handle(ctdb, db_name);
+       if (db) {
+               outdata->dptr  = (uint8_t *)&db->db_id;
+               outdata->dsize = sizeof(db->db_id);
+               return 0;
+       }
+
+       if (ctdb_local_attach(ctdb, db_name, persistent) != 0) {
+               return -1;
+       }
+
+       db = ctdb_db_handle(ctdb, db_name);
+       if (!db) {
+               DEBUG(0,("Failed to find db handle for name '%s'\n", db_name));
+               return -1;
+       }
+
+       outdata->dptr  = (uint8_t *)&db->db_id;
+       outdata->dsize = sizeof(db->db_id);
+
        /* tell all the other nodes about this database */
        ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
                                 CTDB_CONTROL_DB_ATTACH, 0, CTDB_CTRL_FLAG_NOREPLY,
                                 indata, NULL, NULL);
 
-       DEBUG(1,("Attached to database '%s'\n", ctdb_db->db_path));
-
        /* success */
        return 0;
 }
 
+
+/*
+  attach to all existing persistent databases
+ */
+int ctdb_attach_persistent(struct ctdb_context *ctdb)
+{
+       DIR *d;
+       struct dirent *de;
+
+       /* open the persistent db directory and scan it for files */
+       d = opendir(ctdb->db_directory_persistent);
+       if (d == NULL) {
+               return 0;
+       }
+
+       while ((de=readdir(d))) {
+               char *p, *s;
+               size_t len = strlen(de->d_name);
+               uint32_t node;
+               
+               s = talloc_strdup(ctdb, de->d_name);
+               CTDB_NO_MEMORY(ctdb, s);
+
+               /* only accept names ending in .tdb */
+               p = strstr(s, ".tdb.");
+               if (len < 7 || p == NULL) {
+                       talloc_free(s);
+                       continue;
+               }
+               if (sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
+                       talloc_free(s);
+                       continue;
+               }
+               p[4] = 0;
+
+               if (ctdb_local_attach(ctdb, s, true) != 0) {
+                       DEBUG(0,("Failed to attach to persistent database '%s'\n", de->d_name));
+                       closedir(d);
+                       talloc_free(s);
+                       return -1;
+               }
+               DEBUG(0,("Attached to persistent database %s\n", s));
+
+               talloc_free(s);
+       }
+       closedir(d);
+       return 0;
+}
+
 /*
   called when a broadcast seqnum update comes in
  */
diff --git a/ctdb/server/ctdb_persistent.c b/ctdb/server/ctdb_persistent.c
new file mode 100644 (file)
index 0000000..7d9bd48
--- /dev/null
@@ -0,0 +1,262 @@
+/* 
+   persistent store logic
+
+   Copyright (C) Andrew Tridgell  2007
+   Copyright (C) Ronnie Sahlberg  2007
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 3 of the License, or
+   (at your option) any later version.
+   
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+   
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "includes.h"
+#include "lib/events/events.h"
+#include "system/filesys.h"
+#include "system/wait.h"
+#include "db_wrap.h"
+#include "lib/tdb/include/tdb.h"
+#include "../include/ctdb_private.h"
+
+struct ctdb_persistent_state {
+       struct ctdb_context *ctdb;
+       struct ctdb_req_control *c;
+       const char *errormsg;
+       uint32_t num_pending;
+       int32_t status;
+};
+
+/*
+  called when a node has acknowledged a ctdb_control_update_record call
+ */
+static void ctdb_persistent_callback(struct ctdb_context *ctdb,
+                                    int32_t status, TDB_DATA data, 
+                                    const char *errormsg,
+                                    void *private_data)
+{
+       struct ctdb_persistent_state *state = talloc_get_type(private_data, 
+                                                             struct ctdb_persistent_state);
+       if (status != 0) {
+               DEBUG(0,("ctdb_persistent_callback failed with status %d (%s)\n",
+                        status, errormsg));
+               state->status = status;
+               state->errormsg = errormsg;
+       }
+       state->num_pending--;
+       if (state->num_pending == 0) {
+               ctdb_request_control_reply(state->ctdb, state->c, NULL, state->status, state->errormsg);
+               talloc_free(state);
+       }
+}
+
+
+/*
+  store a persistent record - called from a ctdb client when it has updated
+  a record in a persistent database. The client will have the record
+  locked for the duration of this call. The client is the dmaster when 
+  this call is made
+ */
+int32_t ctdb_control_persistent_store(struct ctdb_context *ctdb, 
+                                     struct ctdb_req_control *c, 
+                                     TDB_DATA recdata, bool *async_reply)
+{
+       struct ctdb_persistent_state *state;
+       int i;
+
+       state = talloc_zero(c, struct ctdb_persistent_state);
+       CTDB_NO_MEMORY(ctdb, state);
+
+       state->ctdb = ctdb;
+       state->c    = c;
+
+       for (i=0;i<ctdb->num_nodes;i++) {
+               struct ctdb_node *node = ctdb->nodes[i];
+               int ret;
+
+               /* only send to active nodes */
+               if (node->flags & NODE_FLAGS_INACTIVE) {
+                       continue;
+               }
+
+               /* don't send to ourselves */
+               if (node->pnn == ctdb->pnn) {
+                       continue;
+               }
+               
+               ret = ctdb_daemon_send_control(ctdb, node->pnn, 0, CTDB_CONTROL_UPDATE_RECORD,
+                                              c->client_id, 0, recdata, 
+                                              ctdb_persistent_callback, state);
+               if (ret == -1) {
+                       DEBUG(0,("Unable to send CTDB_CONTROL_UPDATE_RECORD to pnn %u\n", node->pnn));
+                       talloc_free(state);
+                       return -1;
+               }
+
+               state->num_pending++;
+       }
+
+       if (state->num_pending == 0) {
+               talloc_free(state);
+               return 0;
+       }
+       
+       /* we need to wait for the replies */
+       *async_reply = true;
+       return 0;
+}
+
+
+struct ctdb_persistent_lock_state {
+       struct ctdb_db_context *ctdb_db;
+       TDB_DATA key;
+       TDB_DATA data;
+       struct ctdb_ltdb_header *header;
+       struct tdb_context *tdb;
+       struct ctdb_req_control *c;
+};
+
+
+/*
+  called with a lock held in the current process
+ */
+static int ctdb_persistent_store(struct ctdb_persistent_lock_state *state)
+{
+       struct ctdb_ltdb_header oldheader;
+       int ret;
+
+       /* fetch the old header and ensure the rsn is less than the new rsn */
+       ret = ctdb_ltdb_fetch(state->ctdb_db, state->key, &oldheader, NULL, NULL);
+       if (ret != 0) {
+               DEBUG(0,("Failed to fetch old record for db_id 0x%08x in ctdb_persistent_store\n",
+                        state->ctdb_db->db_id));
+               return -1;
+       }
+
+       if (oldheader.rsn >= state->header->rsn) {
+               DEBUG(0,("existing header for db_id 0x%08x has larger RSN %llu than new RSN %llu in ctdb_persistent_store\n",
+                        state->ctdb_db->db_id, oldheader.rsn, state->header->rsn));
+               return -1;
+       }
+
+       ret = ctdb_ltdb_store(state->ctdb_db, state->key, state->header, state->data);
+       if (ret != 0) {
+               DEBUG(0,("Failed to store record for db_id 0x%08x in ctdb_persistent_store\n", 
+                        state->ctdb_db->db_id));
+               return -1;
+       }
+
+       return 0;
+}
+
+
+/*
+  called when we get the lock on the given record
+  at this point the lockwait child holds a lock on our behalf
+ */
+static void ctdb_persistent_lock_callback(void *private_data)
+{
+       struct ctdb_persistent_lock_state *state = talloc_get_type(private_data, 
+                                                                  struct ctdb_persistent_lock_state);
+       int ret;
+
+       ret = tdb_chainlock_mark(state->tdb, state->key);
+       if (ret != 0) {
+               DEBUG(0,("Failed to mark lock in ctdb_persistent_lock_callback\n"));
+               ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, ret, NULL);
+               return;
+       }
+
+       ret = ctdb_persistent_store(state);
+       ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, ret, NULL);
+       tdb_chainlock_unmark(state->tdb, state->key);
+}
+
+/*
+  called if our lockwait child times out
+ */
+static void ctdb_persistent_lock_timeout(struct event_context *ev, struct timed_event *te, 
+                                        struct timeval t, void *private_data)
+{
+       struct ctdb_persistent_lock_state *state = talloc_get_type(private_data, 
+                                                                  struct ctdb_persistent_lock_state);
+       ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, -1, "timeout in ctdb_persistent_lock");
+       talloc_free(state);
+}
+
+
+/* 
+   update a record on this node if the new record has a higher rsn than the
+   current record
+ */
+int32_t ctdb_control_update_record(struct ctdb_context *ctdb, 
+                                  struct ctdb_req_control *c, TDB_DATA recdata, 
+                                  bool *async_reply)
+{
+       struct ctdb_rec_data *rec = (struct ctdb_rec_data *)&recdata.dptr[0];
+       int ret;
+       struct ctdb_db_context *ctdb_db;
+       uint32_t db_id = rec->reqid;
+       struct lockwait_handle *handle;
+       struct ctdb_persistent_lock_state *state;
+
+       if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
+               DEBUG(0,("rejecting ctdb_control_update_record when recovery active\n"));
+               return -1;
+       }
+
+       ctdb_db = find_ctdb_db(ctdb, db_id);
+       if (ctdb_db == NULL) {
+               DEBUG(0,("Unknown database 0x%08x in ctdb_control_update_record\n", db_id));
+               return -1;
+       }
+
+       state = talloc(c, struct ctdb_persistent_lock_state);
+       CTDB_NO_MEMORY(ctdb, state);
+
+       state->ctdb_db = ctdb_db;
+       state->c       = c;
+       state->tdb     = ctdb_db->ltdb->tdb;
+       state->key.dptr   = &rec->data[0];
+       state->key.dsize  = rec->keylen;
+       state->data.dptr  = &rec->data[rec->keylen];
+       state->data.dsize = rec->datalen;
+
+       if (state->data.dsize < sizeof(struct ctdb_ltdb_header)) {
+               DEBUG(0,("Invalid data size %u in ctdb_control_update_record\n", state->data.dsize));
+               return -1;
+       }
+
+       state->header = (struct ctdb_ltdb_header *)&state->data.dptr[0];
+       state->data.dptr  += sizeof(struct ctdb_ltdb_header);
+       state->data.dsize -= sizeof(struct ctdb_ltdb_header);
+
+       /* try and do it without a lockwait */
+       ret = tdb_chainlock_nonblock(state->tdb, state->key);
+       if (ret == 0) {
+               ret = ctdb_persistent_store(state);
+               tdb_chainunlock(state->tdb, state->key);
+               return ret;
+       }
+
+       /* wait until we have a lock on this record */
+       handle = ctdb_lockwait(ctdb_db, state->key, ctdb_persistent_lock_callback, state);
+       if (handle == NULL) {
+               DEBUG(0,("Failed to setup lockwait handler in ctdb_control_update_record\n"));
+               return -1;
+       }
+
+       *async_reply = true;
+
+       event_add_timed(ctdb->ev, state, timeval_current_ofs(ctdb->tunable.control_timeout, 0),
+                       ctdb_persistent_lock_timeout, state);
+
+       return 0;
+}
index c79c85d4e7d5c8c003df856eab0a65d559fbf5a8..f89980880d2bbacb93b24e37671bd81bc6852838 100644 (file)
@@ -125,7 +125,7 @@ ctdb_control_getdbmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indat
        }
 
 
-       outdata->dsize = offsetof(struct ctdb_dbid_map, dbids) + 4*len;
+       outdata->dsize = offsetof(struct ctdb_dbid_map, dbs) + sizeof(dbid_map->dbs[0])*len;
        outdata->dptr  = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
        if (!outdata->dptr) {
                DEBUG(0, (__location__ " Failed to allocate dbmap array\n"));
@@ -134,8 +134,9 @@ ctdb_control_getdbmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indat
 
        dbid_map = (struct ctdb_dbid_map *)outdata->dptr;
        dbid_map->num = len;
-       for(i=0,ctdb_db=ctdb->db_list;ctdb_db;i++,ctdb_db=ctdb_db->next){
-               dbid_map->dbids[i] = ctdb_db->db_id;
+       for (i=0,ctdb_db=ctdb->db_list;ctdb_db;i++,ctdb_db=ctdb_db->next){
+               dbid_map->dbs[i].dbid       = ctdb_db->db_id;
+               dbid_map->dbs[i].persistent = ctdb_db->persistent;
        }
 
        return 0;
@@ -254,7 +255,7 @@ int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DAT
 
        for (i=0;i<reply->count;i++) {
                struct ctdb_rec_data *rec;
-               rec = ctdb_marshall_record(outdata, 0, params.recs[i].key, params.recs[i].data);
+               rec = ctdb_marshall_record(outdata, 0, params.recs[i].key, NULL, params.recs[i].data);
                reply = talloc_realloc_size(outdata, reply, rec->length + len);
                memcpy(len+(uint8_t *)reply, rec, rec->length);
                len += rec->length;
index 7d16b18015c38a1258c6b12a74f4802c3a9b2e58..703690f05367b5a3d22935c0293c4c418121a4ae 100644 (file)
@@ -301,7 +301,7 @@ static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctd
 
 
                        for (i=0;i<remote_dbmap->num;i++) {
-                               if (dbmap->dbids[db] == remote_dbmap->dbids[i]) {
+                               if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
                                        break;
                                }
                        }
@@ -310,12 +310,14 @@ static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctd
                                continue;
                        }
                        /* ok so we need to create this database */
-                       ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbids[db], mem_ctx, &name);
+                       ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
+                                           mem_ctx, &name);
                        if (ret != 0) {
                                DEBUG(0, (__location__ " Unable to get dbname from node %u\n", pnn));
                                return -1;
                        }
-                       ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, name);
+                       ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
+                                          mem_ctx, name, dbmap->dbs[db].persistent);
                        if (ret != 0) {
                                DEBUG(0, (__location__ " Unable to create remote db:%s\n", name));
                                return -1;
@@ -359,7 +361,7 @@ static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb
                        const char *name;
 
                        for (i=0;i<(*dbmap)->num;i++) {
-                               if (remote_dbmap->dbids[db] == (*dbmap)->dbids[i]) {
+                               if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
                                        break;
                                }
                        }
@@ -371,13 +373,14 @@ static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb
                           rebuild dbmap
                         */
                        ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
-                                           remote_dbmap->dbids[db], mem_ctx, &name);
+                                           remote_dbmap->dbs[db].dbid, mem_ctx, &name);
                        if (ret != 0) {
                                DEBUG(0, (__location__ " Unable to get dbname from node %u\n", 
                                          nodemap->nodes[j].pnn));
                                return -1;
                        }
-                       ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name);
+                       ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
+                                          remote_dbmap->dbs[db].persistent);
                        if (ret != 0) {
                                DEBUG(0, (__location__ " Unable to create local db:%s\n", name));
                                return -1;
@@ -416,7 +419,7 @@ static int pull_all_remote_databases(struct ctdb_context *ctdb, struct ctdb_node
                                continue;
                        }
                        ret = ctdb_ctrl_copydb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
-                                              pnn, dbmap->dbids[i], CTDB_LMASTER_ANY, mem_ctx);
+                                              pnn, dbmap->dbs[i].dbid, CTDB_LMASTER_ANY, mem_ctx);
                        if (ret != 0) {
                                DEBUG(0, (__location__ " Unable to copy db from node %u to node %u\n", 
                                          nodemap->nodes[j].pnn, pnn));
@@ -444,9 +447,11 @@ static int update_dmaster_on_all_databases(struct ctdb_context *ctdb, struct ctd
                        if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                                continue;
                        }
-                       ret = ctdb_ctrl_setdmaster(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, ctdb, dbmap->dbids[i], pnn);
+                       ret = ctdb_ctrl_setdmaster(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
+                                                  ctdb, dbmap->dbs[i].dbid, pnn);
                        if (ret != 0) {
-                               DEBUG(0, (__location__ " Unable to set dmaster for node %u db:0x%08x\n", nodemap->nodes[j].pnn, dbmap->dbids[i]));
+                               DEBUG(0, (__location__ " Unable to set dmaster for node %u db:0x%08x\n", 
+                                         nodemap->nodes[j].pnn, dbmap->dbs[i].dbid));
                                return -1;
                        }
                }
@@ -537,7 +542,7 @@ static int vacuum_all_databases(struct ctdb_context *ctdb, struct ctdb_node_map
 
        /* update dmaster to point to this node for all databases/nodes */
        for (i=0;i<dbmap->num;i++) {
-               if (vacuum_db(ctdb, dbmap->dbids[i], nodemap) != 0) {
+               if (vacuum_db(ctdb, dbmap->dbs[i].dbid, nodemap) != 0) {
                        return -1;
                }
        }
@@ -565,7 +570,7 @@ static int push_all_local_databases(struct ctdb_context *ctdb, struct ctdb_node_
                                continue;
                        }
                        ret = ctdb_ctrl_copydb(ctdb, CONTROL_TIMEOUT(), pnn, nodemap->nodes[j].pnn, 
-                                              dbmap->dbids[i], CTDB_LMASTER_ANY, mem_ctx);
+                                              dbmap->dbs[i].dbid, CTDB_LMASTER_ANY, mem_ctx);
                        if (ret != 0) {
                                DEBUG(0, (__location__ " Unable to copy db from node %u to node %u\n", 
                                          pnn, nodemap->nodes[j].pnn));
@@ -919,6 +924,9 @@ static int do_recovery(struct ctdb_recoverd *rec,
                DEBUG(1, (__location__ " Recovery - done takeover\n"));
        }
 
+       for (i=0;i<dbmap->num;i++) {
+               DEBUG(0,("Recovered database with db_id 0x%08x\n", dbmap->dbs[i].dbid));
+       }
 
        /* disable recovery mode */
        ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_NORMAL);
@@ -1805,8 +1813,6 @@ static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde,
        _exit(1);
 }
 
-
-
 /*
   startup the recovery daemon as a child of the main ctdb daemon
  */
index a9ae79bc7ec2e5c28d2d274feaf4a2a633e39d86..afa1b317f241a5bdc0ae14762aa302fc1c9985e9 100644 (file)
@@ -81,6 +81,18 @@ int ctdb_set_tdb_dir(struct ctdb_context *ctdb, const char *dir)
        return 0;
 }
 
+/*
+  set the directory for the persistent databases
+*/
+int ctdb_set_tdb_dir_persistent(struct ctdb_context *ctdb, const char *dir)
+{
+       ctdb->db_directory_persistent = talloc_strdup(ctdb, dir);
+       if (ctdb->db_directory_persistent == NULL) {
+               return -1;
+       }
+       return 0;
+}
+
 /*
   add a node to the list of active nodes
 */
index a18ce654d3fa2c9a99fff1145d5f570d472056c4..c48387af34c3074a381b7b1294460290eeb04dd6 100644 (file)
@@ -95,7 +95,7 @@ static int ctdb_traverse_local_fn(struct tdb_context *tdb, TDB_DATA key, TDB_DAT
                return 0;
        }
 
-       d = ctdb_marshall_record(h, 0, key, data);
+       d = ctdb_marshall_record(h, 0, key, NULL, data);
        if (d == NULL) {
                /* error handling is tricky in this child code .... */
                return -1;
@@ -280,7 +280,7 @@ static void traverse_all_callback(void *p, TDB_DATA key, TDB_DATA data)
        struct ctdb_rec_data *d;
        TDB_DATA cdata;
 
-       d = ctdb_marshall_record(state, state->reqid, key, data);
+       d = ctdb_marshall_record(state, state->reqid, key, NULL, data);
        if (d == NULL) {
                /* darn .... */
                DEBUG(0,("Out of memory in traverse_all_callback\n"));
@@ -408,7 +408,7 @@ static void traverse_start_callback(void *p, TDB_DATA key, TDB_DATA data)
 
        state = talloc_get_type(p, struct traverse_start_state);
 
-       d = ctdb_marshall_record(state, state->reqid, key, data);
+       d = ctdb_marshall_record(state, state->reqid, key, NULL, data);
        if (d == NULL) {
                return;
        }
index 92f4b86c53780068972af7d3204cc673b3bf940e..6ba76ffc953cb4148b746377d3a5aa63f92d2731 100644 (file)
@@ -46,6 +46,7 @@ static struct {
        const char *logfile;
        const char *recovery_lock_file;
        const char *db_dir;
+       const char *db_dir_persistent;
        const char *public_interface;
        int         no_setsched;
 } options = {
@@ -54,6 +55,7 @@ static struct {
        .event_script_dir = ETCDIR "/ctdb/events.d",
        .logfile = VARDIR "/log/log.ctdb",
        .db_dir = VARDIR "/ctdb",
+       .db_dir_persistent = VARDIR "/ctdb/persistent",
 };
 
 
@@ -108,6 +110,7 @@ int main(int argc, const char *argv[])
                { "listen", 0, POPT_ARG_STRING, &options.myaddress, 0, "address to listen on", "address" },
                { "transport", 0, POPT_ARG_STRING, &options.transport, 0, "protocol transport", NULL },
                { "dbdir", 0, POPT_ARG_STRING, &options.db_dir, 0, "directory for the tdb files", NULL },
+               { "dbdir-persistent", 0, POPT_ARG_STRING, &options.db_dir_persistent, 0, "directory for persistent tdb files", NULL },
                { "reclock", 0, POPT_ARG_STRING, &options.recovery_lock_file, 0, "location of recovery lock file", "filename" },
                { "nosetsched", 0, POPT_ARG_NONE, &options.no_setsched, 0, "disable setscheduler SCHED_FIFO call", NULL },
                POPT_TABLEEND
@@ -199,6 +202,13 @@ int main(int argc, const char *argv[])
                        exit(1);
                }
        }
+       if (options.db_dir_persistent) {
+               ret = ctdb_set_tdb_dir_persistent(ctdb, options.db_dir_persistent);
+               if (ret == -1) {
+                       DEBUG(0,("ctdb_set_tdb_dir_persistent failed - %s\n", ctdb_errstr(ctdb)));
+                       exit(1);
+               }
+       }
 
        if (options.public_interface) {
                ctdb->default_public_interface = talloc_strdup(ctdb, options.public_interface);
index 1eefe9ca4f4cab18e76fb802a484c55c474aa832..c14ef2b3cd5c09b201340bc086ec7f33573aafe0 100644 (file)
@@ -201,7 +201,7 @@ int main(int argc, const char *argv[])
                                 &cluster_ready);
 
        /* attach to a specific database */
-       ctdb_db = ctdb_attach(ctdb, "test.tdb");
+       ctdb_db = ctdb_attach(ctdb, "test.tdb", false);
        if (!ctdb_db) {
                printf("ctdb_attach failed - %s\n", ctdb_errstr(ctdb));
                exit(1);
index 1ac87017beea0fc5910c8f7e16eed2bc00c19041..56eb244a704538c83d6212f32958aa5914cf88ae 100644 (file)
@@ -219,7 +219,7 @@ int main(int argc, const char *argv[])
                                 &cluster_ready);
 
        /* attach to a specific database */
-       ctdb_db = ctdb_attach(ctdb, "test.tdb");
+       ctdb_db = ctdb_attach(ctdb, "test.tdb", false);
        if (!ctdb_db) {
                printf("ctdb_attach failed - %s\n", ctdb_errstr(ctdb));
                exit(1);
diff --git a/ctdb/tests/ctdb_persistent.c b/ctdb/tests/ctdb_persistent.c
new file mode 100644 (file)
index 0000000..f46fd40
--- /dev/null
@@ -0,0 +1,139 @@
+/* 
+   simple tool to test persistent databases
+
+   Copyright (C) Andrew Tridgell  2006-2007
+   Copyright (c) Ronnie sahlberg  2007
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 3 of the License, or
+   (at your option) any later version.
+   
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+   
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "includes.h"
+#include "lib/events/events.h"
+#include "system/filesys.h"
+#include "popt.h"
+#include "cmdline.h"
+
+#include <sys/time.h>
+#include <time.h>
+
+static void test_store_records(struct ctdb_context *ctdb, struct event_context *ev)
+{
+       TDB_DATA key, data;
+       struct ctdb_db_context *ctdb_db;
+       TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
+       int ret, i;
+       struct ctdb_record_handle *h;
+       unsigned node=0, count=0;
+       
+       ctdb_db = ctdb_db_handle(ctdb, "persistent.tdb");
+
+       key.dptr = discard_const("testkey");
+       key.dsize = strlen((const char *)key.dptr)+1;
+
+       for (i=0;i<10;i++) {
+               h = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, &data);
+               if (h == NULL) {
+                       printf("Failed to fetch record '%s' on node %d\n", 
+                              (const char *)key.dptr, ctdb_get_pnn(ctdb));
+                       talloc_free(tmp_ctx);
+                       return;
+               }
+               
+               printf("Current value: %*.*s\n", data.dsize, data.dsize, data.dptr);
+               
+               if (data.dsize != 0) {
+                       if (sscanf((char *)data.dptr, "Node %u Count %u", &node, &count) != 2) {
+                               printf("Badly formatted node data!\n");
+                               exit(1);
+                       }
+               }
+               
+               node = ctdb_get_pnn(ctdb);
+               count++;
+               
+               data.dptr = (uint8_t *)talloc_asprintf(h, "Node %u Count %u", node, count);
+               data.dsize = strlen((char *)data.dptr)+1;
+               
+               ret = ctdb_record_store(h, data);
+               if (ret != 0) {
+                       DEBUG(0,("Failed to store record\n"));
+                       exit(1);
+               }
+               talloc_free(h);
+       }
+
+       talloc_free(tmp_ctx);
+}
+
+/*
+  main program
+*/
+int main(int argc, const char *argv[])
+{
+       struct ctdb_context *ctdb;
+       struct ctdb_db_context *ctdb_db;
+
+       struct poptOption popt_options[] = {
+               POPT_AUTOHELP
+               POPT_CTDB_CMDLINE
+               POPT_TABLEEND
+       };
+       int opt;
+       const char **extra_argv;
+       int extra_argc = 0;
+       poptContext pc;
+       struct event_context *ev;
+
+       pc = poptGetContext(argv[0], argc, argv, popt_options, POPT_CONTEXT_KEEP_FIRST);
+
+       while ((opt = poptGetNextOpt(pc)) != -1) {
+               switch (opt) {
+               default:
+                       fprintf(stderr, "Invalid option %s: %s\n", 
+                               poptBadOption(pc, 0), poptStrerror(opt));
+                       exit(1);
+               }
+       }
+
+       /* setup the remaining options for the main program to use */
+       extra_argv = poptGetArgs(pc);
+       if (extra_argv) {
+               extra_argv++;
+               while (extra_argv[extra_argc]) extra_argc++;
+       }
+
+       ev = event_context_init(NULL);
+
+       ctdb = ctdb_cmdline_client(ev);
+
+       /* attach to a specific database */
+       ctdb_db = ctdb_attach(ctdb, "persistent.tdb", true);
+       if (!ctdb_db) {
+               printf("ctdb_attach failed - %s\n", ctdb_errstr(ctdb));
+               exit(1);
+       }
+
+       printf("Waiting for cluster\n");
+       while (1) {
+               uint32_t recmode=1;
+               ctdb_ctrl_getrecmode(ctdb, ctdb, timeval_zero(), CTDB_CURRENT_NODE, &recmode);
+               if (recmode == 0) break;
+               event_loop_once(ev);
+       }
+
+       printf("Starting test\n");
+       test_store_records(ctdb, ev);
+
+       return 0;
+}
index 9d4b30c2a7fd5fbeef4b8d7a994e6c4efd6972df..45373dd28ab592938c180820b19d31a18a261ed8 100644 (file)
@@ -136,7 +136,7 @@ int main(int argc, const char *argv[])
        ctdb = ctdb_cmdline_client(ev);
 
        /* attach to a specific database */
-       ctdb_db = ctdb_attach(ctdb, "test.tdb");
+       ctdb_db = ctdb_attach(ctdb, "test.tdb", false);
        if (!ctdb_db) {
                printf("ctdb_attach failed - %s\n", ctdb_errstr(ctdb));
                exit(1);
diff --git a/ctdb/tests/persistent.sh b/ctdb/tests/persistent.sh
new file mode 100755 (executable)
index 0000000..e53e11f
--- /dev/null
@@ -0,0 +1,24 @@
+#!/bin/sh
+
+NUMNODES=2
+if [ $# -gt 0 ]; then
+    NUMNODES=$1
+fi
+
+rm -f nodes.txt
+for i in `seq 1 $NUMNODES`; do
+  echo 127.0.0.$i >> nodes.txt
+done
+
+tests/start_daemons.sh $NUMNODES nodes.txt || exit 1
+
+
+killall -9 -q ctdb_persistent
+for i in `seq 1 $NUMNODES`; do
+  $VALGRIND bin/ctdb_persistent --socket sock.$i $* &
+done
+wait
+
+echo "Shutting down"
+bin/ctdb shutdown -n all --socket=sock.1
+exit 0
index f577ef42bee244e01346dd18c4b50386ce0b7a93..966cc5dfd7d2c4f984bb52c28ce57604f79493a7 100755 (executable)
@@ -7,7 +7,7 @@ shift
 
 killall -q ctdbd
 
-CTDB_OPTIONS="--reclock=rec.lock --nlist $NODES --event-script-dir=tests/events.d --logfile=-  --dbdir=test.db $*"
+CTDB_OPTIONS="--reclock=rec.lock --nlist $NODES --event-script-dir=tests/events.d --logfile=-  --dbdir=test.db --dbdir-persistent=test.db/persistent $*"
 if [ `id -u` -eq 0 ]; then
     CTDB_OPTIONS="$CTDB_OPTIONS --public-addresses=tests/public_addresses --public-interface=lo"
 fi
index b4f2ae575c3fc232ff87fbf2e19a70800291c3fc..ed544e5c41b3696dd33007349f752b6be81aad7a 100644 (file)
@@ -762,7 +762,7 @@ static int control_catdb(struct ctdb_context *ctdb, int argc, const char **argv)
        }
 
        db_name = argv[0];
-       ctdb_db = ctdb_attach(ctdb, db_name);
+       ctdb_db = ctdb_attach(ctdb, db_name, false);
 
        if (ctdb_db == NULL) {
                DEBUG(0,("Unable to attach to database '%s'\n", db_name));
@@ -800,10 +800,13 @@ static int control_getdbmap(struct ctdb_context *ctdb, int argc, const char **ar
        for(i=0;i<dbmap->num;i++){
                const char *path;
                const char *name;
+               bool persistent;
 
-               ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbids[i], ctdb, &path);
-               ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbids[i], ctdb, &name);
-               printf("dbid:0x%08x name:%s path:%s\n", dbmap->dbids[i], name, path);
+               ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
+               ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
+               persistent = dbmap->dbs[i].persistent;
+               printf("dbid:0x%08x name:%s path:%s %s\n", dbmap->dbs[i].dbid, name, 
+                      path, persistent?"PERSISTENT":"");
        }
 
        return 0;
@@ -982,7 +985,7 @@ static int control_attach(struct ctdb_context *ctdb, int argc, const char **argv
        }
        db_name = argv[0];
 
-       ctdb_db = ctdb_attach(ctdb, db_name);
+       ctdb_db = ctdb_attach(ctdb, db_name, false);
        if (ctdb_db == NULL) {
                DEBUG(0,("Unable to attach to database '%s'\n", db_name));
                return -1;