]> git.ipfire.org Git - thirdparty/postgresql.git/commitdiff
pg_createsubscriber: Introduce module-specific logging functions.
authorAmit Kapila <akapila@postgresql.org>
Mon, 23 Mar 2026 03:53:20 +0000 (09:23 +0530)
committerAmit Kapila <akapila@postgresql.org>
Mon, 23 Mar 2026 03:53:20 +0000 (09:23 +0530)
Replace generic pg_log_* calls with report_createsub_log() and
report_createsub_fatal(). This refactor provides the necessary
infrastructure to support logging to external files via the -l option.

These new functions enable the utility to route messages to both the
terminal and a log file based on the logging configuration and verbosity
levels provided by the user.

Author: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Author: Gyan Sreejith <gyan.sreejith@gmail.com>
Reviewed-by: shveta malik <shveta.malik@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Discussion: https://postgr.es/m/CAEqnbaUthOQARV1dscGvB_EsqC-YfxiM6rWkVDHc+G+f4oSUHw@mail.gmail.com

src/bin/pg_basebackup/pg_createsubscriber.c

index 2bc84505aab899283200f32c1e9101d7c584cfb1..b2bc9dae0b8321da57069fe4257533a48da2f7c5 100644 (file)
@@ -146,6 +146,11 @@ static void drop_existing_subscription(PGconn *conn, const char *subname,
                                                                           const char *dbname);
 static void get_publisher_databases(struct CreateSubscriberOptions *opt,
                                                                        bool dbnamespecified);
+static void report_createsub_log(enum pg_log_level, enum pg_log_part,
+                                                                const char *pg_restrict fmt,...)
+                       pg_attribute_printf(3, 4);
+pg_noreturn static void report_createsub_fatal(const char *pg_restrict fmt,...)
+                       pg_attribute_printf(1, 2);
 
 #define        WAIT_INTERVAL   1               /* 1 second */
 
@@ -174,6 +179,38 @@ static bool recovery_ended = false;
 static bool standby_running = false;
 static bool recovery_params_set = false;
 
+/*
+ * Report a message with a given log level
+ */
+static void
+report_createsub_log(enum pg_log_level level, enum pg_log_part part,
+                                        const char *pg_restrict fmt,...)
+{
+       va_list         args;
+
+       va_start(args, fmt);
+
+       pg_log_generic_v(level, part, fmt, args);
+
+       va_end(args);
+}
+
+/*
+ * Report a fatal error and exit
+ */
+static void
+report_createsub_fatal(const char *pg_restrict fmt,...)
+{
+       va_list         args;
+
+       va_start(args, fmt);
+
+       pg_log_generic_v(PG_LOG_ERROR, PG_LOG_PRIMARY, fmt, args);
+
+       va_end(args);
+
+       exit(1);
+}
 
 /*
  * Clean up objects created by pg_createsubscriber.
@@ -205,7 +242,8 @@ cleanup_objects_atexit(void)
                if (durable_rename(conf_filename, conf_filename_disabled) != 0)
                {
                        /* durable_rename() has already logged something. */
-                       pg_log_warning_hint("A manual removal of the recovery parameters may be required.");
+                       report_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
+                                                                "A manual removal of the recovery parameters may be required.");
                }
        }
 
@@ -219,9 +257,11 @@ cleanup_objects_atexit(void)
         */
        if (recovery_ended)
        {
-               pg_log_warning("failed after the end of recovery");
-               pg_log_warning_hint("The target server cannot be used as a physical replica anymore.  "
-                                                       "You must recreate the physical replica before continuing.");
+               report_createsub_log(PG_LOG_WARNING, PG_LOG_PRIMARY,
+                                                        "failed after the end of recovery");
+               report_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
+                                                        "The target server cannot be used as a physical replica anymore.  "
+                                                        "You must recreate the physical replica before continuing.");
        }
 
        for (int i = 0; i < num_dbs; i++)
@@ -251,17 +291,21 @@ cleanup_objects_atexit(void)
                                 */
                                if (dbinfo->made_publication)
                                {
-                                       pg_log_warning("publication \"%s\" created in database \"%s\" on primary was left behind",
-                                                                  dbinfo->pubname,
-                                                                  dbinfo->dbname);
-                                       pg_log_warning_hint("Drop this publication before trying again.");
+                                       report_createsub_log(PG_LOG_WARNING, PG_LOG_PRIMARY,
+                                                                                "publication \"%s\" created in database \"%s\" on primary was left behind",
+                                                                                dbinfo->pubname,
+                                                                                dbinfo->dbname);
+                                       report_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
+                                                                                "Drop this publication before trying again.");
                                }
                                if (dbinfo->made_replslot)
                                {
-                                       pg_log_warning("replication slot \"%s\" created in database \"%s\" on primary was left behind",
-                                                                  dbinfo->replslotname,
-                                                                  dbinfo->dbname);
-                                       pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
+                                       report_createsub_log(PG_LOG_WARNING, PG_LOG_PRIMARY,
+                                                                                "replication slot \"%s\" created in database \"%s\" on primary was left behind",
+                                                                                dbinfo->replslotname,
+                                                                                dbinfo->dbname);
+                                       report_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
+                                                                                "Drop this replication slot soon to avoid retention of WAL files.");
                                }
                        }
                }
@@ -342,7 +386,8 @@ get_base_conninfo(const char *conninfo, char **dbname)
        conn_opts = PQconninfoParse(conninfo, &errmsg);
        if (conn_opts == NULL)
        {
-               pg_log_error("could not parse connection string: %s", errmsg);
+               report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+                                                        "could not parse connection string: %s", errmsg);
                PQfreemem(errmsg);
                return NULL;
        }
@@ -419,14 +464,15 @@ get_exec_path(const char *argv0, const char *progname)
                        strlcpy(full_path, progname, sizeof(full_path));
 
                if (ret == -1)
-                       pg_fatal("program \"%s\" is needed by %s but was not found in the same directory as \"%s\"",
-                                        progname, "pg_createsubscriber", full_path);
+                       report_createsub_fatal("program \"%s\" is needed by %s but was not found in the same directory as \"%s\"",
+                                                                  progname, "pg_createsubscriber", full_path);
                else
-                       pg_fatal("program \"%s\" was found by \"%s\" but was not the same version as %s",
-                                        progname, full_path, "pg_createsubscriber");
+                       report_createsub_fatal("program \"%s\" was found by \"%s\" but was not the same version as %s",
+                                                                  progname, full_path, "pg_createsubscriber");
        }
 
-       pg_log_debug("%s path is:  %s", progname, exec_path);
+       report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+                                                "%s path is:  %s", progname, exec_path);
 
        return exec_path;
 }
@@ -443,15 +489,16 @@ check_data_directory(const char *datadir)
        uint32          major_version;
        char       *version_str;
 
-       pg_log_info("checking if directory \"%s\" is a cluster data directory",
-                               datadir);
+       report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+                                                "checking if directory \"%s\" is a cluster data directory",
+                                                datadir);
 
        if (stat(datadir, &statbuf) != 0)
        {
                if (errno == ENOENT)
-                       pg_fatal("data directory \"%s\" does not exist", datadir);
+                       report_createsub_fatal("data directory \"%s\" does not exist", datadir);
                else
-                       pg_fatal("could not access directory \"%s\": %m", datadir);
+                       report_createsub_fatal("could not access directory \"%s\": %m", datadir);
        }
 
        /*
@@ -462,9 +509,11 @@ check_data_directory(const char *datadir)
        major_version = GET_PG_MAJORVERSION_NUM(get_pg_version(datadir, &version_str));
        if (major_version != PG_MAJORVERSION_NUM)
        {
-               pg_log_error("data directory is of wrong version");
-               pg_log_error_detail("File \"%s\" contains \"%s\", which is not compatible with this program's version \"%s\".",
-                                                       "PG_VERSION", version_str, PG_MAJORVERSION);
+               report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+                                                        "data directory is of wrong version");
+               report_createsub_log(PG_LOG_ERROR, PG_LOG_DETAIL,
+                                                        "File \"%s\" contains \"%s\", which is not compatible with this program's version \"%s\".",
+                                                        "PG_VERSION", version_str, PG_MAJORVERSION);
                exit(1);
        }
 }
@@ -547,14 +596,16 @@ store_pub_sub_info(const struct CreateSubscriberOptions *opt,
                        dbinfo[i].subname = NULL;
                /* Other fields will be filled later */
 
