2 * Copyright (C) 1996-2014 The Squid Software Foundation and contributors
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
9 #ifndef SQUID_IPC_QUEUE_H
10 #define SQUID_IPC_QUEUE_H
12 #include "base/InstanceId.h"
14 #include "ipc/AtomicWord.h"
15 #include "ipc/mem/FlexibleArray.h"
16 #include "ipc/mem/Pointer.h"
24 /// State of the reading end of a queue (i.e., of the code calling pop()).
25 /// Multiple queues attached to one reader share this state.
29 QueueReader(); // the initial state is "blocked without a signal"
31 /// whether the reader is waiting for a notification signal
32 bool blocked() const { return popBlocked
== 1; }
34 /// marks the reader as blocked, waiting for a notification signal
35 void block() { popBlocked
.swap_if(0, 1); }
37 /// removes the block() effects
38 void unblock() { popBlocked
.swap_if(1, 0); }
40 /// if reader is blocked and not notified, marks the notification signal
41 /// as sent and not received, returning true; otherwise, returns false
42 bool raiseSignal() { return blocked() && popSignal
.swap_if(0,1); }
44 /// marks sent reader notification as received (also removes pop blocking)
45 void clearSignal() { unblock(); popSignal
.swap_if(1,0); }
48 Atomic::Word popBlocked
; ///< whether the reader is blocked on pop()
49 Atomic::Word popSignal
; ///< whether writer has sent and reader has not received notification
52 typedef Atomic::Word Rate
; ///< pop()s per second
53 Rate rateLimit
; ///< pop()s per second limit if positive
55 // we need a signed atomic type because balance may get negative
56 typedef Atomic::WordT
<int> AtomicSignedMsec
;
57 typedef AtomicSignedMsec Balance
;
58 /// how far ahead the reader is compared to a perfect read/sec event rate
61 /// unique ID for debugging which reader is used (works across processes)
62 const InstanceId
<QueueReader
> id
;
65 /// shared array of QueueReaders
69 QueueReaders(const int aCapacity
);
70 size_t sharedMemorySize() const;
71 static size_t SharedMemorySize(const int capacity
);
73 const int theCapacity
; /// number of readers
74 Ipc::Mem::FlexibleArray
<QueueReader
> theReaders
; /// readers
78 * Lockless fixed-capacity queue for a single writer and a single reader.
80 * If the queue is empty, the reader is considered "blocked" and needs
81 * an out-of-band notification message to notice the next pushed item.
83 * Current implementation assumes that the writer cannot get blocked: if the
84 * queue is full, the writer will just not push and come back later (with a
85 * different value). We can add support for blocked writers if needed.
87 class OneToOneUniQueue
90 // pop() and push() exceptions; TODO: use TextException instead
92 class ItemTooLarge
{};
94 OneToOneUniQueue(const unsigned int aMaxItemSize
, const int aCapacity
);
96 unsigned int maxItemSize() const { return theMaxItemSize
; }
97 int size() const { return theSize
; }
98 int capacity() const { return theCapacity
; }
99 int sharedMemorySize() const { return Items2Bytes(theMaxItemSize
, theCapacity
); }
101 bool empty() const { return !theSize
; }
102 bool full() const { return theSize
== theCapacity
; }
104 static int Bytes2Items(const unsigned int maxItemSize
, int size
);
105 static int Items2Bytes(const unsigned int maxItemSize
, const int size
);
107 /// returns true iff the value was set; [un]blocks the reader as needed
108 template<class Value
> bool pop(Value
&value
, QueueReader
*const reader
= NULL
);
110 /// returns true iff the caller must notify the reader of the pushed item
111 template<class Value
> bool push(const Value
&value
, QueueReader
*const reader
= NULL
);
113 /// returns true iff the value was set; the value may be stale!
114 template<class Value
> bool peek(Value
&value
) const;
118 unsigned int theIn
; ///< input index, used only in push()
119 unsigned int theOut
; ///< output index, used only in pop()
121 Atomic::Word theSize
; ///< number of items in the queue
122 const unsigned int theMaxItemSize
; ///< maximum item size
123 const int theCapacity
; ///< maximum number of items, i.e. theBuffer size
128 /// shared array of OneToOneUniQueues
129 class OneToOneUniQueues
132 OneToOneUniQueues(const int aCapacity
, const unsigned int maxItemSize
, const int queueCapacity
);
134 size_t sharedMemorySize() const;
135 static size_t SharedMemorySize(const int capacity
, const unsigned int maxItemSize
, const int queueCapacity
);
137 const OneToOneUniQueue
&operator [](const int index
) const;
138 inline OneToOneUniQueue
&operator [](const int index
);
141 inline const OneToOneUniQueue
&front() const;
144 const int theCapacity
; /// number of OneToOneUniQueues
148 * Base class for lockless fixed-capacity bidirectional queues for a
149 * limited number processes.
154 BaseMultiQueue(const int aLocalProcessId
);
155 virtual ~BaseMultiQueue() {}
157 /// clears the reader notification received by the local process from the remote process
158 void clearReaderSignal(const int remoteProcessId
);
160 /// picks a process and calls OneToOneUniQueue::pop() using its queue
161 template <class Value
> bool pop(int &remoteProcessId
, Value
&value
);
163 /// calls OneToOneUniQueue::push() using the given process queue
164 template <class Value
> bool push(const int remoteProcessId
, const Value
&value
);
166 /// peeks at the item likely to be pop()ed next
167 template<class Value
> bool peek(int &remoteProcessId
, Value
&value
) const;
169 /// returns local reader's balance
170 QueueReader::Balance
&localBalance() { return localReader().balance
; }
172 /// returns reader's balance for a given remote process
173 const QueueReader::Balance
&balance(const int remoteProcessId
) const;
175 /// returns local reader's rate limit
176 QueueReader::Rate
&localRateLimit() { return localReader().rateLimit
; }
178 /// returns reader's rate limit for a given remote process
179 const QueueReader::Rate
&rateLimit(const int remoteProcessId
) const;
181 /// number of items in incoming queue from a given remote process
182 int inSize(const int remoteProcessId
) const { return inQueue(remoteProcessId
).size(); }
184 /// number of items in outgoing queue to a given remote process
185 int outSize(const int remoteProcessId
) const { return outQueue(remoteProcessId
).size(); }
188 /// incoming queue from a given remote process
189 virtual const OneToOneUniQueue
&inQueue(const int remoteProcessId
) const = 0;
190 OneToOneUniQueue
&inQueue(const int remoteProcessId
);
192 /// outgoing queue to a given remote process
193 virtual const OneToOneUniQueue
&outQueue(const int remoteProcessId
) const = 0;
194 OneToOneUniQueue
&outQueue(const int remoteProcessId
);
196 virtual const QueueReader
&localReader() const = 0;
197 QueueReader
&localReader();
199 virtual const QueueReader
&remoteReader(const int remoteProcessId
) const = 0;
200 QueueReader
&remoteReader(const int remoteProcessId
);
202 virtual int remotesCount() const = 0;
203 virtual int remotesIdOffset() const = 0;
206 const int theLocalProcessId
; ///< process ID of this queue
209 int theLastPopProcessId
; ///< the ID of the last process we tried to pop() from
213 * Lockless fixed-capacity bidirectional queue for a limited number
214 * processes. Allows communication between two groups of processes:
215 * any process in one group may send data to and receive from any
216 * process in another group, but processes in the same group can not
217 * communicate. Process in each group has a unique integer ID in
218 * [groupIdOffset, groupIdOffset + groupSize) range.
220 class FewToFewBiQueue
: public BaseMultiQueue
223 typedef OneToOneUniQueue::Full Full
;
224 typedef OneToOneUniQueue::ItemTooLarge ItemTooLarge
;
227 /// Shared metadata for FewToFewBiQueue
229 Metadata(const int aGroupASize
, const int aGroupAIdOffset
, const int aGroupBSize
, const int aGroupBIdOffset
);
230 size_t sharedMemorySize() const { return sizeof(*this); }
231 static size_t SharedMemorySize(const int, const int, const int, const int) { return sizeof(Metadata
); }
233 const int theGroupASize
;
234 const int theGroupAIdOffset
;
235 const int theGroupBSize
;
236 const int theGroupBIdOffset
;
243 Owner(const String
&id
, const int groupASize
, const int groupAIdOffset
, const int groupBSize
, const int groupBIdOffset
, const unsigned int maxItemSize
, const int capacity
);
247 Mem::Owner
<Metadata
> *const metadataOwner
;
248 Mem::Owner
<OneToOneUniQueues
> *const queuesOwner
;
249 Mem::Owner
<QueueReaders
> *const readersOwner
;
252 static Owner
*Init(const String
&id
, const int groupASize
, const int groupAIdOffset
, const int groupBSize
, const int groupBIdOffset
, const unsigned int maxItemSize
, const int capacity
);
254 enum Group
{ groupA
= 0, groupB
= 1 };
255 FewToFewBiQueue(const String
&id
, const Group aLocalGroup
, const int aLocalProcessId
);
257 /// maximum number of items in the queue
258 static int MaxItemsCount(const int groupASize
, const int groupBSize
, const int capacity
);
260 /// finds the oldest item in incoming and outgoing queues between
261 /// us and the given remote process
262 template<class Value
> bool findOldest(const int remoteProcessId
, Value
&value
) const;
265 virtual const OneToOneUniQueue
&inQueue(const int remoteProcessId
) const;
266 virtual const OneToOneUniQueue
&outQueue(const int remoteProcessId
) const;
267 virtual const QueueReader
&localReader() const;
268 virtual const QueueReader
&remoteReader(const int processId
) const;
269 virtual int remotesCount() const;
270 virtual int remotesIdOffset() const;
273 bool validProcessId(const Group group
, const int processId
) const;
274 int oneToOneQueueIndex(const Group fromGroup
, const int fromProcessId
, const Group toGroup
, const int toProcessId
) const;
275 const OneToOneUniQueue
&oneToOneQueue(const Group fromGroup
, const int fromProcessId
, const Group toGroup
, const int toProcessId
) const;
276 int readerIndex(const Group group
, const int processId
) const;
277 Group
localGroup() const { return theLocalGroup
; }
278 Group
remoteGroup() const { return theLocalGroup
== groupA
? groupB
: groupA
; }
281 const Mem::Pointer
<Metadata
> metadata
; ///< shared metadata
282 const Mem::Pointer
<OneToOneUniQueues
> queues
; ///< unidirection one-to-one queues
283 const Mem::Pointer
<QueueReaders
> readers
; ///< readers array
285 const Group theLocalGroup
; ///< group of this queue
289 * Lockless fixed-capacity bidirectional queue for a limited number
290 * processes. Any process may send data to and receive from any other
291 * process (including itself). Each process has a unique integer ID in
292 * [processIdOffset, processIdOffset + processCount) range.
294 class MultiQueue
: public BaseMultiQueue
297 typedef OneToOneUniQueue::Full Full
;
298 typedef OneToOneUniQueue::ItemTooLarge ItemTooLarge
;
301 /// Shared metadata for MultiQueue
303 Metadata(const int aProcessCount
, const int aProcessIdOffset
);
304 size_t sharedMemorySize() const { return sizeof(*this); }
305 static size_t SharedMemorySize(const int, const int) { return sizeof(Metadata
); }
307 const int theProcessCount
;
308 const int theProcessIdOffset
;
315 Owner(const String
&id
, const int processCount
, const int processIdOffset
, const unsigned int maxItemSize
, const int capacity
);
319 Mem::Owner
<Metadata
> *const metadataOwner
;
320 Mem::Owner
<OneToOneUniQueues
> *const queuesOwner
;
321 Mem::Owner
<QueueReaders
> *const readersOwner
;
324 static Owner
*Init(const String
&id
, const int processCount
, const int processIdOffset
, const unsigned int maxItemSize
, const int capacity
);
326 MultiQueue(const String
&id
, const int localProcessId
);
329 virtual const OneToOneUniQueue
&inQueue(const int remoteProcessId
) const;
330 virtual const OneToOneUniQueue
&outQueue(const int remoteProcessId
) const;
331 virtual const QueueReader
&localReader() const;
332 virtual const QueueReader
&remoteReader(const int remoteProcessId
) const;
333 virtual int remotesCount() const;
334 virtual int remotesIdOffset() const;
337 bool validProcessId(const int processId
) const;
338 const OneToOneUniQueue
&oneToOneQueue(const int fromProcessId
, const int toProcessId
) const;
339 const QueueReader
&reader(const int processId
) const;
342 const Mem::Pointer
<Metadata
> metadata
; ///< shared metadata
343 const Mem::Pointer
<OneToOneUniQueues
> queues
; ///< unidirection one-to-one queues
344 const Mem::Pointer
<QueueReaders
> readers
; ///< readers array
349 template <class Value
>
351 OneToOneUniQueue::pop(Value
&value
, QueueReader
*const reader
)
353 if (sizeof(value
) > theMaxItemSize
)
354 throw ItemTooLarge();
356 // A writer might push between the empty test and block() below, so we do
357 // not return false right after calling block(), but test again.
363 // A writer might push between the empty test and block() below,
364 // so we must test again as such a writer will not signal us.
372 const unsigned int pos
= (theOut
++ % theCapacity
) * theMaxItemSize
;
373 memcpy(&value
, theBuffer
+ pos
, sizeof(value
));
379 template <class Value
>
381 OneToOneUniQueue::peek(Value
&value
) const
383 if (sizeof(value
) > theMaxItemSize
)
384 throw ItemTooLarge();
389 // the reader may pop() before we copy; making this method imprecise
390 const unsigned int pos
= (theOut
% theCapacity
) * theMaxItemSize
;
391 memcpy(&value
, theBuffer
+ pos
, sizeof(value
));
395 template <class Value
>
397 OneToOneUniQueue::push(const Value
&value
, QueueReader
*const reader
)
399 if (sizeof(value
) > theMaxItemSize
)
400 throw ItemTooLarge();
405 const unsigned int pos
= theIn
++ % theCapacity
* theMaxItemSize
;
406 memcpy(theBuffer
+ pos
, &value
, sizeof(value
));
407 const bool wasEmpty
= !theSize
++;
409 return wasEmpty
&& (!reader
|| reader
->raiseSignal());
414 inline OneToOneUniQueue
&
415 OneToOneUniQueues::operator [](const int index
)
417 return const_cast<OneToOneUniQueue
&>((*const_cast<const OneToOneUniQueues
*>(this))[index
]);
420 inline const OneToOneUniQueue
&
421 OneToOneUniQueues::front() const
423 const char *const queue
=
424 reinterpret_cast<const char *>(this) + sizeof(*this);
425 return *reinterpret_cast<const OneToOneUniQueue
*>(queue
);
430 template <class Value
>
432 BaseMultiQueue::pop(int &remoteProcessId
, Value
&value
)
434 // iterate all remote processes, starting after the one we visited last
435 for (int i
= 0; i
< remotesCount(); ++i
) {
436 if (++theLastPopProcessId
>= remotesIdOffset() + remotesCount())
437 theLastPopProcessId
= remotesIdOffset();
438 OneToOneUniQueue
&queue
= inQueue(theLastPopProcessId
);
439 if (queue
.pop(value
, &localReader())) {
440 remoteProcessId
= theLastPopProcessId
;
441 debugs(54, 7, HERE
<< "popped from " << remoteProcessId
<< " to " << theLocalProcessId
<< " at " << queue
.size());
445 return false; // no process had anything to pop
448 template <class Value
>
450 BaseMultiQueue::push(const int remoteProcessId
, const Value
&value
)
452 OneToOneUniQueue
&remoteQueue
= outQueue(remoteProcessId
);
453 QueueReader
&reader
= remoteReader(remoteProcessId
);
454 debugs(54, 7, HERE
<< "pushing from " << theLocalProcessId
<< " to " << remoteProcessId
<< " at " << remoteQueue
.size());
455 return remoteQueue
.push(value
, &reader
);
458 template <class Value
>
460 BaseMultiQueue::peek(int &remoteProcessId
, Value
&value
) const
462 // mimic FewToFewBiQueue::pop() but quit just before popping
463 int popProcessId
= theLastPopProcessId
; // preserve for future pop()
464 for (int i
= 0; i
< remotesCount(); ++i
) {
465 if (++popProcessId
>= remotesIdOffset() + remotesCount())
466 popProcessId
= remotesIdOffset();
467 const OneToOneUniQueue
&queue
= inQueue(popProcessId
);
468 if (queue
.peek(value
)) {
469 remoteProcessId
= popProcessId
;
473 return false; // most likely, no process had anything to pop
478 template <class Value
>
480 FewToFewBiQueue::findOldest(const int remoteProcessId
, Value
&value
) const
482 // we may be called before remote process configured its queue end
483 if (!validProcessId(remoteGroup(), remoteProcessId
))
486 // we need the oldest value, so start with the incoming, them-to-us queue:
487 const OneToOneUniQueue
&in
= inQueue(remoteProcessId
);
488 debugs(54, 2, HERE
<< "peeking from " << remoteProcessId
<< " to " <<
489 theLocalProcessId
<< " at " << in
.size());
493 // if the incoming queue is empty, check the outgoing, us-to-them queue:
494 const OneToOneUniQueue
&out
= outQueue(remoteProcessId
);
495 debugs(54, 2, HERE
<< "peeking from " << theLocalProcessId
<< " to " <<
496 remoteProcessId
<< " at " << out
.size());
497 return out
.peek(value
);
502 #endif // SQUID_IPC_QUEUE_H