]> git.ipfire.org Git - thirdparty/paperless-ngx.git/commitdiff
working split pages
authorflorian on nixos (Florian Brandes) <florian.brandes@posteo.de>
Wed, 23 Mar 2022 21:49:29 +0000 (22:49 +0100)
committerFlorian Brandes <florian.brandes@posteo.de>
Wed, 6 Apr 2022 19:16:41 +0000 (21:16 +0200)
Signed-off-by: florian on nixos (Florian Brandes) <florian.brandes@posteo.de>
src/documents/tasks.py
src/documents/tests/samples/patch-code-t-middle.pdf [new file with mode: 0644]
src/documents/tests/test_tasks.py

index dc646ddfcfffd2401eabf1a8a75ed0218a59cbb4..1dd41b740f0c19abaa39b84a6b31b57ce418b374 100644 (file)
@@ -69,69 +69,92 @@ def train_classifier():
         logger.warning("Classifier error: " + str(e))
 
 
-
-def barcode_reader(page) -> list:
+def barcode_reader(image) -> list:
     """
-    Read any barcodes contained in page
+    Read any barcodes contained in image
     Returns a list containing all found barcodes
     """
-    barcodes = [ ]
+    barcodes = []
     # Decode the barcode image
-    detected_barcodes = pyzbar.decode(page)
+    detected_barcodes = pyzbar.decode(image)
 
     if not detected_barcodes:
         logger.debug(f"No barcode detected")
     else:
         # Traverse through all the detected barcodes in image
         for barcode in detected_barcodes:
-            if barcode.data!="":
+            if barcode.data != "":
                 barcodes = barcodes + [str(barcode.data)]
-                logger.debug(f"Barcode of type {str(barcode.type)} found: {str(barcode.data)}")
+                logger.debug(
+                    f"Barcode of type {str(barcode.type)} found: {str(barcode.data)}"
+                )
     return barcodes
 
-def scan_file_for_seperating_barcodes(filepath) -> list:
+
+def scan_file_for_seperating_barcodes(filepath: str) -> list:
     """
     Scan the provided file for page seperating barcodes
     Returns a list of pagenumbers, which seperate the file
     """
-    seperator_page_numbers = [ ]
+    seperator_page_numbers = []
     # use a temporary directory in case the file os too big to handle in memory
     with tempfile.TemporaryDirectory() as path:
         pages_from_path = convert_from_path(filepath, output_folder=path)
         for current_page_number, page in enumerate(pages_from_path):
             current_barcodes = barcode_reader(page)
-            if current_barcodes.isin("PATCHT"):
-                seperator_page_numbers = seperator_page_numbers + current_page_number
+            if "b'PATCHT'" in current_barcodes:
+                seperator_page_numbers = seperator_page_numbers + [current_page_number]
     return seperator_page_numbers
 
-def seperate_pages(filepath, pages_to_split_on: list):
+
+def seperate_pages(filepath: str, pages_to_split_on: list) -> list:
     """
     Seperate the provided file on the pages_to_split_on.
     The pages which are defined by page_numbers will be removed.
+    Returns a list of (temporary) filepaths to consume.
+    These will need to be deleted later.
     """
-    pages_to_split_on = scan_file_for_seperating_barcodes(filepath)
+    os.makedirs(settings.SCRATCH_DIR, exist_ok=True)
+    tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR)
     fname = os.path.splitext(os.path.basename(filepath))[0]
     pdf = Pdf.open(filepath)
+    document_paths = []
+    logger.debug(f"Temp dir is {str(tempdir)}")
     # TODO: Get the directory of the file and save the other files there
     # TODO: Return list of new paths of the new files
-    for count, page_number in enumerate(pages_to_split_on):
-        # First element, so iterate from zero to the first seperator page
-        if count == 0:
-            dst = Pdf.new()
-            for page in range(0, page_number):
-                dst.pages.append(page)
-            output_filename = '{}_page_{}.pdf'.format(
-                fname, str(count))
-            with open(output_filename, 'wb') as out:
-                dst.save(out)
-        else:
-            dst = Pdf.new()
-            for page in range(pages_to_split_on[count-1], page_number):
+    if len(pages_to_split_on) <= 0:
+        logger.warning(f"No pages to split on!")
+    else:
+        # go from the first page to the first separator page
+        dst = Pdf.new()
+        for n, page in enumerate(pdf.pages):
+            if n < pages_to_split_on[0]:
                 dst.pages.append(page)
