]> git.ipfire.org Git - thirdparty/paperless-ngx.git/commitdiff
add first tests for barcode reader
author florian on nixos (Florian Brandes) <florian.brandes@posteo.de>
Wed, 23 Mar 2022 12:26:43 +0000 (13:26 +0100)
committer Florian Brandes <florian.brandes@posteo.de>
Wed, 6 Apr 2022 19:16:41 +0000 (21:16 +0200)
Signed-off-by: florian on nixos (Florian Brandes) <florian.brandes@posteo.de>
src/documents/tasks.py
src/documents/tests/samples/patch-code-t.pbm [new file with mode: 0644]
src/documents/tests/samples/patch-code-t.pdf [new file with mode: 0644]
src/documents/tests/test_tasks.py

index b43f211de0b706637ba8dee5273081036a2ee0f6..dc646ddfcfffd2401eabf1a8a75ed0218a59cbb4 100644 (file)
@@ -16,6 +16,13 @@ from documents.models import Tag
 from documents.sanity_checker import SanityCheckFailedException
 from whoosh.writing import AsyncWriter
 
+# barcode decoder
+import os
+from pyzbar import pyzbar
+from pdf2image import convert_from_path
+import tempfile
+from pikepdf import Pdf
+
 logger = logging.getLogger("paperless.tasks")
 
 
@@ -62,6 +69,71 @@ def train_classifier():
         logger.warning("Classifier error: " + str(e))
 
 
