ctdb->upcalls = &ctdb_upcalls;
ctdb->idr = idr_init(ctdb);
ctdb->max_lacount = CTDB_DEFAULT_MAX_LACOUNT;
+ ctdb->seqnum_frequency = CTDB_DEFAULT_SEQNUM_FREQUENCY;
return ctdb;
}
case CTDB_CONTROL_DEREGISTER_SRVID:
return daemon_deregister_message_handler(ctdb, client_id, srvid);
- case CTDB_CONTROL_ENABLE_SEQNUM: {
- uint32_t db_id;
- struct ctdb_db_context *ctdb_db;
- CHECK_CONTROL_DATA_SIZE(sizeof(db_id));
- ctdb_db = find_ctdb_db(ctdb, db_id);
- if (!ctdb_db) return -1;
- tdb_enable_seqnum(ctdb_db->ltdb->tdb);
- return 0;
- }
+ case CTDB_CONTROL_ENABLE_SEQNUM:
+ CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
+ return ctdb_ltdb_enable_seqnum(ctdb, *(uint32_t *)indata.dptr);
+
+ case CTDB_CONTROL_UPDATE_SEQNUM:
+ CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
+ return ctdb_ltdb_update_seqnum(ctdb, *(uint32_t *)indata.dptr, srcnode);
+
+ case CTDB_CONTROL_SET_SEQNUM_FREQUENCY:
+ CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
+ return ctdb_ltdb_set_seqnum_frequency(ctdb, *(uint32_t *)indata.dptr);
default:
DEBUG(0,(__location__ " Unknown CTDB control opcode %u\n", opcode));
return 0;
}
+/*
+ called when a broadcast seqnum update comes in
+ */
+int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
+{
+ struct ctdb_db_context *ctdb_db;
+ if (srcnode == ctdb->vnn) {
+ /* don't update ourselves! */
+ return 0;
+ }
+
+ ctdb_db = find_ctdb_db(ctdb, db_id);
+ if (!ctdb_db) {
+ DEBUG(0,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n"));
+ return -1;
+ }
+
+ tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
+ ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
+ return 0;
+}
+
+/*
+ timer to check for seqnum changes in a ltdb and propogate them
+ */
+static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te,
+ struct timeval t, void *p)
+{
+ struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
+ struct ctdb_context *ctdb = ctdb_db->ctdb;
+ uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
+ if (new_seqnum != ctdb_db->seqnum) {
+ /* something has changed - propogate it */
+ TDB_DATA data;
+ data.dptr = (uint8_t *)&ctdb_db->db_id;
+ data.dsize = sizeof(uint32_t);
+ ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNN, 0,
+ CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
+ data, NULL, NULL);
+ }
+ ctdb_db->seqnum = new_seqnum;
+
+ /* setup a new timer */
+ event_add_timed(ctdb->ev, ctdb_db, timeval_current_ofs(ctdb->seqnum_frequency, 0),
+ ctdb_ltdb_seqnum_check, ctdb_db);
+}
+
+/*
+ enable seqnum handling on this db
+ */
+int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
+{
+ struct ctdb_db_context *ctdb_db;
+ ctdb_db = find_ctdb_db(ctdb, db_id);
+ if (!ctdb_db) {
+ DEBUG(0,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n"));
+ return -1;
+ }
+
+ event_add_timed(ctdb->ev, ctdb_db, timeval_current_ofs(ctdb->seqnum_frequency, 0),
+ ctdb_ltdb_seqnum_check, ctdb_db);
+
+ tdb_enable_seqnum(ctdb_db->ltdb->tdb);
+ ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
+ return 0;
+}
+
+/*
+ enable seqnum handling on this db
+ */
+int32_t ctdb_ltdb_set_seqnum_frequency(struct ctdb_context *ctdb, uint32_t frequency)
+{
+ ctdb->seqnum_frequency = frequency;
+ return 0;
+}
#define CTDB_BROADCAST_VNN 0xF0000002
#define CTDB_MAX_REDIRECT_COUNT 3
+#define CTDB_DEFAULT_SEQNUM_FREQUENCY 1
/*
an installed ctdb remote call
struct ctdb_status status;
struct ctdb_vnn_map *vnn_map;
uint32_t num_clients;
+ uint32_t seqnum_frequency;
};
struct ctdb_db_context {
const char *db_path;
struct tdb_wrap *ltdb;
struct ctdb_registered_call *calls; /* list of registered calls */
+ uint32_t seqnum;
};
CTDB_CONTROL_REGISTER_SRVID,
CTDB_CONTROL_DEREGISTER_SRVID,
CTDB_CONTROL_ENABLE_SEQNUM,
+ CTDB_CONTROL_UPDATE_SEQNUM,
+ CTDB_CONTROL_SET_SEQNUM_FREQUENCY,
};
int ctdb_deregister_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data);
int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid);
+int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id);
+int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode);
+int32_t ctdb_ltdb_set_seqnum_frequency(struct ctdb_context *ctdb, uint32_t frequency);
+
#endif
TDB_DATA tdb_null;
/*
- increment the tdb sequence number if the tdb has been opened using
+ non-blocking increment of the tdb sequence number if the tdb has been opened using
the TDB_SEQNUM flag
*/
-static void tdb_increment_seqnum(struct tdb_context *tdb)
+void tdb_increment_seqnum_nonblock(struct tdb_context *tdb)
{
tdb_off_t seqnum=0;
return;
}
- if (tdb_brlock(tdb, TDB_SEQNUM_OFS, F_WRLCK, F_SETLKW, 1, 1) != 0) {
- return;
- }
-
/* we ignore errors from this, as we have no sane way of
dealing with them.
*/
tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
seqnum++;
tdb_ofs_write(tdb, TDB_SEQNUM_OFS, &seqnum);
+}
+
+/*
+ increment the tdb sequence number if the tdb has been opened using
+ the TDB_SEQNUM flag
+*/
+static void tdb_increment_seqnum(struct tdb_context *tdb)
+{
+ if (!(tdb->flags & TDB_SEQNUM)) {
+ return;
+ }
+
+ if (tdb_brlock(tdb, TDB_SEQNUM_OFS, F_WRLCK, F_SETLKW, 1, 1) != 0) {
+ return;
+ }
+
+ tdb_increment_seqnum_nonblock(tdb);
tdb_brlock(tdb, TDB_SEQNUM_OFS, F_UNLCK, F_SETLKW, 1, 1);
}
size_t tdb_map_size(struct tdb_context *tdb);
int tdb_get_flags(struct tdb_context *tdb);
void tdb_enable_seqnum(struct tdb_context *tdb);
+void tdb_increment_seqnum_nonblock(struct tdb_context *tdb);
/* Low level locking functions: use with care */
int tdb_chainlock(struct tdb_context *tdb, TDB_DATA key);