const int64_t dbglvl = DT_CLOUD|50;
const size_t block_size = 4096;
const int TIMEOUT = 0;
-const int RETRY_DOWNLOAD = 0xED;
-
+const int RETRY_DOWNLOAD = 0x0D;
+const int MIN_RESTORE_CONTAINER_NAME_SIZE = 3;
+const int MAX_RESTORE_CONTAINER_NAME_SIZE = 63;
#ifdef __cplusplus
extern "C" {
#endif
ccb.fct = ©_cloud_part_to_cache_cancel_cb;
ccb.arg = (void*)xfer;
+ if (xfer) {
+ /* Azure restores on a different bucket, so the restore bucket is the one to be checked */
+ /* swap bucket to restore or swap it back to the original */
+ if (strlen(xfer->m_restore_bucket) != 0) {
+ pm_strcpy(bucket_env, "CLOUD_BUCKET=");
+ pm_strcat(bucket_env, NPRTB(xfer->m_restore_bucket));
+ envs[1] = bucket_env;
+ } else {
+ pm_strcpy(bucket_env, "CLOUD_BUCKET=");
+ pm_strcat(bucket_env, NPRTB(bucket_name));
+ envs[1] = bucket_env;
+ }
+ }
+
bool use_cache_file_path = (strstr(driver_command, "was_cloud_driver") != NULL);
if (!use_cache_file_path) {
/* Create the cache file */
free(fname);
- if (ret==RETRY_DOWNLOAD) return CLOUD_DRIVER_COPY_PART_TO_CACHE_RETRY;
+ if (ret==RETRY_DOWNLOAD)
+ {
+ /* reset xfer->m_message (MT9511)
+ Normally we dont do it in order to report as many errors as possible but
+ retry is detected based on the return code (0x0D).
+ This code might be interpreted as an error thru bpipe ("Unknown error during program execvp" in berror.c),
+ so to keep the log clean, we reset the error message in this very specific case. */
+ if (xfer && xfer->m_message) {
+ Dmsg3(dbglvl, "generic_driver::RETRY_DOWNLOAD purgin xfer message '%s' %s\\part%d.\n", xfer->m_message, xfer->m_volume_name, xfer->m_part);
+ *(xfer->m_message)='\0';
+ }
+ return CLOUD_DRIVER_COPY_PART_TO_CACHE_RETRY;
+ }
+
return (ret==0) ? CLOUD_DRIVER_COPY_PART_TO_CACHE_OK:CLOUD_DRIVER_COPY_PART_TO_CACHE_ERROR;
- } else {
+ } else { /* Azure is assumed here. Make sure to adjust logic if other provider use direct write to cache*/
int ret=call_fct("download",xfer->m_volume_name, xfer->m_part, NULL, NULL, &ccb, xfer->m_message, xfer->m_cache_fname);
if (getenv("GENERATE_CLOUD_HASH") && (ret==0)) {
/* compute SHA512 */
bmemzero(xfer->m_hash64, 64);
}
- if (ret==RETRY_DOWNLOAD) return CLOUD_DRIVER_COPY_PART_TO_CACHE_RETRY;
+ if (ret==RETRY_DOWNLOAD) {
+ /* the python driver will write the restoration bucket name into the cache file */
+ /* retrieve the restore bucket name */
+ char *fname = bstrdup(xfer->m_cache_fname);
+ FILE *file=bfopen(fname, "rb");
+ if (!file) {
+ berrno be;
+ Mmsg2(xfer->m_message, "Could not open output file %s. ERR=%s\n",
+ fname, be.bstrerror());
+ free(fname);
+ return CLOUD_DRIVER_COPY_PART_TO_CACHE_ERROR;
+ }
+
+ fseek(file, 0L, SEEK_END);
+ ssize_t size = ftell(file);
+ fseek(file, 0L, SEEK_SET);
+
+ /*Azure Container names must be from 3 through 63 characters long.*/
+ if (size < MIN_RESTORE_CONTAINER_NAME_SIZE ||
+ size > MAX_RESTORE_CONTAINER_NAME_SIZE) {
+ /* if we're out-of-bound, something is wrong: let's abort */
+ Mmsg1(xfer->m_message, "restore bucket name length %d is out of range. Aborting.\n", size);
+ free(fname);
+ return CLOUD_DRIVER_COPY_PART_TO_CACHE_ERROR;
+ } else {
+ char restore_bucket_name[MAX_RESTORE_CONTAINER_NAME_SIZE+1];
+ ssize_t ret = fread(restore_bucket_name, 1, size, file);
+ if (ret != size) {
+ berrno be;
+ Mmsg2(xfer->m_message, "Could not close output file %s. ERR=%s\n",
+ fname, be.bstrerror());
+ free(fname);
+ return CLOUD_DRIVER_COPY_PART_TO_CACHE_ERROR;
+ }
+ restore_bucket_name[size] = '\0';
+ if (fclose(file) != 0) {
+ berrno be;
+ Mmsg2(xfer->m_message, "Could not close output file %s. ERR=%s\n",
+ fname, be.bstrerror());
+ free(fname);
+ return CLOUD_DRIVER_COPY_PART_TO_CACHE_ERROR;
+ }
+ free(fname);
+ xfer->set_restore_bucket(restore_bucket_name);
+ return CLOUD_DRIVER_COPY_PART_TO_CACHE_RETRY;
+ }
+ }
+
return (ret==0) ? CLOUD_DRIVER_COPY_PART_TO_CACHE_OK:CLOUD_DRIVER_COPY_PART_TO_CACHE_ERROR;
}
return CLOUD_DRIVER_COPY_PART_TO_CACHE_ERROR;
/* not implemented */
return false;
}
+
+struct is_waiting_on_server_read_cb_arg {
+ POOLMEM *msg;
+};
+
+size_t is_waiting_on_server_read_cb(char *res, size_t size, void* arg)
+{
+ is_waiting_on_server_read_cb_arg*_arg = (is_waiting_on_server_read_cb_arg*)arg;
+ if (_arg) {
+ Mmsg(_arg->msg, "%s", res);
+ }
+ return size;
+}
+
bool generic_driver::is_waiting_on_server(transfer *xfer)
{
Dmsg2(dbglvl, "generic_driver::is_waiting_on_server for %spart%d.\n", xfer->m_volume_name, xfer->m_part);
- if (strstr(driver_command, "aws_cloud_driver") != NULL) {
+ /* only Amazon and Azure support restoration */
+ if (strstr(driver_command, "aws_cloud_driver") != NULL ||
+ strstr(driver_command, "was_cloud_driver") != NULL) {
Dmsg0(dbglvl, "call_fct wait_on_restore\n");
- int ret = call_fct("wait_on_restore",xfer->m_volume_name, xfer->m_part, NULL, NULL, NULL, xfer->m_message, NULL);
- Dmsg1(dbglvl, "wait_on_restore returns %d\n", ret);
- return (ret == 0);
+
+ if (xfer) {
+ /* Azure restores on a different bucket, so the restore bucket is the one to be checked */
+ /* swap bucket to restore or swap it back to the original */
+ if (strlen(xfer->m_restore_bucket) != 0) {
+ pm_strcpy(bucket_env, "CLOUD_BUCKET=");
+ pm_strcat(bucket_env, NPRTB(xfer->m_restore_bucket));
+ envs[1] = bucket_env;
+ } else {
+ pm_strcpy(bucket_env, "CLOUD_BUCKET=");
+ pm_strcat(bucket_env, NPRTB(bucket_name));
+ envs[1] = bucket_env;
+ }
+ }
+
+ POOLMEM *output = get_memory(block_size+1);
+ read_callback rcb;
+ rcb.fct = &is_waiting_on_server_read_cb;
+ is_waiting_on_server_read_cb_arg arg;
+ arg.msg = output;
+ rcb.arg = (void*)&arg;
+
+ int ret = call_fct("wait_on_restore",xfer->m_volume_name, xfer->m_part, &rcb, NULL, NULL, xfer->m_message, NULL);
+ /* We report xfer->m_message here once */
+ Dmsg3(dbglvl, "wait_on_restore returns %d. output=%s error=%s\n", ret, output, xfer->m_message);
+ bool in_progress = (strcmp(output, "WOR-INPROGRESS") == 0);
+ free_pool_memory(output);
+ return (in_progress);
}
return false;
}
struct clean_cloud_volume_read_cb_arg {
- POOLMEM *remain;
+ POOLMEM **remain;
ilist *parts;
cleanup_cb_type *cb;
cleanup_ctx_type *ctx;
size_t clean_cloud_volume_read_cb(char *res, size_t sz, void* arg)
{
+ ilist *parts = NULL;
bool wrong_string = false;
size_t left = sz;
+ POOLMEM **remain = NULL;
clean_cloud_volume_read_cb_arg *_arg = (clean_cloud_volume_read_cb_arg*)arg;
- if (!_arg) {
- return 0;
+ if (_arg) {
+ parts = _arg->parts;
+ remain = _arg->remain;
}
- if (_arg->parts) {
+ if (parts) {
char * pch = strtok (res,"\n");
/* we enter the cb again and remaining string has not been processed */
- if (_arg->remain && strlen(_arg->remain) != 0) {
- pm_strcat(_arg->remain, pch);
- char *name=strstr(_arg->remain, "part");
- char *time=strstr(_arg->remain, ",mtime:");
+ if (remain && strlen(*remain) != 0) {
+ pm_strcat(remain, pch);
+ char *name=strstr(*remain, "part");
+ char *time=strstr(*remain, ",mtime:");
if (name && time) {
*time='\0';
- _arg->parts->append(bstrdup(name));
+ parts->append(bstrdup(name));
} else {
wrong_string = true;
}
- *(_arg->remain) = 0;
+ **remain = 0;
left -= strlen(pch)+1;
pch = strtok (NULL,"\n");
}
char *time=strstr(pch, ",mtime:");
if (name && time) {
*time='\0';
- _arg->parts->append(bstrdup(name));
+ parts->append(bstrdup(name));
} else {
- pm_strcpy(_arg->remain, pch);
+ pm_strcpy(remain, pch);
}
left -= strlen(pch)+1;
pch = strtok (NULL, "\n");
arg.parts = &parts;
POOLMEM *p= get_memory(block_size);
*p = 0;
- arg.remain = p;
+ arg.remain = &p;
arg.cb = cb;
arg.ctx = ctx;
read_callback pcb;
pcb.arg = (void*)&arg;
/* list everything in the volume VolumeName */
int ret = call_fct("ls", VolumeName, "", &pcb, NULL, cancel_cb, err);
- free_pool_memory(arg.remain);
+ free_pool_memory(*arg.remain);
int rtn=0;
int i;
}
struct get_cloud_volume_parts_list_read_cb_arg {
- POOLMEM *remain;
+ POOLMEM **remain;
ilist *parts;
};
size_t get_cloud_volume_parts_list_read_cb(char *res, size_t sz, void* arg)
{
+ ilist *parts = NULL;
bool wrong_string = false;
size_t left = sz;
-
+ POOLMEM **remain = NULL;
get_cloud_volume_parts_list_read_cb_arg *_arg = (get_cloud_volume_parts_list_read_cb_arg*)arg;
- if (!_arg) {
- return 0;
+ if (_arg) {
+ parts = _arg->parts;
+ remain = _arg->remain;
}
- if (_arg->parts) {
+ if (parts) {
char * pch = strtok (res,"\n");
/* we enter the cb again and remaining string has not been processed */
- if (_arg->remain && strlen(_arg->remain) != 0) {
- pm_strcat(_arg->remain, pch);
- char *ext=strstr(_arg->remain, "part."), *size=strstr(_arg->remain, "size:"), *mtime=strstr(_arg->remain, "mtime:");
+ if (remain && *remain && strlen(*remain) != 0) {
+ pm_strcat(remain, pch);
+ char *ext=strstr(*remain, "part."), *size=strstr(*remain, "size:"), *mtime=strstr(*remain, "mtime:");
if (ext && size && mtime) {
cloud_part *part = (cloud_part*) malloc(sizeof(cloud_part));
part->index = str_to_uint64(&(ext[5]));
part->size = str_to_uint64((&(size[5])));
/* ***FIXME*** : wbn to retrieve SHA512 from cloud */
bmemzero(part->hash64, 64);
- _arg->parts->put(part->index, part);
+ parts->put(part->index, part);
} else {
wrong_string = true;
}
- *(_arg->remain) = 0;
+ **remain = 0;
left -= strlen(pch)+1;
pch = strtok (NULL,"\n");
}
part->size = str_to_uint64((&(size[5])));
/* ***FIXME*** : wbn to retrieve SHA512 from cloud */
bmemzero(part->hash64, 64);
- _arg->parts->put(part->index, part);
+ parts->put(part->index, part);
} else {
- pm_strcpy(_arg->remain, pch);
+ pm_strcpy(remain, pch);
}
left -= strlen(pch)+1;
pch = strtok (NULL, "\n");
arg.parts = parts;
POOLMEM *p= get_memory(block_size);
*p = 0;
- arg.remain = p;
+ arg.remain = &p;
read_callback pcb;
pcb.fct = &get_cloud_volume_parts_list_read_cb;
pcb.arg = (void*)&arg;
int ret = call_fct("ls", volume_name, "part.", &pcb, NULL, cancel_cb, err);
- free_pool_memory(arg.remain);
+ free_pool_memory(*arg.remain);
return (ret == 0);
}
struct get_cloud_volume_list_read_cb_arg {
- POOLMEM *remain;
+ POOLMEM **remain;
alist *volumes;
};
size_t get_cloud_volumes_list_read_cb(char* res, size_t size, void *arg)
{
+ alist *volumes = NULL;
+ POOLMEM **remain = NULL;
bool is_complete(res[size-1]=='\n');
get_cloud_volume_list_read_cb_arg *_arg = (get_cloud_volume_list_read_cb_arg*)arg;
- if (!_arg) {
- return 0;
+ if (_arg) {
+ volumes = _arg->volumes;
+ remain = _arg->remain;
}
- if (_arg->volumes) {
+ if (volumes) {
/* do the actual process */
char * pch = strtok (res,"\n");
- if (_arg->remain && strlen(_arg->remain) != 0) {
- pm_strcat(_arg->remain, pch);
- get_cloud_volumes_list_read_cb_append_to_volumes(_arg->remain, _arg->volumes);
+ if (remain && *remain && strlen(*remain) != 0) {
+ pm_strcat(remain, pch);
+ get_cloud_volumes_list_read_cb_append_to_volumes(*remain, volumes);
pch = strtok (NULL, "\n");
- *(_arg->remain) = 0;
+ *remain = 0;
}
while (pch != NULL)
{
- pm_strcpy(_arg->remain, pch);
+ pm_strcpy(*remain, pch);
pch = strtok (NULL, "\n");
if (pch || is_complete) {
- get_cloud_volumes_list_read_cb_append_to_volumes(_arg->remain, _arg->volumes);
+ get_cloud_volumes_list_read_cb_append_to_volumes(*remain, volumes);
}
}
return size;
arg.volumes = volumes;
POOLMEM *p= get_memory(block_size);
*p = 0;
- arg.remain = p;
+ arg.remain = &p;
pcb.arg = (void*)&arg;
int ret = call_fct("vol_ls", NULL, 0, &pcb, NULL, cancel_cb, err);
- free_pool_memory(arg.remain);
+ free_pool_memory(*arg.remain);
return (ret == 0);
}