From: Norbert Bizet Date: Fri, 5 Jun 2020 06:20:58 +0000 (+0200) Subject: Backport cloud upload code from Enterprise X-Git-Tag: Release-9.6.4~11 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=022d075fc0bf1c8199b943db6e81284d0f6b74c1;p=thirdparty%2Fbacula.git Backport cloud upload code from Enterprise --- diff --git a/bacula/src/stored/cloud_dev.c b/bacula/src/stored/cloud_dev.c index 970f6d4f0..f295f7fb1 100644 --- a/bacula/src/stored/cloud_dev.c +++ b/bacula/src/stored/cloud_dev.c @@ -1,7 +1,7 @@ /* Bacula(R) - The Network Backup Solution - Copyright (C) 2000-2018 Kern Sibbald + Copyright (C) 2000-2020 Kern Sibbald The original author of Bacula is Kern Sibbald, with contributions from many others, a complete list can be found in the file AUTHORS. @@ -219,10 +219,12 @@ void *download_engine(transfer *tpkt) /* * Upload the given part to the cloud */ -bool cloud_dev::upload_part_to_cloud(DCR *dcr, const char *VolumeName, uint32_t upart) +bool cloud_dev::upload_part_to_cloud(DCR *dcr, const char *VolumeName, uint32_t upart, bool do_truncate) { - if (upload_opt == UPLOAD_NO) { - /* lets pretend everything is OK */ + /* for a "regular" backup job, don't proceed with the upload, but pretend everything went OK. */ + /* Otherwise, pass thru and proceed with the upload, even if UPLOAD_NO is set */ + bool internal_job = dcr->jcr->is_internal_job() || dcr->jcr->is_JobType(JT_ADMIN); + if ((upload_opt == UPLOAD_NO) && !internal_job) { return true; } bool ret=false; @@ -274,8 +276,9 @@ bool cloud_dev::upload_part_to_cloud(DCR *dcr, const char *VolumeName, uint32_t dcr->uploads->append(item); /* transfer are queued manually, so the caller has control on when the transfer is scheduled * this should come handy for upload_opt */ - item->set_do_cache_truncate(trunc_opt == TRUNC_AFTER_UPLOAD); - if (upload_opt == UPLOAD_EACHPART) { + item->set_do_cache_truncate(do_truncate); + if ( (upload_opt == UPLOAD_EACHPART) || + ((upload_opt == UPLOAD_NO) && internal_job) ) { /* in each part upload option, queue right away */ item->queue(); } @@ -325,8 +328,8 @@ transfer *cloud_dev::download_part_to_cache(DCR *dcr, const char *VolumeName, ui pm_strcpy(cache_fname, dev_name); /* create a uniq xfer file name with XFER_TMP_NAME and the pid */ char xferbuf[32]; - bsnprintf(xferbuf, sizeof(xferbuf), "%s_%d", XFER_TMP_NAME, (int)getpid()); - add_vol_and_part(cache_fname, VolumeName, xferbuf, dpart); + bsnprintf(xferbuf, sizeof(xferbuf), "%s_%d_%d", XFER_TMP_NAME, (int)getpid(), (int)dcr->jcr->JobId); + cloud_driver::add_vol_and_part(cache_fname, VolumeName, xferbuf, dpart); /* use the cloud proxy to retrieve the transfer size */ uint64_t cloud_size = cloud_prox->get_size(VolumeName, dpart); @@ -495,7 +498,6 @@ bool cloud_dev::get_cache_sizes(DCR *dcr, const char *VolumeName) POOLMEM *fname = get_pool_memory(PM_NAME); uint32_t cpart; bool ok = false; - POOL_MEM dname(PM_FNAME); int status = 0; @@ -599,32 +601,13 @@ get_out: return ok; } - -/* Utility routines */ - -void cloud_dev::add_vol_and_part(POOLMEM *&filename, - const char *VolumeName, const char *name, uint32_t apart) -{ - Enter(dbglvl); - char partnumber[20]; - int len = strlen(filename); - - if (len > 0 && !IsPathSeparator((filename)[len-1])) { - pm_strcat(filename, "/"); - } - - pm_strcat(filename, VolumeName); - bsnprintf(partnumber, sizeof(partnumber), "/%s.%d", name, apart); - pm_strcat(filename, partnumber); -} - void cloud_dev::make_cache_filename(POOLMEM *&filename, const char *VolumeName, uint32_t upart) { Enter(dbglvl); pm_strcpy(filename, dev_name); - add_vol_and_part(filename, VolumeName, "part", upart); + cloud_driver::add_vol_and_part(filename, VolumeName, "part", upart); } void cloud_dev::make_cache_volume_name(POOLMEM *&volname, @@ -655,7 +638,7 @@ cloud_dev::~cloud_dev() cache_sizes = NULL; } if (driver) { - driver->term(NULL); + driver->term(errmsg); delete driver; driver = NULL; } @@ -709,13 +692,16 @@ cloud_dev::cloud_dev(JCR *jcr, DEVRES *device) download_mgr.m_wq.max_workers = device->cloud->max_concurrent_downloads; } + POOL_MEM err; /* device->errmsg is not yet ready */ /* Initialize the driver */ - driver->init(jcr, this, device); + if (!driver->init(device->cloud, err.addr())) { + Qmsg1(jcr, M_FATAL, 0, "Cloud driver initialization error %s\n", err.c_str()); + Tmsg1(0, "Cloud driver initialization error %s\n", err.c_str()); + } } /* the cloud proxy owns its cloud_parts, so we can 'set and forget' them */ cloud_prox = cloud_proxy::get_instance(); - } /* @@ -1156,7 +1142,7 @@ bool cloud_dev::close(DCR *dcr) /* Ensure the last written part is uploaded */ if ((part > 0) && dcr->is_writing()) { - if (!upload_part_to_cloud(dcr, VolHdr.VolumeName, part)) { + if (!upload_part_to_cloud(dcr, VolHdr.VolumeName, part, (trunc_opt == TRUNC_AFTER_UPLOAD))) { if (errmsg[0]) { Qmsg(dcr->jcr, M_ERROR, 0, "%s", errmsg); } @@ -1219,7 +1205,10 @@ bool cloud_dev::probe_cloud_proxy(DCR *dcr,const char *VolName, bool force) jcr_not_killable jkl(dcr->jcr); ilist cloud_parts(100, false); /* !! dont own the parts here */ /* first, retrieve the volume content within cloud_parts list*/ - if (!driver->get_cloud_volume_parts_list(dcr, VolName, &cloud_parts, errmsg)) { + cancel_callback cancel_cb; + cancel_cb.fct = DCR_cancel_cb; + cancel_cb.arg = dcr; + if (!driver->get_cloud_volume_parts_list(VolName, &cloud_parts, &cancel_cb, errmsg)) { Dmsg2(dbglvl, "Cannot get cloud sizes for Volume=%s Err=%s\n", VolName, errmsg); return false; } @@ -1319,8 +1308,8 @@ bool cloud_dev::truncate(DCR *dcr) bool ok = false; POOL_MEM dname(PM_FNAME); int status = 0; - ilist * iuploads = New(ilist(100,true)); /* owns the parts */ - ilist *truncate_list = NULL; + ilist * iuploads=New(ilist(100,true)); /* owns the parts */ + ilist *truncate_list=NULL; FILE *fp; errmsg[0] = 0; Enter(dbglvl); @@ -1435,10 +1424,16 @@ bool cloud_dev::truncate(DCR *dcr) iuploads->put(part->index, part); } /* returns the list of items to truncate : cloud parts-uploads*/ + cancel_callback cancel_cb; + cancel_cb.fct = DCR_cancel_cb; + cancel_cb.arg = dcr; truncate_list = cloud_prox->exclude(getVolCatName(), iuploads); - if (truncate_list && !driver->truncate_cloud_volume(dcr, getVolCatName(), truncate_list, errmsg)) { - Qmsg(dcr->jcr, M_ERROR, 0, "truncate_cloud_volume for %s: ERR=%s\n", getVolCatName(), errmsg); + if (truncate_list && !driver->truncate_cloud_volume(getVolCatName(), truncate_list, &cancel_cb, errmsg)) { + Dmsg1(dbglvl, "%s", errmsg); + Qmsg(dcr->jcr, M_ERROR, 0, "%s", errmsg); goto get_out; + } else { + Dmsg1(dbglvl, "%s", errmsg); } /* force proxy refresh (volume should be empty so it should be fast) */ /* another approach would be to reuse truncate_list to remove items */ @@ -1605,7 +1600,7 @@ bool cloud_dev::open_next_part(DCR *dcr) /* Write part to cloud */ Dmsg2(dbglvl, "=== part=%d num_cache_parts=%d\n", part, num_cache_parts); if (dcr->is_writing()) { - if (!upload_part_to_cloud(dcr, getVolCatName(), part)) { + if (!upload_part_to_cloud(dcr, getVolCatName(), part, (trunc_opt == TRUNC_AFTER_UPLOAD))) { if (errmsg[0]) { Qmsg(dcr->jcr, M_ERROR, 0, "%s", errmsg); } @@ -1900,10 +1895,14 @@ bool cloud_dev::do_size_checks(DCR *dcr, DEV_BLOCK *block) bool cloud_dev::start_of_job(DCR *dcr) { + bool ret=false; if (driver) { - driver->start_of_job(dcr); + ret = driver->start_of_job(errmsg); + } else { + Mmsg(errmsg, "Cloud driver not properly loaded"); /* We should always have a dummy driver */ } - return true; + Jmsg(dcr->jcr, ret? M_INFO : M_FATAL, 0, "%s\n", errmsg); + return ret; } @@ -1950,7 +1949,7 @@ static void update_volume_record(DCR *dcr, transfer *ppkt) } } -bool cloud_dev::end_of_job(DCR *dcr) +bool cloud_dev::end_of_job(DCR *dcr, uint32_t truncate) { Enter(dbglvl); transfer *tpkt; /* current packet */ @@ -2012,10 +2011,10 @@ bool cloud_dev::end_of_job(DCR *dcr) tpkt->append_status(umsg); Jmsg(dcr->jcr, (tpkt->m_state == TRANS_STATE_ERROR) ? M_ERROR : M_INFO, 0, "%s%s", prefix, umsg.c_str()); Dmsg1(dbglvl, "%s", umsg.c_str()); - + bool do_truncate = (truncate==TRUNC_AT_ENDOFJOB) || (truncate==TRUNC_CONF_DEFAULT && trunc_opt==TRUNC_AT_ENDOFJOB); if (tpkt->m_state == TRANS_STATE_ERROR) { Mmsg(dcr->jcr->StatusErrMsg, _("Upload to Cloud failed")); - } else if (trunc_opt == TRUNC_AT_ENDOFJOB && tpkt->m_part!=1) { + } else if (do_truncate && tpkt->m_part!=1) { /* else -> don't remove the cache file if the upload failed */ if (unlink(tpkt->m_cache_fname) != 0) { berrno be; @@ -2056,7 +2055,7 @@ bool cloud_dev::end_of_job(DCR *dcr) dcr->uploads->destroy(); if (driver) { - driver->end_of_job(dcr); + driver->end_of_job(errmsg); } Leave(dbglvl); @@ -2096,12 +2095,28 @@ bool cloud_dev::wait_end_of_transfer(DCR *dcr, transfer *elem) Leave(dbglvl); return (stat == 0); } +/* return the volumes list */ +bool cloud_dev::get_cloud_volumes_list(DCR* dcr, alist *volumes, POOLMEM *&err) +{ + cancel_callback cancel_cb; + cancel_cb.fct = DCR_cancel_cb; + cancel_cb.arg = dcr; + return driver->get_cloud_volumes_list(volumes, &cancel_cb, err); +} + +/* return the list of parts contained in VolumeName */ +bool cloud_dev::get_cloud_volume_parts_list(DCR *dcr, const char *VolumeName, ilist *parts, POOLMEM *&err) +{ + cancel_callback cancel_cb; + cancel_cb.fct = DCR_cancel_cb; + cancel_cb.arg = dcr; + return driver->get_cloud_volume_parts_list(VolumeName, parts, &cancel_cb, err); +} /* TODO: Add .api2 mode for the status message */ /* format a status message of the cloud transfers. Verbose gives details on each transfer */ uint32_t cloud_dev::get_cloud_upload_transfer_status(POOL_MEM& msg, bool verbose) { - upload_mgr.update_statistics(); uint32_t ret = 0; ret = Mmsg(msg,_(" Uploads ")); ret += upload_mgr.append_status(msg, verbose); @@ -2111,7 +2126,6 @@ uint32_t cloud_dev::get_cloud_upload_transfer_status(POOL_MEM& msg, bool verbose /* format a status message of the cloud transfers. Verbose gives details on each transfer */ uint32_t cloud_dev::get_cloud_download_transfer_status(POOL_MEM& msg, bool verbose) { - download_mgr.update_statistics(); uint32_t ret = 0; ret = Mmsg(msg,_(" Downloads ")); ret += download_mgr.append_status(msg, verbose); @@ -2243,7 +2257,7 @@ get_out: /* * Upload cache parts that are not in the cloud */ -bool cloud_dev::upload_cache(DCR *dcr, const char *VolumeName, POOLMEM *&err) +bool cloud_dev::upload_cache(DCR *dcr, const char *VolumeName, uint32_t truncate, POOLMEM *&err) { int i; Enter(dbglvl); @@ -2252,8 +2266,10 @@ bool cloud_dev::upload_cache(DCR *dcr, const char *VolumeName, POOLMEM *&err) ilist cache_parts; POOLMEM *vol_dir = get_pool_memory(PM_NAME); POOLMEM *fname = get_pool_memory(PM_NAME); - - if (!driver->get_cloud_volume_parts_list(dcr, VolumeName, &cloud_parts, err)) { + cancel_callback cancel_cb; + cancel_cb.fct = DCR_cancel_cb; + cancel_cb.arg = dcr; + if (!driver->get_cloud_volume_parts_list(VolumeName, &cloud_parts, &cancel_cb, err)) { Qmsg2(dcr->jcr, M_ERROR, 0, "Error while uploading parts for volume %s. %s\n", VolumeName, err); ret = false; goto bail_out; @@ -2284,7 +2300,8 @@ bool cloud_dev::upload_cache(DCR *dcr, const char *VolumeName, POOLMEM *&err) } Mmsg(fname, "%s/part.%d", vol_dir, i); Dmsg1(dbglvl, "Do upload of %s\n", fname); - if (!upload_part_to_cloud(dcr, VolumeName, i)) { + bool do_truncate = (truncate==TRUNC_AFTER_UPLOAD) || (truncate==TRUNC_CONF_DEFAULT && (trunc_opt == TRUNC_AFTER_UPLOAD)); + if (!upload_part_to_cloud(dcr, VolumeName, i, do_truncate)) { if (errmsg[0]) { Qmsg(dcr->jcr, M_ERROR, 0, "%s", errmsg); } diff --git a/bacula/src/stored/cloud_dev.h b/bacula/src/stored/cloud_dev.h index 9739cacfd..1f6491892 100644 --- a/bacula/src/stored/cloud_dev.h +++ b/bacula/src/stored/cloud_dev.h @@ -1,7 +1,7 @@ /* Bacula(R) - The Network Backup Solution - Copyright (C) 2000-2018 Kern Sibbald + Copyright (C) 2000-2020 Kern Sibbald The original author of Bacula is Kern Sibbald, with contributions from many others, a complete list can be found in the file AUTHORS. @@ -36,6 +36,12 @@ #include "cloud_transfer_mgr.h" #include "cloud_parts.h" +/* enum loadable drivers */ +enum { + C_S3_DRIVER = 1, + C_FILE_DRIVER = 2 +}; + class cloud_dev: public file_dev { public: int64_t obj_len; @@ -49,6 +55,8 @@ public: uint32_t trunc_opt; uint32_t upload_opt; + uint32_t current_driver_type; + cloud_driver *driver; static transfer_manager download_mgr; @@ -56,12 +64,9 @@ public: cloud_proxy *cloud_prox; - void add_vol_and_part(POOLMEM *&filename, const char *VolumeName, const char *name, uint32_t part); - private: - char *cache_directory; bool download_parts_to_read(DCR *dcr, alist* parts); - bool upload_part_to_cloud(DCR *dcr, const char *VolumeName, uint32_t part); + bool upload_part_to_cloud(DCR *dcr, const char *VolumeName, uint32_t part, bool do_truncate); transfer *download_part_to_cache(DCR *dcr, const char *VolumeName, uint32_t part); void make_cache_filename(POOLMEM *&filename, const char *VolumeName, uint32_t part); void make_cache_volume_name(POOLMEM *&full_volname, const char *VolumeName); @@ -86,7 +91,7 @@ public: bool open_next_part(DCR *dcr); bool truncate(DCR *dcr); int truncate_cache(DCR *dcr, const char *VolName, int64_t *size); - bool upload_cache(DCR *dcr, const char *VolName, POOLMEM *&err); + bool upload_cache(DCR *dcr, const char *VolName, uint32_t truncate, POOLMEM *&err); bool close(DCR *dcr); bool update_pos(DCR *dcr); bool is_eod_valid(DCR *dcr); @@ -106,10 +111,9 @@ public: bool relabel, bool no_prelabel); bool rewrite_volume_label(DCR *dcr, bool recycle); bool start_of_job(DCR *dcr); - bool end_of_job(DCR *dcr); - bool get_cloud_volumes_list(DCR* dcr, alist *volumes, POOLMEM *&err) - { return !driver?false:driver->get_cloud_volumes_list(dcr, volumes, err); }; - bool get_cloud_volume_parts_list(DCR *dcr, const char *VolumeName, ilist *parts, POOLMEM *&err) { return driver->get_cloud_volume_parts_list(dcr, VolumeName, parts, err);}; + bool end_of_job(DCR *dcr, uint32_t truncate); + bool get_cloud_volumes_list(DCR* dcr, alist *volumes, POOLMEM *&err); + bool get_cloud_volume_parts_list(DCR *dcr, const char *VolumeName, ilist *parts, POOLMEM *&err); uint32_t get_cloud_upload_transfer_status(POOL_MEM &msg, bool verbose); uint32_t get_cloud_download_transfer_status(POOL_MEM &msg, bool verbose); }; diff --git a/bacula/src/stored/cloud_driver.h b/bacula/src/stored/cloud_driver.h index dafd880fa..26e7bc5cc 100644 --- a/bacula/src/stored/cloud_driver.h +++ b/bacula/src/stored/cloud_driver.h @@ -1,25 +1,24 @@ /* - Bacula(R) - The Network Backup Solution + Bacula® - The Network Backup Solution - Copyright (C) 2000-2018 Kern Sibbald + Copyright (C) 2000-2016 Bacula Systems SA + All rights reserved. - The original author of Bacula is Kern Sibbald, with contributions - from many others, a complete list can be found in the file AUTHORS. + The main author of Bacula is Kern Sibbald, with contributions from many + others, a complete list can be found in the file AUTHORS. - You may use this file and others of this release according to the - license defined in the LICENSE file, which includes the Affero General - Public License, v3.0 ("AGPLv3") and some additional permissions and - terms pursuant to its AGPLv3 Section 7. + Licensees holding a valid Bacula Systems SA license may use this file + and others of this release in accordance with the proprietary license + agreement provided in the LICENSE file. Redistribution of any part of + this release is not permitted. - This notice must be preserved when any source code is - conveyed and/or propagated. - - Bacula(R) is a registered trademark of Kern Sibbald. + Bacula® is a registered trademark of Kern Sibbald. */ /* * Routines for writing Cloud drivers * * Written by Kern Sibbald, May MMXVI + * */ #include "bacula.h" @@ -34,10 +33,12 @@ #define NUM_UPLOAD_RETRIES 2 class cloud_dev; -enum { - C_S3_DRIVER = 1, - C_FILE_DRIVER = 2 -}; +/* define the cancel callback type */ +typedef bool (cancel_cb_type)(void*); +typedef struct { + cancel_cb_type* fct; + void *arg; +} cancel_callback; /* Abstract class cannot be instantiated */ class cloud_driver: public SMARTALLOC { @@ -47,14 +48,26 @@ public: virtual bool copy_cache_part_to_cloud(transfer *xfer) = 0; virtual bool copy_cloud_part_to_cache(transfer *xfer) = 0; - virtual bool truncate_cloud_volume(DCR *dcr, const char *VolumeName, ilist *trunc_parts, POOLMEM *&err) = 0; - virtual bool init(JCR *jcr, cloud_dev *dev, DEVRES *device) = 0; - virtual bool term(DCR *dcr) = 0; - virtual bool start_of_job(DCR *dcr) = 0; - virtual bool end_of_job(DCR *dcr) = 0; + virtual bool truncate_cloud_volume(const char *VolumeName, ilist *trunc_parts, cancel_callback *cancel_cb, POOLMEM *&err) = 0; + virtual bool init(CLOUD *cloud, POOLMEM *&err) = 0; + virtual bool term(POOLMEM *&err) = 0; + virtual bool start_of_job(POOLMEM *&err) = 0; + virtual bool end_of_job(POOLMEM *&err) = 0; + virtual bool get_cloud_volume_parts_list(const char* VolumeName, ilist *parts, cancel_callback *cancel_cb, POOLMEM *&err) = 0; + virtual bool get_cloud_volumes_list(alist *volumes, cancel_callback *cancel_cb, POOLMEM *&err) = 0; + static void add_vol_and_part(POOLMEM *&filename, const char *VolumeName, const char *name, uint32_t apart) + { + char partnumber[20]; + int len = strlen(filename); + + if (len > 0 && !IsPathSeparator((filename)[len-1])) { + pm_strcat(filename, "/"); + } - virtual bool get_cloud_volume_parts_list(DCR *dcr, const char* VolumeName, ilist *parts, POOLMEM *&err) = 0; - virtual bool get_cloud_volumes_list(DCR* dcr, alist *volumes, POOLMEM *&err) = 0; /* TODO: Adapt the prototype to have a handler instead */ + pm_strcat(filename, VolumeName); + bsnprintf(partnumber, sizeof(partnumber), "/%s.%d", name, apart); + pm_strcat(filename, partnumber); + } bwlimit upload_limit; bwlimit download_limit; diff --git a/bacula/src/stored/cloud_parts.c b/bacula/src/stored/cloud_parts.c index a78032a36..eaa218374 100644 --- a/bacula/src/stored/cloud_parts.c +++ b/bacula/src/stored/cloud_parts.c @@ -1,25 +1,24 @@ /* - Bacula(R) - The Network Backup Solution + Bacula® - The Network Backup Solution - Copyright (C) 2000-2018 Kern Sibbald + Copyright (C) 2000-2016 Bacula Systems SA + All rights reserved. - The original author of Bacula is Kern Sibbald, with contributions - from many others, a complete list can be found in the file AUTHORS. + The main author of Bacula is Kern Sibbald, with contributions from many + others, a complete list can be found in the file AUTHORS. - You may use this file and others of this release according to the - license defined in the LICENSE file, which includes the Affero General - Public License, v3.0 ("AGPLv3") and some additional permissions and - terms pursuant to its AGPLv3 Section 7. + Licensees holding a valid Bacula Systems SA license may use this file + and others of this release in accordance with the proprietary license + agreement provided in the LICENSE file. Redistribution of any part of + this release is not permitted. - This notice must be preserved when any source code is - conveyed and/or propagated. - - Bacula(R) is a registered trademark of Kern Sibbald. + Bacula® is a registered trademark of Kern Sibbald. */ /* * Routines for writing Cloud drivers * * Written by Kern Sibbald, May MMXVI + * */ #include "cloud_parts.h" @@ -604,4 +603,4 @@ int main (int argc, char *argv[]) } -#endif /* TEST_PROGRAM */ +#endif /* TEST_PROGRAM */ \ No newline at end of file diff --git a/bacula/src/stored/cloud_parts.h b/bacula/src/stored/cloud_parts.h index 4755b98ef..1494897d5 100644 --- a/bacula/src/stored/cloud_parts.h +++ b/bacula/src/stored/cloud_parts.h @@ -1,25 +1,24 @@ /* - Bacula(R) - The Network Backup Solution + Bacula® - The Network Backup Solution - Copyright (C) 2000-2018 Kern Sibbald + Copyright (C) 2000-2016 Bacula Systems SA + All rights reserved. - The original author of Bacula is Kern Sibbald, with contributions - from many others, a complete list can be found in the file AUTHORS. + The main author of Bacula is Kern Sibbald, with contributions from many + others, a complete list can be found in the file AUTHORS. - You may use this file and others of this release according to the - license defined in the LICENSE file, which includes the Affero General - Public License, v3.0 ("AGPLv3") and some additional permissions and - terms pursuant to its AGPLv3 Section 7. + Licensees holding a valid Bacula Systems SA license may use this file + and others of this release in accordance with the proprietary license + agreement provided in the LICENSE file. Redistribution of any part of + this release is not permitted. - This notice must be preserved when any source code is - conveyed and/or propagated. - - Bacula(R) is a registered trademark of Kern Sibbald. + Bacula® is a registered trademark of Kern Sibbald. */ /* * Routines for managing Volumes contains and comparing parts * * Written by Norbert Bizet, May MMXVI + * */ #ifndef _CLOUD_PARTS_H_ #define _CLOUD_PARTS_H_ @@ -76,10 +75,10 @@ class cloud_proxy : public SMARTALLOC private: htable *m_hash; /* the root htable */ bool m_owns; /* determines if ilist own the cloud_parts */ - pthread_mutex_t m_mutex; /* protect access*/ + pthread_mutex_t m_mutex; /* protect access*/ static cloud_proxy *m_pinstance; /* singleton instance */ static uint64_t m_count; /* static refcount */ - + ~cloud_proxy(); public: @@ -101,7 +100,7 @@ public: /* Check if the volume entry exists and return true if it's the case */ bool volume_lookup(const char *volume); - + /* reset the volume list content with the content of part_list */ bool reset(const char *volume, ilist *part_list); diff --git a/bacula/src/stored/cloud_transfer_mgr.c b/bacula/src/stored/cloud_transfer_mgr.c index e3786005d..6548649f7 100644 --- a/bacula/src/stored/cloud_transfer_mgr.c +++ b/bacula/src/stored/cloud_transfer_mgr.c @@ -1,7 +1,7 @@ /* Bacula(R) - The Network Backup Solution - Copyright (C) 2000-2018 Kern Sibbald + Copyright (C) 2000-2020 Kern Sibbald The original author of Bacula is Kern Sibbald, with contributions from many others, a complete list can be found in the file AUTHORS. @@ -27,6 +27,10 @@ #include "cloud_transfer_mgr.h" #include "stored.h" +#define ONE_SEC 1000000LL /* number of microseconds in a second */ + +static const int dbglvl = 450; + /* constructor * size : the size in bytes of the transfer * funct : function to process @@ -46,9 +50,11 @@ transfer::transfer(uint64_t size, DCR *dcr, cloud_proxy *proxy) : m_stat_size(size), + m_stat_processed_size(0), m_stat_start(0), m_stat_duration(0), m_stat_eta(0), + m_stat_average_rate(0), m_message(NULL), m_state(TRANS_STATE_CREATED), m_mgr(NULL), @@ -84,7 +90,7 @@ transfer::~transfer() free(m_cache_fname); if (m_use_count > 0) { ASSERT(FALSE); - Dmsg1(0, "!!!m_use_count = %d\n", m_use_count); + Dmsg1(dbglvl, "!!!m_use_count = %d\n", m_use_count); } } @@ -98,6 +104,31 @@ bool transfer::queue() return true; } +/* reset processed size */ +void transfer::reset_processed_size() +{ + lock_guard lg(m_stat_mutex); + m_stat_processed_size = 0; + ASSERTD(m_stat_processed_size==0, "invalid locking of processed size"); +} + +/* set absolute value process size */ +void transfer::set_processed_size(uint64_t size) +{ + lock_guard lg(m_stat_mutex); + m_stat_processed_size = size; + m_stat_duration = get_current_btime()-m_stat_start; + if (m_stat_duration > 0) { + m_stat_average_rate = (m_stat_processed_size*ONE_SEC)/m_stat_duration; + } + ASSERTD(m_stat_processed_size <= m_stat_size, "increment_processed_size increment too big"); + +} +/* add increment to the current processed size */ +void transfer::increment_processed_size(uint64_t increment) +{ + set_processed_size(m_stat_processed_size+increment); +} /* opaque function that processes m_funct with m_arg as parameter * depending on m_funct return value, changes state to TRANS_STATE_DONE @@ -166,7 +197,7 @@ bool transfer::cancel() } /* checking the cancel status : doesnt request locking */ -bool transfer::is_cancelled() const +bool transfer::is_canceled() const { return m_cancel; } @@ -178,21 +209,22 @@ uint32_t transfer::append_status(POOL_MEM& msg) uint32_t ret=0; static const char *state[] = {"created", "queued", "process", "done", "error"}; + lock_guard lg(m_stat_mutex); if (m_state > TRANS_STATE_PROCESSED) { ret = Mmsg(tmp_msg,_("%s/part.%-5d state=%-7s size=%sB duration=%ds%s%s\n"), m_volume_name, m_part, state[m_state], edit_uint64_with_suffix(m_stat_size, ec), - m_stat_duration, + m_stat_duration/ONE_SEC, (strlen(m_message) != 0)?" msg=":"", (strlen(m_message) != 0)?m_message:""); pm_strcat(msg, tmp_msg); } else { - ret = Mmsg(tmp_msg,_("%s/part.%-5d, state=%-7s size=%sB eta=%dss%s%s\n"), + ret = Mmsg(tmp_msg,_("%s/part.%-5d state=%-7s size=%sB eta=%ds%s%s\n"), m_volume_name, m_part, state[m_state], edit_uint64_with_suffix(m_stat_size, ec), - m_stat_eta, + m_stat_eta/ONE_SEC, (strlen(m_message) != 0)?" msg=":"", (strlen(m_message) != 0)?m_message:""); pm_strcat(msg, tmp_msg); @@ -201,6 +233,34 @@ uint32_t transfer::append_status(POOL_MEM& msg) return ret; } +void transfer::append_api_status(OutputWriter &ow) +{ + static const char *state[] = {"created", "queued", "process", "done", "error"}; + + lock_guard lg(m_stat_mutex); + if (m_state > TRANS_STATE_PROCESSED) { + ow.get_output(OT_START_OBJ, + OT_STRING,"volume_name", m_volume_name, + OT_INT32, "part", m_part, + OT_INT32, "jobid", m_dcr->jcr->JobId, + OT_STRING,"state", state[m_state], + OT_INT64, "size", m_stat_size, + OT_DURATION, "duration", m_stat_duration/ONE_SEC, + OT_STRING,"message", m_message, + OT_END); + } else { + ow.get_output(OT_START_OBJ, + OT_STRING,"volume_name", m_volume_name, + OT_INT32, "part", m_part, + OT_INT32, "jobid", m_dcr->jcr->JobId, + OT_STRING,"state", state[m_state], + OT_INT64, "size", m_stat_size, + OT_INT64, "processed_size", m_stat_processed_size, + OT_DURATION, "eta", m_stat_eta/ONE_SEC, + OT_STRING,"message", m_message, + OT_END); + } +} /* the manager references itself through this function */ void transfer::set_manager(transfer_manager *mgr) @@ -235,7 +295,6 @@ bool transfer::transition(transfer_state state) V(m_mgr->m_stat_mutex); P(m_mgr->m_mutex); - ++m_use_count; m_mgr->add_work(this); V(m_mgr->m_mutex); } @@ -259,7 +318,6 @@ bool transfer::transition(transfer_state state) P(m_mgr->m_mutex); m_mgr->remove_work(m_workq_elem); - --m_use_count; V(m_mgr->m_mutex); } } @@ -283,7 +341,7 @@ bool transfer::transition(transfer_state state) /*transfer starts now*/ P(m_stat_mutex); - m_stat_start = (utime_t)time(NULL); + m_stat_start = get_current_btime(); V(m_stat_mutex); } } @@ -296,7 +354,12 @@ bool transfer::transition(transfer_state state) ret = true; /*transfer stops now : compute transfer duration*/ P(m_stat_mutex); - m_stat_duration = (utime_t)time(NULL)-m_stat_start; + m_stat_duration = get_current_btime()-m_stat_start; + if (m_stat_duration > 0) { + m_stat_processed_size = m_stat_size; + ASSERTD(m_stat_size == m_stat_processed_size, "xfer done before processed size is equal to size."); + m_stat_average_rate = (m_stat_size*ONE_SEC)/m_stat_duration; + } V(m_stat_mutex); if (m_mgr) { @@ -309,17 +372,8 @@ bool transfer::transition(transfer_state state) m_mgr->m_stat_size_done += m_stat_size; /*add local duration to manager duration */ m_mgr->m_stat_duration_done += m_stat_duration; - /*reprocess the manager average rate with it*/ - if (m_mgr->m_stat_duration_done != 0) { - m_mgr->m_stat_average_rate = - m_mgr->m_stat_size_done / - m_mgr->m_stat_duration_done; - } /*unlock manager statistics */ V(m_mgr->m_stat_mutex); - - /* process is completed, unref the workq reference */ - --m_use_count; } if (m_proxy) { @@ -335,7 +389,7 @@ bool transfer::transition(transfer_state state) ret = true; /*transfer stops now, even if in error*/ P(m_stat_mutex); - m_stat_duration = (utime_t)time(NULL)-m_stat_start; + m_stat_duration = get_current_btime()-m_stat_start; V(m_stat_mutex); if (m_mgr) { @@ -348,9 +402,6 @@ bool transfer::transition(transfer_state state) m_mgr->m_stat_size_error += m_stat_size; /*unlock manager statistics */ V(m_mgr->m_stat_mutex); - - /* process is completed, unref the workq reference */ - --m_use_count; } /* in both case, success or failure, life keeps going on */ pthread_cond_broadcast(&m_done); @@ -569,44 +620,44 @@ bool transfer_manager::find(const char *VolName, uint32_t index) void transfer_manager::update_statistics() { /* lock the manager stats */ - P(m_stat_mutex); - - /* ETA naive calculation for each element in the queue = - * (accumulator(previous elements size) / average_rate) / num_workers; - */ - uint64_t accumulator=0; + lock_guard lg_stat(m_stat_mutex); /* lock the queue so order and chaining cannot be modified */ - P(m_mutex); - P(m_wq.mutex); - m_stat_nb_workers = m_wq.max_workers; + lock_guard lg(m_mutex); - /* parse the queued and processed transfers */ + /* recompute global average rate */ transfer *t; + uint64_t accumulated_done_average_rate = 0; + uint32_t nb_done_accumulated = 0; foreach_dlist(t, &m_transfer_list) { - if ( (t->m_state == TRANS_STATE_QUEUED) || - (t->m_state == TRANS_STATE_PROCESSED)) { - accumulator+=t->m_stat_size; - P(t->m_stat_mutex); - if ((m_stat_average_rate != 0) && (m_stat_nb_workers != 0)) { - /*update eta for each transfer*/ - t->m_stat_eta = (accumulator / m_stat_average_rate) / m_stat_nb_workers; - } - V(t->m_stat_mutex); + lock_guard lg_xferstatmutex(t->m_stat_mutex); + if (t->m_stat_average_rate>0) { + accumulated_done_average_rate += t->m_stat_average_rate; + t->m_stat_average_rate = 0; + ++nb_done_accumulated; } } + if (nb_done_accumulated) { + m_stat_average_rate = accumulated_done_average_rate / nb_done_accumulated; + } - /* the manager ETA is the ETA of the last transfer in its workq */ - if (m_wq.last) { - transfer *t = (transfer *)m_wq.last->data; - if (t) { - m_stat_eta = t->m_stat_eta; + /* ETA naive calculation for each element in the queue */ + if (m_stat_average_rate != 0) { + uint64_t accumulator=0; + foreach_dlist(t, &m_transfer_list) { + if (t->m_state == TRANS_STATE_QUEUED) { + lock_guard lg_xferstatmutex(t->m_stat_mutex); + accumulator+= t->m_stat_size-t->m_stat_processed_size; + t->m_stat_eta = (accumulator / m_stat_average_rate) * ONE_SEC; + } + if (t->m_state == TRANS_STATE_PROCESSED) { + lock_guard lg_xferstatmutex(t->m_stat_mutex); + t->m_stat_eta = ((t->m_stat_size-t->m_stat_processed_size) / m_stat_average_rate) * ONE_SEC; + } } + /* the manager ETA is the ETA of the last transfer in its workq */ + m_stat_eta = (accumulator / m_stat_average_rate) * ONE_SEC; } - - V(m_wq.mutex); - V(m_mutex); - V(m_stat_mutex); } /* short status of the transfers */ @@ -615,9 +666,10 @@ uint32_t transfer_manager::append_status(POOL_MEM& msg, bool verbose) update_statistics(); char ec0[30],ec1[30],ec2[30],ec3[30],ec4[30]; POOLMEM *tmp_msg = get_pool_memory(PM_MESSAGE); + lock_guard lg_stat(m_stat_mutex); uint32_t ret = Mmsg(tmp_msg, _("(%sB/s) (ETA %d s) " "Queued=%d %sB, Processed=%d %sB, Done=%d %sB, Failed=%d %sB\n"), - edit_uint64_with_suffix(m_stat_average_rate, ec0), m_stat_eta, + edit_uint64_with_suffix(m_stat_average_rate, ec0), m_stat_eta/ONE_SEC, m_stat_nb_transfer_queued, edit_uint64_with_suffix(m_stat_size_queued, ec1), m_stat_nb_transfer_processed, edit_uint64_with_suffix(m_stat_size_processed, ec2), m_stat_nb_transfer_done, edit_uint64_with_suffix(m_stat_size_done, ec3), @@ -625,7 +677,7 @@ uint32_t transfer_manager::append_status(POOL_MEM& msg, bool verbose) pm_strcat(msg, tmp_msg); if (verbose) { - P(m_mutex); + lock_guard lg(m_mutex); if (!m_transfer_list.empty()) { ret += Mmsg(tmp_msg, _("------------------------------------------------------------ details ------------------------------------------------------------\n")); pm_strcat(msg, tmp_msg); @@ -634,8 +686,36 @@ uint32_t transfer_manager::append_status(POOL_MEM& msg, bool verbose) foreach_dlist(tpkt, &m_transfer_list) { ret += tpkt->append_status(msg); } - V(m_mutex); } free_pool_memory(tmp_msg); return ret; } + +void transfer_manager::append_api_status(OutputWriter &ow, bool verbose) +{ + update_statistics(); + + lock_guard lg_stat(m_stat_mutex); + ow.get_output(OT_START_OBJ, + OT_INT64, "average_rate", m_stat_average_rate, + OT_DURATION, "eta", m_stat_eta/ONE_SEC, + OT_INT64, "nb_transfer_queued", m_stat_nb_transfer_queued, + OT_INT64, "size_queued", m_stat_size_queued, + OT_INT64, "nb_transfer_processed", m_stat_nb_transfer_processed, + OT_INT64, "size_processed", m_stat_size_processed, + OT_INT64, "nb_transfer_done", m_stat_nb_transfer_done, + OT_INT64, "size_done", m_stat_size_done, + OT_INT64, "nb_transfer_error", m_stat_nb_transfer_error, + OT_INT64, "size_error", m_stat_size_error, + OT_INT, "transfers_list_size", m_transfer_list.size(), + OT_END); + if (verbose) { + lock_guard lg(m_mutex); + ow.start_list("transfers"); + transfer *tpkt; + foreach_dlist(tpkt, &m_transfer_list) { + tpkt->append_api_status(ow); + } + ow.end_list(); + } +} diff --git a/bacula/src/stored/cloud_transfer_mgr.h b/bacula/src/stored/cloud_transfer_mgr.h index e229c1661..cd30d0cab 100644 --- a/bacula/src/stored/cloud_transfer_mgr.h +++ b/bacula/src/stored/cloud_transfer_mgr.h @@ -1,7 +1,7 @@ /* Bacula(R) - The Network Backup Solution - Copyright (C) 2000-2018 Kern Sibbald + Copyright (C) 2000-2020 Kern Sibbald The original author of Bacula is Kern Sibbald, with contributions from many others, a complete list can be found in the file AUTHORS. @@ -72,12 +72,16 @@ public: pthread_mutex_t m_stat_mutex; /* size of the transfer: should be filled asap */ uint64_t m_stat_size; + /* size processed so far : filled by the processor (driver) */ + uint64_t m_stat_processed_size; /* time when process started */ - utime_t m_stat_start; + btime_t m_stat_start; /* duration of the transfer : automatically filled when transfer is completed*/ - utime_t m_stat_duration; + btime_t m_stat_duration; /* estimate time to arrival : predictive guest approximation of transfer time*/ - utime_t m_stat_eta; + btime_t m_stat_eta; + /* computed bytes/sec transfer rate */ + uint64_t m_stat_average_rate; /* status variables :*/ /* protect status changes*/ @@ -159,13 +163,23 @@ public: bool cancel(); /* callback fct that checks if transfer has been cancelled */ - bool is_cancelled() const; + bool is_canceled() const; /* append a status message into msg*/ uint32_t append_status(POOL_MEM& msgs); + void append_api_status(OutputWriter &ow); void set_do_cache_truncate(bool do_cache_truncate); + /* reset processed size */ + void reset_processed_size(); + + /* set processed size absolute value */ + void set_processed_size(uint64_t size); + + /* add increment to the current processed size */ + void increment_processed_size(uint64_t increment); + protected: friend class transfer_manager; @@ -212,11 +226,11 @@ public: /* size in bytes of transfers in TRANS_STATE_ERROR state in this manager*/ uint64_t m_stat_size_error; /* duration of transfers in TRANS_STATE_DONE state in this manager*/ - utime_t m_stat_duration_done; + btime_t m_stat_duration_done; /* computed bytes/sec transfer rate */ uint64_t m_stat_average_rate; /* computed Estimate Time to Arrival */ - utime_t m_stat_eta; + btime_t m_stat_eta; /* status variables global for this manager: */ @@ -283,6 +297,7 @@ public: /* append a status message into msg*/ uint32_t append_status(POOL_MEM& msg, bool verbose); + void append_api_status(OutputWriter &ow, bool verbose); protected: friend class transfer; diff --git a/bacula/src/stored/file_driver.c b/bacula/src/stored/file_driver.c index b209ca836..f517e1bc6 100644 --- a/bacula/src/stored/file_driver.c +++ b/bacula/src/stored/file_driver.c @@ -1,7 +1,7 @@ /* Bacula(R) - The Network Backup Solution - Copyright (C) 2000-2018 Kern Sibbald + Copyright (C) 2000-2020 Kern Sibbald The original author of Bacula is Kern Sibbald, with contributions from many others, a complete list can be found in the file AUTHORS. @@ -125,7 +125,7 @@ bool file_driver::put_object(transfer *xfer, const char *in_fname, const char *o } while (obj_len > 0) { - if (xfer->is_cancelled()) { + if (xfer->is_canceled()) { Mmsg(xfer->m_message, "Job is canceled.\n"); goto get_out; } @@ -147,6 +147,7 @@ bool file_driver::put_object(transfer *xfer, const char *in_fname, const char *o out_fname, be.bstrerror()); } obj_len -= rbytes; + xfer->increment_processed_size(rbytes); if (limit->use_bwlimit()) { limit->control_bwlimit(rbytes); } @@ -177,7 +178,7 @@ bool file_driver::get_cloud_object(transfer *xfer, const char *cloud_fname, cons return put_object(xfer, cloud_fname, cache_fname, &download_limit); } -bool file_driver::truncate_cloud_volume(DCR *dcr, const char *VolumeName, ilist *trunc_parts, POOLMEM *&err) +bool file_driver::truncate_cloud_volume(const char *VolumeName, ilist *trunc_parts, cancel_callback *cancel_cb, POOLMEM *&err) { bool rtn = true; int i; @@ -189,12 +190,10 @@ bool file_driver::truncate_cloud_volume(DCR *dcr, const char *VolumeName, ilist make_cloud_filename(filename, VolumeName, i); if (unlink(filename) != 0 && errno != ENOENT) { berrno be; - Mmsg2(err, "Unable to delete %s. ERR=%s\n", filename, be.bstrerror()); - Dmsg1(dbglvl, "%s", err); - Qmsg(dcr->jcr, M_INFO, 0, "%s", err); + Mmsg3(err, "truncate_cloud_volume for %s: Unable to delete %s. ERR=%s\n", VolumeName, filename, be.bstrerror()); rtn = false; } else { - Dmsg1(dbglvl, "Unlink file %s\n", filename); + Mmsg2(err, "truncate_cloud_volume for %s: Unlink file %s.\n", VolumeName, filename); } } @@ -208,7 +207,7 @@ void file_driver::make_cloud_filename(POOLMEM *&filename, Enter(dbglvl); pm_strcpy(filename, hostName); - dev->add_vol_and_part(filename, VolumeName, "part", part); + cloud_driver::add_vol_and_part(filename, VolumeName, "part", part); Dmsg1(dbglvl, "make_cloud_filename: %s\n", filename); } @@ -254,17 +253,14 @@ bool file_driver::copy_cloud_part_to_cache(transfer *xfer) uint32_t upload_opt; */ -bool file_driver::init(JCR *jcr, cloud_dev *adev, DEVRES *adevice) +bool file_driver::init(CLOUD *cloud, POOLMEM *&err) { - dev = adev; /* copy cloud device pointer */ - device = adevice; /* copy device resource pointer */ - cloud = device->cloud; /* local pointer to cloud definition */ - - /* File I/O buffer */ - buf_len = dev->max_block_size; - if (buf_len == 0) { - buf_len = DEFAULT_BLOCK_SIZE; + if (cloud->host_name == NULL) { + Mmsg1(err, "Failed to initialize File Cloud. ERR=Hostname not set in cloud resource %s\n", cloud->hdr.name); + return false; } + /* File I/O buffer */ + buf_len = DEFAULT_BLOCK_SIZE; hostName = cloud->host_name; bucketName = cloud->bucket_name; @@ -276,27 +272,23 @@ bool file_driver::init(JCR *jcr, cloud_dev *adev, DEVRES *adevice) return true; } -bool file_driver::start_of_job(DCR *dcr) +bool file_driver::start_of_job(POOLMEM *&msg) { - Jmsg(dcr->jcr, M_INFO, 0, _("Using File cloud driver Host=%s Bucket=%s\n"), - hostName, bucketName); + Mmsg(msg, _("Using File cloud driver Host=%s Bucket=%s\n"), hostName, bucketName); return true; } -bool file_driver::end_of_job(DCR *dcr) +bool file_driver::end_of_job(POOLMEM *&msg) { return true; } -/* - * Note, dcr may be NULL - */ -bool file_driver::term(DCR *dcr) +bool file_driver::term(POOLMEM *&msg) { return true; } -bool file_driver::get_cloud_volume_parts_list(DCR *dcr, const char* VolumeName, ilist *parts, POOLMEM *&err) +bool file_driver::get_cloud_volume_parts_list(const char* VolumeName, ilist *parts, cancel_callback *cancel_cb, POOLMEM *&err) { Enter(dbglvl); @@ -343,7 +335,7 @@ bool file_driver::get_cloud_volume_parts_list(DCR *dcr, const char* VolumeName, entry = (struct dirent *)malloc(sizeof(struct dirent) + name_max + 1000); for ( ;; ) { - if (dcr->jcr->is_canceled()) { + if (cancel_cb && cancel_cb->fct && cancel_cb->fct(cancel_cb->arg)) { pm_strcpy(err, "Job canceled"); goto get_out; } @@ -413,7 +405,7 @@ get_out: return ok; } -bool file_driver::get_cloud_volumes_list(DCR *dcr, alist *volumes, POOLMEM *&err) +bool file_driver::get_cloud_volumes_list(alist *volumes, cancel_callback *cancel_cb, POOLMEM *&err) { if (!volumes) { pm_strcpy(err, "Invalid argument"); @@ -450,7 +442,7 @@ bool file_driver::get_cloud_volumes_list(DCR *dcr, alist *volumes, POOLMEM *&err entry = (struct dirent *)malloc(sizeof(struct dirent) + name_max + 1000); for ( ;; ) { - if (dcr->jcr->is_canceled()) { + if (cancel_cb && cancel_cb->fct && cancel_cb->fct(cancel_cb->arg)) { goto get_out; } errno = 0; diff --git a/bacula/src/stored/file_driver.h b/bacula/src/stored/file_driver.h index 1d31d92a1..b834dd1fb 100644 --- a/bacula/src/stored/file_driver.h +++ b/bacula/src/stored/file_driver.h @@ -1,7 +1,7 @@ /* Bacula(R) - The Network Backup Solution - Copyright (C) 2000-2018 Kern Sibbald + Copyright (C) 2000-2020 Kern Sibbald The original author of Bacula is Kern Sibbald, with contributions from many others, a complete list can be found in the file AUTHORS. @@ -49,15 +49,15 @@ public: private: void make_cloud_filename(POOLMEM *&filename, const char *VolumeName, uint32_t part); - bool init(JCR *jcr, cloud_dev *dev, DEVRES *device); - bool start_of_job(DCR *dcr); - bool end_of_job(DCR *dcr); - bool term(DCR *dcr); - bool truncate_cloud_volume(DCR *dcr, const char *VolumeName, ilist *trunc_parts, POOLMEM *&err); + bool init(CLOUD *cloud, POOLMEM *&err); + bool start_of_job(POOLMEM *&msg); + bool end_of_job(POOLMEM *&msg); + bool term(POOLMEM *&msg); + bool truncate_cloud_volume(const char *VolumeName, ilist *trunc_parts, cancel_callback *cancel_cb, POOLMEM *&err); bool copy_cache_part_to_cloud(transfer *xfer); bool copy_cloud_part_to_cache(transfer *xfer); - bool get_cloud_volume_parts_list(DCR *dcr, const char* VolumeName, ilist *parts, POOLMEM *&err); - bool get_cloud_volumes_list(DCR* dcr, alist *volumes, POOLMEM *&err); + bool get_cloud_volume_parts_list(const char* VolumeName, ilist *parts, cancel_callback *cancel_cb, POOLMEM *&err); + bool get_cloud_volumes_list(alist *volumes, cancel_callback *cancel_cb, POOLMEM *&err); bool put_object(transfer *xfer, const char *cache_fname, const char *cloud_fname, bwlimit *limit); bool get_cloud_object(transfer *xfer, const char *cloud_fname, const char *cache_fname); diff --git a/bacula/src/stored/s3_driver.c b/bacula/src/stored/s3_driver.c index 4cbd5bd5f..c1366ef38 100644 --- a/bacula/src/stored/s3_driver.c +++ b/bacula/src/stored/s3_driver.c @@ -1,7 +1,7 @@ /* Bacula(R) - The Network Backup Solution - Copyright (C) 2000-2018 Kern Sibbald + Copyright (C) 2000-2020 Kern Sibbald The original author of Bacula is Kern Sibbald, with contributions from many others, a complete list can be found in the file AUTHORS. @@ -176,7 +176,7 @@ static const char *S3Errors[] = { */ class bacula_ctx { public: - JCR *jcr; + cancel_callback *cancel_cb; transfer *xfer; POOLMEM *&errMsg; ilist *parts; @@ -189,14 +189,20 @@ public: alist *volumes; S3Status status; bwlimit *limit; /* Used to control the bandwidth */ - bacula_ctx(POOLMEM *&err) : jcr(NULL), xfer(NULL), errMsg(err), parts(NULL), + bacula_ctx(POOLMEM *&err) : cancel_cb(NULL), xfer(NULL), errMsg(err), parts(NULL), isTruncated(0), nextMarker(NULL), obj_len(0), caller(NULL), infile(NULL), outfile(NULL), volumes(NULL), status(S3StatusOK), limit(NULL) - {} - bacula_ctx(transfer *t) : jcr(NULL), xfer(t), errMsg(t->m_message), parts(NULL), + { + /* reset error message (necessary in case of retry) */ + errMsg[0] = 0; + } + bacula_ctx(transfer *t) : cancel_cb(NULL), xfer(t), errMsg(t->m_message), parts(NULL), isTruncated(0), nextMarker(NULL), obj_len(0), caller(NULL), infile(NULL), outfile(NULL), volumes(NULL), status(S3StatusOK), limit(NULL) - {} + { + /* reset error message (necessary in case of retry) */ + errMsg[0] = 0; + } }; @@ -275,8 +281,14 @@ static void responseCompleteCallback( return; } - - +bool xfer_cancel_cb(void *arg) +{ + transfer *xfer = (transfer*)arg; + if (xfer) { + return xfer->is_canceled(); + } + return false; +}; static int putObjectCallback(int buf_len, char *buf, void *callbackCtx) { @@ -285,15 +297,15 @@ static int putObjectCallback(int buf_len, char *buf, void *callbackCtx) ssize_t rbytes = 0; int read_len; - if (ctx->xfer->is_cancelled()) { + if (ctx->xfer->is_canceled()) { Mmsg(ctx->errMsg, _("Job cancelled.\n")); return -1; } if (ctx->obj_len) { read_len = (ctx->obj_len > buf_len) ? buf_len : ctx->obj_len; rbytes = fread(buf, 1, read_len, ctx->infile); - Dmsg5(dbglvl, "%s thread=%lu rbytes=%d bufsize=%u remlen=%lu\n", - ctx->caller, pthread_self(), rbytes, buf_len, ctx->obj_len); + Dmsg6(dbglvl, "%s xfer=part.%lu thread=%lu rbytes=%d bufsize=%u remlen=%lu\n", + ctx->caller, ctx->xfer->m_part, pthread_self(), rbytes, buf_len, ctx->obj_len); if (rbytes <= 0) { berrno be; Mmsg(ctx->errMsg, "%s Error reading input file: ERR=%s\n", @@ -301,7 +313,7 @@ static int putObjectCallback(int buf_len, char *buf, void *callbackCtx) goto get_out; } ctx->obj_len -= rbytes; - + ctx->xfer->increment_processed_size(rbytes); if (ctx->limit) { ctx->limit->control_bwlimit(rbytes); } @@ -356,15 +368,16 @@ get_out: /* no error so far -> retrieve uploaded part info */ if (ctx.errMsg[0] == 0) { ilist parts; - get_cloud_volume_parts_list(xfer->m_dcr, cloud_fname, &parts, ctx.errMsg); - for (int i=1; i <= parts.last_index() ; i++) { - cloud_part *p = (cloud_part *)parts.get(i); + if (get_one_cloud_volume_part(cloud_fname, &parts, ctx.errMsg)) { + /* only one part is returned */ + cloud_part *p = (cloud_part *)parts.get(parts.last_index()); if (p) { - xfer->m_res_size = p->size; - xfer->m_res_mtime = p->mtime; - break; /* not need to go further */ + xfer->m_res_size = p->size; + xfer->m_res_mtime = p->mtime; } } + } else { + Dmsg1(dbglvl, "put_object ERROR: %s\n", ctx.errMsg); } return ctx.status; @@ -377,7 +390,7 @@ static S3Status getObjectDataCallback(int buf_len, const char *buf, ssize_t wbytes; Enter(dbglvl); - if (ctx->xfer->is_cancelled()) { + if (ctx->xfer->is_canceled()) { Mmsg(ctx->errMsg, _("Job cancelled.\n")); return S3StatusAbortedByCallback; } @@ -389,7 +402,7 @@ static S3Status getObjectDataCallback(int buf_len, const char *buf, ctx->caller, be.bstrerror()); return S3StatusAbortedByCallback; } - + ctx->xfer->increment_processed_size(wbytes); if (ctx->limit) { ctx->limit->control_bwlimit(wbytes); } @@ -457,12 +470,11 @@ get_out: /* * Not thread safe */ -bool s3_driver::truncate_cloud_volume(DCR *dcr, const char *VolumeName, ilist *trunc_parts, POOLMEM *&err) +bool s3_driver::truncate_cloud_volume(const char *VolumeName, ilist *trunc_parts, cancel_callback *cancel_cb, POOLMEM *&err) { Enter(dbglvl); bacula_ctx ctx(err); - ctx.jcr = dcr->jcr; int last_index = (int)trunc_parts->last_index(); POOLMEM *cloud_fname = get_pool_memory(PM_FNAME); @@ -470,7 +482,7 @@ bool s3_driver::truncate_cloud_volume(DCR *dcr, const char *VolumeName, ilist *t if (!trunc_parts->get(i)) { continue; } - if (ctx.jcr->is_canceled()) { + if (cancel_cb && cancel_cb->fct && cancel_cb->fct(cancel_cb->arg)) { Mmsg(err, _("Job cancelled.\n")); goto get_out; } @@ -496,7 +508,7 @@ void s3_driver::make_cloud_filename(POOLMEM *&filename, { Enter(dbglvl); filename[0] = 0; - dev->add_vol_and_part(filename, VolumeName, "part", apart); + add_vol_and_part(filename, VolumeName, "part", apart); Dmsg1(dbglvl, "make_cloud_filename: %s\n", filename); } @@ -519,6 +531,8 @@ bool s3_driver::copy_cache_part_to_cloud(transfer *xfer) uint32_t retry = max_upload_retries; S3Status status = S3StatusOK; do { + /* when the driver decide to retry, it must reset the processed size */ + xfer->reset_processed_size(); status = put_object(xfer, xfer->m_cache_fname, cloud_fname); --retry; } while (retry_put_object(status) && (retry>0)); @@ -543,14 +557,21 @@ bool s3_driver::copy_cloud_part_to_cache(transfer *xfer) * NOTE: See the SD Cloud resource in stored_conf.h */ -bool s3_driver::init(JCR *jcr, cloud_dev *adev, DEVRES *adevice) +bool s3_driver::init(CLOUD *cloud, POOLMEM *&err) { S3Status status; - - dev = adev; /* copy cloud device pointer */ - device = adevice; /* copy device resource pointer */ - cloud = device->cloud; /* local pointer to cloud definition */ - + if (cloud->host_name == NULL) { + Mmsg1(err, "Failed to initialize S3 Cloud. ERR=Hostname not set in cloud resource %s\n", cloud->hdr.name); + return false; + } + if (cloud->access_key == NULL) { + Mmsg1(err, "Failed to initialize S3 Cloud. ERR=AccessKey not set in cloud resource %s\n", cloud->hdr.name); + return false; + } + if (cloud->secret_key == NULL) { + Mmsg1(err, "Failed to initialize S3 Cloud. ERR=SecretKey not set in cloud resource %s\n", cloud->hdr.name); + return false; + } /* Setup bucket context for S3 lib */ s3ctx.hostName = cloud->host_name; s3ctx.bucketName = cloud->bucket_name; @@ -560,46 +581,34 @@ bool s3_driver::init(JCR *jcr, cloud_dev *adev, DEVRES *adevice) s3ctx.secretAccessKey = cloud->secret_key; s3ctx.authRegion = cloud->region; - /* File I/O buffer */ - buf_len = dev->max_block_size; - if (buf_len == 0) { - buf_len = DEFAULT_BLOCK_SIZE; - } - if ((status = S3_initialize("s3", S3_INIT_ALL, s3ctx.hostName)) != S3StatusOK) { - Mmsg1(dev->errmsg, "Failed to initialize S3 lib. ERR=%s\n", S3_get_status_name(status)); - Qmsg1(jcr, M_FATAL, 0, "%s", dev->errmsg); - Tmsg1(0, "%s", dev->errmsg); + Mmsg1(err, "Failed to initialize S3 lib. ERR=%s\n", S3_get_status_name(status)); return false; } return true; } -bool s3_driver::start_of_job(DCR *dcr) +bool s3_driver::start_of_job(POOLMEM *&msg) { - Jmsg(dcr->jcr, M_INFO, 0, _("Using S3 cloud driver Host=%s Bucket=%s\n"), - s3ctx.hostName, s3ctx.bucketName); + if (msg) { + Mmsg(msg, _("Using S3 cloud driver Host=%s Bucket=%s\n"), s3ctx.hostName, s3ctx.bucketName); + } return true; } -bool s3_driver::end_of_job(DCR *dcr) +bool s3_driver::end_of_job(POOLMEM *&msg) { return true; } -/* - * Note, dcr may be NULL - */ -bool s3_driver::term(DCR *dcr) +bool s3_driver::term(POOLMEM *&msg) { S3_deinitialize(); return true; } - - /* - * libs3 callback for get_cloud_volume_parts_list() + * libs3 callback for get_num_cloud_volume_parts_list() */ static S3Status partslistBucketCallback( int isTruncated, @@ -623,6 +632,7 @@ static S3Status partslistBucketCallback( part->mtime = obj->lastModified; part->size = obj->size; ctx->parts->put(part->index, part); + Dmsg1(dbglvl, "partslistBucketCallback: part.%d retrieved\n", part->index); } } @@ -630,12 +640,14 @@ static S3Status partslistBucketCallback( if (ctx->nextMarker) { bfree_and_null(ctx->nextMarker); } - if (nextMarker) { - ctx->nextMarker = bstrdup(nextMarker); + if (isTruncated && numObj>0) { + /* Workaround a bug with nextMarker */ + const S3ListBucketContent *obj = &(object[numObj-1]); + ctx->nextMarker = bstrdup(obj->key); } Leave(dbglvl); - if (ctx->jcr->is_canceled()) { + if (ctx->cancel_cb && ctx->cancel_cb->fct && ctx->cancel_cb->fct(ctx->cancel_cb->arg)) { Mmsg(ctx->errMsg, _("Job cancelled.\n")); return S3StatusAbortedByCallback; } @@ -648,9 +660,8 @@ S3ListBucketHandler partslistBucketHandler = &partslistBucketCallback }; -bool s3_driver::get_cloud_volume_parts_list(DCR *dcr, const char* VolumeName, ilist *parts, POOLMEM *&err) +bool s3_driver::get_cloud_volume_parts_list(const char* VolumeName, ilist *parts, cancel_callback *cancel_cb, POOLMEM *&err) { - JCR *jcr = dcr->jcr; Enter(dbglvl); if (!parts || strlen(VolumeName) == 0) { @@ -659,7 +670,7 @@ bool s3_driver::get_cloud_volume_parts_list(DCR *dcr, const char* VolumeName, il } bacula_ctx ctx(err); - ctx.jcr = jcr; + ctx.cancel_cb = cancel_cb; ctx.parts = parts; ctx.isTruncated = 1; /* pass into the while loop at least once */ ctx.caller = "S3_list_bucket"; @@ -667,6 +678,7 @@ bool s3_driver::get_cloud_volume_parts_list(DCR *dcr, const char* VolumeName, il ctx.isTruncated = 0; S3_list_bucket(&s3ctx, VolumeName, ctx.nextMarker, NULL, 0, NULL, 0, &partslistBucketHandler, &ctx); + Dmsg4(dbglvl, "get_cloud_volume_parts_list isTruncated=%d, nextMarker=%s, nbparts=%d, err=%s\n", ctx.isTruncated, ctx.nextMarker, ctx.parts->size(), ctx.errMsg?ctx.errMsg:"None"); if (ctx.status != S3StatusOK) { pm_strcpy(err, S3Errors[ctx.status]); bfree_and_null(ctx.nextMarker); @@ -675,7 +687,33 @@ bool s3_driver::get_cloud_volume_parts_list(DCR *dcr, const char* VolumeName, il } bfree_and_null(ctx.nextMarker); return true; +} + +bool s3_driver::get_one_cloud_volume_part(const char* part_path_name, ilist *parts, POOLMEM *&err) +{ + Enter(dbglvl); + + if (!parts || strlen(part_path_name) == 0) { + pm_strcpy(err, "Invalid argument"); + return false; + } + + bacula_ctx ctx(err); + ctx.parts = parts; + ctx.isTruncated = 0; /* ignore truncation in this case */ + ctx.caller = "S3_list_bucket"; + /* S3 documentation claims the parts will be returned in binary order */ + /* so part.1 < part.11 b.e. This assumed, the first part retrieved is the one we seek */ + S3_list_bucket(&s3ctx, part_path_name, ctx.nextMarker, NULL, 1, NULL, 0, &partslistBucketHandler, &ctx); + Dmsg4(dbglvl, "get_one_cloud_volume_part isTruncated=%d, nextMarker=%s, nbparts=%d, err=%s\n", ctx.isTruncated, ctx.nextMarker, ctx.parts->size(), ctx.errMsg?ctx.errMsg:"None"); + if (ctx.status != S3StatusOK) { + pm_strcpy(err, S3Errors[ctx.status]); + bfree_and_null(ctx.nextMarker); + return false; + } + bfree_and_null(ctx.nextMarker); + return true; } /* @@ -703,12 +741,14 @@ static S3Status volumeslistBucketCallback( if (ctx->nextMarker) { bfree_and_null(ctx->nextMarker); } - if (nextMarker) { - ctx->nextMarker = bstrdup(nextMarker); + if (isTruncated && numObj>0) { + /* Workaround a bug with nextMarker */ + const S3ListBucketContent *obj = &(object[numObj-1]); + ctx->nextMarker = bstrdup(obj->key); } Leave(dbglvl); - if (ctx->jcr->is_canceled()) { + if (ctx->cancel_cb && ctx->cancel_cb->fct && ctx->cancel_cb->fct(ctx->cancel_cb->arg)) { Mmsg(ctx->errMsg, _("Job cancelled.\n")); return S3StatusAbortedByCallback; } @@ -721,9 +761,8 @@ S3ListBucketHandler volumeslistBucketHandler = &volumeslistBucketCallback }; -bool s3_driver::get_cloud_volumes_list(DCR *dcr, alist *volumes, POOLMEM *&err) +bool s3_driver::get_cloud_volumes_list(alist *volumes, cancel_callback *cancel_cb, POOLMEM *&err) { - JCR *jcr = dcr->jcr; Enter(dbglvl); if (!volumes) { @@ -733,7 +772,7 @@ bool s3_driver::get_cloud_volumes_list(DCR *dcr, alist *volumes, POOLMEM *&err) bacula_ctx ctx(err); ctx.volumes = volumes; - ctx.jcr = jcr; + ctx.cancel_cb = cancel_cb; ctx.isTruncated = 1; /* pass into the while loop at least once */ ctx.caller = "S3_list_bucket"; while (ctx.isTruncated!=0) { @@ -748,59 +787,4 @@ bool s3_driver::get_cloud_volumes_list(DCR *dcr, alist *volumes, POOLMEM *&err) return (err[0] == 0); } -#ifdef really_needed -static S3Status listBucketCallback( - int isTruncated, - const char *nextMarker, - int contentsCount, - const S3ListBucketContent *contents, - int commonPrefixesCount, - const char **commonPrefixes, - void *callbackData); - -S3ListBucketHandler listBucketHandler = -{ - responseHandler, - &listBucketCallback -}; - - -/* - * List content of a bucket - */ -static S3Status listBucketCallback( - int isTruncated, - const char *nextMarker, - int numObj, - const S3ListBucketContent *contents, - int commonPrefixesCount, - const char **commonPrefixes, - void *callbackData) -{ - bacula_ctx *ctx = (bacula_ctx *)callbackCtx; - if (print_hdr) { - Pmsg1(000, "\n%-22s", " Object Name"); - Pmsg2(000, " %-5s %-20s", "Size", " Last Modified"); - Pmsg0(000, "\n---------------------- ----- --------------------\n"); - print_hdr = false; /* print header once only */ - } - - for (int i = 0; i < numObj; i++) { - char timebuf[256]; - char sizebuf[16]; - const S3ListBucketContent *content = &(contents[i]); - time_t t = (time_t) content->lastModified; - strftime(timebuf, sizeof(timebuf), "%Y-%m-%dT%H:%M:%SZ", gmtime(&t)); - sprintf(sizebuf, "%5llu", (unsigned long long) content->size); - Pmsg3(000, "%-22s %s %s\n", content->key, sizebuf, timebuf); - } - Pmsg0(000, "\n"); - if (ctx->jcr->is_canceled()) { - Mmsg(ctx->errMsg, _("Job cancelled.\n")); - return S3StatusAbortedByCallback; - } - return S3StatusOK; -} -#endif - #endif /* HAVE_LIBS3 */ diff --git a/bacula/src/stored/s3_driver.h b/bacula/src/stored/s3_driver.h index de039d516..f6d242221 100644 --- a/bacula/src/stored/s3_driver.h +++ b/bacula/src/stored/s3_driver.h @@ -1,7 +1,7 @@ /* Bacula(R) - The Network Backup Solution - Copyright (C) 2000-2018 Kern Sibbald + Copyright (C) 2000-2020 Kern Sibbald The original author of Bacula is Kern Sibbald, with contributions from many others, a complete list can be found in the file AUTHORS. @@ -35,13 +35,8 @@ class s3_driver: public cloud_driver { private: S3BucketContext s3ctx; /* Main S3 bucket context */ - uint32_t buf_len; - public: cloud_dev *dev; /* device that is calling us */ - DEVRES *device; - DCR *dcr; /* dcr set during calls to S3_xxx */ - CLOUD *cloud; /* Pointer to CLOUD resource */ s3_driver() { }; @@ -49,18 +44,21 @@ public: }; void make_cloud_filename(POOLMEM *&filename, const char *VolumeName, uint32_t part); - bool init(JCR *jcr, cloud_dev *dev, DEVRES *device); - bool start_of_job(DCR *dcr); - bool term(DCR *dcr); - bool end_of_job(DCR *dcr); - bool truncate_cloud_volume(DCR *dcr, const char *VolumeName, ilist *trunc_parts, POOLMEM *&err); + bool init(CLOUD *cloud, POOLMEM *&err); + bool start_of_job(POOLMEM *&err); + bool term(POOLMEM *&err); + bool end_of_job(POOLMEM *&err); + bool truncate_cloud_volume(const char *VolumeName, ilist *trunc_parts, cancel_callback *cancel_cb, POOLMEM *&err); bool copy_cache_part_to_cloud(transfer *xfer); bool copy_cloud_part_to_cache(transfer *xfer); - bool get_cloud_volume_parts_list(DCR *dcr, const char* VolumeName, ilist *parts, POOLMEM *&err); - bool get_cloud_volumes_list(DCR* dcr, alist *volumes, POOLMEM *&err); + bool get_cloud_volume_parts_list(const char* VolumeName, ilist *parts, cancel_callback *cancel_cb, POOLMEM *&err); + bool get_cloud_volumes_list(alist *volumes, cancel_callback *cancel_cb, POOLMEM *&err); S3Status put_object(transfer *xfer, const char *cache_fname, const char *cloud_fname); bool retry_put_object(S3Status status); bool get_cloud_object(transfer *xfer, const char *cloud_fname, const char *cache_fname); + +private: + bool get_one_cloud_volume_part(const char* part_path_name, ilist *parts, POOLMEM *&err); }; #endif /* HAVE_LIBS3 */