]> git.ipfire.org Git - thirdparty/samba.git/commitdiff
ctdb-recovery-helper: Introduce pull database abstraction
authorAmitay Isaacs <amitay@gmail.com>
Wed, 24 Feb 2016 07:10:49 +0000 (18:10 +1100)
committerMartin Schwenke <martins@samba.org>
Fri, 25 Mar 2016 02:26:15 +0000 (03:26 +0100)
This abstraction depending on the capability of the remote node either
uses older PULL_DB control or newer DB_PULL control.

Signed-off-by: Amitay Isaacs <amitay@gmail.com>
Reviewed-by: Martin Schwenke <martin@meltin.net>
ctdb/server/ctdb_recovery_helper.c

index 582f3aa1542f066dfe91fe05b901112511265fda..0558bc76e70f94113491c3930df9850ceb05acf7 100644 (file)
@@ -79,6 +79,14 @@ static bool generic_recv(struct tevent_req *req, int *perr)
        return true;
 }
 
+static uint64_t rec_srvid = CTDB_SRVID_RECOVERY;
+
+static uint64_t srvid_next(void)
+{
+       rec_srvid += 1;
+       return rec_srvid;
+}
+
 /*
  * Recovery database functions
  */
@@ -421,6 +429,265 @@ static int recdb_file(struct recdb_context *recdb, TALLOC_CTX *mem_ctx,
        return state.num_buffers;
 }
 
