]> git.ipfire.org Git - thirdparty/paperless-ngx.git/commitdiff
Fix: zip exports not respecting the --delete option (#5245)
authorTrenton H <797416+stumpylog@users.noreply.github.com>
Thu, 4 Jan 2024 19:58:58 +0000 (11:58 -0800)
committerGitHub <noreply@github.com>
Thu, 4 Jan 2024 19:58:58 +0000 (19:58 +0000)
src/documents/management/commands/document_exporter.py
src/documents/tests/test_management_exporter.py

index b08b0b208d580fc91f51f584a05bad762ac0f646..9bdbd1c519b30c6f23cc98db5a716866aefd3273 100644 (file)
@@ -5,6 +5,7 @@ import shutil
 import tempfile
 import time
 from pathlib import Path
+from typing import Optional
 
 import tqdm
 from django.conf import settings
@@ -172,14 +173,14 @@ class Command(BaseCommand):
         self.delete: bool = options["delete"]
         self.no_archive: bool = options["no_archive"]
         self.no_thumbnail: bool = options["no_thumbnail"]
-        zip_export: bool = options["zip"]
+        self.zip_export: bool = options["zip"]
 
         # If zipping, save the original target for later and
         # get a temporary directory for the target instead
         temp_dir = None
-        original_target = None
-        if zip_export:
-            original_target = self.target
+        self.original_target: Optional[Path] = None
+        if self.zip_export:
+            self.original_target = self.target
 
             os.makedirs(settings.SCRATCH_DIR, exist_ok=True)
             temp_dir = tempfile.TemporaryDirectory(
@@ -203,10 +204,10 @@ class Command(BaseCommand):
 
                 # We've written everything to the temporary directory in this case,
                 # now make an archive in the original target, with all files stored
-                if zip_export:
+                if self.zip_export:
                     shutil.make_archive(
                         os.path.join(
-                            original_target,
+                            self.original_target,
                             options["zip_name"],
                         ),
                         format="zip",
@@ -215,7 +216,7 @@ class Command(BaseCommand):
 
         finally:
             # Always cleanup the temporary directory, if one was created
-            if zip_export and temp_dir is not None:
+            if self.zip_export and temp_dir is not None:
                 temp_dir.cleanup()
 
     def dump(self, progress_bar_disable=False):
@@ -466,14 +467,21 @@ class Command(BaseCommand):
 
         if self.delete:
             # 5. Remove files which we did not explicitly export in this run
+            if not self.zip_export:
+                for f in self.files_in_export_dir:
+                    f.unlink()
 
-            for f in self.files_in_export_dir:
-                f.unlink()
-
-                delete_empty_directories(
-                    f.parent,
-                    self.target,
-                )
+                    delete_empty_directories(
+                        f.parent,
+                        self.target,
+                    )
+            else:
+                # 5. Remove anything in the original location (before moving the zip)
+                for item in self.original_target.glob("*"):
+                    if item.is_dir():
+                        shutil.rmtree(item)
+                    else:
+                        item.unlink()
 
     def check_and_copy(self, source, source_checksum, target: Path):
         if target in self.files_in_export_dir:
index a51bd4662351402f6764b33e6e346bbac75b3201..888572b58d2ed71520e3b1f762026b0ed06d3050 100644 (file)
@@ -42,7 +42,7 @@ from documents.tests.utils import paperless_environment
 
 class TestExportImport(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
     def setUp(self) -> None:
-        self.target = tempfile.mkdtemp()
+        self.target = Path(tempfile.mkdtemp())
         self.addCleanup(shutil.rmtree, self.target)
 
         self.user = User.objects.create(username="temp_admin")
@@ -496,6 +496,54 @@ class TestExportImport(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
             self.assertIn("manifest.json", zip.namelist())
             self.assertIn("version.json", zip.namelist())
 
+    @override_settings(PASSPHRASE="test")
+    def test_export_zipped_with_delete(self):
+        """
+        GIVEN:
+            - Request to export documents to zipfile
+            - There is one existing file in the target
+            - There is one existing directory in the target
+        WHEN:
+            - Documents are exported
+            - deletion of existing files is requested
+        THEN:
+            - Zipfile is created
+            - Zipfile contains exported files
+            - The existing file and directory in target are removed
+        """
+        shutil.rmtree(os.path.join(self.dirs.media_dir, "documents"))
+        shutil.copytree(
+            os.path.join(os.path.dirname(__file__), "samples", "documents"),
+            os.path.join(self.dirs.media_dir, "documents"),
+        )
+
+        # Create stuff in target directory
+        existing_file = self.target / "test.txt"
+        existing_file.touch()
+        existing_dir = self.target / "somedir"
+        existing_dir.mkdir(parents=True)
+
+        self.assertIsFile(existing_file)
+        self.assertIsDir(existing_dir)
+
+        args = ["document_exporter", self.target, "--zip", "--delete"]
+
+        call_command(*args)
+
+        expected_file = os.path.join(
+            self.target,
+            f"export-{timezone.localdate().isoformat()}.zip",
+        )
+
+        self.assertIsFile(expected_file)
+        self.assertIsNotFile(existing_file)
+        self.assertIsNotDir(existing_dir)
+
+        with ZipFile(expected_file) as zip:
+            self.assertEqual(len(zip.namelist()), 11)
+            self.assertIn("manifest.json", zip.namelist())
+            self.assertIn("version.json", zip.namelist())
+
     def test_export_target_not_exists(self):
         """
         GIVEN: