if (curstate->roident != node)
continue;
- else if (curstate->acquired_by != 0 && acquired_by == 0)
+ if (acquired_by == 0)
{
- ereport(ERROR,
- (errcode(ERRCODE_OBJECT_IN_USE),
- errmsg("replication origin with ID %d is already active for PID %d",
- curstate->roident, curstate->acquired_by)));
+ /* With acquired_by == 0, we need the origin to be free */
+ if (curstate->acquired_by != 0)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_IN_USE),
+ errmsg("replication origin with ID %d is already active for PID %d",
+ curstate->roident, curstate->acquired_by)));
+ }
+ else if (curstate->refcount > 0)
+ {
+ /*
+ * The origin is in use, but PID is not recorded. This can
+ * happen if the process that originally acquired the origin
+ * exited without releasing it. To ensure correctness, other
+ * processes cannot acquire the origin until all processes
+ * currently using it have released it.
+ */
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_IN_USE),
+ errmsg("replication origin with ID %d is already active in another process",
+ curstate->roident)));
+ }
}
-
- else if (curstate->acquired_by != acquired_by)
+ else
{
- ereport(ERROR,
- (errcode(ERRCODE_OBJECT_IN_USE),
- errmsg("could not find replication state slot for replication origin with OID %u which was acquired by %d",
- node, acquired_by)));
- }
+ /*
+ * With acquired_by != 0, we need the origin to be active by the
+ * given PID
+ */
+ if (curstate->acquired_by != acquired_by)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_IN_USE),
+ errmsg("replication origin with ID %d is not active for PID %d",
+ curstate->roident, acquired_by)));
- /*
- * The origin is in use, but PID is not recorded. This can happen if
- * the process that originally acquired the origin exited without
- * releasing it. To ensure correctness, other processes cannot acquire
- * the origin until all processes currently using it have released it.
- */
- else if (curstate->acquired_by == 0 && curstate->refcount > 0)
- ereport(ERROR,
- (errcode(ERRCODE_OBJECT_IN_USE),
- errmsg("replication origin with ID %d is already active in another process",
- curstate->roident)));
+ /*
+ * Here, it is okay to have refcount > 0 as more than one process
+ * can safely re-use the origin.
+ */
+ }
/* ok, found slot */
session_replication_state = curstate;
break;
}
-
- if (session_replication_state == NULL && free_slot == -1)
- ereport(ERROR,
- (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
- errmsg("could not find free replication state slot for replication origin with ID %d",
- node),
- errhint("Increase \"max_active_replication_origins\" and try again.")));
- else if (session_replication_state == NULL)
+ if (session_replication_state == NULL)
{
- if (acquired_by)
+ if (acquired_by != 0)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot use PID %d for inactive replication origin with ID %d",
acquired_by, node)));
/* initialize new slot */
+ if (free_slot == -1)
+ ereport(ERROR,
+ (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+ errmsg("could not find free replication state slot for replication origin with ID %d",
+ node),
+ errhint("Increase \"max_active_replication_origins\" and try again.")));
+
session_replication_state = &replication_states[free_slot];
Assert(!XLogRecPtrIsValid(session_replication_state->remote_lsn));
Assert(!XLogRecPtrIsValid(session_replication_state->local_lsn));