]> git.ipfire.org Git - thirdparty/postgresql.git/commitdiff
Handle DROP DATABASE getting interrupted
authorAndres Freund <andres@anarazel.de>
Thu, 13 Jul 2023 20:03:36 +0000 (13:03 -0700)
committerAndres Freund <andres@anarazel.de>
Thu, 13 Jul 2023 20:03:36 +0000 (13:03 -0700)
Until now, when DROP DATABASE got interrupted in the wrong moment, the removal
of the pg_database row would also roll back, even though some irreversible
steps have already been taken. E.g. DropDatabaseBuffers() might have thrown
out dirty buffers, or files could have been unlinked. But we continued to
allow connections to such a corrupted database.

To fix this, mark databases invalid with an in-place update, just before
starting to perform irreversible steps. As we can't add a new column in the
back branches, we use pg_database.datconnlimit = -2 for this purpose.

An invalid database cannot be connected to anymore, but can still be
dropped.

Unfortunately we can't easily add output to psql's \l to indicate that some
database is invalid, it doesn't fit in any of the existing columns.

Add tests verifying that a interrupted DROP DATABASE is handled correctly in
the backend and in various tools.

Reported-by: Evgeny Morozov <postgresql3@realityexists.net>
Author: Andres Freund <andres@anarazel.de>
Reviewed-by: Daniel Gustafsson <daniel@yesql.se>
Reviewed-by: Thomas Munro <thomas.munro@gmail.com>
Discussion: https://postgr.es/m/20230509004637.cgvmfwrbht7xm7p6@awork3.anarazel.de
Discussion: https://postgr.es/m/20230314174521.74jl6ffqsee5mtug@awork3.anarazel.de
Backpatch: 11-, bug present in all supported versions

16 files changed:
doc/src/sgml/catalogs.sgml
src/backend/commands/dbcommands.c
src/backend/commands/vacuum.c
src/backend/postmaster/autovacuum.c
src/backend/utils/init/postinit.c
src/bin/pg_dump/pg_dumpall.c
src/bin/pg_dump/t/002_pg_dump.pl
src/bin/scripts/clusterdb.c
src/bin/scripts/reindexdb.c
src/bin/scripts/t/011_clusterdb_all.pl
src/bin/scripts/t/050_dropdb.pl
src/bin/scripts/t/091_reindexdb_all.pl
src/bin/scripts/t/101_vacuumdb_all.pl
src/bin/scripts/vacuumdb.c
src/include/catalog/pg_database.h
src/test/recovery/t/037_invalid_database.pl [new file with mode: 0644]

index 3f8508060d95d5c0b06fdd5fbe42da9b31f5595d..0036f69175e8603eaecb98a0893fccacdfda3c7e 100644 (file)
@@ -2647,7 +2647,8 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
       <entry></entry>
       <entry>
        Sets maximum number of concurrent connections that can be made
-       to this database.  -1 means no limit.
+       to this database.  -1 means no limit, -2 indicates the database is
+       invalid.
       </entry>
      </row>
 
index 361620eecd5c54bc7beede6fd7458ac7750770b8..963119602139acde4888baeb815258afd86c8afc 100644 (file)
@@ -139,7 +139,7 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
        int                     encoding = -1;
        bool            dbistemplate = false;
        bool            dballowconnections = true;
-       int                     dbconnlimit = -1;
+       int                     dbconnlimit = DATCONNLIMIT_UNLIMITED;
        int                     notherbackends;
        int                     npreparedxacts;
        createdb_failure_params fparms;
@@ -288,7 +288,7 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
        if (dconnlimit && dconnlimit->arg)
        {
                dbconnlimit = defGetInt32(dconnlimit);
-               if (dbconnlimit < -1)
+               if (dbconnlimit < DATCONNLIMIT_UNLIMITED)
                        ereport(ERROR,
                                        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                         errmsg("invalid connection limit: %d", dbconnlimit)));
@@ -336,6 +336,16 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
                                 errmsg("template database \"%s\" does not exist",
                                                dbtemplate)));
 
