struct ctdb_call *c;
struct ctdb_registered_call *fn;
TDB_DATA data;
+ struct ctdb_ltdb_header header;
+ int ret;
c = talloc(ctdb, struct ctdb_call);
CTDB_NO_MEMORY(ctdb, c);
- data = tdb_fetch(ctdb->ltdb, key);
+ ret = ctdb_ltdb_fetch(ctdb, c, key, &header, &data);
+ if (ret != 0) return -1;
c->key = key;
c->call_data = call_data;
c->record_data.dptr = talloc_memdup(c, data.dptr, data.dsize);
c->record_data.dsize = data.dsize;
CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
- if (data.dptr) free(data.dptr);
c->new_data = NULL;
c->reply_data = NULL;
}
if (fn->fn(c) != 0) {
- free(c->record_data.dptr);
ctdb_set_error(ctdb, "ctdb_call %u failed\n", call_id);
return -1;
}
if (c->new_data) {
- if (tdb_store(ctdb->ltdb, key, *c->new_data, TDB_REPLACE) != 0) {
+ if (ctdb_ltdb_store(ctdb, key, &header, *c->new_data) != 0) {
ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
return -1;
}
}
return 0;
}
+
+
+/*
+ construct an initial header for a record with no ltdb header yet
+*/
+static void ltdb_initial_header(struct ctdb_context *ctdb,
+ TDB_DATA key,
+ struct ctdb_ltdb_header *header)
+{
+ header->rsn = 0;
+ header->dmaster = ctdb_hash(&key) % ctdb->num_nodes;
+ header->laccessor = header->dmaster;
+ header->lacount = 0;
+}
+
+
+/*
+ fetch a record from the ltdb, separating out the header information
+ and returning the body of the record. A valid (initial) header is
+ returned if the record is not present
+*/
+int ctdb_ltdb_fetch(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
+ TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA *data)
+{
+ TDB_DATA rec;
+
+ rec = tdb_fetch(ctdb->ltdb, key);
+ if (rec.dsize < sizeof(*header)) {
+ /* return an initial header */
+ ltdb_initial_header(ctdb, key, header);
+ data->dptr = NULL;
+ data->dsize = 0;
+ return 0;
+ }
+
+ *header = *(struct ctdb_ltdb_header *)rec.dptr;
+
+ data->dsize = rec.dsize - sizeof(struct ctdb_ltdb_header);
+ data->dptr = talloc_memdup(mem_ctx, sizeof(struct ctdb_ltdb_header)+rec.dptr,
+ data->dsize);
+ CTDB_NO_MEMORY(ctdb, data->dptr);
+
+ return 0;
+}
+
+
+/*
+ fetch a record from the ltdb, separating out the header information
+ and returning the body of the record. A valid (initial) header is
+ returned if the record is not present
+*/
+int ctdb_ltdb_store(struct ctdb_context *ctdb, TDB_DATA key,
+ struct ctdb_ltdb_header *header, TDB_DATA data)
+{
+ TDB_DATA rec;
+ int ret;
+
+ rec.dsize = sizeof(struct ctdb_ltdb_header) + data.dsize;
+ rec.dptr = talloc_size(ctdb, rec.dsize);
+ CTDB_NO_MEMORY(ctdb, rec.dptr);
+
+ memcpy(rec.dptr, header, sizeof(*header));
+ memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
+
+ ret = tdb_store(ctdb->ltdb, key, rec, TDB_REPLACE);
+ talloc_free(rec.dptr);
+
+ return ret;
+}
/* arbitrary maximum timeout for ctdb operations */
#define CTDB_REQ_TIMEOUT 10
+/*
+ the extended header for records in the ltdb
+*/
+struct ctdb_ltdb_header {
+ uint64_t rsn;
+ uint32_t dmaster;
+ uint32_t laccessor;
+ uint32_t lacount;
+};
+
/*
operation IDs
*/
enum ctdb_operation {
CTDB_REQ_CALL = 0,
- CTDB_REPLY_CALL = 1
+ CTDB_REPLY_CALL = 1,
+ CTDB_REPLY_REDIRECT = 2,
+ CTDB_REQ_DMASTER = 3,
+ CTDB_REPLY_DMASTER = 4,
};
/*
void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
+int ctdb_ltdb_fetch(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
+ TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA *data);
+int ctdb_ltdb_store(struct ctdb_context *ctdb, TDB_DATA key,
+ struct ctdb_ltdb_header *header, TDB_DATA data);
+
+