]>
Commit | Line | Data |
---|---|---|
9a51593d | 1 | /* |
4ac4a490 | 2 | * Copyright (C) 1996-2017 The Squid Software Foundation and contributors |
bbc27441 AJ |
3 | * |
4 | * Squid software is distributed under GPLv2+ license and includes | |
5 | * contributions from numerous individuals and organizations. | |
6 | * Please see the COPYING and CONTRIBUTORS files for details. | |
9a51593d DK |
7 | */ |
8 | ||
9 | #ifndef SQUID_IPC_QUEUE_H | |
10 | #define SQUID_IPC_QUEUE_H | |
11 | ||
602d9612 | 12 | #include "base/InstanceId.h" |
d5d5493b | 13 | #include "Debug.h" |
3a8c5551 | 14 | #include "ipc/mem/FlexibleArray.h" |
68353d5a | 15 | #include "ipc/mem/Pointer.h" |
9a51593d DK |
16 | #include "util.h" |
17 | ||
6c6a656a AJ |
18 | #include <atomic> |
19 | ||
b2aa0934 DK |
20 | class String; |
21 | ||
9199139f AR |
22 | namespace Ipc |
23 | { | |
15cdbc7c | 24 | |
fa61cefe AR |
25 | /// State of the reading end of a queue (i.e., of the code calling pop()). |
26 | /// Multiple queues attached to one reader share this state. | |
9199139f AR |
27 | class QueueReader |
28 | { | |
fa61cefe AR |
29 | public: |
30 | QueueReader(); // the initial state is "blocked without a signal" | |
31 | ||
32 | /// whether the reader is waiting for a notification signal | |
6c6a656a | 33 | bool blocked() const { return popBlocked.load(); } |
fa61cefe AR |
34 | |
35 | /// marks the reader as blocked, waiting for a notification signal | |
6c6a656a | 36 | void block() { popBlocked.store(true); } |
fa61cefe AR |
37 | |
38 | /// removes the block() effects | |
6c6a656a | 39 | void unblock() { popBlocked.store(false); } |
fa61cefe AR |
40 | |
41 | /// if reader is blocked and not notified, marks the notification signal | |
42 | /// as sent and not received, returning true; otherwise, returns false | |
6c6a656a | 43 | bool raiseSignal() { return blocked() && !popSignal.exchange(true); } |
fa61cefe AR |
44 | |
45 | /// marks sent reader notification as received (also removes pop blocking) | |
6c6a656a | 46 | void clearSignal() { unblock(); popSignal.store(false); } |
fa61cefe AR |
47 | |
48 | private: | |
6c6a656a AJ |
49 | std::atomic<bool> popBlocked; ///< whether the reader is blocked on pop() |
50 | std::atomic<bool> popSignal; ///< whether writer has sent and reader has not received notification | |
fa61cefe AR |
51 | |
52 | public: | |
6c6a656a | 53 | typedef std::atomic<int> Rate; ///< pop()s per second |
df881a0f AR |
54 | Rate rateLimit; ///< pop()s per second limit if positive |
55 | ||
e9adbbd7 | 56 | // we need a signed atomic type because balance may get negative |
6c6a656a | 57 | typedef std::atomic<int> AtomicSignedMsec; |
df881a0f AR |
58 | typedef AtomicSignedMsec Balance; |
59 | /// how far ahead the reader is compared to a perfect read/sec event rate | |
60 | Balance balance; | |
61 | ||
fa61cefe AR |
62 | /// unique ID for debugging which reader is used (works across processes) |
63 | const InstanceId<QueueReader> id; | |
64 | }; | |
65 | ||
68353d5a | 66 | /// shared array of QueueReaders |
9199139f AR |
67 | class QueueReaders |
68 | { | |
68353d5a DK |
69 | public: |
70 | QueueReaders(const int aCapacity); | |
71 | size_t sharedMemorySize() const; | |
72 | static size_t SharedMemorySize(const int capacity); | |
73 | ||
74 | const int theCapacity; /// number of readers | |
3a8c5551 | 75 | Ipc::Mem::FlexibleArray<QueueReader> theReaders; /// readers |
68353d5a | 76 | }; |
fa61cefe AR |
77 | |
78 | /** | |
79 | * Lockless fixed-capacity queue for a single writer and a single reader. | |
80 | * | |
81 | * If the queue is empty, the reader is considered "blocked" and needs | |
82 | * an out-of-band notification message to notice the next pushed item. | |
83 | * | |
84 | * Current implementation assumes that the writer cannot get blocked: if the | |
85 | * queue is full, the writer will just not push and come back later (with a | |
86 | * different value). We can add support for blocked writers if needed. | |
87 | */ | |
9199139f AR |
88 | class OneToOneUniQueue |
89 | { | |
9a51593d | 90 | public: |
fa61cefe | 91 | // pop() and push() exceptions; TODO: use TextException instead |
7a907247 | 92 | class Full {}; |
b2aa0934 DK |
93 | class ItemTooLarge {}; |
94 | ||
f5591061 | 95 | OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity); |
68353d5a | 96 | |
f5591061 DK |
97 | unsigned int maxItemSize() const { return theMaxItemSize; } |
98 | int size() const { return theSize; } | |
99 | int capacity() const { return theCapacity; } | |
100 | int sharedMemorySize() const { return Items2Bytes(theMaxItemSize, theCapacity); } | |
9a51593d | 101 | |
f5591061 DK |
102 | bool empty() const { return !theSize; } |
103 | bool full() const { return theSize == theCapacity; } | |
9a51593d | 104 | |
b2aa0934 DK |
105 | static int Bytes2Items(const unsigned int maxItemSize, int size); |
106 | static int Items2Bytes(const unsigned int maxItemSize, const int size); | |
107 | ||
fa61cefe | 108 | /// returns true iff the value was set; [un]blocks the reader as needed |
f5591061 | 109 | template<class Value> bool pop(Value &value, QueueReader *const reader = NULL); |
fa61cefe AR |
110 | |
111 | /// returns true iff the caller must notify the reader of the pushed item | |
f5591061 | 112 | template<class Value> bool push(const Value &value, QueueReader *const reader = NULL); |
9a51593d | 113 | |
0a11e039 AR |
114 | /// returns true iff the value was set; the value may be stale! |
115 | template<class Value> bool peek(Value &value) const; | |
116 | ||
9a51593d | 117 | private: |
b2aa0934 | 118 | |
f5591061 DK |
119 | unsigned int theIn; ///< input index, used only in push() |
120 | unsigned int theOut; ///< output index, used only in pop() | |
68353d5a | 121 | |
6c6a656a | 122 | std::atomic<uint32_t> theSize; ///< number of items in the queue |
f5591061 | 123 | const unsigned int theMaxItemSize; ///< maximum item size |
6c6a656a | 124 | const uint32_t theCapacity; ///< maximum number of items, i.e. theBuffer size |
7a907247 | 125 | |
f5591061 DK |
126 | char theBuffer[]; |
127 | }; | |
68353d5a | 128 | |
f5591061 | 129 | /// shared array of OneToOneUniQueues |
9199139f AR |
130 | class OneToOneUniQueues |
131 | { | |
f5591061 DK |
132 | public: |
133 | OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity); | |
68353d5a | 134 | |
f5591061 DK |
135 | size_t sharedMemorySize() const; |
136 | static size_t SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity); | |
9a51593d | 137 | |
f5591061 DK |
138 | const OneToOneUniQueue &operator [](const int index) const; |
139 | inline OneToOneUniQueue &operator [](const int index); | |
9a51593d | 140 | |
f5591061 DK |
141 | private: |
142 | inline const OneToOneUniQueue &front() const; | |
fa61cefe | 143 | |
f5591061 DK |
144 | public: |
145 | const int theCapacity; /// number of OneToOneUniQueues | |
9a51593d DK |
146 | }; |
147 | ||
807feb1d DK |
148 | /** |
149 | * Base class for lockless fixed-capacity bidirectional queues for a | |
150 | * limited number processes. | |
151 | */ | |
152 | class BaseMultiQueue | |
153 | { | |
154 | public: | |
155 | BaseMultiQueue(const int aLocalProcessId); | |
4f8892c3 | 156 | virtual ~BaseMultiQueue() {} |
807feb1d DK |
157 | |
158 | /// clears the reader notification received by the local process from the remote process | |
159 | void clearReaderSignal(const int remoteProcessId); | |
160 | ||
161 | /// picks a process and calls OneToOneUniQueue::pop() using its queue | |
162 | template <class Value> bool pop(int &remoteProcessId, Value &value); | |
163 | ||
164 | /// calls OneToOneUniQueue::push() using the given process queue | |
165 | template <class Value> bool push(const int remoteProcessId, const Value &value); | |
166 | ||
167 | /// peeks at the item likely to be pop()ed next | |
168 | template<class Value> bool peek(int &remoteProcessId, Value &value) const; | |
169 | ||
170 | /// returns local reader's balance | |
171 | QueueReader::Balance &localBalance() { return localReader().balance; } | |
172 | ||
173 | /// returns reader's balance for a given remote process | |
174 | const QueueReader::Balance &balance(const int remoteProcessId) const; | |
175 | ||
176 | /// returns local reader's rate limit | |
177 | QueueReader::Rate &localRateLimit() { return localReader().rateLimit; } | |
178 | ||
179 | /// returns reader's rate limit for a given remote process | |
180 | const QueueReader::Rate &rateLimit(const int remoteProcessId) const; | |
181 | ||
182 | /// number of items in incoming queue from a given remote process | |
183 | int inSize(const int remoteProcessId) const { return inQueue(remoteProcessId).size(); } | |
184 | ||
185 | /// number of items in outgoing queue to a given remote process | |
186 | int outSize(const int remoteProcessId) const { return outQueue(remoteProcessId).size(); } | |
187 | ||
188 | protected: | |
189 | /// incoming queue from a given remote process | |
190 | virtual const OneToOneUniQueue &inQueue(const int remoteProcessId) const = 0; | |
191 | OneToOneUniQueue &inQueue(const int remoteProcessId); | |
192 | ||
193 | /// outgoing queue to a given remote process | |
194 | virtual const OneToOneUniQueue &outQueue(const int remoteProcessId) const = 0; | |
195 | OneToOneUniQueue &outQueue(const int remoteProcessId); | |
196 | ||
197 | virtual const QueueReader &localReader() const = 0; | |
198 | QueueReader &localReader(); | |
199 | ||
200 | virtual const QueueReader &remoteReader(const int remoteProcessId) const = 0; | |
201 | QueueReader &remoteReader(const int remoteProcessId); | |
202 | ||
203 | virtual int remotesCount() const = 0; | |
204 | virtual int remotesIdOffset() const = 0; | |
205 | ||
206 | protected: | |
207 | const int theLocalProcessId; ///< process ID of this queue | |
208 | ||
209 | private: | |
210 | int theLastPopProcessId; ///< the ID of the last process we tried to pop() from | |
211 | }; | |
212 | ||
9a51593d DK |
213 | /** |
214 | * Lockless fixed-capacity bidirectional queue for a limited number | |
f5591061 DK |
215 | * processes. Allows communication between two groups of processes: |
216 | * any process in one group may send data to and receive from any | |
217 | * process in another group, but processes in the same group can not | |
218 | * communicate. Process in each group has a unique integer ID in | |
219 | * [groupIdOffset, groupIdOffset + groupSize) range. | |
9a51593d | 220 | */ |
807feb1d | 221 | class FewToFewBiQueue: public BaseMultiQueue |
9199139f | 222 | { |
9a51593d | 223 | public: |
f5591061 DK |
224 | typedef OneToOneUniQueue::Full Full; |
225 | typedef OneToOneUniQueue::ItemTooLarge ItemTooLarge; | |
226 | ||
227 | private: | |
228 | /// Shared metadata for FewToFewBiQueue | |
229 | struct Metadata { | |
230 | Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset); | |
231 | size_t sharedMemorySize() const { return sizeof(*this); } | |
232 | static size_t SharedMemorySize(const int, const int, const int, const int) { return sizeof(Metadata); } | |
233 | ||
234 | const int theGroupASize; | |
235 | const int theGroupAIdOffset; | |
236 | const int theGroupBSize; | |
237 | const int theGroupBIdOffset; | |
238 | }; | |
7a907247 | 239 | |
f5591061 | 240 | public: |
9199139f AR |
241 | class Owner |
242 | { | |
68353d5a | 243 | public: |
f5591061 | 244 | Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity); |
68353d5a DK |
245 | ~Owner(); |
246 | ||
247 | private: | |
f5591061 DK |
248 | Mem::Owner<Metadata> *const metadataOwner; |
249 | Mem::Owner<OneToOneUniQueues> *const queuesOwner; | |
15cdbc7c | 250 | Mem::Owner<QueueReaders> *const readersOwner; |
68353d5a DK |
251 | }; |
252 | ||
f5591061 | 253 | static Owner *Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity); |
68353d5a | 254 | |
f5591061 DK |
255 | enum Group { groupA = 0, groupB = 1 }; |
256 | FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId); | |
9a51593d | 257 | |
ea2cdeb6 DK |
258 | /// maximum number of items in the queue |
259 | static int MaxItemsCount(const int groupASize, const int groupBSize, const int capacity); | |
260 | ||
807feb1d DK |
261 | /// finds the oldest item in incoming and outgoing queues between |
262 | /// us and the given remote process | |
263 | template<class Value> bool findOldest(const int remoteProcessId, Value &value) const; | |
264 | ||
265 | protected: | |
266 | virtual const OneToOneUniQueue &inQueue(const int remoteProcessId) const; | |
267 | virtual const OneToOneUniQueue &outQueue(const int remoteProcessId) const; | |
268 | virtual const QueueReader &localReader() const; | |
269 | virtual const QueueReader &remoteReader(const int processId) const; | |
270 | virtual int remotesCount() const; | |
271 | virtual int remotesIdOffset() const; | |
272 | ||
273 | private: | |
274 | bool validProcessId(const Group group, const int processId) const; | |
275 | int oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const; | |
276 | const OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const; | |
277 | int readerIndex(const Group group, const int processId) const; | |
f5591061 DK |
278 | Group localGroup() const { return theLocalGroup; } |
279 | Group remoteGroup() const { return theLocalGroup == groupA ? groupB : groupA; } | |
9a51593d | 280 | |
807feb1d DK |
281 | private: |
282 | const Mem::Pointer<Metadata> metadata; ///< shared metadata | |
283 | const Mem::Pointer<OneToOneUniQueues> queues; ///< unidirection one-to-one queues | |
284 | const Mem::Pointer<QueueReaders> readers; ///< readers array | |
fa61cefe | 285 | |
807feb1d DK |
286 | const Group theLocalGroup; ///< group of this queue |
287 | }; | |
fa61cefe | 288 | |
807feb1d DK |
289 | /** |
290 | * Lockless fixed-capacity bidirectional queue for a limited number | |
291 | * processes. Any process may send data to and receive from any other | |
292 | * process (including itself). Each process has a unique integer ID in | |
293 | * [processIdOffset, processIdOffset + processCount) range. | |
294 | */ | |
295 | class MultiQueue: public BaseMultiQueue | |
296 | { | |
297 | public: | |
298 | typedef OneToOneUniQueue::Full Full; | |
299 | typedef OneToOneUniQueue::ItemTooLarge ItemTooLarge; | |
0a11e039 | 300 | |
807feb1d DK |
301 | private: |
302 | /// Shared metadata for MultiQueue | |
303 | struct Metadata { | |
304 | Metadata(const int aProcessCount, const int aProcessIdOffset); | |
305 | size_t sharedMemorySize() const { return sizeof(*this); } | |
306 | static size_t SharedMemorySize(const int, const int) { return sizeof(Metadata); } | |
df881a0f | 307 | |
807feb1d DK |
308 | const int theProcessCount; |
309 | const int theProcessIdOffset; | |
310 | }; | |
df881a0f | 311 | |
807feb1d DK |
312 | public: |
313 | class Owner | |
314 | { | |
315 | public: | |
316 | Owner(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity); | |
317 | ~Owner(); | |
55939a01 | 318 | |
807feb1d DK |
319 | private: |
320 | Mem::Owner<Metadata> *const metadataOwner; | |
321 | Mem::Owner<OneToOneUniQueues> *const queuesOwner; | |
322 | Mem::Owner<QueueReaders> *const readersOwner; | |
323 | }; | |
df881a0f | 324 | |
807feb1d | 325 | static Owner *Init(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity); |
55939a01 | 326 | |
807feb1d | 327 | MultiQueue(const String &id, const int localProcessId); |
55939a01 | 328 | |
807feb1d DK |
329 | protected: |
330 | virtual const OneToOneUniQueue &inQueue(const int remoteProcessId) const; | |
331 | virtual const OneToOneUniQueue &outQueue(const int remoteProcessId) const; | |
332 | virtual const QueueReader &localReader() const; | |
333 | virtual const QueueReader &remoteReader(const int remoteProcessId) const; | |
334 | virtual int remotesCount() const; | |
335 | virtual int remotesIdOffset() const; | |
55939a01 | 336 | |
f5591061 | 337 | private: |
807feb1d DK |
338 | bool validProcessId(const int processId) const; |
339 | const OneToOneUniQueue &oneToOneQueue(const int fromProcessId, const int toProcessId) const; | |
340 | const QueueReader &reader(const int processId) const; | |
fa61cefe | 341 | |
f5591061 DK |
342 | private: |
343 | const Mem::Pointer<Metadata> metadata; ///< shared metadata | |
344 | const Mem::Pointer<OneToOneUniQueues> queues; ///< unidirection one-to-one queues | |
15cdbc7c | 345 | const Mem::Pointer<QueueReaders> readers; ///< readers array |
9a51593d DK |
346 | }; |
347 | ||
9a51593d DK |
348 | // OneToOneUniQueue |
349 | ||
9a51593d DK |
350 | template <class Value> |
351 | bool | |
f5591061 | 352 | OneToOneUniQueue::pop(Value &value, QueueReader *const reader) |
9a51593d | 353 | { |
f5591061 | 354 | if (sizeof(value) > theMaxItemSize) |
b2aa0934 DK |
355 | throw ItemTooLarge(); |
356 | ||
fa61cefe AR |
357 | // A writer might push between the empty test and block() below, so we do |
358 | // not return false right after calling block(), but test again. | |
359 | if (empty()) { | |
f5591061 DK |
360 | if (!reader) |
361 | return false; | |
362 | ||
363 | reader->block(); | |
fa61cefe AR |
364 | // A writer might push between the empty test and block() below, |
365 | // so we must test again as such a writer will not signal us. | |
366 | if (empty()) | |
367 | return false; | |
368 | } | |
9a51593d | 369 | |
f5591061 DK |
370 | if (reader) |
371 | reader->unblock(); | |
372 | ||
373 | const unsigned int pos = (theOut++ % theCapacity) * theMaxItemSize; | |
374 | memcpy(&value, theBuffer + pos, sizeof(value)); | |
375 | --theSize; | |
fa61cefe AR |
376 | |
377 | return true; | |
9a51593d DK |
378 | } |
379 | ||
0a11e039 AR |
380 | template <class Value> |
381 | bool | |
382 | OneToOneUniQueue::peek(Value &value) const | |
383 | { | |
384 | if (sizeof(value) > theMaxItemSize) | |
385 | throw ItemTooLarge(); | |
386 | ||
387 | if (empty()) | |
388 | return false; | |
389 | ||
390 | // the reader may pop() before we copy; making this method imprecise | |
391 | const unsigned int pos = (theOut % theCapacity) * theMaxItemSize; | |
392 | memcpy(&value, theBuffer + pos, sizeof(value)); | |
393 | return true; | |
394 | } | |
395 | ||
9a51593d DK |
396 | template <class Value> |
397 | bool | |
f5591061 | 398 | OneToOneUniQueue::push(const Value &value, QueueReader *const reader) |
9a51593d | 399 | { |
f5591061 | 400 | if (sizeof(value) > theMaxItemSize) |
b2aa0934 DK |
401 | throw ItemTooLarge(); |
402 | ||
9a51593d | 403 | if (full()) |
7a907247 | 404 | throw Full(); |
9a51593d | 405 | |
f5591061 DK |
406 | const unsigned int pos = theIn++ % theCapacity * theMaxItemSize; |
407 | memcpy(theBuffer + pos, &value, sizeof(value)); | |
ad40da43 | 408 | const bool wasEmpty = !theSize++; |
f5591061 DK |
409 | |
410 | return wasEmpty && (!reader || reader->raiseSignal()); | |
411 | } | |
412 | ||
f5591061 DK |
413 | // OneToOneUniQueues |
414 | ||
415 | inline OneToOneUniQueue & | |
416 | OneToOneUniQueues::operator [](const int index) | |
417 | { | |
418 | return const_cast<OneToOneUniQueue &>((*const_cast<const OneToOneUniQueues *>(this))[index]); | |
419 | } | |
420 | ||
421 | inline const OneToOneUniQueue & | |
422 | OneToOneUniQueues::front() const | |
423 | { | |
424 | const char *const queue = | |
425 | reinterpret_cast<const char *>(this) + sizeof(*this); | |
426 | return *reinterpret_cast<const OneToOneUniQueue *>(queue); | |
9a51593d DK |
427 | } |
428 | ||
807feb1d | 429 | // BaseMultiQueue |
9a51593d | 430 | |
9a51593d DK |
431 | template <class Value> |
432 | bool | |
807feb1d | 433 | BaseMultiQueue::pop(int &remoteProcessId, Value &value) |
9a51593d | 434 | { |
807feb1d DK |
435 | // iterate all remote processes, starting after the one we visited last |
436 | for (int i = 0; i < remotesCount(); ++i) { | |
437 | if (++theLastPopProcessId >= remotesIdOffset() + remotesCount()) | |
438 | theLastPopProcessId = remotesIdOffset(); | |
439 | OneToOneUniQueue &queue = inQueue(theLastPopProcessId); | |
440 | if (queue.pop(value, &localReader())) { | |
f5591061 DK |
441 | remoteProcessId = theLastPopProcessId; |
442 | debugs(54, 7, HERE << "popped from " << remoteProcessId << " to " << theLocalProcessId << " at " << queue.size()); | |
fa61cefe AR |
443 | return true; |
444 | } | |
9a51593d | 445 | } |
f5591061 | 446 | return false; // no process had anything to pop |
9a51593d DK |
447 | } |
448 | ||
449 | template <class Value> | |
450 | bool | |
807feb1d | 451 | BaseMultiQueue::push(const int remoteProcessId, const Value &value) |
9a51593d | 452 | { |
807feb1d DK |
453 | OneToOneUniQueue &remoteQueue = outQueue(remoteProcessId); |
454 | QueueReader &reader = remoteReader(remoteProcessId); | |
f5591061 | 455 | debugs(54, 7, HERE << "pushing from " << theLocalProcessId << " to " << remoteProcessId << " at " << remoteQueue.size()); |
807feb1d | 456 | return remoteQueue.push(value, &reader); |
9a51593d DK |
457 | } |
458 | ||
807feb1d DK |
459 | template <class Value> |
460 | bool | |
461 | BaseMultiQueue::peek(int &remoteProcessId, Value &value) const | |
462 | { | |
463 | // mimic FewToFewBiQueue::pop() but quit just before popping | |
464 | int popProcessId = theLastPopProcessId; // preserve for future pop() | |
465 | for (int i = 0; i < remotesCount(); ++i) { | |
466 | if (++popProcessId >= remotesIdOffset() + remotesCount()) | |
467 | popProcessId = remotesIdOffset(); | |
468 | const OneToOneUniQueue &queue = inQueue(popProcessId); | |
469 | if (queue.peek(value)) { | |
470 | remoteProcessId = popProcessId; | |
471 | return true; | |
472 | } | |
473 | } | |
474 | return false; // most likely, no process had anything to pop | |
475 | } | |
476 | ||
477 | // FewToFewBiQueue | |
478 | ||
0a11e039 AR |
479 | template <class Value> |
480 | bool | |
5aac671b | 481 | FewToFewBiQueue::findOldest(const int remoteProcessId, Value &value) const |
0a11e039 AR |
482 | { |
483 | // we may be called before remote process configured its queue end | |
484 | if (!validProcessId(remoteGroup(), remoteProcessId)) | |
485 | return false; | |
486 | ||
b8c75806 | 487 | // we need the oldest value, so start with the incoming, them-to-us queue: |
55939a01 AR |
488 | const OneToOneUniQueue &in = inQueue(remoteProcessId); |
489 | debugs(54, 2, HERE << "peeking from " << remoteProcessId << " to " << | |
490 | theLocalProcessId << " at " << in.size()); | |
491 | if (in.peek(value)) | |
b8c75806 AR |
492 | return true; |
493 | ||
494 | // if the incoming queue is empty, check the outgoing, us-to-them queue: | |
55939a01 AR |
495 | const OneToOneUniQueue &out = outQueue(remoteProcessId); |
496 | debugs(54, 2, HERE << "peeking from " << theLocalProcessId << " to " << | |
497 | remoteProcessId << " at " << out.size()); | |
498 | return out.peek(value); | |
0a11e039 AR |
499 | } |
500 | ||
15cdbc7c DK |
501 | } // namespace Ipc |
502 | ||
9a51593d | 503 | #endif // SQUID_IPC_QUEUE_H |
f53969cc | 504 |