]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Split Rock-only Rock::DirMap into Rock::DirMap and reusable Ipc pieces
authorAlex Rousskov <rousskov@measurement-factory.com>
Sat, 9 Apr 2011 04:24:06 +0000 (22:24 -0600)
committerAlex Rousskov <rousskov@measurement-factory.com>
Sat, 9 Apr 2011 04:24:06 +0000 (22:24 -0600)
which a shared memory cache implementation can use:

Ipc::StoreMap is responsible for maintaining a collection of lockable slots,
each with readable/writeable/free state and a "waiting to be free" flag. Kids
of this class can add more metadata (in parallel structures using the same
index as primary slots). I tried extending the slots themselves, but that
turned out to be more complex/messy.

Ipc::ReadWriteLock is a basic multiple readers, single writer lock.  Its
earlier implementation inside Rock::DirMap mixed slot locking and slot
state/flags. That simplified the caller code a little, but the current simpler
class is easier to understand and reuse.

Rock::DirMap now just adds Rock::DbCellHeader metadata to Ipc::StoreMap slots.

Simplified mapping API by reducing the number of similar-but-different
methods. For example, instead of putAt, the caller can use an
openForWriting/closeForWriting pair. This helps with moving custom metadata
manipulations outside of the reusable Ipc::StoreMap.

It would be possible to split Ipc::StoreMap further by moving Store-specific
bits outside of its slots. Currently, there is no need for that though.

src/fs/rock/RockDirMap.cc
src/fs/rock/RockDirMap.h
src/fs/rock/RockSwapDir.cc
src/ipc/Makefile.am
src/ipc/ReadWriteLock.cc [new file with mode: 0644]
src/ipc/ReadWriteLock.h [new file with mode: 0644]
src/ipc/StoreMap.cc [new file with mode: 0644]
src/ipc/StoreMap.h [new file with mode: 0644]

index 64c4458e49ad023a4a27eceeccdf3660733bbc5f..160afd73eb95b2c0f3f02f7e61cac8f0a7c4cda1 100644 (file)
 #include "fs/rock/RockDirMap.h"
 
 Rock::DirMap::DirMap(const char *const aPath, const int limit):
-    path(aPath), shm(aPath)
+    Ipc::StoreMap(aPath, limit, Shared::MemSize(limit))
 {
-    shm.create(SharedSize(limit));
     assert(shm.mem());
-    shared = new (shm.mem()) Shared(limit);
-    debugs(79, 5, HERE << "] new map [" << path << "] created using a new "
-           "shared memory segment for cache_dir '" << path << "' with limit=" <<
-           entryLimit());
+    shared = new (shm.reserve(Shared::MemSize(limit))) Shared;
 }
 
 Rock::DirMap::DirMap(const char *const aPath):
-    path(aPath), shm(aPath)
+    Ipc::StoreMap(aPath)
 {
-    shm.open();
+    const int limit = entryLimit();
     assert(shm.mem());
-    shared = reinterpret_cast<Shared *>(shm.mem());
-    debugs(79, 5, HERE << "] new map [" << path << "] created using existing "
-           "shared memory segment for cache_dir '" << path << "' with limit=" <<
-           entryLimit());
+    shared = reinterpret_cast<Shared *>(shm.reserve(Shared::MemSize(limit)));
 }
 
