From: Amitay Isaacs Date: Tue, 4 Apr 2017 08:25:28 +0000 (+1000) Subject: ctdb-client: Refactor cluster-wide database traverse api X-Git-Tag: ldb-1.1.31~147 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=da9e0195a82a82258e80e01a3131160ce7f9e53b;p=thirdparty%2Fsamba.git ctdb-client: Refactor cluster-wide database traverse api This implements the async version of the traverse code in the ctdb tool for catdb command. Signed-off-by: Amitay Isaacs Reviewed-by: Martin Schwenke --- diff --git a/ctdb/client/client.h b/ctdb/client/client.h index 2205112b4f1..928fcd52d29 100644 --- a/ctdb/client/client.h +++ b/ctdb/client/client.h @@ -699,6 +699,23 @@ int ctdb_db_traverse_local(struct ctdb_db_context *db, bool readonly, bool extract_header, ctdb_rec_parser_func_t parser, void *private_data); +struct tevent_req *ctdb_db_traverse_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + struct ctdb_db_context *db, + uint32_t destnode, + struct timeval timeout, + ctdb_rec_parser_func_t parser, + void *private_data); + +bool ctdb_db_traverse_recv(struct tevent_req *req, int *perr); + +int ctdb_db_traverse(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + struct ctdb_db_context *db, + uint32_t destnode, struct timeval timeout, + ctdb_rec_parser_func_t parser, void *private_data); + int ctdb_ltdb_fetch(struct ctdb_db_context *db, TDB_DATA key, struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx, TDB_DATA *data); diff --git a/ctdb/client/client_db.c b/ctdb/client/client_db.c index cb3dc44c529..604034335a1 100644 --- a/ctdb/client/client_db.c +++ b/ctdb/client/client_db.c @@ -689,6 +689,252 @@ int ctdb_db_traverse_local(struct ctdb_db_context *db, bool readonly, return state.error; } +struct ctdb_db_traverse_state { + struct tevent_context *ev; + struct ctdb_client_context *client; + struct ctdb_db_context *db; + uint32_t destnode; + uint64_t srvid; + struct timeval timeout; + ctdb_rec_parser_func_t parser; + void *private_data; + int result; +}; + +static void ctdb_db_traverse_handler_set(struct tevent_req *subreq); +static void ctdb_db_traverse_started(struct tevent_req *subreq); +static void ctdb_db_traverse_handler(uint64_t srvid, TDB_DATA data, + void *private_data); +static void ctdb_db_traverse_remove_handler(struct tevent_req *req); +static void ctdb_db_traverse_handler_removed(struct tevent_req *subreq); + +struct tevent_req *ctdb_db_traverse_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct ctdb_client_context *client, + struct ctdb_db_context *db, + uint32_t destnode, + struct timeval timeout, + ctdb_rec_parser_func_t parser, + void *private_data) +{ + struct tevent_req *req, *subreq; + struct ctdb_db_traverse_state *state; + + req = tevent_req_create(mem_ctx, &state, + struct ctdb_db_traverse_state); + if (req == NULL) { + return NULL; + } + + state->ev = ev; + state->client = client; + state->db = db; + state->destnode = destnode; + state->srvid = CTDB_SRVID_CLIENT_RANGE | getpid(); + state->timeout = timeout; + state->parser = parser; + state->private_data = private_data; + + subreq = ctdb_client_set_message_handler_send(state, ev, client, + state->srvid, + ctdb_db_traverse_handler, + req); + if (tevent_req_nomem(subreq, req)) { + return tevent_req_post(req, ev); + } + tevent_req_set_callback(subreq, ctdb_db_traverse_handler_set, req); + + return req; +} + +static void ctdb_db_traverse_handler_set(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_db_traverse_state *state = tevent_req_data( + req, struct ctdb_db_traverse_state); + struct ctdb_traverse_start_ext traverse; + struct ctdb_req_control request; + int ret = 0; + bool status; + + status = ctdb_client_set_message_handler_recv(subreq, &ret); + TALLOC_FREE(subreq); + if (! status) { + tevent_req_error(req, ret); + return; + } + + traverse = (struct ctdb_traverse_start_ext) { + .db_id = ctdb_db_id(state->db), + .reqid = 0, + .srvid = state->srvid, + .withemptyrecords = false, + }; + + ctdb_req_control_traverse_start_ext(&request, &traverse); + subreq = ctdb_client_control_send(state, state->ev, state->client, + state->destnode, state->timeout, + &request); + if (subreq == NULL) { + state->result = ENOMEM; + ctdb_db_traverse_remove_handler(req); + return; + } + tevent_req_set_callback(subreq, ctdb_db_traverse_started, req); +} + +static void ctdb_db_traverse_started(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_db_traverse_state *state = tevent_req_data( + req, struct ctdb_db_traverse_state); + struct ctdb_reply_control *reply; + int ret = 0; + bool status; + + status = ctdb_client_control_recv(subreq, &ret, state, &reply); + TALLOC_FREE(subreq); + if (! status) { + DEBUG(DEBUG_ERR, ("traverse: control failed, ret=%d\n", ret)); + state->result = ret; + ctdb_db_traverse_remove_handler(req); + return; + } + + ret = ctdb_reply_control_traverse_start_ext(reply); + talloc_free(reply); + if (ret != 0) { + DEBUG(DEBUG_ERR, ("traverse: control reply failed, ret=%d\n", + ret)); + state->result = ret; + ctdb_db_traverse_remove_handler(req); + return; + } +} + +static void ctdb_db_traverse_handler(uint64_t srvid, TDB_DATA data, + void *private_data) +{ + struct tevent_req *req = talloc_get_type_abort( + private_data, struct tevent_req); + struct ctdb_db_traverse_state *state = tevent_req_data( + req, struct ctdb_db_traverse_state); + struct ctdb_rec_data *rec; + struct ctdb_ltdb_header header; + int ret; + + ret = ctdb_rec_data_pull(data.dptr, data.dsize, state, &rec); + if (ret != 0) { + return; + } + + if (rec->key.dsize == 0 && rec->data.dsize == 0) { + talloc_free(rec); + ctdb_db_traverse_remove_handler(req); + return; + } + + ret = ctdb_ltdb_header_extract(&rec->data, &header); + if (ret != 0) { + talloc_free(rec); + return; + } + + if (rec->data.dsize == 0) { + talloc_free(rec); + return; + } + + ret = state->parser(rec->reqid, &header, rec->key, rec->data, + state->private_data); + talloc_free(rec); + if (ret != 0) { + state->result = ret; + ctdb_db_traverse_remove_handler(req); + } +} + +static void ctdb_db_traverse_remove_handler(struct tevent_req *req) +{ + struct ctdb_db_traverse_state *state = tevent_req_data( + req, struct ctdb_db_traverse_state); + struct tevent_req *subreq; + + subreq = ctdb_client_remove_message_handler_send(state, state->ev, + state->client, + state->srvid, req); + if (tevent_req_nomem(subreq, req)) { + return; + } + tevent_req_set_callback(subreq, ctdb_db_traverse_handler_removed, req); +} + +static void ctdb_db_traverse_handler_removed(struct tevent_req *subreq) +{ + struct tevent_req *req = tevent_req_callback_data( + subreq, struct tevent_req); + struct ctdb_db_traverse_state *state = tevent_req_data( + req, struct ctdb_db_traverse_state); + int ret; + bool status; + + status = ctdb_client_remove_message_handler_recv(subreq, &ret); + TALLOC_FREE(subreq); + if (! status) { + tevent_req_error(req, ret); + return; + } + + if (state->result != 0) { + tevent_req_error(req, state->result); + return; + } + + tevent_req_done(req); +} + +bool ctdb_db_traverse_recv(struct tevent_req *req, int *perr) +{ + int ret; + + if (tevent_req_is_unix_error(req, &ret)) { + if (perr != NULL) { + *perr = ret; + } + return false; + } + + return true; +} + +int ctdb_db_traverse(TALLOC_CTX *mem_ctx, struct tevent_context *ev, + struct ctdb_client_context *client, + struct ctdb_db_context *db, + uint32_t destnode, struct timeval timeout, + ctdb_rec_parser_func_t parser, void *private_data) +{ + struct tevent_req *req; + int ret = 0; + bool status; + + req = ctdb_db_traverse_send(mem_ctx, ev, client, db, destnode, + timeout, parser, private_data); + if (req == NULL) { + return ENOMEM; + } + + tevent_req_poll(req, ev); + + status = ctdb_db_traverse_recv(req, &ret); + if (! status) { + return ret; + } + + return 0; +} + int ctdb_ltdb_fetch(struct ctdb_db_context *db, TDB_DATA key, struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx, TDB_DATA *data)