git.ipfire.org Git - thirdparty/paperless-ngx.git/commitdiff
fixes #949: change to MIME detection for files 962/head
author Florian Brandes <florian.brandes@posteo.de>
Mon, 16 May 2022 15:29:29 +0000 (17:29 +0200)
committer Florian Brandes <florian.brandes@posteo.de>
Mon, 16 May 2022 15:29:29 +0000 (17:29 +0200)
Signed-off-by: Florian Brandes <florian.brandes@posteo.de>
src/documents/tasks.py
src/documents/tests/test_tasks.py

index 10a1ad6717e1140ce879682d0251f0b6ddc7da86..7fd1f5672fa95f7f3d9ee7b0e08dd59415892776 100644 (file)
@@ -4,6 +4,7 @@ import shutil
 import tempfile
 from typing import List  # for type hinting. Can be removed, if only Python >3.8 is used
 
+import magic
 import tqdm
 from asgiref.sync import async_to_sync
 from channels.layers import get_channel_layer
@@ -95,19 +96,33 @@ def barcode_reader(image) -> List[str]:
     return barcodes
 
 
+def get_file_type(path: str) -> str:
+    """
+    Determines the file type, based on MIME type.
+
+    Returns the MIME type.
+    """
+    mime_type = magic.from_file(path, mime=True)
+    logger.debug(f"Detected mime type: {mime_type}")
+    return mime_type
+
+
 def convert_from_tiff_to_pdf(filepath: str) -> str:
     """
-    converts a given TIFF image file to pdf into a temp. directory.
+    converts a given TIFF image file to pdf into a temporary directory.
+
     Returns the new pdf file.
     """
     file_name = os.path.splitext(os.path.basename(filepath))[0]
-    file_extension = os.path.splitext(os.path.basename(filepath))[1].lower()
+    mime_type = get_file_type(filepath)
     tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR)
     # use old file name with pdf extension
-    if file_extension == ".tif" or file_extension == ".tiff":
+    if mime_type == "image/tiff":
         newpath = os.path.join(tempdir, file_name + ".pdf")
     else:
-        logger.warning(f"Cannot convert from {str(file_extension)} to pdf.")
+        logger.warning(
+            f"Cannot convert mime type {str(mime_type)} from {str(filepath)} to pdf.",
+        )
         return None
     with Image.open(filepath) as image:
         images = []
@@ -231,17 +246,17 @@ def consume_file(
         document_list = []
         converted_tiff = None
         if settings.CONSUMER_BARCODE_TIFF_SUPPORT:
-            supported_extensions = [".pdf", ".tiff", ".tif"]
+            supported_mime = ["image/tiff", "application/pdf"]
         else:
-            supported_extensions = [".pdf"]
-        file_extension = os.path.splitext(os.path.basename(path))[1].lower()
-        if file_extension not in supported_extensions:
+            supported_mime = ["application/pdf"]
+        mime_type = get_file_type(path)
+        if mime_type not in supported_mime:
             # if not supported, skip this routine
             logger.warning(
-                f"Unsupported file format for barcode reader: {str(file_extension)}",
+                f"Unsupported file format for barcode reader: {str(mime_type)}",
             )
         else:
-            if file_extension in {".tif", ".tiff"}:
+            if mime_type == "image/tiff":
                 file_to_process = convert_from_tiff_to_pdf(path)
             else:
                 file_to_process = path
index 7e5381defab25abd5d5600be021e30e74e569afb..41b9380db1d865f3f6b65322d3d60826864fa9ec 100644 (file)
@@ -204,6 +204,34 @@ class TestTasks(DirectoriesMixin, TestCase):
         img = Image.open(test_file)
         self.assertEqual(tasks.barcode_reader(img), ["CUSTOM BARCODE"])
 
+    def test_get_mime_type(self):
+        tiff_file = os.path.join(
+            os.path.dirname(__file__),
+            "samples",
+            "simple.tiff",
+        )
+        pdf_file = os.path.join(
+            os.path.dirname(__file__),
+            "samples",
+            "simple.pdf",
+        )
+        png_file = os.path.join(
+            os.path.dirname(__file__),
+            "samples",
+            "barcodes",
+            "barcode-128-custom.png",
+        )
+        tiff_file_no_extension = os.path.join(settings.SCRATCH_DIR, "testfile1")
+        pdf_file_no_extension = os.path.join(settings.SCRATCH_DIR, "testfile2")
+        shutil.copy(tiff_file, tiff_file_no_extension)
+        shutil.copy(pdf_file, pdf_file_no_extension)
+
+        self.assertEqual(tasks.get_file_type(tiff_file), "image/tiff")
+        self.assertEqual(tasks.get_file_type(pdf_file), "application/pdf")
+        self.assertEqual(tasks.get_file_type(tiff_file_no_extension), "image/tiff")
+        self.assertEqual(tasks.get_file_type(pdf_file_no_extension), "application/pdf")
+        self.assertEqual(tasks.get_file_type(png_file), "image/png")
+
     def test_convert_from_tiff_to_pdf(self):
         test_file = os.path.join(
             os.path.dirname(__file__),
@@ -469,7 +497,7 @@ class TestTasks(DirectoriesMixin, TestCase):
         self.assertEqual(
             cm.output,
             [
-                "WARNING:paperless.tasks:Unsupported file format for barcode reader: .jpg",
+                "WARNING:paperless.tasks:Unsupported file format for barcode reader: image/jpeg",
             ],
         )
         m.assert_called_once()
@@ -481,6 +509,26 @@ class TestTasks(DirectoriesMixin, TestCase):
         self.assertIsNone(kwargs["override_document_type_id"])
         self.assertIsNone(kwargs["override_tag_ids"])
 
+    @override_settings(
+        CONSUMER_ENABLE_BARCODES=True,
+        CONSUMER_BARCODE_TIFF_SUPPORT=True,
+    )
+    def test_consume_barcode_supported_no_extension_file(self):
+        """
+        This test assumes barcode and TIFF support are enabled and
+        the user uploads a supported image file, but without extension
+        """
+        test_file = os.path.join(
+            os.path.dirname(__file__),
+            "samples",
+            "barcodes",
+            "patch-code-t-middle.tiff",
+        )
+        dst = os.path.join(settings.SCRATCH_DIR, "patch-code-t-middle")
+        shutil.copy(test_file, dst)
+
+        self.assertEqual(tasks.consume_file(dst), "File successfully split")
+
     @mock.patch("documents.tasks.sanity_checker.check_sanity")
     def test_sanity_check_success(self, m):
         m.return_value = SanityCheckMessages()