-               pg_log_debug("publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i,
-                                        dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
-                                        dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)",
-                                        dbinfo[i].pubconninfo);
-               pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s, two_phase: %s", i,
-                                        dbinfo[i].subname ? dbinfo[i].subname : "(auto)",
-                                        dbinfo[i].subconninfo,
-                                        dbinfos.two_phase ? "true" : "false");
+               report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+                                                        "publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i,
+                                                        dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
+                                                        dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)",
+                                                        dbinfo[i].pubconninfo);
+               report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+                                                        "subscriber(%d): subscription: %s ; connection string: %s, two_phase: %s", i,
+                                                        dbinfo[i].subname ? dbinfo[i].subname : "(auto)",
+                                                        dbinfo[i].subconninfo,
+                                                        dbinfos.two_phase ? "true" : "false");
 
                if (num_pubs > 0)
                        pubcell = pubcell->next;
@@ -582,8 +633,9 @@ connect_database(const char *conninfo, bool exit_on_error)
        conn = PQconnectdb(conninfo);
        if (PQstatus(conn) != CONNECTION_OK)
        {
-               pg_log_error("connection to database failed: %s",
-                                        PQerrorMessage(conn));
+               report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+                                                        "connection to database failed: %s",
+                                                        PQerrorMessage(conn));
                PQfinish(conn);
 
                if (exit_on_error)
@@ -595,8 +647,9 @@ connect_database(const char *conninfo, bool exit_on_error)
        res = PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL);
        if (PQresultStatus(res) != PGRES_TUPLES_OK)
        {
-               pg_log_error("could not clear \"search_path\": %s",
-                                        PQresultErrorMessage(res));
+               report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+                                                        "could not clear \"search_path\": %s",
+                                                        PQresultErrorMessage(res));
                PQclear(res);
                PQfinish(conn);
 
@@ -635,27 +688,31 @@ get_primary_sysid(const char *conninfo)
        PGresult   *res;
        uint64          sysid;
 
-       pg_log_info("getting system identifier from publisher");
+       report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+                                                "getting system identifier from publisher");
 
        conn = connect_database(conninfo, true);
 
        res = PQexec(conn, "SELECT system_identifier FROM pg_catalog.pg_control_system()");
        if (PQresultStatus(res) != PGRES_TUPLES_OK)
        {
-               pg_log_error("could not get system identifier: %s",
-                                        PQresultErrorMessage(res));
+               report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+                                                        "could not get system identifier: %s",
+                                                        PQresultErrorMessage(res));
                disconnect_database(conn, true);
        }
        if (PQntuples(res) != 1)
        {
-               pg_log_error("could not get system identifier: got %d rows, expected %d row",
-                                        PQntuples(res), 1);
+               report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+                                                        "could not get system identifier: got %d rows, expected %d row",
+                                                        PQntuples(res), 1);
                disconnect_database(conn, true);
        }
 
        sysid = strtou64(PQgetvalue(res, 0, 0), NULL, 10);
 
-       pg_log_info("system identifier is %" PRIu64 " on publisher", sysid);
+       report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+                                                "system identifier is %" PRIu64 " on publisher", sysid);
 
        PQclear(res);
        disconnect_database(conn, false);
@@ -675,15 +732,17 @@ get_standby_sysid(const char *datadir)
        bool            crc_ok;
        uint64          sysid;
 
-       pg_log_info("getting system identifier from subscriber");
+       report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+                                                "getting system identifier from subscriber");
 
        cf = get_controlfile(datadir, &crc_ok);
        if (!crc_ok)
-               pg_fatal("control file appears to be corrupt");
+               report_createsub_fatal("control file appears to be corrupt");
 
        sysid = cf->system_identifier;
 
-       pg_log_info("system identifier is %" PRIu64 " on subscriber", sysid);
+       report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+                                                "system identifier is %" PRIu64 " on subscriber", sysid);
 
        pg_free(cf);
 
@@ -704,11 +763,12 @@ modify_subscriber_sysid(const struct CreateSubscriberOptions *opt)
 
        char       *cmd_str;
 
-       pg_log_info("modifying system identifier of subscriber");
+       report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+                                                "modifying system identifier of subscriber");
 
        cf = get_controlfile(subscriber_dir, &crc_ok);
        if (!crc_ok)
-               pg_fatal("control file appears to be corrupt");
+               report_createsub_fatal("control file appears to be corrupt");
 
        /*
         * Select a new system identifier.
@@ -721,33 +781,39 @@ modify_subscriber_sysid(const struct CreateSubscriberOptions *opt)
        cf->system_identifier |= getpid() & 0xFFF;
 
        if (dry_run)
-               pg_log_info("dry-run: would set system identifier to %" PRIu64 " on subscriber",
-                                       cf->system_identifier);
+               report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+                                                        "dry-run: would set system identifier to %" PRIu64 " on subscriber",
+                                                        cf->system_identifier);
        else
        {
                update_controlfile(subscriber_dir, cf, true);
-               pg_log_info("system identifier is %" PRIu64 " on subscriber",
-                                       cf->system_identifier);
+               report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+                                                        "system identifier is %" PRIu64 " on subscriber",
+                                                        cf->system_identifier);
        }
 
        if (dry_run)
-               pg_log_info("dry-run: would run pg_resetwal on the subscriber");
+               report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+                                                        "dry-run: would run pg_resetwal on the subscriber");
        else
-               pg_log_info("running pg_resetwal on the subscriber");
+               report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+                                                        "running pg_resetwal on the subscriber");
 
        cmd_str = psprintf("\"%s\" -D \"%s\" > \"%s\"", pg_resetwal_path,
                                           subscriber_dir, DEVNULL);
 
-       pg_log_debug("pg_resetwal command is: %s", cmd_str);
+       report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+                                                "pg_resetwal command is: %s", cmd_str);
 
        if (!dry_run)
        {
                int                     rc = system(cmd_str);
 
                if (rc == 0)
-                       pg_log_info("successfully reset WAL on the subscriber");
+                       report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+                                                                "successfully reset WAL on the subscriber");
                else
-                       pg_fatal("could not reset WAL on subscriber: %s", wait_result_to_str(rc));
+                       report_createsub_fatal("could not reset WAL on subscriber: %s", wait_result_to_str(rc));
        }
 
        pg_free(cf);
@@ -771,15 +837,17 @@ generate_object_name(PGconn *conn)
                                 "WHERE datname = pg_catalog.current_database()");
        if (PQresultStatus(res) != PGRES_TUPLES_OK)
        {
-               pg_log_error("could not obtain database OID: %s",
-                                        PQresultErrorMessage(res));
+               report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+                                                        "could not obtain database OID: %s",
+                                                        PQresultErrorMessage(res));
                disconnect_database(conn, true);
        }
 
        if (PQntuples(res) != 1)
        {
-               pg_log_error("could not obtain database OID: got %d rows, expected %d row",
-                                        PQntuples(res), 1);
+               report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+                                                        "could not obtain database OID: got %d rows, expected %d row",
+                                                        PQntuples(res), 1);
                disconnect_database(conn, true);
        }
 
@@ -819,8 +887,9 @@ find_publication(PGconn *conn, const char *pubname, const char *dbname)
        res = PQexec(conn, str->data);
        if (PQresultStatus(res) != PGRES_TUPLES_OK)
        {
-               pg_log_error("could not find publication \"%s\" in database \"%s\": %s",
-                                        pubname, dbname, PQerrorMessage(conn));
+               report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+                                                        "could not find publication \"%s\" in database \"%s\": %s",
+                                                        pubname, dbname, PQerrorMessage(conn));
                disconnect_database(conn, true);
        }
 
@@ -873,8 +942,9 @@ setup_publisher(struct LogicalRepInfo *dbinfo)
                if (find_publication(conn, dbinfo[i].pubname, dbinfo[i].dbname))
                {
                        /* Reuse existing publication on publisher. */
