]> git.ipfire.org Git - thirdparty/samba.git/commitdiff
ctdb-recovery: Remove serial database recovery code
authorAmitay Isaacs <amitay@gmail.com>
Tue, 19 Jul 2016 06:06:37 +0000 (16:06 +1000)
committerAmitay Isaacs <amitay@samba.org>
Mon, 25 Jul 2016 19:29:42 +0000 (21:29 +0200)
Signed-off-by: Amitay Isaacs <amitay@gmail.com>
Reviewed-by: Martin Schwenke <martin@meltin.net>
ctdb/server/ctdb_recoverd.c

index e5b94540fecf59e4bb5c57964ee340317c0daaf0..4eeb4ce6ca446e5e6ea021de2585d1b5d2ed557c 100644 (file)
@@ -344,87 +344,6 @@ static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
        ctdb_set_culprit_count(rec, culprit, 1);
 }
 
-
-/* this callback is called for every node that failed to execute the
-   recovered event
-*/
-static void recovered_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
-{
-       struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
-
-       DEBUG(DEBUG_ERR, (__location__ " Node %u failed the recovered event. Setting it as recovery fail culprit\n", node_pnn));
-
-       ctdb_set_culprit(rec, node_pnn);
-}
-
-/*
-  run the "recovered" eventscript on all nodes
- */
-static int run_recovered_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map_old *nodemap, const char *caller)
-{
-       TALLOC_CTX *tmp_ctx;
-       uint32_t *nodes;
-       struct ctdb_context *ctdb = rec->ctdb;
-
-       tmp_ctx = talloc_new(ctdb);
-       CTDB_NO_MEMORY(ctdb, tmp_ctx);
-
-       nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
-       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
-                                       nodes, 0,
-                                       CONTROL_TIMEOUT(), false, tdb_null,
-                                       NULL, recovered_fail_callback,
-                                       rec) != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
-
-               talloc_free(tmp_ctx);
-               return -1;
-       }
-
-       talloc_free(tmp_ctx);
-       return 0;
-}
-
-/* this callback is called for every node that failed to execute the
-   start recovery event
-*/
-static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
-{
-       struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
-
-       DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
-
-       ctdb_set_culprit(rec, node_pnn);
-}
-
-/*
-  run the "startrecovery" eventscript on all nodes
- */
-static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map_old *nodemap)
-{
-       TALLOC_CTX *tmp_ctx;
-       uint32_t *nodes;
-       struct ctdb_context *ctdb = rec->ctdb;
-
-       tmp_ctx = talloc_new(ctdb);
-       CTDB_NO_MEMORY(ctdb, tmp_ctx);
-
-       nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
-       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
-                                       nodes, 0,
-                                       CONTROL_TIMEOUT(), false, tdb_null,
-                                       NULL,
-                                       startrecovery_fail_callback,
-                                       rec) != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
-               talloc_free(tmp_ctx);
-               return -1;
-       }
-
-       talloc_free(tmp_ctx);
-       return 0;
-}
-
 /*
   Retrieve capabilities from all connected nodes
  */
@@ -474,14 +393,6 @@ static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_p
        ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
 }
 
-static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
-{
-       struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
-
-       DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
-       ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
-}
-
 /*
   change recovery mode on all nodes
  */
@@ -703,244 +614,6 @@ static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb
        return 0;
 }
 
