]> git.ipfire.org Git - thirdparty/postgresql.git/commitdiff
Implement a chunking protocol for writes to the syslogger pipe, with messages
authorAndrew Dunstan <andrew@dunslane.net>
Thu, 14 Jun 2007 01:50:14 +0000 (01:50 +0000)
committerAndrew Dunstan <andrew@dunslane.net>
Thu, 14 Jun 2007 01:50:14 +0000 (01:50 +0000)
reassembled in the syslogger before writing to the log file. This prevents
partial messages from being written, which mucks up log rotation, and
messages from different backends being interleaved, which causes garbled
logs. Backport as far as 8.0, where the syslogger was introduced.

Tom Lane and Andrew Dunstan

src/backend/postmaster/syslogger.c
src/backend/utils/error/elog.c
src/include/postmaster/syslogger.h

index 07c705896d628d76e036395206fff90b8dfc667b..6cb3a48baff72575f7bf7c861e53f64d07d0db55 100644 (file)
@@ -18,7 +18,7 @@
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/postmaster/syslogger.c,v 1.20.2.1 2005/11/22 18:23:16 momjian Exp $
+ *       $PostgreSQL: pgsql/src/backend/postmaster/syslogger.c,v 1.20.2.2 2007/06/14 01:50:14 adunstan Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -31,6 +31,7 @@
 #include <sys/stat.h>
 #include <sys/time.h>
 
+#include "lib/stringinfo.h"
 #include "libpq/pqsignal.h"
 #include "miscadmin.h"
 #include "postmaster/fork_process.h"
 #define LBF_MODE       _IOLBF
 #endif
 
+/* 
+ * We read() into a temp buffer twice as big as a chunk, so that any fragment
+ * left after processing can be moved down to the front and we'll still have
+ * room to read a full chunk.
+ */
+#define READ_BUF_SIZE (2 * PIPE_CHUNK_SIZE)
+
 
 /*
  * GUC parameters.     Redirect_stderr cannot be changed after postmaster
@@ -75,15 +83,28 @@ bool                am_syslogger = false;
  * Private state
  */
 static pg_time_t next_rotation_time;
-
 static bool redirection_done = false;
-
 static bool pipe_eof_seen = false;
-
 static FILE *syslogFile = NULL;
-
 static char *last_file_name = NULL;
 
+/* 
+ * Buffers for saving partial messages from different backends. We don't expect
+ * that there will be very many outstanding at one time, so 20 seems plenty of
+ * leeway. If this array gets full we won't lose messages, but we will lose
+ * the protocol protection against them being partially written or interleaved.
+ *
+ * An inactive buffer has pid == 0 and undefined contents of data.
+ */
+typedef struct
+{
+       int32   pid;                            /* PID of source process */
+       StringInfoData data;            /* accumulated data, as a StringInfo */
+} save_buffer;
+
+#define CHUNK_SLOTS 20
+static save_buffer saved_chunks[CHUNK_SLOTS];
+
 /* These must be exported for EXEC_BACKEND case ... annoying */
 #ifndef WIN32
 int                    syslogPipe[2] = {-1, -1};
@@ -95,6 +116,8 @@ HANDLE               syslogPipe[2] = {0, 0};
 static HANDLE threadHandle = 0;
 static CRITICAL_SECTION sysfileSection;
 #endif
