]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Optimized the number of "queue is no longer empty" IpcIo notifications.
authorAlex Rousskov <rousskov@measurement-factory.com>
Tue, 19 Apr 2011 04:31:53 +0000 (22:31 -0600)
committerAlex Rousskov <rousskov@measurement-factory.com>
Tue, 19 Apr 2011 04:31:53 +0000 (22:31 -0600)
The original code relied on the writer (pusher) knowledge to decide when a
notification is needed. That code was simpler but it resulted in many
pointless notifications because the reader could have been busy processing the
last popped item and would have checked the queue after that processing
anyway. This would become especially wasteful when the reader pops multiple
requests before processing them (e.g. to do "elevator" seek optimization).

The intermediate implementation (not comitted) placed the reader state in
each queue. That was still fairly simple and worked OK, but it was not
addressing the needs of the disker readers. Diskers have many incoming
queues. If at least one incoming queue has requests, the disker is not
blocked and does not need a notification.

The last implementation allows all incoming queues of a single disker to share
the reader/disker state. The reader state is disassociated from the single
queue.  There is still some wasteful state updates when multiple queues are
iterated in FewToOneBiQueue::pop(), but their overheads should be very minor.
We need to figure out whether a single shared reader state can also be used
for workers though (each worker also has many incoming queues...).

Also added debugging and a few XXXs/TODOs to mark future work items.

src/DiskIO/IpcIo/IpcIoFile.cc
src/DiskIO/IpcIo/IpcIoFile.h
src/ipc/Queue.cc
src/ipc/Queue.h

index 1de66ab9dfb04ada59782e908f469e4568729847..11a3bd15dca4dae44e2e6ef626a80177514ed17f 100644 (file)
 CBDATA_CLASS_INIT(IpcIoFile);
 
 IpcIoFile::DiskerQueue *IpcIoFile::diskerQueue = NULL;
-const double IpcIoFile::Timeout = 7;
+const double IpcIoFile::Timeout = 7; // seconds;  XXX: ALL,9 may require more
 IpcIoFile::IpcIoFileList IpcIoFile::WaitingForOpen;
 IpcIoFile::IpcIoFilesMap IpcIoFile::IpcIoFiles;
 
 static bool DiskerOpen(const String &path, int flags, mode_t mode);
 static void DiskerClose(const String &path);
 
+/// IpcIo wrapper for debugs() streams; XXX: find a better class name
+struct SipcIo {
+    SipcIo(int aWorker, const IpcIoMsg &aMsg, int aDisker):
+        worker(aWorker), msg(aMsg), disker(aDisker) {}
+
+    int worker;
+    const IpcIoMsg &msg;
+    int disker;
+};
+
+std::ostream &
+operator <<(std::ostream &os, const SipcIo &sio)
+{
+    return os << "ipcIo" << sio.worker << '.' << sio.msg.requestId <<
+        (sio.msg.command == IpcIo::cmdRead ? 'r' : 'w') << sio.disker;
+}
+
 
 IpcIoFile::IpcIoFile(char const *aDb):
     dbName(aDb), diskId(-1), workerQueue(NULL), error_(false), lastRequestId(0),
@@ -99,6 +116,7 @@ IpcIoFile::openCompleted(const Ipc::StrandSearchResponse *const response) {
     } else {
         diskId = response->strand.kidId;
         if (diskId >= 0) {
+            // XXX: Remove this +-1 math! FewToOneBiQueue API must use kid IDs.
             workerQueue = DiskerQueue::Attach(dbName, KidIdentifier - 1);
             const bool inserted =
                 IpcIoFiles.insert(std::make_pair(diskId, this)).second;
@@ -267,6 +285,9 @@ IpcIoFile::trackPendingRequest(IpcIoPendingRequest *const pending)
 void
 IpcIoFile::push(IpcIoPendingRequest *const pending)
 {
+    // prevent queue overflows: check for responses to earlier requests
+    handleResponses("before push");
+
     debugs(47, 7, HERE);
     Must(diskId >= 0);
     Must(workerQueue);
@@ -289,13 +310,18 @@ IpcIoFile::push(IpcIoPendingRequest *const pending)
         memcpy(ipcIo.buf, pending->writeRequest->buf, ipcIo.len); // optimize away
     }
 
+    debugs(47, 7, HERE << "pushing " << SipcIo(KidIdentifier, ipcIo, diskId) << " at " << workerQueue->pushQueue->size());
+
     try {
         if (workerQueue->push(ipcIo))
-            Notify(diskId); // notify disker
+            Notify(diskId); // must notify disker
         trackPendingRequest(pending);
     } catch (const WorkerQueue::Full &) {
-        // XXX: grow queue size?
-        pending->completeIo(NULL);
+        debugs(47, DBG_IMPORTANT, "Worker I/O push queue overflow: " <<
+               SipcIo(KidIdentifier, ipcIo, diskId)); // TODO: report queue len
+        // TODO: grow queue size
+
+        pending->completeIo(NULL); // XXX: should distinguish this from timeout
         delete pending;
     }
 }
@@ -320,24 +346,31 @@ IpcIoFile::HandleOpenResponse(const Ipc::StrandSearchResponse &response)
 }
 
 void
