debugs(79, 5, HERE << " trying to open slot for key " << storeKeyText(key)
<< " for writing in map [" << path << ']');
const int idx = slotIdx(key);
- free(idx);
Slot &s = shared->slots[idx];
- freeIfNeeded(s);
- if (s.state.swap_if(Slot::Empty, Slot::Writing)) {
- fileno = idx;
+
+ if (s.exclusiveLock()) {
+ assert(s.state != Slot::Writeable); // until we start breaking locks
+ s.state = Slot::Writeable;
s.setKey(key);
+ fileno = idx;
++shared->count;
debugs(79, 5, HERE << " opened slot at " << fileno << " for key " <<
storeKeyText(key) << " for writing in map [" << path << ']');
- return &s.seBasics;
+ return &s.seBasics; // and keep the entry locked
}
+
debugs(79, 5, HERE << " failed to open slot for key " << storeKeyText(key)
<< " for writing in map [" << path << ']');
- return 0;
+ return NULL;
}
void
"openning for reading in map [" << path << ']');
assert(valid(fileno));
Slot &s = shared->slots[fileno];
- assert(s.state == Slot::Writing);
- ++s.readLevel;
- assert(s.state.swap_if(Slot::Writing, Slot::Usable));
+ assert(s.state.swap_if(Slot::Writeable, Slot::Readable));
+ s.releaseExclusiveLock();
}
bool
-Rock::DirMap::free(const sfileno fileno)
+Rock::DirMap::putAt(const StoreEntry &e, const sfileno fileno)
{
- assert(valid(fileno));
- Slot &s = shared->slots[fileno];
- if (s.state.swap_if(Slot::Usable, Slot::WaitingToBeFreed)) {
- debugs(79, 5, HERE << " marked slot at " << fileno << " to be freed in"
- " map [" << path << ']');
- freeIfNeeded(s);
+ const cache_key *key = static_cast<const cache_key*>(e.key);
+ debugs(79, 5, HERE << " trying to open slot for key " << storeKeyText(key)
+ << " for putting in map [" << path << ']');
+ if (!valid(fileno)) {
+ debugs(79, 5, HERE << "failure: bad fileno: " << fileno);
+ return false;
+ }
+
+ const int idx = slotIdx(key);
+ if (fileno != idx) {
+ debugs(79, 5, HERE << "failure: hash changed: " << idx << " vs. " <<
+ fileno);
+ return false;
+ }
+
+ Slot &s = shared->slots[idx];
+
+ if (s.exclusiveLock()) {
+ assert(s.state != Slot::Writeable); // until we start breaking locks
+ s.setKey(static_cast<const cache_key*>(e.key));
+ s.seBasics.set(e);
+ s.state = Slot::Readable;
+ s.releaseExclusiveLock();
+ ++shared->count;
+ debugs(79, 5, HERE << " put slot at " << fileno << " for key " <<
+ storeKeyText(key) << " in map [" << path << ']');
return true;
}
- debugs(79, 5, HERE << " failed to mark slot at " << fileno << " to be "
- "freed in map [" << path << ']');
+
+ debugs(79, 5, HERE << " failed to open slot for key " << storeKeyText(key)
+ << " for putting in map [" << path << ']');
return false;
}
+void
+Rock::DirMap::free(const sfileno fileno)
+{
+    debugs(79, 5, HERE << " marking slot at " << fileno << " to be freed in"
+           " map [" << path << ']');
+
+    assert(valid(fileno));
+    Slot &s = shared->slots[fileno];
+    s.waitingToBeFreed = true; // mark, regardless of whether we can free
+    // if the slot is locked right now, freeIfNeeded() is a no-op; the mark
+    // is honored by a later freeIfNeeded() call (e.g., after a lock release)
+    freeIfNeeded(s);
+}
+
const StoreEntryBasics *
Rock::DirMap::openForReading(const cache_key *const key, sfileno &fileno)
{
debugs(79, 5, HERE << " trying to open slot for key " << storeKeyText(key)
<< " for reading in map [" << path << ']');
const int idx = slotIdx(key);
- const StoreEntryBasics *const seBasics = openForReadingAt(idx);
- if (seBasics) {
+ if (const StoreEntryBasics *const seBasics = openForReadingAt(idx)) {
Slot &s = shared->slots[idx];
if (s.checkKey(key)) {
fileno = idx;
']');
return seBasics;
}
- --s.readLevel;
+ s.releaseSharedLock();
freeIfNeeded(s);
}
debugs(79, 5, HERE << " failed to open slot for key " << storeKeyText(key)
"reading in map [" << path << ']');
assert(valid(fileno));
Slot &s = shared->slots[fileno];
- ++s.readLevel;
- if (s.state == Slot::Usable) {
+ if (s.sharedLock()) {
debugs(79, 5, HERE << " opened slot at " << fileno << " for reading in"
" map [" << path << ']');
return &s.seBasics;
}
- --s.readLevel;
freeIfNeeded(s);
debugs(79, 5, HERE << " failed to open slot at " << fileno << " for "
"reading in map [" << path << ']');
"map [" << path << ']');
assert(valid(fileno));
Slot &s = shared->slots[fileno];
- assert(s.readLevel > 0);
- --s.readLevel;
+ s.releaseSharedLock();
freeIfNeeded(s);
}
return (k[0] + k[1]) % shared->limit;
}
-Rock::DirMap::Slot &
+Rock::Slot &
Rock::DirMap::slot(const cache_key *const key)
{
return shared->slots[slotIdx(key)];
 Rock::DirMap::freeIfNeeded(Slot &s)
 {
     const int idx = &s - shared->slots;
-    if (s.state.swap_if(Slot::WaitingToBeFreed, Slot::Freeing)) {
-        debugs(79, 5, HERE << " trying to free slot at " << idx << " in map ["
-               << path << ']');
-        if (s.readLevel > 0) {
-            assert(s.state.swap_if(Slot::Freeing, Slot::WaitingToBeFreed));
-            debugs(79, 5, HERE << " failed to free slot at " << idx << " in "
-                   "map [" << path << ']');
-        } else {
+    // freeing requires exclusive access; if we cannot get it, the
+    // waitingToBeFreed mark stays set for a later freeIfNeeded() call
+    if (s.exclusiveLock()) {
+        // swap_if also clears the mark so the slot is freed exactly once
+        if (s.waitingToBeFreed.swap_if(true, false)) {
             memset(s.key_, 0, sizeof(s.key_));
             memset(&s.seBasics, 0, sizeof(s.seBasics));
+            s.state = Slot::Empty;
+            s.releaseExclusiveLock();
+            // NOTE(review): count is decremented after the lock release, so a
+            // writer may reuse the slot first -- confirm that a transiently
+            // inflated count is acceptable
             --shared->count;
-            assert(s.state.swap_if(Slot::Freeing, Slot::Empty));
             debugs(79, 5, HERE << " freed slot at " << idx << " in map [" <<
                    path << ']');
-        }
+        } else {
+            s.releaseExclusiveLock();
+        }
     }
 }
return sizeof(Shared) + limit * sizeof(Slot);
}
+
+/* Rock::Slot */
+
 void
-Rock::DirMap::Slot::setKey(const cache_key *const aKey)
+Rock::Slot::setKey(const cache_key *const aKey)
 {
+    // copy the raw MD5 key bytes into the four 32-bit key_ words;
+    // NOTE(review): memcpy bypasses atomic stores -- callers appear to hold
+    // the exclusive lock while setting the key; confirm that invariant
     memcpy(key_, aKey, sizeof(key_));
 }
 bool
-Rock::DirMap::Slot::checkKey(const cache_key *const aKey) const
+Rock::Slot::checkKey(const cache_key *const aKey) const
 {
+    // compare the 16-byte MD5 key as four 32-bit words
     const uint32_t *const k = reinterpret_cast<const uint32_t *>(aKey);
     return k[0] == key_[0] && k[1] == key_[1] &&
            k[2] == key_[2] && k[3] == key_[3];
 }
+
+/// grab a shared (read) lock or return false; on success, new writers and
+/// freeing are kept out until releaseSharedLock() is called
+bool
+Rock::Slot::sharedLock() const
+{
+    ++readers; // this locks new writers out
+    if (state == Readable && !writers && !waitingToBeFreed)
+        return true;
+    --readers; // slot is not readable (or a writer won the race); undo
+    return false;
+}
+
+/// grab an exclusive (write) lock or return false; succeeds only when there
+/// are no other writers and no readers using the slot
+bool
+Rock::Slot::exclusiveLock()
+{
+    if (!writers++) { // we are the first writer (this locks new readers out)
+        if (!readers) // there are no old readers
+            return true;
+    }
+    --writers; // somebody else is using the slot; undo our lock attempt
+    return false;
+}
+
+/// undo a successful sharedLock()
+void
+Rock::Slot::releaseSharedLock() const
+{
+    // decrement outside of assert(): NDEBUG builds remove the assert
+    // expression entirely, which would leak the shared lock
+    const int oldReaders = readers--;
+    assert(oldReaders > 0);
+}
+
+/// undo a successful exclusiveLock()
+void
+Rock::Slot::releaseExclusiveLock()
+{
+    // decrement outside of assert(): NDEBUG builds remove the assert
+    // expression entirely, which would leak the exclusive lock
+    const int oldWriters = writers--;
+    assert(oldWriters > 0);
+}
+
+
+/// initializes shared map metadata; only limit and count are set here
 Rock::DirMap::Shared::Shared(const int aLimit): limit(aLimit), count(0)
 {
 }
namespace Rock {
+/// DirMap entry: one slot of cache entry metadata, guarded by lightweight
+/// shared (reader) / exclusive (writer) locks built from atomic counters
+class Slot {
+public:
+    /// possible persistent states
+    typedef enum {
+        Empty, ///< ready for writing, with nothing of value
+        Writeable, ///< transitions from Empty to Readable
+        Readable, ///< ready for reading
+    } State;
+
+    void setKey(const cache_key *const aKey);
+    bool checkKey(const cache_key *const aKey) const;
+
+    bool sharedLock() const; ///< lock for reading or return false
+    bool exclusiveLock(); ///< lock for modification or return false
+    void releaseSharedLock() const; ///< undo successful sharedLock()
+    void releaseExclusiveLock(); ///< undo successful exclusiveLock()
+
+public:
+    // we want two uint64_t, but older GCCs lack __sync_fetch_and_add_8
+    AtomicWordT<uint32_t> key_[4]; ///< MD5 entry key
+    StoreEntryBasics seBasics; ///< basic store entry data
+    AtomicWordT<uint8_t> state; ///< current state
+    AtomicWordT<uint8_t> waitingToBeFreed; ///< a state-independent mark
+
+private:
+    mutable AtomicWord readers; ///< number of users trying to read
+    AtomicWord writers; ///< number of writers trying to modify the slot
+};
+
/// \ingroup Rock
/// map of used db slots indexed by sfileno
class DirMap
DirMap(const char *const aPath, const int limit); ///< create a new shared DirMap
DirMap(const char *const aPath); ///< open an existing shared DirMap
- /// start writing a new entry
+ /// finds space for writing a new entry or returns nil
StoreEntryBasics *openForWriting(const cache_key *const key, sfileno &fileno);
/// finish writing a new entry, leaves the entry opened for reading
void closeForWriting(const sfileno fileno);
- /// mark slot as waiting to be freed, will be freed when no one uses it
- bool free(const sfileno fileno);
+ /// stores entry info at the requested slot or returns false
+ bool putAt(const StoreEntry &e, const sfileno fileno);
+
+ /// mark the slot as waiting to be freed and, if possible, free it
+ void free(const sfileno fileno);
-    /// open slot for reading, increments read level
+    /// open slot for reading; takes a shared lock on success
const StoreEntryBasics *openForReading(const cache_key *const key, sfileno &fileno);
static int AbsoluteEntryLimit(); ///< maximum entryLimit() possible
private:
- struct Slot {
- enum {
- Empty,
- Writing,
- Usable,
- WaitingToBeFreed,
- Freeing
- };
-
- void setKey(const cache_key *const aKey);
- bool checkKey(const cache_key *const aKey) const;
-
- AtomicWordT<uint8_t> state; ///< slot state
- AtomicWord readLevel; ///< read level
-
- // we want two uint64_t, but older GCCs lack __sync_fetch_and_add_8
- AtomicWordT<uint32_t> key_[4]; ///< MD5 entry key
-
- StoreEntryBasics seBasics; ///< basic store entry data
- };
-
struct Shared {
Shared(const int aLimit);
}
 /* Add a new object to the cache with empty memory copy and pointer to disk
- * use to rebuild store from disk. XXX: dupes UFSSwapDir::addDiskRestore */
+ * use to rebuild store from disk. Based on UFSSwapDir::addDiskRestore */
 bool
 Rock::SwapDir::addEntry(const int fileno, const StoreEntry &from)
 {
-    const cache_key *const key = reinterpret_cast<const cache_key *>(from.key);
-    debugs(47, 5, HERE << &from << ' ' << storeKeyText(key)
-           << ", fileno="<< std::setfill('0') << std::hex << std::uppercase <<
+    debugs(47, 8, HERE << &from << ' ' << from.getMD5Text() <<
+           ", fileno="<< std::setfill('0') << std::hex << std::uppercase <<
            std::setw(8) << fileno);
-    // after writing, we close for reading because we do not add this entry to
-    // store_table and, hence, there is nobody to hold the read lock
-
-    int idx;
-    StoreEntryBasics *const basics = map->openForWriting(key, idx);
-    if (!basics) {
-        debugs(47, 5, HERE << "Rock::SwapDir::addEntry: the entry loaded from "
-               "disk clashed with locked newer entries");
-        return false;
-    } else if (fileno != idx) {
-        debugs(47, 5, HERE << "Rock::SwapDir::addEntry: the entry loaded from "
-               "disk was hashed to a new slot");
-        map->closeForWriting(idx);
-        map->closeForReading(idx);
-        map->free(idx);
-        return false;
+    // putAt() refuses bad filenos, key/slot hash mismatches, and busy slots;
+    // in all those cases the on-disk entry is simply not restored
+    if (map->putAt(from, fileno)) {
+        // we do not add this entry to store_table so core will not updateSize
+        updateSize(from.swap_file_sz, +1);
+        return true;
     }
-    basics->set(from);
-    map->closeForWriting(fileno);
-    map->closeForReading(fileno);
-    return true;
+
+    return false;
 }