]> git.ipfire.org Git - thirdparty/squid.git/blob - src/ipc/Queue.h
Provide "fake" AtomicWordT implementation for non-SMP configurations.
[thirdparty/squid.git] / src / ipc / Queue.h
1 /*
2 * $Id$
3 *
4 */
5
6 #ifndef SQUID_IPC_QUEUE_H
7 #define SQUID_IPC_QUEUE_H
8
9 #include "Array.h"
10 #include "Debug.h"
11 #include "base/InstanceId.h"
12 #include "ipc/AtomicWord.h"
13 #include "ipc/mem/Pointer.h"
14 #include "util.h"
15
16 class String;
17
18 namespace Ipc
19 {
20
21 /// State of the reading end of a queue (i.e., of the code calling pop()).
22 /// Multiple queues attached to one reader share this state.
23 class QueueReader
24 {
25 public:
26 QueueReader(); // the initial state is "blocked without a signal"
27
28 /// whether the reader is waiting for a notification signal
29 bool blocked() const { return popBlocked == 1; }
30
31 /// marks the reader as blocked, waiting for a notification signal
32 void block() { popBlocked.swap_if(0, 1); }
33
34 /// removes the block() effects
35 void unblock() { popBlocked.swap_if(1, 0); }
36
37 /// if reader is blocked and not notified, marks the notification signal
38 /// as sent and not received, returning true; otherwise, returns false
39 bool raiseSignal() { return blocked() && popSignal.swap_if(0,1); }
40
41 /// marks sent reader notification as received (also removes pop blocking)
42 void clearSignal() { unblock(); popSignal.swap_if(1,0); }
43
44 private:
45 Atomic::Word popBlocked; ///< whether the reader is blocked on pop()
46 Atomic::Word popSignal; ///< whether writer has sent and reader has not received notification
47
48 public:
49 typedef Atomic::Word Rate; ///< pop()s per second
50 Rate rateLimit; ///< pop()s per second limit if positive
51
52 // we need a signed atomic type because balance may get negative
53 typedef Atomic::WordT<int> AtomicSignedMsec;
54 typedef AtomicSignedMsec Balance;
55 /// how far ahead the reader is compared to a perfect read/sec event rate
56 Balance balance;
57
58 /// unique ID for debugging which reader is used (works across processes)
59 const InstanceId<QueueReader> id;
60 };
61
62 /// shared array of QueueReaders
63 class QueueReaders
64 {
65 public:
66 QueueReaders(const int aCapacity);
67 size_t sharedMemorySize() const;
68 static size_t SharedMemorySize(const int capacity);
69
70 const int theCapacity; /// number of readers
71 QueueReader theReaders[]; /// readers
72 };
73
74 /**
75 * Lockless fixed-capacity queue for a single writer and a single reader.
76 *
77 * If the queue is empty, the reader is considered "blocked" and needs
78 * an out-of-band notification message to notice the next pushed item.
79 *
80 * Current implementation assumes that the writer cannot get blocked: if the
81 * queue is full, the writer will just not push and come back later (with a
82 * different value). We can add support for blocked writers if needed.
83 */
84 class OneToOneUniQueue
85 {
86 public:
87 // pop() and push() exceptions; TODO: use TextException instead
88 class Full {};
89 class ItemTooLarge {};
90
91 OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity);
92
93 unsigned int maxItemSize() const { return theMaxItemSize; }
94 int size() const { return theSize; }
95 int capacity() const { return theCapacity; }
96 int sharedMemorySize() const { return Items2Bytes(theMaxItemSize, theCapacity); }
97
98 bool empty() const { return !theSize; }
99 bool full() const { return theSize == theCapacity; }
100
101 static int Bytes2Items(const unsigned int maxItemSize, int size);
102 static int Items2Bytes(const unsigned int maxItemSize, const int size);
103
104 /// returns true iff the value was set; [un]blocks the reader as needed
105 template<class Value> bool pop(Value &value, QueueReader *const reader = NULL);
106
107 /// returns true iff the caller must notify the reader of the pushed item
108 template<class Value> bool push(const Value &value, QueueReader *const reader = NULL);
109
110 /// returns true iff the value was set; the value may be stale!
111 template<class Value> bool peek(Value &value) const;
112
113 private:
114
115 unsigned int theIn; ///< input index, used only in push()
116 unsigned int theOut; ///< output index, used only in pop()
117
118 Atomic::Word theSize; ///< number of items in the queue
119 const unsigned int theMaxItemSize; ///< maximum item size
120 const int theCapacity; ///< maximum number of items, i.e. theBuffer size
121
122 char theBuffer[];
123 };
124
125 /// shared array of OneToOneUniQueues
126 class OneToOneUniQueues
127 {
128 public:
129 OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity);
130
131 size_t sharedMemorySize() const;
132 static size_t SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity);
133
134 const OneToOneUniQueue &operator [](const int index) const;
135 inline OneToOneUniQueue &operator [](const int index);
136
137 private:
138 inline const OneToOneUniQueue &front() const;
139
140 public:
141 const int theCapacity; /// number of OneToOneUniQueues
142 };
143
144 /**
145 * Lockless fixed-capacity bidirectional queue for a limited number
146 * processes. Allows communication between two groups of processes:
147 * any process in one group may send data to and receive from any
148 * process in another group, but processes in the same group can not
149 * communicate. Process in each group has a unique integer ID in
150 * [groupIdOffset, groupIdOffset + groupSize) range.
151 */
152 class FewToFewBiQueue
153 {
154 public:
155 typedef OneToOneUniQueue::Full Full;
156 typedef OneToOneUniQueue::ItemTooLarge ItemTooLarge;
157
158 private:
159 /// Shared metadata for FewToFewBiQueue
160 struct Metadata {
161 Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset);
162 size_t sharedMemorySize() const { return sizeof(*this); }
163 static size_t SharedMemorySize(const int, const int, const int, const int) { return sizeof(Metadata); }
164
165 const int theGroupASize;
166 const int theGroupAIdOffset;
167 const int theGroupBSize;
168 const int theGroupBIdOffset;
169 };
170
171 public:
172 class Owner
173 {
174 public:
175 Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity);
176 ~Owner();
177
178 private:
179 Mem::Owner<Metadata> *const metadataOwner;
180 Mem::Owner<OneToOneUniQueues> *const queuesOwner;
181 Mem::Owner<QueueReaders> *const readersOwner;
182 };
183
184 static Owner *Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity);
185
186 enum Group { groupA = 0, groupB = 1 };
187 FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId);
188
189 /// maximum number of items in the queue
190 static int MaxItemsCount(const int groupASize, const int groupBSize, const int capacity);
191
192 Group localGroup() const { return theLocalGroup; }
193 Group remoteGroup() const { return theLocalGroup == groupA ? groupB : groupA; }
194
195 /// clears the reader notification received by the local process from the remote process
196 void clearReaderSignal(const int remoteProcessId);
197
198 /// picks a process and calls OneToOneUniQueue::pop() using its queue
199 template <class Value> bool pop(int &remoteProcessId, Value &value);
200
201 /// calls OneToOneUniQueue::push() using the given process queue
202 template <class Value> bool push(const int remoteProcessId, const Value &value);
203
204 /// finds the oldest item in incoming and outgoing queues between
205 /// us and the given remote process
206 template<class Value> bool findOldest(const int remoteProcessId, Value &value) const;
207
208 /// peeks at the item likely to be pop()ed next
209 template<class Value> bool peek(int &remoteProcessId, Value &value) const;
210
211 /// returns local reader's balance
212 QueueReader::Balance &localBalance();
213
214 /// returns reader's balance for a given remote process
215 const QueueReader::Balance &balance(const int remoteProcessId) const;
216
217 /// returns local reader's rate limit
218 QueueReader::Rate &localRateLimit();
219
220 /// returns reader's rate limit for a given remote process
221 const QueueReader::Rate &rateLimit(const int remoteProcessId) const;
222
223 /// number of items in incoming queue from a given remote process
224 int inSize(const int remoteProcessId) const { return inQueue(remoteProcessId).size(); }
225
226 /// number of items in outgoing queue to a given remote process
227 int outSize(const int remoteProcessId) const { return outQueue(remoteProcessId).size(); }
228
229 private:
230 bool validProcessId(const Group group, const int processId) const;
231 int oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const;
232 const OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const;
233 OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId);
234 const OneToOneUniQueue &inQueue(const int remoteProcessId) const;
235 const OneToOneUniQueue &outQueue(const int remoteProcessId) const;
236 QueueReader &reader(const Group group, const int processId);
237 const QueueReader &reader(const Group group, const int processId) const;
238 int readerIndex(const Group group, const int processId) const;
239 int remoteGroupSize() const { return theLocalGroup == groupA ? metadata->theGroupBSize : metadata->theGroupASize; }
240 int remoteGroupIdOffset() const { return theLocalGroup == groupA ? metadata->theGroupBIdOffset : metadata->theGroupAIdOffset; }
241
242 private:
243 const Mem::Pointer<Metadata> metadata; ///< shared metadata
244 const Mem::Pointer<OneToOneUniQueues> queues; ///< unidirection one-to-one queues
245 const Mem::Pointer<QueueReaders> readers; ///< readers array
246
247 const Group theLocalGroup; ///< group of this queue
248 const int theLocalProcessId; ///< process ID of this queue
249 int theLastPopProcessId; ///< the ID of the last process we tried to pop() from
250 };
251
252
253 // OneToOneUniQueue
254
255 template <class Value>
256 bool
257 OneToOneUniQueue::pop(Value &value, QueueReader *const reader)
258 {
259 if (sizeof(value) > theMaxItemSize)
260 throw ItemTooLarge();
261
262 // A writer might push between the empty test and block() below, so we do
263 // not return false right after calling block(), but test again.
264 if (empty()) {
265 if (!reader)
266 return false;
267
268 reader->block();
269 // A writer might push between the empty test and block() below,
270 // so we must test again as such a writer will not signal us.
271 if (empty())
272 return false;
273 }
274
275 if (reader)
276 reader->unblock();
277
278 const unsigned int pos = (theOut++ % theCapacity) * theMaxItemSize;
279 memcpy(&value, theBuffer + pos, sizeof(value));
280 --theSize;
281
282 return true;
283 }
284
285 template <class Value>
286 bool
287 OneToOneUniQueue::peek(Value &value) const
288 {
289 if (sizeof(value) > theMaxItemSize)
290 throw ItemTooLarge();
291
292 if (empty())
293 return false;
294
295 // the reader may pop() before we copy; making this method imprecise
296 const unsigned int pos = (theOut % theCapacity) * theMaxItemSize;
297 memcpy(&value, theBuffer + pos, sizeof(value));
298 return true;
299 }
300
301 template <class Value>
302 bool
303 OneToOneUniQueue::push(const Value &value, QueueReader *const reader)
304 {
305 if (sizeof(value) > theMaxItemSize)
306 throw ItemTooLarge();
307
308 if (full())
309 throw Full();
310
311 const bool wasEmpty = empty();
312 const unsigned int pos = theIn++ % theCapacity * theMaxItemSize;
313 memcpy(theBuffer + pos, &value, sizeof(value));
314 ++theSize;
315
316 return wasEmpty && (!reader || reader->raiseSignal());
317 }
318
319
320 // OneToOneUniQueues
321
322 inline OneToOneUniQueue &
323 OneToOneUniQueues::operator [](const int index)
324 {
325 return const_cast<OneToOneUniQueue &>((*const_cast<const OneToOneUniQueues *>(this))[index]);
326 }
327
328 inline const OneToOneUniQueue &
329 OneToOneUniQueues::front() const
330 {
331 const char *const queue =
332 reinterpret_cast<const char *>(this) + sizeof(*this);
333 return *reinterpret_cast<const OneToOneUniQueue *>(queue);
334 }
335
336
337 // FewToFewBiQueue
338
339 template <class Value>
340 bool
341 FewToFewBiQueue::pop(int &remoteProcessId, Value &value)
342 {
343 // iterate all remote group processes, starting after the one we visited last
344 QueueReader &localReader = reader(theLocalGroup, theLocalProcessId);
345 for (int i = 0; i < remoteGroupSize(); ++i) {
346 if (++theLastPopProcessId >= remoteGroupIdOffset() + remoteGroupSize())
347 theLastPopProcessId = remoteGroupIdOffset();
348 OneToOneUniQueue &queue = oneToOneQueue(remoteGroup(), theLastPopProcessId, theLocalGroup, theLocalProcessId);
349 if (queue.pop(value, &localReader)) {
350 remoteProcessId = theLastPopProcessId;
351 debugs(54, 7, HERE << "popped from " << remoteProcessId << " to " << theLocalProcessId << " at " << queue.size());
352 return true;
353 }
354 }
355 return false; // no process had anything to pop
356 }
357
358 template <class Value>
359 bool
360 FewToFewBiQueue::push(const int remoteProcessId, const Value &value)
361 {
362 OneToOneUniQueue &remoteQueue = oneToOneQueue(theLocalGroup, theLocalProcessId, remoteGroup(), remoteProcessId);
363 QueueReader &remoteReader = reader(remoteGroup(), remoteProcessId);
364 debugs(54, 7, HERE << "pushing from " << theLocalProcessId << " to " << remoteProcessId << " at " << remoteQueue.size());
365 return remoteQueue.push(value, &remoteReader);
366 }
367
368 template <class Value>
369 bool
370 FewToFewBiQueue::findOldest(const int remoteProcessId, Value &value) const
371 {
372 // we may be called before remote process configured its queue end
373 if (!validProcessId(remoteGroup(), remoteProcessId))
374 return false;
375
376 // we need the oldest value, so start with the incoming, them-to-us queue:
377 const OneToOneUniQueue &in = inQueue(remoteProcessId);
378 debugs(54, 2, HERE << "peeking from " << remoteProcessId << " to " <<
379 theLocalProcessId << " at " << in.size());
380 if (in.peek(value))
381 return true;
382
383 // if the incoming queue is empty, check the outgoing, us-to-them queue:
384 const OneToOneUniQueue &out = outQueue(remoteProcessId);
385 debugs(54, 2, HERE << "peeking from " << theLocalProcessId << " to " <<
386 remoteProcessId << " at " << out.size());
387 return out.peek(value);
388 }
389
390 template <class Value>
391 bool
392 FewToFewBiQueue::peek(int &remoteProcessId, Value &value) const
393 {
394 // mimic FewToFewBiQueue::pop() but quit just before popping
395 int popProcessId = theLastPopProcessId; // preserve for future pop()
396 for (int i = 0; i < remoteGroupSize(); ++i) {
397 if (++popProcessId >= remoteGroupIdOffset() + remoteGroupSize())
398 popProcessId = remoteGroupIdOffset();
399 const OneToOneUniQueue &queue =
400 oneToOneQueue(remoteGroup(), popProcessId,
401 theLocalGroup, theLocalProcessId);
402 if (queue.peek(value)) {
403 remoteProcessId = popProcessId;
404 return true;
405 }
406 }
407 return false; // most likely, no process had anything to pop
408 }
409
410 } // namespace Ipc
411
412 #endif // SQUID_IPC_QUEUE_H