]> git.ipfire.org Git - thirdparty/postgresql.git/commitdiff
CREATE SUBSCRIPTION ... SERVER.
authorJeff Davis <jdavis@postgresql.org>
Fri, 6 Mar 2026 16:27:56 +0000 (08:27 -0800)
committerJeff Davis <jdavis@postgresql.org>
Fri, 6 Mar 2026 16:27:56 +0000 (08:27 -0800)
Allow CREATE SUBSCRIPTION to accept a foreign server using the SERVER
clause instead of a raw connection string using the CONNECTION clause.

  * Enables a user with sufficient privileges to create a subscription
    using a foreign server by name without specifying the connection
    details.

  * Integrates with user mappings (and other FDW infrastructure) using
    the subscription owner.

  * Provides a layer of indirection to manage multiple subscriptions
    to the same remote server more easily.

Also add CREATE FOREIGN DATA WRAPPER ... CONNECTION clause to specify
a connection_function. To be eligible for a subscription, the foreign
server's foreign data wrapper must specify a connection_function.

Add connection_function support to postgres_fdw, and bump postgres_fdw
version to 1.3.

Bump catversion.

Reviewed-by: Ashutosh Bapat <ashutosh.bapat.oss@gmail.com>
Reviewed-by: Shlok Kyal <shlok.kyal.oss@gmail.com>
Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Discussion: https://postgr.es/m/61831790a0a937038f78ce09f8dd4cef7de7456a.camel@j-davis.com

36 files changed:
contrib/postgres_fdw/Makefile
contrib/postgres_fdw/connection.c
contrib/postgres_fdw/expected/postgres_fdw.out
contrib/postgres_fdw/meson.build
contrib/postgres_fdw/postgres_fdw--1.2--1.3.sql [new file with mode: 0644]
contrib/postgres_fdw/postgres_fdw.control
contrib/postgres_fdw/sql/postgres_fdw.sql
contrib/postgres_fdw/t/010_subscription.pl [new file with mode: 0644]
doc/src/sgml/logical-replication.sgml
doc/src/sgml/postgres-fdw.sgml
doc/src/sgml/ref/alter_foreign_data_wrapper.sgml
doc/src/sgml/ref/alter_subscription.sgml
doc/src/sgml/ref/create_foreign_data_wrapper.sgml
doc/src/sgml/ref/create_server.sgml
doc/src/sgml/ref/create_subscription.sgml
src/backend/catalog/dependency.c
src/backend/catalog/pg_subscription.c
src/backend/catalog/system_views.sql
src/backend/commands/foreigncmds.c
src/backend/commands/subscriptioncmds.c
src/backend/foreign/foreign.c
src/backend/parser/gram.y
src/backend/replication/logical/worker.c
src/bin/pg_dump/pg_dump.c
src/bin/pg_dump/pg_dump.h
src/bin/psql/describe.c
src/bin/psql/tab-complete.in.c
src/include/catalog/catversion.h
src/include/catalog/pg_foreign_data_wrapper.h
src/include/catalog/pg_subscription.h
src/include/foreign/foreign.h
src/include/nodes/parsenodes.h
src/test/regress/expected/oidjoins.out
src/test/regress/expected/subscription.out
src/test/regress/regress.c
src/test/regress/sql/subscription.sql

index 8eaf4d263b688db911fbf91d6c2e1e583ec11c9c..b8c78b58804aa556c3d70ecaefca1385325df941 100644 (file)
@@ -14,7 +14,7 @@ PG_CPPFLAGS = -I$(libpq_srcdir)
 SHLIB_LINK_INTERNAL = $(libpq)
 
 EXTENSION = postgres_fdw
-DATA = postgres_fdw--1.0.sql postgres_fdw--1.0--1.1.sql postgres_fdw--1.1--1.2.sql
+DATA = postgres_fdw--1.0.sql postgres_fdw--1.0--1.1.sql postgres_fdw--1.1--1.2.sql postgres_fdw--1.2--1.3.sql
 
 REGRESS = postgres_fdw query_cancel
 ISOLATION = eval_plan_qual
index 311936406f2fece661ab048e070d56d5d88d5c4b..7e2b822d16152ec8fee8688c0a6d1dadb3ecdeb4 100644 (file)
@@ -132,6 +132,7 @@ PG_FUNCTION_INFO_V1(postgres_fdw_get_connections);
 PG_FUNCTION_INFO_V1(postgres_fdw_get_connections_1_2);
 PG_FUNCTION_INFO_V1(postgres_fdw_disconnect);
 PG_FUNCTION_INFO_V1(postgres_fdw_disconnect_all);
+PG_FUNCTION_INFO_V1(postgres_fdw_connection);
 
 /* prototypes of private functions */
 static void make_new_connection(ConnCacheEntry *entry, UserMapping *user);
@@ -477,141 +478,159 @@ pgfdw_security_check(const char **keywords, const char **values, UserMapping *us
 }
 
 /*
- * Connect to remote server using specified server and user mapping properties.
+ * Construct connection params from generic options of ForeignServer and
+ * UserMapping.  (Some of them might not be libpq options, in which case we'll
+ * just waste a few array slots.)
  */
