static bool CLOGPagePrecedes(int64 page1, int64 page2);
+static int clog_errdetail_for_io_error(const void *opaque_data);
static void WriteTruncateXlogRec(int64 pageno, TransactionId oldestXact,
Oid oldestXactDb);
static void TransactionIdSetPageStatus(TransactionId xid, int nsubxids,
* write-busy, since we don't care if the update reaches disk sooner than
* we think.
*/
- slotno = SimpleLruReadPage(XactCtl, pageno, !XLogRecPtrIsValid(lsn),
- xid);
+ slotno = SimpleLruReadPage(XactCtl, pageno, !XLogRecPtrIsValid(lsn), &xid);
/*
* Set the main transaction id, if any.
/* lock is acquired by SimpleLruReadPage_ReadOnly */
- slotno = SimpleLruReadPage_ReadOnly(XactCtl, pageno, xid);
+ slotno = SimpleLruReadPage_ReadOnly(XactCtl, pageno, &xid);
byteptr = XactCtl->shared->page_buffer[slotno] + byteno;
status = (*byteptr >> bshift) & CLOG_XACT_BITMASK;
Assert(transaction_buffers != 0);
XactCtl->PagePrecedes = CLOGPagePrecedes;
+ XactCtl->errdetail_for_io_error = clog_errdetail_for_io_error;
SimpleLruInit(XactCtl, "transaction", CLOGShmemBuffers(), CLOG_LSNS_PER_PAGE,
"pg_xact", LWTRANCHE_XACT_BUFFER,
LWTRANCHE_XACT_SLRU, SYNC_HANDLER_CLOG, false);
int slotno;
char *byteptr;
- slotno = SimpleLruReadPage(XactCtl, pageno, false, xid);
+ slotno = SimpleLruReadPage(XactCtl, pageno, false, &xid);
byteptr = XactCtl->shared->page_buffer[slotno] + byteno;
/* Zero so-far-unused positions in the current byte */
TransactionIdPrecedes(xid1, xid2 + CLOG_XACTS_PER_PAGE - 1));
}
+static int
+clog_errdetail_for_io_error(const void *opaque_data)
+{
+ TransactionId xid = *(const TransactionId *) opaque_data;
+
+ return errdetail("Could not access commit status of transaction %u.", xid);
+}
+
/*
* Write a TRUNCATE xlog record
ReplOriginId nodeid, int slotno);
static void error_commit_ts_disabled(void);
static bool CommitTsPagePrecedes(int64 page1, int64 page2);
+static int commit_ts_errdetail_for_io_error(const void *opaque_data);
static void ActivateCommitTs(void);
static void DeactivateCommitTs(void);
static void WriteTruncateXlogRec(int64 pageno, TransactionId oldestXid);
LWLockAcquire(lock, LW_EXCLUSIVE);
- slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid);
+ slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, &xid);
TransactionIdSetCommitTs(xid, ts, nodeid, slotno);
for (i = 0; i < nsubxids; i++)
}
/* lock is acquired by SimpleLruReadPage_ReadOnly */
- slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid);
+ slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, &xid);
memcpy(&entry,
CommitTsCtl->shared->page_buffer[slotno] +
SizeOfCommitTimestampEntry * entryno,
Assert(commit_timestamp_buffers != 0);
CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
+ CommitTsCtl->errdetail_for_io_error = commit_ts_errdetail_for_io_error;
SimpleLruInit(CommitTsCtl, "commit_timestamp", CommitTsShmemBuffers(), 0,
"pg_commit_ts", LWTRANCHE_COMMITTS_BUFFER,
LWTRANCHE_COMMITTS_SLRU,
TransactionIdPrecedes(xid1, xid2 + COMMIT_TS_XACTS_PER_PAGE - 1));
}
+static int
+commit_ts_errdetail_for_io_error(const void *opaque_data)
+{
+ TransactionId xid = *(const TransactionId *) opaque_data;
+
+ return errdetail("Could not access commit timestamp of transaction %u.", xid);
+}
/*
* Write a TRUNCATE xlog record
MultiXactMember *members);
/* management of SLRU infrastructure */
+
+/* opaque_data type for MultiXactMemberIoErrorDetail */
+typedef struct MultiXactMemberSlruReadContext
+{
+ MultiXactId multi;
+ MultiXactOffset offset;
+} MultiXactMemberSlruReadContext;
+
static bool MultiXactOffsetPagePrecedes(int64 page1, int64 page2);
static bool MultiXactMemberPagePrecedes(int64 page1, int64 page2);
+static int MultiXactOffsetIoErrorDetail(const void *opaque_data);
+static int MultiXactMemberIoErrorDetail(const void *opaque_data);
static void ExtendMultiXactOffset(MultiXactId multi);
static void ExtendMultiXactMember(MultiXactOffset offset, int nmembers);
static void SetOldestOffset(void);
lock = SimpleLruGetBankLock(MultiXactOffsetCtl, pageno);
LWLockAcquire(lock, LW_EXCLUSIVE);
- /*
- * Note: we pass the MultiXactId to SimpleLruReadPage as the "transaction"
- * to complain about if there's any I/O error. This is kinda bogus, but
- * since the errors will always give the full pathname, it should be clear
- * enough that a MultiXactId is really involved. Perhaps someday we'll
- * take the trouble to generalize the slru.c error reporting code.
- */
- slotno = SimpleLruReadPage(MultiXactOffsetCtl, pageno, true, multi);
+ slotno = SimpleLruReadPage(MultiXactOffsetCtl, pageno, true, &multi);
offptr = (MultiXactOffset *) MultiXactOffsetCtl->shared->page_buffer[slotno];
offptr += entryno;
lock = SimpleLruGetBankLock(MultiXactOffsetCtl, next_pageno);
LWLockAcquire(lock, LW_EXCLUSIVE);
- slotno = SimpleLruReadPage(MultiXactOffsetCtl, next_pageno, true, next);
+ slotno = SimpleLruReadPage(MultiXactOffsetCtl, next_pageno, true, &next);
next_offptr = (MultiXactOffset *) MultiXactOffsetCtl->shared->page_buffer[slotno];
next_offptr += next_entryno;
}
if (pageno != prev_pageno)
{
+ MultiXactMemberSlruReadContext slru_read_context = {multi, offset};
+
/*
* MultiXactMember SLRU page is changed so check if this new page
* fall into the different SLRU bank then release the old bank's
LWLockAcquire(lock, LW_EXCLUSIVE);
prevlock = lock;
}
- slotno = SimpleLruReadPage(MultiXactMemberCtl, pageno, true, multi);
+ slotno = SimpleLruReadPage(MultiXactMemberCtl, pageno, true,
+ &slru_read_context);
prev_pageno = pageno;
}
LWLockAcquire(lock, LW_EXCLUSIVE);
/* read this multi's offset */
- slotno = SimpleLruReadPage(MultiXactOffsetCtl, pageno, true, multi);
+ slotno = SimpleLruReadPage(MultiXactOffsetCtl, pageno, true, &multi);
offptr = (MultiXactOffset *) MultiXactOffsetCtl->shared->page_buffer[slotno];
offptr += entryno;
offset = *offptr;
LWLockAcquire(newlock, LW_EXCLUSIVE);
lock = newlock;
}
- slotno = SimpleLruReadPage(MultiXactOffsetCtl, pageno, true, tmpMXact);
+ slotno = SimpleLruReadPage(MultiXactOffsetCtl, pageno, true, &tmpMXact);
}
offptr = (MultiXactOffset *) MultiXactOffsetCtl->shared->page_buffer[slotno];
if (pageno != prev_pageno)
{
+ MultiXactMemberSlruReadContext slru_read_context = {multi, offset};
LWLock *newlock;
/*
LWLockAcquire(newlock, LW_EXCLUSIVE);
lock = newlock;
}
-
- slotno = SimpleLruReadPage(MultiXactMemberCtl, pageno, true, multi);
+ slotno = SimpleLruReadPage(MultiXactMemberCtl, pageno, true,
+ &slru_read_context);
prev_pageno = pageno;
}
MultiXactOffsetCtl->PagePrecedes = MultiXactOffsetPagePrecedes;
MultiXactMemberCtl->PagePrecedes = MultiXactMemberPagePrecedes;
+ MultiXactOffsetCtl->errdetail_for_io_error = MultiXactOffsetIoErrorDetail;
+ MultiXactMemberCtl->errdetail_for_io_error = MultiXactMemberIoErrorDetail;
SimpleLruInit(MultiXactOffsetCtl,
"multixact_offset", multixact_offset_buffers, 0,
if (entryno == 0 || nextMXact == FirstMultiXactId)
slotno = SimpleLruZeroPage(MultiXactOffsetCtl, pageno);
else
- slotno = SimpleLruReadPage(MultiXactOffsetCtl, pageno, true, nextMXact);
+ slotno = SimpleLruReadPage(MultiXactOffsetCtl, pageno, true, &nextMXact);
offptr = (MultiXactOffset *) MultiXactOffsetCtl->shared->page_buffer[slotno];
offptr += entryno;
flagsoff = MXOffsetToFlagsOffset(offset);
if (flagsoff != 0)
{
+ MultiXactMemberSlruReadContext slru_read_context = {InvalidMultiXactId, offset};
int slotno;
TransactionId *xidptr;
int memberoff;
LWLockAcquire(lock, LW_EXCLUSIVE);
memberoff = MXOffsetToMemberOffset(offset);
- slotno = SimpleLruReadPage(MultiXactMemberCtl, pageno, true, offset);
+ slotno = SimpleLruReadPage(MultiXactMemberCtl, pageno, true, &slru_read_context);
xidptr = (TransactionId *)
(MultiXactMemberCtl->shared->page_buffer[slotno] + memberoff);
return false;
/* lock is acquired by SimpleLruReadPage_ReadOnly */
- slotno = SimpleLruReadPage_ReadOnly(MultiXactOffsetCtl, pageno, multi);
+ slotno = SimpleLruReadPage_ReadOnly(MultiXactOffsetCtl, pageno, &multi);
offptr = (MultiXactOffset *) MultiXactOffsetCtl->shared->page_buffer[slotno];
offptr += entryno;
offset = *offptr;
return page1 < page2;
}
+static int
+MultiXactOffsetIoErrorDetail(const void *opaque_data)
+{
+ MultiXactId multixid = *(const MultiXactId *) opaque_data;
+
+ return errdetail("Could not access offset of multixact %u.", multixid);
+}
+
+static int
+MultiXactMemberIoErrorDetail(const void *opaque_data)
+{
+ const MultiXactMemberSlruReadContext *context = opaque_data;
+
+ if (MultiXactIdIsValid(context->multi))
+ return errdetail("Could not access member of multixact %u at offset %" PRIu64 ".",
+ context->multi, context->offset);
+ else
+ return errdetail("Could not access multixact member at offset %" PRIu64 ".",
+ context->offset);
+}
+
/*
* Decide which of two MultiXactIds is earlier.
*
static bool SlruPhysicalReadPage(SlruCtl ctl, int64 pageno, int slotno);
static bool SlruPhysicalWritePage(SlruCtl ctl, int64 pageno, int slotno,
SlruWriteAll fdata);
-static void SlruReportIOError(SlruCtl ctl, int64 pageno, TransactionId xid);
+static void SlruReportIOError(SlruCtl ctl, int64 pageno,
+ const void *opaque_data);
static int SlruSelectLRUPage(SlruCtl ctl, int64 pageno);
static bool SlruScanDirCbDeleteCutoff(SlruCtl ctl, char *filename,
Assert(nslots <= SLRU_MAX_ALLOWED_BUFFERS);
+ Assert(ctl->PagePrecedes != NULL);
+ Assert(ctl->errdetail_for_io_error != NULL);
+
shared = (SlruShared) ShmemInitStruct(name,
SimpleLruShmemSize(nslots, nlsns),
&found);
* that modification of the page is safe. If write_ok is false then we
* will not return the page until it is not undergoing active I/O.
*
- * The passed-in xid is used only for error reporting, and may be
- * InvalidTransactionId if no specific xid is associated with the action.
+ * On error, the passed-in 'opaque_data' is passed to the
+ * 'errdetail_for_io_error' callback, to provide details on the operation that
+ * failed. It is only used for error reporting.
*
* Return value is the shared-buffer slot number now holding the page.
* The buffer's LRU access info is updated.
*/
int
SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok,
- TransactionId xid)
+ const void *opaque_data)
{
SlruShared shared = ctl->shared;
LWLock *banklock = SimpleLruGetBankLock(ctl, pageno);
/* Now it's okay to ereport if we failed */
if (!ok)
- SlruReportIOError(ctl, pageno, xid);
+ SlruReportIOError(ctl, pageno, opaque_data);
SlruRecentlyUsed(shared, slotno);
* The page number must correspond to an already-initialized page.
* The caller must intend only read-only access to the page.
*
- * The passed-in xid is used only for error reporting, and may be
- * InvalidTransactionId if no specific xid is associated with the action.
+ * On error, the passed-in 'opaque_data' is passed to the
+ * 'errdetail_for_io_error' callback, to provide details on the operation that
+ * failed. It is only used for error reporting.
*
* Return value is the shared-buffer slot number now holding the page.
* The buffer's LRU access info is updated.
* It is unspecified whether the lock will be shared or exclusive.
*/
int
-SimpleLruReadPage_ReadOnly(SlruCtl ctl, int64 pageno, TransactionId xid)
+SimpleLruReadPage_ReadOnly(SlruCtl ctl, int64 pageno, const void *opaque_data)
{
SlruShared shared = ctl->shared;
LWLock *banklock = SimpleLruGetBankLock(ctl, pageno);
LWLockRelease(banklock);
LWLockAcquire(banklock, LW_EXCLUSIVE);
- return SimpleLruReadPage(ctl, pageno, true, xid);
+ return SimpleLruReadPage(ctl, pageno, true, opaque_data);
}
/*
/* Now it's okay to ereport if we failed */
if (!ok)
- SlruReportIOError(ctl, pageno, InvalidTransactionId);
+ SlruReportIOError(ctl, pageno, NULL);
/* If part of a checkpoint, count this as a SLRU buffer written. */
if (fdata)
/* report error normally */
slru_errcause = SLRU_OPEN_FAILED;
slru_errno = errno;
- SlruReportIOError(ctl, pageno, 0);
+ SlruReportIOError(ctl, pageno, NULL);
}
if ((endpos = lseek(fd, 0, SEEK_END)) < 0)
{
slru_errcause = SLRU_SEEK_FAILED;
slru_errno = errno;
- SlruReportIOError(ctl, pageno, 0);
+ SlruReportIOError(ctl, pageno, NULL);
}
result = endpos >= (off_t) (offset + BLCKSZ);
* SlruPhysicalWritePage. Call this after cleaning up shared-memory state.
*/
static void
-SlruReportIOError(SlruCtl ctl, int64 pageno, TransactionId xid)
+SlruReportIOError(SlruCtl ctl, int64 pageno, const void *opaque_data)
{
int64 segno = pageno / SLRU_PAGES_PER_SEGMENT;
int rpageno = pageno % SLRU_PAGES_PER_SEGMENT;
case SLRU_OPEN_FAILED:
ereport(ERROR,
(errcode_for_file_access(),
- errmsg("could not access status of transaction %u", xid),
- errdetail("Could not open file \"%s\": %m.", path)));
+ errmsg("could not open file \"%s\": %m", path),
+ opaque_data ? ctl->errdetail_for_io_error(opaque_data) : 0));
break;
case SLRU_SEEK_FAILED:
ereport(ERROR,
(errcode_for_file_access(),
- errmsg("could not access status of transaction %u", xid),
- errdetail("Could not seek in file \"%s\" to offset %d: %m.",
- path, offset)));
+ errmsg("could not seek in file \"%s\" to offset %d: %m",
+ path, offset),
+ opaque_data ? ctl->errdetail_for_io_error(opaque_data) : 0));
break;
case SLRU_READ_FAILED:
if (errno)
ereport(ERROR,
(errcode_for_file_access(),
- errmsg("could not access status of transaction %u", xid),
- errdetail("Could not read from file \"%s\" at offset %d: %m.",
- path, offset)));
+ errmsg("could not read from file \"%s\" at offset %d: %m",
+ path, offset),
+ opaque_data ? ctl->errdetail_for_io_error(opaque_data) : 0));
else
ereport(ERROR,
- (errmsg("could not access status of transaction %u", xid),
- errdetail("Could not read from file \"%s\" at offset %d: read too few bytes.", path, offset)));
+ (errmsg("could not read from file \"%s\" at offset %d: read too few bytes",
+ path, offset),
+ opaque_data ? ctl->errdetail_for_io_error(opaque_data) : 0));
break;
case SLRU_WRITE_FAILED:
if (errno)
ereport(ERROR,
(errcode_for_file_access(),
- errmsg("could not access status of transaction %u", xid),
- errdetail("Could not write to file \"%s\" at offset %d: %m.",
- path, offset)));
+ errmsg("Could not write to file \"%s\" at offset %d: %m",
+ path, offset),
+ opaque_data ? ctl->errdetail_for_io_error(opaque_data) : 0));
else
ereport(ERROR,
- (errmsg("could not access status of transaction %u", xid),
- errdetail("Could not write to file \"%s\" at offset %d: wrote too few bytes.",
- path, offset)));
+ (errmsg("Could not write to file \"%s\" at offset %d: wrote too few bytes.",
+ path, offset),
+ opaque_data ? ctl->errdetail_for_io_error(opaque_data) : 0));
break;
case SLRU_FSYNC_FAILED:
ereport(data_sync_elevel(ERROR),
(errcode_for_file_access(),
- errmsg("could not access status of transaction %u", xid),
- errdetail("Could not fsync file \"%s\": %m.",
- path)));
+ errmsg("could not fsync file \"%s\": %m",
+ path),
+ opaque_data ? ctl->errdetail_for_io_error(opaque_data) : 0));
break;
case SLRU_CLOSE_FAILED:
ereport(ERROR,
(errcode_for_file_access(),
- errmsg("could not access status of transaction %u", xid),
- errdetail("Could not close file \"%s\": %m.",
- path)));
+ errmsg("could not close file \"%s\": %m",
+ path),
+ opaque_data ? ctl->errdetail_for_io_error(opaque_data) : 0));
break;
default:
/* can't get here, we trust */
}
}
if (!ok)
- SlruReportIOError(ctl, pageno, InvalidTransactionId);
+ SlruReportIOError(ctl, pageno, NULL);
/* Ensure that directory entries for new files are on disk. */
if (ctl->sync_handler != SYNC_HANDLER_NONE)
static bool SubTransPagePrecedes(int64 page1, int64 page2);
+static int subtrans_errdetail_for_io_error(const void *opaque_data);
/*
lock = SimpleLruGetBankLock(SubTransCtl, pageno);
LWLockAcquire(lock, LW_EXCLUSIVE);
- slotno = SimpleLruReadPage(SubTransCtl, pageno, true, xid);
+ slotno = SimpleLruReadPage(SubTransCtl, pageno, true, &xid);
ptr = (TransactionId *) SubTransCtl->shared->page_buffer[slotno];
ptr += entryno;
/* lock is acquired by SimpleLruReadPage_ReadOnly */
- slotno = SimpleLruReadPage_ReadOnly(SubTransCtl, pageno, xid);
+ slotno = SimpleLruReadPage_ReadOnly(SubTransCtl, pageno, &xid);
ptr = (TransactionId *) SubTransCtl->shared->page_buffer[slotno];
ptr += entryno;
Assert(subtransaction_buffers != 0);
SubTransCtl->PagePrecedes = SubTransPagePrecedes;
+ SubTransCtl->errdetail_for_io_error = subtrans_errdetail_for_io_error;
SimpleLruInit(SubTransCtl, "subtransaction", SUBTRANSShmemBuffers(), 0,
"pg_subtrans", LWTRANCHE_SUBTRANS_BUFFER,
LWTRANCHE_SUBTRANS_SLRU, SYNC_HANDLER_NONE, false);
return (TransactionIdPrecedes(xid1, xid2) &&
TransactionIdPrecedes(xid1, xid2 + SUBTRANS_XACTS_PER_PAGE - 1));
}
+
+static int
+subtrans_errdetail_for_io_error(const void *opaque_data)
+{
+ TransactionId xid = *(const TransactionId *) opaque_data;
+
+ return errdetail("Could not access subtransaction status of transaction %u.", xid);
+}
int max_notify_queue_pages = 1048576;
/* local function prototypes */
+static int asyncQueueErrdetailForIoError(const void *opaque_data);
static inline int64 asyncQueuePageDiff(int64 p, int64 q);
static inline bool asyncQueuePagePrecedes(int64 p, int64 q);
static inline void GlobalChannelKeyInit(GlobalChannelKey *key, Oid dboid,
static int notification_match(const void *key1, const void *key2, Size keysize);
static void ClearPendingActionsAndNotifies(void);
+static int
+asyncQueueErrdetailForIoError(const void *opaque_data)
+{
+ const QueuePosition *position = opaque_data;
+
+ return errdetail("Could not access async queue at page %" PRId64 ", offset %d.",
+ position->page, position->offset);
+}
+
/*
* Compute the difference between two queue page numbers.
* Previously this function accounted for a wraparound.
* names are used in order to avoid wraparound.
*/
NotifyCtl->PagePrecedes = asyncQueuePagePrecedes;
+ NotifyCtl->errdetail_for_io_error = asyncQueueErrdetailForIoError;
SimpleLruInit(NotifyCtl, "notify", notify_buffers, 0,
"pg_notify", LWTRANCHE_NOTIFY_BUFFER, LWTRANCHE_NOTIFY_SLRU,
SYNC_HANDLER_NONE, true);
if (QUEUE_POS_IS_ZERO(queue_head))
slotno = SimpleLruZeroPage(NotifyCtl, pageno);
else
- slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
- InvalidTransactionId);
+ slotno = SimpleLruReadPage(NotifyCtl, pageno, true, &queue_head);
/* Note we mark the page dirty before writing in it */
NotifyCtl->shared->page_dirty[slotno] = true;
alignas(AsyncQueueEntry) char local_buf[QUEUE_PAGESIZE];
char *local_buf_end = local_buf;
- slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage,
- InvalidTransactionId);
+ slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage, current);
page_buffer = NotifyCtl->shared->page_buffer[slotno];
do
lock = SimpleLruGetBankLock(NotifyCtl, pageno);
LWLockAcquire(lock, LW_EXCLUSIVE);
- slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
- InvalidTransactionId);
+ slotno = SimpleLruReadPage(NotifyCtl, pageno, true, &pos);
page_buffer = NotifyCtl->shared->page_buffer[slotno];
curpage = pageno;
}
static void FlagSxactUnsafe(SERIALIZABLEXACT *sxact);
static bool SerialPagePrecedesLogically(int64 page1, int64 page2);
+static int serial_errdetail_for_io_error(const void *opaque_data);
static void SerialInit(void);
static void SerialAdd(TransactionId xid, SerCommitSeqNo minConflictCommitSeqNo);
static SerCommitSeqNo SerialGetMinConflictCommitSeqNo(TransactionId xid);
TransactionIdPrecedes(xid1, xid2 + SERIAL_ENTRIESPERPAGE - 1));
}
+static int
+serial_errdetail_for_io_error(const void *opaque_data)
+{
+ TransactionId xid = *(const TransactionId *) opaque_data;
+
+ return errdetail("Could not access serializable CSN of transaction %u.", xid);
+}
+
#ifdef USE_ASSERT_CHECKING
static void
SerialPagePrecedesLogicallyUnitTests(void)
* Set up SLRU management of the pg_serial data.
*/
SerialSlruCtl->PagePrecedes = SerialPagePrecedesLogically;
+ SerialSlruCtl->errdetail_for_io_error = serial_errdetail_for_io_error;
SimpleLruInit(SerialSlruCtl, "serializable",
serializable_buffers, 0, "pg_serial",
LWTRANCHE_SERIAL_BUFFER, LWTRANCHE_SERIAL_SLRU,
else
{
LWLockAcquire(lock, LW_EXCLUSIVE);
- slotno = SimpleLruReadPage(SerialSlruCtl, targetPage, true, xid);
+ slotno = SimpleLruReadPage(SerialSlruCtl, targetPage, true, &xid);
}
SerialValue(slotno, xid) = minConflictCommitSeqNo;
* but will return with that lock held, which must then be released.
*/
slotno = SimpleLruReadPage_ReadOnly(SerialSlruCtl,
- SerialPage(xid), xid);
+ SerialPage(xid), &xid);
val = SerialValue(slotno, xid);
LWLockRelease(SimpleLruGetBankLock(SerialSlruCtl, SerialPage(xid)));
return val;
#ifndef SLRU_H
#define SLRU_H
+#include "access/transam.h"
#include "access/xlogdefs.h"
#include "storage/lwlock.h"
#include "storage/sync.h"
*/
bool (*PagePrecedes) (int64, int64);
+ /*
+ * Callback to provide more details on an I/O error. This is called as
+ * part of ereport(), and the callback function is expected to call
+ * errdetail() to provide more context on the SLRU access.
+ *
+ * The opaque_data argument here is the argument that was passed to the
+ * SimpleLruReadPage() function.
+ */
+ int (*errdetail_for_io_error) (const void *opaque_data);
+
/*
* Dir is set during SimpleLruInit and does not change thereafter. Since
* it's always the same, it doesn't need to be in shared memory.
extern int SimpleLruZeroPage(SlruCtl ctl, int64 pageno);
extern void SimpleLruZeroAndWritePage(SlruCtl ctl, int64 pageno);
extern int SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok,
- TransactionId xid);
+ const void *opaque_data);
extern int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int64 pageno,
- TransactionId xid);
+ const void *opaque_data);
extern void SimpleLruWritePage(SlruCtl ctl, int slotno);
extern void SimpleLruWriteAll(SlruCtl ctl, bool allow_redirtied);
#ifdef USE_ASSERT_CHECKING
t
(1 row)
+-- Test read failure
+SELECT test_slru_page_read(54321, false, '123'::xid);
+ERROR: could not open file "pg_test_slru/0000000000006A1": No such file or directory
+DETAIL: Could not access test_slru entry 123.
-- 48 extra pages
SELECT count(test_slru_page_write(a, 'Test SLRU'))
FROM generate_series(12346, 12393, 1) as a;
SELECT test_slru_page_read(12345);
SELECT test_slru_page_exists(12345);
+-- Test read failure
+SELECT test_slru_page_read(54321, false, '123'::xid);
+
-- 48 extra pages
SELECT count(test_slru_page_write(a, 'Test SLRU'))
FROM generate_series(12346, 12393, 1) as a;
AS 'MODULE_PATHNAME', 'test_slru_page_writeall' LANGUAGE C;
CREATE OR REPLACE FUNCTION test_slru_page_sync(bigint) RETURNS VOID
AS 'MODULE_PATHNAME', 'test_slru_page_sync' LANGUAGE C;
-CREATE OR REPLACE FUNCTION test_slru_page_read(bigint, bool DEFAULT true) RETURNS text
+CREATE OR REPLACE FUNCTION test_slru_page_read(bigint, bool DEFAULT true, xid DEFAULT '0') RETURNS text
AS 'MODULE_PATHNAME', 'test_slru_page_read' LANGUAGE C;
CREATE OR REPLACE FUNCTION test_slru_page_readonly(bigint) RETURNS text
AS 'MODULE_PATHNAME', 'test_slru_page_readonly' LANGUAGE C;
{
int64 pageno = PG_GETARG_INT64(0);
bool write_ok = PG_GETARG_BOOL(1);
+ TransactionId xid = PG_GETARG_TRANSACTIONID(2);
char *data = NULL;
int slotno;
LWLock *lock = SimpleLruGetBankLock(TestSlruCtl, pageno);
/* find page in buffers, reading it if necessary */
LWLockAcquire(lock, LW_EXCLUSIVE);
- slotno = SimpleLruReadPage(TestSlruCtl, pageno,
- write_ok, InvalidTransactionId);
+ slotno = SimpleLruReadPage(TestSlruCtl, pageno, write_ok, &xid);
data = (char *) TestSlruCtl->shared->page_buffer[slotno];
LWLockRelease(lock);
/* find page in buffers, reading it if necessary */
slotno = SimpleLruReadPage_ReadOnly(TestSlruCtl,
pageno,
- InvalidTransactionId);
+ NULL);
Assert(LWLockHeldByMe(lock));
data = (char *) TestSlruCtl->shared->page_buffer[slotno];
LWLockRelease(lock);
return page1 < page2;
}
+static int
+test_slru_errdetail_for_io_error(const void *opaque_data)
+{
+ TransactionId xid = *(const TransactionId *) opaque_data;
+
+ return errdetail("Could not access test_slru entry %u.", xid);
+}
+
static void
test_slru_shmem_startup(void)
{
}
TestSlruCtl->PagePrecedes = test_slru_page_precedes_logically;
+ TestSlruCtl->errdetail_for_io_error = test_slru_errdetail_for_io_error;
SimpleLruInit(TestSlruCtl, "TestSLRU",
NUM_TEST_BUFFERS, 0, slru_dir_name,
test_buffer_tranche_id, test_tranche_id, SYNC_HANDLER_NONE,
MultiSortSupportData
MultiXactId
MultiXactMember
+MultiXactMemberSlruReadContext
MultiXactOffset
MultiXactOffset32
MultiXactStateData