]> git.ipfire.org Git - thirdparty/bacula.git/commitdiff
Backport bpipe enhancements
authorKern Sibbald <kern@sibbald.com>
Wed, 8 Aug 2018 09:01:26 +0000 (11:01 +0200)
committerKern Sibbald <kern@sibbald.com>
Wed, 8 Aug 2018 09:01:26 +0000 (11:01 +0200)
bacula/src/lib/bpipe.c
bacula/src/lib/bpipe.h
bacula/src/plugins/fd/bpipe-fd.c
bacula/src/plugins/fd/fd_common.h
regress/tests/bpipe-test

index 95e9f471fa73151a5261c29c5f5534a408aa8f2a..bcf64b2f0de1a185adf02e51648ff3c6fa684259 100644 (file)
@@ -1,7 +1,7 @@
 /*
    Bacula(R) - The Network Backup Solution
 
-   Copyright (C) 2000-2017 Kern Sibbald
+   Copyright (C) 2000-2018 Kern Sibbald
 
    The original author of Bacula is Kern Sibbald, with contributions
    from many others, a complete list can be found in the file AUTHORS.
@@ -26,6 +26,7 @@
 
 #include "bacula.h"
 #include "jcr.h"
+
 #ifdef HAVE_GETRLIMIT
 #include <sys/resource.h>
 #else
@@ -51,6 +52,11 @@ int num_execvp_errors = (int)(sizeof(execvp_errors)/sizeof(int));
 
 #define MAX_ARGV 100
 
+#define MODE_READ 1
+#define MODE_WRITE 2
+#define MODE_SHELL 4
+#define MODE_STDERR 8
+
 #if !defined(HAVE_WIN32)
 static void build_argc_argv(char *cmd, int *bargc, char *bargv[], int max_arg);
 
@@ -73,34 +79,32 @@ BPIPE *open_bpipe(char *prog, int wait, const char *mode, char *envp[])
 {
    char *bargv[MAX_ARGV];
    int bargc, i;
-   int readp[2], writep[2];
+   int readp[2], writep[2], errp[2];
    POOLMEM *tprog;
-   int mode_read, mode_write, mode_shell;
+   int mode_map = 0;
    BPIPE *bpipe;
    int save_errno;
-#if !defined(HAVE_FCNTL_F_CLOSEM) && !defined(HAVE_CLOSEFROM)
    struct rlimit rl;
    int64_t rlimitResult=0;
-#endif
 
    if (!prog || !*prog) {
       /* execve(3) A component of the file does not name an existing file or file is an empty string. */
       errno = ENOENT; 
       return NULL;
    }
-   
+
    bpipe = (BPIPE *)malloc(sizeof(BPIPE));
    memset(bpipe, 0, sizeof(BPIPE));
-   mode_read = (mode[0] == 'r');
-   mode_write = (mode[0] == 'w' || mode[1] == 'w');
-   /* mode is at least 2 bytes long, can be 3, rs, rws, ws */
-   mode_shell = (mode[1] == 's' || (mode[1] && mode[2] == 's'));
+   if (strchr(mode,'r')) mode_map|=MODE_READ;
+   if (strchr(mode,'w')) mode_map|=MODE_WRITE;
+   if (strchr(mode,'s')) mode_map|=MODE_SHELL;
+   if (strchr(mode,'e')) mode_map|=MODE_STDERR;
+
    /* Build arguments for running program. */
    tprog = get_pool_memory(PM_FNAME);
    pm_strcpy(tprog, prog);