-static PGconn *
-connect_pg_server(ForeignServer *server, UserMapping *user)
+static void
+construct_connection_params(ForeignServer *server, UserMapping *user,
+                                                       const char ***p_keywords, const char ***p_values,
+                                                       char **p_appname)
 {
-       PGconn     *volatile conn = NULL;
+       const char **keywords;
+       const char **values;
+       char       *appname = NULL;
+       int                     n;
 
        /*
-        * Use PG_TRY block to ensure closing connection on error.
+        * Add 4 extra slots for application_name, fallback_application_name,
+        * client_encoding, end marker, and 3 extra slots for scram keys and
+        * required scram pass-through options.
         */
-       PG_TRY();
-       {
-               const char **keywords;
-               const char **values;
-               char       *appname = NULL;
-               int                     n;
+       n = list_length(server->options) + list_length(user->options) + 4 + 3;
+       keywords = (const char **) palloc(n * sizeof(char *));
+       values = (const char **) palloc(n * sizeof(char *));
 
-               /*
-                * Construct connection params from generic options of ForeignServer
-                * and UserMapping.  (Some of them might not be libpq options, in
-                * which case we'll just waste a few array slots.)  Add 4 extra slots
-                * for application_name, fallback_application_name, client_encoding,
-                * end marker, and 3 extra slots for scram keys and required scram
-                * pass-through options.
-                */
-               n = list_length(server->options) + list_length(user->options) + 4 + 3;
-               keywords = (const char **) palloc(n * sizeof(char *));
-               values = (const char **) palloc(n * sizeof(char *));
+       n = 0;
+       n += ExtractConnectionOptions(server->options,
+                                                                 keywords + n, values + n);
+       n += ExtractConnectionOptions(user->options,
+                                                                 keywords + n, values + n);
 
-               n = 0;
-               n += ExtractConnectionOptions(server->options,
-                                                                         keywords + n, values + n);
-               n += ExtractConnectionOptions(user->options,
-                                                                         keywords + n, values + n);
-
-               /*
-                * Use pgfdw_application_name as application_name if set.
-                *
-                * PQconnectdbParams() processes the parameter arrays from start to
-                * end. If any key word is repeated, the last value is used. Therefore
-                * note that pgfdw_application_name must be added to the arrays after
-                * options of ForeignServer are, so that it can override
-                * application_name set in ForeignServer.
-                */
-               if (pgfdw_application_name && *pgfdw_application_name != '\0')
-               {
-                       keywords[n] = "application_name";
-                       values[n] = pgfdw_application_name;
-                       n++;
-               }
+       /*
+        * Use pgfdw_application_name as application_name if set.
+        *
+        * PQconnectdbParams() processes the parameter arrays from start to end.
+        * If any key word is repeated, the last value is used. Therefore note
+        * that pgfdw_application_name must be added to the arrays after options
+        * of ForeignServer are, so that it can override application_name set in
+        * ForeignServer.
+        */
+       if (pgfdw_application_name && *pgfdw_application_name != '\0')
+       {
+               keywords[n] = "application_name";
+               values[n] = pgfdw_application_name;
+               n++;
+       }
 
-               /*
-                * Search the parameter arrays to find application_name setting, and
-                * replace escape sequences in it with status information if found.
-                * The arrays are searched backwards because the last value is used if
-                * application_name is repeatedly set.
-                */
-               for (int i = n - 1; i >= 0; i--)
+       /*
+        * Search the parameter arrays to find application_name setting, and
+        * replace escape sequences in it with status information if found.  The
+        * arrays are searched backwards because the last value is used if
+        * application_name is repeatedly set.
+        */
+       for (int i = n - 1; i >= 0; i--)
+       {
+               if (strcmp(keywords[i], "application_name") == 0 &&
+                       *(values[i]) != '\0')
                {
-                       if (strcmp(keywords[i], "application_name") == 0 &&
-                               *(values[i]) != '\0')
+                       /*
+                        * Use this application_name setting if it's not empty string even
+                        * after any escape sequences in it are replaced.
+                        */
+                       appname = process_pgfdw_appname(values[i]);
+                       if (appname[0] != '\0')
                        {
-                               /*
-                                * Use this application_name setting if it's not empty string
-                                * even after any escape sequences in it are replaced.
-                                */
-                               appname = process_pgfdw_appname(values[i]);
-                               if (appname[0] != '\0')
-                               {
-                                       values[i] = appname;
-                                       break;
-                               }
-
-                               /*
-                                * This empty application_name is not used, so we set
-                                * values[i] to NULL and keep searching the array to find the
-                                * next one.
-                                */
-                               values[i] = NULL;
-                               pfree(appname);
-                               appname = NULL;
+                               values[i] = appname;
+                               break;
                        }
+
+                       /*
+                        * This empty application_name is not used, so we set values[i] to
+                        * NULL and keep searching the array to find the next one.
+                        */
+                       values[i] = NULL;
+                       pfree(appname);
+                       appname = NULL;
                }
+       }
+
+       *p_appname = appname;
 
-               /* Use "postgres_fdw" as fallback_application_name */
-               keywords[n] = "fallback_application_name";
-               values[n] = "postgres_fdw";
+       /* Use "postgres_fdw" as fallback_application_name */
+       keywords[n] = "fallback_application_name";
+       values[n] = "postgres_fdw";
+       n++;
+
+       /* Set client_encoding so that libpq can convert encoding properly. */
+       keywords[n] = "client_encoding";
+       values[n] = GetDatabaseEncodingName();
+       n++;
+
+       /* Add required SCRAM pass-through connection options if it's enabled. */
+       if (MyProcPort != NULL && MyProcPort->has_scram_keys && UseScramPassthrough(server, user))
+       {
+               int                     len;
+               int                     encoded_len;
+
+               keywords[n] = "scram_client_key";
+               len = pg_b64_enc_len(sizeof(MyProcPort->scram_ClientKey));
+               /* don't forget the zero-terminator */
+               values[n] = palloc0(len + 1);
+               encoded_len = pg_b64_encode(MyProcPort->scram_ClientKey,
+                                                                       sizeof(MyProcPort->scram_ClientKey),
+                                                                       (char *) values[n], len);
+               if (encoded_len < 0)
+                       elog(ERROR, "could not encode SCRAM client key");
                n++;
 
-               /* Set client_encoding so that libpq can convert encoding properly. */
-               keywords[n] = "client_encoding";
-               values[n] = GetDatabaseEncodingName();
+               keywords[n] = "scram_server_key";
+               len = pg_b64_enc_len(sizeof(MyProcPort->scram_ServerKey));
+               /* don't forget the zero-terminator */
+               values[n] = palloc0(len + 1);
+               encoded_len = pg_b64_encode(MyProcPort->scram_ServerKey,
+                                                                       sizeof(MyProcPort->scram_ServerKey),
+                                                                       (char *) values[n], len);
+               if (encoded_len < 0)
+                       elog(ERROR, "could not encode SCRAM server key");
                n++;
 
-               /* Add required SCRAM pass-through connection options if it's enabled. */
-               if (MyProcPort != NULL && MyProcPort->has_scram_keys && UseScramPassthrough(server, user))
-               {
-                       int                     len;
-                       int                     encoded_len;
-
-                       keywords[n] = "scram_client_key";
-                       len = pg_b64_enc_len(sizeof(MyProcPort->scram_ClientKey));
-                       /* don't forget the zero-terminator */
-                       values[n] = palloc0(len + 1);
-                       encoded_len = pg_b64_encode(MyProcPort->scram_ClientKey,
-                                                                               sizeof(MyProcPort->scram_ClientKey),
-                                                                               (char *) values[n], len);
-                       if (encoded_len < 0)
-                               elog(ERROR, "could not encode SCRAM client key");
-                       n++;
-
-                       keywords[n] = "scram_server_key";
-                       len = pg_b64_enc_len(sizeof(MyProcPort->scram_ServerKey));
-                       /* don't forget the zero-terminator */
-                       values[n] = palloc0(len + 1);
-                       encoded_len = pg_b64_encode(MyProcPort->scram_ServerKey,
-                                                                               sizeof(MyProcPort->scram_ServerKey),
-                                                                               (char *) values[n], len);
-                       if (encoded_len < 0)
-                               elog(ERROR, "could not encode SCRAM server key");
-                       n++;
+               /*
+                * Require scram-sha-256 to ensure that no other auth method is used
+                * when connecting with foreign server.
+                */
+               keywords[n] = "require_auth";
+               values[n] = "scram-sha-256";
+               n++;
+       }
 
-                       /*
-                        * Require scram-sha-256 to ensure that no other auth method is
-                        * used when connecting with foreign server.
-                        */
-                       keywords[n] = "require_auth";
-                       values[n] = "scram-sha-256";
-                       n++;
-               }
+       keywords[n] = values[n] = NULL;
+
+       /* Verify the set of connection parameters. */
+       check_conn_params(keywords, values, user);
 
-               keywords[n] = values[n] = NULL;
+       *p_keywords = keywords;
+       *p_values = values;
+}
+
+/*
+ * Connect to remote server using specified server and user mapping properties.
+ */
+static PGconn *
+connect_pg_server(ForeignServer *server, UserMapping *user)
+{
+       PGconn     *volatile conn = NULL;
+
+       /*
+        * Use PG_TRY block to ensure closing connection on error.
+        */
+       PG_TRY();
+       {
+               const char **keywords;
+               const char **values;
+               char       *appname;
 
-               /* Verify the set of connection parameters. */
-               check_conn_params(keywords, values, user);
+               construct_connection_params(server, user, &keywords, &values, &appname);
 
                /* first time, allocate or get the custom wait event */
                if (pgfdw_we_connect == 0)
@@ -2310,6 +2329,56 @@ postgres_fdw_get_connections_internal(FunctionCallInfo fcinfo,
        }
 }
 
+/*
+ * Values in connection strings must be enclosed in single quotes. Single
+ * quotes and backslashes must be escaped with backslash. NB: these rules are
+ * different from the rules for escaping a SQL literal.
+ */
+static void
+appendEscapedValue(StringInfo str, const char *val)
+{
+       appendStringInfoChar(str, '\'');
+       for (int i = 0; val[i] != '\0'; i++)
+       {
+               if (val[i] == '\\' || val[i] == '\'')
+                       appendStringInfoChar(str, '\\');
+               appendStringInfoChar(str, val[i]);
+       }
+       appendStringInfoChar(str, '\'');
+}
+
+Datum
+postgres_fdw_connection(PG_FUNCTION_ARGS)
+{
+       Oid                     userid = PG_GETARG_OID(0);
+       Oid                     serverid = PG_GETARG_OID(1);
+       ForeignServer *server = GetForeignServer(serverid);
+       UserMapping *user = GetUserMapping(userid, serverid);
+       StringInfoData str;
+       const char **keywords;
+       const char **values;
+       char       *appname;
+       char       *sep = "";
+
+       construct_connection_params(server, user, &keywords, &values, &appname);
+
+       initStringInfo(&str);
+       for (int i = 0; keywords[i] != NULL; i++)
+       {
+               if (values[i] == NULL)
+                       continue;
+               appendStringInfo(&str, "%s%s = ", sep, keywords[i]);
+               appendEscapedValue(&str, values[i]);
+               sep = " ";
+       }
+
+       if (appname != NULL)
+               pfree(appname);
+       pfree(keywords);
+       pfree(values);
+       PG_RETURN_TEXT_P(cstring_to_text(str.data));
+}
+
 /*
  * List active foreign server connections.
  *
index 2ccb72c539ac55408f3042c7f6ee09d10ddaca53..0f5271d476ee48e1f615ad34f695eb0a01cbc02f 100644 (file)
@@ -255,6 +255,14 @@ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1;  -- should work again
 ANALYZE ft1;
 ALTER FOREIGN TABLE ft2 OPTIONS (use_remote_estimate 'true');
 -- ===================================================================
+-- test subscription
+-- ===================================================================
+CREATE SUBSCRIPTION regress_pgfdw_subscription SERVER testserver1
+  PUBLICATION pub1 WITH (slot_name = NONE, connect = false);
+WARNING:  subscription was created, but is not connected
+HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
+DROP SUBSCRIPTION regress_pgfdw_subscription;
+-- ===================================================================
 -- test error case for create publication on foreign table
 -- ===================================================================
 CREATE PUBLICATION testpub_ftbl FOR TABLE ft1;  -- should fail
index ea4cd9fcd46ca20b4518f907f116f974a6aec045..3e2ed06b7665c88564fb1f11e987fddcb0e57a05 100644 (file)
@@ -27,6 +27,7 @@ install_data(
   'postgres_fdw--1.0.sql',
   'postgres_fdw--1.0--1.1.sql',
   'postgres_fdw--1.1--1.2.sql',
+  'postgres_fdw--1.2--1.3.sql',
   kwargs: contrib_data_args,
 )
 
@@ -50,6 +51,7 @@ tests += {
   'tap': {
     'tests': [
       't/001_auth_scram.pl',
+      't/010_subscription.pl',
     ],
   },
 }
diff --git a/contrib/postgres_fdw/postgres_fdw--1.2--1.3.sql b/contrib/postgres_fdw/postgres_fdw--1.2--1.3.sql
new file mode 100644 (file)
index 0000000..5bcf0ba
--- /dev/null
@@ -0,0 +1,12 @@
+/* contrib/postgres_fdw/postgres_fdw--1.2--1.3.sql */
+
+-- complain if script is sourced in psql, rather than via ALTER EXTENSION
+\echo Use "ALTER EXTENSION postgres_fdw UPDATE TO '1.3'" to load this file. \quit
+
+-- takes internal parameter to prevent calling from SQL
+CREATE FUNCTION postgres_fdw_connection(oid, oid, internal)
+RETURNS text
+AS 'MODULE_PATHNAME'
+LANGUAGE C STRICT PARALLEL RESTRICTED;
+
+ALTER FOREIGN DATA WRAPPER postgres_fdw CONNECTION postgres_fdw_connection;
index a4b800be4fca01d0929cfea0f9e69d536ea8a28d..ae2963d480d3a0eb8499d9c69475eeb3b02b6f13 100644 (file)
@@ -1,5 +1,5 @@
 # postgres_fdw extension
 comment = 'foreign-data wrapper for remote PostgreSQL servers'
-default_version = '1.2'
+default_version = '1.3'
 module_pathname = '$libdir/postgres_fdw'
 relocatable = true
index 72d2d9c311bd029f99020deb60bc15dd91cacc35..49ed797e8efd7dd6bd1afb0f34a6b6c490b04257 100644 (file)
@@ -244,6 +244,13 @@ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1;  -- should work again
 ANALYZE ft1;
 ALTER FOREIGN TABLE ft2 OPTIONS (use_remote_estimate 'true');
 
+-- ===================================================================
+-- test subscription
+-- ===================================================================
+CREATE SUBSCRIPTION regress_pgfdw_subscription SERVER testserver1
+  PUBLICATION pub1 WITH (slot_name = NONE, connect = false);
+DROP SUBSCRIPTION regress_pgfdw_subscription;
+
 -- ===================================================================
 -- test error case for create publication on foreign table
 -- ===================================================================
diff --git a/contrib/postgres_fdw/t/010_subscription.pl b/contrib/postgres_fdw/t/010_subscription.pl
new file mode 100644 (file)
index 0000000..1e41091
--- /dev/null
@@ -0,0 +1,71 @@
+
+# Copyright (c) 2021-2026, PostgreSQL Global Development Group
+
+# Basic logical replication test
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Initialize publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->start;
+
+# Create some preexisting content on publisher
+$node_publisher->safe_psql('postgres',
+       "CREATE TABLE tab_ins AS SELECT a, a + 1 as b FROM generate_series(1,1002) AS a");
+
+# Replicate the changes without columns
+$node_publisher->safe_psql('postgres', "CREATE TABLE tab_no_col()");
+$node_publisher->safe_psql('postgres',
+       "INSERT INTO tab_no_col default VALUES");
+
+# Setup structure on subscriber
+$node_subscriber->safe_psql('postgres', "CREATE EXTENSION postgres_fdw");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_ins (a int, b int)");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE tab_ins");
+
+my $publisher_host = $node_publisher->host;
+my $publisher_port = $node_publisher->port;
+$node_subscriber->safe_psql('postgres',
+       "CREATE SERVER tap_server FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host '$publisher_host', port '$publisher_port', dbname 'postgres')"
+);
+
+$node_subscriber->safe_psql('postgres',
+       "CREATE USER MAPPING FOR PUBLIC SERVER tap_server"
+);
+
+$node_subscriber->safe_psql('postgres',
+       "CREATE FOREIGN TABLE f_tab_ins (a int, b int) SERVER tap_server OPTIONS(table_name 'tab_ins')"
+);
+$node_subscriber->safe_psql('postgres',
+       "CREATE SUBSCRIPTION tap_sub SERVER tap_server PUBLICATION tap_pub WITH (password_required=false)"
+);
+
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
+
+my $result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM (SELECT f.b = l.b as match FROM tab_ins l, f_tab_ins f WHERE l.a = f.a) WHERE match");
+is($result, qq(1002), 'check that initial data was copied to subscriber');
+
+$node_publisher->safe_psql('postgres',
+       "INSERT INTO tab_ins SELECT a, a + 1 FROM generate_series(1003,1050) a");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM (SELECT f.b = l.b as match FROM tab_ins l, f_tab_ins f WHERE l.a = f.a) WHERE match");
+is($result, qq(1050), 'check that inserted data was copied to subscriber');
+
+done_testing();
index bcb473c078ba66e41f8c8653239b7831e95027df..72c8d3d59bd28b2fb1f155df4126edf22ad64105 100644 (file)
@@ -2577,7 +2577,9 @@ CONTEXT:  processing remote data for replication origin "pg_16395" during "INSER
   <para>
    To create a subscription, the user must have the privileges of
    the <literal>pg_create_subscription</literal> role, as well as
-   <literal>CREATE</literal> privileges on the database.
+   <literal>CREATE</literal> privileges on the database.  If
+   <literal>SERVER</literal> is specified, the user also must have
+   <literal>USAGE</literal> privileges on the server.
   </para>
 
   <para>
index fcf10e4317e80b27afcb0debcbd225e76c5eabe6..de69ddcdebcc7a25208c130878859dd0687b92af 100644 (file)
@@ -1049,6 +1049,32 @@ postgres=# SELECT postgres_fdw_disconnect_all();
   </para>
  </sect2>
 
+ <sect2 id="postgres-fdw-server-subscription">
+  <title>Subscription Management</title>
+
+  <para>
+   <filename>postgres_fdw</filename> supports subscription connections using
+   the same options described in <xref
+   linkend="postgres-fdw-options-connection"/>.
+  </para>
+
+  <para>
+   For example, assuming the remote server <literal>foreign-host</literal> has
+   a publication <literal>testpub</literal>:
+<programlisting>
+CREATE SERVER subscription_server FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host 'foreign-host', dbname 'foreign_db');
+CREATE USER MAPPING FOR local_user SERVER subscription_server OPTIONS (user 'foreign_user', password 'password');
+CREATE SUBSCRIPTION my_subscription SERVER subscription_server PUBLICATION testpub;
+</programlisting>
+  </para>
+
+  <para>
+   To create a subscription, the user must be a member of the <xref
+   linkend="predefined-role-pg-create-subscription"/> role and have
+   <literal>USAGE</literal> privileges on the server.
+  </para>
+ </sect2>
+
  <sect2 id="postgres-fdw-transaction-management">
   <title>Transaction Management</title>
 
