]> git.ipfire.org Git - thirdparty/squid.git/blob - src/ipc/Queue.h
Fixed ipc/Queue notification race leading to stuck, overflowing queues.
[thirdparty/squid.git] / src / ipc / Queue.h
1 /*
2 */
3
4 #ifndef SQUID_IPC_QUEUE_H
5 #define SQUID_IPC_QUEUE_H
6
7 #include "Array.h"
8 #include "Debug.h"
9 #include "base/InstanceId.h"
10 #include "ipc/AtomicWord.h"
11 #include "ipc/mem/FlexibleArray.h"
12 #include "ipc/mem/Pointer.h"
13 #include "util.h"
14
15 class String;
16
17 namespace Ipc
18 {
19
20 /// State of the reading end of a queue (i.e., of the code calling pop()).
21 /// Multiple queues attached to one reader share this state.
22 class QueueReader
23 {
24 public:
25 QueueReader(); // the initial state is "blocked without a signal"
26
27 /// whether the reader is waiting for a notification signal
28 bool blocked() const { return popBlocked == 1; }
29
30 /// marks the reader as blocked, waiting for a notification signal
31 void block() { popBlocked.swap_if(0, 1); }
32
33 /// removes the block() effects
34 void unblock() { popBlocked.swap_if(1, 0); }
35
36 /// if reader is blocked and not notified, marks the notification signal
37 /// as sent and not received, returning true; otherwise, returns false
38 bool raiseSignal() { return blocked() && popSignal.swap_if(0,1); }
39
40 /// marks sent reader notification as received (also removes pop blocking)
41 void clearSignal() { unblock(); popSignal.swap_if(1,0); }
42
43 private:
44 Atomic::Word popBlocked; ///< whether the reader is blocked on pop()
45 Atomic::Word popSignal; ///< whether writer has sent and reader has not received notification
46
47 public:
48 typedef Atomic::Word Rate; ///< pop()s per second
49 Rate rateLimit; ///< pop()s per second limit if positive
50
51 // we need a signed atomic type because balance may get negative
52 typedef Atomic::WordT<int> AtomicSignedMsec;
53 typedef AtomicSignedMsec Balance;
54 /// how far ahead the reader is compared to a perfect read/sec event rate
55 Balance balance;
56
57 /// unique ID for debugging which reader is used (works across processes)
58 const InstanceId<QueueReader> id;
59 };
60
61 /// shared array of QueueReaders
62 class QueueReaders
63 {
64 public:
65 QueueReaders(const int aCapacity);
66 size_t sharedMemorySize() const;
67 static size_t SharedMemorySize(const int capacity);
68
69 const int theCapacity; /// number of readers
70 Ipc::Mem::FlexibleArray<QueueReader> theReaders; /// readers
71 };
72
73 /**
74 * Lockless fixed-capacity queue for a single writer and a single reader.
75 *
76 * If the queue is empty, the reader is considered "blocked" and needs
77 * an out-of-band notification message to notice the next pushed item.
78 *
79 * Current implementation assumes that the writer cannot get blocked: if the
80 * queue is full, the writer will just not push and come back later (with a
81 * different value). We can add support for blocked writers if needed.
82 */
83 class OneToOneUniQueue
84 {
85 public:
86 // pop() and push() exceptions; TODO: use TextException instead
87 class Full {};
88 class ItemTooLarge {};
89
90 OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity);
91
92 unsigned int maxItemSize() const { return theMaxItemSize; }
93 int size() const { return theSize; }
94 int capacity() const { return theCapacity; }
95 int sharedMemorySize() const { return Items2Bytes(theMaxItemSize, theCapacity); }
96
97 bool empty() const { return !theSize; }
98 bool full() const { return theSize == theCapacity; }
99
100 static int Bytes2Items(const unsigned int maxItemSize, int size);
101 static int Items2Bytes(const unsigned int maxItemSize, const int size);
102
103 /// returns true iff the value was set; [un]blocks the reader as needed
104 template<class Value> bool pop(Value &value, QueueReader *const reader = NULL);
105
106 /// returns true iff the caller must notify the reader of the pushed item
107 template<class Value> bool push(const Value &value, QueueReader *const reader = NULL);
108
109 /// returns true iff the value was set; the value may be stale!
110 template<class Value> bool peek(Value &value) const;
111
112 private:
113
114 unsigned int theIn; ///< input index, used only in push()
115 unsigned int theOut; ///< output index, used only in pop()
116
117 Atomic::Word theSize; ///< number of items in the queue
118 const unsigned int theMaxItemSize; ///< maximum item size
119 const int theCapacity; ///< maximum number of items, i.e. theBuffer size
120
121 char theBuffer[];
122 };
123
124 /// shared array of OneToOneUniQueues
125 class OneToOneUniQueues
126 {
127 public:
128 OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity);
129
130 size_t sharedMemorySize() const;
131 static size_t SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity);
132
133 const OneToOneUniQueue &operator [](const int index) const;
134 inline OneToOneUniQueue &operator [](const int index);
135
136 private:
137 inline const OneToOneUniQueue &front() const;
138
139 public:
140 const int theCapacity; /// number of OneToOneUniQueues
141 };
142
143 /**
144 * Base class for lockless fixed-capacity bidirectional queues for a
145 * limited number processes.
146 */
147 class BaseMultiQueue
148 {
149 public:
150 BaseMultiQueue(const int aLocalProcessId);
151
152 /// clears the reader notification received by the local process from the remote process
153 void clearReaderSignal(const int remoteProcessId);
154
155 /// picks a process and calls OneToOneUniQueue::pop() using its queue
156 template <class Value> bool pop(int &remoteProcessId, Value &value);
157
158 /// calls OneToOneUniQueue::push() using the given process queue
159 template <class Value> bool push(const int remoteProcessId, const Value &value);
160
161 /// peeks at the item likely to be pop()ed next
162 template<class Value> bool peek(int &remoteProcessId, Value &value) const;
163
164 /// returns local reader's balance
165 QueueReader::Balance &localBalance() { return localReader().balance; }
166
167 /// returns reader's balance for a given remote process
168 const QueueReader::Balance &balance(const int remoteProcessId) const;
169
170 /// returns local reader's rate limit
171 QueueReader::Rate &localRateLimit() { return localReader().rateLimit; }
172
173 /// returns reader's rate limit for a given remote process
174 const QueueReader::Rate &rateLimit(const int remoteProcessId) const;
175
176 /// number of items in incoming queue from a given remote process
177 int inSize(const int remoteProcessId) const { return inQueue(remoteProcessId).size(); }
178
179 /// number of items in outgoing queue to a given remote process
180 int outSize(const int remoteProcessId) const { return outQueue(remoteProcessId).size(); }
181
182 protected:
183 /// incoming queue from a given remote process
184 virtual const OneToOneUniQueue &inQueue(const int remoteProcessId) const = 0;
185 OneToOneUniQueue &inQueue(const int remoteProcessId);
186
187 /// outgoing queue to a given remote process
188 virtual const OneToOneUniQueue &outQueue(const int remoteProcessId) const = 0;
189 OneToOneUniQueue &outQueue(const int remoteProcessId);
190
191 virtual const QueueReader &localReader() const = 0;
192 QueueReader &localReader();
193
194 virtual const QueueReader &remoteReader(const int remoteProcessId) const = 0;
195 QueueReader &remoteReader(const int remoteProcessId);
196
197 virtual int remotesCount() const = 0;
198 virtual int remotesIdOffset() const = 0;
199
200 protected:
201 const int theLocalProcessId; ///< process ID of this queue
202
203 private:
204 int theLastPopProcessId; ///< the ID of the last process we tried to pop() from
205 };
206
207 /**
208 * Lockless fixed-capacity bidirectional queue for a limited number
209 * processes. Allows communication between two groups of processes:
210 * any process in one group may send data to and receive from any
211 * process in another group, but processes in the same group can not
212 * communicate. Process in each group has a unique integer ID in
213 * [groupIdOffset, groupIdOffset + groupSize) range.
214 */
215 class FewToFewBiQueue: public BaseMultiQueue
216 {
217 public:
218 typedef OneToOneUniQueue::Full Full;
219 typedef OneToOneUniQueue::ItemTooLarge ItemTooLarge;
220
221 private:
222 /// Shared metadata for FewToFewBiQueue
223 struct Metadata {
224 Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset);
225 size_t sharedMemorySize() const { return sizeof(*this); }
226 static size_t SharedMemorySize(const int, const int, const int, const int) { return sizeof(Metadata); }
227
228 const int theGroupASize;
229 const int theGroupAIdOffset;
230 const int theGroupBSize;
231 const int theGroupBIdOffset;
232 };
233
234 public:
235 class Owner
236 {
237 public:
238 Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity);
239 ~Owner();
240
241 private:
242 Mem::Owner<Metadata> *const metadataOwner;
243 Mem::Owner<OneToOneUniQueues> *const queuesOwner;
244 Mem::Owner<QueueReaders> *const readersOwner;
245 };
246
247 static Owner *Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity);
248
249 enum Group { groupA = 0, groupB = 1 };
250 FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId);
251
252 /// maximum number of items in the queue
253 static int MaxItemsCount(const int groupASize, const int groupBSize, const int capacity);
254
255 /// finds the oldest item in incoming and outgoing queues between
256 /// us and the given remote process
257 template<class Value> bool findOldest(const int remoteProcessId, Value &value) const;
258
259 protected:
260 virtual const OneToOneUniQueue &inQueue(const int remoteProcessId) const;
261 virtual const OneToOneUniQueue &outQueue(const int remoteProcessId) const;
262 virtual const QueueReader &localReader() const;
263 virtual const QueueReader &remoteReader(const int processId) const;
264 virtual int remotesCount() const;
265 virtual int remotesIdOffset() const;
266
267 private:
268 bool validProcessId(const Group group, const int processId) const;
269 int oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const;
270 const OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const;
271 int readerIndex(const Group group, const int processId) const;
272 Group localGroup() const { return theLocalGroup; }
273 Group remoteGroup() const { return theLocalGroup == groupA ? groupB : groupA; }
274
275 private:
276 const Mem::Pointer<Metadata> metadata; ///< shared metadata
277 const Mem::Pointer<OneToOneUniQueues> queues; ///< unidirection one-to-one queues
278 const Mem::Pointer<QueueReaders> readers; ///< readers array
279
280 const Group theLocalGroup; ///< group of this queue
281 };
282
283 /**
284 * Lockless fixed-capacity bidirectional queue for a limited number
285 * processes. Any process may send data to and receive from any other
286 * process (including itself). Each process has a unique integer ID in
287 * [processIdOffset, processIdOffset + processCount) range.
288 */
289 class MultiQueue: public BaseMultiQueue
290 {
291 public:
292 typedef OneToOneUniQueue::Full Full;
293 typedef OneToOneUniQueue::ItemTooLarge ItemTooLarge;
294
295 private:
296 /// Shared metadata for MultiQueue
297 struct Metadata {
298 Metadata(const int aProcessCount, const int aProcessIdOffset);
299 size_t sharedMemorySize() const { return sizeof(*this); }
300 static size_t SharedMemorySize(const int, const int) { return sizeof(Metadata); }
301
302 const int theProcessCount;
303 const int theProcessIdOffset;
304 };
305
306 public:
307 class Owner
308 {
309 public:
310 Owner(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity);
311 ~Owner();
312
313 private:
314 Mem::Owner<Metadata> *const metadataOwner;
315 Mem::Owner<OneToOneUniQueues> *const queuesOwner;
316 Mem::Owner<QueueReaders> *const readersOwner;
317 };
318
319 static Owner *Init(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity);
320
321 MultiQueue(const String &id, const int localProcessId);
322
323 protected:
324 virtual const OneToOneUniQueue &inQueue(const int remoteProcessId) const;
325 virtual const OneToOneUniQueue &outQueue(const int remoteProcessId) const;
326 virtual const QueueReader &localReader() const;
327 virtual const QueueReader &remoteReader(const int remoteProcessId) const;
328 virtual int remotesCount() const;
329 virtual int remotesIdOffset() const;
330
331 private:
332 bool validProcessId(const int processId) const;
333 const OneToOneUniQueue &oneToOneQueue(const int fromProcessId, const int toProcessId) const;
334 const QueueReader &reader(const int processId) const;
335
336 private:
337 const Mem::Pointer<Metadata> metadata; ///< shared metadata
338 const Mem::Pointer<OneToOneUniQueues> queues; ///< unidirection one-to-one queues
339 const Mem::Pointer<QueueReaders> readers; ///< readers array
340 };
341
342 // OneToOneUniQueue
343
344 template <class Value>
345 bool
346 OneToOneUniQueue::pop(Value &value, QueueReader *const reader)
347 {
348 if (sizeof(value) > theMaxItemSize)
349 throw ItemTooLarge();
350
351 // A writer might push between the empty test and block() below, so we do
352 // not return false right after calling block(), but test again.
353 if (empty()) {
354 if (!reader)
355 return false;
356
357 reader->block();
358 // A writer might push between the empty test and block() below,
359 // so we must test again as such a writer will not signal us.
360 if (empty())
361 return false;
362 }
363
364 if (reader)
365 reader->unblock();
366
367 const unsigned int pos = (theOut++ % theCapacity) * theMaxItemSize;
368 memcpy(&value, theBuffer + pos, sizeof(value));
369 --theSize;
370
371 return true;
372 }
373
374 template <class Value>
375 bool
376 OneToOneUniQueue::peek(Value &value) const
377 {
378 if (sizeof(value) > theMaxItemSize)
379 throw ItemTooLarge();
380
381 if (empty())
382 return false;
383
384 // the reader may pop() before we copy; making this method imprecise
385 const unsigned int pos = (theOut % theCapacity) * theMaxItemSize;
386 memcpy(&value, theBuffer + pos, sizeof(value));
387 return true;
388 }
389
390 template <class Value>
391 bool
392 OneToOneUniQueue::push(const Value &value, QueueReader *const reader)
393 {
394 if (sizeof(value) > theMaxItemSize)
395 throw ItemTooLarge();
396
397 if (full())
398 throw Full();
399
400 const unsigned int pos = theIn++ % theCapacity * theMaxItemSize;
401 memcpy(theBuffer + pos, &value, sizeof(value));
402 const bool wasEmpty = !theSize++;
403
404 return wasEmpty && (!reader || reader->raiseSignal());
405 }
406
407 // OneToOneUniQueues
408
409 inline OneToOneUniQueue &
410 OneToOneUniQueues::operator [](const int index)
411 {
412 return const_cast<OneToOneUniQueue &>((*const_cast<const OneToOneUniQueues *>(this))[index]);
413 }
414
415 inline const OneToOneUniQueue &
416 OneToOneUniQueues::front() const
417 {
418 const char *const queue =
419 reinterpret_cast<const char *>(this) + sizeof(*this);
420 return *reinterpret_cast<const OneToOneUniQueue *>(queue);
421 }
422
423 // BaseMultiQueue
424
425 template <class Value>
426 bool
427 BaseMultiQueue::pop(int &remoteProcessId, Value &value)
428 {
429 // iterate all remote processes, starting after the one we visited last
430 for (int i = 0; i < remotesCount(); ++i) {
431 if (++theLastPopProcessId >= remotesIdOffset() + remotesCount())
432 theLastPopProcessId = remotesIdOffset();
433 OneToOneUniQueue &queue = inQueue(theLastPopProcessId);
434 if (queue.pop(value, &localReader())) {
435 remoteProcessId = theLastPopProcessId;
436 debugs(54, 7, HERE << "popped from " << remoteProcessId << " to " << theLocalProcessId << " at " << queue.size());
437 return true;
438 }
439 }
440 return false; // no process had anything to pop
441 }
442
443 template <class Value>
444 bool
445 BaseMultiQueue::push(const int remoteProcessId, const Value &value)
446 {
447 OneToOneUniQueue &remoteQueue = outQueue(remoteProcessId);
448 QueueReader &reader = remoteReader(remoteProcessId);
449 debugs(54, 7, HERE << "pushing from " << theLocalProcessId << " to " << remoteProcessId << " at " << remoteQueue.size());
450 return remoteQueue.push(value, &reader);
451 }
452
453 template <class Value>
454 bool
455 BaseMultiQueue::peek(int &remoteProcessId, Value &value) const
456 {
457 // mimic FewToFewBiQueue::pop() but quit just before popping
458 int popProcessId = theLastPopProcessId; // preserve for future pop()
459 for (int i = 0; i < remotesCount(); ++i) {
460 if (++popProcessId >= remotesIdOffset() + remotesCount())
461 popProcessId = remotesIdOffset();
462 const OneToOneUniQueue &queue = inQueue(popProcessId);
463 if (queue.peek(value)) {
464 remoteProcessId = popProcessId;
465 return true;
466 }
467 }
468 return false; // most likely, no process had anything to pop
469 }
470
471 // FewToFewBiQueue
472
473 template <class Value>
474 bool
475 FewToFewBiQueue::findOldest(const int remoteProcessId, Value &value) const
476 {
477 // we may be called before remote process configured its queue end
478 if (!validProcessId(remoteGroup(), remoteProcessId))
479 return false;
480
481 // we need the oldest value, so start with the incoming, them-to-us queue:
482 const OneToOneUniQueue &in = inQueue(remoteProcessId);
483 debugs(54, 2, HERE << "peeking from " << remoteProcessId << " to " <<
484 theLocalProcessId << " at " << in.size());
485 if (in.peek(value))
486 return true;
487
488 // if the incoming queue is empty, check the outgoing, us-to-them queue:
489 const OneToOneUniQueue &out = outQueue(remoteProcessId);
490 debugs(54, 2, HERE << "peeking from " << theLocalProcessId << " to " <<
491 remoteProcessId << " at " << out.size());
492 return out.peek(value);
493 }
494
495 } // namespace Ipc
496
497 #endif // SQUID_IPC_QUEUE_H