/* loop over all key/data pairs */
ptr=&outdata.dptr[8];
for(i=0;i<keys->num;i++){
- uint32_t len;
TDB_DATA *key, *data;
- keys->lmasters[i]= *((uint32_t *)ptr);
- ptr+=4;
+ keys->lmasters[i] = *((uint32_t *)ptr);
+ ptr += 4;
- key=&keys->keys[i];
- key->dsize= *((uint32_t *)ptr);
- ptr+=4;
- key->dptr=talloc_size(mem_ctx, key->dsize);
+ key = &keys->keys[i];
+ key->dsize = *((uint32_t *)ptr);
+ key->dptr = talloc_size(mem_ctx, key->dsize);
+ ptr += 4;
+
+ data = &keys->data[i];
+ data->dsize = *((uint32_t *)ptr);
+ data->dptr = talloc_size(mem_ctx, data->dsize);
+ ptr += 4;
+
+ ptr = outdata.dptr+(((ptr-outdata.dptr)+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1));
memcpy(key->dptr, ptr, key->dsize);
- len = (key->dsize+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1);
- ptr+=len;
+ ptr += key->dsize;
+ ptr = outdata.dptr+(((ptr-outdata.dptr)+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1));
memcpy(&keys->headers[i], ptr, sizeof(struct ctdb_ltdb_header));
- len = (sizeof(struct ctdb_ltdb_header)+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1);
- ptr+=len;
+ ptr += sizeof(struct ctdb_ltdb_header);
- data=&keys->data[i];
- data->dsize= *((uint32_t *)ptr);
- ptr+=4;
- data->dptr=talloc_size(mem_ctx, data->dsize);
+ ptr = outdata.dptr+(((ptr-outdata.dptr)+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1));
memcpy(data->dptr, ptr, data->dsize);
- len = (data->dsize+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1);
- ptr+=len;
+ ptr += data->dsize;
+ ptr = outdata.dptr+(((ptr-outdata.dptr)+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1));
}
return 0;
return 0;
}
+int ctdb_ctrl_write_record(struct ctdb_context *ctdb, uint32_t destnode, TALLOC_CTX *mem_ctx, uint32_t dbid, TDB_DATA key, TDB_DATA data)
+{
+ int ret, len;
+ TDB_DATA indata, outdata;
+ int32_t res;
+ unsigned char *ptr;
+
+ len = 4; /* dbid */
+ len += 4; /* keylen */
+ len += (key.dsize+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1);
+ len += 4; /* datalen */
+ len += (data.dsize+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1);
+
+ indata.dsize = len;
+ indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint8_t, len);
+ ptr = indata.dptr;
+ *((uint32_t *)ptr) = dbid;
+ ptr += 4;
+
+ *((uint32_t *)ptr) = key.dsize;
+ ptr += 4;
+ memcpy(ptr, key.dptr, key.dsize);
+ ptr += (key.dsize+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1);
+
+ *((uint32_t *)ptr) = data.dsize;
+ ptr += 4;
+ memcpy(ptr, data.dptr, data.dsize);
+ ptr += (data.dsize+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1);
+
+ ret = ctdb_control(ctdb, destnode, 0,
+ CTDB_CONTROL_WRITE_RECORD, 0, indata,
+ mem_ctx, &outdata, &res);
+ if (ret != 0 || res != 0) {
+ DEBUG(0,(__location__ " ctdb_control for write record failed\n"));
+ return -1;
+ }
+
+ return 0;
+}
+
/*
ping a node, return number of clients connected
*/
/* only include this record if the lmaster matches or if
the wildcard lmaster (-1) was specified.
*/
- if((lmaster!=CTDB_LMASTER_ANY)
- && (lmaster!=params->lmaster) ){
+ if((params->lmaster!=CTDB_LMASTER_ANY)
+ && (params->lmaster!=lmaster) ){
return 0;
}
len=outdata->dsize;
len+=4; /*lmaster*/
len+=4; /*key len*/
+ len+=4; /*data len */
len+=key.dsize;
len=(len+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1);
len+=sizeof(struct ctdb_ltdb_header);
len=(len+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1);
- len+=4; /*data len */
- len+=data.dsize;
+ len+=(data.dsize-sizeof(struct ctdb_ltdb_header));
len=(len+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1);
ptr=outdata->dptr=talloc_realloc_size(outdata, outdata->dptr, len);
*((uint32_t *)ptr)=lmaster;
ptr+=4;
-
*((uint32_t *)ptr)=key.dsize;
ptr+=4;
+ *((uint32_t *)ptr)=data.dsize-sizeof(struct ctdb_ltdb_header);
+ ptr+=4;
+
+ ptr = outdata->dptr+(((ptr-outdata->dptr)+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1));
memcpy(ptr, key.dptr, key.dsize);
- ptr+= (key.dsize+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1);
+ ptr += key.dsize;
+ ptr = outdata->dptr+(((ptr-outdata->dptr)+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1));
memcpy(ptr, data.dptr, sizeof(struct ctdb_ltdb_header));
- ptr+=(sizeof(struct ctdb_ltdb_header)+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1);
+ ptr += sizeof(struct ctdb_ltdb_header);
- *((uint32_t *)ptr)=data.dsize-sizeof(struct ctdb_ltdb_header);
- ptr+=4;
+ ptr = outdata->dptr+(((ptr-outdata->dptr)+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1));
memcpy(ptr, data.dptr+sizeof(struct ctdb_ltdb_header), data.dsize-sizeof(struct ctdb_ltdb_header));
- ptr+= (data.dsize-sizeof(struct ctdb_ltdb_header)+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1);
return 0;
}
ptr=&indata.dptr[8];
for(i=0;i<num;i++){
/* skip the lmaster*/
- ptr+=4;
+ ptr += 4;
- /* key */
- key.dsize=*((uint32_t *)ptr);
- ptr+=4;
- key.dptr=ptr;
- ptr+=(key.dsize+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1);
+ /* keylength */
+ key.dsize = *((uint32_t *)ptr);
+ ptr += 4;
+ /* data length */
+ data.dsize = *((uint32_t *)ptr);
+ ptr += 4;
+
+ /* key */
+ ptr = indata.dptr+(((ptr-indata.dptr)+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1));
+ key.dptr = ptr;
+ ptr += key.dsize;
+
/* header */
+ ptr = indata.dptr+(((ptr-indata.dptr)+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1));
hdr = (struct ctdb_ltdb_header *)ptr;
- ptr+=(sizeof(struct ctdb_ltdb_header)+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1);
+ ptr += sizeof(struct ctdb_ltdb_header);
/* data */
- data.dsize=*((uint32_t *)ptr);
- ptr+=4;
+ ptr = indata.dptr+(((ptr-indata.dptr)+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1));
data.dptr=ptr;
- ptr+=(data.dsize+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1);
+ ptr += data.dsize;
+
+ ptr = indata.dptr+(((ptr-indata.dptr)+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1));
ret = ctdb_ltdb_lock(ctdb_db, key);
if (ret != 0) {
return 0;
}
+ case CTDB_CONTROL_WRITE_RECORD: {
+ uint32_t dbid;
+ struct ctdb_db_context *ctdb_db;
+ unsigned char *ptr;
+ TDB_DATA key, data;
+ struct ctdb_ltdb_header header;
+ int ret;
+
+ outdata->dsize = 0;
+ outdata->dptr = NULL;
+
+ dbid = ((uint32_t *)(&indata.dptr[0]))[0];
+ ctdb_db = find_ctdb_db(ctdb, dbid);
+ if (!ctdb_db) {
+ DEBUG(0,(__location__ " Unknown db 0x%08x\n",dbid));
+ return -1;
+ }
+
+ ptr = &indata.dptr[4];
+
+ key.dsize = *((uint32_t *)(ptr));
+ ptr += 4;
+ key.dptr = ptr;
+ ptr += (key.dsize+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1);
+
+ data.dsize = *((uint32_t *)(ptr));
+ ptr += 4;
+ data.dptr = ptr;
+ ptr += (data.dsize+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1);
+
+ ret = ctdb_ltdb_lock(ctdb_db, key);
+ if (ret != 0) {
+ DEBUG(0, (__location__ "Unable to lock db\n"));
+ return -1;
+ }
+ ret = ctdb_ltdb_fetch(ctdb_db, key, &header, outdata, NULL);
+ if (ret != 0) {
+ DEBUG(0, (__location__ "Unable to fetch record\n"));
+ ctdb_ltdb_unlock(ctdb_db, key);
+ return -1;
+ }
+ header.rsn++;
+
+ ret = ctdb_ltdb_store(ctdb_db, key, &header, data);
+ if (ret != 0) {
+ DEBUG(0, (__location__ "Unable to store record\n"));
+ ctdb_ltdb_unlock(ctdb_db, key);
+ return -1;
+ }
+ ctdb_ltdb_unlock(ctdb_db, key);
+
+ return 0;
+ }
+
case CTDB_CONTROL_SET_RECMODE: {
ctdb->recovery_mode = ((uint32_t *)(&indata.dptr[0]))[0];
return ctdb_control_db_attach(ctdb, indata, outdata);
case CTDB_CONTROL_SET_CALL: {
- struct ctdb_control_set_call *c =
+ struct ctdb_control_set_call *sc =
(struct ctdb_control_set_call *)indata.dptr;
CHECK_CONTROL_DATA_SIZE(sizeof(struct ctdb_control_set_call));
- return ctdb_daemon_set_call(ctdb, c->db_id, c->fn, c->id);
+ return ctdb_daemon_set_call(ctdb, sc->db_id, sc->fn, sc->id);
}
default:
*/
int ctdb_ctrl_cleardb(struct ctdb_context *ctdb, uint32_t destnode, TALLOC_CTX *mem_ctx, uint32_t dbid);
+/*
+ write a record on a specific db (this implicitely updates dmaster of the record to locally be the vnn of the node where the control is executed on)
+ */
+int ctdb_ctrl_write_record(struct ctdb_context *ctdb, uint32_t destnode, TALLOC_CTX *mem_ctx, uint32_t dbid, TDB_DATA key, TDB_DATA data);
#define CTDB_RECOVERY_NORMAL 0
#define CTDB_RECOVERY_ACTIVE 1
CTDB_CONTROL_SET_RECMODE,
CTDB_CONTROL_STATUS_RESET,
CTDB_CONTROL_DB_ATTACH,
- CTDB_CONTROL_SET_CALL};
+ CTDB_CONTROL_SET_CALL,
+ CTDB_CONTROL_WRITE_RECORD};
/*
structure passed in set_call control
bin/ctdbd --nlist direct/4nodes.txt
bin/ctdbd --nlist direct/4nodes.txt
-echo "Testing ping"
-bin/ctdb_control ping || exit 1
-
-echo "Testing status"
-bin/ctdb_control status all || exit 1
-
-echo "Testing statusreset"
-bin/ctdb_control statusreset all || exit 1
-
-echo "Testing debug"
-bin/ctdb_control debug all 5 || exit 1
-bin/ctdb_control debuglevel || exit 1
-bin/ctdb_control debug all 0 || exit 1
-bin/ctdb_control debuglevel || exit 1
-
-echo "Testing map calls"
-bin/ctdb_control getvnnmap 0 || exit 1
-
echo "Attaching to some databases"
-bin/ctdb_control attach test1.tdb || exit 1
-bin/ctdb_control attach test2.tdb || exit 1
-
-echo "Testing getdbmap"
-bin/ctdb_control getdbmap 0 || exit 1
+bin/ctdb_control --socket=/tmp/ctdb.socket attach test1.tdb || exit 1
+bin/ctdb_control --socket=/tmp/ctdb.socket attach test2.tdb || exit 1
+bin/ctdb_control --socket=/tmp/ctdb.socket attach test3.tdb || exit 1
+bin/ctdb_control --socket=/tmp/ctdb.socket attach test4.tdb || exit 1
+
+echo "Clearing all databases to make sure they are all empty"
+bin/ctdb_control --socket=/tmp/ctdb.socket getdbmap 0 | egrep "^dbid:" | sed -e "s/^dbid://" -e "s/ .*$//" | while read DB; do
+ seq 0 3 | while read NODE; do
+ bin/ctdb_control --socket=/tmp/ctdb.socket cleardb $NODE $DB
+ done
+done
+
+
+echo
+echo
+echo "Printing all databases on all nodes. they should all be empty"
+echo "============================================================="
+bin/ctdb_control --socket=/tmp/ctdb.socket getdbmap 0 | egrep "^dbid:" | sed -e "s/^dbid://" -e "s/ .*$//" | while read DB; do
+ seq 0 3 | while read NODE; do
+ echo "Content of DB:$DB NODE:$NODE :"
+ bin/ctdb_control --socket=/tmp/ctdb.socket catdb $NODE $DB
+ done
+done
+
+
+echo
+echo
+echo "Populating the databases"
#leave the ctdb daemons running
#killall -q ctdbd
printf(" cpdb <fromvnn> <tovnn> <dbid> lists all keys in a remote tdb\n");
printf(" setdmaster <vnn> <dbid> <dmaster> sets new dmaster for all records in the database\n");
printf(" cleardb <vnn> <dbid> deletes all records in a db\n");
+ printf(" writerecord <vnn> <dbid> <key> <data>\n");
printf(" getrecmode <vnn> get recovery mode\n");
printf(" setrecmode <vnn> <mode> set recovery mode\n");
printf(" recover <vnn> recover the cluster\n");
return 0;
}
+/*
+ write a record to a remote tdb
+ */
+static int control_writerecord(struct ctdb_context *ctdb, int argc, const char **argv)
+{
+ uint32_t vnn, dbid;
+ TDB_DATA key, data;
+ int ret;
+
+ if (argc < 4) {
+ usage();
+ }
+
+ vnn = strtoul(argv[0], NULL, 0);
+ dbid = strtoul(argv[1], NULL, 0);
+
+ key.dptr = discard_const(argv[2]);
+ key.dsize = strlen((const char *)(key.dptr));
+ data.dptr = discard_const(argv[3]);
+ data.dsize = strlen((const char *)(data.dptr));
+
+ ret = ctdb_ctrl_write_record(ctdb, vnn, ctdb, dbid, key, data);
+ if (ret != 0) {
+ printf("Unable to set vnnmap for node %u\n", vnn);
+ return ret;
+ }
+ return 0;
+}
+
/*
set the dmaster for all records in a database
*/
ret = control_cpdb(ctdb, extra_argc-1, extra_argv+1);
} else if (strcmp(control, "setvnnmap") == 0) {
ret = control_setvnnmap(ctdb, extra_argc-1, extra_argv+1);
+ } else if (strcmp(control, "setvnnmap") == 0) {
+ ret = control_setvnnmap(ctdb, extra_argc-1, extra_argv+1);
+ } else if (strcmp(control, "writerecord") == 0) {
+ ret = control_writerecord(ctdb, extra_argc-1, extra_argv+1);
} else if (strcmp(control, "setdmaster") == 0) {
ret = control_setdmaster(ctdb, extra_argc-1, extra_argv+1);
} else if (strcmp(control, "cleardb") == 0) {