index dc0957d965a623c558405da8c197c9baa9a07072..640c02893cf0105ca7c0ed43bbc922958016ed46 100644 (file)
@@ -24,6 +24,7 @@ PostgreSQL documentation
 ALTER FOREIGN DATA WRAPPER <replaceable class="parameter">name</replaceable>
     [ HANDLER <replaceable class="parameter">handler_function</replaceable> | NO HANDLER ]
     [ VALIDATOR <replaceable class="parameter">validator_function</replaceable> | NO VALIDATOR ]
+    [ CONNECTION <replaceable class="parameter">connection_function</replaceable> | NO CONNECTION ]
     [ OPTIONS ( [ ADD | SET | DROP ] <replaceable class="parameter">option</replaceable> ['<replaceable class="parameter">value</replaceable>'] [, ... ]) ]
 ALTER FOREIGN DATA WRAPPER <replaceable class="parameter">name</replaceable> OWNER TO { <replaceable>new_owner</replaceable> | CURRENT_ROLE | CURRENT_USER | SESSION_USER }
 ALTER FOREIGN DATA WRAPPER <replaceable class="parameter">name</replaceable> RENAME TO <replaceable>new_name</replaceable>
@@ -112,6 +113,25 @@ ALTER FOREIGN DATA WRAPPER <replaceable class="parameter">name</replaceable> REN
     </listitem>
    </varlistentry>
 
+   <varlistentry>
+    <term><literal>CONNECTION <replaceable class="parameter">connection_function</replaceable></literal></term>
+    <listitem>
+     <para>
+      Specifies a new connection function for the foreign-data wrapper.
+     </para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
+    <term><literal>NO CONNECTION</literal></term>
+    <listitem>
+     <para>
+      This is used to specify that the foreign-data wrapper should no
+      longer have a connection function.
+     </para>
+    </listitem>
+   </varlistentry>
+
    <varlistentry>
     <term><literal>OPTIONS ( [ ADD | SET | DROP ] <replaceable class="parameter">option</replaceable> ['<replaceable class="parameter">value</replaceable>'] [, ... ] )</literal></term>
     <listitem>
index 5318998e80c33d2f83765a7e6943deb71b9fade2..f215fb0e5a2b8bf95ca9a483a9c6efd8fedf84d0 100644 (file)
@@ -21,6 +21,7 @@ PostgreSQL documentation
 
  <refsynopsisdiv>
 <synopsis>
+ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SERVER <replaceable>servername</replaceable>
 ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> CONNECTION '<replaceable>conninfo</replaceable>'
 ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SET PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...] [ WITH ( <replaceable class="parameter">publication_option</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
 ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> ADD PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...] [ WITH ( <replaceable class="parameter">publication_option</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
@@ -102,13 +103,24 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
     </listitem>
    </varlistentry>
 
+   <varlistentry id="sql-altersubscription-params-server">
+    <term><literal>SERVER <replaceable class="parameter">servername</replaceable></literal></term>
+    <listitem>
+     <para>
+      This clause replaces the foreign server or connection string originally
+      set by <xref linkend="sql-createsubscription"/> with the foreign server
+      <replaceable>servername</replaceable>.
+     </para>
+    </listitem>
+   </varlistentry>
+
    <varlistentry id="sql-altersubscription-params-connection">
     <term><literal>CONNECTION '<replaceable class="parameter">conninfo</replaceable>'</literal></term>
     <listitem>
      <para>
-      This clause replaces the connection string originally set by
-      <xref linkend="sql-createsubscription"/>.  See there for more
-      information.
+      This clause replaces the foreign server or connection string originally
+      set by <xref linkend="sql-createsubscription"/> with the connection
+      string <replaceable>conninfo</replaceable>.
      </para>
     </listitem>
    </varlistentry>
index 0fcba18a3471fa67e480fe1e82d1b4db1b7b5d0c..7b83f500b255c1b45a2899e923ec88f5e4edf6c8 100644 (file)
@@ -24,6 +24,7 @@ PostgreSQL documentation
 CREATE FOREIGN DATA WRAPPER <replaceable class="parameter">name</replaceable>
     [ HANDLER <replaceable class="parameter">handler_function</replaceable> | NO HANDLER ]
     [ VALIDATOR <replaceable class="parameter">validator_function</replaceable> | NO VALIDATOR ]
+    [ CONNECTION <replaceable class="parameter">connection_function</replaceable> | NO CONNECTION ]
     [ OPTIONS ( <replaceable class="parameter">option</replaceable> '<replaceable class="parameter">value</replaceable>' [, ... ] ) ]
 </synopsis>
  </refsynopsisdiv>
@@ -99,6 +100,25 @@ CREATE FOREIGN DATA WRAPPER <replaceable class="parameter">name</replaceable>
     </listitem>
    </varlistentry>
 
+   <varlistentry>
+    <term><literal>CONNECTION <replaceable class="parameter">connection_function</replaceable></literal></term>
+    <listitem>
+     <para>
+      <replaceable class="parameter">connection_function</replaceable> is the
+      name of a previously registered function that will be called to generate
+      the postgres connection string when a foreign server is used as part of
+      <xref linkend="sql-createsubscription"/>.  If no connection function or
+      <literal>NO CONNECTION</literal> is specified, then servers using this
+      foreign data wrapper cannot be used for <literal>CREATE
+      SUBSCRIPTION</literal>.  The connection function must take three
+      arguments: one of type <type>oid</type> for the user, one of type
+      <type>oid</type> for the server, and an unused third argument of type
+      <type>internal</type> (which prevents calling the function in other
+      contexts).
+     </para>
+    </listitem>
+   </varlistentry>
+
    <varlistentry>
     <term><literal>OPTIONS ( <replaceable class="parameter">option</replaceable> '<replaceable class="parameter">value</replaceable>' [, ... ] )</literal></term>
     <listitem>
index 05f4019453ba2d9ed07e31ef89cb6fd344e96697..ce4a064eabb4bdfbe7ec2369f7458fef6c2f1abe 100644 (file)
@@ -42,6 +42,13 @@ CREATE SERVER [ IF NOT EXISTS ] <replaceable class="parameter">server_name</repl
    means of user mappings.
   </para>
 
+  <para>
+   If the foreign data wrapper <replaceable>fdw_name</replaceable> is
+   specified with a <literal>CONNECTION</literal> clause, then <xref
+   linkend="sql-createsubscription"/> may use this foreign server for
+   connection information.
+  </para>
+
   <para>
    The server name must be unique within the database.
   </para>
index eb0cc645d8f8bcf39f5361bd8cb54c9daa8e4884..07d5b1bd77c53fd821d11a5fde7eb9cd40cfc9de 100644 (file)
@@ -22,7 +22,7 @@ PostgreSQL documentation
  <refsynopsisdiv>
 <synopsis>
 CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceable>
-    CONNECTION '<replaceable class="parameter">conninfo</replaceable>'
+    { SERVER <replaceable class="parameter">servername</replaceable> | CONNECTION '<replaceable class="parameter">conninfo</replaceable>' }
     PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...]
     [ WITH ( <replaceable class="parameter">subscription_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
 </synopsis>
@@ -77,6 +77,20 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
     </listitem>
    </varlistentry>
 
+   <varlistentry id="sql-createsubscription-params-server">
+    <term><literal>SERVER <replaceable class="parameter">servername</replaceable></literal></term>
+    <listitem>
+     <para>
+      A foreign server to use for the connection.  The server's foreign data
+      wrapper must have a <replaceable>connection_function</replaceable>
+      registered, and a user mapping for the subscription owner on the server
+      must exist.  Additionally, the subscription owner must have
+      <literal>USAGE</literal> privileges on
+      <replaceable>servername</replaceable>.
+     </para>
+    </listitem>
+   </varlistentry>
+
    <varlistentry id="sql-createsubscription-params-connection">
     <term><literal>CONNECTION '<replaceable class="parameter">conninfo</replaceable>'</literal></term>
     <listitem>
index 570c434ede8372015ea9b6bf00b063d061304dda..09575278de31d1e0b0da5db336b9d3f3df7fe2ed 100644 (file)
@@ -895,6 +895,17 @@ findDependentObjects(const ObjectAddress *object,
                        object->objectSubId == 0)
                        continue;
 
+               /*
+                * Check that the dependent object is not in a shared catalog, which
+                * is not supported by doDeletion().
+                */
+               if (IsSharedRelation(otherObject.classId))
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
+                                        errmsg("cannot drop %s because %s depends on it",
+                                                       getObjectDescription(object, false),
+                                                       getObjectDescription(&otherObject, false))));
+
                /*
                 * Must lock the dependent object before recursing to it.
                 */
index acf42b853ed4d98df68fd987aff7a9dc556cad12..3673d4f0bc183198d8d80c647a8b5e87ac890355 100644 (file)
 #include "access/htup_details.h"
 #include "access/tableam.h"
 #include "catalog/indexing.h"
+#include "catalog/pg_foreign_server.h"
 #include "catalog/pg_subscription.h"
 #include "catalog/pg_subscription_rel.h"
 #include "catalog/pg_type.h"
+#include "foreign/foreign.h"
 #include "miscadmin.h"
 #include "storage/lmgr.h"
+#include "utils/acl.h"
 #include "utils/array.h"
 #include "utils/builtins.h"
 #include "utils/fmgroids.h"
@@ -69,7 +72,7 @@ GetPublicationsStr(List *publications, StringInfo dest, bool quote_literal)
  * Fetch the subscription from the syscache.
  */
 Subscription *
-GetSubscription(Oid subid, bool missing_ok)
+GetSubscription(Oid subid, bool missing_ok, bool aclcheck)
 {
        HeapTuple       tup;
        Subscription *sub;
@@ -108,10 +111,35 @@ GetSubscription(Oid subid, bool missing_ok)
        sub->retentionactive = subform->subretentionactive;
 
        /* Get conninfo */
-       datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
-                                                                  tup,
-                                                                  Anum_pg_subscription_subconninfo);
-       sub->conninfo = TextDatumGetCString(datum);
+       if (OidIsValid(subform->subserver))
+       {
+               AclResult       aclresult;
+
+               /* recheck ACL if requested */
+               if (aclcheck)
+               {
+                       aclresult = object_aclcheck(ForeignServerRelationId,
+                                                                               subform->subserver,
+                                                                               subform->subowner, ACL_USAGE);
+
+                       if (aclresult != ACLCHECK_OK)
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+                                                errmsg("subscription owner \"%s\" does not have permission on foreign server \"%s\"",
+                                                               GetUserNameFromId(subform->subowner, false),
+                                                               ForeignServerName(subform->subserver))));
+               }
+
+               sub->conninfo = ForeignServerConnectionString(subform->subowner,
+                                                                                                         subform->subserver);
+       }
+       else
+       {
+               datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
+                                                                          tup,
+                                                                          Anum_pg_subscription_subconninfo);
+               sub->conninfo = TextDatumGetCString(datum);
+       }
 
        /* Get slotname */
        datum = SysCacheGetAttr(SUBSCRIPTIONOID,
index 2eda7d80d022fac48ba872ec8537bed29d93b7eb..ecb7c996e8646bdb3b2b09510cee7a494ff0b3e0 100644 (file)
@@ -1449,7 +1449,7 @@ GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled,
               subbinary, substream, subtwophasestate, subdisableonerr,
                          subpasswordrequired, subrunasowner, subfailover,
               subretaindeadtuples, submaxretention, subretentionactive,
-              subslotname, subsynccommit, subpublications, suborigin)
+              subserver, subslotname, subsynccommit, subpublications, suborigin)
     ON pg_subscription TO public;
 
 CREATE VIEW pg_stat_subscription_stats AS
