return 0;
}
-struct recdb_records_traverse_state {
- struct ctdb_rec_buffer *recbuf;
- uint32_t dmaster;
- uint32_t reqid;
- bool persistent;
- bool failed;
-};
-
-static int recdb_records_traverse(struct tdb_context *tdb,
- TDB_DATA key, TDB_DATA data,
- void *private_data)
-{
- struct recdb_records_traverse_state *state =
- (struct recdb_records_traverse_state *)private_data;
- int ret;
-
- ret = recbuf_filter_add(state->recbuf, state->persistent,
- state->reqid, state->dmaster, key, data);
- if (ret != 0) {
- state->failed = true;
- return ret;
- }
-
- return 0;
-}
-
-static struct ctdb_rec_buffer *recdb_records(struct recdb_context *recdb,
- TALLOC_CTX *mem_ctx,
- uint32_t dmaster)
-{
- struct recdb_records_traverse_state state;
- int ret;
-
- state.recbuf = ctdb_rec_buffer_init(mem_ctx, recdb_id(recdb));
- if (state.recbuf == NULL) {
- return NULL;
- }
- state.dmaster = dmaster;
- state.reqid = 0;
- state.persistent = recdb_persistent(recdb);
- state.failed = false;
-
- ret = tdb_traverse_read(recdb_tdb(recdb), recdb_records_traverse,
- &state);
- if (ret == -1 || state.failed) {
- D_ERR("Failed to marshall recovery records for %s\n",
- recdb_name(recdb));
- TALLOC_FREE(state.recbuf);
- return NULL;
- }
-
- return state.recbuf;
-}
-
struct recdb_file_traverse_state {
struct ctdb_rec_buffer *recbuf;
struct recdb_context *recdb;
static void pull_database_handler(uint64_t srvid, TDB_DATA data,
void *private_data);
static void pull_database_register_done(struct tevent_req *subreq);
-static void pull_database_old_done(struct tevent_req *subreq);
static void pull_database_unregister_done(struct tevent_req *subreq);
static void pull_database_new_done(struct tevent_req *subreq);
{
struct tevent_req *req, *subreq;
struct pull_database_state *state;
- struct ctdb_req_control request;
req = tevent_req_create(mem_ctx, &state, struct pull_database_state);
if (req == NULL) {
state->pnn = pnn;
state->srvid = srvid_next();
- if (caps & CTDB_CAP_FRAGMENTED_CONTROLS) {
- subreq = ctdb_client_set_message_handler_send(
+ subreq = ctdb_client_set_message_handler_send(
state, state->ev, state->client,
state->srvid, pull_database_handler,
req);
- if (tevent_req_nomem(subreq, req)) {
- return tevent_req_post(req, ev);
- }
-
- tevent_req_set_callback(subreq, pull_database_register_done,
- req);
-
- } else {
- struct ctdb_pulldb pulldb;
-
- pulldb.db_id = recdb_id(recdb);
- pulldb.lmaster = CTDB_LMASTER_ANY;
-
- ctdb_req_control_pull_db(&request, &pulldb);
- subreq = ctdb_client_control_send(state, state->ev,
- state->client,
- pnn, TIMEOUT(),
- &request);
- if (tevent_req_nomem(subreq, req)) {
- return tevent_req_post(req, ev);
- }
- tevent_req_set_callback(subreq, pull_database_old_done, req);
+ if (tevent_req_nomem(subreq, req)) {
+ return tevent_req_post(req, ev);
}
+ tevent_req_set_callback(subreq, pull_database_register_done, req);
+
return req;
}
tevent_req_set_callback(subreq, pull_database_new_done, req);
}
-static void pull_database_old_done(struct tevent_req *subreq)
-{
- struct tevent_req *req = tevent_req_callback_data(
- subreq, struct tevent_req);
- struct pull_database_state *state = tevent_req_data(
- req, struct pull_database_state);
- struct ctdb_reply_control *reply;
- struct ctdb_rec_buffer *recbuf;
- int ret;
- bool status;
-
- status = ctdb_client_control_recv(subreq, &ret, state, &reply);
- TALLOC_FREE(subreq);
- if (! status) {
- D_ERR("control PULL_DB failed for %s on node %u, ret=%d\n",
- recdb_name(state->recdb), state->pnn, ret);
- tevent_req_error(req, ret);
- return;
- }
-
- ret = ctdb_reply_control_pull_db(reply, state, &recbuf);
- talloc_free(reply);
- if (ret != 0) {
- tevent_req_error(req, ret);
- return;
- }
-
- status = recdb_add(state->recdb, ctdb_client_pnn(state->client),
- recbuf);
- if (! status) {
- talloc_free(recbuf);
- tevent_req_error(req, EIO);
- return;
- }
-
- state->num_records = recbuf->count;
- talloc_free(recbuf);
-
- D_INFO("Pulled %d records for db %s from node %d\n",
- state->num_records, recdb_name(state->recdb), state->pnn);
-
- tevent_req_done(req);
-}
-
static void pull_database_new_done(struct tevent_req *subreq)
{
struct tevent_req *req = tevent_req_callback_data(
return generic_recv(req, perr);
}
-/*
- * Push database to specified nodes (old style)
- */
-
-struct push_database_old_state {
- struct tevent_context *ev;
- struct ctdb_client_context *client;
- struct recdb_context *recdb;
- uint32_t *pnn_list;
- unsigned int count;
- struct ctdb_rec_buffer *recbuf;
- unsigned int index;
-};
-
-static void push_database_old_push_done(struct tevent_req *subreq);
-
-static struct tevent_req *push_database_old_send(
- TALLOC_CTX *mem_ctx,
- struct tevent_context *ev,
- struct ctdb_client_context *client,
- uint32_t *pnn_list,
- unsigned int count,
- struct recdb_context *recdb)
-{
- struct tevent_req *req, *subreq;
- struct push_database_old_state *state;
- struct ctdb_req_control request;
- uint32_t pnn;
-
- req = tevent_req_create(mem_ctx, &state,
- struct push_database_old_state);
- if (req == NULL) {
- return NULL;
- }
-
- state->ev = ev;
- state->client = client;
- state->recdb = recdb;
- state->pnn_list = pnn_list;
- state->count = count;
- state->index = 0;
-
- state->recbuf = recdb_records(recdb, state,
- ctdb_client_pnn(client));
- if (tevent_req_nomem(state->recbuf, req)) {
- return tevent_req_post(req, ev);
- }
-
- pnn = state->pnn_list[state->index];
-
- ctdb_req_control_push_db(&request, state->recbuf);
- subreq = ctdb_client_control_send(state, ev, client, pnn,
- TIMEOUT(), &request);
- if (tevent_req_nomem(subreq, req)) {
- return tevent_req_post(req, ev);
- }
- tevent_req_set_callback(subreq, push_database_old_push_done, req);
-
- return req;
-}
-
-static void push_database_old_push_done(struct tevent_req *subreq)
-{
- struct tevent_req *req = tevent_req_callback_data(
- subreq, struct tevent_req);
- struct push_database_old_state *state = tevent_req_data(
- req, struct push_database_old_state);
- struct ctdb_req_control request;
- uint32_t pnn;
- int ret;
- bool status;
-
- status = ctdb_client_control_recv(subreq, &ret, NULL, NULL);
- TALLOC_FREE(subreq);
- if (! status) {
- D_ERR("control PUSH_DB failed for db %s on node %u, ret=%d\n",
- recdb_name(state->recdb), state->pnn_list[state->index],
- ret);
- tevent_req_error(req, ret);
- return;
- }
-
- state->index += 1;
- if (state->index == state->count) {
- TALLOC_FREE(state->recbuf);
- tevent_req_done(req);
- return;
- }
-
- pnn = state->pnn_list[state->index];
-
- ctdb_req_control_push_db(&request, state->recbuf);
- subreq = ctdb_client_control_send(state, state->ev, state->client,
- pnn, TIMEOUT(), &request);
- if (tevent_req_nomem(subreq, req)) {
- return;
- }
- tevent_req_set_callback(subreq, push_database_old_push_done, req);
-}
-
-static bool push_database_old_recv(struct tevent_req *req, int *perr)
-{
- return generic_recv(req, perr);
-}
-
/*
* Push database to specified nodes (new style)
*/
}
/*
- * wrapper for push_database_old and push_database_new
+ * wrapper for push_database_new
*/
struct push_database_state {
- bool old_done, new_done;
};
-static void push_database_old_done(struct tevent_req *subreq);
static void push_database_new_done(struct tevent_req *subreq);
static struct tevent_req *push_database_send(
{
struct tevent_req *req, *subreq;
struct push_database_state *state;
- uint32_t *old_list, *new_list;
- unsigned int old_count, new_count;
- unsigned int i;
req = tevent_req_create(mem_ctx, &state, struct push_database_state);
if (req == NULL) {
return NULL;
}
- state->old_done = false;
- state->new_done = false;
-
- old_count = 0;
- new_count = 0;
- old_list = talloc_array(state, uint32_t, nlist->count);
- new_list = talloc_array(state, uint32_t, nlist->count);
- if (tevent_req_nomem(old_list, req) ||
- tevent_req_nomem(new_list,req)) {
+ subreq = push_database_new_send(state,
+ ev,
+ client,
+ nlist->pnn_list,
+ nlist->count,
+ recdb,
+ tun_list->rec_buffer_size_limit);
+ if (tevent_req_nomem(subreq, req)) {
return tevent_req_post(req, ev);
}
-
- for (i=0; i<nlist->count; i++) {
- if (nlist->caps[i] & CTDB_CAP_FRAGMENTED_CONTROLS) {
- new_list[new_count] = nlist->pnn_list[i];
- new_count += 1;
- } else {
- old_list[old_count] = nlist->pnn_list[i];
- old_count += 1;
- }
- }
-
- if (old_count > 0) {
- subreq = push_database_old_send(state, ev, client,
- old_list, old_count, recdb);
- if (tevent_req_nomem(subreq, req)) {
- return tevent_req_post(req, ev);
- }
- tevent_req_set_callback(subreq, push_database_old_done, req);
- } else {
- state->old_done = true;
- }
-
- if (new_count > 0) {
- subreq = push_database_new_send(state, ev, client,
- new_list, new_count, recdb,
- tun_list->rec_buffer_size_limit);
- if (tevent_req_nomem(subreq, req)) {
- return tevent_req_post(req, ev);
- }
- tevent_req_set_callback(subreq, push_database_new_done, req);
- } else {
- state->new_done = true;
- }
+ tevent_req_set_callback(subreq, push_database_new_done, req);
return req;
}
-static void push_database_old_done(struct tevent_req *subreq)
-{
- struct tevent_req *req = tevent_req_callback_data(
- subreq, struct tevent_req);
- struct push_database_state *state = tevent_req_data(
- req, struct push_database_state);
- bool status;
- int ret;
-
- status = push_database_old_recv(subreq, &ret);
- if (! status) {
- tevent_req_error(req, ret);
- return;
- }
-
- state->old_done = true;
-
- if (state->old_done && state->new_done) {
- tevent_req_done(req);
- }
-}
-
static void push_database_new_done(struct tevent_req *subreq)
{
struct tevent_req *req = tevent_req_callback_data(
subreq, struct tevent_req);
- struct push_database_state *state = tevent_req_data(
- req, struct push_database_state);
bool status;
int ret;
return;
}
- state->new_done = true;
-
- if (state->old_done && state->new_done) {
- tevent_req_done(req);
- }
+ tevent_req_done(req);
}
static bool push_database_recv(struct tevent_req *req, int *perr)