+static void process_pipe_input(char *logbuffer, int *bytes_in_logbuffer);
+static void flush_pipe_input(char *logbuffer, int *bytes_in_logbuffer);
 
 /*
  * Flags set by interrupt handlers for later service in the main loop.
@@ -127,6 +150,10 @@ static void sigUsr1Handler(SIGNAL_ARGS);
 NON_EXEC_STATIC void
 SysLoggerMain(int argc, char *argv[])
 {
+#ifndef WIN32
+       char            logbuffer[READ_BUF_SIZE];
+       int                     bytes_in_logbuffer = 0;
+#endif
        char       *currentLogDir;
        char       *currentLogFilename;
        int                     currentLogRotationAge;
@@ -238,7 +265,6 @@ SysLoggerMain(int argc, char *argv[])
                bool            time_based_rotation = false;
 
 #ifndef WIN32
-               char            logbuffer[1024];
                int                     bytesRead;
                int                     rc;
                fd_set          rfds;
@@ -320,8 +346,8 @@ SysLoggerMain(int argc, char *argv[])
                else if (rc > 0 && FD_ISSET(syslogPipe[0], &rfds))
                {
                        bytesRead = piperead(syslogPipe[0],
-                                                                logbuffer, sizeof(logbuffer));
-
+                                                                logbuffer + bytes_in_logbuffer,
+                                                                sizeof(logbuffer) - bytes_in_logbuffer);
                        if (bytesRead < 0)
                        {
                                if (errno != EINTR)
@@ -331,7 +357,8 @@ SysLoggerMain(int argc, char *argv[])
                        }
                        else if (bytesRead > 0)
                        {
-                               write_syslogger_file_binary(logbuffer, bytesRead);
+                               bytes_in_logbuffer += bytesRead;
+                               process_pipe_input(logbuffer, &bytes_in_logbuffer);
                                continue;
                        }
                        else
@@ -343,6 +370,9 @@ SysLoggerMain(int argc, char *argv[])
                                 * and all backends are shut down, and we are done.
                                 */
                                pipe_eof_seen = true;
+
+                               /* if there's any data left then force it out now */
+                               flush_pipe_input(logbuffer, &bytes_in_logbuffer);
                        }
                }
 #else                                                  /* WIN32 */
@@ -602,6 +632,207 @@ syslogger_parseArgs(int argc, char *argv[])
 #endif   /* EXEC_BACKEND */
 
 
