]> git.ipfire.org Git - thirdparty/bacula.git/commitdiff
glacier: Fix #9511 handle return values in case of glacier retry
authornorbert.bizet <norbert.bizet@baculasystems.com>
Thu, 22 Sep 2022 09:03:36 +0000 (05:03 -0400)
committerEric Bollengier <eric@baculasystems.com>
Thu, 14 Sep 2023 11:56:59 +0000 (13:56 +0200)
bacula/scripts/aws_cloud_driver.in
bacula/src/stored/generic_driver.c

index 1b04e69fed93da54aa07ab090af3421aa67cc9b3..c0234495a9f10610b389418f2a9cd3438800f6bd 100755 (executable)
@@ -31,7 +31,7 @@ import logging, traceback
 
 
 # RETRY DOWNLOAD
-RETRY_DOWNLOAD = 0xED
+RETRY_DOWNLOAD = 0x0D
 
 def vol_ls():
    try:
@@ -228,12 +228,18 @@ def wait_on_restore():
       if err:
          logging.error("wait_on_restore got error {0}".format(err))
          sys.stderr.write(err)
+         return 1
       if output:
          logging.info("wait_on_restore got ouput {0}".format(output))
          j = json.loads(output)
-         if "Restore" in j and j["Restore"] and j["Restore"]=='ongoing-request="true"':
-            logging.info("Ongoing restore detected")
-            return 0
+         if "Restore" in j and j["Restore"]:
+            if j["Restore"]=='ongoing-request="true"':
+               logging.info("Ongoing restore detected")
+               sys.stdout.write("WOR-INPROGRESS")
+            elif j["Restore"]=='ongoing-request="false"':
+               logging.info("NO Ongoing restore detected")
+               sys.stdout.write("WOR-DONE")
+      return 0
    except Exception as e:
       exc = traceback.format_exception_only(type(e), e)[0]
       sys.stderr.write(exc)
index a7347aeccc3360815dbb56380850c4e077caa310..18d839b6de24642423fe44008790d6a1888d56f7 100644 (file)
@@ -33,8 +33,9 @@
 const int64_t dbglvl = DT_CLOUD|50;
 const size_t block_size = 4096;
 const int TIMEOUT = 0;