+/*
+ * Pull database from a single node
+ */
+
+struct pull_database_state {
+       struct tevent_context *ev;
+       struct ctdb_client_context *client;
+       struct recdb_context *recdb;
+       uint32_t pnn;
+       uint64_t srvid;
+       int num_records;
+};
+
+static void pull_database_handler(uint64_t srvid, TDB_DATA data,
+                                 void *private_data);
+static void pull_database_register_done(struct tevent_req *subreq);
+static void pull_database_old_done(struct tevent_req *subreq);
+static void pull_database_unregister_done(struct tevent_req *subreq);
+static void pull_database_new_done(struct tevent_req *subreq);
+
+static struct tevent_req *pull_database_send(
+                       TALLOC_CTX *mem_ctx,
+                       struct tevent_context *ev,
+                       struct ctdb_client_context *client,
+                       uint32_t pnn, uint32_t caps,
+                       struct recdb_context *recdb)
+{
+       struct tevent_req *req, *subreq;
+       struct pull_database_state *state;
+       struct ctdb_req_control request;
+
+       req = tevent_req_create(mem_ctx, &state, struct pull_database_state);
+       if (req == NULL) {
+               return NULL;
+       }
+
+       state->ev = ev;
+       state->client = client;
+       state->recdb = recdb;
+       state->pnn = pnn;
+       state->srvid = srvid_next();
+
+       if (caps & CTDB_CAP_FRAGMENTED_CONTROLS) {
+               subreq = ctdb_client_set_message_handler_send(
+                                       state, state->ev, state->client,
+                                       state->srvid, pull_database_handler,
+                                       req);
+               if (tevent_req_nomem(subreq, req)) {
+                       return tevent_req_post(req, ev);
+               }
+
+               tevent_req_set_callback(subreq, pull_database_register_done,
+                                       req);
+
+       } else {
+               struct ctdb_pulldb pulldb;
+
+               pulldb.db_id = recdb_id(recdb);
+               pulldb.lmaster = CTDB_LMASTER_ANY;
+
+               ctdb_req_control_pull_db(&request, &pulldb);
+               subreq = ctdb_client_control_send(state, state->ev,
+                                                 state->client,
+                                                 pnn, TIMEOUT(),
+                                                 &request);
+               if (tevent_req_nomem(subreq, req)) {
+                       return tevent_req_post(req, ev);
+               }
+               tevent_req_set_callback(subreq, pull_database_old_done, req);
+       }
+
+       return req;
+}
+
+static void pull_database_handler(uint64_t srvid, TDB_DATA data,
+                                 void *private_data)
+{
+       struct tevent_req *req = talloc_get_type_abort(
+               private_data, struct tevent_req);
+       struct pull_database_state *state = tevent_req_data(
+               req, struct pull_database_state);
+       struct ctdb_rec_buffer *recbuf;
+       int ret;
+       bool status;
+
+       if (srvid != state->srvid) {
+               return;
+       }
+
+       ret = ctdb_rec_buffer_pull(data.dptr, data.dsize, state, &recbuf);
+       if (ret != 0) {
+               LOG("Invalid data received for DB_PULL messages\n");
+               return;
+       }
+
+       if (recbuf->db_id != recdb_id(state->recdb)) {
+               talloc_free(recbuf);
+               LOG("Invalid dbid:%08x for DB_PULL messages for %s\n",
+                   recbuf->db_id, recdb_name(state->recdb));
+               return;
+       }
+
+       status = recdb_add(state->recdb, ctdb_client_pnn(state->client),
+                          recbuf);
+       if (! status) {
+               talloc_free(recbuf);
+               LOG("Failed to add records to recdb for %s\n",
+                   recdb_name(state->recdb));
+               return;
+       }
+
+       state->num_records += recbuf->count;
+       talloc_free(recbuf);
+}
+
+static void pull_database_register_done(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(
+               subreq, struct tevent_req);
+       struct pull_database_state *state = tevent_req_data(
+               req, struct pull_database_state);
+       struct ctdb_req_control request;
+       struct ctdb_pulldb_ext pulldb_ext;
+       int ret;
+       bool status;
+
+       status = ctdb_client_set_message_handler_recv(subreq, &ret);
+       TALLOC_FREE(subreq);
+       if (! status) {
+               LOG("failed to set message handler for DB_PULL for %s\n",
+                   recdb_name(state->recdb));
+               tevent_req_error(req, ret);
+               return;
+       }
+
+       pulldb_ext.db_id = recdb_id(state->recdb);
+       pulldb_ext.lmaster = CTDB_LMASTER_ANY;
+       pulldb_ext.srvid = state->srvid;
+
+       ctdb_req_control_db_pull(&request, &pulldb_ext);
+       subreq = ctdb_client_control_send(state, state->ev, state->client,
+                                         state->pnn, TIMEOUT(), &request);
+       if (tevent_req_nomem(subreq, req)) {
+               return;
+       }
+       tevent_req_set_callback(subreq, pull_database_new_done, req);
+}
+
+static void pull_database_old_done(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(
+               subreq, struct tevent_req);
+       struct pull_database_state *state = tevent_req_data(
+               req, struct pull_database_state);
+       struct ctdb_reply_control *reply;
+       struct ctdb_rec_buffer *recbuf;
+       int ret;
+       bool status;
+
+       status = ctdb_client_control_recv(subreq, &ret, state, &reply);
+       TALLOC_FREE(subreq);
+       if (! status) {
+               LOG("control PULL_DB failed for %s on node %u, ret=%d\n",
+                   recdb_name(state->recdb), state->pnn, ret);
+               tevent_req_error(req, ret);
+               return;
+       }
+
+       ret = ctdb_reply_control_pull_db(reply, state, &recbuf);
+       talloc_free(reply);
+       if (ret != 0) {
+               tevent_req_error(req, ret);
+               return;
+       }
+
+       status = recdb_add(state->recdb, ctdb_client_pnn(state->client),
+                          recbuf);
+       if (! status) {
+               talloc_free(recbuf);
+               tevent_req_error(req, EIO);
+               return;
+       }
+
+       state->num_records = recbuf->count;
+       talloc_free(recbuf);
+
+       LOG("Pulled %d records for db %s from node %d\n",
+           state->num_records, recdb_name(state->recdb), state->pnn);
+
+       tevent_req_done(req);
+}
+
+static void pull_database_new_done(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(
+               subreq, struct tevent_req);
+       struct pull_database_state *state = tevent_req_data(
+               req, struct pull_database_state);
+       struct ctdb_reply_control *reply;
+       uint32_t num_records;
+       int ret;
+       bool status;
+
+       status = ctdb_client_control_recv(subreq, &ret, state, &reply);
+       TALLOC_FREE(subreq);
+       if (! status) {
+               LOG("control DB_PULL failed for %s on node %u, ret=%d\n",
+                   recdb_name(state->recdb), state->pnn, ret);
+               tevent_req_error(req, ret);
+               return;
+       }
+
+       ret = ctdb_reply_control_db_pull(reply, &num_records);
+       talloc_free(reply);
+       if (num_records != state->num_records) {
+               LOG("mismatch (%u != %u) in DB_PULL records for %s\n",
+                   num_records, state->num_records, recdb_name(state->recdb));
+               tevent_req_error(req, EIO);
+               return;
+       }
+
+       LOG("Pulled %d records for db %s from node %d\n",
+           state->num_records, recdb_name(state->recdb), state->pnn);
+
+       subreq = ctdb_client_remove_message_handler_send(
+                                       state, state->ev, state->client,
+                                       state->srvid, req);
+       if (tevent_req_nomem(subreq, req)) {
+               return;
+       }
+       tevent_req_set_callback(subreq, pull_database_unregister_done, req);
+}
+
+static void pull_database_unregister_done(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(
+               subreq, struct tevent_req);
+       struct pull_database_state *state = tevent_req_data(
+               req, struct pull_database_state);
+       int ret;
+       bool status;
+
+       status = ctdb_client_remove_message_handler_recv(subreq, &ret);
+       TALLOC_FREE(subreq);
+       if (! status) {
+               LOG("failed to remove message handler for DB_PULL for %s\n",
+                   recdb_name(state->recdb));
+               tevent_req_error(req, ret);
+               return;
+       }
+
+       tevent_req_done(req);
+}
+
+static bool pull_database_recv(struct tevent_req *req, int *perr)
+{
+       return generic_recv(req, perr);
+}
+
 /*
  * Collect databases using highest sequence number
  */
@@ -484,8 +751,6 @@ static void collect_highseqnum_db_seqnum_done(struct tevent_req *subreq)
        struct collect_highseqnum_db_state *state = tevent_req_data(
                req, struct collect_highseqnum_db_state);
        struct ctdb_reply_control **reply;
-       struct ctdb_req_control request;
-       struct ctdb_pulldb pulldb;
        int *err_list;
        bool status;
        int ret, i;
@@ -532,12 +797,10 @@ static void collect_highseqnum_db_seqnum_done(struct tevent_req *subreq)
        LOG("Pull persistent db %s from node %d with seqnum 0x%"PRIx64"\n",
            recdb_name(state->recdb), state->max_pnn, max_seqnum);
 
-       pulldb.db_id = state->db_id;
-       pulldb.lmaster = CTDB_LMASTER_ANY;
-
-       ctdb_req_control_pull_db(&request, &pulldb);
-       subreq = ctdb_client_control_send(state, state->ev, state->client,
-                                         state->max_pnn, TIMEOUT(), &request);
+       subreq = pull_database_send(state, state->ev, state->client,
+                                   state->max_pnn,
+                                   state->caps[state->max_pnn],
+                                   state->recdb);
        if (tevent_req_nomem(subreq, req)) {
                return;
        }
@@ -549,37 +812,16 @@ static void collect_highseqnum_db_pulldb_done(struct tevent_req *subreq)
 {
        struct tevent_req *req = tevent_req_callback_data(
                subreq, struct tevent_req);
-       struct collect_highseqnum_db_state *state = tevent_req_data(
-               req, struct collect_highseqnum_db_state);
-       struct ctdb_reply_control *reply;
-       struct ctdb_rec_buffer *recbuf;
        int ret;
        bool status;
 
-       status = ctdb_client_control_recv(subreq, &ret, state, &reply);
+       status = pull_database_recv(subreq, &ret);
        TALLOC_FREE(subreq);
        if (! status) {
-               LOG("control PULL_DB failed for %s on node %u, ret=%d\n",
-                   recdb_name(state->recdb), state->max_pnn, ret);
                tevent_req_error(req, ret);
                return;
        }
 
-       ret = ctdb_reply_control_pull_db(reply, state, &recbuf);
-       if (ret != 0) {
-               tevent_req_error(req, EPROTO);
-               return;
-       }
-
-       talloc_free(reply);
-
-       ret = recdb_add(state->recdb, ctdb_client_pnn(state->client), recbuf);
-       talloc_free(recbuf);
-       if (! ret) {
-               tevent_req_error(req, EIO);
-               return;
-       }
-
        tevent_req_done(req);
 }
 