-                       pg_log_info("use existing publication \"%s\" in database \"%s\"",
-                                               dbinfo[i].pubname, dbinfo[i].dbname);
+                       report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+                                                                "use existing publication \"%s\" in database \"%s\"",
+                                                                dbinfo[i].pubname, dbinfo[i].dbname);
                        /* Don't remove pre-existing publication if an error occurs. */
                        dbinfo[i].made_publication = false;
                }
@@ -912,8 +982,9 @@ setup_publisher(struct LogicalRepInfo *dbinfo)
                        res = PQexec(conn, "SELECT pg_log_standby_snapshot()");
                        if (PQresultStatus(res) != PGRES_TUPLES_OK)
                        {
-                               pg_log_error("could not write an additional WAL record: %s",
-                                                        PQresultErrorMessage(res));
+                               report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+                                                                        "could not write an additional WAL record: %s",
+                                                                        PQresultErrorMessage(res));
                                disconnect_database(conn, true);
                        }
                        PQclear(res);
@@ -938,8 +1009,9 @@ server_is_in_recovery(PGconn *conn)
 
        if (PQresultStatus(res) != PGRES_TUPLES_OK)
        {
-               pg_log_error("could not obtain recovery progress: %s",
-                                        PQresultErrorMessage(res));
+               report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+                                                        "could not obtain recovery progress: %s",
+                                                        PQresultErrorMessage(res));
                disconnect_database(conn, true);
        }
 
@@ -971,7 +1043,8 @@ check_publisher(const struct LogicalRepInfo *dbinfo)
        int                     max_prepared_transactions;
        char       *max_slot_wal_keep_size;
 
-       pg_log_info("checking settings on publisher");
+       report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+                                                "checking settings on publisher");
 
        conn = connect_database(dbinfo[0].pubconninfo, true);
 
@@ -981,7 +1054,8 @@ check_publisher(const struct LogicalRepInfo *dbinfo)
         */
        if (server_is_in_recovery(conn))
        {
-               pg_log_error("primary server cannot be in recovery");
+               report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+                                                        "primary server cannot be in recovery");
                disconnect_database(conn, true);
        }
 
@@ -1007,8 +1081,9 @@ check_publisher(const struct LogicalRepInfo *dbinfo)
 
        if (PQresultStatus(res) != PGRES_TUPLES_OK)
        {
-               pg_log_error("could not obtain publisher settings: %s",
-                                        PQresultErrorMessage(res));
+               report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+                                                        "could not obtain publisher settings: %s",
+                                                        PQresultErrorMessage(res));
                disconnect_database(conn, true);
        }
 
@@ -1022,48 +1097,63 @@ check_publisher(const struct LogicalRepInfo *dbinfo)
 
        PQclear(res);
 
-       pg_log_debug("publisher: wal_level: %s", wal_level);
-       pg_log_debug("publisher: max_replication_slots: %d", max_repslots);
-       pg_log_debug("publisher: current replication slots: %d", cur_repslots);
-       pg_log_debug("publisher: max_wal_senders: %d", max_walsenders);
-       pg_log_debug("publisher: current wal senders: %d", cur_walsenders);
-       pg_log_debug("publisher: max_prepared_transactions: %d",
-                                max_prepared_transactions);
-       pg_log_debug("publisher: max_slot_wal_keep_size: %s",
-                                max_slot_wal_keep_size);
+       report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+                                                "publisher: wal_level: %s", wal_level);
+       report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+                                                "publisher: max_replication_slots: %d", max_repslots);
+       report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+                                                "publisher: current replication slots: %d", cur_repslots);
+       report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+                                                "publisher: max_wal_senders: %d", max_walsenders);
+       report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+                                                "publisher: current wal senders: %d", cur_walsenders);
+       report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+                                                "publisher: max_prepared_transactions: %d",
+                                                max_prepared_transactions);
+       report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+                                                "publisher: max_slot_wal_keep_size: %s",
+                                                max_slot_wal_keep_size);
 
        disconnect_database(conn, false);
 
        if (strcmp(wal_level, "minimal") == 0)
        {
-               pg_log_error("publisher requires \"wal_level\" >= \"replica\"");
+               report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+                                                        "publisher requires \"wal_level\" >= \"replica\"");
                failed = true;
        }
 
        if (max_repslots - cur_repslots < num_dbs)
        {
-               pg_log_error("publisher requires %d replication slots, but only %d remain",
-                                        num_dbs, max_repslots - cur_repslots);
-               pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
-                                                 "max_replication_slots", cur_repslots + num_dbs);
+               report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+                                                        "publisher requires %d replication slots, but only %d remain",
+                                                        num_dbs, max_repslots - cur_repslots);
+               report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
+                                                        "Increase the configuration parameter \"%s\" to at least %d.",
+                                                        "max_replication_slots", cur_repslots + num_dbs);
                failed = true;
        }
 
        if (max_walsenders - cur_walsenders < num_dbs)
        {
-               pg_log_error("publisher requires %d WAL sender processes, but only %d remain",
-                                        num_dbs, max_walsenders - cur_walsenders);
-               pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
-                                                 "max_wal_senders", cur_walsenders + num_dbs);
+               report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+                                                        "publisher requires %d WAL sender processes, but only %d remain",
+                                                        num_dbs, max_walsenders - cur_walsenders);
+               report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
+                                                        "Increase the configuration parameter \"%s\" to at least %d.",
+                                                        "max_wal_senders", cur_walsenders + num_dbs);
                failed = true;
        }
 
        if (max_prepared_transactions != 0 && !dbinfos.two_phase)
        {
-               pg_log_warning("two_phase option will not be enabled for replication slots");
-               pg_log_warning_detail("Subscriptions will be created with the two_phase option disabled.  "
-                                                         "Prepared transactions will be replicated at COMMIT PREPARED.");
-               pg_log_warning_hint("You can use the command-line option --enable-two-phase to enable two_phase.");
+               report_createsub_log(PG_LOG_WARNING, PG_LOG_PRIMARY,
+                                                        "two_phase option will not be enabled for replication slots");
+               report_createsub_log(PG_LOG_WARNING, PG_LOG_DETAIL,
+                                                        "Subscriptions will be created with the two_phase option disabled.  "
+                                                        "Prepared transactions will be replicated at COMMIT PREPARED.");
+               report_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
+                                                        "You can use the command-line option --enable-two-phase to enable two_phase.");
        }
 
        /*
@@ -1073,9 +1163,11 @@ check_publisher(const struct LogicalRepInfo *dbinfo)
         */
        if (dry_run && (strcmp(max_slot_wal_keep_size, "-1") != 0))
        {
-               pg_log_warning("required WAL could be removed from the publisher");
-               pg_log_warning_hint("Set the configuration parameter \"%s\" to -1 to ensure that required WAL files are not prematurely removed.",
-                                                       "max_slot_wal_keep_size");
+               report_createsub_log(PG_LOG_WARNING, PG_LOG_PRIMARY,
+                                                        "required WAL could be removed from the publisher");
+               report_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
+                                                        "Set the configuration parameter \"%s\" to -1 to ensure that required WAL files are not prematurely removed.",
+                                                        "max_slot_wal_keep_size");
        }
 
        pg_free(wal_level);
