From: Amitay Isaacs Date: Wed, 24 Feb 2016 07:10:49 +0000 (+1100) Subject: ctdb-recovery-helper: Introduce pull database abstraction X-Git-Tag: tdb-1.3.9~103 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=b96a4759b397d873d56ccdd0c0b26e770cc10b89;p=thirdparty%2Fsamba.git ctdb-recovery-helper: Introduce pull database abstraction This abstraction depending on the capability of the remote node either uses older PULL_DB control or newer DB_PULL control. Signed-off-by: Amitay Isaacs Reviewed-by: Martin Schwenke --- diff --git a/ctdb/server/ctdb_recovery_helper.c b/ctdb/server/ctdb_recovery_helper.c index 582f3aa1542..0558bc76e70 100644 --- a/ctdb/server/ctdb_recovery_helper.c +++ b/ctdb/server/ctdb_recovery_helper.c @@ -79,6 +79,14 @@ static bool generic_recv(struct tevent_req *req, int *perr) return true; } +static uint64_t rec_srvid = CTDB_SRVID_RECOVERY; + +static uint64_t srvid_next(void) +{ + rec_srvid += 1; + return rec_srvid; +} + /* * Recovery database functions */ @@ -421,6 +429,265 @@ static int recdb_file(struct recdb_context *recdb, TALLOC_CTX *mem_ctx, return state.num_buffers; } +/* + * Pull database from a single node + */ + +struct pull_database_state { + struct tevent_context *ev; + struct ctdb_client_context *client; + struct recdb_context *recdb; + uint32_t pnn; + uint64_t srvid; + int num_records; +}; + +static void pull_database_handler(uint64_t srvid, TDB_DATA data, + void *private_data); +static void pull_database_register_done(struct tevent_req *subreq); +static void pull_database_old_done(struct tevent_req *subreq); +static void pull_database_unregister_done(struct tevent_req *subreq); +static void pull_database_new_done(struct tevent_req *subreq); + +static struct tevent_req *pull_database_send( + TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + uint32_t pnn, uint32_t caps, + struct recdb_context *recdb) +{ + struct tevent_req *req, *subreq; + struct pull_database_state *state; + struct ctdb_req_control request; + + req = tevent_req_create(mem_ctx, &state, struct pull_database_state); + if (req == NULL) { + return NULL; + } + + state->ev = ev; + state->client = client; + state->recdb = recdb; + state->pnn = pnn; + state->srvid = srvid_next(); + + if (caps & CTDB_CAP_FRAGMENTED_CONTROLS) { + subreq = ctdb_client_set_message_handler_send( + state, state->ev, state->client, + state->srvid, pull_database_handler, + req); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + + tevent_req_set_callback(subreq, pull_database_register_done, + req); + + } else { + struct ctdb_pulldb pulldb; + + pulldb.db_id = recdb_id(recdb); + pulldb.lmaster = CTDB_LMASTER_ANY; + + ctdb_req_control_pull_db(&request, &pulldb); + subreq = ctdb_client_control_send(state, state->ev, + state->client, + pnn, TIMEOUT(), + &request); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, pull_database_old_done, req); + } + + return req; +} + +static void pull_database_handler(uint64_t srvid, TDB_DATA data, + void *private_data) +{ + struct tevent_req *req = talloc_get_type_abort( + private_data, struct tevent_req); + struct pull_database_state *state = tevent_req_data( + req, struct pull_database_state); + struct ctdb_rec_buffer *recbuf; + int ret; + bool status; + + if (srvid != state->srvid) { + return; + } + + ret = ctdb_rec_buffer_pull(data.dptr, data.dsize, state, &recbuf); + if (ret != 0) { + LOG("Invalid data received for DB_PULL messages\n"); + return; + } + + if (recbuf->db_id != recdb_id(state->recdb)) { + talloc_free(recbuf); + LOG("Invalid dbid:%08x for DB_PULL messages for %s\n", + recbuf->db_id, recdb_name(state->recdb)); + return; + } + + status = recdb_add(state->recdb, ctdb_client_pnn(state->client), + recbuf); + if (! status) { + talloc_free(recbuf); + LOG("Failed to add records to recdb for %s\n", + recdb_name(state->recdb)); + return; + } + + state->num_records += recbuf->count; + talloc_free(recbuf); +} + +static void pull_database_register_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct pull_database_state *state = tevent_req_data( + req, struct pull_database_state); + struct ctdb_req_control request; + struct ctdb_pulldb_ext pulldb_ext; + int ret; + bool status; + + status = ctdb_client_set_message_handler_recv(subreq, &ret); + TALLOC_FREE(subreq); + if (! status) { + LOG("failed to set message handler for DB_PULL for %s\n", + recdb_name(state->recdb)); + tevent_req_error(req, ret); + return; + } + + pulldb_ext.db_id = recdb_id(state->recdb); + pulldb_ext.lmaster = CTDB_LMASTER_ANY; + pulldb_ext.srvid = state->srvid; + + ctdb_req_control_db_pull(&request, &pulldb_ext); + subreq = ctdb_client_control_send(state, state->ev, state->client, + state->pnn, TIMEOUT(), &request); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, pull_database_new_done, req); +} + +static void pull_database_old_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct pull_database_state *state = tevent_req_data( + req, struct pull_database_state); + struct ctdb_reply_control *reply; + struct ctdb_rec_buffer *recbuf; + int ret; + bool status; + + status = ctdb_client_control_recv(subreq, &ret, state, &reply); + TALLOC_FREE(subreq); + if (! status) { + LOG("control PULL_DB failed for %s on node %u, ret=%d\n", + recdb_name(state->recdb), state->pnn, ret); + tevent_req_error(req, ret); + return; + } + + ret = ctdb_reply_control_pull_db(reply, state, &recbuf); + talloc_free(reply); + if (ret != 0) { + tevent_req_error(req, ret); + return; + } + + status = recdb_add(state->recdb, ctdb_client_pnn(state->client), + recbuf); + if (! status) { + talloc_free(recbuf); + tevent_req_error(req, EIO); + return; + } + + state->num_records = recbuf->count; + talloc_free(recbuf); + + LOG("Pulled %d records for db %s from node %d\n", + state->num_records, recdb_name(state->recdb), state->pnn); + + tevent_req_done(req); +} + +static void pull_database_new_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct pull_database_state *state = tevent_req_data( + req, struct pull_database_state); + struct ctdb_reply_control *reply; + uint32_t num_records; + int ret; + bool status; + + status = ctdb_client_control_recv(subreq, &ret, state, &reply); + TALLOC_FREE(subreq); + if (! status) { + LOG("control DB_PULL failed for %s on node %u, ret=%d\n", + recdb_name(state->recdb), state->pnn, ret); + tevent_req_error(req, ret); + return; + } + + ret = ctdb_reply_control_db_pull(reply, &num_records); + talloc_free(reply); + if (num_records != state->num_records) { + LOG("mismatch (%u != %u) in DB_PULL records for %s\n", + num_records, state->num_records, recdb_name(state->recdb)); + tevent_req_error(req, EIO); + return; + } + + LOG("Pulled %d records for db %s from node %d\n", + state->num_records, recdb_name(state->recdb), state->pnn); + + subreq = ctdb_client_remove_message_handler_send( + state, state->ev, state->client, + state->srvid, req); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, pull_database_unregister_done, req); +} + +static void pull_database_unregister_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct pull_database_state *state = tevent_req_data( + req, struct pull_database_state); + int ret; + bool status; + + status = ctdb_client_remove_message_handler_recv(subreq, &ret); + TALLOC_FREE(subreq); + if (! status) { + LOG("failed to remove message handler for DB_PULL for %s\n", + recdb_name(state->recdb)); + tevent_req_error(req, ret); + return; + } + + tevent_req_done(req); +} + +static bool pull_database_recv(struct tevent_req *req, int *perr) +{ + return generic_recv(req, perr); +} + /* * Collect databases using highest sequence number */ @@ -484,8 +751,6 @@ static void collect_highseqnum_db_seqnum_done(struct tevent_req *subreq) struct collect_highseqnum_db_state *state = tevent_req_data( req, struct collect_highseqnum_db_state); struct ctdb_reply_control **reply; - struct ctdb_req_control request; - struct ctdb_pulldb pulldb; int *err_list; bool status; int ret, i; @@ -532,12 +797,10 @@ static void collect_highseqnum_db_seqnum_done(struct tevent_req *subreq) LOG("Pull persistent db %s from node %d with seqnum 0x%"PRIx64"\n", recdb_name(state->recdb), state->max_pnn, max_seqnum); - pulldb.db_id = state->db_id; - pulldb.lmaster = CTDB_LMASTER_ANY; - - ctdb_req_control_pull_db(&request, &pulldb); - subreq = ctdb_client_control_send(state, state->ev, state->client, - state->max_pnn, TIMEOUT(), &request); + subreq = pull_database_send(state, state->ev, state->client, + state->max_pnn, + state->caps[state->max_pnn], + state->recdb); if (tevent_req_nomem(subreq, req)) { return; } @@ -549,37 +812,16 @@ static void collect_highseqnum_db_pulldb_done(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); - struct collect_highseqnum_db_state *state = tevent_req_data( - req, struct collect_highseqnum_db_state); - struct ctdb_reply_control *reply; - struct ctdb_rec_buffer *recbuf; int ret; bool status; - status = ctdb_client_control_recv(subreq, &ret, state, &reply); + status = pull_database_recv(subreq, &ret); TALLOC_FREE(subreq); if (! status) { - LOG("control PULL_DB failed for %s on node %u, ret=%d\n", - recdb_name(state->recdb), state->max_pnn, ret); tevent_req_error(req, ret); return; } - ret = ctdb_reply_control_pull_db(reply, state, &recbuf); - if (ret != 0) { - tevent_req_error(req, EPROTO); - return; - } - - talloc_free(reply); - - ret = recdb_add(state->recdb, ctdb_client_pnn(state->client), recbuf); - talloc_free(recbuf); - if (! ret) { - tevent_req_error(req, EIO); - return; - } - tevent_req_done(req); } @@ -615,7 +857,7 @@ static struct tevent_req *collect_all_db_send( { struct tevent_req *req, *subreq; struct collect_all_db_state *state; - struct ctdb_req_control request; + uint32_t pnn; req = tevent_req_create(mem_ctx, &state, struct collect_all_db_state); @@ -627,18 +869,14 @@ static struct tevent_req *collect_all_db_send( state->client = client; state->pnn_list = pnn_list; state->count = count; + state->caps = caps; state->db_id = db_id; state->recdb = recdb; - - state->pulldb.db_id = db_id; - state->pulldb.lmaster = CTDB_LMASTER_ANY; - state->index = 0; - ctdb_req_control_pull_db(&request, &state->pulldb); - subreq = ctdb_client_control_send(state, ev, client, - state->pnn_list[state->index], - TIMEOUT(), &request); + pnn = state->pnn_list[state->index]; + + subreq = pull_database_send(state, ev, client, pnn, caps[pnn], recdb); if (tevent_req_nomem(subreq, req)) { return tevent_req_post(req, ev); } @@ -653,49 +891,26 @@ static void collect_all_db_pulldb_done(struct tevent_req *subreq) subreq, struct tevent_req); struct collect_all_db_state *state = tevent_req_data( req, struct collect_all_db_state); - struct ctdb_reply_control *reply; - struct ctdb_req_control request; - struct ctdb_rec_buffer *recbuf; + uint32_t pnn; int ret; bool status; - status = ctdb_client_control_recv(subreq, &ret, state, &reply); + status = pull_database_recv(subreq, &ret); TALLOC_FREE(subreq); if (! status) { - LOG("control PULL_DB failed for %s from node %u, ret=%d\n", - recdb_name(state->recdb), state->pnn_list[state->index], - ret); tevent_req_error(req, ret); return; } - ret = ctdb_reply_control_pull_db(reply, state, &recbuf); - if (ret != 0) { - LOG("control PULL_DB failed for %s, ret=%d\n", - recdb_name(state->recdb), ret); - tevent_req_error(req, EPROTO); - return; - } - - talloc_free(reply); - - status = recdb_add(state->recdb, ctdb_client_pnn(state->client), recbuf); - talloc_free(recbuf); - if (! status) { - tevent_req_error(req, EIO); - return; - } - state->index += 1; if (state->index == state->count) { tevent_req_done(req); return; } - ctdb_req_control_pull_db(&request, &state->pulldb); - subreq = ctdb_client_control_send(state, state->ev, state->client, - state->pnn_list[state->index], - TIMEOUT(), &request); + pnn = state->pnn_list[state->index]; + subreq = pull_database_send(state, state->ev, state->client, + pnn, state->caps[pnn], state->recdb); if (tevent_req_nomem(subreq, req)) { return; }