-Rock::StoreEntryBasics *
-Rock::DirMap::openForWriting(const cache_key *const key, sfileno &fileno)
+Rock::DbCellHeader &
+Rock::DirMap::header(const sfileno fileno)
 {
-    debugs(79, 5, HERE << " trying to open slot for key " << storeKeyText(key)
-           << " for writing in map [" << path << ']');
-    const int idx = slotIdx(key);
-    Slot &s = shared->slots[idx];
-
-    if (s.exclusiveLock()) {
-        assert(s.state != Slot::Writeable); // until we start breaking locks
-        if (s.state == Slot::Empty) // we may also overwrite a Readable slot
-            ++shared->count;
-        s.state = Slot::Writeable;
-        s.setKey(key);
-        fileno = idx;
-        debugs(79, 5, HERE << " opened slot at " << fileno << " for key " <<
-               storeKeyText(key) << " for writing in map [" << path << ']');
-        return &s.seBasics; // and keep the entry locked
-    }
-
-    debugs(79, 5, HERE << " failed to open slot at " << idx << " for key " << storeKeyText(key)
-           << " for writing in map [" << path << ']');
-    return NULL;
-}
-
-void
-Rock::DirMap::closeForWriting(const sfileno fileno)
-{
-    debugs(79, 5, HERE << " closing slot at " << fileno << " for writing and "
-           "openning for reading in map [" << path << ']');
-    assert(valid(fileno));
-    Slot &s = shared->slots[fileno];
-    assert(s.state.swap_if(Slot::Writeable, Slot::Readable));
-    s.switchExclusiveToSharedLock();
-}
-
-/// terminate writing the entry, freeing its slot for others to use
-void
-Rock::DirMap::abortWriting(const sfileno fileno)
-{
-    debugs(79, 5, HERE << " abort writing slot at " << fileno <<
-           " in map [" << path << ']');
-    assert(valid(fileno));
-    Slot &s = shared->slots[fileno];
-    assert(s.state == Slot::Writeable);
-    freeLocked(s);
-}
-
-void
-Rock::DirMap::abortIo(const sfileno fileno)
-{
-    debugs(79, 5, HERE << " abort I/O for slot at " << fileno <<
-           " in map [" << path << ']');
-    assert(valid(fileno));
-    Slot &s = shared->slots[fileno];
-
-    // The caller is a lock holder. Thus, if we are Writeable, then the
-    // caller must be the writer; otherwise the caller must be the reader.
-    if (s.state == Slot::Writeable)
-        abortWriting(fileno);
-    else
-        closeForReading(fileno);
-}
-
-const Rock::StoreEntryBasics *
-Rock::DirMap::peekAtReader(const sfileno fileno) const
-{
-    assert(valid(fileno));
-    const Slot &s = shared->slots[fileno];
-    switch (s.state) {
-    case Slot::Readable:
-        return &s.seBasics; // immediate access by lock holder so no locking
-    case Slot::Writeable:
-        return NULL; // cannot read the slot when it is being written
-    case Slot::Empty:
-        assert(false); // must be locked for reading or writing
-    }
-    assert(false); // not reachable
-    return NULL;
-}
-
-bool
-Rock::DirMap::putAt(const DbCellHeader &header, const StoreEntry &e,
-                    const sfileno fileno)
-{
-    const cache_key *key = static_cast<const cache_key*>(e.key);
-    debugs(79, 5, HERE << " trying to open slot for key " << storeKeyText(key)
-           << " for putting in map [" << path << ']');
-    if (!valid(fileno)) {
-        debugs(79, 5, HERE << "failure: bad fileno: " << fileno);
-        return false;
-    }
-    
-    const int idx = slotIdx(key);
-    if (fileno != idx) {
-        debugs(79, 5, HERE << "failure: hash changed: " << idx << " vs. " <<
-            fileno);
-        return false;
-    }
-    
-    Slot &s = shared->slots[idx];
-
-    if (s.exclusiveLock()) {
-        assert(s.state != Slot::Writeable); // until we start breaking locks
-        if (s.state == Slot::Empty) // we may also overwrite a Readable slot
-            ++shared->count;
-        s.setKey(static_cast<const cache_key*>(e.key));
-        s.seBasics.set(header, e);
-        s.state = Slot::Readable;
-        s.releaseExclusiveLock();
-        debugs(79, 5, HERE << " put slot at " << fileno << " for key " <<
-               storeKeyText(key) << " in map [" << path << ']');
-        return true;
-    }
-
-    debugs(79, 5, HERE << " failed to open slot for key " << storeKeyText(key)
-           << " for putting in map [" << path << ']');
-    return false;
-}
-
-void
-Rock::DirMap::free(const sfileno fileno)
-{
-    debugs(79, 5, HERE << " marking slot at " << fileno << " to be freed in"
-               " map [" << path << ']');
-
-    assert(valid(fileno));
-    Slot &s = shared->slots[fileno];
-    s.waitingToBeFreed = true; // mark, regardless of whether we can free
-    freeIfNeeded(s);
-}
-
-const Rock::StoreEntryBasics *
-Rock::DirMap::openForReading(const cache_key *const key, sfileno &fileno)
-{
-    debugs(79, 5, HERE << " trying to open slot for key " << storeKeyText(key)
-           << " for reading in map [" << path << ']');
-    const int idx = slotIdx(key);
-    if (const StoreEntryBasics *const seBasics = openForReadingAt(idx)) {
-        Slot &s = shared->slots[idx];
-        if (s.checkKey(key)) {
-            fileno = idx;
-            debugs(79, 5, HERE << " opened slot at " << fileno << " for key "
-                   << storeKeyText(key) << " for reading in map [" << path <<
-                   ']');
-            return seBasics;
-        }
-        s.releaseSharedLock();
-        freeIfNeeded(s);
-    }
-    debugs(79, 5, HERE << " failed to open slot for key " << storeKeyText(key)
-           << " for reading in map [" << path << ']');
-    return 0;
+    assert(0 <= fileno && fileno < entryLimit());
+    assert(shared);
+    return shared->headers[fileno];
 }
 
-const Rock::StoreEntryBasics *
-Rock::DirMap::openForReadingAt(const sfileno fileno)
+const Rock::DbCellHeader &
+Rock::DirMap::header(const sfileno fileno) const
 {
-    debugs(79, 5, HERE << " trying to open slot at " << fileno << " for "
-           "reading in map [" << path << ']');
-    assert(valid(fileno));
-    Slot &s = shared->slots[fileno];
-    if (s.sharedLock()) {
-        debugs(79, 5, HERE << " opened slot at " << fileno << " for reading in"
-               " map [" << path << ']');
-        return &s.seBasics;
-    }
-    freeIfNeeded(s);
-    debugs(79, 5, HERE << " failed to open slot at " << fileno << " for "
-           "reading in map [" << path << ']');
-    return 0;
-}
-
-void
-Rock::DirMap::closeForReading(const sfileno fileno)
-{
-    debugs(79, 5, HERE << " closing slot at " << fileno << " for reading in "
-           "map [" << path << ']');
-    assert(valid(fileno));
-    Slot &s = shared->slots[fileno];
-    assert(s.state == Slot::Readable);
-    s.releaseSharedLock();
-    freeIfNeeded(s);
-}
-
-int
-Rock::DirMap::entryLimit() const
-{
-    return shared->limit;
-}
-
-int
-Rock::DirMap::entryCount() const
-{
-    return shared->count;
-}
-
-bool
-Rock::DirMap::full() const
-{
-    return entryCount() >= entryLimit();
-}
-
-void
-Rock::DirMap::updateStats(MapStats &stats) const
-{
-    stats.capacity += shared->limit;
-    for (int i = 0; i < shared->limit; ++i)
-        shared->slots[i].updateStats(stats);
-}
-
-bool
-Rock::DirMap::valid(const int pos) const
-{
-    return 0 <= pos && pos < entryLimit();
+    assert(0 <= fileno && fileno < entryLimit());
+    assert(shared);
+    return shared->headers[fileno];
 }
 
 int
@@ -255,184 +47,8 @@ Rock::DirMap::AbsoluteEntryLimit()
     return sfilenoMax;
 }
 
-int
-Rock::DirMap::slotIdx(const cache_key *const key) const
-{
-    const uint64_t *const k = reinterpret_cast<const uint64_t *>(key);
-    // TODO: use a better hash function
-    return (k[0] + k[1]) % shared->limit;
-}
-
-Rock::Slot &
-Rock::DirMap::slot(const cache_key *const key)
-{
-    return shared->slots[slotIdx(key)];
-}
-
-/// frees the slot if (b) it is waiting to be freed and (a) we can lock it
-void
-Rock::DirMap::freeIfNeeded(Slot &s)
+size_t
+Rock::DirMap::Shared::MemSize(int limit)
 {
-    if (s.exclusiveLock()) {
-        if (s.waitingToBeFreed == true)
-            freeLocked(s);
-        else
-            s.releaseExclusiveLock();
-    }
-}
-
-/// unconditionally frees the already exclusively locked slot and releases lock
-void
-Rock::DirMap::freeLocked(Slot &s)
-{
-    memset(s.key_, 0, sizeof(s.key_));
-    memset(&s.seBasics, 0, sizeof(s.seBasics));
-    s.waitingToBeFreed = false;
-    s.state = Slot::Empty;
-    s.releaseExclusiveLock();
-    --shared->count;
-    debugs(79, 5, HERE << " freed slot at " << (&s - shared->slots) <<
-           " in map [" << path << ']');
-}
-
-int
-Rock::DirMap::SharedSize(const int limit)
-{
-    return sizeof(Shared) + limit * sizeof(Slot);
-}
-
-
-/* Rock::Slot */
-
-void
-Rock::Slot::setKey(const cache_key *const aKey)
-{
-    memcpy(key_, aKey, sizeof(key_));
-}
-
-bool
-Rock::Slot::checkKey(const cache_key *const aKey) const
-{
-    const uint32_t *const k = reinterpret_cast<const uint32_t *>(aKey);
-    return k[0] == key_[0] && k[1] == key_[1] &&
-           k[2] == key_[2] && k[3] == key_[3];
-}
-
-
-bool
-Rock::Slot::sharedLock() const
-{
-    ++readers; // this locks new writers out
-    if (state == Readable && !writers && !waitingToBeFreed)
-        return true;
-    --readers;
-    return false;
-}
-
-bool
-Rock::Slot::exclusiveLock()
-{
-    if (!writers++) { // we are the first writer (this locks new readers out)
-        if (!readers) // there are no old readers
-            return true;
-       }
-    --writers;
-    return false;
-}
-
-void
-Rock::Slot::releaseSharedLock() const
-{
-    assert(readers-- > 0);
-}
-
-void
-Rock::Slot::releaseExclusiveLock()
-{
-    assert(writers-- > 0);
-}
-
-void
-Rock::Slot::switchExclusiveToSharedLock()
-{
-    ++readers; // must be done before we release exclusive control
-    releaseExclusiveLock();
-}
-
-void
-Rock::Slot::updateStats(MapStats &stats) const
-{
-    switch (state) {
-    case Readable:
-        ++stats.readable;
-        stats.readers += readers;
-        break;
-    case Writeable:
-        ++stats.writeable;
-        stats.writers += writers;
-        break;
-    case Empty:
-        ++stats.empty;
-        break;
-    }
-
-    if (waitingToBeFreed)
-        ++stats.marked;
-}
-
-
-Rock::DirMap::Shared::Shared(const int aLimit): limit(aLimit), count(0)
-{
-}
-
-void
-Rock::StoreEntryBasics::set(const DbCellHeader &aHeader, const StoreEntry &from)
-{
-    assert(from.swap_file_sz > 0);
-    memset(this, 0, sizeof(*this));
-    header = aHeader;
-    timestamp = from.timestamp;
-    lastref = from.lastref;
-    expires = from.expires;
-    lastmod = from.lastmod;
-    swap_file_sz = from.swap_file_sz;
-    refcount = from.refcount;
-    flags = from.flags;
-}
-
-
-/* MapStats */
-
-Rock::MapStats::MapStats()
-{
-    memset(this, 0, sizeof(*this));
-}
-void
-Rock::MapStats::dump(StoreEntry &e) const
-{
-    storeAppendPrintf(&e, "Available slots: %9d\n", capacity);
-
-    if (!capacity)
-        return;
-
-    storeAppendPrintf(&e, "Readable slots:  %9d %6.2f%%\n",
-        readable, (100.0 * readable / capacity));
-    storeAppendPrintf(&e, "Filling slots:   %9d %6.2f%%\n",
-        writeable, (100.0 * writeable / capacity));
-    storeAppendPrintf(&e, "Empty slots:     %9d %6.2f%%\n",
-        empty, (100.0 * empty / capacity));
-
-    if (readers || writers) {
-        const int locks = readers + writers;
-        storeAppendPrintf(&e, "Readers:         %9d %6.2f%%\n",
-            readers, (100.0 * readers / locks));
-        storeAppendPrintf(&e, "Writers:         %9d %6.2f%%\n",
-            writers, (100.0 * writers / locks));
-    }
-
-    if (readable + writeable) {
-        storeAppendPrintf(&e, "Marked slots:    %9d %6.2f%%\n",
-            marked, (100.0 * marked / (readable + writeable)));
-    }
+    return sizeof(Shared) + limit*sizeof(DbCellHeader);
 }
index 6317e61aa91db8ba491e2afedd4456742e217f42..cbba2f37738a8e813e30b32c59b19a045a046e05 100644 (file)
 #define SQUID_FS_ROCK_DIR_MAP_H
 
 #include "fs/rock/RockFile.h"
