]> git.ipfire.org Git - thirdparty/bacula.git/commitdiff
k8s: Fix restore problem where pod require the pvc data when it starts
author Francisco Manuel Garcia Botella <francisco.garcia@baculasystems.com>
Thu, 6 Jun 2024 11:32:44 +0000 (13:32 +0200)
committer Eric Bollengier <eric@baculasystems.com>
Wed, 4 Dec 2024 08:14:26 +0000 (09:14 +0100)
bacula/src/plugins/fd/kubernetes-backend/baculak8s/jobs/estimation_job.py
bacula/src/plugins/fd/kubernetes-backend/baculak8s/jobs/restore_job.py
bacula/src/plugins/fd/kubernetes-backend/baculak8s/plugins/kubernetes_plugin.py
bacula/src/plugins/fd/kubernetes-backend/baculak8s/util/iso8601.py

index 4f66afe81c71762c086a25a288eabba8982821ce..5586ecf98ea4d9708c740984e4f50f41f76a1bbd 100644 (file)
@@ -123,8 +123,8 @@ class EstimationJob(JobPodBacula):
                 if not estimate:
                     self._io.send_info(PROCESSING_NAMESPACE_INFO.format(namespace=ns['name']))
                 self.process_file(ns)
-                nsdata = self._plugin.list_namespaced_objects(nsname, estimate=estimate)
-                logging.debug('NSDATA:{}'.format([ns.keys() for ns in nsdata]))         # limit debug output
+                nsdata = self._plugin.list_namespaced_objects_before_pvcdata(nsname, estimate=estimate)
+                logging.debug('Before PVCData NSDATA:{}'.format([ns.keys() for ns in nsdata]))         # limit debug output
                 for sub in nsdata:
                     # sub is a list of different resource types
                     if isinstance(sub, dict) and sub.get('exception'):
@@ -219,6 +219,16 @@ class EstimationJob(JobPodBacula):
                                 self._io.send_info(PROCESSING_PVCDATA_STOP_INFO.format(pvc=pvc))
                         logging.debug("Finish pvcdatalist")
 
+                nsdata = self._plugin.list_namespaced_objects_after_pvcdata(nsname, estimate=estimate)
+                logging.debug('After PVCDATA NSDATA:{}'.format([ns.keys() for ns in nsdata]))         # limit debug output
+                for sub in nsdata:
+                    # sub is a list of different resource types
+                    if isinstance(sub, dict) and sub.get('exception'):
+                        self._handle_error(RES_LIST_ERROR.format(parse_json_descr(sub)))
+                    else:
+                        for res in sub:
+                            self.process_file(sub.get(res))
+
     def _estimate_file(self, data):
         logging.debug('{}'.format(data))
         if isinstance(data, dict):
index 7ec250bb2bae9c3084fc24b6c2708a654ff9b7d0..8636e3d024c5119f4cffca7a9cc1c5b4a1a1478d 100644 (file)
@@ -170,10 +170,11 @@ class RestoreJob(JobPodBacula):
         for _ in range(DEFAULTTIMEOUT):
             time.sleep(1)
             isready = self._plugin.pvc_isready(namespace, pvcname)
+            iswaiting = self._plugin.is_pvc_waiting_first_consumer(namespace,pvcname)
             if isinstance(isready, dict) and 'error' in isready:
                 # cannot check pvc status
                 self._handle_error(PVC_STATUS_ERR.format(pvcname, parse_json_descr(isready)))
-            elif isready:
+            elif isready or iswaiting:
                 pvcisready = True
                 break
             # well, we have to wait for pvc to be ready, so restart procedure
index 730a3ef129f145b3d86c98e7c1938bac53c042f1..e1eff5c65649ff1a1bf4aad6cdc584b6f4c0d110 100644 (file)
@@ -432,6 +432,30 @@ class KubernetesPlugin(Plugin):
         return self.k8s[K8SObjType.K8SOBJ_NAMESPACE]
 
     def list_namespaced_objects(self, namespace, estimate=False):
+        # Replaced by list_namespaced_objects_before_pvcdata() / list_namespaced_objects_after_pvcdata(); the former resource backup order is kept below for reference.
+        # logging.debug("list_namespaced_objects_label:[{}]".format(self.config['labels']))
+        # self.k8s[K8SObjType.K8SOBJ_PVCDATA] = {}
+        # data = [
+        #     self.get_config_maps(namespace, estimate),
+        #     self.get_service_accounts(namespace, estimate),
+        #     self.get_secrets(namespace, estimate),
+        #     # self.get_endpoints(namespace, estimate),
+        #     self.get_pvcs(namespace, estimate),
+        #     self.get_limit_ranges(namespace, estimate),
+        #     self.get_resource_quota(namespace, estimate),
+        #     self.get_services(namespace, estimate),
+        #     self.get_pods(namespace, estimate),
+        #     self.get_daemon_sets(namespace, estimate),
+        #     self.get_replica_sets(namespace, estimate),
+        #     self.get_stateful_sets(namespace, estimate),
+        #     self.get_deployments(namespace, estimate),
+        #     self.get_replication_controller(namespace, estimate),
+        #     self.get_ingresses(namespace,estimate),
+        # ]
+        # return data
+        raise NotImplementedError("Not implemented list_namespaced_objects")
+    
+    def list_namespaced_objects_before_pvcdata(self, namespace, estimate=False):
         # We should maintain the following resources backup order
         logging.debug("list_namespaced_objects_label:[{}]".format(self.config['labels']))
         self.k8s[K8SObjType.K8SOBJ_PVCDATA] = {}
