#include "utils/syscache.h"
#include "utils/usercontext.h"
-#define REMOTE_SEQ_COL_COUNT 10
+#define REMOTE_SEQ_COL_COUNT 11
typedef enum CopySeqResult
{
COPYSEQ_SUCCESS,
COPYSEQ_MISMATCH,
- COPYSEQ_INSUFFICIENT_PERM,
+ COPYSEQ_SUBSCRIBER_INSUFFICIENT_PERM,
+ COPYSEQ_PUBLISHER_INSUFFICIENT_PERM,
COPYSEQ_SKIPPED
} CopySeqResult;
* Report discrepancies found during sequence synchronization between
* the publisher and subscriber. Emits warnings for:
* a) mismatched definitions or concurrent rename
- * b) insufficient privileges
- * c) missing sequences on the subscriber
+ * b) insufficient privileges on the subscriber
+ * c) insufficient privileges on the publisher
+ * d) missing sequences on the publisher
* Then raises an ERROR to indicate synchronization failure.
*/
static void
-report_sequence_errors(List *mismatched_seqs_idx, List *insuffperm_seqs_idx,
+report_sequence_errors(List *mismatched_seqs_idx,
+ List *sub_insuffperm_seqs_idx,
+ List *pub_insuffperm_seqs_idx,
List *missing_seqs_idx)
{
StringInfoData seqstr;
/* Quick exit if there are no errors to report */
- if (!mismatched_seqs_idx && !insuffperm_seqs_idx && !missing_seqs_idx)
+ if (!mismatched_seqs_idx && !sub_insuffperm_seqs_idx &&
+ !pub_insuffperm_seqs_idx && !missing_seqs_idx)
return;
initStringInfo(&seqstr);
seqstr.data));
}
- if (insuffperm_seqs_idx)
+ if (sub_insuffperm_seqs_idx)
{
- get_sequences_string(insuffperm_seqs_idx, &seqstr);
+ get_sequences_string(sub_insuffperm_seqs_idx, &seqstr);
ereport(WARNING,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg_plural("insufficient privileges on sequence (%s)",
- "insufficient privileges on sequences (%s)",
- list_length(insuffperm_seqs_idx),
+ errmsg_plural("insufficient privileges on subscriber sequence (%s)",
+ "insufficient privileges on subscriber sequences (%s)",
+ list_length(sub_insuffperm_seqs_idx),
+ seqstr.data));
+ }
+
+ if (pub_insuffperm_seqs_idx)
+ {
+ get_sequences_string(pub_insuffperm_seqs_idx, &seqstr);
+ ereport(WARNING,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg_plural("insufficient privileges on publisher sequence (%s)",
+ "insufficient privileges on publisher sequences (%s)",
+ list_length(pub_insuffperm_seqs_idx),
seqstr.data));
}
bool isnull;
int col = 0;
Datum datum;
+ bool remote_has_select_priv;
Oid remote_typid;
int64 remote_start;
int64 remote_increment;
(LogicalRepSequenceInfo *) list_nth(seqinfos, *seqidx);
/*
- * The sequence data can be NULL due to insufficient privileges or if the
- * sequence was dropped concurrently (see pg_get_sequence_data()).
+ * The remote sequence state can be NULL if the publisher lacks the
+ * required privileges or if the sequence was dropped concurrently after
+ * it was identified in the catalog snapshot (see pg_get_sequence_data()).
*/
+ remote_has_select_priv = DatumGetBool(slot_getattr(slot, ++col, &isnull));
+ Assert(!isnull);
+
datum = slot_getattr(slot, ++col, &isnull);
if (isnull)
- return COPYSEQ_SKIPPED;
+ return remote_has_select_priv ? COPYSEQ_SKIPPED :
+ COPYSEQ_PUBLISHER_INSUFFICIENT_PERM;
+
seqinfo_local->last_value = DatumGetInt64(datum);
seqinfo_local->is_called = DatumGetBool(slot_getattr(slot, ++col, &isnull));
if (!run_as_owner)
RestoreUserContext(&ucxt);
- return COPYSEQ_INSUFFICIENT_PERM;
+ return COPYSEQ_SUBSCRIBER_INSUFFICIENT_PERM;
}
/*
int n_seqinfos = list_length(seqinfos);
List *mismatched_seqs_idx = NIL;
List *missing_seqs_idx = NIL;
- List *insuffperm_seqs_idx = NIL;
+ List *sub_insuffperm_seqs_idx = NIL;
+ List *pub_insuffperm_seqs_idx = NIL;
StringInfoData seqstr;
StringInfoData cmd;
MemoryContext oldctx;
while (cur_batch_base_index < n_seqinfos)
{
- Oid seqRow[REMOTE_SEQ_COL_COUNT] = {INT8OID, INT8OID,
+ Oid seqRow[REMOTE_SEQ_COL_COUNT] = {INT8OID, BOOLOID, INT8OID,
BOOLOID, LSNOID, OIDOID, INT8OID, INT8OID, INT8OID, INT8OID, BOOLOID};
int batch_size = 0;
int batch_succeeded_count = 0;
int batch_mismatched_count = 0;
int batch_skipped_count = 0;
- int batch_insuffperm_count = 0;
+ int batch_sub_insuffperm_count = 0;
+ int batch_pub_insuffperm_count = 0;
int batch_missing_count;
WalRcvExecResult *res;
* matching.
*/
appendStringInfo(&cmd,
- "SELECT s.seqidx, ps.*, seq.seqtypid,\n"
+ "SELECT s.seqidx, has_sequence_privilege(c.oid, 'SELECT'),\n"
+ " ps.*, seq.seqtypid,\n"
" seq.seqstart, seq.seqincrement, seq.seqmin,\n"
" seq.seqmax, seq.seqcycle\n"
"FROM ( VALUES %s ) AS s (schname, seqname, seqidx)\n"
MemoryContextSwitchTo(oldctx);
batch_mismatched_count++;
break;
- case COPYSEQ_INSUFFICIENT_PERM:
+ case COPYSEQ_SUBSCRIBER_INSUFFICIENT_PERM:
/*
* Remember sequences with insufficient privileges in a
* after the transaction is committed.
*/
oldctx = MemoryContextSwitchTo(ApplyContext);
- insuffperm_seqs_idx = lappend_int(insuffperm_seqs_idx,
- seqidx);
+ sub_insuffperm_seqs_idx = lappend_int(sub_insuffperm_seqs_idx,
+ seqidx);
+ MemoryContextSwitchTo(oldctx);
+ batch_sub_insuffperm_count++;
+ break;
+ case COPYSEQ_PUBLISHER_INSUFFICIENT_PERM:
+
+ /*
+ * Remember sequences for which the publisher lacks the
+ * privileges required by pg_get_sequence_data().
+ */
+ oldctx = MemoryContextSwitchTo(ApplyContext);
+ pub_insuffperm_seqs_idx = lappend_int(pub_insuffperm_seqs_idx,
+ seqidx);
MemoryContextSwitchTo(oldctx);
- batch_insuffperm_count++;
+ batch_pub_insuffperm_count++;
break;
case COPYSEQ_SKIPPED:
batch_missing_count = batch_size - (batch_succeeded_count +
batch_mismatched_count +
- batch_insuffperm_count +
+ batch_sub_insuffperm_count +
+ batch_pub_insuffperm_count +
batch_skipped_count);
elog(DEBUG1,
- "logical replication sequence synchronization for subscription \"%s\" - batch #%d = %d attempted, %d succeeded, %d mismatched, %d insufficient permission, %d missing from publisher, %d skipped",
+ "logical replication sequence synchronization for subscription \"%s\" - batch #%d = %d attempted, %d succeeded, %d mismatched, %d subscriber insufficient permission, %d publisher insufficient permission, %d missing from publisher, %d skipped",
MySubscription->name,
(cur_batch_base_index / MAX_SEQUENCES_SYNC_PER_BATCH) + 1,
batch_size, batch_succeeded_count, batch_mismatched_count,
- batch_insuffperm_count, batch_missing_count, batch_skipped_count);
+ batch_sub_insuffperm_count, batch_pub_insuffperm_count, batch_missing_count, batch_skipped_count);
/* Commit this batch, and prepare for next batch */
CommitTransactionCommand();
}
/* Report mismatches, permission issues, or missing sequences */
- report_sequence_errors(mismatched_seqs_idx, insuffperm_seqs_idx,
- missing_seqs_idx);
+ report_sequence_errors(mismatched_seqs_idx, sub_insuffperm_seqs_idx,
+ pub_insuffperm_seqs_idx, missing_seqs_idx);
}
/*