]> git.ipfire.org Git - thirdparty/bacula.git/commitdiff
k8s: Add compatibility with csi volume snapshots backup
authorfrancisco.garcia <francisco.garcia@baculasystems.com>
Mon, 31 Jul 2023 11:40:59 +0000 (13:40 +0200)
committerEric Bollengier <eric@baculasystems.com>
Thu, 14 Sep 2023 11:57:01 +0000 (13:57 +0200)
This feature makes backups more consistent by using volume snapshots: it creates a snapshot of the PVC before doing the backup.

bacula/src/plugins/fd/kubernetes-backend/baculak8s/jobs/backup_job.py
bacula/src/plugins/fd/kubernetes-backend/baculak8s/jobs/job_pod_bacula.py
bacula/src/plugins/fd/kubernetes-backend/baculak8s/plugins/kubernetes_plugin.py
bacula/src/plugins/fd/kubernetes-backend/baculak8s/util/lambda_util.py
bacula/src/plugins/fd/kubernetes-backend/setup.py
bacula/src/plugins/fd/kubernetes-fd.c

index 35ed43a137e3d72564125d4139fd6489747cb4de..0edb2f3fbef1eafa99508dbf1ab31e7db3b1fbea 100644 (file)
@@ -128,6 +128,10 @@ class BackupJob(EstimationJob):
 
     def process_pvcdata(self, namespace, pvcdata):
         status = None
+        # Detect if pvcdata is compatible with snapshots
+        vsnapshot, pvcdata = self.handle_create_vsnapshot_backup(namespace, pvcdata)
+
+        logging.debug('Process_pvcdata (Backup_job): {} {}'.format(vsnapshot, pvcdata))
         if self.prepare_bacula_pod(pvcdata, namespace=namespace, mode='backup'):
             super()._estimate_file(pvcdata)     # here to send info about pvcdata to plugin
             status = self.__backup_pvcdata(namespace=namespace)
@@ -135,6 +139,8 @@ class BackupJob(EstimationJob):
                 self._io.send_eod()
                 self.handle_tarstderr()
             self.handle_delete_pod(namespace=namespace)
+        # Both prepare_bacula_pod fails or not, we must remove snapshot and pvc
+        self.handle_delete_vsnapshot_backup(namespace, vsnapshot, pvcdata)
         return status
 
     def handle_pod_container_exec_command(self, corev1api, namespace, pod, runjobparam, failonerror=False):
index f0b710ac841160e71ebe8688e61a68b920e631e9..71f1d0449d9131a957f87d91bb434dbdeeb3c07f 100644 (file)
@@ -52,9 +52,13 @@ CANNOT_REMOVE_BACKUP_POD_ERR = "Cannot remove backup pod. Err={}"
 PVCDATA_GET_ERROR = "Cannot get PVC Data object Err={}"
 PVCCLONE_YAML_PREPARED_INFO = "Prepare snapshot: {namespace}/{snapname} storage: {storage} capacity: {capacity}"
 CANNOT_CREATE_PVC_CLONE_ERR = "Cannot create PVC snapshot. Err={}"
+CANNOT_CREATE_VSNAPSHOT_ERR = "Cannot create Volume Snapshot. Err={}"
+CANNOT_CREATE_PVC_SNAPSHOT_ERR = "Cannot create PVC from Volume Snapshot. Err={}"
 CANNOT_REMOVE_PVC_CLONE_ERR = "Cannot remove PVC snapshot. Err={}"
+CANNOT_REMOVE_VSNAPSHOT_ERR = "Unable to remove volume snapshot {vsnapshot}! Please you must remove it manually."
 CANNOT_START_CONNECTIONSERVER = "Cannot start ConnectionServer. Err={}"
 
