]> git.ipfire.org Git - thirdparty/bacula.git/commitdiff
cloud: Fix #9508 transfer remove dcr use for JobId
authornorbert.bizet <norbert.bizet@baculasystems.com>
Wed, 21 Sep 2022 09:19:54 +0000 (05:19 -0400)
committerEric Bollengier <eric@baculasystems.com>
Sat, 18 Feb 2023 09:21:25 +0000 (10:21 +0100)
bacula/src/stored/cloud_dev.c
bacula/src/stored/cloud_transfer_mgr.c
bacula/src/stored/cloud_transfer_mgr.h
bacula/src/stored/file_driver.c

index fb7c8a37bbb44ad1bb3140c256a20d30dd5a0c08..971ae055fb39258fee5800b24dc3812162d6e36e 100644 (file)
@@ -215,7 +215,7 @@ transfer_state upload_engine(transfer *tpkt)
    if (tpkt && tpkt->m_driver) {
       /* call the driver method async */
       Dmsg4(dbglvl, "Upload start %s-%d JobId : %d driver :%p\n",
-         tpkt->m_volume_name, tpkt->m_part, tpkt->m_dcr->jcr->JobId, tpkt->m_driver);
+         tpkt->m_volume_name, tpkt->m_part, tpkt->m_job_id, tpkt->m_driver);
 
       cancel_callback cancel_cb;
       cancel_cb.fct = DCR_cancel_cb;
@@ -253,7 +253,7 @@ transfer_state upload_engine(transfer *tpkt)
             }
          } else {
             Dmsg4(dbglvl, "Move error!! JobId=%d part=%d Vol=%s cache=%s\n",
-               tpkt->m_dcr->jcr->JobId, tpkt->m_part, tpkt->m_volume_name, tpkt->m_cache_fname);
+               tpkt->m_job_id, tpkt->m_part, tpkt->m_volume_name, tpkt->m_cache_fname);
             POOL_MEM dmsg(PM_MESSAGE);
             tpkt->append_status(dmsg);
             Dmsg1(dbglvl, "%s\n",dmsg.c_str());
@@ -265,7 +265,7 @@ transfer_state upload_engine(transfer *tpkt)
       if (!tpkt->m_driver->copy_cache_part_to_cloud(tpkt)) {
          /* Error message already sent by Qmsg() */
          Dmsg4(dbglvl, "Upload error!! JobId=%d part=%d Vol=%s cache=%s\n",
-            tpkt->m_dcr->jcr->JobId, tpkt->m_part, tpkt->m_volume_name, tpkt->m_cache_fname);
+            tpkt->m_job_id, tpkt->m_part, tpkt->m_volume_name, tpkt->m_cache_fname);
          POOL_MEM dmsg(PM_MESSAGE);
          tpkt->append_status(dmsg);
          Dmsg1(dbglvl, "%s\n",dmsg.c_str());
@@ -273,7 +273,7 @@ transfer_state upload_engine(transfer *tpkt)
       }
 
       Dmsg2(dbglvl, "Upload end JobId : %d driver :%p\n",
-         tpkt->m_dcr->jcr->JobId, tpkt->m_driver);
+         tpkt->m_job_id, tpkt->m_driver);
 
       if (tpkt->m_do_cache_truncate && tpkt->m_part!=1) {
          if (unlink(tpkt->m_cache_fname) != 0) {
@@ -308,7 +308,7 @@ transfer_state wait_engine(transfer *tpkt)
       }
       if (tpkt->m_driver && tpkt->m_driver->is_waiting_on_server(tpkt)) {
          Dmsg3(dbglvl, "JobId=%d %s/part.%d waiting...\n",
-         tpkt->m_dcr->jcr->JobId, tpkt->m_volume_name, tpkt->m_part);
+         tpkt->m_job_id, tpkt->m_volume_name, tpkt->m_part);
          lock_guard lg(tpkt->m_mutex);
          /* increase the timeout increment up to MAX_WAIT_TIMEOUT_INC_INSEC value */
          if (tpkt->m_wait_timeout_inc_insec < MAX_WAIT_TIMEOUT_INC_INSEC)
@@ -322,7 +322,7 @@ transfer_state wait_engine(transfer *tpkt)
          return TRANS_STATE_QUEUED;
       } else {
          Dmsg3(dbglvl, "JobId=%d %s/part.%d is ready!\n",
-         tpkt->m_dcr->jcr->JobId, tpkt->m_volume_name, tpkt->m_part);
+         tpkt->m_job_id, tpkt->m_volume_name, tpkt->m_part);
          lock_guard lg(tpkt->m_mutex);
          tpkt->m_wait_timeout_inc_insec = 0;
          tpkt->m_funct = download_engine;
