]> git.ipfire.org Git - thirdparty/postgresql.git/commitdiff
Fix running out of file descriptors for spill files.
authorAmit Kapila <akapila@postgresql.org>
Thu, 2 Jan 2020 05:38:07 +0000 (11:08 +0530)
committerAmit Kapila <akapila@postgresql.org>
Thu, 2 Jan 2020 06:41:55 +0000 (12:11 +0530)
Currently while decoding changes, if the number of changes exceeds a
certain threshold, we spill those to disk.  And this happens for each
(sub)transaction.  Now, while reading all these files, we don't close them
until we read all the files.  While reading these files, if the number of
such files exceeds the maximum number of file descriptors, the operation
errors out.

Use PathNameOpenFile interface to open these files as that internally has
the mechanism to release kernel FDs as needed to get us under the
max_safe_fds limit.

Reported-by: Amit Khandekar
Author: Amit Khandekar
Reviewed-by: Amit Kapila
Backpatch-through: 9.4
Discussion: https://postgr.es/m/CAJ3gD9c-sECEn79zXw4yBnBdOttacoE-6gAyP0oy60nfs_sabQ@mail.gmail.com

src/backend/replication/logical/reorderbuffer.c
src/test/recovery/t/006_logical_decoding.pl

index be441c7c7d6e5155e485872fbef28ec9c6b28f51..6dcd97313f78a4f75441188e54760d50e1909eea 100644 (file)
@@ -103,7 +103,7 @@ typedef struct ReorderBufferIterTXNEntry
        XLogRecPtr      lsn;
        ReorderBufferChange *change;
        ReorderBufferTXN *txn;
-       int                     fd;
+       File            fd;
        XLogSegNo       segno;
 } ReorderBufferIterTXNEntry;
 
@@ -181,7 +181,8 @@ static void AssertTXNLsnOrder(ReorderBuffer *rb);
  * subtransactions
  * ---------------------------------------
  */
-static ReorderBufferIterTXNState *ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn);
+static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
+                                                                        ReorderBufferIterTXNState *volatile *iter_state);
 static ReorderBufferChange *ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state);
 static void ReorderBufferIterTXNFinish(ReorderBuffer *rb,
                                                   ReorderBufferIterTXNState *state);
@@ -197,7 +198,7 @@ static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
 static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
                                                         int fd, ReorderBufferChange *change);
 static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
-                                                       int *fd, XLogSegNo *segno);
+                                                       File *fd, XLogSegNo *segno);
 static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
                                                   char *change);
 static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
@@ -953,15 +954,23 @@ ReorderBufferIterCompare(Datum a, Datum b, void *arg)
 /*
  * Allocate & initialize an iterator which iterates in lsn order over a
  * transaction and all its subtransactions.
+ *
+ * Note: The iterator state is returned through iter_state parameter rather
+ * than the function's return value.  This is because the state gets cleaned up
+ * in a PG_CATCH block in the caller, so we want to make sure the caller gets
+ * back the state even if this function throws an exception.
  */
-static ReorderBufferIterTXNState *
-ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
+static void
+ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
+                                                ReorderBufferIterTXNState *volatile *iter_state)
 {
        Size            nr_txns = 0;
        ReorderBufferIterTXNState *state;
        dlist_iter      cur_txn_i;
        int32           off;
 
+       *iter_state = NULL;
+
        /*
         * Calculate the size of our heap: one element for every transaction that
         * contains changes.  (Besides the transactions already in the reorder
@@ -1005,6 +1014,9 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
                                                                          ReorderBufferIterCompare,
                                                                          state);
 
+       /* Now that the state fields are initialized, it is safe to return it. */
+       *iter_state = state;
+
        /*
         * Now insert items into the binary heap, in an unordered fashion.  (We
         * will run a heap assembly step at the end; this is more efficient.)
@@ -1067,8 +1079,6 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
 
        /* assemble a valid binary heap */
        binaryheap_build(state->heap);
-
-       return state;
 }
 
 /*
@@ -1172,7 +1182,7 @@ ReorderBufferIterTXNFinish(ReorderBuffer *rb,
        for (off = 0; off < state->nr_txns; off++)
        {
                if (state->entries[off].fd != -1)
-                       CloseTransientFile(state->entries[off].fd);
+                       FileClose(state->entries[off].fd);
        }
 
        /* free memory we might have "leaked" in the last *Next call */
