--- /dev/null
+# Generated by Django 4.1.9 on 2023-06-29 19:29
+import logging
+import multiprocessing.pool
+import shutil
+import tempfile
+import time
+from pathlib import Path
+
+import gnupg
+from django.conf import settings
+from django.db import migrations
+
+from documents.parsers import run_convert
+
+logger = logging.getLogger("paperless.migrations")
+
+
+def _do_convert(work_package):
+ (
+ existing_encrypted_thumbnail,
+ converted_encrypted_thumbnail,
+ passphrase,
+ ) = work_package
+
+ try:
+ gpg = gnupg.GPG(gnupghome=settings.GNUPG_HOME)
+
+ logger.info(f"Decrypting thumbnail: {existing_encrypted_thumbnail}")
+
+ # Decrypt png
+ decrypted_thumbnail = existing_encrypted_thumbnail.with_suffix("").resolve()
+
+ with open(existing_encrypted_thumbnail, "rb") as existing_encrypted_file:
+ raw_thumb = gpg.decrypt_file(
+ existing_encrypted_file,
+ passphrase=passphrase,
+ always_trust=True,
+ ).data
+ with open(decrypted_thumbnail, "wb") as decrypted_file:
+ decrypted_file.write(raw_thumb)
+
+ converted_decrypted_thumbnail = Path(
+ str(converted_encrypted_thumbnail).replace("webp.gpg", "webp"),
+ ).resolve()
+
+ logger.info(f"Converting decrypted thumbnail: {decrypted_thumbnail}")
+
+ # Convert to webp
+ run_convert(
+ density=300,
+ scale="500x5000>",
+ alpha="remove",
+ strip=True,
+ trim=False,
+ auto_orient=True,
+ input_file=f"{decrypted_thumbnail}[0]",
+ output_file=str(converted_decrypted_thumbnail),
+ )
+
+ logger.info(
+ f"Encrypting converted thumbnail: {converted_decrypted_thumbnail}",
+ )
+
+ # Encrypt webp
+ with open(converted_decrypted_thumbnail, "rb") as converted_decrypted_file:
+ encrypted = gpg.encrypt_file(
+ fileobj_or_path=converted_decrypted_file,
+ recipients=None,
+ passphrase=passphrase,
+ symmetric=True,
+ always_trust=True,
+ ).data
+
+ with open(converted_encrypted_thumbnail, "wb") as converted_encrypted_file:
+ converted_encrypted_file.write(encrypted)
+
+ # Copy newly created thumbnail to thumbnail directory
+ shutil.copy(converted_encrypted_thumbnail, existing_encrypted_thumbnail.parent)
+
+ # Remove the existing encrypted PNG version
+ existing_encrypted_thumbnail.unlink()
+
+ # Remove the decrypted PNG version
+ decrypted_thumbnail.unlink()
+
+ # Remove the decrypted WebP version
+ converted_decrypted_thumbnail.unlink()
+
+ logger.info(
+ "Conversion to WebP completed, "
+ f"replaced {existing_encrypted_thumbnail.name} with {converted_encrypted_thumbnail.name}",
+ )
+
+ except Exception as e:
+ logger.error(f"Error converting thumbnail (existing file unchanged): {e}")
+
+
+def _convert_encrypted_thumbnails_to_webp(apps, schema_editor):
+ start = time.time()
+
+ with tempfile.TemporaryDirectory() as tempdir:
+ work_packages = []
+
+ if len(list(Path(settings.THUMBNAIL_DIR).glob("*.png.gpg"))) > 0:
+ passphrase = settings.PASSPHRASE
+
+ if not passphrase:
+ raise Exception(
+ "Passphrase not defined, encrypted thumbnails cannot be migrated"
+ "without this",
+ )
+
+ for file in Path(settings.THUMBNAIL_DIR).glob("*.png.gpg"):
+ existing_thumbnail = file.resolve()
+
+ # Change the existing filename suffix from png to webp
+ converted_thumbnail_name = Path(
+ str(existing_thumbnail).replace(".png.gpg", ".webp.gpg"),
+ ).name
+
+ # Create the expected output filename in the tempdir
+ converted_thumbnail = (
+ Path(tempdir) / Path(converted_thumbnail_name)
+ ).resolve()
+
+ # Package up the necessary info
+ work_packages.append(
+ (existing_thumbnail, converted_thumbnail, passphrase),
+ )
+
+ if len(work_packages):
+ logger.info(
+ "\n\n"
+ " This is a one-time only migration to convert thumbnails for all of your\n"
+ " *encrypted* documents into WebP format. If you have a lot of encrypted documents, \n"
+ " this may take a while, so a coffee break may be in order."
+ "\n",
+ )
+
+ with multiprocessing.pool.Pool(
+ processes=min(multiprocessing.cpu_count(), 4),
+ maxtasksperchild=4,
+ ) as pool:
+ pool.map(_do_convert, work_packages)
+
+ end = time.time()
+ duration = end - start
+
+ logger.info(f"Conversion completed in {duration:.3f}s")
+
+
+class Migration(migrations.Migration):
+ dependencies = [
+ ("documents", "1036_alter_savedviewfilterrule_rule_type"),
+ ]
+
+ operations = [
+ migrations.RunPython(
+ code=_convert_encrypted_thumbnails_to_webp,
+ reverse_code=migrations.RunPython.noop,
+ ),
+ ]
--- /dev/null
+import shutil
+import tempfile
+from pathlib import Path
+from typing import Callable
+from typing import Iterable
+from typing import Union
+from unittest import mock
+
+from django.test import override_settings
+
+from documents.tests.utils import TestMigrations
+
+
+@override_settings(PASSPHRASE="test")
+@mock.patch(
+ "documents.migrations.1037_webp_encrypted_thumbnail_conversion.multiprocessing.pool.Pool.map",
+)
+@mock.patch("documents.migrations.1037_webp_encrypted_thumbnail_conversion.run_convert")
+class TestMigrateToEncrytpedWebPThumbnails(TestMigrations):
+ migrate_from = "1036_alter_savedviewfilterrule_rule_type"
+ migrate_to = "1037_webp_encrypted_thumbnail_conversion"
+ auto_migrate = False
+
+ def pretend_convert_output(self, *args, **kwargs):
+ """
+ Pretends to do the conversion, by copying the input file
+ to the output file
+ """
+ shutil.copy2(
+ Path(kwargs["input_file"].rstrip("[0]")),
+ Path(kwargs["output_file"]),
+ )
+
+ def pretend_map(self, func: Callable, iterable: Iterable):
+ """
+ Pretends to be the map of a multiprocessing.Pool, but secretly does
+ everything in series
+ """
+ for item in iterable:
+ func(item)
+
+ def create_dummy_thumbnails(
+ self,
+ thumb_dir: Path,
+ ext: str,
+ count: int,
+ start_count: int = 0,
+ ):
+ """
+ Helper to create a certain count of files of given extension in a given directory
+ """
+ for idx in range(count):
+ (Path(thumb_dir) / Path(f"{start_count + idx:07}.{ext}")).touch()
+ # Triple check expected files exist
+ self.assert_file_count_by_extension(ext, thumb_dir, count)
+
+ def create_webp_thumbnail_files(
+ self,
+ thumb_dir: Path,
+ count: int,
+ start_count: int = 0,
+ ):
+ """
+ Creates a dummy WebP thumbnail file in the given directory, based on
+ the database Document
+ """
+ self.create_dummy_thumbnails(thumb_dir, "webp", count, start_count)
+
+ def create_encrypted_webp_thumbnail_files(
+ self,
+ thumb_dir: Path,
+ count: int,
+ start_count: int = 0,
+ ):
+ """
+ Creates a dummy encrypted WebP thumbnail file in the given directory, based on
+ the database Document
+ """
+ self.create_dummy_thumbnails(thumb_dir, "webp.gpg", count, start_count)
+
+ def create_png_thumbnail_files(
+ self,
+ thumb_dir: Path,
+ count: int,
+ start_count: int = 0,
+ ):
+ """
+ Creates a dummy PNG thumbnail file in the given directory, based on
+ the database Document
+ """
+
+ self.create_dummy_thumbnails(thumb_dir, "png", count, start_count)
+
+ def create_encrypted_png_thumbnail_files(
+ self,
+ thumb_dir: Path,
+ count: int,
+ start_count: int = 0,
+ ):
+ """
+ Creates a dummy encrypted PNG thumbnail file in the given directory, based on
+ the database Document
+ """
+
+ self.create_dummy_thumbnails(thumb_dir, "png.gpg", count, start_count)
+
+ def assert_file_count_by_extension(
+ self,
+ ext: str,
+ dir: Union[str, Path],
+ expected_count: int,
+ ):
+ """
+ Helper to assert a certain count of given extension files in given directory
+ """
+ if not isinstance(dir, Path):
+ dir = Path(dir)
+ matching_files = list(dir.glob(f"*.{ext}"))
+ self.assertEqual(len(matching_files), expected_count)
+
+ def assert_encrypted_png_file_count(self, dir: Path, expected_count: int):
+ """
+ Helper to assert a certain count of excrypted PNG extension files in given directory
+ """
+ self.assert_file_count_by_extension("png.gpg", dir, expected_count)
+
+ def assert_encrypted_webp_file_count(self, dir: Path, expected_count: int):
+ """
+ Helper to assert a certain count of encrypted WebP extension files in given directory
+ """
+ self.assert_file_count_by_extension("webp.gpg", dir, expected_count)
+
+ def assert_webp_file_count(self, dir: Path, expected_count: int):
+ """
+ Helper to assert a certain count of WebP extension files in given directory
+ """
+ self.assert_file_count_by_extension("webp", dir, expected_count)
+
+ def assert_png_file_count(self, dir: Path, expected_count: int):
+ """
+ Helper to assert a certain count of PNG extension files in given directory
+ """
+ self.assert_file_count_by_extension("png", dir, expected_count)
+
+ def setUp(self):
+ self.thumbnail_dir = Path(tempfile.mkdtemp()).resolve()
+
+ return super().setUp()
+
+ def tearDown(self) -> None:
+ shutil.rmtree(self.thumbnail_dir)
+
+ return super().tearDown()
+
+ def test_do_nothing_if_converted(
+ self,
+ run_convert_mock: mock.MagicMock,
+ map_mock: mock.MagicMock,
+ ):
+ """
+ GIVEN:
+ - Encrytped document exists with existing encrypted WebP thumbnail path
+ WHEN:
+ - Migration is attempted
+ THEN:
+ - Nothing is converted
+ """
+ map_mock.side_effect = self.pretend_map
+
+ with override_settings(
+ THUMBNAIL_DIR=self.thumbnail_dir,
+ ):
+ self.create_encrypted_webp_thumbnail_files(self.thumbnail_dir, 3)
+
+ self.performMigration()
+ run_convert_mock.assert_not_called()
+
+ self.assert_encrypted_webp_file_count(self.thumbnail_dir, 3)
+
+ def test_convert_thumbnails(
+ self,
+ run_convert_mock: mock.MagicMock,
+ map_mock: mock.MagicMock,
+ ):
+ """
+ GIVEN:
+ - Encrypted documents exist with PNG thumbnail
+ WHEN:
+ - Migration is attempted
+ THEN:
+ - Thumbnails are converted to webp & re-encrypted
+ """
+ map_mock.side_effect = self.pretend_map
+ run_convert_mock.side_effect = self.pretend_convert_output
+
+ with override_settings(
+ THUMBNAIL_DIR=self.thumbnail_dir,
+ ):
+ self.create_encrypted_png_thumbnail_files(self.thumbnail_dir, 3)
+
+ self.performMigration()
+
+ run_convert_mock.assert_called()
+ self.assertEqual(run_convert_mock.call_count, 3)
+
+ self.assert_encrypted_webp_file_count(self.thumbnail_dir, 3)
+
+ def test_convert_errors_out(
+ self,
+ run_convert_mock: mock.MagicMock,
+ map_mock: mock.MagicMock,
+ ):
+ """
+ GIVEN:
+ - Encrypted document exists with PNG thumbnail
+ WHEN:
+ - Migration is attempted, but raises an exception
+ THEN:
+ - Single thumbnail is converted
+ """
+ map_mock.side_effect = self.pretend_map
+ run_convert_mock.side_effect = OSError
+
+ with override_settings(
+ THUMBNAIL_DIR=self.thumbnail_dir,
+ ):
+ self.create_encrypted_png_thumbnail_files(self.thumbnail_dir, 3)
+
+ self.performMigration()
+
+ run_convert_mock.assert_called()
+ self.assertEqual(run_convert_mock.call_count, 3)
+
+ self.assert_encrypted_png_file_count(self.thumbnail_dir, 3)
+
+ def test_convert_mixed(
+ self,
+ run_convert_mock: mock.MagicMock,
+ map_mock: mock.MagicMock,
+ ):
+ """
+ GIVEN:
+ - Documents exist with PNG, encrypted PNG and WebP thumbnails
+ WHEN:
+ - Migration is attempted
+ THEN:
+ - Only encrypted PNG thumbnails are converted
+ """
+ map_mock.side_effect = self.pretend_map
+ run_convert_mock.side_effect = self.pretend_convert_output
+
+ with override_settings(
+ THUMBNAIL_DIR=self.thumbnail_dir,
+ ):
+ self.create_png_thumbnail_files(self.thumbnail_dir, 3)
+ self.create_encrypted_png_thumbnail_files(
+ self.thumbnail_dir,
+ 3,
+ start_count=3,
+ )
+ self.create_webp_thumbnail_files(self.thumbnail_dir, 2, start_count=6)
+ self.create_encrypted_webp_thumbnail_files(
+ self.thumbnail_dir,
+ 3,
+ start_count=8,
+ )
+
+ self.performMigration()
+
+ run_convert_mock.assert_called()
+ self.assertEqual(run_convert_mock.call_count, 3)
+
+ self.assert_png_file_count(self.thumbnail_dir, 3)
+ self.assert_encrypted_webp_file_count(self.thumbnail_dir, 6)
+ self.assert_webp_file_count(self.thumbnail_dir, 2)
+ self.assert_encrypted_png_file_count(self.thumbnail_dir, 0)