@@ -351,9 +351,9 @@ transfer_state download_engine(transfer *tpkt)
    if (tpkt && tpkt->m_driver) {
       /* call the driver method async */
       Dmsg4(dbglvl, "JobId=%d %s/part.%d download started to %s.\n",
-      tpkt->m_dcr->jcr->JobId, tpkt->m_volume_name, tpkt->m_part, tpkt->m_cache_fname);
+      tpkt->m_job_id, tpkt->m_volume_name, tpkt->m_part, tpkt->m_cache_fname);
       Dmsg4(dbglvl, "%s/part.%d download started. job : %d driver :%p\n",
-         tpkt->m_volume_name, tpkt->m_part, tpkt->m_dcr->jcr->JobId, tpkt->m_driver);
+         tpkt->m_volume_name, tpkt->m_part, tpkt->m_job_id, tpkt->m_driver);
       int ret = tpkt->m_driver->copy_cloud_part_to_cache(tpkt);
       switch (ret) {
          case cloud_driver::CLOUD_DRIVER_COPY_PART_TO_CACHE_OK:
@@ -366,7 +366,7 @@ transfer_state download_engine(transfer *tpkt)
             strcpy(p,partnumber);
             if (rename(tpkt->m_cache_fname, cache_fname) != 0) {
                Dmsg5(dbglvl, "JobId=%d %s/part.%d download. part copy from %s to %s error!!\n",
-               tpkt->m_dcr->jcr->JobId, tpkt->m_volume_name, tpkt->m_part, tpkt->m_cache_fname, cache_fname);
+               tpkt->m_job_id, tpkt->m_volume_name, tpkt->m_part, tpkt->m_cache_fname, cache_fname);
                free_pool_memory(cache_fname);
                return TRANS_STATE_ERROR;
             }
@@ -376,7 +376,7 @@ transfer_state download_engine(transfer *tpkt)
          case cloud_driver::CLOUD_DRIVER_COPY_PART_TO_CACHE_ERROR:
          {
             Dmsg4(dbglvl, "JobId=%d %s/part.%d download to cache=%s error!!\n",
-               tpkt->m_dcr->jcr->JobId, tpkt->m_volume_name, tpkt->m_part, tpkt->m_cache_fname);
+               tpkt->m_job_id, tpkt->m_volume_name, tpkt->m_part, tpkt->m_cache_fname);
             POOL_MEM dmsg(PM_MESSAGE);
             tpkt->append_status(dmsg);
             Dmsg1(dbglvl, "%s\n",dmsg.c_str());
@@ -394,7 +394,7 @@ transfer_state download_engine(transfer *tpkt)
          {
             lock_guard lg(tpkt->m_mutex);
             Dmsg4(dbglvl, "JobId=%d %s/part.%d download to cache=%s retry... \n",
-               tpkt->m_dcr->jcr->JobId, tpkt->m_volume_name, tpkt->m_part, tpkt->m_cache_fname);
+               tpkt->m_job_id, tpkt->m_volume_name, tpkt->m_part, tpkt->m_cache_fname);
             tpkt->m_wait_timeout_inc_insec = WAIT_TIMEOUT_INC_INSEC;
             tpkt->m_wait_timeout = time(NULL)+ tpkt->m_wait_timeout_inc_insec;
             tpkt->m_funct = wait_engine;
@@ -462,6 +462,7 @@ bool cloud_dev::upload_part_to_cloud(DCR *dcr, const char *VolumeName, uint32_t
                                        VolumeName, /* VolumeName is duplicated in the transfer constructor*/
                                        upart,
                                        driver,
+                                       dcr->jcr->JobId,
                                        dcr,
                                        cloud_prox);
    dcr->uploads->append(item);
@@ -567,6 +568,7 @@ transfer *cloud_dev::download_part_to_cache(DCR *dcr, const char *VolumeName, ui
                                  VolumeName, /* VolumeName is duplicated in the transfer constructor*/
                                  dpart,
                                  driver,
+                                 dcr->jcr->JobId,
                                  dcr,
                                  NULL); // no proxy on download to cache
       dcr->downloads->append(item);
index 6184420070c7a1b38bf3d4227ce2a1977fd6f14b..0c319a08dc80edd142283fc1651821df3d805468 100644 (file)
@@ -49,6 +49,7 @@ transfer::transfer(uint64_t    size,
                   const char   *volume_name,
                   uint32_t     part,
                   cloud_driver *driver,
+                  uint32_t     JobId,
                   DCR          *dcr,
                   cloud_proxy  *proxy) :
    m_stat_size(size),
