]> git.ipfire.org Git - thirdparty/bacula.git/commitdiff
Backport cloud upload code from Enterprise
authorNorbert Bizet <norbert.bizet@baculasystems.com>
Fri, 5 Jun 2020 06:20:58 +0000 (08:20 +0200)
committerKern Sibbald <kern@sibbald.com>
Fri, 5 Jun 2020 06:20:58 +0000 (08:20 +0200)
bacula/src/stored/cloud_dev.c
bacula/src/stored/cloud_dev.h
bacula/src/stored/cloud_driver.h
bacula/src/stored/cloud_parts.c
bacula/src/stored/cloud_parts.h
bacula/src/stored/cloud_transfer_mgr.c
bacula/src/stored/cloud_transfer_mgr.h
bacula/src/stored/file_driver.c
bacula/src/stored/file_driver.h
bacula/src/stored/s3_driver.c
bacula/src/stored/s3_driver.h

index 970f6d4f0832d93f9392a139449968c3d78a1d57..f295f7fb10f7dfd9c9e883dc6d5f39d6533e1400 100644 (file)
@@ -1,7 +1,7 @@
 /*
    Bacula(R) - The Network Backup Solution
 
-   Copyright (C) 2000-2018 Kern Sibbald
+   Copyright (C) 2000-2020 Kern Sibbald
 
    The original author of Bacula is Kern Sibbald, with contributions
    from many others, a complete list can be found in the file AUTHORS.
@@ -219,10 +219,12 @@ void *download_engine(transfer *tpkt)
 /*
  * Upload the given part to the cloud
  */
