EVENTS_OBJ = lib/events/events.o lib/events/events_standard.o
CTDB_COMMON_OBJ = common/ctdb.o common/ctdb_daemon.o common/ctdb_client.o common/ctdb_io.o common/util.o common/ctdb_util.o \
- common/ctdb_call.o common/ctdb_ltdb.o common/ctdb_message.o \
+ common/ctdb_call.o common/ctdb_ltdb.o common/ctdb_lockwait.o common/ctdb_message.o \
lib/util/idtree.o lib/util/db_wrap.o
CTDB_TCP_OBJ = tcp/tcp_connect.o tcp/tcp_io.o tcp/tcp_init.o
/*
called by the transport layer when a packet comes in
*/
-static void ctdb_recv_pkt(struct ctdb_context *ctdb, uint8_t *data, uint32_t length)
+void ctdb_recv_pkt(struct ctdb_context *ctdb, uint8_t *data, uint32_t length)
{
struct ctdb_req_header *hdr;
return tdb_chainunlock(ctdb_db->ltdb->tdb, key);
}
+/*
+ called when we should retry the operation
+ */
+static void lock_fetch_callback(void *p)
+{
+ struct ctdb_req_header *hdr = p;
+ struct ctdb_context *ctdb = talloc_find_parent_bytype(p, struct ctdb_context);
+ ctdb_recv_pkt(ctdb, (uint8_t *)hdr, hdr->length);
+ printf("PACKET REQUEUED\n");
+}
+
+/*
+ do a non-blocking ltdb_fetch with a locked record, deferring this
+ ctdb request until we have the chainlock
+ */
+int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
+ TDB_DATA key, struct ctdb_ltdb_header *header,
+ struct ctdb_req_header *hdr, TDB_DATA *data)
+{
+ int ret;
+ struct tdb_context *tdb = ctdb_db->ltdb->tdb;
+ struct lockwait_handle *h;
+
+ ret = tdb_chainlock_nonblock(tdb, key);
+
+ /* first the non-contended path */
+ if (ret == 0) {
+ ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
+ if (ret != 0) {
+ tdb_chainunlock(tdb, key);
+ }
+ return ret;
+ }
+
+ /* now the contended path */
+ h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, hdr);
+ if (h == NULL) {
+ tdb_chainunlock(tdb, key);
+ return -1;
+ }
+
+ /* we get an extra reference to the packet here, to
+ stop it being freed in the top level packet handler */
+ (void)talloc_reference(ctdb_db, hdr);
+ return 0;
+}
int ctdb_ltdb_store(struct ctdb_db_context *ctdb_db, TDB_DATA key,
struct ctdb_ltdb_header *header, TDB_DATA data);
void ctdb_queue_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
+int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
+ TDB_DATA key, struct ctdb_ltdb_header *header,
+ struct ctdb_req_header *hdr, TDB_DATA *data);
+void ctdb_recv_pkt(struct ctdb_context *ctdb, uint8_t *data, uint32_t length);
struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db,
struct ctdb_call *call,
*/
int ctdb_client_store_unlock(struct ctdb_record_handle *rec, TDB_DATA data);
+struct lockwait_handle *ctdb_lockwait(struct ctdb_db_context *ctdb_db,
+ TDB_DATA key,
+ void (*callback)(void *), void *private_data);
+
#endif