+VSNAPSHOT_BACKUP_COMPATIBLE_INFO = "The pvc `{}` is compatible with volume snapshot backup. Doing backup with this tecnology."
 
 class JobPodBacula(Job, metaclass=ABCMeta):
     """
@@ -330,3 +334,35 @@ class JobPodBacula(Job, metaclass=ABCMeta):
                 return None
 
         return clonename
+
+    def handle_create_vsnapshot_backup(self, namespace, pvcdata):
+        """
+        Create a volume snapshot of a PVC and a new PVC from that snapshot to back up.
+
+        :param namespace: namespace the PVC lives in
+        :param pvcdata: dict describing the PVC; `storage_class_name` and `name` are read here
+        :return: a `(vsnapshot, pvcdata)` tuple:
+            `(None, pvcdata)` with the unchanged pvcdata when the storage class is not
+            compatible with volume snapshots,
+            `(None, None)` when creating the snapshot or the PVC failed (the error is
+            reported through `_handle_error`),
+            `(vsnapshot, new_pvc)` on success.
+        """
+        # Check if pvc is compatible with vsnapshot
+        if not self._plugin.check_vsnapshot_compatibility(pvcdata.get('storage_class_name')):
+            return None, pvcdata
+        pvcname = pvcdata.get('name')
+        self._io.send_info(VSNAPSHOT_BACKUP_COMPATIBLE_INFO.format(pvcname))
+        vsnapshot = self._plugin.create_vsnapshot(namespace, pvcname)
+        if isinstance(vsnapshot, dict) and 'error' in vsnapshot:
+            self._handle_error(CANNOT_CREATE_VSNAPSHOT_ERR.format(parse_json_descr(vsnapshot)))
+            return None, None
+        # Create pvc from volume snapshot
+        new_pvc = self._plugin.create_pvc_from_vsnapshot(namespace, pvcdata)
+        if isinstance(new_pvc, dict) and 'error' in new_pvc:
+            self._handle_error(CANNOT_CREATE_PVC_SNAPSHOT_ERR.format(parse_json_descr(new_pvc)))
+            # handle_delete_vsnapshot_backup() does nothing when it receives no snapshot,
+            # so the snapshot created above has to be removed here or it would be leaked.
+            response = self._plugin.remove_vsnapshot(namespace, vsnapshot.get('name'))
+            if isinstance(response, dict) and 'error' in response:
+                self._handle_error(CANNOT_REMOVE_VSNAPSHOT_ERR.format(vsnapshot=vsnapshot.get('name')))
+            return None, None
+        self._io.send_info(VSNAPSHOT_BACKUP_COMPATIBLE_INFO.format(new_pvc.get('name')))
+        return vsnapshot, new_pvc
+
+    def handle_delete_vsnapshot_backup(self, namespace, vsnapshot, pvcdata):
+        """
+        Remove the PVC and the volume snapshot that were created for a snapshot backup.
+
+        :param namespace: namespace of the PVC and the snapshot
+        :param vsnapshot: snapshot dict (its `name` is read) or None when no snapshot was made
+        :param pvcdata: dict of the PVC to remove (its `name` is read)
+        :return: None when there is nothing to do or everything was removed,
+            otherwise the result of `_handle_error` for the first failing removal
+        """
+        logging.debug('handle_delete_vsnapshot: {}/{}'.format(namespace, vsnapshot))
+        if vsnapshot is None:
+            return None
+        response = self._plugin.remove_pvcclone(namespace, pvcdata.get('name'))
+        if isinstance(response, dict) and "error" in response:
+            # CANNOT_REMOVE_PVC_CLONE_ERR uses a positional `{}` placeholder, so it must be
+            # formatted with a positional argument; a keyword-only call raises IndexError.
+            # NOTE(review): the volume snapshot is not removed on this path - confirm this is intended.
+            return self._handle_error(CANNOT_REMOVE_PVC_CLONE_ERR.format(parse_json_descr(response)))
+        response = self._plugin.remove_vsnapshot(namespace, vsnapshot.get('name'))
+        if isinstance(response, dict) and "error" in response:
+            return self._handle_error(CANNOT_REMOVE_VSNAPSHOT_ERR.format(vsnapshot=vsnapshot.get('name')))
+        return None
index 2be65e7da9a937d5eb6cbae4cf165d05e71d3035..f3eb78c2fb78f744a0e0e7fb93e1857e280a777a 100644 (file)
@@ -32,6 +32,7 @@ from baculak8s.plugins import k8sbackend
 from baculak8s.plugins.k8sbackend.baculabackup import BACULABACKUPPODNAME
 from baculak8s.plugins.k8sbackend.baculaannotations import annotated_namespaced_pods_data
 from baculak8s.plugins.k8sbackend.configmaps import *
+from baculak8s.plugins.k8sbackend.csi_snapshot import *
 from baculak8s.plugins.k8sbackend.daemonset import *
 from baculak8s.plugins.k8sbackend.deployment import *
 from baculak8s.plugins.k8sbackend.endpoints import *
@@ -53,11 +54,11 @@ from baculak8s.plugins.k8sbackend.statefulset import *
 from baculak8s.plugins.k8sbackend.storageclass import *
 from baculak8s.plugins.plugin import *
 from baculak8s.util.date_util import gmt_to_unix_timestamp
+from baculak8s.util.lambda_util import wait_until_resource_is_ready
 
 HTTP_NOT_FOUND = 404
 K8S_POD_CONTAINER_STATUS_ERROR = "Pod status error! Reason: {}, Message: {}"
-
-
+K8S_VOLUME_SNAPSHOT_STATUS_ERROR = "Volume status error! Reason: {}, Message: {}"
 class KubernetesPlugin(Plugin):
     """
         Plugin that communicates with Kubernetes API
