6 #ifndef SQUID_IPC_QUEUE_H
7 #define SQUID_IPC_QUEUE_H
11 #include "base/InstanceId.h"
12 #include "ipc/AtomicWord.h"
13 #include "ipc/mem/Pointer.h"
21 /// State of the reading end of a queue (i.e., of the code calling pop()).
22 /// Multiple queues attached to one reader share this state.
26 QueueReader(); // the initial state is "blocked without a signal"
28 /// whether the reader is waiting for a notification signal
/// (i.e., whether popBlocked currently holds 1)
29 bool blocked() const { return popBlocked
== 1; }
31 /// marks the reader as blocked, waiting for a notification signal
/// (flips popBlocked from 0 to 1; presumably a no-op when it is already 1 --
/// confirm swap_if() semantics in ipc/AtomicWord.h)
32 void block() { popBlocked
.swap_if(0, 1); }
34 /// removes the block() effects (flips popBlocked from 1 back to 0)
35 void unblock() { popBlocked
.swap_if(1, 0); }
37 /// if reader is blocked and not notified, marks the notification signal
38 /// as sent and not received, returning true; otherwise, returns false.
/// popSignal is not touched at all when the reader is not blocked because
/// the blocked() check short-circuits the swap.
39 bool raiseSignal() { return blocked() && popSignal
.swap_if(0,1); }
41 /// marks sent reader notification as received (also removes pop blocking);
/// the block is removed first and the signal flag is reset second
42 void clearSignal() { unblock(); popSignal
.swap_if(1,0); }
45 AtomicWord popBlocked
; ///< whether the reader is blocked on pop()
46 AtomicWord popSignal
; ///< whether writer has sent and reader has not received notification
49 typedef AtomicWord Rate
; ///< pop()s per second
50 Rate rateLimit
; ///< pop()s per second limit if positive
52 // we need a signed atomic type because balance may get negative
53 typedef AtomicWordT
<int> AtomicSignedMsec
;
54 typedef AtomicSignedMsec Balance
;
55 /// how far ahead the reader is compared to a perfect read/sec event rate
58 /// unique ID for debugging which reader is used (works across processes)
59 const InstanceId
<QueueReader
> id
;
62 /// shared array of QueueReaders
/// NOTE(review): aCapacity presumably becomes theCapacity -- confirm against the definition
66 QueueReaders(const int aCapacity
);
/// NOTE(review): presumably the storage size of this object including the
/// trailing theReaders array -- confirm against the definition
67 size_t sharedMemorySize() const;
/// NOTE(review): presumably the storage size needed for the given number
/// of readers -- confirm against the definition
68 static size_t SharedMemorySize(const int capacity
);
70 const int theCapacity
; ///< number of readers
71 QueueReader theReaders
[]; ///< readers
75 * Lockless fixed-capacity queue for a single writer and a single reader.
77 * If the queue is empty, the reader is considered "blocked" and needs
78 * an out-of-band notification message to notice the next pushed item.
80 * Current implementation assumes that the writer cannot get blocked: if the
81 * queue is full, the writer will just not push and come back later (with a
82 * different value). We can add support for blocked writers if needed.
84 class OneToOneUniQueue
87 // pop() and push() exceptions; TODO: use TextException instead
/// thrown by pop(), peek(), and push() when sizeof(Value) exceeds maxItemSize()
89 class ItemTooLarge
{};
/// NOTE(review): the arguments presumably initialize theMaxItemSize and
/// theCapacity -- confirm against the definition
91 OneToOneUniQueue(const unsigned int aMaxItemSize
, const int aCapacity
);
/// maximum item size; compared against sizeof(Value) by pop(), peek(), and push()
93 unsigned int maxItemSize() const { return theMaxItemSize
; }
/// current number of items in the queue
94 int size() const { return theSize
; }
/// maximum number of items the queue can hold
95 int capacity() const { return theCapacity
; }
/// Items2Bytes() result for this queue's item size and capacity
96 int sharedMemorySize() const { return Items2Bytes(theMaxItemSize
, theCapacity
); }
/// whether the queue currently holds no items
98 bool empty() const { return !theSize
; }
/// whether the queue currently holds capacity() items
99 bool full() const { return theSize
== theCapacity
; }
/// NOTE(review): presumably the number of maxItemSize-sized items that fit
/// into a memory area of the given byte size -- confirm against the definition
101 static int Bytes2Items(const unsigned int maxItemSize
, int size
);
/// NOTE(review): presumably the memory size needed for a queue holding the
/// given number of maxItemSize-sized items -- confirm against the definition
102 static int Items2Bytes(const unsigned int maxItemSize
, const int size
);
104 /// returns true iff the value was set; [un]blocks the reader as needed
/// throws ItemTooLarge if Value does not fit into a queue item
105 template<class Value
> bool pop(Value
&value
, QueueReader
*const reader
= NULL
);
107 /// returns true iff the caller must notify the reader of the pushed item
/// throws ItemTooLarge if Value does not fit into a queue item
108 template<class Value
> bool push(const Value
&value
, QueueReader
*const reader
= NULL
);
110 /// returns true iff the value was set; the value may be stale!
/// throws ItemTooLarge if Value does not fit into a queue item
111 template<class Value
> bool peek(Value
&value
) const;
115 unsigned int theIn
; ///< input index, used only in push()
116 unsigned int theOut
; ///< output index, advanced only in pop(); peek() also reads it
118 AtomicWord theSize
; ///< number of items in the queue
119 const unsigned int theMaxItemSize
; ///< maximum item size
120 const int theCapacity
; ///< maximum number of items, i.e. theBuffer size
125 /// shared array of OneToOneUniQueues
126 class OneToOneUniQueues
/// NOTE(review): presumably builds aCapacity queues, each created with the
/// given maxItemSize and queueCapacity -- confirm against the definition
129 OneToOneUniQueues(const int aCapacity
, const unsigned int maxItemSize
, const int queueCapacity
);
/// NOTE(review): presumably the storage size of this object together with
/// its queues -- confirm against the definition
131 size_t sharedMemorySize() const;
/// NOTE(review): presumably the storage size needed for an array built
/// with the same three arguments -- confirm against the definition
132 static size_t SharedMemorySize(const int capacity
, const unsigned int maxItemSize
, const int queueCapacity
);
/// read-only access to the queue at the given index
134 const OneToOneUniQueue
&operator [](const int index
) const;
/// writable access to the queue at the given index;
/// implemented on top of the const overload
135 inline OneToOneUniQueue
&operator [](const int index
);
/// the queue located in memory immediately after this object
138 inline const OneToOneUniQueue
&front() const;
141 const int theCapacity
; ///< number of OneToOneUniQueues
145 * Lockless fixed-capacity bidirectional queue for a limited number
146 * of processes. Allows communication between two groups of processes:
147 * any process in one group may send data to and receive from any
148 * process in the other group, but processes in the same group cannot
149 * communicate. Each process in a group has a unique integer ID in
150 * the [groupIdOffset, groupIdOffset + groupSize) range.
152 class FewToFewBiQueue
// exception types are shared with OneToOneUniQueue
155 typedef OneToOneUniQueue::Full Full
;
156 typedef OneToOneUniQueue::ItemTooLarge ItemTooLarge
;
159 /// Shared metadata for FewToFewBiQueue:
/// the size and the process ID offset of each of the two groups
161 Metadata(const int aGroupASize
, const int aGroupAIdOffset
, const int aGroupBSize
, const int aGroupBIdOffset
);
/// the object has a fixed size, regardless of the group parameters
162 size_t sharedMemorySize() const { return sizeof(*this); }
/// the four parameters are ignored; they match the constructor argument list
163 static size_t SharedMemorySize(const int, const int, const int, const int) { return sizeof(Metadata
); }
165 const int theGroupASize
; ///< size of group A
166 const int theGroupAIdOffset
; ///< process ID offset of group A
167 const int theGroupBSize
; ///< size of group B
168 const int theGroupBIdOffset
; ///< process ID offset of group B
/// NOTE(review): presumably creates the shared metadata, queues, and readers
/// kept in the three *Owner members below -- confirm against the definition
175 Owner(const String
&id
, const int groupASize
, const int groupAIdOffset
, const int groupBSize
, const int groupBIdOffset
, const unsigned int maxItemSize
, const int capacity
);
// one owner per shared object type used by FewToFewBiQueue
179 Mem::Owner
<Metadata
> *const metadataOwner
;
180 Mem::Owner
<OneToOneUniQueues
> *const queuesOwner
;
181 Mem::Owner
<QueueReaders
> *const readersOwner
;
/// takes the same arguments as the Owner constructor
/// NOTE(review): presumably returns a newly created Owner that the caller
/// is responsible for -- confirm against the definition
184 static Owner
*Init(const String
&id
, const int groupASize
, const int groupAIdOffset
, const int groupBSize
, const int groupBIdOffset
, const unsigned int maxItemSize
, const int capacity
);
/// identifies one of the two communicating process groups
186 enum Group
{ groupA
= 0, groupB
= 1 };
/// NOTE(review): aLocalGroup and aLocalProcessId presumably identify the
/// calling process; id presumably names the shared segments -- confirm
187 FewToFewBiQueue(const String
&id
, const Group aLocalGroup
, const int aLocalProcessId
);
189 /// maximum number of items in the queue
190 static int MaxItemsCount(const int groupASize
, const int groupBSize
, const int capacity
);
/// the group this queue end belongs to
192 Group
localGroup() const { return theLocalGroup
; }
/// the group on the other side: groupB if we are in groupA and vice versa
193 Group
remoteGroup() const { return theLocalGroup
== groupA
? groupB
: groupA
; }
195 /// clears the reader notification received by the local process from the remote process
196 void clearReaderSignal(const int remoteProcessId
);
198 /// picks a process and calls OneToOneUniQueue::pop() using its queue;
/// on success, sets remoteProcessId to the ID of the picked process
199 template <class Value
> bool pop(int &remoteProcessId
, Value
&value
);
201 /// calls OneToOneUniQueue::push() using the given process queue
/// and that process reader; returns what that push() call returned
202 template <class Value
> bool push(const int remoteProcessId
, const Value
&value
);
204 /// finds the oldest item in incoming and outgoing queues between
205 /// us and the given remote process;
/// the incoming queue is checked before the outgoing one
206 template<class Value
> bool findOldest(const int remoteProcessId
, Value
&value
) const;
208 /// peeks at the item likely to be pop()ed next
/// without changing which process the next pop() starts with
209 template<class Value
> bool peek(int &remoteProcessId
, Value
&value
) const;
211 /// returns local reader's balance
212 QueueReader::Balance
&localBalance();
214 /// returns reader's balance for a given remote process
215 const QueueReader::Balance
&balance(const int remoteProcessId
) const;
217 /// returns local reader's rate limit
218 QueueReader::Rate
&localRateLimit();
220 /// returns reader's rate limit for a given remote process
221 const QueueReader::Rate
&rateLimit(const int remoteProcessId
) const;
223 /// number of items in incoming queue from a given remote process
224 int inSize(const int remoteProcessId
) const { return inQueue(remoteProcessId
).size(); }
226 /// number of items in outgoing queue to a given remote process
227 int outSize(const int remoteProcessId
) const { return outQueue(remoteProcessId
).size(); }
/// NOTE(review): presumably checks that processId falls into the ID range
/// of the given group -- confirm against the definition
230 bool validProcessId(const Group group
, const int processId
) const;
/// NOTE(review): presumably the position, within the shared queues array, of
/// the queue going from the first process to the second -- confirm
231 int oneToOneQueueIndex(const Group fromGroup
, const int fromProcessId
, const Group toGroup
, const int toProcessId
) const;
/// the unidirectional queue used by the "from" process to send to the "to" process
232 const OneToOneUniQueue
&oneToOneQueue(const Group fromGroup
, const int fromProcessId
, const Group toGroup
, const int toProcessId
) const;
/// writable variant of the above
233 OneToOneUniQueue
&oneToOneQueue(const Group fromGroup
, const int fromProcessId
, const Group toGroup
, const int toProcessId
);
/// the incoming (them-to-us) queue for the given remote process
234 const OneToOneUniQueue
&inQueue(const int remoteProcessId
) const;
/// the outgoing (us-to-them) queue for the given remote process
235 const OneToOneUniQueue
&outQueue(const int remoteProcessId
) const;
/// the reader state of the given process
236 QueueReader
&reader(const Group group
, const int processId
);
237 const QueueReader
&reader(const Group group
, const int processId
) const;
/// NOTE(review): presumably the position of the given process reader within
/// the shared readers array -- confirm against the definition
238 int readerIndex(const Group group
, const int processId
) const;
/// size of the group we are not a member of
239 int remoteGroupSize() const { return theLocalGroup
== groupA
? metadata
->theGroupBSize
: metadata
->theGroupASize
; }
/// process ID offset of the group we are not a member of
240 int remoteGroupIdOffset() const { return theLocalGroup
== groupA
? metadata
->theGroupBIdOffset
: metadata
->theGroupAIdOffset
; }
243 const Mem::Pointer
<Metadata
> metadata
; ///< shared metadata
244 const Mem::Pointer
<OneToOneUniQueues
> queues
; ///< unidirectional one-to-one queues
245 const Mem::Pointer
<QueueReaders
> readers
; ///< readers array
247 const Group theLocalGroup
; ///< group of this queue
248 const int theLocalProcessId
; ///< process ID of this queue
249 int theLastPopProcessId
; ///< the ID of the last process we tried to pop() from
/// Copies the item stored at the output position into value and advances
/// the output index. Throws ItemTooLarge when Value is bigger than a queue item.
255 template <class Value
>
257 OneToOneUniQueue::pop(Value
&value
, QueueReader
*const reader
)
// refuse types that do not fit into a single queue item
259 if (sizeof(value
) > theMaxItemSize
)
260 throw ItemTooLarge();
262 // A writer might push between the empty test and block() below, so we do
263 // not return false right after calling block(), but test again.
269 // A writer might push between the empty test and block() below,
270 // so we must test again as such a writer will not signal us.
// each item occupies theMaxItemSize bytes of theBuffer; the modulo maps the
// ever-growing output index to an item position within the buffer
278 const unsigned int pos
= (theOut
++ % theCapacity
) * theMaxItemSize
;
// only sizeof(value) bytes are copied, even if the item slot is larger
279 memcpy(&value
, theBuffer
+ pos
, sizeof(value
));
/// Copies the item stored at the output position into value without
/// advancing the output index. Throws ItemTooLarge when Value is bigger
/// than a queue item.
285 template <class Value
>
287 OneToOneUniQueue::peek(Value
&value
) const
// refuse types that do not fit into a single queue item
289 if (sizeof(value
) > theMaxItemSize
)
290 throw ItemTooLarge();
295 // the reader may pop() before we copy; making this method imprecise
// same position computation as in pop(), but theOut is left unchanged
296 const unsigned int pos
= (theOut
% theCapacity
) * theMaxItemSize
;
297 memcpy(&value
, theBuffer
+ pos
, sizeof(value
));
/// Copies value into the item at the input position and advances the input
/// index. Throws ItemTooLarge when Value is bigger than a queue item.
/// Returns true only if the queue was empty before this push and, when a
/// reader is given, that reader's raiseSignal() call returned true.
301 template <class Value
>
303 OneToOneUniQueue::push(const Value
&value
, QueueReader
*const reader
)
// refuse types that do not fit into a single queue item
305 if (sizeof(value
) > theMaxItemSize
)
306 throw ItemTooLarge();
// remember the emptiness state as it was before the new item is stored
311 const bool wasEmpty
= empty();
// % and * have equal precedence and associate left-to-right, so this is
// (theIn++ % theCapacity) * theMaxItemSize, mirroring the pop() computation
312 const unsigned int pos
= theIn
++ % theCapacity
* theMaxItemSize
;
313 memcpy(theBuffer
+ pos
, &value
, sizeof(value
));
// without a reader, a push into an empty queue always asks for a notification
316 return wasEmpty
&& (!reader
|| reader
->raiseSignal());
322 inline OneToOneUniQueue
&
323 OneToOneUniQueues::operator [](const int index
)
325 return const_cast<OneToOneUniQueue
&>((*const_cast<const OneToOneUniQueues
*>(this))[index
]);
328 inline const OneToOneUniQueue
&
329 OneToOneUniQueues::front() const
331 const char *const queue
=
332 reinterpret_cast<const char *>(this) + sizeof(*this);
333 return *reinterpret_cast<const OneToOneUniQueue
*>(queue
);
/// Tries the incoming queue of each remote group process, at most once per
/// process, resuming after the process tried last. On a successful
/// OneToOneUniQueue::pop(), stores the ID of that process in remoteProcessId.
/// Returns false if no queue yielded an item.
339 template <class Value
>
341 FewToFewBiQueue::pop(int &remoteProcessId
, Value
&value
)
343 // iterate all remote group processes, starting after the one we visited last
// all incoming queues are popped using the same, local reader state
344 QueueReader
&localReader
= reader(theLocalGroup
, theLocalProcessId
);
345 for (int i
= 0; i
< remoteGroupSize(); ++i
) {
// advance to the next remote process ID, wrapping around to the first
// ID of the remote group after the last one
346 if (++theLastPopProcessId
>= remoteGroupIdOffset() + remoteGroupSize())
347 theLastPopProcessId
= remoteGroupIdOffset();
// the queue going from that remote process to us
348 OneToOneUniQueue
&queue
= oneToOneQueue(remoteGroup(), theLastPopProcessId
, theLocalGroup
, theLocalProcessId
);
349 if (queue
.pop(value
, &localReader
)) {
350 remoteProcessId
= theLastPopProcessId
;
351 debugs(54, 7, HERE
<< "popped from " << remoteProcessId
<< " to " << theLocalProcessId
<< " at " << queue
.size());
355 return false; // no process had anything to pop
358 template <class Value
>
360 FewToFewBiQueue::push(const int remoteProcessId
, const Value
&value
)
362 OneToOneUniQueue
&remoteQueue
= oneToOneQueue(theLocalGroup
, theLocalProcessId
, remoteGroup(), remoteProcessId
);
363 QueueReader
&remoteReader
= reader(remoteGroup(), remoteProcessId
);
364 debugs(54, 7, HERE
<< "pushing from " << theLocalProcessId
<< " to " << remoteProcessId
<< " at " << remoteQueue
.size());
365 return remoteQueue
.push(value
, &remoteReader
);
/// Looks for the oldest item exchanged with the given remote process:
/// the incoming queue is consulted first and the outgoing queue second.
/// The final result is that of peek() on the outgoing queue.
368 template <class Value
>
370 FewToFewBiQueue::findOldest(const int remoteProcessId
, Value
&value
) const
372 // we may be called before remote process configured its queue end
373 if (!validProcessId(remoteGroup(), remoteProcessId
))
376 // we need the oldest value, so start with the incoming, them-to-us queue:
377 const OneToOneUniQueue
&in
= inQueue(remoteProcessId
);
378 debugs(54, 2, HERE
<< "peeking from " << remoteProcessId
<< " to " <<
379 theLocalProcessId
<< " at " << in
.size());
383 // if the incoming queue is empty, check the outgoing, us-to-them queue:
384 const OneToOneUniQueue
&out
= outQueue(remoteProcessId
);
385 debugs(54, 2, HERE
<< "peeking from " << theLocalProcessId
<< " to " <<
386 remoteProcessId
<< " at " << out
.size());
387 return out
.peek(value
);
/// Walks the incoming queues in the same order as pop() would, using a local
/// copy of theLastPopProcessId so that the member itself is not modified.
/// On a successful OneToOneUniQueue::peek(), stores the ID of the peeked
/// process in remoteProcessId. Returns false if no queue yielded an item.
390 template <class Value
>
392 FewToFewBiQueue::peek(int &remoteProcessId
, Value
&value
) const
394 // mimic FewToFewBiQueue::pop() but quit just before popping
395 int popProcessId
= theLastPopProcessId
; // preserve for future pop()
396 for (int i
= 0; i
< remoteGroupSize(); ++i
) {
// advance to the next remote process ID, wrapping around to the first
// ID of the remote group after the last one
397 if (++popProcessId
>= remoteGroupIdOffset() + remoteGroupSize())
398 popProcessId
= remoteGroupIdOffset();
// the queue going from that remote process to us
399 const OneToOneUniQueue
&queue
=
400 oneToOneQueue(remoteGroup(), popProcessId
,
401 theLocalGroup
, theLocalProcessId
);
402 if (queue
.peek(value
)) {
403 remoteProcessId
= popProcessId
;
407 return false; // most likely, no process had anything to pop
412 #endif // SQUID_IPC_QUEUE_H