]>
Commit | Line | Data |
---|---|---|
1 | /* | |
2 | * Copyright (C) 1996-2025 The Squid Software Foundation and contributors | |
3 | * | |
4 | * Squid software is distributed under GPLv2+ license and includes | |
5 | * contributions from numerous individuals and organizations. | |
6 | * Please see the COPYING and CONTRIBUTORS files for details. | |
7 | */ | |
8 | ||
9 | #ifndef SQUID_SRC_IPC_QUEUE_H | |
10 | #define SQUID_SRC_IPC_QUEUE_H | |
11 | ||
12 | #include "base/InstanceId.h" | |
13 | #include "debug/Stream.h" | |
14 | #include "ipc/mem/FlexibleArray.h" | |
15 | #include "ipc/mem/Pointer.h" | |
16 | #include "util.h" | |
17 | ||
18 | #include <algorithm> | |
19 | #include <atomic> | |
20 | ||
21 | class String; | |
22 | ||
23 | namespace Ipc | |
24 | { | |
25 | ||
26 | /// State of the reading end of a queue (i.e., of the code calling pop()). | |
27 | /// Multiple queues attached to one reader share this state. | |
28 | class QueueReader | |
29 | { | |
30 | public: | |
31 | QueueReader(); // the initial state is "blocked without a signal" | |
32 | ||
33 | /// whether the reader is waiting for a notification signal | |
34 | bool blocked() const { return popBlocked.load(); } | |
35 | ||
36 | /// \copydoc popSignal | |
37 | bool signaled() const { return popSignal.load(); } | |
38 | ||
39 | /// marks the reader as blocked, waiting for a notification signal | |
40 | void block() { popBlocked.store(true); } | |
41 | ||
42 | /// removes the block() effects | |
43 | void unblock() { popBlocked.store(false); } | |
44 | ||
45 | /// if reader is blocked and not notified, marks the notification signal | |
46 | /// as sent and not received, returning true; otherwise, returns false | |
47 | bool raiseSignal() { return blocked() && !popSignal.exchange(true); } | |
48 | ||
49 | /// marks sent reader notification as received (also removes pop blocking) | |
50 | void clearSignal() { unblock(); popSignal.store(false); } | |
51 | ||
52 | private: | |
53 | std::atomic<bool> popBlocked; ///< whether the reader is blocked on pop() | |
54 | std::atomic<bool> popSignal; ///< whether writer has sent and reader has not received notification | |
55 | ||
56 | public: | |
57 | typedef std::atomic<int> Rate; ///< pop()s per second | |
58 | Rate rateLimit; ///< pop()s per second limit if positive | |
59 | ||
60 | // we need a signed atomic type because balance may get negative | |
61 | typedef std::atomic<int> AtomicSignedMsec; | |
62 | typedef AtomicSignedMsec Balance; | |
63 | /// how far ahead the reader is compared to a perfect read/sec event rate | |
64 | Balance balance; | |
65 | ||
66 | /// unique ID for debugging which reader is used (works across processes) | |
67 | const InstanceId<QueueReader> id; | |
68 | }; | |
69 | ||
/// Shared array of QueueReaders: a reader count followed by the readers
/// themselves, stored in a flexible array.
class QueueReaders
{
public:
    /// \param aCapacity the number of readers to hold (see theCapacity)
    QueueReaders(const int aCapacity);
    /// storage required by this object
    /// NOTE(review): presumably measured in bytes; the definition is not visible here
    size_t sharedMemorySize() const;
    /// storage required by an object holding the given number of readers
    static size_t SharedMemorySize(const int capacity);

    const int theCapacity; ///< number of readers
    Ipc::Mem::FlexibleArray<QueueReader> theReaders; ///< readers
};
81 | ||
82 | /** | |
83 | * Lockless fixed-capacity queue for a single writer and a single reader. | |
84 | * | |
85 | * If the queue is empty, the reader is considered "blocked" and needs | |
86 | * an out-of-band notification message to notice the next pushed item. | |
87 | * | |
88 | * Current implementation assumes that the writer cannot get blocked: if the | |
89 | * queue is full, the writer will just not push and come back later (with a | |
90 | * different value). We can add support for blocked writers if needed. | |
91 | */ | |
92 | class OneToOneUniQueue | |
93 | { | |
94 | public: | |
95 | // pop() and push() exceptions; TODO: use TextException instead | |
96 | class Full {}; | |
97 | class ItemTooLarge {}; | |
98 | ||
99 | OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity); | |
100 | ||
101 | unsigned int maxItemSize() const { return theMaxItemSize; } | |
102 | int size() const { return theSize; } | |
103 | int capacity() const { return theCapacity; } | |
104 | int sharedMemorySize() const { return Items2Bytes(theMaxItemSize, theCapacity); } | |
105 | ||
106 | bool empty() const { return !theSize; } | |
107 | bool full() const { return theSize == theCapacity; } | |
108 | ||
109 | static int Bytes2Items(const unsigned int maxItemSize, int size); | |
110 | static int Items2Bytes(const unsigned int maxItemSize, const int size); | |
111 | ||
112 | /// returns true iff the value was set; [un]blocks the reader as needed | |
113 | template<class Value> bool pop(Value &value, QueueReader *const reader = nullptr); | |
114 | ||
115 | /// returns true iff the caller must notify the reader of the pushed item | |
116 | template<class Value> bool push(const Value &value, QueueReader *const reader = nullptr); | |
117 | ||
118 | /// returns true iff the value was set; the value may be stale! | |
119 | template<class Value> bool peek(Value &value) const; | |
120 | ||
121 | /// prints incoming queue state; suitable for cache manager reports | |
122 | template<class Value> void statIn(std::ostream &, int localProcessId, int remoteProcessId) const; | |
123 | /// prints outgoing queue state; suitable for cache manager reports | |
124 | template<class Value> void statOut(std::ostream &, int localProcessId, int remoteProcessId) const; | |
125 | ||
126 | private: | |
127 | void statOpen(std::ostream &, const char *inLabel, const char *outLabel, uint32_t count) const; | |
128 | void statClose(std::ostream &) const; | |
129 | template<class Value> void statSamples(std::ostream &, unsigned int start, uint32_t size) const; | |
130 | template<class Value> void statRange(std::ostream &, unsigned int start, uint32_t n) const; | |
131 | ||
132 | // optimization: these non-std::atomic data members are in shared memory, | |
133 | // but each is used only by one process (aside from obscured reporting) | |
134 | unsigned int theIn; ///< current push() position; reporting aside, used only in push() | |
135 | unsigned int theOut; ///< current pop() position; reporting aside, used only in pop()/peek() | |
136 | ||
137 | std::atomic<uint32_t> theSize; ///< number of items in the queue | |
138 | const unsigned int theMaxItemSize; ///< maximum item size | |
139 | const uint32_t theCapacity; ///< maximum number of items, i.e. theBuffer size | |
140 | ||
141 | char theBuffer[]; | |
142 | }; | |
143 | ||
/// Shared array of OneToOneUniQueues. The queues themselves are not data
/// members: front() locates the first one right after this object in memory.
class OneToOneUniQueues
{
public:
    /// \param aCapacity the number of queues (see theCapacity)
    /// \param maxItemSize, queueCapacity NOTE(review): presumably forwarded to
    /// each OneToOneUniQueue; the definition is not visible here
    OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity);

    /// storage required by this object
    /// NOTE(review): presumably in bytes and including the queues -- confirm
    size_t sharedMemorySize() const;
    /// storage required by an object configured with the given parameters
    static size_t SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity);

    /// read-only access to the queue at the given index
    const OneToOneUniQueue &operator [](const int index) const;
    /// writeable access to the queue at the given index
    inline OneToOneUniQueue &operator [](const int index);

