from documents.sanity_checker import SanityCheckFailedException
from whoosh.writing import AsyncWriter
+# barcode decoder
+import os
+from pyzbar import pyzbar
+from pdf2image import convert_from_path
+import tempfile
+from pikepdf import Pdf
+
logger = logging.getLogger("paperless.tasks")
logger.warning("Classifier error: " + str(e))
+
def barcode_reader(page) -> list:
    """
    Read any barcodes contained in page.

    ``page`` is handed unchanged to ``pyzbar.decode``.

    Returns a list with one entry per detected barcode that carries a
    non-empty payload. Each entry is ``str()`` of the raw payload, so a
    bytes payload such as ``b"PATCHT"`` is reported as the text
    ``"b'PATCHT'"``.
    """
    detected_barcodes = pyzbar.decode(page)
    if not detected_barcodes:
        logger.debug("No barcode detected")
        return []

    barcodes = []
    for barcode in detected_barcodes:
        # The payload is a bytes object, so comparing it against "" can never
        # filter anything out; rely on truthiness to skip empty payloads.
        if not barcode.data:
            continue
        barcodes.append(str(barcode.data))
        logger.debug(
            f"Barcode of type {str(barcode.type)} found: {str(barcode.data)}"
        )
    return barcodes
+
def scan_file_for_seperating_barcodes(filepath) -> list:
    """
    Scan the provided file for page separating barcodes.

    Every page of ``filepath`` is rendered with ``convert_from_path`` and
    passed to ``barcode_reader``. A page counts as a separator when one of
    the barcodes found on it is the PATCHT patch code.

    Returns a list of zero-based page numbers which separate the file.
    """
    # barcode_reader reports str() of the raw bytes payload ("b'PATCHT'");
    # the plain text form is accepted too so a decoded payload still matches.
    separator_values = ("PATCHT", "b'PATCHT'")
    seperator_page_numbers = []
    # use a temporary directory in case the file is too big to handle in memory
    with tempfile.TemporaryDirectory() as path:
        pages_from_path = convert_from_path(filepath, output_folder=path)
        for current_page_number, page in enumerate(pages_from_path):
            current_barcodes = barcode_reader(page)
            if any(code in separator_values for code in current_barcodes):
                seperator_page_numbers.append(current_page_number)
    return seperator_page_numbers
+
def _page_ranges(split_pages, page_count):
    """Return the (start, stop) page index ranges left once split_pages are dropped."""
    ranges = []
    start = 0
    for split_page in sorted(set(split_pages)):
        if not 0 <= split_page < page_count:
            # A separator outside the document cannot split anything.
            continue
        if split_page > start:
            ranges.append((start, split_page))
        # Continue after the separator so the separator page itself is dropped.
        start = split_page + 1
    if start < page_count:
        # Pages following the last separator form the final document.
        ranges.append((start, page_count))
    return ranges


def seperate_pages(filepath, pages_to_split_on: list):
    """
    Separate the provided file on the pages_to_split_on.

    ``pages_to_split_on`` holds zero-based page numbers. Those pages are
    removed, and each run of pages between them is written to its own PDF
    named ``<basename>_page_<n>.pdf``, where ``n`` counts the written
    documents from zero. Runs without any page (a separator on the first
    page, or two adjacent separators) produce no file. The files are
    written relative to the current working directory.

    When ``pages_to_split_on`` is None the file is scanned with
    ``scan_file_for_seperating_barcodes``; when it is empty nothing is
    written.

    Returns the list of filenames that were written.
    """
    if pages_to_split_on is None:
        pages_to_split_on = scan_file_for_seperating_barcodes(filepath)
    if not pages_to_split_on:
        return []

    fname = os.path.splitext(os.path.basename(filepath))[0]
    new_paths = []
    # TODO: Get the directory of the file and save the other files there
    # The source stays open until every part is saved, because the parts
    # reference its pages.
    with Pdf.open(filepath) as pdf:
        ranges = _page_ranges(pages_to_split_on, len(pdf.pages))
        for count, (start, stop) in enumerate(ranges):
            dst = Pdf.new()
            for page_number in range(start, stop):
                dst.pages.append(pdf.pages[page_number])
            output_filename = "{}_page_{}.pdf".format(fname, count)
            with open(output_filename, "wb") as out:
                dst.save(out)
            new_paths.append(output_filename)
    return new_paths
+
+
def consume_file(
path,
override_filename=None,
task_id=None,
):
+ # check for seperators in current document
+ seperator_page_numbers = scan_file_for_seperating_barcodes(path)
+ if seperator_page_numbers != [ ]:
+ logger.debug(f"Pages with seperators found: {str(seperator_page_numbers)}")
+
document = Consumer().try_consume_file(
path,
override_filename=override_filename,
from documents.sanity_checker import SanityCheckMessages
from documents.tests.utils import DirectoriesMixin
+from PIL import Image
+
class TestTasks(DirectoriesMixin, TestCase):
def test_index_reindex(self):
mtime3 = os.stat(settings.MODEL_FILE).st_mtime
self.assertNotEqual(mtime2, mtime3)
+ def test_barcode_reader(self):
+ test_file = os.path.join(
+ os.path.dirname(__file__),
+ "samples",
+ "patch-code-t.pbm"
+ )
+ img = Image.open(test_file)
+ self.assertEqual(tasks.barcode_reader(img), ["b'PATCHT'"])
+
@mock.patch("documents.tasks.sanity_checker.check_sanity")
def test_sanity_check_success(self, m):
m.return_value = SanityCheckMessages()