/* The usual case */
break;
+ case WALRCV_CONNECTING:
case WALRCV_WAITING:
case WALRCV_STREAMING:
case WALRCV_RESTARTING:
}
/* Advertise our PID so that the startup process can kill us */
walrcv->pid = MyProcPid;
- walrcv->walRcvState = WALRCV_STREAMING;
+ walrcv->walRcvState = WALRCV_CONNECTING;
/* Fetch information required to start streaming */
walrcv->ready_to_display = false;
LSN_FORMAT_ARGS(startpoint), startpointTLI));
first_stream = false;
+ /*
+ * Switch to STREAMING after a successful connection if current
+ * state is CONNECTING. This switch happens after an initial
+ * startup, or after a restart as determined by
+ * WalRcvWaitForStartPosition().
+ */
+ SpinLockAcquire(&walrcv->mutex);
+ if (walrcv->walRcvState == WALRCV_CONNECTING)
+ walrcv->walRcvState = WALRCV_STREAMING;
+ SpinLockRelease(&walrcv->mutex);
+
/* Initialize LogstreamResult and buffers for processing messages */
LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL);
initStringInfo(&reply_message);
SpinLockAcquire(&walrcv->mutex);
state = walrcv->walRcvState;
- if (state != WALRCV_STREAMING)
+ if (state != WALRCV_STREAMING && state != WALRCV_CONNECTING)
{
SpinLockRelease(&walrcv->mutex);
if (state == WALRCV_STOPPING)
*/
*startpoint = walrcv->receiveStart;
*startpointTLI = walrcv->receiveStartTLI;
- walrcv->walRcvState = WALRCV_STREAMING;
+ walrcv->walRcvState = WALRCV_CONNECTING;
SpinLockRelease(&walrcv->mutex);
break;
}
/* Mark ourselves inactive in shared memory */
SpinLockAcquire(&walrcv->mutex);
Assert(walrcv->walRcvState == WALRCV_STREAMING ||
+ walrcv->walRcvState == WALRCV_CONNECTING ||
walrcv->walRcvState == WALRCV_RESTARTING ||
walrcv->walRcvState == WALRCV_STARTING ||
walrcv->walRcvState == WALRCV_WAITING ||
return "stopped";
case WALRCV_STARTING:
return "starting";
+ case WALRCV_CONNECTING:
+ return "connecting";
case WALRCV_STREAMING:
return "streaming";
case WALRCV_WAITING:
WALRCV_STOPPED, /* stopped and mustn't start up again */
WALRCV_STARTING, /* launched, but the process hasn't
* initialized yet */
+ WALRCV_CONNECTING, /* connecting to upstream server */
WALRCV_STREAMING, /* walreceiver is streaming */
WALRCV_WAITING, /* stopped streaming, waiting for orders */
WALRCV_RESTARTING, /* asked to restart streaming */