@@ -1508,7 +1518,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 
                rb->begin(rb, txn);
 
-               iterstate = ReorderBufferIterTXNInit(rb, txn);
+               ReorderBufferIterTXNInit(rb, txn, &iterstate);
                while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
                {
                        Relation        relation = NULL;
@@ -2465,7 +2475,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
  */
 static Size
 ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
-                                                       int *fd, XLogSegNo *segno)
+                                                       File *fd, XLogSegNo *segno)
 {
        Size            restored = 0;
        XLogSegNo       last_segno;
@@ -2510,7 +2520,7 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
                        ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
                                                                                *segno);
 
-                       *fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0);
+                       *fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY, 0);
                        if (*fd < 0 && errno == ENOENT)
                        {
                                *fd = -1;
@@ -2531,14 +2541,13 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
                 * end of this file.
                 */
                ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange));
-               pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ);
-               readBytes = read(*fd, rb->outbuf, sizeof(ReorderBufferDiskChange));
-               pgstat_report_wait_end();
+               readBytes = FileRead(*fd, rb->outbuf, sizeof(ReorderBufferDiskChange),
+                                                        WAIT_EVENT_REORDER_BUFFER_READ);
 
                /* eof */
                if (readBytes == 0)
                {
-                       CloseTransientFile(*fd);
+                       FileClose(*fd);
                        *fd = -1;
                        (*segno)++;
                        continue;
@@ -2560,10 +2569,10 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
                                                                          sizeof(ReorderBufferDiskChange) + ondisk->size);
                ondisk = (ReorderBufferDiskChange *) rb->outbuf;
 
-               pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ);
-               readBytes = read(*fd, rb->outbuf + sizeof(ReorderBufferDiskChange),
-                                                ondisk->size - sizeof(ReorderBufferDiskChange));
-               pgstat_report_wait_end();
+               readBytes = FileRead(*fd,
+                                                        rb->outbuf + sizeof(ReorderBufferDiskChange),
+                                                        ondisk->size - sizeof(ReorderBufferDiskChange),
+                                                        WAIT_EVENT_REORDER_BUFFER_READ);
 
                if (readBytes < 0)
                        ereport(ERROR,
index 3f7f135291ded32ce2267c8ffed6c5f1ce655388..2942fcf1e45bb15dd62aa4a1790e5ac0c6c0f9a0 100644 (file)
@@ -7,7 +7,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 10;
+use Test::More tests => 11;
 use Config;
 
 # Initialize master node
@@ -133,5 +133,42 @@ is($node_master->psql('postgres', 'DROP DATABASE otherdb'),
 is($node_master->slot('otherdb_slot')->{'slot_name'},
        undef, 'logical slot was actually dropped with DB');
 
+# Test to ensure that we don't run out of file descriptors even if there
+# are more spill files than maxAllocatedDescs.
+
+# Set max_files_per_process to a small value to make it more likely to run out
+# of max open file descriptors.
+$node_master->safe_psql('postgres',
+       'ALTER SYSTEM SET max_files_per_process = 26;');
+$node_master->restart;
+
+$node_master->safe_psql(
+       'postgres', q{
+do $$
+BEGIN
+    FOR i IN 1..10 LOOP
+        BEGIN
+            INSERT INTO decoding_test(x) SELECT generate_series(1,5000);
+        EXCEPTION
+            when division_by_zero then perform 'dummy';
+        END;
+    END LOOP;
+END $$;
+});
+
+$result = $node_master->safe_psql('postgres',
+       qq[
+SELECT data from pg_logical_slot_get_changes('test_slot', NULL, NULL)
+    WHERE data LIKE '%INSERT%' ORDER BY lsn LIMIT 1;
+]);
+
+$expected = q{table public.decoding_test: INSERT: x[integer]:1 y[text]:null};
+is($result, $expected, 'got expected output from spilling subxacts session');
+
+# Reset back max_files_per_process
+$node_master->safe_psql('postgres',
+       'ALTER SYSTEM SET max_files_per_process = DEFAULT;');
+$node_master->restart;
+
 # done with the node
 $node_master->stop;