index b56d1ad6785c2d162b5e1cef92d8e90f20a05215..45681235782aa31df10b102c0b4304cfe8f191d2 100644 (file)
@@ -522,21 +522,53 @@ lookup_fdw_validator_func(DefElem *validator)
        /* validator's return value is ignored, so we don't check the type */
 }
 
+/*
+ * Convert a connection string function name passed from the parser to an Oid.
+ */
+static Oid
+lookup_fdw_connection_func(DefElem *connection)
+{
+       Oid                     connectionOid;
+       Oid                     funcargtypes[3];
+
+       if (connection == NULL || connection->arg == NULL)
+               return InvalidOid;
+
+       /* connection string functions take user oid, server oid */
+       funcargtypes[0] = OIDOID;
+       funcargtypes[1] = OIDOID;
+       funcargtypes[2] = INTERNALOID;
+
+       connectionOid = LookupFuncName((List *) connection->arg, 3, funcargtypes, false);
+
+       /* check that connection string function has correct return type */
+       if (get_func_rettype(connectionOid) != TEXTOID)
+               ereport(ERROR,
+                               (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                                errmsg("function %s must return type %s",
+                                               NameListToString((List *) connection->arg), "text")));
+
+       return connectionOid;
+}
+
 /*
  * Process function options of CREATE/ALTER FDW
  */
 static void
 parse_func_options(ParseState *pstate, List *func_options,
                                   bool *handler_given, Oid *fdwhandler,
-                                  bool *validator_given, Oid *fdwvalidator)
+                                  bool *validator_given, Oid *fdwvalidator,
+                                  bool *connection_given, Oid *fdwconnection)
 {
        ListCell   *cell;
 
        *handler_given = false;
        *validator_given = false;
+       *connection_given = false;
        /* return InvalidOid if not given */
        *fdwhandler = InvalidOid;
        *fdwvalidator = InvalidOid;
+       *fdwconnection = InvalidOid;
 
        foreach(cell, func_options)
        {
@@ -556,6 +588,13 @@ parse_func_options(ParseState *pstate, List *func_options,
                        *validator_given = true;
                        *fdwvalidator = lookup_fdw_validator_func(def);
                }
+               else if (strcmp(def->defname, "connection") == 0)
+               {
+                       if (*connection_given)
+                               errorConflictingDefElem(def, pstate);
+                       *connection_given = true;
+                       *fdwconnection = lookup_fdw_connection_func(def);
+               }
                else
                        elog(ERROR, "option \"%s\" not recognized",
                                 def->defname);
@@ -575,8 +614,10 @@ CreateForeignDataWrapper(ParseState *pstate, CreateFdwStmt *stmt)
        Oid                     fdwId;
        bool            handler_given;
        bool            validator_given;
+       bool            connection_given;
        Oid                     fdwhandler;
        Oid                     fdwvalidator;
+       Oid                     fdwconnection;
        Datum           fdwoptions;
        Oid                     ownerId;
        ObjectAddress myself;
@@ -620,10 +661,12 @@ CreateForeignDataWrapper(ParseState *pstate, CreateFdwStmt *stmt)
        /* Lookup handler and validator functions, if given */
        parse_func_options(pstate, stmt->func_options,
                                           &handler_given, &fdwhandler,
-                                          &validator_given, &fdwvalidator);
+                                          &validator_given, &fdwvalidator,
+                                          &connection_given, &fdwconnection);
 
        values[Anum_pg_foreign_data_wrapper_fdwhandler - 1] = ObjectIdGetDatum(fdwhandler);
        values[Anum_pg_foreign_data_wrapper_fdwvalidator - 1] = ObjectIdGetDatum(fdwvalidator);
+       values[Anum_pg_foreign_data_wrapper_fdwconnection - 1] = ObjectIdGetDatum(fdwconnection);
 
        nulls[Anum_pg_foreign_data_wrapper_fdwacl - 1] = true;
 
@@ -695,8 +738,10 @@ AlterForeignDataWrapper(ParseState *pstate, AlterFdwStmt *stmt)
        Datum           datum;
        bool            handler_given;
        bool            validator_given;
+       bool            connection_given;
        Oid                     fdwhandler;
        Oid                     fdwvalidator;
+       Oid                     fdwconnection;
        ObjectAddress myself;
 
        rel = table_open(ForeignDataWrapperRelationId, RowExclusiveLock);
@@ -726,7 +771,8 @@ AlterForeignDataWrapper(ParseState *pstate, AlterFdwStmt *stmt)
 
        parse_func_options(pstate, stmt->func_options,
                                           &handler_given, &fdwhandler,
-                                          &validator_given, &fdwvalidator);
+                                          &validator_given, &fdwvalidator,
+                                          &connection_given, &fdwconnection);
 
        if (handler_given)
        {
@@ -764,6 +810,12 @@ AlterForeignDataWrapper(ParseState *pstate, AlterFdwStmt *stmt)
                fdwvalidator = fdwForm->fdwvalidator;
        }
 
+       if (connection_given)
+       {
+               repl_val[Anum_pg_foreign_data_wrapper_fdwconnection - 1] = ObjectIdGetDatum(fdwconnection);
+               repl_repl[Anum_pg_foreign_data_wrapper_fdwconnection - 1] = true;
+       }
+
        /*
         * If options specified, validate and update.
         */
index 5e3c0964d38eb1daba7907d5fe7675fe6e42f20f..9e21d7a7df9410769556329bb0ea565d787e89cf 100644 (file)
 #include "catalog/objectaddress.h"
 #include "catalog/pg_authid_d.h"
 #include "catalog/pg_database_d.h"
+#include "catalog/pg_foreign_server.h"
 #include "catalog/pg_subscription.h"
 #include "catalog/pg_subscription_rel.h"
 #include "catalog/pg_type.h"
+#include "catalog/pg_user_mapping.h"
 #include "commands/defrem.h"
 #include "commands/event_trigger.h"
 #include "commands/subscriptioncmds.h"
 #include "executor/executor.h"
+#include "foreign/foreign.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
 #include "pgstat.h"
@@ -619,6 +622,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
        Datum           values[Natts_pg_subscription];
        Oid                     owner = GetUserId();
        HeapTuple       tup;
+       Oid                     serverid;
        char       *conninfo;
        char            originname[NAMEDATALEN];
        List       *publications;
@@ -730,15 +734,40 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
        if (opts.wal_receiver_timeout == NULL)
                opts.wal_receiver_timeout = "-1";
 
-       conninfo = stmt->conninfo;
-       publications = stmt->publication;
-
        /* Load the library providing us libpq calls. */
        load_file("libpqwalreceiver", false);
 
+       if (stmt->servername)
+       {
+               ForeignServer *server;
+
+               Assert(!stmt->conninfo);
+               conninfo = NULL;
+
+               server = GetForeignServerByName(stmt->servername, false);
+               aclresult = object_aclcheck(ForeignServerRelationId, server->serverid, owner, ACL_USAGE);
+               if (aclresult != ACLCHECK_OK)
+                       aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, server->servername);
+
+               /* make sure a user mapping exists */
+               GetUserMapping(owner, server->serverid);
+
+               serverid = server->serverid;
+               conninfo = ForeignServerConnectionString(owner, serverid);
+       }
+       else
+       {
+               Assert(stmt->conninfo);
+
+               serverid = InvalidOid;
+               conninfo = stmt->conninfo;
+       }
+
        /* Check the connection info string. */
        walrcv_check_conninfo(conninfo, opts.passwordrequired && !superuser());
 
+       publications = stmt->publication;
+
        /* Everything ok, form a new tuple. */
        memset(values, 0, sizeof(values));
        memset(nulls, false, sizeof(nulls));
@@ -768,8 +797,12 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
                Int32GetDatum(opts.maxretention);
        values[Anum_pg_subscription_subretentionactive - 1] =
                Int32GetDatum(opts.retaindeadtuples);
-       values[Anum_pg_subscription_subconninfo - 1] =
-               CStringGetTextDatum(conninfo);
+       values[Anum_pg_subscription_subserver - 1] = serverid;
+       if (!OidIsValid(serverid))
+               values[Anum_pg_subscription_subconninfo - 1] =
+                       CStringGetTextDatum(conninfo);
+       else
+               nulls[Anum_pg_subscription_subconninfo - 1] = true;
        if (opts.slot_name)
                values[Anum_pg_subscription_subslotname - 1] =
                        DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
@@ -792,6 +825,18 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 
        recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
 
