memcpy(&r->data[0], data.dptr, data.dsize);
if (r->hdr.destnode == r->hdr.srcnode) {
- ctdb_reply_dmaster(ctdb, &r->hdr);
+ /* inject the packet back into the input queue */
+ talloc_steal(ctdb, r);
+ ctdb_recv_pkt(ctdb, (uint8_t *)&r->hdr, r->hdr.length);
} else {
ctdb_queue_packet(ctdb, &r->hdr);
}
struct ctdb_call_state *state;
struct ctdb_db_context *ctdb_db;
TDB_DATA data;
+ int ret;
state = idr_find_type(ctdb->idr, hdr->reqid, struct ctdb_call_state);
if (state == NULL) {
ctdb_db = state->ctdb_db;
+ ret = ctdb_ltdb_lock_requeue(ctdb_db, state->call.key, hdr,
+ ctdb_recv_raw_pkt, ctdb);
+ if (ret == -2) {
+ return;
+ }
+ if (ret != 0) {
+ DEBUG(0,(__location__ " Failed to get lock in ctdb_reply_dmaster\n"));
+ return;
+ }
+
data.dptr = c->data;
data.dsize = c->datalen;
state->header.dmaster = ctdb->vnn;
if (ctdb_ltdb_store(ctdb_db, state->call.key, &state->header, data) != 0) {
+ ctdb_ltdb_unlock(ctdb_db, state->call.key);
ctdb_fatal(ctdb, "ctdb_reply_dmaster store failed\n");
return;
}
ctdb_call_local(ctdb_db, &state->call, &state->header, &data, ctdb->vnn);
+ ctdb_ltdb_unlock(ctdb_db, state->call.key);
+
talloc_steal(state, state->call.reply_data.dptr);
state->state = CTDB_CALL_DONE;
DEBUG(2,(__location__ " PACKET REQUEUED\n"));
}
+
/*
- do a non-blocking ltdb_fetch with a locked record, deferring this
- ctdb request until we have the chainlock
+ do a non-blocking ltdb_lock, deferring this ctdb request until we
+ have the chainlock
It does the following:
- 1) tries to get the chainlock. If it succeeds, then it fetches the record, and
- returns 0
+ 1) tries to get the chainlock. If it succeeds, then it returns 0
2) if it fails to get a chainlock immediately then it sets up a
non-blocking chainlock via ctdb_lockwait, and when it gets the
-1: means that it failed to get the lock, and won't retry
-2: means that it failed to get the lock immediately, but will retry
*/
-int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
- TDB_DATA key, struct ctdb_ltdb_header *header,
- struct ctdb_req_header *hdr, TDB_DATA *data,
- void (*recv_pkt)(void *, uint8_t *, uint32_t ),
- void *recv_context)
+int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
+ TDB_DATA key, struct ctdb_req_header *hdr,
+ void (*recv_pkt)(void *, uint8_t *, uint32_t ),
+ void *recv_context)
{
int ret;
struct tdb_context *tdb = ctdb_db->ltdb->tdb;
/* first the non-contended path */
if (ret == 0) {
- ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
- if (ret != 0) {
- tdb_chainunlock(tdb, key);
- }
- return ret;
+ return 0;
}
state = talloc(ctdb_db, struct lock_fetch_state);
/* now tell the caller than we will retry asynchronously */
return -2;
}
+
+/*
+ a varient of ctdb_ltdb_lock_requeue that also fetches the record
+ */
+int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
+ TDB_DATA key, struct ctdb_ltdb_header *header,
+ struct ctdb_req_header *hdr, TDB_DATA *data,
+ void (*recv_pkt)(void *, uint8_t *, uint32_t ),
+ void *recv_context)
+{
+ int ret;
+
+ ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, recv_context);
+ if (ret == 0) {
+ ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
+ if (ret != 0) {
+ ctdb_ltdb_unlock(ctdb_db, key);
+ }
+ }
+ return ret;
+}
int ctdb_ltdb_store(struct ctdb_db_context *ctdb_db, TDB_DATA key,
struct ctdb_ltdb_header *header, TDB_DATA data);
void ctdb_queue_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
+int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
+ TDB_DATA key, struct ctdb_req_header *hdr,
+ void (*recv_pkt)(void *, uint8_t *, uint32_t ),
+ void *recv_context);
int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
TDB_DATA key, struct ctdb_ltdb_header *header,
struct ctdb_req_header *hdr, TDB_DATA *data,