From: norbert.bizet Date: Thu, 31 Mar 2022 07:03:04 +0000 (-0400) Subject: cloud: add restore to generic_driver and aws_cloud_driver X-Git-Tag: Beta-15.0.0~603 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=97121a2c300f344dd2a9baf40c90f4b6cedd8f4f;p=thirdparty%2Fbacula.git cloud: add restore to generic_driver and aws_cloud_driver --- diff --git a/bacula/scripts/aws_cloud_driver.in b/bacula/scripts/aws_cloud_driver.in index 08116333b..eaa87cae2 100755 --- a/bacula/scripts/aws_cloud_driver.in +++ b/bacula/scripts/aws_cloud_driver.in @@ -29,6 +29,10 @@ from multiprocessing import Pool, cpu_count from inspect import stack import logging + +# RETRY DOWNLOAD +RETRY_DOWNLOAD = 0xED + def vol_ls(): try: logging.info("enter vol_ls") @@ -103,8 +107,14 @@ def ls(): def download(): try: logging.info("enter download") - return call(["aws", "s3", "cp", - os.path.join(cloud_path, volume, part), "-", "--only-show-errors"]) + proc = Popen(["aws", "s3", "cp", + os.path.join(cloud_path, volume, part), "-", "--only-show-errors"], stdout=PIPE, stderr=PIPE, universal_newlines=True) + output,err = proc.communicate() + logging.debug("download proc communicate output:{0} , err:{1}".format(output,err)) + if err.find("An error occurred (InvalidObjectState) when calling the GetObject operation") != -1: + restore() + return RETRY_DOWNLOAD + return proc.returncode except OSError as o: logging.exception("download OSError exception") sys.stderr.write("{2} OSError({0}): {1}\n".format(o.errno, o.strerror, stack()[0][3])) @@ -220,6 +230,62 @@ def move(): sys.stderr.write("{1} Generic exception: {0}\0".format(sys.exc_info()[0], stack()[0][3])) return 1 +def restore(): + try: + logging.info("enter restore {0}".format(os.path.join(cloud_path, volume, part))) + proc = Popen(["aws", "s3api", "restore-object", + #"--restore-request", "'{""Days": {0}, "GlacierJobParameters" : {"Tier" : "{1}"}}'.format(10,"Bulk"), #TODO: [BC-256] pass days and tier for restore + "--restore-request", "Days=10", + "--bucket", bucket, + "--key", os.path.join(volume, part)], stdout=PIPE, stderr=PIPE, universal_newlines=True) + output,err = proc.communicate() + logging.debug("restore proc communicate output:{0} , err:{1}".format(output,err)) + if err: + logging.error("restore got error {0}".format(err)) + sys.stderr.write(err) + if output: + logging.info("restore got ouput {0}".format(output)) + sys.stdout.write(output) + return proc.returncode + except OSError as o: + logging.exception("restore OSError exception") + sys.stderr.write("{2} OSError({0}): {1}\0".format(o.errno, o.strerror, stack()[0][3])) + except ValueError as v: + logging.exception("restore ValueError exception") + sys.stderr.write("{2} Wrong Value({0}): {1}\0".format(v.errno, v.strerror, stack()[0][3])) + except: + logging.exception("restore Generic exception") + sys.stderr.write("{1} Generic exception: {0}\0".format(sys.exc_info()[0], stack()[0][3])) + return 1 + +def wait_on_restore(): + try: + logging.info("enter wait_on_restore {0}".format(os.path.join(cloud_path, volume, part))) + proc = Popen(["aws", "s3api", "head-object", + "--bucket", bucket, + "--key", os.path.join(volume, part)], stdout=PIPE, stderr=PIPE, universal_newlines=True) + output,err = proc.communicate() + logging.debug("wait_on_restore proc communicate output:{0} , err:{1}".format(output,err)) + if err: + logging.error("wait_on_restore got error {0}".format(err)) + sys.stderr.write(err) + if output: + logging.info("wait_on_restore got ouput {0}".format(output)) + j = json.loads(output) + if "Restore" in j and j["Restore"] and j["Restore"]=='ongoing-request="true"': + logging.info("!Restore=ongoing-request detected in json!") + return 0 + except OSError as o: + logging.exception("wait_on_restore OSError exception") + sys.stderr.write("{2} OSError({0}): {1}\0".format(o.errno, o.strerror, stack()[0][3])) + except ValueError as v: + logging.exception("wait_on_restore ValueError exception") + sys.stderr.write("{2} Wrong Value({0}): {1}\0".format(v.errno, v.strerror, stack()[0][3])) + except: + logging.exception("wait_on_restore Generic exception") + sys.stderr.write("{1} Generic exception: {0}\0".format(sys.exc_info()[0], stack()[0][3])) + return 1 + if __name__ == '__main__': # initialize the return code with a weird value @@ -265,7 +331,8 @@ if __name__ == '__main__': "download":download, "delete":delete, "upload":upload, - "move":move + "move":move, + "wait_on_restore":wait_on_restore } if sys.argv[1] in switcher: diff --git a/bacula/src/stored/generic_driver.c b/bacula/src/stored/generic_driver.c index d70a7e3e9..0c4bf90f0 100644 --- a/bacula/src/stored/generic_driver.c +++ b/bacula/src/stored/generic_driver.c @@ -757,22 +757,22 @@ size_t move_cloud_part_read_cb(char *res, size_t size, void* arg) bool generic_driver::move_cloud_part(const char *VolumeName, uint32_t apart , const char *to, cancel_callback *cancel_cb, POOLMEM *&err, int& exists) { - /* retrieve the output message into err */ - read_callback rcb; - rcb.fct = &move_cloud_part_read_cb; - move_cloud_part_read_cb_arg arg; - arg.msg = err; - rcb.arg = (void*)&arg; - - int ret = call_fct("move", VolumeName, (int)apart, &rcb, NULL, cancel_cb, err, to); - /* 0 = OK (either because the part has been moved or because it doesn't exists) */ - if (ret == 0) { - /* copied part is return by the read callback */ - exists = strlen(err); - return true; - } + /* retrieve the output message into err */ + read_callback rcb; + rcb.fct = &move_cloud_part_read_cb; + move_cloud_part_read_cb_arg arg; + arg.msg = err; + rcb.arg = (void*)&arg; + + int ret = call_fct("move", VolumeName, (int)apart, &rcb, NULL, cancel_cb, err, to); + /* 0 = OK (either because the part has been moved or because it doesn't exists) */ + if (ret == 0) { + /* copied part is return by the read callback */ + exists = strlen(err); + return true; + } - return false; + return false; } @@ -871,6 +871,8 @@ int generic_driver::copy_cloud_part_to_cache(transfer *xfer) } free(fname); + + if (ret==0xED) return CLOUD_DRIVER_COPY_PART_TO_CACHE_RETRY; return (ret==0) ? CLOUD_DRIVER_COPY_PART_TO_CACHE_OK:CLOUD_DRIVER_COPY_PART_TO_CACHE_ERROR; } else { @@ -905,6 +907,8 @@ int generic_driver::copy_cloud_part_to_cache(transfer *xfer) } else { bmemzero(xfer->m_hash64, 64); } + + if (ret==0xED) return CLOUD_DRIVER_COPY_PART_TO_CACHE_RETRY; return (ret==0) ? CLOUD_DRIVER_COPY_PART_TO_CACHE_OK:CLOUD_DRIVER_COPY_PART_TO_CACHE_ERROR; } return CLOUD_DRIVER_COPY_PART_TO_CACHE_ERROR; @@ -919,6 +923,13 @@ bool generic_driver::restore_cloud_object(transfer *xfer, const char *cloud_fnam } bool generic_driver::is_waiting_on_server(transfer *xfer) { + Dmsg2(dbglvl, "generic_driver::is_waiting_on_server for %spart%d.\n", xfer->m_volume_name, xfer->m_part); + if (strstr(driver_command, "aws_cloud_driver") != NULL) { + Dmsg0(dbglvl, "call_fct wait_on_restore\n"); + int ret = call_fct("wait_on_restore",xfer->m_volume_name, xfer->m_part, NULL, NULL, NULL, xfer->m_message, NULL); + Dmsg1(dbglvl, "wait_on_restore returns %d\n", ret); + return (ret == 0); + } return false; }