-#include "ipc/AtomicWord.h"
-#include "ipc/mem/Segment.h"
+#include "ipc/StoreMap.h"
 
 namespace Rock {
 
-class StoreEntryBasics {
-public:
-    void set(const DbCellHeader &aHeader, const StoreEntry &anEntry);
-
-    DbCellHeader header; ///< rock-specific entry metadata
-
-    /* START OF ON-DISK STORE_META_STD TLV field */
-    time_t timestamp;
-    time_t lastref;
-    time_t expires;
-    time_t lastmod;
-    uint64_t swap_file_sz;
-    u_short refcount;
-    u_short flags;
-    /* END OF ON-DISK STORE_META_STD */
-};
-
-/// aggregates basic map performance measurements; all numbers are approximate
-class MapStats {
-public:
-    MapStats();
-
-    void dump(StoreEntry &e) const;
-
-    int capacity; ///< the total number of slots in the map
-    int readable; ///< number of slots in Readable state
-    int writeable; ///< number of slots in Writeable state
-    int empty; ///< number of slots in Empty state
-    int readers; ///< sum of slot.readers
-    int writers; ///< sum of slot.writers
-    int marked; ///< number of slots marked for freeing
-};
-
-/// DirMap entry
-class Slot {
-public:
-    /// possible persistent states
-    typedef enum {
-        Empty, ///< ready for writing, with nothing of value
-        Writeable, ///< transitions from Empty to Readable
-        Readable, ///< ready for reading
-    } State;
-
-    void setKey(const cache_key *const aKey);
-    bool checkKey(const cache_key *const aKey) const;
-
-    bool sharedLock() const; ///< lock for reading or return false
-    bool exclusiveLock(); ///< lock for modification or return false
-    void releaseSharedLock() const; ///< undo successful sharedLock()
-    void releaseExclusiveLock(); ///< undo successful exclusiveLock()
-    void switchExclusiveToSharedLock(); ///< trade exclusive for shared access
-
-    /// adds approximate current stats to the supplied ones
-    void updateStats(MapStats &stats) const;
-
-public:
-    // we want two uint64_t, but older GCCs lack __sync_fetch_and_add_8
-    AtomicWordT<uint32_t> key_[4]; ///< MD5 entry key
-    StoreEntryBasics seBasics; ///< basic store entry data
-    AtomicWordT<uint8_t> state; ///< current state
-    AtomicWordT<uint8_t> waitingToBeFreed; ///< a state-independent mark
-
-private:
-    mutable AtomicWord readers; ///< number of users trying to read
-    AtomicWord writers; ///< number of writers trying to modify the slot
-};
-
 /// \ingroup Rock
 /// map of used db slots indexed by sfileno
-class DirMap
+class DirMap: public Ipc::StoreMap
 {
 public:
     DirMap(const char *const aPath, const int limit); ///< create a new shared DirMap
     DirMap(const char *const aPath); ///< open an existing shared DirMap
 
-    /// finds/reservers space for writing a new entry or returns nil
-    StoreEntryBasics *openForWriting(const cache_key *const key, sfileno &fileno);
-    /// successfully finish writing the entry, leaving it opened for reading
-    void closeForWriting(const sfileno fileno);
-
-    /// stores entry info at the requested slot or returns false
-    bool putAt(const DbCellHeader &header, const StoreEntry &e, const sfileno fileno);
-
-    /// only works on locked entries; returns nil unless the slot is readable
-    const StoreEntryBasics *peekAtReader(const sfileno fileno) const;
-
-    /// mark the slot as waiting to be freed and, if possible, free it
-    void free(const sfileno fileno);
-
-    /// open slot for reading, increments read level
-    const StoreEntryBasics *openForReading(const cache_key *const key, sfileno &fileno);
-    /// open slot for reading, increments read level
-    const StoreEntryBasics *openForReadingAt(const sfileno fileno);
-    /// close slot after reading, decrements read level
-    void closeForReading(const sfileno fileno);
-
-    /// called by lock holder to terminate either slot writing or reading
-    void abortIo(const sfileno fileno);
-
-    bool full() const; ///< there are no empty slots left
-    bool valid(int n) const; ///< whether n is a valid slot coordinate
-    int entryCount() const; ///< number of used slots
-    int entryLimit() const; ///< maximum number of slots that can be used
-
-    /// adds approximate current stats to the supplied ones
-    void updateStats(MapStats &stats) const;
+    /// write access to the cell header; call openForWriting() first!
+    DbCellHeader &header(const sfileno fileno);
+    /// read-only access to the cell header; call openForReading() first!
+    const DbCellHeader &header(const sfileno fileno) const;
 
     static int AbsoluteEntryLimit(); ///< maximum entryLimit() possible
 
 private:
-    struct Shared {
-        Shared(const int aLimit);
-
-        const AtomicWord limit; ///< maximum number of map slots
-        AtomicWord count; ///< current number of map slots
-
-        Slot slots[]; ///< slots storage
+    /// data shared by all DirMaps with the same path
+    class Shared {
+    public:
+        static size_t MemSize(int limit);
+        DbCellHeader headers[]; ///< DbCellHeaders storage
     };
 
-    int slotIdx(const cache_key *const key) const;
-    Slot &slot(const cache_key *const key);
-    const StoreEntryBasics *openForReading(Slot &s);
-    void abortWriting(const sfileno fileno);
-    void freeIfNeeded(Slot &s);
-    void freeLocked(Slot &s);
-    String sharedMemoryName();
-
-    static int SharedSize(const int limit);
-
-    const String path; ///< cache_dir path, used for logging
-    Ipc::Mem::Segment shm; ///< shared memory segment
     Shared *shared; ///< pointer to shared memory
 };
 
index 988bce78f23c5a1641215a15988d76897eb2e567..49cef04cb4c2f8c7580cae589da2e8ca2e49988e 100644 (file)
@@ -45,22 +45,24 @@ Rock::SwapDir::get(const cache_key *key)
         return NULL;
 
     sfileno fileno;
-    const StoreEntryBasics *const basics = map->openForReading(key, fileno);
-    if (!basics)
+    const Ipc::StoreMapSlot *const slot = map->openForReading(key, fileno);
+    if (!slot)
         return NULL;
 
+    const Ipc::StoreMapSlot::Basics &basics = slot->basics;
+
     // create a brand new store entry and initialize it with stored basics
     StoreEntry *e = new StoreEntry();
     e->lock_count = 0;
     e->swap_dirn = index;
     e->swap_filen = fileno;
-    e->swap_file_sz = basics->swap_file_sz;
-    e->lastref = basics->lastref;
-    e->timestamp = basics->timestamp;
-    e->expires = basics->expires;
-    e->lastmod = basics->lastmod;
-    e->refcount = basics->refcount;
-    e->flags = basics->flags;
+    e->swap_file_sz = basics.swap_file_sz;
+    e->lastref = basics.lastref;
+    e->timestamp = basics.timestamp;
+    e->expires = basics.expires;
+    e->lastmod = basics.lastmod;
+    e->refcount = basics.refcount;
+    e->flags = basics.flags;
     e->store_status = STORE_OK;
     e->setMemStatus(NOT_IN_MEMORY);
     e->swap_status = SWAPOUT_DONE;
@@ -302,10 +304,16 @@ Rock::SwapDir::addEntry(const int fileno, const DbCellHeader &header, const Stor
        ", fileno="<< std::setfill('0') << std::hex << std::uppercase <<
        std::setw(8) << fileno);
 
-    if (map->putAt(header, from, fileno)) {
-        // we do not add this entry to store_table so core will not updateSize
-        updateSize(from.swap_file_sz, +1);
-        return true;
+    sfileno newLocation = 0;
+    if (Ipc::StoreMapSlot *slot = map->openForWriting(reinterpret_cast<const cache_key *>(from.key), newLocation)) {
+        if (fileno == newLocation) {
+            slot->set(from);
+            map->header(fileno) = header;
+            // core will not updateSize: we do not add the entry to store_table
+            updateSize(from.swap_file_sz, +1);
+        } // else some other, newer entry got into our cell
+        map->closeForWriting(newLocation, false);
+        return fileno == newLocation;
     }
 
     return false;
@@ -350,14 +358,15 @@ Rock::SwapDir::createStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreI
     assert(payloadEnd <= max_objsize);
 
     sfileno fileno;
-    StoreEntryBasics *const basics =
+    Ipc::StoreMapSlot *const slot =
         map->openForWriting(reinterpret_cast<const cache_key *>(e.key), fileno);
-    if (!basics) {
+    if (!slot) {
         debugs(47, 5, HERE << "Rock::SwapDir::createStoreIO: map->add failed");
         return NULL;
     }
     e.swap_file_sz = header.payloadSize; // and will be copied to the map
-    basics->set(header, e);
+    slot->set(e);
+    map->header(fileno) = header;
 
     // XXX: We rely on our caller, storeSwapOutStart(), to set e.fileno.
     // If that does not happen, the entry will not decrement the read level!
@@ -412,23 +421,23 @@ Rock::SwapDir::openStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOS
     // The are two ways an entry can get swap_filen: our get() locked it for
     // reading or our storeSwapOutStart() locked it for writing. Peeking at our
     // locked entry is safe, but no support for reading a filling entry.
-    const StoreEntryBasics *basics = map->peekAtReader(e.swap_filen);
-    if (!basics)
+    const Ipc::StoreMapSlot *slot = map->peekAtReader(e.swap_filen);
+    if (!slot)
         return NULL; // we were writing afterall
 
     IoState *sio = new IoState(this, &e, cbFile, cbIo, data);
 
     sio->swap_dirn = index;
     sio->swap_filen = e.swap_filen;
-    sio->payloadEnd = sizeof(DbCellHeader) + basics->header.payloadSize;
+    sio->payloadEnd = sizeof(DbCellHeader) + map->header(e.swap_filen).payloadSize;
     assert(sio->payloadEnd <= max_objsize); // the payload fits the slot
 
     debugs(47,5, HERE << "dir " << index << " has old fileno: " <<
         std::setfill('0') << std::hex << std::uppercase << std::setw(8) <<
         sio->swap_filen);
 
-    assert(basics->swap_file_sz > 0);
-    assert(basics->swap_file_sz == e.swap_file_sz);
+    assert(slot->basics.swap_file_sz > 0);
+    assert(slot->basics.swap_file_sz == e.swap_file_sz);
 
     sio->diskOffset = diskOffset(sio->swap_filen);
     assert(sio->diskOffset + sio->payloadEnd <= diskOffsetLimit());
@@ -499,7 +508,7 @@ Rock::SwapDir::writeCompleted(int errflag, size_t rlen, RefCount< ::WriteRequest
 
     if (errflag == DISK_OK) {
         // close, assuming we only write once; the entry gets the read lock
-        map->closeForWriting(sio.swap_filen);
+        map->closeForWriting(sio.swap_filen, true);
         // do not increment sio.offset_ because we do it in sio->write()
     } else {
         // Do not abortWriting here. The entry should keep the write lock
@@ -650,7 +659,7 @@ Rock::SwapDir::statfs(StoreEntry &e) const
                 entryCount, (100.0 * entryCount / limit));
 
             if (limit < 100) { // XXX: otherwise too expensive to count
-                MapStats stats;
+                Ipc::ReadWriteLockStats stats;
                 map->updateStats(stats);
                 stats.dump(e);
             }
index 43b61e4c910a6c7a6f2397e7e31226760ca94497..147917690ef3ce39641384869ed1b94a4c7cef8c 100644 (file)
@@ -14,8 +14,12 @@ libipc_la_SOURCES = \
        Messages.h \
        Queue.cc \
        Queue.h \
+       ReadWriteLock.cc \
+       ReadWriteLock.h \
        StartListening.cc \
        StartListening.h \
+       StoreMap.cc \
+       StoreMap.h \
        StrandCoord.cc \
        StrandCoord.h \
        StrandCoords.h \
diff --git a/src/ipc/ReadWriteLock.cc b/src/ipc/ReadWriteLock.cc
new file mode 100644 (file)
index 0000000..9fef9c1
--- /dev/null
@@ -0,0 +1,97 @@
+/*
+ * $Id$
+ *
+ * DEBUG: section 54    Interprocess Communication
+ */
+
+#include "squid.h"
+
+#include "Store.h"
+#include "ipc/ReadWriteLock.h"
+
+bool
+Ipc::ReadWriteLock::lockShared()
+{
+    ++readers; // this locks "new" writers out
+    if (!writers) // there are no old writers
+        return true;
+    --readers;
+    return false;
+}
+
+bool
+Ipc::ReadWriteLock::lockExclusive()
+{
+    if (!writers++) { // we are the first writer + this locks "new" readers out
+        if (!readers) // there are no old readers
+            return true;
+    }
+    --writers;
+    return false;
+}
+
+void
+Ipc::ReadWriteLock::unlockShared()
+{
+    assert(readers-- > 0);
+}
+
+void
+Ipc::ReadWriteLock::unlockExclusive()
+{
+    assert(writers-- > 0);
+}
+
+void
+Ipc::ReadWriteLock::switchExclusiveToShared()
+{
+    ++readers; // must be done before we release exclusive control
+    unlockExclusive();
+}
+
+void
+Ipc::ReadWriteLock::updateStats(ReadWriteLockStats &stats) const
+{
+    if (readers) {
+        ++stats.readable;
+        stats.readers += readers;
+    } else if (writers) {
+        ++stats.writeable;
+        stats.writers += writers;
+    } else {
+        ++stats.idle;
+    }
+    ++stats.count;
+}
+
+
+/* Ipc::ReadWriteLockStats */
+
+Ipc::ReadWriteLockStats::ReadWriteLockStats()
+{
+    memset(this, 0, sizeof(*this));
+}
+void
+Ipc::ReadWriteLockStats::dump(StoreEntry &e) const
+{
+    storeAppendPrintf(&e, "Available locks: %9d\n", count);
+
+    if (!count)
+        return;
+
+    storeAppendPrintf(&e, "Reading: %9d %6.2f%%\n",
+        readable, (100.0 * readable / count));
+    storeAppendPrintf(&e, "Writing: %9d %6.2f%%\n",
+        writeable, (100.0 * writeable / count));
+    storeAppendPrintf(&e, "Idle:    %9d %6.2f%%\n",
+        idle, (100.0 * idle / count));
+
+    if (readers || writers) {
+        const int locked = readers + writers;
+        storeAppendPrintf(&e, "Readers:         %9d %6.2f%%\n",
+            readers, (100.0 * readers / locked));
+        storeAppendPrintf(&e, "Writers:         %9d %6.2f%%\n",
+            writers, (100.0 * writers / locked));
+    }
+}
diff --git a/src/ipc/ReadWriteLock.h b/src/ipc/ReadWriteLock.h
new file mode 100644 (file)
index 0000000..91507de
--- /dev/null
@@ -0,0 +1,49 @@
+#ifndef SQUID_IPC_READ_WRITE_LOCK_H
+#define SQUID_IPC_READ_WRITE_LOCK_H
+
+#include "ipc/AtomicWord.h"
+
+class StoreEntry;
+
+namespace Ipc {
+
+class ReadWriteLockStats;
+
+/// an atomic readers-writer or shared-exclusive lock suitable for maps/tables
+class ReadWriteLock {
+public:
+    // default constructor is OK because of shared memory zero-initialization
+
+    bool lockShared(); ///< lock for reading or return false
+    bool lockExclusive(); ///< lock for modification or return false
+    void unlockShared(); ///< undo successful sharedLock()
+    void unlockExclusive(); ///< undo successful exclusiveLock()
+    void switchExclusiveToShared(); ///< stop writing, start reading
+
+    /// adds approximate current stats to the supplied ones
+    void updateStats(ReadWriteLockStats &stats) const;
+
+public:
+    mutable AtomicWord readers; ///< number of users trying to read
+    AtomicWord writers; ///< number of writers trying to modify protected data
+};
+
+
+/// approximate stats of a set of ReadWriteLocks
+class ReadWriteLockStats {
+public:
+    ReadWriteLockStats();
+
+    void dump(StoreEntry &e) const;
+
+    int count; ///< the total number of locks
+    int readable; ///< number of locks locked for reading
+    int writeable; ///< number of locks locked for writing
+    int idle; ///< number of unlocked locks
+    int readers; ///< sum of lock.readers
+    int writers; ///< sum of lock.writers
+};
+
+} // namespace Ipc
+
+#endif /* SQUID_IPC_READ_WRITE_LOCK_H */
diff --git a/src/ipc/StoreMap.cc b/src/ipc/StoreMap.cc
new file mode 100644 (file)
index 0000000..1103477
--- /dev/null
@@ -0,0 +1,312 @@
+/*
+ * $Id$
+ *
+ * DEBUG: section 54    Interprocess Communication
+ */
+
+#include "squid.h"
+
+#include "Store.h"
+#include "ipc/StoreMap.h"
+
+Ipc::StoreMap::StoreMap(const char *const aPath, const int limit,
+    size_t sharedSizeExtra):
+    path(aPath), shm(aPath), shared(NULL)
+{
+    const size_t mySharedSize = Shared::MemSize(limit);
+    shm.create(mySharedSize + sharedSizeExtra);
+    shared = new (shm.reserve(mySharedSize)) Shared(limit);
+    debugs(54, 5, HERE << "new map [" << path << "] created");
+}
+
+Ipc::StoreMap::StoreMap(const char *const aPath):
+    path(aPath), shm(aPath), shared(NULL)
+{
+    shm.open();
+    assert(shm.mem());
+    shared = reinterpret_cast<Shared *>(shm.mem());
+    // check that nobody used our segment chunk and that shared->limit is sane
+    assert(shared == reinterpret_cast<Shared *>(shm.reserve(Shared::MemSize(shared->limit))));
+    debugs(54, 5, HERE << "attached map [" << path << "] created");
+}
+
+Ipc::StoreMap::Slot *
+Ipc::StoreMap::openForWriting(const cache_key *const key, sfileno &fileno)
+{
+    debugs(54, 5, HERE << " trying to open slot for key " << storeKeyText(key)
+           << " for writing in map [" << path << ']');
+    const int idx = slotIndexByKey(key);
+
+    Slot &s = shared->slots[idx];
+    ReadWriteLock &lock = s.lock;
+
+    if (lock.lockExclusive()) {
+        assert(s.state != Slot::Writeable); // until we start breaking locks
+
+        // free if the entry was dirty, keeping the entry locked
+        if (s.waitingToBeFreed == true)
+            freeLocked(s, true);
+
+        if (s.state == Slot::Empty) // we may also overwrite a Readable slot
+            ++shared->count;
+        s.state = Slot::Writeable;
+        fileno = idx;
+        //s.setKey(key); // XXX: the caller should do that
+        debugs(54, 5, HERE << " opened slot at " << idx <<
+               " for writing in map [" << path << ']');
+        return &s; // and keep the entry locked
+    }
+
+    debugs(54, 5, HERE << " failed to open slot at " << idx <<
+           " for writing in map [" << path << ']');
+    return NULL;
+}
+
+void
+Ipc::StoreMap::closeForWriting(const sfileno fileno, bool lockForReading)
+{
+    debugs(54, 5, HERE << " closing slot at " << fileno << " for writing and "
+           "openning for reading in map [" << path << ']');
+    assert(valid(fileno));
+    Slot &s = shared->slots[fileno];
+    assert(s.state == Slot::Writeable);
+    s.state = Slot::Readable;
+    if (lockForReading)
+        s.lock.switchExclusiveToShared();
+    else
+        s.lock.unlockExclusive();
+}
+
+/// terminate writing the entry, freeing its slot for others to use
+void
+Ipc::StoreMap::abortWriting(const sfileno fileno)
+{
+    debugs(54, 5, HERE << " abort writing slot at " << fileno <<
+           " in map [" << path << ']');
+    assert(valid(fileno));
+    Slot &s = shared->slots[fileno];
+    assert(s.state == Slot::Writeable);
+    freeLocked(s, false);
+}
+
+void
+Ipc::StoreMap::abortIo(const sfileno fileno)
+{
+    debugs(54, 5, HERE << " abort I/O for slot at " << fileno <<
+           " in map [" << path << ']');
+    assert(valid(fileno));
+    Slot &s = shared->slots[fileno];
+
+    // The caller is a lock holder. Thus, if we are Writeable, then the
+    // caller must be the writer; otherwise the caller must be the reader.
+    if (s.state == Slot::Writeable)
+        abortWriting(fileno);
+    else
+        closeForReading(fileno);
+}
+
+const Ipc::StoreMap::Slot *
+Ipc::StoreMap::peekAtReader(const sfileno fileno) const
+{
+    assert(valid(fileno));
+    const Slot &s = shared->slots[fileno];
+    switch (s.state) {
+    case Slot::Readable:
+        return &s; // immediate access by lock holder so no locking
+    case Slot::Writeable:
+        return NULL; // cannot read the slot when it is being written
+    case Slot::Empty:
+        assert(false); // must be locked for reading or writing
+    }
+    assert(false); // not reachable
+    return NULL;
+}
+
+void
+Ipc::StoreMap::free(const sfileno fileno)
+{
+    debugs(54, 5, HERE << " marking slot at " << fileno << " to be freed in"
+               " map [" << path << ']');
+
+    assert(valid(fileno));
+    Slot &s = shared->slots[fileno];
+
+    if (s.lock.lockExclusive())
+        freeLocked(s, false);
+    else
+        s.waitingToBeFreed = true; // mark to free it later
+}
+
+const Ipc::StoreMap::Slot *
+Ipc::StoreMap::openForReading(const cache_key *const key, sfileno &fileno)
+{
+    debugs(54, 5, HERE << " trying to open slot for key " << storeKeyText(key)
+           << " for reading in map [" << path << ']');
+    const int idx = slotIndexByKey(key);
+    if (const Slot *slot = openForReadingAt(idx)) {
+        if (slot->sameKey(key)) {
+            fileno = idx;
+            debugs(54, 5, HERE << " opened slot at " << fileno << " for key "
+                   << storeKeyText(key) << " for reading in map [" << path <<
+                   ']');
+            return slot; // locked for reading
+        }
+        slot->lock.unlockShared();
+    }
+    debugs(54, 5, HERE << " failed to open slot for key " << storeKeyText(key)
+           << " for reading in map [" << path << ']');
+    return NULL;
+}
+
+const Ipc::StoreMap::Slot *
+Ipc::StoreMap::openForReadingAt(const sfileno fileno)
+{
+    debugs(54, 5, HERE << " trying to open slot at " << fileno << " for "
+           "reading in map [" << path << ']');
+    assert(valid(fileno));
+    Slot &s = shared->slots[fileno];
+
+    if (!s.lock.lockShared()) {
+        debugs(54, 5, HERE << " failed to lock slot at " << fileno << " for "
+               "reading in map [" << path << ']');
+        return NULL;
+    }
+
+    if (s.state == Slot::Empty) {
+        s.lock.unlockShared();
+        debugs(54, 7, HERE << " empty slot at " << fileno << " for "
+               "reading in map [" << path << ']');
+        return NULL;
+    }
+
+    if (s.waitingToBeFreed) {
+        s.lock.unlockShared();
+        debugs(54, 7, HERE << " dirty slot at " << fileno << " for "
+               "reading in map [" << path << ']');
+        return NULL;
+    }
+
+    // cannot be Writing here if we got shared lock and checked Empty above
+    assert(s.state == Slot::Readable);
+    debugs(54, 5, HERE << " opened slot at " << fileno << " for reading in"
+           " map [" << path << ']');
+    return &s;
+}
+
+void
+Ipc::StoreMap::closeForReading(const sfileno fileno)
+{
+    debugs(54, 5, HERE << " closing slot at " << fileno << " for reading in "
+           "map [" << path << ']');
+    assert(valid(fileno));
+    Slot &s = shared->slots[fileno];
+    assert(s.state == Slot::Readable);
+    s.lock.unlockShared();
+}
+
+int
+Ipc::StoreMap::entryLimit() const
+{
+    return shared->limit;
+}
+
+int
+Ipc::StoreMap::entryCount() const
+{
+    return shared->count;
+}
+
+bool
+Ipc::StoreMap::full() const
+{
+    return entryCount() >= entryLimit();
+}
+
+void
+Ipc::StoreMap::updateStats(ReadWriteLockStats &stats) const
+{
+    for (int i = 0; i < shared->limit; ++i)
+        shared->slots[i].lock.updateStats(stats);
+}
+
+bool
+Ipc::StoreMap::valid(const int pos) const
+{
+    return 0 <= pos && pos < entryLimit();
+}
+
+int
+Ipc::StoreMap::slotIndexByKey(const cache_key *const key) const
+{
+    const uint64_t *const k = reinterpret_cast<const uint64_t *>(key);
+    // TODO: use a better hash function
+    return (k[0] + k[1]) % shared->limit;
+}
+
+Ipc::StoreMap::Slot &
+Ipc::StoreMap::slotByKey(const cache_key *const key)
+{
+    return shared->slots[slotIndexByKey(key)];
+}
+
+/// unconditionally frees the already exclusively locked slot and releases lock
+void
+Ipc::StoreMap::freeLocked(Slot &s, bool keepLocked)
+{
+    s.waitingToBeFreed = false;
+    s.state = Slot::Empty;
+    if (!keepLocked)
+        s.lock.unlockExclusive();
+    --shared->count;
+    debugs(54, 5, HERE << " freed slot at " << (&s - shared->slots) <<
+           " in map [" << path << ']');
+}
+
+
+/* Ipc::StoreMapSlot */
+
+Ipc::StoreMapSlot::StoreMapSlot(): state(Empty)
+{
+    xmemset(&key, 0, sizeof(key));
+    xmemset(&basics, 0, sizeof(basics));
+}
+
+void
+Ipc::StoreMapSlot::setKey(const cache_key *const aKey)
+{
+    memcpy(key, aKey, sizeof(key));
+}
+
+bool
+Ipc::StoreMapSlot::sameKey(const cache_key *const aKey) const
+{
+    const uint64_t *const k = reinterpret_cast<const uint64_t *>(aKey);
+    return k[0] == key[0] && k[1] == key[1];
+}
+
+void
+Ipc::StoreMapSlot::set(const StoreEntry &from)
+{
+    memcpy(key, from.key, sizeof(key));
+    // XXX: header = aHeader;
+    basics.timestamp = from.timestamp;
+    basics.lastref = from.lastref;
+    basics.expires = from.expires;
+    basics.lastmod = from.lastmod;
+    basics.swap_file_sz = from.swap_file_sz;
+    basics.refcount = from.refcount;
+    basics.flags = from.flags;
+}
+
+/* Ipc::StoreMap::Shared */
+
+Ipc::StoreMap::Shared::Shared(const int aLimit): limit(aLimit), count(0)
+{
+}
+
+size_t
+Ipc::StoreMap::Shared::MemSize(int limit)
+{
+    return sizeof(Shared) + limit * sizeof(Slot);
+}
+
diff --git a/src/ipc/StoreMap.h b/src/ipc/StoreMap.h
new file mode 100644 (file)
index 0000000..3489d28
--- /dev/null
@@ -0,0 +1,120 @@
+#ifndef SQUID_IPC_STORE_MAP_H
+#define SQUID_IPC_STORE_MAP_H
+
+#include "ipc/ReadWriteLock.h"
+#include "ipc/mem/Segment.h"
+
+namespace Ipc {
+
+/// a StoreMap element, holding basic shareable StoreEntry info
+class StoreMapSlot {
+public:
+    StoreMapSlot();
+
+    /// store StoreEntry key and basics
+    void set(const StoreEntry &anEntry);
+
+    void setKey(const cache_key *const aKey);
+    bool sameKey(const cache_key *const aKey) const;
+
+public:
+    mutable ReadWriteLock lock; ///< protects slot data below
+    AtomicWordT<uint8_t> waitingToBeFreed; ///< may be accessed w/o a lock
+
+    uint64_t key[2]; ///< StoreEntry key
+
+    // STORE_META_STD TLV field from StoreEntry
+    struct Basics {
+        time_t timestamp;
+        time_t lastref;
+        time_t expires;
+        time_t lastmod;
+        uint64_t swap_file_sz;
+        u_short refcount;
+        u_short flags;
+       } basics;
+
+    /// possible persistent states
+    typedef enum {
+        Empty, ///< ready for writing, with nothing of value
+        Writeable, ///< transitions from Empty to Readable
+        Readable, ///< ready for reading
+       } State;
+    State state; ///< current state
+};
+
+/// map of StoreMapSlots indexed by their keys, with read/write slot locking 
+/// kids extend to store custom data
+class StoreMap
+{
+public:
+    typedef StoreMapSlot Slot;
+
+    StoreMap(const char *const aPath, const int limit, size_t sharedSizeExtra); ///< create a new shared StoreMap
+    explicit StoreMap(const char *const aPath); ///< open an existing shared StoreMap
+
+    /// finds, reservers space for writing a new entry or returns nil
+    Slot *openForWriting(const cache_key *const key, sfileno &fileno);
+    /// successfully finish writing the entry
+    void closeForWriting(const sfileno fileno, bool lockForReading = false);
+
+    /// only works on locked entries; returns nil unless the slot is readable
+    const Slot *peekAtReader(const sfileno fileno) const;
+
+    /// mark the slot as waiting to be freed and, if possible, free it
+    void free(const sfileno fileno);
+
+    /// open slot for reading, increments read level
+    const Slot *openForReading(const cache_key *const key, sfileno &fileno);
+    /// open slot for reading, increments read level
+    const Slot *openForReadingAt(const sfileno fileno);
+    /// close slot after reading, decrements read level
+    void closeForReading(const sfileno fileno);
+
+    /// called by lock holder to terminate either slot writing or reading
+    void abortIo(const sfileno fileno);
+
+    bool full() const; ///< there are no empty slots left
+    bool valid(int n) const; ///< whether n is a valid slot coordinate
+    int entryCount() const; ///< number of used slots
+    int entryLimit() const; ///< maximum number of slots that can be used
+
+    /// adds approximate current stats to the supplied ones
+    void updateStats(ReadWriteLockStats &stats) const;
+
+protected:
+    class Shared {
+    public:
+        static size_t MemSize(int limit);
+
+        Shared(const int aLimit);
+
+        const AtomicWord limit; ///< maximum number of map slots
+        AtomicWord count; ///< current number of map slots
+
+        Slot slots[]; ///< slots storage
+    };
+
+protected:
+    const String path; ///< cache_dir path, used for logging
+    Ipc::Mem::Segment shm; ///< shared memory segment
+
+private:
+    int slotIndexByKey(const cache_key *const key) const;
+    Slot &slotByKey(const cache_key *const key);
+
+       Slot *openForReading(Slot &s);
+    void abortWriting(const sfileno fileno);
+    void freeIfNeeded(Slot &s);
+    void freeLocked(Slot &s, bool keepLocked);
+    String sharedMemoryName();
+
+    Shared *shared; ///< pointer to shared memory
+};
+
+} // namespace Ipc
+
+// We do not reuse struct _fileMap because we cannot control its size,
+// resulting in sfilenos that are pointing beyond the database.
+
+#endif /* SQUID_IPC_STORE_MAP_H */