]>
Commit | Line | Data |
---|---|---|
9a51593d DK |
1 | /* |
2 | * $Id$ | |
3 | * | |
4 | */ | |
5 | ||
6 | #ifndef SQUID_IPC_QUEUE_H | |
7 | #define SQUID_IPC_QUEUE_H | |
8 | ||
9 | #include "Array.h" | |
d5d5493b | 10 | #include "Debug.h" |
fa61cefe | 11 | #include "base/InstanceId.h" |
9a51593d | 12 | #include "ipc/AtomicWord.h" |
68353d5a | 13 | #include "ipc/mem/Pointer.h" |
9a51593d DK |
14 | #include "util.h" |
15 | ||
b2aa0934 DK |
16 | class String; |
17 | ||
15cdbc7c DK |
18 | namespace Ipc { |
19 | ||
fa61cefe AR |
20 | /// State of the reading end of a queue (i.e., of the code calling pop()). |
21 | /// Multiple queues attached to one reader share this state. | |
22 | class QueueReader { | |
23 | public: | |
24 | QueueReader(); // the initial state is "blocked without a signal" | |
25 | ||
26 | /// whether the reader is waiting for a notification signal | |
27 | bool blocked() const { return popBlocked == 1; } | |
28 | ||
29 | /// marks the reader as blocked, waiting for a notification signal | |
30 | void block() { popBlocked.swap_if(0, 1); } | |
31 | ||
32 | /// removes the block() effects | |
33 | void unblock() { popBlocked.swap_if(1, 0); } | |
34 | ||
35 | /// if reader is blocked and not notified, marks the notification signal | |
36 | /// as sent and not received, returning true; otherwise, returns false | |
37 | bool raiseSignal() { return blocked() && popSignal.swap_if(0,1); } | |
38 | ||
39 | /// marks sent reader notification as received (also removes pop blocking) | |
40 | void clearSignal() { unblock(); popSignal.swap_if(1,0); } | |
41 | ||
42 | private: | |
43 | AtomicWord popBlocked; ///< whether the reader is blocked on pop() | |
44 | AtomicWord popSignal; ///< whether writer has sent and reader has not received notification | |
45 | ||
46 | public: | |
47 | /// unique ID for debugging which reader is used (works across processes) | |
48 | const InstanceId<QueueReader> id; | |
49 | }; | |
50 | ||
68353d5a DK |
51 | /// shared array of QueueReaders |
52 | class QueueReaders { | |
53 | public: | |
54 | QueueReaders(const int aCapacity); | |
55 | size_t sharedMemorySize() const; | |
56 | static size_t SharedMemorySize(const int capacity); | |
57 | ||
58 | const int theCapacity; /// number of readers | |
59 | QueueReader theReaders[]; /// readers | |
60 | }; | |
fa61cefe AR |
61 | |
62 | /** | |
63 | * Lockless fixed-capacity queue for a single writer and a single reader. | |
64 | * | |
65 | * If the queue is empty, the reader is considered "blocked" and needs | |
66 | * an out-of-band notification message to notice the next pushed item. | |
67 | * | |
68 | * Current implementation assumes that the writer cannot get blocked: if the | |
69 | * queue is full, the writer will just not push and come back later (with a | |
70 | * different value). We can add support for blocked writers if needed. | |
71 | */ | |
9a51593d DK |
72 | class OneToOneUniQueue { |
73 | public: | |
fa61cefe | 74 | // pop() and push() exceptions; TODO: use TextException instead |
7a907247 | 75 | class Full {}; |
b2aa0934 DK |
76 | class ItemTooLarge {}; |
77 | ||
f5591061 | 78 | OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity); |
68353d5a | 79 | |
f5591061 DK |
80 | unsigned int maxItemSize() const { return theMaxItemSize; } |
81 | int size() const { return theSize; } | |
82 | int capacity() const { return theCapacity; } | |
83 | int sharedMemorySize() const { return Items2Bytes(theMaxItemSize, theCapacity); } | |
9a51593d | 84 | |
f5591061 DK |
85 | bool empty() const { return !theSize; } |
86 | bool full() const { return theSize == theCapacity; } | |
9a51593d | 87 | |
b2aa0934 DK |
88 | static int Bytes2Items(const unsigned int maxItemSize, int size); |
89 | static int Items2Bytes(const unsigned int maxItemSize, const int size); | |
90 | ||
fa61cefe | 91 | /// returns true iff the value was set; [un]blocks the reader as needed |
f5591061 | 92 | template<class Value> bool pop(Value &value, QueueReader *const reader = NULL); |
fa61cefe AR |
93 | |
94 | /// returns true iff the caller must notify the reader of the pushed item | |
f5591061 | 95 | template<class Value> bool push(const Value &value, QueueReader *const reader = NULL); |
9a51593d | 96 | |
9a51593d | 97 | private: |
b2aa0934 | 98 | |
f5591061 DK |
99 | unsigned int theIn; ///< input index, used only in push() |
100 | unsigned int theOut; ///< output index, used only in pop() | |
68353d5a | 101 | |
f5591061 DK |
102 | AtomicWord theSize; ///< number of items in the queue |
103 | const unsigned int theMaxItemSize; ///< maximum item size | |
104 | const int theCapacity; ///< maximum number of items, i.e. theBuffer size | |
7a907247 | 105 | |
f5591061 DK |
106 | char theBuffer[]; |
107 | }; | |
68353d5a | 108 | |
f5591061 DK |
109 | /// shared array of OneToOneUniQueues |
110 | class OneToOneUniQueues { | |
111 | public: | |
112 | OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity); | |
68353d5a | 113 | |
f5591061 DK |
114 | size_t sharedMemorySize() const; |
115 | static size_t SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity); | |
9a51593d | 116 | |
f5591061 DK |
117 | const OneToOneUniQueue &operator [](const int index) const; |
118 | inline OneToOneUniQueue &operator [](const int index); | |
9a51593d | 119 | |
f5591061 DK |
120 | private: |
121 | inline const OneToOneUniQueue &front() const; | |
fa61cefe | 122 | |
f5591061 DK |
123 | public: |
124 | const int theCapacity; /// number of OneToOneUniQueues | |
9a51593d DK |
125 | }; |
126 | ||
127 | /** | |
128 | * Lockless fixed-capacity bidirectional queue for a limited number | |
f5591061 DK |
129 | * processes. Allows communication between two groups of processes: |
130 | * any process in one group may send data to and receive from any | |
131 | * process in another group, but processes in the same group can not | |
132 | * communicate. Process in each group has a unique integer ID in | |
133 | * [groupIdOffset, groupIdOffset + groupSize) range. | |
9a51593d | 134 | */ |
f5591061 | 135 | class FewToFewBiQueue { |
9a51593d | 136 | public: |
f5591061 DK |
137 | typedef OneToOneUniQueue::Full Full; |
138 | typedef OneToOneUniQueue::ItemTooLarge ItemTooLarge; | |
139 | ||
140 | private: | |
141 | /// Shared metadata for FewToFewBiQueue | |
142 | struct Metadata { | |
143 | Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset); | |
144 | size_t sharedMemorySize() const { return sizeof(*this); } | |
145 | static size_t SharedMemorySize(const int, const int, const int, const int) { return sizeof(Metadata); } | |
146 | ||
147 | const int theGroupASize; | |
148 | const int theGroupAIdOffset; | |
149 | const int theGroupBSize; | |
150 | const int theGroupBIdOffset; | |
151 | }; | |
7a907247 | 152 | |
f5591061 | 153 | public: |
68353d5a DK |
154 | class Owner { |
155 | public: | |
f5591061 | 156 | Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity); |
68353d5a DK |
157 | ~Owner(); |
158 | ||
159 | private: | |
f5591061 DK |
160 | Mem::Owner<Metadata> *const metadataOwner; |
161 | Mem::Owner<OneToOneUniQueues> *const queuesOwner; | |
15cdbc7c | 162 | Mem::Owner<QueueReaders> *const readersOwner; |
68353d5a DK |
163 | }; |
164 | ||
f5591061 | 165 | static Owner *Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity); |
68353d5a | 166 | |
f5591061 DK |
167 | enum Group { groupA = 0, groupB = 1 }; |
168 | FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId); | |
9a51593d | 169 | |
f5591061 DK |
170 | Group localGroup() const { return theLocalGroup; } |
171 | Group remoteGroup() const { return theLocalGroup == groupA ? groupB : groupA; } | |
9a51593d | 172 | |
f5591061 DK |
173 | /// clears the reader notification received by the local process from the remote process |
174 | void clearReaderSignal(const int remoteProcessId); | |
9a51593d | 175 | |
f5591061 DK |
176 | /// picks a process and calls OneToOneUniQueue::pop() using its queue |
177 | template <class Value> bool pop(int &remoteProcessId, Value &value); | |
fa61cefe | 178 | |
f5591061 DK |
179 | /// calls OneToOneUniQueue::push() using the given process queue |
180 | template <class Value> bool push(const int remoteProcessId, const Value &value); | |
fa61cefe | 181 | |
f5591061 DK |
182 | private: |
183 | bool validProcessId(const Group group, const int processId) const; | |
184 | OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId); | |
185 | QueueReader &reader(const Group group, const int processId); | |
186 | int remoteGroupSize() const { return theLocalGroup == groupA ? metadata->theGroupBSize : metadata->theGroupASize; } | |
187 | int remoteGroupIdOffset() const { return theLocalGroup == groupA ? metadata->theGroupBIdOffset : metadata->theGroupAIdOffset; } | |
fa61cefe | 188 | |
f5591061 DK |
189 | private: |
190 | const Mem::Pointer<Metadata> metadata; ///< shared metadata | |
191 | const Mem::Pointer<OneToOneUniQueues> queues; ///< unidirection one-to-one queues | |
15cdbc7c | 192 | const Mem::Pointer<QueueReaders> readers; ///< readers array |
5e44782e | 193 | |
f5591061 DK |
194 | const Group theLocalGroup; ///< group of this queue |
195 | const int theLocalProcessId; ///< process ID of this queue | |
196 | int theLastPopProcessId; ///< the ID of the last process we tried to pop() from | |
9a51593d DK |
197 | }; |
198 | ||
199 | ||
200 | // OneToOneUniQueue | |
201 | ||
9a51593d DK |
202 | template <class Value> |
203 | bool | |
f5591061 | 204 | OneToOneUniQueue::pop(Value &value, QueueReader *const reader) |
9a51593d | 205 | { |
f5591061 | 206 | if (sizeof(value) > theMaxItemSize) |
b2aa0934 DK |
207 | throw ItemTooLarge(); |
208 | ||
fa61cefe AR |
209 | // A writer might push between the empty test and block() below, so we do |
210 | // not return false right after calling block(), but test again. | |
211 | if (empty()) { | |
f5591061 DK |
212 | if (!reader) |
213 | return false; | |
214 | ||
215 | reader->block(); | |
fa61cefe AR |
216 | // A writer might push between the empty test and block() below, |
217 | // so we must test again as such a writer will not signal us. | |
218 | if (empty()) | |
219 | return false; | |
220 | } | |
9a51593d | 221 | |
f5591061 DK |
222 | if (reader) |
223 | reader->unblock(); | |
224 | ||
225 | const unsigned int pos = (theOut++ % theCapacity) * theMaxItemSize; | |
226 | memcpy(&value, theBuffer + pos, sizeof(value)); | |
227 | --theSize; | |
fa61cefe AR |
228 | |
229 | return true; | |
9a51593d DK |
230 | } |
231 | ||
232 | template <class Value> | |
233 | bool | |
f5591061 | 234 | OneToOneUniQueue::push(const Value &value, QueueReader *const reader) |
9a51593d | 235 | { |
f5591061 | 236 | if (sizeof(value) > theMaxItemSize) |
b2aa0934 DK |
237 | throw ItemTooLarge(); |
238 | ||
9a51593d | 239 | if (full()) |
7a907247 | 240 | throw Full(); |
9a51593d | 241 | |
7a907247 | 242 | const bool wasEmpty = empty(); |
f5591061 DK |
243 | const unsigned int pos = theIn++ % theCapacity * theMaxItemSize; |
244 | memcpy(theBuffer + pos, &value, sizeof(value)); | |
245 | ++theSize; | |
246 | ||
247 | return wasEmpty && (!reader || reader->raiseSignal()); | |
248 | } | |
249 | ||
fa61cefe | 250 | |
f5591061 DK |
251 | // OneToOneUniQueues |
252 | ||
253 | inline OneToOneUniQueue & | |
254 | OneToOneUniQueues::operator [](const int index) | |
255 | { | |
256 | return const_cast<OneToOneUniQueue &>((*const_cast<const OneToOneUniQueues *>(this))[index]); | |
257 | } | |
258 | ||
259 | inline const OneToOneUniQueue & | |
260 | OneToOneUniQueues::front() const | |
261 | { | |
262 | const char *const queue = | |
263 | reinterpret_cast<const char *>(this) + sizeof(*this); | |
264 | return *reinterpret_cast<const OneToOneUniQueue *>(queue); | |
9a51593d DK |
265 | } |
266 | ||
9a51593d | 267 | |
f5591061 | 268 | // FewToFewBiQueue |
9a51593d | 269 | |
9a51593d DK |
270 | template <class Value> |
271 | bool | |
f5591061 | 272 | FewToFewBiQueue::pop(int &remoteProcessId, Value &value) |
9a51593d | 273 | { |
f5591061 DK |
274 | // iterate all remote group processes, starting after the one we visited last |
275 | QueueReader &localReader = reader(theLocalGroup, theLocalProcessId); | |
276 | for (int i = 0; i < remoteGroupSize(); ++i) { | |
277 | if (++theLastPopProcessId >= remoteGroupIdOffset() + remoteGroupSize()) | |
278 | theLastPopProcessId = remoteGroupIdOffset(); | |
279 | OneToOneUniQueue &queue = oneToOneQueue(remoteGroup(), theLastPopProcessId, theLocalGroup, theLocalProcessId); | |
280 | if (queue.pop(value, &localReader)) { | |
281 | remoteProcessId = theLastPopProcessId; | |
282 | debugs(54, 7, HERE << "popped from " << remoteProcessId << " to " << theLocalProcessId << " at " << queue.size()); | |
fa61cefe AR |
283 | return true; |
284 | } | |
9a51593d | 285 | } |
f5591061 | 286 | return false; // no process had anything to pop |
9a51593d DK |
287 | } |
288 | ||
289 | template <class Value> | |
290 | bool | |
f5591061 | 291 | FewToFewBiQueue::push(const int remoteProcessId, const Value &value) |
9a51593d | 292 | { |
f5591061 DK |
293 | OneToOneUniQueue &remoteQueue = oneToOneQueue(theLocalGroup, theLocalProcessId, remoteGroup(), remoteProcessId); |
294 | QueueReader &remoteReader = reader(remoteGroup(), remoteProcessId); | |
295 | debugs(54, 7, HERE << "pushing from " << theLocalProcessId << " to " << remoteProcessId << " at " << remoteQueue.size()); | |
296 | return remoteQueue.push(value, &remoteReader); | |
9a51593d DK |
297 | } |
298 | ||
15cdbc7c DK |
299 | } // namespace Ipc |
300 | ||
9a51593d | 301 | #endif // SQUID_IPC_QUEUE_H |