4 #ifndef SQUID_IPC_QUEUE_H
5 #define SQUID_IPC_QUEUE_H
7 #include "base/InstanceId.h"
8 #include "base/Vector.h"
10 #include "ipc/AtomicWord.h"
11 #include "ipc/mem/FlexibleArray.h"
12 #include "ipc/mem/Pointer.h"
20 /// State of the reading end of a queue (i.e., of the code calling pop()).
21 /// Multiple queues attached to one reader share this state.
/// The methods below only test popBlocked/popSignal against 1 and move them
/// between 0 and 1 via swap_if(0, 1) / swap_if(1, 0).
/// NOTE(review): swap_if() comes from ipc/AtomicWord.h; it is presumably an
/// atomic compare-and-swap that reports whether it swapped -- confirm there.
25 QueueReader(); // the initial state is "blocked without a signal"
27 /// whether the reader is waiting for a notification signal
28 bool blocked() const { return popBlocked
== 1; }
30 /// marks the reader as blocked, waiting for a notification signal
31 void block() { popBlocked
.swap_if(0, 1); }
33 /// removes the block() effects
34 void unblock() { popBlocked
.swap_if(1, 0); }
36 /// if reader is blocked and not notified, marks the notification signal
37 /// as sent and not received, returning true; otherwise, returns false
/// (the && short-circuit leaves popSignal untouched unless blocked() is true)
38 bool raiseSignal() { return blocked() && popSignal
.swap_if(0,1); }
40 /// marks sent reader notification as received (also removes pop blocking)
/// (unblock() runs first, then popSignal is moved from 1 back to 0)
41 void clearSignal() { unblock(); popSignal
.swap_if(1,0); }
44 Atomic::Word popBlocked
; ///< whether the reader is blocked on pop()
45 Atomic::Word popSignal
; ///< whether writer has sent and reader has not received notification
48 typedef Atomic::Word Rate
; ///< pop()s per second
49 Rate rateLimit
; ///< pop()s per second limit if positive
51 // we need a signed atomic type because balance may get negative
52 typedef Atomic::WordT
<int> AtomicSignedMsec
;
53 typedef AtomicSignedMsec Balance
;
54 /// how far ahead the reader is compared to a perfect read/sec event rate
57 /// unique ID for debugging which reader is used (works across processes)
58 const InstanceId
<QueueReader
> id
;
61 /// shared array of QueueReaders
/// \param aCapacity number of readers (stored as theCapacity)
65 QueueReaders(const int aCapacity
);
/// size of this object in shared memory
/// NOTE(review): presumably equals SharedMemorySize(theCapacity) -- confirm in the .cc file
66 size_t sharedMemorySize() const;
/// shared memory size needed for an array of the given capacity
67 static size_t SharedMemorySize(const int capacity
);
69 const int theCapacity
; ///< number of readers
70 Ipc::Mem::FlexibleArray
<QueueReader
> theReaders
; ///< readers
74 * Lockless fixed-capacity queue for a single writer and a single reader.
76 * If the queue is empty, the reader is considered "blocked" and needs
77 * an out-of-band notification message to notice the next pushed item.
79 * Current implementation assumes that the writer cannot get blocked: if the
80 * queue is full, the writer will just not push and come back later (with a
81 * different value). We can add support for blocked writers if needed.
83 class OneToOneUniQueue
86 // pop() and push() exceptions; TODO: use TextException instead
/// thrown by pop(), push(), and peek() when sizeof(value) exceeds theMaxItemSize
88 class ItemTooLarge
{};
/// \param aMaxItemSize size limit for a single item (see maxItemSize())
/// \param aCapacity maximum number of items (see capacity())
90 OneToOneUniQueue(const unsigned int aMaxItemSize
, const int aCapacity
);
/// maximum item size; larger values are rejected with ItemTooLarge
92 unsigned int maxItemSize() const { return theMaxItemSize
; }
/// current number of queued items (a snapshot of the atomic theSize)
93 int size() const { return theSize
; }
/// maximum number of items
94 int capacity() const { return theCapacity
; }
/// Items2Bytes() result for this queue's item size and capacity
/// NOTE(review): returns int while the sharedMemorySize() methods of the
/// array classes in this header return size_t -- confirm that is intended
95 int sharedMemorySize() const { return Items2Bytes(theMaxItemSize
, theCapacity
); }
/// whether theSize is zero
97 bool empty() const { return !theSize
; }
/// whether theSize has reached theCapacity
98 bool full() const { return theSize
== theCapacity
; }
/// NOTE(review): presumably the inverse of Items2Bytes() (how many items
/// fit into size bytes) -- defined outside this header; confirm
100 static int Bytes2Items(const unsigned int maxItemSize
, int size
);
/// NOTE(review): presumably the shared memory bytes needed for a queue of
/// size items of maxItemSize bytes each -- defined outside this header; confirm
101 static int Items2Bytes(const unsigned int maxItemSize
, const int size
);
103 /// returns true iff the value was set; [un]blocks the reader as needed
104 template<class Value
> bool pop(Value
&value
, QueueReader
*const reader
= NULL
);
106 /// returns true iff the caller must notify the reader of the pushed item
107 template<class Value
> bool push(const Value
&value
, QueueReader
*const reader
= NULL
);
109 /// returns true iff the value was set; the value may be stale!
110 template<class Value
> bool peek(Value
&value
) const;
114 unsigned int theIn
; ///< input index, used only in push()
115 unsigned int theOut
; ///< output index, advanced only by pop(); also read by peek()
117 Atomic::Word theSize
; ///< number of items in the queue
118 const unsigned int theMaxItemSize
; ///< maximum item size
119 const int theCapacity
; ///< maximum number of items, i.e. theBuffer size
124 /// shared array of OneToOneUniQueues
125 class OneToOneUniQueues
/// \param aCapacity number of queues in the array (stored as theCapacity)
/// \param maxItemSize item size limit
/// \param queueCapacity item capacity
/// NOTE(review): maxItemSize and queueCapacity presumably apply to every
/// queue in the array -- confirm in the .cc file
128 OneToOneUniQueues(const int aCapacity
, const unsigned int maxItemSize
, const int queueCapacity
);
/// size of this object in shared memory
130 size_t sharedMemorySize() const;
/// shared memory size needed for an array with the given parameters
131 static size_t SharedMemorySize(const int capacity
, const unsigned int maxItemSize
, const int queueCapacity
);
/// read-only access to the queue at the given index
133 const OneToOneUniQueue
&operator [](const int index
) const;
/// writable access to the queue at the given index
134 inline OneToOneUniQueue
&operator [](const int index
);
/// the queue stored immediately after this object in memory
137 inline const OneToOneUniQueue
&front() const;
140 const int theCapacity
; ///< number of OneToOneUniQueues
144 * Lockless fixed-capacity bidirectional queue for a limited number of
145 * processes. Allows communication between two groups of processes:
146 * any process in one group may send data to and receive from any
147 * process in another group, but processes in the same group cannot
148 * communicate. Each process in a group has a unique integer ID in the
149 * [groupIdOffset, groupIdOffset + groupSize) range.
151 class FewToFewBiQueue
// re-export the one-to-one queue exception types under this class's scope
154 typedef OneToOneUniQueue::Full Full
;
155 typedef OneToOneUniQueue::ItemTooLarge ItemTooLarge
;
158 /// Shared metadata for FewToFewBiQueue
/// records the size and first process ID of each of the two groups
160 Metadata(const int aGroupASize
, const int aGroupAIdOffset
, const int aGroupBSize
, const int aGroupBIdOffset
);
/// the metadata has a fixed size: just this object
161 size_t sharedMemorySize() const { return sizeof(*this); }
/// same fixed size; the (unnamed) parameters mirror the constructor's and are ignored
162 static size_t SharedMemorySize(const int, const int, const int, const int) { return sizeof(Metadata
); }
164 const int theGroupASize
; ///< number of processes in group A
165 const int theGroupAIdOffset
; ///< lowest process ID in group A
166 const int theGroupBSize
; ///< number of processes in group B
167 const int theGroupBIdOffset
; ///< lowest process ID in group B
/// Takes the group layout (sizes and ID offsets of groups A and B) plus the
/// per-queue item size limit and capacity.
/// NOTE(review): presumably creates the three shared segments held below,
/// named after id -- the definition is outside this header; confirm
174 Owner(const String
&id
, const int groupASize
, const int groupAIdOffset
, const int groupBSize
, const int groupBIdOffset
, const unsigned int maxItemSize
, const int capacity
);
// One Mem::Owner per shared object type used by FewToFewBiQueue.
// NOTE(review): these are raw pointers; who deletes them is not visible
// here -- confirm in the Owner destructor
178 Mem::Owner
<Metadata
> *const metadataOwner
;
179 Mem::Owner
<OneToOneUniQueues
> *const queuesOwner
;
180 Mem::Owner
<QueueReaders
> *const readersOwner
;
/// Takes the same parameters as the Owner constructor and returns an Owner pointer.
/// NOTE(review): presumably returns a newly allocated Owner that the caller
/// must keep and eventually delete -- confirm in the .cc file
183 static Owner
*Init(const String
&id
, const int groupASize
, const int groupAIdOffset
, const int groupBSize
, const int groupBIdOffset
, const unsigned int maxItemSize
, const int capacity
);
/// the two communicating process groups
185 enum Group
{ groupA
= 0, groupB
= 1 };
/// \param id name of the shared queue
/// \param aLocalGroup the group of the calling process
/// \param aLocalProcessId the ID of the calling process within its group's ID range
/// NOTE(review): id presumably must match the one given to Init() -- confirm
186 FewToFewBiQueue(const String
&id
, const Group aLocalGroup
, const int aLocalProcessId
);
188 /// maximum number of items in the queue
189 static int MaxItemsCount(const int groupASize
, const int groupBSize
, const int capacity
);
/// the group this process belongs to
191 Group
localGroup() const { return theLocalGroup
; }
/// the other group: groupB if we are in groupA and vice versa
192 Group
remoteGroup() const { return theLocalGroup
== groupA
? groupB
: groupA
; }
194 /// clears the reader notification received by the local process from the remote process
195 void clearReaderSignal(const int remoteProcessId
);
197 /// picks a process and calls OneToOneUniQueue::pop() using its queue
/// on success, remoteProcessId is set to the ID of the process the value came from
198 template <class Value
> bool pop(int &remoteProcessId
, Value
&value
);
200 /// calls OneToOneUniQueue::push() using the given process queue
/// and returns that push() result
201 template <class Value
> bool push(const int remoteProcessId
, const Value
&value
);
203 /// finds the oldest item in incoming and outgoing queues between
204 /// us and the given remote process
205 template<class Value
> bool findOldest(const int remoteProcessId
, Value
&value
) const;
207 /// peeks at the item likely to be pop()ed next
/// without changing which process the next pop() starts with
208 template<class Value
> bool peek(int &remoteProcessId
, Value
&value
) const;
210 /// returns local reader's balance
211 QueueReader::Balance
&localBalance();
213 /// returns reader's balance for a given remote process
214 const QueueReader::Balance
&balance(const int remoteProcessId
) const;
216 /// returns local reader's rate limit
217 QueueReader::Rate
&localRateLimit();
219 /// returns reader's rate limit for a given remote process
220 const QueueReader::Rate
&rateLimit(const int remoteProcessId
) const;
222 /// number of items in incoming queue from a given remote process
223 int inSize(const int remoteProcessId
) const { return inQueue(remoteProcessId
).size(); }
225 /// number of items in outgoing queue to a given remote process
226 int outSize(const int remoteProcessId
) const { return outQueue(remoteProcessId
).size(); }
/// NOTE(review): presumably whether processId lies within the given
/// group's [offset, offset + size) ID range -- defined outside this header; confirm
229 bool validProcessId(const Group group
, const int processId
) const;
/// index of the (from process -> to process) queue within the shared queues array
230 int oneToOneQueueIndex(const Group fromGroup
, const int fromProcessId
, const Group toGroup
, const int toProcessId
) const;
/// the unidirectional queue carrying items from one process to another (read-only)
231 const OneToOneUniQueue
&oneToOneQueue(const Group fromGroup
, const int fromProcessId
, const Group toGroup
, const int toProcessId
) const;
/// the unidirectional queue carrying items from one process to another (writable)
232 OneToOneUniQueue
&oneToOneQueue(const Group fromGroup
, const int fromProcessId
, const Group toGroup
, const int toProcessId
);
/// incoming queue: from the given remote process to us
233 const OneToOneUniQueue
&inQueue(const int remoteProcessId
) const;
/// outgoing queue: from us to the given remote process
234 const OneToOneUniQueue
&outQueue(const int remoteProcessId
) const;
/// the shared reader state of the given process (writable)
235 QueueReader
&reader(const Group group
, const int processId
);
/// the shared reader state of the given process (read-only)
236 const QueueReader
&reader(const Group group
, const int processId
) const;
/// index of the given process's reader within the shared readers array
237 int readerIndex(const Group group
, const int processId
) const;
/// number of processes in the group opposite to theLocalGroup (from shared metadata)
238 int remoteGroupSize() const { return theLocalGroup
== groupA
? metadata
->theGroupBSize
: metadata
->theGroupASize
; }
/// lowest process ID in the group opposite to theLocalGroup (from shared metadata)
239 int remoteGroupIdOffset() const { return theLocalGroup
== groupA
? metadata
->theGroupBIdOffset
: metadata
->theGroupAIdOffset
; }
242 const Mem::Pointer
<Metadata
> metadata
; ///< shared metadata
243 const Mem::Pointer
<OneToOneUniQueues
> queues
; ///< unidirectional one-to-one queues
244 const Mem::Pointer
<QueueReaders
> readers
; ///< readers array
246 const Group theLocalGroup
; ///< group of this queue
247 const int theLocalProcessId
; ///< process ID of this queue
248 int theLastPopProcessId
; ///< the ID of the last process we tried to pop() from
/// Copies the item at the output position into value and advances theOut.
/// Exactly sizeof(value) bytes are copied from byte offset
/// (theOut % theCapacity) * theMaxItemSize within theBuffer.
/// \throws ItemTooLarge if sizeof(value) exceeds theMaxItemSize
253 template <class Value
>
255 OneToOneUniQueue::pop(Value
&value
, QueueReader
*const reader
)
// reject values that could not have been stored in a single slot
257 if (sizeof(value
) > theMaxItemSize
)
258 throw ItemTooLarge();
260 // A writer might push between the empty test and block() below, so we do
261 // not return false right after calling block(), but test again.
267 // A writer might push between the empty test and block() below,
268 // so we must test again as such a writer will not signal us.
// NOTE(review): theOut is unsigned and theCapacity is an int, so the modulo
// is computed in unsigned arithmetic. If theOut ever wraps past UINT_MAX and
// theCapacity is not a power of two, the slot sequence is not continuous
// across the wrap -- confirm this cannot happen in practice.
276 const unsigned int pos
= (theOut
++ % theCapacity
) * theMaxItemSize
;
277 memcpy(&value
, theBuffer
+ pos
, sizeof(value
));
/// Copies the item at the current output position into value without
/// advancing theOut; uses the same slot offset computation as pop().
/// \throws ItemTooLarge if sizeof(value) exceeds theMaxItemSize
283 template <class Value
>
285 OneToOneUniQueue::peek(Value
&value
) const
287 if (sizeof(value
) > theMaxItemSize
)
288 throw ItemTooLarge();
293 // the reader may pop() before we copy; making this method imprecise
294 const unsigned int pos
= (theOut
% theCapacity
) * theMaxItemSize
;
295 memcpy(&value
, theBuffer
+ pos
, sizeof(value
));
/// Copies sizeof(value) bytes of value into the slot at the input position
/// and advances theIn.
/// \returns true only if the queue was empty when sampled (wasEmpty) and
/// either no reader was given or reader->raiseSignal() returned true
/// \throws ItemTooLarge if sizeof(value) exceeds theMaxItemSize
299 template <class Value
>
301 OneToOneUniQueue::push(const Value
&value
, QueueReader
*const reader
)
303 if (sizeof(value
) > theMaxItemSize
)
304 throw ItemTooLarge();
// sample emptiness before storing: only a push into an empty queue may need a notification
309 const bool wasEmpty
= empty();
// % and * have equal precedence and associate left-to-right, so this is
// (theIn++ % theCapacity) * theMaxItemSize, matching the offset used by pop()
// NOTE(review): the modulo is done in unsigned arithmetic on a counter that
// is never reset in the visible code; see whether wrap-around past UINT_MAX
// with a non-power-of-two theCapacity needs handling -- confirm
310 const unsigned int pos
= theIn
++ % theCapacity
* theMaxItemSize
;
311 memcpy(theBuffer
+ pos
, &value
, sizeof(value
));
// a reader-less push into an empty queue always asks the caller to notify
314 return wasEmpty
&& (!reader
|| reader
->raiseSignal());
/// Writable access to the queue at index: calls the const overload on this
/// object and casts away the constness of the returned reference, so the
/// lookup logic lives in one place.
319 inline OneToOneUniQueue
&
320 OneToOneUniQueues::operator [](const int index
)
322 return const_cast<OneToOneUniQueue
&>((*const_cast<const OneToOneUniQueues
*>(this))[index
]);
/// Returns the object located immediately after this one in memory: the
/// address of this plus sizeof(*this), reinterpreted as a OneToOneUniQueue.
/// NOTE(review): assumes the first queue is laid out right behind this
/// header object with suitable alignment -- confirm against the
/// constructor and SharedMemorySize()
325 inline const OneToOneUniQueue
&
326 OneToOneUniQueues::front() const
// byte-wise pointer arithmetic: step over this header object
328 const char *const queue
=
329 reinterpret_cast<const char *>(this) + sizeof(*this);
330 return *reinterpret_cast<const OneToOneUniQueue
*>(queue
);
/// Tries the incoming queues of the remote group in round-robin order,
/// starting with the process after theLastPopProcessId and wrapping from
/// remoteGroupIdOffset() + remoteGroupSize() back to remoteGroupIdOffset().
/// At most remoteGroupSize() queues are tried per call. When a queue yields
/// an item, remoteProcessId is set to the ID of the process it came from.
/// \returns false if no remote process had anything to pop
335 template <class Value
>
337 FewToFewBiQueue::pop(int &remoteProcessId
, Value
&value
)
339 // iterate all remote group processes, starting after the one we visited last
// all incoming queues share the local process's reader state
340 QueueReader
&localReader
= reader(theLocalGroup
, theLocalProcessId
);
341 for (int i
= 0; i
< remoteGroupSize(); ++i
) {
// advance to the next remote ID, wrapping to the first one in the group
342 if (++theLastPopProcessId
>= remoteGroupIdOffset() + remoteGroupSize())
343 theLastPopProcessId
= remoteGroupIdOffset();
// the queue carrying items from that remote process to us
344 OneToOneUniQueue
&queue
= oneToOneQueue(remoteGroup(), theLastPopProcessId
, theLocalGroup
, theLocalProcessId
);
345 if (queue
.pop(value
, &localReader
)) {
346 remoteProcessId
= theLastPopProcessId
;
347 debugs(54, 7, HERE
<< "popped from " << remoteProcessId
<< " to " << theLocalProcessId
<< " at " << queue
.size());
351 return false; // no process had anything to pop
/// Pushes value into the queue that runs from the local process to
/// remoteProcessId, handing OneToOneUniQueue::push() the remote process's
/// reader state so that it can raise the reader's signal.
/// \returns the OneToOneUniQueue::push() result
354 template <class Value
>
356 FewToFewBiQueue::push(const int remoteProcessId
, const Value
&value
)
// the us-to-them queue
358 OneToOneUniQueue
&remoteQueue
= oneToOneQueue(theLocalGroup
, theLocalProcessId
, remoteGroup(), remoteProcessId
);
// the reader state of the process that will pop this item
359 QueueReader
&remoteReader
= reader(remoteGroup(), remoteProcessId
);
360 debugs(54, 7, HERE
<< "pushing from " << theLocalProcessId
<< " to " << remoteProcessId
<< " at " << remoteQueue
.size());
361 return remoteQueue
.push(value
, &remoteReader
);
/// Looks for the oldest item exchanged with remoteProcessId: the incoming
/// (them-to-us) queue is consulted first and the outgoing (us-to-them) queue
/// is used when the incoming one is empty. The remote ID is validated first
/// because the remote process may not have configured its queue end yet.
/// Relies on OneToOneUniQueue::peek(), so the result may be stale.
364 template <class Value
>
366 FewToFewBiQueue::findOldest(const int remoteProcessId
, Value
&value
) const
368 // we may be called before remote process configured its queue end
369 if (!validProcessId(remoteGroup(), remoteProcessId
))
372 // we need the oldest value, so start with the incoming, them-to-us queue:
373 const OneToOneUniQueue
&in
= inQueue(remoteProcessId
);
374 debugs(54, 2, HERE
<< "peeking from " << remoteProcessId
<< " to " <<
375 theLocalProcessId
<< " at " << in
.size());
379 // if the incoming queue is empty, check the outgoing, us-to-them queue:
380 const OneToOneUniQueue
&out
= outQueue(remoteProcessId
);
381 debugs(54, 2, HERE
<< "peeking from " << theLocalProcessId
<< " to " <<
382 remoteProcessId
<< " at " << out
.size());
383 return out
.peek(value
);
/// Walks the remote processes in the same round-robin order as pop(), but
/// on a local copy of theLastPopProcessId and using
/// OneToOneUniQueue::peek(), so neither the queues nor the pop() starting
/// point change. When a queue yields an item, remoteProcessId is set to the
/// ID of the process it came from.
/// \returns false if no queue yielded an item
386 template <class Value
>
388 FewToFewBiQueue::peek(int &remoteProcessId
, Value
&value
) const
390 // mimic FewToFewBiQueue::pop() but quit just before popping
391 int popProcessId
= theLastPopProcessId
; // preserve for future pop()
392 for (int i
= 0; i
< remoteGroupSize(); ++i
) {
// advance to the next remote ID, wrapping to the first one in the group
393 if (++popProcessId
>= remoteGroupIdOffset() + remoteGroupSize())
394 popProcessId
= remoteGroupIdOffset();
// the queue carrying items from that remote process to us
395 const OneToOneUniQueue
&queue
=
396 oneToOneQueue(remoteGroup(), popProcessId
,
397 theLocalGroup
, theLocalProcessId
);
398 if (queue
.peek(value
)) {
399 remoteProcessId
= popProcessId
;
403 return false; // most likely, no process had anything to pop
408 #endif // SQUID_IPC_QUEUE_H