import tempfile
from typing import List # for type hinting. Can be removed, if only Python >3.8 is used
+import magic
import tqdm
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
return barcodes
+def get_file_type(path: str) -> str:
+    """
+    Looks up the MIME type of the file at the given path.
+
+    The lookup is delegated to magic.from_file with mime=True and the
+    result is written to the debug log.
+
+    Returns the detected MIME type as a string.
+    """
+    detected_type = magic.from_file(path, mime=True)
+    logger.debug(f"Detected mime type: {detected_type}")
+    return detected_type
+
+
def convert_from_tiff_to_pdf(filepath: str) -> str:
"""
- converts a given TIFF image file to pdf into a temp. directory.
+ converts a given TIFF image file to pdf into a temporary directory.
+
Returns the new pdf file.
"""
file_name = os.path.splitext(os.path.basename(filepath))[0]
- file_extension = os.path.splitext(os.path.basename(filepath))[1].lower()
+ mime_type = get_file_type(filepath)
tempdir = tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR)
# use old file name with pdf extension
- if file_extension == ".tif" or file_extension == ".tiff":
+ if mime_type == "image/tiff":
newpath = os.path.join(tempdir, file_name + ".pdf")
else:
- logger.warning(f"Cannot convert from {str(file_extension)} to pdf.")
+ logger.warning(
+ f"Cannot convert mime type {str(mime_type)} from {str(filepath)} to pdf.",
+ )
return None
with Image.open(filepath) as image:
images = []
document_list = []
converted_tiff = None
if settings.CONSUMER_BARCODE_TIFF_SUPPORT:
- supported_extensions = [".pdf", ".tiff", ".tif"]
+ supported_mime = ["image/tiff", "application/pdf"]
else:
- supported_extensions = [".pdf"]
- file_extension = os.path.splitext(os.path.basename(path))[1].lower()
- if file_extension not in supported_extensions:
+ supported_mime = ["application/pdf"]
+ mime_type = get_file_type(path)
+ if mime_type not in supported_mime:
# if not supported, skip this routine
logger.warning(
- f"Unsupported file format for barcode reader: {str(file_extension)}",
+ f"Unsupported file format for barcode reader: {str(mime_type)}",
)
else:
- if file_extension in {".tif", ".tiff"}:
+ if mime_type == "image/tiff":
file_to_process = convert_from_tiff_to_pdf(path)
else:
file_to_process = path
img = Image.open(test_file)
self.assertEqual(tasks.barcode_reader(img), ["CUSTOM BARCODE"])
+    def test_get_mime_type(self):
+        # Sample documents shipped next to this test module
+        samples_dir = os.path.join(os.path.dirname(__file__), "samples")
+        tiff_file = os.path.join(samples_dir, "simple.tiff")
+        pdf_file = os.path.join(samples_dir, "simple.pdf")
+        png_file = os.path.join(samples_dir, "barcodes", "barcode-128-custom.png")
+
+        # Copies of the TIFF and PDF samples stored under names that carry
+        # no file extension
+        tiff_file_no_extension = os.path.join(settings.SCRATCH_DIR, "testfile1")
+        pdf_file_no_extension = os.path.join(settings.SCRATCH_DIR, "testfile2")
+        shutil.copy(tiff_file, tiff_file_no_extension)
+        shutil.copy(pdf_file, pdf_file_no_extension)
+
+        # (path, expected MIME type) pairs, checked in order
+        expected_types = (
+            (tiff_file, "image/tiff"),
+            (pdf_file, "application/pdf"),
+            (tiff_file_no_extension, "image/tiff"),
+            (pdf_file_no_extension, "application/pdf"),
+            (png_file, "image/png"),
+        )
+        for sample_path, expected_mime in expected_types:
+            self.assertEqual(tasks.get_file_type(sample_path), expected_mime)
+
def test_convert_from_tiff_to_pdf(self):
test_file = os.path.join(
os.path.dirname(__file__),
self.assertEqual(
cm.output,
[
- "WARNING:paperless.tasks:Unsupported file format for barcode reader: .jpg",
+ "WARNING:paperless.tasks:Unsupported file format for barcode reader: image/jpeg",
],
)
m.assert_called_once()
self.assertIsNone(kwargs["override_document_type_id"])
self.assertIsNone(kwargs["override_tag_ids"])
+    @override_settings(
+        CONSUMER_ENABLE_BARCODES=True,
+        CONSUMER_BARCODE_TIFF_SUPPORT=True,
+    )
+    def test_consume_barcode_supported_no_extension_file(self):
+        """
+        With barcode and TIFF support both enabled, a TIFF sample copied to
+        a name without any file extension is passed to consume_file, which
+        is expected to return "File successfully split"
+        """
+        barcode_samples = os.path.join(
+            os.path.dirname(__file__),
+            "samples",
+            "barcodes",
+        )
+        source_tiff = os.path.join(barcode_samples, "patch-code-t-middle.tiff")
+
+        # Same content, but stored under a name without an extension
+        extensionless_copy = os.path.join(settings.SCRATCH_DIR, "patch-code-t-middle")
+        shutil.copy(source_tiff, extensionless_copy)
+
+        result = tasks.consume_file(extensionless_copy)
+        self.assertEqual(result, "File successfully split")
+
@mock.patch("documents.tasks.sanity_checker.check_sanity")
def test_sanity_check_success(self, m):
m.return_value = SanityCheckMessages()