]> git.ipfire.org Git - thirdparty/paperless-ngx.git/commitdiff
Migrate encrypted png thumbnails to webp 3719/head
author: shamoon <4887959+shamoon@users.noreply.github.com>
Thu, 29 Jun 2023 18:17:52 +0000 (11:17 -0700)
committer: shamoon <4887959+shamoon@users.noreply.github.com>
Thu, 29 Jun 2023 20:21:15 +0000 (13:21 -0700)
src/documents/migrations/1037_webp_encrypted_thumbnail_conversion.py [new file with mode: 0644]
src/documents/tests/test_migration_encrypted_webp_conversion.py [new file with mode: 0644]

diff --git a/src/documents/migrations/1037_webp_encrypted_thumbnail_conversion.py b/src/documents/migrations/1037_webp_encrypted_thumbnail_conversion.py
new file mode 100644 (file)
index 0000000..6b4f06e
--- /dev/null
@@ -0,0 +1,162 @@
+# Generated by Django 4.1.9 on 2023-06-29 19:29
+import logging
+import multiprocessing.pool
+import shutil
+import tempfile
+import time
+from pathlib import Path
+
+import gnupg
+from django.conf import settings
+from django.db import migrations
+
+from documents.parsers import run_convert
+
+logger = logging.getLogger("paperless.migrations")
+
+
+def _do_convert(work_package):
+    """
+    Convert a single encrypted PNG thumbnail into an encrypted WebP thumbnail.
+
+    work_package is a 3-tuple of:
+      - the path of the existing encrypted PNG thumbnail (*.png.gpg)
+      - the path the encrypted WebP thumbnail (*.webp.gpg) is first written to
+      - the passphrase used for both decryption and re-encryption
+
+    The thumbnail is decrypted, converted with run_convert, encrypted again,
+    copied into the directory of the original and the original is deleted.
+
+    Any exception raised along the way is logged, not re-raised.  The decrypted
+    intermediate files are removed in every case, so a failed conversion does
+    not leave a plaintext thumbnail behind on disk.
+    """
+    (
+        existing_encrypted_thumbnail,
+        converted_encrypted_thumbnail,
+        passphrase,
+    ) = work_package
+
+    # Plaintext intermediates, tracked here so the "finally" clause below can
+    # remove whichever of them were created before a failure
+    decrypted_thumbnail = None
+    converted_decrypted_thumbnail = None
+
+    try:
+        gpg = gnupg.GPG(gnupghome=settings.GNUPG_HOME)
+
+        logger.info(f"Decrypting thumbnail: {existing_encrypted_thumbnail}")
+
+        # Decrypt png, the output sits next to the encrypted file (".gpg" dropped)
+        decrypted_thumbnail = existing_encrypted_thumbnail.with_suffix("").resolve()
+
+        with open(existing_encrypted_thumbnail, "rb") as existing_encrypted_file:
+            # NOTE(review): only the data of the result is used, its status
+            # is not inspected here
+            raw_thumb = gpg.decrypt_file(
+                existing_encrypted_file,
+                passphrase=passphrase,
+                always_trust=True,
+            ).data
+            with open(decrypted_thumbnail, "wb") as decrypted_file:
+                decrypted_file.write(raw_thumb)
+
+        converted_decrypted_thumbnail = Path(
+            str(converted_encrypted_thumbnail).replace("webp.gpg", "webp"),
+        ).resolve()
+
+        logger.info(f"Converting decrypted thumbnail: {decrypted_thumbnail}")
+
+        # Convert to webp, "[0]" is appended to the input file name
+        run_convert(
+            density=300,
+            scale="500x5000>",
+            alpha="remove",
+            strip=True,
+            trim=False,
+            auto_orient=True,
+            input_file=f"{decrypted_thumbnail}[0]",
+            output_file=str(converted_decrypted_thumbnail),
+        )
+
+        logger.info(
+            f"Encrypting converted thumbnail: {converted_decrypted_thumbnail}",
+        )
+
+        # Encrypt webp
+        with open(converted_decrypted_thumbnail, "rb") as converted_decrypted_file:
+            encrypted = gpg.encrypt_file(
+                fileobj_or_path=converted_decrypted_file,
+                recipients=None,
+                passphrase=passphrase,
+                symmetric=True,
+                always_trust=True,
+            ).data
+
+            with open(converted_encrypted_thumbnail, "wb") as converted_encrypted_file:
+                converted_encrypted_file.write(encrypted)
+
+        # Copy newly created thumbnail to thumbnail directory
+        shutil.copy(converted_encrypted_thumbnail, existing_encrypted_thumbnail.parent)
+
+        # Remove the existing encrypted PNG version, only once its replacement
+        # has been copied into place
+        existing_encrypted_thumbnail.unlink()
+
+        logger.info(
+            "Conversion to WebP completed, "
+            f"replaced {existing_encrypted_thumbnail.name} with {converted_encrypted_thumbnail.name}",
+        )
+
+    except Exception as e:
+        logger.error(f"Error converting thumbnail (existing file unchanged): {e}")
+
+    finally:
+        # Remove the decrypted PNG and decrypted WebP versions, on success and
+        # on failure alike
+        for leftover in (decrypted_thumbnail, converted_decrypted_thumbnail):
+            try:
+                if leftover is not None and leftover.exists():
+                    leftover.unlink()
+            except OSError as e:
+                logger.warning(f"Unable to remove temporary file {leftover}: {e}")
+
+
+def _convert_encrypted_thumbnails_to_webp(apps, schema_editor):
+    """
+    Convert every encrypted PNG thumbnail (*.png.gpg) found in
+    settings.THUMBNAIL_DIR into an encrypted WebP thumbnail.
+
+    apps and schema_editor are the arguments Django hands to a RunPython
+    callable, neither is used here.
+
+    Does nothing when no encrypted PNG thumbnail exists.  Raises an Exception
+    when such thumbnails exist but settings.PASSPHRASE is not set.
+    """
+    start = time.time()
+
+    # Look at the directory once and reuse the result below
+    encrypted_png_thumbnails = list(Path(settings.THUMBNAIL_DIR).glob("*.png.gpg"))
+
+    if not encrypted_png_thumbnails:
+        return
+
+    passphrase = settings.PASSPHRASE
+
+    if not passphrase:
+        raise Exception(
+            "Passphrase not defined, encrypted thumbnails cannot be migrated "
+            "without this",
+        )
+
+    # The converted files are first created in a temporary directory, each
+    # worker then copies its result next to the original thumbnail
+    with tempfile.TemporaryDirectory() as tempdir:
+        work_packages = []
+
+        for file in encrypted_png_thumbnails:
+            existing_thumbnail = file.resolve()
+
+            # Change the existing filename suffix from png to webp
+            converted_thumbnail_name = existing_thumbnail.name.replace(
+                ".png.gpg",
+                ".webp.gpg",
+            )
+
+            # Create the expected output filename in the tempdir
+            converted_thumbnail = (Path(tempdir) / converted_thumbnail_name).resolve()
+
+            # Package up the necessary info
+            work_packages.append(
+                (existing_thumbnail, converted_thumbnail, passphrase),
+            )
+
+        logger.info(
+            "\n\n"
+            "  This is a one-time only migration to convert thumbnails for all of your\n"
+            "  *encrypted* documents into WebP format. If you have a lot of encrypted documents, \n"
+            "  this may take a while, so a coffee break may be in order."
+            "\n",
+        )
+
+        # At most 4 worker processes, each replaced after 4 tasks
+        with multiprocessing.pool.Pool(
+            processes=min(multiprocessing.cpu_count(), 4),
+            maxtasksperchild=4,
+        ) as pool:
+            pool.map(_do_convert, work_packages)
+
+        duration = time.time() - start
+
+        logger.info(f"Conversion completed in {duration:.3f}s")
+
+
+class Migration(migrations.Migration):
+    """
+    Runs the encrypted thumbnail conversion when migrating forwards,
+    migrating backwards is a no-op.
+    """
+
+    dependencies = [("documents", "1036_alter_savedviewfilterrule_rule_type")]
+
+    operations = [
+        migrations.RunPython(
+            _convert_encrypted_thumbnails_to_webp,
+            migrations.RunPython.noop,
+        ),
+    ]
diff --git a/src/documents/tests/test_migration_encrypted_webp_conversion.py b/src/documents/tests/test_migration_encrypted_webp_conversion.py
new file mode 100644 (file)
index 0000000..fbb5a86
--- /dev/null
@@ -0,0 +1,276 @@
+import shutil
+import tempfile
+from pathlib import Path
+from typing import Callable
+from typing import Iterable
+from typing import Union
+from unittest import mock
+
+from django.test import override_settings
+
+from documents.tests.utils import TestMigrations
+
+
+@override_settings(PASSPHRASE="test")
+@mock.patch(
+    "documents.migrations.1037_webp_encrypted_thumbnail_conversion.multiprocessing.pool.Pool.map",
+)
+@mock.patch("documents.migrations.1037_webp_encrypted_thumbnail_conversion.run_convert")
+class TestMigrateToEncrytpedWebPThumbnails(TestMigrations):
+    """
+    Tests the migration converting encrypted PNG thumbnails to encrypted WebP.
+
+    Pool.map and run_convert of the migration module are patched for every
+    test.  The mocks are handed to each test method in the order
+    (run_convert mock, map mock).
+    """
+
+    migrate_from = "1036_alter_savedviewfilterrule_rule_type"
+    migrate_to = "1037_webp_encrypted_thumbnail_conversion"
+    # Each test triggers the migration itself through performMigration()
+    auto_migrate = False
+
+    def pretend_convert_output(self, *args, **kwargs):
+        """
+        Pretends to do the conversion, by copying the input file
+        to the output file
+        """
+        input_file = kwargs["input_file"]
+        # Drop the "[0]" selector the migration appends.  str.rstrip() is not
+        # suitable for this: it strips a set of characters, not a suffix, and
+        # would also eat trailing "0", "[" or "]" characters of the file name
+        selector = "[0]"
+        if input_file.endswith(selector):
+            input_file = input_file[: -len(selector)]
+        shutil.copy2(
+            Path(input_file),
+            Path(kwargs["output_file"]),
+        )
+
+    def pretend_map(self, func: Callable, iterable: Iterable):
+        """
+        Pretends to be the map of a multiprocessing.Pool, but secretly does
+        everything in series
+        """
+        for item in iterable:
+            func(item)
+
+    def create_dummy_thumbnails(
+        self,
+        thumb_dir: Path,
+        ext: str,
+        count: int,
+        start_count: int = 0,
+    ):
+        """
+        Helper to create a certain count of empty files of given extension in a
+        given directory.  The files are named by a zero padded, 7 digit number
+        starting at start_count
+        """
+        for idx in range(count):
+            (Path(thumb_dir) / Path(f"{start_count + idx:07}.{ext}")).touch()
+        # Triple check expected files exist
+        self.assert_file_count_by_extension(ext, thumb_dir, count)
+
+    def create_webp_thumbnail_files(
+        self,
+        thumb_dir: Path,
+        count: int,
+        start_count: int = 0,
+    ):
+        """
+        Creates count dummy WebP thumbnail files in the given directory
+        """
+        self.create_dummy_thumbnails(thumb_dir, "webp", count, start_count)
+
+    def create_encrypted_webp_thumbnail_files(
+        self,
+        thumb_dir: Path,
+        count: int,
+        start_count: int = 0,
+    ):
+        """
+        Creates count dummy encrypted WebP thumbnail files in the given directory
+        """
+        self.create_dummy_thumbnails(thumb_dir, "webp.gpg", count, start_count)
+
+    def create_png_thumbnail_files(
+        self,
+        thumb_dir: Path,
+        count: int,
+        start_count: int = 0,
+    ):
+        """
+        Creates count dummy PNG thumbnail files in the given directory
+        """
+
+        self.create_dummy_thumbnails(thumb_dir, "png", count, start_count)
+
+    def create_encrypted_png_thumbnail_files(
+        self,
+        thumb_dir: Path,
+        count: int,
+        start_count: int = 0,
+    ):
+        """
+        Creates count dummy encrypted PNG thumbnail files in the given directory
+        """
+
+        self.create_dummy_thumbnails(thumb_dir, "png.gpg", count, start_count)
+
+    def assert_file_count_by_extension(
+        self,
+        ext: str,
+        dir: Union[str, Path],
+        expected_count: int,
+    ):
+        """
+        Helper to assert a certain count of given extension files in given directory
+        """
+        if not isinstance(dir, Path):
+            dir = Path(dir)
+        matching_files = list(dir.glob(f"*.{ext}"))
+        self.assertEqual(len(matching_files), expected_count)
+
+    def assert_encrypted_png_file_count(self, dir: Path, expected_count: int):
+        """
+        Helper to assert a certain count of encrypted PNG extension files in given directory
+        """
+        self.assert_file_count_by_extension("png.gpg", dir, expected_count)
+
+    def assert_encrypted_webp_file_count(self, dir: Path, expected_count: int):
+        """
+        Helper to assert a certain count of encrypted WebP extension files in given directory
+        """
+        self.assert_file_count_by_extension("webp.gpg", dir, expected_count)
+
+    def assert_webp_file_count(self, dir: Path, expected_count: int):
+        """
+        Helper to assert a certain count of WebP extension files in given directory
+        """
+        self.assert_file_count_by_extension("webp", dir, expected_count)
+
+    def assert_png_file_count(self, dir: Path, expected_count: int):
+        """
+        Helper to assert a certain count of PNG extension files in given directory
+        """
+        self.assert_file_count_by_extension("png", dir, expected_count)
+
+    def setUp(self):
+        # Every test gets its own, empty thumbnail directory
+        self.thumbnail_dir = Path(tempfile.mkdtemp()).resolve()
+
+        return super().setUp()
+
+    def tearDown(self) -> None:
+        shutil.rmtree(self.thumbnail_dir)
+
+        return super().tearDown()
+
+    def test_do_nothing_if_converted(
+        self,
+        run_convert_mock: mock.MagicMock,
+        map_mock: mock.MagicMock,
+    ):
+        """
+        GIVEN:
+            - Encrypted WebP thumbnails already exist
+        WHEN:
+            - Migration is attempted
+        THEN:
+            - Nothing is converted
+        """
+        map_mock.side_effect = self.pretend_map
+
+        with override_settings(
+            THUMBNAIL_DIR=self.thumbnail_dir,
+        ):
+            self.create_encrypted_webp_thumbnail_files(self.thumbnail_dir, 3)
+
+            self.performMigration()
+            run_convert_mock.assert_not_called()
+
+            self.assert_encrypted_webp_file_count(self.thumbnail_dir, 3)
+
+    def test_convert_thumbnails(
+        self,
+        run_convert_mock: mock.MagicMock,
+        map_mock: mock.MagicMock,
+    ):
+        """
+        GIVEN:
+            - Encrypted documents exist with PNG thumbnail
+        WHEN:
+            - Migration is attempted
+        THEN:
+            - Thumbnails are converted to webp & re-encrypted
+        """
+        map_mock.side_effect = self.pretend_map
+        run_convert_mock.side_effect = self.pretend_convert_output
+
+        with override_settings(
+            THUMBNAIL_DIR=self.thumbnail_dir,
+        ):
+            self.create_encrypted_png_thumbnail_files(self.thumbnail_dir, 3)
+
+            self.performMigration()
+
+            run_convert_mock.assert_called()
+            self.assertEqual(run_convert_mock.call_count, 3)
+
+            self.assert_encrypted_webp_file_count(self.thumbnail_dir, 3)
+
+    def test_convert_errors_out(
+        self,
+        run_convert_mock: mock.MagicMock,
+        map_mock: mock.MagicMock,
+    ):
+        """
+        GIVEN:
+            - Encrypted documents exist with PNG thumbnail
+        WHEN:
+            - Migration is attempted, but the conversion raises an exception
+        THEN:
+            - Conversion is attempted for every thumbnail
+            - The encrypted PNG thumbnails are all still present
+        """
+        map_mock.side_effect = self.pretend_map
+        run_convert_mock.side_effect = OSError
+
+        with override_settings(
+            THUMBNAIL_DIR=self.thumbnail_dir,
+        ):
+            self.create_encrypted_png_thumbnail_files(self.thumbnail_dir, 3)
+
+            self.performMigration()
+
+            run_convert_mock.assert_called()
+            self.assertEqual(run_convert_mock.call_count, 3)
+
+            self.assert_encrypted_png_file_count(self.thumbnail_dir, 3)
+
+    def test_convert_mixed(
+        self,
+        run_convert_mock: mock.MagicMock,
+        map_mock: mock.MagicMock,
+    ):
+        """
+        GIVEN:
+            - Documents exist with PNG, encrypted PNG, WebP and encrypted
+              WebP thumbnails
+        WHEN:
+            - Migration is attempted
+        THEN:
+            - Only encrypted PNG thumbnails are converted
+        """
+        map_mock.side_effect = self.pretend_map
+        run_convert_mock.side_effect = self.pretend_convert_output
+
+        with override_settings(
+            THUMBNAIL_DIR=self.thumbnail_dir,
+        ):
+            self.create_png_thumbnail_files(self.thumbnail_dir, 3)
+            self.create_encrypted_png_thumbnail_files(
+                self.thumbnail_dir,
+                3,
+                start_count=3,
+            )
+            self.create_webp_thumbnail_files(self.thumbnail_dir, 2, start_count=6)
+            self.create_encrypted_webp_thumbnail_files(
+                self.thumbnail_dir,
+                3,
+                start_count=8,
+            )
+
+            self.performMigration()
+
+            run_convert_mock.assert_called()
+            self.assertEqual(run_convert_mock.call_count, 3)
+
+            # 3 pre-existing encrypted WebP files plus the 3 converted ones
+            self.assert_png_file_count(self.thumbnail_dir, 3)
+            self.assert_encrypted_webp_file_count(self.thumbnail_dir, 6)
+            self.assert_webp_file_count(self.thumbnail_dir, 2)
+            self.assert_encrypted_png_file_count(self.thumbnail_dir, 0)