From 95a15cde45c47e36d87d54464bcd769ee96e43c2 Mon Sep 17 00:00:00 2001
From: Amitay Isaacs
Date: Fri, 19 Feb 2016 17:32:09 +1100
Subject: [PATCH] ctdb-daemon: Implement new controls DB_PULL and
 DB_PUSH_START/DB_PUSH_CONFIRM

Signed-off-by: Amitay Isaacs
Reviewed-by: Martin Schwenke
---
Review notes (this section sits after the three-dash line, so it is not
part of the commit message):

What the new code does
 * traverse_db_pull() is the tdb traverse callback.  It appends each
   record to state->recs with ctdb_marshall_add(); once the buffer size
   reaches tunable.rec_buffer_size_limit the buffer is sent to
   state->pnn / state->srvid with ctdb_daemon_send_message(), counted in
   state->num_records and freed.  Any failure frees the buffer and
   returns -1, which stops the traverse.
 * ctdb_control_db_pull() rejects unknown or non-frozen databases, marks
   the db lock, runs tdb_traverse_read() with the callback above, sends
   the remaining records, unmarks the lock and returns the number of
   records sent as a uint32_t in outdata.  The destination node is the
   source node of the control request.
 * db_push_msg_handler() is the srvid message handler.  For every record
   in the received marshall buffer it checks that the data holds at
   least a struct ctdb_ltdb_header, clears CTDB_REC_RO_FLAGS in that
   header and stores the record with ctdb_ltdb_store().  The first error
   sets state->failed; later messages are then ignored.
 * ctdb_control_db_push_start() rejects unknown or non-frozen databases,
   drops the state of a previous unfinished push, registers
   db_push_msg_handler() for the srvid given in the request, marks the
   db lock and records the state in ctdb_db->push_state.
 * ctdb_control_db_push_confirm() requires a started push on a frozen
   db, wipes the read-only tracking database when read-only support is
   enabled, unmarks the db lock, deregisters the srvid and returns the
   number of records stored as a uint32_t in outdata.

Issues seen in review (code below is left as in the original commit so
that the hunks still match it)
 * ctdb_db->push_started is set in ctdb_control_db_push_start() but is
   not cleared anywhere in this patch, so every later push_start logs
   "DB push already started".
 * ctdb_control_db_push_confirm() assigns ctdb_db->readonly = false
   twice in the tdb_wipe_all() failure branch.
 * ctdb_control_db_pull() logs "rejecting ctdb_control_pull_db when not
   frozen", i.e. the name of the older control.
 * db_push_msg_handler() trusts recs->count and rec->length from the
   message without checking them against indata.dsize.

 ctdb/include/ctdb_private.h |  11 ++
 ctdb/server/ctdb_control.c  |  12 ++
 ctdb/server/ctdb_recover.c  | 340 ++++++++++++++++++++++++++++++++++++
 3 files changed, 363 insertions(+)

diff --git a/ctdb/include/ctdb_private.h b/ctdb/include/ctdb_private.h
index b7c3e5d31a6..04574fe9c4c 100644
--- a/ctdb/include/ctdb_private.h
+++ b/ctdb/include/ctdb_private.h
@@ -437,6 +437,9 @@ struct ctdb_db_context {
 	bool freeze_transaction_started;
 	uint32_t freeze_transaction_id;
 	uint32_t generation;
+
+	bool push_started;
+	void *push_state;
 };
 
 
@@ -873,6 +876,14 @@ int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata,
 			     TDB_DATA *outdata);
 int32_t ctdb_control_push_db(struct ctdb_context *ctdb, TDB_DATA indata);
 