@@ -184,6 +185,7 @@ class KubernetesPlugin(Plugin):
         self.corev1api = client.CoreV1Api(api_client=self.clientAPI)
         self.appsv1api = client.AppsV1Api(api_client=self.clientAPI)
         self.storagev1api = client.StorageV1Api(api_client=self.clientAPI)
+        self.crd_api = client.CustomObjectsApi(api_client=self.clientAPI) # To manage csi snapshots
 
         logging.getLogger(requests.packages.urllib3.__package__).setLevel(logging.ERROR)
         logging.getLogger(client.rest.__package__).setLevel(logging.ERROR)
@@ -531,6 +533,9 @@ class KubernetesPlugin(Plugin):
             return self.__restore_k8s_object(file_info, file_content_source)
 
     # TODO: export/move all checks into k8sbackend
+    def check_vsnapshot_compatibility(self, storage_class_name):
+        """
+        Tell whether a storage class can be backed up through volume snapshots.
+
+        :param storage_class_name: storage class name of the PVC, or None when the PVC has none
+            (presumably a string - confirm against the callers)
+        :return: False for None, otherwise whether SNAPSHOT_DRIVER_COMPATIBLE is contained
+            in `storage_class_name`
+        """
+        # a PVC without storage class yields None, and `in None` would raise TypeError
+        if storage_class_name is None:
+            return False
+        return SNAPSHOT_DRIVER_COMPATIBLE in storage_class_name
+
     def _check_config_map(self, file_info):
         return self.__exec_check_object(
             lambda: self.corev1api.read_namespaced_config_map(k8sfile2objname(file_info.name), file_info.namespace))
@@ -777,12 +782,68 @@ class KubernetesPlugin(Plugin):
             return response
         return {}
 