private:
    /// the queue stored immediately after this object
    inline const OneToOneUniQueue &front() const;

public:
    const int theCapacity; ///< number of OneToOneUniQueues
};
162 | ||
163 | /** | |
164 | * Base class for lockless fixed-capacity bidirectional queues for a | |
165 | * limited number processes. | |
166 | */ | |
167 | class BaseMultiQueue | |
168 | { | |
169 | public: | |
170 | BaseMultiQueue(const int aLocalProcessId); | |
171 | virtual ~BaseMultiQueue() {} | |
172 | ||
173 | /// clears the reader notification received by the local process from the remote process | |
174 | void clearReaderSignal(const int remoteProcessId); | |
175 | ||
176 | /// clears all reader notifications received by the local process | |
177 | void clearAllReaderSignals(); | |
178 | ||
179 | /// picks a process and calls OneToOneUniQueue::pop() using its queue | |
180 | template <class Value> bool pop(int &remoteProcessId, Value &value); | |
181 | ||
182 | /// calls OneToOneUniQueue::push() using the given process queue | |
183 | template <class Value> bool push(const int remoteProcessId, const Value &value); | |
184 | ||
185 | /// peeks at the item likely to be pop()ed next | |
186 | template<class Value> bool peek(int &remoteProcessId, Value &value) const; | |
187 | ||
188 | /// prints current state; suitable for cache manager reports | |
189 | template<class Value> void stat(std::ostream &) const; | |
190 | ||
191 | /// returns local reader's balance | |
192 | QueueReader::Balance &localBalance() { return localReader().balance; } | |
193 | ||
194 | /// returns reader's balance for a given remote process | |
195 | const QueueReader::Balance &balance(const int remoteProcessId) const; | |
196 | ||
197 | /// returns local reader's rate limit | |
198 | QueueReader::Rate &localRateLimit() { return localReader().rateLimit; } | |
199 | ||
200 | /// returns reader's rate limit for a given remote process | |
201 | const QueueReader::Rate &rateLimit(const int remoteProcessId) const; | |
202 | ||
203 | /// number of items in incoming queue from a given remote process | |
204 | int inSize(const int remoteProcessId) const { return inQueue(remoteProcessId).size(); } | |
205 | ||
206 | /// number of items in outgoing queue to a given remote process | |
207 | int outSize(const int remoteProcessId) const { return outQueue(remoteProcessId).size(); } | |
208 | ||
209 | protected: | |
210 | /// incoming queue from a given remote process | |
211 | virtual const OneToOneUniQueue &inQueue(const int remoteProcessId) const = 0; | |
212 | OneToOneUniQueue &inQueue(const int remoteProcessId); | |
213 | ||
214 | /// outgoing queue to a given remote process | |
215 | virtual const OneToOneUniQueue &outQueue(const int remoteProcessId) const = 0; | |
216 | OneToOneUniQueue &outQueue(const int remoteProcessId); | |
217 | ||
218 | virtual const QueueReader &localReader() const = 0; | |
219 | QueueReader &localReader(); | |
220 | ||
221 | virtual const QueueReader &remoteReader(const int remoteProcessId) const = 0; | |
222 | QueueReader &remoteReader(const int remoteProcessId); | |
223 | ||
224 | virtual int remotesCount() const = 0; | |
225 | virtual int remotesIdOffset() const = 0; | |
226 | ||
227 | protected: | |
228 | const int theLocalProcessId; ///< process ID of this queue | |
229 | ||
230 | private: | |
231 | int theLastPopProcessId; ///< the ID of the last process we tried to pop() from | |
232 | }; | |
233 | ||
234 | /** | |
235 | * Lockless fixed-capacity bidirectional queue for a limited number | |
236 | * processes. Allows communication between two groups of processes: | |
237 | * any process in one group may send data to and receive from any | |
238 | * process in another group, but processes in the same group can not | |
239 | * communicate. Process in each group has a unique integer ID in | |
240 | * [groupIdOffset, groupIdOffset + groupSize) range. | |
241 | */ | |
242 | class FewToFewBiQueue: public BaseMultiQueue | |
243 | { | |
244 | public: | |
245 | typedef OneToOneUniQueue::Full Full; | |
246 | typedef OneToOneUniQueue::ItemTooLarge ItemTooLarge; | |
247 | ||
248 | private: | |
249 | /// Shared metadata for FewToFewBiQueue | |
250 | struct Metadata { | |
251 | Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset); | |
252 | size_t sharedMemorySize() const { return sizeof(*this); } | |
253 | static size_t SharedMemorySize(const int, const int, const int, const int) { return sizeof(Metadata); } | |
254 | ||
255 | const int theGroupASize; | |
256 | const int theGroupAIdOffset; | |
257 | const int theGroupBSize; | |
258 | const int theGroupBIdOffset; | |
259 | }; | |
260 | ||
261 | public: | |
262 | class Owner | |
263 | { | |
264 | public: | |
265 | Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity); | |
266 | ~Owner(); | |
267 | ||
268 | private: | |
269 | Mem::Owner<Metadata> *const metadataOwner; | |
270 | Mem::Owner<OneToOneUniQueues> *const queuesOwner; | |
271 | Mem::Owner<QueueReaders> *const readersOwner; | |
272 | }; | |
273 | ||
274 | static Owner *Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity); | |
275 | ||
276 | enum Group { groupA = 0, groupB = 1 }; | |
277 | FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId); | |
278 | ||
279 | /// maximum number of items in the queue | |
280 | static int MaxItemsCount(const int groupASize, const int groupBSize, const int capacity); | |
281 | ||
282 | /// finds the oldest item in incoming and outgoing queues between | |
283 | /// us and the given remote process | |
284 | template<class Value> bool findOldest(const int remoteProcessId, Value &value) const; | |
285 | ||
286 | protected: | |
287 | const OneToOneUniQueue &inQueue(const int remoteProcessId) const override; | |
288 | const OneToOneUniQueue &outQueue(const int remoteProcessId) const override; | |
289 | const QueueReader &localReader() const override; | |
290 | const QueueReader &remoteReader(const int processId) const override; | |
291 | int remotesCount() const override; | |
292 | int remotesIdOffset() const override; | |
293 | ||
294 | private: | |
295 | bool validProcessId(const Group group, const int processId) const; | |
296 | int oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const; | |
297 | const OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const; | |
298 | int readerIndex(const Group group, const int processId) const; | |
299 | Group localGroup() const { return theLocalGroup; } | |
300 | Group remoteGroup() const { return theLocalGroup == groupA ? groupB : groupA; } | |
301 | ||
302 | private: | |
303 | const Mem::Pointer<Metadata> metadata; ///< shared metadata | |
304 | const Mem::Pointer<OneToOneUniQueues> queues; ///< unidirection one-to-one queues | |
305 | const Mem::Pointer<QueueReaders> readers; ///< readers array | |
306 | ||
307 | const Group theLocalGroup; ///< group of this queue | |
308 | }; | |
309 | ||
310 | /** | |
311 | * Lockless fixed-capacity bidirectional queue for a limited number | |
312 | * processes. Any process may send data to and receive from any other | |
313 | * process (including itself). Each process has a unique integer ID in | |
314 | * [processIdOffset, processIdOffset + processCount) range. | |
315 | */ | |
316 | class MultiQueue: public BaseMultiQueue | |
317 | { | |
318 | public: | |
319 | typedef OneToOneUniQueue::Full Full; | |
320 | typedef OneToOneUniQueue::ItemTooLarge ItemTooLarge; | |
321 | ||
322 | private: | |
323 | /// Shared metadata for MultiQueue | |
324 | struct Metadata { | |
325 | Metadata(const int aProcessCount, const int aProcessIdOffset); | |
326 | size_t sharedMemorySize() const { return sizeof(*this); } | |
327 | static size_t SharedMemorySize(const int, const int) { return sizeof(Metadata); } | |
328 | ||
329 | const int theProcessCount; | |
330 | const int theProcessIdOffset; | |
331 | }; | |
332 | ||
333 | public: | |
334 | class Owner | |
335 | { | |
336 | public: | |
337 | Owner(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity); | |
338 | ~Owner(); | |
339 | ||
340 | private: | |
341 | Mem::Owner<Metadata> *const metadataOwner; | |
342 | Mem::Owner<OneToOneUniQueues> *const queuesOwner; | |
343 | Mem::Owner<QueueReaders> *const readersOwner; | |
344 | }; | |
345 | ||
346 | static Owner *Init(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity); | |
347 | ||
348 | MultiQueue(const String &id, const int localProcessId); | |
349 | ||
350 | protected: | |
351 | const OneToOneUniQueue &inQueue(const int remoteProcessId) const override; | |
352 | const OneToOneUniQueue &outQueue(const int remoteProcessId) const override; | |
353 | const QueueReader &localReader() const override; | |
354 | const QueueReader &remoteReader(const int remoteProcessId) const override; | |
355 | int remotesCount() const override; | |
356 | int remotesIdOffset() const override; | |
357 | ||
358 | private: | |
359 | bool validProcessId(const int processId) const; | |
360 | const OneToOneUniQueue &oneToOneQueue(const int fromProcessId, const int toProcessId) const; | |
361 | const QueueReader &reader(const int processId) const; | |
362 | ||
363 | private: | |
364 | const Mem::Pointer<Metadata> metadata; ///< shared metadata | |
365 | const Mem::Pointer<OneToOneUniQueues> queues; ///< unidirection one-to-one queues | |
366 | const Mem::Pointer<QueueReaders> readers; ///< readers array | |
367 | }; | |
368 | ||
369 | // OneToOneUniQueue | |
370 | ||
371 | template <class Value> | |
372 | bool | |
373 | OneToOneUniQueue::pop(Value &value, QueueReader *const reader) | |
374 | { | |
375 | if (sizeof(value) > theMaxItemSize) | |
376 | throw ItemTooLarge(); | |
377 | ||
378 | // A writer might push between the empty test and block() below, so we do | |
379 | // not return false right after calling block(), but test again. | |
380 | if (empty()) { | |
381 | if (!reader) | |
382 | return false; | |
383 | ||
384 | reader->block(); | |
385 | // A writer might push between the empty test and block() below, | |
386 | // so we must test again as such a writer will not signal us. | |
387 | if (empty()) | |
388 | return false; | |
389 | } | |
390 | ||
391 | if (reader) | |
392 | reader->unblock(); | |
393 | ||
394 | const unsigned int pos = (theOut++ % theCapacity) * theMaxItemSize; | |
395 | memcpy(&value, theBuffer + pos, sizeof(value)); | |
396 | --theSize; | |
397 | ||
398 | return true; | |
399 | } | |
400 | ||
401 | template <class Value> | |
402 | bool | |
403 | OneToOneUniQueue::peek(Value &value) const | |
404 | { | |
405 | if (sizeof(value) > theMaxItemSize) | |
406 | throw ItemTooLarge(); | |
407 | ||
408 | if (empty()) | |
409 | return false; | |
410 | ||
411 | // the reader may pop() before we copy; making this method imprecise | |
412 | const unsigned int pos = (theOut % theCapacity) * theMaxItemSize; | |
413 | memcpy(&value, theBuffer + pos, sizeof(value)); | |
414 | return true; | |
415 | } | |
416 | ||
417 | template <class Value> | |
418 | bool | |
419 | OneToOneUniQueue::push(const Value &value, QueueReader *const reader) | |
420 | { | |
421 | if (sizeof(value) > theMaxItemSize) | |
422 | throw ItemTooLarge(); | |
423 | ||
424 | if (full()) | |
425 | throw Full(); | |
426 | ||
427 | const unsigned int pos = theIn++ % theCapacity * theMaxItemSize; | |
428 | memcpy(theBuffer + pos, &value, sizeof(value)); | |
429 | const bool wasEmpty = !theSize++; | |
430 | ||
431 | return wasEmpty && (!reader || reader->raiseSignal()); | |
432 | } | |
433 | ||
434 | template <class Value> | |
435 | void | |
436 | OneToOneUniQueue::statIn(std::ostream &os, const int localProcessId, const int remoteProcessId) const | |
437 | { | |
438 | os << " kid" << localProcessId << " receiving from kid" << remoteProcessId << ": "; | |
439 | // Nobody can modify our theOut so, after capturing some valid theSize value | |
440 | // in count, we can reliably report all [theOut, theOut+count) items that | |
441 | // were queued at theSize capturing time. We will miss new items push()ed by | |
442 | // the other side, but it is OK -- we report state at the capturing time. | |
443 | const auto count = theSize.load(); | |
444 | statOpen(os, "other", "popIndex", count); | |
445 | statSamples<Value>(os, theOut, count); | |
446 | statClose(os); | |
447 | } | |
448 | ||
449 | template <class Value> | |
450 | void | |
451 | OneToOneUniQueue::statOut(std::ostream &os, const int localProcessId, const int remoteProcessId) const | |
452 | { | |
453 | os << " kid" << localProcessId << " sending to kid" << remoteProcessId << ": "; | |
454 | // Nobody can modify our theIn so, after capturing some valid theSize value | |
455 | // in count, we can reliably report all [theIn-count, theIn) items that were | |
456 | // queued at theSize capturing time. We may report items already pop()ed by | |
457 | // the other side, but that is OK because pop() does not modify items -- it | |
458 | // only increments theOut. | |
459 | const auto count = theSize.load(); | |
460 | statOpen(os, "pushIndex", "other", count); | |
461 | statSamples<Value>(os, theIn - count, count); // unsigned offset underflow OK | |
462 | statClose(os); | |
463 | } | |
464 | ||
465 | /// report a sample of [start, start + size) items | |
466 | template <class Value> | |
467 | void | |
468 | OneToOneUniQueue::statSamples(std::ostream &os, const unsigned int start, const uint32_t count) const | |
469 | { | |
470 | if (!count) { | |
471 | os << " "; | |
472 | return; | |
473 | } | |
474 | ||
475 | os << ", items: [\n"; | |
476 | // report a few leading and trailing items, without repetitions | |
477 | const auto sampleSize = std::min(3U, count); // leading (and max) sample | |
478 | statRange<Value>(os, start, sampleSize); | |
479 | if (sampleSize < count) { // the first sample did not show some items | |
480 | // The `start` offset aside, the first sample reported all items | |
481 | // below the sampleSize offset. The second sample needs to report | |
482 | // the last sampleSize items (i.e. starting at count-sampleSize | |
483 | // offset) except those already reported by the first sample. | |
484 | const auto secondSampleOffset = std::max(sampleSize, count - sampleSize); | |
485 | const auto secondSampleSize = std::min(sampleSize, count - sampleSize); | |
486 | ||
487 | // but first we print a sample separator, unless there are no items | |
488 | // between the samples or the separator hides the only unsampled item | |
489 | const auto bothSamples = sampleSize + secondSampleSize; | |
490 | if (bothSamples + 1U == count) | |
491 | statRange<Value>(os, start + sampleSize, 1); | |
492 | else if (count > bothSamples) | |
493 | os << " # ... " << (count - bothSamples) << " items not shown ...\n"; | |
494 | ||
495 | statRange<Value>(os, start + secondSampleOffset, secondSampleSize); | |
496 | } | |
497 | os << " ]"; | |
498 | } | |
499 | ||
500 | /// statSamples() helper that reports n items from start | |
501 | template <class Value> | |
502 | void | |
503 | OneToOneUniQueue::statRange(std::ostream &os, const unsigned int start, const uint32_t n) const | |
504 | { | |
505 | assert(sizeof(Value) <= theMaxItemSize); | |
506 | auto offset = start; | |
507 | for (uint32_t i = 0; i < n; ++i) { | |
508 | // XXX: Throughout this C++ header, these overflow wrapping tricks work | |
509 | // only because theCapacity currently happens to be a power of 2 (e.g., | |
510 | // the highest offset (0xF...FFF) % 3 is 0 and so is the next offset). | |
511 | const auto pos = (offset++ % theCapacity) * theMaxItemSize; | |
512 | Value value; | |
513 | memcpy(&value, theBuffer + pos, sizeof(value)); | |
514 | os << " { "; | |
515 | value.stat(os); | |
516 | os << " },\n"; | |
517 | } | |
518 | } | |
519 | ||
520 | // OneToOneUniQueues | |
521 | ||
522 | inline OneToOneUniQueue & | |
523 | OneToOneUniQueues::operator [](const int index) | |
524 | { | |
525 | return const_cast<OneToOneUniQueue &>((*const_cast<const OneToOneUniQueues *>(this))[index]); | |
526 | } | |
527 | ||
528 | inline const OneToOneUniQueue & | |
529 | OneToOneUniQueues::front() const | |
530 | { | |
531 | const char *const queue = | |
532 | reinterpret_cast<const char *>(this) + sizeof(*this); | |
533 | return *reinterpret_cast<const OneToOneUniQueue *>(queue); | |
534 | } | |
535 | ||
536 | // BaseMultiQueue | |
537 | ||
538 | template <class Value> | |
539 | bool | |
540 | BaseMultiQueue::pop(int &remoteProcessId, Value &value) | |
541 | { | |
542 | // iterate all remote processes, starting after the one we visited last | |
543 | for (int i = 0; i < remotesCount(); ++i) { | |
544 | if (++theLastPopProcessId >= remotesIdOffset() + remotesCount()) | |
545 | theLastPopProcessId = remotesIdOffset(); | |
546 | OneToOneUniQueue &queue = inQueue(theLastPopProcessId); | |
547 | if (queue.pop(value, &localReader())) { | |
548 | remoteProcessId = theLastPopProcessId; | |
549 | debugs(54, 7, "popped from " << remoteProcessId << " to " << theLocalProcessId << " at " << queue.size()); | |
550 | return true; | |
551 | } | |
552 | } | |
553 | return false; // no process had anything to pop | |
554 | } | |
555 | ||
556 | template <class Value> | |
557 | bool | |
558 | BaseMultiQueue::push(const int remoteProcessId, const Value &value) | |
559 | { | |
560 | OneToOneUniQueue &remoteQueue = outQueue(remoteProcessId); | |
561 | QueueReader &reader = remoteReader(remoteProcessId); | |
562 | debugs(54, 7, "pushing from " << theLocalProcessId << " to " << remoteProcessId << " at " << remoteQueue.size()); | |
563 | return remoteQueue.push(value, &reader); | |
564 | } | |
565 | ||
566 | template <class Value> | |
567 | bool | |
568 | BaseMultiQueue::peek(int &remoteProcessId, Value &value) const | |
569 | { | |
570 | // mimic FewToFewBiQueue::pop() but quit just before popping | |
571 | int popProcessId = theLastPopProcessId; // preserve for future pop() | |
572 | for (int i = 0; i < remotesCount(); ++i) { | |
573 | if (++popProcessId >= remotesIdOffset() + remotesCount()) | |
574 | popProcessId = remotesIdOffset(); | |
575 | const OneToOneUniQueue &queue = inQueue(popProcessId); | |
576 | if (queue.peek(value)) { | |
577 | remoteProcessId = popProcessId; | |
578 | return true; | |
579 | } | |
580 | } | |
581 | return false; // most likely, no process had anything to pop | |
582 | } | |
583 | ||
584 | template <class Value> | |
585 | void | |
586 | BaseMultiQueue::stat(std::ostream &os) const | |
587 | { | |
588 | for (int processId = remotesIdOffset(); processId < remotesIdOffset() + remotesCount(); ++processId) { | |
589 | const auto &queue = inQueue(processId); | |
590 | queue.statIn<Value>(os, theLocalProcessId, processId); | |
591 | } | |
592 | ||
593 | os << "\n"; | |
594 | ||
595 | for (int processId = remotesIdOffset(); processId < remotesIdOffset() + remotesCount(); ++processId) { | |
596 | const auto &queue = outQueue(processId); | |
597 | queue.statOut<Value>(os, theLocalProcessId, processId); | |
598 | } | |
599 | ||
600 | os << "\n"; | |
601 | ||
602 | const auto &reader = localReader(); | |
603 | os << " kid" << theLocalProcessId << " reader flags: " << | |
604 | "{ blocked: " << reader.blocked() << ", signaled: " << reader.signaled() << " }\n"; | |
605 | } | |
606 | ||
607 | // FewToFewBiQueue | |
608 | ||
609 | template <class Value> | |
610 | bool | |
611 | FewToFewBiQueue::findOldest(const int remoteProcessId, Value &value) const | |
612 | { | |
613 | // we may be called before remote process configured its queue end | |
614 | if (!validProcessId(remoteGroup(), remoteProcessId)) | |
615 | return false; | |
616 | ||
617 | // we need the oldest value, so start with the incoming, them-to-us queue: | |
618 | const OneToOneUniQueue &in = inQueue(remoteProcessId); | |
619 | debugs(54, 2, "peeking from " << remoteProcessId << " to " << | |
620 | theLocalProcessId << " at " << in.size()); | |
621 | if (in.peek(value)) | |
622 | return true; | |
623 | ||
624 | // if the incoming queue is empty, check the outgoing, us-to-them queue: | |
625 | const OneToOneUniQueue &out = outQueue(remoteProcessId); | |
626 | debugs(54, 2, "peeking from " << theLocalProcessId << " to " << | |
627 | remoteProcessId << " at " << out.size()); | |
628 | return out.peek(value); | |
629 | } | |
630 | ||
631 | } // namespace Ipc | |
632 | ||
633 | #endif /* SQUID_SRC_IPC_QUEUE_H */ | |
634 |