+int32_t ctdb_control_db_pull(struct ctdb_context *ctdb,
+			     struct ctdb_req_control_old *c,
+			     TDB_DATA indata, TDB_DATA *outdata);
+int32_t ctdb_control_db_push_start(struct ctdb_context *ctdb,
+				   TDB_DATA indata);
+int32_t ctdb_control_db_push_confirm(struct ctdb_context *ctdb,
+				     TDB_DATA indata, TDB_DATA *outdata);
+
 int ctdb_deferred_drop_all_ips(struct ctdb_context *ctdb);
 
 int32_t ctdb_control_set_recmode(struct ctdb_context *ctdb,
diff --git a/ctdb/server/ctdb_control.c b/ctdb/server/ctdb_control.c
index e38832653b2..7d18969b1b0 100644
--- a/ctdb/server/ctdb_control.c
+++ b/ctdb/server/ctdb_control.c
@@ -705,6 +705,18 @@ static int32_t ctdb_control_dispatch(struct ctdb_context *ctdb,
 		CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
 		return ctdb_control_db_transaction_cancel(ctdb, indata);
 
+	case CTDB_CONTROL_DB_PULL:
+		CHECK_CONTROL_DATA_SIZE(sizeof(struct ctdb_pulldb_ext));
+		return ctdb_control_db_pull(ctdb, c, indata, outdata);
+
+	case CTDB_CONTROL_DB_PUSH_START:
+		CHECK_CONTROL_DATA_SIZE(sizeof(struct ctdb_pulldb_ext));
+		return ctdb_control_db_push_start(ctdb, indata);
+
+	case CTDB_CONTROL_DB_PUSH_CONFIRM:
+		CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
+		return ctdb_control_db_push_confirm(ctdb, indata, outdata);
+
 	default:
 		DEBUG(DEBUG_CRIT,(__location__ " Unknown CTDB control opcode %u\n", opcode));
 		return -1;
diff --git a/ctdb/server/ctdb_recover.c b/ctdb/server/ctdb_recover.c
index 79b5b2b9748..102854564bc 100644
--- a/ctdb/server/ctdb_recover.c
+++ b/ctdb/server/ctdb_recover.c
@@ -313,6 +313,133 @@ int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DAT
 	return 0;
 }
 
+struct db_pull_state {
+	struct ctdb_context *ctdb;
+	struct ctdb_db_context *ctdb_db;
+	struct ctdb_marshall_buffer *recs;
+	uint32_t pnn;
+	uint64_t srvid;
+	uint32_t num_records;
+};
+
+static int traverse_db_pull(struct tdb_context *tdb, TDB_DATA key,
+			    TDB_DATA data, void *private_data)
+{
+	struct db_pull_state *state = (struct db_pull_state *)private_data;
+	struct ctdb_marshall_buffer *recs;
+
+	recs = ctdb_marshall_add(state->ctdb, state->recs,
+				 state->ctdb_db->db_id, 0, key, NULL, data);
+	if (recs == NULL) {
+		TALLOC_FREE(state->recs);
+		return -1;
+	}
+	state->recs = recs;
+
+	if (talloc_get_size(state->recs) >=
+			state->ctdb->tunable.rec_buffer_size_limit) {
+		TDB_DATA buffer;
+		int ret;
+
+		buffer = ctdb_marshall_finish(state->recs);
+		ret = ctdb_daemon_send_message(state->ctdb, state->pnn,
+					       state->srvid, buffer);
+		if (ret != 0) {
+			TALLOC_FREE(state->recs);
+			return -1;
+		}
+
+		state->num_records += state->recs->count;
+		TALLOC_FREE(state->recs);
+	}
+
+	return 0;
+}
+
+int32_t ctdb_control_db_pull(struct ctdb_context *ctdb,
+			     struct ctdb_req_control_old *c,
+			     TDB_DATA indata, TDB_DATA *outdata)
+{
+	struct ctdb_pulldb_ext *pulldb_ext;
+	struct ctdb_db_context *ctdb_db;
+	struct db_pull_state state;
+	int ret;
+
+	pulldb_ext = (struct ctdb_pulldb_ext *)indata.dptr;
+
+	ctdb_db = find_ctdb_db(ctdb, pulldb_ext->db_id);
+	if (ctdb_db == NULL) {
+		DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n",
+				 pulldb_ext->db_id));
+		return -1;
+	}
+
+	if (!ctdb_db_frozen(ctdb_db)) {
+		DEBUG(DEBUG_ERR,
+		      ("rejecting ctdb_control_pull_db when not frozen\n"));
+		return -1;
+	}
+
+	if (ctdb_db->unhealthy_reason) {
+		/* this is just a warning, as the tdb should be empty anyway */
+		DEBUG(DEBUG_WARNING,
+		      ("db(%s) unhealty in ctdb_control_db_pull: %s\n",
+		       ctdb_db->db_name, ctdb_db->unhealthy_reason));
+	}
+
+	state.ctdb = ctdb;
+	state.ctdb_db = ctdb_db;
+	state.recs = NULL;
+	state.pnn = c->hdr.srcnode;
+	state.srvid = pulldb_ext->srvid;
+	state.num_records = 0;
+
+	if (ctdb_lockdb_mark(ctdb_db) != 0) {
+		DEBUG(DEBUG_ERR,
+		      (__location__ " Failed to get lock on entire db - failing\n"));
+		return -1;
+	}
+
+	ret = tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_db_pull, &state);
+	if (ret == -1) {
+		DEBUG(DEBUG_ERR,
+		      (__location__ " Failed to get traverse db '%s'\n",
+		       ctdb_db->db_name));
+		ctdb_lockdb_unmark(ctdb_db);
+		return -1;
+	}
+
+	/* Last few records */
+	if (state.recs != NULL) {
+		TDB_DATA buffer;
+
+		buffer = ctdb_marshall_finish(state.recs);
+		ret = ctdb_daemon_send_message(state.ctdb, state.pnn,
+					       state.srvid, buffer);
+		if (ret != 0) {
+			TALLOC_FREE(state.recs);
+			ctdb_lockdb_unmark(ctdb_db);
+			return -1;
+		}
+
+		state.num_records += state.recs->count;
+		TALLOC_FREE(state.recs);
+	}
+
+	ctdb_lockdb_unmark(ctdb_db);
+
+	outdata->dptr = talloc_size(outdata, sizeof(uint32_t));
+	if (outdata->dptr == NULL) {
+		DEBUG(DEBUG_ERR, (__location__ " Memory allocation error\n"));
+		return -1;
+	}
+
+	memcpy(outdata->dptr, (uint8_t *)&state.num_records, sizeof(uint32_t));
+	outdata->dsize = sizeof(uint32_t);
+
+	return 0;
+}
+
 /*
   push a bunch of records into a ltdb, filtering by rsn
  */