-IpcIoFile::handleResponses()
+IpcIoFile::handleNotification()
 {
+    debugs(47, 4, HERE << "notified");
+    workerQueue->clearReaderSignal();
+    handleResponses("after notification");
+}
+
+void
+IpcIoFile::handleResponses(const char *when)
+{
+    debugs(47, 4, HERE << "popping all " << when);
     Must(workerQueue);
-    try {
-        while (true) {
-            IpcIoMsg ipcIo;
-            workerQueue->pop(ipcIo); // XXX: notify disker?
-            handleResponse(ipcIo);
-        }
-    } catch (const WorkerQueue::Empty &) {}
+    IpcIoMsg ipcIo;
+    // get all responses we can: since we are not pushing, this will stop
+    while (workerQueue->pop(ipcIo))
+        handleResponse(ipcIo);
 }
 
 void
 IpcIoFile::handleResponse(const IpcIoMsg &ipcIo)
 {
     const int requestId = ipcIo.requestId;
-    debugs(47, 7, HERE << "disker response to " << ipcIo.command <<
-           "; ipcIo" << KidIdentifier << '.' << requestId);
+    debugs(47, 7, HERE << "popped disker response: " <<
+        SipcIo(KidIdentifier, ipcIo, diskId)  << " at " << workerQueue->popQueue->size());
+
     Must(requestId);
     if (IpcIoPendingRequest *const pending = dequeueRequest(requestId)) {
         pending->completeIo(&ipcIo);
@@ -352,6 +385,7 @@ IpcIoFile::handleResponse(const IpcIoMsg &ipcIo)
 void
 IpcIoFile::Notify(const int peerId)
 {
+    // TODO: Count and report the total number of notifications, pops, pushes.
     debugs(47, 7, HERE << "kid" << peerId);
     Ipc::TypedMsgHdr msg;
     msg.setType(Ipc::mtIpcIoNotification); // TODO: add proper message type?
@@ -363,14 +397,16 @@ IpcIoFile::Notify(const int peerId)
 void
 IpcIoFile::HandleNotification(const Ipc::TypedMsgHdr &msg)
 {
-    debugs(47, 7, HERE);
-    if (IamDiskProcess())
-        DiskerHandleRequests();
-    else {
-        const int diskId = msg.getInt();
+    const int from = msg.getInt();
+    debugs(47, 7, HERE << "from " << from);
+    if (IamDiskProcess()) {
+        const int workerId = from;
+        DiskerHandleRequests(workerId - 1);
+    } else {
+        const int diskId = from;
         const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId);
-        Must(i != IpcIoFiles.end()); // XXX: warn and continue?
-        i->second->handleResponses();
+        Must(i != IpcIoFiles.end()); // TODO: warn but continue
+        i->second->handleNotification();
     }
 }
 
@@ -551,17 +587,22 @@ diskerWrite(IpcIoMsg &ipcIo)
 }
 
 void
-IpcIoFile::DiskerHandleRequests()
+IpcIoFile::DiskerHandleRequests(const int workerWhoNotified)
 {
     Must(diskerQueue);
-    try {
-        while (true) {
-            int workerId;
-            IpcIoMsg ipcIo;
-            diskerQueue->pop(workerId, ipcIo); // XXX: notify worker?
-            DiskerHandleRequest(workerId, ipcIo);
-        }
-    } catch (const DiskerQueue::Empty &) {}
+    diskerQueue->clearReaderSignal(workerWhoNotified);
+
+    int workerId = 0;
+    IpcIoMsg ipcIo;
+    while (diskerQueue->pop(workerId, ipcIo))
+        DiskerHandleRequest(workerId, ipcIo);
+
+    // TODO: If the loop keeps on looping, we probably should take a break
+    // once in a while to update clock, read Coordinator messages, etc. 
+    // This can be combined with "elevator" optimization where we get up to N
+    // requests first, then reorder the popped requests to optimize seek time,
+    // then do I/O, then take a break, and come back for the next set of I/O
+    // requests.
 }
 
 /// called when disker receives an I/O request
@@ -587,11 +628,20 @@ IpcIoFile::DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo)
     else // ipcIo.command == IpcIo::cmdWrite
         diskerWrite(ipcIo);
 
+    debugs(47, 7, HERE << "pushing " << SipcIo(workerId+1, ipcIo, KidIdentifier) << " at " << diskerQueue->biQueues[workerId]->pushQueue->size());
+
     try {
         if (diskerQueue->push(workerId, ipcIo))
-            Notify(workerId + 1); // notify worker
+            Notify(workerId + 1); // must notify worker
     } catch (const DiskerQueue::Full &) {
-        // XXX: grow queue size?
+        // The worker queue should not overflow because the worker should pop()
+        // before push()ing and because if disker pops N requests at a time,
+        // we should make sure the worker pop() queue length is the worker
+        // push queue length plus N+1. XXX: implement the N+1 difference.
+        debugs(47, DBG_IMPORTANT, "BUG: Worker I/O pop queue overflow: " <<
+               SipcIo(workerId+1, ipcIo, KidIdentifier)); // TODO: report queue len
+
+        // the I/O request we could not push will timeout
     }
 }
 