+    def vsnapshot_isready(self, namespace, snapshot_name):
+        """
+        Report whether a volume snapshot is ready to use.
+
+        :param namespace: namespace of the snapshot
+        :param snapshot_name: name of the snapshot
+        :return: the error response when reading the status failed, False when the
+            snapshot has no `status` yet, otherwise the value of `status.readyToUse`
+        """
+        snapshot = self._vsnapshot_status(namespace, snapshot_name)
+        failed = isinstance(snapshot, dict) and "error" in snapshot
+        if failed:
+            return snapshot
+        snapshot_status = snapshot.get('status')
+        logging.debug("vsnapshot_isready:ReadyToUse:{}".format(snapshot_status))
+        return False if snapshot_status is None else snapshot_status.get('readyToUse')
+
+    def create_vsnapshot(self, namespace, pvcname):
+        """
+        Create a volume snapshot of a PVC and wait until it is ready to use.
+
+        :param namespace: namespace of the PVC
+        :param pvcname: name of the PVC to snapshot
+        :return: a dict with an `error` key when the create request or the wait failed,
+            otherwise a dict with `name`, `kind`, `source`, `namespace` and
+            `volume_snapshot_class_name` describing the snapshot
+        """
+        logging.info("Creating vsnapshot of pvc `{}`".format(pvcname))
+        snapshot = prepare_create_snapshot_body(namespace, pvcname, self._params.get('jobid'))
+        response = self.__execute(lambda: self.crd_api.create_namespaced_custom_object(**snapshot, pretty=True))
+        if isinstance(response, dict) and "error" in response:
+            return response
+        body = snapshot.get("body")
+        snapshot_name = body.get("metadata").get("name")
+        logging.debug('Request create snapshot `{}` sent correctly'.format(snapshot_name))
+        ready = wait_until_resource_is_ready(
+            lambda: self.vsnapshot_isready(namespace, snapshot_name),
+            "Waiting create volume snapshot `{}` to be ready.".format(snapshot_name)
+        )
+        if isinstance(ready, dict) and "error" in ready:
+            # The snapshot never became ready: do not report success. The caller gets no
+            # snapshot name on error, so remove the unusable snapshot here (best effort).
+            logging.debug('Volume snapshot `{}` is not ready: {}'.format(snapshot_name, ready))
+            self.remove_vsnapshot(namespace, snapshot_name)
+            return ready
+        logging.info("Created successfully vsnapshot of pvc `{}`".format(pvcname))
+        return {
+            'name': snapshot_name,
+            'kind': body.get('kind'),
+            'source': pvcname,
+            'namespace': namespace,
+            'volume_snapshot_class_name': body.get('spec').get('volumeSnapshotClassName')
+        }
+
+    def create_pvc_from_vsnapshot(self, namespace, pvcdata):
+        """
+        Create a new PVC from a volume snapshot and return its description.
+
+        :param namespace: namespace to create the PVC in
+        :param pvcdata: dict of the source PVC; passed to the body builder, `fi` is copied
+            into the result
+        :return: the error response when creating or reading the PVC failed, otherwise
+            a dict with `name`, `node_name`, `storage_class_name`, `capacity` and `fi`
+        """
+        pvc_body = prepare_pvc_from_vsnapshot_body(namespace, pvcdata, self._params.get('jobid'))
+        created = self.create_pvc_clone(namespace, pvc_body)
+        if isinstance(created, dict) and 'error' in created:
+            return created
+        requested_name = pvc_body.get('metadata').get('name')
+        details = self.get_pvcdata_namespaced(namespace, requested_name)
+        if isinstance(details, dict) and 'error' in details:
+            return details
+        logging.debug('PVC from vsnapshot Response: {}'.format(details))
+        new_pvc_name = details.get('name')
+        # NOTE(review): pvc_status() returns the API response rather than a readiness flag,
+        # so this wait presumably ends on the first poll - confirm the intended condition.
+        wait_until_resource_is_ready(
+            lambda: self.pvc_status(namespace, new_pvc_name),
+            'Waiting create pvc from snapshot `{}` to be ready'.format(new_pvc_name)
+        )
+        result = {'name': new_pvc_name}
+        for key in ('node_name', 'storage_class_name', 'capacity'):
+            result[key] = details.get(key)
+        result['fi'] = pvcdata.get('fi')
+        return result
+
     def backup_pod_status(self, namespace):
         return self.corev1api.read_namespaced_pod_status(name=BACULABACKUPPODNAME, namespace=namespace)
 
     def pvc_status(self, namespace, pvcname):
         return self.__execute(lambda: self.corev1api.read_namespaced_persistent_volume_claim_status(name=pvcname, namespace=namespace))
 
+    def _vsnapshot_status(self, namespace, snapshot_name):
+        """
+        Read the status of a volume snapshot custom object.
+
+        :param namespace: namespace of the snapshot
+        :param snapshot_name: name of the snapshot
+        :return: whatever `__execute` returns for the status request
+        """
+        def read_status():
+            request = prepare_snapshot_action(namespace, snapshot_name)
+            return self.crd_api.get_namespaced_custom_object_status(**request)
+        return self.__execute(read_status)
+
     def backup_pod_isready(self, namespace, seq=None, podname=BACULABACKUPPODNAME):
         pod = self.backup_pod_status(namespace)
         status = pod.status
@@ -816,12 +877,25 @@ class KubernetesPlugin(Plugin):
         return {}
 
     def remove_pvcclone(self, namespace, clonename):
-        logging.debug('remove_pvcclone')
+        logging.debug('remove_pvcclone `{}`'.format(clonename))
         response = self.__execute(lambda: self.corev1api.delete_namespaced_persistent_volume_claim(
             clonename, namespace, grace_period_seconds=0,
             propagation_policy='Foreground'))
         if isinstance(response, dict) and "error" in response:
             return response
+        r1 = self.get_pvcdata_namespaced(namespace, clonename)
+        wait_until_resource_is_ready(lambda: 'error' in self.get_pvcdata_namespaced(namespace, clonename))
+        r2 = self.get_pvcdata_namespaced(namespace, clonename)
+        logging.debug('PVC removed `{}`'.format(clonename))
+        return {}
+
+    def remove_vsnapshot(self, namespace, vsnapshot_name):
+        """
+        Delete a volume snapshot custom object.
+
+        :param namespace: namespace of the snapshot
+        :param vsnapshot_name: name of the snapshot to delete
+        :return: the error response when the delete request failed, otherwise `{}`
+        """
+        logging.debug('remove_vsnapshot `{}`'.format(vsnapshot_name))
+        def delete_snapshot():
+            request = prepare_snapshot_action(namespace, vsnapshot_name)
+            return self.crd_api.delete_namespaced_custom_object(**request)
+        outcome = self.__execute(delete_snapshot)
+        logging.debug('Response remove_vsnapshot {}'.format(outcome))
+        failed = isinstance(outcome, dict) and "error" in outcome
+        if failed:
+            return outcome
+        logging.debug('Volume Snapshot removed `{}`'.format(vsnapshot_name))
         return {}
 
     def check_gone_backup_pod(self, namespace, force=False):
