-z, --zip
-zn, --zip-name
--data-only
+--passphrase
```
`target` is a folder to which the data gets written. This includes
If `--data-only` is provided, only the database will be exported. This option is intended
to facilitate database upgrades without needing to clean documents and thumbnails from the media directory.
+If `--passphrase` is provided, it will be used to encrypt certain fields in the export. This value
+must be provided to import. If this value is lost, the export cannot be imported.
+
!!! warning
If exporting with the file name format, there may be errors due to
document_importer source
```
-| Option | Required | Default | Description |
-| ----------- | -------- | ------- | ------------------------------------------------------------------------- |
-| source | Yes | N/A | The directory containing an export |
-| --data-only | No | False | If provided, only import data, do not import document files or thumbnails |
+| Option | Required | Default | Description |
+| -------------- | -------- | ------- | ------------------------------------------------------------------------- |
+| source | Yes | N/A | The directory containing an export |
+| `--data-only` | No | False | If provided, only import data, do not import document files or thumbnails |
+| `--passphrase` | No | N/A | Must be provided if your export was encrypted with a passphrase |
When you use the provided docker compose script, put the export inside
the `export` folder in your paperless source directory. Specify
`../export` as the `source`.
-Note that .zip files (as can be generated from the exporter) are not supported.
+Note that .zip files (as can be generated from the exporter) are not supported. You must unzip them into
+the `source` directory first.
!!! note
!!! warning
The importer should be run against a completely empty installation (database and directories) of Paperless-ngx.
+ If using a data only import, only the database must be empty.
### Document retagger {#retagger}
from documents.file_handling import delete_empty_directories
from documents.file_handling import generate_filename
+from documents.management.commands.mixins import CryptMixin
from documents.models import Correspondent
from documents.models import CustomField
from documents.models import CustomFieldInstance
from paperless_mail.models import MailRule
-class Command(BaseCommand):
+class Command(CryptMixin, BaseCommand):
help = (
"Decrypt and rename all files in our collection into a given target "
"directory. And include a manifest file containing document data for "
help="If set, the progress bar will not be shown",
)
+ parser.add_argument(
+ "--passphrase",
+ help="If provided, is used to encrypt sensitive data in the export",
+ )
+
def handle(self, *args, **options):
self.target = Path(options["target"]).resolve()
self.split_manifest: bool = options["split_manifest"]
self.zip_export: bool = options["zip"]
self.data_only: bool = options["data_only"]
self.no_progress_bar: bool = options["no_progress_bar"]
+ self.passphrase: Optional[str] = options.get("passphrase")
self.files_in_export_dir: set[Path] = set()
self.exported_files: set[str] = set()
serializers.serialize("json", manifest_key_to_object_query[key]),
)
+ self.encrypt_secret_fields(manifest_dict)
+
# These are treated specially and included in the per-document manifest
# if that setting is enabled. Otherwise, they are just exported to the bulk
# manifest
self.files_in_export_dir.remove(manifest_path)
# 4.2 write version information to target folder
- version_path = (self.target / "version.json").resolve()
- version_path.write_text(
+ extra_metadata_path = (self.target / "metadata.json").resolve()
+ metadata: dict[str, str | int | dict[str, str | int]] = {
+ "version": version.__full_version_str__,
+ }
+
+ # 4.2.1 If needed, write the crypto values into the metadata
+ # Django stores most of these in the field itself, we store them once here
+ if self.passphrase:
+ metadata.update(self.get_crypt_params())
+ extra_metadata_path.write_text(
json.dumps(
- {"version": version.__full_version_str__},
+ metadata,
indent=2,
ensure_ascii=False,
),
encoding="utf-8",
)
- if version_path in self.files_in_export_dir:
- self.files_in_export_dir.remove(version_path)
+ if extra_metadata_path in self.files_in_export_dir:
+ self.files_in_export_dir.remove(extra_metadata_path)
if self.delete:
# 5. Remove files which we did not explicitly export in this run
if perform_copy:
target.parent.mkdir(parents=True, exist_ok=True)
copy_file_with_basic_stats(source, target)
+
+ def encrypt_secret_fields(self, manifest: dict) -> None:
+ """
+ Encrypts certain fields in the export. Currently limited to the mail account password
+ """
+
+ if self.passphrase:
+ self.setup_crypto(passphrase=self.passphrase)
+
+ for crypt_config in self.CRYPT_FIELDS:
+ exporter_key = crypt_config["exporter_key"]
+ crypt_fields = crypt_config["fields"]
+ for manifest_record in manifest[exporter_key]:
+ for field in crypt_fields:
+ manifest_record["fields"][field] = self.encrypt_string(
+ value=manifest_record["fields"][field],
+ )
+
+ elif MailAccount.objects.count() > 0:
+ self.stdout.write(
+ self.style.NOTICE(
+ "You have configured mail accounts, "
+ "but no passphrase was given. "
+ "Passwords will be in plaintext",
+ ),
+ )
import os
from contextlib import contextmanager
from pathlib import Path
+from typing import Optional
import tqdm
from django.conf import settings
from filelock import FileLock
from documents.file_handling import create_source_path_directory
+from documents.management.commands.mixins import CryptMixin
from documents.models import Correspondent
from documents.models import CustomField
from documents.models import CustomFieldInstance
from documents.models import Tag
from documents.parsers import run_convert
from documents.settings import EXPORTER_ARCHIVE_NAME
+from documents.settings import EXPORTER_CRYPTO_SETTINGS_NAME
from documents.settings import EXPORTER_FILE_NAME
from documents.settings import EXPORTER_THUMBNAIL_NAME
from documents.signals.handlers import update_filename_and_move_files
sig.connect(receiver=receiver, sender=sender)
-class Command(BaseCommand):
+class Command(CryptMixin, BaseCommand):
help = (
"Using a manifest.json file, load the data from there, and import the "
"documents it refers to."
help="If set, only the database will be exported, not files",
)
+ parser.add_argument(
+ "--passphrase",
+ help="If provided, is used to sensitive fields in the export",
+ )
+
def pre_check(self) -> None:
"""
- Runs some initial checks against the source directory, including looking for
- common mistakes like having files still and users other than expected
+ Runs some initial checks against the state of the install and source, including:
+ - Does the target exist?
+ - Can we access the target?
+ - Does the target have a manifest file?
+ - Are there existing files in the document folders?
+ - Are there existing users or documents in the database?
"""
+ def pre_check_maybe_not_empty():
+ # Skip this check if operating only on the database
+ # We can expect data to exist in that case
+ if not self.data_only:
+ for document_dir in [settings.ORIGINALS_DIR, settings.ARCHIVE_DIR]:
+ if document_dir.exists() and document_dir.is_dir():
+ for entry in document_dir.glob("**/*"):
+ if entry.is_dir():
+ continue
+ self.stdout.write(
+ self.style.WARNING(
+ f"Found file {entry.relative_to(document_dir)}, this might indicate a non-empty installation",
+ ),
+ )
+ break
+ # But existing users or other data still matter in a data only import
+ if (
+ User.objects.exclude(username__in=["consumer", "AnonymousUser"]).count()
+ != 0
+ ):
+ self.stdout.write(
+ self.style.WARNING(
+ "Found existing user(s), this might indicate a non-empty installation",
+ ),
+ )
+ if Document.objects.count() != 0:
+ self.stdout.write(
+ self.style.WARNING(
+ "Found existing documents(s), this might indicate a non-empty installation",
+ ),
+ )
+
+ def pre_check_manifest_exists():
+ if not (self.source / "manifest.json").exists():
+ raise CommandError(
+ "That directory doesn't appear to contain a manifest.json file.",
+ )
+
if not self.source.exists():
raise CommandError("That path doesn't exist")
if not os.access(self.source, os.R_OK):
raise CommandError("That path doesn't appear to be readable")
- # Skip this check if operating only on the database
- # We can expect data to exist in that case
- if not self.data_only:
- for document_dir in [settings.ORIGINALS_DIR, settings.ARCHIVE_DIR]:
- if document_dir.exists() and document_dir.is_dir():
- for entry in document_dir.glob("**/*"):
- if entry.is_dir():
- continue
- self.stdout.write(
- self.style.WARNING(
- f"Found file {entry.relative_to(document_dir)}, this might indicate a non-empty installation",
- ),
- )
- break
- if (
- User.objects.exclude(username__in=["consumer", "AnonymousUser"]).count()
- != 0
- ):
- self.stdout.write(
- self.style.WARNING(
- "Found existing user(s), this might indicate a non-empty installation",
- ),
- )
- if Document.objects.count() != 0:
- self.stdout.write(
- self.style.WARNING(
- "Found existing documents(s), this might indicate a non-empty installation",
- ),
- )
-
- def handle(self, *args, **options):
- logging.getLogger().handlers[0].level = logging.ERROR
-
- self.source = Path(options["source"]).resolve()
- self.data_only: bool = options["data_only"]
- self.no_progress_bar: bool = options["no_progress_bar"]
-
- self.pre_check()
-
- manifest_paths = []
+ pre_check_maybe_not_empty()
+ pre_check_manifest_exists()
+ def load_manifest_files(self) -> None:
+ """
+ Loads manifest data from the various JSON files for parsing and loading the database
+ """
main_manifest_path = self.source / "manifest.json"
- self._check_manifest_exists(main_manifest_path)
-
with main_manifest_path.open() as infile:
self.manifest = json.load(infile)
- manifest_paths.append(main_manifest_path)
+ self.manifest_paths.append(main_manifest_path)
for file in Path(self.source).glob("**/*-manifest.json"):
with file.open() as infile:
self.manifest += json.load(infile)
- manifest_paths.append(file)
+ self.manifest_paths.append(file)
+
+ def load_metadata(self) -> None:
+ """
+ Loads either just the version information or the version information and extra data
+ Must account for the old style of export as well, with just version.json
+ """
version_path = self.source / "version.json"
+ metadata_path = self.source / "metadata.json"
+ if not version_path.exists() and not metadata_path.exists():
+ self.stdout.write(
+ self.style.NOTICE("No version.json or metadata.json file located"),
+ )
+ return
+
if version_path.exists():
with version_path.open() as infile:
self.version = json.load(infile)["version"]
- # Provide an initial warning if needed to the user
- if self.version != version.__full_version_str__:
+ elif metadata_path.exists():
+ with metadata_path.open() as infile:
+ data = json.load(infile)
+ self.version = data["version"]
+ if not self.passphrase and EXPORTER_CRYPTO_SETTINGS_NAME in data:
+ raise CommandError(
+ "No passphrase was given, but this export contains encrypted fields",
+ )
+ elif EXPORTER_CRYPTO_SETTINGS_NAME in data:
+ self.load_crypt_params(data)
+
+ if self.version and self.version != version.__full_version_str__:
+ self.stdout.write(
+ self.style.WARNING(
+ "Version mismatch: "
+ f"Currently {version.__full_version_str__},"
+ f" importing {self.version}."
+ " Continuing, but import may fail.",
+ ),
+ )
+
+ def load_data_to_database(self) -> None:
+ """
+ As the name implies, loads data from the JSON file(s) into the database
+ """
+ try:
+ with transaction.atomic():
+ # delete these since pk can change, re-created from import
+ ContentType.objects.all().delete()
+ Permission.objects.all().delete()
+ for manifest_path in self.manifest_paths:
+ call_command("loaddata", manifest_path)
+ except (FieldDoesNotExist, DeserializationError, IntegrityError) as e:
+ self.stdout.write(self.style.ERROR("Database import failed"))
+ if (
+ self.version is not None
+ and self.version != version.__full_version_str__
+ ): # pragma: no cover
self.stdout.write(
- self.style.WARNING(
+ self.style.ERROR(
"Version mismatch: "
f"Currently {version.__full_version_str__},"
- f" importing {self.version}."
- " Continuing, but import may fail.",
+ f" importing {self.version}",
),
)
+ raise e
+ else:
+ self.stdout.write(
+ self.style.ERROR("No version information present"),
+ )
+ raise e
+
+ def handle(self, *args, **options):
+ logging.getLogger().handlers[0].level = logging.ERROR
+
+ self.source = Path(options["source"]).resolve()
+ self.data_only: bool = options["data_only"]
+ self.no_progress_bar: bool = options["no_progress_bar"]
+ self.passphrase: str | None = options.get("passphrase")
+ self.version: Optional[str] = None
+ self.salt: Optional[str] = None
+ self.manifest_paths = []
+ self.manifest = []
+
+ self.pre_check()
+
+ self.load_metadata()
- else:
- self.stdout.write(self.style.NOTICE("No version.json file located"))
+ self.load_manifest_files()
- if not self.data_only:
- self._check_manifest_files_valid()
+ self.check_manifest_validity()
+
+ self.decrypt_secret_fields()
with (
disable_signal(
auditlog.unregister(CustomFieldInstance)
# Fill up the database with whatever is in the manifest
- try:
- with transaction.atomic():
- # delete these since pk can change, re-created from import
- ContentType.objects.all().delete()
- Permission.objects.all().delete()
- for manifest_path in manifest_paths:
- call_command("loaddata", manifest_path)
- except (FieldDoesNotExist, DeserializationError, IntegrityError) as e:
- self.stdout.write(self.style.ERROR("Database import failed"))
- if (
- self.version is not None
- and self.version != version.__full_version_str__
- ):
- self.stdout.write(
- self.style.ERROR(
- "Version mismatch: "
- f"Currently {version.__full_version_str__},"
- f" importing {self.version}",
- ),
- )
- raise e
- else:
- self.stdout.write(
- self.style.ERROR("No version information present"),
- )
- raise e
+ self.load_data_to_database()
if not self.data_only:
self._import_files_from_manifest()
no_progress_bar=self.no_progress_bar,
)
- @staticmethod
- def _check_manifest_exists(path: Path):
- if not path.exists():
- raise CommandError(
- "That directory doesn't appear to contain a manifest.json file.",
- )
-
- def _check_manifest_files_valid(self):
+ def check_manifest_validity(self):
"""
Attempts to verify the manifest is valid. Namely checking the files
referred to exist and the files can be read from
"""
- self.stdout.write("Checking the manifest")
- for record in self.manifest:
- if record["model"] != "documents.document":
- continue
- if EXPORTER_FILE_NAME not in record:
+ def check_document_validity(document_record: dict):
+ if EXPORTER_FILE_NAME not in document_record:
raise CommandError(
"The manifest file contains a record which does not "
"refer to an actual document file.",
)
- doc_file = record[EXPORTER_FILE_NAME]
+ doc_file = document_record[EXPORTER_FILE_NAME]
doc_path: Path = self.source / doc_file
if not doc_path.exists():
raise CommandError(
f"Failed to read from original file {doc_path}",
) from e
- if EXPORTER_ARCHIVE_NAME in record:
- archive_file = record[EXPORTER_ARCHIVE_NAME]
+ if EXPORTER_ARCHIVE_NAME in document_record:
+ archive_file = document_record[EXPORTER_ARCHIVE_NAME]
doc_archive_path: Path = self.source / archive_file
if not doc_archive_path.exists():
raise CommandError(
f"Failed to read from archive file {doc_archive_path}",
) from e
+ self.stdout.write("Checking the manifest")
+ for record in self.manifest:
+ # Only check if the document files exist if this is not data only
+ # We don't care about documents for a data only import
+ if not self.data_only and record["model"] == "documents.document":
+ check_document_validity(record)
+
def _import_files_from_manifest(self):
settings.ORIGINALS_DIR.mkdir(parents=True, exist_ok=True)
settings.THUMBNAIL_DIR.mkdir(parents=True, exist_ok=True)
copy_file_with_basic_stats(archive_path, document.archive_path)
document.save()
+
+ def decrypt_secret_fields(self) -> None:
+ """
+ The converse decryption of some fields out of the export before importing to database
+ """
+ if self.passphrase:
+ # Salt has been loaded from metadata.json at this point, so it cannot be None
+ self.setup_crypto(passphrase=self.passphrase, salt=self.salt)
+
+ had_at_least_one_record = False
+
+ for crypt_config in self.CRYPT_FIELDS:
+ importer_model = crypt_config["model_name"]
+ crypt_fields = crypt_config["fields"]
+ for record in filter(
+ lambda x: x["model"] == importer_model,
+ self.manifest,
+ ):
+ had_at_least_one_record = True
+ for field in crypt_fields:
+ record["fields"][field] = self.decrypt_string(
+ value=record["fields"][field],
+ )
+
+ if had_at_least_one_record:
+ # It's annoying, but the DB is loaded from the JSON directly
+ # Maybe could change that in the future?
+ (self.source / "manifest.json").write_text(
+ json.dumps(self.manifest, indent=2, ensure_ascii=False),
+ )
+import base64
import os
from argparse import ArgumentParser
+from typing import Optional
+from typing import TypedDict
+from typing import Union
+from cryptography.fernet import Fernet
+from cryptography.hazmat.primitives import hashes
+from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from django.core.management import CommandError
+from documents.settings import EXPORTER_CRYPTO_ALGO_NAME
+from documents.settings import EXPORTER_CRYPTO_KEY_ITERATIONS_NAME
+from documents.settings import EXPORTER_CRYPTO_KEY_SIZE_NAME
+from documents.settings import EXPORTER_CRYPTO_SALT_NAME
+from documents.settings import EXPORTER_CRYPTO_SETTINGS_NAME
+
+
+# Describes one group of secret fields handled by the exporter and importer
+class CryptFields(TypedDict):
+ # Key of the exporter's manifest dict whose records hold the secret fields
+ exporter_key: str
+ # Value of a manifest record's "model" entry which the importer matches on
+ model_name: str
+ # Names of the entries in a record's "fields" mapping to encrypt or decrypt
+ fields: list[str]
+
class MultiProcessMixin:
"""
def handle_progress_bar_mixin(self, *args, **options):
self.no_progress_bar = options["no_progress_bar"]
self.use_progress_bar = not self.no_progress_bar
+
+
+class CryptMixin:
+ """
+ Fully based on:
+ https://cryptography.io/en/latest/fernet/#using-passwords-with-fernet
+
+ To encrypt:
+ 1. Call setup_crypto providing the user provided passphrase
+ 2. Call encrypt_string with a value
+ 3. Store the returned hexadecimal representation of the value
+
+ To decrypt:
+ 1. Load the required parameters:
+ a. key iterations
+ b. key size
+ c. key algorithm
+ 2. Call setup_crypto providing the user provided passphrase and stored salt
+ 3. Call decrypt_string with a value
+ 4. Use the returned value
+
+ """
+
+ # This matches to Django's default for now
+ # https://github.com/django/django/blob/adae61942/django/contrib/auth/hashers.py#L315
+
+ # Set the defaults to be used during export
+ # During import, these are overridden from the loaded values to ensure decryption is possible
+ key_iterations = 1_000_000
+ salt_size = 16
+ key_size = 32
+ kdf_algorithm = "pbkdf2_sha256"
+
+ CRYPT_FIELDS: CryptFields = [
+ {
+ "exporter_key": "mail_accounts",
+ "model_name": "paperless_mail.mailaccount",
+ "fields": [
+ "password",
+ ],
+ },
+ ]
+
+ def get_crypt_params(self) -> dict[str, dict[str, Union[str, int]]]:
+ return {
+ EXPORTER_CRYPTO_SETTINGS_NAME: {
+ EXPORTER_CRYPTO_ALGO_NAME: self.kdf_algorithm,
+ EXPORTER_CRYPTO_KEY_ITERATIONS_NAME: self.key_iterations,
+ EXPORTER_CRYPTO_KEY_SIZE_NAME: self.key_size,
+ EXPORTER_CRYPTO_SALT_NAME: self.salt,
+ },
+ }
+
+ def load_crypt_params(self, metadata: dict):
+ # Load up the values for setting up decryption
+ self.kdf_algorithm: str = metadata[EXPORTER_CRYPTO_SETTINGS_NAME][
+ EXPORTER_CRYPTO_ALGO_NAME
+ ]
+ self.key_iterations: int = metadata[EXPORTER_CRYPTO_SETTINGS_NAME][
+ EXPORTER_CRYPTO_KEY_ITERATIONS_NAME
+ ]
+ self.key_size: int = metadata[EXPORTER_CRYPTO_SETTINGS_NAME][
+ EXPORTER_CRYPTO_KEY_SIZE_NAME
+ ]
+ self.salt: str = metadata[EXPORTER_CRYPTO_SETTINGS_NAME][
+ EXPORTER_CRYPTO_SALT_NAME
+ ]
+
+ def setup_crypto(self, *, passphrase: str, salt: Optional[str] = None):
+ """
+ Constructs a class for encryption or decryption using the specified passphrase and salt
+
+ Salt is assumed to be a hexadecimal representation of a cryptographically secure random byte string.
+ If not provided, it will be derived from the system secure random
+ """
+ self.salt = salt or os.urandom(self.salt_size).hex()
+
+ # Derive the KDF based on loaded settings
+ if self.kdf_algorithm == "pbkdf2_sha256":
+ kdf = PBKDF2HMAC(
+ algorithm=hashes.SHA256(),
+ length=self.key_size,
+ salt=bytes.fromhex(self.salt),
+ iterations=self.key_iterations,
+ )
+ else: # pragma: no cover
+ raise CommandError(
+ f"{self.kdf_algorithm} is an unknown key derivation function",
+ )
+
+ key = base64.urlsafe_b64encode(kdf.derive(passphrase.encode("utf-8")))
+
+ self.fernet = Fernet(key)
+
+ def encrypt_string(self, *, value: str) -> str:
+ """
+ Given a string value, encrypts it and returns the hexadecimal representation of the encrypted token
+
+ """
+ return self.fernet.encrypt(value.encode("utf-8")).hex()
+
+ def decrypt_string(self, *, value: str) -> str:
+ """
+ Given a string value, decrypts it and returns the original value of the field
+ """
+ return self.fernet.decrypt(bytes.fromhex(value)).decode("utf-8")
EXPORTER_FILE_NAME = "__exported_file_name__"
EXPORTER_THUMBNAIL_NAME = "__exported_thumbnail_name__"
EXPORTER_ARCHIVE_NAME = "__exported_archive_name__"
+
+EXPORTER_CRYPTO_SETTINGS_NAME = "__crypto__"
+EXPORTER_CRYPTO_SALT_NAME = "__salt_hex__"
+EXPORTER_CRYPTO_KEY_ITERATIONS_NAME = "__key_iters__"
+EXPORTER_CRYPTO_KEY_SIZE_NAME = "__key_size__"
+EXPORTER_CRYPTO_ALGO_NAME = "__key_algo__"
import os
import shutil
import tempfile
+from io import StringIO
from pathlib import Path
from unittest import mock
from zipfile import ZipFile
from documents.tests.utils import FileSystemAssertsMixin
from documents.tests.utils import SampleDirMixin
from documents.tests.utils import paperless_environment
+from paperless_mail.models import MailAccount
class TestExportImport(
with ZipFile(expected_file) as zip:
self.assertEqual(len(zip.namelist()), 11)
self.assertIn("manifest.json", zip.namelist())
- self.assertIn("version.json", zip.namelist())
+ self.assertIn("metadata.json", zip.namelist())
@override_settings(PASSPHRASE="test")
def test_export_zipped_format(self):
# Extras are from the directories, which also appear in the listing
self.assertEqual(len(zip.namelist()), 14)
self.assertIn("manifest.json", zip.namelist())
- self.assertIn("version.json", zip.namelist())
+ self.assertIn("metadata.json", zip.namelist())
@override_settings(PASSPHRASE="test")
def test_export_zipped_with_delete(self):
with ZipFile(expected_file) as zip:
self.assertEqual(len(zip.namelist()), 11)
self.assertIn("manifest.json", zip.namelist())
- self.assertIn("version.json", zip.namelist())
+ self.assertIn("metadata.json", zip.namelist())
def test_export_target_not_exists(self):
"""
# Manifest and version files only should be present in the exported directory
self.assertFileCountInDir(self.target, 2)
self.assertIsFile(self.target / "manifest.json")
- self.assertIsFile(self.target / "version.json")
+ self.assertIsFile(self.target / "metadata.json")
shutil.rmtree(self.dirs.media_dir / "documents")
Document.objects.all().delete()
)
self.assertEqual(Document.objects.all().count(), 4)
+
+
+class TestCryptExportImport(
+ DirectoriesMixin,
+ FileSystemAssertsMixin,
+ TestCase,
+):
+ def setUp(self) -> None:
+ self.target = Path(tempfile.mkdtemp())
+ return super().setUp()
+
+ def tearDown(self) -> None:
+ shutil.rmtree(self.target, ignore_errors=True)
+ return super().tearDown()
+
+ def test_export_passphrase(self):
+ """
+ GIVEN:
+ - A mail account exists
+ WHEN:
+ - Export command is called
+ - Passphrase is provided
+ THEN:
+ - Output password is not plaintext
+ """
+ MailAccount.objects.create(
+ name="Test Account",
+ imap_server="test.imap.com",
+ username="myusername",
+ password="mypassword",
+ )
+
+ call_command(
+ "document_exporter",
+ "--no-progress-bar",
+ "--passphrase",
+ "securepassword",
+ self.target,
+ )
+
+ self.assertIsFile(self.target / "metadata.json")
+ self.assertIsFile(self.target / "manifest.json")
+
+ data = json.loads((self.target / "manifest.json").read_text())
+
+ mail_accounts = list(
+ filter(lambda r: r["model"] == "paperless_mail.mailaccount", data),
+ )
+
+ self.assertEqual(len(mail_accounts), 1)
+
+ mail_account_data = mail_accounts[0]
+
+ self.assertNotEqual(mail_account_data["fields"]["password"], "mypassword")
+
+ MailAccount.objects.all().delete()
+
+ call_command(
+ "document_importer",
+ "--no-progress-bar",
+ "--passphrase",
+ "securepassword",
+ self.target,
+ )
+
+ account = MailAccount.objects.first()
+
+ self.assertIsNotNone(account)
+ self.assertEqual(account.password, "mypassword")
+
+ def test_import_crypt_no_passphrase(self):
+ """
+ GIVEN:
+ - A mail account exists
+ WHEN:
+ - Export command is called
+ - Passphrase is provided
+ - Import command is called
+ - No passphrase is given
+ THEN:
+ - An error is raised for the issue
+ """
+ call_command(
+ "document_exporter",
+ "--no-progress-bar",
+ "--passphrase",
+ "securepassword",
+ self.target,
+ )
+
+ with self.assertRaises(CommandError) as err:
+ call_command(
+ "document_importer",
+ "--no-progress-bar",
+ self.target,
+ )
+ self.assertEqual(
+ err.msg,
+ "No passphrase was given, but this export contains encrypted fields",
+ )
+
+ def test_export_warn_plaintext(self):
+ """
+ GIVEN:
+ - A mail account exists
+ WHEN:
+ - Export command is called
+ - No passphrase is provided
+ THEN:
+ - Output password is plaintext
+ - Warning is output
+ """
+ MailAccount.objects.create(
+ name="Test Account",
+ imap_server="test.imap.com",
+ username="myusername",
+ password="mypassword",
+ )
+
+ stdout = StringIO()
+
+ call_command(
+ "document_exporter",
+ "--no-progress-bar",
+ str(self.target),
+ stdout=stdout,
+ )
+ stdout.seek(0)
+ self.assertIn(
+ (
+ "You have configured mail accounts, "
+ "but no passphrase was given. "
+ "Passwords will be in plaintext"
+ ),
+ stdout.read(),
+ )
EXPORTER_ARCHIVE_NAME: "archive.pdf",
},
]
+ cmd.data_only = False
with self.assertRaises(CommandError) as cm:
- cmd._check_manifest_files_valid()
+ cmd.check_manifest_validity()
self.assertInt("Failed to read from original file", str(cm.exception))
original_path.chmod(0o444)
archive_path.chmod(0o222)
with self.assertRaises(CommandError) as cm:
- cmd._check_manifest_files_valid()
+ cmd.check_manifest_validity()
self.assertInt("Failed to read from archive file", str(cm.exception))
def test_import_source_not_existing(self):
stdout.seek(0)
self.assertIn(
"Found existing user(s), this might indicate a non-empty installation",
- str(stdout.read()),
+ stdout.read(),
)
def test_import_with_documents_exists(self):
"Found existing documents(s), this might indicate a non-empty installation",
str(stdout.read()),
)
+
+ def test_import_no_metadata_or_version_file(self):
+ """
+ GIVEN:
+ - A source directory with a manifest file only
+ WHEN:
+ - An import is attempted
+ THEN:
+ - Warning about the missing files is output
+ """
+ stdout = StringIO()
+
+ # touch() leaves the manifest as an empty file
+ (self.dirs.scratch_dir / "manifest.json").touch()
+
+ # We're not building a manifest, so it fails, but this test doesn't care
+ # The notice is expected in the output from before the manifest is parsed
+ with self.assertRaises(json.decoder.JSONDecodeError):
+ call_command(
+ "document_importer",
+ "--no-progress-bar",
+ str(self.dirs.scratch_dir),
+ stdout=stdout,
+ )
+ stdout.seek(0)
+ stdout_str = str(stdout.read())
+
+ self.assertIn("No version.json or metadata.json file located", stdout_str)
+
+ def test_import_version_file(self):
+ """
+ GIVEN:
+ - A source directory with a manifest file and version file
+ WHEN:
+ - An import is attempted
+ THEN:
+ - Warning about the version mismatch is output
+ """
+ stdout = StringIO()
+
+ # touch() leaves the manifest as an empty file
+ (self.dirs.scratch_dir / "manifest.json").touch()
+ # Old style export, the version is stored in version.json
+ (self.dirs.scratch_dir / "version.json").write_text(
+ json.dumps({"version": "2.8.1"}),
+ )
+
+ # We're not building a manifest, so it fails, but this test doesn't care
+ with self.assertRaises(json.decoder.JSONDecodeError):
+ call_command(
+ "document_importer",
+ "--no-progress-bar",
+ str(self.dirs.scratch_dir),
+ stdout=stdout,
+ )
+ stdout.seek(0)
+ stdout_str = str(stdout.read())
+
+ self.assertIn("Version mismatch:", stdout_str)
+ self.assertIn("importing 2.8.1", stdout_str)
env =
PAPERLESS_DISABLE_DBHANDLER=true
PAPERLESS_CACHE_BACKEND=django.core.cache.backends.locmem.LocMemCache
+norecursedirs = locale/*
[coverage:run]
source =