]> git.ipfire.org Git - thirdparty/postgresql.git/commitdiff
Rename the logical replication global "wrconn"
authorAlvaro Herrera <alvherre@alvh.no-ip.org>
Wed, 12 May 2021 23:13:54 +0000 (19:13 -0400)
committerAlvaro Herrera <alvherre@alvh.no-ip.org>
Wed, 12 May 2021 23:13:54 +0000 (19:13 -0400)
The worker.c global wrconn is only meant to be used by logical apply/
tablesync workers, but there are other variables with the same name. To
reduce future confusion rename the global from "wrconn" to
"LogRepWorkerWalRcvConn".

While this is just cosmetic, it seems better to backpatch it all the way
back to 10 where this code appeared, to avoid future backpatching
issues.

Author: Peter Smith <smithpb2250@gmail.com>
Discussion: https://postgr.es/m/CAHut+Pu7Jv9L2BOEx_Z0UtJxfDevQSAUW2mJqWU+CtmDrEZVAg@mail.gmail.com

src/backend/replication/logical/launcher.c
src/backend/replication/logical/tablesync.c
src/backend/replication/logical/worker.c
src/include/replication/worker_internal.h

index ada16adb67b1fc50de5cc0e0a07e7ac1316ef9f6..7484f6cc0b54263ed9770fbf70cd8438fcb9632d 100644 (file)
@@ -727,8 +727,8 @@ static void
 logicalrep_worker_onexit(int code, Datum arg)
 {
        /* Disconnect gracefully from the remote side. */
-       if (wrconn)
-               walrcv_disconnect(wrconn);
+       if (LogRepWorkerWalRcvConn)
+               walrcv_disconnect(LogRepWorkerWalRcvConn);
 
        logicalrep_worker_detach();
 
index acc6498567d07c3e93d0adf5039a10c693e38f64..14f69a6338e42369dcf60c6b9540ad87a9a0226b 100644 (file)
@@ -303,7 +303,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
                                                                   MyLogicalRepWorker->relstate,
                                                                   MyLogicalRepWorker->relstate_lsn);
 
-               walrcv_endstreaming(wrconn, &tli);
+               walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
                finish_sync_worker();
        }
        else
