From: francisco.garcia Date: Mon, 31 Jul 2023 11:40:59 +0000 (+0200) Subject: k8s: Add compatibility with csi volume snapshots backup X-Git-Tag: Beta-15.0.0~147 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=ec6b7ea26316b1d7d6a2713da3787b9cf0f88da2;p=thirdparty%2Fbacula.git k8s: Add compatibility with csi volume snapshots backup With this feature, backups can be made more consistent by using CSI volume snapshots. It creates a snapshot of the PVC before doing the backup. --- diff --git a/bacula/src/plugins/fd/kubernetes-backend/baculak8s/jobs/backup_job.py b/bacula/src/plugins/fd/kubernetes-backend/baculak8s/jobs/backup_job.py index 35ed43a13..0edb2f3fb 100644 --- a/bacula/src/plugins/fd/kubernetes-backend/baculak8s/jobs/backup_job.py +++ b/bacula/src/plugins/fd/kubernetes-backend/baculak8s/jobs/backup_job.py @@ -128,6 +128,10 @@ class BackupJob(EstimationJob): def process_pvcdata(self, namespace, pvcdata): status = None + # Detect if pvcdata is compatible with snapshots + vsnapshot, pvcdata = self.handle_create_vsnapshot_backup(namespace, pvcdata) + + logging.debug('Process_pvcdata (Backup_job): {} {}'.format(vsnapshot, pvcdata)) if self.prepare_bacula_pod(pvcdata, namespace=namespace, mode='backup'): super()._estimate_file(pvcdata) # here to send info about pvcdata to plugin status = self.__backup_pvcdata(namespace=namespace) @@ -135,6 +139,8 @@ class BackupJob(EstimationJob): self._io.send_eod() self.handle_tarstderr() self.handle_delete_pod(namespace=namespace) + # Both prepare_bacula_pod fails or not, we must remove snapshot and pvc + self.handle_delete_vsnapshot_backup(namespace, vsnapshot, pvcdata) return status def handle_pod_container_exec_command(self, corev1api, namespace, pod, runjobparam, failonerror=False): diff --git a/bacula/src/plugins/fd/kubernetes-backend/baculak8s/jobs/job_pod_bacula.py b/bacula/src/plugins/fd/kubernetes-backend/baculak8s/jobs/job_pod_bacula.py index f0b710ac8..71f1d0449 100644 --- 
a/bacula/src/plugins/fd/kubernetes-backend/baculak8s/jobs/job_pod_bacula.py +++ b/bacula/src/plugins/fd/kubernetes-backend/baculak8s/jobs/job_pod_bacula.py @@ -52,9 +52,13 @@ CANNOT_REMOVE_BACKUP_POD_ERR = "Cannot remove backup pod. Err={}" PVCDATA_GET_ERROR = "Cannot get PVC Data object Err={}" PVCCLONE_YAML_PREPARED_INFO = "Prepare snapshot: {namespace}/{snapname} storage: {storage} capacity: {capacity}" CANNOT_CREATE_PVC_CLONE_ERR = "Cannot create PVC snapshot. Err={}" +CANNOT_CREATE_VSNAPSHOT_ERR = "Cannot create Volume Snapshot. Err={}" +CANNOT_CREATE_PVC_SNAPSHOT_ERR = "Cannot create PVC from Volume Snapshot. Err={}" CANNOT_REMOVE_PVC_CLONE_ERR = "Cannot remove PVC snapshot. Err={}" +CANNOT_REMOVE_VSNAPSHOT_ERR = "Unable to remove volume snapshot {vsnapshot}! Please you must remove it manually." CANNOT_START_CONNECTIONSERVER = "Cannot start ConnectionServer. Err={}" +VSNAPSHOT_BACKUP_COMPATIBLE_INFO = "The pvc `{}` is compatible with volume snapshot backup. Doing backup with this tecnology." class JobPodBacula(Job, metaclass=ABCMeta): """ @@ -330,3 +334,35 @@ class JobPodBacula(Job, metaclass=ABCMeta): return None return clonename + + def handle_create_vsnapshot_backup(self, namespace, pvcdata): + """ + Manage operations to create snapshot and new pvc from this snapshot to do backup. 
+ """ + # Check if pvc is compatible with vsnapshot + if not self._plugin.check_vsnapshot_compatibility(pvcdata.get('storage_class_name')): + return None, pvcdata + + self._io.send_info(VSNAPSHOT_BACKUP_COMPATIBLE_INFO.format(pvcdata.get('name'))) + vsnapshot = self._plugin.create_vsnapshot(namespace, pvcdata.get('name')) + if isinstance(vsnapshot, dict) and 'error' in vsnapshot: + self._handle_error(CANNOT_CREATE_VSNAPSHOT_ERR.format(parse_json_descr(vsnapshot))) + return None, None + # Create pvc from volume snapshot + new_pvc = self._plugin.create_pvc_from_vsnapshot(namespace, pvcdata) + if isinstance(new_pvc, dict) and 'error' in new_pvc: + self._handle_error(CANNOT_CREATE_PVC_SNAPSHOT_ERR.format(parse_json_descr(new_pvc))) + return None, None + self._io.send_info(VSNAPSHOT_BACKUP_COMPATIBLE_INFO.format(new_pvc.get('name'))) + return vsnapshot, new_pvc + + def handle_delete_vsnapshot_backup(self, namespace, vsnapshot, pvcdata): + logging.debug('handle_delete_vsnapshot: {}/{}'.format(namespace, vsnapshot)) + if vsnapshot is None: + return None + response = self._plugin.remove_pvcclone(namespace, pvcdata.get('name')) + if isinstance(response, dict) and "error" in response: + return self._handle_error(CANNOT_REMOVE_PVC_CLONE_ERR.format(vsnapshot=vsnapshot.get('name'))) + response = self._plugin.remove_vsnapshot(namespace, vsnapshot.get('name')) + if isinstance(response, dict) and "error" in response: + return self._handle_error(CANNOT_REMOVE_VSNAPSHOT_ERR.format(vsnapshot=vsnapshot.get('name'))) diff --git a/bacula/src/plugins/fd/kubernetes-backend/baculak8s/plugins/kubernetes_plugin.py b/bacula/src/plugins/fd/kubernetes-backend/baculak8s/plugins/kubernetes_plugin.py index 2be65e7da..f3eb78c2f 100644 --- a/bacula/src/plugins/fd/kubernetes-backend/baculak8s/plugins/kubernetes_plugin.py +++ b/bacula/src/plugins/fd/kubernetes-backend/baculak8s/plugins/kubernetes_plugin.py @@ -32,6 +32,7 @@ from baculak8s.plugins import k8sbackend from 
baculak8s.plugins.k8sbackend.baculabackup import BACULABACKUPPODNAME from baculak8s.plugins.k8sbackend.baculaannotations import annotated_namespaced_pods_data from baculak8s.plugins.k8sbackend.configmaps import * +from baculak8s.plugins.k8sbackend.csi_snapshot import * from baculak8s.plugins.k8sbackend.daemonset import * from baculak8s.plugins.k8sbackend.deployment import * from baculak8s.plugins.k8sbackend.endpoints import * @@ -53,11 +54,11 @@ from baculak8s.plugins.k8sbackend.statefulset import * from baculak8s.plugins.k8sbackend.storageclass import * from baculak8s.plugins.plugin import * from baculak8s.util.date_util import gmt_to_unix_timestamp +from baculak8s.util.lambda_util import wait_until_resource_is_ready HTTP_NOT_FOUND = 404 K8S_POD_CONTAINER_STATUS_ERROR = "Pod status error! Reason: {}, Message: {}" - - +K8S_VOLUME_SNAPSHOT_STATUS_ERROR = "Volume status error! Reason: {}, Message: {}" class KubernetesPlugin(Plugin): """ Plugin that communicates with Kubernetes API @@ -184,6 +185,7 @@ class KubernetesPlugin(Plugin): self.corev1api = client.CoreV1Api(api_client=self.clientAPI) self.appsv1api = client.AppsV1Api(api_client=self.clientAPI) self.storagev1api = client.StorageV1Api(api_client=self.clientAPI) + self.crd_api = client.CustomObjectsApi(api_client=self.clientAPI) # To manage csi snapshots logging.getLogger(requests.packages.urllib3.__package__).setLevel(logging.ERROR) logging.getLogger(client.rest.__package__).setLevel(logging.ERROR) @@ -531,6 +533,9 @@ class KubernetesPlugin(Plugin): return self.__restore_k8s_object(file_info, file_content_source) # TODO: export/move all checks into k8sbackend + def check_vsnapshot_compatibility(self, storage_class_name): + return SNAPSHOT_DRIVER_COMPATIBLE in storage_class_name + def _check_config_map(self, file_info): return self.__exec_check_object( lambda: self.corev1api.read_namespaced_config_map(k8sfile2objname(file_info.name), file_info.namespace)) @@ -777,12 +782,68 @@ class KubernetesPlugin(Plugin): 
return response return {} + def vsnapshot_isready(self, namespace, snapshot_name): + response = self._vsnapshot_status(namespace, snapshot_name) + if isinstance(response, dict) and "error" in response: + return response + status = response.get('status') + logging.debug("vsnapshot_isready:ReadyToUse:{}".format(status)) + if status is None: + return False + return status.get('readyToUse') + + def create_vsnapshot(self, namespace, pvcname): + logging.info("Creating vsnapshot of pvc `{}`".format(pvcname)) + snapshot = prepare_create_snapshot_body(namespace, pvcname, self._params.get('jobid')) + response = self.__execute(lambda: self.crd_api.create_namespaced_custom_object(**snapshot, pretty=True)) + if isinstance(response, dict) and "error" in response: + return response + snapshot_name = snapshot.get("body").get("metadata").get("name") + logging.debug('Request create snapshot `{}` sent correctly'.format(snapshot_name)) + wait_until_resource_is_ready( + lambda: self.vsnapshot_isready(namespace, snapshot_name), + "Waiting create volume snapshot `{}` to be ready.".format(snapshot_name) + ) + logging.info("Created successfully vsnapshot of pvc `{}`".format(pvcname)) + return { + 'name': snapshot_name, + 'kind': snapshot.get('body').get('kind'), + 'source': pvcname, + 'namespace': namespace, + 'volume_snapshot_class_name': snapshot.get('body').get('spec').get('volumeSnapshotClassName') + } + + def create_pvc_from_vsnapshot(self, namespace, pvcdata): + new_pvc = prepare_pvc_from_vsnapshot_body(namespace, pvcdata, self._params.get('jobid')) + response = self.create_pvc_clone(namespace, new_pvc) + if isinstance(response, dict) and 'error' in response: + return response + response = self.get_pvcdata_namespaced(namespace, new_pvc.get('metadata').get('name')) + if isinstance(response, dict) and 'error' in response: + return response + logging.debug('PVC from vsnapshot Response: {}'.format(response)) + new_pvc_name = response.get('name') + wait_until_resource_is_ready( + lambda: 
self.pvc_status(namespace, new_pvc_name), + 'Waiting create pvc from snapshot `{}` to be ready'.format(new_pvc_name) + ) + return { + 'name': new_pvc_name, + 'node_name': response.get('node_name'), + 'storage_class_name': response.get('storage_class_name'), + 'capacity': response.get('capacity'), + 'fi': pvcdata.get('fi') + } + def backup_pod_status(self, namespace): return self.corev1api.read_namespaced_pod_status(name=BACULABACKUPPODNAME, namespace=namespace) def pvc_status(self, namespace, pvcname): return self.__execute(lambda: self.corev1api.read_namespaced_persistent_volume_claim_status(name=pvcname, namespace=namespace)) + def _vsnapshot_status(self, namespace, snapshot_name): + return self.__execute(lambda: self.crd_api.get_namespaced_custom_object_status(**prepare_snapshot_action(namespace, snapshot_name))) + def backup_pod_isready(self, namespace, seq=None, podname=BACULABACKUPPODNAME): pod = self.backup_pod_status(namespace) status = pod.status @@ -816,12 +877,25 @@ class KubernetesPlugin(Plugin): return {} def remove_pvcclone(self, namespace, clonename): - logging.debug('remove_pvcclone') + logging.debug('remove_pvcclone `{}`'.format(clonename)) response = self.__execute(lambda: self.corev1api.delete_namespaced_persistent_volume_claim( clonename, namespace, grace_period_seconds=0, propagation_policy='Foreground')) if isinstance(response, dict) and "error" in response: return response + r1 = self.get_pvcdata_namespaced(namespace, clonename) + wait_until_resource_is_ready(lambda: 'error' in self.get_pvcdata_namespaced(namespace, clonename)) + r2 = self.get_pvcdata_namespaced(namespace, clonename) + logging.debug('PVC removed `{}`'.format(clonename)) + return {} + + def remove_vsnapshot(self, namespace, vsnapshot_name): + logging.debug('remove_vsnapshot `{}`'.format(vsnapshot_name)) + response = self.__execute(lambda: self.crd_api.delete_namespaced_custom_object(**prepare_snapshot_action(namespace, vsnapshot_name))) + logging.debug('Response 
remove_vsnapshot {}'.format(response)) + if isinstance(response, dict) and "error" in response: + return response + logging.debug('Volume Snapshot removed `{}`'.format(vsnapshot_name)) return {} def check_gone_backup_pod(self, namespace, force=False): diff --git a/bacula/src/plugins/fd/kubernetes-backend/baculak8s/util/lambda_util.py b/bacula/src/plugins/fd/kubernetes-backend/baculak8s/util/lambda_util.py index bf7b752af..e24f0085a 100644 --- a/bacula/src/plugins/fd/kubernetes-backend/baculak8s/util/lambda_util.py +++ b/bacula/src/plugins/fd/kubernetes-backend/baculak8s/util/lambda_util.py @@ -15,8 +15,26 @@ # # Bacula(R) is a registered trademark of Kern Sibbald. +import logging +import time + +K8S_NUM_MAX_TRIES_ERROR = "Reached maximum number of tries! Message: {message}" +NUM_MAX_TRIES = 20 def apply(condition, iterable): # Helper method to map lambdas on iterables # Created just for readability return list(map(condition, iterable)) + + +def wait_until_resource_is_ready(action, error_msg = 'Reached num maximum tries.', sleep = 3): + logging.debug('Waiting until resource is ready') + tries = 0 + is_ready = False + while not is_ready: + is_ready = action() + if tries >= NUM_MAX_TRIES: + logging.debug("Reached num maximum tries.") + return { 'error': K8S_NUM_MAX_TRIES_ERROR.format(message=error_msg)} + time.sleep(sleep) + return {} \ No newline at end of file diff --git a/bacula/src/plugins/fd/kubernetes-backend/setup.py b/bacula/src/plugins/fd/kubernetes-backend/setup.py index cc28bab8e..0552413ee 100644 --- a/bacula/src/plugins/fd/kubernetes-backend/setup.py +++ b/bacula/src/plugins/fd/kubernetes-backend/setup.py @@ -24,9 +24,9 @@ if sys.version_info < (3, 0): setup( name='baculak8s', - version='2.0.2', - author='Radoslaw Korzeniewski', - author_email='radekk@korzeniewski.net', + version='2.1.0', + author='Francisco Manuel Garcia Botella', + author_email='francisco.garcia@baculasystems.com', packages=find_packages(exclude=('tests', 'tests.*')), # 
packages=packages, license="Bacula® - The Network Backup Solution", diff --git a/bacula/src/plugins/fd/kubernetes-fd.c b/bacula/src/plugins/fd/kubernetes-fd.c index 000e86dd1..3955a3b1a 100644 --- a/bacula/src/plugins/fd/kubernetes-fd.c +++ b/bacula/src/plugins/fd/kubernetes-fd.c @@ -18,10 +18,10 @@ */ /** * @file kubernetes-fd.c - * @author Radosław Korzeniewski (radoslaw@korzeniewski.net) + * @author Francisco Manuel Garcia Botella (francisco.garcia@baculasystems.com) * @brief This is a Bacula Kubernetes Plugin with metaplugin interface. - * @version 2.0.5 - * @date 2021-01-05 + * @version 2.1.0 + * @date 2023-07-31 * * @copyright Copyright (c) 2021 All rights reserved. * IP transferred to Bacula Systems according to agreement. @@ -31,10 +31,10 @@ /* Plugin Info definitions */ const char *PLUGIN_LICENSE = "Bacula AGPLv3"; -const char *PLUGIN_AUTHOR = "Radoslaw Korzeniewski"; -const char *PLUGIN_DATE = "April 2021"; -const char *PLUGIN_VERSION = "2.0.5"; // TODO: should synchronize with kubernetes-fd.json -const char *PLUGIN_DESCRIPTION = "Bacula Enterprise Kubernetes Plugin"; +const char *PLUGIN_AUTHOR = "Radoslaw Korzeniewski, Francisco Manuel Garcia Botella"; +const char *PLUGIN_DATE = "July 2023"; +const char *PLUGIN_VERSION = "2.1.0"; // TODO: should synchronize with kubernetes-fd.json +const char *PLUGIN_DESCRIPTION = "Bacula Kubernetes Plugin"; /* Plugin compile time variables */ const char *PLUGINPREFIX = "kubernetes:";