-bool cloud_dev::upload_part_to_cloud(DCR *dcr, const char *VolumeName, uint32_t upart)
+bool cloud_dev::upload_part_to_cloud(DCR *dcr, const char *VolumeName, uint32_t upart, bool do_truncate)
 {
-   if (upload_opt == UPLOAD_NO) {
-      /* lets pretend everything is OK */
+   /* for a "regular" backup job, don't proceed with the upload, but pretend everything went OK. */
+   /* Otherwise, pass thru and proceed with the upload, even if UPLOAD_NO is set */
+   bool internal_job = dcr->jcr->is_internal_job() || dcr->jcr->is_JobType(JT_ADMIN);
+   if ((upload_opt == UPLOAD_NO) && !internal_job) {
       return true;
    }
    bool ret=false;
@@ -274,8 +276,9 @@ bool cloud_dev::upload_part_to_cloud(DCR *dcr, const char *VolumeName, uint32_t
    dcr->uploads->append(item);
    /* transfer are queued manually, so the caller has control on when the transfer is scheduled
     * this should come handy for upload_opt */
-   item->set_do_cache_truncate(trunc_opt == TRUNC_AFTER_UPLOAD);
-   if (upload_opt == UPLOAD_EACHPART) {
+   item->set_do_cache_truncate(do_truncate);
+   if ( (upload_opt == UPLOAD_EACHPART) ||
+        ((upload_opt == UPLOAD_NO) && internal_job) ) {
       /* in each part upload option, queue right away */
       item->queue();
    }
@@ -325,8 +328,8 @@ transfer *cloud_dev::download_part_to_cache(DCR *dcr, const char *VolumeName, ui
       pm_strcpy(cache_fname, dev_name);
       /* create a uniq xfer file name with XFER_TMP_NAME and the pid */
       char  xferbuf[32];
-      bsnprintf(xferbuf, sizeof(xferbuf), "%s_%d", XFER_TMP_NAME, (int)getpid());
-      add_vol_and_part(cache_fname, VolumeName, xferbuf, dpart);
+      bsnprintf(xferbuf, sizeof(xferbuf), "%s_%d_%d", XFER_TMP_NAME, (int)getpid(), (int)dcr->jcr->JobId);
+      cloud_driver::add_vol_and_part(cache_fname, VolumeName, xferbuf, dpart);
 
       /* use the cloud proxy to retrieve the transfer size */
       uint64_t cloud_size = cloud_prox->get_size(VolumeName, dpart);
@@ -495,7 +498,6 @@ bool cloud_dev::get_cache_sizes(DCR *dcr, const char *VolumeName)
    POOLMEM *fname = get_pool_memory(PM_NAME);
    uint32_t cpart;
    bool ok = false;
-
    POOL_MEM dname(PM_FNAME);
    int status = 0;
 
@@ -599,32 +601,13 @@ get_out:
    return ok;
 }
 
-
-/* Utility routines */
-
-void cloud_dev::add_vol_and_part(POOLMEM *&filename,
-        const char *VolumeName, const char *name, uint32_t apart)
-{
-   Enter(dbglvl);
-   char partnumber[20];
-   int len = strlen(filename);
-
-   if (len > 0 && !IsPathSeparator((filename)[len-1])) {
-      pm_strcat(filename, "/");
-   }
-
-   pm_strcat(filename, VolumeName);
-   bsnprintf(partnumber, sizeof(partnumber), "/%s.%d", name, apart);
-   pm_strcat(filename, partnumber);
-}
-
 void cloud_dev::make_cache_filename(POOLMEM *&filename,
         const char *VolumeName, uint32_t upart)
 {
    Enter(dbglvl);
 
    pm_strcpy(filename, dev_name);
-   add_vol_and_part(filename, VolumeName, "part", upart);
+   cloud_driver::add_vol_and_part(filename, VolumeName, "part", upart);
 }
 
 void cloud_dev::make_cache_volume_name(POOLMEM *&volname,
@@ -655,7 +638,7 @@ cloud_dev::~cloud_dev()
       cache_sizes = NULL;
    }
    if (driver) {
-      driver->term(NULL);
+      driver->term(errmsg);
       delete driver;
       driver = NULL;
    }
@@ -709,13 +692,16 @@ cloud_dev::cloud_dev(JCR *jcr, DEVRES *device)
          download_mgr.m_wq.max_workers = device->cloud->max_concurrent_downloads;
       }
 
+      POOL_MEM err;             /* device->errmsg is not yet ready */
       /* Initialize the driver */
-      driver->init(jcr, this, device);
+      if (!driver->init(device->cloud, err.addr())) {
+         Qmsg1(jcr, M_FATAL, 0, "Cloud driver initialization error %s\n", err.c_str());
+         Tmsg1(0, "Cloud driver initialization error %s\n", err.c_str());
+      }
    }
 
    /* the cloud proxy owns its cloud_parts, so we can 'set and forget' them */
    cloud_prox = cloud_proxy::get_instance();
-
 }
 
 /*
@@ -1156,7 +1142,7 @@ bool cloud_dev::close(DCR *dcr)
 
    /* Ensure the last written part is uploaded */
    if ((part > 0) && dcr->is_writing()) {
-      if (!upload_part_to_cloud(dcr, VolHdr.VolumeName, part)) {
+      if (!upload_part_to_cloud(dcr, VolHdr.VolumeName, part, (trunc_opt == TRUNC_AFTER_UPLOAD))) {
          if (errmsg[0]) {
             Qmsg(dcr->jcr, M_ERROR, 0, "%s", errmsg);
          }
@@ -1219,7 +1205,10 @@ bool cloud_dev::probe_cloud_proxy(DCR *dcr,const char *VolName, bool force)
       jcr_not_killable jkl(dcr->jcr);
       ilist cloud_parts(100, false); /* !! dont own the parts here */
       /* first, retrieve the volume content within cloud_parts list*/
-      if (!driver->get_cloud_volume_parts_list(dcr, VolName, &cloud_parts, errmsg)) {
+      cancel_callback cancel_cb;
+      cancel_cb.fct = DCR_cancel_cb;
+      cancel_cb.arg = dcr;
+      if (!driver->get_cloud_volume_parts_list(VolName, &cloud_parts, &cancel_cb, errmsg)) {
          Dmsg2(dbglvl, "Cannot get cloud sizes for Volume=%s Err=%s\n", VolName, errmsg);
          return false;
       }
@@ -1319,8 +1308,8 @@ bool cloud_dev::truncate(DCR *dcr)
    bool ok = false;
    POOL_MEM dname(PM_FNAME);
    int status = 0;
-   ilist * iuploads = New(ilist(100,true)); /* owns the parts */
-   ilist *truncate_list = NULL;
+   ilist * iuploads=New(ilist(100,true)); /* owns the parts */
+   ilist *truncate_list=NULL;
    FILE *fp;
    errmsg[0] = 0;
    Enter(dbglvl);
@@ -1435,10 +1424,16 @@ bool cloud_dev::truncate(DCR *dcr)
       iuploads->put(part->index, part);
    }
    /* returns the list of items to truncate : cloud parts-uploads*/
+   cancel_callback cancel_cb;
+   cancel_cb.fct = DCR_cancel_cb;
+   cancel_cb.arg = dcr;
    truncate_list = cloud_prox->exclude(getVolCatName(), iuploads);
-   if (truncate_list && !driver->truncate_cloud_volume(dcr, getVolCatName(), truncate_list, errmsg)) {
-      Qmsg(dcr->jcr, M_ERROR, 0, "truncate_cloud_volume for %s: ERR=%s\n", getVolCatName(), errmsg);
+   if (truncate_list && !driver->truncate_cloud_volume(getVolCatName(), truncate_list, &cancel_cb, errmsg)) {
+      Dmsg1(dbglvl, "%s", errmsg);
+      Qmsg(dcr->jcr, M_ERROR, 0, "%s", errmsg);
       goto get_out;
+   } else {
+      Dmsg1(dbglvl, "%s", errmsg);
    }
    /* force proxy refresh (volume should be empty so it should be fast) */
    /* another approach would be to reuse truncate_list to remove items */
@@ -1605,7 +1600,7 @@ bool cloud_dev::open_next_part(DCR *dcr)
    /* Write part to cloud */
    Dmsg2(dbglvl, "=== part=%d num_cache_parts=%d\n", part, num_cache_parts);
    if (dcr->is_writing()) {
-      if (!upload_part_to_cloud(dcr, getVolCatName(), part)) {
+      if (!upload_part_to_cloud(dcr, getVolCatName(), part, (trunc_opt == TRUNC_AFTER_UPLOAD))) {
          if (errmsg[0]) {
             Qmsg(dcr->jcr, M_ERROR, 0, "%s", errmsg);
          }
@@ -1900,10 +1895,14 @@ bool cloud_dev::do_size_checks(DCR *dcr, DEV_BLOCK *block)
 
 bool cloud_dev::start_of_job(DCR *dcr)
 {
+   bool ret=false;
    if (driver) {
-      driver->start_of_job(dcr);
+      ret = driver->start_of_job(errmsg);
+   } else {
+      Mmsg(errmsg, "Cloud driver not properly loaded"); /* We should always have a dummy driver */
    }
-   return true;
+   Jmsg(dcr->jcr, ret? M_INFO : M_FATAL, 0, "%s\n", errmsg);
+   return ret;
 }
 
 
@@ -1950,7 +1949,7 @@ static void update_volume_record(DCR *dcr, transfer *ppkt)
    }
 }
 
-bool cloud_dev::end_of_job(DCR *dcr)
+bool cloud_dev::end_of_job(DCR *dcr, uint32_t truncate)
 {
    Enter(dbglvl);
    transfer *tpkt;            /* current packet */
@@ -2012,10 +2011,10 @@ bool cloud_dev::end_of_job(DCR *dcr)
          tpkt->append_status(umsg);
          Jmsg(dcr->jcr, (tpkt->m_state == TRANS_STATE_ERROR) ? M_ERROR : M_INFO, 0, "%s%s", prefix, umsg.c_str());
          Dmsg1(dbglvl, "%s", umsg.c_str());
-
+         bool do_truncate = (truncate==TRUNC_AT_ENDOFJOB) || (truncate==TRUNC_CONF_DEFAULT && trunc_opt==TRUNC_AT_ENDOFJOB);
          if (tpkt->m_state == TRANS_STATE_ERROR) {
             Mmsg(dcr->jcr->StatusErrMsg, _("Upload to Cloud failed"));
-         } else if (trunc_opt == TRUNC_AT_ENDOFJOB && tpkt->m_part!=1) {
+         } else if (do_truncate && tpkt->m_part!=1) {
             /* else -> don't remove the cache file if the upload failed */
             if (unlink(tpkt->m_cache_fname) != 0) {
                berrno be;
@@ -2056,7 +2055,7 @@ bool cloud_dev::end_of_job(DCR *dcr)
    dcr->uploads->destroy();
 
    if (driver) {
-      driver->end_of_job(dcr);
+      driver->end_of_job(errmsg);
    }
 
    Leave(dbglvl);
@@ -2096,12 +2095,28 @@ bool cloud_dev::wait_end_of_transfer(DCR *dcr, transfer *elem)
    Leave(dbglvl);
    return (stat == 0);
 }
+/* return the volumes list */
+bool cloud_dev::get_cloud_volumes_list(DCR* dcr, alist *volumes, POOLMEM *&err)
+{
+   cancel_callback cancel_cb;
+   cancel_cb.fct = DCR_cancel_cb;
+   cancel_cb.arg = dcr;
+   return driver->get_cloud_volumes_list(volumes, &cancel_cb, err);
+}
+
+/* return the list of parts contained in VolumeName */
+bool cloud_dev::get_cloud_volume_parts_list(DCR *dcr, const char *VolumeName, ilist *parts, POOLMEM *&err)
+{
+   cancel_callback cancel_cb;
+   cancel_cb.fct = DCR_cancel_cb;
+   cancel_cb.arg = dcr;
+   return driver->get_cloud_volume_parts_list(VolumeName, parts,  &cancel_cb, err);
+}
 
 /* TODO: Add .api2 mode for the status message */
 /* format a status message of the cloud transfers. Verbose gives details on each transfer */
 uint32_t cloud_dev::get_cloud_upload_transfer_status(POOL_MEM& msg, bool verbose)
 {
-   upload_mgr.update_statistics();
    uint32_t ret = 0;
    ret = Mmsg(msg,_("   Uploads   "));
    ret += upload_mgr.append_status(msg, verbose);
@@ -2111,7 +2126,6 @@ uint32_t cloud_dev::get_cloud_upload_transfer_status(POOL_MEM& msg, bool verbose
 /* format a status message of the cloud transfers. Verbose gives details on each transfer */
 uint32_t cloud_dev::get_cloud_download_transfer_status(POOL_MEM& msg, bool verbose)
 {
-   download_mgr.update_statistics();
    uint32_t ret = 0;
    ret = Mmsg(msg,_("   Downloads "));
    ret += download_mgr.append_status(msg, verbose);
@@ -2243,7 +2257,7 @@ get_out:
 /*
  * Upload cache parts that are not in the cloud
  */
-bool cloud_dev::upload_cache(DCR *dcr, const char *VolumeName, POOLMEM *&err)
+bool cloud_dev::upload_cache(DCR *dcr, const char *VolumeName, uint32_t truncate, POOLMEM *&err)
 {
    int i;
    Enter(dbglvl);
@@ -2252,8 +2266,10 @@ bool cloud_dev::upload_cache(DCR *dcr, const char *VolumeName, POOLMEM *&err)
    ilist cache_parts;
    POOLMEM *vol_dir = get_pool_memory(PM_NAME);
    POOLMEM *fname = get_pool_memory(PM_NAME);
-
-   if (!driver->get_cloud_volume_parts_list(dcr, VolumeName, &cloud_parts, err)) {
+   cancel_callback cancel_cb;
+   cancel_cb.fct = DCR_cancel_cb;
+   cancel_cb.arg = dcr;
+   if (!driver->get_cloud_volume_parts_list(VolumeName, &cloud_parts, &cancel_cb, err)) {
       Qmsg2(dcr->jcr, M_ERROR, 0, "Error while uploading parts for volume %s. %s\n", VolumeName, err);
       ret = false;
       goto bail_out;
@@ -2284,7 +2300,8 @@ bool cloud_dev::upload_cache(DCR *dcr, const char *VolumeName, POOLMEM *&err)
       }
       Mmsg(fname, "%s/part.%d", vol_dir, i);
       Dmsg1(dbglvl, "Do upload of %s\n", fname);
-      if (!upload_part_to_cloud(dcr, VolumeName, i)) {
+      bool do_truncate = (truncate==TRUNC_AFTER_UPLOAD) || (truncate==TRUNC_CONF_DEFAULT && (trunc_opt == TRUNC_AFTER_UPLOAD));
+      if (!upload_part_to_cloud(dcr, VolumeName, i, do_truncate)) {
          if (errmsg[0]) {
             Qmsg(dcr->jcr, M_ERROR, 0, "%s", errmsg);
          }
index 9739cacfd33929225ecbd8462a763a6252dc5bc7..1f649189287b2afbb6d799150d1164aa590a022f 100644 (file)
@@ -1,7 +1,7 @@
 /*
    Bacula(R) - The Network Backup Solution
 
-   Copyright (C) 2000-2018 Kern Sibbald
+   Copyright (C) 2000-2020 Kern Sibbald
 
    The original author of Bacula is Kern Sibbald, with contributions
    from many others, a complete list can be found in the file AUTHORS.
 #include "cloud_transfer_mgr.h"
 #include "cloud_parts.h"
 
+/* enum loadable drivers */
+enum {
+   C_S3_DRIVER    = 1,
+   C_FILE_DRIVER  = 2
+};
+
 class cloud_dev: public file_dev {
 public:
    int64_t obj_len;
@@ -49,6 +55,8 @@ public:
    uint32_t trunc_opt;
    uint32_t upload_opt;
 
+   uint32_t current_driver_type;
+
    cloud_driver *driver;
 
    static transfer_manager download_mgr;
@@ -56,12 +64,9 @@ public:
 
    cloud_proxy *cloud_prox;
 
-   void add_vol_and_part(POOLMEM *&filename, const char *VolumeName, const char *name, uint32_t part);
-
 private:
-   char *cache_directory;
    bool download_parts_to_read(DCR *dcr, alist* parts);
-   bool upload_part_to_cloud(DCR *dcr, const char *VolumeName, uint32_t part);
+   bool upload_part_to_cloud(DCR *dcr, const char *VolumeName, uint32_t part, bool do_truncate);
    transfer *download_part_to_cache(DCR *dcr, const char *VolumeName,  uint32_t part);
    void make_cache_filename(POOLMEM *&filename, const char *VolumeName, uint32_t part);
    void make_cache_volume_name(POOLMEM *&full_volname, const char *VolumeName);
@@ -86,7 +91,7 @@ public:
    bool open_next_part(DCR *dcr);
    bool truncate(DCR *dcr);
    int  truncate_cache(DCR *dcr, const char *VolName, int64_t *size);
-   bool upload_cache(DCR *dcr, const char *VolName, POOLMEM *&err);
+   bool upload_cache(DCR *dcr, const char *VolName, uint32_t truncate, POOLMEM *&err);
    bool close(DCR *dcr);
    bool update_pos(DCR *dcr);
    bool is_eod_valid(DCR *dcr);
@@ -106,10 +111,9 @@ public:
            bool relabel, bool no_prelabel);
    bool rewrite_volume_label(DCR *dcr, bool recycle);
    bool start_of_job(DCR *dcr);
-   bool end_of_job(DCR *dcr);
-   bool get_cloud_volumes_list(DCR* dcr, alist *volumes, POOLMEM *&err)
-     { return !driver?false:driver->get_cloud_volumes_list(dcr, volumes, err); };
-   bool get_cloud_volume_parts_list(DCR *dcr, const char *VolumeName, ilist *parts, POOLMEM *&err) { return driver->get_cloud_volume_parts_list(dcr, VolumeName, parts, err);};
+   bool end_of_job(DCR *dcr, uint32_t truncate);
+   bool get_cloud_volumes_list(DCR* dcr, alist *volumes, POOLMEM *&err);
+   bool get_cloud_volume_parts_list(DCR *dcr, const char *VolumeName, ilist *parts, POOLMEM *&err);
    uint32_t get_cloud_upload_transfer_status(POOL_MEM &msg, bool verbose);
    uint32_t get_cloud_download_transfer_status(POOL_MEM &msg, bool verbose);
 };
index dafd880fa2a179a8b964d516a2325f90b8ccbe6f..26e7bc5cc59d94aa4c0380ab9ee8741a87f2cfb6 100644 (file)
@@ -1,25 +1,24 @@
 /*
-   Bacula(R) - The Network Backup Solution
+   Bacula® - The Network Backup Solution
 
-   Copyright (C) 2000-2018 Kern Sibbald
+   Copyright (C) 2000-2016 Bacula Systems SA
+   All rights reserved.
 
-   The original author of Bacula is Kern Sibbald, with contributions
-   from many others, a complete list can be found in the file AUTHORS.
+   The main author of Bacula is Kern Sibbald, with contributions from many
+   others, a complete list can be found in the file AUTHORS.
 
-   You may use this file and others of this release according to the
-   license defined in the LICENSE file, which includes the Affero General
-   Public License, v3.0 ("AGPLv3") and some additional permissions and
-   terms pursuant to its AGPLv3 Section 7.
+   Licensees holding a valid Bacula Systems SA license may use this file
+   and others of this release in accordance with the proprietary license
+   agreement provided in the LICENSE file.  Redistribution of any part of
+   this release is not permitted.
 
-   This notice must be preserved when any source code is
-   conveyed and/or propagated.
-
-   Bacula(R) is a registered trademark of Kern Sibbald.
+   Bacula® is a registered trademark of Kern Sibbald.
 */
 /*
  * Routines for writing Cloud drivers
  *
  * Written by Kern Sibbald, May MMXVI
+ *
  */
 
 #include "bacula.h"
 #define NUM_UPLOAD_RETRIES 2
 class cloud_dev;
 
-enum {
-   C_S3_DRIVER    = 1,
-   C_FILE_DRIVER  = 2
-};
+/* define the cancel callback type */
+typedef bool (cancel_cb_type)(void*);
+typedef struct  {
+  cancel_cb_type* fct;
+   void *arg;
+} cancel_callback;
 
 /* Abstract class cannot be instantiated */
 class cloud_driver: public SMARTALLOC {
@@ -47,14 +48,26 @@ public:
 
    virtual bool copy_cache_part_to_cloud(transfer *xfer) = 0;
    virtual bool copy_cloud_part_to_cache(transfer *xfer) = 0;
-   virtual bool truncate_cloud_volume(DCR *dcr, const char *VolumeName, ilist *trunc_parts, POOLMEM *&err) = 0;
-   virtual bool init(JCR *jcr, cloud_dev *dev, DEVRES *device) = 0;
-   virtual bool term(DCR *dcr) = 0;
-   virtual bool start_of_job(DCR *dcr) = 0;
-   virtual bool end_of_job(DCR *dcr) = 0;
+   virtual bool truncate_cloud_volume(const char *VolumeName, ilist *trunc_parts, cancel_callback *cancel_cb, POOLMEM *&err) = 0;
+   virtual bool init(CLOUD *cloud, POOLMEM *&err) = 0;
+   virtual bool term(POOLMEM *&err) = 0;
+   virtual bool start_of_job(POOLMEM *&err) = 0;
+   virtual bool end_of_job(POOLMEM *&err) = 0;
+   virtual bool get_cloud_volume_parts_list(const char* VolumeName, ilist *parts, cancel_callback *cancel_cb, POOLMEM *&err) = 0;
+   virtual bool get_cloud_volumes_list(alist *volumes, cancel_callback *cancel_cb, POOLMEM *&err) = 0;
+   static void add_vol_and_part(POOLMEM *&filename, const char *VolumeName, const char *name, uint32_t apart)
+   {
+      char partnumber[20];
+      int len = strlen(filename);
+
+      if (len > 0 && !IsPathSeparator((filename)[len-1])) {
+         pm_strcat(filename, "/");
+      }
 
-   virtual bool get_cloud_volume_parts_list(DCR *dcr, const char* VolumeName, ilist *parts, POOLMEM *&err) = 0;
-   virtual bool get_cloud_volumes_list(DCR* dcr, alist *volumes, POOLMEM *&err) = 0; /* TODO: Adapt the prototype to have a handler instead */
+      pm_strcat(filename, VolumeName);
+      bsnprintf(partnumber, sizeof(partnumber), "/%s.%d", name, apart);
+      pm_strcat(filename, partnumber);
+   }
 
    bwlimit upload_limit;
    bwlimit download_limit;
index a78032a3616eace28ece4a125ea2ab3bf95b2552..eaa21837450c4236b11fe3cb175799760f783dce 100644 (file)
@@ -1,25 +1,24 @@
 /*
-   Bacula(R) - The Network Backup Solution
+   Bacula® - The Network Backup Solution
 
-   Copyright (C) 2000-2018 Kern Sibbald
+   Copyright (C) 2000-2016 Bacula Systems SA
+   All rights reserved.
 
-   The original author of Bacula is Kern Sibbald, with contributions
-   from many others, a complete list can be found in the file AUTHORS.
+   The main author of Bacula is Kern Sibbald, with contributions from many
+   others, a complete list can be found in the file AUTHORS.
 
-   You may use this file and others of this release according to the
-   license defined in the LICENSE file, which includes the Affero General
-   Public License, v3.0 ("AGPLv3") and some additional permissions and
-   terms pursuant to its AGPLv3 Section 7.
+   Licensees holding a valid Bacula Systems SA license may use this file
+   and others of this release in accordance with the proprietary license
+   agreement provided in the LICENSE file.  Redistribution of any part of
+   this release is not permitted.
 
-   This notice must be preserved when any source code is
-   conveyed and/or propagated.
-
-   Bacula(R) is a registered trademark of Kern Sibbald.
+   Bacula® is a registered trademark of Kern Sibbald.
 */
 /*
  * Routines for writing Cloud drivers
  *
  * Written by Kern Sibbald, May MMXVI
+ *
  */
 
 #include "cloud_parts.h"
@@ -604,4 +603,4 @@ int main (int argc, char *argv[])
 
 }
 
-#endif /* TEST_PROGRAM */
+#endif /* TEST_PROGRAM */
\ No newline at end of file
index 4755b98ef1d9f74f3ef5910c4911ad4fdcb23d7e..1494897d51216b7486bcd632ae156a0de6cd2e5a 100644 (file)
@@ -1,25 +1,24 @@
 /*
-   Bacula(R) - The Network Backup Solution
+   Bacula® - The Network Backup Solution
 
-   Copyright (C) 2000-2018 Kern Sibbald
+   Copyright (C) 2000-2016 Bacula Systems SA
+   All rights reserved.
 
-   The original author of Bacula is Kern Sibbald, with contributions
-   from many others, a complete list can be found in the file AUTHORS.
+   The main author of Bacula is Kern Sibbald, with contributions from many
+   others, a complete list can be found in the file AUTHORS.
 
-   You may use this file and others of this release according to the
-   license defined in the LICENSE file, which includes the Affero General
-   Public License, v3.0 ("AGPLv3") and some additional permissions and
-   terms pursuant to its AGPLv3 Section 7.
+   Licensees holding a valid Bacula Systems SA license may use this file
+   and others of this release in accordance with the proprietary license
+   agreement provided in the LICENSE file.  Redistribution of any part of
+   this release is not permitted.
 
-   This notice must be preserved when any source code is
-   conveyed and/or propagated.
-
-   Bacula(R) is a registered trademark of Kern Sibbald.
+   Bacula® is a registered trademark of Kern Sibbald.
 */
 /*
  * Routines for managing Volumes contains and comparing parts
  *
  * Written by Norbert Bizet, May MMXVI
+ *
  */
 #ifndef _CLOUD_PARTS_H_
 #define _CLOUD_PARTS_H_
@@ -76,10 +75,10 @@ class cloud_proxy : public SMARTALLOC
 private:
    htable *m_hash;               /* the root htable */
    bool m_owns;                  /* determines if ilist own the cloud_parts */
-   pthread_mutex_t m_mutex;      /* protect access*/   
+   pthread_mutex_t m_mutex;      /* protect access*/
    static cloud_proxy *m_pinstance; /* singleton instance */
    static uint64_t m_count;      /* static refcount */
-   
+
    ~cloud_proxy();
 
 public:
@@ -101,7 +100,7 @@ public:
 
    /* Check if the volume entry exists and return true if it's the case */
    bool volume_lookup(const char *volume);
-   
+
    /* reset the volume list content with the content of part_list */
    bool reset(const char *volume, ilist *part_list);
 
index e3786005ddff224bc64ac3ee781e9c36416d36a9..6548649f76320a1c465b00c1e2488e2cd1b28046 100644 (file)
@@ -1,7 +1,7 @@
 /*
    Bacula(R) - The Network Backup Solution
 
-   Copyright (C) 2000-2018 Kern Sibbald
+   Copyright (C) 2000-2020 Kern Sibbald
 
    The original author of Bacula is Kern Sibbald, with contributions
    from many others, a complete list can be found in the file AUTHORS.
 #include "cloud_transfer_mgr.h"
 #include "stored.h"
 
+#define ONE_SEC 1000000LL /* number of microseconds in a second */
+
+static const int dbglvl = 450;
+
 /* constructor
    * size : the size in bytes of the transfer
    * funct : function to process
@@ -46,9 +50,11 @@ transfer::transfer(uint64_t    size,
                   DCR          *dcr,
                   cloud_proxy  *proxy) :
    m_stat_size(size),
+   m_stat_processed_size(0),
    m_stat_start(0),
    m_stat_duration(0),
    m_stat_eta(0),
+   m_stat_average_rate(0),
    m_message(NULL),
    m_state(TRANS_STATE_CREATED),
    m_mgr(NULL),
@@ -84,7 +90,7 @@ transfer::~transfer()
    free(m_cache_fname);
    if (m_use_count > 0) {
       ASSERT(FALSE);
-      Dmsg1(0, "!!!m_use_count = %d\n", m_use_count);
+      Dmsg1(dbglvl, "!!!m_use_count = %d\n", m_use_count);
    }
 }
 
@@ -98,6 +104,31 @@ bool transfer::queue()
    return true;
 }
 
+/* reset processed size */
+void transfer::reset_processed_size()
+{
+   lock_guard lg(m_stat_mutex);
+   m_stat_processed_size = 0;
+   ASSERTD(m_stat_processed_size==0, "invalid locking of processed size");
+}
+
+/* set absolute value process size */
+void transfer::set_processed_size(uint64_t size)
+{
+   lock_guard lg(m_stat_mutex);
+   m_stat_processed_size = size;
+   m_stat_duration = get_current_btime()-m_stat_start;
+   if (m_stat_duration > 0) {
+      m_stat_average_rate = (m_stat_processed_size*ONE_SEC)/m_stat_duration;
+   }
+   ASSERTD(m_stat_processed_size <= m_stat_size, "increment_processed_size increment too big");
+
+}
+/* add increment to the current processed size */
+void transfer::increment_processed_size(uint64_t increment)
+{
+   set_processed_size(m_stat_processed_size+increment);
+}
 
 /* opaque function that processes m_funct with m_arg as parameter
  * depending on m_funct return value, changes state to TRANS_STATE_DONE
@@ -166,7 +197,7 @@ bool transfer::cancel()
 }
 
 /* checking the cancel status : doesnt request locking */
-bool transfer::is_cancelled() const
+bool transfer::is_canceled() const
 {
    return m_cancel;
 }
@@ -178,21 +209,22 @@ uint32_t transfer::append_status(POOL_MEM& msg)
    uint32_t ret=0;
    static const char *state[]  = {"created",  "queued",  "process", "done", "error"};
 
+   lock_guard lg(m_stat_mutex);
    if (m_state > TRANS_STATE_PROCESSED) {
       ret =  Mmsg(tmp_msg,_("%s/part.%-5d state=%-7s size=%sB duration=%ds%s%s\n"),
                   m_volume_name, m_part,
                   state[m_state],
                   edit_uint64_with_suffix(m_stat_size, ec),
-                  m_stat_duration,
+                  m_stat_duration/ONE_SEC,
                   (strlen(m_message) != 0)?" msg=":"",
                   (strlen(m_message) != 0)?m_message:"");
       pm_strcat(msg, tmp_msg);
    } else {
-      ret = Mmsg(tmp_msg,_("%s/part.%-5d, state=%-7s size=%sB eta=%dss%s%s\n"),
+      ret = Mmsg(tmp_msg,_("%s/part.%-5d state=%-7s size=%sB eta=%ds%s%s\n"),
                   m_volume_name, m_part,
                   state[m_state],
                   edit_uint64_with_suffix(m_stat_size, ec),
-                  m_stat_eta,
+                  m_stat_eta/ONE_SEC,
                   (strlen(m_message) != 0)?" msg=":"",
                   (strlen(m_message) != 0)?m_message:"");
       pm_strcat(msg, tmp_msg);
@@ -201,6 +233,34 @@ uint32_t transfer::append_status(POOL_MEM& msg)
    return ret;
 }
 
+void transfer::append_api_status(OutputWriter &ow)
+{
+   static const char *state[]  = {"created",  "queued",  "process", "done", "error"};
+
+   lock_guard lg(m_stat_mutex);
+   if (m_state > TRANS_STATE_PROCESSED) {
+         ow.get_output(OT_START_OBJ,
+                  OT_STRING,"volume_name",            m_volume_name,
+                  OT_INT32, "part",                   m_part,
+                  OT_INT32, "jobid",                  m_dcr->jcr->JobId,
+                  OT_STRING,"state",                  state[m_state],
+                  OT_INT64, "size",                   m_stat_size,
+                  OT_DURATION, "duration",            m_stat_duration/ONE_SEC,
+                  OT_STRING,"message",                m_message,
+                  OT_END);
+   } else {
+         ow.get_output(OT_START_OBJ,
+                  OT_STRING,"volume_name",            m_volume_name,
+                  OT_INT32, "part",                   m_part,
+                  OT_INT32, "jobid",                  m_dcr->jcr->JobId,
+                  OT_STRING,"state",                  state[m_state],
+                  OT_INT64, "size",                   m_stat_size,
+                  OT_INT64, "processed_size",         m_stat_processed_size,
+                  OT_DURATION, "eta",                 m_stat_eta/ONE_SEC,
+                  OT_STRING,"message",                m_message,
+                  OT_END);
+   }
+}
 
 /* the manager references itself through this function */
 void transfer::set_manager(transfer_manager *mgr)
@@ -235,7 +295,6 @@ bool transfer::transition(transfer_state state)
                V(m_mgr->m_stat_mutex);
 
                P(m_mgr->m_mutex);
-               ++m_use_count;
                m_mgr->add_work(this);
                V(m_mgr->m_mutex);
             }
@@ -259,7 +318,6 @@ bool transfer::transition(transfer_state state)
 
                P(m_mgr->m_mutex);
                m_mgr->remove_work(m_workq_elem);
-               --m_use_count;
                V(m_mgr->m_mutex);
             }
          }
@@ -283,7 +341,7 @@ bool transfer::transition(transfer_state state)
 
                /*transfer starts now*/
                P(m_stat_mutex);
-               m_stat_start = (utime_t)time(NULL);
+               m_stat_start = get_current_btime();
                V(m_stat_mutex);
             }
          }
@@ -296,7 +354,12 @@ bool transfer::transition(transfer_state state)
             ret = true;
             /*transfer stops now : compute transfer duration*/
             P(m_stat_mutex);
-            m_stat_duration = (utime_t)time(NULL)-m_stat_start;
+            m_stat_duration = get_current_btime()-m_stat_start;
+            if (m_stat_duration > 0) {
+               m_stat_processed_size = m_stat_size;
+               ASSERTD(m_stat_size == m_stat_processed_size, "xfer done before processed size is equal to size.");
+               m_stat_average_rate = (m_stat_size*ONE_SEC)/m_stat_duration;
+            }
             V(m_stat_mutex);
 
             if (m_mgr) {
@@ -309,17 +372,8 @@ bool transfer::transition(transfer_state state)
                m_mgr->m_stat_size_done += m_stat_size;
                /*add local duration to manager duration */
                m_mgr->m_stat_duration_done += m_stat_duration;
-               /*reprocess the manager average rate with it*/
-               if (m_mgr->m_stat_duration_done != 0) {
-                  m_mgr->m_stat_average_rate =
-                        m_mgr->m_stat_size_done /
-                        m_mgr->m_stat_duration_done;
-               }
                /*unlock manager statistics */
                V(m_mgr->m_stat_mutex);
-
-               /* process is completed, unref the workq reference */
-               --m_use_count;
             }
 
             if (m_proxy) {
@@ -335,7 +389,7 @@ bool transfer::transition(transfer_state state)
             ret = true;
             /*transfer stops now, even if in error*/
             P(m_stat_mutex);
-            m_stat_duration = (utime_t)time(NULL)-m_stat_start;
+            m_stat_duration = get_current_btime()-m_stat_start;
             V(m_stat_mutex);
 
             if (m_mgr) {
@@ -348,9 +402,6 @@ bool transfer::transition(transfer_state state)
                m_mgr->m_stat_size_error += m_stat_size;
                /*unlock manager statistics */
                V(m_mgr->m_stat_mutex);
-
-               /* process is completed, unref the workq reference */
-               --m_use_count;
             }
             /* in both case, success or failure, life keeps going on */
             pthread_cond_broadcast(&m_done);
@@ -569,44 +620,44 @@ bool transfer_manager::find(const char *VolName, uint32_t index)
 void transfer_manager::update_statistics()
 {
    /* lock the manager stats */
-   P(m_stat_mutex);
-
-   /* ETA naive calculation for each element in the queue =
-    * (accumulator(previous elements size) / average_rate) / num_workers;
-    */
-   uint64_t accumulator=0;
+   lock_guard lg_stat(m_stat_mutex);
 
    /* lock the queue so order and chaining cannot be modified */
-   P(m_mutex);
-   P(m_wq.mutex);
-   m_stat_nb_workers = m_wq.max_workers;
+   lock_guard lg(m_mutex);
 
-   /* parse the queued and processed transfers */
+   /* recompute global average rate */
    transfer *t;
+   uint64_t accumulated_done_average_rate = 0;
+   uint32_t nb_done_accumulated = 0;
    foreach_dlist(t, &m_transfer_list) {
-      if ( (t->m_state == TRANS_STATE_QUEUED) ||
-            (t->m_state == TRANS_STATE_PROCESSED)) {
-         accumulator+=t->m_stat_size;
-         P(t->m_stat_mutex);
-         if ((m_stat_average_rate != 0) &&  (m_stat_nb_workers != 0)) {
-            /*update eta for each transfer*/
-            t->m_stat_eta = (accumulator / m_stat_average_rate) / m_stat_nb_workers;
-         }
-         V(t->m_stat_mutex);
+      lock_guard lg_xferstatmutex(t->m_stat_mutex);
+      if (t->m_stat_average_rate>0) {
+         accumulated_done_average_rate += t->m_stat_average_rate;
+         t->m_stat_average_rate = 0;
+         ++nb_done_accumulated;
       }
    }
+   if (nb_done_accumulated) {
+      m_stat_average_rate = accumulated_done_average_rate / nb_done_accumulated;
+   }
 
-   /* the manager ETA is the ETA of the last transfer in its workq */
-   if (m_wq.last) {
-      transfer *t = (transfer *)m_wq.last->data;
-      if (t) {
-         m_stat_eta = t->m_stat_eta;
+   /* ETA naive calculation for each element in the queue */
+   if (m_stat_average_rate != 0) {
+      uint64_t accumulator=0;
+      foreach_dlist(t, &m_transfer_list) {
+         if (t->m_state == TRANS_STATE_QUEUED) {
+            lock_guard lg_xferstatmutex(t->m_stat_mutex);
+            accumulator+= t->m_stat_size-t->m_stat_processed_size;
+            t->m_stat_eta = (accumulator / m_stat_average_rate) * ONE_SEC;
+         }
+         if (t->m_state == TRANS_STATE_PROCESSED) {
+            lock_guard lg_xferstatmutex(t->m_stat_mutex);
+            t->m_stat_eta = ((t->m_stat_size-t->m_stat_processed_size) / m_stat_average_rate) * ONE_SEC;
+         }
       }
+      /* the manager ETA is the ETA of the last transfer in its workq */
+      m_stat_eta = (accumulator / m_stat_average_rate) * ONE_SEC;
    }
-
-   V(m_wq.mutex);
-   V(m_mutex);
-   V(m_stat_mutex);
 }
 
 /* short status of the transfers */
@@ -615,9 +666,10 @@ uint32_t transfer_manager::append_status(POOL_MEM& msg, bool verbose)
    update_statistics();
    char ec0[30],ec1[30],ec2[30],ec3[30],ec4[30];
    POOLMEM *tmp_msg = get_pool_memory(PM_MESSAGE);
+   lock_guard lg_stat(m_stat_mutex);
    uint32_t ret = Mmsg(tmp_msg, _("(%sB/s) (ETA %d s) "
             "Queued=%d %sB, Processed=%d %sB, Done=%d %sB, Failed=%d %sB\n"),
-            edit_uint64_with_suffix(m_stat_average_rate, ec0), m_stat_eta,
+            edit_uint64_with_suffix(m_stat_average_rate, ec0), m_stat_eta/ONE_SEC,
             m_stat_nb_transfer_queued,  edit_uint64_with_suffix(m_stat_size_queued, ec1),
             m_stat_nb_transfer_processed,  edit_uint64_with_suffix(m_stat_size_processed, ec2),
             m_stat_nb_transfer_done,  edit_uint64_with_suffix(m_stat_size_done, ec3),
@@ -625,7 +677,7 @@ uint32_t transfer_manager::append_status(POOL_MEM& msg, bool verbose)
    pm_strcat(msg, tmp_msg);
 
    if (verbose) {
-      P(m_mutex);
+      lock_guard lg(m_mutex);
       if (!m_transfer_list.empty()) {
          ret += Mmsg(tmp_msg, _("------------------------------------------------------------ details ------------------------------------------------------------\n"));
          pm_strcat(msg, tmp_msg);
@@ -634,8 +686,36 @@ uint32_t transfer_manager::append_status(POOL_MEM& msg, bool verbose)
       foreach_dlist(tpkt, &m_transfer_list) {
          ret += tpkt->append_status(msg);
       }
-      V(m_mutex);
    }
    free_pool_memory(tmp_msg);
    return ret;
 }
+
+void transfer_manager::append_api_status(OutputWriter &ow, bool verbose)
+{
+   update_statistics();
+
+   lock_guard lg_stat(m_stat_mutex);
+   ow.get_output(OT_START_OBJ,
+                  OT_INT64, "average_rate",           m_stat_average_rate,
+                  OT_DURATION, "eta",                 m_stat_eta/ONE_SEC,
+                  OT_INT64, "nb_transfer_queued",     m_stat_nb_transfer_queued,
+                  OT_INT64, "size_queued",            m_stat_size_queued,
+                  OT_INT64, "nb_transfer_processed",  m_stat_nb_transfer_processed,
+                  OT_INT64, "size_processed",         m_stat_size_processed,
+                  OT_INT64, "nb_transfer_done",       m_stat_nb_transfer_done,
+                  OT_INT64, "size_done",              m_stat_size_done,
+                  OT_INT64, "nb_transfer_error",      m_stat_nb_transfer_error,
+                  OT_INT64, "size_error",             m_stat_size_error,
+                  OT_INT,   "transfers_list_size",    m_transfer_list.size(),
+                  OT_END);
+   if (verbose) {
+      lock_guard lg(m_mutex);
+      ow.start_list("transfers");
+      transfer *tpkt;
+      foreach_dlist(tpkt, &m_transfer_list) {
+         tpkt->append_api_status(ow);
+      }
+      ow.end_list();
+   }
+}
index e229c16611c2d8c87d0b8dc3e1b7b009a4766f75..cd30d0cabfa08085d8180a75f6551bca60c95e0f 100644 (file)
@@ -1,7 +1,7 @@
 /*
    Bacula(R) - The Network Backup Solution
 
-   Copyright (C) 2000-2018 Kern Sibbald
+   Copyright (C) 2000-2020 Kern Sibbald
 
    The original author of Bacula is Kern Sibbald, with contributions
    from many others, a complete list can be found in the file AUTHORS.
@@ -72,12 +72,16 @@ public:
    pthread_mutex_t      m_stat_mutex;
    /* size of the transfer: should be filled asap */
    uint64_t             m_stat_size;
+   /* size processed so far : filled by the processor (driver) */
+   uint64_t             m_stat_processed_size;
    /* time when process started */
-   utime_t              m_stat_start;
+   btime_t              m_stat_start;
    /* duration of the transfer : automatically filled when transfer is completed*/
-   utime_t              m_stat_duration;
+   btime_t              m_stat_duration;
    /* estimate time to arrival : predictive guest approximation of transfer time*/
-   utime_t              m_stat_eta;
+   btime_t              m_stat_eta;
+   /* computed bytes/sec transfer rate */
+   uint64_t             m_stat_average_rate;
 
 /* status variables :*/
    /* protect status changes*/
@@ -159,13 +163,23 @@ public:
    bool cancel();
 
    /* callback fct that checks if transfer has been cancelled */
-   bool is_cancelled() const;
+   bool is_canceled() const;
 
    /* append a status message into msg*/
    uint32_t append_status(POOL_MEM& msgs);
+   void append_api_status(OutputWriter &ow);
 
    void set_do_cache_truncate(bool do_cache_truncate);
 
+   /* reset processed size */
+   void reset_processed_size();
+
+   /* set processed size absolute value */
+   void set_processed_size(uint64_t size);
+
+   /* add increment to the current processed size */
+   void increment_processed_size(uint64_t increment);
+
 protected:
 friend class transfer_manager;
 
@@ -212,11 +226,11 @@ public:
    /* size in bytes of transfers in TRANS_STATE_ERROR state in this manager*/
    uint64_t             m_stat_size_error;
    /* duration of transfers in TRANS_STATE_DONE state in this manager*/
-   utime_t              m_stat_duration_done;
+   btime_t              m_stat_duration_done;
    /* computed bytes/sec transfer rate */
    uint64_t             m_stat_average_rate;
    /* computed Estimate Time to Arrival */
-   utime_t              m_stat_eta;
+   btime_t              m_stat_eta;
 
 
 /* status variables global for this manager: */
@@ -283,6 +297,7 @@ public:
 
    /* append a status message into msg*/
    uint32_t append_status(POOL_MEM& msg, bool verbose);
+   void append_api_status(OutputWriter &ow, bool verbose);
 
 protected:
 friend class transfer;
index b209ca83614bca4feb0553a319e5d4d3caf4592c..f517e1bc667a6baf7dc05520bbb8070a6788988c 100644 (file)
@@ -1,7 +1,7 @@
 /*
    Bacula(R) - The Network Backup Solution
 
-   Copyright (C) 2000-2018 Kern Sibbald
+   Copyright (C) 2000-2020 Kern Sibbald
 
    The original author of Bacula is Kern Sibbald, with contributions
    from many others, a complete list can be found in the file AUTHORS.
@@ -125,7 +125,7 @@ bool file_driver::put_object(transfer *xfer, const char *in_fname, const char *o
    }
 
    while (obj_len > 0) {
-      if (xfer->is_cancelled()) {
+      if (xfer->is_canceled()) {
          Mmsg(xfer->m_message, "Job is canceled.\n");
          goto get_out;
       }
@@ -147,6 +147,7 @@ bool file_driver::put_object(transfer *xfer, const char *in_fname, const char *o
             out_fname, be.bstrerror());
       }
       obj_len -= rbytes;
+      xfer->increment_processed_size(rbytes);
       if (limit->use_bwlimit()) {
          limit->control_bwlimit(rbytes);
       }
@@ -177,7 +178,7 @@ bool file_driver::get_cloud_object(transfer *xfer, const char *cloud_fname, cons
    return put_object(xfer, cloud_fname, cache_fname, &download_limit);
 }
 
-bool file_driver::truncate_cloud_volume(DCR *dcr, const char *VolumeName, ilist *trunc_parts, POOLMEM *&err)
+bool file_driver::truncate_cloud_volume(const char *VolumeName, ilist *trunc_parts, cancel_callback *cancel_cb, POOLMEM *&err)
 {
    bool rtn = true;
    int i;
@@ -189,12 +190,10 @@ bool file_driver::truncate_cloud_volume(DCR *dcr, const char *VolumeName, ilist
       make_cloud_filename(filename, VolumeName, i);
       if (unlink(filename) != 0 && errno != ENOENT) {
          berrno be;
-         Mmsg2(err, "Unable to delete %s. ERR=%s\n", filename, be.bstrerror());
-         Dmsg1(dbglvl, "%s", err);
-         Qmsg(dcr->jcr, M_INFO, 0, "%s", err);
+         Mmsg3(err, "truncate_cloud_volume for %s: Unable to delete %s. ERR=%s\n", VolumeName, filename, be.bstrerror());
          rtn = false;
       } else {
-         Dmsg1(dbglvl, "Unlink file %s\n", filename);
+         Mmsg2(err, "truncate_cloud_volume for %s: Unlink file %s.\n", VolumeName, filename);
       }
    }
 
@@ -208,7 +207,7 @@ void file_driver::make_cloud_filename(POOLMEM *&filename,
    Enter(dbglvl);
 
    pm_strcpy(filename, hostName);
-   dev->add_vol_and_part(filename, VolumeName, "part", part);
+   cloud_driver::add_vol_and_part(filename, VolumeName, "part", part);
    Dmsg1(dbglvl, "make_cloud_filename: %s\n", filename);
 }
 
@@ -254,17 +253,14 @@ bool file_driver::copy_cloud_part_to_cache(transfer *xfer)
    uint32_t upload_opt;
 */
 
-bool file_driver::init(JCR *jcr, cloud_dev *adev, DEVRES *adevice)
+bool file_driver::init(CLOUD *cloud, POOLMEM *&err)
 {
-   dev = adev;            /* copy cloud device pointer */
-   device = adevice;      /* copy device resource pointer */
-   cloud = device->cloud; /* local pointer to cloud definition */
-
-   /* File I/O buffer */
-   buf_len = dev->max_block_size;
-   if (buf_len == 0) {
-      buf_len = DEFAULT_BLOCK_SIZE;
+   if (cloud->host_name == NULL) {
+      Mmsg1(err, "Failed to initialize File Cloud. ERR=Hostname not set in cloud resource %s\n", cloud->hdr.name);
+      return false;
    }
+   /* File I/O buffer */
+   buf_len = DEFAULT_BLOCK_SIZE;
 
    hostName = cloud->host_name;
    bucketName = cloud->bucket_name;
@@ -276,27 +272,23 @@ bool file_driver::init(JCR *jcr, cloud_dev *adev, DEVRES *adevice)
    return true;
 }
 
-bool file_driver::start_of_job(DCR *dcr)
+bool file_driver::start_of_job(POOLMEM *&msg)
 {
-   Jmsg(dcr->jcr, M_INFO, 0, _("Using File cloud driver Host=%s Bucket=%s\n"),
-      hostName, bucketName);
+   Mmsg(msg, _("Using File cloud driver Host=%s Bucket=%s\n"), hostName, bucketName);
    return true;
 }
 
-bool file_driver::end_of_job(DCR *dcr)
+bool file_driver::end_of_job(POOLMEM *&msg)
 {
    return true;
 }
 
-/*
- * Note, dcr may be NULL
- */
-bool file_driver::term(DCR *dcr)
+bool file_driver::term(POOLMEM *&msg)
 {
    return true;
 }
 
-bool file_driver::get_cloud_volume_parts_list(DCR *dcr, const char* VolumeName, ilist *parts, POOLMEM *&err)
+bool file_driver::get_cloud_volume_parts_list(const char* VolumeName, ilist *parts, cancel_callback *cancel_cb, POOLMEM *&err)
 {
    Enter(dbglvl);
 
@@ -343,7 +335,7 @@ bool file_driver::get_cloud_volume_parts_list(DCR *dcr, const char* VolumeName,
    entry = (struct dirent *)malloc(sizeof(struct dirent) + name_max + 1000);
 
    for ( ;; ) {
-      if (dcr->jcr->is_canceled()) {
+      if (cancel_cb && cancel_cb->fct && cancel_cb->fct(cancel_cb->arg)) {
          pm_strcpy(err, "Job canceled");
          goto get_out;
       }
@@ -413,7 +405,7 @@ get_out:
    return ok;
 }
 
-bool file_driver::get_cloud_volumes_list(DCR *dcr, alist *volumes, POOLMEM *&err)
+bool file_driver::get_cloud_volumes_list(alist *volumes, cancel_callback *cancel_cb, POOLMEM *&err)
 {
    if (!volumes) {
       pm_strcpy(err, "Invalid argument");
@@ -450,7 +442,7 @@ bool file_driver::get_cloud_volumes_list(DCR *dcr, alist *volumes, POOLMEM *&err
    entry = (struct dirent *)malloc(sizeof(struct dirent) + name_max + 1000);
 
    for ( ;; ) {
-      if (dcr->jcr->is_canceled()) {
+      if (cancel_cb && cancel_cb->fct && cancel_cb->fct(cancel_cb->arg)) {
          goto get_out;
       }
       errno = 0;
index 1d31d92a15d0b64f934ce34fd145d8a3f5517f88..b834dd1fbea3f5cb32b44c23a682939523172f10 100644 (file)
@@ -1,7 +1,7 @@
 /*
    Bacula(R) - The Network Backup Solution
 
-   Copyright (C) 2000-2018 Kern Sibbald
+   Copyright (C) 2000-2020 Kern Sibbald
 
    The original author of Bacula is Kern Sibbald, with contributions
    from many others, a complete list can be found in the file AUTHORS.
@@ -49,15 +49,15 @@ public:
 
 private:
    void make_cloud_filename(POOLMEM *&filename, const char *VolumeName, uint32_t part);
-   bool init(JCR *jcr, cloud_dev *dev, DEVRES *device);
-   bool start_of_job(DCR *dcr);
-   bool end_of_job(DCR *dcr);
-   bool term(DCR *dcr);
-   bool truncate_cloud_volume(DCR *dcr, const char *VolumeName, ilist *trunc_parts, POOLMEM *&err);
+   bool init(CLOUD *cloud, POOLMEM *&err);
+   bool start_of_job(POOLMEM *&msg);
+   bool end_of_job(POOLMEM *&msg);
+   bool term(POOLMEM *&msg);
+   bool truncate_cloud_volume(const char *VolumeName, ilist *trunc_parts, cancel_callback *cancel_cb, POOLMEM *&err);
    bool copy_cache_part_to_cloud(transfer *xfer);
    bool copy_cloud_part_to_cache(transfer *xfer);
-   bool get_cloud_volume_parts_list(DCR *dcr, const char* VolumeName, ilist *parts, POOLMEM *&err);
-   bool get_cloud_volumes_list(DCR* dcr, alist *volumes, POOLMEM *&err);
+   bool get_cloud_volume_parts_list(const char* VolumeName, ilist *parts, cancel_callback *cancel_cb, POOLMEM *&err);
+   bool get_cloud_volumes_list(alist *volumes, cancel_callback *cancel_cb, POOLMEM *&err);
 
    bool put_object(transfer *xfer, const char *cache_fname, const char *cloud_fname, bwlimit *limit);
    bool get_cloud_object(transfer *xfer, const char *cloud_fname, const char *cache_fname);
index 4cbd5bd5fb6aeeb7777daca8c4ddfc9cf297632b..c1366ef3838018d02f68da09ce3abc95f355e535 100644 (file)
@@ -1,7 +1,7 @@
 /*
    Bacula(R) - The Network Backup Solution
 
-   Copyright (C) 2000-2018 Kern Sibbald
+   Copyright (C) 2000-2020 Kern Sibbald
 
    The original author of Bacula is Kern Sibbald, with contributions
    from many others, a complete list can be found in the file AUTHORS.
@@ -176,7 +176,7 @@ static const char *S3Errors[] = {
  */
 class bacula_ctx {
 public:
-   JCR *jcr;
+   cancel_callback *cancel_cb;
    transfer *xfer;
    POOLMEM *&errMsg;
    ilist *parts;
@@ -189,14 +189,20 @@ public:
    alist *volumes;
    S3Status status;
    bwlimit *limit;        /* Used to control the bandwidth */
-   bacula_ctx(POOLMEM *&err) : jcr(NULL), xfer(NULL), errMsg(err), parts(NULL),
+   bacula_ctx(POOLMEM *&err) : cancel_cb(NULL), xfer(NULL), errMsg(err), parts(NULL),
                               isTruncated(0), nextMarker(NULL), obj_len(0), caller(NULL),
                               infile(NULL), outfile(NULL), volumes(NULL), status(S3StatusOK), limit(NULL)
-   {}
-   bacula_ctx(transfer *t) : jcr(NULL), xfer(t), errMsg(t->m_message), parts(NULL),
+   {
+      /* reset error message (necessary in case of retry) */
+      errMsg[0] = 0;
+   }
+   bacula_ctx(transfer *t) : cancel_cb(NULL), xfer(t), errMsg(t->m_message), parts(NULL),
                               isTruncated(0), nextMarker(NULL), obj_len(0), caller(NULL),
                               infile(NULL), outfile(NULL), volumes(NULL), status(S3StatusOK), limit(NULL)
-   {}
+   {
+      /* reset error message (necessary in case of retry) */
+      errMsg[0] = 0;
+   }
 };
 
 
@@ -275,8 +281,14 @@ static void responseCompleteCallback(
    return;
 }
 
-
-
+bool xfer_cancel_cb(void *arg)
+{
+   transfer *xfer = (transfer*)arg;
+   if (xfer) {
+      return xfer->is_canceled();
+   }
+   return false;
+};
 
 static int putObjectCallback(int buf_len, char *buf, void *callbackCtx)
 {
@@ -285,15 +297,15 @@ static int putObjectCallback(int buf_len, char *buf, void *callbackCtx)
    ssize_t rbytes = 0;
    int read_len;
 
-   if (ctx->xfer->is_cancelled()) {
+   if (ctx->xfer->is_canceled()) {
       Mmsg(ctx->errMsg, _("Job cancelled.\n"));
       return -1;
    }
    if (ctx->obj_len) {
       read_len = (ctx->obj_len > buf_len) ? buf_len : ctx->obj_len;
       rbytes = fread(buf, 1, read_len, ctx->infile);
-      Dmsg5(dbglvl, "%s thread=%lu rbytes=%d bufsize=%u remlen=%lu\n",
-             ctx->caller,  pthread_self(), rbytes, buf_len, ctx->obj_len);
+      Dmsg6(dbglvl, "%s xfer=part.%lu thread=%lu rbytes=%d bufsize=%u remlen=%lu\n",
+             ctx->caller, ctx->xfer->m_part, pthread_self(), rbytes, buf_len, ctx->obj_len);
       if (rbytes <= 0) {
          berrno be;
          Mmsg(ctx->errMsg, "%s Error reading input file: ERR=%s\n",
@@ -301,7 +313,7 @@ static int putObjectCallback(int buf_len, char *buf, void *callbackCtx)
          goto get_out;
       }
       ctx->obj_len -= rbytes;
-
+      ctx->xfer->increment_processed_size(rbytes);
       if (ctx->limit) {
          ctx->limit->control_bwlimit(rbytes);
       }
@@ -356,15 +368,16 @@ get_out:
    /* no error so far -> retrieve uploaded part info */
    if (ctx.errMsg[0] == 0) {
       ilist parts;
-      get_cloud_volume_parts_list(xfer->m_dcr, cloud_fname, &parts, ctx.errMsg);
-      for (int i=1; i <= parts.last_index() ; i++) {
-         cloud_part *p = (cloud_part *)parts.get(i);
+      if (get_one_cloud_volume_part(cloud_fname, &parts, ctx.errMsg)) {
+         /* only one part is returned */
+         cloud_part *p = (cloud_part *)parts.get(parts.last_index());
          if (p) {
-            xfer->m_res_size = p->size;
-            xfer->m_res_mtime = p->mtime;
-            break; /* not need to go further */
+         xfer->m_res_size = p->size;
+         xfer->m_res_mtime = p->mtime;
          }
       }
+   } else {
+      Dmsg1(dbglvl, "put_object ERROR: %s\n", ctx.errMsg);
    }
 
    return ctx.status;
@@ -377,7 +390,7 @@ static S3Status getObjectDataCallback(int buf_len, const char *buf,
    ssize_t wbytes;
 
    Enter(dbglvl);
-   if (ctx->xfer->is_cancelled()) {
+   if (ctx->xfer->is_canceled()) {
        Mmsg(ctx->errMsg, _("Job cancelled.\n"));
        return S3StatusAbortedByCallback;
    }
@@ -389,7 +402,7 @@ static S3Status getObjectDataCallback(int buf_len, const char *buf,
          ctx->caller, be.bstrerror());
       return S3StatusAbortedByCallback;
    }
-
+   ctx->xfer->increment_processed_size(wbytes);
    if (ctx->limit) {
       ctx->limit->control_bwlimit(wbytes);
    }
@@ -457,12 +470,11 @@ get_out:
 /*
  * Not thread safe
  */
-bool s3_driver::truncate_cloud_volume(DCR *dcr, const char *VolumeName, ilist *trunc_parts, POOLMEM *&err)
+bool s3_driver::truncate_cloud_volume(const char *VolumeName, ilist *trunc_parts, cancel_callback *cancel_cb, POOLMEM *&err)
 {
    Enter(dbglvl);
 
    bacula_ctx ctx(err);
-   ctx.jcr = dcr->jcr;
 
    int last_index = (int)trunc_parts->last_index();
    POOLMEM *cloud_fname = get_pool_memory(PM_FNAME);
@@ -470,7 +482,7 @@ bool s3_driver::truncate_cloud_volume(DCR *dcr, const char *VolumeName, ilist *t
       if (!trunc_parts->get(i)) {
          continue;
       }
-      if (ctx.jcr->is_canceled()) {
+      if (cancel_cb && cancel_cb->fct && cancel_cb->fct(cancel_cb->arg)) {
          Mmsg(err, _("Job cancelled.\n"));
          goto get_out;
       }
@@ -496,7 +508,7 @@ void s3_driver::make_cloud_filename(POOLMEM *&filename,
 {
    Enter(dbglvl);
    filename[0] = 0;
-   dev->add_vol_and_part(filename, VolumeName, "part", apart);
+   add_vol_and_part(filename, VolumeName, "part", apart);
    Dmsg1(dbglvl, "make_cloud_filename: %s\n", filename);
 }
 
@@ -519,6 +531,8 @@ bool s3_driver::copy_cache_part_to_cloud(transfer *xfer)
    uint32_t retry = max_upload_retries;
    S3Status status = S3StatusOK;
    do {
+      /* when the driver decide to retry, it must reset the processed size */
+      xfer->reset_processed_size();
       status = put_object(xfer, xfer->m_cache_fname, cloud_fname);
       --retry;
    } while (retry_put_object(status) && (retry>0));
@@ -543,14 +557,21 @@ bool s3_driver::copy_cloud_part_to_cache(transfer *xfer)
  * NOTE: See the SD Cloud resource in stored_conf.h
 */
 
-bool s3_driver::init(JCR *jcr, cloud_dev *adev, DEVRES *adevice)
+bool s3_driver::init(CLOUD *cloud, POOLMEM *&err)
 {
    S3Status status;
-
-   dev = adev;            /* copy cloud device pointer */
-   device = adevice;      /* copy device resource pointer */
-   cloud = device->cloud; /* local pointer to cloud definition */
-
+   if (cloud->host_name == NULL) {
+      Mmsg1(err, "Failed to initialize S3 Cloud. ERR=Hostname not set in cloud resource %s\n", cloud->hdr.name);
+      return false;
+   }
+   if (cloud->access_key == NULL) {
+      Mmsg1(err, "Failed to initialize S3 Cloud. ERR=AccessKey not set in cloud resource %s\n", cloud->hdr.name);
+      return false;
+   }
+   if (cloud->secret_key == NULL) {
+      Mmsg1(err, "Failed to initialize S3 Cloud. ERR=SecretKey not set in cloud resource %s\n", cloud->hdr.name);
+      return false;
+   }
    /* Setup bucket context for S3 lib */
    s3ctx.hostName = cloud->host_name;
    s3ctx.bucketName = cloud->bucket_name;
@@ -560,46 +581,34 @@ bool s3_driver::init(JCR *jcr, cloud_dev *adev, DEVRES *adevice)
    s3ctx.secretAccessKey = cloud->secret_key;
    s3ctx.authRegion = cloud->region;
 
-   /* File I/O buffer */
-   buf_len = dev->max_block_size;
-   if (buf_len == 0) {
-      buf_len = DEFAULT_BLOCK_SIZE;
-   }
-
    if ((status = S3_initialize("s3", S3_INIT_ALL, s3ctx.hostName)) != S3StatusOK) {
-      Mmsg1(dev->errmsg, "Failed to initialize S3 lib. ERR=%s\n", S3_get_status_name(status));
-      Qmsg1(jcr, M_FATAL, 0, "%s", dev->errmsg);
-      Tmsg1(0, "%s", dev->errmsg);
+      Mmsg1(err, "Failed to initialize S3 lib. ERR=%s\n", S3_get_status_name(status));
       return false;
    }
    return true;
 }
 
-bool s3_driver::start_of_job(DCR *dcr)
+bool s3_driver::start_of_job(POOLMEM *&msg)
 {
-   Jmsg(dcr->jcr, M_INFO, 0, _("Using S3 cloud driver Host=%s Bucket=%s\n"),
-      s3ctx.hostName, s3ctx.bucketName);
+   if (msg) {
+      Mmsg(msg, _("Using S3 cloud driver Host=%s Bucket=%s\n"), s3ctx.hostName, s3ctx.bucketName);
+   }
    return true;
 }
 
-bool s3_driver::end_of_job(DCR *dcr)
+bool s3_driver::end_of_job(POOLMEM *&msg)
 {
    return true;
 }
 
-/*
- * Note, dcr may be NULL
- */
-bool s3_driver::term(DCR *dcr)
+bool s3_driver::term(POOLMEM *&msg)
 {
    S3_deinitialize();
    return true;
 }
 
-
-
 /*
- * libs3 callback for get_cloud_volume_parts_list()
+ * libs3 callback for get_num_cloud_volume_parts_list()
  */
 static S3Status partslistBucketCallback(
    int isTruncated,
@@ -623,6 +632,7 @@ static S3Status partslistBucketCallback(
          part->mtime = obj->lastModified;
          part->size  = obj->size;
          ctx->parts->put(part->index, part);
+         Dmsg1(dbglvl, "partslistBucketCallback: part.%d retrieved\n", part->index);
       }
    }
 
@@ -630,12 +640,14 @@ static S3Status partslistBucketCallback(
    if (ctx->nextMarker) {
       bfree_and_null(ctx->nextMarker);
    }
-   if (nextMarker) {
-      ctx->nextMarker = bstrdup(nextMarker);
+   if (isTruncated && numObj>0) {
+      /* Workaround a bug with nextMarker */
+      const S3ListBucketContent *obj = &(object[numObj-1]);
+      ctx->nextMarker = bstrdup(obj->key);
    }
 
    Leave(dbglvl);
-   if (ctx->jcr->is_canceled()) {
+   if (ctx->cancel_cb && ctx->cancel_cb->fct && ctx->cancel_cb->fct(ctx->cancel_cb->arg)) {
       Mmsg(ctx->errMsg, _("Job cancelled.\n"));
       return S3StatusAbortedByCallback;
    }
@@ -648,9 +660,8 @@ S3ListBucketHandler partslistBucketHandler =
    &partslistBucketCallback
 };
 
-bool s3_driver::get_cloud_volume_parts_list(DCR *dcr, const char* VolumeName, ilist *parts, POOLMEM *&err)
+bool s3_driver::get_cloud_volume_parts_list(const char* VolumeName, ilist *parts, cancel_callback *cancel_cb, POOLMEM *&err)
 {
-   JCR *jcr = dcr->jcr;
    Enter(dbglvl);
 
    if (!parts || strlen(VolumeName) == 0) {
@@ -659,7 +670,7 @@ bool s3_driver::get_cloud_volume_parts_list(DCR *dcr, const char* VolumeName, il
    }
 
    bacula_ctx ctx(err);
-   ctx.jcr = jcr;
+   ctx.cancel_cb = cancel_cb;
    ctx.parts = parts;
    ctx.isTruncated = 1; /* pass into the while loop at least once */
    ctx.caller = "S3_list_bucket";
@@ -667,6 +678,7 @@ bool s3_driver::get_cloud_volume_parts_list(DCR *dcr, const char* VolumeName, il
       ctx.isTruncated = 0;
       S3_list_bucket(&s3ctx, VolumeName, ctx.nextMarker, NULL, 0, NULL, 0,
                      &partslistBucketHandler, &ctx);
+      Dmsg4(dbglvl, "get_cloud_volume_parts_list isTruncated=%d, nextMarker=%s, nbparts=%d, err=%s\n", ctx.isTruncated, ctx.nextMarker, ctx.parts->size(), ctx.errMsg?ctx.errMsg:"None");
       if (ctx.status != S3StatusOK) {
          pm_strcpy(err, S3Errors[ctx.status]);
          bfree_and_null(ctx.nextMarker);
@@ -675,7 +687,33 @@ bool s3_driver::get_cloud_volume_parts_list(DCR *dcr, const char* VolumeName, il
    }
    bfree_and_null(ctx.nextMarker);
    return true;
+}
+
+bool s3_driver::get_one_cloud_volume_part(const char* part_path_name, ilist *parts, POOLMEM *&err)
+{
+   Enter(dbglvl);
+
+   if (!parts || strlen(part_path_name) == 0) {
+      pm_strcpy(err, "Invalid argument");
+      return false;
+   }
+
+   bacula_ctx ctx(err);
+   ctx.parts = parts;
+   ctx.isTruncated = 0; /* ignore truncation in this case */
+   ctx.caller = "S3_list_bucket";
+   /* S3 documentation claims the parts will be returned in binary order */
+   /* so part.1 < part.11 b.e. This assumed, the first part retrieved is the one we seek */
+   S3_list_bucket(&s3ctx, part_path_name, ctx.nextMarker, NULL, 1, NULL, 0, &partslistBucketHandler, &ctx);
+   Dmsg4(dbglvl, "get_one_cloud_volume_part isTruncated=%d, nextMarker=%s, nbparts=%d, err=%s\n", ctx.isTruncated, ctx.nextMarker, ctx.parts->size(), ctx.errMsg?ctx.errMsg:"None");
+   if (ctx.status != S3StatusOK) {
+      pm_strcpy(err, S3Errors[ctx.status]);
+      bfree_and_null(ctx.nextMarker);
+      return false;
+   }
 
+   bfree_and_null(ctx.nextMarker);
+   return true;
 }
 
 /*
@@ -703,12 +741,14 @@ static S3Status volumeslistBucketCallback(
    if (ctx->nextMarker) {
       bfree_and_null(ctx->nextMarker);
    }
-   if (nextMarker) {
-      ctx->nextMarker = bstrdup(nextMarker);
+   if (isTruncated && numObj>0) {
+      /* Workaround a bug with nextMarker */
+      const S3ListBucketContent *obj = &(object[numObj-1]);
+      ctx->nextMarker = bstrdup(obj->key);
    }
 
    Leave(dbglvl);
-   if (ctx->jcr->is_canceled()) {
+   if (ctx->cancel_cb && ctx->cancel_cb->fct && ctx->cancel_cb->fct(ctx->cancel_cb->arg)) {
       Mmsg(ctx->errMsg, _("Job cancelled.\n"));
       return S3StatusAbortedByCallback;
    }
@@ -721,9 +761,8 @@ S3ListBucketHandler volumeslistBucketHandler =
    &volumeslistBucketCallback
 };
 
-bool s3_driver::get_cloud_volumes_list(DCR *dcr, alist *volumes, POOLMEM *&err)
+bool s3_driver::get_cloud_volumes_list(alist *volumes, cancel_callback *cancel_cb, POOLMEM *&err)
 {
-   JCR *jcr = dcr->jcr;
    Enter(dbglvl);
 
    if (!volumes) {
@@ -733,7 +772,7 @@ bool s3_driver::get_cloud_volumes_list(DCR *dcr, alist *volumes, POOLMEM *&err)
 
    bacula_ctx ctx(err);
    ctx.volumes = volumes;
-   ctx.jcr = jcr;
+   ctx.cancel_cb = cancel_cb;
    ctx.isTruncated = 1; /* pass into the while loop at least once */
    ctx.caller = "S3_list_bucket";
    while (ctx.isTruncated!=0) {
@@ -748,59 +787,4 @@ bool s3_driver::get_cloud_volumes_list(DCR *dcr, alist *volumes, POOLMEM *&err)
    return (err[0] == 0);
 }
 
-#ifdef really_needed
-static S3Status listBucketCallback(
-   int isTruncated,
-   const char *nextMarker,
-   int contentsCount,
-   const S3ListBucketContent *contents,
-   int commonPrefixesCount,
-   const char **commonPrefixes,
-   void *callbackData);
-
-S3ListBucketHandler listBucketHandler =
-{
-   responseHandler,
-   &listBucketCallback
-};
-
-
-/*
- * List content of a bucket
- */
-static S3Status listBucketCallback(
-   int isTruncated,
-   const char *nextMarker,
-   int numObj,
-   const S3ListBucketContent *contents,
-   int commonPrefixesCount,
-   const char **commonPrefixes,
-   void *callbackData)
-{
-   bacula_ctx *ctx = (bacula_ctx *)callbackCtx;
-   if (print_hdr) {
-      Pmsg1(000, "\n%-22s", "      Object Name");
-      Pmsg2(000, "  %-5s  %-20s", "Size", "   Last Modified");
-      Pmsg0(000, "\n----------------------  -----  --------------------\n");
-      print_hdr = false;   /* print header once only */
-   }
-
-   for (int i = 0; i < numObj; i++) {
-      char timebuf[256];
-      char sizebuf[16];
-      const S3ListBucketContent *content = &(contents[i]);
-      time_t t = (time_t) content->lastModified;
-      strftime(timebuf, sizeof(timebuf), "%Y-%m-%dT%H:%M:%SZ", gmtime(&t));
-      sprintf(sizebuf, "%5llu", (unsigned long long) content->size);
-      Pmsg3(000, "%-22s  %s  %s\n", content->key, sizebuf, timebuf);
-   }
-   Pmsg0(000, "\n");
-   if (ctx->jcr->is_canceled()) {
-      Mmsg(ctx->errMsg, _("Job cancelled.\n"));
-      return S3StatusAbortedByCallback;
-   }
-   return S3StatusOK;
-}
-#endif
-
 #endif /* HAVE_LIBS3 */
index de039d5161aa0a30d2009fad492b1113762516e8..f6d242221af7d00b2eeee2bb4a7e4d4705436ccd 100644 (file)
@@ -1,7 +1,7 @@
 /*
    Bacula(R) - The Network Backup Solution
 
-   Copyright (C) 2000-2018 Kern Sibbald
+   Copyright (C) 2000-2020 Kern Sibbald
 
    The original author of Bacula is Kern Sibbald, with contributions
    from many others, a complete list can be found in the file AUTHORS.
 class s3_driver: public cloud_driver {
 private:
    S3BucketContext s3ctx;       /* Main S3 bucket context */
-   uint32_t buf_len;
-
 public:
    cloud_dev *dev;              /* device that is calling us */
-   DEVRES *device;
-   DCR *dcr;                    /* dcr set during calls to S3_xxx */
-   CLOUD *cloud;                /* Pointer to CLOUD resource */
 
    s3_driver() {
    };
@@ -49,18 +44,21 @@ public:
    };
 
    void make_cloud_filename(POOLMEM *&filename, const char *VolumeName, uint32_t part);
-   bool init(JCR *jcr, cloud_dev *dev, DEVRES *device);
-   bool start_of_job(DCR *dcr);
-   bool term(DCR *dcr);
-   bool end_of_job(DCR *dcr);
-   bool truncate_cloud_volume(DCR *dcr, const char *VolumeName, ilist *trunc_parts, POOLMEM *&err);
+   bool init(CLOUD *cloud, POOLMEM *&err);
+   bool start_of_job(POOLMEM *&err);
+   bool term(POOLMEM *&err);
+   bool end_of_job(POOLMEM *&err);
+   bool truncate_cloud_volume(const char *VolumeName, ilist *trunc_parts, cancel_callback *cancel_cb, POOLMEM *&err);
    bool copy_cache_part_to_cloud(transfer *xfer);
    bool copy_cloud_part_to_cache(transfer *xfer);
-   bool get_cloud_volume_parts_list(DCR *dcr, const char* VolumeName, ilist *parts, POOLMEM *&err);
-   bool get_cloud_volumes_list(DCR* dcr, alist *volumes, POOLMEM *&err);
+   bool get_cloud_volume_parts_list(const char* VolumeName, ilist *parts, cancel_callback *cancel_cb, POOLMEM *&err);
+   bool get_cloud_volumes_list(alist *volumes, cancel_callback *cancel_cb, POOLMEM *&err);
    S3Status put_object(transfer *xfer, const char *cache_fname, const char *cloud_fname);
    bool retry_put_object(S3Status status);
    bool get_cloud_object(transfer *xfer, const char *cloud_fname, const char *cache_fname);
+
+private:
+   bool get_one_cloud_volume_part(const char* part_path_name, ilist *parts, POOLMEM *&err);
 };
 
 #endif  /* HAVE_LIBS3 */