@@ -1106,14 +1198,16 @@ check_subscriber(const struct LogicalRepInfo *dbinfo)
        int                     max_replorigins;
        int                     max_wprocs;
 
-       pg_log_info("checking settings on subscriber");
+       report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+                                                "checking settings on subscriber");
 
        conn = connect_database(dbinfo[0].subconninfo, true);
 
        /* The target server must be a standby */
        if (!server_is_in_recovery(conn))
        {
-               pg_log_error("target server must be a standby");
+               report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+                                                        "target server must be a standby");
                disconnect_database(conn, true);
        }
 
@@ -1137,8 +1231,9 @@ check_subscriber(const struct LogicalRepInfo *dbinfo)
 
        if (PQresultStatus(res) != PGRES_TUPLES_OK)
        {
-               pg_log_error("could not obtain subscriber settings: %s",
-                                        PQresultErrorMessage(res));
+               report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+                                                        "could not obtain subscriber settings: %s",
+                                                        PQresultErrorMessage(res));
                disconnect_database(conn, true);
        }
 
@@ -1148,12 +1243,16 @@ check_subscriber(const struct LogicalRepInfo *dbinfo)
        if (strcmp(PQgetvalue(res, 3, 0), "") != 0)
                primary_slot_name = pg_strdup(PQgetvalue(res, 3, 0));
 
-       pg_log_debug("subscriber: max_logical_replication_workers: %d",
-                                max_lrworkers);
-       pg_log_debug("subscriber: max_active_replication_origins: %d", max_replorigins);
-       pg_log_debug("subscriber: max_worker_processes: %d", max_wprocs);
+       report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+                                                "subscriber: max_logical_replication_workers: %d",
+                                                max_lrworkers);
+       report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+                                                "subscriber: max_active_replication_origins: %d", max_replorigins);
+       report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+                                                "subscriber: max_worker_processes: %d", max_wprocs);
        if (primary_slot_name)
-               pg_log_debug("subscriber: primary_slot_name: %s", primary_slot_name);
+               report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+                                                        "subscriber: primary_slot_name: %s", primary_slot_name);
 
        PQclear(res);
 
@@ -1161,28 +1260,34 @@ check_subscriber(const struct LogicalRepInfo *dbinfo)
 
        if (max_replorigins < num_dbs)
        {
-               pg_log_error("subscriber requires %d active replication origins, but only %d remain",
-                                        num_dbs, max_replorigins);
-               pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
-                                                 "max_active_replication_origins", num_dbs);
+               report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+                                                        "subscriber requires %d active replication origins, but only %d remain",
+                                                        num_dbs, max_replorigins);
+               report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
+                                                        "Increase the configuration parameter \"%s\" to at least %d.",
+                                                        "max_active_replication_origins", num_dbs);
                failed = true;
        }
 
        if (max_lrworkers < num_dbs)
        {
-               pg_log_error("subscriber requires %d logical replication workers, but only %d remain",
-                                        num_dbs, max_lrworkers);
-               pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
-                                                 "max_logical_replication_workers", num_dbs);
+               report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+                                                        "subscriber requires %d logical replication workers, but only %d remain",
+                                                        num_dbs, max_lrworkers);
+               report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
+                                                        "Increase the configuration parameter \"%s\" to at least %d.",
+                                                        "max_logical_replication_workers", num_dbs);
                failed = true;
        }
 
        if (max_wprocs < num_dbs + 1)
        {
-               pg_log_error("subscriber requires %d worker processes, but only %d remain",
-                                        num_dbs + 1, max_wprocs);
-               pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
-                                                 "max_worker_processes", num_dbs + 1);
+               report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+                                                        "subscriber requires %d worker processes, but only %d remain",
+                                                        num_dbs + 1, max_wprocs);
+               report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
+                                                        "Increase the configuration parameter \"%s\" to at least %d.",
+                                                        "max_worker_processes", num_dbs + 1);
                failed = true;
        }
 
@@ -1215,19 +1320,22 @@ drop_existing_subscription(PGconn *conn, const char *subname, const char *dbname
        appendPQExpBuffer(query, " DROP SUBSCRIPTION %s;", subname);
 
        if (dry_run)
-               pg_log_info("dry-run: would drop subscription \"%s\" in database \"%s\"",
-                                       subname, dbname);
+               report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+                                                        "dry-run: would drop subscription \"%s\" in database \"%s\"",
+                                                        subname, dbname);
        else
        {
-               pg_log_info("dropping subscription \"%s\" in database \"%s\"",
-                                       subname, dbname);
+               report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+                                                        "dropping subscription \"%s\" in database \"%s\"",
+                                                        subname, dbname);
 
                res = PQexec(conn, query->data);
 
                if (PQresultStatus(res) != PGRES_COMMAND_OK)
                {
-                       pg_log_error("could not drop subscription \"%s\": %s",
-                                                subname, PQresultErrorMessage(res));
+                       report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+                                                                "could not drop subscription \"%s\": %s",
+                                                                subname, PQresultErrorMessage(res));
                        disconnect_database(conn, true);
                }
 
@@ -1261,8 +1369,9 @@ check_and_drop_existing_subscriptions(PGconn *conn,
 
        if (PQresultStatus(res) != PGRES_TUPLES_OK)
        {
-               pg_log_error("could not obtain pre-existing subscriptions: %s",
-                                        PQresultErrorMessage(res));
+               report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+                                                        "could not obtain pre-existing subscriptions: %s",
+                                                        PQresultErrorMessage(res));
                disconnect_database(conn, true);
        }
 
@@ -1373,7 +1482,8 @@ setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const c
                                                  lsn);
        }
 
-       pg_log_debug("recovery parameters:\n%s", recoveryconfcontents->data);
+       report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+                                                "recovery parameters:\n%s", recoveryconfcontents->data);
 
        if (!dry_run)
        {
@@ -1385,10 +1495,10 @@ setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const c
                                 INCLUDED_CONF_FILE);
                fd = fopen(conf_filename, "w");
                if (fd == NULL)
-                       pg_fatal("could not open file \"%s\": %m", conf_filename);
+                       report_createsub_fatal("could not open file \"%s\": %m", conf_filename);
 
                if (fwrite(recoveryconfcontents->data, recoveryconfcontents->len, 1, fd) != 1)
-                       pg_fatal("could not write to file \"%s\": %m", conf_filename);
+                       report_createsub_fatal("could not write to file \"%s\": %m", conf_filename);
 
                fclose(fd);
                recovery_params_set = true;
@@ -1427,9 +1537,11 @@ drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, const char *slotnam
        }
        else
        {
-               pg_log_warning("could not drop replication slot \"%s\" on primary",
-                                          slotname);
-               pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
+               report_createsub_log(PG_LOG_WARNING, PG_LOG_PRIMARY,
+                                                        "could not drop replication slot \"%s\" on primary",
+                                                        slotname);
+               report_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
+                                                        "Drop this replication slot soon to avoid retention of WAL files.");
        }
 }
 