-
-/*
-  pull the remote database contents from one node into the recdb
- */
-static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode,
-                                   struct tdb_wrap *recdb, uint32_t dbid)
-{
-       int ret;
-       TDB_DATA outdata;
-       struct ctdb_marshall_buffer *reply;
-       struct ctdb_rec_data_old *recdata;
-       int i;
-       TALLOC_CTX *tmp_ctx = talloc_new(recdb);
-
-       ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
-                              CONTROL_TIMEOUT(), &outdata);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
-               talloc_free(tmp_ctx);
-               return -1;
-       }
-
-       reply = (struct ctdb_marshall_buffer *)outdata.dptr;
-
-       if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
-               DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
-               talloc_free(tmp_ctx);
-               return -1;
-       }
-
-       recdata = (struct ctdb_rec_data_old *)&reply->data[0];
-
-       for (i=0;
-            i<reply->count;
-            recdata = (struct ctdb_rec_data_old *)(recdata->length + (uint8_t *)recdata), i++) {
-               TDB_DATA key, data;
-               struct ctdb_ltdb_header *hdr;
-               TDB_DATA existing;
-
-               key.dptr = &recdata->data[0];
-               key.dsize = recdata->keylen;
-               data.dptr = &recdata->data[key.dsize];
-               data.dsize = recdata->datalen;
-
-               hdr = (struct ctdb_ltdb_header *)data.dptr;
-
-               if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
-                       DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
-                       talloc_free(tmp_ctx);
-                       return -1;
-               }
-
-               /* fetch the existing record, if any */
-               existing = tdb_fetch(recdb->tdb, key);
-
-               if (existing.dptr != NULL) {
-                       struct ctdb_ltdb_header header;
-                       if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
-                               DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n",
-                                        (unsigned)existing.dsize, srcnode));
-                               free(existing.dptr);
-                               talloc_free(tmp_ctx);
-                               return -1;
-                       }
-                       header = *(struct ctdb_ltdb_header *)existing.dptr;
-                       free(existing.dptr);
-                       if (!(header.rsn < hdr->rsn ||
-                             (header.dmaster != ctdb_get_pnn(ctdb) &&
-                              header.rsn == hdr->rsn))) {
-                               continue;
-                       }
-               }
-
-               if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
-                       DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
-                       talloc_free(tmp_ctx);
-                       return -1;
-               }
-       }
-
-       talloc_free(tmp_ctx);
-
-       return 0;
-}
-
-
-struct pull_seqnum_cbdata {
-       int failed;
-       uint32_t pnn;
-       uint64_t seqnum;
-};
-
-static void pull_seqnum_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
-{
-       struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
-       uint64_t seqnum;
-
-       if (cb_data->failed != 0) {
-               DEBUG(DEBUG_ERR, ("Got seqnum from node %d but we have already failed the entire operation\n", node_pnn));
-               return;
-       }
-
-       if (res != 0) {
-               DEBUG(DEBUG_ERR, ("Error when pulling seqnum from node %d\n", node_pnn));
-               cb_data->failed = 1;
-               return;
-       }
-
-       if (outdata.dsize != sizeof(uint64_t)) {
-               DEBUG(DEBUG_ERR, ("Error when reading pull seqnum from node %d, got %d bytes but expected %d\n", node_pnn, (int)outdata.dsize, (int)sizeof(uint64_t)));
-               cb_data->failed = -1;
-               return;
-       }
-
-       seqnum = *((uint64_t *)outdata.dptr);
-
-       if (seqnum > cb_data->seqnum ||
-           (cb_data->pnn == -1 && seqnum == 0)) {
-               cb_data->seqnum = seqnum;
-               cb_data->pnn = node_pnn;
-       }
-}
-
-static void pull_seqnum_fail_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
-{
-       struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
-
-       DEBUG(DEBUG_ERR, ("Failed to pull db seqnum from node %d\n", node_pnn));
-       cb_data->failed = 1;
-}
-
-static int pull_highest_seqnum_pdb(struct ctdb_context *ctdb,
-                               struct ctdb_recoverd *rec, 
-                               struct ctdb_node_map_old *nodemap, 
-                               struct tdb_wrap *recdb, uint32_t dbid)
-{
-       TALLOC_CTX *tmp_ctx = talloc_new(NULL);
-       uint32_t *nodes;
-       TDB_DATA data;
-       uint32_t outdata[2];
-       struct pull_seqnum_cbdata *cb_data;
-
-       DEBUG(DEBUG_NOTICE, ("Scan for highest seqnum pdb for db:0x%08x\n", dbid));
-
-       outdata[0] = dbid;
-       outdata[1] = 0;
-
-       data.dsize = sizeof(outdata);
-       data.dptr  = (uint8_t *)&outdata[0];
-
-       cb_data = talloc(tmp_ctx, struct pull_seqnum_cbdata);
-       if (cb_data == NULL) {
-               DEBUG(DEBUG_ERR, ("Failed to allocate pull highest seqnum cb_data structure\n"));
-               talloc_free(tmp_ctx);
-               return -1;
-       }
-
-       cb_data->failed = 0;
-       cb_data->pnn    = -1;
-       cb_data->seqnum = 0;
-       
-       nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
-       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_DB_SEQNUM,
-                                       nodes, 0,
-                                       CONTROL_TIMEOUT(), false, data,
-                                       pull_seqnum_cb,
-                                       pull_seqnum_fail_cb,
-                                       cb_data) != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Failed to run async GET_DB_SEQNUM\n"));
-
-               talloc_free(tmp_ctx);
-               return -1;
-       }
-
-       if (cb_data->failed != 0) {
-               DEBUG(DEBUG_NOTICE, ("Failed to pull sequence numbers for DB 0x%08x\n", dbid));
-               talloc_free(tmp_ctx);
-               return -1;
-       }
-
-       if (cb_data->pnn == -1) {
-               DEBUG(DEBUG_NOTICE, ("Failed to find a node with highest sequence numbers for DB 0x%08x\n", dbid));
-               talloc_free(tmp_ctx);
-               return -1;
-       }
-
-       DEBUG(DEBUG_NOTICE, ("Pull persistent db:0x%08x from node %d with highest seqnum:%lld\n", dbid, cb_data->pnn, (long long)cb_data->seqnum)); 
-
-       if (pull_one_remote_database(ctdb, cb_data->pnn, recdb, dbid) != 0) {
-               DEBUG(DEBUG_ERR, ("Failed to pull higest seqnum database 0x%08x from node %d\n", dbid, cb_data->pnn));
-               talloc_free(tmp_ctx);
-               return -1;
-       }
-
-       talloc_free(tmp_ctx);
-       return 0;
-}
-
-
-/*
-  pull all the remote database contents into the recdb
- */
-static int pull_remote_database(struct ctdb_context *ctdb,
-                               struct ctdb_recoverd *rec, 
-                               struct ctdb_node_map_old *nodemap, 
-                               struct tdb_wrap *recdb, uint32_t dbid,
-                               bool persistent)
-{
-       int j;
-
-       if (persistent && ctdb->tunable.recover_pdb_by_seqnum != 0) {
-               int ret;
-               ret = pull_highest_seqnum_pdb(ctdb, rec, nodemap, recdb, dbid);
-               if (ret == 0) {
-                       return 0;
-               }
-       }
-
-       /* pull all records from all other nodes across onto this node
-          (this merges based on rsn)
-       */
-       for (j=0; j<nodemap->num; j++) {
-               /* don't merge from nodes that are unavailable */
-               if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
-                       continue;
-               }
-               if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
-                       DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
-                                nodemap->nodes[j].pnn));
-                       ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
-                       return -1;
-               }
-       }
-       
-       return 0;
-}
-
-
 /*
   update flags on all active nodes
  */