+       /*
+        * If the source database was in the process of being dropped, we can't
+        * use it as a template.
+        */
+       if (database_is_invalid_oid(src_dboid))
+               ereport(ERROR,
+                               errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                               errmsg("cannot use invalid database \"%s\" as template", dbtemplate),
+                               errhint("Use DROP DATABASE to drop invalid databases."));
+
        /*
         * Permission check: to copy a DB that's not marked datistemplate, you
         * must be superuser or the owner thereof.
@@ -796,6 +806,7 @@ dropdb(const char *dbname, bool missing_ok)
        bool            db_istemplate;
        Relation        pgdbrel;
        HeapTuple       tup;
+       Form_pg_database datform;
        int                     notherbackends;
        int                     npreparedxacts;
        int                     nslots,
@@ -903,17 +914,6 @@ dropdb(const char *dbname, bool missing_ok)
                                                                  "There are %d subscriptions.",
                                                                  nsubscriptions, nsubscriptions)));
 
-       /*
-        * Remove the database's tuple from pg_database.
-        */
-       tup = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(db_id));
-       if (!HeapTupleIsValid(tup))
-               elog(ERROR, "cache lookup failed for database %u", db_id);
-
-       CatalogTupleDelete(pgdbrel, &tup->t_self);
-
-       ReleaseSysCache(tup);
-
        /*
         * Delete any comments or security labels associated with the database.
         */
@@ -930,6 +930,32 @@ dropdb(const char *dbname, bool missing_ok)
         */
        dropDatabaseDependencies(db_id);
 
+       tup = SearchSysCacheCopy1(DATABASEOID, ObjectIdGetDatum(db_id));
+       if (!HeapTupleIsValid(tup))
+               elog(ERROR, "cache lookup failed for database %u", db_id);
+       datform = (Form_pg_database) GETSTRUCT(tup);
+
+       /*
+        * Except for the deletion of the catalog row, subsequent actions are not
+        * transactional (consider DropDatabaseBuffers() discarding modified
+        * buffers). But we might crash or get interrupted below. To prevent
+        * accesses to a database with invalid contents, mark the database as
+        * invalid using an in-place update.
+        *
+        * We need to flush the WAL before continuing, to guarantee the
+        * modification is durable before performing irreversible filesystem
+        * operations.
+        */
+       datform->datconnlimit = DATCONNLIMIT_INVALID_DB;
+       heap_inplace_update(pgdbrel, tup);
+       XLogFlush(XactLastRecEnd);
+
+       /*
+        * Also delete the tuple - transactionally. If this transaction commits,
+        * the row will be gone, but if we fail, dropdb() can be invoked again.
+        */
+       CatalogTupleDelete(pgdbrel, &tup->t_self);
+
        /*
         * Drop db-specific replication slots.
         */
@@ -1427,7 +1453,7 @@ AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
        ListCell   *option;
        bool            dbistemplate = false;
        bool            dballowconnections = true;
-       int                     dbconnlimit = -1;
+       int                     dbconnlimit = DATCONNLIMIT_UNLIMITED;
        DefElem    *distemplate = NULL;
        DefElem    *dallowconnections = NULL;
        DefElem    *dconnlimit = NULL;
@@ -1510,7 +1536,7 @@ AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
        if (dconnlimit && dconnlimit->arg)
        {
                dbconnlimit = defGetInt32(dconnlimit);
-               if (dbconnlimit < -1)
+               if (dbconnlimit < DATCONNLIMIT_UNLIMITED)
                        ereport(ERROR,
                                        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                         errmsg("invalid connection limit: %d", dbconnlimit)));
@@ -1537,6 +1563,14 @@ AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
        datform = (Form_pg_database) GETSTRUCT(tuple);
        dboid = datform->oid;
 
+       if (database_is_invalid_form(datform))
+       {
+               ereport(FATAL,
+                               errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                               errmsg("cannot alter invalid database \"%s\"", stmt->dbname),
+                               errhint("Use DROP DATABASE to drop invalid databases."));
+       }
+
        if (!pg_database_ownercheck(dboid, GetUserId()))
                aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
                                           stmt->dbname);
@@ -2093,6 +2127,42 @@ get_database_name(Oid dbid)
        return result;
 }
 