@@ -1461,9 +1573,11 @@ drop_failover_replication_slots(struct LogicalRepInfo *dbinfo)
                }
                else
                {
-                       pg_log_warning("could not obtain failover replication slot information: %s",
-                                                  PQresultErrorMessage(res));
-                       pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
+                       report_createsub_log(PG_LOG_WARNING, PG_LOG_PRIMARY,
+                                                                "could not obtain failover replication slot information: %s",
+                                                                PQresultErrorMessage(res));
+                       report_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
+                                                                "Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
                }
 
                PQclear(res);
@@ -1471,8 +1585,10 @@ drop_failover_replication_slots(struct LogicalRepInfo *dbinfo)
        }
        else
        {
-               pg_log_warning("could not drop failover replication slot");
-               pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
+               report_createsub_log(PG_LOG_WARNING, PG_LOG_PRIMARY,
+                                                        "could not drop failover replication slot");
+               report_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
+                                                        "Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
        }
 }
 
@@ -1494,11 +1610,13 @@ create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
        Assert(conn != NULL);
 
        if (dry_run)
-               pg_log_info("dry-run: would create the replication slot \"%s\" in database \"%s\" on publisher",
-                                       slot_name, dbinfo->dbname);
+               report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+                                                        "dry-run: would create the replication slot \"%s\" in database \"%s\" on publisher",
+                                                        slot_name, dbinfo->dbname);
        else
-               pg_log_info("creating the replication slot \"%s\" in database \"%s\" on publisher",
-                                       slot_name, dbinfo->dbname);
+               report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+                                                        "creating the replication slot \"%s\" in database \"%s\" on publisher",
+                                                        slot_name, dbinfo->dbname);
 
        slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
 
@@ -1509,16 +1627,18 @@ create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
 
        PQfreemem(slot_name_esc);
 
-       pg_log_debug("command is: %s", str->data);
+       report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+                                                "command is: %s", str->data);
 
        if (!dry_run)
        {
                res = PQexec(conn, str->data);
                if (PQresultStatus(res) != PGRES_TUPLES_OK)
                {
-                       pg_log_error("could not create replication slot \"%s\" in database \"%s\": %s",
-                                                slot_name, dbinfo->dbname,
-                                                PQresultErrorMessage(res));
+                       report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+                                                                "could not create replication slot \"%s\" in database \"%s\": %s",
+                                                                slot_name, dbinfo->dbname,
+                                                                PQresultErrorMessage(res));
                        PQclear(res);
                        destroyPQExpBuffer(str);
                        return NULL;
@@ -1547,11 +1667,13 @@ drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
        Assert(conn != NULL);
 
        if (dry_run)
-               pg_log_info("dry-run: would drop the replication slot \"%s\" in database \"%s\"",
-                                       slot_name, dbinfo->dbname);
+               report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+                                                        "dry-run: would drop the replication slot \"%s\" in database \"%s\"",
+                                                        slot_name, dbinfo->dbname);
        else
-               pg_log_info("dropping the replication slot \"%s\" in database \"%s\"",
-                                       slot_name, dbinfo->dbname);
+               report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+                                                        "dropping the replication slot \"%s\" in database \"%s\"",
+                                                        slot_name, dbinfo->dbname);
 
        slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
 
@@ -1559,15 +1681,17 @@ drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
 
        PQfreemem(slot_name_esc);
 
-       pg_log_debug("command is: %s", str->data);
+       report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+                                                "command is: %s", str->data);
 
        if (!dry_run)
        {
                res = PQexec(conn, str->data);
                if (PQresultStatus(res) != PGRES_TUPLES_OK)
                {
-                       pg_log_error("could not drop replication slot \"%s\" in database \"%s\": %s",
-                                                slot_name, dbinfo->dbname, PQresultErrorMessage(res));
+                       report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+                                                                "could not drop replication slot \"%s\" in database \"%s\": %s",
+                                                                slot_name, dbinfo->dbname, PQresultErrorMessage(res));
                        dbinfo->made_replslot = false;  /* don't try again. */
                }
 
@@ -1587,25 +1711,32 @@ pg_ctl_status(const char *pg_ctl_cmd, int rc)
        {
                if (WIFEXITED(rc))
                {
-                       pg_log_error("pg_ctl failed with exit code %d", WEXITSTATUS(rc));
+                       report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+                                                                "pg_ctl failed with exit code %d",
+                                                                WEXITSTATUS(rc));
                }
                else if (WIFSIGNALED(rc))
                {
 #if defined(WIN32)
-                       pg_log_error("pg_ctl was terminated by exception 0x%X",
-                                                WTERMSIG(rc));
-                       pg_log_error_detail("See C include file \"ntstatus.h\" for a description of the hexadecimal value.");
+                       report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+                                                                "pg_ctl was terminated by exception 0x%X",
+                                                                WTERMSIG(rc));
+                       report_createsub_log(PG_LOG_ERROR, PG_LOG_DETAIL,
+                                                                "See C include file \"ntstatus.h\" for a description of the hexadecimal value.");
 #else
-                       pg_log_error("pg_ctl was terminated by signal %d: %s",
-                                                WTERMSIG(rc), pg_strsignal(WTERMSIG(rc)));
+                       report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+                                                                "pg_ctl was terminated by signal %d: %s",
+                                                                WTERMSIG(rc), pg_strsignal(WTERMSIG(rc)));
 #endif
                }
                else
                {
-                       pg_log_error("pg_ctl exited with unrecognized status %d", rc);
+                       report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+                                                                "pg_ctl exited with unrecognized status %d", rc);
                }
 
-               pg_log_error_detail("The failed command was: %s", pg_ctl_cmd);
+               report_createsub_log(PG_LOG_ERROR, PG_LOG_DETAIL,
+                                                        "The failed command was: %s", pg_ctl_cmd);
                exit(1);
        }
 }
@@ -1650,12 +1781,14 @@ start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_
        if (restrict_logical_worker)
                appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c max_logical_replication_workers=0\"");
 
-       pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd->data);
+       report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+                                                "pg_ctl command is: %s", pg_ctl_cmd->data);
        rc = system(pg_ctl_cmd->data);
        pg_ctl_status(pg_ctl_cmd->data, rc);
        standby_running = true;
        destroyPQExpBuffer(pg_ctl_cmd);
-       pg_log_info("server was started");
+       report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+                                                "server was started");
 }
 
 static void
@@ -1666,11 +1799,13 @@ stop_standby_server(const char *datadir)
 
        pg_ctl_cmd = psprintf("\"%s\" stop -D \"%s\" -s", pg_ctl_path,
                                                  datadir);
-       pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd);
+       report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+                                                "pg_ctl command is: %s", pg_ctl_cmd);
        rc = system(pg_ctl_cmd);
        pg_ctl_status(pg_ctl_cmd, rc);
        standby_running = false;
-       pg_log_info("server was stopped");
+       report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+                                                "server was stopped");
 }
 
 /*
@@ -1689,7 +1824,8 @@ wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions
        bool            ready = false;
        int                     timer = 0;
 
-       pg_log_info("waiting for the target server to reach the consistent state");
+       report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+                                                "waiting for the target server to reach the consistent state");
 
        conn = connect_database(conninfo, true);
 
@@ -1707,7 +1843,8 @@ wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions
                if (opt->recovery_timeout > 0 && timer >= opt->recovery_timeout)
                {
                        stop_standby_server(subscriber_dir);
-                       pg_log_error("recovery timed out");
+                       report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+                                                                "recovery timed out");
                        disconnect_database(conn, true);
                }
 
@@ -1719,10 +1856,12 @@ wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions
        disconnect_database(conn, false);
 
        if (!ready)
-               pg_fatal("server did not end recovery");
+               report_createsub_fatal("server did not end recovery");
 
-       pg_log_info("target server reached the consistent state");
-       pg_log_info_hint("If pg_createsubscriber fails after this point, you must recreate the physical replica before continuing.");
+       report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+                                                "target server reached the consistent state");
+       report_createsub_log(PG_LOG_INFO, PG_LOG_HINT,
+                                                "If pg_createsubscriber fails after this point, you must recreate the physical replica before continuing.");
 }
 
 /*
@@ -1749,8 +1888,9 @@ create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
        res = PQexec(conn, str->data);
        if (PQresultStatus(res) != PGRES_TUPLES_OK)
        {
-               pg_log_error("could not obtain publication information: %s",
-                                        PQresultErrorMessage(res));
+               report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+                                                        "could not obtain publication information: %s",
+                                                        PQresultErrorMessage(res));
                disconnect_database(conn, true);
        }
 
@@ -1763,8 +1903,10 @@ create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
                 * pg_createsubscriber_ prefix followed by the exact database oid and
                 * a random number.
                 */
