]> git.ipfire.org Git - thirdparty/squid.git/blame - src/ipc/Queue.h
Docs: Copyright updates for 2018 (#114)
[thirdparty/squid.git] / src / ipc / Queue.h
CommitLineData
9a51593d 1/*
5b74111a 2 * Copyright (C) 1996-2018 The Squid Software Foundation and contributors
bbc27441
AJ
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
9a51593d
DK
7 */
8
9#ifndef SQUID_IPC_QUEUE_H
10#define SQUID_IPC_QUEUE_H
11
602d9612 12#include "base/InstanceId.h"
d5d5493b 13#include "Debug.h"
3a8c5551 14#include "ipc/mem/FlexibleArray.h"
68353d5a 15#include "ipc/mem/Pointer.h"
9a51593d
DK
16#include "util.h"
17
6c6a656a
AJ
18#include <atomic>
19
b2aa0934
DK
20class String;
21
9199139f
AR
22namespace Ipc
23{
15cdbc7c 24
fa61cefe
AR
25/// State of the reading end of a queue (i.e., of the code calling pop()).
26/// Multiple queues attached to one reader share this state.
9199139f
AR
27class QueueReader
28{
fa61cefe
AR
29public:
30 QueueReader(); // the initial state is "blocked without a signal"
31
32 /// whether the reader is waiting for a notification signal
6c6a656a 33 bool blocked() const { return popBlocked.load(); }
fa61cefe
AR
34
35 /// marks the reader as blocked, waiting for a notification signal
6c6a656a 36 void block() { popBlocked.store(true); }
fa61cefe
AR
37
38 /// removes the block() effects
6c6a656a 39 void unblock() { popBlocked.store(false); }
fa61cefe
AR
40
41 /// if reader is blocked and not notified, marks the notification signal
42 /// as sent and not received, returning true; otherwise, returns false
6c6a656a 43 bool raiseSignal() { return blocked() && !popSignal.exchange(true); }
fa61cefe
AR
44
45 /// marks sent reader notification as received (also removes pop blocking)
6c6a656a 46 void clearSignal() { unblock(); popSignal.store(false); }
fa61cefe
AR
47
48private:
6c6a656a
AJ
49 std::atomic<bool> popBlocked; ///< whether the reader is blocked on pop()
50 std::atomic<bool> popSignal; ///< whether writer has sent and reader has not received notification
fa61cefe
AR
51
52public:
6c6a656a 53 typedef std::atomic<int> Rate; ///< pop()s per second
df881a0f
AR
54 Rate rateLimit; ///< pop()s per second limit if positive
55
e9adbbd7 56 // we need a signed atomic type because balance may get negative
6c6a656a 57 typedef std::atomic<int> AtomicSignedMsec;
df881a0f
AR
58 typedef AtomicSignedMsec Balance;
59 /// how far ahead the reader is compared to a perfect read/sec event rate
60 Balance balance;
61
fa61cefe
AR
62 /// unique ID for debugging which reader is used (works across processes)
63 const InstanceId<QueueReader> id;
64};
65
68353d5a 66/// shared array of QueueReaders
9199139f
AR
67class QueueReaders
68{
68353d5a
DK
69public:
70 QueueReaders(const int aCapacity);
71 size_t sharedMemorySize() const;
72 static size_t SharedMemorySize(const int capacity);
73
74 const int theCapacity; /// number of readers
3a8c5551 75 Ipc::Mem::FlexibleArray<QueueReader> theReaders; /// readers
68353d5a 76};
fa61cefe
AR
77
78/**
79 * Lockless fixed-capacity queue for a single writer and a single reader.
80 *
81 * If the queue is empty, the reader is considered "blocked" and needs
82 * an out-of-band notification message to notice the next pushed item.
83 *
84 * Current implementation assumes that the writer cannot get blocked: if the
85 * queue is full, the writer will just not push and come back later (with a
86 * different value). We can add support for blocked writers if needed.
87 */
9199139f
AR
88class OneToOneUniQueue
89{
9a51593d 90public:
fa61cefe 91 // pop() and push() exceptions; TODO: use TextException instead
7a907247 92 class Full {};
b2aa0934
DK
93 class ItemTooLarge {};
94
f5591061 95 OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity);
68353d5a 96
f5591061
DK
97 unsigned int maxItemSize() const { return theMaxItemSize; }
98 int size() const { return theSize; }
99 int capacity() const { return theCapacity; }
100 int sharedMemorySize() const { return Items2Bytes(theMaxItemSize, theCapacity); }
9a51593d 101
f5591061
DK
102 bool empty() const { return !theSize; }
103 bool full() const { return theSize == theCapacity; }
9a51593d 104
b2aa0934
DK
105 static int Bytes2Items(const unsigned int maxItemSize, int size);
106 static int Items2Bytes(const unsigned int maxItemSize, const int size);
107
fa61cefe 108 /// returns true iff the value was set; [un]blocks the reader as needed
f5591061 109 template<class Value> bool pop(Value &value, QueueReader *const reader = NULL);
fa61cefe
AR
110
111 /// returns true iff the caller must notify the reader of the pushed item
f5591061 112 template<class Value> bool push(const Value &value, QueueReader *const reader = NULL);
9a51593d 113
0a11e039
AR
114 /// returns true iff the value was set; the value may be stale!
115 template<class Value> bool peek(Value &value) const;
116
9a51593d 117private:
b2aa0934 118
f5591061
DK
119 unsigned int theIn; ///< input index, used only in push()
120 unsigned int theOut; ///< output index, used only in pop()
68353d5a 121
6c6a656a 122 std::atomic<uint32_t> theSize; ///< number of items in the queue
f5591061 123 const unsigned int theMaxItemSize; ///< maximum item size
6c6a656a 124 const uint32_t theCapacity; ///< maximum number of items, i.e. theBuffer size
7a907247 125
f5591061
DK
126 char theBuffer[];
127};
68353d5a 128
f5591061 129/// shared array of OneToOneUniQueues
9199139f
AR
130class OneToOneUniQueues
131{
f5591061
DK
132public:
133 OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity);
68353d5a 134
f5591061
DK
135 size_t sharedMemorySize() const;
136 static size_t SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity);
9a51593d 137
f5591061
DK
138 const OneToOneUniQueue &operator [](const int index) const;
139 inline OneToOneUniQueue &operator [](const int index);
9a51593d 140
f5591061
DK
141private:
142 inline const OneToOneUniQueue &front() const;
fa61cefe 143
f5591061
DK
144public:
145 const int theCapacity; /// number of OneToOneUniQueues
9a51593d
DK
146};
147
807feb1d
DK
148/**
149 * Base class for lockless fixed-capacity bidirectional queues for a
150 * limited number processes.
151 */
152class BaseMultiQueue
153{
154public:
155 BaseMultiQueue(const int aLocalProcessId);
4f8892c3 156 virtual ~BaseMultiQueue() {}
807feb1d
DK
157
158 /// clears the reader notification received by the local process from the remote process
159 void clearReaderSignal(const int remoteProcessId);
160
161 /// picks a process and calls OneToOneUniQueue::pop() using its queue
162 template <class Value> bool pop(int &remoteProcessId, Value &value);
163
164 /// calls OneToOneUniQueue::push() using the given process queue
165 template <class Value> bool push(const int remoteProcessId, const Value &value);
166
167 /// peeks at the item likely to be pop()ed next
168 template<class Value> bool peek(int &remoteProcessId, Value &value) const;
169
170 /// returns local reader's balance
171 QueueReader::Balance &localBalance() { return localReader().balance; }
172
173 /// returns reader's balance for a given remote process
174 const QueueReader::Balance &balance(const int remoteProcessId) const;
175
176 /// returns local reader's rate limit
177 QueueReader::Rate &localRateLimit() { return localReader().rateLimit; }
178
179 /// returns reader's rate limit for a given remote process
180 const QueueReader::Rate &rateLimit(const int remoteProcessId) const;
181
182 /// number of items in incoming queue from a given remote process
183 int inSize(const int remoteProcessId) const { return inQueue(remoteProcessId).size(); }
184
185 /// number of items in outgoing queue to a given remote process
186 int outSize(const int remoteProcessId) const { return outQueue(remoteProcessId).size(); }
187
188protected:
189 /// incoming queue from a given remote process
190 virtual const OneToOneUniQueue &inQueue(const int remoteProcessId) const = 0;
191 OneToOneUniQueue &inQueue(const int remoteProcessId);
192
193 /// outgoing queue to a given remote process
194 virtual const OneToOneUniQueue &outQueue(const int remoteProcessId) const = 0;
195 OneToOneUniQueue &outQueue(const int remoteProcessId);
196
197 virtual const QueueReader &localReader() const = 0;
198 QueueReader &localReader();
199
200 virtual const QueueReader &remoteReader(const int remoteProcessId) const = 0;
201 QueueReader &remoteReader(const int remoteProcessId);
202
203 virtual int remotesCount() const = 0;
204 virtual int remotesIdOffset() const = 0;
205
206protected:
207 const int theLocalProcessId; ///< process ID of this queue
208
209private:
210 int theLastPopProcessId; ///< the ID of the last process we tried to pop() from
211};
212
9a51593d
DK
213/**
214 * Lockless fixed-capacity bidirectional queue for a limited number
f5591061
DK
215 * processes. Allows communication between two groups of processes:
216 * any process in one group may send data to and receive from any
217 * process in another group, but processes in the same group can not
218 * communicate. Process in each group has a unique integer ID in
219 * [groupIdOffset, groupIdOffset + groupSize) range.
9a51593d 220 */
807feb1d 221class FewToFewBiQueue: public BaseMultiQueue
9199139f 222{
9a51593d 223public:
f5591061
DK
224 typedef OneToOneUniQueue::Full Full;
225 typedef OneToOneUniQueue::ItemTooLarge ItemTooLarge;
226
227private:
228 /// Shared metadata for FewToFewBiQueue
229 struct Metadata {
230 Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset);
231 size_t sharedMemorySize() const { return sizeof(*this); }
232 static size_t SharedMemorySize(const int, const int, const int, const int) { return sizeof(Metadata); }
233
234 const int theGroupASize;
235 const int theGroupAIdOffset;
236 const int theGroupBSize;
237 const int theGroupBIdOffset;
238 };
7a907247 239
f5591061 240public:
9199139f
AR
241 class Owner
242 {
68353d5a 243 public:
f5591061 244 Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity);
68353d5a
DK
245 ~Owner();
246
247 private:
f5591061
DK
248 Mem::Owner<Metadata> *const metadataOwner;
249 Mem::Owner<OneToOneUniQueues> *const queuesOwner;
15cdbc7c 250 Mem::Owner<QueueReaders> *const readersOwner;
68353d5a
DK
251 };
252
f5591061 253 static Owner *Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity);
68353d5a 254
f5591061
DK
255 enum Group { groupA = 0, groupB = 1 };
256 FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId);
9a51593d 257
ea2cdeb6
DK
258 /// maximum number of items in the queue
259 static int MaxItemsCount(const int groupASize, const int groupBSize, const int capacity);
260
807feb1d
DK
261 /// finds the oldest item in incoming and outgoing queues between
262 /// us and the given remote process
263 template<class Value> bool findOldest(const int remoteProcessId, Value &value) const;
264
265protected:
266 virtual const OneToOneUniQueue &inQueue(const int remoteProcessId) const;
267 virtual const OneToOneUniQueue &outQueue(const int remoteProcessId) const;
268 virtual const QueueReader &localReader() const;
269 virtual const QueueReader &remoteReader(const int processId) const;
270 virtual int remotesCount() const;
271 virtual int remotesIdOffset() const;
272
273private:
274 bool validProcessId(const Group group, const int processId) const;
275 int oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const;
276 const OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const;
277 int readerIndex(const Group group, const int processId) const;
f5591061
DK
278 Group localGroup() const { return theLocalGroup; }
279 Group remoteGroup() const { return theLocalGroup == groupA ? groupB : groupA; }
9a51593d 280
807feb1d
DK
281private:
282 const Mem::Pointer<Metadata> metadata; ///< shared metadata
283 const Mem::Pointer<OneToOneUniQueues> queues; ///< unidirection one-to-one queues
284 const Mem::Pointer<QueueReaders> readers; ///< readers array
fa61cefe 285
807feb1d
DK
286 const Group theLocalGroup; ///< group of this queue
287};
fa61cefe 288
807feb1d
DK
289/**
290 * Lockless fixed-capacity bidirectional queue for a limited number
291 * processes. Any process may send data to and receive from any other
292 * process (including itself). Each process has a unique integer ID in
293 * [processIdOffset, processIdOffset + processCount) range.
294 */
295class MultiQueue: public BaseMultiQueue
296{
297public:
298 typedef OneToOneUniQueue::Full Full;
299 typedef OneToOneUniQueue::ItemTooLarge ItemTooLarge;
0a11e039 300
807feb1d
DK
301private:
302 /// Shared metadata for MultiQueue
303 struct Metadata {
304 Metadata(const int aProcessCount, const int aProcessIdOffset);
305 size_t sharedMemorySize() const { return sizeof(*this); }
306 static size_t SharedMemorySize(const int, const int) { return sizeof(Metadata); }
df881a0f 307
807feb1d
DK
308 const int theProcessCount;
309 const int theProcessIdOffset;
310 };
df881a0f 311
807feb1d
DK
312public:
313 class Owner
314 {
315 public:
316 Owner(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity);
317 ~Owner();
55939a01 318
807feb1d
DK
319 private:
320 Mem::Owner<Metadata> *const metadataOwner;
321 Mem::Owner<OneToOneUniQueues> *const queuesOwner;
322 Mem::Owner<QueueReaders> *const readersOwner;
323 };
df881a0f 324
807feb1d 325 static Owner *Init(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity);
55939a01 326
807feb1d 327 MultiQueue(const String &id, const int localProcessId);
55939a01 328
807feb1d
DK
329protected:
330 virtual const OneToOneUniQueue &inQueue(const int remoteProcessId) const;
331 virtual const OneToOneUniQueue &outQueue(const int remoteProcessId) const;
332 virtual const QueueReader &localReader() const;
333 virtual const QueueReader &remoteReader(const int remoteProcessId) const;
334 virtual int remotesCount() const;
335 virtual int remotesIdOffset() const;
55939a01 336
f5591061 337private:
807feb1d
DK
338 bool validProcessId(const int processId) const;
339 const OneToOneUniQueue &oneToOneQueue(const int fromProcessId, const int toProcessId) const;
340 const QueueReader &reader(const int processId) const;
fa61cefe 341
f5591061
DK
342private:
343 const Mem::Pointer<Metadata> metadata; ///< shared metadata
344 const Mem::Pointer<OneToOneUniQueues> queues; ///< unidirection one-to-one queues
15cdbc7c 345 const Mem::Pointer<QueueReaders> readers; ///< readers array
9a51593d
DK
346};
347
9a51593d
DK
348// OneToOneUniQueue
349
9a51593d
DK
350template <class Value>
351bool
f5591061 352OneToOneUniQueue::pop(Value &value, QueueReader *const reader)
9a51593d 353{
f5591061 354 if (sizeof(value) > theMaxItemSize)
b2aa0934
DK
355 throw ItemTooLarge();
356
fa61cefe
AR
357 // A writer might push between the empty test and block() below, so we do
358 // not return false right after calling block(), but test again.
359 if (empty()) {
f5591061
DK
360 if (!reader)
361 return false;
362
363 reader->block();
fa61cefe
AR
364 // A writer might push between the empty test and block() below,
365 // so we must test again as such a writer will not signal us.
366 if (empty())
367 return false;
368 }
9a51593d 369
f5591061
DK
370 if (reader)
371 reader->unblock();
372
373 const unsigned int pos = (theOut++ % theCapacity) * theMaxItemSize;
374 memcpy(&value, theBuffer + pos, sizeof(value));
375 --theSize;
fa61cefe
AR
376
377 return true;
9a51593d
DK
378}
379
0a11e039
AR
380template <class Value>
381bool
382OneToOneUniQueue::peek(Value &value) const
383{
384 if (sizeof(value) > theMaxItemSize)
385 throw ItemTooLarge();
386
387 if (empty())
388 return false;
389
390 // the reader may pop() before we copy; making this method imprecise
391 const unsigned int pos = (theOut % theCapacity) * theMaxItemSize;
392 memcpy(&value, theBuffer + pos, sizeof(value));
393 return true;
394}
395
9a51593d
DK
396template <class Value>
397bool
f5591061 398OneToOneUniQueue::push(const Value &value, QueueReader *const reader)
9a51593d 399{
f5591061 400 if (sizeof(value) > theMaxItemSize)
b2aa0934
DK
401 throw ItemTooLarge();
402
9a51593d 403 if (full())
7a907247 404 throw Full();
9a51593d 405
f5591061
DK
406 const unsigned int pos = theIn++ % theCapacity * theMaxItemSize;
407 memcpy(theBuffer + pos, &value, sizeof(value));
ad40da43 408 const bool wasEmpty = !theSize++;
f5591061
DK
409
410 return wasEmpty && (!reader || reader->raiseSignal());
411}
412
f5591061
DK
413// OneToOneUniQueues
414
415inline OneToOneUniQueue &
416OneToOneUniQueues::operator [](const int index)
417{
418 return const_cast<OneToOneUniQueue &>((*const_cast<const OneToOneUniQueues *>(this))[index]);
419}
420
421inline const OneToOneUniQueue &
422OneToOneUniQueues::front() const
423{
424 const char *const queue =
425 reinterpret_cast<const char *>(this) + sizeof(*this);
426 return *reinterpret_cast<const OneToOneUniQueue *>(queue);
9a51593d
DK
427}
428
807feb1d 429// BaseMultiQueue
9a51593d 430
9a51593d
DK
431template <class Value>
432bool
807feb1d 433BaseMultiQueue::pop(int &remoteProcessId, Value &value)
9a51593d 434{
807feb1d
DK
435 // iterate all remote processes, starting after the one we visited last
436 for (int i = 0; i < remotesCount(); ++i) {
437 if (++theLastPopProcessId >= remotesIdOffset() + remotesCount())
438 theLastPopProcessId = remotesIdOffset();
439 OneToOneUniQueue &queue = inQueue(theLastPopProcessId);
440 if (queue.pop(value, &localReader())) {
f5591061
DK
441 remoteProcessId = theLastPopProcessId;
442 debugs(54, 7, HERE << "popped from " << remoteProcessId << " to " << theLocalProcessId << " at " << queue.size());
fa61cefe
AR
443 return true;
444 }
9a51593d 445 }
f5591061 446 return false; // no process had anything to pop
9a51593d
DK
447}
448
449template <class Value>
450bool
807feb1d 451BaseMultiQueue::push(const int remoteProcessId, const Value &value)
9a51593d 452{
807feb1d
DK
453 OneToOneUniQueue &remoteQueue = outQueue(remoteProcessId);
454 QueueReader &reader = remoteReader(remoteProcessId);
f5591061 455 debugs(54, 7, HERE << "pushing from " << theLocalProcessId << " to " << remoteProcessId << " at " << remoteQueue.size());
807feb1d 456 return remoteQueue.push(value, &reader);
9a51593d
DK
457}
458
807feb1d
DK
459template <class Value>
460bool
461BaseMultiQueue::peek(int &remoteProcessId, Value &value) const
462{
463 // mimic FewToFewBiQueue::pop() but quit just before popping
464 int popProcessId = theLastPopProcessId; // preserve for future pop()
465 for (int i = 0; i < remotesCount(); ++i) {
466 if (++popProcessId >= remotesIdOffset() + remotesCount())
467 popProcessId = remotesIdOffset();
468 const OneToOneUniQueue &queue = inQueue(popProcessId);
469 if (queue.peek(value)) {
470 remoteProcessId = popProcessId;
471 return true;
472 }
473 }
474 return false; // most likely, no process had anything to pop
475}
476
477// FewToFewBiQueue
478
0a11e039
AR
479template <class Value>
480bool
5aac671b 481FewToFewBiQueue::findOldest(const int remoteProcessId, Value &value) const
0a11e039
AR
482{
483 // we may be called before remote process configured its queue end
484 if (!validProcessId(remoteGroup(), remoteProcessId))
485 return false;
486
b8c75806 487 // we need the oldest value, so start with the incoming, them-to-us queue:
55939a01
AR
488 const OneToOneUniQueue &in = inQueue(remoteProcessId);
489 debugs(54, 2, HERE << "peeking from " << remoteProcessId << " to " <<
490 theLocalProcessId << " at " << in.size());
491 if (in.peek(value))
b8c75806
AR
492 return true;
493
494 // if the incoming queue is empty, check the outgoing, us-to-them queue:
55939a01
AR
495 const OneToOneUniQueue &out = outQueue(remoteProcessId);
496 debugs(54, 2, HERE << "peeking from " << theLocalProcessId << " to " <<
497 remoteProcessId << " at " << out.size());
498 return out.peek(value);
0a11e039
AR
499}
500
15cdbc7c
DK
501} // namespace Ipc
502
9a51593d 503#endif // SQUID_IPC_QUEUE_H
f53969cc 504