index d4c24df794097694a0b445fbc25924f881e3cf7e..8d66e495d2114209f3cfc05585e208de188819cd 100644 (file)
@@ -85,10 +85,11 @@ private:
     void checkTimeouts();
     void scheduleTimeoutCheck();
 
-    void handleResponses();
+    void handleNotification();
+    void handleResponses(const char *when);
     void handleResponse(const IpcIoMsg &ipcIo);
 
-    static void DiskerHandleRequests();
+    static void DiskerHandleRequests(const int workerId);
     static void DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo);
 
 private:
index e86fe37ac31b7417ba7d3bd2b62f3f6ff85e6bf2..c192ff3ec212e83fc44800891048881aa4faa95e 100644 (file)
@@ -6,9 +6,12 @@
  */
 
 #include "config.h"
+#include "base/TextException.h"
+#include "Debug.h"
+#include "globals.h"
 #include "ipc/Queue.h"
 
-
+/// constructs shared segment ID from parent queue ID and child queue index
 static String
 QueueId(String id, const int idx)
 {
@@ -17,24 +20,50 @@ QueueId(String id, const int idx)
     return id;
 }
 
+/// constructs QueueReader ID from parent queue ID
+static String
+ReaderId(String id)
+{
+    id.append("__readers");
+    return id;
+}
+
+
+/* QueueReader */
+
+InstanceIdDefinitions(QueueReader, "ipcQR");
+
+QueueReader::QueueReader(): popBlocked(1), popSignal(0)
+{
+    debugs(54, 7, HERE << "constructed " << id);
+}
+
 
 // OneToOneUniQueue
 
 OneToOneUniQueue::OneToOneUniQueue(const String &id, const unsigned int maxItemSize, const int capacity):
-    shm(id.termedBuf())
+    shm(id.termedBuf()), reader_(NULL)
 {
-    shm.create(Items2Bytes(maxItemSize, capacity));
-    assert(shm.mem());
-    shared = new (shm.mem()) Shared(maxItemSize, capacity);
+    const int sharedSize = Items2Bytes(maxItemSize, capacity);
+    shm.create(sharedSize);
+    shared = new (shm.reserve(sharedSize)) Shared(maxItemSize, capacity);
 }
 
-OneToOneUniQueue::OneToOneUniQueue(const String &id): shm(id.termedBuf())
+OneToOneUniQueue::OneToOneUniQueue(const String &id): shm(id.termedBuf()),
+    reader_(NULL)
 {
     shm.open();
     shared = reinterpret_cast<Shared *>(shm.mem());
     assert(shared);
 }
 
