2 * Copyright (C) 1996-2022 The Squid Software Foundation and contributors
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
9 #ifndef SQUID_IPC_QUEUE_H
10 #define SQUID_IPC_QUEUE_H
12 #include "base/InstanceId.h"
13 #include "debug/Stream.h"
14 #include "ipc/mem/FlexibleArray.h"
15 #include "ipc/mem/Pointer.h"
26 /// State of the reading end of a queue (i.e., of the code calling pop()).
27 /// Multiple queues attached to one reader share this state.
31 QueueReader(); // the initial state is "blocked without a signal"
33 /// whether the reader is waiting for a notification signal
34 bool blocked() const { return popBlocked
.load(); }
36 /// \copydoc popSignal
37 bool signaled() const { return popSignal
.load(); }
39 /// marks the reader as blocked, waiting for a notification signal
40 void block() { popBlocked
.store(true); }
42 /// removes the block() effects
43 void unblock() { popBlocked
.store(false); }
45 /// if reader is blocked and not notified, marks the notification signal
46 /// as sent and not received, returning true; otherwise, returns false
47 bool raiseSignal() { return blocked() && !popSignal
.exchange(true); }
49 /// marks sent reader notification as received (also removes pop blocking)
50 void clearSignal() { unblock(); popSignal
.store(false); }
53 std::atomic
<bool> popBlocked
; ///< whether the reader is blocked on pop()
54 std::atomic
<bool> popSignal
; ///< whether writer has sent and reader has not received notification
57 typedef std::atomic
<int> Rate
; ///< pop()s per second
58 Rate rateLimit
; ///< pop()s per second limit if positive
60 // we need a signed atomic type because balance may get negative
61 typedef std::atomic
<int> AtomicSignedMsec
;
62 typedef AtomicSignedMsec Balance
;
63 /// how far ahead the reader is compared to a perfect read/sec event rate
66 /// unique ID for debugging which reader is used (works across processes)
67 const InstanceId
<QueueReader
> id
;
70 /// shared array of QueueReaders
74 QueueReaders(const int aCapacity
);
75 size_t sharedMemorySize() const;
76 static size_t SharedMemorySize(const int capacity
);
78 const int theCapacity
; ///< number of readers
79 Ipc::Mem::FlexibleArray
<QueueReader
> theReaders
; ///< readers
83 * Lockless fixed-capacity queue for a single writer and a single reader.
85 * If the queue is empty, the reader is considered "blocked" and needs
86 * an out-of-band notification message to notice the next pushed item.
88 * Current implementation assumes that the writer cannot get blocked: if the
89 * queue is full, the writer will just not push and come back later (with a
90 * different value). We can add support for blocked writers if needed.
92 class OneToOneUniQueue
95 // pop() and push() exceptions; TODO: use TextException instead
97 class ItemTooLarge
{};
99 OneToOneUniQueue(const unsigned int aMaxItemSize
, const int aCapacity
);
101 unsigned int maxItemSize() const { return theMaxItemSize
; }
102 int size() const { return theSize
; }
103 int capacity() const { return theCapacity
; }
104 int sharedMemorySize() const { return Items2Bytes(theMaxItemSize
, theCapacity
); }
106 bool empty() const { return !theSize
; }
107 bool full() const { return theSize
== theCapacity
; }
109 static int Bytes2Items(const unsigned int maxItemSize
, int size
);
110 static int Items2Bytes(const unsigned int maxItemSize
, const int size
);
112 /// returns true iff the value was set; [un]blocks the reader as needed
113 template<class Value
> bool pop(Value
&value
, QueueReader
*const reader
= nullptr);
115 /// returns true iff the caller must notify the reader of the pushed item
116 template<class Value
> bool push(const Value
&value
, QueueReader
*const reader
= nullptr);
118 /// returns true iff the value was set; the value may be stale!
119 template<class Value
> bool peek(Value
&value
) const;
121 /// prints incoming queue state; suitable for cache manager reports
122 template<class Value
> void statIn(std::ostream
&, int localProcessId
, int remoteProcessId
) const;
123 /// prints outgoing queue state; suitable for cache manager reports
124 template<class Value
> void statOut(std::ostream
&, int localProcessId
, int remoteProcessId
) const;
127 void statOpen(std::ostream
&, const char *inLabel
, const char *outLabel
, uint32_t count
) const;
128 void statClose(std::ostream
&) const;
129 template<class Value
> void statSamples(std::ostream
&, unsigned int start
, uint32_t size
) const;
130 template<class Value
> void statRange(std::ostream
&, unsigned int start
, uint32_t n
) const;
132 // optimization: these non-std::atomic data members are in shared memory,
133 // but each is used only by one process (aside from obscured reporting)
134 unsigned int theIn
; ///< current push() position; reporting aside, used only in push()
135 unsigned int theOut
; ///< current pop() position; reporting aside, used only in pop()/peek()
137 std::atomic
<uint32_t> theSize
; ///< number of items in the queue
138 const unsigned int theMaxItemSize
; ///< maximum item size
139 const uint32_t theCapacity
; ///< maximum number of items, i.e. theBuffer size
144 /// shared array of OneToOneUniQueues
145 class OneToOneUniQueues
148 OneToOneUniQueues(const int aCapacity
, const unsigned int maxItemSize
, const int queueCapacity
);
150 size_t sharedMemorySize() const;
151 static size_t SharedMemorySize(const int capacity
, const unsigned int maxItemSize
, const int queueCapacity
);
153 const OneToOneUniQueue
&operator [](const int index
) const;
154 inline OneToOneUniQueue
&operator [](const int index
);
157 inline const OneToOneUniQueue
&front() const;
160 const int theCapacity
; ///< number of OneToOneUniQueues
164 * Base class for lockless fixed-capacity bidirectional queues for a
165 * limited number of processes.
170 BaseMultiQueue(const int aLocalProcessId
);
171 virtual ~BaseMultiQueue() {}
173 /// clears the reader notification received by the local process from the remote process
174 void clearReaderSignal(const int remoteProcessId
);
176 /// clears all reader notifications received by the local process
177 void clearAllReaderSignals();
179 /// picks a process and calls OneToOneUniQueue::pop() using its queue
180 template <class Value
> bool pop(int &remoteProcessId
, Value
&value
);
182 /// calls OneToOneUniQueue::push() using the given process queue
183 template <class Value
> bool push(const int remoteProcessId
, const Value
&value
);
185 /// peeks at the item likely to be pop()ed next
186 template<class Value
> bool peek(int &remoteProcessId
, Value
&value
) const;
188 /// prints current state; suitable for cache manager reports
189 template<class Value
> void stat(std::ostream
&) const;
191 /// returns local reader's balance
192 QueueReader::Balance
&localBalance() { return localReader().balance
; }
194 /// returns reader's balance for a given remote process
195 const QueueReader::Balance
&balance(const int remoteProcessId
) const;
197 /// returns local reader's rate limit
198 QueueReader::Rate
&localRateLimit() { return localReader().rateLimit
; }
200 /// returns reader's rate limit for a given remote process
201 const QueueReader::Rate
&rateLimit(const int remoteProcessId
) const;
203 /// number of items in incoming queue from a given remote process
204 int inSize(const int remoteProcessId
) const { return inQueue(remoteProcessId
).size(); }
206 /// number of items in outgoing queue to a given remote process
207 int outSize(const int remoteProcessId
) const { return outQueue(remoteProcessId
).size(); }
210 /// incoming queue from a given remote process
211 virtual const OneToOneUniQueue
&inQueue(const int remoteProcessId
) const = 0;
212 OneToOneUniQueue
&inQueue(const int remoteProcessId
);
214 /// outgoing queue to a given remote process
215 virtual const OneToOneUniQueue
&outQueue(const int remoteProcessId
) const = 0;
216 OneToOneUniQueue
&outQueue(const int remoteProcessId
);
218 virtual const QueueReader
&localReader() const = 0;
219 QueueReader
&localReader();
221 virtual const QueueReader
&remoteReader(const int remoteProcessId
) const = 0;
222 QueueReader
&remoteReader(const int remoteProcessId
);
224 virtual int remotesCount() const = 0;
225 virtual int remotesIdOffset() const = 0;
228 const int theLocalProcessId
; ///< process ID of this queue
231 int theLastPopProcessId
; ///< the ID of the last process we tried to pop() from
235 * Lockless fixed-capacity bidirectional queue for a limited number of
236 * processes. Allows communication between two groups of processes:
237 * any process in one group may send data to and receive from any
238 * process in another group, but processes in the same group cannot
239 * communicate. Each process in a group has a unique integer ID in
240 * [groupIdOffset, groupIdOffset + groupSize) range.
242 class FewToFewBiQueue
: public BaseMultiQueue
245 typedef OneToOneUniQueue::Full Full
;
246 typedef OneToOneUniQueue::ItemTooLarge ItemTooLarge
;
249 /// Shared metadata for FewToFewBiQueue
251 Metadata(const int aGroupASize
, const int aGroupAIdOffset
, const int aGroupBSize
, const int aGroupBIdOffset
);
252 size_t sharedMemorySize() const { return sizeof(*this); }
253 static size_t SharedMemorySize(const int, const int, const int, const int) { return sizeof(Metadata
); }
255 const int theGroupASize
;
256 const int theGroupAIdOffset
;
257 const int theGroupBSize
;
258 const int theGroupBIdOffset
;
265 Owner(const String
&id
, const int groupASize
, const int groupAIdOffset
, const int groupBSize
, const int groupBIdOffset
, const unsigned int maxItemSize
, const int capacity
);
269 Mem::Owner
<Metadata
> *const metadataOwner
;
270 Mem::Owner
<OneToOneUniQueues
> *const queuesOwner
;
271 Mem::Owner
<QueueReaders
> *const readersOwner
;
274 static Owner
*Init(const String
&id
, const int groupASize
, const int groupAIdOffset
, const int groupBSize
, const int groupBIdOffset
, const unsigned int maxItemSize
, const int capacity
);
276 enum Group
{ groupA
= 0, groupB
= 1 };
277 FewToFewBiQueue(const String
&id
, const Group aLocalGroup
, const int aLocalProcessId
);
279 /// maximum number of items in the queue
280 static int MaxItemsCount(const int groupASize
, const int groupBSize
, const int capacity
);
282 /// finds the oldest item in incoming and outgoing queues between
283 /// us and the given remote process
284 template<class Value
> bool findOldest(const int remoteProcessId
, Value
&value
) const;
287 virtual const OneToOneUniQueue
&inQueue(const int remoteProcessId
) const;
288 virtual const OneToOneUniQueue
&outQueue(const int remoteProcessId
) const;
289 virtual const QueueReader
&localReader() const;
290 virtual const QueueReader
&remoteReader(const int processId
) const;
291 virtual int remotesCount() const;
292 virtual int remotesIdOffset() const;
295 bool validProcessId(const Group group
, const int processId
) const;
296 int oneToOneQueueIndex(const Group fromGroup
, const int fromProcessId
, const Group toGroup
, const int toProcessId
) const;
297 const OneToOneUniQueue
&oneToOneQueue(const Group fromGroup
, const int fromProcessId
, const Group toGroup
, const int toProcessId
) const;
298 int readerIndex(const Group group
, const int processId
) const;
299 Group
localGroup() const { return theLocalGroup
; }
300 Group
remoteGroup() const { return theLocalGroup
== groupA
? groupB
: groupA
; }
303 const Mem::Pointer
<Metadata
> metadata
; ///< shared metadata
304 const Mem::Pointer
<OneToOneUniQueues
> queues
; ///< unidirectional one-to-one queues
305 const Mem::Pointer
<QueueReaders
> readers
; ///< readers array
307 const Group theLocalGroup
; ///< group of this queue
311 * Lockless fixed-capacity bidirectional queue for a limited number of
312 * processes. Any process may send data to and receive from any other
313 * process (including itself). Each process has a unique integer ID in
314 * [processIdOffset, processIdOffset + processCount) range.
316 class MultiQueue
: public BaseMultiQueue
319 typedef OneToOneUniQueue::Full Full
;
320 typedef OneToOneUniQueue::ItemTooLarge ItemTooLarge
;
323 /// Shared metadata for MultiQueue
325 Metadata(const int aProcessCount
, const int aProcessIdOffset
);
326 size_t sharedMemorySize() const { return sizeof(*this); }
327 static size_t SharedMemorySize(const int, const int) { return sizeof(Metadata
); }
329 const int theProcessCount
;
330 const int theProcessIdOffset
;
337 Owner(const String
&id
, const int processCount
, const int processIdOffset
, const unsigned int maxItemSize
, const int capacity
);
341 Mem::Owner
<Metadata
> *const metadataOwner
;
342 Mem::Owner
<OneToOneUniQueues
> *const queuesOwner
;
343 Mem::Owner
<QueueReaders
> *const readersOwner
;
346 static Owner
*Init(const String
&id
, const int processCount
, const int processIdOffset
, const unsigned int maxItemSize
, const int capacity
);
348 MultiQueue(const String
&id
, const int localProcessId
);
351 virtual const OneToOneUniQueue
&inQueue(const int remoteProcessId
) const;
352 virtual const OneToOneUniQueue
&outQueue(const int remoteProcessId
) const;
353 virtual const QueueReader
&localReader() const;
354 virtual const QueueReader
&remoteReader(const int remoteProcessId
) const;
355 virtual int remotesCount() const;
356 virtual int remotesIdOffset() const;
359 bool validProcessId(const int processId
) const;
360 const OneToOneUniQueue
&oneToOneQueue(const int fromProcessId
, const int toProcessId
) const;
361 const QueueReader
&reader(const int processId
) const;
364 const Mem::Pointer
<Metadata
> metadata
; ///< shared metadata
365 const Mem::Pointer
<OneToOneUniQueues
> queues
; ///< unidirectional one-to-one queues
366 const Mem::Pointer
<QueueReaders
> readers
; ///< readers array
/// Copies the item stored at the current pop() position (theOut) into value
/// and advances theOut by one.
/// \throws ItemTooLarge if sizeof(value) exceeds theMaxItemSize
/// NOTE(review): the empty-queue tests and the reader block()/unblock() calls
/// that the comments below refer to are not visible in this excerpt; confirm
/// them against the complete definition.
371 template <class Value
>
373 OneToOneUniQueue::pop(Value
&value
, QueueReader
*const reader
)
// Value must fit into one fixed-size slot of theMaxItemSize bytes
375 if (sizeof(value
) > theMaxItemSize
)
376 throw ItemTooLarge();
378 // A writer might push between the empty test and block() below, so we do
379 // not return false right after calling block(), but test again.
385 // A writer might push between the empty test and block() below,
386 // so we must test again as such a writer will not signal us.
// slot index wraps at theCapacity; slots are theMaxItemSize bytes apart;
// the post-increment moves theOut past the item being copied out
394 const unsigned int pos
= (theOut
++ % theCapacity
) * theMaxItemSize
;
395 memcpy(&value
, theBuffer
+ pos
, sizeof(value
));
/// Copies the item stored at the current pop() position (theOut) into value
/// without advancing theOut.
/// \throws ItemTooLarge if sizeof(value) exceeds theMaxItemSize
/// NOTE(review): any empty-queue handling is outside the visible lines.
401 template <class Value
>
403 OneToOneUniQueue::peek(Value
&value
) const
// Value must fit into one fixed-size slot of theMaxItemSize bytes
405 if (sizeof(value
) > theMaxItemSize
)
406 throw ItemTooLarge();
411 // the reader may pop() before we copy; making this method imprecise
// same slot arithmetic as pop(), but theOut is only read here
412 const unsigned int pos
= (theOut
% theCapacity
) * theMaxItemSize
;
413 memcpy(&value
, theBuffer
+ pos
, sizeof(value
));
/// Copies value into the slot at the current push() position (theIn),
/// advances theIn, and increments theSize.
/// \returns true when the queue held no items before this push and either no
/// reader was supplied or reader->raiseSignal() returned true
/// \throws ItemTooLarge if sizeof(value) exceeds theMaxItemSize
/// NOTE(review): full-queue handling is not visible in this excerpt; confirm
/// it against the complete definition.
417 template <class Value
>
419 OneToOneUniQueue::push(const Value
&value
, QueueReader
*const reader
)
// Value must fit into one fixed-size slot of theMaxItemSize bytes
421 if (sizeof(value
) > theMaxItemSize
)
422 throw ItemTooLarge();
// `%` and `*` have equal precedence and associate left-to-right, so this is
// (theIn++ % theCapacity) * theMaxItemSize, matching the form used in pop()
427 const unsigned int pos
= theIn
++ % theCapacity
* theMaxItemSize
;
428 memcpy(theBuffer
+ pos
, &value
, sizeof(value
));
// post-increment yields the size before this push
429 const bool wasEmpty
= !theSize
++;
// raiseSignal() is only consulted when the queue was empty and a reader is given
431 return wasEmpty
&& (!reader
|| reader
->raiseSignal());
/// Writes a report line for the queue on which localProcessId receives items
/// from remoteProcessId: a header, then statOpen() output, then a sample of
/// the theSize items starting at theOut.
/// NOTE(review): the end of this definition is outside the visible lines.
434 template <class Value
>
436 OneToOneUniQueue::statIn(std::ostream
&os
, const int localProcessId
, const int remoteProcessId
) const
438 os
<< " kid" << localProcessId
<< " receiving from kid" << remoteProcessId
<< ": ";
439 // Nobody can modify our theOut so, after capturing some valid theSize value
440 // in count, we can reliably report all [theOut, theOut+count) items that
441 // were queued at theSize capturing time. We will miss new items push()ed by
442 // the other side, but it is OK -- we report state at the capturing time.
443 const auto count
= theSize
.load();
444 statOpen(os
, "other", "popIndex", count
);
445 statSamples
<Value
>(os
, theOut
, count
);
/// Writes a report line for the queue on which localProcessId sends items to
/// remoteProcessId: a header, then statOpen() output, then a sample of the
/// theSize items ending just before theIn.
/// NOTE(review): the end of this definition is outside the visible lines.
449 template <class Value
>
451 OneToOneUniQueue::statOut(std::ostream
&os
, const int localProcessId
, const int remoteProcessId
) const
453 os
<< " kid" << localProcessId
<< " sending to kid" << remoteProcessId
<< ": ";
454 // Nobody can modify our theIn so, after capturing some valid theSize value
455 // in count, we can reliably report all [theIn-count, theIn) items that were
456 // queued at theSize capturing time. We may report items already pop()ed by
457 // the other side, but that is OK because pop() does not modify items -- it
458 // only increments theOut.
459 const auto count
= theSize
.load();
460 statOpen(os
, "pushIndex", "other", count
);
461 statSamples
<Value
>(os
, theIn
- count
, count
); // unsigned offset underflow OK
465 /// report a sample of [start, start + count) items
/// At most three leading and three trailing items are printed via statRange().
/// NOTE(review): some lines of this definition (including any handling of
/// count == 0) are outside the visible excerpt.
466 template <class Value
>
468 OneToOneUniQueue::statSamples(std::ostream
&os
, const unsigned int start
, const uint32_t count
) const
475 os
<< ", items: [\n";
476 // report a few leading and trailing items, without repetitions
477 const auto sampleSize
= std::min(3U, count
); // leading (and max) sample
478 statRange
<Value
>(os
, start
, sampleSize
);
479 if (sampleSize
< count
) { // the first sample did not show some items
480 // The `start` offset aside, the first sample reported all items
481 // below the sampleSize offset. The second sample needs to report
482 // the last sampleSize items (i.e. starting at count-sampleSize
483 // offset) except those already reported by the first sample.
484 const auto secondSampleOffset
= std::max(sampleSize
, count
- sampleSize
);
485 const auto secondSampleSize
= std::min(sampleSize
, count
- sampleSize
);
487 // but first we print a sample separator, unless there are no items
488 // between the samples or the separator hides the only unsampled item
489 const auto bothSamples
= sampleSize
+ secondSampleSize
;
// exactly one item lies between the two samples: print it rather than a separator
490 if (bothSamples
+ 1U == count
)
491 statRange
<Value
>(os
, start
+ sampleSize
, 1);
492 else if (count
> bothSamples
)
493 os
<< " # ... " << (count
- bothSamples
) << " items not shown ...\n";
495 statRange
<Value
>(os
, start
+ secondSampleOffset
, secondSampleSize
);
500 /// statSamples() helper that reports n items from start
/// Each iteration copies one slot of theBuffer into a local Value.
/// NOTE(review): the declarations of `offset` and `value`, and the code that
/// prints each copied item, are outside the visible lines.
501 template <class Value
>
503 OneToOneUniQueue::statRange(std::ostream
&os
, const unsigned int start
, const uint32_t n
) const
// unlike pop()/peek()/push(), an oversized Value is asserted here rather than thrown
505 assert(sizeof(Value
) <= theMaxItemSize
);
507 for (uint32_t i
= 0; i
< n
; ++i
) {
508 // XXX: Throughout this C++ header, these overflow wrapping tricks work
509 // only because theCapacity currently happens to be a power of 2 (e.g.,
510 // the highest offset (0xF...FFF) % 3 is 0 and so is the next offset).
511 const auto pos
= (offset
++ % theCapacity
) * theMaxItemSize
;
513 memcpy(&value
, theBuffer
+ pos
, sizeof(value
));
522 inline OneToOneUniQueue
&
523 OneToOneUniQueues::operator [](const int index
)
525 return const_cast<OneToOneUniQueue
&>((*const_cast<const OneToOneUniQueues
*>(this))[index
]);
528 inline const OneToOneUniQueue
&
529 OneToOneUniQueues::front() const
531 const char *const queue
=
532 reinterpret_cast<const char *>(this) + sizeof(*this);
533 return *reinterpret_cast<const OneToOneUniQueue
*>(queue
);
/// Tries the incoming queues of the remote processes one at a time, starting
/// with the process after theLastPopProcessId and wrapping around to
/// remotesIdOffset(). When a queue yields an item, the visible code stores
/// the sender's ID in remoteProcessId. Returns false if no queue yielded one.
/// NOTE(review): the success-path return is outside the visible lines.
538 template <class Value
>
540 BaseMultiQueue::pop(int &remoteProcessId
, Value
&value
)
542 // iterate all remote processes, starting after the one we visited last
543 for (int i
= 0; i
< remotesCount(); ++i
) {
// advance to the next remote ID; wrap to the first one after passing the last
544 if (++theLastPopProcessId
>= remotesIdOffset() + remotesCount())
545 theLastPopProcessId
= remotesIdOffset();
546 OneToOneUniQueue
&queue
= inQueue(theLastPopProcessId
);
// the local reader is passed so that OneToOneUniQueue::pop() can [un]block it
547 if (queue
.pop(value
, &localReader())) {
548 remoteProcessId
= theLastPopProcessId
;
549 debugs(54, 7, "popped from " << remoteProcessId
<< " to " << theLocalProcessId
<< " at " << queue
.size());
553 return false; // no process had anything to pop
556 template <class Value
>
558 BaseMultiQueue::push(const int remoteProcessId
, const Value
&value
)
560 OneToOneUniQueue
&remoteQueue
= outQueue(remoteProcessId
);
561 QueueReader
&reader
= remoteReader(remoteProcessId
);
562 debugs(54, 7, "pushing from " << theLocalProcessId
<< " to " << remoteProcessId
<< " at " << remoteQueue
.size());
563 return remoteQueue
.push(value
, &reader
);
/// Walks the incoming queues in the same order as BaseMultiQueue::pop() would,
/// but works on a local copy of theLastPopProcessId and calls
/// OneToOneUniQueue::peek(). When a queue yields an item, the visible code
/// stores the sender's ID in remoteProcessId. Returns false if none yielded one.
/// NOTE(review): the success-path return is outside the visible lines.
566 template <class Value
>
568 BaseMultiQueue::peek(int &remoteProcessId
, Value
&value
) const
570 // mimic BaseMultiQueue::pop() but quit just before popping
571 int popProcessId
= theLastPopProcessId
; // preserve for future pop()
572 for (int i
= 0; i
< remotesCount(); ++i
) {
// advance to the next remote ID; wrap to the first one after passing the last
573 if (++popProcessId
>= remotesIdOffset() + remotesCount())
574 popProcessId
= remotesIdOffset();
575 const OneToOneUniQueue
&queue
= inQueue(popProcessId
);
576 if (queue
.peek(value
)) {
577 remoteProcessId
= popProcessId
;
581 return false; // most likely, no process had anything to pop
/// Reports, for every remote process ID in
/// [remotesIdOffset(), remotesIdOffset() + remotesCount()), the incoming queue
/// (statIn) and then the outgoing queue (statOut), followed by the local
/// reader's blocked/signaled flags.
/// NOTE(review): several lines of this definition are outside the visible excerpt.
584 template <class Value
>
586 BaseMultiQueue::stat(std::ostream
&os
) const
// first pass: queues carrying items from each remote process to us
588 for (int processId
= remotesIdOffset(); processId
< remotesIdOffset() + remotesCount(); ++processId
) {
589 const auto &queue
= inQueue(processId
);
590 queue
.statIn
<Value
>(os
, theLocalProcessId
, processId
);
// second pass: queues carrying items from us to each remote process
595 for (int processId
= remotesIdOffset(); processId
< remotesIdOffset() + remotesCount(); ++processId
) {
596 const auto &queue
= outQueue(processId
);
597 queue
.statOut
<Value
>(os
, theLocalProcessId
, processId
);
602 const auto &reader
= localReader();
603 os
<< " kid" << theLocalProcessId
<< " reader flags: " <<
604 "{ blocked: " << reader
.blocked() << ", signaled: " << reader
.signaled() << " }\n";
/// Looks for an item exchanged with remoteProcessId: validates the ID against
/// the remote group, consults the incoming queue, and finally returns the
/// result of peek() on the outgoing queue.
/// NOTE(review): the early-return statements (invalid ID, non-empty incoming
/// queue) are outside the visible lines; confirm them against the complete
/// definition.
609 template <class Value
>
611 FewToFewBiQueue::findOldest(const int remoteProcessId
, Value
&value
) const
613 // we may be called before remote process configured its queue end
614 if (!validProcessId(remoteGroup(), remoteProcessId
))
617 // we need the oldest value, so start with the incoming, them-to-us queue:
618 const OneToOneUniQueue
&in
= inQueue(remoteProcessId
);
619 debugs(54, 2, "peeking from " << remoteProcessId
<< " to " <<
620 theLocalProcessId
<< " at " << in
.size());
624 // if the incoming queue is empty, check the outgoing, us-to-them queue:
625 const OneToOneUniQueue
&out
= outQueue(remoteProcessId
);
626 debugs(54, 2, "peeking from " << theLocalProcessId
<< " to " <<
627 remoteProcessId
<< " at " << out
.size());
628 return out
.peek(value
);
633 #endif // SQUID_IPC_QUEUE_H