]> git.ipfire.org Git - thirdparty/bacula.git/commitdiff
cloud: add restore to generic_driver and aws_cloud_driver
authornorbert.bizet <norbert.bizet@baculasystems.com>
Thu, 31 Mar 2022 07:03:04 +0000 (03:03 -0400)
committerEric Bollengier <eric@baculasystems.com>
Thu, 14 Sep 2023 11:56:58 +0000 (13:56 +0200)
bacula/scripts/aws_cloud_driver.in
bacula/src/stored/generic_driver.c

index 08116333be129e7fa5719b85ab64b927033673bb..eaa87cae211b7414a277ac465ce7ef9c9e492080 100755 (executable)
@@ -29,6 +29,10 @@ from multiprocessing import Pool, cpu_count
 from inspect import stack
 import logging
 
+
+# RETRY DOWNLOAD
+RETRY_DOWNLOAD = 0xED
+
 def vol_ls():
    try:
       logging.info("enter vol_ls")
@@ -103,8 +107,14 @@ def ls():
 def download():
    try:
       logging.info("enter download")
-      return call(["aws", "s3", "cp",
-            os.path.join(cloud_path, volume, part), "-", "--only-show-errors"])
+      proc = Popen(["aws", "s3", "cp",
+            os.path.join(cloud_path, volume, part), "-", "--only-show-errors"], stdout=PIPE, stderr=PIPE, universal_newlines=True)
+      output,err = proc.communicate()
+      logging.debug("download proc communicate output:{0} , err:{1}".format(output,err))
+      if err.find("An error occurred (InvalidObjectState) when calling the GetObject operation") != -1:
+         restore()
+         return RETRY_DOWNLOAD
+      return proc.returncode
    except OSError as o:
       logging.exception("download OSError exception")
       sys.stderr.write("{2} OSError({0}): {1}\n".format(o.errno, o.strerror, stack()[0][3]))
@@ -220,6 +230,62 @@ def move():
       sys.stderr.write("{1} Generic exception: {0}\0".format(sys.exc_info()[0], stack()[0][3]))
    return 1
 
+def restore():
+   try:
+      logging.info("enter restore {0}".format(os.path.join(cloud_path, volume, part)))
+      proc = Popen(["aws", "s3api", "restore-object", 
+                    #"--restore-request", "'{""Days": {0}, "GlacierJobParameters" : {"Tier" : "{1}"}}'.format(10,"Bulk"), #TODO: [BC-256] pass days and tier for restore
+                    "--restore-request", "Days=10",
+                    "--bucket", bucket,
+                    "--key", os.path.join(volume, part)], stdout=PIPE, stderr=PIPE, universal_newlines=True) 
+      output,err = proc.communicate()
+      logging.debug("restore proc communicate output:{0} , err:{1}".format(output,err))
+      if err:
+         logging.error("restore got error {0}".format(err))
+         sys.stderr.write(err)
+      if output:
+         logging.info("restore got ouput {0}".format(output))
+         sys.stdout.write(output)
+      return proc.returncode
+   except OSError as o:
+      logging.exception("restore OSError exception")
+      sys.stderr.write("{2} OSError({0}): {1}\0".format(o.errno, o.strerror, stack()[0][3]))
+   except ValueError as v:
+      logging.exception("restore ValueError exception")
+      sys.stderr.write("{2} Wrong Value({0}): {1}\0".format(v.errno, v.strerror, stack()[0][3]))
+   except:
+      logging.exception("restore Generic exception")
+      sys.stderr.write("{1} Generic exception: {0}\0".format(sys.exc_info()[0], stack()[0][3]))
+   return 1
+
+def wait_on_restore():
+   try:
+      logging.info("enter wait_on_restore {0}".format(os.path.join(cloud_path, volume, part)))
+      proc = Popen(["aws", "s3api", "head-object", 
+                    "--bucket", bucket,
+                    "--key", os.path.join(volume, part)], stdout=PIPE, stderr=PIPE, universal_newlines=True) 
+      output,err = proc.communicate()
+      logging.debug("wait_on_restore proc communicate output:{0} , err:{1}".format(output,err))
+      if err:
+         logging.error("wait_on_restore got error {0}".format(err))
+         sys.stderr.write(err)
+      if output:
+         logging.info("wait_on_restore got ouput {0}".format(output))
+         j = json.loads(output)
+         if "Restore" in j and j["Restore"] and j["Restore"]=='ongoing-request="true"':
+            logging.info("!Restore=ongoing-request detected in json!")
+            return 0
+   except OSError as o:
+      logging.exception("wait_on_restore OSError exception")
+      sys.stderr.write("{2} OSError({0}): {1}\0".format(o.errno, o.strerror, stack()[0][3]))
+   except ValueError as v:
+      logging.exception("wait_on_restore ValueError exception")
+      sys.stderr.write("{2} Wrong Value({0}): {1}\0".format(v.errno, v.strerror, stack()[0][3]))
+   except:
+      logging.exception("wait_on_restore Generic exception")
+      sys.stderr.write("{1} Generic exception: {0}\0".format(sys.exc_info()[0], stack()[0][3]))
+   return 1
+
 
 if __name__ == '__main__':
    # initialize the return code with a weird value
