From: norbert.bizet Date: Wed, 21 Sep 2022 09:19:54 +0000 (-0400) Subject: cloud: Fix #9508 transfer remove dcr use for JobId X-Git-Tag: Release-13.0.2~9 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=f6fb265d4103b37a6ba1647bf08cb15eddacd913;p=thirdparty%2Fbacula.git cloud: Fix #9508 transfer remove dcr use for JobId --- diff --git a/bacula/src/stored/cloud_dev.c b/bacula/src/stored/cloud_dev.c index fb7c8a37b..971ae055f 100644 --- a/bacula/src/stored/cloud_dev.c +++ b/bacula/src/stored/cloud_dev.c @@ -215,7 +215,7 @@ transfer_state upload_engine(transfer *tpkt) if (tpkt && tpkt->m_driver) { /* call the driver method async */ Dmsg4(dbglvl, "Upload start %s-%d JobId : %d driver :%p\n", - tpkt->m_volume_name, tpkt->m_part, tpkt->m_dcr->jcr->JobId, tpkt->m_driver); + tpkt->m_volume_name, tpkt->m_part, tpkt->m_job_id, tpkt->m_driver); cancel_callback cancel_cb; cancel_cb.fct = DCR_cancel_cb; @@ -253,7 +253,7 @@ transfer_state upload_engine(transfer *tpkt) } } else { Dmsg4(dbglvl, "Move error!! JobId=%d part=%d Vol=%s cache=%s\n", - tpkt->m_dcr->jcr->JobId, tpkt->m_part, tpkt->m_volume_name, tpkt->m_cache_fname); + tpkt->m_job_id, tpkt->m_part, tpkt->m_volume_name, tpkt->m_cache_fname); POOL_MEM dmsg(PM_MESSAGE); tpkt->append_status(dmsg); Dmsg1(dbglvl, "%s\n",dmsg.c_str()); @@ -265,7 +265,7 @@ transfer_state upload_engine(transfer *tpkt) if (!tpkt->m_driver->copy_cache_part_to_cloud(tpkt)) { /* Error message already sent by Qmsg() */ Dmsg4(dbglvl, "Upload error!! JobId=%d part=%d Vol=%s cache=%s\n", - tpkt->m_dcr->jcr->JobId, tpkt->m_part, tpkt->m_volume_name, tpkt->m_cache_fname); + tpkt->m_job_id, tpkt->m_part, tpkt->m_volume_name, tpkt->m_cache_fname); POOL_MEM dmsg(PM_MESSAGE); tpkt->append_status(dmsg); Dmsg1(dbglvl, "%s\n",dmsg.c_str()); @@ -273,7 +273,7 @@ transfer_state upload_engine(transfer *tpkt) } Dmsg2(dbglvl, "Upload end JobId : %d driver :%p\n", - tpkt->m_dcr->jcr->JobId, tpkt->m_driver); + tpkt->m_job_id, tpkt->m_driver); if (tpkt->m_do_cache_truncate && tpkt->m_part!=1) { if (unlink(tpkt->m_cache_fname) != 0) { @@ -308,7 +308,7 @@ transfer_state wait_engine(transfer *tpkt) } if (tpkt->m_driver && tpkt->m_driver->is_waiting_on_server(tpkt)) { Dmsg3(dbglvl, "JobId=%d %s/part.%d waiting...\n", - tpkt->m_dcr->jcr->JobId, tpkt->m_volume_name, tpkt->m_part); + tpkt->m_job_id, tpkt->m_volume_name, tpkt->m_part); lock_guard lg(tpkt->m_mutex); /* increase the timeout increment up to MAX_WAIT_TIMEOUT_INC_INSEC value */ if (tpkt->m_wait_timeout_inc_insec < MAX_WAIT_TIMEOUT_INC_INSEC) @@ -322,7 +322,7 @@ transfer_state wait_engine(transfer *tpkt) return TRANS_STATE_QUEUED; } else { Dmsg3(dbglvl, "JobId=%d %s/part.%d is ready!\n", - tpkt->m_dcr->jcr->JobId, tpkt->m_volume_name, tpkt->m_part); + tpkt->m_job_id, tpkt->m_volume_name, tpkt->m_part); lock_guard lg(tpkt->m_mutex); tpkt->m_wait_timeout_inc_insec = 0; tpkt->m_funct = download_engine; @@ -351,9 +351,9 @@ transfer_state download_engine(transfer *tpkt) if (tpkt && tpkt->m_driver) { /* call the driver method async */ Dmsg4(dbglvl, "JobId=%d %s/part.%d download started to %s.\n", - tpkt->m_dcr->jcr->JobId, tpkt->m_volume_name, tpkt->m_part, tpkt->m_cache_fname); + tpkt->m_job_id, tpkt->m_volume_name, tpkt->m_part, tpkt->m_cache_fname); Dmsg4(dbglvl, "%s/part.%d download started. job : %d driver :%p\n", - tpkt->m_volume_name, tpkt->m_part, tpkt->m_dcr->jcr->JobId, tpkt->m_driver); + tpkt->m_volume_name, tpkt->m_part, tpkt->m_job_id, tpkt->m_driver); int ret = tpkt->m_driver->copy_cloud_part_to_cache(tpkt); switch (ret) { case cloud_driver::CLOUD_DRIVER_COPY_PART_TO_CACHE_OK: @@ -366,7 +366,7 @@ transfer_state download_engine(transfer *tpkt) strcpy(p,partnumber); if (rename(tpkt->m_cache_fname, cache_fname) != 0) { Dmsg5(dbglvl, "JobId=%d %s/part.%d download. part copy from %s to %s error!!\n", - tpkt->m_dcr->jcr->JobId, tpkt->m_volume_name, tpkt->m_part, tpkt->m_cache_fname, cache_fname); + tpkt->m_job_id, tpkt->m_volume_name, tpkt->m_part, tpkt->m_cache_fname, cache_fname); free_pool_memory(cache_fname); return TRANS_STATE_ERROR; } @@ -376,7 +376,7 @@ transfer_state download_engine(transfer *tpkt) case cloud_driver::CLOUD_DRIVER_COPY_PART_TO_CACHE_ERROR: { Dmsg4(dbglvl, "JobId=%d %s/part.%d download to cache=%s error!!\n", - tpkt->m_dcr->jcr->JobId, tpkt->m_volume_name, tpkt->m_part, tpkt->m_cache_fname); + tpkt->m_job_id, tpkt->m_volume_name, tpkt->m_part, tpkt->m_cache_fname); POOL_MEM dmsg(PM_MESSAGE); tpkt->append_status(dmsg); Dmsg1(dbglvl, "%s\n",dmsg.c_str()); @@ -394,7 +394,7 @@ transfer_state download_engine(transfer *tpkt) { lock_guard lg(tpkt->m_mutex); Dmsg4(dbglvl, "JobId=%d %s/part.%d download to cache=%s retry... \n", - tpkt->m_dcr->jcr->JobId, tpkt->m_volume_name, tpkt->m_part, tpkt->m_cache_fname); + tpkt->m_job_id, tpkt->m_volume_name, tpkt->m_part, tpkt->m_cache_fname); tpkt->m_wait_timeout_inc_insec = WAIT_TIMEOUT_INC_INSEC; tpkt->m_wait_timeout = time(NULL)+ tpkt->m_wait_timeout_inc_insec; tpkt->m_funct = wait_engine; @@ -462,6 +462,7 @@ bool cloud_dev::upload_part_to_cloud(DCR *dcr, const char *VolumeName, uint32_t VolumeName, /* VolumeName is duplicated in the transfer constructor*/ upart, driver, + dcr->jcr->JobId, dcr, cloud_prox); dcr->uploads->append(item); @@ -567,6 +568,7 @@ transfer *cloud_dev::download_part_to_cache(DCR *dcr, const char *VolumeName, ui VolumeName, /* VolumeName is duplicated in the transfer constructor*/ dpart, driver, + dcr->jcr->JobId, dcr, NULL); // no proxy on download to cache dcr->downloads->append(item); diff --git a/bacula/src/stored/cloud_transfer_mgr.c b/bacula/src/stored/cloud_transfer_mgr.c index 618442007..0c319a08d 100644 --- a/bacula/src/stored/cloud_transfer_mgr.c +++ b/bacula/src/stored/cloud_transfer_mgr.c @@ -49,6 +49,7 @@ transfer::transfer(uint64_t size, const char *volume_name, uint32_t part, cloud_driver *driver, + uint32_t JobId, DCR *dcr, cloud_proxy *proxy) : m_stat_size(size), @@ -68,6 +69,7 @@ transfer::transfer(uint64_t size, m_volume_name(bstrdup(volume_name)), /* volume name is duplicated*/ m_part(part), m_driver(driver), + m_job_id(JobId), m_dcr(dcr), m_proxy(proxy), m_workq_elem(NULL), @@ -262,11 +264,13 @@ uint32_t transfer::append_status(POOL_MEM& msg) void transfer::append_api_status(OutputWriter &ow) { lock_guard lg(m_stat_mutex); + Dmsg2(dbglvl, "transfer::append_api_status state=%d JobId=%d\n", m_state, m_job_id); + if (m_state > TRANS_STATE_PROCESSED) { ow.get_output(OT_START_OBJ, OT_STRING,"volume_name", NPRTB(m_volume_name), OT_INT32, "part", m_part, - OT_INT32, "jobid", m_dcr ? (m_dcr->jcr ? m_dcr->jcr->JobId : 0) : 0, + OT_INT32, "jobid", m_job_id, OT_STRING,"state", (m_state == TRANS_STATE_QUEUED) ? (m_wait_timeout_inc_insec == 0) ? "queued":"waiting" :transfer_state_name[m_state], OT_INT64, "size", m_stat_size, @@ -278,7 +282,7 @@ void transfer::append_api_status(OutputWriter &ow) ow.get_output(OT_START_OBJ, OT_STRING,"volume_name", NPRTB(m_volume_name), OT_INT32, "part", m_part, - OT_INT32, "jobid", m_dcr ? (m_dcr->jcr ? m_dcr->jcr->JobId : 0) : 0, + OT_INT32, "jobid", m_job_id, OT_STRING,"state", (m_state == TRANS_STATE_QUEUED) ? (m_wait_timeout_inc_insec == 0) ? "queued":"waiting" :transfer_state_name[m_state], OT_INT64, "size", m_stat_size, @@ -571,6 +575,7 @@ transfer *transfer_manager::get_xfer(uint64_t size, const char *volume_name, uint32_t part, cloud_driver *driver, + uint32_t JobId, DCR *dcr, cloud_proxy *proxy) { @@ -593,6 +598,7 @@ transfer *transfer_manager::get_xfer(uint64_t size, volume_name, /* volume_name is duplicated in the transfer constructor*/ part, driver, + JobId, dcr, proxy)); diff --git a/bacula/src/stored/cloud_transfer_mgr.h b/bacula/src/stored/cloud_transfer_mgr.h index 3031f3f43..4f77888d6 100644 --- a/bacula/src/stored/cloud_transfer_mgr.h +++ b/bacula/src/stored/cloud_transfer_mgr.h @@ -115,6 +115,7 @@ public: char *m_volume_name; uint32_t m_part; cloud_driver *m_driver; + uint32_t m_job_id; DCR *m_dcr; cloud_proxy *m_proxy; /* size of the transfer result : filled by the processor (driver) */ @@ -153,6 +154,7 @@ public: const char *volume_name, uint32_t part, cloud_driver *driver, + uint32_t JobId, DCR *dcr, cloud_proxy *proxy ); @@ -290,6 +292,7 @@ public: const char *volume_name, uint32_t part, cloud_driver *driver, + uint32_t JobId, DCR *dcr, cloud_proxy *proxy); diff --git a/bacula/src/stored/file_driver.c b/bacula/src/stored/file_driver.c index 19b082a53..1c6683caa 100644 --- a/bacula/src/stored/file_driver.c +++ b/bacula/src/stored/file_driver.c @@ -376,7 +376,7 @@ bool file_driver::move_cloud_part(const char *VolumeName, uint32_t apart , const rtn = true; } else { exists = 1; - transfer xfer(statbuf.st_size, NULL, cloud_source_name, VolumeName, apart, NULL, NULL, NULL); + transfer xfer(statbuf.st_size, NULL, cloud_source_name, VolumeName, apart, NULL, 0, NULL, NULL); rtn = put_object(&xfer, cloud_source_name, cloud_dest_name, &upload_limit); Mmsg(err,"%s",rtn ? to:xfer.m_message); }