From: norbert.bizet Date: Thu, 22 Sep 2022 09:03:36 +0000 (-0400) Subject: glacier: Fix #9511 handle return values in case of glacier retry X-Git-Tag: Beta-15.0.0~505 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=4c52015bcf8643dd7b477784d4c8239d557ee44b;p=thirdparty%2Fbacula.git glacier: Fix #9511 handle return values in case of glacier retry --- diff --git a/bacula/scripts/aws_cloud_driver.in b/bacula/scripts/aws_cloud_driver.in index 1b04e69fe..c0234495a 100755 --- a/bacula/scripts/aws_cloud_driver.in +++ b/bacula/scripts/aws_cloud_driver.in @@ -31,7 +31,7 @@ import logging, traceback # RETRY DOWNLOAD -RETRY_DOWNLOAD = 0xED +RETRY_DOWNLOAD = 0x0D def vol_ls(): try: @@ -228,12 +228,18 @@ def wait_on_restore(): if err: logging.error("wait_on_restore got error {0}".format(err)) sys.stderr.write(err) + return 1 if output: logging.info("wait_on_restore got ouput {0}".format(output)) j = json.loads(output) - if "Restore" in j and j["Restore"] and j["Restore"]=='ongoing-request="true"': - logging.info("Ongoing restore detected") - return 0 + if "Restore" in j and j["Restore"]: + if j["Restore"]=='ongoing-request="true"': + logging.info("Ongoing restore detected") + sys.stdout.write("WOR-INPROGRESS") + elif j["Restore"]=='ongoing-request="false"': + logging.info("NO Ongoing restore detected") + sys.stdout.write("WOR-DONE") + return 0 except Exception as e: exc = traceback.format_exception_only(type(e), e)[0] sys.stderr.write(exc) diff --git a/bacula/src/stored/generic_driver.c b/bacula/src/stored/generic_driver.c index a7347aecc..18d839b6d 100644 --- a/bacula/src/stored/generic_driver.c +++ b/bacula/src/stored/generic_driver.c @@ -33,8 +33,9 @@ const int64_t dbglvl = DT_CLOUD|50; const size_t block_size = 4096; const int TIMEOUT = 0; -const int RETRY_DOWNLOAD = 0xED; - +const int RETRY_DOWNLOAD = 0x0D; +const int MIN_RESTORE_CONTAINER_NAME_SIZE = 3; +const int MAX_RESTORE_CONTAINER_NAME_SIZE = 63; #ifdef __cplusplus extern "C" { #endif @@ -843,6 +844,20 @@ int generic_driver::copy_cloud_part_to_cache(transfer *xfer) ccb.fct = ©_cloud_part_to_cache_cancel_cb; ccb.arg = (void*)xfer; + if (xfer) { + /* Azure restores on a different bucket, so the restore bucket is the one to be checked */ + /* swap bucket to restore or swap it back to the original */ + if (strlen(xfer->m_restore_bucket) != 0) { + pm_strcpy(bucket_env, "CLOUD_BUCKET="); + pm_strcat(bucket_env, NPRTB(xfer->m_restore_bucket)); + envs[1] = bucket_env; + } else { + pm_strcpy(bucket_env, "CLOUD_BUCKET="); + pm_strcat(bucket_env, NPRTB(bucket_name)); + envs[1] = bucket_env; + } + } + bool use_cache_file_path = (strstr(driver_command, "was_cloud_driver") != NULL); if (!use_cache_file_path) { /* Create the cache file */ @@ -901,10 +916,23 @@ int generic_driver::copy_cloud_part_to_cache(transfer *xfer) free(fname); - if (ret==RETRY_DOWNLOAD) return CLOUD_DRIVER_COPY_PART_TO_CACHE_RETRY; + if (ret==RETRY_DOWNLOAD) + { + /* reset xfer->m_message (MT9511) + Normally we dont do it in order to report as many errors as possible but + retry is detected based on the return code (0x0D). + This code might be interpreted as an error thru bpipe ("Unknown error during program execvp" in berror.c), + so to keep the log clean, we reset the error message in this very specific case. */ + if (xfer && xfer->m_message) { + Dmsg3(dbglvl, "generic_driver::RETRY_DOWNLOAD purgin xfer message '%s' %s\\part%d.\n", xfer->m_message, xfer->m_volume_name, xfer->m_part); + *(xfer->m_message)='\0'; + } + return CLOUD_DRIVER_COPY_PART_TO_CACHE_RETRY; + } + return (ret==0) ? CLOUD_DRIVER_COPY_PART_TO_CACHE_OK:CLOUD_DRIVER_COPY_PART_TO_CACHE_ERROR; - } else { + } else { /* Azure is assumed here. Make sure to adjust logic if other provider use direct write to cache*/ int ret=call_fct("download",xfer->m_volume_name, xfer->m_part, NULL, NULL, &ccb, xfer->m_message, xfer->m_cache_fname); if (getenv("GENERATE_CLOUD_HASH") && (ret==0)) { /* compute SHA512 */ @@ -937,7 +965,54 @@ int generic_driver::copy_cloud_part_to_cache(transfer *xfer) bmemzero(xfer->m_hash64, 64); } - if (ret==RETRY_DOWNLOAD) return CLOUD_DRIVER_COPY_PART_TO_CACHE_RETRY; + if (ret==RETRY_DOWNLOAD) { + /* the python driver will write the restoration bucket name into the cache file */ + /* retrieve the restore bucket name */ + char *fname = bstrdup(xfer->m_cache_fname); + FILE *file=bfopen(fname, "rb"); + if (!file) { + berrno be; + Mmsg2(xfer->m_message, "Could not open output file %s. ERR=%s\n", + fname, be.bstrerror()); + free(fname); + return CLOUD_DRIVER_COPY_PART_TO_CACHE_ERROR; + } + + fseek(file, 0L, SEEK_END); + ssize_t size = ftell(file); + fseek(file, 0L, SEEK_SET); + + /*Azure Container names must be from 3 through 63 characters long.*/ + if (size < MIN_RESTORE_CONTAINER_NAME_SIZE || + size > MAX_RESTORE_CONTAINER_NAME_SIZE) { + /* if we're out-of-bound, something is wrong: let's abort */ + Mmsg1(xfer->m_message, "restore bucket name length %d is out of range. Aborting.\n", size); + free(fname); + return CLOUD_DRIVER_COPY_PART_TO_CACHE_ERROR; + } else { + char restore_bucket_name[MAX_RESTORE_CONTAINER_NAME_SIZE+1]; + ssize_t ret = fread(restore_bucket_name, 1, size, file); + if (ret != size) { + berrno be; + Mmsg2(xfer->m_message, "Could not close output file %s. ERR=%s\n", + fname, be.bstrerror()); + free(fname); + return CLOUD_DRIVER_COPY_PART_TO_CACHE_ERROR; + } + restore_bucket_name[size] = '\0'; + if (fclose(file) != 0) { + berrno be; + Mmsg2(xfer->m_message, "Could not close output file %s. ERR=%s\n", + fname, be.bstrerror()); + free(fname); + return CLOUD_DRIVER_COPY_PART_TO_CACHE_ERROR; + } + free(fname); + xfer->set_restore_bucket(restore_bucket_name); + return CLOUD_DRIVER_COPY_PART_TO_CACHE_RETRY; + } + } + return (ret==0) ? CLOUD_DRIVER_COPY_PART_TO_CACHE_OK:CLOUD_DRIVER_COPY_PART_TO_CACHE_ERROR; } return CLOUD_DRIVER_COPY_PART_TO_CACHE_ERROR; @@ -950,14 +1025,55 @@ bool generic_driver::restore_cloud_object(transfer *xfer, const char *cloud_fnam /* not implemented */ return false; } + +struct is_waiting_on_server_read_cb_arg { + POOLMEM *msg; +}; + +size_t is_waiting_on_server_read_cb(char *res, size_t size, void* arg) +{ + is_waiting_on_server_read_cb_arg*_arg = (is_waiting_on_server_read_cb_arg*)arg; + if (_arg) { + Mmsg(_arg->msg, "%s", res); + } + return size; +} + bool generic_driver::is_waiting_on_server(transfer *xfer) { Dmsg2(dbglvl, "generic_driver::is_waiting_on_server for %spart%d.\n", xfer->m_volume_name, xfer->m_part); - if (strstr(driver_command, "aws_cloud_driver") != NULL) { + /* only Amazon and Azure support restoration */ + if (strstr(driver_command, "aws_cloud_driver") != NULL || + strstr(driver_command, "was_cloud_driver") != NULL) { Dmsg0(dbglvl, "call_fct wait_on_restore\n"); - int ret = call_fct("wait_on_restore",xfer->m_volume_name, xfer->m_part, NULL, NULL, NULL, xfer->m_message, NULL); - Dmsg1(dbglvl, "wait_on_restore returns %d\n", ret); - return (ret == 0); + + if (xfer) { + /* Azure restores on a different bucket, so the restore bucket is the one to be checked */ + /* swap bucket to restore or swap it back to the original */ + if (strlen(xfer->m_restore_bucket) != 0) { + pm_strcpy(bucket_env, "CLOUD_BUCKET="); + pm_strcat(bucket_env, NPRTB(xfer->m_restore_bucket)); + envs[1] = bucket_env; + } else { + pm_strcpy(bucket_env, "CLOUD_BUCKET="); + pm_strcat(bucket_env, NPRTB(bucket_name)); + envs[1] = bucket_env; + } + } + + POOLMEM *output = get_memory(block_size+1); + read_callback rcb; + rcb.fct = &is_waiting_on_server_read_cb; + is_waiting_on_server_read_cb_arg arg; + arg.msg = output; + rcb.arg = (void*)&arg; + + int ret = call_fct("wait_on_restore",xfer->m_volume_name, xfer->m_part, &rcb, NULL, NULL, xfer->m_message, NULL); + /* We report xfer->m_message here once */ + Dmsg3(dbglvl, "wait_on_restore returns %d. output=%s error=%s\n", ret, output, xfer->m_message); + bool in_progress = (strcmp(output, "WOR-INPROGRESS") == 0); + free_pool_memory(output); + return (in_progress); } return false; } @@ -991,7 +1107,7 @@ bool generic_driver::truncate_cloud_volume(const char *volume_name, ilist *trunc struct clean_cloud_volume_read_cb_arg { - POOLMEM *remain; + POOLMEM **remain; ilist *parts; cleanup_cb_type *cb; cleanup_ctx_type *ctx; @@ -999,27 +1115,30 @@ struct clean_cloud_volume_read_cb_arg { size_t clean_cloud_volume_read_cb(char *res, size_t sz, void* arg) { + ilist *parts = NULL; bool wrong_string = false; size_t left = sz; + POOLMEM **remain = NULL; clean_cloud_volume_read_cb_arg *_arg = (clean_cloud_volume_read_cb_arg*)arg; - if (!_arg) { - return 0; + if (_arg) { + parts = _arg->parts; + remain = _arg->remain; } - if (_arg->parts) { + if (parts) { char * pch = strtok (res,"\n"); /* we enter the cb again and remaining string has not been processed */ - if (_arg->remain && strlen(_arg->remain) != 0) { - pm_strcat(_arg->remain, pch); - char *name=strstr(_arg->remain, "part"); - char *time=strstr(_arg->remain, ",mtime:"); + if (remain && strlen(*remain) != 0) { + pm_strcat(remain, pch); + char *name=strstr(*remain, "part"); + char *time=strstr(*remain, ",mtime:"); if (name && time) { *time='\0'; - _arg->parts->append(bstrdup(name)); + parts->append(bstrdup(name)); } else { wrong_string = true; } - *(_arg->remain) = 0; + **remain = 0; left -= strlen(pch)+1; pch = strtok (NULL,"\n"); } @@ -1029,9 +1148,9 @@ size_t clean_cloud_volume_read_cb(char *res, size_t sz, void* arg) char *time=strstr(pch, ",mtime:"); if (name && time) { *time='\0'; - _arg->parts->append(bstrdup(name)); + parts->append(bstrdup(name)); } else { - pm_strcpy(_arg->remain, pch); + pm_strcpy(remain, pch); } left -= strlen(pch)+1; pch = strtok (NULL, "\n"); @@ -1055,7 +1174,7 @@ bool generic_driver::clean_cloud_volume(const char *VolumeName, cleanup_cb_type arg.parts = &parts; POOLMEM *p= get_memory(block_size); *p = 0; - arg.remain = p; + arg.remain = &p; arg.cb = cb; arg.ctx = ctx; read_callback pcb; @@ -1063,7 +1182,7 @@ bool generic_driver::clean_cloud_volume(const char *VolumeName, cleanup_cb_type pcb.arg = (void*)&arg; /* list everything in the volume VolumeName */ int ret = call_fct("ls", VolumeName, "", &pcb, NULL, cancel_cb, err); - free_pool_memory(arg.remain); + free_pool_memory(*arg.remain); int rtn=0; int i; @@ -1085,27 +1204,29 @@ bool generic_driver::clean_cloud_volume(const char *VolumeName, cleanup_cb_type } struct get_cloud_volume_parts_list_read_cb_arg { - POOLMEM *remain; + POOLMEM **remain; ilist *parts; }; size_t get_cloud_volume_parts_list_read_cb(char *res, size_t sz, void* arg) { + ilist *parts = NULL; bool wrong_string = false; size_t left = sz; - + POOLMEM **remain = NULL; get_cloud_volume_parts_list_read_cb_arg *_arg = (get_cloud_volume_parts_list_read_cb_arg*)arg; - if (!_arg) { - return 0; + if (_arg) { + parts = _arg->parts; + remain = _arg->remain; } - if (_arg->parts) { + if (parts) { char * pch = strtok (res,"\n"); /* we enter the cb again and remaining string has not been processed */ - if (_arg->remain && strlen(_arg->remain) != 0) { - pm_strcat(_arg->remain, pch); - char *ext=strstr(_arg->remain, "part."), *size=strstr(_arg->remain, "size:"), *mtime=strstr(_arg->remain, "mtime:"); + if (remain && *remain && strlen(*remain) != 0) { + pm_strcat(remain, pch); + char *ext=strstr(*remain, "part."), *size=strstr(*remain, "size:"), *mtime=strstr(*remain, "mtime:"); if (ext && size && mtime) { cloud_part *part = (cloud_part*) malloc(sizeof(cloud_part)); part->index = str_to_uint64(&(ext[5])); @@ -1113,11 +1234,11 @@ size_t get_cloud_volume_parts_list_read_cb(char *res, size_t sz, void* arg) part->size = str_to_uint64((&(size[5]))); /* ***FIXME*** : wbn to retrieve SHA512 from cloud */ bmemzero(part->hash64, 64); - _arg->parts->put(part->index, part); + parts->put(part->index, part); } else { wrong_string = true; } - *(_arg->remain) = 0; + **remain = 0; left -= strlen(pch)+1; pch = strtok (NULL,"\n"); } @@ -1131,9 +1252,9 @@ size_t get_cloud_volume_parts_list_read_cb(char *res, size_t sz, void* arg) part->size = str_to_uint64((&(size[5]))); /* ***FIXME*** : wbn to retrieve SHA512 from cloud */ bmemzero(part->hash64, 64); - _arg->parts->put(part->index, part); + parts->put(part->index, part); } else { - pm_strcpy(_arg->remain, pch); + pm_strcpy(remain, pch); } left -= strlen(pch)+1; pch = strtok (NULL, "\n"); @@ -1158,17 +1279,17 @@ bool generic_driver::get_cloud_volume_parts_list(const char* volume_name, ilist arg.parts = parts; POOLMEM *p= get_memory(block_size); *p = 0; - arg.remain = p; + arg.remain = &p; read_callback pcb; pcb.fct = &get_cloud_volume_parts_list_read_cb; pcb.arg = (void*)&arg; int ret = call_fct("ls", volume_name, "part.", &pcb, NULL, cancel_cb, err); - free_pool_memory(arg.remain); + free_pool_memory(*arg.remain); return (ret == 0); } struct get_cloud_volume_list_read_cb_arg { - POOLMEM *remain; + POOLMEM **remain; alist *volumes; }; @@ -1187,28 +1308,31 @@ void get_cloud_volumes_list_read_cb_append_to_volumes(char* c, alist* volumes) { size_t get_cloud_volumes_list_read_cb(char* res, size_t size, void *arg) { + alist *volumes = NULL; + POOLMEM **remain = NULL; bool is_complete(res[size-1]=='\n'); get_cloud_volume_list_read_cb_arg *_arg = (get_cloud_volume_list_read_cb_arg*)arg; - if (!_arg) { - return 0; + if (_arg) { + volumes = _arg->volumes; + remain = _arg->remain; } - if (_arg->volumes) { + if (volumes) { /* do the actual process */ char * pch = strtok (res,"\n"); - if (_arg->remain && strlen(_arg->remain) != 0) { - pm_strcat(_arg->remain, pch); - get_cloud_volumes_list_read_cb_append_to_volumes(_arg->remain, _arg->volumes); + if (remain && *remain && strlen(*remain) != 0) { + pm_strcat(remain, pch); + get_cloud_volumes_list_read_cb_append_to_volumes(*remain, volumes); pch = strtok (NULL, "\n"); - *(_arg->remain) = 0; + *remain = 0; } while (pch != NULL) { - pm_strcpy(_arg->remain, pch); + pm_strcpy(*remain, pch); pch = strtok (NULL, "\n"); if (pch || is_complete) { - get_cloud_volumes_list_read_cb_append_to_volumes(_arg->remain, _arg->volumes); + get_cloud_volumes_list_read_cb_append_to_volumes(*remain, volumes); } } return size; @@ -1228,10 +1352,10 @@ bool generic_driver::get_cloud_volumes_list(alist *volumes, cancel_callback *can arg.volumes = volumes; POOLMEM *p= get_memory(block_size); *p = 0; - arg.remain = p; + arg.remain = &p; pcb.arg = (void*)&arg; int ret = call_fct("vol_ls", NULL, 0, &pcb, NULL, cancel_cb, err); - free_pool_memory(arg.remain); + free_pool_memory(*arg.remain); return (ret == 0); }