]> git.ipfire.org Git - thirdparty/squid.git/blame - src/ipc/Queue.h
SourceFormat Enforcement
[thirdparty/squid.git] / src / ipc / Queue.h
CommitLineData
9a51593d 1/*
bbc27441
AJ
2 * Copyright (C) 1996-2014 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
9a51593d
DK
7 */
8
9#ifndef SQUID_IPC_QUEUE_H
10#define SQUID_IPC_QUEUE_H
11
602d9612 12#include "base/InstanceId.h"
d5d5493b 13#include "Debug.h"
9a51593d 14#include "ipc/AtomicWord.h"
3a8c5551 15#include "ipc/mem/FlexibleArray.h"
68353d5a 16#include "ipc/mem/Pointer.h"
9a51593d
DK
17#include "util.h"
18
b2aa0934
DK
19class String;
20
9199139f
AR
21namespace Ipc
22{
15cdbc7c 23
fa61cefe
AR
24/// State of the reading end of a queue (i.e., of the code calling pop()).
25/// Multiple queues attached to one reader share this state.
9199139f
AR
26class QueueReader
27{
fa61cefe
AR
28public:
29 QueueReader(); // the initial state is "blocked without a signal"
30
31 /// whether the reader is waiting for a notification signal
32 bool blocked() const { return popBlocked == 1; }
33
34 /// marks the reader as blocked, waiting for a notification signal
35 void block() { popBlocked.swap_if(0, 1); }
36
37 /// removes the block() effects
38 void unblock() { popBlocked.swap_if(1, 0); }
39
40 /// if reader is blocked and not notified, marks the notification signal
41 /// as sent and not received, returning true; otherwise, returns false
42 bool raiseSignal() { return blocked() && popSignal.swap_if(0,1); }
43
44 /// marks sent reader notification as received (also removes pop blocking)
45 void clearSignal() { unblock(); popSignal.swap_if(1,0); }
46
47private:
794d4c0c
DK
48 Atomic::Word popBlocked; ///< whether the reader is blocked on pop()
49 Atomic::Word popSignal; ///< whether writer has sent and reader has not received notification
fa61cefe
AR
50
51public:
794d4c0c 52 typedef Atomic::Word Rate; ///< pop()s per second
df881a0f
AR
53 Rate rateLimit; ///< pop()s per second limit if positive
54
e9adbbd7 55 // we need a signed atomic type because balance may get negative
794d4c0c 56 typedef Atomic::WordT<int> AtomicSignedMsec;
df881a0f
AR
57 typedef AtomicSignedMsec Balance;
58 /// how far ahead the reader is compared to a perfect read/sec event rate
59 Balance balance;
60
fa61cefe
AR
61 /// unique ID for debugging which reader is used (works across processes)
62 const InstanceId<QueueReader> id;
63};
64
68353d5a 65/// shared array of QueueReaders
9199139f
AR
66class QueueReaders
67{
68353d5a
DK
68public:
69 QueueReaders(const int aCapacity);
70 size_t sharedMemorySize() const;
71 static size_t SharedMemorySize(const int capacity);
72
73 const int theCapacity; /// number of readers
3a8c5551 74 Ipc::Mem::FlexibleArray<QueueReader> theReaders; /// readers
68353d5a 75};
fa61cefe
AR
76
77/**
78 * Lockless fixed-capacity queue for a single writer and a single reader.
79 *
80 * If the queue is empty, the reader is considered "blocked" and needs
81 * an out-of-band notification message to notice the next pushed item.
82 *
83 * Current implementation assumes that the writer cannot get blocked: if the
84 * queue is full, the writer will just not push and come back later (with a
85 * different value). We can add support for blocked writers if needed.
86 */
9199139f
AR
87class OneToOneUniQueue
88{
9a51593d 89public:
fa61cefe 90 // pop() and push() exceptions; TODO: use TextException instead
7a907247 91 class Full {};
b2aa0934
DK
92 class ItemTooLarge {};
93
f5591061 94 OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity);
68353d5a 95
f5591061
DK
96 unsigned int maxItemSize() const { return theMaxItemSize; }
97 int size() const { return theSize; }
98 int capacity() const { return theCapacity; }
99 int sharedMemorySize() const { return Items2Bytes(theMaxItemSize, theCapacity); }
9a51593d 100
f5591061
DK
101 bool empty() const { return !theSize; }
102 bool full() const { return theSize == theCapacity; }
9a51593d 103
b2aa0934
DK
104 static int Bytes2Items(const unsigned int maxItemSize, int size);
105 static int Items2Bytes(const unsigned int maxItemSize, const int size);
106
fa61cefe 107 /// returns true iff the value was set; [un]blocks the reader as needed
f5591061 108 template<class Value> bool pop(Value &value, QueueReader *const reader = NULL);
fa61cefe
AR
109
110 /// returns true iff the caller must notify the reader of the pushed item
f5591061 111 template<class Value> bool push(const Value &value, QueueReader *const reader = NULL);
9a51593d 112
0a11e039
AR
113 /// returns true iff the value was set; the value may be stale!
114 template<class Value> bool peek(Value &value) const;
115
9a51593d 116private:
b2aa0934 117
f5591061
DK
118 unsigned int theIn; ///< input index, used only in push()
119 unsigned int theOut; ///< output index, used only in pop()
68353d5a 120
794d4c0c 121 Atomic::Word theSize; ///< number of items in the queue
f5591061
DK
122 const unsigned int theMaxItemSize; ///< maximum item size
123 const int theCapacity; ///< maximum number of items, i.e. theBuffer size
7a907247 124
f5591061
DK
125 char theBuffer[];
126};
68353d5a 127
f5591061 128/// shared array of OneToOneUniQueues
9199139f
AR
129class OneToOneUniQueues
130{
f5591061
DK
131public:
132 OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity);
68353d5a 133
f5591061
DK
134 size_t sharedMemorySize() const;
135 static size_t SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity);
9a51593d 136
f5591061
DK
137 const OneToOneUniQueue &operator [](const int index) const;
138 inline OneToOneUniQueue &operator [](const int index);
9a51593d 139
f5591061
DK
140private:
141 inline const OneToOneUniQueue &front() const;
fa61cefe 142
f5591061
DK
143public:
144 const int theCapacity; /// number of OneToOneUniQueues
9a51593d
DK
145};
146
807feb1d
DK
147/**
148 * Base class for lockless fixed-capacity bidirectional queues for a
149 * limited number processes.
150 */
151class BaseMultiQueue
152{
153public:
154 BaseMultiQueue(const int aLocalProcessId);
4f8892c3 155 virtual ~BaseMultiQueue() {}
807feb1d
DK
156
157 /// clears the reader notification received by the local process from the remote process
158 void clearReaderSignal(const int remoteProcessId);
159
160 /// picks a process and calls OneToOneUniQueue::pop() using its queue
161 template <class Value> bool pop(int &remoteProcessId, Value &value);
162
163 /// calls OneToOneUniQueue::push() using the given process queue
164 template <class Value> bool push(const int remoteProcessId, const Value &value);
165
166 /// peeks at the item likely to be pop()ed next
167 template<class Value> bool peek(int &remoteProcessId, Value &value) const;
168
169 /// returns local reader's balance
170 QueueReader::Balance &localBalance() { return localReader().balance; }
171
172 /// returns reader's balance for a given remote process
173 const QueueReader::Balance &balance(const int remoteProcessId) const;
174
175 /// returns local reader's rate limit
176 QueueReader::Rate &localRateLimit() { return localReader().rateLimit; }
177
178 /// returns reader's rate limit for a given remote process
179 const QueueReader::Rate &rateLimit(const int remoteProcessId) const;
180
181 /// number of items in incoming queue from a given remote process
182 int inSize(const int remoteProcessId) const { return inQueue(remoteProcessId).size(); }
183
184 /// number of items in outgoing queue to a given remote process
185 int outSize(const int remoteProcessId) const { return outQueue(remoteProcessId).size(); }
186
187protected:
188 /// incoming queue from a given remote process
189 virtual const OneToOneUniQueue &inQueue(const int remoteProcessId) const = 0;
190 OneToOneUniQueue &inQueue(const int remoteProcessId);
191
192 /// outgoing queue to a given remote process
193 virtual const OneToOneUniQueue &outQueue(const int remoteProcessId) const = 0;
194 OneToOneUniQueue &outQueue(const int remoteProcessId);
195
196 virtual const QueueReader &localReader() const = 0;
197 QueueReader &localReader();
198
199 virtual const QueueReader &remoteReader(const int remoteProcessId) const = 0;
200 QueueReader &remoteReader(const int remoteProcessId);
201
202 virtual int remotesCount() const = 0;
203 virtual int remotesIdOffset() const = 0;
204
205protected:
206 const int theLocalProcessId; ///< process ID of this queue
207
208private:
209 int theLastPopProcessId; ///< the ID of the last process we tried to pop() from
210};
211
9a51593d
DK
212/**
213 * Lockless fixed-capacity bidirectional queue for a limited number
f5591061
DK
214 * processes. Allows communication between two groups of processes:
215 * any process in one group may send data to and receive from any
216 * process in another group, but processes in the same group can not
217 * communicate. Process in each group has a unique integer ID in
218 * [groupIdOffset, groupIdOffset + groupSize) range.
9a51593d 219 */
807feb1d 220class FewToFewBiQueue: public BaseMultiQueue
9199139f 221{
9a51593d 222public:
f5591061
DK
223 typedef OneToOneUniQueue::Full Full;
224 typedef OneToOneUniQueue::ItemTooLarge ItemTooLarge;
225
226private:
227 /// Shared metadata for FewToFewBiQueue
228 struct Metadata {
229 Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset);
230 size_t sharedMemorySize() const { return sizeof(*this); }
231 static size_t SharedMemorySize(const int, const int, const int, const int) { return sizeof(Metadata); }
232
233 const int theGroupASize;
234 const int theGroupAIdOffset;
235 const int theGroupBSize;
236 const int theGroupBIdOffset;
237 };
7a907247 238
f5591061 239public:
9199139f
AR
240 class Owner
241 {
68353d5a 242 public:
f5591061 243 Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity);
68353d5a
DK
244 ~Owner();
245
246 private:
f5591061
DK
247 Mem::Owner<Metadata> *const metadataOwner;
248 Mem::Owner<OneToOneUniQueues> *const queuesOwner;
15cdbc7c 249 Mem::Owner<QueueReaders> *const readersOwner;
68353d5a
DK
250 };
251
f5591061 252 static Owner *Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity);
68353d5a 253
f5591061
DK
254 enum Group { groupA = 0, groupB = 1 };
255 FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId);
9a51593d 256
ea2cdeb6
DK
257 /// maximum number of items in the queue
258 static int MaxItemsCount(const int groupASize, const int groupBSize, const int capacity);
259
807feb1d
DK
260 /// finds the oldest item in incoming and outgoing queues between
261 /// us and the given remote process
262 template<class Value> bool findOldest(const int remoteProcessId, Value &value) const;
263
264protected:
265 virtual const OneToOneUniQueue &inQueue(const int remoteProcessId) const;
266 virtual const OneToOneUniQueue &outQueue(const int remoteProcessId) const;
267 virtual const QueueReader &localReader() const;
268 virtual const QueueReader &remoteReader(const int processId) const;
269 virtual int remotesCount() const;
270 virtual int remotesIdOffset() const;
271
272private:
273 bool validProcessId(const Group group, const int processId) const;
274 int oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const;
275 const OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const;
276 int readerIndex(const Group group, const int processId) const;
f5591061
DK
277 Group localGroup() const { return theLocalGroup; }
278 Group remoteGroup() const { return theLocalGroup == groupA ? groupB : groupA; }
9a51593d 279
807feb1d
DK
280private:
281 const Mem::Pointer<Metadata> metadata; ///< shared metadata
282 const Mem::Pointer<OneToOneUniQueues> queues; ///< unidirection one-to-one queues
283 const Mem::Pointer<QueueReaders> readers; ///< readers array
fa61cefe 284
807feb1d
DK
285 const Group theLocalGroup; ///< group of this queue
286};
fa61cefe 287
807feb1d
DK
288/**
289 * Lockless fixed-capacity bidirectional queue for a limited number
290 * processes. Any process may send data to and receive from any other
291 * process (including itself). Each process has a unique integer ID in
292 * [processIdOffset, processIdOffset + processCount) range.
293 */
294class MultiQueue: public BaseMultiQueue
295{
296public:
297 typedef OneToOneUniQueue::Full Full;
298 typedef OneToOneUniQueue::ItemTooLarge ItemTooLarge;
0a11e039 299
807feb1d
DK
300private:
301 /// Shared metadata for MultiQueue
302 struct Metadata {
303 Metadata(const int aProcessCount, const int aProcessIdOffset);
304 size_t sharedMemorySize() const { return sizeof(*this); }
305 static size_t SharedMemorySize(const int, const int) { return sizeof(Metadata); }
df881a0f 306
807feb1d
DK
307 const int theProcessCount;
308 const int theProcessIdOffset;
309 };
df881a0f 310
807feb1d
DK
311public:
312 class Owner
313 {
314 public:
315 Owner(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity);
316 ~Owner();
55939a01 317
807feb1d
DK
318 private:
319 Mem::Owner<Metadata> *const metadataOwner;
320 Mem::Owner<OneToOneUniQueues> *const queuesOwner;
321 Mem::Owner<QueueReaders> *const readersOwner;
322 };
df881a0f 323
807feb1d 324 static Owner *Init(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity);
55939a01 325
807feb1d 326 MultiQueue(const String &id, const int localProcessId);
55939a01 327
807feb1d
DK
328protected:
329 virtual const OneToOneUniQueue &inQueue(const int remoteProcessId) const;
330 virtual const OneToOneUniQueue &outQueue(const int remoteProcessId) const;
331 virtual const QueueReader &localReader() const;
332 virtual const QueueReader &remoteReader(const int remoteProcessId) const;
333 virtual int remotesCount() const;
334 virtual int remotesIdOffset() const;
55939a01 335
f5591061 336private:
807feb1d
DK
337 bool validProcessId(const int processId) const;
338 const OneToOneUniQueue &oneToOneQueue(const int fromProcessId, const int toProcessId) const;
339 const QueueReader &reader(const int processId) const;
fa61cefe 340
f5591061
DK
341private:
342 const Mem::Pointer<Metadata> metadata; ///< shared metadata
343 const Mem::Pointer<OneToOneUniQueues> queues; ///< unidirection one-to-one queues
15cdbc7c 344 const Mem::Pointer<QueueReaders> readers; ///< readers array
9a51593d
DK
345};
346
9a51593d
DK
347// OneToOneUniQueue
348
9a51593d
DK
349template <class Value>
350bool
f5591061 351OneToOneUniQueue::pop(Value &value, QueueReader *const reader)
9a51593d 352{
f5591061 353 if (sizeof(value) > theMaxItemSize)
b2aa0934
DK
354 throw ItemTooLarge();
355
fa61cefe
AR
356 // A writer might push between the empty test and block() below, so we do
357 // not return false right after calling block(), but test again.
358 if (empty()) {
f5591061
DK
359 if (!reader)
360 return false;
361
362 reader->block();
fa61cefe
AR
363 // A writer might push between the empty test and block() below,
364 // so we must test again as such a writer will not signal us.
365 if (empty())
366 return false;
367 }
9a51593d 368
f5591061
DK
369 if (reader)
370 reader->unblock();
371
372 const unsigned int pos = (theOut++ % theCapacity) * theMaxItemSize;
373 memcpy(&value, theBuffer + pos, sizeof(value));
374 --theSize;
fa61cefe
AR
375
376 return true;
9a51593d
DK
377}
378
0a11e039
AR
379template <class Value>
380bool
381OneToOneUniQueue::peek(Value &value) const
382{
383 if (sizeof(value) > theMaxItemSize)
384 throw ItemTooLarge();
385
386 if (empty())
387 return false;
388
389 // the reader may pop() before we copy; making this method imprecise
390 const unsigned int pos = (theOut % theCapacity) * theMaxItemSize;
391 memcpy(&value, theBuffer + pos, sizeof(value));
392 return true;
393}
394
9a51593d
DK
395template <class Value>
396bool
f5591061 397OneToOneUniQueue::push(const Value &value, QueueReader *const reader)
9a51593d 398{
f5591061 399 if (sizeof(value) > theMaxItemSize)
b2aa0934
DK
400 throw ItemTooLarge();
401
9a51593d 402 if (full())
7a907247 403 throw Full();
9a51593d 404
f5591061
DK
405 const unsigned int pos = theIn++ % theCapacity * theMaxItemSize;
406 memcpy(theBuffer + pos, &value, sizeof(value));
ad40da43 407 const bool wasEmpty = !theSize++;
f5591061
DK
408
409 return wasEmpty && (!reader || reader->raiseSignal());
410}
411
f5591061
DK
412// OneToOneUniQueues
413
414inline OneToOneUniQueue &
415OneToOneUniQueues::operator [](const int index)
416{
417 return const_cast<OneToOneUniQueue &>((*const_cast<const OneToOneUniQueues *>(this))[index]);
418}
419
420inline const OneToOneUniQueue &
421OneToOneUniQueues::front() const
422{
423 const char *const queue =
424 reinterpret_cast<const char *>(this) + sizeof(*this);
425 return *reinterpret_cast<const OneToOneUniQueue *>(queue);
9a51593d
DK
426}
427
807feb1d 428// BaseMultiQueue
9a51593d 429
9a51593d
DK
430template <class Value>
431bool
807feb1d 432BaseMultiQueue::pop(int &remoteProcessId, Value &value)
9a51593d 433{
807feb1d
DK
434 // iterate all remote processes, starting after the one we visited last
435 for (int i = 0; i < remotesCount(); ++i) {
436 if (++theLastPopProcessId >= remotesIdOffset() + remotesCount())
437 theLastPopProcessId = remotesIdOffset();
438 OneToOneUniQueue &queue = inQueue(theLastPopProcessId);
439 if (queue.pop(value, &localReader())) {
f5591061
DK
440 remoteProcessId = theLastPopProcessId;
441 debugs(54, 7, HERE << "popped from " << remoteProcessId << " to " << theLocalProcessId << " at " << queue.size());
fa61cefe
AR
442 return true;
443 }
9a51593d 444 }
f5591061 445 return false; // no process had anything to pop
9a51593d
DK
446}
447
448template <class Value>
449bool
807feb1d 450BaseMultiQueue::push(const int remoteProcessId, const Value &value)
9a51593d 451{
807feb1d
DK
452 OneToOneUniQueue &remoteQueue = outQueue(remoteProcessId);
453 QueueReader &reader = remoteReader(remoteProcessId);
f5591061 454 debugs(54, 7, HERE << "pushing from " << theLocalProcessId << " to " << remoteProcessId << " at " << remoteQueue.size());
807feb1d 455 return remoteQueue.push(value, &reader);
9a51593d
DK
456}
457
807feb1d
DK
458template <class Value>
459bool
460BaseMultiQueue::peek(int &remoteProcessId, Value &value) const
461{
462 // mimic FewToFewBiQueue::pop() but quit just before popping
463 int popProcessId = theLastPopProcessId; // preserve for future pop()
464 for (int i = 0; i < remotesCount(); ++i) {
465 if (++popProcessId >= remotesIdOffset() + remotesCount())
466 popProcessId = remotesIdOffset();
467 const OneToOneUniQueue &queue = inQueue(popProcessId);
468 if (queue.peek(value)) {
469 remoteProcessId = popProcessId;
470 return true;
471 }
472 }
473 return false; // most likely, no process had anything to pop
474}
475
476// FewToFewBiQueue
477
0a11e039
AR
478template <class Value>
479bool
5aac671b 480FewToFewBiQueue::findOldest(const int remoteProcessId, Value &value) const
0a11e039
AR
481{
482 // we may be called before remote process configured its queue end
483 if (!validProcessId(remoteGroup(), remoteProcessId))
484 return false;
485
b8c75806 486 // we need the oldest value, so start with the incoming, them-to-us queue:
55939a01
AR
487 const OneToOneUniQueue &in = inQueue(remoteProcessId);
488 debugs(54, 2, HERE << "peeking from " << remoteProcessId << " to " <<
489 theLocalProcessId << " at " << in.size());
490 if (in.peek(value))
b8c75806
AR
491 return true;
492
493 // if the incoming queue is empty, check the outgoing, us-to-them queue:
55939a01
AR
494 const OneToOneUniQueue &out = outQueue(remoteProcessId);
495 debugs(54, 2, HERE << "peeking from " << theLocalProcessId << " to " <<
496 remoteProcessId << " at " << out.size());
497 return out.peek(value);
0a11e039
AR
498}
499
15cdbc7c
DK
500} // namespace Ipc
501
9a51593d 502#endif // SQUID_IPC_QUEUE_H
f53969cc 503