@@ -68,6 +69,7 @@ transfer::transfer(uint64_t    size,
    m_volume_name(bstrdup(volume_name)), /* volume name is duplicated*/
    m_part(part),
    m_driver(driver),
+   m_job_id(JobId),
    m_dcr(dcr),
    m_proxy(proxy),
    m_workq_elem(NULL),
@@ -262,11 +264,13 @@ uint32_t transfer::append_status(POOL_MEM& msg)
 void transfer::append_api_status(OutputWriter &ow)
 {
    lock_guard lg(m_stat_mutex);
+   Dmsg2(dbglvl, "transfer::append_api_status state=%d JobId=%d\n", m_state, m_job_id);
+
    if (m_state > TRANS_STATE_PROCESSED) {
          ow.get_output(OT_START_OBJ,
                   OT_STRING,"volume_name",            NPRTB(m_volume_name),
                   OT_INT32, "part",                   m_part,
-                  OT_INT32, "jobid",                  m_dcr ? (m_dcr->jcr ? m_dcr->jcr->JobId : 0) : 0,
+                  OT_INT32, "jobid",                  m_job_id,
                   OT_STRING,"state",                  (m_state == TRANS_STATE_QUEUED) ? 
                                                          (m_wait_timeout_inc_insec == 0) ? "queued":"waiting" :transfer_state_name[m_state],
                   OT_INT64, "size",                   m_stat_size,
@@ -278,7 +282,7 @@ void transfer::append_api_status(OutputWriter &ow)
          ow.get_output(OT_START_OBJ,
                   OT_STRING,"volume_name",            NPRTB(m_volume_name),
                   OT_INT32, "part",                   m_part,
-                  OT_INT32, "jobid",                  m_dcr ? (m_dcr->jcr ? m_dcr->jcr->JobId : 0) : 0,
+                  OT_INT32, "jobid",                  m_job_id,
                   OT_STRING,"state",                  (m_state == TRANS_STATE_QUEUED) ? 
                                                          (m_wait_timeout_inc_insec == 0) ? "queued":"waiting" :transfer_state_name[m_state],
                   OT_INT64, "size",                   m_stat_size,
@@ -571,6 +575,7 @@ transfer *transfer_manager::get_xfer(uint64_t     size,
             const char   *volume_name,
             uint32_t     part,
             cloud_driver *driver,
+            uint32_t     JobId,
             DCR          *dcr,
             cloud_proxy  *proxy)
 {
@@ -593,6 +598,7 @@ transfer *transfer_manager::get_xfer(uint64_t     size,
                        volume_name, /* volume_name is duplicated in the transfer constructor*/
                        part,
                        driver,
+                       JobId,
                        dcr,
                        proxy));
 
index 3031f3f432b45da3a47d6b5cc52a5479e628882c..4f77888d665794c7cb13415cd9d2726cd502f28b 100644 (file)
@@ -115,6 +115,7 @@ public:
    char                *m_volume_name;
    uint32_t             m_part;
    cloud_driver        *m_driver;
+   uint32_t             m_job_id;
    DCR                 *m_dcr;
    cloud_proxy         *m_proxy;
    /* size of the transfer result : filled by the processor (driver) */
@@ -153,6 +154,7 @@ public:
             const char   *volume_name,
             uint32_t     part,
             cloud_driver *driver,
+            uint32_t     JobId,
             DCR          *dcr,
             cloud_proxy  *proxy
       );
@@ -290,6 +292,7 @@ public:
             const char   *volume_name,
             uint32_t     part,
             cloud_driver *driver,
+            uint32_t     JobId,
             DCR          *dcr,
             cloud_proxy  *proxy);
 
index 19b082a53272d9e013741f4efbdda593d5579f3a..1c6683caa087de70db8fbea1982d7260bf752ef3 100644 (file)
@@ -376,7 +376,7 @@ bool file_driver::move_cloud_part(const char *VolumeName, uint32_t apart , const
       rtn = true;
    } else {
       exists = 1;
-      transfer xfer(statbuf.st_size, NULL, cloud_source_name, VolumeName, apart, NULL, NULL, NULL);
+      transfer xfer(statbuf.st_size, NULL, cloud_source_name, VolumeName, apart, NULL, 0, NULL, NULL);
       rtn = put_object(&xfer, cloud_source_name, cloud_dest_name, &upload_limit);
       Mmsg(err,"%s",rtn ? to:xfer.m_message);
    }