@@ -957,32 +630,6 @@ static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node
        return 0;
 }
 
-/*
-  ensure all nodes have the same vnnmap we do
- */
-static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap, 
-                                     uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
-{
-       int j, ret;
-
-       /* push the new vnn map out to all the nodes */
-       for (j=0; j<nodemap->num; j++) {
-               /* don't push to nodes that are unavailable */
-               if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
-                       continue;
-               }
-
-               ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
-               if (ret != 0) {
-                       DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
-                       return -1;
-               }
-       }
-
-       return 0;
-}
-
-
 /*
   called when a vacuum fetch has completed - just free it and do the next one
  */
@@ -1289,259 +936,6 @@ static uint32_t new_generation(void)
        return generation;
 }
 
-
-/*
-  create a temporary working database
- */
-static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
-{
-       char *name;
-       struct tdb_wrap *recdb;
-       unsigned tdb_flags;
-
-       /* open up the temporary recovery database */
-       name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
-                              ctdb->db_directory_state,
-                              ctdb->pnn);
-       if (name == NULL) {
-               return NULL;
-       }
-       unlink(name);
-
-       tdb_flags = TDB_NOLOCK;
-       if (ctdb->valgrinding) {
-               tdb_flags |= TDB_NOMMAP;
-       }
-       tdb_flags |= (TDB_INCOMPATIBLE_HASH | TDB_DISALLOW_NESTING);
-
-       recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
-                             tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
-       if (recdb == NULL) {
-               DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
-       }
-
-       talloc_free(name);
-
-       return recdb;
-}
-
-
-/* 
-   a traverse function for pulling all relevant records from recdb
- */
-struct recdb_data {
-       struct ctdb_context *ctdb;
-       struct ctdb_marshall_buffer *recdata;
-       uint32_t len;
-       uint32_t allocated_len;
-       bool failed;
-       bool persistent;
-};
-
-static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
-{
-       struct recdb_data *params = (struct recdb_data *)p;
-       struct ctdb_rec_data_old *recdata;
-       struct ctdb_ltdb_header *hdr;
-
-       /*
-        * skip empty records - but NOT for persistent databases:
-        *
-        * The record-by-record mode of recovery deletes empty records.
-        * For persistent databases, this can lead to data corruption
-        * by deleting records that should be there:
-        *
-        * - Assume the cluster has been running for a while.
-        *
-        * - A record R in a persistent database has been created and
-        *   deleted a couple of times, the last operation being deletion,
-        *   leaving an empty record with a high RSN, say 10.
-        *
-        * - Now a node N is turned off.
-        *
-        * - This leaves the local database copy of D on N with the empty
-        *   copy of R and RSN 10. On all other nodes, the recovery has deleted
-        *   the copy of record R.
-        *
-        * - Now the record is created again while node N is turned off.
-        *   This creates R with RSN = 1 on all nodes except for N.
-        *
-        * - Now node N is turned on again. The following recovery will chose
-        *   the older empty copy of R due to RSN 10 > RSN 1.
-        *
-        * ==> Hence the record is gone after the recovery.
-        *
-        * On databases like Samba's registry, this can damage the higher-level
-        * data structures built from the various tdb-level records.
-        */
-       if (!params->persistent && data.dsize <= sizeof(struct ctdb_ltdb_header)) {
-               return 0;
-       }
-
-       /* update the dmaster field to point to us */
-       hdr = (struct ctdb_ltdb_header *)data.dptr;
-       if (!params->persistent) {
-               hdr->dmaster = params->ctdb->pnn;
-               hdr->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
-       }
-
-       /* add the record to the blob ready to send to the nodes */
-       recdata = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
-       if (recdata == NULL) {
-               params->failed = true;
-               return -1;
-       }
-       if (params->len + recdata->length >= params->allocated_len) {
-               params->allocated_len = recdata->length + params->len + params->ctdb->tunable.pulldb_preallocation_size;
-               params->recdata = talloc_realloc_size(NULL, params->recdata, params->allocated_len);
-       }
-       if (params->recdata == NULL) {
-               DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u\n",
-                        recdata->length + params->len));
-               params->failed = true;
-               return -1;
-       }
-       params->recdata->count++;
-       memcpy(params->len+(uint8_t *)params->recdata, recdata, recdata->length);
-       params->len += recdata->length;
-       talloc_free(recdata);
-
-       return 0;
-}
-
-/*
-  push the recdb database out to all nodes
- */
-static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
-                              bool persistent,
-                              struct tdb_wrap *recdb, struct ctdb_node_map_old *nodemap)
-{
-       struct recdb_data params;
-       struct ctdb_marshall_buffer *recdata;
-       TDB_DATA outdata;
-       TALLOC_CTX *tmp_ctx;
-       uint32_t *nodes;
-
-       tmp_ctx = talloc_new(ctdb);
-       CTDB_NO_MEMORY(ctdb, tmp_ctx);
-
-       recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
-       CTDB_NO_MEMORY(ctdb, recdata);
-
-       recdata->db_id = dbid;
-
-       params.ctdb = ctdb;
-       params.recdata = recdata;
-       params.len = offsetof(struct ctdb_marshall_buffer, data);
-       params.allocated_len = params.len;
-       params.failed = false;
-       params.persistent = persistent;
-
-       if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
-               DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
-               talloc_free(params.recdata);
-               talloc_free(tmp_ctx);
-               return -1;
-       }
-
-       if (params.failed) {
-               DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
-               talloc_free(params.recdata);
-               talloc_free(tmp_ctx);
-               return -1;              
-       }
-
-       recdata = params.recdata;
-
-       outdata.dptr = (void *)recdata;
-       outdata.dsize = params.len;
-
-       nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
-       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
-                                       nodes, 0,
-                                       CONTROL_TIMEOUT(), false, outdata,
-                                       NULL, NULL,
-                                       NULL) != 0) {
-               DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
-               talloc_free(recdata);
-               talloc_free(tmp_ctx);
-               return -1;
-       }
-
-       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
-                 dbid, recdata->count));
-
-       talloc_free(recdata);
-       talloc_free(tmp_ctx);
-
-       return 0;
-}
-
-
-/*
-  go through a full recovery on one database 
- */
-static int recover_database(struct ctdb_recoverd *rec, 
-                           TALLOC_CTX *mem_ctx,
-                           uint32_t dbid,
-                           bool persistent,
-                           uint32_t pnn, 
-                           struct ctdb_node_map_old *nodemap,
-                           uint32_t transaction_id)
-{
-       struct tdb_wrap *recdb;
-       int ret;
-       struct ctdb_context *ctdb = rec->ctdb;
-       TDB_DATA data;
-       struct ctdb_transdb w;
-       uint32_t *nodes;
-
-       recdb = create_recdb(ctdb, mem_ctx);
-       if (recdb == NULL) {
-               return -1;
-       }
-
-       /* pull all remote databases onto the recdb */
-       ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
-               return -1;
-       }
-
-       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
-
-       /* wipe all the remote databases. This is safe as we are in a transaction */
-       w.db_id = dbid;
-       w.tid = transaction_id;
-
-       data.dptr = (void *)&w;
-       data.dsize = sizeof(w);
-
-       nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
-       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
-                                       nodes, 0,
-                                       CONTROL_TIMEOUT(), false, data,
-                                       NULL, NULL,
-                                       NULL) != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
-               talloc_free(recdb);
-               return -1;
-       }
-       
-       /* push out the correct database. This sets the dmaster and skips 
-          the empty records */
-       ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
-       if (ret != 0) {
-               talloc_free(recdb);
-               return -1;
-       }
-
-       /* all done with this database */
-       talloc_free(recdb);
-
-       return 0;
-}
-
 static bool ctdb_recovery_have_lock(struct ctdb_recoverd *rec)
 {
        return (rec->recovery_lock_handle != NULL);
@@ -1875,170 +1269,6 @@ fail:
        return -1;
 }
 
