From ffea827bae2a8054ad488ae82eedb021cdfb71c4 Mon Sep 17 00:00:00 2001 From: Amitay Isaacs Date: Thu, 25 Feb 2016 18:07:11 +1100 Subject: [PATCH] ctdb-recovery-helper: Introduce push database abstraction This abstraction uses capabilities of the remote nodes to either send older PUSH_DB controls or newer DB_PUSH_START and DB_PUSH_CONFIRM controls. Signed-off-by: Amitay Isaacs Reviewed-by: Martin Schwenke --- ctdb/server/ctdb_recovery_helper.c | 556 +++++++++++++++++++++++++++-- 1 file changed, 524 insertions(+), 32 deletions(-) diff --git a/ctdb/server/ctdb_recovery_helper.c b/ctdb/server/ctdb_recovery_helper.c index 0558bc76e70..4be992a0e5c 100644 --- a/ctdb/server/ctdb_recovery_helper.c +++ b/ctdb/server/ctdb_recovery_helper.c @@ -426,6 +426,9 @@ static int recdb_file(struct recdb_context *recdb, TALLOC_CTX *mem_ctx, } state.num_buffers += 1; + LOG("Wrote %d buffers of recovery records for %s\n", + state.num_buffers, recdb_name(recdb)); + return state.num_buffers; } @@ -688,6 +691,521 @@ static bool pull_database_recv(struct tevent_req *req, int *perr) return generic_recv(req, perr); } +/* + * Push database to specified nodes (old style) + */ + +struct push_database_old_state { + struct tevent_context *ev; + struct ctdb_client_context *client; + struct recdb_context *recdb; + uint32_t *pnn_list; + int count; + struct ctdb_rec_buffer *recbuf; + int index; +}; + +static void push_database_old_push_done(struct tevent_req *subreq); + +static struct tevent_req *push_database_old_send( + TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + uint32_t *pnn_list, int count, + struct recdb_context *recdb) +{ + struct tevent_req *req, *subreq; + struct push_database_old_state *state; + struct ctdb_req_control request; + uint32_t pnn; + + req = tevent_req_create(mem_ctx, &state, + struct push_database_old_state); + if (req == NULL) { + return NULL; + } + + state->ev = ev; + state->client = client; + state->recdb = recdb; + state->pnn_list 
= pnn_list; + state->count = count; + state->index = 0; + + state->recbuf = recdb_records(recdb, state, + ctdb_client_pnn(client)); + if (tevent_req_nomem(state->recbuf, req)) { + return tevent_req_post(req, ev); + } + + pnn = state->pnn_list[state->index]; + + ctdb_req_control_push_db(&request, state->recbuf); + subreq = ctdb_client_control_send(state, ev, client, pnn, + TIMEOUT(), &request); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, push_database_old_push_done, req); + + return req; +} + +static void push_database_old_push_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct push_database_old_state *state = tevent_req_data( + req, struct push_database_old_state); + struct ctdb_req_control request; + uint32_t pnn; + int ret; + bool status; + + status = ctdb_client_control_recv(subreq, &ret, NULL, NULL); + TALLOC_FREE(subreq); + if (! status) { + LOG("control PUSH_DB failed for db %s on node %u, ret=%d\n", + recdb_name(state->recdb), state->pnn_list[state->index], + ret); + tevent_req_error(req, ret); + return; + } + + state->index += 1; + if (state->index == state->count) { + TALLOC_FREE(state->recbuf); + tevent_req_done(req); + return; + } + + pnn = state->pnn_list[state->index]; + + ctdb_req_control_push_db(&request, state->recbuf); + subreq = ctdb_client_control_send(state, state->ev, state->client, + pnn, TIMEOUT(), &request); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, push_database_old_push_done, req); +} + +static bool push_database_old_recv(struct tevent_req *req, int *perr) +{ + return generic_recv(req, perr); +} + +/* + * Push database to specified nodes (new style) + */ + +struct push_database_new_state { + struct tevent_context *ev; + struct ctdb_client_context *client; + struct recdb_context *recdb; + uint32_t *pnn_list; + int count; + uint64_t srvid; + uint32_t 
dmaster; + int fd; + int num_buffers; + int num_buffers_sent; + int num_records; +}; + +static void push_database_new_started(struct tevent_req *subreq); +static void push_database_new_send_msg(struct tevent_req *req); +static void push_database_new_send_done(struct tevent_req *subreq); +static void push_database_new_confirmed(struct tevent_req *subreq); + +static struct tevent_req *push_database_new_send( + TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + uint32_t *pnn_list, int count, + struct recdb_context *recdb, + int max_size) +{ + struct tevent_req *req, *subreq; + struct push_database_new_state *state; + struct ctdb_req_control request; + struct ctdb_pulldb_ext pulldb_ext; + char *filename; + off_t offset; + + req = tevent_req_create(mem_ctx, &state, + struct push_database_new_state); + if (req == NULL) { + return NULL; + } + + state->ev = ev; + state->client = client; + state->recdb = recdb; + state->pnn_list = pnn_list; + state->count = count; + + state->srvid = srvid_next(); + state->dmaster = ctdb_client_pnn(client); + state->num_buffers_sent = 0; + state->num_records = 0; + + filename = talloc_asprintf(state, "%s.dat", recdb_path(recdb)); + if (tevent_req_nomem(filename, req)) { + return tevent_req_post(req, ev); + } + + state->fd = open(filename, O_RDWR|O_CREAT, 0644); + if (state->fd == -1) { + tevent_req_error(req, errno); + return tevent_req_post(req, ev); + } + unlink(filename); + talloc_free(filename); + + state->num_buffers = recdb_file(recdb, state, state->dmaster, + state->fd, max_size); + if (state->num_buffers == -1) { + tevent_req_error(req, ENOMEM); + return tevent_req_post(req, ev); + } + + offset = lseek(state->fd, 0, SEEK_SET); + if (offset != 0) { + tevent_req_error(req, EIO); + return tevent_req_post(req, ev); + } + + pulldb_ext.db_id = recdb_id(recdb); + pulldb_ext.srvid = state->srvid; + + ctdb_req_control_db_push_start(&request, &pulldb_ext); + subreq = ctdb_client_control_multi_send(state, 
ev, client, + pnn_list, count, + TIMEOUT(), &request); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, push_database_new_started, req); + + return req; +} + +static void push_database_new_started(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct push_database_new_state *state = tevent_req_data( + req, struct push_database_new_state); + int *err_list; + int ret; + bool status; + + status = ctdb_client_control_multi_recv(subreq, &ret, state, + &err_list, NULL); + TALLOC_FREE(subreq); + if (! status) { + int ret2; + uint32_t pnn; + + ret2 = ctdb_client_control_multi_error(state->pnn_list, + state->count, + err_list, &pnn); + if (ret2 != 0) { + LOG("control DB_PUSH_START failed for db %s " + "on node %u, ret=%d\n", + recdb_name(state->recdb), pnn, ret2); + } else { + LOG("control DB_PUSH_START failed for db %s, ret=%d\n", + recdb_name(state->recdb), ret); + } + talloc_free(err_list); + + tevent_req_error(req, ret); + return; + } + + push_database_new_send_msg(req); +} + +static void push_database_new_send_msg(struct tevent_req *req) +{ + struct push_database_new_state *state = tevent_req_data( + req, struct push_database_new_state); + struct tevent_req *subreq; + struct ctdb_rec_buffer *recbuf; + struct ctdb_req_message message; + TDB_DATA data; + int ret; + + if (state->num_buffers_sent == state->num_buffers) { + struct ctdb_req_control request; + + ctdb_req_control_db_push_confirm(&request, + recdb_id(state->recdb)); + subreq = ctdb_client_control_multi_send(state, state->ev, + state->client, + state->pnn_list, + state->count, + TIMEOUT(), &request); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, push_database_new_confirmed, + req); + return; + } + + ret = ctdb_rec_buffer_read(state->fd, state, &recbuf); + if (ret != 0) { + tevent_req_error(req, ret); + return; + } + + data.dsize = 
ctdb_rec_buffer_len(recbuf); + data.dptr = talloc_size(state, data.dsize); + if (tevent_req_nomem(data.dptr, req)) { + return; + } + + ctdb_rec_buffer_push(recbuf, data.dptr); + + message.srvid = state->srvid; + message.data.data = data; + + LOG("Pushing buffer %d with %d records for %s\n", + state->num_buffers_sent, recbuf->count, recdb_name(state->recdb)); + + subreq = ctdb_client_message_multi_send(state, state->ev, + state->client, + state->pnn_list, state->count, + &message); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, push_database_new_send_done, req); + + state->num_records += recbuf->count; + + talloc_free(data.dptr); + talloc_free(recbuf); +} + +static void push_database_new_send_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct push_database_new_state *state = tevent_req_data( + req, struct push_database_new_state); + bool status; + int ret; + + status = ctdb_client_message_multi_recv(subreq, &ret, NULL, NULL); + TALLOC_FREE(subreq); + if (! status) { + LOG("Sending recovery records failed for %s\n", + recdb_name(state->recdb)); + tevent_req_error(req, ret); + return; + } + + state->num_buffers_sent += 1; + + push_database_new_send_msg(req); +} + +static void push_database_new_confirmed(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct push_database_new_state *state = tevent_req_data( + req, struct push_database_new_state); + struct ctdb_reply_control **reply; + int *err_list; + bool status; + int ret, i; + uint32_t num_records; + + status = ctdb_client_control_multi_recv(subreq, &ret, state, + &err_list, &reply); + TALLOC_FREE(subreq); + if (! 
status) { + int ret2; + uint32_t pnn; + + ret2 = ctdb_client_control_multi_error(state->pnn_list, + state->count, err_list, + &pnn); + if (ret2 != 0) { + LOG("control DB_PUSH_CONFIRM failed for %s on node %u," + " ret=%d\n", recdb_name(state->recdb), pnn, ret2); + } else { + LOG("control DB_PUSH_CONFIRM failed for %s, ret=%d\n", + recdb_name(state->recdb), ret); + } + tevent_req_error(req, ret); + return; + } + + for (i=0; i<state->count; i++) { + ret = ctdb_reply_control_db_push_confirm(reply[i], + &num_records); + if (ret != 0) { + tevent_req_error(req, EPROTO); + return; + } + + if (num_records != state->num_records) { + LOG("Node %u received %d of %d records for %s\n", + state->pnn_list[i], num_records, + state->num_records, recdb_name(state->recdb)); + tevent_req_error(req, EPROTO); + return; + } + } + + talloc_free(reply); + + LOG("Pushed %d records for db %s\n", + state->num_records, recdb_name(state->recdb)); + + tevent_req_done(req); +} + +static bool push_database_new_recv(struct tevent_req *req, int *perr) +{ + return generic_recv(req, perr); +} + +/* + * wrapper for push_database_old and push_database_new + */ + +struct push_database_state { + bool old_done, new_done; +}; + +static void push_database_old_done(struct tevent_req *subreq); +static void push_database_new_done(struct tevent_req *subreq); + +static struct tevent_req *push_database_send( + TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + uint32_t *pnn_list, int count, uint32_t *caps, + struct ctdb_tunable_list *tun_list, + struct recdb_context *recdb) +{ + struct tevent_req *req, *subreq; + struct push_database_state *state; + uint32_t *old_list, *new_list; + int old_count, new_count; + int i; + + req = tevent_req_create(mem_ctx, &state, struct push_database_state); + if (req == NULL) { + return NULL; + } + + state->old_done = false; + state->new_done = false; + + old_count = 0; + new_count = 0; + old_list = talloc_array(state, uint32_t, count); + new_list =
talloc_array(state, uint32_t, count); + if (tevent_req_nomem(old_list, req) || + tevent_req_nomem(new_list,req)) { + return tevent_req_post(req, ev); + } + + for (i=0; i<count; i++) { + uint32_t pnn = pnn_list[i]; + + if (caps[pnn] & CTDB_CAP_FRAGMENTED_CONTROLS) { + new_list[new_count] = pnn; + new_count += 1; + } else { + old_list[old_count] = pnn; + old_count += 1; + } + } + + if (old_count > 0) { + subreq = push_database_old_send(state, ev, client, + old_list, old_count, recdb); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, push_database_old_done, req); + } else { + state->old_done = true; + } + + if (new_count > 0) { + subreq = push_database_new_send(state, ev, client, + new_list, new_count, recdb, + tun_list->rec_buffer_size_limit); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, push_database_new_done, req); + } else { + state->new_done = true; + } + + return req; +} + +static void push_database_old_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct push_database_state *state = tevent_req_data( + req, struct push_database_state); + bool status; + int ret; + + status = push_database_old_recv(subreq, &ret); + if (! status) { + tevent_req_error(req, ret); + return; + } + + state->old_done = true; + + if (state->old_done && state->new_done) { + tevent_req_done(req); + } +} + +static void push_database_new_done(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct push_database_state *state = tevent_req_data( + req, struct push_database_state); + bool status; + int ret; + + status = push_database_new_recv(subreq, &ret); + if (!
status) { + tevent_req_error(req, ret); + return; + } + + state->new_done = true; + + if (state->old_done && state->new_done) { + tevent_req_done(req); + } +} + +static bool push_database_recv(struct tevent_req *req, int *perr) +{ + return generic_recv(req, perr); +} + /* * Collect databases using highest sequence number */ @@ -951,8 +1469,6 @@ struct recover_db_state { const char *db_name, *db_path; struct recdb_context *recdb; - struct ctdb_rec_buffer *recbuf; - }; static void recover_db_name_done(struct tevent_req *subreq); @@ -1225,7 +1741,6 @@ static void recover_db_wipedb_done(struct tevent_req *subreq) subreq, struct tevent_req); struct recover_db_state *state = tevent_req_data( req, struct recover_db_state); - struct ctdb_req_control request; int *err_list; int ret; bool status; @@ -1251,18 +1766,10 @@ static void recover_db_wipedb_done(struct tevent_req *subreq) return; } - state->recbuf = recdb_records(state->recdb, state, state->destnode); - if (tevent_req_nomem(state->recbuf, req)) { - return; - } - - TALLOC_FREE(state->recdb); - - ctdb_req_control_push_db(&request, state->recbuf); - subreq = ctdb_client_control_multi_send(state, state->ev, - state->client, - state->pnn_list, state->count, - TIMEOUT(), &request); + subreq = push_database_send(state, state->ev, state->client, + state->pnn_list, state->count, + state->caps, state->tun_list, + state->recdb); if (tevent_req_nomem(subreq, req)) { return; } @@ -1276,32 +1783,17 @@ static void recover_db_pushdb_done(struct tevent_req *subreq) struct recover_db_state *state = tevent_req_data( req, struct recover_db_state); struct ctdb_req_control request; - int *err_list; int ret; bool status; - status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list, - NULL); + status = push_database_recv(subreq, &ret); TALLOC_FREE(subreq); if (! 
status) { - int ret2; - uint32_t pnn; - - ret2 = ctdb_client_control_multi_error(state->pnn_list, - state->count, - err_list, &pnn); - if (ret2 != 0) { - LOG("control PUSHDB failed for db %s on node %u," - " ret=%d\n", state->db_name, pnn, ret2); - } else { - LOG("control PUSHDB failed for db %s, ret=%d\n", - state->db_name, ret); - } tevent_req_error(req, ret); return; } - TALLOC_FREE(state->recbuf); + TALLOC_FREE(state->recdb); ctdb_req_control_db_transaction_commit(&request, &state->transdb); subreq = ctdb_client_control_multi_send(state, state->ev, -- 2.47.3