/*
Bacula(R) - The Network Backup Solution
- Copyright (C) 2000-2018 Kern Sibbald
+ Copyright (C) 2000-2020 Kern Sibbald
The original author of Bacula is Kern Sibbald, with contributions
from many others, a complete list can be found in the file AUTHORS.
/*
* Upload the given part to the cloud
*/
-bool cloud_dev::upload_part_to_cloud(DCR *dcr, const char *VolumeName, uint32_t upart)
+bool cloud_dev::upload_part_to_cloud(DCR *dcr, const char *VolumeName, uint32_t upart, bool do_truncate)
{
- if (upload_opt == UPLOAD_NO) {
- /* lets pretend everything is OK */
+ /* for a "regular" backup job, don't proceed with the upload, but pretend everything went OK. */
+ /* Otherwise, pass thru and proceed with the upload, even if UPLOAD_NO is set */
+ bool internal_job = dcr->jcr->is_internal_job() || dcr->jcr->is_JobType(JT_ADMIN);
+ if ((upload_opt == UPLOAD_NO) && !internal_job) {
return true;
}
bool ret=false;
dcr->uploads->append(item);
/* transfer are queued manually, so the caller has control on when the transfer is scheduled
* this should come handy for upload_opt */
- item->set_do_cache_truncate(trunc_opt == TRUNC_AFTER_UPLOAD);
- if (upload_opt == UPLOAD_EACHPART) {
+ item->set_do_cache_truncate(do_truncate);
+ if ( (upload_opt == UPLOAD_EACHPART) ||
+ ((upload_opt == UPLOAD_NO) && internal_job) ) {
/* in each part upload option, queue right away */
item->queue();
}
pm_strcpy(cache_fname, dev_name);
/* create a uniq xfer file name with XFER_TMP_NAME and the pid */
char xferbuf[32];
- bsnprintf(xferbuf, sizeof(xferbuf), "%s_%d", XFER_TMP_NAME, (int)getpid());
- add_vol_and_part(cache_fname, VolumeName, xferbuf, dpart);
+ bsnprintf(xferbuf, sizeof(xferbuf), "%s_%d_%d", XFER_TMP_NAME, (int)getpid(), (int)dcr->jcr->JobId);
+ cloud_driver::add_vol_and_part(cache_fname, VolumeName, xferbuf, dpart);
/* use the cloud proxy to retrieve the transfer size */
uint64_t cloud_size = cloud_prox->get_size(VolumeName, dpart);
POOLMEM *fname = get_pool_memory(PM_NAME);
uint32_t cpart;
bool ok = false;
-
POOL_MEM dname(PM_FNAME);
int status = 0;
return ok;
}
-
-/* Utility routines */
-
-void cloud_dev::add_vol_and_part(POOLMEM *&filename,
- const char *VolumeName, const char *name, uint32_t apart)
-{
- Enter(dbglvl);
- char partnumber[20];
- int len = strlen(filename);
-
- if (len > 0 && !IsPathSeparator((filename)[len-1])) {
- pm_strcat(filename, "/");
- }
-
- pm_strcat(filename, VolumeName);
- bsnprintf(partnumber, sizeof(partnumber), "/%s.%d", name, apart);
- pm_strcat(filename, partnumber);
-}
-
void cloud_dev::make_cache_filename(POOLMEM *&filename,
const char *VolumeName, uint32_t upart)
{
Enter(dbglvl);
pm_strcpy(filename, dev_name);
- add_vol_and_part(filename, VolumeName, "part", upart);
+ cloud_driver::add_vol_and_part(filename, VolumeName, "part", upart);
}
void cloud_dev::make_cache_volume_name(POOLMEM *&volname,
cache_sizes = NULL;
}
if (driver) {
- driver->term(NULL);
+ driver->term(errmsg);
delete driver;
driver = NULL;
}
download_mgr.m_wq.max_workers = device->cloud->max_concurrent_downloads;
}
+ POOL_MEM err; /* device->errmsg is not yet ready */
/* Initialize the driver */
- driver->init(jcr, this, device);
+ if (!driver->init(device->cloud, err.addr())) {
+ Qmsg1(jcr, M_FATAL, 0, "Cloud driver initialization error %s\n", err.c_str());
+ Tmsg1(0, "Cloud driver initialization error %s\n", err.c_str());
+ }
}
/* the cloud proxy owns its cloud_parts, so we can 'set and forget' them */
cloud_prox = cloud_proxy::get_instance();
-
}
/*
/* Ensure the last written part is uploaded */
if ((part > 0) && dcr->is_writing()) {
- if (!upload_part_to_cloud(dcr, VolHdr.VolumeName, part)) {
+ if (!upload_part_to_cloud(dcr, VolHdr.VolumeName, part, (trunc_opt == TRUNC_AFTER_UPLOAD))) {
if (errmsg[0]) {
Qmsg(dcr->jcr, M_ERROR, 0, "%s", errmsg);
}
jcr_not_killable jkl(dcr->jcr);
ilist cloud_parts(100, false); /* !! dont own the parts here */
/* first, retrieve the volume content within cloud_parts list*/
- if (!driver->get_cloud_volume_parts_list(dcr, VolName, &cloud_parts, errmsg)) {
+ cancel_callback cancel_cb;
+ cancel_cb.fct = DCR_cancel_cb;
+ cancel_cb.arg = dcr;
+ if (!driver->get_cloud_volume_parts_list(VolName, &cloud_parts, &cancel_cb, errmsg)) {
Dmsg2(dbglvl, "Cannot get cloud sizes for Volume=%s Err=%s\n", VolName, errmsg);
return false;
}
bool ok = false;
POOL_MEM dname(PM_FNAME);
int status = 0;
- ilist * iuploads = New(ilist(100,true)); /* owns the parts */
- ilist *truncate_list = NULL;
+ ilist * iuploads=New(ilist(100,true)); /* owns the parts */
+ ilist *truncate_list=NULL;
FILE *fp;
errmsg[0] = 0;
Enter(dbglvl);
iuploads->put(part->index, part);
}
/* returns the list of items to truncate : cloud parts-uploads*/
+ cancel_callback cancel_cb;
+ cancel_cb.fct = DCR_cancel_cb;
+ cancel_cb.arg = dcr;
truncate_list = cloud_prox->exclude(getVolCatName(), iuploads);
- if (truncate_list && !driver->truncate_cloud_volume(dcr, getVolCatName(), truncate_list, errmsg)) {
- Qmsg(dcr->jcr, M_ERROR, 0, "truncate_cloud_volume for %s: ERR=%s\n", getVolCatName(), errmsg);
+ if (truncate_list && !driver->truncate_cloud_volume(getVolCatName(), truncate_list, &cancel_cb, errmsg)) {
+ Dmsg1(dbglvl, "%s", errmsg);
+ Qmsg(dcr->jcr, M_ERROR, 0, "%s", errmsg);
goto get_out;
+ } else {
+ Dmsg1(dbglvl, "%s", errmsg);
}
/* force proxy refresh (volume should be empty so it should be fast) */
/* another approach would be to reuse truncate_list to remove items */
/* Write part to cloud */
Dmsg2(dbglvl, "=== part=%d num_cache_parts=%d\n", part, num_cache_parts);
if (dcr->is_writing()) {
- if (!upload_part_to_cloud(dcr, getVolCatName(), part)) {
+ if (!upload_part_to_cloud(dcr, getVolCatName(), part, (trunc_opt == TRUNC_AFTER_UPLOAD))) {
if (errmsg[0]) {
Qmsg(dcr->jcr, M_ERROR, 0, "%s", errmsg);
}
bool cloud_dev::start_of_job(DCR *dcr)
{
+ bool ret=false;
if (driver) {
- driver->start_of_job(dcr);
+ ret = driver->start_of_job(errmsg);
+ } else {
+ Mmsg(errmsg, "Cloud driver not properly loaded"); /* We should always have a dummy driver */
}
- return true;
+ Jmsg(dcr->jcr, ret? M_INFO : M_FATAL, 0, "%s\n", errmsg);
+ return ret;
}
}
}
-bool cloud_dev::end_of_job(DCR *dcr)
+bool cloud_dev::end_of_job(DCR *dcr, uint32_t truncate)
{
Enter(dbglvl);
transfer *tpkt; /* current packet */
tpkt->append_status(umsg);
Jmsg(dcr->jcr, (tpkt->m_state == TRANS_STATE_ERROR) ? M_ERROR : M_INFO, 0, "%s%s", prefix, umsg.c_str());
Dmsg1(dbglvl, "%s", umsg.c_str());
-
+ bool do_truncate = (truncate==TRUNC_AT_ENDOFJOB) || (truncate==TRUNC_CONF_DEFAULT && trunc_opt==TRUNC_AT_ENDOFJOB);
if (tpkt->m_state == TRANS_STATE_ERROR) {
Mmsg(dcr->jcr->StatusErrMsg, _("Upload to Cloud failed"));
- } else if (trunc_opt == TRUNC_AT_ENDOFJOB && tpkt->m_part!=1) {
+ } else if (do_truncate && tpkt->m_part!=1) {
/* else -> don't remove the cache file if the upload failed */
if (unlink(tpkt->m_cache_fname) != 0) {
berrno be;
dcr->uploads->destroy();
if (driver) {
- driver->end_of_job(dcr);
+ driver->end_of_job(errmsg);
}
Leave(dbglvl);
Leave(dbglvl);
return (stat == 0);
}
+/* return the volumes list */
+bool cloud_dev::get_cloud_volumes_list(DCR* dcr, alist *volumes, POOLMEM *&err)
+{
+ cancel_callback cancel_cb;
+ cancel_cb.fct = DCR_cancel_cb;
+ cancel_cb.arg = dcr;
+ return driver->get_cloud_volumes_list(volumes, &cancel_cb, err);
+}
+
+/* return the list of parts contained in VolumeName */
+bool cloud_dev::get_cloud_volume_parts_list(DCR *dcr, const char *VolumeName, ilist *parts, POOLMEM *&err)
+{
+ cancel_callback cancel_cb;
+ cancel_cb.fct = DCR_cancel_cb;
+ cancel_cb.arg = dcr;
+ return driver->get_cloud_volume_parts_list(VolumeName, parts, &cancel_cb, err);
+}
/* TODO: Add .api2 mode for the status message */
/* format a status message of the cloud transfers. Verbose gives details on each transfer */
uint32_t cloud_dev::get_cloud_upload_transfer_status(POOL_MEM& msg, bool verbose)
{
- upload_mgr.update_statistics();
uint32_t ret = 0;
ret = Mmsg(msg,_(" Uploads "));
ret += upload_mgr.append_status(msg, verbose);
/* format a status message of the cloud transfers. Verbose gives details on each transfer */
uint32_t cloud_dev::get_cloud_download_transfer_status(POOL_MEM& msg, bool verbose)
{
- download_mgr.update_statistics();
uint32_t ret = 0;
ret = Mmsg(msg,_(" Downloads "));
ret += download_mgr.append_status(msg, verbose);
/*
* Upload cache parts that are not in the cloud
*/
-bool cloud_dev::upload_cache(DCR *dcr, const char *VolumeName, POOLMEM *&err)
+bool cloud_dev::upload_cache(DCR *dcr, const char *VolumeName, uint32_t truncate, POOLMEM *&err)
{
int i;
Enter(dbglvl);
ilist cache_parts;
POOLMEM *vol_dir = get_pool_memory(PM_NAME);
POOLMEM *fname = get_pool_memory(PM_NAME);
-
- if (!driver->get_cloud_volume_parts_list(dcr, VolumeName, &cloud_parts, err)) {
+ cancel_callback cancel_cb;
+ cancel_cb.fct = DCR_cancel_cb;
+ cancel_cb.arg = dcr;
+ if (!driver->get_cloud_volume_parts_list(VolumeName, &cloud_parts, &cancel_cb, err)) {
Qmsg2(dcr->jcr, M_ERROR, 0, "Error while uploading parts for volume %s. %s\n", VolumeName, err);
ret = false;
goto bail_out;
}
Mmsg(fname, "%s/part.%d", vol_dir, i);
Dmsg1(dbglvl, "Do upload of %s\n", fname);
- if (!upload_part_to_cloud(dcr, VolumeName, i)) {
+ bool do_truncate = (truncate==TRUNC_AFTER_UPLOAD) || (truncate==TRUNC_CONF_DEFAULT && (trunc_opt == TRUNC_AFTER_UPLOAD));
+ if (!upload_part_to_cloud(dcr, VolumeName, i, do_truncate)) {
if (errmsg[0]) {
Qmsg(dcr->jcr, M_ERROR, 0, "%s", errmsg);
}
/*
Bacula(R) - The Network Backup Solution
- Copyright (C) 2000-2018 Kern Sibbald
+ Copyright (C) 2000-2020 Kern Sibbald
The original author of Bacula is Kern Sibbald, with contributions
from many others, a complete list can be found in the file AUTHORS.
#include "cloud_transfer_mgr.h"
#include "cloud_parts.h"
+/* enum loadable drivers */
+enum {
+ C_S3_DRIVER = 1,
+ C_FILE_DRIVER = 2
+};
+
class cloud_dev: public file_dev {
public:
int64_t obj_len;
uint32_t trunc_opt;
uint32_t upload_opt;
+ uint32_t current_driver_type;
+
cloud_driver *driver;
static transfer_manager download_mgr;
cloud_proxy *cloud_prox;
- void add_vol_and_part(POOLMEM *&filename, const char *VolumeName, const char *name, uint32_t part);
-
private:
- char *cache_directory;
bool download_parts_to_read(DCR *dcr, alist* parts);
- bool upload_part_to_cloud(DCR *dcr, const char *VolumeName, uint32_t part);
+ bool upload_part_to_cloud(DCR *dcr, const char *VolumeName, uint32_t part, bool do_truncate);
transfer *download_part_to_cache(DCR *dcr, const char *VolumeName, uint32_t part);
void make_cache_filename(POOLMEM *&filename, const char *VolumeName, uint32_t part);
void make_cache_volume_name(POOLMEM *&full_volname, const char *VolumeName);
bool open_next_part(DCR *dcr);
bool truncate(DCR *dcr);
int truncate_cache(DCR *dcr, const char *VolName, int64_t *size);
- bool upload_cache(DCR *dcr, const char *VolName, POOLMEM *&err);
+ bool upload_cache(DCR *dcr, const char *VolName, uint32_t truncate, POOLMEM *&err);
bool close(DCR *dcr);
bool update_pos(DCR *dcr);
bool is_eod_valid(DCR *dcr);
bool relabel, bool no_prelabel);
bool rewrite_volume_label(DCR *dcr, bool recycle);
bool start_of_job(DCR *dcr);
- bool end_of_job(DCR *dcr);
- bool get_cloud_volumes_list(DCR* dcr, alist *volumes, POOLMEM *&err)
- { return !driver?false:driver->get_cloud_volumes_list(dcr, volumes, err); };
- bool get_cloud_volume_parts_list(DCR *dcr, const char *VolumeName, ilist *parts, POOLMEM *&err) { return driver->get_cloud_volume_parts_list(dcr, VolumeName, parts, err);};
+ bool end_of_job(DCR *dcr, uint32_t truncate);
+ bool get_cloud_volumes_list(DCR* dcr, alist *volumes, POOLMEM *&err);
+ bool get_cloud_volume_parts_list(DCR *dcr, const char *VolumeName, ilist *parts, POOLMEM *&err);
uint32_t get_cloud_upload_transfer_status(POOL_MEM &msg, bool verbose);
uint32_t get_cloud_download_transfer_status(POOL_MEM &msg, bool verbose);
};
/*
- Bacula(R) - The Network Backup Solution
+ Bacula® - The Network Backup Solution
- Copyright (C) 2000-2018 Kern Sibbald
+ Copyright (C) 2000-2016 Bacula Systems SA
+ All rights reserved.
- The original author of Bacula is Kern Sibbald, with contributions
- from many others, a complete list can be found in the file AUTHORS.
+ The main author of Bacula is Kern Sibbald, with contributions from many
+ others, a complete list can be found in the file AUTHORS.
- You may use this file and others of this release according to the
- license defined in the LICENSE file, which includes the Affero General
- Public License, v3.0 ("AGPLv3") and some additional permissions and
- terms pursuant to its AGPLv3 Section 7.
+ Licensees holding a valid Bacula Systems SA license may use this file
+ and others of this release in accordance with the proprietary license
+ agreement provided in the LICENSE file. Redistribution of any part of
+ this release is not permitted.
- This notice must be preserved when any source code is
- conveyed and/or propagated.
-
- Bacula(R) is a registered trademark of Kern Sibbald.
+ Bacula® is a registered trademark of Kern Sibbald.
*/
/*
* Routines for writing Cloud drivers
*
* Written by Kern Sibbald, May MMXVI
+ *
*/
#include "bacula.h"
#define NUM_UPLOAD_RETRIES 2
class cloud_dev;
-enum {
- C_S3_DRIVER = 1,
- C_FILE_DRIVER = 2
-};
+/* define the cancel callback type */
+typedef bool (cancel_cb_type)(void*);
+typedef struct {
+ cancel_cb_type* fct;
+ void *arg;
+} cancel_callback;
/* Abstract class cannot be instantiated */
class cloud_driver: public SMARTALLOC {
virtual bool copy_cache_part_to_cloud(transfer *xfer) = 0;
virtual bool copy_cloud_part_to_cache(transfer *xfer) = 0;
- virtual bool truncate_cloud_volume(DCR *dcr, const char *VolumeName, ilist *trunc_parts, POOLMEM *&err) = 0;
- virtual bool init(JCR *jcr, cloud_dev *dev, DEVRES *device) = 0;
- virtual bool term(DCR *dcr) = 0;
- virtual bool start_of_job(DCR *dcr) = 0;
- virtual bool end_of_job(DCR *dcr) = 0;
+ virtual bool truncate_cloud_volume(const char *VolumeName, ilist *trunc_parts, cancel_callback *cancel_cb, POOLMEM *&err) = 0;
+ virtual bool init(CLOUD *cloud, POOLMEM *&err) = 0;
+ virtual bool term(POOLMEM *&err) = 0;
+ virtual bool start_of_job(POOLMEM *&err) = 0;
+ virtual bool end_of_job(POOLMEM *&err) = 0;
+ virtual bool get_cloud_volume_parts_list(const char* VolumeName, ilist *parts, cancel_callback *cancel_cb, POOLMEM *&err) = 0;
+ virtual bool get_cloud_volumes_list(alist *volumes, cancel_callback *cancel_cb, POOLMEM *&err) = 0;
+ static void add_vol_and_part(POOLMEM *&filename, const char *VolumeName, const char *name, uint32_t apart)
+ {
+ char partnumber[20];
+ int len = strlen(filename);
+
+ if (len > 0 && !IsPathSeparator((filename)[len-1])) {
+ pm_strcat(filename, "/");
+ }
- virtual bool get_cloud_volume_parts_list(DCR *dcr, const char* VolumeName, ilist *parts, POOLMEM *&err) = 0;
- virtual bool get_cloud_volumes_list(DCR* dcr, alist *volumes, POOLMEM *&err) = 0; /* TODO: Adapt the prototype to have a handler instead */
+ pm_strcat(filename, VolumeName);
+ bsnprintf(partnumber, sizeof(partnumber), "/%s.%d", name, apart);
+ pm_strcat(filename, partnumber);
+ }
bwlimit upload_limit;
bwlimit download_limit;
/*
- Bacula(R) - The Network Backup Solution
+ Bacula® - The Network Backup Solution
- Copyright (C) 2000-2018 Kern Sibbald
+ Copyright (C) 2000-2016 Bacula Systems SA
+ All rights reserved.
- The original author of Bacula is Kern Sibbald, with contributions
- from many others, a complete list can be found in the file AUTHORS.
+ The main author of Bacula is Kern Sibbald, with contributions from many
+ others, a complete list can be found in the file AUTHORS.
- You may use this file and others of this release according to the
- license defined in the LICENSE file, which includes the Affero General
- Public License, v3.0 ("AGPLv3") and some additional permissions and
- terms pursuant to its AGPLv3 Section 7.
+ Licensees holding a valid Bacula Systems SA license may use this file
+ and others of this release in accordance with the proprietary license
+ agreement provided in the LICENSE file. Redistribution of any part of
+ this release is not permitted.
- This notice must be preserved when any source code is
- conveyed and/or propagated.
-
- Bacula(R) is a registered trademark of Kern Sibbald.
+ Bacula® is a registered trademark of Kern Sibbald.
*/
/*
* Routines for writing Cloud drivers
*
* Written by Kern Sibbald, May MMXVI
+ *
*/
#include "cloud_parts.h"
}
-#endif /* TEST_PROGRAM */
+#endif /* TEST_PROGRAM */
\ No newline at end of file
/*
- Bacula(R) - The Network Backup Solution
+ Bacula® - The Network Backup Solution
- Copyright (C) 2000-2018 Kern Sibbald
+ Copyright (C) 2000-2016 Bacula Systems SA
+ All rights reserved.
- The original author of Bacula is Kern Sibbald, with contributions
- from many others, a complete list can be found in the file AUTHORS.
+ The main author of Bacula is Kern Sibbald, with contributions from many
+ others, a complete list can be found in the file AUTHORS.
- You may use this file and others of this release according to the
- license defined in the LICENSE file, which includes the Affero General
- Public License, v3.0 ("AGPLv3") and some additional permissions and
- terms pursuant to its AGPLv3 Section 7.
+ Licensees holding a valid Bacula Systems SA license may use this file
+ and others of this release in accordance with the proprietary license
+ agreement provided in the LICENSE file. Redistribution of any part of
+ this release is not permitted.
- This notice must be preserved when any source code is
- conveyed and/or propagated.
-
- Bacula(R) is a registered trademark of Kern Sibbald.
+ Bacula® is a registered trademark of Kern Sibbald.
*/
/*
* Routines for managing Volumes contains and comparing parts
*
* Written by Norbert Bizet, May MMXVI
+ *
*/
#ifndef _CLOUD_PARTS_H_
#define _CLOUD_PARTS_H_
private:
htable *m_hash; /* the root htable */
bool m_owns; /* determines if ilist own the cloud_parts */
- pthread_mutex_t m_mutex; /* protect access*/
+ pthread_mutex_t m_mutex; /* protect access*/
static cloud_proxy *m_pinstance; /* singleton instance */
static uint64_t m_count; /* static refcount */
-
+
~cloud_proxy();
public:
/* Check if the volume entry exists and return true if it's the case */
bool volume_lookup(const char *volume);
-
+
/* reset the volume list content with the content of part_list */
bool reset(const char *volume, ilist *part_list);
/*
Bacula(R) - The Network Backup Solution
- Copyright (C) 2000-2018 Kern Sibbald
+ Copyright (C) 2000-2020 Kern Sibbald
The original author of Bacula is Kern Sibbald, with contributions
from many others, a complete list can be found in the file AUTHORS.
#include "cloud_transfer_mgr.h"
#include "stored.h"
+#define ONE_SEC 1000000LL /* number of microseconds in a second */
+
+static const int dbglvl = 450;
+
/* constructor
* size : the size in bytes of the transfer
* funct : function to process
DCR *dcr,
cloud_proxy *proxy) :
m_stat_size(size),
+ m_stat_processed_size(0),
m_stat_start(0),
m_stat_duration(0),
m_stat_eta(0),
+ m_stat_average_rate(0),
m_message(NULL),
m_state(TRANS_STATE_CREATED),
m_mgr(NULL),
free(m_cache_fname);
if (m_use_count > 0) {
ASSERT(FALSE);
- Dmsg1(0, "!!!m_use_count = %d\n", m_use_count);
+ Dmsg1(dbglvl, "!!!m_use_count = %d\n", m_use_count);
}
}
return true;
}
+/* reset processed size */
+void transfer::reset_processed_size()
+{
+ lock_guard lg(m_stat_mutex);
+ m_stat_processed_size = 0;
+ ASSERTD(m_stat_processed_size==0, "invalid locking of processed size");
+}
+
+/* set absolute value process size */
+void transfer::set_processed_size(uint64_t size)
+{
+ lock_guard lg(m_stat_mutex);
+ m_stat_processed_size = size;
+ m_stat_duration = get_current_btime()-m_stat_start;
+ if (m_stat_duration > 0) {
+ m_stat_average_rate = (m_stat_processed_size*ONE_SEC)/m_stat_duration;
+ }
+ ASSERTD(m_stat_processed_size <= m_stat_size, "increment_processed_size increment too big");
+
+}
+/* add increment to the current processed size */
+void transfer::increment_processed_size(uint64_t increment)
+{
+ set_processed_size(m_stat_processed_size+increment);
+}
/* opaque function that processes m_funct with m_arg as parameter
* depending on m_funct return value, changes state to TRANS_STATE_DONE
}
/* checking the cancel status : doesnt request locking */
-bool transfer::is_cancelled() const
+bool transfer::is_canceled() const
{
return m_cancel;
}
uint32_t ret=0;
static const char *state[] = {"created", "queued", "process", "done", "error"};
+ lock_guard lg(m_stat_mutex);
if (m_state > TRANS_STATE_PROCESSED) {
ret = Mmsg(tmp_msg,_("%s/part.%-5d state=%-7s size=%sB duration=%ds%s%s\n"),
m_volume_name, m_part,
state[m_state],
edit_uint64_with_suffix(m_stat_size, ec),
- m_stat_duration,
+ m_stat_duration/ONE_SEC,
(strlen(m_message) != 0)?" msg=":"",
(strlen(m_message) != 0)?m_message:"");
pm_strcat(msg, tmp_msg);
} else {
- ret = Mmsg(tmp_msg,_("%s/part.%-5d, state=%-7s size=%sB eta=%dss%s%s\n"),
+ ret = Mmsg(tmp_msg,_("%s/part.%-5d state=%-7s size=%sB eta=%ds%s%s\n"),
m_volume_name, m_part,
state[m_state],
edit_uint64_with_suffix(m_stat_size, ec),
- m_stat_eta,
+ m_stat_eta/ONE_SEC,
(strlen(m_message) != 0)?" msg=":"",
(strlen(m_message) != 0)?m_message:"");
pm_strcat(msg, tmp_msg);
return ret;
}
+void transfer::append_api_status(OutputWriter &ow)
+{
+ static const char *state[] = {"created", "queued", "process", "done", "error"};
+
+ lock_guard lg(m_stat_mutex);
+ if (m_state > TRANS_STATE_PROCESSED) {
+ ow.get_output(OT_START_OBJ,
+ OT_STRING,"volume_name", m_volume_name,
+ OT_INT32, "part", m_part,
+ OT_INT32, "jobid", m_dcr->jcr->JobId,
+ OT_STRING,"state", state[m_state],
+ OT_INT64, "size", m_stat_size,
+ OT_DURATION, "duration", m_stat_duration/ONE_SEC,
+ OT_STRING,"message", m_message,
+ OT_END);
+ } else {
+ ow.get_output(OT_START_OBJ,
+ OT_STRING,"volume_name", m_volume_name,
+ OT_INT32, "part", m_part,
+ OT_INT32, "jobid", m_dcr->jcr->JobId,
+ OT_STRING,"state", state[m_state],
+ OT_INT64, "size", m_stat_size,
+ OT_INT64, "processed_size", m_stat_processed_size,
+ OT_DURATION, "eta", m_stat_eta/ONE_SEC,
+ OT_STRING,"message", m_message,
+ OT_END);
+ }
+}
/* the manager references itself through this function */
void transfer::set_manager(transfer_manager *mgr)
V(m_mgr->m_stat_mutex);
P(m_mgr->m_mutex);
- ++m_use_count;
m_mgr->add_work(this);
V(m_mgr->m_mutex);
}
P(m_mgr->m_mutex);
m_mgr->remove_work(m_workq_elem);
- --m_use_count;
V(m_mgr->m_mutex);
}
}
/*transfer starts now*/
P(m_stat_mutex);
- m_stat_start = (utime_t)time(NULL);
+ m_stat_start = get_current_btime();
V(m_stat_mutex);
}
}
ret = true;
/*transfer stops now : compute transfer duration*/
P(m_stat_mutex);
- m_stat_duration = (utime_t)time(NULL)-m_stat_start;
+ m_stat_duration = get_current_btime()-m_stat_start;
+ if (m_stat_duration > 0) {
+ m_stat_processed_size = m_stat_size;
+ ASSERTD(m_stat_size == m_stat_processed_size, "xfer done before processed size is equal to size.");
+ m_stat_average_rate = (m_stat_size*ONE_SEC)/m_stat_duration;
+ }
V(m_stat_mutex);
if (m_mgr) {
m_mgr->m_stat_size_done += m_stat_size;
/*add local duration to manager duration */
m_mgr->m_stat_duration_done += m_stat_duration;
- /*reprocess the manager average rate with it*/
- if (m_mgr->m_stat_duration_done != 0) {
- m_mgr->m_stat_average_rate =
- m_mgr->m_stat_size_done /
- m_mgr->m_stat_duration_done;
- }
/*unlock manager statistics */
V(m_mgr->m_stat_mutex);
-
- /* process is completed, unref the workq reference */
- --m_use_count;
}
if (m_proxy) {
ret = true;
/*transfer stops now, even if in error*/
P(m_stat_mutex);
- m_stat_duration = (utime_t)time(NULL)-m_stat_start;
+ m_stat_duration = get_current_btime()-m_stat_start;
V(m_stat_mutex);
if (m_mgr) {
m_mgr->m_stat_size_error += m_stat_size;
/*unlock manager statistics */
V(m_mgr->m_stat_mutex);
-
- /* process is completed, unref the workq reference */
- --m_use_count;
}
/* in both case, success or failure, life keeps going on */
pthread_cond_broadcast(&m_done);
void transfer_manager::update_statistics()
{
/* lock the manager stats */
- P(m_stat_mutex);
-
- /* ETA naive calculation for each element in the queue =
- * (accumulator(previous elements size) / average_rate) / num_workers;
- */
- uint64_t accumulator=0;
+ lock_guard lg_stat(m_stat_mutex);
/* lock the queue so order and chaining cannot be modified */
- P(m_mutex);
- P(m_wq.mutex);
- m_stat_nb_workers = m_wq.max_workers;
+ lock_guard lg(m_mutex);
- /* parse the queued and processed transfers */
+ /* recompute global average rate */
transfer *t;
+ uint64_t accumulated_done_average_rate = 0;
+ uint32_t nb_done_accumulated = 0;
foreach_dlist(t, &m_transfer_list) {
- if ( (t->m_state == TRANS_STATE_QUEUED) ||
- (t->m_state == TRANS_STATE_PROCESSED)) {
- accumulator+=t->m_stat_size;
- P(t->m_stat_mutex);
- if ((m_stat_average_rate != 0) && (m_stat_nb_workers != 0)) {
- /*update eta for each transfer*/
- t->m_stat_eta = (accumulator / m_stat_average_rate) / m_stat_nb_workers;
- }
- V(t->m_stat_mutex);
+ lock_guard lg_xferstatmutex(t->m_stat_mutex);
+ if (t->m_stat_average_rate>0) {
+ accumulated_done_average_rate += t->m_stat_average_rate;
+ t->m_stat_average_rate = 0;
+ ++nb_done_accumulated;
}
}
+ if (nb_done_accumulated) {
+ m_stat_average_rate = accumulated_done_average_rate / nb_done_accumulated;
+ }
- /* the manager ETA is the ETA of the last transfer in its workq */
- if (m_wq.last) {
- transfer *t = (transfer *)m_wq.last->data;
- if (t) {
- m_stat_eta = t->m_stat_eta;
+ /* ETA naive calculation for each element in the queue */
+ if (m_stat_average_rate != 0) {
+ uint64_t accumulator=0;
+ foreach_dlist(t, &m_transfer_list) {
+ if (t->m_state == TRANS_STATE_QUEUED) {
+ lock_guard lg_xferstatmutex(t->m_stat_mutex);
+ accumulator+= t->m_stat_size-t->m_stat_processed_size;
+ t->m_stat_eta = (accumulator / m_stat_average_rate) * ONE_SEC;
+ }
+ if (t->m_state == TRANS_STATE_PROCESSED) {
+ lock_guard lg_xferstatmutex(t->m_stat_mutex);
+ t->m_stat_eta = ((t->m_stat_size-t->m_stat_processed_size) / m_stat_average_rate) * ONE_SEC;
+ }
}
+ /* the manager ETA is the ETA of the last transfer in its workq */
+ m_stat_eta = (accumulator / m_stat_average_rate) * ONE_SEC;
}
-
- V(m_wq.mutex);
- V(m_mutex);
- V(m_stat_mutex);
}
/* short status of the transfers */
update_statistics();
char ec0[30],ec1[30],ec2[30],ec3[30],ec4[30];
POOLMEM *tmp_msg = get_pool_memory(PM_MESSAGE);
+ lock_guard lg_stat(m_stat_mutex);
uint32_t ret = Mmsg(tmp_msg, _("(%sB/s) (ETA %d s) "
"Queued=%d %sB, Processed=%d %sB, Done=%d %sB, Failed=%d %sB\n"),
- edit_uint64_with_suffix(m_stat_average_rate, ec0), m_stat_eta,
+ edit_uint64_with_suffix(m_stat_average_rate, ec0), m_stat_eta/ONE_SEC,
m_stat_nb_transfer_queued, edit_uint64_with_suffix(m_stat_size_queued, ec1),
m_stat_nb_transfer_processed, edit_uint64_with_suffix(m_stat_size_processed, ec2),
m_stat_nb_transfer_done, edit_uint64_with_suffix(m_stat_size_done, ec3),
pm_strcat(msg, tmp_msg);
if (verbose) {
- P(m_mutex);
+ lock_guard lg(m_mutex);
if (!m_transfer_list.empty()) {
ret += Mmsg(tmp_msg, _("------------------------------------------------------------ details ------------------------------------------------------------\n"));
pm_strcat(msg, tmp_msg);
foreach_dlist(tpkt, &m_transfer_list) {
ret += tpkt->append_status(msg);
}
- V(m_mutex);
}
free_pool_memory(tmp_msg);
return ret;
}
+
+void transfer_manager::append_api_status(OutputWriter &ow, bool verbose)
+{
+ update_statistics();
+
+ lock_guard lg_stat(m_stat_mutex);
+ ow.get_output(OT_START_OBJ,
+ OT_INT64, "average_rate", m_stat_average_rate,
+ OT_DURATION, "eta", m_stat_eta/ONE_SEC,
+ OT_INT64, "nb_transfer_queued", m_stat_nb_transfer_queued,
+ OT_INT64, "size_queued", m_stat_size_queued,
+ OT_INT64, "nb_transfer_processed", m_stat_nb_transfer_processed,
+ OT_INT64, "size_processed", m_stat_size_processed,
+ OT_INT64, "nb_transfer_done", m_stat_nb_transfer_done,
+ OT_INT64, "size_done", m_stat_size_done,
+ OT_INT64, "nb_transfer_error", m_stat_nb_transfer_error,
+ OT_INT64, "size_error", m_stat_size_error,
+ OT_INT, "transfers_list_size", m_transfer_list.size(),
+ OT_END);
+ if (verbose) {
+ lock_guard lg(m_mutex);
+ ow.start_list("transfers");
+ transfer *tpkt;
+ foreach_dlist(tpkt, &m_transfer_list) {
+ tpkt->append_api_status(ow);
+ }
+ ow.end_list();
+ }
+}
/*
Bacula(R) - The Network Backup Solution
- Copyright (C) 2000-2018 Kern Sibbald
+ Copyright (C) 2000-2020 Kern Sibbald
The original author of Bacula is Kern Sibbald, with contributions
from many others, a complete list can be found in the file AUTHORS.
pthread_mutex_t m_stat_mutex;
/* size of the transfer: should be filled asap */
uint64_t m_stat_size;
+ /* size processed so far : filled by the processor (driver) */
+ uint64_t m_stat_processed_size;
/* time when process started */
- utime_t m_stat_start;
+ btime_t m_stat_start;
/* duration of the transfer : automatically filled when transfer is completed*/
- utime_t m_stat_duration;
+ btime_t m_stat_duration;
/* estimate time to arrival : predictive guest approximation of transfer time*/
- utime_t m_stat_eta;
+ btime_t m_stat_eta;
+ /* computed bytes/sec transfer rate */
+ uint64_t m_stat_average_rate;
/* status variables :*/
/* protect status changes*/
bool cancel();
/* callback fct that checks if transfer has been cancelled */
- bool is_cancelled() const;
+ bool is_canceled() const;
/* append a status message into msg*/
uint32_t append_status(POOL_MEM& msgs);
+ void append_api_status(OutputWriter &ow);
void set_do_cache_truncate(bool do_cache_truncate);
+ /* reset processed size */
+ void reset_processed_size();
+
+ /* set processed size absolute value */
+ void set_processed_size(uint64_t size);
+
+ /* add increment to the current processed size */
+ void increment_processed_size(uint64_t increment);
+
protected:
friend class transfer_manager;
/* size in bytes of transfers in TRANS_STATE_ERROR state in this manager*/
uint64_t m_stat_size_error;
/* duration of transfers in TRANS_STATE_DONE state in this manager*/
- utime_t m_stat_duration_done;
+ btime_t m_stat_duration_done;
/* computed bytes/sec transfer rate */
uint64_t m_stat_average_rate;
/* computed Estimate Time to Arrival */
- utime_t m_stat_eta;
+ btime_t m_stat_eta;
/* status variables global for this manager: */
/* append a status message into msg*/
uint32_t append_status(POOL_MEM& msg, bool verbose);
+ void append_api_status(OutputWriter &ow, bool verbose);
protected:
friend class transfer;
/*
Bacula(R) - The Network Backup Solution
- Copyright (C) 2000-2018 Kern Sibbald
+ Copyright (C) 2000-2020 Kern Sibbald
The original author of Bacula is Kern Sibbald, with contributions
from many others, a complete list can be found in the file AUTHORS.
}
while (obj_len > 0) {
- if (xfer->is_cancelled()) {
+ if (xfer->is_canceled()) {
Mmsg(xfer->m_message, "Job is canceled.\n");
goto get_out;
}
out_fname, be.bstrerror());
}
obj_len -= rbytes;
+ xfer->increment_processed_size(rbytes);
if (limit->use_bwlimit()) {
limit->control_bwlimit(rbytes);
}
return put_object(xfer, cloud_fname, cache_fname, &download_limit);
}
-bool file_driver::truncate_cloud_volume(DCR *dcr, const char *VolumeName, ilist *trunc_parts, POOLMEM *&err)
+bool file_driver::truncate_cloud_volume(const char *VolumeName, ilist *trunc_parts, cancel_callback *cancel_cb, POOLMEM *&err)
{
bool rtn = true;
int i;
make_cloud_filename(filename, VolumeName, i);
if (unlink(filename) != 0 && errno != ENOENT) {
berrno be;
- Mmsg2(err, "Unable to delete %s. ERR=%s\n", filename, be.bstrerror());
- Dmsg1(dbglvl, "%s", err);
- Qmsg(dcr->jcr, M_INFO, 0, "%s", err);
+ Mmsg3(err, "truncate_cloud_volume for %s: Unable to delete %s. ERR=%s\n", VolumeName, filename, be.bstrerror());
rtn = false;
} else {
- Dmsg1(dbglvl, "Unlink file %s\n", filename);
+ Mmsg2(err, "truncate_cloud_volume for %s: Unlink file %s.\n", VolumeName, filename);
}
}
Enter(dbglvl);
pm_strcpy(filename, hostName);
- dev->add_vol_and_part(filename, VolumeName, "part", part);
+ cloud_driver::add_vol_and_part(filename, VolumeName, "part", part);
Dmsg1(dbglvl, "make_cloud_filename: %s\n", filename);
}
uint32_t upload_opt;
*/
-bool file_driver::init(JCR *jcr, cloud_dev *adev, DEVRES *adevice)
+bool file_driver::init(CLOUD *cloud, POOLMEM *&err)
{
- dev = adev; /* copy cloud device pointer */
- device = adevice; /* copy device resource pointer */
- cloud = device->cloud; /* local pointer to cloud definition */
-
- /* File I/O buffer */
- buf_len = dev->max_block_size;
- if (buf_len == 0) {
- buf_len = DEFAULT_BLOCK_SIZE;
+ if (cloud->host_name == NULL) {
+ Mmsg1(err, "Failed to initialize File Cloud. ERR=Hostname not set in cloud resource %s\n", cloud->hdr.name);
+ return false;
}
+ /* File I/O buffer */
+ buf_len = DEFAULT_BLOCK_SIZE;
hostName = cloud->host_name;
bucketName = cloud->bucket_name;
return true;
}
-bool file_driver::start_of_job(DCR *dcr)
+bool file_driver::start_of_job(POOLMEM *&msg)
{
- Jmsg(dcr->jcr, M_INFO, 0, _("Using File cloud driver Host=%s Bucket=%s\n"),
- hostName, bucketName);
+ Mmsg(msg, _("Using File cloud driver Host=%s Bucket=%s\n"), hostName, bucketName);
return true;
}
-bool file_driver::end_of_job(DCR *dcr)
+bool file_driver::end_of_job(POOLMEM *&msg)
{
return true;
}
-/*
- * Note, dcr may be NULL
- */
-bool file_driver::term(DCR *dcr)
+bool file_driver::term(POOLMEM *&msg)
{
return true;
}
-bool file_driver::get_cloud_volume_parts_list(DCR *dcr, const char* VolumeName, ilist *parts, POOLMEM *&err)
+bool file_driver::get_cloud_volume_parts_list(const char* VolumeName, ilist *parts, cancel_callback *cancel_cb, POOLMEM *&err)
{
Enter(dbglvl);
entry = (struct dirent *)malloc(sizeof(struct dirent) + name_max + 1000);
for ( ;; ) {
- if (dcr->jcr->is_canceled()) {
+ if (cancel_cb && cancel_cb->fct && cancel_cb->fct(cancel_cb->arg)) {
pm_strcpy(err, "Job canceled");
goto get_out;
}
return ok;
}
-bool file_driver::get_cloud_volumes_list(DCR *dcr, alist *volumes, POOLMEM *&err)
+bool file_driver::get_cloud_volumes_list(alist *volumes, cancel_callback *cancel_cb, POOLMEM *&err)
{
if (!volumes) {
pm_strcpy(err, "Invalid argument");
entry = (struct dirent *)malloc(sizeof(struct dirent) + name_max + 1000);
for ( ;; ) {
- if (dcr->jcr->is_canceled()) {
+ if (cancel_cb && cancel_cb->fct && cancel_cb->fct(cancel_cb->arg)) {
goto get_out;
}
errno = 0;
/*
Bacula(R) - The Network Backup Solution
- Copyright (C) 2000-2018 Kern Sibbald
+ Copyright (C) 2000-2020 Kern Sibbald
The original author of Bacula is Kern Sibbald, with contributions
from many others, a complete list can be found in the file AUTHORS.
private:
void make_cloud_filename(POOLMEM *&filename, const char *VolumeName, uint32_t part);
- bool init(JCR *jcr, cloud_dev *dev, DEVRES *device);
- bool start_of_job(DCR *dcr);
- bool end_of_job(DCR *dcr);
- bool term(DCR *dcr);
- bool truncate_cloud_volume(DCR *dcr, const char *VolumeName, ilist *trunc_parts, POOLMEM *&err);
+ bool init(CLOUD *cloud, POOLMEM *&err);
+ bool start_of_job(POOLMEM *&msg);
+ bool end_of_job(POOLMEM *&msg);
+ bool term(POOLMEM *&msg);
+ bool truncate_cloud_volume(const char *VolumeName, ilist *trunc_parts, cancel_callback *cancel_cb, POOLMEM *&err);
bool copy_cache_part_to_cloud(transfer *xfer);
bool copy_cloud_part_to_cache(transfer *xfer);
- bool get_cloud_volume_parts_list(DCR *dcr, const char* VolumeName, ilist *parts, POOLMEM *&err);
- bool get_cloud_volumes_list(DCR* dcr, alist *volumes, POOLMEM *&err);
+ bool get_cloud_volume_parts_list(const char* VolumeName, ilist *parts, cancel_callback *cancel_cb, POOLMEM *&err);
+ bool get_cloud_volumes_list(alist *volumes, cancel_callback *cancel_cb, POOLMEM *&err);
bool put_object(transfer *xfer, const char *cache_fname, const char *cloud_fname, bwlimit *limit);
bool get_cloud_object(transfer *xfer, const char *cloud_fname, const char *cache_fname);
/*
Bacula(R) - The Network Backup Solution
- Copyright (C) 2000-2018 Kern Sibbald
+ Copyright (C) 2000-2020 Kern Sibbald
The original author of Bacula is Kern Sibbald, with contributions
from many others, a complete list can be found in the file AUTHORS.
*/
class bacula_ctx {
public:
- JCR *jcr;
+ cancel_callback *cancel_cb;
transfer *xfer;
POOLMEM *&errMsg;
ilist *parts;
alist *volumes;
S3Status status;
bwlimit *limit; /* Used to control the bandwidth */
- bacula_ctx(POOLMEM *&err) : jcr(NULL), xfer(NULL), errMsg(err), parts(NULL),
+ bacula_ctx(POOLMEM *&err) : cancel_cb(NULL), xfer(NULL), errMsg(err), parts(NULL),
isTruncated(0), nextMarker(NULL), obj_len(0), caller(NULL),
infile(NULL), outfile(NULL), volumes(NULL), status(S3StatusOK), limit(NULL)
- {}
- bacula_ctx(transfer *t) : jcr(NULL), xfer(t), errMsg(t->m_message), parts(NULL),
+ {
+ /* reset error message (necessary in case of retry) */
+ errMsg[0] = 0;
+ }
+ bacula_ctx(transfer *t) : cancel_cb(NULL), xfer(t), errMsg(t->m_message), parts(NULL),
isTruncated(0), nextMarker(NULL), obj_len(0), caller(NULL),
infile(NULL), outfile(NULL), volumes(NULL), status(S3StatusOK), limit(NULL)
- {}
+ {
+ /* reset error message (necessary in case of retry) */
+ errMsg[0] = 0;
+ }
};
return;
}
-
-
+bool xfer_cancel_cb(void *arg)
+{
+ transfer *xfer = (transfer*)arg;
+ if (xfer) {
+ return xfer->is_canceled();
+ }
+ return false;
+};
static int putObjectCallback(int buf_len, char *buf, void *callbackCtx)
{
ssize_t rbytes = 0;
int read_len;
- if (ctx->xfer->is_cancelled()) {
+ if (ctx->xfer->is_canceled()) {
Mmsg(ctx->errMsg, _("Job cancelled.\n"));
return -1;
}
if (ctx->obj_len) {
read_len = (ctx->obj_len > buf_len) ? buf_len : ctx->obj_len;
rbytes = fread(buf, 1, read_len, ctx->infile);
- Dmsg5(dbglvl, "%s thread=%lu rbytes=%d bufsize=%u remlen=%lu\n",
- ctx->caller, pthread_self(), rbytes, buf_len, ctx->obj_len);
+ Dmsg6(dbglvl, "%s xfer=part.%lu thread=%lu rbytes=%d bufsize=%u remlen=%lu\n",
+ ctx->caller, ctx->xfer->m_part, pthread_self(), rbytes, buf_len, ctx->obj_len);
if (rbytes <= 0) {
berrno be;
Mmsg(ctx->errMsg, "%s Error reading input file: ERR=%s\n",
goto get_out;
}
ctx->obj_len -= rbytes;
-
+ ctx->xfer->increment_processed_size(rbytes);
if (ctx->limit) {
ctx->limit->control_bwlimit(rbytes);
}
/* no error so far -> retrieve uploaded part info */
if (ctx.errMsg[0] == 0) {
ilist parts;
- get_cloud_volume_parts_list(xfer->m_dcr, cloud_fname, &parts, ctx.errMsg);
- for (int i=1; i <= parts.last_index() ; i++) {
- cloud_part *p = (cloud_part *)parts.get(i);
+ if (get_one_cloud_volume_part(cloud_fname, &parts, ctx.errMsg)) {
+ /* only one part is returned */
+ cloud_part *p = (cloud_part *)parts.get(parts.last_index());
if (p) {
- xfer->m_res_size = p->size;
- xfer->m_res_mtime = p->mtime;
- break; /* not need to go further */
+ xfer->m_res_size = p->size;
+ xfer->m_res_mtime = p->mtime;
}
}
+ } else {
+ Dmsg1(dbglvl, "put_object ERROR: %s\n", ctx.errMsg);
}
return ctx.status;
ssize_t wbytes;
Enter(dbglvl);
- if (ctx->xfer->is_cancelled()) {
+ if (ctx->xfer->is_canceled()) {
Mmsg(ctx->errMsg, _("Job cancelled.\n"));
return S3StatusAbortedByCallback;
}
ctx->caller, be.bstrerror());
return S3StatusAbortedByCallback;
}
-
+ ctx->xfer->increment_processed_size(wbytes);
if (ctx->limit) {
ctx->limit->control_bwlimit(wbytes);
}
/*
* Not thread safe
*/
-bool s3_driver::truncate_cloud_volume(DCR *dcr, const char *VolumeName, ilist *trunc_parts, POOLMEM *&err)
+bool s3_driver::truncate_cloud_volume(const char *VolumeName, ilist *trunc_parts, cancel_callback *cancel_cb, POOLMEM *&err)
{
Enter(dbglvl);
bacula_ctx ctx(err);
- ctx.jcr = dcr->jcr;
int last_index = (int)trunc_parts->last_index();
POOLMEM *cloud_fname = get_pool_memory(PM_FNAME);
if (!trunc_parts->get(i)) {
continue;
}
- if (ctx.jcr->is_canceled()) {
+ if (cancel_cb && cancel_cb->fct && cancel_cb->fct(cancel_cb->arg)) {
Mmsg(err, _("Job cancelled.\n"));
goto get_out;
}
{
Enter(dbglvl);
filename[0] = 0;
- dev->add_vol_and_part(filename, VolumeName, "part", apart);
+ add_vol_and_part(filename, VolumeName, "part", apart);
Dmsg1(dbglvl, "make_cloud_filename: %s\n", filename);
}
uint32_t retry = max_upload_retries;
S3Status status = S3StatusOK;
do {
+ /* when the driver decide to retry, it must reset the processed size */
+ xfer->reset_processed_size();
status = put_object(xfer, xfer->m_cache_fname, cloud_fname);
--retry;
} while (retry_put_object(status) && (retry>0));
* NOTE: See the SD Cloud resource in stored_conf.h
*/
-bool s3_driver::init(JCR *jcr, cloud_dev *adev, DEVRES *adevice)
+bool s3_driver::init(CLOUD *cloud, POOLMEM *&err)
{
S3Status status;
-
- dev = adev; /* copy cloud device pointer */
- device = adevice; /* copy device resource pointer */
- cloud = device->cloud; /* local pointer to cloud definition */
-
+ if (cloud->host_name == NULL) {
+ Mmsg1(err, "Failed to initialize S3 Cloud. ERR=Hostname not set in cloud resource %s\n", cloud->hdr.name);
+ return false;
+ }
+ if (cloud->access_key == NULL) {
+ Mmsg1(err, "Failed to initialize S3 Cloud. ERR=AccessKey not set in cloud resource %s\n", cloud->hdr.name);
+ return false;
+ }
+ if (cloud->secret_key == NULL) {
+ Mmsg1(err, "Failed to initialize S3 Cloud. ERR=SecretKey not set in cloud resource %s\n", cloud->hdr.name);
+ return false;
+ }
/* Setup bucket context for S3 lib */
s3ctx.hostName = cloud->host_name;
s3ctx.bucketName = cloud->bucket_name;
s3ctx.secretAccessKey = cloud->secret_key;
s3ctx.authRegion = cloud->region;
- /* File I/O buffer */
- buf_len = dev->max_block_size;
- if (buf_len == 0) {
- buf_len = DEFAULT_BLOCK_SIZE;
- }
-
if ((status = S3_initialize("s3", S3_INIT_ALL, s3ctx.hostName)) != S3StatusOK) {
- Mmsg1(dev->errmsg, "Failed to initialize S3 lib. ERR=%s\n", S3_get_status_name(status));
- Qmsg1(jcr, M_FATAL, 0, "%s", dev->errmsg);
- Tmsg1(0, "%s", dev->errmsg);
+ Mmsg1(err, "Failed to initialize S3 lib. ERR=%s\n", S3_get_status_name(status));
return false;
}
return true;
}
-bool s3_driver::start_of_job(DCR *dcr)
+bool s3_driver::start_of_job(POOLMEM *&msg)
{
- Jmsg(dcr->jcr, M_INFO, 0, _("Using S3 cloud driver Host=%s Bucket=%s\n"),
- s3ctx.hostName, s3ctx.bucketName);
+ if (msg) {
+ Mmsg(msg, _("Using S3 cloud driver Host=%s Bucket=%s\n"), s3ctx.hostName, s3ctx.bucketName);
+ }
return true;
}
-bool s3_driver::end_of_job(DCR *dcr)
+bool s3_driver::end_of_job(POOLMEM *&msg)
{
return true;
}
-/*
- * Note, dcr may be NULL
- */
-bool s3_driver::term(DCR *dcr)
+bool s3_driver::term(POOLMEM *&msg)
{
S3_deinitialize();
return true;
}
-
-
/*
- * libs3 callback for get_cloud_volume_parts_list()
+ * libs3 callback for get_num_cloud_volume_parts_list()
*/
static S3Status partslistBucketCallback(
int isTruncated,
part->mtime = obj->lastModified;
part->size = obj->size;
ctx->parts->put(part->index, part);
+ Dmsg1(dbglvl, "partslistBucketCallback: part.%d retrieved\n", part->index);
}
}
if (ctx->nextMarker) {
bfree_and_null(ctx->nextMarker);
}
- if (nextMarker) {
- ctx->nextMarker = bstrdup(nextMarker);
+ if (isTruncated && numObj>0) {
+ /* Workaround a bug with nextMarker */
+ const S3ListBucketContent *obj = &(object[numObj-1]);
+ ctx->nextMarker = bstrdup(obj->key);
}
Leave(dbglvl);
- if (ctx->jcr->is_canceled()) {
+ if (ctx->cancel_cb && ctx->cancel_cb->fct && ctx->cancel_cb->fct(ctx->cancel_cb->arg)) {
Mmsg(ctx->errMsg, _("Job cancelled.\n"));
return S3StatusAbortedByCallback;
}
&partslistBucketCallback
};
-bool s3_driver::get_cloud_volume_parts_list(DCR *dcr, const char* VolumeName, ilist *parts, POOLMEM *&err)
+bool s3_driver::get_cloud_volume_parts_list(const char* VolumeName, ilist *parts, cancel_callback *cancel_cb, POOLMEM *&err)
{
- JCR *jcr = dcr->jcr;
Enter(dbglvl);
if (!parts || strlen(VolumeName) == 0) {
}
bacula_ctx ctx(err);
- ctx.jcr = jcr;
+ ctx.cancel_cb = cancel_cb;
ctx.parts = parts;
ctx.isTruncated = 1; /* pass into the while loop at least once */
ctx.caller = "S3_list_bucket";
ctx.isTruncated = 0;
S3_list_bucket(&s3ctx, VolumeName, ctx.nextMarker, NULL, 0, NULL, 0,
&partslistBucketHandler, &ctx);
+ Dmsg4(dbglvl, "get_cloud_volume_parts_list isTruncated=%d, nextMarker=%s, nbparts=%d, err=%s\n", ctx.isTruncated, ctx.nextMarker, ctx.parts->size(), ctx.errMsg?ctx.errMsg:"None");
if (ctx.status != S3StatusOK) {
pm_strcpy(err, S3Errors[ctx.status]);
bfree_and_null(ctx.nextMarker);
}
bfree_and_null(ctx.nextMarker);
return true;
+}
+
+bool s3_driver::get_one_cloud_volume_part(const char* part_path_name, ilist *parts, POOLMEM *&err)
+{
+ Enter(dbglvl);
+
+ if (!parts || strlen(part_path_name) == 0) {
+ pm_strcpy(err, "Invalid argument");
+ return false;
+ }
+
+ bacula_ctx ctx(err);
+ ctx.parts = parts;
+ ctx.isTruncated = 0; /* ignore truncation in this case */
+ ctx.caller = "S3_list_bucket";
+ /* S3 documentation claims the parts will be returned in binary order */
+ /* so part.1 < part.11 b.e. This assumed, the first part retrieved is the one we seek */
+ S3_list_bucket(&s3ctx, part_path_name, ctx.nextMarker, NULL, 1, NULL, 0, &partslistBucketHandler, &ctx);
+ Dmsg4(dbglvl, "get_one_cloud_volume_part isTruncated=%d, nextMarker=%s, nbparts=%d, err=%s\n", ctx.isTruncated, ctx.nextMarker, ctx.parts->size(), ctx.errMsg?ctx.errMsg:"None");
+ if (ctx.status != S3StatusOK) {
+ pm_strcpy(err, S3Errors[ctx.status]);
+ bfree_and_null(ctx.nextMarker);
+ return false;
+ }
+ bfree_and_null(ctx.nextMarker);
+ return true;
}
/*
if (ctx->nextMarker) {
bfree_and_null(ctx->nextMarker);
}
- if (nextMarker) {
- ctx->nextMarker = bstrdup(nextMarker);
+ if (isTruncated && numObj>0) {
+ /* Workaround a bug with nextMarker */
+ const S3ListBucketContent *obj = &(object[numObj-1]);
+ ctx->nextMarker = bstrdup(obj->key);
}
Leave(dbglvl);
- if (ctx->jcr->is_canceled()) {
+ if (ctx->cancel_cb && ctx->cancel_cb->fct && ctx->cancel_cb->fct(ctx->cancel_cb->arg)) {
Mmsg(ctx->errMsg, _("Job cancelled.\n"));
return S3StatusAbortedByCallback;
}
&volumeslistBucketCallback
};
-bool s3_driver::get_cloud_volumes_list(DCR *dcr, alist *volumes, POOLMEM *&err)
+bool s3_driver::get_cloud_volumes_list(alist *volumes, cancel_callback *cancel_cb, POOLMEM *&err)
{
- JCR *jcr = dcr->jcr;
Enter(dbglvl);
if (!volumes) {
bacula_ctx ctx(err);
ctx.volumes = volumes;
- ctx.jcr = jcr;
+ ctx.cancel_cb = cancel_cb;
ctx.isTruncated = 1; /* pass into the while loop at least once */
ctx.caller = "S3_list_bucket";
while (ctx.isTruncated!=0) {
return (err[0] == 0);
}
-#ifdef really_needed
-static S3Status listBucketCallback(
- int isTruncated,
- const char *nextMarker,
- int contentsCount,
- const S3ListBucketContent *contents,
- int commonPrefixesCount,
- const char **commonPrefixes,
- void *callbackData);
-
-S3ListBucketHandler listBucketHandler =
-{
- responseHandler,
- &listBucketCallback
-};
-
-
-/*
- * List content of a bucket
- */
-static S3Status listBucketCallback(
- int isTruncated,
- const char *nextMarker,
- int numObj,
- const S3ListBucketContent *contents,
- int commonPrefixesCount,
- const char **commonPrefixes,
- void *callbackData)
-{
- bacula_ctx *ctx = (bacula_ctx *)callbackCtx;
- if (print_hdr) {
- Pmsg1(000, "\n%-22s", " Object Name");
- Pmsg2(000, " %-5s %-20s", "Size", " Last Modified");
- Pmsg0(000, "\n---------------------- ----- --------------------\n");
- print_hdr = false; /* print header once only */
- }
-
- for (int i = 0; i < numObj; i++) {
- char timebuf[256];
- char sizebuf[16];
- const S3ListBucketContent *content = &(contents[i]);
- time_t t = (time_t) content->lastModified;
- strftime(timebuf, sizeof(timebuf), "%Y-%m-%dT%H:%M:%SZ", gmtime(&t));
- sprintf(sizebuf, "%5llu", (unsigned long long) content->size);
- Pmsg3(000, "%-22s %s %s\n", content->key, sizebuf, timebuf);
- }
- Pmsg0(000, "\n");
- if (ctx->jcr->is_canceled()) {
- Mmsg(ctx->errMsg, _("Job cancelled.\n"));
- return S3StatusAbortedByCallback;
- }
- return S3StatusOK;
-}
-#endif
-
#endif /* HAVE_LIBS3 */
/*
Bacula(R) - The Network Backup Solution
- Copyright (C) 2000-2018 Kern Sibbald
+ Copyright (C) 2000-2020 Kern Sibbald
The original author of Bacula is Kern Sibbald, with contributions
from many others, a complete list can be found in the file AUTHORS.
class s3_driver: public cloud_driver {
private:
S3BucketContext s3ctx; /* Main S3 bucket context */
- uint32_t buf_len;
-
public:
cloud_dev *dev; /* device that is calling us */
- DEVRES *device;
- DCR *dcr; /* dcr set during calls to S3_xxx */
- CLOUD *cloud; /* Pointer to CLOUD resource */
s3_driver() {
};
};
void make_cloud_filename(POOLMEM *&filename, const char *VolumeName, uint32_t part);
- bool init(JCR *jcr, cloud_dev *dev, DEVRES *device);
- bool start_of_job(DCR *dcr);
- bool term(DCR *dcr);
- bool end_of_job(DCR *dcr);
- bool truncate_cloud_volume(DCR *dcr, const char *VolumeName, ilist *trunc_parts, POOLMEM *&err);
+ bool init(CLOUD *cloud, POOLMEM *&err);
+ bool start_of_job(POOLMEM *&err);
+ bool term(POOLMEM *&err);
+ bool end_of_job(POOLMEM *&err);
+ bool truncate_cloud_volume(const char *VolumeName, ilist *trunc_parts, cancel_callback *cancel_cb, POOLMEM *&err);
bool copy_cache_part_to_cloud(transfer *xfer);
bool copy_cloud_part_to_cache(transfer *xfer);
- bool get_cloud_volume_parts_list(DCR *dcr, const char* VolumeName, ilist *parts, POOLMEM *&err);
- bool get_cloud_volumes_list(DCR* dcr, alist *volumes, POOLMEM *&err);
+ bool get_cloud_volume_parts_list(const char* VolumeName, ilist *parts, cancel_callback *cancel_cb, POOLMEM *&err);
+ bool get_cloud_volumes_list(alist *volumes, cancel_callback *cancel_cb, POOLMEM *&err);
S3Status put_object(transfer *xfer, const char *cache_fname, const char *cloud_fname);
bool retry_put_object(S3Status status);
bool get_cloud_object(transfer *xfer, const char *cloud_fname, const char *cache_fname);
+
+private:
+ bool get_one_cloud_volume_part(const char* part_path_name, ilist *parts, POOLMEM *&err);
};
#endif /* HAVE_LIBS3 */