@@ -265,7 +331,8 @@ if __name__ == '__main__':
          "download":download,
          "delete":delete,
          "upload":upload,
-         "move":move
+         "move":move,
+         "wait_on_restore":wait_on_restore
       }
 
       if sys.argv[1] in switcher:
index d70a7e3e99a7544379531295ef82d92018b40a3b..0c4bf90f0f89f25bafbe65f9620268f5412fce42 100644 (file)
@@ -757,22 +757,22 @@ size_t move_cloud_part_read_cb(char *res, size_t size, void* arg)
 
 bool generic_driver::move_cloud_part(const char *VolumeName, uint32_t apart , const char *to, cancel_callback *cancel_cb, POOLMEM *&err, int& exists)
 {
-      /* retrieve the output message into err */
-      read_callback rcb;
-      rcb.fct = &move_cloud_part_read_cb;
-      move_cloud_part_read_cb_arg arg;
-      arg.msg = err;
-      rcb.arg = (void*)&arg;
-
-      int ret = call_fct("move", VolumeName, (int)apart, &rcb, NULL, cancel_cb, err, to);
-      /* 0 = OK (either because the part has been moved or because it doesn't exists) */
-      if (ret == 0) {
-         /* copied part is return by the read callback */
-         exists = strlen(err);
-         return true;
-      }
+   /* retrieve the output message into err */
+   read_callback rcb;
+   rcb.fct = &move_cloud_part_read_cb;
+   move_cloud_part_read_cb_arg arg;
+   arg.msg = err;
+   rcb.arg = (void*)&arg;
+
+   int ret = call_fct("move", VolumeName, (int)apart, &rcb, NULL, cancel_cb, err, to);
+   /* 0 = OK (either because the part has been moved or because it doesn't exists) */
+   if (ret == 0) {
+      /* copied part is return by the read callback */
+      exists = strlen(err);
+      return true;
+   }
 
-      return false;
+   return false;
 }
 
 
@@ -871,6 +871,8 @@ int generic_driver::copy_cloud_part_to_cache(transfer *xfer)
       }
 
       free(fname);
+
+      if (ret==0xED) return CLOUD_DRIVER_COPY_PART_TO_CACHE_RETRY;
       return (ret==0) ? CLOUD_DRIVER_COPY_PART_TO_CACHE_OK:CLOUD_DRIVER_COPY_PART_TO_CACHE_ERROR;
 
    } else {
@@ -905,6 +907,8 @@ int generic_driver::copy_cloud_part_to_cache(transfer *xfer)
       } else {
          bmemzero(xfer->m_hash64, 64);
       }
+
+      if (ret==0xED) return CLOUD_DRIVER_COPY_PART_TO_CACHE_RETRY;
       return (ret==0) ? CLOUD_DRIVER_COPY_PART_TO_CACHE_OK:CLOUD_DRIVER_COPY_PART_TO_CACHE_ERROR;
    }
    return CLOUD_DRIVER_COPY_PART_TO_CACHE_ERROR;
@@ -919,6 +923,13 @@ bool generic_driver::restore_cloud_object(transfer *xfer, const char *cloud_fnam
 }
 bool generic_driver::is_waiting_on_server(transfer *xfer)
 {
+   Dmsg2(dbglvl, "generic_driver::is_waiting_on_server for %spart%d.\n", xfer->m_volume_name, xfer->m_part);
+   if (strstr(driver_command, "aws_cloud_driver") != NULL) {
+      Dmsg0(dbglvl, "call_fct wait_on_restore\n");
+      int ret = call_fct("wait_on_restore",xfer->m_volume_name, xfer->m_part, NULL, NULL, NULL, xfer->m_message, NULL);
+      Dmsg1(dbglvl, "wait_on_restore returns %d\n", ret);
+      return (ret == 0);
+   }
    return false;
 }