index bf7b752af06930572ac76fa252b3f90cb736f576..e24f0085a9bdeb8a739d2085fa676d438f89e818 100644 (file)
 #
 #   Bacula(R) is a registered trademark of Kern Sibbald.
 
+import logging
+import time
+
+K8S_NUM_MAX_TRIES_ERROR = "Reached maximum number of tries! Message: {message}"
+NUM_MAX_TRIES = 20
 
 def apply(condition, iterable):
     # Helper method to map lambdas on iterables
     # Created just for readability
     return list(map(condition, iterable))
+
+
+def wait_until_resource_is_ready(action, error_msg = 'Reached num maximum tries.', sleep = 3):
+    """
+    Poll `action` until it reports readiness, giving up after NUM_MAX_TRIES polls.
+
+    :param action: callable without arguments; a truthy result means ready, a dict
+        containing an `error` key aborts the wait
+    :param error_msg: text embedded in the error returned when the tries run out
+    :param sleep: seconds to sleep between two polls
+    :return: `{}` when the resource is ready, the error dict produced by `action`,
+        or a dict with an `error` key when the maximum number of tries is reached
+    """
+    logging.debug('Waiting until resource is ready')
+    tries = 0
+    while True:
+        result = action()
+        if isinstance(result, dict) and 'error' in result:
+            # an error response is not readiness: hand it back to the caller
+            return result
+        if result:
+            return {}
+        # the counter has to advance, otherwise a resource that never becomes
+        # ready would keep this loop running forever
+        tries += 1
+        if tries >= NUM_MAX_TRIES:
+            logging.debug("Reached num maximum tries.")
+            return {'error': K8S_NUM_MAX_TRIES_ERROR.format(message=error_msg)}
+        time.sleep(sleep)
\ No newline at end of file
index cc28bab8e4b5af9b468431dea5a3681196ecf56b..0552413eeded41c8da2f76f712d0a3f433350c6a 100644 (file)
@@ -24,9 +24,9 @@ if sys.version_info < (3, 0):
 
 setup(
     name='baculak8s',
-    version='2.0.2',
-    author='Radoslaw Korzeniewski',
-    author_email='radekk@korzeniewski.net',
+    version='2.1.0',
+    author='Francisco Manuel Garcia Botella',
+    author_email='francisco.garcia@baculasystems.com',
     packages=find_packages(exclude=('tests', 'tests.*')),
     # packages=packages,
     license="Bacula® - The Network Backup Solution",
index 000e86dd10c4e21e6035aecbe3ee180049125f4a..3955a3b1a7b50c549bd321d6c99af3cc5f56cdfc 100644 (file)
 */
 /**
  * @file kubernetes-fd.c
- * @author Radosław Korzeniewski (radoslaw@korzeniewski.net)
+ * @author Francisco Manuel Garcia Botella (francisco.garcia@baculasystems.com)
  * @brief This is a Bacula Kubernetes Plugin with metaplugin interface.
- * @version 2.0.5
- * @date 2021-01-05
+ * @version 2.1.0
+ * @date 2023-07-31
  *
  * @copyright Copyright (c) 2021 All rights reserved.
  *            IP transferred to Bacula Systems according to agreement.
 
 /* Plugin Info definitions */
 const char *PLUGIN_LICENSE       = "Bacula AGPLv3";
-const char *PLUGIN_AUTHOR        = "Radoslaw Korzeniewski";
-const char *PLUGIN_DATE          = "April 2021";
-const char *PLUGIN_VERSION       = "2.0.5"; // TODO: should synchronize with kubernetes-fd.json
-const char *PLUGIN_DESCRIPTION   = "Bacula Enterprise Kubernetes Plugin";
+const char *PLUGIN_AUTHOR        = "Radoslaw Korzeniewski, Francisco Manuel Garcia Botella";
+const char *PLUGIN_DATE          = "July 2023";
+const char *PLUGIN_VERSION       = "2.1.0"; // TODO: should synchronize with kubernetes-fd.json
+const char *PLUGIN_DESCRIPTION   = "Bacula Kubernetes Plugin";
 
 /* Plugin compile time variables */
 const char *PLUGINPREFIX         = "kubernetes:";