-               pg_log_error("publication \"%s\" already exists", dbinfo->pubname);
-               pg_log_error_hint("Consider renaming this publication before continuing.");
+               report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+                                                        "publication \"%s\" already exists", dbinfo->pubname);
+               report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
+                                                        "Consider renaming this publication before continuing.");
                disconnect_database(conn, true);
        }
 
@@ -1772,24 +1914,28 @@ create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
        resetPQExpBuffer(str);
 
        if (dry_run)
-               pg_log_info("dry-run: would create publication \"%s\" in database \"%s\"",
-                                       dbinfo->pubname, dbinfo->dbname);
+               report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+                                                        "dry-run: would create publication \"%s\" in database \"%s\"",
+                                                        dbinfo->pubname, dbinfo->dbname);
        else
-               pg_log_info("creating publication \"%s\" in database \"%s\"",
-                                       dbinfo->pubname, dbinfo->dbname);
+               report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+                                                        "creating publication \"%s\" in database \"%s\"",
+                                                        dbinfo->pubname, dbinfo->dbname);
 
        appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES",
                                          ipubname_esc);
 
-       pg_log_debug("command is: %s", str->data);
+       report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+                                                "command is: %s", str->data);
 
        if (!dry_run)
        {
                res = PQexec(conn, str->data);
                if (PQresultStatus(res) != PGRES_COMMAND_OK)
                {
-                       pg_log_error("could not create publication \"%s\" in database \"%s\": %s",
-                                                dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
+                       report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+                                                                "could not create publication \"%s\" in database \"%s\": %s",
+                                                                dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
                        disconnect_database(conn, true);
                }
                PQclear(res);
@@ -1819,25 +1965,29 @@ drop_publication(PGconn *conn, const char *pubname, const char *dbname,
        pubname_esc = PQescapeIdentifier(conn, pubname, strlen(pubname));
 
        if (dry_run)
-               pg_log_info("dry-run: would drop publication \"%s\" in database \"%s\"",
-                                       pubname, dbname);
+               report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+                                                        "dry-run: would drop publication \"%s\" in database \"%s\"",
+                                                        pubname, dbname);
        else
-               pg_log_info("dropping publication \"%s\" in database \"%s\"",
-                                       pubname, dbname);
+               report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+                                                        "dropping publication \"%s\" in database \"%s\"",
+                                                        pubname, dbname);
 
        appendPQExpBuffer(str, "DROP PUBLICATION %s", pubname_esc);
 
        PQfreemem(pubname_esc);
 
-       pg_log_debug("command is: %s", str->data);
+       report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+                                                "command is: %s", str->data);
 
        if (!dry_run)
        {
                res = PQexec(conn, str->data);
                if (PQresultStatus(res) != PGRES_COMMAND_OK)
                {
-                       pg_log_error("could not drop publication \"%s\" in database \"%s\": %s",
-                                                pubname, dbname, PQresultErrorMessage(res));
+                       report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+                                                                "could not drop publication \"%s\" in database \"%s\": %s",
+                                                                pubname, dbname, PQresultErrorMessage(res));
                        *made_publication = false;      /* don't try again. */
 
                        /*
@@ -1872,15 +2022,17 @@ check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
 
        if (drop_all_pubs)
        {
-               pg_log_info("dropping all existing publications in database \"%s\"",
-                                       dbinfo->dbname);
+               report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+                                                        "dropping all existing publications in database \"%s\"",
+                                                        dbinfo->dbname);
 
                /* Fetch all publication names */
                res = PQexec(conn, "SELECT pubname FROM pg_catalog.pg_publication;");
                if (PQresultStatus(res) != PGRES_TUPLES_OK)
                {
-                       pg_log_error("could not obtain publication information: %s",
-                                                PQresultErrorMessage(res));
+                       report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+                                                                "could not obtain publication information: %s",
+                                                                PQresultErrorMessage(res));
                        PQclear(res);
                        disconnect_database(conn, true);
                }
@@ -1903,11 +2055,13 @@ check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
                else
                {
                        if (dry_run)
-                               pg_log_info("dry-run: would preserve existing publication \"%s\" in database \"%s\"",
-                                                       dbinfo->pubname, dbinfo->dbname);
+                               report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+                                                                        "dry-run: would preserve existing publication \"%s\" in database \"%s\"",
+                                                                        dbinfo->pubname, dbinfo->dbname);
                        else
-                               pg_log_info("preserve existing publication \"%s\" in database \"%s\"",
-                                                       dbinfo->pubname, dbinfo->dbname);
+                               report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+                                                                        "preserve existing publication \"%s\" in database \"%s\"",
+                                                                        dbinfo->pubname, dbinfo->dbname);
                }
        }
 }
@@ -1941,11 +2095,13 @@ create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
        replslotname_esc = PQescapeLiteral(conn, dbinfo->replslotname, strlen(dbinfo->replslotname));
 
        if (dry_run)
-               pg_log_info("dry-run: would create subscription \"%s\" in database \"%s\"",
-                                       dbinfo->subname, dbinfo->dbname);
+               report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+                                                        "dry-run: would create subscription \"%s\" in database \"%s\"",
+                                                        dbinfo->subname, dbinfo->dbname);
        else
-               pg_log_info("creating subscription \"%s\" in database \"%s\"",
-                                       dbinfo->subname, dbinfo->dbname);
+               report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+                                                        "creating subscription \"%s\" in database \"%s\"",
+                                                        dbinfo->subname, dbinfo->dbname);
 
        appendPQExpBuffer(str,
                                          "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
@@ -1959,15 +2115,17 @@ create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
        PQfreemem(pubconninfo_esc);
        PQfreemem(replslotname_esc);
 
-       pg_log_debug("command is: %s", str->data);
+       report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+                                                "command is: %s", str->data);
 
        if (!dry_run)
        {
                res = PQexec(conn, str->data);
                if (PQresultStatus(res) != PGRES_COMMAND_OK)
                {
-                       pg_log_error("could not create subscription \"%s\" in database \"%s\": %s",
-                                                dbinfo->subname, dbinfo->dbname, PQresultErrorMessage(res));
+                       report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+                                                                "could not create subscription \"%s\" in database \"%s\": %s",
+                                                                dbinfo->subname, dbinfo->dbname, PQresultErrorMessage(res));
                        disconnect_database(conn, true);
                }
                PQclear(res);
@@ -2011,15 +2169,17 @@ set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, cons
        res = PQexec(conn, str->data);
        if (PQresultStatus(res) != PGRES_TUPLES_OK)
        {
-               pg_log_error("could not obtain subscription OID: %s",
-                                        PQresultErrorMessage(res));
+               report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+                                                        "could not obtain subscription OID: %s",
+                                                        PQresultErrorMessage(res));
                disconnect_database(conn, true);
        }
 
        if (PQntuples(res) != 1 && !dry_run)
        {
-               pg_log_error("could not obtain subscription OID: got %d rows, expected %d row",
-                                        PQntuples(res), 1);
+               report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+                                                        "could not obtain subscription OID: got %d rows, expected %d row",
+                                                        PQntuples(res), 1);
                disconnect_database(conn, true);
        }
 