+       ObjectAddressSet(myself, SubscriptionRelationId, subid);
+
+       if (stmt->servername)
+       {
+               ObjectAddress referenced;
+
+               Assert(OidIsValid(serverid));
+
+               ObjectAddressSet(referenced, ForeignServerRelationId, serverid);
+               recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
+       }
+
        /*
         * A replication origin is currently created for all subscriptions,
         * including those that only contain sequences or are otherwise empty.
@@ -945,8 +990,6 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
        if (opts.enabled || opts.retaindeadtuples)
                ApplyLauncherWakeupAtCommit();
 
-       ObjectAddressSet(myself, SubscriptionRelationId, subid);
-
        InvokeObjectPostCreateHook(SubscriptionRelationId, subid, 0);
 
        return myself;
@@ -1410,7 +1453,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
                aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
                                           stmt->subname);
 
-       sub = GetSubscription(subid, false);
+       /*
+        * Skip ACL checks on the subscription's foreign server, if any. If
+        * changing the server (or replacing it with a raw connection), then the
+        * old one will be removed anyway. If changing something unrelated,
+        * there's no need to do an additional ACL check here; that will be done
+        * by the subscription worker anyway.
+        */
+       sub = GetSubscription(subid, false, false);
 
        retain_dead_tuples = sub->retaindeadtuples;
        origin = sub->origin;
@@ -1435,6 +1485,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
        memset(nulls, false, sizeof(nulls));
        memset(replaces, false, sizeof(replaces));
 
+       ObjectAddressSet(myself, SubscriptionRelationId, subid);
+
        switch (stmt->kind)
        {
                case ALTER_SUBSCRIPTION_OPTIONS:
@@ -1753,7 +1805,78 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
                                break;
                        }
 
+               case ALTER_SUBSCRIPTION_SERVER:
+                       {
+                               ForeignServer *new_server;
+                               ObjectAddress referenced;
+                               AclResult       aclresult;
+                               char       *conninfo;
+
+                               /*
+                                * Remove what was there before, either another foreign server
+                                * or a connection string.
+                                */
+                               if (form->subserver)
+                               {
+                                       deleteDependencyRecordsForSpecific(SubscriptionRelationId, form->oid,
+                                                                                                          DEPENDENCY_NORMAL,
+                                                                                                          ForeignServerRelationId, form->subserver);
+                               }
+                               else
+                               {
+                                       nulls[Anum_pg_subscription_subconninfo - 1] = true;
+                                       replaces[Anum_pg_subscription_subconninfo - 1] = true;
+                               }
+
+                               /*
+                                * Check that the subscription owner has USAGE privileges on
+                                * the server.
+                                */
+                               new_server = GetForeignServerByName(stmt->servername, false);
+                               aclresult = object_aclcheck(ForeignServerRelationId,
+                                                                                       new_server->serverid,
+                                                                                       form->subowner, ACL_USAGE);
+                               if (aclresult != ACLCHECK_OK)
+                                       ereport(ERROR,
+                                                       errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+                                                       errmsg("subscription owner \"%s\" does not have permission on foreign server \"%s\"",
+                                                                  GetUserNameFromId(form->subowner, false),
+                                                                  ForeignServerName(new_server->serverid)));
+
+                               /* make sure a user mapping exists */
+                               GetUserMapping(form->subowner, new_server->serverid);
+
+                               conninfo = ForeignServerConnectionString(form->subowner,
+                                                                                                                new_server->serverid);
+
+                               /* Load the library providing us libpq calls. */
+                               load_file("libpqwalreceiver", false);
+                               /* Check the connection info string. */
+                               walrcv_check_conninfo(conninfo,
+                                                                         sub->passwordrequired && !sub->ownersuperuser);
+
+                               values[Anum_pg_subscription_subserver - 1] = new_server->serverid;
+                               replaces[Anum_pg_subscription_subserver - 1] = true;
+
+                               ObjectAddressSet(referenced, ForeignServerRelationId, new_server->serverid);
+                               recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
+
+                               update_tuple = true;
+                       }
+                       break;
+
                case ALTER_SUBSCRIPTION_CONNECTION:
+                       /* remove reference to foreign server and dependencies, if present */
+                       if (form->subserver)
+                       {
+                               deleteDependencyRecordsForSpecific(SubscriptionRelationId, form->oid,
+                                                                                                  DEPENDENCY_NORMAL,
+                                                                                                  ForeignServerRelationId, form->subserver);
+
+                               values[Anum_pg_subscription_subserver - 1] = InvalidOid;
+                               replaces[Anum_pg_subscription_subserver - 1] = true;
+                       }
+
                        /* Load the library providing us libpq calls. */
                        load_file("libpqwalreceiver", false);
                        /* Check the connection info string. */
@@ -2038,8 +2161,6 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
        table_close(rel, RowExclusiveLock);
 
-       ObjectAddressSet(myself, SubscriptionRelationId, subid);
-
        InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
 
        /* Wake up related replication workers to handle this change quickly. */
@@ -2068,7 +2189,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
        ListCell   *lc;
        char            originname[NAMEDATALEN];
        char       *err = NULL;
-       WalReceiverConn *wrconn;
+       WalReceiverConn *wrconn = NULL;
        Form_pg_subscription form;
        List       *rstates;
        bool            must_use_password;
@@ -2126,9 +2247,35 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
        subname = pstrdup(NameStr(*DatumGetName(datum)));
 
        /* Get conninfo */
-       datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
-                                                                  Anum_pg_subscription_subconninfo);
-       conninfo = TextDatumGetCString(datum);
+       if (OidIsValid(form->subserver))
+       {
+               AclResult       aclresult;
+
+               aclresult = object_aclcheck(ForeignServerRelationId, form->subserver,
+                                                                       form->subowner, ACL_USAGE);
+               if (aclresult != ACLCHECK_OK)
+               {
+                       /*
+                        * Unable to generate connection string because permissions on the
+                        * foreign server have been removed. Follow the same logic as an
+                        * unusable subconninfo (which will result in an ERROR later
+                        * unless slot_name = NONE).
+                        */
+                       err = psprintf(_("subscription owner \"%s\" does not have permission on foreign server \"%s\""),
+                                                  GetUserNameFromId(form->subowner, false),
+                                                  ForeignServerName(form->subserver));
+                       conninfo = NULL;
+               }
+               else
+                       conninfo = ForeignServerConnectionString(form->subowner,
+                                                                                                        form->subserver);
+       }
+       else
+       {
+               datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
+                                                                          Anum_pg_subscription_subconninfo);
+               conninfo = TextDatumGetCString(datum);
+       }
 
        /* Get slotname */
        datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
@@ -2227,6 +2374,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
        }
 
        /* Clean up dependencies */
+       deleteDependencyRecordsFor(SubscriptionRelationId, subid, false);
        deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
 
        /* Remove any associated relation synchronization states. */
@@ -2265,8 +2413,10 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
         */
        load_file("libpqwalreceiver", false);
 
-       wrconn = walrcv_connect(conninfo, true, true, must_use_password,
-                                                       subname, &err);
+       if (conninfo)
+               wrconn = walrcv_connect(conninfo, true, true, must_use_password,
+                                                               subname, &err);
+
        if (wrconn == NULL)
        {
                if (!slotname)
@@ -2436,6 +2586,27 @@ AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
                aclcheck_error(aclresult, OBJECT_DATABASE,
                                           get_database_name(MyDatabaseId));
 
+       /*
+        * If the subscription uses a server, check that the new owner has USAGE
+        * privileges on the server and that a user mapping exists. Note: does not
+        * re-check the resulting connection string.
+        */
+       if (OidIsValid(form->subserver))
+       {
+               Oid                     serverid = form->subserver;
+
+               aclresult = object_aclcheck(ForeignServerRelationId, serverid, newOwnerId, ACL_USAGE);
+               if (aclresult != ACLCHECK_OK)
+                       ereport(ERROR,
+                                       errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+                                       errmsg("new subscription owner \"%s\" does not have permission on foreign server \"%s\"",
+                                                  GetUserNameFromId(newOwnerId, false),
+                                                  ForeignServerName(serverid)));
+
+               /* make sure a user mapping exists */
+               GetUserMapping(newOwnerId, serverid);
+       }
+
        form->subowner = newOwnerId;
        CatalogTupleUpdate(rel, &tup->t_self, tup);
 
index b912a06dd15a5a65ef39445a99edaeeb2ffd8e99..c53699959eafeb68911186cd3af674a9fc83753b 100644 (file)
@@ -72,6 +72,7 @@ GetForeignDataWrapperExtended(Oid fdwid, bits16 flags)
        fdw->fdwname = pstrdup(NameStr(fdwform->fdwname));
        fdw->fdwhandler = fdwform->fdwhandler;
        fdw->fdwvalidator = fdwform->fdwvalidator;
+       fdw->fdwconnection = fdwform->fdwconnection;
 
        /* Extract the fdwoptions */
        datum = SysCacheGetAttr(FOREIGNDATAWRAPPEROID,
@@ -176,6 +177,31 @@ GetForeignServerExtended(Oid serverid, bits16 flags)
 }
 
 
+/*
+ * ForeignServerName - get name of foreign server.
+ */
+char *
+ForeignServerName(Oid serverid)
+{
+       Form_pg_foreign_server serverform;
+       char       *servername;
+       HeapTuple       tp;
+
+       tp = SearchSysCache1(FOREIGNSERVEROID, ObjectIdGetDatum(serverid));
+
+       if (!HeapTupleIsValid(tp))
+               elog(ERROR, "cache lookup failed for foreign server %u", serverid);
+
+       serverform = (Form_pg_foreign_server) GETSTRUCT(tp);
+
+       servername = pstrdup(NameStr(serverform->srvname));
+
+       ReleaseSysCache(tp);
+
+       return servername;
+}
+
+
 /*
  * GetForeignServerByName - look up the foreign server definition by name.
  */
@@ -191,6 +217,66 @@ GetForeignServerByName(const char *srvname, bool missing_ok)
 }
 
 