+/* --------------------------------
+ *             pipe protocol handling
+ * --------------------------------
+ */
+
+/*
+ * Process data received through the syslogger pipe.
+ *
+ * This routine interprets the log pipe protocol which sends log messages as
+ * (hopefully atomic) chunks - such chunks are detected and reassembled here. 
+ *
+ * The protocol has a header that starts with two nul bytes, then has a 16 bit
+ * length, the pid of the sending process, and a flag to indicate if it is 
+ * the last chunk in a message. Incomplete chunks are saved until we read some
+ * more, and non-final chunks are accumulated until we get the final chunk.
+ *
+ * All of this is to avoid 2 problems:
+ * . partial messages being written to logfiles (messes rotation), and
+ * . messages from different backends being interleaved (messages garbled).
+ *
+ * Any non-protocol messages are written out directly. These should only come
+ * from non-PostgreSQL sources, however (e.g. third party libraries writing to
+ * stderr).
+ *
+ * logbuffer is the data input buffer, and *bytes_in_logbuffer is the number
+ * of bytes present.  On exit, any not-yet-eaten data is left-justified in
+ * logbuffer, and *bytes_in_logbuffer is updated.
+ */
+static void
+process_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
+{
+       char   *cursor = logbuffer;
+       int             count = *bytes_in_logbuffer;
+
+       /* While we have enough for a header, process data... */
+       while (count >= (int) sizeof(PipeProtoHeader))
+       {
+               PipeProtoHeader p;
+               int             chunklen;
+
+               /* Do we have a valid header? */
+               memcpy(&p, cursor, sizeof(PipeProtoHeader));
+               if (p.nuls[0] == '\0' && p.nuls[1] == '\0' &&
+                       p.len > 0 && p.len <= PIPE_MAX_PAYLOAD &&
+                       p.pid != 0 &&
+                       (p.is_last == 't' || p.is_last == 'f'))
+               {
+                       chunklen = PIPE_HEADER_SIZE + p.len;
+
+                       /* Fall out of loop if we don't have the whole chunk yet */
+                       if (count < chunklen)
+                               break;
+
+                       if (p.is_last == 'f')
+                       {
+                               /* 
+                                * Save a complete non-final chunk in the per-pid buffer 
+                                * if possible - if not just write it out.
+                                */
+                               int free_slot = -1, existing_slot = -1;
+                               int i;
+                               StringInfo str;
+
+                               for (i = 0; i < CHUNK_SLOTS; i++)
+                               {
+                                       if (saved_chunks[i].pid == p.pid)
+                                       {
+                                               existing_slot = i;
+                                               break;
+                                       }
+                                       if (free_slot < 0 && saved_chunks[i].pid == 0)
+                                               free_slot = i;
+                               }
+                               if (existing_slot >= 0)
+                               {
+                                       str = &(saved_chunks[existing_slot].data);
+                                       appendBinaryStringInfo(str,
+                                                                                  cursor + PIPE_HEADER_SIZE, 
+                                                                                  p.len);
+                               }
+                               else if (free_slot >= 0)
+                               {
+                                       saved_chunks[free_slot].pid = p.pid;
+                                       str = &(saved_chunks[free_slot].data);
+                                       initStringInfo(str);
+                                       appendBinaryStringInfo(str,
+                                                                                  cursor + PIPE_HEADER_SIZE, 
+                                                                                  p.len);
+                               }
+                               else
+                               {
+                                       /* 
+                                        * If there is no free slot we'll just have to take our
+                                        * chances and write out a partial message and hope that
+                                        * it's not followed by something from another pid.
+                                        */
+                                       write_syslogger_file(cursor + PIPE_HEADER_SIZE, p.len);
+                               }
+                       }
+                       else
+                       {
+                               /* 
+                                * Final chunk --- add it to anything saved for that pid, and
+                                * either way write the whole thing out.
+                                */
+                               int existing_slot = -1;
+                               int i;
+                               StringInfo str;
+
+                               for (i = 0; i < CHUNK_SLOTS; i++)
+                               {
+                                       if (saved_chunks[i].pid == p.pid)
+                                       {
+                                               existing_slot = i;
+                                               break;
+                                       }
+                               }
+                               if (existing_slot >= 0)
+                               {
+                                       str = &(saved_chunks[existing_slot].data);
+                                       appendBinaryStringInfo(str,
+                                                                                  cursor + PIPE_HEADER_SIZE,
+                                                                                  p.len);
+                                       write_syslogger_file(str->data, str->len);
+                                       saved_chunks[existing_slot].pid = 0;
+                                       pfree(str->data);
+                               }
+                               else
+                               {
+                                       /* The whole message was one chunk, evidently. */
+                                       write_syslogger_file(cursor + PIPE_HEADER_SIZE, p.len);
+                               }
+                       }
+
+                       /* Finished processing this chunk */
+                       cursor += chunklen;
+                       count -= chunklen;
+               }
+               else 
+               {
+                       /* Process non-protocol data */
+
+                       /*
+                        * Look for the start of a protocol header.  If found, dump data
+                        * up to there and repeat the loop.  Otherwise, dump it all and
+                        * fall out of the loop.  (Note: we want to dump it all if
+                        * at all possible, so as to avoid dividing non-protocol messages
+                        * across logfiles.  We expect that in many scenarios, a
+                        * non-protocol message will arrive all in one read(), and we
+                        * want to respect the read() boundary if possible.)
+                        */
+                       for (chunklen = 1; chunklen < count; chunklen++)
+                       {
+                               if (cursor[chunklen] == '\0')
+                                       break;
+                       }
+                       write_syslogger_file(cursor, chunklen);
+                       cursor += chunklen;
+                       count -= chunklen;
+               }
+       }
+
+       /* We don't have a full chunk, so left-align what remains in the buffer */
+       if (count > 0 && cursor != logbuffer)
+               memmove(logbuffer, cursor, count);
+       *bytes_in_logbuffer = count;
+}
+
+/*
+ * Force out any buffered data
+ *
+ * This is currently used only at syslogger shutdown, but could perhaps be
+ * useful at other times, so it is careful to leave things in a clean state.
+ */
+static void
+flush_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
+{
+       int i;
+       StringInfo str;
+
+       /* Dump any incomplete protocol messages */
+       for (i = 0; i < CHUNK_SLOTS; i++)
+       {
+               if (saved_chunks[i].pid != 0)
+               {
+                       str = &(saved_chunks[i].data);
+                       write_syslogger_file(str->data, str->len);
+                       saved_chunks[i].pid = 0;
+                       pfree(str->data);
+               }
+       }
+       /*
+        * Force out any remaining pipe data as-is; we don't bother trying to
+        * remove any protocol headers that may exist in it.
+        */
+       if (*bytes_in_logbuffer > 0)
+               write_syslogger_file(logbuffer, *bytes_in_logbuffer);
+       *bytes_in_logbuffer = 0;
+}
+
+
 /* --------------------------------
  *             logfile routines
  * --------------------------------
@@ -687,12 +918,16 @@ write_syslogger_file_binary(const char *buffer, int count)
 static unsigned int __stdcall
 pipeThread(void *arg)
 {
-       DWORD           bytesRead;
-       char            logbuffer[1024];
+       char            logbuffer[READ_BUF_SIZE];
+       int                     bytes_in_logbuffer = 0;
 
        for (;;)
        {
-               if (!ReadFile(syslogPipe[0], logbuffer, sizeof(logbuffer),
+               DWORD           bytesRead;
+
+               if (!ReadFile(syslogPipe[0],
+                                         logbuffer + bytes_in_logbuffer,
+                                         sizeof(logbuffer) - bytes_in_logbuffer,
                                          &bytesRead, 0))
                {
                        DWORD           error = GetLastError();
@@ -706,11 +941,18 @@ pipeThread(void *arg)
                                         errmsg("could not read from logger pipe: %m")));
                }
                else if (bytesRead > 0)
-                       write_syslogger_file_binary(logbuffer, bytesRead);
+               {
+                       bytes_in_logbuffer += bytesRead;
+                       process_pipe_input(logbuffer, &bytes_in_logbuffer);
+               }
        }
 
        /* We exit the above loop only upon detecting pipe EOF */
        pipe_eof_seen = true;
