]> git.ipfire.org Git - thirdparty/squid.git/blob - src/ipc/Queue.h
Adjust FewToOneBiQueue to use IDs in [1, workerCount] range.
[thirdparty/squid.git] / src / ipc / Queue.h
1 /*
2 * $Id$
3 *
4 */
5
6 #ifndef SQUID_IPC_QUEUE_H
7 #define SQUID_IPC_QUEUE_H
8
9 #include "Array.h"
10 #include "base/InstanceId.h"
11 #include "ipc/AtomicWord.h"
12 #include "ipc/mem/Segment.h"
13 #include "util.h"
14
15 class String;
16
17 /// State of the reading end of a queue (i.e., of the code calling pop()).
18 /// Multiple queues attached to one reader share this state.
19 class QueueReader {
20 public:
21 QueueReader(); // the initial state is "blocked without a signal"
22
23 /// whether the reader is waiting for a notification signal
24 bool blocked() const { return popBlocked == 1; }
25
26 /// marks the reader as blocked, waiting for a notification signal
27 void block() { popBlocked.swap_if(0, 1); }
28
29 /// removes the block() effects
30 void unblock() { popBlocked.swap_if(1, 0); }
31
32 /// if reader is blocked and not notified, marks the notification signal
33 /// as sent and not received, returning true; otherwise, returns false
34 bool raiseSignal() { return blocked() && popSignal.swap_if(0,1); }
35
36 /// marks sent reader notification as received (also removes pop blocking)
37 void clearSignal() { unblock(); popSignal.swap_if(1,0); }
38
39 private:
40 AtomicWord popBlocked; ///< whether the reader is blocked on pop()
41 AtomicWord popSignal; ///< whether writer has sent and reader has not received notification
42
43 public:
44 /// unique ID for debugging which reader is used (works across processes)
45 const InstanceId<QueueReader> id;
46 };
47
48
49 /**
50 * Lockless fixed-capacity queue for a single writer and a single reader.
51 *
52 * If the queue is empty, the reader is considered "blocked" and needs
53 * an out-of-band notification message to notice the next pushed item.
54 *
55 * Current implementation assumes that the writer cannot get blocked: if the
56 * queue is full, the writer will just not push and come back later (with a
57 * different value). We can add support for blocked writers if needed.
58 */
59 class OneToOneUniQueue {
60 public:
61 // pop() and push() exceptions; TODO: use TextException instead
62 class Full {};
63 class ItemTooLarge {};
64
65 OneToOneUniQueue(const String &id, const unsigned int maxItemSize, const int capacity);
66 OneToOneUniQueue(const String &id);
67
68 unsigned int maxItemSize() const { return shared->theMaxItemSize; }
69 int size() const { return shared->theSize; }
70 int capacity() const { return shared->theCapacity; }
71
72 bool empty() const { return !shared->theSize; }
73 bool full() const { return shared->theSize == shared->theCapacity; }
74
75 static int Bytes2Items(const unsigned int maxItemSize, int size);
76 static int Items2Bytes(const unsigned int maxItemSize, const int size);
77
78 /// returns true iff the value was set; [un]blocks the reader as needed
79 template<class Value> bool pop(Value &value);
80
81 /// returns true iff the caller must notify the reader of the pushed item
82 template<class Value> bool push(const Value &value);
83
84 QueueReader &reader();
85 void reader(QueueReader *aReader);
86
87 private:
88 struct Shared {
89 Shared(const unsigned int aMaxItemSize, const int aCapacity);
90
91 unsigned int theIn; ///< input index, used only in push()
92 unsigned int theOut; ///< output index, used only in pop()
93
94 AtomicWord theSize; ///< number of items in the queue
95 const unsigned int theMaxItemSize; ///< maximum item size
96 const int theCapacity; ///< maximum number of items, i.e. theBuffer size
97
98 char theBuffer[];
99 };
100
101 Ipc::Mem::Segment shm; ///< shared memory segment
102 Shared *shared; ///< pointer to shared memory
103 QueueReader *reader_; ///< the state of the code popping from this queue
104 };
105
106 /// Lockless fixed-capacity bidirectional queue for two processes.
107 class OneToOneBiQueue {
108 public:
109 typedef OneToOneUniQueue::Full Full;
110 typedef OneToOneUniQueue::ItemTooLarge ItemTooLarge;
111
112 /// Create a new shared queue.
113 OneToOneBiQueue(const String &id, const unsigned int maxItemSize, const int capacity);
114 OneToOneBiQueue(const String &id); ///< Attach to existing shared queue.
115
116 void readers(QueueReader *r1, QueueReader *r2);
117 void clearReaderSignal();
118
119 /* wrappers to call the right OneToOneUniQueue method for this process */
120 template<class Value> bool pop(Value &value) { return popQueue->pop(value); }
121 template<class Value> bool push(const Value &value) { return pushQueue->push(value); }
122
123 //private:
124 OneToOneUniQueue *const popQueue; ///< queue to pop from for this process
125 OneToOneUniQueue *const pushQueue; ///< queue to push to for this process
126 };
127
128 /**
129 * Lockless fixed-capacity bidirectional queue for a limited number
130 * pricesses. Implements a star topology: Many worker processes
131 * communicate with the one central process. The central process uses
132 * FewToOneBiQueue object, while workers use OneToOneBiQueue objects
133 * created with the Attach() method. Each worker has a unique integer
134 * ID in [1, workerCount] range.
135 */
136 class FewToOneBiQueue {
137 public:
138 typedef OneToOneBiQueue::Full Full;
139 typedef OneToOneBiQueue::ItemTooLarge ItemTooLarge;
140
141 FewToOneBiQueue(const String &id, const int aWorkerCount, const unsigned int maxItemSize, const int capacity);
142 static OneToOneBiQueue *Attach(const String &id, const int workerId);
143 ~FewToOneBiQueue();
144
145 bool validWorkerId(const int workerId) const;
146 int workerCount() const { return theWorkerCount; }
147
148 /// clears the reader notification received by the disker from worker
149 void clearReaderSignal(int workerId);
150
151 /// picks a worker and calls OneToOneUniQueue::pop() using its queue
152 template <class Value> bool pop(int &workerId, Value &value);
153
154 /// calls OneToOneUniQueue::push() using the given worker queue
155 template <class Value> bool push(const int workerId, const Value &value);
156
157 //private: XXX: make private by moving pop/push debugging into pop/push
158 int theLastPopWorker; ///< the ID of the last worker we tried to pop() from
159 Vector<OneToOneBiQueue *> biQueues; ///< worker queues indexed by worker ID
160 const int theWorkerCount; ///< the total number of workers
161
162 Ipc::Mem::Segment shm; ///< shared memory segment to store the reader
163 QueueReader *reader; ///< the state of the code popping from all biQueues
164
165 enum { WorkerIdOffset = 1 }; ///< worker ID offset, always 1 for now
166 };
167
168
169 // OneToOneUniQueue
170
171 template <class Value>
172 bool
173 OneToOneUniQueue::pop(Value &value)
174 {
175 if (sizeof(value) > shared->theMaxItemSize)
176 throw ItemTooLarge();
177
178 // A writer might push between the empty test and block() below, so we do
179 // not return false right after calling block(), but test again.
180 if (empty()) {
181 reader().block();
182 // A writer might push between the empty test and block() below,
183 // so we must test again as such a writer will not signal us.
184 if (empty())
185 return false;
186 }
187
188 reader().unblock();
189 const unsigned int pos =
190 (shared->theOut++ % shared->theCapacity) * shared->theMaxItemSize;
191 memcpy(&value, shared->theBuffer + pos, sizeof(value));
192 --shared->theSize;
193
194 return true;
195 }
196
197 template <class Value>
198 bool
199 OneToOneUniQueue::push(const Value &value)
200 {
201 if (sizeof(value) > shared->theMaxItemSize)
202 throw ItemTooLarge();
203
204 if (full())
205 throw Full();
206
207 const bool wasEmpty = empty();
208 const unsigned int pos =
209 shared->theIn++ % shared->theCapacity * shared->theMaxItemSize;
210 memcpy(shared->theBuffer + pos, &value, sizeof(value));
211 ++shared->theSize;
212
213 return wasEmpty && reader().raiseSignal();
214 }
215
216
217 // FewToOneBiQueue
218
219 template <class Value>
220 bool
221 FewToOneBiQueue::pop(int &workerId, Value &value)
222 {
223 // iterate all workers, starting after the one we visited last
224 for (int i = 0; i < theWorkerCount; ++i) {
225 theLastPopWorker = (theLastPopWorker + 1) % theWorkerCount;
226 if (biQueues[theLastPopWorker]->pop(value)) {
227 workerId = theLastPopWorker + WorkerIdOffset;
228 return true;
229 }
230 }
231 return false; // no worker had anything to pop
232 }
233
234 template <class Value>
235 bool
236 FewToOneBiQueue::push(const int workerId, const Value &value)
237 {
238 assert(validWorkerId(workerId));
239 return biQueues[workerId - WorkerIdOffset]->push(value);
240 }
241
242 #endif // SQUID_IPC_QUEUE_H