From: Francisco Manuel Garcia Botella Date: Thu, 6 Jun 2024 11:32:44 +0000 (+0200) Subject: k8s: Fix restore problem where pod require the pvc data when it starts X-Git-Tag: Release-15.0.3~22 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=9835e04256e264e41e6747b7c04a5593072c78f5;p=thirdparty%2Fbacula.git k8s: Fix restore problem where pod require the pvc data when it starts --- diff --git a/bacula/src/plugins/fd/kubernetes-backend/baculak8s/jobs/estimation_job.py b/bacula/src/plugins/fd/kubernetes-backend/baculak8s/jobs/estimation_job.py index 4f66afe81..5586ecf98 100644 --- a/bacula/src/plugins/fd/kubernetes-backend/baculak8s/jobs/estimation_job.py +++ b/bacula/src/plugins/fd/kubernetes-backend/baculak8s/jobs/estimation_job.py @@ -123,8 +123,8 @@ class EstimationJob(JobPodBacula): if not estimate: self._io.send_info(PROCESSING_NAMESPACE_INFO.format(namespace=ns['name'])) self.process_file(ns) - nsdata = self._plugin.list_namespaced_objects(nsname, estimate=estimate) - logging.debug('NSDATA:{}'.format([ns.keys() for ns in nsdata])) # limit debug output + nsdata = self._plugin.list_namespaced_objects_before_pvcdata(nsname, estimate=estimate) + logging.debug('Before PVCData NSDATA:{}'.format([ns.keys() for ns in nsdata])) # limit debug output for sub in nsdata: # sub is a list of different resource types if isinstance(sub, dict) and sub.get('exception'): @@ -219,6 +219,16 @@ class EstimationJob(JobPodBacula): self._io.send_info(PROCESSING_PVCDATA_STOP_INFO.format(pvc=pvc)) logging.debug("Finish pvcdatalist") + nsdata = self._plugin.list_namespaced_objects_after_pvcdata(nsname, estimate=estimate) + logging.debug('After PVCDATA NSDATA:{}'.format([ns.keys() for ns in nsdata])) # limit debug output + for sub in nsdata: + # sub is a list of different resource types + if isinstance(sub, dict) and sub.get('exception'): + self._handle_error(RES_LIST_ERROR.format(parse_json_descr(sub))) + else: + for res in sub: + self.process_file(sub.get(res)) + def _estimate_file(self, data): logging.debug('{}'.format(data)) if isinstance(data, dict): diff --git a/bacula/src/plugins/fd/kubernetes-backend/baculak8s/jobs/restore_job.py b/bacula/src/plugins/fd/kubernetes-backend/baculak8s/jobs/restore_job.py index 7ec250bb2..8636e3d02 100644 --- a/bacula/src/plugins/fd/kubernetes-backend/baculak8s/jobs/restore_job.py +++ b/bacula/src/plugins/fd/kubernetes-backend/baculak8s/jobs/restore_job.py @@ -170,10 +170,11 @@ class RestoreJob(JobPodBacula): for _ in range(DEFAULTTIMEOUT): time.sleep(1) isready = self._plugin.pvc_isready(namespace, pvcname) + iswaiting = self._plugin.is_pvc_waiting_first_consumer(namespace,pvcname) if isinstance(isready, dict) and 'error' in isready: # cannot check pvc status self._handle_error(PVC_STATUS_ERR.format(pvcname, parse_json_descr(isready))) - elif isready: + elif isready or iswaiting: pvcisready = True break # well, we have to wait for pvc to be ready, so restart procedure diff --git a/bacula/src/plugins/fd/kubernetes-backend/baculak8s/plugins/kubernetes_plugin.py b/bacula/src/plugins/fd/kubernetes-backend/baculak8s/plugins/kubernetes_plugin.py index 730a3ef12..e1eff5c65 100644 --- a/bacula/src/plugins/fd/kubernetes-backend/baculak8s/plugins/kubernetes_plugin.py +++ b/bacula/src/plugins/fd/kubernetes-backend/baculak8s/plugins/kubernetes_plugin.py @@ -432,6 +432,30 @@ class KubernetesPlugin(Plugin): return self.k8s[K8SObjType.K8SOBJ_NAMESPACE] def list_namespaced_objects(self, namespace, estimate=False): + # We should maintain the following resources backup order + # logging.debug("list_namespaced_objects_label:[{}]".format(self.config['labels'])) + # self.k8s[K8SObjType.K8SOBJ_PVCDATA] = {} + # data = [ + # self.get_config_maps(namespace, estimate), + # self.get_service_accounts(namespace, estimate), + # self.get_secrets(namespace, estimate), + # # self.get_endpoints(namespace, estimate), + # self.get_pvcs(namespace, estimate), + # self.get_limit_ranges(namespace, estimate), + # self.get_resource_quota(namespace, estimate), + # self.get_services(namespace, estimate), + # self.get_pods(namespace, estimate), + # self.get_daemon_sets(namespace, estimate), + # self.get_replica_sets(namespace, estimate), + # self.get_stateful_sets(namespace, estimate), + # self.get_deployments(namespace, estimate), + # self.get_replication_controller(namespace, estimate), + # self.get_ingresses(namespace,estimate), + # ] + # return data + raise NotImplementedError("Not implemented list_namespaced_objects") + + def list_namespaced_objects_before_pvcdata(self, namespace, estimate=False): # We should maintain the following resources backup order logging.debug("list_namespaced_objects_label:[{}]".format(self.config['labels'])) self.k8s[K8SObjType.K8SOBJ_PVCDATA] = {} @@ -444,6 +468,29 @@ class KubernetesPlugin(Plugin): self.get_limit_ranges(namespace, estimate), self.get_resource_quota(namespace, estimate), self.get_services(namespace, estimate), + # self.get_pods(namespace, estimate), + # self.get_daemon_sets(namespace, estimate), + # self.get_replica_sets(namespace, estimate), + # self.get_stateful_sets(namespace, estimate), + # self.get_deployments(namespace, estimate), + # self.get_replication_controller(namespace, estimate), + # self.get_ingresses(namespace,estimate), + ] + return data + + def list_namespaced_objects_after_pvcdata(self, namespace, estimate=False): + # We should maintain the following resources backup order + logging.debug("list_namespaced_objects_label:[{}]".format(self.config['labels'])) + self.k8s[K8SObjType.K8SOBJ_PVCDATA] = {} + data = [ + # self.get_config_maps(namespace, estimate), + # self.get_service_accounts(namespace, estimate), + # self.get_secrets(namespace, estimate), + # # self.get_endpoints(namespace, estimate), + # self.get_pvcs(namespace, estimate), + # self.get_limit_ranges(namespace, estimate), + # self.get_resource_quota(namespace, estimate), + # self.get_services(namespace, estimate), self.get_pods(namespace, estimate), self.get_daemon_sets(namespace, estimate), self.get_replica_sets(namespace, estimate), @@ -874,6 +921,18 @@ class KubernetesPlugin(Plugin): def pvc_status(self, namespace, pvcname): return self.__execute(lambda: self.corev1api.read_namespaced_persistent_volume_claim_status(name=pvcname, namespace=namespace)) + + def is_pvc_waiting_first_consumer(self, namespace, pvcname): + field_selector = 'involvedObject.kind='+'PersistentVolumeClaim'+',involvedObject.name='+pvcname + response = self.__execute(lambda: self.corev1api.list_namespaced_event(namespace, field_selector=field_selector)) + logging.debug('Response in pvc_waiting_fisrt_consumer:') + logging.debug(response) + if response is None or (not isinstance(response, dict) and not hasattr(response, "items")): + return False + for event in response.items: + if event.reason == 'WaitForFirstConsumer': + return True + return False def _vsnapshot_status(self, namespace, snapshot_name): return self.__execute(lambda: self.crd_api.get_namespaced_custom_object_status(**prepare_snapshot_action(namespace, snapshot_name))) diff --git a/bacula/src/plugins/fd/kubernetes-backend/baculak8s/util/iso8601.py b/bacula/src/plugins/fd/kubernetes-backend/baculak8s/util/iso8601.py index c666d122c..ab356303a 100644 --- a/bacula/src/plugins/fd/kubernetes-backend/baculak8s/util/iso8601.py +++ b/bacula/src/plugins/fd/kubernetes-backend/baculak8s/util/iso8601.py @@ -27,7 +27,7 @@ __all__ = ["parse_date", "ParseError"] # Adapted from http://delete.me.uk/2005/03/iso8601.html ISO8601_REGEX_RAW = "(?P[0-9]{4})-(?P[0-9]{1,2})-(?P[0-9]{1,2})" \ - "T(?P[0-9]{2}):(?P[0-9]{2})(:(?P[0-9]{2})(\.(?P[0-9]+))?)?" \ + "T(?P[0-9]{2}):(?P[0-9]{2})(:(?P[0-9]{2})(.(?P[0-9]+))?)?" \ "(?PZ|[-+][0-9]{2}(:?[0-9]{2})?)?" ISO8601_REGEX = re.compile(ISO8601_REGEX_RAW) TIMEZONE_REGEX = re.compile("(?P[+-])(?P[0-9]{2}):?(?P[0-9]{2})?")