-   if (mode_shell) {
+   if (mode_map & MODE_SHELL) {
       build_sh_argc_argv(tprog, &bargc, bargv, MAX_ARGV);
-
    } else {
       build_argc_argv(tprog, &bargc, bargv, MAX_ARGV);
    }
@@ -122,16 +126,16 @@ BPIPE *open_bpipe(char *prog, int wait, const char *mode, char *envp[])
 #endif
 
    /* Each pipe is one way, write one end, read the other, so we need two */
-   if (mode_write && pipe(writep) == -1) {
+   if ((mode_map & MODE_WRITE) && pipe(writep) == -1) {
       save_errno = errno;
       free(bpipe);
       free_pool_memory(tprog);
       errno = save_errno;
       return NULL;
    }
-   if (mode_read && pipe(readp) == -1) {
+   if ((mode_map & MODE_READ) && pipe(readp) == -1) {
       save_errno = errno;
-      if (mode_write) {
+      if (mode_map & MODE_WRITE) {
          close(writep[0]);
          close(writep[1]);
       }
@@ -140,6 +144,21 @@ BPIPE *open_bpipe(char *prog, int wait, const char *mode, char *envp[])
       errno = save_errno;
       return NULL;
    }
+   if ((mode_map & MODE_STDERR) && pipe(errp) == -1) {
+      save_errno = errno;
+      if (mode_map & MODE_WRITE) {
+         close(writep[0]);
+         close(writep[1]);
+      }
+      if (mode_map & MODE_READ) {
+         close(readp[0]);
+         close(readp[1]);
+      }
+      free(bpipe);
+      free_pool_memory(tprog);
+      errno = save_errno;
+      return NULL;
+   }
 
    /* Many systems doesn't have the correct system call
     * to determine the FD list to close.
@@ -156,28 +175,37 @@ BPIPE *open_bpipe(char *prog, int wait, const char *mode, char *envp[])
    switch (bpipe->worker_pid = fork()) {
    case -1:                           /* error */
       save_errno = errno;
-      if (mode_write) {
+      if (mode_map & MODE_WRITE) {
          close(writep[0]);
          close(writep[1]);
       }
-      if (mode_read) {
+      if (mode_map & MODE_READ) {
          close(readp[0]);
          close(readp[1]);
       }
+      if (mode_map & MODE_STDERR) {
+         close(errp[0]);
+         close(errp[1]);
+      }
       free(bpipe);
       free_pool_memory(tprog);
       errno = save_errno;
       return NULL;
 
    case 0:                            /* child */
-      if (mode_write) {
+      if (mode_map & MODE_WRITE) {
          close(writep[1]);
          dup2(writep[0], 0);          /* Dup our write to his stdin */
       }
-      if (mode_read) {
+      if (mode_map & MODE_READ) {
          close(readp[0]);             /* Close unused child fds */
          dup2(readp[1], 1);           /* dup our read to his stdout */
-         dup2(readp[1], 2);           /*   and his stderr */
+         if (mode_map & MODE_STDERR) {  /*   and handle stderr */
+            close(errp[0]); 
+            dup2(errp[1], 2);
+         } else {
+            dup2(readp[1], 2);
+         }
       }
 
 #if HAVE_FCNTL_F_CLOSEM
@@ -210,11 +238,15 @@ BPIPE *open_bpipe(char *prog, int wait, const char *mode, char *envp[])
       break;
    }
    free_pool_memory(tprog);
-   if (mode_read) {
+   if (mode_map & MODE_READ) {
       close(readp[1]);                /* close unused parent fds */
       bpipe->rfd = fdopen(readp[0], "r"); /* open file descriptor */
    }
-   if (mode_write) {
+   if (mode_map & MODE_STDERR) {
+      close(errp[1]);                /* close unused parent fds */
+      bpipe->efd = fdopen(errp[0], "r"); /* open file descriptor */
+   }
+   if (mode_map & MODE_WRITE) {
       close(writep[0]);
       bpipe->wfd = fdopen(writep[1], "w");
    }
@@ -226,7 +258,8 @@ BPIPE *open_bpipe(char *prog, int wait, const char *mode, char *envp[])
    return bpipe;
 }
 
-/* Close the write pipe only */
+/* Close the write pipe only
+ * BE careful ! return 1 if ok */
 int close_wpipe(BPIPE *bpipe)
 {
    int stat = 1;
@@ -241,6 +274,20 @@ int close_wpipe(BPIPE *bpipe)
    return stat;
 }
 
+/* Close the stderror pipe only */
+int close_epipe(BPIPE *bpipe)
+{
+   int stat = 1;
+
+   if (bpipe->efd) {
+      if (fclose(bpipe->efd) != 0) {
+         stat = 0;
+      }
+      bpipe->efd = NULL;
+   }
+   return stat;
+}
+
 /*
  * Close both pipes and free resources
  *
@@ -265,6 +312,10 @@ int close_bpipe(BPIPE *bpipe)
       fclose(bpipe->wfd);
       bpipe->wfd = NULL;
    }
+   if (bpipe->efd) {
+      fclose(bpipe->efd);
+      bpipe->efd = NULL;
+   }
 
    if (bpipe->wait == 0) {
       wait_option = 0;                /* wait indefinitely */
index 9da6745b37359afd536dac47a707f3e0a70092f2..7d7786a2d80440d1af7f89e251b92790c29cfc1a 100644 (file)
@@ -1,7 +1,7 @@
 /*
    Bacula(R) - The Network Backup Solution
 
-   Copyright (C) 2000-2016 Kern Sibbald
+   Copyright (C) 2000-2018 Kern Sibbald
 
    The original author of Bacula is Kern Sibbald, with contributions
    from many others, a complete list can be found in the file AUTHORS.
@@ -28,4 +28,5 @@ public:
    btimer_t *timer_id;
    FILE *rfd;
    FILE *wfd;
+   FILE *efd;
 };
index b01ee9c47c24ac6a613af84e2794b6591889a494..df3695f2dc2c58c6b45576e628989d09ecc8285a 100644 (file)
@@ -1,7 +1,7 @@
 /*
    Bacula(R) - The Network Backup Solution
 
-   Copyright (C) 2000-2015 Kern Sibbald
+   Copyright (C) 2000-2018 Kern Sibbald
 
    The original author of Bacula is Kern Sibbald, with contributions
    from many others, a complete list can be found in the file AUTHORS.
  *
  */
 #include "bacula.h"
+#define USE_FULL_WRITE
+#include "fd_common.h"
 #include "fd_plugins.h"
+#include "lib/ini.h"
 
 #undef malloc
 #undef free
@@ -97,7 +100,8 @@ static pFuncs pluginFuncs = {
    createFile,
    setFileAttributes,
    checkFile,
-   handleXACLdata
+   handleXACLdata,
+   NULL                          /* No checkStream */
 };
 
 /*
@@ -106,14 +110,21 @@ static pFuncs pluginFuncs = {
 struct plugin_ctx {
    boffset_t offset;
    BPIPE *pfd;                        /* bpipe file descriptor */
-   bool backup;                       /* set for backup (not needed) */
+   int  efd;                          /* stderr */
+   int  rfd;                          /* stdout */
+   int  wfd;                          /* stdin */
+   int  maxfd;                        /* max(stderr, stdout) */
+   bool backup;                       /* set when the backup is done */
+   bool canceled;
    char *cmd;                         /* plugin command line */
    char *fname;                       /* filename to "backup/restore" */
    char *reader;                      /* reader program for backup */
    char *writer;                      /* writer program for backup */
-
    char where[512];
    int replace;
+   int job_level;
+   int estimate_mode;
+   int64_t total_bytes;         /* number of bytes read/write */
 };
 
 /*
@@ -135,16 +146,16 @@ bRC loadPlugin(bInfo *lbinfo, bFuncs *lbfuncs, pInfo **pinfo, pFuncs **pfuncs)
 }
 
 /*
- * External entry point to unload the plugin
+ * External entry point to unload the plugin 
  */
-bRC unloadPlugin()
+bRC unloadPlugin() 
 {
 // printf("bpipe-fd: Unloaded\n");
    return bRC_OK;
 }
 
 /*
- * The following entry points are accessed through the function
+ * The following entry points are accessed through the function 
  *   pointers we supplied to Bacula. Each plugin type (dir, fd, sd)
  *   has its own set of entry points that the plugin must define.
  */
@@ -182,7 +193,7 @@ static bRC freePlugin(bpContext *ctx)
 /*
  * Return some plugin value (none defined)
  */
-static bRC getPluginValue(bpContext *ctx, pVariable var, void *value)
+static bRC getPluginValue(bpContext *ctx, pVariable var, void *value) 
 {
    return bRC_OK;
 }
@@ -190,7 +201,7 @@ static bRC getPluginValue(bpContext *ctx, pVariable var, void *value)
 /*
  * Set a plugin value (none defined)
  */
-static bRC setPluginValue(bpContext *ctx, pVariable var, void *value)
+static bRC setPluginValue(bpContext *ctx, pVariable var, void *value) 
 {
    return bRC_OK;
 }
@@ -201,6 +212,7 @@ static bRC setPluginValue(bpContext *ctx, pVariable var, void *value)
 static bRC handlePluginEvent(bpContext *ctx, bEvent *event, void *value)
 {
    struct plugin_ctx *p_ctx = (struct plugin_ctx *)ctx->pContext;
+
    if (!p_ctx) {
       return bRC_Error;
    }
@@ -213,8 +225,16 @@ static bRC handlePluginEvent(bpContext *ctx, bEvent *event, void *value)
     *   what is really going on.
     */
    switch (event->eventType) {
+   case bEventLevel:
+      p_ctx->job_level = ((intptr_t)value);
+      break;
+
+   case bEventCancelCommand:
+      p_ctx->canceled = true;
+      break;
+
    case bEventPluginCommand:
-      bfuncs->DebugMessage(ctx, fi, li, dbglvl,
+      bfuncs->DebugMessage(ctx, fi, li, dbglvl, 
                            "bpipe-fd: PluginCommand=%s\n", (char *)value);
       break;
    case bEventJobStart:
@@ -229,13 +249,9 @@ static bRC handlePluginEvent(bpContext *ctx, bEvent *event, void *value)
    case bEventEndBackupJob:
 //    printf("bpipe-fd: EndBackupJob\n");
       break;
-   case bEventLevel:
-//    printf("bpipe-fd: JobLevel=%c %d\n", (int)value, (int)value);
-      break;
    case bEventSince:
 //    printf("bpipe-fd: since=%d\n", (int)value);
       break;
-
    case bEventStartRestoreJob:
 //    printf("bpipe-fd: StartRestoreJob\n");
       break;
@@ -245,14 +261,16 @@ static bRC handlePluginEvent(bpContext *ctx, bEvent *event, void *value)
       break;
 
    /* Plugin command e.g. plugin = <plugin-name>:<name-space>:read command:write command */
+   case bEventEstimateCommand:
+      p_ctx->estimate_mode = true;
+      /* Fall-through wanted */
    case bEventRestoreCommand:
 //    printf("bpipe-fd: EventRestoreCommand cmd=%s\n", (char *)value);
       /* Fall-through wanted */
-   case bEventEstimateCommand:
-      /* Fall-through wanted */
    case bEventBackupCommand:
       char *p;
       bfuncs->DebugMessage(ctx, fi, li, dbglvl, "bpipe-fd: pluginEvent cmd=%s\n", (char *)value);
+      p_ctx->backup = false;
       p_ctx->cmd = strdup((char *)value);
       p = strchr(p_ctx->cmd, ':');
       if (!p) {
@@ -275,7 +293,8 @@ static bRC handlePluginEvent(bpContext *ctx, bEvent *event, void *value)
       }
       *p++ = 0;           /* terminate reader string */
       p_ctx->writer = p;
-//    printf("bpipe-fd: plugin=%s fname=%s reader=%s writer=%s\n",
+
+//    printf("bpipe-fd: plugin=%s fname=%s reader=%s writer=%s\n", 
 //         p_ctx->cmd, p_ctx->fname, p_ctx->reader, p_ctx->writer);
       break;
 
@@ -286,7 +305,8 @@ static bRC handlePluginEvent(bpContext *ctx, bEvent *event, void *value)
    return bRC_OK;
 }
 
-/*
+
+/* 
  * Start the backup of a specific file
  */
 static bRC startBackupFile(bpContext *ctx, struct save_pkt *sp)
@@ -295,6 +315,7 @@ static bRC startBackupFile(bpContext *ctx, struct save_pkt *sp)
    if (!p_ctx) {
       return bRC_Error;
    }
+
    time_t now = time(NULL);
    sp->fname = p_ctx->fname;
    sp->type = FT_REG;
@@ -315,19 +336,37 @@ static bRC startBackupFile(bpContext *ctx, struct save_pkt *sp)
  */
 static bRC endBackupFile(bpContext *ctx)
 {
+   struct plugin_ctx *p_ctx = (struct plugin_ctx *)ctx->pContext;
+   if (!p_ctx) {
+      return bRC_Error;
+   }
+
    /*
     * We would return bRC_More if we wanted startBackupFile to be
     * called again to backup another file
     */
+   if (!p_ctx->backup) {
+      return bRC_More;
+   }
    return bRC_OK;
 }
 
+static void send_log(bpContext *ctx, char *buf)
+{
+   struct plugin_ctx *p_ctx = (struct plugin_ctx *)ctx->pContext;
+   strip_trailing_newline(buf);
+   bfuncs->JobMessage(ctx, fi, li, M_INFO, 0, "%s: %s\n", p_ctx->fname, buf);
+}
 
 /*
  * Bacula is calling us to do the actual I/O
  */
 static bRC pluginIO(bpContext *ctx, struct io_pkt *io)
 {
+   fd_set rfds;
+   fd_set wfds;
+   bool ok=false;
+   char buf[1024];
    struct plugin_ctx *p_ctx = (struct plugin_ctx *)ctx->pContext;
    if (!p_ctx) {
       return bRC_Error;
@@ -337,16 +376,18 @@ static bRC pluginIO(bpContext *ctx, struct io_pkt *io)
    io->io_errno = 0;
    switch(io->func) {
    case IO_OPEN:
+      p_ctx->total_bytes = 0;
+      p_ctx->wfd = p_ctx->efd = p_ctx->rfd = -1;
       bfuncs->DebugMessage(ctx, fi, li, dbglvl, "bpipe-fd: IO_OPEN\n");
       if (io->flags & (O_CREAT | O_WRONLY)) {
          char *writer_codes = apply_rp_codes(p_ctx);
 
-         p_ctx->pfd = open_bpipe(writer_codes, 0, "ws");
-         bfuncs->DebugMessage(ctx, fi, li, dbglvl, "bpipe-fd: IO_OPEN fd=%p writer=%s\n",
+         p_ctx->pfd = open_bpipe(writer_codes, 0, "rws");
+         bfuncs->DebugMessage(ctx, fi, li, dbglvl, "bpipe-fd: IO_OPEN fd=%p writer=%s\n", 
              p_ctx->pfd, writer_codes);
          if (!p_ctx->pfd) {
             io->io_errno = errno;
-            bfuncs->JobMessage(ctx, fi, li, M_FATAL, 0,
+            bfuncs->JobMessage(ctx, fi, li, M_FATAL, 0, 
                "Open pipe writer=%s failed: ERR=%s\n", writer_codes, strerror(errno));
             if (writer_codes) {
                free(writer_codes);
@@ -356,18 +397,28 @@ static bRC pluginIO(bpContext *ctx, struct io_pkt *io)
          if (writer_codes) {
             free(writer_codes);
          }
-         io->status = fileno(p_ctx->pfd->wfd);
+         /* We need to read from stdout/stderr for messages to display to the user */
+         p_ctx->rfd = fileno(p_ctx->pfd->rfd);
+         p_ctx->wfd = fileno(p_ctx->pfd->wfd);
+         p_ctx->maxfd = MAX(p_ctx->wfd, p_ctx->rfd);
+         io->status = p_ctx->wfd;
+
       } else {
-         p_ctx->pfd = open_bpipe(p_ctx->reader, 0, "rs");
-         bfuncs->DebugMessage(ctx, fi, li, dbglvl, "bpipe-fd: IO_OPEN fd=%p reader=%s\n",
+         /* Use shell mode and split stderr/stdout */
+         p_ctx->pfd = open_bpipe(p_ctx->reader, 0, "rse");
+         bfuncs->DebugMessage(ctx, fi, li, dbglvl, "bpipe-fd: IO_OPEN fd=%p reader=%s\n", 
             p_ctx->pfd, p_ctx->reader);
          if (!p_ctx->pfd) {
             io->io_errno = errno;
-            bfuncs->JobMessage(ctx, fi, li, M_FATAL, 0,
+            bfuncs->JobMessage(ctx, fi, li, M_FATAL, 0, 
                "Open pipe reader=%s failed: ERR=%s\n", p_ctx->reader, strerror(errno));
             return bRC_Error;
          }
-         io->status = fileno(p_ctx->pfd->rfd);
+         /* We need to read from stderr for job log and stdout for the data */
+         p_ctx->efd = fileno(p_ctx->pfd->efd);
+         p_ctx->rfd = fileno(p_ctx->pfd->rfd);
+         p_ctx->maxfd = MAX(p_ctx->efd, p_ctx->rfd);
+         io->status = p_ctx->rfd;
       }
       sleep(1);                 /* let pipe connect */
       break;
@@ -377,15 +428,41 @@ static bRC pluginIO(bpContext *ctx, struct io_pkt *io)
          bfuncs->JobMessage(ctx, fi, li, M_FATAL, 0, "Logic error: NULL read FD\n");
          return bRC_Error;
       }
-      io->status = fread(io->buf, 1, io->count, p_ctx->pfd->rfd);
+
+      /* We first try to read stderr, but keep monitoring for data on stdout (when stderr is empty) */
+      while (!p_ctx->canceled) {
+         FD_ZERO(&rfds);
+         FD_SET(p_ctx->rfd, &rfds);
+         FD_SET(p_ctx->efd, &rfds); 
+         select(p_ctx->maxfd+1, &rfds, NULL, NULL, NULL);
+
+         if (!FD_ISSET(p_ctx->efd, &rfds)) {
+            /* nothing in stderr, then we should have something in stdout */
+            break;
+         }
+         int ret = read(p_ctx->efd, buf, sizeof(buf));
+         if (ret <= 0) {
+            /* stderr is closed or in error, stdout should be in the same state */
+            /* let handle it at the stdout level */
+            break;
+         }
+         /* TODO: buffer and split lines */
+         buf[ret]=0;
+         send_log(ctx, buf);
+      }
+
+      io->status = read(p_ctx->rfd, io->buf, io->count);
 //    bfuncs->DebugMessage(ctx, fi, li, dbglvl, "bpipe-fd: IO_READ buf=%p len=%d\n", io->buf, io->status);
-      if (!feof(p_ctx->pfd->rfd) && io->status == 0 && ferror(p_ctx->pfd->rfd)) {
-         bfuncs->JobMessage(ctx, fi, li, M_FATAL, 0,
-            "Pipe read error: ERR=%s\n", strerror(errno));
-         bfuncs->DebugMessage(ctx, fi, li, dbglvl,
-            "Pipe read error: ERR=%s\n", strerror(errno));
+      if (io->status < 0) {
+         berrno be;
+         bfuncs->JobMessage(ctx, fi, li, M_FATAL, 0, 
+                            "Pipe read error: ERR=%s\n", be.bstrerror());
+         bfuncs->DebugMessage(ctx, fi, li, dbglvl, 
+                              "Pipe read error: count=%lld errno=%d ERR=%s\n",
+                              p_ctx->total_bytes, (int)errno, be.bstrerror());
          return bRC_Error;
       }
+      p_ctx->total_bytes += io->status;
       break;
 
    case IO_WRITE:
@@ -393,16 +470,44 @@ static bRC pluginIO(bpContext *ctx, struct io_pkt *io)
          bfuncs->JobMessage(ctx, fi, li, M_FATAL, 0, "Logic error: NULL write FD\n");
          return bRC_Error;
       }
+
+      /* When we write, we must check for the error channel (stdout+stderr) as well */
+      while (!ok && !p_ctx->canceled) {
+         FD_ZERO(&wfds);
+         FD_SET(p_ctx->wfd, &wfds);
+         FD_ZERO(&rfds);
+         FD_SET(p_ctx->rfd, &rfds);
+
+         select(p_ctx->maxfd+1, &rfds, &wfds, NULL, NULL);
+
+         if (FD_ISSET(p_ctx->rfd, &rfds)) {
+            int ret = read(p_ctx->rfd, buf, sizeof(buf)); /* TODO: simulate fgets() */
+            if (ret > 0) {
+               buf[ret]=0;
+               send_log(ctx, buf); 
+            } else {
+               ok = true;       /* nothing to read */
+            }
+         }
+
+         if (FD_ISSET(p_ctx->wfd, &wfds)) {
+            ok = true;
+         }
+      }
+
 //    printf("bpipe-fd: IO_WRITE fd=%p buf=%p len=%d\n", p_ctx->fd, io->buf, io->count);
-      io->status = fwrite(io->buf, 1, io->count, p_ctx->pfd->wfd);
+      io->status = full_write(p_ctx->wfd, io->buf, io->count, &p_ctx->canceled);
 //    printf("bpipe-fd: IO_WRITE buf=%p len=%d\n", io->buf, io->status);
-      if (!feof(p_ctx->pfd->wfd) && io->status == 0 && ferror(p_ctx->pfd->wfd)) {
-         bfuncs->JobMessage(ctx, fi, li, M_FATAL, 0,
-            "Pipe write error\n");
-         bfuncs->DebugMessage(ctx, fi, li, dbglvl,
-            "Pipe write error: ERR=%s\n", strerror(errno));
+      if (io->status <= 0) {
+         berrno be;
+         bfuncs->JobMessage(ctx, fi, li, M_FATAL, 0, 
+                            "Pipe write error: ERR=%s\n", be.bstrerror());
+         bfuncs->DebugMessage(ctx, fi, li, dbglvl, 
+                              "Pipe write error: count=%lld errno=%d ERR=%s\n",
+                              p_ctx->total_bytes, (int)errno, be.bstrerror());
          return bRC_Error;
       }
+      p_ctx->total_bytes += io->status;
       break;
 
    case IO_CLOSE:
@@ -410,6 +515,60 @@ static bRC pluginIO(bpContext *ctx, struct io_pkt *io)
          bfuncs->JobMessage(ctx, fi, li, M_FATAL, 0, "Logic error: NULL FD on bpipe close\n");
          return bRC_Error;
       }
+
+      /* We inform the other side that we have nothing more to send */
+      if (p_ctx->wfd >= 0) {
+         int ret = close_wpipe(p_ctx->pfd);
+         if (ret == 0) {
+            bfuncs->JobMessage(ctx, fi, li, M_ERROR, 0, "bpipe-fd: Error closing for file %s: %d\n", 
+                               p_ctx->fname, ret);
+         }
+      }
+
+      /* We flush what the other program has to say */
+      while (!ok && !p_ctx->canceled) {
+         struct timeval tv = {10, 0};   // sleep for 10secs
+         FD_ZERO(&rfds);
+         p_ctx->maxfd = -1;
+
+         if (p_ctx->rfd >= 0) {
+            FD_SET(p_ctx->rfd, &rfds);
+            p_ctx->maxfd = MAX(p_ctx->maxfd, p_ctx->rfd);
+         }
+
+         if (p_ctx->efd >= 0) {
+            FD_SET(p_ctx->efd, &rfds);
+            p_ctx->maxfd = MAX(p_ctx->maxfd, p_ctx->efd);
+         }
+
+         if (p_ctx->maxfd == -1) {
+            ok = true;          /* exit the loop */
+         } else {
+            select(p_ctx->maxfd+1, &rfds, NULL, NULL, &tv);
+         }
+
+         if (p_ctx->rfd >= 0 && FD_ISSET(p_ctx->rfd, &rfds)) {
+            int ret = read(p_ctx->rfd, buf, sizeof(buf));
+            if (ret > 0) {
+               buf[ret]=0;
+               send_log(ctx, buf);
+            } else {
+               p_ctx->rfd = -1; /* closed, keep the reference in bpipe */
+            }
+         }
+
+         /* The stderr can be melted with stdout or not */
+         if (p_ctx->efd >= 0 && FD_ISSET(p_ctx->efd, &rfds)) {
+            int ret = read(p_ctx->efd, buf, sizeof(buf));
+            if (ret > 0) {
+               buf[ret]=0;
+               send_log(ctx, buf);
+            } else {
+               p_ctx->efd = -1; /* closed, keep the reference in bpipe */
+            }
+         }
+      }
+
       io->status = close_bpipe(p_ctx->pfd);
       if (io->status != 0) {
          bfuncs->JobMessage(ctx, fi, li, M_ERROR, 0, "bpipe-fd: Error closing for file %s: %d\n",
@@ -448,7 +607,7 @@ static bRC endRestoreFile(bpContext *ctx)
 /*
  * This is called during restore to create the file (if necessary)
  * We must return in rp->create_status:
- *
+ *   
  *  CF_ERROR    -- error
  *  CF_SKIP     -- skip processing this file
  *  CF_EXTRACT  -- extract the file (i.e.call i/o routines)
@@ -534,9 +693,9 @@ static char *apply_rp_codes(struct plugin_ctx * p_ctx)
       }
    }
 
-   /* Required mem:
-    * len(imsg)
-    * + number of "where" codes * (len(where)-2)
+   /* Required mem: 
+    * len(imsg) 
+    * + number of "where" codes * (len(where)-2) 
     * - number of "replace" codes
     */
    omsg = (char*)malloc(strlen(imsg) + (w_count * (strlen(p_ctx->where)-2)) - r_count + 1);
index eaa580c4c051dc828d3351b75d1bf2adede40901..a04e757658c2f7374a05835024e90b1f6ae5c0c2 100644 (file)
@@ -1,7 +1,7 @@
 /*
    Bacula(R) - The Network Backup Solution
 
-   Copyright (C) 2000-2017 Kern Sibbald
+   Copyright (C) 2000-2018 Kern Sibbald
 
    The original author of Bacula is Kern Sibbald, with contributions
    from many others, a complete list can be found in the file AUTHORS.
@@ -154,21 +154,6 @@ static void copy_drives(char *drives, char *dest) {
 
 #endif  /* ! PCOMMON_H */
 
-/* Check if the bacula version is enterprise */
-#ifndef check_beef
-# define check_beef(ctx, ret) \
-  do {                       \
-     const char *v;          \
-     bfuncs->getBaculaValue(ctx, bVarVersion, (void *)&v);  \
-     if (v[0] == '6' || v[0] == '8') {                      \
-        *(ret) = true;       \
-     } else {                \
-        *(ret) = false;      \
-     }                       \
-  } while (0)
-
-#endif  /* check_beef */
-
 #ifdef USE_JOB_LIST
 
 /* This class is used to store locally the job history, you can attach data
@@ -297,8 +282,7 @@ bool joblist::find_job(const char *name, POOLMEM **data)
    P(joblist_mutex);
    if (bopen(&fp, tmp, O_RDONLY, 0) < 0) {
       berrno be;
-      Jmsg(ctx, M_ERROR, "Unable to open job database. "
-           "Can't open %s for reading ERR=%s\n",
+      Jmsg(ctx, M_ERROR, "Unable to open job database %s for reading. ERR=%s\n",
            tmp, be.bstrerror(errno));
       goto bail_out;
    }
@@ -692,3 +676,23 @@ bail_out:
 
 #endif  /* ! USE_JOB_LIST */
 
+#ifdef USE_FULL_WRITE
+static int32_t full_write(int fd, const char *ptr, int32_t nbytes, bool *canceled=NULL)
+{
+   ssize_t nleft, nwritten;
+   nleft = nbytes;
+   while (nleft > 0 && (canceled == NULL || *canceled == false)) {
+      do {
+         errno = 0;
+         nwritten = write(fd, ptr, nleft);
+      } while (nwritten == -1 && errno == EINTR && (canceled == NULL || *canceled == false));
+
+      if (nwritten <= 0) {
+         return nwritten;          /* error */
+      }
+      nleft -= nwritten;
+      ptr += nwritten;
+   }
+   return nbytes - nleft;
+}
+#endif
index ac21afd8140dd149c42eb033c43369b370b012e5..5c2845b32a327da9417b0e9734f6479d8ef931df 100755 (executable)
@@ -25,6 +25,7 @@ messages
 @$out ${cwd}/tmp/log1.out
 label storage=File1 volume=TestVolume001
 estimate job=$JobName level=Full
+setdebug level=100 client=$CLIENT
 @#setdebug level=50 traclient=$CLIENT
 run job=$JobName storage=File1 yes
 wait
@@ -71,23 +72,6 @@ if [ $nb -ne 1 ]; then
     bstat=1
 fi
 
-nb=`grep $file $tmp/list | wc -l`
-if [ $nb -ne 1 ]; then
-    print_debug "ERROR: Should find the RestoreObject for $file in $tmp/list"
-    bstat=1
-fi
-
-nb=`grep Makefile $tmp/list | wc -l`
-if [ $nb -ne 1 ]; then
-    print_debug "ERROR: Should find the RestoreObject for Makefile in $tmp/list"
-    bstat=1
-fi
-
-nb=`grep restore_command $tmp/conf | wc -l`
-if [ $nb -ne 2 ]; then
-    print_debug "ERROR: Should find the RestoreObject for Makefile and $file in $tmp/conf"
-    bstat=1
-fi
 
 cat <<EOF >$tmp/obj
 restore_command="cat >$tmp/Makefile.bak"
@@ -101,7 +85,7 @@ messages
 @$out ${cwd}/tmp/log3.out
 setdebug level=50 client=$CLIENT trace=1
 @putfile obj1 $tmp/obj
-restore pluginrestoreconf="2:obj1" where=${cwd}/tmp  select all storage=File1 done
+restore where=${cwd}/tmp  select all storage=File1 done
 yes
 wait
 setdebug level=0 client=$CLIENT trace=0
@@ -145,12 +129,6 @@ check_two_logs
 #
 # ****FIXME**** test that all three files are restored correctly
 #
-diff ${cwd}/Makefile ${cwd}/tmp/Makefile.bak
-dstat=$?
-
-diff ${cwd}/${file} ${cwd}/tmp/${file}
-dstat=$(($dstat + $?))
-
 diff ${cwd}/tmp/@bpipe@/${file} ${cwd}/${file}
 dstat=$(($dstat + $?))