@@ -615,7 +857,7 @@ static struct tevent_req *collect_all_db_send(
 {
        struct tevent_req *req, *subreq;
        struct collect_all_db_state *state;
-       struct ctdb_req_control request;
+       uint32_t pnn;
 
        req = tevent_req_create(mem_ctx, &state,
                                struct collect_all_db_state);
@@ -627,18 +869,14 @@ static struct tevent_req *collect_all_db_send(
        state->client = client;
        state->pnn_list = pnn_list;
        state->count = count;
+       state->caps = caps;
        state->db_id = db_id;
        state->recdb = recdb;
-
-       state->pulldb.db_id = db_id;
-       state->pulldb.lmaster = CTDB_LMASTER_ANY;
-
        state->index = 0;
 
-       ctdb_req_control_pull_db(&request, &state->pulldb);
-       subreq = ctdb_client_control_send(state, ev, client,
-                                         state->pnn_list[state->index],
-                                         TIMEOUT(), &request);
+       pnn = state->pnn_list[state->index];
+
+       subreq = pull_database_send(state, ev, client, pnn, caps[pnn], recdb);
        if (tevent_req_nomem(subreq, req)) {
                return tevent_req_post(req, ev);
        }
@@ -653,49 +891,26 @@ static void collect_all_db_pulldb_done(struct tevent_req *subreq)
                subreq, struct tevent_req);
        struct collect_all_db_state *state = tevent_req_data(
                req, struct collect_all_db_state);
-       struct ctdb_reply_control *reply;
-       struct ctdb_req_control request;
-       struct ctdb_rec_buffer *recbuf;
+       uint32_t pnn;
        int ret;
        bool status;
 
-       status = ctdb_client_control_recv(subreq, &ret, state, &reply);
+       status = pull_database_recv(subreq, &ret);
        TALLOC_FREE(subreq);
        if (! status) {
-               LOG("control PULL_DB failed for %s from node %u, ret=%d\n",
-                   recdb_name(state->recdb), state->pnn_list[state->index],
-                   ret);
                tevent_req_error(req, ret);
                return;
        }
 
-       ret = ctdb_reply_control_pull_db(reply, state, &recbuf);
-       if (ret != 0) {
-               LOG("control PULL_DB failed for %s, ret=%d\n",
-                   recdb_name(state->recdb), ret);
-               tevent_req_error(req, EPROTO);
-               return;
-       }
-
-       talloc_free(reply);
-
-       status = recdb_add(state->recdb, ctdb_client_pnn(state->client), recbuf);
-       talloc_free(recbuf);
-       if (! status) {
-               tevent_req_error(req, EIO);
-               return;
-       }
-
        state->index += 1;
        if (state->index == state->count) {
                tevent_req_done(req);
                return;
        }
 
-       ctdb_req_control_pull_db(&request, &state->pulldb);
-       subreq = ctdb_client_control_send(state, state->ev, state->client,
-                                         state->pnn_list[state->index],
-                                         TIMEOUT(), &request);
+       pnn = state->pnn_list[state->index];
+       subreq = pull_database_send(state, state->ev, state->client,
+                                   pnn, state->caps[pnn], state->recdb);
        if (tevent_req_nomem(subreq, req)) {
                return;
        }