@@ -2043,26 +2203,30 @@ set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, cons
        originname = psprintf("pg_%u", suboid);
 
        if (dry_run)
-               pg_log_info("dry-run: would set the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
-                                       originname, lsnstr, dbinfo->dbname);
+               report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+                                                        "dry-run: would set the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
+                                                        originname, lsnstr, dbinfo->dbname);
        else
-               pg_log_info("setting the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
-                                       originname, lsnstr, dbinfo->dbname);
+               report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+                                                        "setting the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
+                                                        originname, lsnstr, dbinfo->dbname);
 
        resetPQExpBuffer(str);
        appendPQExpBuffer(str,
                                          "SELECT pg_catalog.pg_replication_origin_advance('%s', '%s')",
                                          originname, lsnstr);
 
-       pg_log_debug("command is: %s", str->data);
+       report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+                                                "command is: %s", str->data);
 
        if (!dry_run)
        {
                res = PQexec(conn, str->data);
                if (PQresultStatus(res) != PGRES_TUPLES_OK)
                {
-                       pg_log_error("could not set replication progress for subscription \"%s\": %s",
-                                                dbinfo->subname, PQresultErrorMessage(res));
+                       report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+                                                                "could not set replication progress for subscription \"%s\": %s",
+                                                                dbinfo->subname, PQresultErrorMessage(res));
                        disconnect_database(conn, true);
                }
                PQclear(res);
@@ -2093,23 +2257,27 @@ enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
        subname = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
 
        if (dry_run)
-               pg_log_info("dry-run: would enable subscription \"%s\" in database \"%s\"",
-                                       dbinfo->subname, dbinfo->dbname);
+               report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+                                                        "dry-run: would enable subscription \"%s\" in database \"%s\"",
+                                                        dbinfo->subname, dbinfo->dbname);
        else
-               pg_log_info("enabling subscription \"%s\" in database \"%s\"",
-                                       dbinfo->subname, dbinfo->dbname);
+               report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+                                                        "enabling subscription \"%s\" in database \"%s\"",
+                                                        dbinfo->subname, dbinfo->dbname);
 
        appendPQExpBuffer(str, "ALTER SUBSCRIPTION %s ENABLE", subname);
 
-       pg_log_debug("command is: %s", str->data);
+       report_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+                                                "command is: %s", str->data);
 
        if (!dry_run)
        {
                res = PQexec(conn, str->data);
                if (PQresultStatus(res) != PGRES_COMMAND_OK)
                {
-                       pg_log_error("could not enable subscription \"%s\": %s",
-                                                dbinfo->subname, PQresultErrorMessage(res));
+                       report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+                                                                "could not enable subscription \"%s\": %s",
+                                                                dbinfo->subname, PQresultErrorMessage(res));
                        disconnect_database(conn, true);
                }
 
@@ -2154,7 +2322,9 @@ get_publisher_databases(struct CreateSubscriberOptions *opt,
        res = PQexec(conn, "SELECT datname FROM pg_database WHERE datistemplate = false AND datallowconn AND datconnlimit <> -2 ORDER BY 1");
        if (PQresultStatus(res) != PGRES_TUPLES_OK)
        {
-               pg_log_error("could not obtain a list of databases: %s", PQresultErrorMessage(res));
+               report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+                                                        "could not obtain a list of databases: %s",
+                                                        PQresultErrorMessage(res));
                PQclear(res);
                disconnect_database(conn, true);
        }
@@ -2258,9 +2428,11 @@ main(int argc, char **argv)
 #ifndef WIN32
        if (geteuid() == 0)
        {
-               pg_log_error("cannot be executed by \"root\"");
-               pg_log_error_hint("You must run %s as the PostgreSQL superuser.",
-                                                 progname);
+               report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+                                                        "cannot be executed by \"root\"");
+               report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
+                                                        "You must run %s as the PostgreSQL superuser.",
+                                                        progname);
                exit(1);
        }
 #endif
@@ -2282,7 +2454,7 @@ main(int argc, char **argv)
                                        num_dbs++;
                                }
                                else
-                                       pg_fatal("database \"%s\" specified more than once for -d/--database", optarg);
+                                       report_createsub_fatal("database \"%s\" specified more than once for -d/--database", optarg);
                                break;
                        case 'D':
                                subscriber_dir = pg_strdup(optarg);
@@ -2323,7 +2495,7 @@ main(int argc, char **argv)
                                        num_pubs++;
                                }
                                else
-                                       pg_fatal("publication \"%s\" specified more than once for --publication", optarg);
+                                       report_createsub_fatal("publication \"%s\" specified more than once for --publication", optarg);
                                break;
                        case 3:
                                if (!simple_string_list_member(&opt.replslot_names, optarg))
@@ -2332,7 +2504,7 @@ main(int argc, char **argv)
                                        num_replslots++;
                                }
                                else
-                                       pg_fatal("replication slot \"%s\" specified more than once for --replication-slot", optarg);
+                                       report_createsub_fatal("replication slot \"%s\" specified more than once for --replication-slot", optarg);
                                break;
                        case 4:
                                if (!simple_string_list_member(&opt.sub_names, optarg))
@@ -2341,17 +2513,19 @@ main(int argc, char **argv)
                                        num_subs++;
                                }
                                else
-                                       pg_fatal("subscription \"%s\" specified more than once for --subscription", optarg);
+                                       report_createsub_fatal("subscription \"%s\" specified more than once for --subscription", optarg);
                                break;
                        case 5:
                                if (!simple_string_list_member(&opt.objecttypes_to_clean, optarg))
                                        simple_string_list_append(&opt.objecttypes_to_clean, optarg);
                                else
-                                       pg_fatal("object type \"%s\" specified more than once for --clean", optarg);
+                                       report_createsub_fatal("object type \"%s\" specified more than once for --clean", optarg);
                                break;
                        default:
                                /* getopt_long already emitted a complaint */
-                               pg_log_error_hint("Try \"%s --help\" for more information.", progname);
+                               report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
+                                                                        "Try \"%s --help\" for more information.",
+                                                                        progname);
                                exit(1);
                }
        }
@@ -2372,9 +2546,12 @@ main(int argc, char **argv)
 
                if (bad_switch)
                {
-                       pg_log_error("options %s and %s cannot be used together",
-                                                bad_switch, "-a/--all");
-                       pg_log_error_hint("Try \"%s --help\" for more information.", progname);
+                       report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+                                                                "options %s and %s cannot be used together",
+                                                                bad_switch, "-a/--all");
+                       report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
+                                                                "Try \"%s --help\" for more information.",
+                                                                progname);
                        exit(1);
                }
        }
@@ -2382,17 +2559,21 @@ main(int argc, char **argv)
        /* Any non-option arguments? */
        if (optind < argc)
        {
-               pg_log_error("too many command-line arguments (first is \"%s\")",
-                                        argv[optind]);
-               pg_log_error_hint("Try \"%s --help\" for more information.", progname);
+               report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+                                                        "too many command-line arguments (first is \"%s\")",
+                                                        argv[optind]);
+               report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
+                                                        "Try \"%s --help\" for more information.", progname);
                exit(1);
        }
 
        /* Required arguments */
        if (subscriber_dir == NULL)
        {
-               pg_log_error("no subscriber data directory specified");
-               pg_log_error_hint("Try \"%s --help\" for more information.", progname);
+               report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+                                                        "no subscriber data directory specified");
+               report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
+                                                        "Try \"%s --help\" for more information.", progname);
                exit(1);
        }
 
