return true;
}
+static uint64_t rec_srvid = CTDB_SRVID_RECOVERY;
+
+static uint64_t srvid_next(void)
+{
+ rec_srvid += 1;
+ return rec_srvid;
+}
+
/*
* Recovery database functions
*/
return state.num_buffers;
}
+/*
+ * Pull database from a single node
+ */
+
+struct pull_database_state {
+ struct tevent_context *ev;
+ struct ctdb_client_context *client;
+ struct recdb_context *recdb;
+ uint32_t pnn;
+ uint64_t srvid;
+ int num_records;
+};
+
+static void pull_database_handler(uint64_t srvid, TDB_DATA data,
+ void *private_data);
+static void pull_database_register_done(struct tevent_req *subreq);
+static void pull_database_old_done(struct tevent_req *subreq);
+static void pull_database_unregister_done(struct tevent_req *subreq);
+static void pull_database_new_done(struct tevent_req *subreq);
+
+static struct tevent_req *pull_database_send(
+ TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ struct ctdb_client_context *client,
+ uint32_t pnn, uint32_t caps,
+ struct recdb_context *recdb)
+{
+ struct tevent_req *req, *subreq;
+ struct pull_database_state *state;
+ struct ctdb_req_control request;
+
+ req = tevent_req_create(mem_ctx, &state, struct pull_database_state);
+ if (req == NULL) {
+ return NULL;
+ }
+
+ state->ev = ev;
+ state->client = client;
+ state->recdb = recdb;
+ state->pnn = pnn;
+ state->srvid = srvid_next();
+
+ if (caps & CTDB_CAP_FRAGMENTED_CONTROLS) {
+ subreq = ctdb_client_set_message_handler_send(
+ state, state->ev, state->client,
+ state->srvid, pull_database_handler,
+ req);
+ if (tevent_req_nomem(subreq, req)) {
+ return tevent_req_post(req, ev);
+ }
+
+ tevent_req_set_callback(subreq, pull_database_register_done,
+ req);
+
+ } else {
+ struct ctdb_pulldb pulldb;
+
+ pulldb.db_id = recdb_id(recdb);
+ pulldb.lmaster = CTDB_LMASTER_ANY;
+
+ ctdb_req_control_pull_db(&request, &pulldb);
+ subreq = ctdb_client_control_send(state, state->ev,
+ state->client,
+ pnn, TIMEOUT(),
+ &request);
+ if (tevent_req_nomem(subreq, req)) {
+ return tevent_req_post(req, ev);
+ }
+ tevent_req_set_callback(subreq, pull_database_old_done, req);
+ }
+
+ return req;
+}
+
+static void pull_database_handler(uint64_t srvid, TDB_DATA data,
+ void *private_data)
+{
+ struct tevent_req *req = talloc_get_type_abort(
+ private_data, struct tevent_req);
+ struct pull_database_state *state = tevent_req_data(
+ req, struct pull_database_state);
+ struct ctdb_rec_buffer *recbuf;
+ int ret;
+ bool status;
+
+ if (srvid != state->srvid) {
+ return;
+ }
+
+ ret = ctdb_rec_buffer_pull(data.dptr, data.dsize, state, &recbuf);
+ if (ret != 0) {
+ LOG("Invalid data received for DB_PULL messages\n");
+ return;
+ }
+
+ if (recbuf->db_id != recdb_id(state->recdb)) {
+ talloc_free(recbuf);
+ LOG("Invalid dbid:%08x for DB_PULL messages for %s\n",
+ recbuf->db_id, recdb_name(state->recdb));
+ return;
+ }
+
+ status = recdb_add(state->recdb, ctdb_client_pnn(state->client),
+ recbuf);
+ if (! status) {
+ talloc_free(recbuf);
+ LOG("Failed to add records to recdb for %s\n",
+ recdb_name(state->recdb));
+ return;
+ }
+
+ state->num_records += recbuf->count;
+ talloc_free(recbuf);
+}
+
+static void pull_database_register_done(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct pull_database_state *state = tevent_req_data(
+ req, struct pull_database_state);
+ struct ctdb_req_control request;
+ struct ctdb_pulldb_ext pulldb_ext;
+ int ret;
+ bool status;
+
+ status = ctdb_client_set_message_handler_recv(subreq, &ret);
+ TALLOC_FREE(subreq);
+ if (! status) {
+ LOG("failed to set message handler for DB_PULL for %s\n",
+ recdb_name(state->recdb));
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ pulldb_ext.db_id = recdb_id(state->recdb);
+ pulldb_ext.lmaster = CTDB_LMASTER_ANY;
+ pulldb_ext.srvid = state->srvid;
+
+ ctdb_req_control_db_pull(&request, &pulldb_ext);
+ subreq = ctdb_client_control_send(state, state->ev, state->client,
+ state->pnn, TIMEOUT(), &request);
+ if (tevent_req_nomem(subreq, req)) {
+ return;
+ }
+ tevent_req_set_callback(subreq, pull_database_new_done, req);
+}
+
+static void pull_database_old_done(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct pull_database_state *state = tevent_req_data(
+ req, struct pull_database_state);
+ struct ctdb_reply_control *reply;
+ struct ctdb_rec_buffer *recbuf;
+ int ret;
+ bool status;
+
+ status = ctdb_client_control_recv(subreq, &ret, state, &reply);
+ TALLOC_FREE(subreq);
+ if (! status) {
+ LOG("control PULL_DB failed for %s on node %u, ret=%d\n",
+ recdb_name(state->recdb), state->pnn, ret);
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ ret = ctdb_reply_control_pull_db(reply, state, &recbuf);
+ talloc_free(reply);
+ if (ret != 0) {
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ status = recdb_add(state->recdb, ctdb_client_pnn(state->client),
+ recbuf);
+ if (! status) {
+ talloc_free(recbuf);
+ tevent_req_error(req, EIO);
+ return;
+ }
+
+ state->num_records = recbuf->count;
+ talloc_free(recbuf);
+
+ LOG("Pulled %d records for db %s from node %d\n",
+ state->num_records, recdb_name(state->recdb), state->pnn);
+
+ tevent_req_done(req);
+}
+
+static void pull_database_new_done(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct pull_database_state *state = tevent_req_data(
+ req, struct pull_database_state);
+ struct ctdb_reply_control *reply;
+ uint32_t num_records;
+ int ret;
+ bool status;
+
+ status = ctdb_client_control_recv(subreq, &ret, state, &reply);
+ TALLOC_FREE(subreq);
+ if (! status) {
+ LOG("control DB_PULL failed for %s on node %u, ret=%d\n",
+ recdb_name(state->recdb), state->pnn, ret);
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ ret = ctdb_reply_control_db_pull(reply, &num_records);
+ talloc_free(reply);
+ if (num_records != state->num_records) {
+ LOG("mismatch (%u != %u) in DB_PULL records for %s\n",
+ num_records, state->num_records, recdb_name(state->recdb));
+ tevent_req_error(req, EIO);
+ return;
+ }
+
+ LOG("Pulled %d records for db %s from node %d\n",
+ state->num_records, recdb_name(state->recdb), state->pnn);
+
+ subreq = ctdb_client_remove_message_handler_send(
+ state, state->ev, state->client,
+ state->srvid, req);
+ if (tevent_req_nomem(subreq, req)) {
+ return;
+ }
+ tevent_req_set_callback(subreq, pull_database_unregister_done, req);
+}
+
+static void pull_database_unregister_done(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct pull_database_state *state = tevent_req_data(
+ req, struct pull_database_state);
+ int ret;
+ bool status;
+
+ status = ctdb_client_remove_message_handler_recv(subreq, &ret);
+ TALLOC_FREE(subreq);
+ if (! status) {
+ LOG("failed to remove message handler for DB_PULL for %s\n",
+ recdb_name(state->recdb));
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ tevent_req_done(req);
+}
+
+static bool pull_database_recv(struct tevent_req *req, int *perr)
+{
+ return generic_recv(req, perr);
+}
+
/*
* Collect databases using highest sequence number
*/
struct collect_highseqnum_db_state *state = tevent_req_data(
req, struct collect_highseqnum_db_state);
struct ctdb_reply_control **reply;
- struct ctdb_req_control request;
- struct ctdb_pulldb pulldb;
int *err_list;
bool status;
int ret, i;
LOG("Pull persistent db %s from node %d with seqnum 0x%"PRIx64"\n",
recdb_name(state->recdb), state->max_pnn, max_seqnum);
- pulldb.db_id = state->db_id;
- pulldb.lmaster = CTDB_LMASTER_ANY;
-
- ctdb_req_control_pull_db(&request, &pulldb);
- subreq = ctdb_client_control_send(state, state->ev, state->client,
- state->max_pnn, TIMEOUT(), &request);
+ subreq = pull_database_send(state, state->ev, state->client,
+ state->max_pnn,
+ state->caps[state->max_pnn],
+ state->recdb);
if (tevent_req_nomem(subreq, req)) {
return;
}
{
struct tevent_req *req = tevent_req_callback_data(
subreq, struct tevent_req);
- struct collect_highseqnum_db_state *state = tevent_req_data(
- req, struct collect_highseqnum_db_state);
- struct ctdb_reply_control *reply;
- struct ctdb_rec_buffer *recbuf;
int ret;
bool status;
- status = ctdb_client_control_recv(subreq, &ret, state, &reply);
+ status = pull_database_recv(subreq, &ret);
TALLOC_FREE(subreq);
if (! status) {
- LOG("control PULL_DB failed for %s on node %u, ret=%d\n",
- recdb_name(state->recdb), state->max_pnn, ret);
tevent_req_error(req, ret);
return;
}
- ret = ctdb_reply_control_pull_db(reply, state, &recbuf);
- if (ret != 0) {
- tevent_req_error(req, EPROTO);
- return;
- }
-
- talloc_free(reply);
-
- ret = recdb_add(state->recdb, ctdb_client_pnn(state->client), recbuf);
- talloc_free(recbuf);
- if (! ret) {
- tevent_req_error(req, EIO);
- return;
- }
-
tevent_req_done(req);
}
{
struct tevent_req *req, *subreq;
struct collect_all_db_state *state;
- struct ctdb_req_control request;
+ uint32_t pnn;
req = tevent_req_create(mem_ctx, &state,
struct collect_all_db_state);
state->client = client;
state->pnn_list = pnn_list;
state->count = count;
+ state->caps = caps;
state->db_id = db_id;
state->recdb = recdb;
-
- state->pulldb.db_id = db_id;
- state->pulldb.lmaster = CTDB_LMASTER_ANY;
-
state->index = 0;
- ctdb_req_control_pull_db(&request, &state->pulldb);
- subreq = ctdb_client_control_send(state, ev, client,
- state->pnn_list[state->index],
- TIMEOUT(), &request);
+ pnn = state->pnn_list[state->index];
+
+ subreq = pull_database_send(state, ev, client, pnn, caps[pnn], recdb);
if (tevent_req_nomem(subreq, req)) {
return tevent_req_post(req, ev);
}
subreq, struct tevent_req);
struct collect_all_db_state *state = tevent_req_data(
req, struct collect_all_db_state);
- struct ctdb_reply_control *reply;
- struct ctdb_req_control request;
- struct ctdb_rec_buffer *recbuf;
+ uint32_t pnn;
int ret;
bool status;
- status = ctdb_client_control_recv(subreq, &ret, state, &reply);
+ status = pull_database_recv(subreq, &ret);
TALLOC_FREE(subreq);
if (! status) {
- LOG("control PULL_DB failed for %s from node %u, ret=%d\n",
- recdb_name(state->recdb), state->pnn_list[state->index],
- ret);
tevent_req_error(req, ret);
return;
}
- ret = ctdb_reply_control_pull_db(reply, state, &recbuf);
- if (ret != 0) {
- LOG("control PULL_DB failed for %s, ret=%d\n",
- recdb_name(state->recdb), ret);
- tevent_req_error(req, EPROTO);
- return;
- }
-
- talloc_free(reply);
-
- status = recdb_add(state->recdb, ctdb_client_pnn(state->client), recbuf);
- talloc_free(recbuf);
- if (! status) {
- tevent_req_error(req, EIO);
- return;
- }
-
state->index += 1;
if (state->index == state->count) {
tevent_req_done(req);
return;
}
- ctdb_req_control_pull_db(&request, &state->pulldb);
- subreq = ctdb_client_control_send(state, state->ev, state->client,
- state->pnn_list[state->index],
- TIMEOUT(), &request);
+ pnn = state->pnn_list[state->index];
+ subreq = pull_database_send(state, state->ev, state->client,
+ pnn, state->caps[pnn], state->recdb);
if (tevent_req_nomem(subreq, req)) {
return;
}