* If the current transaction has executed any LISTEN/UNLISTEN actions,
* PreCommit_Notify() prepares to commit those. For LISTEN, it
* pre-allocates entries in both the per-backend localChannelTable and the
- * shared globalChannelTable (with listening=false so that these entries
- * are no-ops for the moment). It also records the final per-channel
- * intent in pendingListenActions, so post-commit/abort processing can
- * apply that in a single step. Since all these allocations happen before
- * committing to clog, we can safely abort the transaction on failure.
+ * shared globalChannelTable, marking new shared entries removeOnAbort.
+ * It also records the final per-channel intent in pendingListenActions,
+ * so post-commit/abort processing can apply that in a single step.
+ * Since all these allocations happen before committing to clog, we can
+ * safely abort the transaction on failure.
*
* After commit, AtCommit_Notify() runs through pendingListenActions and
- * updates the backend's per-channel listening flags to activate or
- * deactivate listening. This happens before sending signals.
+ * applies the final per-channel listen/unlisten state. This happens
+ * before sending signals.
*
* SignalBackends() consults the shared global channel table to identify
* listeners for the channels that the current transaction sent
*
* This hash table maps (database OID, channel name) keys to arrays of
* ProcNumbers representing the backends listening or about to listen
- * on each channel. The "listening" flags allow us to create hash table
+ * on each channel. The removeOnAbort flags allow us to create hash table
* entries pre-commit and not have to assume that creating them post-commit
* will succeed.
*/
typedef struct ListenerEntry
{
ProcNumber procNo; /* listener's ProcNumber */
- bool listening; /* true if committed listener */
+ bool removeOnAbort; /* remove entry if current xact aborts */
} ListenerEntry;
typedef struct GlobalChannelEntry
*
* Prepare a LISTEN by recording it in pendingListenActions, pre-allocating
* an entry in localChannelTable, and pre-allocating an entry in the shared
- * globalChannelTable with listening=false. The listening flag will be set
- * to true in AtCommit_Notify. If we abort later, unwanted table entries
- * will be removed.
+ * globalChannelTable with removeOnAbort set. AtCommit_Notify will clear
+ * removeOnAbort; abort processing will remove entries still marked so.
*/
static void
PrepareTableEntriesForListen(const char *channel)
*/
(void) hash_search(localChannelTable, channel, HASH_ENTER, NULL);
- /* Pre-allocate entry in shared globalChannelTable with listening=false */
+ /* Pre-allocate entry in shared globalChannelTable */
GlobalChannelKeyInit(&key, MyDatabaseId, channel);
entry = dshash_find_or_insert(globalChannelTable, &key, &found);
{
if (listeners[i].procNo == MyProcNumber)
{
- /* Already have an entry; listening flag stays as-is until commit */
+ /* Already have an entry; leave removeOnAbort as-is */
dshash_release_lock(globalChannelTable, entry);
return;
}
}
listeners[entry->numListeners].procNo = MyProcNumber;
- listeners[entry->numListeners].listening = false; /* staged, not yet
- * committed */
+ listeners[entry->numListeners].removeOnAbort = true;
entry->numListeners++;
dshash_release_lock(globalChannelTable, entry);
if (pending->action == PENDING_LISTEN)
{
/*
- * LISTEN being committed: set listening=true.
+ * LISTEN being committed: entry is now permanent.
* localChannelTable entry was created during
* PreCommit and should be kept.
*/
- listeners[i].listening = true;
+ listeners[i].removeOnAbort = false;
removeLocal = false;
}
else
* pendingListenActions entries, so it's pretty hard to
* test.
*/
- if (!listeners[i].listening)
+ if (listeners[i].removeOnAbort)
{
/*
* Staged LISTEN (or LISTEN+UNLISTEN) being aborted,
- * and we weren't listening before, so remove
- * pre-allocated entries from both tables.
+ * so remove pre-allocated entries from both tables.
*/
RemoveListenerFromChannel(&entry, listeners, i);
}
else
{
/*
- * We're aborting, but the previous state was that
- * we're listening, so keep localChannelTable entry.
+ * Entry predates this transaction, so keep the
+ * localChannelTable entry.
*/
removeLocal = false;
}
listeners = (ListenerEntry *) dsa_get_address(globalChannelDSA,
entry->listenersArray);
- /* Identify listeners that now need waking, add them to arrays */
+ /*
+ * Identify listeners that now need waking, add them to arrays.
+ *
+ * Note that we signal listeners regardless of the state of their
+ * removeOnAbort flags. Hence a new listener that reached PreCommit,
+ * but then failed before AtCommit_Notify, can receive a signal even
+ * though it was never really listening. This is okay because it will
+ * not do anything in response to that signal. If we did not do it
+ * like this then a new listener might miss some messages due to the
+ * direct-advance logic below.
+ */
for (int j = 0; j < entry->numListeners; j++)
{
- ProcNumber i;
+ ProcNumber i = listeners[j].procNo;
int32 pid;
QueuePosition pos;
- if (!listeners[j].listening)
- continue; /* ignore not-yet-committed listeners */
-
- i = listeners[j].procNo;
-
if (QUEUE_BACKEND_WAKEUP_PENDING(i))
continue; /* already signaled, no need to repeat */