/* Try to connect to the publisher. */
must_use_password = !superuser_arg(owner) && opts.passwordrequired;
- wrconn = walrcv_connect(conninfo, true, must_use_password,
+ wrconn = walrcv_connect(conninfo, true, true, must_use_password,
stmt->subname, &err);
if (!wrconn)
ereport(ERROR,
/* Try to connect to the publisher. */
must_use_password = sub->passwordrequired && !sub->ownersuperuser;
- wrconn = walrcv_connect(sub->conninfo, true, must_use_password,
+ wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password,
sub->name, &err);
if (!wrconn)
ereport(ERROR,
/* Try to connect to the publisher. */
must_use_password = sub->passwordrequired && !sub->ownersuperuser;
- wrconn = walrcv_connect(sub->conninfo, true, must_use_password,
+ wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password,
sub->name, &err);
if (!wrconn)
ereport(ERROR,
*/
load_file("libpqwalreceiver", false);
- wrconn = walrcv_connect(conninfo, true, must_use_password,
+ wrconn = walrcv_connect(conninfo, true, true, must_use_password,
subname, &err);
if (wrconn == NULL)
{
/* Prototypes for interface functions */
static WalReceiverConn *libpqrcv_connect(const char *conninfo,
- bool logical, bool must_use_password,
+ bool replication, bool logical,
+ bool must_use_password,
const char *appname, char **err);
static void libpqrcv_check_conninfo(const char *conninfo,
bool must_use_password);
char **sender_host, int *sender_port);
static char *libpqrcv_identify_system(WalReceiverConn *conn,
TimeLineID *primary_tli);
+static char *libpqrcv_get_dbname_from_conninfo(const char *conninfo);
static int libpqrcv_server_version(WalReceiverConn *conn);
static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
TimeLineID tli, char **filename,
.walrcv_send = libpqrcv_send,
.walrcv_create_slot = libpqrcv_create_slot,
.walrcv_alter_slot = libpqrcv_alter_slot,
+ .walrcv_get_dbname_from_conninfo = libpqrcv_get_dbname_from_conninfo,
.walrcv_get_backend_pid = libpqrcv_get_backend_pid,
.walrcv_exec = libpqrcv_exec,
.walrcv_disconnect = libpqrcv_disconnect
}
/*
- * Establish the connection to the primary server for XLOG streaming
+ * Establish the connection to the primary server.
+ *
+ * This function can be used for both replication and regular connections.
+ * If it is a replication connection, it could be either logical or physical
+ * based on input argument 'logical'.
*
* If an error occurs, this function will normally return NULL and set *err
* to a palloc'ed error message. However, if must_use_password is true and
* case.
*/
static WalReceiverConn *
-libpqrcv_connect(const char *conninfo, bool logical, bool must_use_password,
- const char *appname, char **err)
+libpqrcv_connect(const char *conninfo, bool replication, bool logical,
+ bool must_use_password, const char *appname, char **err)
{
WalReceiverConn *conn;
PostgresPollingStatusType status;
*/
keys[i] = "dbname";
vals[i] = conninfo;
- keys[++i] = "replication";
- vals[i] = logical ? "database" : "true";
- if (!logical)
+
+ /* We can not have logical without replication */
+ Assert(replication || !logical);
+
+ if (replication)
{
- /*
- * The database name is ignored by the server in replication mode, but
- * specify "replication" for .pgpass lookup.
- */
- keys[++i] = "dbname";
- vals[i] = "replication";
+ keys[++i] = "replication";
+ vals[i] = logical ? "database" : "true";
+
+ if (logical)
+ {
+ /* Tell the publisher to translate to our encoding */
+ keys[++i] = "client_encoding";
+ vals[i] = GetDatabaseEncodingName();
+
+ /*
+ * Force assorted GUC parameters to settings that ensure that the
+ * publisher will output data values in a form that is unambiguous
+ * to the subscriber. (We don't want to modify the subscriber's
+ * GUC settings, since that might surprise user-defined code
+ * running in the subscriber, such as triggers.) This should
+ * match what pg_dump does.
+ */
+ keys[++i] = "options";
+ vals[i] = "-c datestyle=ISO -c intervalstyle=postgres -c extra_float_digits=3";
+ }
+ else
+ {
+ /*
+ * The database name is ignored by the server in replication mode,
+ * but specify "replication" for .pgpass lookup.
+ */
+ keys[++i] = "dbname";
+ vals[i] = "replication";
+ }
}
+
keys[++i] = "fallback_application_name";
vals[i] = appname;
- if (logical)
- {
- /* Tell the publisher to translate to our encoding */
- keys[++i] = "client_encoding";
- vals[i] = GetDatabaseEncodingName();
- /*
- * Force assorted GUC parameters to settings that ensure that the
- * publisher will output data values in a form that is unambiguous to
- * the subscriber. (We don't want to modify the subscriber's GUC
- * settings, since that might surprise user-defined code running in
- * the subscriber, such as triggers.) This should match what pg_dump
- * does.
- */
- keys[++i] = "options";
- vals[i] = "-c datestyle=ISO -c intervalstyle=postgres -c extra_float_digits=3";
- }
keys[++i] = NULL;
vals[i] = NULL;
return PQserverVersion(conn->streamConn);
}
+/*
+ * Get database name from the primary server's conninfo.
+ *
+ * If dbname is not found in connInfo, return NULL value.
+ */
+static char *
+libpqrcv_get_dbname_from_conninfo(const char *connInfo)
+{
+ PQconninfoOption *opts;
+ char *dbname = NULL;
+ char *err = NULL;
+
+ opts = PQconninfoParse(connInfo, &err);
+ if (opts == NULL)
+ {
+ /* The error string is malloc'd, so we must free it explicitly */
+ char *errcopy = err ? pstrdup(err) : "out of memory";
+
+ PQfreemem(err);
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("invalid connection string syntax: %s", errcopy)));
+ }
+
+ for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
+ {
+ /*
+ * If multiple dbnames are specified, then the last one will be
+ * returned
+ */
+ if (strcmp(opt->keyword, "dbname") == 0 && opt->val &&
+ *opt->val)
+ {
+ if (dbname)
+ pfree(dbname);
+
+ dbname = pstrdup(opt->val);
+ }
+ }
+
+ PQconninfoFree(opts);
+ return dbname;
+}
+
/*
* Start streaming WAL data from given streaming options.
*
* so that synchronous replication can distinguish them.
*/
LogRepWorkerWalRcvConn =
- walrcv_connect(MySubscription->conninfo, true,
+ walrcv_connect(MySubscription->conninfo, true, true,
must_use_password,
slotname, &err);
if (LogRepWorkerWalRcvConn == NULL)
!MySubscription->ownersuperuser;
LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
- must_use_password,
+ true, must_use_password,
MySubscription->name, &err);
if (LogRepWorkerWalRcvConn == NULL)
sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
/* Establish the connection to the primary for XLOG streaming */
- wrconn = walrcv_connect(conninfo, false, false,
+ wrconn = walrcv_connect(conninfo, true, false, false,
cluster_name[0] ? cluster_name : "walreceiver",
&err);
if (!wrconn)
/*
* walrcv_connect_fn
*
- * Establish connection to a cluster. 'logical' is true if the
- * connection is logical, and false if the connection is physical.
+ * Establish connection to a cluster. 'replication' is true if the
+ * connection is a replication connection, and false if it is a
+ * regular connection. If it is a replication connection, it could
+ * be either logical or physical based on input argument 'logical'.
* 'appname' is a name associated to the connection, to use for example
* with fallback_application_name or application_name. Returns the
* details about the connection established, as defined by
* returned with 'err' including the error generated.
*/
typedef WalReceiverConn *(*walrcv_connect_fn) (const char *conninfo,
+ bool replication,
bool logical,
bool must_use_password,
const char *appname,
typedef char *(*walrcv_identify_system_fn) (WalReceiverConn *conn,
TimeLineID *primary_tli);
+/*
+ * walrcv_get_dbname_from_conninfo_fn
+ *
+ * Returns the database name from the primary_conninfo
+ */
+typedef char *(*walrcv_get_dbname_from_conninfo_fn) (const char *conninfo);
+
/*
* walrcv_server_version_fn
*
walrcv_get_conninfo_fn walrcv_get_conninfo;
walrcv_get_senderinfo_fn walrcv_get_senderinfo;
walrcv_identify_system_fn walrcv_identify_system;
+ walrcv_get_dbname_from_conninfo_fn walrcv_get_dbname_from_conninfo;
walrcv_server_version_fn walrcv_server_version;
walrcv_readtimelinehistoryfile_fn walrcv_readtimelinehistoryfile;
walrcv_startstreaming_fn walrcv_startstreaming;
extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
-#define walrcv_connect(conninfo, logical, must_use_password, appname, err) \
- WalReceiverFunctions->walrcv_connect(conninfo, logical, must_use_password, appname, err)
+#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err) \
+ WalReceiverFunctions->walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
#define walrcv_check_conninfo(conninfo, must_use_password) \
WalReceiverFunctions->walrcv_check_conninfo(conninfo, must_use_password)
#define walrcv_get_conninfo(conn) \
WalReceiverFunctions->walrcv_get_senderinfo(conn, sender_host, sender_port)
#define walrcv_identify_system(conn, primary_tli) \
WalReceiverFunctions->walrcv_identify_system(conn, primary_tli)
+#define walrcv_get_dbname_from_conninfo(conninfo) \
+ WalReceiverFunctions->walrcv_get_dbname_from_conninfo(conninfo)
#define walrcv_server_version(conn) \
WalReceiverFunctions->walrcv_server_version(conn)
#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) \