-            output_filename = '{}_page_{}.pdf'.format(
-                fname, page+1)
-            with open(output_filename, 'wb') as out:
-                dst.save(out)
+        output_filename = "{}_document_0.pdf".format(fname)
+        savepath = os.path.join(tempdir, output_filename)
+        with open(savepath, "wb") as out:
+            dst.save(out)
+        document_paths = [savepath]
+
+    for count, page_number in enumerate(pages_to_split_on):
+        logger.debug(f"Count: {str(count)} page_number: {str(page_number)}")
+        dst = Pdf.new()
+        try:
+            next_page = pages_to_split_on[count + 1]
+        except IndexError:
+            next_page = len(pdf.pages)
+        # skip the first page_number. This contains the barcode page
+        for page in range(page_number + 1, next_page):
+            logger.debug(f"page_number: {str(page_number)} next_page: {str(next_page)}")
+            dst.pages.append(pdf.pages[page])
+        output_filename = "{}_document_{}.pdf".format(fname, str(count + 1))
+        logger.debug(f"pdf no:{str(count)} has {str(len(dst.pages))} pages")
+        savepath = os.path.join(tempdir, output_filename)
+        with open(savepath, "wb") as out:
+            dst.save(out)
+        document_paths = document_paths + [savepath]
+    logger.debug(f"Temp files are {str(document_paths)}")
+    return document_paths
 
 
 def consume_file(
@@ -146,7 +169,7 @@ def consume_file(
 
     # check for seperators in current document
     seperator_page_numbers = scan_file_for_seperating_barcodes(path)
-    if seperator_page_numbers != [ ]:
+    if seperator_page_numbers != []:
         logger.debug(f"Pages with seperators found: {str(seperator_page_numbers)}")
 
     document = Consumer().try_consume_file(
diff --git a/src/documents/tests/samples/patch-code-t-middle.pdf b/src/documents/tests/samples/patch-code-t-middle.pdf
new file mode 100644 (file)
index 0000000..2ccb769
Binary files /dev/null and b/src/documents/tests/samples/patch-code-t-middle.pdf differ
index 94df0fc73abad0403c64fd312a3e0ca7a5381c84..02c747e3e11280c80cf18f5239c6939aa1691b2f 100644 (file)
@@ -93,13 +93,43 @@ class TestTasks(DirectoriesMixin, TestCase):
 
     def test_barcode_reader(self):
         test_file = os.path.join(
-            os.path.dirname(__file__),
-            "samples",
-            "patch-code-t.pbm"
+            os.path.dirname(__file__), "samples", "patch-code-t.pbm"
         )
         img = Image.open(test_file)
         self.assertEqual(tasks.barcode_reader(img), ["b'PATCHT'"])
 
+    def test_barcode_reader2(self):
+        test_file = os.path.join(os.path.dirname(__file__), "samples", "simple.png")
+        img = Image.open(test_file)
+        self.assertEqual(tasks.barcode_reader(img), [])
+
+    def test_scan_file_for_seperating_barcodes(self):
+        test_file = os.path.join(
+            os.path.dirname(__file__), "samples", "patch-code-t.pdf"
+        )
+        pages = tasks.scan_file_for_seperating_barcodes(test_file)
+        self.assertEqual(pages, [0])
+
+    def test_scan_file_for_seperating_barcodes2(self):
+        test_file = os.path.join(os.path.dirname(__file__), "samples", "simple.pdf")
+        pages = tasks.scan_file_for_seperating_barcodes(test_file)
+        self.assertEqual(pages, [])
+
+    def test_scan_file_for_seperating_barcodes3(self):
+        test_file = os.path.join(
+            os.path.dirname(__file__), "samples", "patch-code-t-middle.pdf"
+        )
+        pages = tasks.scan_file_for_seperating_barcodes(test_file)
+        self.assertEqual(pages, [1])
+
+    def test_seperate_pages(self):
+        test_file = os.path.join(
+            os.path.dirname(__file__), "samples", "patch-code-t-middle.pdf"
+        )
+        pages = tasks.seperate_pages(test_file, [1])
+
+        self.assertEqual(len(pages), 2)
+
     @mock.patch("documents.tasks.sanity_checker.check_sanity")
     def test_sanity_check_success(self, m):
         m.return_value = SanityCheckMessages()