+
+/*
+ * While dropping a database the pg_database row is marked invalid, but the
+ * catalog contents still exist. Connections to such a database are not
+ * allowed.
+ */
+bool
+database_is_invalid_form(Form_pg_database datform)
+{
+       return datform->datconnlimit == DATCONNLIMIT_INVALID_DB;
+}
+
+
+/*
+ * Convenience wrapper around database_is_invalid_form()
+ */
+bool
+database_is_invalid_oid(Oid dboid)
+{
+       HeapTuple       dbtup;
+       Form_pg_database dbform;
+       bool            invalid;
+
+       dbtup = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dboid));
+       if (!HeapTupleIsValid(dbtup))
+               elog(ERROR, "cache lookup failed for database %u", dboid);
+       dbform = (Form_pg_database) GETSTRUCT(dbtup);
+
+       invalid = database_is_invalid_form(dbform);
+
+       ReleaseSysCache(dbtup);
+
+       return invalid;
+}
+
+
 /*
  * recovery_create_dbdir()
  *
index 794841e46c7c2db32f5b373f5d8d1f4a072575b3..fc8f1384cb450ab515d71e72d23c7b5e0dbdb9e8 100644 (file)
@@ -1547,6 +1547,20 @@ vac_truncate_clog(TransactionId frozenXID,
                Assert(TransactionIdIsNormal(datfrozenxid));
                Assert(MultiXactIdIsValid(datminmxid));
 
+               /*
+                * If database is in the process of getting dropped, or has been
+                * interrupted while doing so, no connections to it are possible
+                * anymore. Therefore we don't need to take it into account here.
+                * Which is good, because it can't be processed by autovacuum either.
+                */
+               if (database_is_invalid_form((Form_pg_database) dbform))
+               {
+                       elog(DEBUG2,
+                                "skipping invalid database \"%s\" while computing relfrozenxid",
+                                NameStr(dbform->datname));
+                       continue;
+               }
+
                /*
                 * If things are working properly, no database should have a
                 * datfrozenxid or datminmxid that is "in the future".  However, such
index 5e64dcf4d69961680d6ae003feee34caa6431cbe..996f88abf8b4ca876196d6e1b7bbc55d9117d04b 100644 (file)
@@ -1894,6 +1894,18 @@ get_database_list(void)
                avw_dbase  *avdb;
                MemoryContext oldcxt;
 
+               /*
+                * If database has partially been dropped, we can't, nor need to,
+                * vacuum it.
+                */
+               if (database_is_invalid_form(pgdatabase))
+               {
+                       elog(DEBUG2,
+                                "autovacuum: skipping invalid database \"%s\"",
+                                NameStr(pgdatabase->datname));
+                       continue;
+               }
+
                /*
                 * Allocate our results in the caller's context, not the
                 * transaction's. We do this inside the loop, and restore the original
index ff83196ad17dc1ae1d8c27d09c5012632b89bfb5..1f5b2a88f5a1ea9aa6151b1e1ef5a21f8d89c39a 100644 (file)
@@ -974,6 +974,7 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
        if (!bootstrap)
        {
                HeapTuple       tuple;
+               Form_pg_database datform;
 
                tuple = GetDatabaseTuple(dbname);
                if (!HeapTupleIsValid(tuple) ||
@@ -983,6 +984,15 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
                                        (errcode(ERRCODE_UNDEFINED_DATABASE),
                                         errmsg("database \"%s\" does not exist", dbname),
                                         errdetail("It seems to have just been dropped or renamed.")));
+
+               datform = (Form_pg_database) GETSTRUCT(tuple);
+               if (database_is_invalid_form(datform))
+               {
+                       ereport(FATAL,
+                                       errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                       errmsg("cannot connect to invalid database \"%s\"", dbname),
+                                       errhint("Use DROP DATABASE to drop invalid databases."));
+               }
        }
 
        /*
index 4f924b5741da6170c2af557f0e673231658dc422..63b5b9106bed29a5af1eaebd9081350a123680ca 100644 (file)
@@ -1320,7 +1320,7 @@ dropDBs(PGconn *conn)
        res = executeQuery(conn,
                                           "SELECT datname "
                                           "FROM pg_database d "
-                                          "WHERE datallowconn "
+                                          "WHERE datallowconn AND datconnlimit != -2 "
                                           "ORDER BY datname");
 
        if (PQntuples(res) > 0)
@@ -1473,7 +1473,7 @@ dumpDatabases(PGconn *conn)
        res = executeQuery(conn,
                                           "SELECT datname "
                                           "FROM pg_database d "
-                                          "WHERE datallowconn "
+                                          "WHERE datallowconn AND datconnlimit != -2 "
                                           "ORDER BY (datname <> 'template1'), datname");
 
        if (PQntuples(res) > 0)
index 055cc91dfe0b3250357d99563d50b6dc30c12c6e..50e70113a01bb8caeef49269fa3b5840c32871ca 100644 (file)
@@ -1428,6 +1428,17 @@ my %tests = (
                },
        },
 
+       'CREATE DATABASE regression_invalid...' => {
+               create_order => 1,
+               create_sql => q(
+                   CREATE DATABASE regression_invalid;
+                       UPDATE pg_database SET datconnlimit = -2 WHERE datname = 'regression_invalid'),
+               regexp => qr/^CREATE DATABASE regression_invalid/m,
+               not_like => {
+                       pg_dumpall_dbprivs => 1,
+               },
+       },
+
        'CREATE ACCESS METHOD gist2' => {
                create_order => 52,
                create_sql =>
@@ -3495,7 +3506,7 @@ $node->psql('postgres', 'create database regress_pg_dump_test;');
 
 # Start with number of command_fails_like()*2 tests below (each
 # command_fails_like is actually 2 tests)
-my $num_tests = 12;
+my $num_tests = 14;
 
 foreach my $run (sort keys %pgdump_runs)
 {
@@ -3623,6 +3634,14 @@ command_fails_like(
        qr/\Qpg_dump: error: connection to database "qqq" failed: FATAL:  database "qqq" does not exist\E/,
        'connecting to a non-existent database');
 
+#########################################
+# Test connecting to an invalid database
+
+command_fails_like(
+       [ 'pg_dump', '-p', "$port", '-d', 'regression_invalid' ],
+       qr/pg_dump: error: connection to database .* failed: FATAL:  cannot connect to invalid database "regression_invalid"/,
+       'connecting to an invalid database');
+
 #########################################
 # Test connecting with an unprivileged user
 
index 28100c9fbb3cfeae49622c1bf283bd67c6fe15d3..4daa8b733edaf69d57502c2bcbd96eb5a978e1d3 100644 (file)
@@ -236,7 +236,9 @@ cluster_all_databases(ConnParams *cparams, const char *progname,
        int                     i;
 
        conn = connectMaintenanceDatabase(cparams, progname, echo);
-       result = executeQuery(conn, "SELECT datname FROM pg_database WHERE datallowconn ORDER BY 1;", progname, echo);
+       result = executeQuery(conn,
+                                                 "SELECT datname FROM pg_database WHERE datallowconn AND datconnlimit <> -2 ORDER BY 1;",
+                                                 progname, echo);
        PQfinish(conn);
 
        for (i = 0; i < PQntuples(result); i++)
index 26f2e5f237776c29dfa915ced554920cb2e2a62a..39b4078b4118d6fb92218c43ea5c50a228ddb35a 100644 (file)
@@ -360,7 +360,9 @@ reindex_all_databases(ConnParams *cparams,
        int                     i;
 
        conn = connectMaintenanceDatabase(cparams, progname, echo);
-       result = executeQuery(conn, "SELECT datname FROM pg_database WHERE datallowconn ORDER BY 1;", progname, echo);
+       result = executeQuery(conn,
+                                                 "SELECT datname FROM pg_database WHERE datallowconn AND datconnlimit <> -2 ORDER BY 1;",
+                                                 progname, echo);
        PQfinish(conn);
 
        for (i = 0; i < PQntuples(result); i++)
index efd541bc4d65613f329b529fb5f14c7380e022e2..8ac734bdfa298b7548dcb30cdf27f5d79ed5e410 100644 (file)
@@ -3,7 +3,7 @@ use warnings;
 
 use PostgresNode;
 use TestLib;
-use Test::More tests => 2;
+use Test::More tests => 4;
 
 my $node = get_new_node('main');
 $node->init;
@@ -17,3 +17,16 @@ $node->issues_sql_like(
        [ 'clusterdb', '-a' ],
        qr/statement: CLUSTER.*statement: CLUSTER/s,
        'cluster all databases');
+
+$node->safe_psql(
+       'postgres', q(
+       CREATE DATABASE regression_invalid;
+       UPDATE pg_database SET datconnlimit = -2 WHERE datname = 'regression_invalid';
+));
+$node->command_ok([ 'clusterdb', '-a' ],
+  'invalid database not targeted by clusterdb -a');
+
+# Doesn't quite belong here, but don't want to waste time by creating an
+# invalid database in 010_clusterdb.pl as well.
+$node->command_fails([ 'clusterdb', '-d', 'regression_invalid'],
+  'clusterdb cannot target invalid database');
index 25aa54a4ae4b2ee98872c67312214c85db10dad2..9e29437570cb2893c5dcc7daccdac062bfaf8f7c 100644 (file)
@@ -3,7 +3,7 @@ use warnings;
 
 use PostgresNode;
 use TestLib;
-use Test::More tests => 11;
+use Test::More tests => 12;
 
 program_help_ok('dropdb');
 program_version_ok('dropdb');
@@ -21,3 +21,12 @@ $node->issues_sql_like(
 
 $node->command_fails([ 'dropdb', 'nonexistent' ],
        'fails with nonexistent database');
+
+# check that invalid database can be dropped with dropdb
+$node->safe_psql(
+       'postgres', q(
+       CREATE DATABASE regression_invalid;
+       UPDATE pg_database SET datconnlimit = -2 WHERE datname = 'regression_invalid';
+));
+$node->command_ok([ 'dropdb', 'regression_invalid' ],
+  'invalid database can be dropped');
index 8e6041460c5c8939a5bf8d80d0211268fd9b1f37..6990fcea9ae8506c5af3d011dd7b3a13324d81a4 100644 (file)
@@ -2,7 +2,7 @@ use strict;
 use warnings;
 
 use PostgresNode;
-use Test::More tests => 2;
+use Test::More tests => 4;
 
 my $node = get_new_node('main');
 $node->init;
@@ -14,3 +14,16 @@ $node->issues_sql_like(
        [ 'reindexdb', '-a' ],
        qr/statement: REINDEX.*statement: REINDEX/s,
        'reindex all databases');
+
+$node->safe_psql(
+       'postgres', q(
+       CREATE DATABASE regression_invalid;
+       UPDATE pg_database SET datconnlimit = -2 WHERE datname = 'regression_invalid';
+));
+$node->command_ok([ 'reindexdb', '-a' ],
+  'invalid database not targeted by reindexdb -a');
+
+# Doesn't quite belong here, but don't want to waste time by creating an
+# invalid database in 090_reindexdb.pl as well.
+$node->command_fails([ 'reindexdb', '-d', 'regression_invalid'],
+  'reindexdb cannot target invalid database');
index 43212587e5ab7204d3b646a00a57a13c8352cf9b..d90557265402f3f4cbc239311856cefd50d22e82 100644 (file)
@@ -2,7 +2,7 @@ use strict;
 use warnings;
 
 use PostgresNode;
-use Test::More tests => 2;
+use Test::More tests => 4;
 
 my $node = get_new_node('main');
 $node->init;
@@ -12,3 +12,16 @@ $node->issues_sql_like(
        [ 'vacuumdb', '-a' ],
        qr/statement: VACUUM.*statement: VACUUM/s,
        'vacuum all databases');
+
+$node->safe_psql(
+       'postgres', q(
+       CREATE DATABASE regression_invalid;
+       UPDATE pg_database SET datconnlimit = -2 WHERE datname = 'regression_invalid';
+));
+$node->command_ok([ 'vacuumdb', '-a' ],
+  'invalid database not targeted by vacuumdb -a');
+
+# Doesn't quite belong here, but don't want to waste time by creating an
+# invalid database in 010_vacuumdb.pl as well.
+$node->command_fails([ 'vacuumdb', '-d', 'regression_invalid'],
+  'vacuumdb cannot target invalid database');
index d0db3fb88329d6b4470c62e694e881082b31b617..bff6802dd36e93d28429a99f2db6317f722caeba 100644 (file)
@@ -763,7 +763,7 @@ vacuum_all_databases(ConnParams *cparams,
 
        conn = connectMaintenanceDatabase(cparams, progname, echo);
        result = executeQuery(conn,
-                                                 "SELECT datname FROM pg_database WHERE datallowconn ORDER BY 1;",
+                                                 "SELECT datname FROM pg_database WHERE datallowconn AND datconnlimit <> -2 ORDER BY 1;",
                                                  progname, echo);
        PQfinish(conn);
 
index 06fea45f53a463e1e3a38df53f2e3c9ddf5b9306..3b0adaef3afac7a4960dcd44642e05457f852374 100644 (file)
@@ -52,7 +52,10 @@ CATALOG(pg_database,1262,DatabaseRelationId) BKI_SHARED_RELATION BKI_ROWTYPE_OID
        /* new connections allowed? */
        bool            datallowconn;
 
