]>
Commit | Line | Data |
---|---|---|
9a51593d | 1 | /* |
bbc27441 AJ |
2 | * Copyright (C) 1996-2014 The Squid Software Foundation and contributors |
3 | * | |
4 | * Squid software is distributed under GPLv2+ license and includes | |
5 | * contributions from numerous individuals and organizations. | |
6 | * Please see the COPYING and CONTRIBUTORS files for details. | |
9a51593d DK |
7 | */ |
8 | ||
9 | #ifndef SQUID_IPC_QUEUE_H | |
10 | #define SQUID_IPC_QUEUE_H | |
11 | ||
602d9612 | 12 | #include "base/InstanceId.h" |
d5d5493b | 13 | #include "Debug.h" |
9a51593d | 14 | #include "ipc/AtomicWord.h" |
3a8c5551 | 15 | #include "ipc/mem/FlexibleArray.h" |
68353d5a | 16 | #include "ipc/mem/Pointer.h" |
9a51593d DK |
17 | #include "util.h" |
18 | ||
b2aa0934 DK |
19 | class String; |
20 | ||
9199139f AR |
21 | namespace Ipc |
22 | { | |
15cdbc7c | 23 | |
fa61cefe AR |
24 | /// State of the reading end of a queue (i.e., of the code calling pop()). |
25 | /// Multiple queues attached to one reader share this state. | |
9199139f AR |
26 | class QueueReader |
27 | { | |
fa61cefe AR |
28 | public: |
29 | QueueReader(); // the initial state is "blocked without a signal" | |
30 | ||
31 | /// whether the reader is waiting for a notification signal | |
32 | bool blocked() const { return popBlocked == 1; } | |
33 | ||
34 | /// marks the reader as blocked, waiting for a notification signal | |
35 | void block() { popBlocked.swap_if(0, 1); } | |
36 | ||
37 | /// removes the block() effects | |
38 | void unblock() { popBlocked.swap_if(1, 0); } | |
39 | ||
40 | /// if reader is blocked and not notified, marks the notification signal | |
41 | /// as sent and not received, returning true; otherwise, returns false | |
42 | bool raiseSignal() { return blocked() && popSignal.swap_if(0,1); } | |
43 | ||
44 | /// marks sent reader notification as received (also removes pop blocking) | |
45 | void clearSignal() { unblock(); popSignal.swap_if(1,0); } | |
46 | ||
47 | private: | |
794d4c0c DK |
48 | Atomic::Word popBlocked; ///< whether the reader is blocked on pop() |
49 | Atomic::Word popSignal; ///< whether writer has sent and reader has not received notification | |
fa61cefe AR |
50 | |
51 | public: | |
794d4c0c | 52 | typedef Atomic::Word Rate; ///< pop()s per second |
df881a0f AR |
53 | Rate rateLimit; ///< pop()s per second limit if positive |
54 | ||
e9adbbd7 | 55 | // we need a signed atomic type because balance may get negative |
794d4c0c | 56 | typedef Atomic::WordT<int> AtomicSignedMsec; |
df881a0f AR |
57 | typedef AtomicSignedMsec Balance; |
58 | /// how far ahead the reader is compared to a perfect read/sec event rate | |
59 | Balance balance; | |
60 | ||
fa61cefe AR |
61 | /// unique ID for debugging which reader is used (works across processes) |
62 | const InstanceId<QueueReader> id; | |
63 | }; | |
64 | ||
68353d5a | 65 | /// shared array of QueueReaders |
9199139f AR |
66 | class QueueReaders |
67 | { | |
68353d5a DK |
68 | public: |
69 | QueueReaders(const int aCapacity); | |
70 | size_t sharedMemorySize() const; | |
71 | static size_t SharedMemorySize(const int capacity); | |
72 | ||
73 | const int theCapacity; /// number of readers | |
3a8c5551 | 74 | Ipc::Mem::FlexibleArray<QueueReader> theReaders; /// readers |
68353d5a | 75 | }; |
fa61cefe AR |
76 | |
77 | /** | |
78 | * Lockless fixed-capacity queue for a single writer and a single reader. | |
79 | * | |
80 | * If the queue is empty, the reader is considered "blocked" and needs | |
81 | * an out-of-band notification message to notice the next pushed item. | |
82 | * | |
83 | * Current implementation assumes that the writer cannot get blocked: if the | |
84 | * queue is full, the writer will just not push and come back later (with a | |
85 | * different value). We can add support for blocked writers if needed. | |
86 | */ | |
9199139f AR |
87 | class OneToOneUniQueue |
88 | { | |
9a51593d | 89 | public: |
fa61cefe | 90 | // pop() and push() exceptions; TODO: use TextException instead |
7a907247 | 91 | class Full {}; |
b2aa0934 DK |
92 | class ItemTooLarge {}; |
93 | ||
f5591061 | 94 | OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity); |
68353d5a | 95 | |
f5591061 DK |
96 | unsigned int maxItemSize() const { return theMaxItemSize; } |
97 | int size() const { return theSize; } | |
98 | int capacity() const { return theCapacity; } | |
99 | int sharedMemorySize() const { return Items2Bytes(theMaxItemSize, theCapacity); } | |
9a51593d | 100 | |
f5591061 DK |
101 | bool empty() const { return !theSize; } |
102 | bool full() const { return theSize == theCapacity; } | |
9a51593d | 103 | |
b2aa0934 DK |
104 | static int Bytes2Items(const unsigned int maxItemSize, int size); |
105 | static int Items2Bytes(const unsigned int maxItemSize, const int size); | |
106 | ||
fa61cefe | 107 | /// returns true iff the value was set; [un]blocks the reader as needed |
f5591061 | 108 | template<class Value> bool pop(Value &value, QueueReader *const reader = NULL); |
fa61cefe AR |
109 | |
110 | /// returns true iff the caller must notify the reader of the pushed item | |
f5591061 | 111 | template<class Value> bool push(const Value &value, QueueReader *const reader = NULL); |
9a51593d | 112 | |
0a11e039 AR |
113 | /// returns true iff the value was set; the value may be stale! |
114 | template<class Value> bool peek(Value &value) const; | |
115 | ||
9a51593d | 116 | private: |
b2aa0934 | 117 | |
f5591061 DK |
118 | unsigned int theIn; ///< input index, used only in push() |
119 | unsigned int theOut; ///< output index, used only in pop() | |
68353d5a | 120 | |
794d4c0c | 121 | Atomic::Word theSize; ///< number of items in the queue |
f5591061 DK |
122 | const unsigned int theMaxItemSize; ///< maximum item size |
123 | const int theCapacity; ///< maximum number of items, i.e. theBuffer size | |
7a907247 | 124 | |
f5591061 DK |
125 | char theBuffer[]; |
126 | }; | |
68353d5a | 127 | |
f5591061 | 128 | /// shared array of OneToOneUniQueues |
9199139f AR |
129 | class OneToOneUniQueues |
130 | { | |
f5591061 DK |
131 | public: |
132 | OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity); | |
68353d5a | 133 | |
f5591061 DK |
134 | size_t sharedMemorySize() const; |
135 | static size_t SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity); | |
9a51593d | 136 | |
f5591061 DK |
137 | const OneToOneUniQueue &operator [](const int index) const; |
138 | inline OneToOneUniQueue &operator [](const int index); | |
9a51593d | 139 | |
f5591061 DK |
140 | private: |
141 | inline const OneToOneUniQueue &front() const; | |
fa61cefe | 142 | |
f5591061 DK |
143 | public: |
144 | const int theCapacity; /// number of OneToOneUniQueues | |
9a51593d DK |
145 | }; |
146 | ||
807feb1d DK |
147 | /** |
148 | * Base class for lockless fixed-capacity bidirectional queues for a | |
149 | * limited number processes. | |
150 | */ | |
151 | class BaseMultiQueue | |
152 | { | |
153 | public: | |
154 | BaseMultiQueue(const int aLocalProcessId); | |
4f8892c3 | 155 | virtual ~BaseMultiQueue() {} |
807feb1d DK |
156 | |
157 | /// clears the reader notification received by the local process from the remote process | |
158 | void clearReaderSignal(const int remoteProcessId); | |
159 | ||
160 | /// picks a process and calls OneToOneUniQueue::pop() using its queue | |
161 | template <class Value> bool pop(int &remoteProcessId, Value &value); | |
162 | ||
163 | /// calls OneToOneUniQueue::push() using the given process queue | |
164 | template <class Value> bool push(const int remoteProcessId, const Value &value); | |
165 | ||
166 | /// peeks at the item likely to be pop()ed next | |
167 | template<class Value> bool peek(int &remoteProcessId, Value &value) const; | |
168 | ||
169 | /// returns local reader's balance | |
170 | QueueReader::Balance &localBalance() { return localReader().balance; } | |
171 | ||
172 | /// returns reader's balance for a given remote process | |
173 | const QueueReader::Balance &balance(const int remoteProcessId) const; | |
174 | ||
175 | /// returns local reader's rate limit | |
176 | QueueReader::Rate &localRateLimit() { return localReader().rateLimit; } | |
177 | ||
178 | /// returns reader's rate limit for a given remote process | |
179 | const QueueReader::Rate &rateLimit(const int remoteProcessId) const; | |
180 | ||
181 | /// number of items in incoming queue from a given remote process | |
182 | int inSize(const int remoteProcessId) const { return inQueue(remoteProcessId).size(); } | |
183 | ||
184 | /// number of items in outgoing queue to a given remote process | |
185 | int outSize(const int remoteProcessId) const { return outQueue(remoteProcessId).size(); } | |
186 | ||
187 | protected: | |
188 | /// incoming queue from a given remote process | |
189 | virtual const OneToOneUniQueue &inQueue(const int remoteProcessId) const = 0; | |
190 | OneToOneUniQueue &inQueue(const int remoteProcessId); | |
191 | ||
192 | /// outgoing queue to a given remote process | |
193 | virtual const OneToOneUniQueue &outQueue(const int remoteProcessId) const = 0; | |
194 | OneToOneUniQueue &outQueue(const int remoteProcessId); | |
195 | ||
196 | virtual const QueueReader &localReader() const = 0; | |
197 | QueueReader &localReader(); | |
198 | ||
199 | virtual const QueueReader &remoteReader(const int remoteProcessId) const = 0; | |
200 | QueueReader &remoteReader(const int remoteProcessId); | |
201 | ||
202 | virtual int remotesCount() const = 0; | |
203 | virtual int remotesIdOffset() const = 0; | |
204 | ||
205 | protected: | |
206 | const int theLocalProcessId; ///< process ID of this queue | |
207 | ||
208 | private: | |
209 | int theLastPopProcessId; ///< the ID of the last process we tried to pop() from | |
210 | }; | |
211 | ||
9a51593d DK |
212 | /** |
213 | * Lockless fixed-capacity bidirectional queue for a limited number | |
f5591061 DK |
214 | * processes. Allows communication between two groups of processes: |
215 | * any process in one group may send data to and receive from any | |
216 | * process in another group, but processes in the same group can not | |
217 | * communicate. Process in each group has a unique integer ID in | |
218 | * [groupIdOffset, groupIdOffset + groupSize) range. | |
9a51593d | 219 | */ |
807feb1d | 220 | class FewToFewBiQueue: public BaseMultiQueue |
9199139f | 221 | { |
9a51593d | 222 | public: |
f5591061 DK |
223 | typedef OneToOneUniQueue::Full Full; |
224 | typedef OneToOneUniQueue::ItemTooLarge ItemTooLarge; | |
225 | ||
226 | private: | |
227 | /// Shared metadata for FewToFewBiQueue | |
228 | struct Metadata { | |
229 | Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset); | |
230 | size_t sharedMemorySize() const { return sizeof(*this); } | |
231 | static size_t SharedMemorySize(const int, const int, const int, const int) { return sizeof(Metadata); } | |
232 | ||
233 | const int theGroupASize; | |
234 | const int theGroupAIdOffset; | |
235 | const int theGroupBSize; | |
236 | const int theGroupBIdOffset; | |
237 | }; | |
7a907247 | 238 | |
f5591061 | 239 | public: |
9199139f AR |
240 | class Owner |
241 | { | |
68353d5a | 242 | public: |
f5591061 | 243 | Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity); |
68353d5a DK |
244 | ~Owner(); |
245 | ||
246 | private: | |
f5591061 DK |
247 | Mem::Owner<Metadata> *const metadataOwner; |
248 | Mem::Owner<OneToOneUniQueues> *const queuesOwner; | |
15cdbc7c | 249 | Mem::Owner<QueueReaders> *const readersOwner; |
68353d5a DK |
250 | }; |
251 | ||
f5591061 | 252 | static Owner *Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity); |
68353d5a | 253 | |
f5591061 DK |
254 | enum Group { groupA = 0, groupB = 1 }; |
255 | FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId); | |
9a51593d | 256 | |
ea2cdeb6 DK |
257 | /// maximum number of items in the queue |
258 | static int MaxItemsCount(const int groupASize, const int groupBSize, const int capacity); | |
259 | ||
807feb1d DK |
260 | /// finds the oldest item in incoming and outgoing queues between |
261 | /// us and the given remote process | |
262 | template<class Value> bool findOldest(const int remoteProcessId, Value &value) const; | |
263 | ||
264 | protected: | |
265 | virtual const OneToOneUniQueue &inQueue(const int remoteProcessId) const; | |
266 | virtual const OneToOneUniQueue &outQueue(const int remoteProcessId) const; | |
267 | virtual const QueueReader &localReader() const; | |
268 | virtual const QueueReader &remoteReader(const int processId) const; | |
269 | virtual int remotesCount() const; | |
270 | virtual int remotesIdOffset() const; | |
271 | ||
272 | private: | |
273 | bool validProcessId(const Group group, const int processId) const; | |
274 | int oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const; | |
275 | const OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const; | |
276 | int readerIndex(const Group group, const int processId) const; | |
f5591061 DK |
277 | Group localGroup() const { return theLocalGroup; } |
278 | Group remoteGroup() const { return theLocalGroup == groupA ? groupB : groupA; } | |
9a51593d | 279 | |
807feb1d DK |
280 | private: |
281 | const Mem::Pointer<Metadata> metadata; ///< shared metadata | |
282 | const Mem::Pointer<OneToOneUniQueues> queues; ///< unidirection one-to-one queues | |
283 | const Mem::Pointer<QueueReaders> readers; ///< readers array | |
fa61cefe | 284 | |
807feb1d DK |
285 | const Group theLocalGroup; ///< group of this queue |
286 | }; | |
fa61cefe | 287 | |
807feb1d DK |
288 | /** |
289 | * Lockless fixed-capacity bidirectional queue for a limited number | |
290 | * processes. Any process may send data to and receive from any other | |
291 | * process (including itself). Each process has a unique integer ID in | |
292 | * [processIdOffset, processIdOffset + processCount) range. | |
293 | */ | |
294 | class MultiQueue: public BaseMultiQueue | |
295 | { | |
296 | public: | |
297 | typedef OneToOneUniQueue::Full Full; | |
298 | typedef OneToOneUniQueue::ItemTooLarge ItemTooLarge; | |
0a11e039 | 299 | |
807feb1d DK |
300 | private: |
301 | /// Shared metadata for MultiQueue | |
302 | struct Metadata { | |
303 | Metadata(const int aProcessCount, const int aProcessIdOffset); | |
304 | size_t sharedMemorySize() const { return sizeof(*this); } | |
305 | static size_t SharedMemorySize(const int, const int) { return sizeof(Metadata); } | |
df881a0f | 306 | |
807feb1d DK |
307 | const int theProcessCount; |
308 | const int theProcessIdOffset; | |
309 | }; | |
df881a0f | 310 | |
807feb1d DK |
311 | public: |
312 | class Owner | |
313 | { | |
314 | public: | |
315 | Owner(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity); | |
316 | ~Owner(); | |
55939a01 | 317 | |
807feb1d DK |
318 | private: |
319 | Mem::Owner<Metadata> *const metadataOwner; | |
320 | Mem::Owner<OneToOneUniQueues> *const queuesOwner; | |
321 | Mem::Owner<QueueReaders> *const readersOwner; | |
322 | }; | |
df881a0f | 323 | |
807feb1d | 324 | static Owner *Init(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity); |
55939a01 | 325 | |
807feb1d | 326 | MultiQueue(const String &id, const int localProcessId); |
55939a01 | 327 | |
807feb1d DK |
328 | protected: |
329 | virtual const OneToOneUniQueue &inQueue(const int remoteProcessId) const; | |
330 | virtual const OneToOneUniQueue &outQueue(const int remoteProcessId) const; | |
331 | virtual const QueueReader &localReader() const; | |
332 | virtual const QueueReader &remoteReader(const int remoteProcessId) const; | |
333 | virtual int remotesCount() const; | |
334 | virtual int remotesIdOffset() const; | |
55939a01 | 335 | |
f5591061 | 336 | private: |
807feb1d DK |
337 | bool validProcessId(const int processId) const; |
338 | const OneToOneUniQueue &oneToOneQueue(const int fromProcessId, const int toProcessId) const; | |
339 | const QueueReader &reader(const int processId) const; | |
fa61cefe | 340 | |
f5591061 DK |
341 | private: |
342 | const Mem::Pointer<Metadata> metadata; ///< shared metadata | |
343 | const Mem::Pointer<OneToOneUniQueues> queues; ///< unidirection one-to-one queues | |
15cdbc7c | 344 | const Mem::Pointer<QueueReaders> readers; ///< readers array |
9a51593d DK |
345 | }; |
346 | ||
9a51593d DK |
347 | // OneToOneUniQueue |
348 | ||
9a51593d DK |
349 | template <class Value> |
350 | bool | |
f5591061 | 351 | OneToOneUniQueue::pop(Value &value, QueueReader *const reader) |
9a51593d | 352 | { |
f5591061 | 353 | if (sizeof(value) > theMaxItemSize) |
b2aa0934 DK |
354 | throw ItemTooLarge(); |
355 | ||
fa61cefe AR |
356 | // A writer might push between the empty test and block() below, so we do |
357 | // not return false right after calling block(), but test again. | |
358 | if (empty()) { | |
f5591061 DK |
359 | if (!reader) |
360 | return false; | |
361 | ||
362 | reader->block(); | |
fa61cefe AR |
363 | // A writer might push between the empty test and block() below, |
364 | // so we must test again as such a writer will not signal us. | |
365 | if (empty()) | |
366 | return false; | |
367 | } | |
9a51593d | 368 | |
f5591061 DK |
369 | if (reader) |
370 | reader->unblock(); | |
371 | ||
372 | const unsigned int pos = (theOut++ % theCapacity) * theMaxItemSize; | |
373 | memcpy(&value, theBuffer + pos, sizeof(value)); | |
374 | --theSize; | |
fa61cefe AR |
375 | |
376 | return true; | |
9a51593d DK |
377 | } |
378 | ||
0a11e039 AR |
379 | template <class Value> |
380 | bool | |
381 | OneToOneUniQueue::peek(Value &value) const | |
382 | { | |
383 | if (sizeof(value) > theMaxItemSize) | |
384 | throw ItemTooLarge(); | |
385 | ||
386 | if (empty()) | |
387 | return false; | |
388 | ||
389 | // the reader may pop() before we copy; making this method imprecise | |
390 | const unsigned int pos = (theOut % theCapacity) * theMaxItemSize; | |
391 | memcpy(&value, theBuffer + pos, sizeof(value)); | |
392 | return true; | |
393 | } | |
394 | ||
9a51593d DK |
395 | template <class Value> |
396 | bool | |
f5591061 | 397 | OneToOneUniQueue::push(const Value &value, QueueReader *const reader) |
9a51593d | 398 | { |
f5591061 | 399 | if (sizeof(value) > theMaxItemSize) |
b2aa0934 DK |
400 | throw ItemTooLarge(); |
401 | ||
9a51593d | 402 | if (full()) |
7a907247 | 403 | throw Full(); |
9a51593d | 404 | |
f5591061 DK |
405 | const unsigned int pos = theIn++ % theCapacity * theMaxItemSize; |
406 | memcpy(theBuffer + pos, &value, sizeof(value)); | |
ad40da43 | 407 | const bool wasEmpty = !theSize++; |
f5591061 DK |
408 | |
409 | return wasEmpty && (!reader || reader->raiseSignal()); | |
410 | } | |
411 | ||
f5591061 DK |
412 | // OneToOneUniQueues |
413 | ||
414 | inline OneToOneUniQueue & | |
415 | OneToOneUniQueues::operator [](const int index) | |
416 | { | |
417 | return const_cast<OneToOneUniQueue &>((*const_cast<const OneToOneUniQueues *>(this))[index]); | |
418 | } | |
419 | ||
420 | inline const OneToOneUniQueue & | |
421 | OneToOneUniQueues::front() const | |
422 | { | |
423 | const char *const queue = | |
424 | reinterpret_cast<const char *>(this) + sizeof(*this); | |
425 | return *reinterpret_cast<const OneToOneUniQueue *>(queue); | |
9a51593d DK |
426 | } |
427 | ||
807feb1d | 428 | // BaseMultiQueue |
9a51593d | 429 | |
9a51593d DK |
430 | template <class Value> |
431 | bool | |
807feb1d | 432 | BaseMultiQueue::pop(int &remoteProcessId, Value &value) |
9a51593d | 433 | { |
807feb1d DK |
434 | // iterate all remote processes, starting after the one we visited last |
435 | for (int i = 0; i < remotesCount(); ++i) { | |
436 | if (++theLastPopProcessId >= remotesIdOffset() + remotesCount()) | |
437 | theLastPopProcessId = remotesIdOffset(); | |
438 | OneToOneUniQueue &queue = inQueue(theLastPopProcessId); | |
439 | if (queue.pop(value, &localReader())) { | |
f5591061 DK |
440 | remoteProcessId = theLastPopProcessId; |
441 | debugs(54, 7, HERE << "popped from " << remoteProcessId << " to " << theLocalProcessId << " at " << queue.size()); | |
fa61cefe AR |
442 | return true; |
443 | } | |
9a51593d | 444 | } |
f5591061 | 445 | return false; // no process had anything to pop |
9a51593d DK |
446 | } |
447 | ||
448 | template <class Value> | |
449 | bool | |
807feb1d | 450 | BaseMultiQueue::push(const int remoteProcessId, const Value &value) |
9a51593d | 451 | { |
807feb1d DK |
452 | OneToOneUniQueue &remoteQueue = outQueue(remoteProcessId); |
453 | QueueReader &reader = remoteReader(remoteProcessId); | |
f5591061 | 454 | debugs(54, 7, HERE << "pushing from " << theLocalProcessId << " to " << remoteProcessId << " at " << remoteQueue.size()); |
807feb1d | 455 | return remoteQueue.push(value, &reader); |
9a51593d DK |
456 | } |
457 | ||
807feb1d DK |
458 | template <class Value> |
459 | bool | |
460 | BaseMultiQueue::peek(int &remoteProcessId, Value &value) const | |
461 | { | |
462 | // mimic FewToFewBiQueue::pop() but quit just before popping | |
463 | int popProcessId = theLastPopProcessId; // preserve for future pop() | |
464 | for (int i = 0; i < remotesCount(); ++i) { | |
465 | if (++popProcessId >= remotesIdOffset() + remotesCount()) | |
466 | popProcessId = remotesIdOffset(); | |
467 | const OneToOneUniQueue &queue = inQueue(popProcessId); | |
468 | if (queue.peek(value)) { | |
469 | remoteProcessId = popProcessId; | |
470 | return true; | |
471 | } | |
472 | } | |
473 | return false; // most likely, no process had anything to pop | |
474 | } | |
475 | ||
476 | // FewToFewBiQueue | |
477 | ||
0a11e039 AR |
478 | template <class Value> |
479 | bool | |
5aac671b | 480 | FewToFewBiQueue::findOldest(const int remoteProcessId, Value &value) const |
0a11e039 AR |
481 | { |
482 | // we may be called before remote process configured its queue end | |
483 | if (!validProcessId(remoteGroup(), remoteProcessId)) | |
484 | return false; | |
485 | ||
b8c75806 | 486 | // we need the oldest value, so start with the incoming, them-to-us queue: |
55939a01 AR |
487 | const OneToOneUniQueue &in = inQueue(remoteProcessId); |
488 | debugs(54, 2, HERE << "peeking from " << remoteProcessId << " to " << | |
489 | theLocalProcessId << " at " << in.size()); | |
490 | if (in.peek(value)) | |
b8c75806 AR |
491 | return true; |
492 | ||
493 | // if the incoming queue is empty, check the outgoing, us-to-them queue: | |
55939a01 AR |
494 | const OneToOneUniQueue &out = outQueue(remoteProcessId); |
495 | debugs(54, 2, HERE << "peeking from " << theLocalProcessId << " to " << | |
496 | remoteProcessId << " at " << out.size()); | |
497 | return out.peek(value); | |
0a11e039 AR |
498 | } |
499 | ||
15cdbc7c DK |
500 | } // namespace Ipc |
501 | ||
9a51593d | 502 | #endif // SQUID_IPC_QUEUE_H |
f53969cc | 503 |