</para></entry>
</row>
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>seq_sync_error_count</structfield> <type>bigint</type>
+ </para>
+ <para>
+ Number of times an error occurred in the sequence synchronization
+ worker. A single worker synchronizes all sequences, so one error
+ increment may represent failures across multiple sequences.
+ </para></entry>
+ </row>
+
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>sync_error_count</structfield> <type>bigint</type>
ss.subid,
s.subname,
ss.apply_error_count,
+ ss.seq_sync_error_count,
ss.sync_error_count,
ss.confl_insert_exists,
ss.confl_update_origin_differs,
* idle state.
*/
AbortOutOfAnyTransaction();
+ pgstat_report_subscription_error(MySubscription->oid,
+ WORKERTYPE_SEQUENCESYNC);
+
PG_RE_THROW();
}
}
* idle state.
*/
AbortOutOfAnyTransaction();
- pgstat_report_subscription_error(MySubscription->oid, false);
+ pgstat_report_subscription_error(MySubscription->oid,
+ WORKERTYPE_TABLESYNC);
PG_RE_THROW();
}
* idle state.
*/
AbortOutOfAnyTransaction();
- pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker());
+ pgstat_report_subscription_error(MySubscription->oid,
+ MyLogicalRepWorker->type);
PG_RE_THROW();
}
RESUME_INTERRUPTS();
- if (am_leader_apply_worker() || am_tablesync_worker())
- {
- /*
- * Report the worker failed during either table synchronization or
- * apply.
- */
- pgstat_report_subscription_error(MyLogicalRepWorker->subid,
- !am_tablesync_worker());
- }
+ /*
+ * Report the worker failed during sequence synchronization, table
+ * synchronization, or apply.
+ */
+ pgstat_report_subscription_error(MyLogicalRepWorker->subid,
+ MyLogicalRepWorker->type);
/* Disable the subscription */
StartTransactionCommand();
#include "postgres.h"
+#include "replication/worker_internal.h"
#include "utils/pgstat_internal.h"
* Report a subscription error.
*/
void
-pgstat_report_subscription_error(Oid subid, bool is_apply_error)
+pgstat_report_subscription_error(Oid subid, LogicalRepWorkerType wtype)
{
PgStat_EntryRef *entry_ref;
PgStat_BackendSubEntry *pending;
InvalidOid, subid, NULL);
pending = entry_ref->pending;
- if (is_apply_error)
- pending->apply_error_count++;
- else
- pending->sync_error_count++;
+ switch (wtype)
+ {
+ case WORKERTYPE_APPLY:
+ pending->apply_error_count++;
+ break;
+
+ case WORKERTYPE_SEQUENCESYNC:
+ pending->seq_sync_error_count++;
+ break;
+
+ case WORKERTYPE_TABLESYNC:
+ pending->sync_error_count++;
+ break;
+
+ default:
+ /* Should never happen. */
+ Assert(0);
+ break;
+ }
}
/*
#define SUB_ACC(fld) shsubent->stats.fld += localent->fld
SUB_ACC(apply_error_count);
+ SUB_ACC(seq_sync_error_count);
SUB_ACC(sync_error_count);
for (int i = 0; i < CONFLICT_NUM_TYPES; i++)
SUB_ACC(conflict_count[i]);
Datum
pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
{
-#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 12
+#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 13
Oid subid = PG_GETARG_OID(0);
TupleDesc tupdesc;
Datum values[PG_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0};
OIDOID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 2, "apply_error_count",
INT8OID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 3, "sync_error_count",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 3, "seq_sync_error_count",
INT8OID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 4, "confl_insert_exists",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 4, "sync_error_count",
INT8OID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 5, "confl_update_origin_differs",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 5, "confl_insert_exists",
INT8OID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 6, "confl_update_exists",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 6, "confl_update_origin_differs",
INT8OID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 7, "confl_update_deleted",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 7, "confl_update_exists",
INT8OID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 8, "confl_update_missing",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 8, "confl_update_deleted",
INT8OID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_delete_origin_differs",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_update_missing",
INT8OID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 10, "confl_delete_missing",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 10, "confl_delete_origin_differs",
INT8OID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 11, "confl_multiple_unique_conflicts",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 11, "confl_delete_missing",
INT8OID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 12, "stats_reset",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 12, "confl_multiple_unique_conflicts",
+ INT8OID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 13, "stats_reset",
TIMESTAMPTZOID, -1, 0);
BlessTupleDesc(tupdesc);
/* apply_error_count */
values[i++] = Int64GetDatum(subentry->apply_error_count);
+ /* seq_sync_error_count */
+ values[i++] = Int64GetDatum(subentry->seq_sync_error_count);
+
/* sync_error_count */
values[i++] = Int64GetDatum(subentry->sync_error_count);
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 202511051
+#define CATALOG_VERSION_NO 202511071
#endif
{ oid => '6231', descr => 'statistics: information about subscription stats',
proname => 'pg_stat_get_subscription_stats', provolatile => 's',
proparallel => 'r', prorettype => 'record', proargtypes => 'oid',
- proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
- proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o}',
- proargnames => '{subid,subid,apply_error_count,sync_error_count,confl_insert_exists,confl_update_origin_differs,confl_update_exists,confl_update_deleted,confl_update_missing,confl_delete_origin_differs,confl_delete_missing,confl_multiple_unique_conflicts,stats_reset}',
+ proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
+ proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+ proargnames => '{subid,subid,apply_error_count,seq_sync_error_count,sync_error_count,confl_insert_exists,confl_update_origin_differs,confl_update_exists,confl_update_deleted,confl_update_missing,confl_delete_origin_differs,confl_delete_missing,confl_multiple_unique_conflicts,stats_reset}',
prosrc => 'pg_stat_get_subscription_stats' },
{ oid => '6118', descr => 'statistics: information about subscription',
proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
#include "portability/instr_time.h"
#include "postmaster/pgarch.h" /* for MAX_XFN_CHARS */
#include "replication/conflict.h"
+#include "replication/worker_internal.h"
#include "utils/backend_progress.h" /* for backward compatibility */ /* IWYU pragma: export */
#include "utils/backend_status.h" /* for backward compatibility */ /* IWYU pragma: export */
#include "utils/pgstat_kind.h"
typedef struct PgStat_BackendSubEntry
{
PgStat_Counter apply_error_count;
+ PgStat_Counter seq_sync_error_count;
PgStat_Counter sync_error_count;
PgStat_Counter conflict_count[CONFLICT_NUM_TYPES];
} PgStat_BackendSubEntry;
typedef struct PgStat_StatSubEntry
{
PgStat_Counter apply_error_count;
+ PgStat_Counter seq_sync_error_count;
PgStat_Counter sync_error_count;
PgStat_Counter conflict_count[CONFLICT_NUM_TYPES];
TimestampTz stat_reset_timestamp;
* Functions in pgstat_subscription.c
*/
-extern void pgstat_report_subscription_error(Oid subid, bool is_apply_error);
+extern void pgstat_report_subscription_error(Oid subid,
+ LogicalRepWorkerType wtype);
extern void pgstat_report_subscription_conflict(Oid subid, ConflictType type);
extern void pgstat_create_subscription(Oid subid);
extern void pgstat_drop_subscription(Oid subid);
pg_stat_subscription_stats| SELECT ss.subid,
s.subname,
ss.apply_error_count,
+ ss.seq_sync_error_count,
ss.sync_error_count,
ss.confl_insert_exists,
ss.confl_update_origin_differs,
ss.confl_multiple_unique_conflicts,
ss.stats_reset
FROM pg_subscription s,
- LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, confl_insert_exists, confl_update_origin_differs, confl_update_exists, confl_update_deleted, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, confl_multiple_unique_conflicts, stats_reset);
+ LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, seq_sync_error_count, sync_error_count, confl_insert_exists, confl_update_origin_differs, confl_update_exists, confl_update_deleted, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, confl_multiple_unique_conflicts, stats_reset);
pg_stat_sys_indexes| SELECT relid,
indexrelid,
schemaname,
sub create_sub_pub_w_errors
{
- my ($node_publisher, $node_subscriber, $db, $table_name) = @_;
- # Initial table setup on both publisher and subscriber. On subscriber we
- # create the same tables but with primary keys. Also, insert some data that
- # will conflict with the data replicated from publisher later.
+ my ($node_publisher, $node_subscriber, $db, $table_name, $sequence_name)
+ = @_;
+ # Initial table and sequence setup on both publisher and subscriber.
+ #
+ # Tables: Created on both nodes, but the subscriber version includes
+ # primary keys and pre-populated data that will intentionally conflict with
+ # replicated data from the publisher.
+ #
+ # Sequences: Created on both nodes with different INCREMENT values to
+ # intentionally trigger replication conflicts.
$node_publisher->safe_psql(
$db,
qq[
CREATE TABLE $table_name(a int);
ALTER TABLE $table_name REPLICA IDENTITY FULL;
INSERT INTO $table_name VALUES (1);
+ CREATE SEQUENCE $sequence_name;
COMMIT;
]);
$node_subscriber->safe_psql(
BEGIN;
CREATE TABLE $table_name(a int primary key);
INSERT INTO $table_name VALUES (1);
+ CREATE SEQUENCE $sequence_name INCREMENT BY 10;
COMMIT;
]);
# Set up publication.
my $pub_name = $table_name . '_pub';
+ my $pub_seq_name = $sequence_name . '_pub';
my $publisher_connstr = $node_publisher->connstr . qq( dbname=$db);
- $node_publisher->safe_psql($db,
- qq(CREATE PUBLICATION $pub_name FOR TABLE $table_name));
+ $node_publisher->safe_psql(
+ $db,
+ qq[
+ CREATE PUBLICATION $pub_name FOR TABLE $table_name;
+ CREATE PUBLICATION $pub_seq_name FOR ALL SEQUENCES;
+ ]);
# Create subscription. The tablesync for table on subscription will enter into
- # infinite error loop due to violating the unique constraint.
+ # infinite error loop due to violating the unique constraint. The sequencesync
+ # will also fail due to different sequence increment values on publisher and
+ # subscriber.
my $sub_name = $table_name . '_sub';
$node_subscriber->safe_psql($db,
- qq(CREATE SUBSCRIPTION $sub_name CONNECTION '$publisher_connstr' PUBLICATION $pub_name)
+ qq(CREATE SUBSCRIPTION $sub_name CONNECTION '$publisher_connstr' PUBLICATION $pub_name, $pub_seq_name)
);
$node_publisher->wait_for_catchup($sub_name);
- # Wait for the tablesync error to be reported.
+ # Wait for the tablesync and sequencesync error to be reported.
$node_subscriber->poll_query_until(
$db,
qq[
- SELECT sync_error_count > 0
- FROM pg_stat_subscription_stats
- WHERE subname = '$sub_name'
+ SELECT count(1) = 1 FROM pg_stat_subscription_stats
+ WHERE subname = '$sub_name' AND seq_sync_error_count > 0 AND sync_error_count > 0
+ ])
+ or die
+ qq(Timed out while waiting for sequencesync errors and tablesync errors for subscription '$sub_name');
+
+ # Change the sequence INCREMENT value back to the default on the subscriber
+ # so it doesn't error out.
+ $node_subscriber->safe_psql($db,
+ qq(ALTER SEQUENCE $sequence_name INCREMENT 1));
+
+ # Wait for sequencesync to finish.
+ $node_subscriber->poll_query_until(
+ $db,
+ qq[
+ SELECT count(1) = 1 FROM pg_subscription_rel
+ WHERE srrelid = '$sequence_name'::regclass AND srsubstate = 'r'
])
or die
- qq(Timed out while waiting for tablesync errors for subscription '$sub_name');
+ qq(Timed out while waiting for subscriber to synchronize data for sequence '$sequence_name'.);
# Truncate test_tab1 so that tablesync worker can continue.
$node_subscriber->safe_psql($db, qq(TRUNCATE $table_name));
# Create the publication and subscription with sync and apply errors
my $table1_name = 'test_tab1';
+my $sequence1_name = 'test_seq1';
my ($pub1_name, $sub1_name) =
create_sub_pub_w_errors($node_publisher, $node_subscriber, $db,
- $table1_name);
+ $table1_name, $sequence1_name);
-# Apply errors, sync errors, and conflicts are > 0 and stats_reset timestamp is NULL
+# Apply errors, sequencesync errors, tablesync errors, and conflicts are > 0 and stats_reset
+# timestamp is NULL.
is( $node_subscriber->safe_psql(
$db,
qq(SELECT apply_error_count > 0,
+ seq_sync_error_count > 0,
sync_error_count > 0,
confl_insert_exists > 0,
confl_delete_missing > 0,
FROM pg_stat_subscription_stats
WHERE subname = '$sub1_name')
),
- qq(t|t|t|t|t),
- qq(Check that apply errors, sync errors, and conflicts are > 0 and stats_reset is NULL for subscription '$sub1_name'.)
+ qq(t|t|t|t|t|t),
+ qq(Check that apply errors, sequencesync errors, tablesync errors, and conflicts are > 0 and stats_reset is NULL for subscription '$sub1_name'.)
);
# Reset a single subscription
qq(SELECT pg_stat_reset_subscription_stats((SELECT subid FROM pg_stat_subscription_stats WHERE subname = '$sub1_name')))
);
-# Apply errors, sync errors, and conflicts are 0 and stats_reset timestamp is not NULL
+# Apply errors, sequencesync errors, tablesync errors, and conflicts are 0 and
+# stats_reset timestamp is not NULL.
is( $node_subscriber->safe_psql(
$db,
qq(SELECT apply_error_count = 0,
+ seq_sync_error_count = 0,
sync_error_count = 0,
confl_insert_exists = 0,
confl_delete_missing = 0,
FROM pg_stat_subscription_stats
WHERE subname = '$sub1_name')
),
- qq(t|t|t|t|t),
- qq(Confirm that apply errors, sync errors, and conflicts are 0 and stats_reset is not NULL after reset for subscription '$sub1_name'.)
+ qq(t|t|t|t|t|t),
+ qq(Confirm that apply errors, sequencesync errors, tablesync errors, and conflicts are 0 and stats_reset is not NULL after reset for subscription '$sub1_name'.)
);
# Get reset timestamp
# Make second subscription and publication
my $table2_name = 'test_tab2';
+my $sequence2_name = 'test_seq2';
my ($pub2_name, $sub2_name) =
create_sub_pub_w_errors($node_publisher, $node_subscriber, $db,
- $table2_name);
+ $table2_name, $sequence2_name);
-# Apply errors, sync errors, and conflicts are > 0 and stats_reset timestamp is NULL
+# Apply errors, sequencesync errors, tablesync errors, and conflicts are > 0
+# and stats_reset timestamp is NULL
is( $node_subscriber->safe_psql(
$db,
qq(SELECT apply_error_count > 0,
+ seq_sync_error_count > 0,
sync_error_count > 0,
confl_insert_exists > 0,
confl_delete_missing > 0,
FROM pg_stat_subscription_stats
WHERE subname = '$sub2_name')
),
- qq(t|t|t|t|t),
- qq(Confirm that apply errors, sync errors, and conflicts are > 0 and stats_reset is NULL for sub '$sub2_name'.)
+ qq(t|t|t|t|t|t),
+ qq(Confirm that apply errors, sequencesync errors, tablesync errors, and conflicts are > 0 and stats_reset is NULL for sub '$sub2_name'.)
);
# Reset all subscriptions
$node_subscriber->safe_psql($db,
qq(SELECT pg_stat_reset_subscription_stats(NULL)));
-# Apply errors, sync errors, and conflicts are 0 and stats_reset timestamp is not NULL
+# Apply errors, sequencesync errors, tablesync errors, and conflicts are 0 and
+# stats_reset timestamp is not NULL.
is( $node_subscriber->safe_psql(
$db,
qq(SELECT apply_error_count = 0,
+ seq_sync_error_count = 0,
sync_error_count = 0,
confl_insert_exists = 0,
confl_delete_missing = 0,
FROM pg_stat_subscription_stats
WHERE subname = '$sub1_name')
),
- qq(t|t|t|t|t),
- qq(Confirm that apply errors, sync errors, and conflicts are 0 and stats_reset is not NULL for sub '$sub1_name' after reset.)
+ qq(t|t|t|t|t|t),
+ qq(Confirm that apply errors, sequencesync errors, tablesync errors, and conflicts are 0 and stats_reset is not NULL for sub '$sub1_name' after reset.)
);
is( $node_subscriber->safe_psql(
$db,
qq(SELECT apply_error_count = 0,
+ seq_sync_error_count = 0,
sync_error_count = 0,
confl_insert_exists = 0,
confl_delete_missing = 0,
FROM pg_stat_subscription_stats
WHERE subname = '$sub2_name')
),
- qq(t|t|t|t|t),
- qq(Confirm that apply errors, sync errors, and conflicts are 0 and stats_reset is not NULL for sub '$sub2_name' after reset.)
+ qq(t|t|t|t|t|t),
+ qq(Confirm that apply errors, sequencesync errors, tablesync errors, errors, and conflicts are 0 and stats_reset is not NULL for sub '$sub2_name' after reset.)
);
$reset_time1 = $node_subscriber->safe_psql($db,