-       /* max connections allowed (-1=no limit) */
+       /*
+        * Max connections allowed. Negative values have special meaning, see
+        * DATCONNLIMIT_* defines below.
+        */
        int32           datconnlimit;
 
        /* highest OID to consider a system OID */
@@ -80,4 +83,19 @@ CATALOG(pg_database,1262,DatabaseRelationId) BKI_SHARED_RELATION BKI_ROWTYPE_OID
  */
 typedef FormData_pg_database *Form_pg_database;
 
+/*
+ * Special values for pg_database.datconnlimit. Normal values are >= 0.
+ */
+#define                  DATCONNLIMIT_UNLIMITED        -1      /* no limit */
+
+/*
+ * A database is set to invalid partway through being dropped.  Using
+ * datconnlimit=-2 for this purpose isn't particularly clean, but is
+ * backpatchable.
+ */
+#define                  DATCONNLIMIT_INVALID_DB       -2
+
+extern bool database_is_invalid_form(Form_pg_database datform);
+extern bool database_is_invalid_oid(Oid dboid);
+
 #endif                                                 /* PG_DATABASE_H */
diff --git a/src/test/recovery/t/037_invalid_database.pl b/src/test/recovery/t/037_invalid_database.pl
new file mode 100644 (file)
index 0000000..a061fab
--- /dev/null
@@ -0,0 +1,157 @@
+# Copyright (c) 2023, PostgreSQL Global Development Group
+#
+# Test we handle interrupted DROP DATABASE correctly.
+
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+my $node = PostgreSQL::Test::Cluster->new('node');
+$node->init;
+$node->append_conf(
+       "postgresql.conf", qq(
+autovacuum = off
+max_prepared_transactions=5
+log_min_duration_statement=0
+log_connections=on
+log_disconnections=on
+));
+
+$node->start;
+
+
+# First verify that we can't connect to or ALTER an invalid database. Just
+# mark the database as invalid ourselves, that's more reliable than hitting the
+# required race conditions (see testing further down)...
+
+$node->safe_psql(
+       "postgres", qq(
+CREATE DATABASE regression_invalid;
+UPDATE pg_database SET datconnlimit = -2 WHERE datname = 'regression_invalid';
+));
+
+my $psql_stdout = '';
+my $psql_stderr = '';
+
+is($node->psql('regression_invalid', '', stderr => \$psql_stderr),
+       2, "can't connect to invalid database - error code");
+like(
+       $psql_stderr,
+       qr/FATAL:\s+cannot connect to invalid database "regression_invalid"/,
+       "can't connect to invalid database - error message");
+
+is($node->psql('postgres', 'ALTER DATABASE regression_invalid CONNECTION LIMIT 10'),
+       2, "can't ALTER invalid database");
+
+# check invalid database can't be used as a template
+is( $node->psql('postgres', 'CREATE DATABASE copy_invalid TEMPLATE regression_invalid'),
+       3,
+       "can't use invalid database as template");
+
+
+# Verify that VACUUM ignores an invalid database when computing how much of
+# the clog is needed (vac_truncate_clog()). For that we modify the pg_database
+# row of the invalid database to have an outdated datfrozenxid.
+$psql_stderr = '';
+$node->psql(
+       'postgres',
+       qq(
+UPDATE pg_database SET datfrozenxid = '123456' WHERE datname = 'regression_invalid';
+DROP TABLE IF EXISTS foo_tbl; CREATE TABLE foo_tbl();
+VACUUM FREEZE;),
+       stderr => \$psql_stderr);
+unlike(
+       $psql_stderr,
+       qr/some databases have not been vacuumed in over 2 billion transactions/,
+       "invalid databases are ignored by vac_truncate_clog");
+
+
+# But we need to be able to drop an invalid database.
+is( $node->psql(
+               'postgres', 'DROP DATABASE regression_invalid',
+               stdout => \$psql_stdout,
+               stderr => \$psql_stderr),
+       0,
+       "can DROP invalid database");
+
+# Ensure database is gone
+is($node->psql('postgres', 'DROP DATABASE regression_invalid'),
+       3, "can't drop already dropped database");
+
+
+# Test that interruption of DROP DATABASE is handled properly. To ensure the
+# interruption happens at the appropriate moment, we lock pg_tablespace. DROP
+# DATABASE scans pg_tablespace once it has reached the "irreversible" part of
+# dropping the database, making it a suitable point to wait.
+my $bgpsql_in    = '';
+my $bgpsql_out   = '';
+my $bgpsql_err   = '';
+my $bgpsql_timer = IPC::Run::timer($PostgreSQL::Test::Utils::timeout_default);
+my $bgpsql = $node->background_psql('postgres', \$bgpsql_in, \$bgpsql_out,
+       $bgpsql_timer, on_error_stop => 0);
+$bgpsql_out = '';
+$bgpsql_in .= "SELECT pg_backend_pid();\n";
+
+pump_until($bgpsql, $bgpsql_timer, \$bgpsql_out, qr/\d/);
+
+my $pid = $bgpsql_out;
+$bgpsql_out = '';
+
+# create the database, prevent drop database via lock held by a 2PC transaction
+$bgpsql_in .= qq(
+  CREATE DATABASE regression_invalid_interrupt;
+  BEGIN;
+  LOCK pg_tablespace;
+  PREPARE TRANSACTION 'lock_tblspc';
+  \\echo done
+);
+
+ok(pump_until($bgpsql, $bgpsql_timer, \$bgpsql_out, qr/done/),
+       "blocked DROP DATABASE completion");
+$bgpsql_out = '';
+
+# Try to drop. This will wait due to the still held lock.
+$bgpsql_in .= qq(
+  DROP DATABASE regression_invalid_interrupt;
+  \\echo DROP DATABASE completed
+);
+$bgpsql->pump_nb;
+
+# Ensure we're waiting for the lock
+$node->poll_query_until('postgres',
+       qq(SELECT EXISTS(SELECT * FROM pg_locks WHERE NOT granted AND relation = 'pg_tablespace'::regclass AND mode = 'AccessShareLock');)
+);
+
+# and finally interrupt the DROP DATABASE
+ok($node->safe_psql('postgres', "SELECT pg_cancel_backend($pid)"),
+       "canceling DROP DATABASE");
+
+# wait for cancellation to be processed
+ok( pump_until(
+               $bgpsql, $bgpsql_timer, \$bgpsql_out, qr/DROP DATABASE completed/),
+       "cancel processed");
+$bgpsql_out = '';
+
+# verify that connection to the database aren't allowed
+is($node->psql('regression_invalid_interrupt', ''),
+       2, "can't connect to invalid_interrupt database");
+
+# To properly drop the database, we need to release the lock previously preventing
+# doing so.
+$bgpsql_in .= qq(
+  ROLLBACK PREPARED 'lock_tblspc';
+  \\echo ROLLBACK PREPARED
+);
+ok(pump_until($bgpsql, $bgpsql_timer, \$bgpsql_out, qr/ROLLBACK PREPARED/),
+       "unblock DROP DATABASE");
+$bgpsql_out = '';
+
+is($node->psql('postgres', "DROP DATABASE regression_invalid_interrupt"),
+       0, "DROP DATABASE invalid_interrupt");
+
+$bgpsql_in .= "\\q\n";
+$bgpsql->finish();
+
+done_testing();