-static int db_recovery_serial(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx,
-                             uint32_t pnn, struct ctdb_node_map_old *nodemap,
-                             struct ctdb_vnn_map *vnnmap,
-                             struct ctdb_dbid_map_old *dbmap)
-{
-       struct ctdb_context *ctdb = rec->ctdb;
-       uint32_t generation;
-       TDB_DATA data;
-       uint32_t *nodes;
-       int ret, i, j;
-
-       /* set recovery mode to active on all nodes */
-       ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE, true);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
-               return -1;
-       }
-
-       /* execute the "startrecovery" event script on all nodes */
-       ret = run_startrecovery_eventscript(rec, nodemap);
-       if (ret!=0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
-               return -1;
-       }
-
-       /* pick a new generation number */
-       generation = new_generation();
-
-       /* change the vnnmap on this node to use the new generation 
-          number but not on any other nodes.
-          this guarantees that if we abort the recovery prematurely
-          for some reason (a node stops responding?)
-          that we can just return immediately and we will reenter
-          recovery shortly again.
-          I.e. we deliberately leave the cluster with an inconsistent
-          generation id to allow us to abort recovery at any stage and
-          just restart it from scratch.
-        */
-       vnnmap->generation = generation;
-       ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
-               return -1;
-       }
-
-       /* Database generations are updated when the transaction is commited to
-        * the databases.  So make sure to use the final generation as the
-        * transaction id
-        */
-       generation = new_generation();
-
-       data.dptr = (void *)&generation;
-       data.dsize = sizeof(uint32_t);
-
-       nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
-       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
-                                       nodes, 0,
-                                       CONTROL_TIMEOUT(), false, data,
-                                       NULL,
-                                       transaction_start_fail_callback,
-                                       rec) != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
-               if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
-                                       nodes, 0,
-                                       CONTROL_TIMEOUT(), false, tdb_null,
-                                       NULL,
-                                       NULL,
-                                       NULL) != 0) {
-                       DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
-               }
-               return -1;
-       }
-
-       DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
-
-       for (i=0;i<dbmap->num;i++) {
-               ret = recover_database(rec, mem_ctx,
-                                      dbmap->dbs[i].db_id,
-                                      dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT,
-                                      pnn, nodemap, generation);
-               if (ret != 0) {
-                       DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].db_id));
-                       return -1;
-               }
-       }
-
-       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
-
-       /* commit all the changes */
-       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
-                                       nodes, 0,
-                                       CONTROL_TIMEOUT(), false, data,
-                                       NULL, NULL,
-                                       NULL) != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
-               return -1;
-       }
-
-       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
-
-       /* build a new vnn map with all the currently active and
-          unbanned nodes */
-       vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
-       CTDB_NO_MEMORY(ctdb, vnnmap);
-       vnnmap->generation = generation;
-       vnnmap->size = 0;
-       vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
-       CTDB_NO_MEMORY(ctdb, vnnmap->map);
-       for (i=j=0;i<nodemap->num;i++) {
-               if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
-                       continue;
-               }
-               if (!ctdb_node_has_capabilities(rec->caps,
-                                               ctdb->nodes[i]->pnn,
-                                               CTDB_CAP_LMASTER)) {
-                       /* this node can not be an lmaster */
-                       DEBUG(DEBUG_DEBUG, ("Node %d cant be a LMASTER, skipping it\n", i));
-                       continue;
-               }
-
-               vnnmap->size++;
-               vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
-               CTDB_NO_MEMORY(ctdb, vnnmap->map);
-               vnnmap->map[j++] = nodemap->nodes[i].pnn;
-
-       }
-       if (vnnmap->size == 0) {
-               DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
-               vnnmap->size++;
-               vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
-               CTDB_NO_MEMORY(ctdb, vnnmap->map);
-               vnnmap->map[0] = pnn;
-       }
-
-       /* update to the new vnnmap on all nodes */
-       ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
-               return -1;
-       }
-
-       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
-
-       /* disable recovery mode */
-       ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL, false);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
-               return -1;
-       }
-
-       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
-
-       /* execute the "recovered" event script on all nodes */
-       ret = run_recovered_eventscript(rec, nodemap, "do_recovery");
-       if (ret!=0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
-               return -1;
-       }
-
-       DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
-
-       return 0;
-}
-
 /*
   we are the recmaster, and recovery is needed - start a recovery run
  */
@@ -2050,7 +1280,6 @@ static int do_recovery(struct ctdb_recoverd *rec,
        int i, ret;
        struct ctdb_dbid_map_old *dbmap;
        bool self_ban;
-       bool par_recovery;
 
        DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
 
@@ -2174,27 +1403,7 @@ static int do_recovery(struct ctdb_recoverd *rec,
 
        DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
 
-       /* Check if all participating nodes have parallel recovery capability */
-       par_recovery = true;
-       for (i=0; i<nodemap->num; i++) {
-               if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
-                       continue;
-               }
-
-               if (!(rec->caps[i].capabilities &
-                     CTDB_CAP_PARALLEL_RECOVERY)) {
-                       par_recovery = false;
-                       break;
-               }
-       }
-
-       if (par_recovery) {
-               ret = db_recovery_parallel(rec, mem_ctx);
-       } else {
-               ret = db_recovery_serial(rec, mem_ctx, pnn, nodemap, vnnmap,
-                                        dbmap);
-       }
-
+       ret = db_recovery_parallel(rec, mem_ctx);
        if (ret != 0) {
                goto fail;
        }