]> git.ipfire.org Git - thirdparty/squid.git/blob - src/ipc/Queue.h
Report SMP store queues state (mgr:store_queues) (#690)
[thirdparty/squid.git] / src / ipc / Queue.h
1 /*
2 * Copyright (C) 1996-2020 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9 #ifndef SQUID_IPC_QUEUE_H
10 #define SQUID_IPC_QUEUE_H
11
12 #include "base/InstanceId.h"
13 #include "Debug.h"
14 #include "ipc/mem/FlexibleArray.h"
15 #include "ipc/mem/Pointer.h"
16 #include "util.h"
17
18 #include <algorithm>
19 #include <atomic>
20
21 class String;
22
23 namespace Ipc
24 {
25
26 /// State of the reading end of a queue (i.e., of the code calling pop()).
27 /// Multiple queues attached to one reader share this state.
28 class QueueReader
29 {
30 public:
31 QueueReader(); // the initial state is "blocked without a signal"
32
33 /// whether the reader is waiting for a notification signal
34 bool blocked() const { return popBlocked.load(); }
35
36 /// marks the reader as blocked, waiting for a notification signal
37 void block() { popBlocked.store(true); }
38
39 /// removes the block() effects
40 void unblock() { popBlocked.store(false); }
41
42 /// if reader is blocked and not notified, marks the notification signal
43 /// as sent and not received, returning true; otherwise, returns false
44 bool raiseSignal() { return blocked() && !popSignal.exchange(true); }
45
46 /// marks sent reader notification as received (also removes pop blocking)
47 void clearSignal() { unblock(); popSignal.store(false); }
48
49 private:
50 std::atomic<bool> popBlocked; ///< whether the reader is blocked on pop()
51 std::atomic<bool> popSignal; ///< whether writer has sent and reader has not received notification
52
53 public:
54 typedef std::atomic<int> Rate; ///< pop()s per second
55 Rate rateLimit; ///< pop()s per second limit if positive
56
57 // we need a signed atomic type because balance may get negative
58 typedef std::atomic<int> AtomicSignedMsec;
59 typedef AtomicSignedMsec Balance;
60 /// how far ahead the reader is compared to a perfect read/sec event rate
61 Balance balance;
62
63 /// unique ID for debugging which reader is used (works across processes)
64 const InstanceId<QueueReader> id;
65 };
66
67 /// shared array of QueueReaders
68 class QueueReaders
69 {
70 public:
71 QueueReaders(const int aCapacity);
72 size_t sharedMemorySize() const;
73 static size_t SharedMemorySize(const int capacity);
74
75 const int theCapacity; /// number of readers
76 Ipc::Mem::FlexibleArray<QueueReader> theReaders; /// readers
77 };
78
79 /**
80 * Lockless fixed-capacity queue for a single writer and a single reader.
81 *
82 * If the queue is empty, the reader is considered "blocked" and needs
83 * an out-of-band notification message to notice the next pushed item.
84 *
85 * Current implementation assumes that the writer cannot get blocked: if the
86 * queue is full, the writer will just not push and come back later (with a
87 * different value). We can add support for blocked writers if needed.
88 */
89 class OneToOneUniQueue
90 {
91 public:
92 // pop() and push() exceptions; TODO: use TextException instead
93 class Full {};
94 class ItemTooLarge {};
95
96 OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity);
97
98 unsigned int maxItemSize() const { return theMaxItemSize; }
99 int size() const { return theSize; }
100 int capacity() const { return theCapacity; }
101 int sharedMemorySize() const { return Items2Bytes(theMaxItemSize, theCapacity); }
102
103 bool empty() const { return !theSize; }
104 bool full() const { return theSize == theCapacity; }
105
106 static int Bytes2Items(const unsigned int maxItemSize, int size);
107 static int Items2Bytes(const unsigned int maxItemSize, const int size);
108
109 /// returns true iff the value was set; [un]blocks the reader as needed
110 template<class Value> bool pop(Value &value, QueueReader *const reader = NULL);
111
112 /// returns true iff the caller must notify the reader of the pushed item
113 template<class Value> bool push(const Value &value, QueueReader *const reader = NULL);
114
115 /// returns true iff the value was set; the value may be stale!
116 template<class Value> bool peek(Value &value) const;
117
118 /// prints incoming queue state; suitable for cache manager reports
119 template<class Value> void statIn(std::ostream &, int localProcessId, int remoteProcessId) const;
120 /// prints outgoing queue state; suitable for cache manager reports
121 template<class Value> void statOut(std::ostream &, int localProcessId, int remoteProcessId) const;
122
123 private:
124 void statOpen(std::ostream &, const char *inLabel, const char *outLabel, uint32_t count) const;
125 void statClose(std::ostream &) const;
126 template<class Value> void statSamples(std::ostream &, unsigned int start, uint32_t size) const;
127 template<class Value> void statRange(std::ostream &, unsigned int start, uint32_t n) const;
128
129 // optimization: these non-std::atomic data members are in shared memory,
130 // but each is used only by one process (aside from obscured reporting)
131 unsigned int theIn; ///< current push() position; reporting aside, used only in push()
132 unsigned int theOut; ///< current pop() position; reporting aside, used only in pop()/peek()
133
134 std::atomic<uint32_t> theSize; ///< number of items in the queue
135 const unsigned int theMaxItemSize; ///< maximum item size
136 const uint32_t theCapacity; ///< maximum number of items, i.e. theBuffer size
137
138 char theBuffer[];
139 };
140
141 /// shared array of OneToOneUniQueues
142 class OneToOneUniQueues
143 {
144 public:
145 OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity);
146
147 size_t sharedMemorySize() const;
148 static size_t SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity);
149
150 const OneToOneUniQueue &operator [](const int index) const;
151 inline OneToOneUniQueue &operator [](const int index);
152
153 private:
154 inline const OneToOneUniQueue &front() const;
155
156 public:
157 const int theCapacity; /// number of OneToOneUniQueues
158 };
159
160 /**
161 * Base class for lockless fixed-capacity bidirectional queues for a
162 * limited number processes.
163 */
164 class BaseMultiQueue
165 {
166 public:
167 BaseMultiQueue(const int aLocalProcessId);
168 virtual ~BaseMultiQueue() {}
169
170 /// clears the reader notification received by the local process from the remote process
171 void clearReaderSignal(const int remoteProcessId);
172
173 /// picks a process and calls OneToOneUniQueue::pop() using its queue
174 template <class Value> bool pop(int &remoteProcessId, Value &value);
175
176 /// calls OneToOneUniQueue::push() using the given process queue
177 template <class Value> bool push(const int remoteProcessId, const Value &value);
178
179 /// peeks at the item likely to be pop()ed next
180 template<class Value> bool peek(int &remoteProcessId, Value &value) const;
181
182 /// prints current state; suitable for cache manager reports
183 template<class Value> void stat(std::ostream &) const;
184
185 /// returns local reader's balance
186 QueueReader::Balance &localBalance() { return localReader().balance; }
187
188 /// returns reader's balance for a given remote process
189 const QueueReader::Balance &balance(const int remoteProcessId) const;
190
191 /// returns local reader's rate limit
192 QueueReader::Rate &localRateLimit() { return localReader().rateLimit; }
193
194 /// returns reader's rate limit for a given remote process
195 const QueueReader::Rate &rateLimit(const int remoteProcessId) const;
196
197 /// number of items in incoming queue from a given remote process
198 int inSize(const int remoteProcessId) const { return inQueue(remoteProcessId).size(); }
199
200 /// number of items in outgoing queue to a given remote process
201 int outSize(const int remoteProcessId) const { return outQueue(remoteProcessId).size(); }
202
203 protected:
204 /// incoming queue from a given remote process
205 virtual const OneToOneUniQueue &inQueue(const int remoteProcessId) const = 0;
206 OneToOneUniQueue &inQueue(const int remoteProcessId);
207
208 /// outgoing queue to a given remote process
209 virtual const OneToOneUniQueue &outQueue(const int remoteProcessId) const = 0;
210 OneToOneUniQueue &outQueue(const int remoteProcessId);
211
212 virtual const QueueReader &localReader() const = 0;
213 QueueReader &localReader();
214
215 virtual const QueueReader &remoteReader(const int remoteProcessId) const = 0;
216 QueueReader &remoteReader(const int remoteProcessId);
217
218 virtual int remotesCount() const = 0;
219 virtual int remotesIdOffset() const = 0;
220
221 protected:
222 const int theLocalProcessId; ///< process ID of this queue
223
224 private:
225 int theLastPopProcessId; ///< the ID of the last process we tried to pop() from
226 };
227
228 /**
229 * Lockless fixed-capacity bidirectional queue for a limited number
230 * processes. Allows communication between two groups of processes:
231 * any process in one group may send data to and receive from any
232 * process in another group, but processes in the same group can not
233 * communicate. Process in each group has a unique integer ID in
234 * [groupIdOffset, groupIdOffset + groupSize) range.
235 */
236 class FewToFewBiQueue: public BaseMultiQueue
237 {
238 public:
239 typedef OneToOneUniQueue::Full Full;
240 typedef OneToOneUniQueue::ItemTooLarge ItemTooLarge;
241
242 private:
243 /// Shared metadata for FewToFewBiQueue
244 struct Metadata {
245 Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset);
246 size_t sharedMemorySize() const { return sizeof(*this); }
247 static size_t SharedMemorySize(const int, const int, const int, const int) { return sizeof(Metadata); }
248
249 const int theGroupASize;
250 const int theGroupAIdOffset;
251 const int theGroupBSize;
252 const int theGroupBIdOffset;
253 };
254
255 public:
256 class Owner
257 {
258 public:
259 Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity);
260 ~Owner();
261
262 private:
263 Mem::Owner<Metadata> *const metadataOwner;
264 Mem::Owner<OneToOneUniQueues> *const queuesOwner;
265 Mem::Owner<QueueReaders> *const readersOwner;
266 };
267
268 static Owner *Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity);
269
270 enum Group { groupA = 0, groupB = 1 };
271 FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId);
272
273 /// maximum number of items in the queue
274 static int MaxItemsCount(const int groupASize, const int groupBSize, const int capacity);
275
276 /// finds the oldest item in incoming and outgoing queues between
277 /// us and the given remote process
278 template<class Value> bool findOldest(const int remoteProcessId, Value &value) const;
279
280 protected:
281 virtual const OneToOneUniQueue &inQueue(const int remoteProcessId) const;
282 virtual const OneToOneUniQueue &outQueue(const int remoteProcessId) const;
283 virtual const QueueReader &localReader() const;
284 virtual const QueueReader &remoteReader(const int processId) const;
285 virtual int remotesCount() const;
286 virtual int remotesIdOffset() const;
287
288 private:
289 bool validProcessId(const Group group, const int processId) const;
290 int oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const;
291 const OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const;
292 int readerIndex(const Group group, const int processId) const;
293 Group localGroup() const { return theLocalGroup; }
294 Group remoteGroup() const { return theLocalGroup == groupA ? groupB : groupA; }
295
296 private:
297 const Mem::Pointer<Metadata> metadata; ///< shared metadata
298 const Mem::Pointer<OneToOneUniQueues> queues; ///< unidirection one-to-one queues
299 const Mem::Pointer<QueueReaders> readers; ///< readers array
300
301 const Group theLocalGroup; ///< group of this queue
302 };
303
304 /**
305 * Lockless fixed-capacity bidirectional queue for a limited number
306 * processes. Any process may send data to and receive from any other
307 * process (including itself). Each process has a unique integer ID in
308 * [processIdOffset, processIdOffset + processCount) range.
309 */
310 class MultiQueue: public BaseMultiQueue
311 {
312 public:
313 typedef OneToOneUniQueue::Full Full;
314 typedef OneToOneUniQueue::ItemTooLarge ItemTooLarge;
315
316 private:
317 /// Shared metadata for MultiQueue
318 struct Metadata {
319 Metadata(const int aProcessCount, const int aProcessIdOffset);
320 size_t sharedMemorySize() const { return sizeof(*this); }
321 static size_t SharedMemorySize(const int, const int) { return sizeof(Metadata); }
322
323 const int theProcessCount;
324 const int theProcessIdOffset;
325 };
326
327 public:
328 class Owner
329 {
330 public:
331 Owner(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity);
332 ~Owner();
333
334 private:
335 Mem::Owner<Metadata> *const metadataOwner;
336 Mem::Owner<OneToOneUniQueues> *const queuesOwner;
337 Mem::Owner<QueueReaders> *const readersOwner;
338 };
339
340 static Owner *Init(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity);
341
342 MultiQueue(const String &id, const int localProcessId);
343
344 protected:
345 virtual const OneToOneUniQueue &inQueue(const int remoteProcessId) const;
346 virtual const OneToOneUniQueue &outQueue(const int remoteProcessId) const;
347 virtual const QueueReader &localReader() const;
348 virtual const QueueReader &remoteReader(const int remoteProcessId) const;
349 virtual int remotesCount() const;
350 virtual int remotesIdOffset() const;
351
352 private:
353 bool validProcessId(const int processId) const;
354 const OneToOneUniQueue &oneToOneQueue(const int fromProcessId, const int toProcessId) const;
355 const QueueReader &reader(const int processId) const;
356
357 private:
358 const Mem::Pointer<Metadata> metadata; ///< shared metadata
359 const Mem::Pointer<OneToOneUniQueues> queues; ///< unidirection one-to-one queues
360 const Mem::Pointer<QueueReaders> readers; ///< readers array
361 };
362
363 // OneToOneUniQueue
364
365 template <class Value>
366 bool
367 OneToOneUniQueue::pop(Value &value, QueueReader *const reader)
368 {
369 if (sizeof(value) > theMaxItemSize)
370 throw ItemTooLarge();
371
372 // A writer might push between the empty test and block() below, so we do
373 // not return false right after calling block(), but test again.
374 if (empty()) {
375 if (!reader)
376 return false;
377
378 reader->block();
379 // A writer might push between the empty test and block() below,
380 // so we must test again as such a writer will not signal us.
381 if (empty())
382 return false;
383 }
384
385 if (reader)
386 reader->unblock();
387
388 const unsigned int pos = (theOut++ % theCapacity) * theMaxItemSize;
389 memcpy(&value, theBuffer + pos, sizeof(value));
390 --theSize;
391
392 return true;
393 }
394
395 template <class Value>
396 bool
397 OneToOneUniQueue::peek(Value &value) const
398 {
399 if (sizeof(value) > theMaxItemSize)
400 throw ItemTooLarge();
401
402 if (empty())
403 return false;
404
405 // the reader may pop() before we copy; making this method imprecise
406 const unsigned int pos = (theOut % theCapacity) * theMaxItemSize;
407 memcpy(&value, theBuffer + pos, sizeof(value));
408 return true;
409 }
410
411 template <class Value>
412 bool
413 OneToOneUniQueue::push(const Value &value, QueueReader *const reader)
414 {
415 if (sizeof(value) > theMaxItemSize)
416 throw ItemTooLarge();
417
418 if (full())
419 throw Full();
420
421 const unsigned int pos = theIn++ % theCapacity * theMaxItemSize;
422 memcpy(theBuffer + pos, &value, sizeof(value));
423 const bool wasEmpty = !theSize++;
424
425 return wasEmpty && (!reader || reader->raiseSignal());
426 }
427
428 template <class Value>
429 void
430 OneToOneUniQueue::statIn(std::ostream &os, const int localProcessId, const int remoteProcessId) const
431 {
432 os << " kid" << localProcessId << " receiving from kid" << remoteProcessId << ": ";
433 // Nobody can modify our theOut so, after capturing some valid theSize value
434 // in count, we can reliably report all [theOut, theOut+count) items that
435 // were queued at theSize capturing time. We will miss new items push()ed by
436 // the other side, but it is OK -- we report state at the capturing time.
437 const auto count = theSize.load();
438 statOpen(os, "other", "popIndex", count);
439 statSamples<Value>(os, theOut, count);
440 statClose(os);
441 }
442
443 template <class Value>
444 void
445 OneToOneUniQueue::statOut(std::ostream &os, const int localProcessId, const int remoteProcessId) const
446 {
447 os << " kid" << localProcessId << " sending to kid" << remoteProcessId << ": ";
448 // Nobody can modify our theIn so, after capturing some valid theSize value
449 // in count, we can reliably report all [theIn-count, theIn) items that were
450 // queued at theSize capturing time. We may report items already pop()ed by
451 // the other side, but that is OK because pop() does not modify items -- it
452 // only increments theOut.
453 const auto count = theSize.load();
454 statOpen(os, "pushIndex", "other", count);
455 statSamples<Value>(os, theIn - count, count); // unsigned offset underflow OK
456 statClose(os);
457 }
458
459 /// report a sample of [start, start + size) items
460 template <class Value>
461 void
462 OneToOneUniQueue::statSamples(std::ostream &os, const unsigned int start, const uint32_t count) const
463 {
464 if (!count) {
465 os << " ";
466 return;
467 }
468
469 os << ", items: [\n";
470 // report a few leading and trailing items, without repetitions
471 const auto sampleSize = std::min(3U, count); // leading (and max) sample
472 statRange<Value>(os, start, sampleSize);
473 if (sampleSize < count) { // the first sample did not show some items
474 // The `start` offset aside, the first sample reported all items
475 // below the sampleSize offset. The second sample needs to report
476 // the last sampleSize items (i.e. starting at count-sampleSize
477 // offset) except those already reported by the first sample.
478 const auto secondSampleOffset = std::max(sampleSize, count - sampleSize);
479 const auto secondSampleSize = std::min(sampleSize, count - sampleSize);
480
481 // but first we print a sample separator, unless there are no items
482 // between the samples or the separator hides the only unsampled item
483 const auto bothSamples = sampleSize + secondSampleSize;
484 if (bothSamples + 1U == count)
485 statRange<Value>(os, start + sampleSize, 1);
486 else if (count > bothSamples)
487 os << " # ... " << (count - bothSamples) << " items not shown ...\n";
488
489 statRange<Value>(os, start + secondSampleOffset, secondSampleSize);
490 }
491 os << " ]";
492 }
493
494 /// statSamples() helper that reports n items from start
495 template <class Value>
496 void
497 OneToOneUniQueue::statRange(std::ostream &os, const unsigned int start, const uint32_t n) const
498 {
499 assert(sizeof(Value) <= theMaxItemSize);
500 auto offset = start;
501 for (uint32_t i = 0; i < n; ++i) {
502 // XXX: Throughout this C++ header, these overflow wrapping tricks work
503 // only because theCapacity currently happens to be a power of 2 (e.g.,
504 // the highest offset (0xF...FFF) % 3 is 0 and so is the next offset).
505 const auto pos = (offset++ % theCapacity) * theMaxItemSize;
506 Value value;
507 memcpy(&value, theBuffer + pos, sizeof(value));
508 os << " { ";
509 value.stat(os);
510 os << " },\n";
511 }
512 }
513
514 // OneToOneUniQueues
515
516 inline OneToOneUniQueue &
517 OneToOneUniQueues::operator [](const int index)
518 {
519 return const_cast<OneToOneUniQueue &>((*const_cast<const OneToOneUniQueues *>(this))[index]);
520 }
521
522 inline const OneToOneUniQueue &
523 OneToOneUniQueues::front() const
524 {
525 const char *const queue =
526 reinterpret_cast<const char *>(this) + sizeof(*this);
527 return *reinterpret_cast<const OneToOneUniQueue *>(queue);
528 }
529
530 // BaseMultiQueue
531
532 template <class Value>
533 bool
534 BaseMultiQueue::pop(int &remoteProcessId, Value &value)
535 {
536 // iterate all remote processes, starting after the one we visited last
537 for (int i = 0; i < remotesCount(); ++i) {
538 if (++theLastPopProcessId >= remotesIdOffset() + remotesCount())
539 theLastPopProcessId = remotesIdOffset();
540 OneToOneUniQueue &queue = inQueue(theLastPopProcessId);
541 if (queue.pop(value, &localReader())) {
542 remoteProcessId = theLastPopProcessId;
543 debugs(54, 7, HERE << "popped from " << remoteProcessId << " to " << theLocalProcessId << " at " << queue.size());
544 return true;
545 }
546 }
547 return false; // no process had anything to pop
548 }
549
550 template <class Value>
551 bool
552 BaseMultiQueue::push(const int remoteProcessId, const Value &value)
553 {
554 OneToOneUniQueue &remoteQueue = outQueue(remoteProcessId);
555 QueueReader &reader = remoteReader(remoteProcessId);
556 debugs(54, 7, HERE << "pushing from " << theLocalProcessId << " to " << remoteProcessId << " at " << remoteQueue.size());
557 return remoteQueue.push(value, &reader);
558 }
559
560 template <class Value>
561 bool
562 BaseMultiQueue::peek(int &remoteProcessId, Value &value) const
563 {
564 // mimic FewToFewBiQueue::pop() but quit just before popping
565 int popProcessId = theLastPopProcessId; // preserve for future pop()
566 for (int i = 0; i < remotesCount(); ++i) {
567 if (++popProcessId >= remotesIdOffset() + remotesCount())
568 popProcessId = remotesIdOffset();
569 const OneToOneUniQueue &queue = inQueue(popProcessId);
570 if (queue.peek(value)) {
571 remoteProcessId = popProcessId;
572 return true;
573 }
574 }
575 return false; // most likely, no process had anything to pop
576 }
577
578 template <class Value>
579 void
580 BaseMultiQueue::stat(std::ostream &os) const
581 {
582 for (int processId = remotesIdOffset(); processId < remotesIdOffset() + remotesCount(); ++processId) {
583 const auto &queue = inQueue(processId);
584 queue.statIn<Value>(os, theLocalProcessId, processId);
585 }
586
587 os << "\n";
588
589 for (int processId = remotesIdOffset(); processId < remotesIdOffset() + remotesCount(); ++processId) {
590 const auto &queue = outQueue(processId);
591 queue.statOut<Value>(os, theLocalProcessId, processId);
592 }
593 }
594
595 // FewToFewBiQueue
596
597 template <class Value>
598 bool
599 FewToFewBiQueue::findOldest(const int remoteProcessId, Value &value) const
600 {
601 // we may be called before remote process configured its queue end
602 if (!validProcessId(remoteGroup(), remoteProcessId))
603 return false;
604
605 // we need the oldest value, so start with the incoming, them-to-us queue:
606 const OneToOneUniQueue &in = inQueue(remoteProcessId);
607 debugs(54, 2, HERE << "peeking from " << remoteProcessId << " to " <<
608 theLocalProcessId << " at " << in.size());
609 if (in.peek(value))
610 return true;
611
612 // if the incoming queue is empty, check the outgoing, us-to-them queue:
613 const OneToOneUniQueue &out = outQueue(remoteProcessId);
614 debugs(54, 2, HERE << "peeking from " << theLocalProcessId << " to " <<
615 remoteProcessId << " at " << out.size());
616 return out.peek(value);
617 }
618
619 } // namespace Ipc
620
621 #endif // SQUID_IPC_QUEUE_H
622