]> git.ipfire.org Git - thirdparty/bacula.git/commitdiff
kubernetes: Add Plugin Object support.
authorRadosław Korzeniewski <radoslaw@korzeniewski.net>
Fri, 11 Jun 2021 14:20:11 +0000 (16:20 +0200)
committerEric Bollengier <eric@baculasystems.com>
Thu, 14 Sep 2023 11:56:56 +0000 (13:56 +0200)
This patch adds PluginObject support to the Kubernetes Backend.
Backup job will create two Plugin Objects:
1. aggregated number of PODs backuped
2. aggregated number and size of PVCs backuped
I.e.
*llist objects objectid=59
       objectid: 59
          jobid: 2,212
           path: /@kubernetes/
       filename:
     pluginname: kubernetes: ns=plugintest debug verify_ssl=0
 objectcategory: Container
     objecttype: POD
     objectname: Kubernetes PODs
   objectsource: 192.168.2.140:6443
     objectuuid: 192.168.2.140:6443
     objectsize: 0
   objectstatus: U
    objectcount: 8

*llist objects objectid=60
       objectid: 60
          jobid: 2,212
           path: /@kubernetes/
       filename:
     pluginname: kubernetes: ns=plugintest debug verify_ssl=0
 objectcategory: Container
     objecttype: PVC
     objectname: Kubernetes Persistent Volume Claims
   objectsource: 192.168.2.140:6443
     objectuuid: 192.168.2.140:6443
     objectsize: 5,368,709,120
   objectstatus: U
    objectcount: 5

bacula/src/plugins/fd/kubernetes-backend/baculak8s/io/default_io.py
bacula/src/plugins/fd/kubernetes-backend/baculak8s/jobs/backup_job.py
bacula/src/plugins/fd/kubernetes-backend/baculak8s/jobs/estimation_job.py
bacula/src/plugins/fd/kubernetes-backend/baculak8s/jobs/job.py
bacula/src/plugins/fd/kubernetes-backend/baculak8s/plugins/k8sbackend/persistentvolumeclaims.py
bacula/src/plugins/fd/kubernetes-backend/baculak8s/plugins/kubernetes_plugin.py

index fa23fb29c50d4cece589ca228fe0ec0267d10463..b2c85e46e1a696643beb608ac2972fced996c200 100644 (file)
@@ -129,21 +129,38 @@ class DefaultIO(object):
             4 - A EOD packet
         """
 
-        full_file_name = info.name
-
-        self.send_command("FNAME:%s" % full_file_name)
+        self.send_command("FNAME:{}".format(info.name))
+        self.send_command("TSTAMP:{} {} {}".format(info.accessed_at, info.modified_at, info.created_at))
+        self.send_command("STAT:{} {} {} {} {} {}".format(info.type, info.size, info.uid, info.gid, info.mode, info.nlink))
+        self.send_eod()
 
-        timestamp_tuple = (info.accessed_at, info.modified_at, info.created_at)
-        self.send_command("TSTAMP:%s %s %s" % timestamp_tuple)
+    def send_plugin_object(self, podata):
+        """
+            Prints PluginObject class information into stdout
 
-        stat_tuple = (info.type, info.size, info.uid, info.gid, info.mode, info.nlink)
-        self.send_command("STAT:%s %s %s %s %s %s" % stat_tuple)
+        Args:
+            podata (class PluginObject): Entity representing information about a Plugin Object
+        """
+        self.send_command("PLUGINOBJ:{}".format(podata.path))
+        self.send_command("PLUGINOBJ_NAME:{}".format(podata.name))
+        if podata.cat is not None:
+            self.send_command("PLUGINOBJ_CAT:{}".format(podata.cat))
+        if podata.type is not None:
+            self.send_command("PLUGINOBJ_TYPE:{}".format(podata.type))
+        if podata.src is not None:
+            self.send_command("PLUGINOBJ_SRC:{}".format(podata.src))
+        if podata.uuid is not None:
+            self.send_command("PLUGINOBJ_UUID:{}".format(podata.uuid))
+        if podata.size is not None:
+            self.send_command("PLUGINOBJ_SIZE:{}".format(podata.size))
+        if podata.count is not None:
+            self.send_command("PLUGINOBJ_COUNT:{}".format(podata.count))
         self.send_eod()
 
     def send_query_response(self, response):
         key = response[0]
         value = response[1]
-        self.send_command(str(key)+"="+str(value))
+        self.send_command("{}={}".format(key, value))
 
     def read_line(self):
         """
