]> git.ipfire.org Git - thirdparty/squid.git/blob - src/ipc/Queue.h
Boilerplate: update copyright blurbs on src/
[thirdparty/squid.git] / src / ipc / Queue.h
1 /*
2 * Copyright (C) 1996-2014 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9 #ifndef SQUID_IPC_QUEUE_H
10 #define SQUID_IPC_QUEUE_H
11
12 #include "base/InstanceId.h"
13 #include "Debug.h"
14 #include "ipc/AtomicWord.h"
15 #include "ipc/mem/FlexibleArray.h"
16 #include "ipc/mem/Pointer.h"
17 #include "util.h"
18
19 class String;
20
21 namespace Ipc
22 {
23
24 /// State of the reading end of a queue (i.e., of the code calling pop()).
25 /// Multiple queues attached to one reader share this state.
26 class QueueReader
27 {
28 public:
29 QueueReader(); // the initial state is "blocked without a signal"
30
31 /// whether the reader is waiting for a notification signal
32 bool blocked() const { return popBlocked == 1; }
33
34 /// marks the reader as blocked, waiting for a notification signal
35 void block() { popBlocked.swap_if(0, 1); }
36
37 /// removes the block() effects
38 void unblock() { popBlocked.swap_if(1, 0); }
39
40 /// if reader is blocked and not notified, marks the notification signal
41 /// as sent and not received, returning true; otherwise, returns false
42 bool raiseSignal() { return blocked() && popSignal.swap_if(0,1); }
43
44 /// marks sent reader notification as received (also removes pop blocking)
45 void clearSignal() { unblock(); popSignal.swap_if(1,0); }
46
47 private:
48 Atomic::Word popBlocked; ///< whether the reader is blocked on pop()
49 Atomic::Word popSignal; ///< whether writer has sent and reader has not received notification
50
51 public:
52 typedef Atomic::Word Rate; ///< pop()s per second
53 Rate rateLimit; ///< pop()s per second limit if positive
54
55 // we need a signed atomic type because balance may get negative
56 typedef Atomic::WordT<int> AtomicSignedMsec;
57 typedef AtomicSignedMsec Balance;
58 /// how far ahead the reader is compared to a perfect read/sec event rate
59 Balance balance;
60
61 /// unique ID for debugging which reader is used (works across processes)
62 const InstanceId<QueueReader> id;
63 };
64
65 /// shared array of QueueReaders
66 class QueueReaders
67 {
68 public:
69 QueueReaders(const int aCapacity);
70 size_t sharedMemorySize() const;
71 static size_t SharedMemorySize(const int capacity);
72
73 const int theCapacity; /// number of readers
74 Ipc::Mem::FlexibleArray<QueueReader> theReaders; /// readers
75 };
76
77 /**
78 * Lockless fixed-capacity queue for a single writer and a single reader.
79 *
80 * If the queue is empty, the reader is considered "blocked" and needs
81 * an out-of-band notification message to notice the next pushed item.
82 *
83 * Current implementation assumes that the writer cannot get blocked: if the
84 * queue is full, the writer will just not push and come back later (with a
85 * different value). We can add support for blocked writers if needed.
86 */
87 class OneToOneUniQueue
88 {
89 public:
90 // pop() and push() exceptions; TODO: use TextException instead
91 class Full {};
92 class ItemTooLarge {};
93
94 OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity);
95
96 unsigned int maxItemSize() const { return theMaxItemSize; }
97 int size() const { return theSize; }
98 int capacity() const { return theCapacity; }
99 int sharedMemorySize() const { return Items2Bytes(theMaxItemSize, theCapacity); }
100
101 bool empty() const { return !theSize; }
102 bool full() const { return theSize == theCapacity; }
103
104 static int Bytes2Items(const unsigned int maxItemSize, int size);
105 static int Items2Bytes(const unsigned int maxItemSize, const int size);
106
107 /// returns true iff the value was set; [un]blocks the reader as needed
108 template<class Value> bool pop(Value &value, QueueReader *const reader = NULL);
109
110 /// returns true iff the caller must notify the reader of the pushed item
111 template<class Value> bool push(const Value &value, QueueReader *const reader = NULL);
112
113 /// returns true iff the value was set; the value may be stale!
114 template<class Value> bool peek(Value &value) const;
115
116 private:
117
118 unsigned int theIn; ///< input index, used only in push()
119 unsigned int theOut; ///< output index, used only in pop()
120
121 Atomic::Word theSize; ///< number of items in the queue
122 const unsigned int theMaxItemSize; ///< maximum item size
123 const int theCapacity; ///< maximum number of items, i.e. theBuffer size
124
125 char theBuffer[];
126 };
127
128 /// shared array of OneToOneUniQueues
129 class OneToOneUniQueues
130 {
131 public:
132 OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity);
133
134 size_t sharedMemorySize() const;
135 static size_t SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity);
136
137 const OneToOneUniQueue &operator [](const int index) const;
138 inline OneToOneUniQueue &operator [](const int index);
139
140 private:
141 inline const OneToOneUniQueue &front() const;
142
143 public:
144 const int theCapacity; /// number of OneToOneUniQueues
145 };
146
147 /**
148 * Base class for lockless fixed-capacity bidirectional queues for a
149 * limited number processes.
150 */
151 class BaseMultiQueue
152 {
153 public:
154 BaseMultiQueue(const int aLocalProcessId);
155 virtual ~BaseMultiQueue() {}
156
157 /// clears the reader notification received by the local process from the remote process
158 void clearReaderSignal(const int remoteProcessId);
159
160 /// picks a process and calls OneToOneUniQueue::pop() using its queue
161 template <class Value> bool pop(int &remoteProcessId, Value &value);
162
163 /// calls OneToOneUniQueue::push() using the given process queue
164 template <class Value> bool push(const int remoteProcessId, const Value &value);
165
166 /// peeks at the item likely to be pop()ed next
167 template<class Value> bool peek(int &remoteProcessId, Value &value) const;
168
169 /// returns local reader's balance
170 QueueReader::Balance &localBalance() { return localReader().balance; }
171
172 /// returns reader's balance for a given remote process
173 const QueueReader::Balance &balance(const int remoteProcessId) const;
174
175 /// returns local reader's rate limit
176 QueueReader::Rate &localRateLimit() { return localReader().rateLimit; }
177
178 /// returns reader's rate limit for a given remote process
179 const QueueReader::Rate &rateLimit(const int remoteProcessId) const;
180
181 /// number of items in incoming queue from a given remote process
182 int inSize(const int remoteProcessId) const { return inQueue(remoteProcessId).size(); }
183
184 /// number of items in outgoing queue to a given remote process
185 int outSize(const int remoteProcessId) const { return outQueue(remoteProcessId).size(); }
186
187 protected:
188 /// incoming queue from a given remote process
189 virtual const OneToOneUniQueue &inQueue(const int remoteProcessId) const = 0;
190 OneToOneUniQueue &inQueue(const int remoteProcessId);
191
192 /// outgoing queue to a given remote process
193 virtual const OneToOneUniQueue &outQueue(const int remoteProcessId) const = 0;
194 OneToOneUniQueue &outQueue(const int remoteProcessId);
195
196 virtual const QueueReader &localReader() const = 0;
197 QueueReader &localReader();
198
199 virtual const QueueReader &remoteReader(const int remoteProcessId) const = 0;
200 QueueReader &remoteReader(const int remoteProcessId);
201
202 virtual int remotesCount() const = 0;
203 virtual int remotesIdOffset() const = 0;
204
205 protected:
206 const int theLocalProcessId; ///< process ID of this queue
207
208 private:
209 int theLastPopProcessId; ///< the ID of the last process we tried to pop() from
210 };
211
212 /**
213 * Lockless fixed-capacity bidirectional queue for a limited number
214 * processes. Allows communication between two groups of processes:
215 * any process in one group may send data to and receive from any
216 * process in another group, but processes in the same group can not
217 * communicate. Process in each group has a unique integer ID in
218 * [groupIdOffset, groupIdOffset + groupSize) range.
219 */
220 class FewToFewBiQueue: public BaseMultiQueue
221 {
222 public:
223 typedef OneToOneUniQueue::Full Full;
224 typedef OneToOneUniQueue::ItemTooLarge ItemTooLarge;
225
226 private:
227 /// Shared metadata for FewToFewBiQueue
228 struct Metadata {
229 Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset);
230 size_t sharedMemorySize() const { return sizeof(*this); }
231 static size_t SharedMemorySize(const int, const int, const int, const int) { return sizeof(Metadata); }
232
233 const int theGroupASize;
234 const int theGroupAIdOffset;
235 const int theGroupBSize;
236 const int theGroupBIdOffset;
237 };
238
239 public:
240 class Owner
241 {
242 public:
243 Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity);
244 ~Owner();
245
246 private:
247 Mem::Owner<Metadata> *const metadataOwner;
248 Mem::Owner<OneToOneUniQueues> *const queuesOwner;
249 Mem::Owner<QueueReaders> *const readersOwner;
250 };
251
252 static Owner *Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity);
253
254 enum Group { groupA = 0, groupB = 1 };
255 FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId);
256
257 /// maximum number of items in the queue
258 static int MaxItemsCount(const int groupASize, const int groupBSize, const int capacity);
259
260 /// finds the oldest item in incoming and outgoing queues between
261 /// us and the given remote process
262 template<class Value> bool findOldest(const int remoteProcessId, Value &value) const;
263
264 protected:
265 virtual const OneToOneUniQueue &inQueue(const int remoteProcessId) const;
266 virtual const OneToOneUniQueue &outQueue(const int remoteProcessId) const;
267 virtual const QueueReader &localReader() const;
268 virtual const QueueReader &remoteReader(const int processId) const;
269 virtual int remotesCount() const;
270 virtual int remotesIdOffset() const;
271
272 private:
273 bool validProcessId(const Group group, const int processId) const;
274 int oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const;
275 const OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const;
276 int readerIndex(const Group group, const int processId) const;
277 Group localGroup() const { return theLocalGroup; }
278 Group remoteGroup() const { return theLocalGroup == groupA ? groupB : groupA; }
279
280 private:
281 const Mem::Pointer<Metadata> metadata; ///< shared metadata
282 const Mem::Pointer<OneToOneUniQueues> queues; ///< unidirection one-to-one queues
283 const Mem::Pointer<QueueReaders> readers; ///< readers array
284
285 const Group theLocalGroup; ///< group of this queue
286 };
287
288 /**
289 * Lockless fixed-capacity bidirectional queue for a limited number
290 * processes. Any process may send data to and receive from any other
291 * process (including itself). Each process has a unique integer ID in
292 * [processIdOffset, processIdOffset + processCount) range.
293 */
294 class MultiQueue: public BaseMultiQueue
295 {
296 public:
297 typedef OneToOneUniQueue::Full Full;
298 typedef OneToOneUniQueue::ItemTooLarge ItemTooLarge;
299
300 private:
301 /// Shared metadata for MultiQueue
302 struct Metadata {
303 Metadata(const int aProcessCount, const int aProcessIdOffset);
304 size_t sharedMemorySize() const { return sizeof(*this); }
305 static size_t SharedMemorySize(const int, const int) { return sizeof(Metadata); }
306
307 const int theProcessCount;
308 const int theProcessIdOffset;
309 };
310
311 public:
312 class Owner
313 {
314 public:
315 Owner(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity);
316 ~Owner();
317
318 private:
319 Mem::Owner<Metadata> *const metadataOwner;
320 Mem::Owner<OneToOneUniQueues> *const queuesOwner;
321 Mem::Owner<QueueReaders> *const readersOwner;
322 };
323
324 static Owner *Init(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity);
325
326 MultiQueue(const String &id, const int localProcessId);
327
328 protected:
329 virtual const OneToOneUniQueue &inQueue(const int remoteProcessId) const;
330 virtual const OneToOneUniQueue &outQueue(const int remoteProcessId) const;
331 virtual const QueueReader &localReader() const;
332 virtual const QueueReader &remoteReader(const int remoteProcessId) const;
333 virtual int remotesCount() const;
334 virtual int remotesIdOffset() const;
335
336 private:
337 bool validProcessId(const int processId) const;
338 const OneToOneUniQueue &oneToOneQueue(const int fromProcessId, const int toProcessId) const;
339 const QueueReader &reader(const int processId) const;
340
341 private:
342 const Mem::Pointer<Metadata> metadata; ///< shared metadata
343 const Mem::Pointer<OneToOneUniQueues> queues; ///< unidirection one-to-one queues
344 const Mem::Pointer<QueueReaders> readers; ///< readers array
345 };
346
347 // OneToOneUniQueue
348
349 template <class Value>
350 bool
351 OneToOneUniQueue::pop(Value &value, QueueReader *const reader)
352 {
353 if (sizeof(value) > theMaxItemSize)
354 throw ItemTooLarge();
355
356 // A writer might push between the empty test and block() below, so we do
357 // not return false right after calling block(), but test again.
358 if (empty()) {
359 if (!reader)
360 return false;
361
362 reader->block();
363 // A writer might push between the empty test and block() below,
364 // so we must test again as such a writer will not signal us.
365 if (empty())
366 return false;
367 }
368
369 if (reader)
370 reader->unblock();
371
372 const unsigned int pos = (theOut++ % theCapacity) * theMaxItemSize;
373 memcpy(&value, theBuffer + pos, sizeof(value));
374 --theSize;
375
376 return true;
377 }
378
379 template <class Value>
380 bool
381 OneToOneUniQueue::peek(Value &value) const
382 {
383 if (sizeof(value) > theMaxItemSize)
384 throw ItemTooLarge();
385
386 if (empty())
387 return false;
388
389 // the reader may pop() before we copy; making this method imprecise
390 const unsigned int pos = (theOut % theCapacity) * theMaxItemSize;
391 memcpy(&value, theBuffer + pos, sizeof(value));
392 return true;
393 }
394
395 template <class Value>
396 bool
397 OneToOneUniQueue::push(const Value &value, QueueReader *const reader)
398 {
399 if (sizeof(value) > theMaxItemSize)
400 throw ItemTooLarge();
401
402 if (full())
403 throw Full();
404
405 const unsigned int pos = theIn++ % theCapacity * theMaxItemSize;
406 memcpy(theBuffer + pos, &value, sizeof(value));
407 const bool wasEmpty = !theSize++;
408
409 return wasEmpty && (!reader || reader->raiseSignal());
410 }
411
412 // OneToOneUniQueues
413
414 inline OneToOneUniQueue &
415 OneToOneUniQueues::operator [](const int index)
416 {
417 return const_cast<OneToOneUniQueue &>((*const_cast<const OneToOneUniQueues *>(this))[index]);
418 }
419
420 inline const OneToOneUniQueue &
421 OneToOneUniQueues::front() const
422 {
423 const char *const queue =
424 reinterpret_cast<const char *>(this) + sizeof(*this);
425 return *reinterpret_cast<const OneToOneUniQueue *>(queue);
426 }
427
428 // BaseMultiQueue
429
430 template <class Value>
431 bool
432 BaseMultiQueue::pop(int &remoteProcessId, Value &value)
433 {
434 // iterate all remote processes, starting after the one we visited last
435 for (int i = 0; i < remotesCount(); ++i) {
436 if (++theLastPopProcessId >= remotesIdOffset() + remotesCount())
437 theLastPopProcessId = remotesIdOffset();
438 OneToOneUniQueue &queue = inQueue(theLastPopProcessId);
439 if (queue.pop(value, &localReader())) {
440 remoteProcessId = theLastPopProcessId;
441 debugs(54, 7, HERE << "popped from " << remoteProcessId << " to " << theLocalProcessId << " at " << queue.size());
442 return true;
443 }
444 }
445 return false; // no process had anything to pop
446 }
447
448 template <class Value>
449 bool
450 BaseMultiQueue::push(const int remoteProcessId, const Value &value)
451 {
452 OneToOneUniQueue &remoteQueue = outQueue(remoteProcessId);
453 QueueReader &reader = remoteReader(remoteProcessId);
454 debugs(54, 7, HERE << "pushing from " << theLocalProcessId << " to " << remoteProcessId << " at " << remoteQueue.size());
455 return remoteQueue.push(value, &reader);
456 }
457
458 template <class Value>
459 bool
460 BaseMultiQueue::peek(int &remoteProcessId, Value &value) const
461 {
462 // mimic FewToFewBiQueue::pop() but quit just before popping
463 int popProcessId = theLastPopProcessId; // preserve for future pop()
464 for (int i = 0; i < remotesCount(); ++i) {
465 if (++popProcessId >= remotesIdOffset() + remotesCount())
466 popProcessId = remotesIdOffset();
467 const OneToOneUniQueue &queue = inQueue(popProcessId);
468 if (queue.peek(value)) {
469 remoteProcessId = popProcessId;
470 return true;
471 }
472 }
473 return false; // most likely, no process had anything to pop
474 }
475
476 // FewToFewBiQueue
477
478 template <class Value>
479 bool
480 FewToFewBiQueue::findOldest(const int remoteProcessId, Value &value) const
481 {
482 // we may be called before remote process configured its queue end
483 if (!validProcessId(remoteGroup(), remoteProcessId))
484 return false;
485
486 // we need the oldest value, so start with the incoming, them-to-us queue:
487 const OneToOneUniQueue &in = inQueue(remoteProcessId);
488 debugs(54, 2, HERE << "peeking from " << remoteProcessId << " to " <<
489 theLocalProcessId << " at " << in.size());
490 if (in.peek(value))
491 return true;
492
493 // if the incoming queue is empty, check the outgoing, us-to-them queue:
494 const OneToOneUniQueue &out = outQueue(remoteProcessId);
495 debugs(54, 2, HERE << "peeking from " << theLocalProcessId << " to " <<
496 remoteProcessId << " at " << out.size());
497 return out.peek(value);
498 }
499
500 } // namespace Ipc
501
502 #endif // SQUID_IPC_QUEUE_H