+/*
+ * Retrieve connection string from server's FDW.
+ */
+char *
+ForeignServerConnectionString(Oid userid, Oid serverid)
+{
+       MemoryContext tempContext;
+       MemoryContext oldcxt;
+       volatile text *connection_text = NULL;
+       char       *result = NULL;
+
+       /*
+        * GetForeignServer, GetForeignDataWrapper, and the connection function
+        * itself all leak memory into CurrentMemoryContext. Switch to a temporary
+        * context for easy cleanup.
+        */
+       tempContext = AllocSetContextCreate(CurrentMemoryContext,
+                                                                               "FDWConnectionContext",
+                                                                               ALLOCSET_SMALL_SIZES);
+
+       oldcxt = MemoryContextSwitchTo(tempContext);
+
+       PG_TRY();
+       {
+               ForeignServer *server;
+               ForeignDataWrapper *fdw;
+               Datum           connection_datum;
+
+               server = GetForeignServer(serverid);
+               fdw = GetForeignDataWrapper(server->fdwid);
+
+               if (!OidIsValid(fdw->fdwconnection))
+                       ereport(ERROR,
+                                       (errmsg("foreign data wrapper \"%s\" does not support subscription connections",
+                                                       fdw->fdwname),
+                                        errdetail("Foreign data wrapper must be defined with CONNECTION specified.")));
+
+
+               connection_datum = OidFunctionCall3(fdw->fdwconnection,
+                                                                                       ObjectIdGetDatum(userid),
+                                                                                       ObjectIdGetDatum(serverid),
+                                                                                       PointerGetDatum(NULL));
+
+               connection_text = DatumGetTextPP(connection_datum);
+       }
+       PG_FINALLY();
+       {
+               MemoryContextSwitchTo(oldcxt);
+
+               if (connection_text)
+                       result = text_to_cstring((text *) connection_text);
+
+               MemoryContextDelete(tempContext);
+       }
+       PG_END_TRY();
+
+       return result;
+}
+
+
 /*
  * GetUserMapping - look up the user mapping.
  *
index 3c3e24324a8ef2d9eb3a2585fb009c7e911a8f13..9cbe8eafc4545e5c6c55f9e7bff6d6e322fee1dd 100644 (file)
@@ -5583,6 +5583,8 @@ fdw_option:
                        | NO HANDLER                                            { $$ = makeDefElem("handler", NULL, @1); }
                        | VALIDATOR handler_name                        { $$ = makeDefElem("validator", (Node *) $2, @1); }
                        | NO VALIDATOR                                          { $$ = makeDefElem("validator", NULL, @1); }
+                       | CONNECTION handler_name                       { $$ = makeDefElem("connection", (Node *) $2, @1); }
+                       | NO CONNECTION                                         { $$ = makeDefElem("connection", NULL, @1); }
                ;
 
 fdw_options:
@@ -11057,6 +11059,16 @@ CreateSubscriptionStmt:
                                        n->options = $8;
                                        $$ = (Node *) n;
                                }
+                       | CREATE SUBSCRIPTION name SERVER name PUBLICATION name_list opt_definition
+                               {
+                                       CreateSubscriptionStmt *n =
+                                               makeNode(CreateSubscriptionStmt);
+                                       n->subname = $3;
+                                       n->servername = $5;
+                                       n->publication = $7;
+                                       n->options = $8;
+                                       $$ = (Node *) n;
+                               }
                ;
 
 /*****************************************************************************
@@ -11086,6 +11098,16 @@ AlterSubscriptionStmt:
                                        n->conninfo = $5;
                                        $$ = (Node *) n;
                                }
+                       | ALTER SUBSCRIPTION name SERVER name
+                               {
+                                       AlterSubscriptionStmt *n =
+                                               makeNode(AlterSubscriptionStmt);
+
+                                       n->kind = ALTER_SUBSCRIPTION_SERVER;
+                                       n->subname = $3;
+                                       n->servername = $5;
+                                       $$ = (Node *) n;
+                               }
                        | ALTER SUBSCRIPTION name REFRESH PUBLICATION opt_definition
                                {
                                        AlterSubscriptionStmt *n =
index 274dc71c6a1f2b82190ec6692c700393d56c8f85..3f7de2efce05356ebdfcc486cd5f39e1ef482058 100644 (file)
@@ -5059,7 +5059,7 @@ maybe_reread_subscription(void)
        /* Ensure allocations in permanent context. */
        oldctx = MemoryContextSwitchTo(ApplyContext);
 
-       newsub = GetSubscription(MyLogicalRepWorker->subid, true);
+       newsub = GetSubscription(MyLogicalRepWorker->subid, true, true);
 
        /*
         * Exit if the subscription was removed. This normally should not happen
@@ -5201,7 +5201,9 @@ set_wal_receiver_timeout(void)
 }
 
 /*
- * Callback from subscription syscache invalidation.
+ * Callback from subscription syscache invalidation. Also needed for server or
+ * user mapping invalidation, which can change the connection information for
+ * subscriptions that connect using a server object.
  */
 static void
 subscription_change_cb(Datum arg, SysCacheIdentifier cacheid, uint32 hashvalue)
@@ -5806,7 +5808,7 @@ InitializeLogRepWorker(void)
         */
        LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid, 0,
                                         AccessShareLock);
-       MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
+       MySubscription = GetSubscription(MyLogicalRepWorker->subid, true, true);
        if (!MySubscription)
        {
                ereport(LOG,
@@ -5871,6 +5873,22 @@ InitializeLogRepWorker(void)
        CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
                                                                  subscription_change_cb,
                                                                  (Datum) 0);
+       /* Changes to foreign servers may affect subscriptions using SERVER. */
+       CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
+                                                                 subscription_change_cb,
+                                                                 (Datum) 0);
+       /* Changes to user mappings may affect subscriptions using SERVER. */
+       CacheRegisterSyscacheCallback(USERMAPPINGOID,
+                                                                 subscription_change_cb,
+                                                                 (Datum) 0);
+
+       /*
+        * Changes to FDW connection_function may affect subscriptions using
+        * SERVER.
+        */
+       CacheRegisterSyscacheCallback(FOREIGNDATAWRAPPEROID,
+                                                                 subscription_change_cb,
+                                                                 (Datum) 0);
 
        CacheRegisterSyscacheCallback(AUTHOID,
                                                                  subscription_change_cb,
index 1035bba72cee5920726b9e21cdd2a02f9de1669f..88e0a55f8f1441c7f71096472228aeb2fe54df87 100644 (file)
@@ -5182,6 +5182,7 @@ getSubscriptions(Archive *fout)
        int                     i_subdisableonerr;
        int                     i_subpasswordrequired;
        int                     i_subrunasowner;
+       int                     i_subservername;
        int                     i_subconninfo;
        int                     i_subslotname;
        int                     i_subsynccommit;
@@ -5286,14 +5287,24 @@ getSubscriptions(Archive *fout)
 
        if (fout->remoteVersion >= 190000)
                appendPQExpBufferStr(query,
-                                                        " s.subwalrcvtimeout\n");
+                                                        " s.subwalrcvtimeout,\n");
        else
                appendPQExpBufferStr(query,
-                                                        " '-1' AS subwalrcvtimeout\n");
+                                                        " '-1' AS subwalrcvtimeout,\n");
+
+       if (fout->remoteVersion >= 190000)
+               appendPQExpBufferStr(query, " fs.srvname AS subservername\n");
+       else
+               appendPQExpBufferStr(query, " NULL AS subservername\n");
 
        appendPQExpBufferStr(query,
                                                 "FROM pg_subscription s\n");
 
+       if (fout->remoteVersion >= 190000)
+               appendPQExpBufferStr(query,
+                                                        "LEFT JOIN pg_catalog.pg_foreign_server fs \n"
+                                                        "    ON fs.oid = s.subserver \n");
+
        if (dopt->binary_upgrade && fout->remoteVersion >= 170000)
                appendPQExpBufferStr(query,
                                                         "LEFT JOIN pg_catalog.pg_replication_origin_status o \n"
@@ -5325,6 +5336,7 @@ getSubscriptions(Archive *fout)
        i_subfailover = PQfnumber(res, "subfailover");
        i_subretaindeadtuples = PQfnumber(res, "subretaindeadtuples");
        i_submaxretention = PQfnumber(res, "submaxretention");
+       i_subservername = PQfnumber(res, "subservername");
        i_subconninfo = PQfnumber(res, "subconninfo");
        i_subslotname = PQfnumber(res, "subslotname");
        i_subsynccommit = PQfnumber(res, "subsynccommit");
@@ -5347,6 +5359,10 @@ getSubscriptions(Archive *fout)
 
                subinfo[i].subenabled =
                        (strcmp(PQgetvalue(res, i, i_subenabled), "t") == 0);
+               if (PQgetisnull(res, i, i_subservername))
+                       subinfo[i].subservername = NULL;
+               else
+                       subinfo[i].subservername = pg_strdup(PQgetvalue(res, i, i_subservername));
                subinfo[i].subbinary =
                        (strcmp(PQgetvalue(res, i, i_subbinary), "t") == 0);
                subinfo[i].substream = *(PQgetvalue(res, i, i_substream));
@@ -5363,8 +5379,11 @@ getSubscriptions(Archive *fout)
                        (strcmp(PQgetvalue(res, i, i_subretaindeadtuples), "t") == 0);
                subinfo[i].submaxretention =
                        atoi(PQgetvalue(res, i, i_submaxretention));
-               subinfo[i].subconninfo =
-                       pg_strdup(PQgetvalue(res, i, i_subconninfo));
+               if (PQgetisnull(res, i, i_subconninfo))
+                       subinfo[i].subconninfo = NULL;
+               else
+                       subinfo[i].subconninfo =
+                               pg_strdup(PQgetvalue(res, i, i_subconninfo));
                if (PQgetisnull(res, i, i_subslotname))
                        subinfo[i].subslotname = NULL;
                else
@@ -5575,9 +5594,17 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
        appendPQExpBuffer(delq, "DROP SUBSCRIPTION %s;\n",
                                          qsubname);
 
-       appendPQExpBuffer(query, "CREATE SUBSCRIPTION %s CONNECTION ",
+       appendPQExpBuffer(query, "CREATE SUBSCRIPTION %s ",
                                          qsubname);
-       appendStringLiteralAH(query, subinfo->subconninfo, fout);
+       if (subinfo->subservername)
+       {
+               appendPQExpBuffer(query, "SERVER %s", fmtId(subinfo->subservername));
+       }
+       else
+       {
+               appendPQExpBuffer(query, "CONNECTION ");
+               appendStringLiteralAH(query, subinfo->subconninfo, fout);
+       }
 
        /* Build list of quoted publications and append them to query. */
        if (!parsePGArray(subinfo->subpublications, &pubnames, &npubnames))
index e138ef1276c31f339f58b23fec434cd7c6f5cdb3..1c11a79083f513df1ee6e64f0a42cb428dd19c61 100644 (file)
@@ -720,6 +720,7 @@ typedef struct _SubscriptionInfo
        bool            subfailover;
        bool            subretaindeadtuples;
        int                     submaxretention;
+       char       *subservername;
        char       *subconninfo;
        char       *subslotname;
        char       *subsynccommit;
index a94eade282f4cd6845ccf6029e737b2d3b4d15ab..211d8f3b1ecdb7d5c0a4a131f82c32e6455346ae 100644 (file)
@@ -6895,7 +6895,7 @@ describeSubscriptions(const char *pattern, bool verbose)
        printQueryOpt myopt = pset.popt;
        static const bool translate_columns[] = {false, false, false, false,
                false, false, false, false, false, false, false, false, false, false,
-       false, false, false, false, false, false};
+       false, false, false, false, false, false, false};
 
        if (pset.sversion < 100000)
        {
@@ -6965,6 +6965,10 @@ describeSubscriptions(const char *pattern, bool verbose)
                                                          gettext_noop("Failover"));
                if (pset.sversion >= 190000)
                {
+                       appendPQExpBuffer(&buf,
+                                                         ", (select srvname from pg_foreign_server where oid=subserver) AS \"%s\"\n",
+                                                         gettext_noop("Server"));
+
                        appendPQExpBuffer(&buf,
                                                          ", subretaindeadtuples AS \"%s\"\n",
                                                          gettext_noop("Retain dead tuples"));
index f8c0865ca89501553c012e9ffeb68d5d26dabf00..6484c6a3dd4e2d79291239785ee6dc846ea97e07 100644 (file)
@@ -2332,7 +2332,7 @@ match_previous_words(int pattern_id,
        else if (Matches("ALTER", "SUBSCRIPTION", MatchAny))
                COMPLETE_WITH("CONNECTION", "ENABLE", "DISABLE", "OWNER TO",
                                          "RENAME TO", "REFRESH PUBLICATION", "REFRESH SEQUENCES",
-                                         "SET", "SKIP (", "ADD PUBLICATION", "DROP PUBLICATION");
+                                         "SERVER", "SET", "SKIP (", "ADD PUBLICATION", "DROP PUBLICATION");
        /* ALTER SUBSCRIPTION <name> REFRESH */
        else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, MatchAnyN, "REFRESH"))
                COMPLETE_WITH("PUBLICATION", "SEQUENCES");