+
+       /* if there's any data left then force it out now */
+       flush_pipe_input(logbuffer, &bytes_in_logbuffer);
+       
        _endthread();
        return 0;
 }
index a5c61858a85a7aa58927d9218ea2e55cef78d0fc..c91531ee27c235a82bd8693cd90cab87f41f44ee 100644 (file)
@@ -42,7 +42,7 @@
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/utils/error/elog.c,v 1.167.2.2 2007/02/11 15:12:48 mha Exp $
+ *       $PostgreSQL: pgsql/src/backend/utils/error/elog.c,v 1.167.2.3 2007/06/14 01:50:14 adunstan Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -122,7 +122,7 @@ static char *expand_fmt_string(const char *fmt, ErrorData *edata);
 static const char *useful_strerror(int errnum);
 static const char *error_severity(int elevel);
 static void append_with_tabs(StringInfo buf, const char *str);
-
+static void write_pipe_chunks(int fd, char *data, int len);
 
 /*
  * errstart --- begin an error-reporting cycle
@@ -1710,7 +1710,10 @@ send_message_to_server_log(ErrorData *edata)
                        write_eventlog(edata->elevel, buf.data);
                else
 #endif
-                       fprintf(stderr, "%s", buf.data);
+                       if (Redirect_stderr)
+                               write_pipe_chunks(fileno(stderr), buf.data, buf.len);
+                       else
+                               write(fileno(stderr), buf.data, buf.len);
        }
 
        /* If in the syslogger process, try to write messages direct to file */
@@ -1720,6 +1723,37 @@ send_message_to_server_log(ErrorData *edata)
        pfree(buf.data);
 }
 
+/*
+ * Send data to the syslogger using the chunked protocol
+ */
+static void
+write_pipe_chunks(int fd, char *data, int len)
+{
+       PipeProtoChunk p;
+
+       Assert(len > 0);
+
+       p.proto.nuls[0] = p.proto.nuls[1] = '\0';
+       p.proto.pid = MyProcPid;
+
+       /* write all but the last chunk */
+       while (len > PIPE_MAX_PAYLOAD)
+       {
+               p.proto.is_last = 'f';
+               p.proto.len = PIPE_MAX_PAYLOAD;
+               memcpy(p.proto.data, data, PIPE_MAX_PAYLOAD);
+               write(fd, &p, PIPE_HEADER_SIZE + PIPE_MAX_PAYLOAD);
+               data += PIPE_MAX_PAYLOAD;
+               len -= PIPE_MAX_PAYLOAD;
+       }
+
+       /* write the last chunk */
+       p.proto.is_last = 't';
+       p.proto.len = len;
+       memcpy(p.proto.data, data, len);
+       write(fd, &p, PIPE_HEADER_SIZE + len);
+}
+
 
 /*
  * Write error report to client
@@ -2043,6 +2077,7 @@ write_stderr(const char *fmt,...)
 #ifndef WIN32
        /* On Unix, we just fprintf to stderr */
        vfprintf(stderr, fmt, ap);
+       fflush(stderr);
 #else
 
        /*
@@ -2058,8 +2093,11 @@ write_stderr(const char *fmt,...)
                write_eventlog(EVENTLOG_ERROR_TYPE, errbuf);
        }
        else
+       {
                /* Not running as service, write to stderr */
                vfprintf(stderr, fmt, ap);
+               fflush(stderr);
+       }
 #endif
        va_end(ap);
 }
index 6a8c2b712618663436efda7969a373c6fc13fec4..bbbee5732adc87edfb4759ce7cf402230db92d3f 100644 (file)
@@ -5,13 +5,61 @@
  *
  * Copyright (c) 2004-2005, PostgreSQL Global Development Group
  *
- * $PostgreSQL: pgsql/src/include/postmaster/syslogger.h,v 1.5 2005/10/15 02:49:46 momjian Exp $
+ * $PostgreSQL: pgsql/src/include/postmaster/syslogger.h,v 1.5.2.1 2007/06/14 01:50:14 adunstan Exp $
  *
  *-------------------------------------------------------------------------
  */
 #ifndef _SYSLOGGER_H
 #define _SYSLOGGER_H
 
+#include <limits.h>                            /* for PIPE_BUF */
+
+
+/* 
+ * Primitive protocol structure for writing to syslogger pipe(s).  The idea
+ * here is to divide long messages into chunks that are not more than
+ * PIPE_BUF bytes long, which according to POSIX spec must be written into
+ * the pipe atomically.  The pipe reader then uses the protocol headers to
+ * reassemble the parts of a message into a single string.  The reader can
+ * also cope with non-protocol data coming down the pipe, though we cannot
+ * guarantee long strings won't get split apart.
+ *
+ * We use 't' or 'f' instead of a bool for is_last to make the protocol a tiny
+ * bit more robust against finding a false double nul byte prologue.  But we
+ * still might find it in the len and/or pid bytes unless we're careful.
+ */
+
+#ifdef PIPE_BUF
+/* Are there any systems with PIPE_BUF > 64K?  Unlikely, but ... */
+#if PIPE_BUF > 65536
+#define PIPE_CHUNK_SIZE  65536
+#else
+#define PIPE_CHUNK_SIZE  ((int) PIPE_BUF)
+#endif
+#else  /* not defined */
+/* POSIX says the value of PIPE_BUF must be at least 512, so use that */
+#define PIPE_CHUNK_SIZE  512
+#endif
+
+typedef struct 
+{
+       char            nuls[2];                /* always \0\0 */
+       uint16          len;                    /* size of this chunk (counts data only) */
+       int32           pid;                    /* writer's pid */
+       char            is_last;                /* last chunk of message? 't' or 'f' */
+       char            data[1];                /* data payload starts here */
+} PipeProtoHeader;
+
+typedef union
+{
+       PipeProtoHeader proto;
+       char            filler[PIPE_CHUNK_SIZE];
+} PipeProtoChunk;
+
+#define PIPE_HEADER_SIZE  offsetof(PipeProtoHeader, data)
+#define PIPE_MAX_PAYLOAD  ((int) (PIPE_CHUNK_SIZE - PIPE_HEADER_SIZE))
+
+
 /* GUC options */
 extern bool Redirect_stderr;
 extern int     Log_RotationAge;