@@ -2402,7 +2583,7 @@ main(int argc, char **argv)
                char            cwd[MAXPGPATH];
 
                if (!getcwd(cwd, MAXPGPATH))
-                       pg_fatal("could not determine current directory");
+                       report_createsub_fatal("could not determine current directory");
                opt.socket_dir = pg_strdup(cwd);
                canonicalize_path(opt.socket_dir);
        }
@@ -2419,22 +2600,27 @@ main(int argc, char **argv)
                 * identical entries for physical and logical replication. If there is
                 * not, we would fail anyway.
                 */
-               pg_log_error("no publisher connection string specified");
-               pg_log_error_hint("Try \"%s --help\" for more information.", progname);
+               report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+                                                        "no publisher connection string specified");
+               report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
+                                                        "Try \"%s --help\" for more information.", progname);
                exit(1);
        }
 
        if (dry_run)
-               pg_log_info("Executing in dry-run mode.\n"
-                                       "The target directory will not be modified.");
+               report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+                                                        "Executing in dry-run mode.\n"
+                                                        "The target directory will not be modified.");
 
-       pg_log_info("validating publisher connection string");
+       report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+                                                "validating publisher connection string");
        pub_base_conninfo = get_base_conninfo(opt.pub_conninfo_str,
                                                                                  &dbname_conninfo);
        if (pub_base_conninfo == NULL)
                exit(1);
 
-       pg_log_info("validating subscriber connection string");
+       report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+                                                "validating subscriber connection string");
        sub_base_conninfo = get_sub_conninfo(&opt);
 
        /*
@@ -2451,7 +2637,8 @@ main(int argc, char **argv)
 
        if (opt.database_names.head == NULL)
        {
-               pg_log_info("no database was specified");
+               report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+                                                        "no database was specified");
 
                /*
                 * Try to obtain the dbname from the publisher conninfo. If dbname
@@ -2462,14 +2649,17 @@ main(int argc, char **argv)
                        simple_string_list_append(&opt.database_names, dbname_conninfo);
                        num_dbs++;
 
-                       pg_log_info("database name \"%s\" was extracted from the publisher connection string",
-                                               dbname_conninfo);
+                       report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+                                                                "database name \"%s\" was extracted from the publisher connection string",
+                                                                dbname_conninfo);
                }
                else
                {
-                       pg_log_error("no database name specified");
-                       pg_log_error_hint("Try \"%s --help\" for more information.",
-                                                         progname);
+                       report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+                                                                "no database name specified");
+                       report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
+                                                                "Try \"%s --help\" for more information.",
+                                                                progname);
                        exit(1);
                }
        }
@@ -2477,23 +2667,29 @@ main(int argc, char **argv)
        /* Number of object names must match number of databases */
        if (num_pubs > 0 && num_pubs != num_dbs)
        {
-               pg_log_error("wrong number of publication names specified");
-               pg_log_error_detail("The number of specified publication names (%d) must match the number of specified database names (%d).",
-                                                       num_pubs, num_dbs);
+               report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+                                                        "wrong number of publication names specified");
+               report_createsub_log(PG_LOG_ERROR, PG_LOG_DETAIL,
+                                                        "The number of specified publication names (%d) must match the number of specified database names (%d).",
+                                                        num_pubs, num_dbs);
                exit(1);
        }
        if (num_subs > 0 && num_subs != num_dbs)
        {
-               pg_log_error("wrong number of subscription names specified");
-               pg_log_error_detail("The number of specified subscription names (%d) must match the number of specified database names (%d).",
-                                                       num_subs, num_dbs);
+               report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+                                                        "wrong number of subscription names specified");
+               report_createsub_log(PG_LOG_ERROR, PG_LOG_DETAIL,
+                                                        "The number of specified subscription names (%d) must match the number of specified database names (%d).",
+                                                        num_subs, num_dbs);
                exit(1);
        }
        if (num_replslots > 0 && num_replslots != num_dbs)
        {
-               pg_log_error("wrong number of replication slot names specified");
-               pg_log_error_detail("The number of specified replication slot names (%d) must match the number of specified database names (%d).",
-                                                       num_replslots, num_dbs);
+               report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+                                                        "wrong number of replication slot names specified");
+               report_createsub_log(PG_LOG_ERROR, PG_LOG_DETAIL,
+                                                        "The number of specified replication slot names (%d) must match the number of specified database names (%d).",
+                                                        num_replslots, num_dbs);
                exit(1);
        }
 
@@ -2504,9 +2700,11 @@ main(int argc, char **argv)
                        dbinfos.objecttypes_to_clean |= OBJECTTYPE_PUBLICATIONS;
                else
                {
-                       pg_log_error("invalid object type \"%s\" specified for %s",
-                                                cell->val, "--clean");
-                       pg_log_error_hint("The valid value is: \"%s\"", "publications");
+                       report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+                                                                "invalid object type \"%s\" specified for %s",
+                                                                cell->val, "--clean");
+                       report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
+                                                                "The valid value is: \"%s\"", "publications");
                        exit(1);
                }
        }
@@ -2537,7 +2735,7 @@ main(int argc, char **argv)
        pub_sysid = get_primary_sysid(dbinfos.dbinfo[0].pubconninfo);
        sub_sysid = get_standby_sysid(subscriber_dir);
        if (pub_sysid != sub_sysid)
-               pg_fatal("subscriber data directory is not a copy of the source database cluster");
+               report_createsub_fatal("subscriber data directory is not a copy of the source database cluster");
 
        /* Subscriber PID file */
        snprintf(pidfile, MAXPGPATH, "%s/postmaster.pid", subscriber_dir);
@@ -2550,8 +2748,10 @@ main(int argc, char **argv)
         */
        if (stat(pidfile, &statbuf) == 0)
        {
-               pg_log_error("standby server is running");
-               pg_log_error_hint("Stop the standby server and try again.");
+               report_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+                                                        "standby server is running");
+               report_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
+                                                        "Stop the standby server and try again.");
                exit(1);
        }
 
@@ -2560,7 +2760,8 @@ main(int argc, char **argv)
         * by command-line options). The goal is to avoid connections during the
         * transformation steps.
         */
-       pg_log_info("starting the standby server with command-line options");
+       report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+                                                "starting the standby server with command-line options");
        start_standby_server(&opt, true, false);
 
        /* Check if the standby server is ready for logical replication */
@@ -2576,7 +2777,8 @@ main(int argc, char **argv)
         * guarantees it) *before* creating the replication slots in
         * setup_publisher().
         */
-       pg_log_info("stopping the subscriber");
+       report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+                                                "stopping the subscriber");
        stop_standby_server(subscriber_dir);
 
        /* Create the required objects for each database on publisher */
@@ -2590,7 +2792,8 @@ main(int argc, char **argv)
         * until accepting connections. We don't want to start logical replication
         * during setup.
         */
-       pg_log_info("starting the subscriber");
+       report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+                                                "starting the subscriber");
        start_standby_server(&opt, true, true);
 
        /* Waiting the subscriber to be promoted */
@@ -2611,7 +2814,8 @@ main(int argc, char **argv)
        drop_failover_replication_slots(dbinfos.dbinfo);
 
        /* Stop the subscriber */
-       pg_log_info("stopping the subscriber");
+       report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+                                                "stopping the subscriber");
        stop_standby_server(subscriber_dir);
 
        /* Change system identifier from subscriber */
@@ -2619,7 +2823,8 @@ main(int argc, char **argv)
 
        success = true;
 
-       pg_log_info("Done!");
+       report_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+                                                "Done!");
 
        return 0;
 }