]> git.ipfire.org Git - thirdparty/squid.git/blame - src/ipc/Queue.h
Bug 3159: delay pools --disable-auth compile problems
[thirdparty/squid.git] / src / ipc / Queue.h
CommitLineData
9a51593d
DK
1/*
2 * $Id$
3 *
4 */
5
6#ifndef SQUID_IPC_QUEUE_H
7#define SQUID_IPC_QUEUE_H
8
9#include "Array.h"
d5d5493b 10#include "Debug.h"
fa61cefe 11#include "base/InstanceId.h"
9a51593d 12#include "ipc/AtomicWord.h"
68353d5a 13#include "ipc/mem/Pointer.h"
9a51593d
DK
14#include "util.h"
15
b2aa0934
DK
16class String;
17
15cdbc7c
DK
18namespace Ipc {
19
fa61cefe
AR
20/// State of the reading end of a queue (i.e., of the code calling pop()).
21/// Multiple queues attached to one reader share this state.
22class QueueReader {
23public:
24 QueueReader(); // the initial state is "blocked without a signal"
25
26 /// whether the reader is waiting for a notification signal
27 bool blocked() const { return popBlocked == 1; }
28
29 /// marks the reader as blocked, waiting for a notification signal
30 void block() { popBlocked.swap_if(0, 1); }
31
32 /// removes the block() effects
33 void unblock() { popBlocked.swap_if(1, 0); }
34
35 /// if reader is blocked and not notified, marks the notification signal
36 /// as sent and not received, returning true; otherwise, returns false
37 bool raiseSignal() { return blocked() && popSignal.swap_if(0,1); }
38
39 /// marks sent reader notification as received (also removes pop blocking)
40 void clearSignal() { unblock(); popSignal.swap_if(1,0); }
41
42private:
43 AtomicWord popBlocked; ///< whether the reader is blocked on pop()
44 AtomicWord popSignal; ///< whether writer has sent and reader has not received notification
45
46public:
47 /// unique ID for debugging which reader is used (works across processes)
48 const InstanceId<QueueReader> id;
49};
50
68353d5a
DK
51/// shared array of QueueReaders
52class QueueReaders {
53public:
54 QueueReaders(const int aCapacity);
55 size_t sharedMemorySize() const;
56 static size_t SharedMemorySize(const int capacity);
57
58 const int theCapacity; /// number of readers
59 QueueReader theReaders[]; /// readers
60};
fa61cefe
AR
61
62/**
63 * Lockless fixed-capacity queue for a single writer and a single reader.
64 *
65 * If the queue is empty, the reader is considered "blocked" and needs
66 * an out-of-band notification message to notice the next pushed item.
67 *
68 * Current implementation assumes that the writer cannot get blocked: if the
69 * queue is full, the writer will just not push and come back later (with a
70 * different value). We can add support for blocked writers if needed.
71 */
9a51593d
DK
72class OneToOneUniQueue {
73public:
fa61cefe 74 // pop() and push() exceptions; TODO: use TextException instead
7a907247 75 class Full {};
b2aa0934
DK
76 class ItemTooLarge {};
77
f5591061 78 OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity);
68353d5a 79
f5591061
DK
80 unsigned int maxItemSize() const { return theMaxItemSize; }
81 int size() const { return theSize; }
82 int capacity() const { return theCapacity; }
83 int sharedMemorySize() const { return Items2Bytes(theMaxItemSize, theCapacity); }
9a51593d 84
f5591061
DK
85 bool empty() const { return !theSize; }
86 bool full() const { return theSize == theCapacity; }
9a51593d 87
b2aa0934
DK
88 static int Bytes2Items(const unsigned int maxItemSize, int size);
89 static int Items2Bytes(const unsigned int maxItemSize, const int size);
90
fa61cefe 91 /// returns true iff the value was set; [un]blocks the reader as needed
f5591061 92 template<class Value> bool pop(Value &value, QueueReader *const reader = NULL);
fa61cefe
AR
93
94 /// returns true iff the caller must notify the reader of the pushed item
f5591061 95 template<class Value> bool push(const Value &value, QueueReader *const reader = NULL);
9a51593d 96
9a51593d 97private:
b2aa0934 98
f5591061
DK
99 unsigned int theIn; ///< input index, used only in push()
100 unsigned int theOut; ///< output index, used only in pop()
68353d5a 101
f5591061
DK
102 AtomicWord theSize; ///< number of items in the queue
103 const unsigned int theMaxItemSize; ///< maximum item size
104 const int theCapacity; ///< maximum number of items, i.e. theBuffer size
7a907247 105
f5591061
DK
106 char theBuffer[];
107};
68353d5a 108
f5591061
DK
109/// shared array of OneToOneUniQueues
110class OneToOneUniQueues {
111public:
112 OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity);
68353d5a 113
f5591061
DK
114 size_t sharedMemorySize() const;
115 static size_t SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity);
9a51593d 116
f5591061
DK
117 const OneToOneUniQueue &operator [](const int index) const;
118 inline OneToOneUniQueue &operator [](const int index);
9a51593d 119
f5591061
DK
120private:
121 inline const OneToOneUniQueue &front() const;
fa61cefe 122
f5591061
DK
123public:
124 const int theCapacity; /// number of OneToOneUniQueues
9a51593d
DK
125};
126
127/**
128 * Lockless fixed-capacity bidirectional queue for a limited number
f5591061
DK
129 * processes. Allows communication between two groups of processes:
130 * any process in one group may send data to and receive from any
131 * process in another group, but processes in the same group can not
132 * communicate. Process in each group has a unique integer ID in
133 * [groupIdOffset, groupIdOffset + groupSize) range.
9a51593d 134 */
f5591061 135class FewToFewBiQueue {
9a51593d 136public:
f5591061
DK
137 typedef OneToOneUniQueue::Full Full;
138 typedef OneToOneUniQueue::ItemTooLarge ItemTooLarge;
139
140private:
141 /// Shared metadata for FewToFewBiQueue
142 struct Metadata {
143 Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset);
144 size_t sharedMemorySize() const { return sizeof(*this); }
145 static size_t SharedMemorySize(const int, const int, const int, const int) { return sizeof(Metadata); }
146
147 const int theGroupASize;
148 const int theGroupAIdOffset;
149 const int theGroupBSize;
150 const int theGroupBIdOffset;
151 };
7a907247 152
f5591061 153public:
68353d5a
DK
154 class Owner {
155 public:
f5591061 156 Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity);
68353d5a
DK
157 ~Owner();
158
159 private:
f5591061
DK
160 Mem::Owner<Metadata> *const metadataOwner;
161 Mem::Owner<OneToOneUniQueues> *const queuesOwner;
15cdbc7c 162 Mem::Owner<QueueReaders> *const readersOwner;
68353d5a
DK
163 };
164
f5591061 165 static Owner *Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity);
68353d5a 166
f5591061
DK
167 enum Group { groupA = 0, groupB = 1 };
168 FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId);
9a51593d 169
f5591061
DK
170 Group localGroup() const { return theLocalGroup; }
171 Group remoteGroup() const { return theLocalGroup == groupA ? groupB : groupA; }
9a51593d 172
f5591061
DK
173 /// clears the reader notification received by the local process from the remote process
174 void clearReaderSignal(const int remoteProcessId);
9a51593d 175
f5591061
DK
176 /// picks a process and calls OneToOneUniQueue::pop() using its queue
177 template <class Value> bool pop(int &remoteProcessId, Value &value);
fa61cefe 178
f5591061
DK
179 /// calls OneToOneUniQueue::push() using the given process queue
180 template <class Value> bool push(const int remoteProcessId, const Value &value);
fa61cefe 181
f5591061
DK
182private:
183 bool validProcessId(const Group group, const int processId) const;
184 OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId);
185 QueueReader &reader(const Group group, const int processId);
186 int remoteGroupSize() const { return theLocalGroup == groupA ? metadata->theGroupBSize : metadata->theGroupASize; }
187 int remoteGroupIdOffset() const { return theLocalGroup == groupA ? metadata->theGroupBIdOffset : metadata->theGroupAIdOffset; }
fa61cefe 188
f5591061
DK
189private:
190 const Mem::Pointer<Metadata> metadata; ///< shared metadata
191 const Mem::Pointer<OneToOneUniQueues> queues; ///< unidirection one-to-one queues
15cdbc7c 192 const Mem::Pointer<QueueReaders> readers; ///< readers array
5e44782e 193
f5591061
DK
194 const Group theLocalGroup; ///< group of this queue
195 const int theLocalProcessId; ///< process ID of this queue
196 int theLastPopProcessId; ///< the ID of the last process we tried to pop() from
9a51593d
DK
197};
198
199
200// OneToOneUniQueue
201
9a51593d
DK
202template <class Value>
203bool
f5591061 204OneToOneUniQueue::pop(Value &value, QueueReader *const reader)
9a51593d 205{
f5591061 206 if (sizeof(value) > theMaxItemSize)
b2aa0934
DK
207 throw ItemTooLarge();
208
fa61cefe
AR
209 // A writer might push between the empty test and block() below, so we do
210 // not return false right after calling block(), but test again.
211 if (empty()) {
f5591061
DK
212 if (!reader)
213 return false;
214
215 reader->block();
fa61cefe
AR
216 // A writer might push between the empty test and block() below,
217 // so we must test again as such a writer will not signal us.
218 if (empty())
219 return false;
220 }
9a51593d 221
f5591061
DK
222 if (reader)
223 reader->unblock();
224
225 const unsigned int pos = (theOut++ % theCapacity) * theMaxItemSize;
226 memcpy(&value, theBuffer + pos, sizeof(value));
227 --theSize;
fa61cefe
AR
228
229 return true;
9a51593d
DK
230}
231
232template <class Value>
233bool
f5591061 234OneToOneUniQueue::push(const Value &value, QueueReader *const reader)
9a51593d 235{
f5591061 236 if (sizeof(value) > theMaxItemSize)
b2aa0934
DK
237 throw ItemTooLarge();
238
9a51593d 239 if (full())
7a907247 240 throw Full();
9a51593d 241
7a907247 242 const bool wasEmpty = empty();
f5591061
DK
243 const unsigned int pos = theIn++ % theCapacity * theMaxItemSize;
244 memcpy(theBuffer + pos, &value, sizeof(value));
245 ++theSize;
246
247 return wasEmpty && (!reader || reader->raiseSignal());
248}
249
fa61cefe 250
f5591061
DK
251// OneToOneUniQueues
252
253inline OneToOneUniQueue &
254OneToOneUniQueues::operator [](const int index)
255{
256 return const_cast<OneToOneUniQueue &>((*const_cast<const OneToOneUniQueues *>(this))[index]);
257}
258
259inline const OneToOneUniQueue &
260OneToOneUniQueues::front() const
261{
262 const char *const queue =
263 reinterpret_cast<const char *>(this) + sizeof(*this);
264 return *reinterpret_cast<const OneToOneUniQueue *>(queue);
9a51593d
DK
265}
266
9a51593d 267
f5591061 268// FewToFewBiQueue
9a51593d 269
9a51593d
DK
270template <class Value>
271bool
f5591061 272FewToFewBiQueue::pop(int &remoteProcessId, Value &value)
9a51593d 273{
f5591061
DK
274 // iterate all remote group processes, starting after the one we visited last
275 QueueReader &localReader = reader(theLocalGroup, theLocalProcessId);
276 for (int i = 0; i < remoteGroupSize(); ++i) {
277 if (++theLastPopProcessId >= remoteGroupIdOffset() + remoteGroupSize())
278 theLastPopProcessId = remoteGroupIdOffset();
279 OneToOneUniQueue &queue = oneToOneQueue(remoteGroup(), theLastPopProcessId, theLocalGroup, theLocalProcessId);
280 if (queue.pop(value, &localReader)) {
281 remoteProcessId = theLastPopProcessId;
282 debugs(54, 7, HERE << "popped from " << remoteProcessId << " to " << theLocalProcessId << " at " << queue.size());
fa61cefe
AR
283 return true;
284 }
9a51593d 285 }
f5591061 286 return false; // no process had anything to pop
9a51593d
DK
287}
288
289template <class Value>
290bool
f5591061 291FewToFewBiQueue::push(const int remoteProcessId, const Value &value)
9a51593d 292{
f5591061
DK
293 OneToOneUniQueue &remoteQueue = oneToOneQueue(theLocalGroup, theLocalProcessId, remoteGroup(), remoteProcessId);
294 QueueReader &remoteReader = reader(remoteGroup(), remoteProcessId);
295 debugs(54, 7, HERE << "pushing from " << theLocalProcessId << " to " << remoteProcessId << " at " << remoteQueue.size());
296 return remoteQueue.push(value, &remoteReader);
9a51593d
DK
297}
298
15cdbc7c
DK
299} // namespace Ipc
300
9a51593d 301#endif // SQUID_IPC_QUEUE_H