index 1d11865998a1976a816d16d5dc19712ea8fbda42..b9988255f26d351ae3f0376236e073534f60cfed 100644 (file)
@@ -18,6 +18,7 @@
 import logging
 
 from baculak8s.entities.file_info import DIRECTORY
+from baculak8s.entities.plugin_object import PluginObject
 from baculak8s.io.packet_definitions import FILE_DATA_START
 from baculak8s.jobs.estimation_job import PVCDATA_GET_ERROR, EstimationJob
 from baculak8s.jobs.job_pod_bacula import DEFAULTRECVBUFFERSIZE
@@ -27,6 +28,7 @@ from baculak8s.plugins.k8sbackend.baculabackup import BACULABACKUPPODNAME
 from baculak8s.plugins.k8sbackend.podexec import ExecStatus, exec_commands
 from baculak8s.util.respbody import parse_json_descr
 from baculak8s.util.boolparam import BoolParam
+from baculak8s.plugins.k8sbackend.k8sfileinfo import defaultk8spath
 
 BACKUP_START_PACKET = "BackupStart"
 BACKUP_PARAM_LABELS = "Resource Selector: {}"
@@ -53,11 +55,45 @@ class BackupJob(EstimationJob):
             self._io.send_info(BACKUP_PARAM_LABELS.format(_label))
 
     def execution_loop(self):
-        return super().processing_loop(estimate=False)
+        super().processing_loop(estimate=False)
+        self.process_plugin_objects()
 
     def process_file(self, data):
         return self._backup_file(data)
 
+    def process_plugin_objects(self):
+        # logging.debug("SELF: {}".format(dir(self)))
+        """
+            name: plugin command line
+            category: Container
+            type: Kubernetes/Openshift POD
+            source: (name on the network?)
+            uuid: (I don't know if available)
+            size: total size POD would be nice
+            status: T/W/e
+            count: Number of POD
+        """
+        logging.debug("PO_PODS: {}".format(self._plugin.pods_counter))
+        po_pods = PluginObject("/{}/".format(defaultk8spath),
+                               "Kubernetes PODs",
+                               cat="Container",
+                               potype="POD",
+                               src=self._plugin.po_source_data,
+                               uuid=self._plugin.po_source_data,
+                               count=self._plugin.pods_counter)
+        self._io.send_plugin_object(po_pods)
+        logging.debug("PO_PVCS: {} {}".format(self._plugin.pvcs_counter, self._plugin.pvcs_totalsize))
+        po_pvcs = PluginObject("/{}/".format(defaultk8spath),
+                               "Kubernetes Persistent Volume Claims",
+                               cat="Container",
+                               potype="PVC",
+                               src=self._plugin.po_source_data,
+                               uuid=self._plugin.po_source_data,
+                               count=self._plugin.pvcs_counter,
+                               size=self._plugin.pvcs_totalsize)
+        logging.debug("PO_PVCS: {}".format(po_pvcs))
+        self._io.send_plugin_object(po_pvcs)
+
     def _backup_file(self, data):
         file_info = data.get('fi')
         super()._estimate_file(file_info)
index 5b1eca419b1c17d92f01f0588e447ff227012793..51faa335d730ccfe0393e3cffc85e2d67a8ae3f1 100644 (file)
@@ -101,7 +101,7 @@ class EstimationJob(JobPodBacula):
                 self._handle_error(NO_PV_FOUND)
 
             for pv in pv_list:
-                logging.debug('processing pv:' + str(pv))
+                logging.debug("processing pv: {}".format(pv))
                 self.process_file(pv_list.get(pv))
 
         ns_list = self._plugin.list_all_namespaces(estimate=estimate)
@@ -115,7 +115,7 @@ class EstimationJob(JobPodBacula):
 
             for nsname in ns_list:
                 ns = ns_list.get(nsname)
-                logging.debug('processing ns:{}'.format(ns))
+                logging.debug('processing ns: {}'.format(ns.get('name')))
                 if not estimate:
                     self._io.send_info(PROCESSING_NAMESPACE_INFO.format(namespace=ns['name']))
                 self.process_file(ns)
index 69927d7e7e18b9fe7ec8f9fb9d9820805bb1af6d..238d90582826a2d25095c11f35c33e9c3608ab01 100644 (file)
@@ -65,7 +65,7 @@ class Job(metaclass=ABCMeta):
         response = self._plugin.connect()
 
         if 'error' in response:
-            logging.debug("response data:" + str(response))
+            logging.debug("response data:{}".format(response))
             if 'error_code' in response:
                 self._io.send_connection_error(response['error_code'])
             else:
index 9d4b2b6381d579b0488a4c690ba3ddcf5c9a81b6..748521779d7bb90c994dcaf4d984b1059ac9a06c 100644 (file)
@@ -25,6 +25,7 @@
 import logging
 
 import kubernetes
+from baculak8s.util.size_util import k8s_size_to_int
 from baculak8s.entities.file_info import NOT_EMPTY_FILE
 from baculak8s.plugins.k8sbackend.k8sfileinfo import (K8SObjType, encoder_dump,
                                                       encoder_load,
@@ -53,11 +54,15 @@ def persistentvolumeclaims_namespaced_names(corev1api, namespace, labels=""):
 
 def persistentvolumeclaims_list_namespaced(corev1api, namespace, estimate=False, labels=""):
     pvcslist = {}
+    pvcstotalsize = 0
     pvcs = corev1api.list_namespaced_persistent_volume_claim(namespace=namespace, watch=False, label_selector=labels)
     for pvc in pvcs.items:
         pvcdata = persistentvolumeclaims_read_namespaced(corev1api, namespace, pvc.metadata.name)
         spec = encoder_dump(pvcdata)
         # logging.debug("PVCDATA-OBJ:{}".format(pvcdata))
+        pvcsize = k8s_size_to_int(pvcdata.status.capacity['storage'])
+        pvcstotalsize += pvcsize
+        # logging.debug("PVCDATA-SIZE:{} {}".format(pvcdata.status.capacity['storage'], pvcsize))
         # logging.debug("PVCDATA-ENC:{}".format(spec))
         pvcslist['pvc-' + pvc.metadata.name] = {
             'spec': spec if not estimate else None,
@@ -67,7 +72,7 @@ def persistentvolumeclaims_list_namespaced(corev1api, namespace, estimate=False,
                               size=len(spec),
                               creation_timestamp=pvcdata.metadata.creation_timestamp),
         }
-    return pvcslist
+    return pvcslist, pvcstotalsize
 
 
 def persistentvolumeclaims_restore_namespaced(corev1api, file_info, file_content):
index 5d502190be83ee9bcdb6fd24b77d5925cdab19f8..1c52ec6df0d676600159a7cc9cd54a41e35aa13e 100644 (file)
@@ -123,6 +123,10 @@ class KubernetesPlugin(Plugin):
             K8SObjType.K8SOBJ_PVCDATA: {},
         }
         self._connected = False
+        self.pods_counter = 0
+        self.pvcs_counter = 0
+        self.pvcs_totalsize = 0
+        self.po_source_data = "<notset>"
 
     def connect(self):
         """
@@ -204,6 +208,9 @@ class KubernetesPlugin(Plugin):
                 data = {}
             else:
                 data = {'response': response}
+                response = self.__execute(lambda: self.coreapi.get_api_versions(), check_connection=False)
+                if not (isinstance(response, dict) and "error" in response):
+                    self.po_source_data = response.server_address_by_client_cid_rs[0].server_address
             return data
 
     def disconnect(self):
@@ -328,12 +335,20 @@ class KubernetesPlugin(Plugin):
                                                                 self.config['labels']))
 
     def get_pods(self, namespace, estimate=False):
-        return self.__execute(lambda: pods_list_namespaced(self.corev1api, namespace, estimate, self.config['labels']))
+        pods = self.__execute(lambda: pods_list_namespaced(self.corev1api, namespace, estimate, self.config['labels']))
+        nrpods = len(pods)
+        logging.debug("get_pods[{}]:pods:{}".format(namespace, nrpods))
+        self.pods_counter += nrpods
+        return pods
 
     def get_pvcs(self, namespace, estimate=False):
-        pvcs = self.__execute(lambda: persistentvolumeclaims_list_namespaced(self.corev1api, namespace, estimate,
+        pvcs, totalsize = self.__execute(lambda: persistentvolumeclaims_list_namespaced(self.corev1api, namespace, estimate,
                                                                              self.config['labels']))
         self.k8s['pvcs'] = pvcs
+        nrpvcs = len(pvcs)
+        logging.debug("get_pvcs[{}]:pvcs:{}".format(namespace, nrpvcs))
+        self.pvcs_counter += nrpvcs
+        self.pvcs_totalsize += totalsize
         return pvcs
 
     def get_podtemplates(self, namespace, estimate=False):