@@ -600,7 +600,7 @@ copy_read_data(void *outbuf, int minread, int maxread)
                for (;;)
                {
                        /* Try read the data. */
-                       len = walrcv_receive(wrconn, &buf, &fd);
+                       len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
 
                        CHECK_FOR_INTERRUPTS();
 
@@ -678,7 +678,8 @@ fetch_remote_table_info(char *nspname, char *relname,
                                         "   AND c.relkind = 'r'",
                                         quote_literal_cstr(nspname),
                                         quote_literal_cstr(relname));
-       res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
+       res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
+                                         lengthof(tableRow), tableRow);
 
        if (res->status != WALRCV_OK_TUPLES)
                ereport(ERROR,
@@ -714,7 +715,8 @@ fetch_remote_table_info(char *nspname, char *relname,
                                         "   AND a.attrelid = %u"
                                         " ORDER BY a.attnum",
                                         lrel->remoteid, lrel->remoteid);
-       res = walrcv_exec(wrconn, cmd.data, 4, attrRow);
+       res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
+                                         lengthof(attrRow), attrRow);
 
        if (res->status != WALRCV_OK_TUPLES)
                ereport(ERROR,
@@ -784,7 +786,7 @@ copy_table(Relation rel)
        initStringInfo(&cmd);
        appendStringInfo(&cmd, "COPY %s TO STDOUT",
                                         quote_qualified_identifier(lrel.nspname, lrel.relname));
-       res = walrcv_exec(wrconn, cmd.data, 0, NULL);
+       res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
        pfree(cmd.data);
        if (res->status != WALRCV_OK_COPY_OUT)
                ereport(ERROR,
@@ -850,8 +852,9 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
         * application_name, so that it is different from the main apply worker,
         * so that synchronous replication can distinguish them.
         */
-       wrconn = walrcv_connect(MySubscription->conninfo, true, slotname, &err);
-       if (wrconn == NULL)
+       LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
+                                                                                       slotname, &err);
+       if (LogRepWorkerWalRcvConn == NULL)
                ereport(ERROR,
                                (errmsg("could not connect to the publisher: %s", err)));
 
@@ -896,7 +899,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
                                 * inside the transaction so that we can use the snapshot made
                                 * by the slot to get existing data.
                                 */
-                               res = walrcv_exec(wrconn,
+                               res = walrcv_exec(LogRepWorkerWalRcvConn,
                                                                  "BEGIN READ ONLY ISOLATION LEVEL "
                                                                  "REPEATABLE READ", 0, NULL);
                                if (res->status != WALRCV_OK_COMMAND)
@@ -913,14 +916,14 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
                                 * that is consistent with the lsn used by the slot to start
                                 * decoding.
                                 */
-                               walrcv_create_slot(wrconn, slotname, true,
+                               walrcv_create_slot(LogRepWorkerWalRcvConn, slotname, true,
                                                                   CRS_USE_SNAPSHOT, origin_startpos);
 
                                PushActiveSnapshot(GetTransactionSnapshot());
                                copy_table(rel);
                                PopActiveSnapshot();
 
-                               res = walrcv_exec(wrconn, "COMMIT", 0, NULL);
+                               res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
                                if (res->status != WALRCV_OK_COMMAND)
                                        ereport(ERROR,
                                                        (errmsg("table copy could not finish transaction on publisher"),
index 55eecc30359a8860411e9003fbb730378971d276..bd27ef3b851b8cff96a23e897181f56d722916b0 100644 (file)
@@ -112,7 +112,7 @@ typedef struct SlotErrCallbackArg
 static MemoryContext ApplyMessageContext = NULL;
 MemoryContext ApplyContext = NULL;
 
-WalReceiverConn *wrconn = NULL;
+WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
 
 Subscription *MySubscription = NULL;
 bool           MySubscriptionValid = false;
@@ -1172,7 +1172,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 
                MemoryContextSwitchTo(ApplyMessageContext);
 
-               len = walrcv_receive(wrconn, &buf, &fd);
+               len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
 
                if (len != 0)
                {
@@ -1252,7 +1252,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
                                        MemoryContextReset(ApplyMessageContext);
                                }
 
-                               len = walrcv_receive(wrconn, &buf, &fd);
+                               len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
                        }
                }
 
@@ -1282,7 +1282,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
                {
                        TimeLineID      tli;
 
-                       walrcv_endstreaming(wrconn, &tli);
+                       walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
                        break;
                }
 
@@ -1449,7 +1449,8 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
                 (uint32) (flushpos >> 32), (uint32) flushpos
                );
 
-       walrcv_send(wrconn, reply_message->data, reply_message->len);
+       walrcv_send(LogRepWorkerWalRcvConn,
+                               reply_message->data, reply_message->len);
 
        if (recvpos > last_recvpos)
                last_recvpos = recvpos;
@@ -1761,9 +1762,9 @@ ApplyWorkerMain(Datum main_arg)
                origin_startpos = replorigin_session_get_progress(false);
                CommitTransactionCommand();
 
-               wrconn = walrcv_connect(MySubscription->conninfo, true, MySubscription->name,
-                                                               &err);
-               if (wrconn == NULL)
+               LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
+                                                                                               MySubscription->name, &err);
+               if (LogRepWorkerWalRcvConn == NULL)
                        ereport(ERROR,
                                        (errmsg("could not connect to the publisher: %s", err)));
 
@@ -1771,9 +1772,8 @@ ApplyWorkerMain(Datum main_arg)
                 * We don't really use the output identify_system for anything but it
                 * does some initializations on the upstream so let's still call it.
                 */
-               (void) walrcv_identify_system(wrconn, &startpointTLI,
+               (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI,
                                                                          &server_version);
-
        }
 
        /*
@@ -1792,7 +1792,7 @@ ApplyWorkerMain(Datum main_arg)
        options.proto.logical.publication_names = MySubscription->publications;
 
        /* Start normal logical streaming replication. */
-       walrcv_startstreaming(wrconn, &options);
+       walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
 
        /* Run the main loop. */
        LogicalRepApplyLoop(origin_startpos);
index 1ce3b6b05873dd140b5700796cc42031cb0ad990..5ea7fd2bd6a61c72ea4906fb7f6b6e8d71dcf658 100644 (file)
@@ -60,7 +60,7 @@ typedef struct LogicalRepWorker
 extern MemoryContext ApplyContext;
 
 /* libpqreceiver connection */
-extern struct WalReceiverConn *wrconn;
+extern struct WalReceiverConn *LogRepWorkerWalRcvConn;
 
 /* Worker and subscription objects. */
 extern Subscription *MySubscription;