2 * Copyright (C) 1996-2016 The Squid Software Foundation and contributors
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
9 #ifndef SQUID_IPC_QUEUE_H
10 #define SQUID_IPC_QUEUE_H
12 #include "base/InstanceId.h"
14 #include "ipc/mem/FlexibleArray.h"
15 #include "ipc/mem/Pointer.h"
25 /// State of the reading end of a queue (i.e., of the code calling pop()).
26 /// Multiple queues attached to one reader share this state.
30 QueueReader(); // the initial state is "blocked without a signal"
32 /// whether the reader is waiting for a notification signal
33 bool blocked() const { return popBlocked
.load(); }
35 /// marks the reader as blocked, waiting for a notification signal
36 void block() { popBlocked
.store(true); }
38 /// removes the block() effects
39 void unblock() { popBlocked
.store(false); }
41 /// if reader is blocked and not notified, marks the notification signal
42 /// as sent and not received, returning true; otherwise, returns false
43 bool raiseSignal() { return blocked() && !popSignal
.exchange(true); }
45 /// marks sent reader notification as received (also removes pop blocking)
46 void clearSignal() { unblock(); popSignal
.store(false); }
49 std::atomic
<bool> popBlocked
; ///< whether the reader is blocked on pop()
50 std::atomic
<bool> popSignal
; ///< whether writer has sent and reader has not received notification
53 typedef std::atomic
<int> Rate
; ///< pop()s per second
54 Rate rateLimit
; ///< pop()s per second limit if positive
56 // we need a signed atomic type because balance may get negative
57 typedef std::atomic
<int> AtomicSignedMsec
;
58 typedef AtomicSignedMsec Balance
;
59 /// how far ahead the reader is compared to a perfect read/sec event rate
62 /// unique ID for debugging which reader is used (works across processes)
63 const InstanceId
<QueueReader
> id
;
66 /// shared array of QueueReaders
70 QueueReaders(const int aCapacity
);
71 size_t sharedMemorySize() const;
72 static size_t SharedMemorySize(const int capacity
);
74 const int theCapacity
; /// number of readers
75 Ipc::Mem::FlexibleArray
<QueueReader
> theReaders
; /// readers
79 * Lockless fixed-capacity queue for a single writer and a single reader.
81 * If the queue is empty, the reader is considered "blocked" and needs
82 * an out-of-band notification message to notice the next pushed item.
84 * Current implementation assumes that the writer cannot get blocked: if the
85 * queue is full, the writer will just not push and come back later (with a
86 * different value). We can add support for blocked writers if needed.
88 class OneToOneUniQueue
91 // pop() and push() exceptions; TODO: use TextException instead
93 class ItemTooLarge
{};
95 OneToOneUniQueue(const unsigned int aMaxItemSize
, const int aCapacity
);
97 unsigned int maxItemSize() const { return theMaxItemSize
; }
98 int size() const { return theSize
; }
99 int capacity() const { return theCapacity
; }
100 int sharedMemorySize() const { return Items2Bytes(theMaxItemSize
, theCapacity
); }
102 bool empty() const { return !theSize
; }
103 bool full() const { return theSize
== theCapacity
; }
105 static int Bytes2Items(const unsigned int maxItemSize
, int size
);
106 static int Items2Bytes(const unsigned int maxItemSize
, const int size
);
108 /// returns true iff the value was set; [un]blocks the reader as needed
109 template<class Value
> bool pop(Value
&value
, QueueReader
*const reader
= NULL
);
111 /// returns true iff the caller must notify the reader of the pushed item
112 template<class Value
> bool push(const Value
&value
, QueueReader
*const reader
= NULL
);
114 /// returns true iff the value was set; the value may be stale!
115 template<class Value
> bool peek(Value
&value
) const;
119 unsigned int theIn
; ///< input index, used only in push()
120 unsigned int theOut
; ///< output index, used only in pop()
122 std::atomic
<uint32_t> theSize
; ///< number of items in the queue
123 const unsigned int theMaxItemSize
; ///< maximum item size
124 const uint32_t theCapacity
; ///< maximum number of items, i.e. theBuffer size
129 /// shared array of OneToOneUniQueues
130 class OneToOneUniQueues
133 OneToOneUniQueues(const int aCapacity
, const unsigned int maxItemSize
, const int queueCapacity
);
135 size_t sharedMemorySize() const;
136 static size_t SharedMemorySize(const int capacity
, const unsigned int maxItemSize
, const int queueCapacity
);
138 const OneToOneUniQueue
&operator [](const int index
) const;
139 inline OneToOneUniQueue
&operator [](const int index
);
142 inline const OneToOneUniQueue
&front() const;
145 const int theCapacity
; /// number of OneToOneUniQueues
/// Base class for lockless fixed-capacity bidirectional queues for a
/// limited number of processes.
155 BaseMultiQueue(const int aLocalProcessId
);
156 virtual ~BaseMultiQueue() {}
158 /// clears the reader notification received by the local process from the remote process
159 void clearReaderSignal(const int remoteProcessId
);
161 /// picks a process and calls OneToOneUniQueue::pop() using its queue
162 template <class Value
> bool pop(int &remoteProcessId
, Value
&value
);
164 /// calls OneToOneUniQueue::push() using the given process queue
165 template <class Value
> bool push(const int remoteProcessId
, const Value
&value
);
167 /// peeks at the item likely to be pop()ed next
168 template<class Value
> bool peek(int &remoteProcessId
, Value
&value
) const;
170 /// returns local reader's balance
171 QueueReader::Balance
&localBalance() { return localReader().balance
; }
173 /// returns reader's balance for a given remote process
174 const QueueReader::Balance
&balance(const int remoteProcessId
) const;
176 /// returns local reader's rate limit
177 QueueReader::Rate
&localRateLimit() { return localReader().rateLimit
; }
179 /// returns reader's rate limit for a given remote process
180 const QueueReader::Rate
&rateLimit(const int remoteProcessId
) const;
182 /// number of items in incoming queue from a given remote process
183 int inSize(const int remoteProcessId
) const { return inQueue(remoteProcessId
).size(); }
185 /// number of items in outgoing queue to a given remote process
186 int outSize(const int remoteProcessId
) const { return outQueue(remoteProcessId
).size(); }
189 /// incoming queue from a given remote process
190 virtual const OneToOneUniQueue
&inQueue(const int remoteProcessId
) const = 0;
191 OneToOneUniQueue
&inQueue(const int remoteProcessId
);
193 /// outgoing queue to a given remote process
194 virtual const OneToOneUniQueue
&outQueue(const int remoteProcessId
) const = 0;
195 OneToOneUniQueue
&outQueue(const int remoteProcessId
);
197 virtual const QueueReader
&localReader() const = 0;
198 QueueReader
&localReader();
200 virtual const QueueReader
&remoteReader(const int remoteProcessId
) const = 0;
201 QueueReader
&remoteReader(const int remoteProcessId
);
203 virtual int remotesCount() const = 0;
204 virtual int remotesIdOffset() const = 0;
207 const int theLocalProcessId
; ///< process ID of this queue
210 int theLastPopProcessId
; ///< the ID of the last process we tried to pop() from
/// Lockless fixed-capacity bidirectional queue for a limited number of
/// processes. Allows communication between two groups of processes:
/// any process in one group may send data to and receive data from any
/// process in the other group, but processes in the same group cannot
/// communicate. Each process in a group has a unique integer ID in the
/// [groupIdOffset, groupIdOffset + groupSize) range.
221 class FewToFewBiQueue
: public BaseMultiQueue
224 typedef OneToOneUniQueue::Full Full
;
225 typedef OneToOneUniQueue::ItemTooLarge ItemTooLarge
;
228 /// Shared metadata for FewToFewBiQueue
230 Metadata(const int aGroupASize
, const int aGroupAIdOffset
, const int aGroupBSize
, const int aGroupBIdOffset
);
231 size_t sharedMemorySize() const { return sizeof(*this); }
232 static size_t SharedMemorySize(const int, const int, const int, const int) { return sizeof(Metadata
); }
234 const int theGroupASize
;
235 const int theGroupAIdOffset
;
236 const int theGroupBSize
;
237 const int theGroupBIdOffset
;
244 Owner(const String
&id
, const int groupASize
, const int groupAIdOffset
, const int groupBSize
, const int groupBIdOffset
, const unsigned int maxItemSize
, const int capacity
);
248 Mem::Owner
<Metadata
> *const metadataOwner
;
249 Mem::Owner
<OneToOneUniQueues
> *const queuesOwner
;
250 Mem::Owner
<QueueReaders
> *const readersOwner
;
253 static Owner
*Init(const String
&id
, const int groupASize
, const int groupAIdOffset
, const int groupBSize
, const int groupBIdOffset
, const unsigned int maxItemSize
, const int capacity
);
255 enum Group
{ groupA
= 0, groupB
= 1 };
256 FewToFewBiQueue(const String
&id
, const Group aLocalGroup
, const int aLocalProcessId
);
258 /// maximum number of items in the queue
259 static int MaxItemsCount(const int groupASize
, const int groupBSize
, const int capacity
);
261 /// finds the oldest item in incoming and outgoing queues between
262 /// us and the given remote process
263 template<class Value
> bool findOldest(const int remoteProcessId
, Value
&value
) const;
266 virtual const OneToOneUniQueue
&inQueue(const int remoteProcessId
) const;
267 virtual const OneToOneUniQueue
&outQueue(const int remoteProcessId
) const;
268 virtual const QueueReader
&localReader() const;
269 virtual const QueueReader
&remoteReader(const int processId
) const;
270 virtual int remotesCount() const;
271 virtual int remotesIdOffset() const;
274 bool validProcessId(const Group group
, const int processId
) const;
275 int oneToOneQueueIndex(const Group fromGroup
, const int fromProcessId
, const Group toGroup
, const int toProcessId
) const;
276 const OneToOneUniQueue
&oneToOneQueue(const Group fromGroup
, const int fromProcessId
, const Group toGroup
, const int toProcessId
) const;
277 int readerIndex(const Group group
, const int processId
) const;
278 Group
localGroup() const { return theLocalGroup
; }
279 Group
remoteGroup() const { return theLocalGroup
== groupA
? groupB
: groupA
; }
282 const Mem::Pointer
<Metadata
> metadata
; ///< shared metadata
283 const Mem::Pointer
<OneToOneUniQueues
> queues
; ///< unidirection one-to-one queues
284 const Mem::Pointer
<QueueReaders
> readers
; ///< readers array
286 const Group theLocalGroup
; ///< group of this queue
/// Lockless fixed-capacity bidirectional queue for a limited number of
/// processes. Any process may send data to and receive data from any other
/// process (including itself). Each process has a unique integer ID in the
/// [processIdOffset, processIdOffset + processCount) range.
295 class MultiQueue
: public BaseMultiQueue
298 typedef OneToOneUniQueue::Full Full
;
299 typedef OneToOneUniQueue::ItemTooLarge ItemTooLarge
;
302 /// Shared metadata for MultiQueue
304 Metadata(const int aProcessCount
, const int aProcessIdOffset
);
305 size_t sharedMemorySize() const { return sizeof(*this); }
306 static size_t SharedMemorySize(const int, const int) { return sizeof(Metadata
); }
308 const int theProcessCount
;
309 const int theProcessIdOffset
;
316 Owner(const String
&id
, const int processCount
, const int processIdOffset
, const unsigned int maxItemSize
, const int capacity
);
320 Mem::Owner
<Metadata
> *const metadataOwner
;
321 Mem::Owner
<OneToOneUniQueues
> *const queuesOwner
;
322 Mem::Owner
<QueueReaders
> *const readersOwner
;
325 static Owner
*Init(const String
&id
, const int processCount
, const int processIdOffset
, const unsigned int maxItemSize
, const int capacity
);
327 MultiQueue(const String
&id
, const int localProcessId
);
330 virtual const OneToOneUniQueue
&inQueue(const int remoteProcessId
) const;
331 virtual const OneToOneUniQueue
&outQueue(const int remoteProcessId
) const;
332 virtual const QueueReader
&localReader() const;
333 virtual const QueueReader
&remoteReader(const int remoteProcessId
) const;
334 virtual int remotesCount() const;
335 virtual int remotesIdOffset() const;
338 bool validProcessId(const int processId
) const;
339 const OneToOneUniQueue
&oneToOneQueue(const int fromProcessId
, const int toProcessId
) const;
340 const QueueReader
&reader(const int processId
) const;
343 const Mem::Pointer
<Metadata
> metadata
; ///< shared metadata
344 const Mem::Pointer
<OneToOneUniQueues
> queues
; ///< unidirection one-to-one queues
345 const Mem::Pointer
<QueueReaders
> readers
; ///< readers array
350 template <class Value
>
352 OneToOneUniQueue::pop(Value
&value
, QueueReader
*const reader
)
354 if (sizeof(value
) > theMaxItemSize
)
355 throw ItemTooLarge();
357 // A writer might push between the empty test and block() below, so we do
358 // not return false right after calling block(), but test again.
364 // A writer might push between the empty test and block() below,
365 // so we must test again as such a writer will not signal us.
373 const unsigned int pos
= (theOut
++ % theCapacity
) * theMaxItemSize
;
374 memcpy(&value
, theBuffer
+ pos
, sizeof(value
));
380 template <class Value
>
382 OneToOneUniQueue::peek(Value
&value
) const
384 if (sizeof(value
) > theMaxItemSize
)
385 throw ItemTooLarge();
390 // the reader may pop() before we copy; making this method imprecise
391 const unsigned int pos
= (theOut
% theCapacity
) * theMaxItemSize
;
392 memcpy(&value
, theBuffer
+ pos
, sizeof(value
));
396 template <class Value
>
398 OneToOneUniQueue::push(const Value
&value
, QueueReader
*const reader
)
400 if (sizeof(value
) > theMaxItemSize
)
401 throw ItemTooLarge();
406 const unsigned int pos
= theIn
++ % theCapacity
* theMaxItemSize
;
407 memcpy(theBuffer
+ pos
, &value
, sizeof(value
));
408 const bool wasEmpty
= !theSize
++;
410 return wasEmpty
&& (!reader
|| reader
->raiseSignal());
415 inline OneToOneUniQueue
&
416 OneToOneUniQueues::operator [](const int index
)
418 return const_cast<OneToOneUniQueue
&>((*const_cast<const OneToOneUniQueues
*>(this))[index
]);
421 inline const OneToOneUniQueue
&
422 OneToOneUniQueues::front() const
424 const char *const queue
=
425 reinterpret_cast<const char *>(this) + sizeof(*this);
426 return *reinterpret_cast<const OneToOneUniQueue
*>(queue
);
431 template <class Value
>
433 BaseMultiQueue::pop(int &remoteProcessId
, Value
&value
)
435 // iterate all remote processes, starting after the one we visited last
436 for (int i
= 0; i
< remotesCount(); ++i
) {
437 if (++theLastPopProcessId
>= remotesIdOffset() + remotesCount())
438 theLastPopProcessId
= remotesIdOffset();
439 OneToOneUniQueue
&queue
= inQueue(theLastPopProcessId
);
440 if (queue
.pop(value
, &localReader())) {
441 remoteProcessId
= theLastPopProcessId
;
442 debugs(54, 7, HERE
<< "popped from " << remoteProcessId
<< " to " << theLocalProcessId
<< " at " << queue
.size());
446 return false; // no process had anything to pop
449 template <class Value
>
451 BaseMultiQueue::push(const int remoteProcessId
, const Value
&value
)
453 OneToOneUniQueue
&remoteQueue
= outQueue(remoteProcessId
);
454 QueueReader
&reader
= remoteReader(remoteProcessId
);
455 debugs(54, 7, HERE
<< "pushing from " << theLocalProcessId
<< " to " << remoteProcessId
<< " at " << remoteQueue
.size());
456 return remoteQueue
.push(value
, &reader
);
459 template <class Value
>
461 BaseMultiQueue::peek(int &remoteProcessId
, Value
&value
) const
463 // mimic FewToFewBiQueue::pop() but quit just before popping
464 int popProcessId
= theLastPopProcessId
; // preserve for future pop()
465 for (int i
= 0; i
< remotesCount(); ++i
) {
466 if (++popProcessId
>= remotesIdOffset() + remotesCount())
467 popProcessId
= remotesIdOffset();
468 const OneToOneUniQueue
&queue
= inQueue(popProcessId
);
469 if (queue
.peek(value
)) {
470 remoteProcessId
= popProcessId
;
474 return false; // most likely, no process had anything to pop
479 template <class Value
>
481 FewToFewBiQueue::findOldest(const int remoteProcessId
, Value
&value
) const
483 // we may be called before remote process configured its queue end
484 if (!validProcessId(remoteGroup(), remoteProcessId
))
487 // we need the oldest value, so start with the incoming, them-to-us queue:
488 const OneToOneUniQueue
&in
= inQueue(remoteProcessId
);
489 debugs(54, 2, HERE
<< "peeking from " << remoteProcessId
<< " to " <<
490 theLocalProcessId
<< " at " << in
.size());
494 // if the incoming queue is empty, check the outgoing, us-to-them queue:
495 const OneToOneUniQueue
&out
= outQueue(remoteProcessId
);
496 debugs(54, 2, HERE
<< "peeking from " << theLocalProcessId
<< " to " <<
497 remoteProcessId
<< " at " << out
.size());
498 return out
.peek(value
);
503 #endif // SQUID_IPC_QUEUE_H