}
/* if this nodes has done enough consecutive calls on the same record
- then give them the record */
- if (header.laccessor == c->hdr.srcnode &&
- header.lacount >= ctdb->max_lacount) {
+ then give them the record
+ or if the node requested an immediate migration
+ */
+ if ( (header.laccessor == c->hdr.srcnode
+ && header.lacount >= ctdb->max_lacount)
+ || c->flags&CTDB_IMMEDIATE_MIGRATION ) {
ctdb_call_send_dmaster(ctdb_db, c, &header, &call.key, &data);
talloc_free(data.dptr);
return;
struct ctdb_call call;
int redirect_count;
struct ctdb_ltdb_header header;
+ void *fetch_private;
};
state->c->hdr.srcnode = ctdb->vnn;
/* this limits us to 16k outstanding messages - not unreasonable */
state->c->hdr.reqid = idr_get_new(ctdb->idr, state, 0xFFFF);
+ state->c->flags = call->flags;
state->c->db_id = ctdb_db->db_id;
state->c->callid = call->call_id;
state->c->keylen = call->key.dsize;
}
+
+struct ctdb_record_handle {
+ struct ctdb_db_context *ctdb_db;
+ TDB_DATA key;
+ TDB_DATA *data;
+};
+
/*
make a remote ctdb call - async recv.
*/
int ctdb_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
{
+ struct ctdb_record_handle *rec = state->fetch_private;
+
+ /* ugly hack to manage forced migration */
+ if (rec != NULL) {
+ rec->data->dptr = talloc_memdup(rec, state->call.reply_data.dptr,
+ state->call.reply_data.dsize);
+ rec->data->dsize = state->call.reply_data.dsize;
+ talloc_free(state);
+ return 0;
+ }
+
while (state->state < CTDB_CALL_DONE) {
event_loop_once(state->node->ctdb->ev);
}
state = ctdb_call_send(ctdb_db, call);
return ctdb_call_recv(state, call);
}
+
+
+
+
+
+
+struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx,
+ TDB_DATA key, TDB_DATA *data)
+{
+ struct ctdb_call call;
+ struct ctdb_record_handle *rec;
+ struct ctdb_call_state *state;
+ int ret;
+
+ ZERO_STRUCT(call);
+ call.call_id = CTDB_FETCH_FUNC;
+ call.key = key;
+ call.flags = CTDB_IMMEDIATE_MIGRATION;
+
+ rec = talloc(mem_ctx, struct ctdb_record_handle);
+ CTDB_NO_MEMORY_NULL(ctdb_db->ctdb, rec);
+
+ rec->ctdb_db = ctdb_db;
+ rec->key = key;
+ rec->key.dptr = talloc_memdup(rec, key.dptr, key.dsize);
+
+ state = ctdb_call_send(ctdb_db, &call);
+ state->fetch_private = rec;
+
+ ret = ctdb_call_recv(state, &call);
+ if (ret != 0) {
+ talloc_free(rec);
+ return NULL;
+ }
+
+ return rec;
+}
+
+
+int ctdb_record_store(struct ctdb_record_handle *rec, TDB_DATA data)
+{
+ int ret;
+ struct ctdb_ltdb_header header;
+
+ /* should be avoided if possible hang header off rec ? */
+ ret = ctdb_ltdb_fetch(rec->ctdb_db, rec->key, &header, NULL);
+ if (ret) {
+ ctdb_set_error(rec->ctdb_db->ctdb, "Fetch of locally held record failed");
+ return ret;
+ }
+
+ ret = ctdb_ltdb_store(rec->ctdb_db, rec->key, &header, data);
+
+ return ret;
+}
return NULL;
}
+
+/*
+ this is the dummy null procedure that all databases support
+*/
+static int ctdb_fetch_func(struct ctdb_call_info *call)
+{
+ call->reply_data = &call->record_data;
+ return 0;
+}
+
+
/*
attach to a specific database
*/
{
struct ctdb_db_context *ctdb_db, *tmp_db;
TDB_DATA data;
+ int ret;
ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
return NULL;
}
+
+ /*
+ all databases support the "fetch" function. we need this in order to do forced migration of records
+ */
+ ret = ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
+ if (ret != 0) {
+ talloc_free(ctdb_db);
+ return NULL;
+ }
+
DLIST_ADD(ctdb->db_list, ctdb_db);
+
return ctdb_db;
}
/* return an initial header */
free(rec.dptr);
ltdb_initial_header(ctdb_db, key, header);
- data->dptr = NULL;
- data->dsize = 0;
+ if (data) {
+ data->dptr = NULL;
+ data->dsize = 0;
+ }
return 0;
}
*header = *(struct ctdb_ltdb_header *)rec.dptr;
- data->dsize = rec.dsize - sizeof(struct ctdb_ltdb_header);
- data->dptr = talloc_memdup(ctdb_db, sizeof(struct ctdb_ltdb_header)+rec.dptr,
- data->dsize);
+ if (data) {
+ data->dsize = rec.dsize - sizeof(struct ctdb_ltdb_header);
+ data->dptr = talloc_memdup(ctdb_db, sizeof(struct ctdb_ltdb_header)+rec.dptr,
+ data->dsize);
+ }
+
free(rec.dptr);
- CTDB_NO_MEMORY(ctdb, data->dptr);
+ if (data) {
+ CTDB_NO_MEMORY(ctdb, data->dptr);
+ }
return 0;
}
#ifndef _CTDB_H
#define _CTDB_H
+#define CTDB_IMMEDIATE_MIGRATION 0x00000001
struct ctdb_call {
int call_id;
TDB_DATA key;
TDB_DATA call_data;
TDB_DATA reply_data;
uint32_t status;
+ uint32_t flags;
};
/*
int ctdb_send_message(struct ctdb_context *ctdb, uint32_t vnn,
uint32_t srvid, TDB_DATA data);
+
+/*
+ fetch and lock a ctdb record. Underneath this will force the
+ dmaster for the record to be moved to the local node.
+
+ The lock is released when is talloc_free() is called on the
+ returned ctdb_record_handle.
+*/
+struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, TDB_DATA key, TDB_DATA *data);
+
+/*
+ change the data in a record held with a ctdb_record_handle
+ if the new data is zero length, this implies a delete of the record
+ */
+int ctdb_record_store(struct ctdb_record_handle *rec, TDB_DATA data);
+
+
#endif