/*
* Fetch the subscription from the syscache.
+ *
+ * If conninfo_needed is true, conninfo will be constructed, possibly
+ * encountering errors in ForeignServerConnectionString(). Callers not
+ * expecting such errors should pass false, in which case conninfo will be
+ * NULL.
*/
Subscription *
-GetSubscription(Oid subid, bool missing_ok, bool aclcheck)
+GetSubscription(Oid subid, bool missing_ok, bool conninfo_needed,
+ bool conninfo_aclcheck)
{
HeapTuple tup;
Subscription *sub;
MemoryContext cxt;
MemoryContext oldcxt;
+ Assert(conninfo_needed || !conninfo_aclcheck);
+
tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
if (!HeapTupleIsValid(tup))
subform = (Form_pg_subscription) GETSTRUCT(tup);
- sub = palloc_object(Subscription);
+ sub = palloc0_object(Subscription);
sub->cxt = cxt;
sub->oid = subid;
sub->dbid = subform->subdbid;
sub->maxretention = subform->submaxretention;
sub->retentionactive = subform->subretentionactive;
- /* Get conninfo */
- if (OidIsValid(subform->subserver))
+ if (conninfo_needed)
{
- AclResult aclresult;
- ForeignServer *server;
-
- server = GetForeignServer(subform->subserver);
-
- /* recheck ACL if requested */
- if (aclcheck)
+ if (OidIsValid(subform->subserver))
{
- aclresult = object_aclcheck(ForeignServerRelationId,
- subform->subserver,
- subform->subowner, ACL_USAGE);
-
- if (aclresult != ACLCHECK_OK)
- ereport(ERROR,
- (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
- errmsg("subscription owner \"%s\" does not have permission on foreign server \"%s\"",
- GetUserNameFromId(subform->subowner, false),
- server->servername)));
+ AclResult aclresult;
+ ForeignServer *server;
+
+ server = GetForeignServer(subform->subserver);
+
+ if (conninfo_aclcheck)
+ {
+ /* recheck ACL if requested */
+ aclresult = object_aclcheck(ForeignServerRelationId,
+ subform->subserver,
+ subform->subowner, ACL_USAGE);
+
+ if (aclresult != ACLCHECK_OK)
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ errmsg("subscription owner \"%s\" does not have permission on foreign server \"%s\"",
+ GetUserNameFromId(subform->subowner, false),
+ server->servername)));
+ }
+
+ sub->conninfo = ForeignServerConnectionString(subform->subowner,
+ server);
+ }
+ else
+ {
+ datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
+ tup,
+ Anum_pg_subscription_subconninfo);
+ sub->conninfo = TextDatumGetCString(datum);
}
-
- sub->conninfo = ForeignServerConnectionString(subform->subowner,
- server);
- }
- else
- {
- datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
- tup,
- Anum_pg_subscription_subconninfo);
- sub->conninfo = TextDatumGetCString(datum);
}
/* Get slotname */
Datum values[Natts_pg_subscription];
HeapTuple tup;
Oid subid;
+ bool orig_conninfo_needed = true;
bool update_tuple = false;
bool update_failover = false;
bool update_two_phase = false;
aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
stmt->subname);
+ /* parse and check options */
+ switch (stmt->kind)
+ {
+ case ALTER_SUBSCRIPTION_OPTIONS:
+ supported_opts = (SUBOPT_SLOT_NAME |
+ SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
+ SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
+ SUBOPT_DISABLE_ON_ERR |
+ SUBOPT_PASSWORD_REQUIRED |
+ SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
+ SUBOPT_RETAIN_DEAD_TUPLES |
+ SUBOPT_MAX_RETENTION_DURATION |
+ SUBOPT_WAL_RECEIVER_TIMEOUT |
+ SUBOPT_ORIGIN);
+ break;
+
+ case ALTER_SUBSCRIPTION_ENABLED:
+ supported_opts = SUBOPT_ENABLED;
+ break;
+
+ case ALTER_SUBSCRIPTION_SET_PUBLICATION:
+ supported_opts = SUBOPT_COPY_DATA | SUBOPT_REFRESH;
+ break;
+
+ case ALTER_SUBSCRIPTION_ADD_PUBLICATION:
+ case ALTER_SUBSCRIPTION_DROP_PUBLICATION:
+ supported_opts = SUBOPT_REFRESH | SUBOPT_COPY_DATA;
+ break;
+
+ case ALTER_SUBSCRIPTION_REFRESH_PUBLICATION:
+ supported_opts = SUBOPT_COPY_DATA;
+ break;
+
+ case ALTER_SUBSCRIPTION_SKIP:
+ supported_opts = SUBOPT_LSN;
+ break;
+
+ default:
+ supported_opts = 0;
+ break;
+ }
+
+ if (supported_opts > 0)
+ parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
+
+ /*
+ * Ensure that ALTER SUBSCRIPTION commands that could be used to fix a
+ * broken connection or prepare to drop a broken subscription don't
+ * attempt to construct the conninfo. Otherwise, we might encounter the
+ * error the user is trying to fix.
+ *
+ * Specifically, ALTER SUBSCRIPTION DISABLE, ALTER SUBSCRIPTION SERVER,
+ * ALTER SUBSCRIPTION CONNECTION, or ALTER SUBSCRIPTION SET
+ * (slot_name=NONE).
+ *
+ * NB: if the user specifies multiple SET options, then we may still need
+ * to construct conninfo even if slot_name is set to NONE.
+ */
+ if (stmt->kind == ALTER_SUBSCRIPTION_ENABLED)
+ {
+ if (opts.specified_opts == SUBOPT_ENABLED && !opts.enabled)
+ orig_conninfo_needed = false;
+ }
+ else if (stmt->kind == ALTER_SUBSCRIPTION_SERVER ||
+ stmt->kind == ALTER_SUBSCRIPTION_CONNECTION)
+ {
+ orig_conninfo_needed = false;
+ }
+ else if (stmt->kind == ALTER_SUBSCRIPTION_OPTIONS)
+ {
+ /* ... SET (slot_name = NONE) with no other options */
+ if (opts.specified_opts == SUBOPT_SLOT_NAME && !opts.slot_name)
+ orig_conninfo_needed = false;
+ }
+
/*
* Skip ACL checks on the subscription's foreign server, if any. If
* changing the server (or replacing it with a raw connection), then the
* old one will be removed anyway. If changing something unrelated,
* there's no need to do an additional ACL check here; that will be done
- * by the subscription worker anyway.
+ * by the subscription worker.
*/
- sub = GetSubscription(subid, false, false);
+ sub = GetSubscription(subid, false, orig_conninfo_needed, false);
retain_dead_tuples = sub->retaindeadtuples;
origin = sub->origin;
{
case ALTER_SUBSCRIPTION_OPTIONS:
{
- supported_opts = (SUBOPT_SLOT_NAME |
- SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
- SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
- SUBOPT_DISABLE_ON_ERR |
- SUBOPT_PASSWORD_REQUIRED |
- SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
- SUBOPT_RETAIN_DEAD_TUPLES |
- SUBOPT_MAX_RETENTION_DURATION |
- SUBOPT_WAL_RECEIVER_TIMEOUT |
- SUBOPT_ORIGIN);
-
- parse_subscription_options(pstate, stmt->options,
- supported_opts, &opts);
-
if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
{
/*
case ALTER_SUBSCRIPTION_ENABLED:
{
- parse_subscription_options(pstate, stmt->options,
- SUBOPT_ENABLED, &opts);
Assert(IsSet(opts.specified_opts, SUBOPT_ENABLED));
if (!sub->slotname && opts.enabled)
case ALTER_SUBSCRIPTION_SET_PUBLICATION:
{
- supported_opts = SUBOPT_COPY_DATA | SUBOPT_REFRESH;
- parse_subscription_options(pstate, stmt->options,
- supported_opts, &opts);
-
values[Anum_pg_subscription_subpublications - 1] =
publicationListToArray(stmt->publication);
replaces[Anum_pg_subscription_subpublications - 1] = true;
List *publist;
bool isadd = stmt->kind == ALTER_SUBSCRIPTION_ADD_PUBLICATION;
- supported_opts = SUBOPT_REFRESH | SUBOPT_COPY_DATA;
- parse_subscription_options(pstate, stmt->options,
- supported_opts, &opts);
-
publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname);
values[Anum_pg_subscription_subpublications - 1] =
publicationListToArray(publist);
errmsg("%s is not allowed for disabled subscriptions",
"ALTER SUBSCRIPTION ... REFRESH PUBLICATION")));
- parse_subscription_options(pstate, stmt->options,
- SUBOPT_COPY_DATA, &opts);
-
/*
* The subscription option "two_phase" requires that
* replication has passed the initial table synchronization
case ALTER_SUBSCRIPTION_SKIP:
{
- parse_subscription_options(pstate, stmt->options, SUBOPT_LSN, &opts);
-
/* ALTER SUBSCRIPTION ... SKIP supports only LSN option */
Assert(IsSet(opts.specified_opts, SUBOPT_LSN));
char *err;
WalReceiverConn *wrconn;
+ Assert(new_conninfo || orig_conninfo_needed);
+
/* Load the library providing us libpq calls. */
load_file("libpqwalreceiver", false);
started_tx = true;
}
- newsub = GetSubscription(MyLogicalRepWorker->subid, true, true);
+ newsub = GetSubscription(MyLogicalRepWorker->subid, true, true, true);
if (newsub)
{
LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid, 0,
AccessShareLock);
- MySubscription = GetSubscription(MyLogicalRepWorker->subid, true, true);
+ MySubscription = GetSubscription(MyLogicalRepWorker->subid, true, true, true);
if (MySubscription)
{
#endif /* EXPOSE_TO_CLIENT_CODE */
extern Subscription *GetSubscription(Oid subid, bool missing_ok,
- bool aclcheck);
+ bool conninfo_needed,
+ bool conninfo_aclcheck);
extern void DisableSubscription(Oid subid);
extern int CountDBSubscriptions(Oid dbid);
RESET SESSION AUTHORIZATION;
ALTER FOREIGN DATA WRAPPER test_fdw CONNECTION test_fdw_connection;
SET SESSION AUTHORIZATION regress_subscription_user3;
-CREATE SUBSCRIPTION regress_testsub6 SERVER test_server PUBLICATION testpub WITH (slot_name = 'dummy', connect = false);
+CREATE SUBSCRIPTION regress_testsub6 SERVER test_server
+ PUBLICATION testpub WITH (slot_name = 'dummy', connect = false);
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
-DROP USER MAPPING FOR regress_subscription_user3 SERVER test_server;
RESET SESSION AUTHORIZATION;
REVOKE USAGE ON FOREIGN SERVER test_server FROM regress_subscription_user3;
SET SESSION AUTHORIZATION regress_subscription_user3;
--- fail, must connect but lacks USAGE on server, as well as user mapping
+-- ok, lacks USAGE on test_server, but replacing connection anyway
+BEGIN;
+ALTER SUBSCRIPTION regress_testsub6 CONNECTION 'dbname=regress_doesnotexist password=secret';
+ABORT;
+-- fails, cannot drop slot
DROP SUBSCRIPTION regress_testsub6;
ERROR: could not connect to publisher when attempting to drop replication slot "dummy": subscription owner "regress_subscription_user3" does not have permission on foreign server "test_server"
HINT: Use ALTER SUBSCRIPTION ... DISABLE to disable the subscription, and then use ALTER SUBSCRIPTION ... SET (slot_name = NONE) to disassociate it from the slot.
+RESET SESSION AUTHORIZATION;
+GRANT USAGE ON FOREIGN SERVER test_server TO regress_subscription_user3;
+SET SESSION AUTHORIZATION regress_subscription_user3;
ALTER SUBSCRIPTION regress_testsub6 SET (slot_name = NONE);
-DROP SUBSCRIPTION regress_testsub6;
+DROP SUBSCRIPTION regress_testsub6; --ok
+CREATE SUBSCRIPTION regress_testsub6 SERVER test_server
+ PUBLICATION testpub WITH (slot_name = 'dummy', connect = false);
+WARNING: subscription was created, but is not connected
+HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
+DROP USER MAPPING FOR regress_subscription_user3 SERVER test_server;
+-- ok, test_server lacks user mapping, but replacing connection anyway
+BEGIN;
+ALTER SUBSCRIPTION regress_testsub6 CONNECTION 'dbname=regress_doesnotexist password=secret';
+ABORT;
+CREATE USER MAPPING FOR regress_subscription_user3 SERVER test_server OPTIONS(user 'foo', password 'secret');
+ALTER SUBSCRIPTION regress_testsub6 DISABLE;
+ALTER SUBSCRIPTION regress_testsub6 SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub6; --ok
+DROP USER MAPPING FOR regress_subscription_user3 SERVER test_server;
SET SESSION AUTHORIZATION regress_subscription_user;
REVOKE CREATE ON DATABASE REGRESSION FROM regress_subscription_user3;
DROP SERVER test_server;
#include "executor/executor.h"
#include "executor/functions.h"
#include "executor/spi.h"
+#include "foreign/foreign.h"
#include "funcapi.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
Datum
test_fdw_connection(PG_FUNCTION_ARGS)
{
+ /* Ensure the test fails if no valid user mapping exists. */
+ GetUserMapping(PG_GETARG_OID(0), PG_GETARG_OID(1));
PG_RETURN_TEXT_P(cstring_to_text("dbname=regress_doesnotexist user=doesnotexist password=secret"));
}
ALTER FOREIGN DATA WRAPPER test_fdw CONNECTION test_fdw_connection;
SET SESSION AUTHORIZATION regress_subscription_user3;
-CREATE SUBSCRIPTION regress_testsub6 SERVER test_server PUBLICATION testpub WITH (slot_name = 'dummy', connect = false);
+CREATE SUBSCRIPTION regress_testsub6 SERVER test_server
+ PUBLICATION testpub WITH (slot_name = 'dummy', connect = false);
-DROP USER MAPPING FOR regress_subscription_user3 SERVER test_server;
RESET SESSION AUTHORIZATION;
REVOKE USAGE ON FOREIGN SERVER test_server FROM regress_subscription_user3;
SET SESSION AUTHORIZATION regress_subscription_user3;
--- fail, must connect but lacks USAGE on server, as well as user mapping
+-- ok, lacks USAGE on test_server, but replacing connection anyway
+BEGIN;
+ALTER SUBSCRIPTION regress_testsub6 CONNECTION 'dbname=regress_doesnotexist password=secret';
+ABORT;
+
+-- fails, cannot drop slot
DROP SUBSCRIPTION regress_testsub6;
+RESET SESSION AUTHORIZATION;
+GRANT USAGE ON FOREIGN SERVER test_server TO regress_subscription_user3;
+SET SESSION AUTHORIZATION regress_subscription_user3;
+
ALTER SUBSCRIPTION regress_testsub6 SET (slot_name = NONE);
-DROP SUBSCRIPTION regress_testsub6;
+DROP SUBSCRIPTION regress_testsub6; --ok
+
+CREATE SUBSCRIPTION regress_testsub6 SERVER test_server
+ PUBLICATION testpub WITH (slot_name = 'dummy', connect = false);
+
+DROP USER MAPPING FOR regress_subscription_user3 SERVER test_server;
+
+-- ok, test_server lacks user mapping, but replacing connection anyway
+BEGIN;
+ALTER SUBSCRIPTION regress_testsub6 CONNECTION 'dbname=regress_doesnotexist password=secret';
+ABORT;
+
+CREATE USER MAPPING FOR regress_subscription_user3 SERVER test_server OPTIONS(user 'foo', password 'secret');
+
+ALTER SUBSCRIPTION regress_testsub6 DISABLE;
+ALTER SUBSCRIPTION regress_testsub6 SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub6; --ok
+
+DROP USER MAPPING FOR regress_subscription_user3 SERVER test_server;
SET SESSION AUTHORIZATION regress_subscription_user;
REVOKE CREATE ON DATABASE REGRESSION FROM regress_subscription_user3;