-const int RETRY_DOWNLOAD = 0xED;
-
+const int RETRY_DOWNLOAD = 0x0D;
+const int MIN_RESTORE_CONTAINER_NAME_SIZE = 3;
+const int MAX_RESTORE_CONTAINER_NAME_SIZE = 63;
 #ifdef __cplusplus
 extern "C" {
 #endif
@@ -843,6 +844,20 @@ int generic_driver::copy_cloud_part_to_cache(transfer *xfer)
    ccb.fct = &copy_cloud_part_to_cache_cancel_cb;
    ccb.arg = (void*)xfer;
 
+   if (xfer) {
+      /* Azure restores on a different bucket, so the restore bucket is the one to be checked */
+      /* swap bucket to restore or swap it back to the original */
+      if (strlen(xfer->m_restore_bucket) != 0) {
+         pm_strcpy(bucket_env, "CLOUD_BUCKET=");
+         pm_strcat(bucket_env, NPRTB(xfer->m_restore_bucket));
+         envs[1] = bucket_env;
+      } else {
+         pm_strcpy(bucket_env, "CLOUD_BUCKET=");
+         pm_strcat(bucket_env, NPRTB(bucket_name));
+         envs[1] = bucket_env;
+      }
+   }
+
    bool use_cache_file_path = (strstr(driver_command, "was_cloud_driver") != NULL);
    if (!use_cache_file_path) {
       /* Create the cache file */
@@ -901,10 +916,23 @@ int generic_driver::copy_cloud_part_to_cache(transfer *xfer)
 
       free(fname);
 
-      if (ret==RETRY_DOWNLOAD) return CLOUD_DRIVER_COPY_PART_TO_CACHE_RETRY;
+      if (ret==RETRY_DOWNLOAD)
+      {
+         /* reset xfer->m_message (MT9511)
+         Normally we dont do it in order to report as many errors as possible but 
+         retry is detected based on the return code (0x0D).
+         This code might be interpreted as an error thru bpipe ("Unknown error during program execvp" in berror.c), 
+         so to keep the log clean, we reset the error message in this very specific case. */
+         if (xfer && xfer->m_message) {
+            Dmsg3(dbglvl, "generic_driver::RETRY_DOWNLOAD purgin xfer message '%s' %s\\part%d.\n", xfer->m_message, xfer->m_volume_name, xfer->m_part);
+            *(xfer->m_message)='\0';
+         }
+         return CLOUD_DRIVER_COPY_PART_TO_CACHE_RETRY;
+      }
+
       return (ret==0) ? CLOUD_DRIVER_COPY_PART_TO_CACHE_OK:CLOUD_DRIVER_COPY_PART_TO_CACHE_ERROR;
 
-   } else {
+   } else { /* Azure is assumed here. Make sure to adjust logic if other provider use direct write to cache*/
       int ret=call_fct("download",xfer->m_volume_name, xfer->m_part, NULL, NULL, &ccb, xfer->m_message, xfer->m_cache_fname);
       if (getenv("GENERATE_CLOUD_HASH") && (ret==0)) {
          /* compute SHA512 */
@@ -937,7 +965,54 @@ int generic_driver::copy_cloud_part_to_cache(transfer *xfer)
          bmemzero(xfer->m_hash64, 64);
       }
 
-      if (ret==RETRY_DOWNLOAD) return CLOUD_DRIVER_COPY_PART_TO_CACHE_RETRY;
+      if (ret==RETRY_DOWNLOAD) {
+         /* the python driver will write the restoration bucket name into the cache file */
+         /* retrieve the restore bucket name */
+         char *fname = bstrdup(xfer->m_cache_fname);
+         FILE *file=bfopen(fname, "rb");
+         if (!file) {
+            berrno be;
+            Mmsg2(xfer->m_message, "Could not open output file %s. ERR=%s\n",
+                  fname, be.bstrerror());
+            free(fname);
+            return CLOUD_DRIVER_COPY_PART_TO_CACHE_ERROR;
+         }
+
+         fseek(file, 0L, SEEK_END);
+         ssize_t size = ftell(file);
+         fseek(file, 0L, SEEK_SET);
+
+         /*Azure Container names must be from 3 through 63 characters long.*/
+         if (size < MIN_RESTORE_CONTAINER_NAME_SIZE ||
+             size > MAX_RESTORE_CONTAINER_NAME_SIZE) {
+            /* if we're out-of-bound, something is wrong: let's abort */
+            Mmsg1(xfer->m_message, "restore bucket name length %d is out of range. Aborting.\n", size);
+            free(fname);
+            return CLOUD_DRIVER_COPY_PART_TO_CACHE_ERROR;
+         } else {
+            char restore_bucket_name[MAX_RESTORE_CONTAINER_NAME_SIZE+1];
+            ssize_t ret = fread(restore_bucket_name, 1, size, file);
+            if (ret != size) {
+               berrno be;
+               Mmsg2(xfer->m_message, "Could not close output file %s. ERR=%s\n",
+                     fname, be.bstrerror());
+               free(fname);
+               return CLOUD_DRIVER_COPY_PART_TO_CACHE_ERROR;
+            }
+            restore_bucket_name[size] = '\0';
+            if (fclose(file) != 0) {
+               berrno be;
+               Mmsg2(xfer->m_message, "Could not close output file %s. ERR=%s\n",
+                     fname, be.bstrerror());
+               free(fname);
+               return CLOUD_DRIVER_COPY_PART_TO_CACHE_ERROR;
+            }
+            free(fname);
+            xfer->set_restore_bucket(restore_bucket_name);
+            return CLOUD_DRIVER_COPY_PART_TO_CACHE_RETRY;
+         }
+      }
+
       return (ret==0) ? CLOUD_DRIVER_COPY_PART_TO_CACHE_OK:CLOUD_DRIVER_COPY_PART_TO_CACHE_ERROR;
    }
    return CLOUD_DRIVER_COPY_PART_TO_CACHE_ERROR;
@@ -950,14 +1025,55 @@ bool generic_driver::restore_cloud_object(transfer *xfer, const char *cloud_fnam
    /* not implemented */
    return false;
 }
+
+struct is_waiting_on_server_read_cb_arg {
+   POOLMEM *msg;
+};
+
+size_t is_waiting_on_server_read_cb(char *res, size_t size, void* arg)
+{
+   is_waiting_on_server_read_cb_arg*_arg = (is_waiting_on_server_read_cb_arg*)arg;
+   if (_arg) {
+      Mmsg(_arg->msg, "%s", res);
+   }
+   return size;
+}
+
 bool generic_driver::is_waiting_on_server(transfer *xfer)
 {
    Dmsg2(dbglvl, "generic_driver::is_waiting_on_server for %spart%d.\n", xfer->m_volume_name, xfer->m_part);
-   if (strstr(driver_command, "aws_cloud_driver") != NULL) {
+   /* only Amazon and Azure support restoration */
+   if (strstr(driver_command, "aws_cloud_driver") != NULL ||
+       strstr(driver_command, "was_cloud_driver") != NULL) {
       Dmsg0(dbglvl, "call_fct wait_on_restore\n");
-      int ret = call_fct("wait_on_restore",xfer->m_volume_name, xfer->m_part, NULL, NULL, NULL, xfer->m_message, NULL);
-      Dmsg1(dbglvl, "wait_on_restore returns %d\n", ret);
-      return (ret == 0);
+
+      if (xfer) {
+         /* Azure restores on a different bucket, so the restore bucket is the one to be checked */
+         /* swap bucket to restore or swap it back to the original */
+         if (strlen(xfer->m_restore_bucket) != 0) {
+            pm_strcpy(bucket_env, "CLOUD_BUCKET=");
+            pm_strcat(bucket_env, NPRTB(xfer->m_restore_bucket));
+            envs[1] = bucket_env;
+         } else {
+            pm_strcpy(bucket_env, "CLOUD_BUCKET=");
+            pm_strcat(bucket_env, NPRTB(bucket_name));
+            envs[1] = bucket_env;
+         }
+      }
+
+      POOLMEM *output = get_memory(block_size+1);
+      read_callback rcb;
+      rcb.fct = &is_waiting_on_server_read_cb;
+      is_waiting_on_server_read_cb_arg arg;
+      arg.msg = output;
+      rcb.arg = (void*)&arg;    
+
+      int ret = call_fct("wait_on_restore",xfer->m_volume_name, xfer->m_part, &rcb, NULL, NULL, xfer->m_message, NULL);
+      /* We report xfer->m_message here once */
+      Dmsg3(dbglvl, "wait_on_restore returns %d. output=%s error=%s\n", ret, output, xfer->m_message);
+      bool in_progress = (strcmp(output, "WOR-INPROGRESS") == 0);
+      free_pool_memory(output);
+      return (in_progress);
    }
    return false;
 }
@@ -991,7 +1107,7 @@ bool generic_driver::truncate_cloud_volume(const char *volume_name, ilist *trunc
 
 
 struct clean_cloud_volume_read_cb_arg {
-   POOLMEM *remain;
+   POOLMEM **remain;
    ilist *parts;
    cleanup_cb_type *cb;
    cleanup_ctx_type *ctx;
@@ -999,27 +1115,30 @@ struct clean_cloud_volume_read_cb_arg {
 
 size_t clean_cloud_volume_read_cb(char *res, size_t sz, void* arg)
 {
+   ilist *parts = NULL;
    bool wrong_string = false;
    size_t left = sz;
+   POOLMEM **remain = NULL;
    clean_cloud_volume_read_cb_arg *_arg = (clean_cloud_volume_read_cb_arg*)arg;
-   if (!_arg) {
-      return 0;
+   if (_arg) {
+      parts = _arg->parts;
+      remain = _arg->remain;
    }
 
-   if (_arg->parts) {
+   if (parts) {
       char * pch = strtok (res,"\n");
       /* we enter the cb again and remaining string has not been processed */
-      if (_arg->remain && strlen(_arg->remain) != 0) {
-         pm_strcat(_arg->remain, pch);
-         char *name=strstr(_arg->remain, "part");
-         char *time=strstr(_arg->remain, ",mtime:");
+      if (remain && strlen(*remain) != 0) {
+         pm_strcat(remain, pch);
+         char *name=strstr(*remain, "part");
+         char *time=strstr(*remain, ",mtime:");
          if (name && time) {
             *time='\0';
-            _arg->parts->append(bstrdup(name));
+            parts->append(bstrdup(name));
          } else {
             wrong_string = true;
          }
-         *(_arg->remain) = 0;
+         **remain = 0;
          left -= strlen(pch)+1;
          pch = strtok (NULL,"\n");
       }
@@ -1029,9 +1148,9 @@ size_t clean_cloud_volume_read_cb(char *res, size_t sz, void* arg)
          char *time=strstr(pch, ",mtime:");
          if (name && time) {
             *time='\0';
-            _arg->parts->append(bstrdup(name));
+            parts->append(bstrdup(name));
          } else {
-            pm_strcpy(_arg->remain, pch);
+            pm_strcpy(remain, pch);
          }
          left -= strlen(pch)+1;
          pch = strtok (NULL, "\n");
@@ -1055,7 +1174,7 @@ bool generic_driver::clean_cloud_volume(const char *VolumeName, cleanup_cb_type
    arg.parts = &parts;
    POOLMEM *p= get_memory(block_size);
    *p = 0;
-   arg.remain = p;
+   arg.remain = &p;
    arg.cb = cb;
    arg.ctx = ctx;
    read_callback pcb;
@@ -1063,7 +1182,7 @@ bool generic_driver::clean_cloud_volume(const char *VolumeName, cleanup_cb_type
    pcb.arg = (void*)&arg;
    /* list everything in the volume VolumeName */
    int ret = call_fct("ls", VolumeName, "", &pcb, NULL, cancel_cb, err);
-   free_pool_memory(arg.remain);
+   free_pool_memory(*arg.remain);
 
    int rtn=0;
    int i;
@@ -1085,27 +1204,29 @@ bool generic_driver::clean_cloud_volume(const char *VolumeName, cleanup_cb_type
 }
 
 struct get_cloud_volume_parts_list_read_cb_arg {
-   POOLMEM *remain;
+   POOLMEM **remain;
    ilist *parts;
 };
 
 
 size_t get_cloud_volume_parts_list_read_cb(char *res, size_t sz, void* arg)
 {
+   ilist *parts = NULL;
    bool wrong_string = false;
    size_t left = sz;
-
+   POOLMEM **remain = NULL;
    get_cloud_volume_parts_list_read_cb_arg *_arg = (get_cloud_volume_parts_list_read_cb_arg*)arg;
-   if (!_arg) {
-      return 0;
+   if (_arg) {
+      parts = _arg->parts;
+      remain = _arg->remain;
    }
 
-   if (_arg->parts) {
+   if (parts) {
       char * pch = strtok (res,"\n");
       /* we enter the cb again and remaining string has not been processed */
-      if (_arg->remain && strlen(_arg->remain) != 0) {
-         pm_strcat(_arg->remain, pch);
-         char *ext=strstr(_arg->remain, "part."), *size=strstr(_arg->remain, "size:"), *mtime=strstr(_arg->remain, "mtime:");
+      if (remain && *remain && strlen(*remain) != 0) {
+         pm_strcat(remain, pch);
+         char *ext=strstr(*remain, "part."), *size=strstr(*remain, "size:"), *mtime=strstr(*remain, "mtime:");
          if (ext && size && mtime) {
             cloud_part *part = (cloud_part*) malloc(sizeof(cloud_part));
             part->index = str_to_uint64(&(ext[5]));
@@ -1113,11 +1234,11 @@ size_t get_cloud_volume_parts_list_read_cb(char *res, size_t sz, void* arg)
             part->size = str_to_uint64((&(size[5])));
             /* ***FIXME*** : wbn to retrieve SHA512 from cloud */
             bmemzero(part->hash64, 64);
-            _arg->parts->put(part->index, part);
+            parts->put(part->index, part);
          } else {
             wrong_string = true;
          }
-         *(_arg->remain) = 0;
+         **remain = 0;
          left -= strlen(pch)+1;
          pch = strtok (NULL,"\n");
       }
@@ -1131,9 +1252,9 @@ size_t get_cloud_volume_parts_list_read_cb(char *res, size_t sz, void* arg)
             part->size = str_to_uint64((&(size[5])));
             /* ***FIXME*** : wbn to retrieve SHA512 from cloud */
             bmemzero(part->hash64, 64);
-            _arg->parts->put(part->index, part);
+            parts->put(part->index, part);
          } else {
-            pm_strcpy(_arg->remain, pch);
+            pm_strcpy(remain, pch);
          }
          left -= strlen(pch)+1;
          pch = strtok (NULL, "\n");
@@ -1158,17 +1279,17 @@ bool generic_driver::get_cloud_volume_parts_list(const char* volume_name, ilist
    arg.parts = parts;
    POOLMEM *p= get_memory(block_size);
    *p = 0;
-   arg.remain = p;
+   arg.remain = &p;
    read_callback pcb;
    pcb.fct = &get_cloud_volume_parts_list_read_cb;
    pcb.arg = (void*)&arg;
    int ret = call_fct("ls", volume_name, "part.", &pcb, NULL, cancel_cb, err);
-   free_pool_memory(arg.remain);
+   free_pool_memory(*arg.remain);
    return (ret == 0);
 }
 
 struct get_cloud_volume_list_read_cb_arg {
-   POOLMEM *remain;
+   POOLMEM **remain;
    alist *volumes;
 };
 
@@ -1187,28 +1308,31 @@ void get_cloud_volumes_list_read_cb_append_to_volumes(char* c, alist* volumes) {
 
 size_t get_cloud_volumes_list_read_cb(char* res, size_t size, void *arg)
 {
+   alist *volumes = NULL;
+   POOLMEM **remain = NULL;
    bool is_complete(res[size-1]=='\n');
    get_cloud_volume_list_read_cb_arg *_arg = (get_cloud_volume_list_read_cb_arg*)arg;
-   if (!_arg) {
-      return 0;
+   if (_arg) {
+      volumes = _arg->volumes;
+      remain = _arg->remain;
    }
 
-   if (_arg->volumes) {
+   if (volumes) {
       /* do the actual process */
       char * pch = strtok (res,"\n");
-      if (_arg->remain && strlen(_arg->remain) != 0) {
-         pm_strcat(_arg->remain, pch);
-         get_cloud_volumes_list_read_cb_append_to_volumes(_arg->remain, _arg->volumes);
+      if (remain && *remain && strlen(*remain) != 0) {
+         pm_strcat(remain, pch);
+         get_cloud_volumes_list_read_cb_append_to_volumes(*remain, volumes);
          pch = strtok (NULL, "\n");
-         *(_arg->remain) = 0;
+         *remain = 0;
       }
 
       while (pch != NULL)
       {
-         pm_strcpy(_arg->remain, pch);
+         pm_strcpy(*remain, pch);
          pch = strtok (NULL, "\n");
          if (pch || is_complete) {
-            get_cloud_volumes_list_read_cb_append_to_volumes(_arg->remain, _arg->volumes);
+            get_cloud_volumes_list_read_cb_append_to_volumes(*remain, volumes);
          }
       }
       return size;
@@ -1228,10 +1352,10 @@ bool generic_driver::get_cloud_volumes_list(alist *volumes, cancel_callback *can
    arg.volumes = volumes;
    POOLMEM *p= get_memory(block_size);
    *p = 0;
-   arg.remain = p;
+   arg.remain = &p;
    pcb.arg = (void*)&arg;
    int ret = call_fct("vol_ls", NULL, 0, &pcb, NULL, cancel_cb, err);
-   free_pool_memory(arg.remain);
+   free_pool_memory(*arg.remain);
    return (ret == 0);
 }