import tempfile
import time
from pathlib import Path
+from typing import TYPE_CHECKING
from typing import Optional
import tqdm
from guardian.models import GroupObjectPermission
from guardian.models import UserObjectPermission
+if TYPE_CHECKING:
+ from django.db.models import QuerySet
+
if settings.AUDIT_LOG_ENABLED:
from auditlog.models import LogEntry
help="Sets the export zip file name",
)
+ parser.add_argument(
+ "--data-only",
+ default=False,
+ action="store_true",
+ help="If set, only the database will be imported, not files",
+ )
+
parser.add_argument(
"--no-progress-bar",
default=False,
help="If set, the progress bar will not be shown",
)
- def __init__(self, *args, **kwargs):
- BaseCommand.__init__(self, *args, **kwargs)
- self.target: Path = None
- self.split_manifest = False
- self.files_in_export_dir: set[Path] = set()
- self.exported_files: list[Path] = []
- self.compare_checksums = False
- self.use_filename_format = False
- self.use_folder_prefix = False
- self.delete = False
- self.no_archive = False
- self.no_thumbnail = False
-
def handle(self, *args, **options):
self.target = Path(options["target"]).resolve()
self.split_manifest: bool = options["split_manifest"]
self.no_archive: bool = options["no_archive"]
self.no_thumbnail: bool = options["no_thumbnail"]
self.zip_export: bool = options["zip"]
+ self.data_only: bool = options["data_only"]
+ self.no_progress_bar: bool = options["no_progress_bar"]
+
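+ # Track files already in the export dir (for later cleanup) and names
+ # already handed out (to keep generated filenames unique)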
+ self.files_in_export_dir: set[Path] = set()
+ self.exported_files: set[str] = set()
# If zipping, save the original target for later and
# get a temporary directory for the target instead
temp_dir = None
- self.original_target: Optional[Path] = None
+ self.original_target = self.target
if self.zip_export:
- self.original_target = self.target
-
settings.SCRATCH_DIR.mkdir(parents=True, exist_ok=True)
temp_dir = tempfile.TemporaryDirectory(
dir=settings.SCRATCH_DIR,
raise CommandError("That path doesn't appear to be writable")
try:
+ # Prevent any ongoing changes to the documents
with FileLock(settings.MEDIA_LOCK):
- self.dump(options["no_progress_bar"])
+ self.dump()
# We've written everything to the temporary directory in this case,
# now make an archive in the original target, with all files stored
- if self.zip_export:
+ if self.zip_export and temp_dir is not None:
shutil.make_archive(
os.path.join(
self.original_target,
if self.zip_export and temp_dir is not None:
temp_dir.cleanup()
- def dump(self, progress_bar_disable=False):
+ def dump(self):
# 1. Take a snapshot of what files exist in the current export folder
for x in self.target.glob("**/*"):
if x.is_file():
# 2. Create manifest, containing all correspondents, types, tags, storage paths
# note, documents and ui_settings
- with transaction.atomic():
- manifest = json.loads(
- serializers.serialize("json", Correspondent.objects.all()),
- )
-
- manifest += json.loads(serializers.serialize("json", Tag.objects.all()))
-
- manifest += json.loads(
- serializers.serialize("json", DocumentType.objects.all()),
- )
-
- manifest += json.loads(
- serializers.serialize("json", StoragePath.objects.all()),
- )
-
- manifest += json.loads(
- serializers.serialize("json", MailAccount.objects.all()),
- )
-
- manifest += json.loads(
- serializers.serialize("json", MailRule.objects.all()),
- )
-
- manifest += json.loads(
- serializers.serialize("json", SavedView.objects.all()),
- )
-
- manifest += json.loads(
- serializers.serialize("json", SavedViewFilterRule.objects.all()),
- )
-
- manifest += json.loads(serializers.serialize("json", Group.objects.all()))
-
- manifest += json.loads(
- serializers.serialize(
- "json",
- User.objects.exclude(username__in=["consumer", "AnonymousUser"]),
- ),
- )
-
- manifest += json.loads(
- serializers.serialize("json", UiSettings.objects.all()),
- )
-
- manifest += json.loads(
- serializers.serialize("json", ContentType.objects.all()),
- )
-
- manifest += json.loads(
- serializers.serialize("json", Permission.objects.all()),
- )
-
- manifest += json.loads(
- serializers.serialize("json", UserObjectPermission.objects.all()),
- )
-
- manifest += json.loads(
- serializers.serialize("json", GroupObjectPermission.objects.all()),
- )
-
- manifest += json.loads(
- serializers.serialize("json", WorkflowTrigger.objects.all()),
- )
-
- manifest += json.loads(
- serializers.serialize("json", WorkflowAction.objects.all()),
- )
-
- manifest += json.loads(
- serializers.serialize("json", Workflow.objects.all()),
- )
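+ # Map each manifest section to the queryset it serializes;
+ # documents are ordered by primary key for a stable export order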
+ manifest_key_to_object_query: dict[str, QuerySet] = {
+ "correspondents": Correspondent.objects.all(),
+ "tags": Tag.objects.all(),
+ "document_types": DocumentType.objects.all(),
+ "storage_paths": StoragePath.objects.all(),
+ "mail_accounts": MailAccount.objects.all(),
+ "mail_rules": MailRule.objects.all(),
+ "saved_views": SavedView.objects.all(),
+ "saved_view_filter_rules": SavedViewFilterRule.objects.all(),
+ "groups": Group.objects.all(),
+ "users": User.objects.exclude(
+ username__in=["consumer", "AnonymousUser"],
+ ).all(),
+ "ui_settings": UiSettings.objects.all(),
+ "content_types": ContentType.objects.all(),
+ "permissions": Permission.objects.all(),
+ "user_object_permissions": UserObjectPermission.objects.all(),
+ "group_object_permissions": GroupObjectPermission.objects.all(),
+ "workflow_triggers": WorkflowTrigger.objects.all(),
+ "workflow_actions": WorkflowAction.objects.all(),
+ "workflows": Workflow.objects.all(),
+ "custom_fields": CustomField.objects.all(),
+ "custom_field_instances": CustomFieldInstance.objects.all(),
+ "app_configs": ApplicationConfiguration.objects.all(),
+ "notes": Note.objects.all(),
+ "documents": Document.objects.order_by("id").all(),
+ }
+
+ if settings.AUDIT_LOG_ENABLED:
+ manifest_key_to_object_query["log_entries"] = LogEntry.objects.all()
- manifest += json.loads(
- serializers.serialize("json", CustomField.objects.all()),
- )
-
- manifest += json.loads(
- serializers.serialize("json", ApplicationConfiguration.objects.all()),
- )
+ with transaction.atomic():
+ manifest_dict = {}
- if settings.AUDIT_LOG_ENABLED:
- manifest += json.loads(
- serializers.serialize("json", LogEntry.objects.all()),
+ # Serialize each queryset into its own section of the manifest
+ for key in manifest_key_to_object_query:
+ manifest_dict[key] = json.loads(
+ serializers.serialize("json", manifest_key_to_object_query[key]),
)
# These are treated specially and included in the per-document manifest
# if that setting is enabled. Otherwise, they are just exported to the bulk
# manifest
- documents = Document.objects.order_by("id")
- document_map: dict[int, Document] = {d.pk: d for d in documents}
- document_manifest = json.loads(serializers.serialize("json", documents))
-
- notes = json.loads(
- serializers.serialize("json", Note.objects.all()),
- )
-
- custom_field_instances = json.loads(
- serializers.serialize("json", CustomFieldInstance.objects.all()),
- )
- if not self.split_manifest:
- manifest += document_manifest
- manifest += notes
- manifest += custom_field_instances
+ document_map: dict[int, Document] = {
+ d.pk: d for d in manifest_key_to_object_query["documents"]
+ }
+ document_manifest = manifest_dict["documents"]
# 3. Export files from each document
for index, document_dict in tqdm.tqdm(
enumerate(document_manifest),
total=len(document_manifest),
- disable=progress_bar_disable,
+ disable=self.no_progress_bar,
):
# 3.1. store files unencrypted
document_dict["fields"]["storage_type"] = Document.STORAGE_TYPE_UNENCRYPTED
document = document_map[document_dict["pk"]]
# 3.2. generate a unique filename
- filename_counter = 0
- while True:
- if self.use_filename_format:
- base_name = generate_filename(
- document,
- counter=filename_counter,
- append_gpg=False,
- )
- else:
- base_name = document.get_public_filename(counter=filename_counter)
-
- if base_name not in self.exported_files:
- self.exported_files.append(base_name)
- break
- else:
- filename_counter += 1
+ base_name = self.generate_base_name(document)
# 3.3. write filenames into manifest
- original_name = base_name
- if self.use_folder_prefix:
- original_name = os.path.join("originals", original_name)
- original_target = (self.target / Path(original_name)).resolve()
- document_dict[EXPORTER_FILE_NAME] = original_name
-
- if not self.no_thumbnail:
- thumbnail_name = base_name + "-thumbnail.webp"
- if self.use_folder_prefix:
- thumbnail_name = os.path.join("thumbnails", thumbnail_name)
- thumbnail_target = (self.target / Path(thumbnail_name)).resolve()
- document_dict[EXPORTER_THUMBNAIL_NAME] = thumbnail_name
- else:
- thumbnail_target = None
-
- if not self.no_archive and document.has_archive_version:
- archive_name = base_name + "-archive.pdf"
- if self.use_folder_prefix:
- archive_name = os.path.join("archive", archive_name)
- archive_target = (self.target / Path(archive_name)).resolve()
- document_dict[EXPORTER_ARCHIVE_NAME] = archive_name
- else:
- archive_target = None
+ original_target, thumbnail_target, archive_target = (
+ self.generate_document_targets(document, base_name, document_dict)
+ )
# 3.4. write files to target folder
- if document.storage_type == Document.STORAGE_TYPE_GPG:
- t = int(time.mktime(document.created.timetuple()))
-
- original_target.parent.mkdir(parents=True, exist_ok=True)
- with document.source_file as out_file:
- original_target.write_bytes(GnuPG.decrypted(out_file))
- os.utime(original_target, times=(t, t))
-
- if thumbnail_target:
- thumbnail_target.parent.mkdir(parents=True, exist_ok=True)
- with document.thumbnail_file as out_file:
- thumbnail_target.write_bytes(GnuPG.decrypted(out_file))
- os.utime(thumbnail_target, times=(t, t))
-
- if archive_target:
- archive_target.parent.mkdir(parents=True, exist_ok=True)
- with document.archive_path as out_file:
- archive_target.write_bytes(GnuPG.decrypted(out_file))
- os.utime(archive_target, times=(t, t))
- else:
- self.check_and_copy(
- document.source_path,
- document.checksum,
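+ # Skip copying any files when exporting the database only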
+ if not self.data_only:
+ self.copy_document_files(
+ document,
original_target,
+ thumbnail_target,
+ archive_target,
)
- if thumbnail_target:
- self.check_and_copy(document.thumbnail_path, None, thumbnail_target)
-
- if archive_target:
- self.check_and_copy(
- document.archive_path,
- document.archive_checksum,
- archive_target,
- )
-
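+ # 3.5. write a per-document manifest, if requested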
if self.split_manifest:
- manifest_name = base_name + "-manifest.json"
+ manifest_name = Path(base_name + "-manifest.json")
if self.use_folder_prefix:
- manifest_name = os.path.join("json", manifest_name)
- manifest_name = (self.target / Path(manifest_name)).resolve()
+ manifest_name = Path("json") / manifest_name
+ manifest_name = (self.target / manifest_name).resolve()
manifest_name.parent.mkdir(parents=True, exist_ok=True)
content = [document_manifest[index]]
content += list(
filter(
lambda d: d["fields"]["document"] == document_dict["pk"],
- notes,
+ manifest_dict["notes"],
),
)
content += list(
filter(
lambda d: d["fields"]["document"] == document_dict["pk"],
- custom_field_instances,
+ manifest_dict["custom_field_instances"],
),
)
manifest_name.write_text(
if manifest_name in self.files_in_export_dir:
self.files_in_export_dir.remove(manifest_name)
- # 4.1 write manifest to target folder
- manifest_path = (self.target / Path("manifest.json")).resolve()
+ # These sections were already written to the per-document manifests
+ if self.split_manifest:
+ del manifest_dict["documents"]
+ del manifest_dict["notes"]
+ del manifest_dict["custom_field_instances"]
+
+ # 4.1 write primary manifest to target folder
+ manifest = []
+ for key in manifest_dict:
+ manifest.extend(manifest_dict[key])
+ manifest_path = (self.target / "manifest.json").resolve()
manifest_path.write_text(
json.dumps(manifest, indent=2, ensure_ascii=False),
encoding="utf-8",
self.files_in_export_dir.remove(manifest_path)
# 4.2 write version information to target folder
- version_path = (self.target / Path("version.json")).resolve()
+ version_path = (self.target / "version.json").resolve()
version_path.write_text(
json.dumps(
{"version": version.__full_version_str__},
else:
item.unlink()
- def check_and_copy(self, source, source_checksum, target: Path):
+ def generate_base_name(self, document: Document) -> str:
+ """
+ Generates a unique name for the document, one which has not already been claimed by another exported document
+ """
+ filename_counter = 0
+ while True:
+ if self.use_filename_format:
+ base_name = generate_filename(
+ document,
+ counter=filename_counter,
+ append_gpg=False,
+ )
+ else:
+ base_name = document.get_public_filename(counter=filename_counter)
+
+ if base_name not in self.exported_files:
+ self.exported_files.add(base_name)
+ break
+ else:
+ filename_counter += 1
+ return base_name
+
+ def generate_document_targets(
+ self,
+ document: Document,
+ base_name: str,
+ document_dict: dict,
+ ) -> tuple[Path, Optional[Path], Optional[Path]]:
+ """
+ Generates the targets for a given document, including the original file, archive file and thumbnail (depending on settings).
+ """
+ original_name = base_name
+ if self.use_folder_prefix:
+ original_name = os.path.join("originals", original_name)
+ original_target = (self.target / Path(original_name)).resolve()
+ document_dict[EXPORTER_FILE_NAME] = original_name
+
+ if not self.no_thumbnail:
+ thumbnail_name = base_name + "-thumbnail.webp"
+ if self.use_folder_prefix:
+ thumbnail_name = os.path.join("thumbnails", thumbnail_name)
+ thumbnail_target = (self.target / Path(thumbnail_name)).resolve()
+ document_dict[EXPORTER_THUMBNAIL_NAME] = thumbnail_name
+ else:
+ thumbnail_target = None
+
+ if not self.no_archive and document.has_archive_version:
+ archive_name = base_name + "-archive.pdf"
+ if self.use_folder_prefix:
+ archive_name = os.path.join("archive", archive_name)
+ archive_target = (self.target / Path(archive_name)).resolve()
+ document_dict[EXPORTER_ARCHIVE_NAME] = archive_name
+ else:
+ archive_target = None
+
+ return original_target, thumbnail_target, archive_target
+
+ def copy_document_files(
+ self,
+ document: Document,
+ original_target: Path,
+ thumbnail_target: Optional[Path],
+ archive_target: Optional[Path],
+ ) -> None:
+ """
+ Copies files from the document storage location to the specified target location.
+
+ If the document is encrypted, the files are decrypted before copying them to the target location.
+ """
+ if document.storage_type == Document.STORAGE_TYPE_GPG:
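+ # Decrypt via GnuPG and set each file's mtime to the document's created time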
+ t = int(time.mktime(document.created.timetuple()))
+
+ original_target.parent.mkdir(parents=True, exist_ok=True)
+ with document.source_file as out_file:
+ original_target.write_bytes(GnuPG.decrypted(out_file))
+ os.utime(original_target, times=(t, t))
+
+ if thumbnail_target:
+ thumbnail_target.parent.mkdir(parents=True, exist_ok=True)
+ with document.thumbnail_file as out_file:
+ thumbnail_target.write_bytes(GnuPG.decrypted(out_file))
+ os.utime(thumbnail_target, times=(t, t))
+
+ if archive_target:
+ archive_target.parent.mkdir(parents=True, exist_ok=True)
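+ # Type checker hint only; archive_target is set only when an archive version exists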
+ if TYPE_CHECKING:
+ assert isinstance(document.archive_path, Path)
+ with document.archive_path as out_file:
+ archive_target.write_bytes(GnuPG.decrypted(out_file))
+ os.utime(archive_target, times=(t, t))
+ else:
+ self.check_and_copy(
+ document.source_path,
+ document.checksum,
+ original_target,
+ )
+
+ if thumbnail_target:
+ self.check_and_copy(document.thumbnail_path, None, thumbnail_target)
+
+ if archive_target:
+ if TYPE_CHECKING:
+ assert isinstance(document.archive_path, Path)
+ self.check_and_copy(
+ document.archive_path,
+ document.archive_checksum,
+ archive_target,
+ )
+
+ def check_and_copy(
+ self,
+ source: Path,
+ source_checksum: Optional[str],
+ target: Path,
+ ):
+ """
+ Copies the source to the target if the target does not exist or does not
+ appear to match the source attributes
+ """
+
+ target = target.resolve()
if target in self.files_in_export_dir:
self.files_in_export_dir.remove(target)
def add_arguments(self, parser):
parser.add_argument("source")
+
parser.add_argument(
"--no-progress-bar",
default=False,
help="If set, the progress bar will not be shown",
)
- def __init__(self, *args, **kwargs):
- BaseCommand.__init__(self, *args, **kwargs)
- self.source = None
- self.manifest = None
- self.version = None
+ parser.add_argument(
+ "--data-only",
+ default=False,
+ action="store_true",
+ help="If set, only the database will be exported, not files",
+ )
def pre_check(self) -> None:
"""
if not os.access(self.source, os.R_OK):
raise CommandError("That path doesn't appear to be readable")
- for document_dir in [settings.ORIGINALS_DIR, settings.ARCHIVE_DIR]:
- if document_dir.exists() and document_dir.is_dir():
- for entry in document_dir.glob("**/*"):
- if entry.is_dir():
- continue
- self.stdout.write(
- self.style.WARNING(
- f"Found file {entry.relative_to(document_dir)}, this might indicate a non-empty installation",
- ),
- )
- break
+ # Skip this check if operating only on the database
+ # We can expect data to exist in that case
+ if not self.data_only:
+ for document_dir in [settings.ORIGINALS_DIR, settings.ARCHIVE_DIR]:
+ if document_dir.exists() and document_dir.is_dir():
+ for entry in document_dir.glob("**/*"):
+ if entry.is_dir():
+ continue
+ self.stdout.write(
+ self.style.WARNING(
+ f"Found file {entry.relative_to(document_dir)}, this might indicate a non-empty installation",
+ ),
+ )
+ break
if (
User.objects.exclude(username__in=["consumer", "AnonymousUser"]).count()
!= 0
logging.getLogger().handlers[0].level = logging.ERROR
self.source = Path(options["source"]).resolve()
+ self.data_only: bool = options["data_only"]
+ self.no_progress_bar: bool = options["no_progress_bar"]
self.pre_check()
else:
self.stdout.write(self.style.NOTICE("No version.json file located"))
- self._check_manifest_valid()
+ if not self.data_only:
+ self._check_manifest_files_valid()
with (
disable_signal(
)
raise e
- self._import_files_from_manifest(options["no_progress_bar"])
+ if not self.data_only:
+ self._import_files_from_manifest()
+ else:
+ self.stdout.write(self.style.NOTICE("Data only import completed"))
self.stdout.write("Updating search index...")
call_command(
"document_index",
"reindex",
- no_progress_bar=options["no_progress_bar"],
+ no_progress_bar=self.no_progress_bar,
)
@staticmethod
"That directory doesn't appear to contain a manifest.json file.",
)
- def _check_manifest_valid(self):
+ def _check_manifest_files_valid(self):
"""
Attempts to verify the manifest is valid. Namely checking the files
referred to exist and the files can be read from
)
doc_file = record[EXPORTER_FILE_NAME]
- doc_path = self.source / doc_file
+ doc_path: Path = self.source / doc_file
if not doc_path.exists():
raise CommandError(
f'The manifest file refers to "{doc_file}" which does not '
"appear to be in the source directory.",
)
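+ # Opening the file (without reading) is enough to verify it is readable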
try:
- with doc_path.open(mode="rb") as infile:
- infile.read(1)
+ with doc_path.open(mode="rb"):
+ pass
except Exception as e:
raise CommandError(
f"Failed to read from original file {doc_path}",
if EXPORTER_ARCHIVE_NAME in record:
archive_file = record[EXPORTER_ARCHIVE_NAME]
- doc_archive_path = self.source / archive_file
+ doc_archive_path: Path = self.source / archive_file
if not doc_archive_path.exists():
raise CommandError(
f"The manifest file refers to {archive_file} which "
f"does not appear to be in the source directory.",
)
try:
- with doc_archive_path.open(mode="rb") as infile:
- infile.read(1)
+ with doc_archive_path.open(mode="rb"):
+ pass
except Exception as e:
raise CommandError(
f"Failed to read from archive file {doc_archive_path}",
) from e
- def _import_files_from_manifest(self, progress_bar_disable):
+ def _import_files_from_manifest(self):
settings.ORIGINALS_DIR.mkdir(parents=True, exist_ok=True)
settings.THUMBNAIL_DIR.mkdir(parents=True, exist_ok=True)
settings.ARCHIVE_DIR.mkdir(parents=True, exist_ok=True)
filter(lambda r: r["model"] == "documents.document", self.manifest),
)
- for record in tqdm.tqdm(manifest_documents, disable=progress_bar_disable):
+ for record in tqdm.tqdm(manifest_documents, disable=self.no_progress_bar):
document = Document.objects.get(pk=record["pk"])
doc_file = record[EXPORTER_FILE_NAME]