@@ -3870,9 +3870,16 @@ match_previous_words(int pattern_id,
 
 /* CREATE SUBSCRIPTION */
        else if (Matches("CREATE", "SUBSCRIPTION", MatchAny))
-               COMPLETE_WITH("CONNECTION");
+               COMPLETE_WITH("SERVER", "CONNECTION");
+       else if (Matches("CREATE", "SUBSCRIPTION", MatchAny, "SERVER", MatchAny))
+               COMPLETE_WITH("PUBLICATION");
        else if (Matches("CREATE", "SUBSCRIPTION", MatchAny, "CONNECTION", MatchAny))
                COMPLETE_WITH("PUBLICATION");
+       else if (Matches("CREATE", "SUBSCRIPTION", MatchAny, "SERVER",
+                                        MatchAny, "PUBLICATION"))
+       {
+               /* complete with nothing here as this refers to remote publications */
+       }
        else if (Matches("CREATE", "SUBSCRIPTION", MatchAny, "CONNECTION",
                                         MatchAny, "PUBLICATION"))
        {
index b863edfabdad89afc13847ff83f5138aa3bbcdbe..b6508b60a843dd04d7e8cca85776e2e26505f423 100644 (file)
@@ -57,6 +57,6 @@
  */
 
 /*                                                     yyyymmddN */
-#define CATALOG_VERSION_NO     202603061
+#define CATALOG_VERSION_NO     202603062
 
 #endif
index e6009069e82b7c3fa38541d6f73e2b2c8997f725..3d8389de65e18a379ad34894b78f6e6d5e3af14d 100644 (file)
@@ -38,6 +38,9 @@ CATALOG(pg_foreign_data_wrapper,2328,ForeignDataWrapperRelationId)
        Oid                     fdwvalidator BKI_LOOKUP_OPT(pg_proc);   /* option validation
                                                                                                                 * function, or 0 if
                                                                                                                 * none */
+       Oid                     fdwconnection BKI_LOOKUP_OPT(pg_proc);  /* connection string
+                                                                                                                * function, or 0 if
+                                                                                                                * none */
 
 #ifdef CATALOG_VARLEN                  /* variable-length fields start here */
        aclitem         fdwacl[1];              /* access permissions */
index c369b5abfb362516a3b412fb8cc8c01a9b4293f9..0058d9387d74e06329424d3b6216c986449de354 100644 (file)
@@ -92,9 +92,12 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
                                                                         * exceeded max_retention_duration, when
                                                                         * defined */
 
+       Oid                     subserver BKI_LOOKUP_OPT(pg_foreign_server);    /* If connection uses
+                                                                                                                                * server */
+
 #ifdef CATALOG_VARLEN                  /* variable-length fields start here */
        /* Connection string to the publisher */
-       text            subconninfo BKI_FORCE_NOT_NULL;
+       text            subconninfo;    /* Set if connecting with connection string */
 
        /* Slot name on publisher */
        NameData        subslotname BKI_FORCE_NULL;
@@ -207,7 +210,8 @@ typedef struct Subscription
 
 #endif                                                 /* EXPOSE_TO_CLIENT_CODE */
 
-extern Subscription *GetSubscription(Oid subid, bool missing_ok);
+extern Subscription *GetSubscription(Oid subid, bool missing_ok,
+                                                                        bool aclcheck);
 extern void FreeSubscription(Subscription *sub);
 extern void DisableSubscription(Oid subid);
 
index c185d1458a28015bd62d4bc5b44d5c82f24ac74b..65ed9a7f9870a68453d1dfe1379cdbaa10d1e1f8 100644 (file)
@@ -28,6 +28,7 @@ typedef struct ForeignDataWrapper
        char       *fdwname;            /* Name of the FDW */
        Oid                     fdwhandler;             /* Oid of handler function, or 0 */
        Oid                     fdwvalidator;   /* Oid of validator function, or 0 */
+       Oid                     fdwconnection;  /* Oid of connection string function, or 0 */
        List       *options;            /* fdwoptions as DefElem list */
 } ForeignDataWrapper;
 
@@ -65,10 +66,12 @@ typedef struct ForeignTable
 
 
 extern ForeignServer *GetForeignServer(Oid serverid);
+extern char *ForeignServerName(Oid serverid);
 extern ForeignServer *GetForeignServerExtended(Oid serverid,
                                                                                           bits16 flags);
 extern ForeignServer *GetForeignServerByName(const char *srvname,
                                                                                         bool missing_ok);
+extern char *ForeignServerConnectionString(Oid userid, Oid serverid);
 extern UserMapping *GetUserMapping(Oid userid, Oid serverid);
 extern ForeignDataWrapper *GetForeignDataWrapper(Oid fdwid);
 extern ForeignDataWrapper *GetForeignDataWrapperExtended(Oid fdwid,
index ff41943a6db0b9e6a301b9d834f444a12e962827..4ee092206b0dc55fc1d959bddbf3e85b31f30250 100644 (file)
@@ -4383,6 +4383,7 @@ typedef struct CreateSubscriptionStmt
 {
        NodeTag         type;
        char       *subname;            /* Name of the subscription */
+       char       *servername;         /* Server name of publisher */
        char       *conninfo;           /* Connection string to publisher */
        List       *publication;        /* One or more publication to subscribe to */
        List       *options;            /* List of DefElem nodes */
@@ -4391,6 +4392,7 @@ typedef struct CreateSubscriptionStmt
 typedef enum AlterSubscriptionType
 {
        ALTER_SUBSCRIPTION_OPTIONS,
+       ALTER_SUBSCRIPTION_SERVER,
        ALTER_SUBSCRIPTION_CONNECTION,
        ALTER_SUBSCRIPTION_SET_PUBLICATION,
        ALTER_SUBSCRIPTION_ADD_PUBLICATION,
@@ -4406,6 +4408,7 @@ typedef struct AlterSubscriptionStmt
        NodeTag         type;
        AlterSubscriptionType kind; /* ALTER_SUBSCRIPTION_OPTIONS, etc */
        char       *subname;            /* Name of the subscription */
+       char       *servername;         /* Server name of publisher */
        char       *conninfo;           /* Connection string to publisher */
        List       *publication;        /* One or more publication to subscribe to */
        List       *options;            /* List of DefElem nodes */
index 25aaae8d05a341223bd42cad8abce5788b4052bb..51b9608a6680854bce7ae6e47fea380ad970a427 100644 (file)
@@ -224,6 +224,7 @@ NOTICE:  checking pg_extension {extconfig} => pg_class {oid}
 NOTICE:  checking pg_foreign_data_wrapper {fdwowner} => pg_authid {oid}
 NOTICE:  checking pg_foreign_data_wrapper {fdwhandler} => pg_proc {oid}
 NOTICE:  checking pg_foreign_data_wrapper {fdwvalidator} => pg_proc {oid}
+NOTICE:  checking pg_foreign_data_wrapper {fdwconnection} => pg_proc {oid}
 NOTICE:  checking pg_foreign_server {srvowner} => pg_authid {oid}
 NOTICE:  checking pg_foreign_server {srvfdw} => pg_foreign_data_wrapper {oid}
 NOTICE:  checking pg_user_mapping {umuser} => pg_authid {oid}
@@ -269,5 +270,6 @@ NOTICE:  checking pg_publication_rel {prpubid} => pg_publication {oid}
 NOTICE:  checking pg_publication_rel {prrelid} => pg_class {oid}
 NOTICE:  checking pg_subscription {subdbid} => pg_database {oid}
 NOTICE:  checking pg_subscription {subowner} => pg_authid {oid}
+NOTICE:  checking pg_subscription {subserver} => pg_foreign_server {oid}
 NOTICE:  checking pg_subscription_rel {srsubid} => pg_subscription {oid}
 NOTICE:  checking pg_subscription_rel {srrelid} => pg_class {oid}
index a5fdfe68a0e10a111f02d2d285c6ff736226ec36..f57f359127be3e94b54cbfcbe36a6f65a938f1f9 100644 (file)
@@ -1,6 +1,14 @@
 --
 -- SUBSCRIPTION
 --
+-- directory paths and dlsuffix are passed to us in environment variables
+\getenv libdir PG_LIBDIR
+\getenv dlsuffix PG_DLSUFFIX
+\set regresslib :libdir '/regress' :dlsuffix
+CREATE FUNCTION test_fdw_connection(oid, oid, internal)
+    RETURNS text
+    AS :'regresslib', 'test_fdw_connection'
+    LANGUAGE C;
 CREATE ROLE regress_subscription_user LOGIN SUPERUSER;
 CREATE ROLE regress_subscription_user2;
 CREATE ROLE regress_subscription_user3 IN ROLE pg_create_subscription;
@@ -116,18 +124,18 @@ CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PU
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
 \dRs+ regress_testsub4
-                                                                                                                                                                   List of subscriptions
-       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
-------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
- regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | none   | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
+                                                                                                                                                                       List of subscriptions
+       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
+ regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | none   | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
 \dRs+ regress_testsub4
-                                                                                                                                                                   List of subscriptions
-       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
-------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
- regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
+                                                                                                                                                                       List of subscriptions
+       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
+ regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub3;
@@ -140,15 +148,53 @@ ERROR:  invalid connection string syntax: invalid connection option "i_dont_exis
 -- connecting, so this is reliable and safe)
 CREATE SUBSCRIPTION regress_testsub5 CONNECTION 'port=-1' PUBLICATION testpub;
 ERROR:  subscription "regress_testsub5" could not connect to the publisher: invalid port number: "-1"
+CREATE FOREIGN DATA WRAPPER test_fdw;
+CREATE SERVER test_server FOREIGN DATA WRAPPER test_fdw;
+GRANT CREATE ON DATABASE REGRESSION TO regress_subscription_user3;
+SET SESSION AUTHORIZATION regress_subscription_user3;
+-- fail, need USAGE privileges on server
+CREATE SUBSCRIPTION regress_testsub6 SERVER test_server PUBLICATION testpub WITH (slot_name = NONE, connect = false);
+ERROR:  permission denied for foreign server test_server
+RESET SESSION AUTHORIZATION;
+GRANT USAGE ON FOREIGN SERVER test_server TO regress_subscription_user3;
+SET SESSION AUTHORIZATION regress_subscription_user3;
+-- fail, need user mapping
+CREATE SUBSCRIPTION regress_testsub6 SERVER test_server PUBLICATION testpub WITH (slot_name = NONE, connect = false);
+ERROR:  user mapping not found for user "regress_subscription_user3", server "test_server"
+CREATE USER MAPPING FOR regress_subscription_user3 SERVER test_server OPTIONS(user 'foo', password 'secret');
+-- fail, need CONNECTION clause
+CREATE SUBSCRIPTION regress_testsub6 SERVER test_server PUBLICATION testpub WITH (slot_name = NONE, connect = false);
+ERROR:  foreign data wrapper "test_fdw" does not support subscription connections
+DETAIL:  Foreign data wrapper must be defined with CONNECTION specified.
+RESET SESSION AUTHORIZATION;
+ALTER FOREIGN DATA WRAPPER test_fdw CONNECTION test_fdw_connection;
+SET SESSION AUTHORIZATION regress_subscription_user3;
+CREATE SUBSCRIPTION regress_testsub6 SERVER test_server PUBLICATION testpub WITH (slot_name = 'dummy', connect = false);
+WARNING:  subscription was created, but is not connected
+HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
+DROP USER MAPPING FOR regress_subscription_user3 SERVER test_server;
+RESET SESSION AUTHORIZATION;
+REVOKE USAGE ON FOREIGN SERVER test_server FROM regress_subscription_user3;
+SET SESSION AUTHORIZATION regress_subscription_user3;
+-- fail, must connect but lacks USAGE on server, as well as user mapping
+DROP SUBSCRIPTION regress_testsub6;
+ERROR:  could not connect to publisher when attempting to drop replication slot "dummy": subscription owner "regress_subscription_user3" does not have permission on foreign server "test_server"
+HINT:  Use ALTER SUBSCRIPTION ... DISABLE to disable the subscription, and then use ALTER SUBSCRIPTION ... SET (slot_name = NONE) to disassociate it from the slot.
+ALTER SUBSCRIPTION regress_testsub6 SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub6;
+SET SESSION AUTHORIZATION regress_subscription_user;
+REVOKE CREATE ON DATABASE REGRESSION FROM regress_subscription_user3;
+DROP SERVER test_server;
+DROP FOREIGN DATA WRAPPER test_fdw;
 -- fail - invalid connection string during ALTER
 ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
 ERROR:  invalid connection string syntax: missing "=" after "foobar" in connection info string
 
 \dRs+
-                                                                                                                                                                     List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  |    Description    
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | test subscription
+                                                                                                                                                                          List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  |    Description    
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | test subscription
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@@ -157,10 +203,10 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = 'newname');
 ALTER SUBSCRIPTION regress_testsub SET (password_required = false);
 ALTER SUBSCRIPTION regress_testsub SET (run_as_owner = true);
 \dRs+
-                                                                                                                                                                          List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |           Conninfo           | Receiver timeout |  Skip LSN  |    Description    
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------------+------------+-------------------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | parallel  | d                | f                | any    | f                 | t             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist2 | -1               | 0/00000000 | test subscription
+                                                                                                                                                                              List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |           Conninfo           | Receiver timeout |  Skip LSN  |    Description    
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+------------------------------+------------------+------------+-------------------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | parallel  | d                | f                | any    | f                 | t             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist2 | -1               | 0/00000000 | test subscription
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (password_required = true);
@@ -176,10 +222,10 @@ ERROR:  unrecognized subscription parameter: "create_slot"
 -- ok
 ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345');
 \dRs+
-                                                                                                                                                                          List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |           Conninfo           | Receiver timeout |  Skip LSN  |    Description    
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------------+------------+-------------------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | parallel  | d                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist2 | -1               | 0/00012345 | test subscription
+                                                                                                                                                                              List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |           Conninfo           | Receiver timeout |  Skip LSN  |    Description    
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+------------------------------+------------------+------------+-------------------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist2 | -1               | 0/00012345 | test subscription
 (1 row)
 
 -- ok - with lsn = NONE
@@ -188,10 +234,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE);
 ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0');
 ERROR:  invalid WAL location (LSN): 0/0
 \dRs+
-                                                                                                                                                                          List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |           Conninfo           | Receiver timeout |  Skip LSN  |    Description    
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------------+------------+-------------------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | parallel  | d                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist2 | -1               | 0/00000000 | test subscription
+                                                                                                                                                                              List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |           Conninfo           | Receiver timeout |  Skip LSN  |    Description    
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+------------------------------+------------------+------------+-------------------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist2 | -1               | 0/00000000 | test subscription
 (1 row)
 
 BEGIN;
@@ -227,10 +273,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (wal_receiver_timeout = '80s');
 ALTER SUBSCRIPTION regress_testsub_foo SET (wal_receiver_timeout = 'foobar');
 ERROR:  invalid value for parameter "wal_receiver_timeout": "foobar"
 \dRs+
-                                                                                                                                                                            List of subscriptions
-        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |           Conninfo           | Receiver timeout |  Skip LSN  |    Description    
----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------------+------------+-------------------
- regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | parallel  | d                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | local              | dbname=regress_doesnotexist2 | 80s              | 0/00000000 | test subscription
+                                                                                                                                                                                List of subscriptions
+        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |           Conninfo           | Receiver timeout |  Skip LSN  |    Description    
+---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+------------------------------+------------------+------------+-------------------
+ regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | local              | dbname=regress_doesnotexist2 | 80s              | 0/00000000 | test subscription
 (1 row)
 
 -- rename back to keep the rest simple
@@ -259,19 +305,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
 \dRs+
-                                                                                                                                                                  List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | t      | parallel  | d                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
+                                                                                                                                                                       List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | t      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (binary = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                                                                                                                  List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
+                                                                                                                                                                       List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -283,27 +329,27 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
 \dRs+
-                                                                                                                                                                  List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | d                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
+                                                                                                                                                                       List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel);
 \dRs+
-                                                                                                                                                                  List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
+                                                                                                                                                                       List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                                                                                                                  List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
+                                                                                                                                                                       List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
 (1 row)
 
 -- fail - publication already exists
@@ -318,10 +364,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr
 ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
 ERROR:  publication "testpub1" is already in subscription "regress_testsub"
 \dRs+
-                                                                                                                                                                          List of subscriptions
-      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
- regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | off       | d                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
+                                                                                                                                                                               List of subscriptions
+      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
+-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | off       | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
 (1 row)
 
 -- fail - publication used more than once
@@ -336,10 +382,10 @@ ERROR:  publication "testpub3" is not in subscription "regress_testsub"
 -- ok - delete publications
 ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
 \dRs+
-                                                                                                                                                                  List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
+                                                                                                                                                                       List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
@@ -375,19 +421,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
 \dRs+
-                                                                                                                                                                  List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | p                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
+                                                                                                                                                                       List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | p                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
 (1 row)
 
 -- we can alter streaming when two_phase enabled
 ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
 \dRs+
-                                                                                                                                                                  List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
+                                                                                                                                                                       List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -397,10 +443,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
 \dRs+
-                                                                                                                                                                  List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
+                                                                                                                                                                       List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -413,18 +459,18 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
 \dRs+
-                                                                                                                                                                  List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
+                                                                                                                                                                       List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
 \dRs+
-                                                                                                                                                                  List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | t                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
+                                                                                                                                                                       List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | t                | any    | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -437,10 +483,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
 \dRs+
-                                                                                                                                                                  List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
+                                                                                                                                                                       List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@@ -454,19 +500,19 @@ NOTICE:  max_retention_duration is ineffective when retain_dead_tuples is disabl
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
 \dRs+
-                                                                                                                                                                  List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | f                  |                   1000 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
+                                                                                                                                                                       List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                   1000 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
 (1 row)
 
 -- ok
 ALTER SUBSCRIPTION regress_testsub SET (max_retention_duration = 0);
 \dRs+
-                                                                                                                                                                  List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
+                                                                                                                                                                       List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit |          Conninfo           | Receiver timeout |  Skip LSN  | Description 
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        |        | f                  |                      0 | f                | off                | dbname=regress_doesnotexist | -1               | 0/00000000 | 
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
index a02f41c97279ebcc4cb44f49776276303cf25d9d..68a01a1dde01483d18c0c967c025194a8eb9aa0e 100644 (file)
@@ -729,6 +729,13 @@ test_fdw_handler(PG_FUNCTION_ARGS)
        PG_RETURN_NULL();
 }
 
+PG_FUNCTION_INFO_V1(test_fdw_connection);
+Datum
+test_fdw_connection(PG_FUNCTION_ARGS)
+{
+       PG_RETURN_TEXT_P(cstring_to_text("dbname=regress_doesnotexist user=doesnotexist password=secret"));
+}
+
 PG_FUNCTION_INFO_V1(is_catalog_text_unique_index_oid);
 Datum
 is_catalog_text_unique_index_oid(PG_FUNCTION_ARGS)
index d93cbc279d9cc7bb08fc703b80abd18f4353b0bc..a642b3681837537801a236696923f2b5f30df993 100644 (file)
@@ -2,6 +2,17 @@
 -- SUBSCRIPTION
 --
 
+-- directory paths and dlsuffix are passed to us in environment variables
+\getenv libdir PG_LIBDIR
+\getenv dlsuffix PG_DLSUFFIX
+
+\set regresslib :libdir '/regress' :dlsuffix
+
+CREATE FUNCTION test_fdw_connection(oid, oid, internal)
+    RETURNS text
+    AS :'regresslib', 'test_fdw_connection'
+    LANGUAGE C;
+
 CREATE ROLE regress_subscription_user LOGIN SUPERUSER;
 CREATE ROLE regress_subscription_user2;
 CREATE ROLE regress_subscription_user3 IN ROLE pg_create_subscription;
@@ -85,6 +96,50 @@ CREATE SUBSCRIPTION regress_testsub5 CONNECTION 'i_dont_exist=param' PUBLICATION
 -- connecting, so this is reliable and safe)
 CREATE SUBSCRIPTION regress_testsub5 CONNECTION 'port=-1' PUBLICATION testpub;
 
+CREATE FOREIGN DATA WRAPPER test_fdw;
+CREATE SERVER test_server FOREIGN DATA WRAPPER test_fdw;
+
+GRANT CREATE ON DATABASE REGRESSION TO regress_subscription_user3;
+SET SESSION AUTHORIZATION regress_subscription_user3;
+
+-- fail, need USAGE privileges on server
+CREATE SUBSCRIPTION regress_testsub6 SERVER test_server PUBLICATION testpub WITH (slot_name = NONE, connect = false);
+
+RESET SESSION AUTHORIZATION;
+GRANT USAGE ON FOREIGN SERVER test_server TO regress_subscription_user3;
+SET SESSION AUTHORIZATION regress_subscription_user3;
+
+-- fail, need user mapping
+CREATE SUBSCRIPTION regress_testsub6 SERVER test_server PUBLICATION testpub WITH (slot_name = NONE, connect = false);
+
+CREATE USER MAPPING FOR regress_subscription_user3 SERVER test_server OPTIONS(user 'foo', password 'secret');
+
+-- fail, need CONNECTION clause
+CREATE SUBSCRIPTION regress_testsub6 SERVER test_server PUBLICATION testpub WITH (slot_name = NONE, connect = false);
+
+RESET SESSION AUTHORIZATION;
+ALTER FOREIGN DATA WRAPPER test_fdw CONNECTION test_fdw_connection;
+SET SESSION AUTHORIZATION regress_subscription_user3;
+
+CREATE SUBSCRIPTION regress_testsub6 SERVER test_server PUBLICATION testpub WITH (slot_name = 'dummy', connect = false);
+
+DROP USER MAPPING FOR regress_subscription_user3 SERVER test_server;
+RESET SESSION AUTHORIZATION;
+REVOKE USAGE ON FOREIGN SERVER test_server FROM regress_subscription_user3;
+SET SESSION AUTHORIZATION regress_subscription_user3;
+
+-- fail, must connect but lacks USAGE on server, as well as user mapping
+DROP SUBSCRIPTION regress_testsub6;
+
+ALTER SUBSCRIPTION regress_testsub6 SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub6;
+
+SET SESSION AUTHORIZATION regress_subscription_user;
+REVOKE CREATE ON DATABASE REGRESSION FROM regress_subscription_user3;
+
+DROP SERVER test_server;
+DROP FOREIGN DATA WRAPPER test_fdw;
+
 -- fail - invalid connection string during ALTER
 ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';