From: Alex Rousskov Date: Tue, 19 Apr 2011 04:31:53 +0000 (-0600) Subject: Optimized the number of "queue is no longer empty" IpcIo notifications. X-Git-Tag: take06~12 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=fa61cefe6486fe4f7ff291af66f2d182fcebf88a;p=thirdparty%2Fsquid.git Optimized the number of "queue is no longer empty" IpcIo notifications. The original code relied on the writer (pusher) knowledge to decide when a notification is needed. That code was simpler but it resulted in many pointless notifications because the reader could have been busy processing the last popped item and would have checked the queue after that processing anyway. This would become especially wasteful when the reader pops multiple requests before processing them (e.g. to do "elevator" seek optimization). The intermediate implementation (not comitted) placed the reader state in each queue. That was still fairly simple and worked OK, but it was not addressing the needs of the disker readers. Diskers have many incoming queues. If at least one incoming queue has requests, the disker is not blocked and does not need a notification. The last implementation allows all incoming queues of a single disker to share the reader/disker state. The reader state is disassociated from the single queue. There is still some wasteful state updates when multiple queues are iterated in FewToOneBiQueue::pop(), but their overheads should be very minor. We need to figure out whether a single shared reader state can also be used for workers though (each worker also has many incoming queues...). Also added debugging and a few XXXs/TODOs to mark future work items. --- diff --git a/src/DiskIO/IpcIo/IpcIoFile.cc b/src/DiskIO/IpcIo/IpcIoFile.cc index 1de66ab9df..11a3bd15dc 100644 --- a/src/DiskIO/IpcIo/IpcIoFile.cc +++ b/src/DiskIO/IpcIo/IpcIoFile.cc @@ -18,13 +18,30 @@ CBDATA_CLASS_INIT(IpcIoFile); IpcIoFile::DiskerQueue *IpcIoFile::diskerQueue = NULL; -const double IpcIoFile::Timeout = 7; +const double IpcIoFile::Timeout = 7; // seconds; XXX: ALL,9 may require more IpcIoFile::IpcIoFileList IpcIoFile::WaitingForOpen; IpcIoFile::IpcIoFilesMap IpcIoFile::IpcIoFiles; static bool DiskerOpen(const String &path, int flags, mode_t mode); static void DiskerClose(const String &path); +/// IpcIo wrapper for debugs() streams; XXX: find a better class name +struct SipcIo { + SipcIo(int aWorker, const IpcIoMsg &aMsg, int aDisker): + worker(aWorker), msg(aMsg), disker(aDisker) {} + + int worker; + const IpcIoMsg &msg; + int disker; +}; + +std::ostream & +operator <<(std::ostream &os, const SipcIo &sio) +{ + return os << "ipcIo" << sio.worker << '.' << sio.msg.requestId << + (sio.msg.command == IpcIo::cmdRead ? 'r' : 'w') << sio.disker; +} + IpcIoFile::IpcIoFile(char const *aDb): dbName(aDb), diskId(-1), workerQueue(NULL), error_(false), lastRequestId(0), @@ -99,6 +116,7 @@ IpcIoFile::openCompleted(const Ipc::StrandSearchResponse *const response) { } else { diskId = response->strand.kidId; if (diskId >= 0) { + // XXX: Remove this +-1 math! FewToOneBiQueue API must use kid IDs. workerQueue = DiskerQueue::Attach(dbName, KidIdentifier - 1); const bool inserted = IpcIoFiles.insert(std::make_pair(diskId, this)).second; @@ -267,6 +285,9 @@ IpcIoFile::trackPendingRequest(IpcIoPendingRequest *const pending) void IpcIoFile::push(IpcIoPendingRequest *const pending) { + // prevent queue overflows: check for responses to earlier requests + handleResponses("before push"); + debugs(47, 7, HERE); Must(diskId >= 0); Must(workerQueue); @@ -289,13 +310,18 @@ IpcIoFile::push(IpcIoPendingRequest *const pending) memcpy(ipcIo.buf, pending->writeRequest->buf, ipcIo.len); // optimize away } + debugs(47, 7, HERE << "pushing " << SipcIo(KidIdentifier, ipcIo, diskId) << " at " << workerQueue->pushQueue->size()); + try { if (workerQueue->push(ipcIo)) - Notify(diskId); // notify disker + Notify(diskId); // must notify disker trackPendingRequest(pending); } catch (const WorkerQueue::Full &) { - // XXX: grow queue size? - pending->completeIo(NULL); + debugs(47, DBG_IMPORTANT, "Worker I/O push queue overflow: " << + SipcIo(KidIdentifier, ipcIo, diskId)); // TODO: report queue len + // TODO: grow queue size + + pending->completeIo(NULL); // XXX: should distinguish this from timeout delete pending; } } @@ -320,24 +346,31 @@ IpcIoFile::HandleOpenResponse(const Ipc::StrandSearchResponse &response) } void -IpcIoFile::handleResponses() +IpcIoFile::handleNotification() { + debugs(47, 4, HERE << "notified"); + workerQueue->clearReaderSignal(); + handleResponses("after notification"); +} + +void +IpcIoFile::handleResponses(const char *when) +{ + debugs(47, 4, HERE << "popping all " << when); Must(workerQueue); - try { - while (true) { - IpcIoMsg ipcIo; - workerQueue->pop(ipcIo); // XXX: notify disker? - handleResponse(ipcIo); - } - } catch (const WorkerQueue::Empty &) {} + IpcIoMsg ipcIo; + // get all responses we can: since we are not pushing, this will stop + while (workerQueue->pop(ipcIo)) + handleResponse(ipcIo); } void IpcIoFile::handleResponse(const IpcIoMsg &ipcIo) { const int requestId = ipcIo.requestId; - debugs(47, 7, HERE << "disker response to " << ipcIo.command << - "; ipcIo" << KidIdentifier << '.' << requestId); + debugs(47, 7, HERE << "popped disker response: " << + SipcIo(KidIdentifier, ipcIo, diskId) << " at " << workerQueue->popQueue->size()); + Must(requestId); if (IpcIoPendingRequest *const pending = dequeueRequest(requestId)) { pending->completeIo(&ipcIo); @@ -352,6 +385,7 @@ IpcIoFile::handleResponse(const IpcIoMsg &ipcIo) void IpcIoFile::Notify(const int peerId) { + // TODO: Count and report the total number of notifications, pops, pushes. debugs(47, 7, HERE << "kid" << peerId); Ipc::TypedMsgHdr msg; msg.setType(Ipc::mtIpcIoNotification); // TODO: add proper message type? @@ -363,14 +397,16 @@ IpcIoFile::Notify(const int peerId) void IpcIoFile::HandleNotification(const Ipc::TypedMsgHdr &msg) { - debugs(47, 7, HERE); - if (IamDiskProcess()) - DiskerHandleRequests(); - else { - const int diskId = msg.getInt(); + const int from = msg.getInt(); + debugs(47, 7, HERE << "from " << from); + if (IamDiskProcess()) { + const int workerId = from; + DiskerHandleRequests(workerId - 1); + } else { + const int diskId = from; const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId); - Must(i != IpcIoFiles.end()); // XXX: warn and continue? - i->second->handleResponses(); + Must(i != IpcIoFiles.end()); // TODO: warn but continue + i->second->handleNotification(); } } @@ -551,17 +587,22 @@ diskerWrite(IpcIoMsg &ipcIo) } void -IpcIoFile::DiskerHandleRequests() +IpcIoFile::DiskerHandleRequests(const int workerWhoNotified) { Must(diskerQueue); - try { - while (true) { - int workerId; - IpcIoMsg ipcIo; - diskerQueue->pop(workerId, ipcIo); // XXX: notify worker? - DiskerHandleRequest(workerId, ipcIo); - } - } catch (const DiskerQueue::Empty &) {} + diskerQueue->clearReaderSignal(workerWhoNotified); + + int workerId = 0; + IpcIoMsg ipcIo; + while (diskerQueue->pop(workerId, ipcIo)) + DiskerHandleRequest(workerId, ipcIo); + + // TODO: If the loop keeps on looping, we probably should take a break + // once in a while to update clock, read Coordinator messages, etc. + // This can be combined with "elevator" optimization where we get up to N + // requests first, then reorder the popped requests to optimize seek time, + // then do I/O, then take a break, and come back for the next set of I/O + // requests. } /// called when disker receives an I/O request @@ -587,11 +628,20 @@ IpcIoFile::DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo) else // ipcIo.command == IpcIo::cmdWrite diskerWrite(ipcIo); + debugs(47, 7, HERE << "pushing " << SipcIo(workerId+1, ipcIo, KidIdentifier) << " at " << diskerQueue->biQueues[workerId]->pushQueue->size()); + try { if (diskerQueue->push(workerId, ipcIo)) - Notify(workerId + 1); // notify worker + Notify(workerId + 1); // must notify worker } catch (const DiskerQueue::Full &) { - // XXX: grow queue size? + // The worker queue should not overflow because the worker should pop() + // before push()ing and because if disker pops N requests at a time, + // we should make sure the worker pop() queue length is the worker + // push queue length plus N+1. XXX: implement the N+1 difference. + debugs(47, DBG_IMPORTANT, "BUG: Worker I/O pop queue overflow: " << + SipcIo(workerId+1, ipcIo, KidIdentifier)); // TODO: report queue len + + // the I/O request we could not push will timeout } } diff --git a/src/DiskIO/IpcIo/IpcIoFile.h b/src/DiskIO/IpcIo/IpcIoFile.h index d4c24df794..8d66e495d2 100644 --- a/src/DiskIO/IpcIo/IpcIoFile.h +++ b/src/DiskIO/IpcIo/IpcIoFile.h @@ -85,10 +85,11 @@ private: void checkTimeouts(); void scheduleTimeoutCheck(); - void handleResponses(); + void handleNotification(); + void handleResponses(const char *when); void handleResponse(const IpcIoMsg &ipcIo); - static void DiskerHandleRequests(); + static void DiskerHandleRequests(const int workerId); static void DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo); private: diff --git a/src/ipc/Queue.cc b/src/ipc/Queue.cc index e86fe37ac3..c192ff3ec2 100644 --- a/src/ipc/Queue.cc +++ b/src/ipc/Queue.cc @@ -6,9 +6,12 @@ */ #include "config.h" +#include "base/TextException.h" +#include "Debug.h" +#include "globals.h" #include "ipc/Queue.h" - +/// constructs shared segment ID from parent queue ID and child queue index static String QueueId(String id, const int idx) { @@ -17,24 +20,50 @@ QueueId(String id, const int idx) return id; } +/// constructs QueueReader ID from parent queue ID +static String +ReaderId(String id) +{ + id.append("__readers"); + return id; +} + + +/* QueueReader */ + +InstanceIdDefinitions(QueueReader, "ipcQR"); + +QueueReader::QueueReader(): popBlocked(1), popSignal(0) +{ + debugs(54, 7, HERE << "constructed " << id); +} + // OneToOneUniQueue OneToOneUniQueue::OneToOneUniQueue(const String &id, const unsigned int maxItemSize, const int capacity): - shm(id.termedBuf()) + shm(id.termedBuf()), reader_(NULL) { - shm.create(Items2Bytes(maxItemSize, capacity)); - assert(shm.mem()); - shared = new (shm.mem()) Shared(maxItemSize, capacity); + const int sharedSize = Items2Bytes(maxItemSize, capacity); + shm.create(sharedSize); + shared = new (shm.reserve(sharedSize)) Shared(maxItemSize, capacity); } -OneToOneUniQueue::OneToOneUniQueue(const String &id): shm(id.termedBuf()) +OneToOneUniQueue::OneToOneUniQueue(const String &id): shm(id.termedBuf()), + reader_(NULL) { shm.open(); shared = reinterpret_cast(shm.mem()); assert(shared); } +void +OneToOneUniQueue::reader(QueueReader *aReader) +{ + Must(!reader_ && aReader); + reader_ = aReader; +} + int OneToOneUniQueue::Bytes2Items(const unsigned int maxItemSize, int size) { @@ -56,6 +85,13 @@ OneToOneUniQueue::Shared::Shared(const unsigned int aMaxItemSize, const int aCap { } +QueueReader & +OneToOneUniQueue::reader() +{ + Must(reader_); + return *reader_; +} + // OneToOneBiQueue @@ -71,17 +107,42 @@ OneToOneBiQueue::OneToOneBiQueue(const String &id): { } +void +OneToOneBiQueue::readers(QueueReader *r1, QueueReader *r2) +{ + popQueue->reader(r1); + pushQueue->reader(r2); +} + +void +OneToOneBiQueue::clearReaderSignal() +{ + debugs(54, 7, HERE << "reader: " << &popQueue->reader()); + popQueue->reader().clearSignal(); +} + // FewToOneBiQueue FewToOneBiQueue::FewToOneBiQueue(const String &id, const int aWorkerCount, const unsigned int maxItemSize, const int capacity): - theLastPopWorkerId(-1), theWorkerCount(aWorkerCount) + theLastPopWorker(0), theWorkerCount(aWorkerCount), + shm(ReaderId(id).termedBuf()), + reader(NULL) { + // create a new segment for the local and remote queue readers + // TODO: all our queues and readers should use a single segment + shm.create((theWorkerCount+1)*sizeof(QueueReader)); + reader = new (shm.reserve(sizeof(QueueReader))) QueueReader; + debugs(54, 7, HERE << "disker " << id << " reader: " << reader->id); + assert(theWorkerCount >= 0); biQueues.reserve(theWorkerCount); for (int i = 0; i < theWorkerCount; ++i) { OneToOneBiQueue *const biQueue = new OneToOneBiQueue(QueueId(id, i), maxItemSize, capacity); + QueueReader *remoteReader = + new (shm.reserve(sizeof(QueueReader))) QueueReader; + biQueue->readers(reader, remoteReader); biQueues.push_back(biQueue); } } @@ -89,7 +150,22 @@ FewToOneBiQueue::FewToOneBiQueue(const String &id, const int aWorkerCount, const OneToOneBiQueue * FewToOneBiQueue::Attach(const String &id, const int workerId) { - return new OneToOneBiQueue(QueueId(id, workerId)); + // XXX: remove this leak. By refcounting Ipc::Mem::Segments? By creating a global FewToOneBiQueue for each worker? + Ipc::Mem::Segment *shmPtr = new Ipc::Mem::Segment(ReaderId(id).termedBuf()); + + Ipc::Mem::Segment &shm = *shmPtr; + shm.open(); + assert(shm.size() >= static_cast((1 + workerId+1)*sizeof(QueueReader))); + QueueReader *readers = reinterpret_cast(shm.mem()); + QueueReader *remoteReader = &readers[0]; + debugs(54, 7, HERE << "disker " << id << " reader: " << remoteReader->id); + QueueReader *localReader = &readers[workerId+1]; + debugs(54, 7, HERE << "local " << id << " reader: " << localReader->id); + + OneToOneBiQueue *const biQueue = + new OneToOneBiQueue(QueueId(id, workerId)); + biQueue->readers(localReader, remoteReader); + return biQueue; } FewToOneBiQueue::~FewToOneBiQueue() @@ -102,3 +178,17 @@ bool FewToOneBiQueue::validWorkerId(const int workerId) const { return 0 <= workerId && workerId < theWorkerCount; } + +void +FewToOneBiQueue::clearReaderSignal(int workerId) +{ + debugs(54, 7, HERE << "reader: " << reader->id); + + assert(validWorkerId(workerId)); + reader->clearSignal(); + + // we got a hint; we could reposition iteration to try popping from the + // workerId queue first; but it does not seem to help much and might + // introduce some bias so we do not do that for now: + // theLastPopWorker = (workerId + theWorkerCount - 1) % theWorkerCount; +} diff --git a/src/ipc/Queue.h b/src/ipc/Queue.h index ff62eac462..a766d103d5 100644 --- a/src/ipc/Queue.h +++ b/src/ipc/Queue.h @@ -7,16 +7,58 @@ #define SQUID_IPC_QUEUE_H #include "Array.h" +#include "base/InstanceId.h" #include "ipc/AtomicWord.h" #include "ipc/mem/Segment.h" #include "util.h" class String; -/// Lockless fixed-capacity queue for a single writer and a single reader. +/// State of the reading end of a queue (i.e., of the code calling pop()). +/// Multiple queues attached to one reader share this state. +class QueueReader { +public: + QueueReader(); // the initial state is "blocked without a signal" + + /// whether the reader is waiting for a notification signal + bool blocked() const { return popBlocked == 1; } + + /// marks the reader as blocked, waiting for a notification signal + void block() { popBlocked.swap_if(0, 1); } + + /// removes the block() effects + void unblock() { popBlocked.swap_if(1, 0); } + + /// if reader is blocked and not notified, marks the notification signal + /// as sent and not received, returning true; otherwise, returns false + bool raiseSignal() { return blocked() && popSignal.swap_if(0,1); } + + /// marks sent reader notification as received (also removes pop blocking) + void clearSignal() { unblock(); popSignal.swap_if(1,0); } + +private: + AtomicWord popBlocked; ///< whether the reader is blocked on pop() + AtomicWord popSignal; ///< whether writer has sent and reader has not received notification + +public: + /// unique ID for debugging which reader is used (works across processes) + const InstanceId id; +}; + + +/** + * Lockless fixed-capacity queue for a single writer and a single reader. + * + * If the queue is empty, the reader is considered "blocked" and needs + * an out-of-band notification message to notice the next pushed item. + * + * Current implementation assumes that the writer cannot get blocked: if the + * queue is full, the writer will just not push and come back later (with a + * different value). We can add support for blocked writers if needed. + */ class OneToOneUniQueue { public: - class Empty {}; + // pop() and push() exceptions; TODO: use TextException instead class Full {}; class ItemTooLarge {}; @@ -33,10 +75,14 @@ public: static int Bytes2Items(const unsigned int maxItemSize, int size); static int Items2Bytes(const unsigned int maxItemSize, const int size); - template - bool pop(Value &value); ///< returns true iff the queue was full - template - bool push(const Value &value); ///< returns true iff the queue was empty + /// returns true iff the value was set; [un]blocks the reader as needed + template bool pop(Value &value); + + /// returns true iff the caller must notify the reader of the pushed item + template bool push(const Value &value); + + QueueReader &reader(); + void reader(QueueReader *aReader); private: struct Shared { @@ -54,12 +100,12 @@ private: Ipc::Mem::Segment shm; ///< shared memory segment Shared *shared; ///< pointer to shared memory + QueueReader *reader_; ///< the state of the code popping from this queue }; /// Lockless fixed-capacity bidirectional queue for two processes. class OneToOneBiQueue { public: - typedef OneToOneUniQueue::Empty Empty; typedef OneToOneUniQueue::Full Full; typedef OneToOneUniQueue::ItemTooLarge ItemTooLarge; @@ -67,12 +113,14 @@ public: OneToOneBiQueue(const String &id, const unsigned int maxItemSize, const int capacity); OneToOneBiQueue(const String &id); ///< Attach to existing shared queue. - template - bool pop(Value &value) { return popQueue->pop(value); } - template - bool push(const Value &value) { return pushQueue->push(value); } + void readers(QueueReader *r1, QueueReader *r2); + void clearReaderSignal(); -private: + /* wrappers to call the right OneToOneUniQueue method for this process */ + template bool pop(Value &value) { return popQueue->pop(value); } + template bool push(const Value &value) { return pushQueue->push(value); } + +//private: OneToOneUniQueue *const popQueue; ///< queue to pop from for this process OneToOneUniQueue *const pushQueue; ///< queue to push to for this process }; @@ -87,7 +135,6 @@ private: */ class FewToOneBiQueue { public: - typedef OneToOneBiQueue::Empty Empty; typedef OneToOneBiQueue::Full Full; typedef OneToOneBiQueue::ItemTooLarge ItemTooLarge; @@ -98,15 +145,22 @@ public: bool validWorkerId(const int workerId) const; int workerCount() const { return theWorkerCount; } - template - bool pop(int &workerId, Value &value); ///< returns false iff the queue was full - template - bool push(const int workerId, const Value &value); ///< returns false iff the queue was empty + /// clears the reader notification received by the disker from worker + void clearReaderSignal(int workerId); -private: - int theLastPopWorkerId; ///< the last worker ID we pop()ed from - Vector biQueues; ///< worker queues - const int theWorkerCount; ///< number of worker processes + /// picks a worker and calls OneToOneUniQueue::pop() using its queue + template bool pop(int &workerId, Value &value); + + /// calls OneToOneUniQueue::push() using the given worker queue + template bool push(const int workerId, const Value &value); + +//private: XXX: make private by moving pop/push debugging into pop/push + int theLastPopWorker; ///< the ID of the last worker we tried to pop() from + Vector biQueues; ///< worker queues indexed by worker ID + const int theWorkerCount; ///< the total number of workers + + Ipc::Mem::Segment shm; ///< shared memory segment to store the reader + QueueReader *reader; ///< the state of the code popping from all biQueues }; @@ -119,15 +173,23 @@ OneToOneUniQueue::pop(Value &value) if (sizeof(value) > shared->theMaxItemSize) throw ItemTooLarge(); - if (empty()) - throw Empty(); + // A writer might push between the empty test and block() below, so we do + // not return false right after calling block(), but test again. + if (empty()) { + reader().block(); + // A writer might push between the empty test and block() below, + // so we must test again as such a writer will not signal us. + if (empty()) + return false; + } - const bool wasFull = full(); + reader().unblock(); const unsigned int pos = - shared->theOut++ % shared->theCapacity * shared->theMaxItemSize; + (shared->theOut++ % shared->theCapacity) * shared->theMaxItemSize; memcpy(&value, shared->theBuffer + pos, sizeof(value)); --shared->theSize; - return wasFull; + + return true; } template @@ -145,7 +207,8 @@ OneToOneUniQueue::push(const Value &value) shared->theIn++ % shared->theCapacity * shared->theMaxItemSize; memcpy(shared->theBuffer + pos, &value, sizeof(value)); ++shared->theSize; - return wasEmpty; + + return wasEmpty && reader().raiseSignal(); } @@ -155,16 +218,15 @@ template bool FewToOneBiQueue::pop(int &workerId, Value &value) { - ++theLastPopWorkerId; + // iterate all workers, starting after the one we visited last for (int i = 0; i < theWorkerCount; ++i) { - theLastPopWorkerId = (theLastPopWorkerId + 1) % theWorkerCount; - try { - const bool wasFull = biQueues[theLastPopWorkerId]->pop(value); - workerId = theLastPopWorkerId; - return wasFull; - } catch (const Empty &) {} + theLastPopWorker = (theLastPopWorker + 1) % theWorkerCount; + if (biQueues[theLastPopWorker]->pop(value)) { + workerId = theLastPopWorker; + return true; + } } - throw Empty(); + return false; // no worker had anything to pop } template