from inspect import stack
import logging
+
+# RETRY DOWNLOAD
+RETRY_DOWNLOAD = 0xED
+
def vol_ls():
try:
logging.info("enter vol_ls")
def download():
try:
logging.info("enter download")
- return call(["aws", "s3", "cp",
- os.path.join(cloud_path, volume, part), "-", "--only-show-errors"])
+ proc = Popen(["aws", "s3", "cp",
+ os.path.join(cloud_path, volume, part), "-", "--only-show-errors"], stdout=PIPE, stderr=PIPE, universal_newlines=True)
+ output,err = proc.communicate()
+ logging.debug("download proc communicate output:{0} , err:{1}".format(output,err))
+ if err.find("An error occurred (InvalidObjectState) when calling the GetObject operation") != -1:
+ restore()
+ return RETRY_DOWNLOAD
+ return proc.returncode
except OSError as o:
logging.exception("download OSError exception")
sys.stderr.write("{2} OSError({0}): {1}\n".format(o.errno, o.strerror, stack()[0][3]))
sys.stderr.write("{1} Generic exception: {0}\0".format(sys.exc_info()[0], stack()[0][3]))
return 1
+def restore():
+ try:
+ logging.info("enter restore {0}".format(os.path.join(cloud_path, volume, part)))
+ proc = Popen(["aws", "s3api", "restore-object",
+ #"--restore-request", "'{""Days": {0}, "GlacierJobParameters" : {"Tier" : "{1}"}}'.format(10,"Bulk"), #TODO: [BC-256] pass days and tier for restore
+ "--restore-request", "Days=10",
+ "--bucket", bucket,
+ "--key", os.path.join(volume, part)], stdout=PIPE, stderr=PIPE, universal_newlines=True)
+ output,err = proc.communicate()
+ logging.debug("restore proc communicate output:{0} , err:{1}".format(output,err))
+ if err:
+ logging.error("restore got error {0}".format(err))
+ sys.stderr.write(err)
+ if output:
+ logging.info("restore got ouput {0}".format(output))
+ sys.stdout.write(output)
+ return proc.returncode
+ except OSError as o:
+ logging.exception("restore OSError exception")
+ sys.stderr.write("{2} OSError({0}): {1}\0".format(o.errno, o.strerror, stack()[0][3]))
+ except ValueError as v:
+ logging.exception("restore ValueError exception")
+ sys.stderr.write("{2} Wrong Value({0}): {1}\0".format(v.errno, v.strerror, stack()[0][3]))
+ except:
+ logging.exception("restore Generic exception")
+ sys.stderr.write("{1} Generic exception: {0}\0".format(sys.exc_info()[0], stack()[0][3]))
+ return 1
+
+def wait_on_restore():
+ try:
+ logging.info("enter wait_on_restore {0}".format(os.path.join(cloud_path, volume, part)))
+ proc = Popen(["aws", "s3api", "head-object",
+ "--bucket", bucket,
+ "--key", os.path.join(volume, part)], stdout=PIPE, stderr=PIPE, universal_newlines=True)
+ output,err = proc.communicate()
+ logging.debug("wait_on_restore proc communicate output:{0} , err:{1}".format(output,err))
+ if err:
+ logging.error("wait_on_restore got error {0}".format(err))
+ sys.stderr.write(err)
+ if output:
+ logging.info("wait_on_restore got ouput {0}".format(output))
+ j = json.loads(output)
+ if "Restore" in j and j["Restore"] and j["Restore"]=='ongoing-request="true"':
+ logging.info("!Restore=ongoing-request detected in json!")
+ return 0
+ except OSError as o:
+ logging.exception("wait_on_restore OSError exception")
+ sys.stderr.write("{2} OSError({0}): {1}\0".format(o.errno, o.strerror, stack()[0][3]))
+ except ValueError as v:
+ logging.exception("wait_on_restore ValueError exception")
+ sys.stderr.write("{2} Wrong Value({0}): {1}\0".format(v.errno, v.strerror, stack()[0][3]))
+ except:
+ logging.exception("wait_on_restore Generic exception")
+ sys.stderr.write("{1} Generic exception: {0}\0".format(sys.exc_info()[0], stack()[0][3]))
+ return 1
+
if __name__ == '__main__':
# initialize the return code with a weird value
"download":download,
"delete":delete,
"upload":upload,
- "move":move
+ "move":move,
+ "wait_on_restore":wait_on_restore
}
if sys.argv[1] in switcher:
bool generic_driver::move_cloud_part(const char *VolumeName, uint32_t apart , const char *to, cancel_callback *cancel_cb, POOLMEM *&err, int& exists)
{
- /* retrieve the output message into err */
- read_callback rcb;
- rcb.fct = &move_cloud_part_read_cb;
- move_cloud_part_read_cb_arg arg;
- arg.msg = err;
- rcb.arg = (void*)&arg;
-
- int ret = call_fct("move", VolumeName, (int)apart, &rcb, NULL, cancel_cb, err, to);
- /* 0 = OK (either because the part has been moved or because it doesn't exists) */
- if (ret == 0) {
- /* copied part is return by the read callback */
- exists = strlen(err);
- return true;
- }
+ /* retrieve the output message into err */
+ read_callback rcb;
+ rcb.fct = &move_cloud_part_read_cb;
+ move_cloud_part_read_cb_arg arg;
+ arg.msg = err;
+ rcb.arg = (void*)&arg;
+
+ int ret = call_fct("move", VolumeName, (int)apart, &rcb, NULL, cancel_cb, err, to);
+ /* 0 = OK (either because the part has been moved or because it doesn't exists) */
+ if (ret == 0) {
+ /* copied part is return by the read callback */
+ exists = strlen(err);
+ return true;
+ }
- return false;
+ return false;
}
}
free(fname);
+
+ if (ret==0xED) return CLOUD_DRIVER_COPY_PART_TO_CACHE_RETRY;
return (ret==0) ? CLOUD_DRIVER_COPY_PART_TO_CACHE_OK:CLOUD_DRIVER_COPY_PART_TO_CACHE_ERROR;
} else {
} else {
bmemzero(xfer->m_hash64, 64);
}
+
+ if (ret==0xED) return CLOUD_DRIVER_COPY_PART_TO_CACHE_RETRY;
return (ret==0) ? CLOUD_DRIVER_COPY_PART_TO_CACHE_OK:CLOUD_DRIVER_COPY_PART_TO_CACHE_ERROR;
}
return CLOUD_DRIVER_COPY_PART_TO_CACHE_ERROR;
}
bool generic_driver::is_waiting_on_server(transfer *xfer)
{
+ Dmsg2(dbglvl, "generic_driver::is_waiting_on_server for %spart%d.\n", xfer->m_volume_name, xfer->m_part);
+ if (strstr(driver_command, "aws_cloud_driver") != NULL) {
+ Dmsg0(dbglvl, "call_fct wait_on_restore\n");
+ int ret = call_fct("wait_on_restore",xfer->m_volume_name, xfer->m_part, NULL, NULL, NULL, xfer->m_message, NULL);
+ Dmsg1(dbglvl, "wait_on_restore returns %d\n", ret);
+ return (ret == 0);
+ }
return false;
}