+
+def barcode_reader(page) -> list:
+    """
+    Read any barcodes contained in page.
+
+    :param page: image to scan; handed unchanged to ``pyzbar.decode``
+    :return: list with one string per non-empty barcode found, in the
+        order ``pyzbar`` reported them (empty list if none were found)
+    """
+    barcodes = []
+    # Decode the barcode image
+    detected_barcodes = pyzbar.decode(page)
+
+    if not detected_barcodes:
+        logger.debug("No barcode detected")
+        return barcodes
+
+    # Traverse through all the detected barcodes in image
+    for barcode in detected_barcodes:
+        # barcode.data is a bytes payload, so comparing it with "" is always
+        # unequal; test for emptiness via truthiness instead.
+        if not barcode.data:
+            continue
+        # NOTE: str() on bytes yields the repr form (e.g. "b'PATCHT'");
+        # kept as-is because the existing test relies on that exact value.
+        payload = str(barcode.data)
+        barcodes.append(payload)
+        logger.debug(f"Barcode of type {str(barcode.type)} found: {payload}")
+    return barcodes
+
+def scan_file_for_seperating_barcodes(filepath) -> list:
+    """
+    Scan the provided file for page separating barcodes.
+
+    :param filepath: path of the PDF file; each page is rendered to an image
+        with ``convert_from_path`` and searched for barcodes
+    :return: list of zero-based page numbers whose barcodes include the
+        separator code
+    """
+    # barcode_reader() stringifies the raw bytes payload, so the separator
+    # arrives as "b'PATCHT'"; the plain form is accepted as well.
+    separator_codes = ("PATCHT", "b'PATCHT'")
+    seperator_page_numbers = []
+    # use a temporary directory in case the file is too big to handle in memory
+    with tempfile.TemporaryDirectory() as path:
+        pages_from_path = convert_from_path(filepath, output_folder=path)
+        for current_page_number, page in enumerate(pages_from_path):
+            current_barcodes = barcode_reader(page)
+            if any(code in current_barcodes for code in separator_codes):
+                seperator_page_numbers.append(current_page_number)
+    return seperator_page_numbers
+
+def seperate_pages(filepath, pages_to_split_on: list):
+    """
+    Separate the provided file on the pages_to_split_on.
+
+    The pages listed in pages_to_split_on are treated as separator pages and
+    are not copied into any output file.  Each run of pages between
+    separators is written to ``<name>_page_<n>.pdf`` in the current working
+    directory, where ``<name>`` is the input's base name without extension
+    and ``<n>`` counts the written files from zero.
+
+    :param filepath: path of the PDF file to split
+    :param pages_to_split_on: zero-based page numbers of the separator pages
+    """
+    if not pages_to_split_on:
+        # Nothing to split on, so there is nothing to write.
+        return
+    fname = os.path.splitext(os.path.basename(filepath))[0]
+    # TODO: Get the directory of the file and save the other files there
+    # TODO: Return list of new paths of the new files
+    with Pdf.open(filepath) as pdf:
+        total_pages = len(pdf.pages)
+        # Treat the end of the document as a final separator so the pages
+        # after the last barcode are written out too.
+        boundaries = sorted(set(pages_to_split_on)) + [total_pages]
+        segment_start = 0
+        segment_count = 0
+        for boundary in boundaries:
+            page_indices = range(segment_start, min(boundary, total_pages))
+            # The next segment begins right after the separator page.
+            segment_start = boundary + 1
+            if not page_indices:
+                # Adjacent separators (or one on the first/last page)
+                # enclose no pages.
+                continue
+            dst = Pdf.new()
+            for page_index in page_indices:
+                # Append the page object itself, not its index.
+                dst.pages.append(pdf.pages[page_index])
+            output_filename = '{}_page_{}.pdf'.format(fname, segment_count)
+            segment_count += 1
+            # Save while the source PDF is still open, since the copied
+            # pages originate from it.
+            with open(output_filename, 'wb') as out:
+                dst.save(out)
+
+
 def consume_file(
     path,
     override_filename=None,
@@ -72,6 +144,11 @@ def consume_file(
     task_id=None,
 ):
 
+    # check for seperators in current document
+    seperator_page_numbers = scan_file_for_seperating_barcodes(path)
+    if seperator_page_numbers != [ ]:
+        logger.debug(f"Pages with seperators found: {str(seperator_page_numbers)}")
+
     document = Consumer().try_consume_file(
         path,
         override_filename=override_filename,
diff --git a/src/documents/tests/samples/patch-code-t.pbm b/src/documents/tests/samples/patch-code-t.pbm
new file mode 100644 (file)
index 0000000..7e72140
Binary files /dev/null and b/src/documents/tests/samples/patch-code-t.pbm differ
diff --git a/src/documents/tests/samples/patch-code-t.pdf b/src/documents/tests/samples/patch-code-t.pdf
new file mode 100644 (file)
index 0000000..3a8a2a2
Binary files /dev/null and b/src/documents/tests/samples/patch-code-t.pdf differ
index 952d3d920def23bf048f6f2bdf9661e3cec6da10..94df0fc73abad0403c64fd312a3e0ca7a5381c84 100644 (file)
@@ -13,6 +13,8 @@ from documents.sanity_checker import SanityCheckFailedException
 from documents.sanity_checker import SanityCheckMessages
 from documents.tests.utils import DirectoriesMixin
 
+from PIL import Image
+
 
 class TestTasks(DirectoriesMixin, TestCase):
     def test_index_reindex(self):
@@ -89,6 +91,15 @@ class TestTasks(DirectoriesMixin, TestCase):
         mtime3 = os.stat(settings.MODEL_FILE).st_mtime
         self.assertNotEqual(mtime2, mtime3)
 
+    def test_barcode_reader(self):
+        """The sample patch-code image yields exactly one barcode string."""
+        test_file = os.path.join(
+            os.path.dirname(__file__),
+            "samples",
+            "patch-code-t.pbm"
+        )
+        # Close the image file deterministically instead of leaking the handle.
+        with Image.open(test_file) as img:
+            self.assertEqual(tasks.barcode_reader(img), ["b'PATCHT'"])
+
     @mock.patch("documents.tasks.sanity_checker.check_sanity")
     def test_sanity_check_success(self, m):
         m.return_value = SanityCheckMessages()