]>
Commit | Line | Data |
---|---|---|
9a51593d | 1 | /* |
1f7b830e | 2 | * Copyright (C) 1996-2025 The Squid Software Foundation and contributors |
bbc27441 AJ |
3 | * |
4 | * Squid software is distributed under GPLv2+ license and includes | |
5 | * contributions from numerous individuals and organizations. | |
6 | * Please see the COPYING and CONTRIBUTORS files for details. | |
9a51593d DK |
7 | */ |
8 | ||
ff9d9458 FC |
9 | #ifndef SQUID_SRC_IPC_QUEUE_H |
10 | #define SQUID_SRC_IPC_QUEUE_H | |
9a51593d | 11 | |
602d9612 | 12 | #include "base/InstanceId.h" |
675b8408 | 13 | #include "debug/Stream.h" |
3a8c5551 | 14 | #include "ipc/mem/FlexibleArray.h" |
68353d5a | 15 | #include "ipc/mem/Pointer.h" |
9a51593d DK |
16 | #include "util.h" |
17 | ||
d8ee9e8d | 18 | #include <algorithm> |
6c6a656a AJ |
19 | #include <atomic> |
20 | ||
b2aa0934 DK |
21 | class String; |
22 | ||
9199139f AR |
23 | namespace Ipc |
24 | { | |
15cdbc7c | 25 | |
fa61cefe AR |
26 | /// State of the reading end of a queue (i.e., of the code calling pop()). |
27 | /// Multiple queues attached to one reader share this state. | |
9199139f AR |
28 | class QueueReader |
29 | { | |
fa61cefe AR |
30 | public: |
31 | QueueReader(); // the initial state is "blocked without a signal" | |
32 | ||
33 | /// whether the reader is waiting for a notification signal | |
6c6a656a | 34 | bool blocked() const { return popBlocked.load(); } |
fa61cefe | 35 | |
5faec1a1 EB |
36 | /// \copydoc popSignal |
37 | bool signaled() const { return popSignal.load(); } | |
38 | ||
fa61cefe | 39 | /// marks the reader as blocked, waiting for a notification signal |
6c6a656a | 40 | void block() { popBlocked.store(true); } |
fa61cefe AR |
41 | |
42 | /// removes the block() effects | |
6c6a656a | 43 | void unblock() { popBlocked.store(false); } |
fa61cefe AR |
44 | |
45 | /// if reader is blocked and not notified, marks the notification signal | |
46 | /// as sent and not received, returning true; otherwise, returns false | |
6c6a656a | 47 | bool raiseSignal() { return blocked() && !popSignal.exchange(true); } |
fa61cefe AR |
48 | |
49 | /// marks sent reader notification as received (also removes pop blocking) | |
6c6a656a | 50 | void clearSignal() { unblock(); popSignal.store(false); } |
fa61cefe AR |
51 | |
52 | private: | |
6c6a656a AJ |
53 | std::atomic<bool> popBlocked; ///< whether the reader is blocked on pop() |
54 | std::atomic<bool> popSignal; ///< whether writer has sent and reader has not received notification | |
fa61cefe AR |
55 | |
56 | public: | |
6c6a656a | 57 | typedef std::atomic<int> Rate; ///< pop()s per second |
df881a0f AR |
58 | Rate rateLimit; ///< pop()s per second limit if positive |
59 | ||
e9adbbd7 | 60 | // we need a signed atomic type because balance may get negative |
6c6a656a | 61 | typedef std::atomic<int> AtomicSignedMsec; |
df881a0f AR |
62 | typedef AtomicSignedMsec Balance; |
63 | /// how far ahead the reader is compared to a perfect read/sec event rate | |
64 | Balance balance; | |
65 | ||
fa61cefe AR |
66 | /// unique ID for debugging which reader is used (works across processes) |
67 | const InstanceId<QueueReader> id; | |
68 | }; | |
69 | ||
68353d5a | 70 | /// shared array of QueueReaders |
9199139f AR |
71 | class QueueReaders |
72 | { | |
68353d5a DK |
73 | public: |
74 | QueueReaders(const int aCapacity); | |
75 | size_t sharedMemorySize() const; | |
76 | static size_t SharedMemorySize(const int capacity); | |
77 | ||
78 | const int theCapacity; /// number of readers | |
3a8c5551 | 79 | Ipc::Mem::FlexibleArray<QueueReader> theReaders; /// readers |
68353d5a | 80 | }; |
fa61cefe AR |
81 | |
82 | /** | |
83 | * Lockless fixed-capacity queue for a single writer and a single reader. | |
84 | * | |
85 | * If the queue is empty, the reader is considered "blocked" and needs | |
86 | * an out-of-band notification message to notice the next pushed item. | |
87 | * | |
88 | * Current implementation assumes that the writer cannot get blocked: if the | |
89 | * queue is full, the writer will just not push and come back later (with a | |
90 | * different value). We can add support for blocked writers if needed. | |
91 | */ | |
9199139f AR |
92 | class OneToOneUniQueue |
93 | { | |
9a51593d | 94 | public: |
fa61cefe | 95 | // pop() and push() exceptions; TODO: use TextException instead |
7a907247 | 96 | class Full {}; |
b2aa0934 DK |
97 | class ItemTooLarge {}; |
98 | ||
f5591061 | 99 | OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity); |
68353d5a | 100 | |
f5591061 DK |
101 | unsigned int maxItemSize() const { return theMaxItemSize; } |
102 | int size() const { return theSize; } | |
103 | int capacity() const { return theCapacity; } | |
104 | int sharedMemorySize() const { return Items2Bytes(theMaxItemSize, theCapacity); } | |
9a51593d | 105 | |
f5591061 DK |
106 | bool empty() const { return !theSize; } |
107 | bool full() const { return theSize == theCapacity; } | |
9a51593d | 108 | |
b2aa0934 DK |
109 | static int Bytes2Items(const unsigned int maxItemSize, int size); |
110 | static int Items2Bytes(const unsigned int maxItemSize, const int size); | |
111 | ||
fa61cefe | 112 | /// returns true iff the value was set; [un]blocks the reader as needed |
aee3523a | 113 | template<class Value> bool pop(Value &value, QueueReader *const reader = nullptr); |
fa61cefe AR |
114 | |
115 | /// returns true iff the caller must notify the reader of the pushed item | |
aee3523a | 116 | template<class Value> bool push(const Value &value, QueueReader *const reader = nullptr); |
9a51593d | 117 | |
0a11e039 AR |
118 | /// returns true iff the value was set; the value may be stale! |
119 | template<class Value> bool peek(Value &value) const; | |
120 | ||
d8ee9e8d EB |
121 | /// prints incoming queue state; suitable for cache manager reports |
122 | template<class Value> void statIn(std::ostream &, int localProcessId, int remoteProcessId) const; | |
123 | /// prints outgoing queue state; suitable for cache manager reports | |
124 | template<class Value> void statOut(std::ostream &, int localProcessId, int remoteProcessId) const; | |
125 | ||
9a51593d | 126 | private: |
d8ee9e8d EB |
127 | void statOpen(std::ostream &, const char *inLabel, const char *outLabel, uint32_t count) const; |
128 | void statClose(std::ostream &) const; | |
129 | template<class Value> void statSamples(std::ostream &, unsigned int start, uint32_t size) const; | |
130 | template<class Value> void statRange(std::ostream &, unsigned int start, uint32_t n) const; | |
b2aa0934 | 131 | |
d8ee9e8d EB |
132 | // optimization: these non-std::atomic data members are in shared memory, |
133 | // but each is used only by one process (aside from obscured reporting) | |
134 | unsigned int theIn; ///< current push() position; reporting aside, used only in push() | |
135 | unsigned int theOut; ///< current pop() position; reporting aside, used only in pop()/peek() | |
68353d5a | 136 | |
6c6a656a | 137 | std::atomic<uint32_t> theSize; ///< number of items in the queue |
f5591061 | 138 | const unsigned int theMaxItemSize; ///< maximum item size |
6c6a656a | 139 | const uint32_t theCapacity; ///< maximum number of items, i.e. theBuffer size |
7a907247 | 140 | |
f5591061 DK |
141 | char theBuffer[]; |
142 | }; | |
68353d5a | 143 | |
f5591061 | 144 | /// shared array of OneToOneUniQueues |
9199139f AR |
145 | class OneToOneUniQueues |
146 | { | |
f5591061 DK |
147 | public: |
148 | OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity); | |
68353d5a | 149 | |
f5591061 DK |
150 | size_t sharedMemorySize() const; |
151 | static size_t SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity); | |
9a51593d | 152 | |
f5591061 DK |
153 | const OneToOneUniQueue &operator [](const int index) const; |
154 | inline OneToOneUniQueue &operator [](const int index); | |
9a51593d | 155 | |
f5591061 DK |
156 | private: |
157 | inline const OneToOneUniQueue &front() const; | |
fa61cefe | 158 | |
f5591061 DK |
159 | public: |
160 | const int theCapacity; /// number of OneToOneUniQueues | |
9a51593d DK |
161 | }; |
162 | ||
807feb1d DK |
163 | /** |
164 | * Base class for lockless fixed-capacity bidirectional queues for a | |
165 | * limited number processes. | |
166 | */ | |
167 | class BaseMultiQueue | |
168 | { | |
169 | public: | |
170 | BaseMultiQueue(const int aLocalProcessId); | |
4f8892c3 | 171 | virtual ~BaseMultiQueue() {} |
807feb1d DK |
172 | |
173 | /// clears the reader notification received by the local process from the remote process | |
174 | void clearReaderSignal(const int remoteProcessId); | |
175 | ||
5faec1a1 EB |
176 | /// clears all reader notifications received by the local process |
177 | void clearAllReaderSignals(); | |
178 | ||
807feb1d DK |
179 | /// picks a process and calls OneToOneUniQueue::pop() using its queue |
180 | template <class Value> bool pop(int &remoteProcessId, Value &value); | |
181 | ||
182 | /// calls OneToOneUniQueue::push() using the given process queue | |
183 | template <class Value> bool push(const int remoteProcessId, const Value &value); | |
184 | ||
185 | /// peeks at the item likely to be pop()ed next | |
186 | template<class Value> bool peek(int &remoteProcessId, Value &value) const; | |
187 | ||
d8ee9e8d EB |
188 | /// prints current state; suitable for cache manager reports |
189 | template<class Value> void stat(std::ostream &) const; | |
190 | ||
807feb1d DK |
191 | /// returns local reader's balance |
192 | QueueReader::Balance &localBalance() { return localReader().balance; } | |
193 | ||
194 | /// returns reader's balance for a given remote process | |
195 | const QueueReader::Balance &balance(const int remoteProcessId) const; | |
196 | ||
197 | /// returns local reader's rate limit | |
198 | QueueReader::Rate &localRateLimit() { return localReader().rateLimit; } | |
199 | ||
200 | /// returns reader's rate limit for a given remote process | |
201 | const QueueReader::Rate &rateLimit(const int remoteProcessId) const; | |
202 | ||
203 | /// number of items in incoming queue from a given remote process | |
204 | int inSize(const int remoteProcessId) const { return inQueue(remoteProcessId).size(); } | |
205 | ||
206 | /// number of items in outgoing queue to a given remote process | |
207 | int outSize(const int remoteProcessId) const { return outQueue(remoteProcessId).size(); } | |
208 | ||
209 | protected: | |
210 | /// incoming queue from a given remote process | |
211 | virtual const OneToOneUniQueue &inQueue(const int remoteProcessId) const = 0; | |
212 | OneToOneUniQueue &inQueue(const int remoteProcessId); | |
213 | ||
214 | /// outgoing queue to a given remote process | |
215 | virtual const OneToOneUniQueue &outQueue(const int remoteProcessId) const = 0; | |
216 | OneToOneUniQueue &outQueue(const int remoteProcessId); | |
217 | ||
218 | virtual const QueueReader &localReader() const = 0; | |
219 | QueueReader &localReader(); | |
220 | ||
221 | virtual const QueueReader &remoteReader(const int remoteProcessId) const = 0; | |
222 | QueueReader &remoteReader(const int remoteProcessId); | |
223 | ||
224 | virtual int remotesCount() const = 0; | |
225 | virtual int remotesIdOffset() const = 0; | |
226 | ||
227 | protected: | |
228 | const int theLocalProcessId; ///< process ID of this queue | |
229 | ||
230 | private: | |
231 | int theLastPopProcessId; ///< the ID of the last process we tried to pop() from | |
232 | }; | |
233 | ||
9a51593d DK |
234 | /** |
235 | * Lockless fixed-capacity bidirectional queue for a limited number | |
f5591061 DK |
236 | * processes. Allows communication between two groups of processes: |
237 | * any process in one group may send data to and receive from any | |
238 | * process in another group, but processes in the same group can not | |
239 | * communicate. Process in each group has a unique integer ID in | |
240 | * [groupIdOffset, groupIdOffset + groupSize) range. | |
9a51593d | 241 | */ |
807feb1d | 242 | class FewToFewBiQueue: public BaseMultiQueue |
9199139f | 243 | { |
9a51593d | 244 | public: |
f5591061 DK |
245 | typedef OneToOneUniQueue::Full Full; |
246 | typedef OneToOneUniQueue::ItemTooLarge ItemTooLarge; | |
247 | ||
248 | private: | |
249 | /// Shared metadata for FewToFewBiQueue | |
250 | struct Metadata { | |
251 | Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset); | |
252 | size_t sharedMemorySize() const { return sizeof(*this); } | |
253 | static size_t SharedMemorySize(const int, const int, const int, const int) { return sizeof(Metadata); } | |
254 | ||
255 | const int theGroupASize; | |
256 | const int theGroupAIdOffset; | |
257 | const int theGroupBSize; | |
258 | const int theGroupBIdOffset; | |
259 | }; | |
7a907247 | 260 | |
f5591061 | 261 | public: |
9199139f AR |
262 | class Owner |
263 | { | |
68353d5a | 264 | public: |
f5591061 | 265 | Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity); |
68353d5a DK |
266 | ~Owner(); |
267 | ||
268 | private: | |
f5591061 DK |
269 | Mem::Owner<Metadata> *const metadataOwner; |
270 | Mem::Owner<OneToOneUniQueues> *const queuesOwner; | |
15cdbc7c | 271 | Mem::Owner<QueueReaders> *const readersOwner; |
68353d5a DK |
272 | }; |
273 | ||
f5591061 | 274 | static Owner *Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity); |
68353d5a | 275 | |
f5591061 DK |
276 | enum Group { groupA = 0, groupB = 1 }; |
277 | FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId); | |
9a51593d | 278 | |
ea2cdeb6 DK |
279 | /// maximum number of items in the queue |
280 | static int MaxItemsCount(const int groupASize, const int groupBSize, const int capacity); | |
281 | ||
807feb1d DK |
282 | /// finds the oldest item in incoming and outgoing queues between |
283 | /// us and the given remote process | |
284 | template<class Value> bool findOldest(const int remoteProcessId, Value &value) const; | |
285 | ||
286 | protected: | |
337b9aa4 AR |
287 | const OneToOneUniQueue &inQueue(const int remoteProcessId) const override; |
288 | const OneToOneUniQueue &outQueue(const int remoteProcessId) const override; | |
289 | const QueueReader &localReader() const override; | |
290 | const QueueReader &remoteReader(const int processId) const override; | |
291 | int remotesCount() const override; | |
292 | int remotesIdOffset() const override; | |
807feb1d DK |
293 | |
294 | private: | |
295 | bool validProcessId(const Group group, const int processId) const; | |
296 | int oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const; | |
297 | const OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const; | |
298 | int readerIndex(const Group group, const int processId) const; | |
f5591061 DK |
299 | Group localGroup() const { return theLocalGroup; } |
300 | Group remoteGroup() const { return theLocalGroup == groupA ? groupB : groupA; } | |
9a51593d | 301 | |
807feb1d DK |
302 | private: |
303 | const Mem::Pointer<Metadata> metadata; ///< shared metadata | |
304 | const Mem::Pointer<OneToOneUniQueues> queues; ///< unidirection one-to-one queues | |
305 | const Mem::Pointer<QueueReaders> readers; ///< readers array | |
fa61cefe | 306 | |
807feb1d DK |
307 | const Group theLocalGroup; ///< group of this queue |
308 | }; | |
fa61cefe | 309 | |
807feb1d DK |
310 | /** |
311 | * Lockless fixed-capacity bidirectional queue for a limited number | |
312 | * processes. Any process may send data to and receive from any other | |
313 | * process (including itself). Each process has a unique integer ID in | |
314 | * [processIdOffset, processIdOffset + processCount) range. | |
315 | */ | |
316 | class MultiQueue: public BaseMultiQueue | |
317 | { | |
318 | public: | |
319 | typedef OneToOneUniQueue::Full Full; | |
320 | typedef OneToOneUniQueue::ItemTooLarge ItemTooLarge; | |
0a11e039 | 321 | |
807feb1d DK |
322 | private: |
323 | /// Shared metadata for MultiQueue | |
324 | struct Metadata { | |
325 | Metadata(const int aProcessCount, const int aProcessIdOffset); | |
326 | size_t sharedMemorySize() const { return sizeof(*this); } | |
327 | static size_t SharedMemorySize(const int, const int) { return sizeof(Metadata); } | |
df881a0f | 328 | |
807feb1d DK |
329 | const int theProcessCount; |
330 | const int theProcessIdOffset; | |
331 | }; | |
df881a0f | 332 | |
807feb1d DK |
333 | public: |
334 | class Owner | |
335 | { | |
336 | public: | |
337 | Owner(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity); | |
338 | ~Owner(); | |
55939a01 | 339 | |
807feb1d DK |
340 | private: |
341 | Mem::Owner<Metadata> *const metadataOwner; | |
342 | Mem::Owner<OneToOneUniQueues> *const queuesOwner; | |
343 | Mem::Owner<QueueReaders> *const readersOwner; | |
344 | }; | |
df881a0f | 345 | |
807feb1d | 346 | static Owner *Init(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity); |
55939a01 | 347 | |
807feb1d | 348 | MultiQueue(const String &id, const int localProcessId); |
55939a01 | 349 | |
807feb1d | 350 | protected: |
337b9aa4 AR |
351 | const OneToOneUniQueue &inQueue(const int remoteProcessId) const override; |
352 | const OneToOneUniQueue &outQueue(const int remoteProcessId) const override; | |
353 | const QueueReader &localReader() const override; | |
354 | const QueueReader &remoteReader(const int remoteProcessId) const override; | |
355 | int remotesCount() const override; | |
356 | int remotesIdOffset() const override; | |
55939a01 | 357 | |
f5591061 | 358 | private: |
807feb1d DK |
359 | bool validProcessId(const int processId) const; |
360 | const OneToOneUniQueue &oneToOneQueue(const int fromProcessId, const int toProcessId) const; | |
361 | const QueueReader &reader(const int processId) const; | |
fa61cefe | 362 | |
f5591061 DK |
363 | private: |
364 | const Mem::Pointer<Metadata> metadata; ///< shared metadata | |
365 | const Mem::Pointer<OneToOneUniQueues> queues; ///< unidirection one-to-one queues | |
15cdbc7c | 366 | const Mem::Pointer<QueueReaders> readers; ///< readers array |
9a51593d DK |
367 | }; |
368 | ||
9a51593d DK |
369 | // OneToOneUniQueue |
370 | ||
9a51593d DK |
371 | template <class Value> |
372 | bool | |
f5591061 | 373 | OneToOneUniQueue::pop(Value &value, QueueReader *const reader) |
9a51593d | 374 | { |
f5591061 | 375 | if (sizeof(value) > theMaxItemSize) |
b2aa0934 DK |
376 | throw ItemTooLarge(); |
377 | ||
fa61cefe AR |
378 | // A writer might push between the empty test and block() below, so we do |
379 | // not return false right after calling block(), but test again. | |
380 | if (empty()) { | |
f5591061 DK |
381 | if (!reader) |
382 | return false; | |
383 | ||
384 | reader->block(); | |
fa61cefe AR |
385 | // A writer might push between the empty test and block() below, |
386 | // so we must test again as such a writer will not signal us. | |
387 | if (empty()) | |
388 | return false; | |
389 | } | |
9a51593d | 390 | |
f5591061 DK |
391 | if (reader) |
392 | reader->unblock(); | |
393 | ||
394 | const unsigned int pos = (theOut++ % theCapacity) * theMaxItemSize; | |
395 | memcpy(&value, theBuffer + pos, sizeof(value)); | |
396 | --theSize; | |
fa61cefe AR |
397 | |
398 | return true; | |
9a51593d DK |
399 | } |
400 | ||
0a11e039 AR |
401 | template <class Value> |
402 | bool | |
403 | OneToOneUniQueue::peek(Value &value) const | |
404 | { | |
405 | if (sizeof(value) > theMaxItemSize) | |
406 | throw ItemTooLarge(); | |
407 | ||
408 | if (empty()) | |
409 | return false; | |
410 | ||
411 | // the reader may pop() before we copy; making this method imprecise | |
412 | const unsigned int pos = (theOut % theCapacity) * theMaxItemSize; | |
413 | memcpy(&value, theBuffer + pos, sizeof(value)); | |
414 | return true; | |
415 | } | |
416 | ||
9a51593d DK |
417 | template <class Value> |
418 | bool | |
f5591061 | 419 | OneToOneUniQueue::push(const Value &value, QueueReader *const reader) |
9a51593d | 420 | { |
f5591061 | 421 | if (sizeof(value) > theMaxItemSize) |
b2aa0934 DK |
422 | throw ItemTooLarge(); |
423 | ||
9a51593d | 424 | if (full()) |
7a907247 | 425 | throw Full(); |
9a51593d | 426 | |
f5591061 DK |
427 | const unsigned int pos = theIn++ % theCapacity * theMaxItemSize; |
428 | memcpy(theBuffer + pos, &value, sizeof(value)); | |
ad40da43 | 429 | const bool wasEmpty = !theSize++; |
f5591061 DK |
430 | |
431 | return wasEmpty && (!reader || reader->raiseSignal()); | |
432 | } | |
433 | ||
d8ee9e8d EB |
434 | template <class Value> |
435 | void | |
436 | OneToOneUniQueue::statIn(std::ostream &os, const int localProcessId, const int remoteProcessId) const | |
437 | { | |
438 | os << " kid" << localProcessId << " receiving from kid" << remoteProcessId << ": "; | |
439 | // Nobody can modify our theOut so, after capturing some valid theSize value | |
440 | // in count, we can reliably report all [theOut, theOut+count) items that | |
441 | // were queued at theSize capturing time. We will miss new items push()ed by | |
442 | // the other side, but it is OK -- we report state at the capturing time. | |
443 | const auto count = theSize.load(); | |
444 | statOpen(os, "other", "popIndex", count); | |
445 | statSamples<Value>(os, theOut, count); | |
446 | statClose(os); | |
447 | } | |
448 | ||
449 | template <class Value> | |
450 | void | |
451 | OneToOneUniQueue::statOut(std::ostream &os, const int localProcessId, const int remoteProcessId) const | |
452 | { | |
453 | os << " kid" << localProcessId << " sending to kid" << remoteProcessId << ": "; | |
454 | // Nobody can modify our theIn so, after capturing some valid theSize value | |
455 | // in count, we can reliably report all [theIn-count, theIn) items that were | |
456 | // queued at theSize capturing time. We may report items already pop()ed by | |
457 | // the other side, but that is OK because pop() does not modify items -- it | |
458 | // only increments theOut. | |
459 | const auto count = theSize.load(); | |
460 | statOpen(os, "pushIndex", "other", count); | |
461 | statSamples<Value>(os, theIn - count, count); // unsigned offset underflow OK | |
462 | statClose(os); | |
463 | } | |
464 | ||
465 | /// report a sample of [start, start + size) items | |
466 | template <class Value> | |
467 | void | |
468 | OneToOneUniQueue::statSamples(std::ostream &os, const unsigned int start, const uint32_t count) const | |
469 | { | |
470 | if (!count) { | |
471 | os << " "; | |
472 | return; | |
473 | } | |
474 | ||
475 | os << ", items: [\n"; | |
476 | // report a few leading and trailing items, without repetitions | |
477 | const auto sampleSize = std::min(3U, count); // leading (and max) sample | |
478 | statRange<Value>(os, start, sampleSize); | |
479 | if (sampleSize < count) { // the first sample did not show some items | |
480 | // The `start` offset aside, the first sample reported all items | |
481 | // below the sampleSize offset. The second sample needs to report | |
482 | // the last sampleSize items (i.e. starting at count-sampleSize | |
483 | // offset) except those already reported by the first sample. | |
484 | const auto secondSampleOffset = std::max(sampleSize, count - sampleSize); | |
485 | const auto secondSampleSize = std::min(sampleSize, count - sampleSize); | |
486 | ||
487 | // but first we print a sample separator, unless there are no items | |
488 | // between the samples or the separator hides the only unsampled item | |
489 | const auto bothSamples = sampleSize + secondSampleSize; | |
490 | if (bothSamples + 1U == count) | |
491 | statRange<Value>(os, start + sampleSize, 1); | |
492 | else if (count > bothSamples) | |
493 | os << " # ... " << (count - bothSamples) << " items not shown ...\n"; | |
494 | ||
495 | statRange<Value>(os, start + secondSampleOffset, secondSampleSize); | |
496 | } | |
497 | os << " ]"; | |
498 | } | |
499 | ||
500 | /// statSamples() helper that reports n items from start | |
501 | template <class Value> | |
502 | void | |
503 | OneToOneUniQueue::statRange(std::ostream &os, const unsigned int start, const uint32_t n) const | |
504 | { | |
505 | assert(sizeof(Value) <= theMaxItemSize); | |
506 | auto offset = start; | |
507 | for (uint32_t i = 0; i < n; ++i) { | |
508 | // XXX: Throughout this C++ header, these overflow wrapping tricks work | |
509 | // only because theCapacity currently happens to be a power of 2 (e.g., | |
510 | // the highest offset (0xF...FFF) % 3 is 0 and so is the next offset). | |
511 | const auto pos = (offset++ % theCapacity) * theMaxItemSize; | |
512 | Value value; | |
513 | memcpy(&value, theBuffer + pos, sizeof(value)); | |
514 | os << " { "; | |
515 | value.stat(os); | |
516 | os << " },\n"; | |
517 | } | |
518 | } | |
519 | ||
f5591061 DK |
520 | // OneToOneUniQueues |
521 | ||
522 | inline OneToOneUniQueue & | |
523 | OneToOneUniQueues::operator [](const int index) | |
524 | { | |
525 | return const_cast<OneToOneUniQueue &>((*const_cast<const OneToOneUniQueues *>(this))[index]); | |
526 | } | |
527 | ||
528 | inline const OneToOneUniQueue & | |
529 | OneToOneUniQueues::front() const | |
530 | { | |
531 | const char *const queue = | |
532 | reinterpret_cast<const char *>(this) + sizeof(*this); | |
533 | return *reinterpret_cast<const OneToOneUniQueue *>(queue); | |
9a51593d DK |
534 | } |
535 | ||
807feb1d | 536 | // BaseMultiQueue |
9a51593d | 537 | |
9a51593d DK |
538 | template <class Value> |
539 | bool | |
807feb1d | 540 | BaseMultiQueue::pop(int &remoteProcessId, Value &value) |
9a51593d | 541 | { |
807feb1d DK |
542 | // iterate all remote processes, starting after the one we visited last |
543 | for (int i = 0; i < remotesCount(); ++i) { | |
544 | if (++theLastPopProcessId >= remotesIdOffset() + remotesCount()) | |
545 | theLastPopProcessId = remotesIdOffset(); | |
546 | OneToOneUniQueue &queue = inQueue(theLastPopProcessId); | |
547 | if (queue.pop(value, &localReader())) { | |
f5591061 | 548 | remoteProcessId = theLastPopProcessId; |
bf95c10a | 549 | debugs(54, 7, "popped from " << remoteProcessId << " to " << theLocalProcessId << " at " << queue.size()); |
fa61cefe AR |
550 | return true; |
551 | } | |
9a51593d | 552 | } |
f5591061 | 553 | return false; // no process had anything to pop |
9a51593d DK |
554 | } |
555 | ||
556 | template <class Value> | |
557 | bool | |
807feb1d | 558 | BaseMultiQueue::push(const int remoteProcessId, const Value &value) |
9a51593d | 559 | { |
807feb1d DK |
560 | OneToOneUniQueue &remoteQueue = outQueue(remoteProcessId); |
561 | QueueReader &reader = remoteReader(remoteProcessId); | |
bf95c10a | 562 | debugs(54, 7, "pushing from " << theLocalProcessId << " to " << remoteProcessId << " at " << remoteQueue.size()); |
807feb1d | 563 | return remoteQueue.push(value, &reader); |
9a51593d DK |
564 | } |
565 | ||
807feb1d DK |
566 | template <class Value> |
567 | bool | |
568 | BaseMultiQueue::peek(int &remoteProcessId, Value &value) const | |
569 | { | |
570 | // mimic FewToFewBiQueue::pop() but quit just before popping | |
571 | int popProcessId = theLastPopProcessId; // preserve for future pop() | |
572 | for (int i = 0; i < remotesCount(); ++i) { | |
573 | if (++popProcessId >= remotesIdOffset() + remotesCount()) | |
574 | popProcessId = remotesIdOffset(); | |
575 | const OneToOneUniQueue &queue = inQueue(popProcessId); | |
576 | if (queue.peek(value)) { | |
577 | remoteProcessId = popProcessId; | |
578 | return true; | |
579 | } | |
580 | } | |
581 | return false; // most likely, no process had anything to pop | |
582 | } | |
583 | ||
d8ee9e8d EB |
584 | template <class Value> |
585 | void | |
586 | BaseMultiQueue::stat(std::ostream &os) const | |
587 | { | |
588 | for (int processId = remotesIdOffset(); processId < remotesIdOffset() + remotesCount(); ++processId) { | |
589 | const auto &queue = inQueue(processId); | |
590 | queue.statIn<Value>(os, theLocalProcessId, processId); | |
591 | } | |
592 | ||
593 | os << "\n"; | |
594 | ||
595 | for (int processId = remotesIdOffset(); processId < remotesIdOffset() + remotesCount(); ++processId) { | |
596 | const auto &queue = outQueue(processId); | |
597 | queue.statOut<Value>(os, theLocalProcessId, processId); | |
598 | } | |
5faec1a1 EB |
599 | |
600 | os << "\n"; | |
601 | ||
602 | const auto &reader = localReader(); | |
603 | os << " kid" << theLocalProcessId << " reader flags: " << | |
604 | "{ blocked: " << reader.blocked() << ", signaled: " << reader.signaled() << " }\n"; | |
d8ee9e8d EB |
605 | } |
606 | ||
807feb1d DK |
607 | // FewToFewBiQueue |
608 | ||
0a11e039 AR |
609 | template <class Value> |
610 | bool | |
5aac671b | 611 | FewToFewBiQueue::findOldest(const int remoteProcessId, Value &value) const |
0a11e039 AR |
612 | { |
613 | // we may be called before remote process configured its queue end | |
614 | if (!validProcessId(remoteGroup(), remoteProcessId)) | |
615 | return false; | |
616 | ||
b8c75806 | 617 | // we need the oldest value, so start with the incoming, them-to-us queue: |
55939a01 | 618 | const OneToOneUniQueue &in = inQueue(remoteProcessId); |
bf95c10a | 619 | debugs(54, 2, "peeking from " << remoteProcessId << " to " << |
55939a01 AR |
620 | theLocalProcessId << " at " << in.size()); |
621 | if (in.peek(value)) | |
b8c75806 AR |
622 | return true; |
623 | ||
624 | // if the incoming queue is empty, check the outgoing, us-to-them queue: | |
55939a01 | 625 | const OneToOneUniQueue &out = outQueue(remoteProcessId); |
bf95c10a | 626 | debugs(54, 2, "peeking from " << theLocalProcessId << " to " << |
55939a01 AR |
627 | remoteProcessId << " at " << out.size()); |
628 | return out.peek(value); | |
0a11e039 AR |
629 | } |
630 | ||
15cdbc7c DK |
631 | } // namespace Ipc |
632 | ||
ff9d9458 | 633 | #endif /* SQUID_SRC_IPC_QUEUE_H */ |
f53969cc | 634 |