+void
+OneToOneUniQueue::reader(QueueReader *aReader)
+{
+    Must(!reader_ && aReader);
+    reader_ = aReader;
+}
+
 int
 OneToOneUniQueue::Bytes2Items(const unsigned int maxItemSize, int size)
 {
@@ -56,6 +85,13 @@ OneToOneUniQueue::Shared::Shared(const unsigned int aMaxItemSize, const int aCap
 {
 }
 
+QueueReader &
+OneToOneUniQueue::reader()
+{
+    Must(reader_);
+    return *reader_;
+}
+
 
 // OneToOneBiQueue
 
@@ -71,17 +107,42 @@ OneToOneBiQueue::OneToOneBiQueue(const String &id):
 {
 }
 
+void
+OneToOneBiQueue::readers(QueueReader *r1, QueueReader *r2)
+{
+    popQueue->reader(r1);
+    pushQueue->reader(r2);
+}
+
+void
+OneToOneBiQueue::clearReaderSignal()
+{
+    debugs(54, 7, HERE << "reader: " << &popQueue->reader());
+    popQueue->reader().clearSignal();
+}
+
 
 // FewToOneBiQueue
 
 FewToOneBiQueue::FewToOneBiQueue(const String &id, const int aWorkerCount, const unsigned int maxItemSize, const int capacity):
-    theLastPopWorkerId(-1), theWorkerCount(aWorkerCount)
+    theLastPopWorker(0), theWorkerCount(aWorkerCount),
+    shm(ReaderId(id).termedBuf()),
+    reader(NULL)
 {
+    // create a new segment for the local and remote queue readers
+    // TODO: all our queues and readers should use a single segment
+    shm.create((theWorkerCount+1)*sizeof(QueueReader));
+    reader = new (shm.reserve(sizeof(QueueReader))) QueueReader;
+    debugs(54, 7, HERE << "disker " << id << " reader: " << reader->id);
+
     assert(theWorkerCount >= 0);
     biQueues.reserve(theWorkerCount);
     for (int i = 0; i < theWorkerCount; ++i) {
         OneToOneBiQueue *const biQueue =
             new OneToOneBiQueue(QueueId(id, i), maxItemSize, capacity);
+        QueueReader *remoteReader =
+            new (shm.reserve(sizeof(QueueReader))) QueueReader;
+        biQueue->readers(reader, remoteReader);
         biQueues.push_back(biQueue);
     }
 }
@@ -89,7 +150,22 @@ FewToOneBiQueue::FewToOneBiQueue(const String &id, const int aWorkerCount, const
 OneToOneBiQueue *
 FewToOneBiQueue::Attach(const String &id, const int workerId)
 {
-    return new OneToOneBiQueue(QueueId(id, workerId));
+    // XXX: remove this leak. By refcounting Ipc::Mem::Segments? By creating a global FewToOneBiQueue for each worker?
+    Ipc::Mem::Segment *shmPtr = new Ipc::Mem::Segment(ReaderId(id).termedBuf());
+
+    Ipc::Mem::Segment &shm = *shmPtr;
+    shm.open();
+    assert(shm.size() >= static_cast<off_t>((1 + workerId+1)*sizeof(QueueReader)));
+    QueueReader *readers = reinterpret_cast<QueueReader*>(shm.mem());
+    QueueReader *remoteReader = &readers[0];
+    debugs(54, 7, HERE << "disker " << id << " reader: " << remoteReader->id);
+    QueueReader *localReader = &readers[workerId+1];
+    debugs(54, 7, HERE << "local " << id << " reader: " << localReader->id);
+
+    OneToOneBiQueue *const biQueue =
+        new OneToOneBiQueue(QueueId(id, workerId));
+    biQueue->readers(localReader, remoteReader);
+    return biQueue;
 }
 
 FewToOneBiQueue::~FewToOneBiQueue()
@@ -102,3 +178,17 @@ bool FewToOneBiQueue::validWorkerId(const int workerId) const
 {
     return 0 <= workerId && workerId < theWorkerCount;
 }
+
+void
+FewToOneBiQueue::clearReaderSignal(int workerId)
+{
+    debugs(54, 7, HERE << "reader: " << reader->id);
+
+    assert(validWorkerId(workerId));
+    reader->clearSignal();
+
+    // we got a hint; we could reposition iteration to try popping from the
+    // workerId queue first; but it does not seem to help much and might
+    // introduce some bias so we do not do that for now:
+    // theLastPopWorker = (workerId + theWorkerCount - 1) % theWorkerCount;
+}
index ff62eac462b2b0a1bec69715e4bef48858ef69cb..a766d103d556783a95e2b302d5a83cda80c30d0e 100644 (file)
@@ -7,16 +7,58 @@
 #define SQUID_IPC_QUEUE_H
 
 #include "Array.h"
+#include "base/InstanceId.h"
 #include "ipc/AtomicWord.h"
 #include "ipc/mem/Segment.h"
 #include "util.h"
 
 class String;
 
-/// Lockless fixed-capacity queue for a single writer and a single reader.
+/// State of the reading end of a queue (i.e., of the code calling pop()).
+/// Multiple queues attached to one reader share this state.
+class QueueReader {
+public:
+    QueueReader(); // the initial state is "blocked without a signal"
+
+    /// whether the reader is waiting for a notification signal
+    bool blocked() const { return popBlocked == 1; }
+
+    /// marks the reader as blocked, waiting for a notification signal
+    void block() { popBlocked.swap_if(0, 1); }
+
+    /// removes the block() effects
+    void unblock() { popBlocked.swap_if(1, 0); }
+
+    /// if reader is blocked and not notified, marks the notification signal
+    /// as sent and not received, returning true; otherwise, returns false
+    bool raiseSignal() { return blocked() && popSignal.swap_if(0,1); }
+
+    /// marks sent reader notification as received (also removes pop blocking)
+    void clearSignal() { unblock(); popSignal.swap_if(1,0); }
+
+private:
+    AtomicWord popBlocked; ///< whether the reader is blocked on pop()
+    AtomicWord popSignal; ///< whether writer has sent and reader has not received notification
+
+public:
+    /// unique ID for debugging which reader is used (works across processes)
+    const InstanceId<QueueReader> id;
+};
+
+
+/**
+ * Lockless fixed-capacity queue for a single writer and a single reader.
+ *
+ * If the queue is empty, the reader is considered "blocked" and needs
+ * an out-of-band notification message to notice the next pushed item.
+ *
+ * Current implementation assumes that the writer cannot get blocked: if the
+ * queue is full, the writer will just not push and come back later (with a
+ * different value). We can add support for blocked writers if needed.
+ */
 class OneToOneUniQueue {
 public:
-    class Empty {};
+    // pop() and push() exceptions; TODO: use TextException instead
     class Full {};
     class ItemTooLarge {};
 
@@ -33,10 +75,14 @@ public:
     static int Bytes2Items(const unsigned int maxItemSize, int size);
     static int Items2Bytes(const unsigned int maxItemSize, const int size);
 
-    template <class Value>
-    bool pop(Value &value); ///< returns true iff the queue was full
-    template <class Value>
-    bool push(const Value &value); ///< returns true iff the queue was empty
+    /// returns true iff the value was set; [un]blocks the reader as needed
+    template<class Value> bool pop(Value &value);
+
+    /// returns true iff the caller must notify the reader of the pushed item
+    template<class Value> bool push(const Value &value);
+
+    QueueReader &reader();
+    void reader(QueueReader *aReader);
 
 private:
     struct Shared {
@@ -54,12 +100,12 @@ private:
 
     Ipc::Mem::Segment shm; ///< shared memory segment
     Shared *shared; ///< pointer to shared memory
+    QueueReader *reader_; ///< the state of the code popping from this queue
 };
 
 /// Lockless fixed-capacity bidirectional queue for two processes.
 class OneToOneBiQueue {
 public:
-    typedef OneToOneUniQueue::Empty Empty;
     typedef OneToOneUniQueue::Full Full;
     typedef OneToOneUniQueue::ItemTooLarge ItemTooLarge;
 
@@ -67,12 +113,14 @@ public:
     OneToOneBiQueue(const String &id, const unsigned int maxItemSize, const int capacity);
     OneToOneBiQueue(const String &id); ///< Attach to existing shared queue.
 
-    template <class Value>
-    bool pop(Value &value) { return popQueue->pop(value); }
-    template <class Value>
-    bool push(const Value &value) { return pushQueue->push(value); }
+    void readers(QueueReader *r1, QueueReader *r2);
+    void clearReaderSignal();
 
-private:
+    /* wrappers to call the right OneToOneUniQueue method for this process */
+    template<class Value> bool pop(Value &value) { return popQueue->pop(value); }
+    template<class Value> bool push(const Value &value) { return pushQueue->push(value); }
+
+//private:
     OneToOneUniQueue *const popQueue; ///< queue to pop from for this process
     OneToOneUniQueue *const pushQueue; ///< queue to push to for this process
 };
@@ -87,7 +135,6 @@ private:
  */
 class FewToOneBiQueue {
 public:
-    typedef OneToOneBiQueue::Empty Empty;
     typedef OneToOneBiQueue::Full Full;
     typedef OneToOneBiQueue::ItemTooLarge ItemTooLarge;
 
@@ -98,15 +145,22 @@ public:
     bool validWorkerId(const int workerId) const;
     int workerCount() const { return theWorkerCount; }
 
-    template <class Value>
-    bool pop(int &workerId, Value &value); ///< returns false iff the queue was full
-    template <class Value>
-    bool push(const int workerId, const Value &value); ///< returns false iff the queue was empty
+    /// clears the reader notification received by the disker from worker
+    void clearReaderSignal(int workerId);
 
-private:
-    int theLastPopWorkerId; ///< the last worker ID we pop()ed from
-    Vector<OneToOneBiQueue *> biQueues; ///< worker queues
-    const int theWorkerCount; ///< number of worker processes
+    /// picks a worker and calls OneToOneUniQueue::pop() using its queue
+    template <class Value> bool pop(int &workerId, Value &value);
+
+    /// calls OneToOneUniQueue::push() using the given worker queue
+    template <class Value> bool push(const int workerId, const Value &value);
+
+//private: XXX: make private by moving pop/push debugging into pop/push
+    int theLastPopWorker; ///< the ID of the last worker we tried to pop() from
+    Vector<OneToOneBiQueue *> biQueues; ///< worker queues indexed by worker ID
+    const int theWorkerCount; ///< the total number of workers
+
+    Ipc::Mem::Segment shm; ///< shared memory segment to store the reader
+    QueueReader *reader; ///< the state of the code popping from all biQueues
 };
 
 
@@ -119,15 +173,23 @@ OneToOneUniQueue::pop(Value &value)
     if (sizeof(value) > shared->theMaxItemSize)
         throw ItemTooLarge();
 
-    if (empty())
-        throw Empty();
+    // A writer might push between the empty test and block() below, so we do
+    // not return false right after calling block(), but test again.
+    if (empty()) {
+        reader().block();
+        // A writer might push between the empty test and block() below,
+        // so we must test again as such a writer will not signal us.
+        if (empty())
+            return false;
+    }
 
-    const bool wasFull = full();
+    reader().unblock();
     const unsigned int pos =
-        shared->theOut++ % shared->theCapacity * shared->theMaxItemSize;
+        (shared->theOut++ % shared->theCapacity) * shared->theMaxItemSize;
     memcpy(&value, shared->theBuffer + pos, sizeof(value));
     --shared->theSize;
-    return wasFull;
+
+    return true;
 }
 
 template <class Value>
@@ -145,7 +207,8 @@ OneToOneUniQueue::push(const Value &value)
         shared->theIn++ % shared->theCapacity * shared->theMaxItemSize;
     memcpy(shared->theBuffer + pos, &value, sizeof(value));
     ++shared->theSize;
-    return wasEmpty;
+
+    return wasEmpty && reader().raiseSignal();
 }
 
 
@@ -155,16 +218,15 @@ template <class Value>
 bool
 FewToOneBiQueue::pop(int &workerId, Value &value)
 {
-    ++theLastPopWorkerId;
+    // iterate all workers, starting after the one we visited last
     for (int i = 0; i < theWorkerCount; ++i) {
-        theLastPopWorkerId = (theLastPopWorkerId + 1) % theWorkerCount;
-        try {
-            const bool wasFull = biQueues[theLastPopWorkerId]->pop(value);
-            workerId = theLastPopWorkerId;
-            return wasFull;
-        } catch (const Empty &) {}
+        theLastPopWorker = (theLastPopWorker + 1) % theWorkerCount;
+        if (biQueues[theLastPopWorker]->pop(value)) {
+            workerId = theLastPopWorker;
+            return true;
+        }
     }
-    throw Empty();
+    return false; // no worker had anything to pop
 }
 
 template <class Value>