#include "Storage.hpp"
+#include <Checksum.hpp>
#include <Config.hpp>
#include <Logging.hpp>
#include <TemporaryFile.hpp>
#include <third_party/url.hpp>
#include <algorithm>
+#include <cmath>
#include <unordered_map>
#include <vector>
return util::join(features, " ");
}
+struct SecondaryStorageShardConfig
+{
+ std::string name;
+ double weight;
+};
+
struct SecondaryStorageConfig
{
+ std::vector<SecondaryStorageShardConfig> shards;
secondary::SecondaryStorage::Backend::Params params;
bool read_only = false;
};
+struct SecondaryStorageBackendEntry
+{
+ Url url; // With expanded "*".
+ std::string url_for_logging; // With expanded "*".
+ std::unique_ptr<secondary::SecondaryStorage::Backend> impl;
+ bool failed = false;
+};
+
struct SecondaryStorageEntry
{
SecondaryStorageConfig config;
- std::string url_for_logging;
+ std::string url_for_logging; // With unexpanded "*".
std::shared_ptr<secondary::SecondaryStorage> storage;
- std::unique_ptr<secondary::SecondaryStorage::Backend> backend;
- bool failed = false;
+ std::vector<SecondaryStorageBackendEntry> backends;
};
static std::string
util::value_or_throw<core::Error>(util::percent_decode(raw_value));
if (key == "read-only" && value == "true") {
result.read_only = true;
+ } else if (key == "shards") {
+ const auto url_str = result.params.url.str();
+ if (url_str.find('*') == std::string::npos) {
+ throw core::Error(R"(Missing "*" in URL when using shards: "{}")",
+ url_str);
+ }
+ for (const auto& shard : util::Tokenizer(value, ",")) {
+ double weight = 1.0;
+ nonstd::string_view name;
+ const auto lp_pos = shard.find('(');
+ if (lp_pos != nonstd::string_view::npos) {
+ if (shard.back() != ')') {
+ throw core::Error("Invalid shard name: \"{}\"", shard);
+ }
+ weight =
+ util::value_or_throw<core::Error>(util::parse_double(std::string(
+ shard.substr(lp_pos + 1, shard.length() - lp_pos - 2))));
+ if (weight < 0.0) {
+ throw core::Error("Invalid shard weight: \"{}\"", weight);
+ }
+ name = shard.substr(0, lp_pos);
+ } else {
+ name = shard;
+ }
+
+ result.shards.push_back({std::string(name), weight});
+ }
}
- result.params.attributes.emplace_back(
- secondary::SecondaryStorage::Backend::Attribute{
- std::string(key), value, std::string(raw_value)});
+
+ result.params.attributes.push_back(
+ {std::string(key), value, std::string(raw_value)});
}
return result;
const bool should_put_in_secondary_storage =
std::any_of(m_secondary_storages.begin(),
m_secondary_storages.end(),
- [](const auto& entry) {
- return !entry->failed && !entry->config.read_only;
- });
+ [](const auto& entry) { return !entry->config.read_only; });
if (should_put_in_secondary_storage) {
std::string value;
try {
throw core::Error("unknown secondary storage URL: {}",
url_for_logging.str());
}
- m_secondary_storages.emplace_back(
- std::make_unique<SecondaryStorageEntry>(SecondaryStorageEntry{
- config, url_for_logging.str(), storage, {}, false}));
+ m_secondary_storages.push_back(std::make_unique<SecondaryStorageEntry>(
+ SecondaryStorageEntry{config, url_for_logging.str(), storage, {}}));
}
}
static void
mark_backend_as_failed(
- SecondaryStorageEntry& entry,
+ SecondaryStorageBackendEntry& backend_entry,
const secondary::SecondaryStorage::Backend::Failure failure)
{
// The backend is expected to log details about the error.
- entry.failed = true;
+ backend_entry.failed = true;
(void)failure; // TODO: Update statistics.
}
-static bool
-backend_is_available(SecondaryStorageEntry& entry,
- nonstd::string_view operation_description,
- const bool for_writing)
+static double
+to_half_open_unit_interval(uint64_t value)
{
- if (for_writing && entry.config.read_only) {
- LOG("Not {} {} since it is read-only",
- operation_description,
- entry.url_for_logging);
- return false;
+ constexpr uint8_t double_significand_bits = 53;
+ constexpr uint64_t denominator = 1ULL << double_significand_bits;
+ constexpr uint64_t mask = denominator - 1;
+ return static_cast<double>(value & mask) / denominator;
+}
+
+static Url
+get_shard_url(const Digest& key,
+ const std::string& url,
+ const std::vector<SecondaryStorageShardConfig>& shards)
+{
+ ASSERT(!shards.empty());
+
+ // This is the "weighted rendezvous hashing" algorithm.
+ double highest_score = -1.0;
+ std::string best_shard;
+ for (const auto& shard_config : shards) {
+ Checksum checksum;
+ checksum.update(key.bytes(), key.size());
+ checksum.update(shard_config.name.data(), shard_config.name.length());
+ const double score = to_half_open_unit_interval(checksum.digest());
+ ASSERT(score >= 0.0 && score < 1.0);
+ const double weighted_score =
+ score == 0.0 ? 0.0 : shard_config.weight / -std::log(score);
+ if (weighted_score > highest_score) {
+ best_shard = shard_config.name;
+ highest_score = weighted_score;
+ }
}
- if (entry.failed) {
- LOG("Not {} {} since it failed earlier",
+ return util::replace_first(url, "*", best_shard);
+}
+
+static SecondaryStorageBackendEntry*
+get_backend(SecondaryStorageEntry& entry,
+ const Digest& key,
+ nonstd::string_view operation_description,
+ const bool for_writing)
+{
+ if (for_writing && entry.config.read_only) {
+ LOG("Not {} {} since it is read-only",
operation_description,
entry.url_for_logging);
- return false;
+ return nullptr;
}
- if (!entry.backend) {
+ const auto shard_url =
+ entry.config.shards.empty()
+ ? entry.config.params.url
+ : get_shard_url(key, entry.config.params.url.str(), entry.config.shards);
+ auto backend =
+ std::find_if(entry.backends.begin(),
+ entry.backends.end(),
+ [&](const auto& x) { return x.url.str() == shard_url.str(); });
+
+ if (backend == entry.backends.end()) {
+ auto shard_url_for_logging = shard_url;
+ shard_url_for_logging.user_info("");
+ entry.backends.push_back(
+ {shard_url, shard_url_for_logging.str(), {}, false});
+ auto shard_params = entry.config.params;
+ shard_params.url = shard_url;
try {
- entry.backend = entry.storage->create_backend(entry.config.params);
+ entry.backends.back().impl = entry.storage->create_backend(shard_params);
} catch (const secondary::SecondaryStorage::Backend::Failed& e) {
LOG("Failed to construct backend for {}{}",
entry.url_for_logging,
nonstd::string_view(e.what()).empty() ? "" : FMT(": {}", e.what()));
- mark_backend_as_failed(entry, e.failure());
+ mark_backend_as_failed(entry.backends.back(), e.failure());
}
+ return &entry.backends.back();
+ } else if (backend->failed) {
+ LOG("Not {} {} since it failed earlier",
+ operation_description,
+ entry.url_for_logging);
+ return nullptr;
+ } else {
+ return &*backend;
}
-
- return static_cast<bool>(entry.backend);
}
nonstd::optional<std::string>
Storage::get_from_secondary_storage(const Digest& key)
{
for (const auto& entry : m_secondary_storages) {
- if (!backend_is_available(*entry, "getting from", false)) {
+ auto backend = get_backend(*entry, key, "getting from", false);
+ if (!backend) {
continue;
}
Timer timer;
- const auto result = entry->backend->get(key);
+ const auto result = backend->impl->get(key);
const auto ms = timer.measure_ms();
if (!result) {
- mark_backend_as_failed(*entry, result.error());
+ mark_backend_as_failed(*backend, result.error());
continue;
}
if (value) {
LOG("Retrieved {} from {} ({:.2f} ms)",
key.to_string(),
- entry->url_for_logging,
+ backend->url_for_logging,
ms);
return *value;
} else {
- LOG(
- "No {} in {} ({:.2f} ms)", key.to_string(), entry->url_for_logging, ms);
+ LOG("No {} in {} ({:.2f} ms)",
+ key.to_string(),
+ backend->url_for_logging,
+ ms);
}
}
Storage::put_in_secondary_storage(const Digest& key, const std::string& value)
{
for (const auto& entry : m_secondary_storages) {
- if (!backend_is_available(*entry, "putting in", true)) {
+ auto backend = get_backend(*entry, key, "putting in", true);
+ if (!backend) {
continue;
}
Timer timer;
- const auto result = entry->backend->put(key, value);
+ const auto result = backend->impl->put(key, value);
const auto ms = timer.measure_ms();
if (!result) {
// The backend is expected to log details about the error.
- mark_backend_as_failed(*entry, result.error());
+ mark_backend_as_failed(*backend, result.error());
continue;
}
Storage::remove_from_secondary_storage(const Digest& key)
{
for (const auto& entry : m_secondary_storages) {
- if (!backend_is_available(*entry, "removing from", true)) {
+ auto backend = get_backend(*entry, key, "removing from", true);
+ if (!backend) {
continue;
}
Timer timer;
- const auto result = entry->backend->remove(key);
+ const auto result = backend->impl->remove(key);
const auto ms = timer.measure_ms();
if (!result) {
- mark_backend_as_failed(*entry, result.error());
+ mark_backend_as_failed(*backend, result.error());
continue;
}