@@ -407,6 +534,219 @@ failed:
 	return -1;
 }
 
+struct db_push_state {
+	struct ctdb_context *ctdb;
+	struct ctdb_db_context *ctdb_db;
+	uint64_t srvid;
+	uint32_t num_records;
+	bool failed;
+};
+
+static void db_push_msg_handler(uint64_t srvid, TDB_DATA indata,
+				void *private_data)
+{
+	struct db_push_state *state = talloc_get_type(
+		private_data, struct db_push_state);
+	struct ctdb_marshall_buffer *recs;
+	struct ctdb_rec_data_old *rec;
+	int i, ret;
+
+	if (state->failed) {
+		return;
+	}
+
+	recs = (struct ctdb_marshall_buffer *)indata.dptr;
+	rec = (struct ctdb_rec_data_old *)&recs->data[0];
+
+	DEBUG(DEBUG_INFO, ("starting push of %u records for dbid 0x%x\n",
+			   recs->count, recs->db_id));
+
+	for (i=0; i<recs->count; i++) {
+		TDB_DATA key, data;
+		struct ctdb_ltdb_header *hdr;
+
+		key.dptr = &rec->data[0];
+		key.dsize = rec->keylen;
+		data.dptr = &rec->data[key.dsize];
+		data.dsize = rec->datalen;
+
+		if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
+			DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
+			goto failed;
+		}
+
+		hdr = (struct ctdb_ltdb_header *)data.dptr;
+		/* Strip off any read only record flags.
+		 * All readonly records are revoked implicitely by a recovery.
+		 */
+		hdr->flags &= ~CTDB_REC_RO_FLAGS;
+
+		data.dptr += sizeof(*hdr);
+		data.dsize -= sizeof(*hdr);
+
+		ret = ctdb_ltdb_store(state->ctdb_db, key, hdr, data);
+		if (ret != 0) {
+			DEBUG(DEBUG_ERR,
+			      (__location__ " Unable to store record\n"));
+			goto failed;
+		}
+
+		rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
+	}
+
+	DEBUG(DEBUG_DEBUG, ("finished push of %u records for dbid 0x%x\n",
+			    recs->count, recs->db_id));
+
+	state->num_records += recs->count;
+	return;
+
+failed:
+	state->failed = true;
+}
+
+int32_t ctdb_control_db_push_start(struct ctdb_context *ctdb, TDB_DATA indata)
+{
+	struct ctdb_pulldb_ext *pulldb_ext;
+	struct ctdb_db_context *ctdb_db;
+	struct db_push_state *state;
+	int ret;
+
+	pulldb_ext = (struct ctdb_pulldb_ext *)indata.dptr;
+
+	ctdb_db = find_ctdb_db(ctdb, pulldb_ext->db_id);
+	if (ctdb_db == NULL) {
+		DEBUG(DEBUG_ERR,
+		      (__location__ " Unknown db 0x%08x\n", pulldb_ext->db_id));
+		return -1;
+	}
+
+	if (!ctdb_db_frozen(ctdb_db)) {
+		DEBUG(DEBUG_ERR,
+		      ("rejecting ctdb_control_db_push_start when not frozen\n"));
+		return -1;
+	}
+
+	if (ctdb_db->push_started) {
+		DEBUG(DEBUG_WARNING,
+		      (__location__ " DB push already started for %s\n",
+		       ctdb_db->db_name));
+
+		/* De-register old state */
+		state = (struct db_push_state *)ctdb_db->push_state;
+		if (state != NULL) {
+			srvid_deregister(ctdb->srv, state->srvid, state);
+			talloc_free(state);
+			ctdb_db->push_state = NULL;
+		}
+	}
+
+	state = talloc_zero(ctdb_db, struct db_push_state);
+	if (state == NULL) {
+		DEBUG(DEBUG_ERR, (__location__ " Memory allocation error\n"));
+		return -1;
+	}
+
+	state->ctdb = ctdb;
+	state->ctdb_db = ctdb_db;
+	state->srvid = pulldb_ext->srvid;
+	state->failed = false;
+
+	ret = srvid_register(ctdb->srv, state, state->srvid,
+			     db_push_msg_handler, state);
+	if (ret != 0) {
+		DEBUG(DEBUG_ERR,
+		      (__location__ " Failed to register srvid for db push\n"));
+		talloc_free(state);
+		return -1;
+	}
+
+	if (ctdb_lockdb_mark(ctdb_db) != 0) {
+		DEBUG(DEBUG_ERR,
+		      (__location__ " Failed to get lock on entire db - failing\n"));
+		srvid_deregister(ctdb->srv, state->srvid, state);
+		talloc_free(state);
+		return -1;
+	}
+
+	ctdb_db->push_started = true;
+	ctdb_db->push_state = state;
+
+	return 0;
+}
+
+int32_t ctdb_control_db_push_confirm(struct ctdb_context *ctdb,
+				     TDB_DATA indata, TDB_DATA *outdata)
+{
+	uint32_t db_id;
+	struct ctdb_db_context *ctdb_db;
+	struct db_push_state *state;
+
+	db_id = *(uint32_t *)indata.dptr;
+
+	ctdb_db = find_ctdb_db(ctdb, db_id);
+	if (ctdb_db == NULL) {
+		DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", db_id));
+		return -1;
+	}
+
+	if (!ctdb_db_frozen(ctdb_db)) {
+		DEBUG(DEBUG_ERR,
+		      ("rejecting ctdb_control_db_push_confirm when not frozen\n"));
+		return -1;
+	}
+
+	if (!ctdb_db->push_started) {
+		DEBUG(DEBUG_ERR, (__location__ " DB push not started\n"));
+		return -1;
+	}
+
+	if (ctdb_db->readonly) {
+		DEBUG(DEBUG_ERR,
+		      ("Clearing the tracking database for dbid 0x%x\n",
+		       ctdb_db->db_id));
+		if (tdb_wipe_all(ctdb_db->rottdb) != 0) {
+			DEBUG(DEBUG_ERR,
+			      ("Failed to wipe tracking database for 0x%x."
+			       " Dropping read-only delegation support\n",
+			       ctdb_db->db_id));
+			ctdb_db->readonly = false;
+			tdb_close(ctdb_db->rottdb);
+			ctdb_db->rottdb = NULL;
+			ctdb_db->readonly = false;
+		}
+
+		while (ctdb_db->revokechild_active != NULL) {
+			talloc_free(ctdb_db->revokechild_active);
+		}
+	}
+
+	ctdb_lockdb_unmark(ctdb_db);
+
+	state = (struct db_push_state *)ctdb_db->push_state;
+	if (state == NULL) {
+		DEBUG(DEBUG_ERR, (__location__ " Missing push db state\n"));
+		return -1;
+	}
+
+	srvid_deregister(ctdb->srv, state->srvid, state);
+
+	outdata->dptr = talloc_size(outdata, sizeof(uint32_t));
+	if (outdata->dptr == NULL) {
+		DEBUG(DEBUG_ERR, (__location__ " Memory allocation error\n"));
+		talloc_free(state);
+		ctdb_db->push_state = NULL;
+		return -1;
+	}
+
+	memcpy(outdata->dptr, (uint8_t *)&state->num_records, sizeof(uint32_t));
+	outdata->dsize = sizeof(uint32_t);
+
+	talloc_free(state);
+	ctdb_db->push_state = NULL;
+
+	return 0;
+}
+
 struct ctdb_set_recmode_state {
 	struct ctdb_context *ctdb;
 	struct ctdb_req_control_old *c;
-- 
2.47.3