CompressionStatistics
LocalStorage::get_compression_statistics(
- const ProgressReceiver& progress_receiver) const
+ const uint32_t threads, const ProgressReceiver& progress_receiver) const
{
- CompressionStatistics cs{};
+ std::atomic<uint64_t> content_size{0};
+ std::atomic<uint64_t> actual_size{0};
+ std::atomic<uint64_t> incompressible_size{0};
+ std::atomic<uint32_t> completed_dirs{0};
- for_each_cache_subdir(
- progress_receiver,
- [&](const auto& l1_index, const auto& l1_progress_receiver) {
- for_each_cache_subdir(
- l1_progress_receiver,
- [&](const auto& l2_index, const auto& l2_progress_receiver) {
- auto l2_dir = get_subdir(l1_index, l2_index);
- const auto files = get_cache_dir_files(l2_dir);
- l2_progress_receiver(0.2);
+ const size_t read_ahead =
+ std::max(static_cast<size_t>(10), 2 * static_cast<size_t>(threads));
+ util::ThreadPool thread_pool(threads, read_ahead);
- for (size_t i = 0; i < files.size(); ++i) {
- const auto& cache_file = files[i];
- try {
- core::CacheEntry::Header header(cache_file.path());
- cs.actual_size += cache_file.size_on_disk();
- cs.content_size += util::likely_size_on_disk(header.entry_size);
- } catch (core::Error&) {
- cs.incompressible_size += cache_file.size_on_disk();
- }
- l2_progress_receiver(0.2 + 0.8 * ratio(i, files.size()));
+ std::vector<std::future<void>> futures;
+ futures.reserve(256);
+
+ for_each_cache_subdir([&](uint8_t l1_index) {
+ for_each_cache_subdir([&](uint8_t l2_index) {
+ futures.push_back(thread_pool.enqueue([&, l1_index, l2_index] {
+ auto l2_dir = get_subdir(l1_index, l2_index);
+ const auto files = get_cache_dir_files(l2_dir);
+
+ uint64_t local_content_size = 0;
+ uint64_t local_actual_size = 0;
+ uint64_t local_incompressible_size = 0;
+
+ for (const auto& cache_file : files) {
+ try {
+ core::CacheEntry::Header header(cache_file.path());
+ local_actual_size += cache_file.size_on_disk();
+ local_content_size += util::likely_size_on_disk(header.entry_size);
+ } catch (core::Error&) {
+ local_incompressible_size += cache_file.size_on_disk();
}
- });
+ }
+
+ // Atomic updates (fewer atomic ops by accumulating locally first).
+ content_size += local_content_size;
+ actual_size += local_actual_size;
+ incompressible_size += local_incompressible_size;
+ ++completed_dirs;
+
+ progress_receiver(completed_dirs / 256.0);
+ }));
});
+ });
+
+ for (auto& future : futures) {
+ future.get();
+ }
+
+ thread_pool.shut_down();
- return cs;
+ return CompressionStatistics{
+ content_size.load(), actual_size.load(), incompressible_size.load()};
}
void