@@ -444,6 +468,29 @@ class KubernetesPlugin(Plugin):
             self.get_limit_ranges(namespace, estimate),
             self.get_resource_quota(namespace, estimate),
             self.get_services(namespace, estimate),
+            # self.get_pods(namespace, estimate),
+            # self.get_daemon_sets(namespace, estimate),
+            # self.get_replica_sets(namespace, estimate),
+            # self.get_stateful_sets(namespace, estimate),
+            # self.get_deployments(namespace, estimate),
+            # self.get_replication_controller(namespace, estimate),
+            # self.get_ingresses(namespace,estimate),
+        ]
+        return data
+    
+    def list_namespaced_objects_after_pvcdata(self, namespace, estimate=False):
+        # We should maintain the following resources backup order
+        logging.debug("list_namespaced_objects_label:[{}]".format(self.config['labels']))
+        self.k8s[K8SObjType.K8SOBJ_PVCDATA] = {}
+        data = [
+            # self.get_config_maps(namespace, estimate),
+            # self.get_service_accounts(namespace, estimate),
+            # self.get_secrets(namespace, estimate),
+            # # self.get_endpoints(namespace, estimate),
+            # self.get_pvcs(namespace, estimate),
+            # self.get_limit_ranges(namespace, estimate),
+            # self.get_resource_quota(namespace, estimate),
+            # self.get_services(namespace, estimate),
             self.get_pods(namespace, estimate),
             self.get_daemon_sets(namespace, estimate),
             self.get_replica_sets(namespace, estimate),
@@ -874,6 +921,18 @@ class KubernetesPlugin(Plugin):
 
     def pvc_status(self, namespace, pvcname):
         return self.__execute(lambda: self.corev1api.read_namespaced_persistent_volume_claim_status(name=pvcname, namespace=namespace))
+    
+    def is_pvc_waiting_first_consumer(self, namespace, pvcname):
+        """Return True when an event with reason 'WaitForFirstConsumer' is listed for PVC `pvcname` in `namespace`, else False."""
+        field_selector = 'involvedObject.kind=PersistentVolumeClaim,involvedObject.name=' + pvcname
+        response = self.__execute(lambda: self.corev1api.list_namespaced_event(namespace, field_selector=field_selector))
+        logging.debug('Response in pvc_waiting_fisrt_consumer:')
+        logging.debug(response)
+        # A dict (presumably the error form returned by __execute - verify) also has an `items` attribute: the dict method, which is not iterable.
+        if isinstance(response, dict) or not hasattr(response, 'items'):
+            return False
+        # `or []` guards against an event list whose `items` is None.
+        return any(event.reason == 'WaitForFirstConsumer' for event in response.items or [])
 
     def _vsnapshot_status(self, namespace, snapshot_name):
         return self.__execute(lambda: self.crd_api.get_namespaced_custom_object_status(**prepare_snapshot_action(namespace, snapshot_name)))
index c666d122c140ed3b41880cf9f736aeb27e262a8b..ab356303a769fd22240e62736a5846cbd6f96b45 100644 (file)
@@ -27,7 +27,8 @@ __all__ = ["parse_date", "ParseError"]
 
 # Adapted from http://delete.me.uk/2005/03/iso8601.html
+# The separator before the fractional seconds is written "\\." so the pattern matches a literal '.' without relying on the invalid "\." string escape.
 ISO8601_REGEX_RAW = "(?P<year>[0-9]{4})-(?P<month>[0-9]{1,2})-(?P<day>[0-9]{1,2})" \
-                    "T(?P<hour>[0-9]{2}):(?P<minute>[0-9]{2})(:(?P<second>[0-9]{2})(\.(?P<fraction>[0-9]+))?)?" \
+                    "T(?P<hour>[0-9]{2}):(?P<minute>[0-9]{2})(:(?P<second>[0-9]{2})(\\.(?P<fraction>[0-9]+))?)?" \
                     "(?P<timezone>Z|[-+][0-9]{2}(:?[0-9]{2})?)?"
 ISO8601_REGEX = re.compile(ISO8601_REGEX_RAW)
 TIMEZONE_REGEX = re.compile("(?P<prefix>[+-])(?P<hours>[0-9]{2}):?(?P<minutes>[0-9]{2})?")