From: Radosław Korzeniewski Date: Fri, 11 Jun 2021 14:20:11 +0000 (+0200) Subject: kubernetes: Add Plugin Object support. X-Git-Tag: Beta-15.0.0~756 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=0c4534e64fb41b4bf0026e0d1220fd7151c0bf04;p=thirdparty%2Fbacula.git kubernetes: Add Plugin Object support. This patch adds PluginObject support to the Kubernetes Backend. Backup job will create two Plugin Objects: 1. aggregated number of PODs backuped 2. aggregated number and size of PVCs backuped I.e. *llist objects objectid=59 objectid: 59 jobid: 2,212 path: /@kubernetes/ filename: pluginname: kubernetes: ns=plugintest debug verify_ssl=0 objectcategory: Container objecttype: POD objectname: Kubernetes PODs objectsource: 192.168.2.140:6443 objectuuid: 192.168.2.140:6443 objectsize: 0 objectstatus: U objectcount: 8 *llist objects objectid=60 objectid: 60 jobid: 2,212 path: /@kubernetes/ filename: pluginname: kubernetes: ns=plugintest debug verify_ssl=0 objectcategory: Container objecttype: PVC objectname: Kubernetes Persistent Volume Claims objectsource: 192.168.2.140:6443 objectuuid: 192.168.2.140:6443 objectsize: 5,368,709,120 objectstatus: U objectcount: 5 --- diff --git a/bacula/src/plugins/fd/kubernetes-backend/baculak8s/io/default_io.py b/bacula/src/plugins/fd/kubernetes-backend/baculak8s/io/default_io.py index fa23fb29c..b2c85e46e 100644 --- a/bacula/src/plugins/fd/kubernetes-backend/baculak8s/io/default_io.py +++ b/bacula/src/plugins/fd/kubernetes-backend/baculak8s/io/default_io.py @@ -129,21 +129,38 @@ class DefaultIO(object): 4 - A EOD packet """ - full_file_name = info.name - - self.send_command("FNAME:%s" % full_file_name) + self.send_command("FNAME:{}".format(info.name)) + self.send_command("TSTAMP:{} {} {}".format(info.accessed_at, info.modified_at, info.created_at)) + self.send_command("STAT:{} {} {} {} {} {}".format(info.type, info.size, info.uid, info.gid, info.mode, info.nlink)) + self.send_eod() - timestamp_tuple = (info.accessed_at, info.modified_at, info.created_at) - self.send_command("TSTAMP:%s %s %s" % timestamp_tuple) + def send_plugin_object(self, podata): + """ + Prints PluginObject class information into stdout - stat_tuple = (info.type, info.size, info.uid, info.gid, info.mode, info.nlink) - self.send_command("STAT:%s %s %s %s %s %s" % stat_tuple) + Args: + podata (class PluginObject): Entity representing information about a Plugin Object + """ + self.send_command("PLUGINOBJ:{}".format(podata.path)) + self.send_command("PLUGINOBJ_NAME:{}".format(podata.name)) + if podata.cat is not None: + self.send_command("PLUGINOBJ_CAT:{}".format(podata.cat)) + if podata.type is not None: + self.send_command("PLUGINOBJ_TYPE:{}".format(podata.type)) + if podata.src is not None: + self.send_command("PLUGINOBJ_SRC:{}".format(podata.src)) + if podata.uuid is not None: + self.send_command("PLUGINOBJ_UUID:{}".format(podata.uuid)) + if podata.size is not None: + self.send_command("PLUGINOBJ_SIZE:{}".format(podata.size)) + if podata.count is not None: + self.send_command("PLUGINOBJ_COUNT:{}".format(podata.count)) self.send_eod() def send_query_response(self, response): key = response[0] value = response[1] - self.send_command(str(key)+"="+str(value)) + self.send_command("{}={}".format(key, value)) def read_line(self): """ diff --git a/bacula/src/plugins/fd/kubernetes-backend/baculak8s/jobs/backup_job.py b/bacula/src/plugins/fd/kubernetes-backend/baculak8s/jobs/backup_job.py index 1d1186599..b9988255f 100644 --- a/bacula/src/plugins/fd/kubernetes-backend/baculak8s/jobs/backup_job.py +++ b/bacula/src/plugins/fd/kubernetes-backend/baculak8s/jobs/backup_job.py @@ -18,6 +18,7 @@ import logging from baculak8s.entities.file_info import DIRECTORY +from baculak8s.entities.plugin_object import PluginObject from baculak8s.io.packet_definitions import FILE_DATA_START from baculak8s.jobs.estimation_job import PVCDATA_GET_ERROR, EstimationJob from baculak8s.jobs.job_pod_bacula import DEFAULTRECVBUFFERSIZE @@ -27,6 +28,7 @@ from baculak8s.plugins.k8sbackend.baculabackup import BACULABACKUPPODNAME from baculak8s.plugins.k8sbackend.podexec import ExecStatus, exec_commands from baculak8s.util.respbody import parse_json_descr from baculak8s.util.boolparam import BoolParam +from baculak8s.plugins.k8sbackend.k8sfileinfo import defaultk8spath BACKUP_START_PACKET = "BackupStart" BACKUP_PARAM_LABELS = "Resource Selector: {}" @@ -53,11 +55,45 @@ class BackupJob(EstimationJob): self._io.send_info(BACKUP_PARAM_LABELS.format(_label)) def execution_loop(self): - return super().processing_loop(estimate=False) + super().processing_loop(estimate=False) + self.process_plugin_objects() def process_file(self, data): return self._backup_file(data) + def process_plugin_objects(self): + # logging.debug("SELF: {}".format(dir(self))) + """ + name: plugin command line + category: Container + type: Kubernetes/Openshift POD + source: (name on the network?) + uuid: (I don't know if available) + size: total size POD would be nice + status: T/W/e + count: Number of POD + """ + logging.debug("PO_PODS: {}".format(self._plugin.pods_counter)) + po_pods = PluginObject("/{}/".format(defaultk8spath), + "Kubernetes PODs", + cat="Container", + potype="POD", + src=self._plugin.po_source_data, + uuid=self._plugin.po_source_data, + count=self._plugin.pods_counter) + self._io.send_plugin_object(po_pods) + logging.debug("PO_PVCS: {} {}".format(self._plugin.pvcs_counter, self._plugin.pvcs_totalsize)) + po_pvcs = PluginObject("/{}/".format(defaultk8spath), + "Kubernetes Persistent Volume Claims", + cat="Container", + potype="PVC", + src=self._plugin.po_source_data, + uuid=self._plugin.po_source_data, + count=self._plugin.pvcs_counter, + size=self._plugin.pvcs_totalsize) + logging.debug("PO_PVCS: {}".format(po_pvcs)) + self._io.send_plugin_object(po_pvcs) + def _backup_file(self, data): file_info = data.get('fi') super()._estimate_file(file_info) diff --git a/bacula/src/plugins/fd/kubernetes-backend/baculak8s/jobs/estimation_job.py b/bacula/src/plugins/fd/kubernetes-backend/baculak8s/jobs/estimation_job.py index 5b1eca419..51faa335d 100644 --- a/bacula/src/plugins/fd/kubernetes-backend/baculak8s/jobs/estimation_job.py +++ b/bacula/src/plugins/fd/kubernetes-backend/baculak8s/jobs/estimation_job.py @@ -101,7 +101,7 @@ class EstimationJob(JobPodBacula): self._handle_error(NO_PV_FOUND) for pv in pv_list: - logging.debug('processing pv:' + str(pv)) + logging.debug("processing pv: {}".format(pv)) self.process_file(pv_list.get(pv)) ns_list = self._plugin.list_all_namespaces(estimate=estimate) @@ -115,7 +115,7 @@ class EstimationJob(JobPodBacula): for nsname in ns_list: ns = ns_list.get(nsname) - logging.debug('processing ns:{}'.format(ns)) + logging.debug('processing ns: {}'.format(ns.get('name'))) if not estimate: self._io.send_info(PROCESSING_NAMESPACE_INFO.format(namespace=ns['name'])) self.process_file(ns) diff --git a/bacula/src/plugins/fd/kubernetes-backend/baculak8s/jobs/job.py b/bacula/src/plugins/fd/kubernetes-backend/baculak8s/jobs/job.py index 69927d7e7..238d90582 100644 --- a/bacula/src/plugins/fd/kubernetes-backend/baculak8s/jobs/job.py +++ b/bacula/src/plugins/fd/kubernetes-backend/baculak8s/jobs/job.py @@ -65,7 +65,7 @@ class Job(metaclass=ABCMeta): response = self._plugin.connect() if 'error' in response: - logging.debug("response data:" + str(response)) + logging.debug("response data:{}".format(response)) if 'error_code' in response: self._io.send_connection_error(response['error_code']) else: diff --git a/bacula/src/plugins/fd/kubernetes-backend/baculak8s/plugins/k8sbackend/persistentvolumeclaims.py b/bacula/src/plugins/fd/kubernetes-backend/baculak8s/plugins/k8sbackend/persistentvolumeclaims.py index 9d4b2b638..748521779 100644 --- a/bacula/src/plugins/fd/kubernetes-backend/baculak8s/plugins/k8sbackend/persistentvolumeclaims.py +++ b/bacula/src/plugins/fd/kubernetes-backend/baculak8s/plugins/k8sbackend/persistentvolumeclaims.py @@ -25,6 +25,7 @@ import logging import kubernetes +from baculak8s.util.size_util import k8s_size_to_int from baculak8s.entities.file_info import NOT_EMPTY_FILE from baculak8s.plugins.k8sbackend.k8sfileinfo import (K8SObjType, encoder_dump, encoder_load, @@ -53,11 +54,15 @@ def persistentvolumeclaims_namespaced_names(corev1api, namespace, labels=""): def persistentvolumeclaims_list_namespaced(corev1api, namespace, estimate=False, labels=""): pvcslist = {} + pvcstotalsize = 0 pvcs = corev1api.list_namespaced_persistent_volume_claim(namespace=namespace, watch=False, label_selector=labels) for pvc in pvcs.items: pvcdata = persistentvolumeclaims_read_namespaced(corev1api, namespace, pvc.metadata.name) spec = encoder_dump(pvcdata) # logging.debug("PVCDATA-OBJ:{}".format(pvcdata)) + pvcsize = k8s_size_to_int(pvcdata.status.capacity['storage']) + pvcstotalsize += pvcsize + # logging.debug("PVCDATA-SIZE:{} {}".format(pvcdata.status.capacity['storage'], pvcsize)) # logging.debug("PVCDATA-ENC:{}".format(spec)) pvcslist['pvc-' + pvc.metadata.name] = { 'spec': spec if not estimate else None, @@ -67,7 +72,7 @@ def persistentvolumeclaims_list_namespaced(corev1api, namespace, estimate=False, size=len(spec), creation_timestamp=pvcdata.metadata.creation_timestamp), } - return pvcslist + return pvcslist, pvcstotalsize def persistentvolumeclaims_restore_namespaced(corev1api, file_info, file_content): diff --git a/bacula/src/plugins/fd/kubernetes-backend/baculak8s/plugins/kubernetes_plugin.py b/bacula/src/plugins/fd/kubernetes-backend/baculak8s/plugins/kubernetes_plugin.py index 5d502190b..1c52ec6df 100644 --- a/bacula/src/plugins/fd/kubernetes-backend/baculak8s/plugins/kubernetes_plugin.py +++ b/bacula/src/plugins/fd/kubernetes-backend/baculak8s/plugins/kubernetes_plugin.py @@ -123,6 +123,10 @@ class KubernetesPlugin(Plugin): K8SObjType.K8SOBJ_PVCDATA: {}, } self._connected = False + self.pods_counter = 0 + self.pvcs_counter = 0 + self.pvcs_totalsize = 0 + self.po_source_data = "" def connect(self): """ @@ -204,6 +208,9 @@ class KubernetesPlugin(Plugin): data = {} else: data = {'response': response} + response = self.__execute(lambda: self.coreapi.get_api_versions(), check_connection=False) + if not (isinstance(response, dict) and "error" in response): + self.po_source_data = response.server_address_by_client_cid_rs[0].server_address return data def disconnect(self): @@ -328,12 +335,20 @@ class KubernetesPlugin(Plugin): self.config['labels'])) def get_pods(self, namespace, estimate=False): - return self.__execute(lambda: pods_list_namespaced(self.corev1api, namespace, estimate, self.config['labels'])) + pods = self.__execute(lambda: pods_list_namespaced(self.corev1api, namespace, estimate, self.config['labels'])) + nrpods = len(pods) + logging.debug("get_pods[{}]:pods:{}".format(namespace, nrpods)) + self.pods_counter += nrpods + return pods def get_pvcs(self, namespace, estimate=False): - pvcs = self.__execute(lambda: persistentvolumeclaims_list_namespaced(self.corev1api, namespace, estimate, + pvcs, totalsize = self.__execute(lambda: persistentvolumeclaims_list_namespaced(self.corev1api, namespace, estimate, self.config['labels'])) self.k8s['pvcs'] = pvcs + nrpvcs = len(pvcs) + logging.debug("get_pvcs[{}]:pvcs:{}".format(namespace, nrpvcs)) + self.pvcs_counter += nrpvcs + self.pvcs_totalsize += totalsize return pvcs def get_podtemplates(self, namespace, estimate=False):