/*
Bacula(R) - The Network Backup Solution
- Copyright (C) 2000-2017 Kern Sibbald
+ Copyright (C) 2000-2018 Kern Sibbald
The original author of Bacula is Kern Sibbald, with contributions
from many others, a complete list can be found in the file AUTHORS.
#include "bacula.h"
#include "jcr.h"
+
#ifdef HAVE_GETRLIMIT
#include <sys/resource.h>
#else
#define MAX_ARGV 100
+#define MODE_READ 1
+#define MODE_WRITE 2
+#define MODE_SHELL 4
+#define MODE_STDERR 8
+
#if !defined(HAVE_WIN32)
static void build_argc_argv(char *cmd, int *bargc, char *bargv[], int max_arg);
{
char *bargv[MAX_ARGV];
int bargc, i;
- int readp[2], writep[2];
+ int readp[2], writep[2], errp[2];
POOLMEM *tprog;
- int mode_read, mode_write, mode_shell;
+ int mode_map = 0;
BPIPE *bpipe;
int save_errno;
-#if !defined(HAVE_FCNTL_F_CLOSEM) && !defined(HAVE_CLOSEFROM)
struct rlimit rl;
int64_t rlimitResult=0;
-#endif
if (!prog || !*prog) {
/* execve(3) A component of the file does not name an existing file or file is an empty string. */
errno = ENOENT;
return NULL;
}
-
+
bpipe = (BPIPE *)malloc(sizeof(BPIPE));
memset(bpipe, 0, sizeof(BPIPE));
- mode_read = (mode[0] == 'r');
- mode_write = (mode[0] == 'w' || mode[1] == 'w');
- /* mode is at least 2 bytes long, can be 3, rs, rws, ws */
- mode_shell = (mode[1] == 's' || (mode[1] && mode[2] == 's'));
+ if (strchr(mode,'r')) mode_map|=MODE_READ;
+ if (strchr(mode,'w')) mode_map|=MODE_WRITE;
+ if (strchr(mode,'s')) mode_map|=MODE_SHELL;
+ if (strchr(mode,'e')) mode_map|=MODE_STDERR;
+
/* Build arguments for running program. */
tprog = get_pool_memory(PM_FNAME);
pm_strcpy(tprog, prog);
- if (mode_shell) {
+ if (mode_map & MODE_SHELL) {
build_sh_argc_argv(tprog, &bargc, bargv, MAX_ARGV);
-
} else {
build_argc_argv(tprog, &bargc, bargv, MAX_ARGV);
}
#endif
/* Each pipe is one way, write one end, read the other, so we need two */
- if (mode_write && pipe(writep) == -1) {
+ if ((mode_map & MODE_WRITE) && pipe(writep) == -1) {
save_errno = errno;
free(bpipe);
free_pool_memory(tprog);
errno = save_errno;
return NULL;
}
- if (mode_read && pipe(readp) == -1) {
+ if ((mode_map & MODE_READ) && pipe(readp) == -1) {
save_errno = errno;
- if (mode_write) {
+ if (mode_map & MODE_WRITE) {
close(writep[0]);
close(writep[1]);
}
errno = save_errno;
return NULL;
}
+ if ((mode_map & MODE_STDERR) && pipe(errp) == -1) {
+ save_errno = errno;
+ if (mode_map & MODE_WRITE) {
+ close(writep[0]);
+ close(writep[1]);
+ }
+ if (mode_map & MODE_READ) {
+ close(readp[0]);
+ close(readp[1]);
+ }
+ free(bpipe);
+ free_pool_memory(tprog);
+ errno = save_errno;
+ return NULL;
+ }
/* Many systems doesn't have the correct system call
* to determine the FD list to close.
switch (bpipe->worker_pid = fork()) {
case -1: /* error */
save_errno = errno;
- if (mode_write) {
+ if (mode_map & MODE_WRITE) {
close(writep[0]);
close(writep[1]);
}
- if (mode_read) {
+ if (mode_map & MODE_READ) {
close(readp[0]);
close(readp[1]);
}
+ if (mode_map & MODE_STDERR) {
+ close(errp[0]);
+ close(errp[1]);
+ }
free(bpipe);
free_pool_memory(tprog);
errno = save_errno;
return NULL;
case 0: /* child */
- if (mode_write) {
+ if (mode_map & MODE_WRITE) {
close(writep[1]);
dup2(writep[0], 0); /* Dup our write to his stdin */
}
- if (mode_read) {
+ if (mode_map & MODE_READ) {
close(readp[0]); /* Close unused child fds */
dup2(readp[1], 1); /* dup our read to his stdout */
- dup2(readp[1], 2); /* and his stderr */
+ if (mode_map & MODE_STDERR) { /* and handle stderr */
+ close(errp[0]);
+ dup2(errp[1], 2);
+ } else {
+ dup2(readp[1], 2);
+ }
}
#if HAVE_FCNTL_F_CLOSEM
break;
}
free_pool_memory(tprog);
- if (mode_read) {
+ if (mode_map & MODE_READ) {
close(readp[1]); /* close unused parent fds */
bpipe->rfd = fdopen(readp[0], "r"); /* open file descriptor */
}
- if (mode_write) {
+ if (mode_map & MODE_STDERR) {
+ close(errp[1]); /* close unused parent fds */
+ bpipe->efd = fdopen(errp[0], "r"); /* open file descriptor */
+ }
+ if (mode_map & MODE_WRITE) {
close(writep[0]);
bpipe->wfd = fdopen(writep[1], "w");
}
return bpipe;
}
-/* Close the write pipe only */
+/* Close the write pipe only
+ * BE careful ! return 1 if ok */
int close_wpipe(BPIPE *bpipe)
{
int stat = 1;
return stat;
}
+/* Close the stderror pipe only */
+int close_epipe(BPIPE *bpipe)
+{
+ int stat = 1;
+
+ if (bpipe->efd) {
+ if (fclose(bpipe->efd) != 0) {
+ stat = 0;
+ }
+ bpipe->efd = NULL;
+ }
+ return stat;
+}
+
/*
* Close both pipes and free resources
*
fclose(bpipe->wfd);
bpipe->wfd = NULL;
}
+ if (bpipe->efd) {
+ fclose(bpipe->efd);
+ bpipe->efd = NULL;
+ }
if (bpipe->wait == 0) {
wait_option = 0; /* wait indefinitely */
/*
Bacula(R) - The Network Backup Solution
- Copyright (C) 2000-2015 Kern Sibbald
+ Copyright (C) 2000-2018 Kern Sibbald
The original author of Bacula is Kern Sibbald, with contributions
from many others, a complete list can be found in the file AUTHORS.
*
*/
#include "bacula.h"
+#define USE_FULL_WRITE
+#include "fd_common.h"
#include "fd_plugins.h"
+#include "lib/ini.h"
#undef malloc
#undef free
createFile,
setFileAttributes,
checkFile,
- handleXACLdata
+ handleXACLdata,
+ NULL /* No checkStream */
};
/*
struct plugin_ctx {
boffset_t offset;
BPIPE *pfd; /* bpipe file descriptor */
- bool backup; /* set for backup (not needed) */
+ int efd; /* stderr */
+ int rfd; /* stdout */
+ int wfd; /* stdin */
+ int maxfd; /* max(stderr, stdout) */
+ bool backup; /* set when the backup is done */
+ bool canceled;
char *cmd; /* plugin command line */
char *fname; /* filename to "backup/restore" */
char *reader; /* reader program for backup */
char *writer; /* writer program for backup */
-
char where[512];
int replace;
+ int job_level;
+ int estimate_mode;
+ int64_t total_bytes; /* number of bytes read/write */
};
/*
}
/*
- * External entry point to unload the plugin
+ * External entry point to unload the plugin
*/
-bRC unloadPlugin()
+bRC unloadPlugin()
{
// printf("bpipe-fd: Unloaded\n");
return bRC_OK;
}
/*
- * The following entry points are accessed through the function
+ * The following entry points are accessed through the function
* pointers we supplied to Bacula. Each plugin type (dir, fd, sd)
* has its own set of entry points that the plugin must define.
*/
/*
* Return some plugin value (none defined)
*/
-static bRC getPluginValue(bpContext *ctx, pVariable var, void *value)
+static bRC getPluginValue(bpContext *ctx, pVariable var, void *value)
{
return bRC_OK;
}
/*
* Set a plugin value (none defined)
*/
-static bRC setPluginValue(bpContext *ctx, pVariable var, void *value)
+static bRC setPluginValue(bpContext *ctx, pVariable var, void *value)
{
return bRC_OK;
}
static bRC handlePluginEvent(bpContext *ctx, bEvent *event, void *value)
{
struct plugin_ctx *p_ctx = (struct plugin_ctx *)ctx->pContext;
+
if (!p_ctx) {
return bRC_Error;
}
* what is really going on.
*/
switch (event->eventType) {
+ case bEventLevel:
+ p_ctx->job_level = ((intptr_t)value);
+ break;
+
+ case bEventCancelCommand:
+ p_ctx->canceled = true;
+ break;
+
case bEventPluginCommand:
- bfuncs->DebugMessage(ctx, fi, li, dbglvl,
+ bfuncs->DebugMessage(ctx, fi, li, dbglvl,
"bpipe-fd: PluginCommand=%s\n", (char *)value);
break;
case bEventJobStart:
case bEventEndBackupJob:
// printf("bpipe-fd: EndBackupJob\n");
break;
- case bEventLevel:
-// printf("bpipe-fd: JobLevel=%c %d\n", (int)value, (int)value);
- break;
case bEventSince:
// printf("bpipe-fd: since=%d\n", (int)value);
break;
-
case bEventStartRestoreJob:
// printf("bpipe-fd: StartRestoreJob\n");
break;
break;
/* Plugin command e.g. plugin = <plugin-name>:<name-space>:read command:write command */
+ case bEventEstimateCommand:
+ p_ctx->estimate_mode = true;
+ /* Fall-through wanted */
case bEventRestoreCommand:
// printf("bpipe-fd: EventRestoreCommand cmd=%s\n", (char *)value);
/* Fall-through wanted */
- case bEventEstimateCommand:
- /* Fall-through wanted */
case bEventBackupCommand:
char *p;
bfuncs->DebugMessage(ctx, fi, li, dbglvl, "bpipe-fd: pluginEvent cmd=%s\n", (char *)value);
+ p_ctx->backup = false;
p_ctx->cmd = strdup((char *)value);
p = strchr(p_ctx->cmd, ':');
if (!p) {
}
*p++ = 0; /* terminate reader string */
p_ctx->writer = p;
-// printf("bpipe-fd: plugin=%s fname=%s reader=%s writer=%s\n",
+
+// printf("bpipe-fd: plugin=%s fname=%s reader=%s writer=%s\n",
// p_ctx->cmd, p_ctx->fname, p_ctx->reader, p_ctx->writer);
break;
return bRC_OK;
}
-/*
+
+/*
* Start the backup of a specific file
*/
static bRC startBackupFile(bpContext *ctx, struct save_pkt *sp)
if (!p_ctx) {
return bRC_Error;
}
+
time_t now = time(NULL);
sp->fname = p_ctx->fname;
sp->type = FT_REG;
*/
static bRC endBackupFile(bpContext *ctx)
{
+ struct plugin_ctx *p_ctx = (struct plugin_ctx *)ctx->pContext;
+ if (!p_ctx) {
+ return bRC_Error;
+ }
+
/*
* We would return bRC_More if we wanted startBackupFile to be
* called again to backup another file
*/
+ if (!p_ctx->backup) {
+ return bRC_More;
+ }
return bRC_OK;
}
+static void send_log(bpContext *ctx, char *buf)
+{
+ struct plugin_ctx *p_ctx = (struct plugin_ctx *)ctx->pContext;
+ strip_trailing_newline(buf);
+ bfuncs->JobMessage(ctx, fi, li, M_INFO, 0, "%s: %s\n", p_ctx->fname, buf);
+}
/*
* Bacula is calling us to do the actual I/O
*/
static bRC pluginIO(bpContext *ctx, struct io_pkt *io)
{
+ fd_set rfds;
+ fd_set wfds;
+ bool ok=false;
+ char buf[1024];
struct plugin_ctx *p_ctx = (struct plugin_ctx *)ctx->pContext;
if (!p_ctx) {
return bRC_Error;
io->io_errno = 0;
switch(io->func) {
case IO_OPEN:
+ p_ctx->total_bytes = 0;
+ p_ctx->wfd = p_ctx->efd = p_ctx->rfd = -1;
bfuncs->DebugMessage(ctx, fi, li, dbglvl, "bpipe-fd: IO_OPEN\n");
if (io->flags & (O_CREAT | O_WRONLY)) {
char *writer_codes = apply_rp_codes(p_ctx);
- p_ctx->pfd = open_bpipe(writer_codes, 0, "ws");
- bfuncs->DebugMessage(ctx, fi, li, dbglvl, "bpipe-fd: IO_OPEN fd=%p writer=%s\n",
+ p_ctx->pfd = open_bpipe(writer_codes, 0, "rws");
+ bfuncs->DebugMessage(ctx, fi, li, dbglvl, "bpipe-fd: IO_OPEN fd=%p writer=%s\n",
p_ctx->pfd, writer_codes);
if (!p_ctx->pfd) {
io->io_errno = errno;
- bfuncs->JobMessage(ctx, fi, li, M_FATAL, 0,
+ bfuncs->JobMessage(ctx, fi, li, M_FATAL, 0,
"Open pipe writer=%s failed: ERR=%s\n", writer_codes, strerror(errno));
if (writer_codes) {
free(writer_codes);
if (writer_codes) {
free(writer_codes);
}
- io->status = fileno(p_ctx->pfd->wfd);
+ /* We need to read from stdout/stderr for messages to display to the user */
+ p_ctx->rfd = fileno(p_ctx->pfd->rfd);
+ p_ctx->wfd = fileno(p_ctx->pfd->wfd);
+ p_ctx->maxfd = MAX(p_ctx->wfd, p_ctx->rfd);
+ io->status = p_ctx->wfd;
+
} else {
- p_ctx->pfd = open_bpipe(p_ctx->reader, 0, "rs");
- bfuncs->DebugMessage(ctx, fi, li, dbglvl, "bpipe-fd: IO_OPEN fd=%p reader=%s\n",
+ /* Use shell mode and split stderr/stdout */
+ p_ctx->pfd = open_bpipe(p_ctx->reader, 0, "rse");
+ bfuncs->DebugMessage(ctx, fi, li, dbglvl, "bpipe-fd: IO_OPEN fd=%p reader=%s\n",
p_ctx->pfd, p_ctx->reader);
if (!p_ctx->pfd) {
io->io_errno = errno;
- bfuncs->JobMessage(ctx, fi, li, M_FATAL, 0,
+ bfuncs->JobMessage(ctx, fi, li, M_FATAL, 0,
"Open pipe reader=%s failed: ERR=%s\n", p_ctx->reader, strerror(errno));
return bRC_Error;
}
- io->status = fileno(p_ctx->pfd->rfd);
+ /* We need to read from stderr for job log and stdout for the data */
+ p_ctx->efd = fileno(p_ctx->pfd->efd);
+ p_ctx->rfd = fileno(p_ctx->pfd->rfd);
+ p_ctx->maxfd = MAX(p_ctx->efd, p_ctx->rfd);
+ io->status = p_ctx->rfd;
}
sleep(1); /* let pipe connect */
break;
bfuncs->JobMessage(ctx, fi, li, M_FATAL, 0, "Logic error: NULL read FD\n");
return bRC_Error;
}
- io->status = fread(io->buf, 1, io->count, p_ctx->pfd->rfd);
+
+ /* We first try to read stderr, but keep monitoring for data on stdout (when stderr is empty) */
+ while (!p_ctx->canceled) {
+ FD_ZERO(&rfds);
+ FD_SET(p_ctx->rfd, &rfds);
+ FD_SET(p_ctx->efd, &rfds);
+ select(p_ctx->maxfd+1, &rfds, NULL, NULL, NULL);
+
+ if (!FD_ISSET(p_ctx->efd, &rfds)) {
+ /* nothing in stderr, then we should have something in stdout */
+ break;
+ }
+ int ret = read(p_ctx->efd, buf, sizeof(buf));
+ if (ret <= 0) {
+ /* stderr is closed or in error, stdout should be in the same state */
+ /* let handle it at the stdout level */
+ break;
+ }
+ /* TODO: buffer and split lines */
+ buf[ret]=0;
+ send_log(ctx, buf);
+ }
+
+ io->status = read(p_ctx->rfd, io->buf, io->count);
// bfuncs->DebugMessage(ctx, fi, li, dbglvl, "bpipe-fd: IO_READ buf=%p len=%d\n", io->buf, io->status);
- if (!feof(p_ctx->pfd->rfd) && io->status == 0 && ferror(p_ctx->pfd->rfd)) {
- bfuncs->JobMessage(ctx, fi, li, M_FATAL, 0,
- "Pipe read error: ERR=%s\n", strerror(errno));
- bfuncs->DebugMessage(ctx, fi, li, dbglvl,
- "Pipe read error: ERR=%s\n", strerror(errno));
+ if (io->status < 0) {
+ berrno be;
+ bfuncs->JobMessage(ctx, fi, li, M_FATAL, 0,
+ "Pipe read error: ERR=%s\n", be.bstrerror());
+ bfuncs->DebugMessage(ctx, fi, li, dbglvl,
+ "Pipe read error: count=%lld errno=%d ERR=%s\n",
+ p_ctx->total_bytes, (int)errno, be.bstrerror());
return bRC_Error;
}
+ p_ctx->total_bytes += io->status;
break;
case IO_WRITE:
bfuncs->JobMessage(ctx, fi, li, M_FATAL, 0, "Logic error: NULL write FD\n");
return bRC_Error;
}
+
+ /* When we write, we must check for the error channel (stdout+stderr) as well */
+ while (!ok && !p_ctx->canceled) {
+ FD_ZERO(&wfds);
+ FD_SET(p_ctx->wfd, &wfds);
+ FD_ZERO(&rfds);
+ FD_SET(p_ctx->rfd, &rfds);
+
+ select(p_ctx->maxfd+1, &rfds, &wfds, NULL, NULL);
+
+ if (FD_ISSET(p_ctx->rfd, &rfds)) {
+ int ret = read(p_ctx->rfd, buf, sizeof(buf)); /* TODO: simulate fgets() */
+ if (ret > 0) {
+ buf[ret]=0;
+ send_log(ctx, buf);
+ } else {
+ ok = true; /* nothing to read */
+ }
+ }
+
+ if (FD_ISSET(p_ctx->wfd, &wfds)) {
+ ok = true;
+ }
+ }
+
// printf("bpipe-fd: IO_WRITE fd=%p buf=%p len=%d\n", p_ctx->fd, io->buf, io->count);
- io->status = fwrite(io->buf, 1, io->count, p_ctx->pfd->wfd);
+ io->status = full_write(p_ctx->wfd, io->buf, io->count, &p_ctx->canceled);
// printf("bpipe-fd: IO_WRITE buf=%p len=%d\n", io->buf, io->status);
- if (!feof(p_ctx->pfd->wfd) && io->status == 0 && ferror(p_ctx->pfd->wfd)) {
- bfuncs->JobMessage(ctx, fi, li, M_FATAL, 0,
- "Pipe write error\n");
- bfuncs->DebugMessage(ctx, fi, li, dbglvl,
- "Pipe write error: ERR=%s\n", strerror(errno));
+ if (io->status <= 0) {
+ berrno be;
+ bfuncs->JobMessage(ctx, fi, li, M_FATAL, 0,
+ "Pipe write error: ERR=%s\n", be.bstrerror());
+ bfuncs->DebugMessage(ctx, fi, li, dbglvl,
+ "Pipe write error: count=%lld errno=%d ERR=%s\n",
+ p_ctx->total_bytes, (int)errno, be.bstrerror());
return bRC_Error;
}
+ p_ctx->total_bytes += io->status;
break;
case IO_CLOSE:
bfuncs->JobMessage(ctx, fi, li, M_FATAL, 0, "Logic error: NULL FD on bpipe close\n");
return bRC_Error;
}
+
+ /* We inform the other side that we have nothing more to send */
+ if (p_ctx->wfd >= 0) {
+ int ret = close_wpipe(p_ctx->pfd);
+ if (ret == 0) {
+ bfuncs->JobMessage(ctx, fi, li, M_ERROR, 0, "bpipe-fd: Error closing for file %s: %d\n",
+ p_ctx->fname, ret);
+ }
+ }
+
+ /* We flush what the other program has to say */
+ while (!ok && !p_ctx->canceled) {
+ struct timeval tv = {10, 0}; // sleep for 10secs
+ FD_ZERO(&rfds);
+ p_ctx->maxfd = -1;
+
+ if (p_ctx->rfd >= 0) {
+ FD_SET(p_ctx->rfd, &rfds);
+ p_ctx->maxfd = MAX(p_ctx->maxfd, p_ctx->rfd);
+ }
+
+ if (p_ctx->efd >= 0) {
+ FD_SET(p_ctx->efd, &rfds);
+ p_ctx->maxfd = MAX(p_ctx->maxfd, p_ctx->efd);
+ }
+
+ if (p_ctx->maxfd == -1) {
+ ok = true; /* exit the loop */
+ } else {
+ select(p_ctx->maxfd+1, &rfds, NULL, NULL, &tv);
+ }
+
+ if (p_ctx->rfd >= 0 && FD_ISSET(p_ctx->rfd, &rfds)) {
+ int ret = read(p_ctx->rfd, buf, sizeof(buf));
+ if (ret > 0) {
+ buf[ret]=0;
+ send_log(ctx, buf);
+ } else {
+ p_ctx->rfd = -1; /* closed, keep the reference in bpipe */
+ }
+ }
+
+ /* The stderr can be melted with stdout or not */
+ if (p_ctx->efd >= 0 && FD_ISSET(p_ctx->efd, &rfds)) {
+ int ret = read(p_ctx->efd, buf, sizeof(buf));
+ if (ret > 0) {
+ buf[ret]=0;
+ send_log(ctx, buf);
+ } else {
+ p_ctx->efd = -1; /* closed, keep the reference in bpipe */
+ }
+ }
+ }
+
io->status = close_bpipe(p_ctx->pfd);
if (io->status != 0) {
bfuncs->JobMessage(ctx, fi, li, M_ERROR, 0, "bpipe-fd: Error closing for file %s: %d\n",
/*
* This is called during restore to create the file (if necessary)
* We must return in rp->create_status:
- *
+ *
* CF_ERROR -- error
* CF_SKIP -- skip processing this file
* CF_EXTRACT -- extract the file (i.e.call i/o routines)
}
}
- /* Required mem:
- * len(imsg)
- * + number of "where" codes * (len(where)-2)
+ /* Required mem:
+ * len(imsg)
+ * + number of "where" codes * (len(where)-2)
* - number of "replace" codes
*/
omsg = (char*)malloc(strlen(imsg) + (w_count * (strlen(p_ctx->where)-2)) - r_count + 1);