]> git.ipfire.org Git - thirdparty/squid.git/blob - src/ipc/Queue.cc
Docs: Copyright updates for 2018 (#114)
[thirdparty/squid.git] / src / ipc / Queue.cc
1 /*
2 * Copyright (C) 1996-2018 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9 /* DEBUG: section 54 Interprocess Communication */
10
11 #include "squid.h"
12 #include "base/TextException.h"
13 #include "Debug.h"
14 #include "globals.h"
15 #include "ipc/Queue.h"
16
17 #include <limits>
18
19 /// constructs Metadata ID from parent queue ID
20 static String
21 MetadataId(String id)
22 {
23 id.append("__metadata");
24 return id;
25 }
26
27 /// constructs one-to-one queues ID from parent queue ID
28 static String
29 QueuesId(String id)
30 {
31 id.append("__queues");
32 return id;
33 }
34
35 /// constructs QueueReaders ID from parent queue ID
36 static String
37 ReadersId(String id)
38 {
39 id.append("__readers");
40 return id;
41 }
42
43 /* QueueReader */
44
45 InstanceIdDefinitions(Ipc::QueueReader, "ipcQR");
46
47 Ipc::QueueReader::QueueReader(): popBlocked(true), popSignal(false),
48 rateLimit(0), balance(0)
49 {
50 debugs(54, 7, HERE << "constructed " << id);
51 }
52
53 /* QueueReaders */
54
55 Ipc::QueueReaders::QueueReaders(const int aCapacity): theCapacity(aCapacity),
56 theReaders(theCapacity)
57 {
58 Must(theCapacity > 0);
59 }
60
61 size_t
62 Ipc::QueueReaders::sharedMemorySize() const
63 {
64 return SharedMemorySize(theCapacity);
65 }
66
67 size_t
68 Ipc::QueueReaders::SharedMemorySize(const int capacity)
69 {
70 return sizeof(QueueReaders) + sizeof(QueueReader) * capacity;
71 }
72
73 // OneToOneUniQueue
74
75 Ipc::OneToOneUniQueue::OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity):
76 theIn(0), theOut(0), theSize(0), theMaxItemSize(aMaxItemSize),
77 theCapacity(aCapacity)
78 {
79 Must(theMaxItemSize > 0);
80 Must(theCapacity > 0);
81 }
82
83 int
84 Ipc::OneToOneUniQueue::Bytes2Items(const unsigned int maxItemSize, int size)
85 {
86 assert(maxItemSize > 0);
87 size -= sizeof(OneToOneUniQueue);
88 return size >= 0 ? size / maxItemSize : 0;
89 }
90
91 int
92 Ipc::OneToOneUniQueue::Items2Bytes(const unsigned int maxItemSize, const int size)
93 {
94 assert(size >= 0);
95 return sizeof(OneToOneUniQueue) + maxItemSize * size;
96 }
97
98 /* OneToOneUniQueues */
99
100 Ipc::OneToOneUniQueues::OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity): theCapacity(aCapacity)
101 {
102 Must(theCapacity > 0);
103 for (int i = 0; i < theCapacity; ++i)
104 new (&(*this)[i]) OneToOneUniQueue(maxItemSize, queueCapacity);
105 }
106
107 size_t
108 Ipc::OneToOneUniQueues::sharedMemorySize() const
109 {
110 return sizeof(*this) + theCapacity * front().sharedMemorySize();
111 }
112
113 size_t
114 Ipc::OneToOneUniQueues::SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity)
115 {
116 const int queueSize =
117 OneToOneUniQueue::Items2Bytes(maxItemSize, queueCapacity);
118 return sizeof(OneToOneUniQueues) + queueSize * capacity;
119 }
120
121 const Ipc::OneToOneUniQueue &
122 Ipc::OneToOneUniQueues::operator [](const int index) const
123 {
124 Must(0 <= index && index < theCapacity);
125 const size_t queueSize = index ? front().sharedMemorySize() : 0;
126 const char *const queue =
127 reinterpret_cast<const char *>(this) + sizeof(*this) + index * queueSize;
128 return *reinterpret_cast<const OneToOneUniQueue *>(queue);
129 }
130
131 // BaseMultiQueue
132
133 Ipc::BaseMultiQueue::BaseMultiQueue(const int aLocalProcessId):
134 theLocalProcessId(aLocalProcessId),
135 theLastPopProcessId(std::numeric_limits<int>::max() - 1)
136 {
137 }
138
139 void
140 Ipc::BaseMultiQueue::clearReaderSignal(const int /*remoteProcessId*/)
141 {
142 QueueReader &reader = localReader();
143 debugs(54, 7, "reader: " << reader.id);
144
145 reader.clearSignal();
146
147 // we got a hint; we could reposition iteration to try popping from the
148 // remoteProcessId queue first; but it does not seem to help much and might
149 // introduce some bias so we do not do that for now:
150 // theLastPopProcessId = remoteProcessId;
151 }
152
153 const Ipc::QueueReader::Balance &
154 Ipc::BaseMultiQueue::balance(const int remoteProcessId) const
155 {
156 const QueueReader &r = remoteReader(remoteProcessId);
157 return r.balance;
158 }
159
160 const Ipc::QueueReader::Rate &
161 Ipc::BaseMultiQueue::rateLimit(const int remoteProcessId) const
162 {
163 const QueueReader &r = remoteReader(remoteProcessId);
164 return r.rateLimit;
165 }
166
167 Ipc::OneToOneUniQueue &
168 Ipc::BaseMultiQueue::inQueue(const int remoteProcessId)
169 {
170 const OneToOneUniQueue &queue =
171 const_cast<const BaseMultiQueue *>(this)->inQueue(remoteProcessId);
172 return const_cast<OneToOneUniQueue &>(queue);
173 }
174
175 Ipc::OneToOneUniQueue &
176 Ipc::BaseMultiQueue::outQueue(const int remoteProcessId)
177 {
178 const OneToOneUniQueue &queue =
179 const_cast<const BaseMultiQueue *>(this)->outQueue(remoteProcessId);
180 return const_cast<OneToOneUniQueue &>(queue);
181 }
182
183 Ipc::QueueReader &
184 Ipc::BaseMultiQueue::localReader()
185 {
186 const QueueReader &reader =
187 const_cast<const BaseMultiQueue *>(this)->localReader();
188 return const_cast<QueueReader &>(reader);
189 }
190
191 Ipc::QueueReader &
192 Ipc::BaseMultiQueue::remoteReader(const int remoteProcessId)
193 {
194 const QueueReader &reader =
195 const_cast<const BaseMultiQueue *>(this)->remoteReader(remoteProcessId);
196 return const_cast<QueueReader &>(reader);
197 }
198
199 // FewToFewBiQueue
200
201 Ipc::FewToFewBiQueue::Owner *
202 Ipc::FewToFewBiQueue::Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity)
203 {
204 return new Owner(id, groupASize, groupAIdOffset, groupBSize, groupBIdOffset, maxItemSize, capacity);
205 }
206
207 Ipc::FewToFewBiQueue::FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId):
208 BaseMultiQueue(aLocalProcessId),
209 metadata(shm_old(Metadata)(MetadataId(id).termedBuf())),
210 queues(shm_old(OneToOneUniQueues)(QueuesId(id).termedBuf())),
211 readers(shm_old(QueueReaders)(ReadersId(id).termedBuf())),
212 theLocalGroup(aLocalGroup)
213 {
214 Must(queues->theCapacity == metadata->theGroupASize * metadata->theGroupBSize * 2);
215 Must(readers->theCapacity == metadata->theGroupASize + metadata->theGroupBSize);
216
217 debugs(54, 7, "queue " << id << " reader: " << localReader().id);
218 }
219
220 int
221 Ipc::FewToFewBiQueue::MaxItemsCount(const int groupASize, const int groupBSize, const int capacity)
222 {
223 return capacity * groupASize * groupBSize * 2;
224 }
225
226 bool
227 Ipc::FewToFewBiQueue::validProcessId(const Group group, const int processId) const
228 {
229 switch (group) {
230 case groupA:
231 return metadata->theGroupAIdOffset <= processId &&
232 processId < metadata->theGroupAIdOffset + metadata->theGroupASize;
233 case groupB:
234 return metadata->theGroupBIdOffset <= processId &&
235 processId < metadata->theGroupBIdOffset + metadata->theGroupBSize;
236 }
237 return false;
238 }
239
240 int
241 Ipc::FewToFewBiQueue::oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
242 {
243 Must(fromGroup != toGroup);
244 assert(validProcessId(fromGroup, fromProcessId));
245 assert(validProcessId(toGroup, toProcessId));
246 int index1;
247 int index2;
248 int offset;
249 if (fromGroup == groupA) {
250 index1 = fromProcessId - metadata->theGroupAIdOffset;
251 index2 = toProcessId - metadata->theGroupBIdOffset;
252 offset = 0;
253 } else {
254 index1 = toProcessId - metadata->theGroupAIdOffset;
255 index2 = fromProcessId - metadata->theGroupBIdOffset;
256 offset = metadata->theGroupASize * metadata->theGroupBSize;
257 }
258 const int index = offset + index1 * metadata->theGroupBSize + index2;
259 return index;
260 }
261
262 const Ipc::OneToOneUniQueue &
263 Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
264 {
265 return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)];
266 }
267
268 const Ipc::OneToOneUniQueue &
269 Ipc::FewToFewBiQueue::inQueue(const int remoteProcessId) const
270 {
271 return oneToOneQueue(remoteGroup(), remoteProcessId,
272 theLocalGroup, theLocalProcessId);
273 }
274
275 const Ipc::OneToOneUniQueue &
276 Ipc::FewToFewBiQueue::outQueue(const int remoteProcessId) const
277 {
278 return oneToOneQueue(theLocalGroup, theLocalProcessId,
279 remoteGroup(), remoteProcessId);
280 }
281
282 int
283 Ipc::FewToFewBiQueue::readerIndex(const Group group, const int processId) const
284 {
285 Must(validProcessId(group, processId));
286 return group == groupA ?
287 processId - metadata->theGroupAIdOffset :
288 metadata->theGroupASize + processId - metadata->theGroupBIdOffset;
289 }
290
291 const Ipc::QueueReader &
292 Ipc::FewToFewBiQueue::localReader() const
293 {
294 return readers->theReaders[readerIndex(theLocalGroup, theLocalProcessId)];
295 }
296
297 const Ipc::QueueReader &
298 Ipc::FewToFewBiQueue::remoteReader(const int processId) const
299 {
300 return readers->theReaders[readerIndex(remoteGroup(), processId)];
301 }
302
303 int
304 Ipc::FewToFewBiQueue::remotesCount() const
305 {
306 return theLocalGroup == groupA ? metadata->theGroupBSize :
307 metadata->theGroupASize;
308 }
309
310 int
311 Ipc::FewToFewBiQueue::remotesIdOffset() const
312 {
313 return theLocalGroup == groupA ? metadata->theGroupBIdOffset :
314 metadata->theGroupAIdOffset;
315 }
316
317 Ipc::FewToFewBiQueue::Metadata::Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset):
318 theGroupASize(aGroupASize), theGroupAIdOffset(aGroupAIdOffset),
319 theGroupBSize(aGroupBSize), theGroupBIdOffset(aGroupBIdOffset)
320 {
321 Must(theGroupASize > 0);
322 Must(theGroupBSize > 0);
323 }
324
325 Ipc::FewToFewBiQueue::Owner::Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity):
326 metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), groupASize, groupAIdOffset, groupBSize, groupBIdOffset)),
327 queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), groupASize*groupBSize*2, maxItemSize, capacity)),
328 readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), groupASize+groupBSize))
329 {
330 }
331
332 Ipc::FewToFewBiQueue::Owner::~Owner()
333 {
334 delete metadataOwner;
335 delete queuesOwner;
336 delete readersOwner;
337 }
338
339 // MultiQueue
340
341 Ipc::MultiQueue::Owner *
342 Ipc::MultiQueue::Init(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity)
343 {
344 return new Owner(id, processCount, processIdOffset, maxItemSize, capacity);
345 }
346
347 Ipc::MultiQueue::MultiQueue(const String &id, const int localProcessId):
348 BaseMultiQueue(localProcessId),
349 metadata(shm_old(Metadata)(MetadataId(id).termedBuf())),
350 queues(shm_old(OneToOneUniQueues)(QueuesId(id).termedBuf())),
351 readers(shm_old(QueueReaders)(ReadersId(id).termedBuf()))
352 {
353 Must(queues->theCapacity == metadata->theProcessCount * metadata->theProcessCount);
354 Must(readers->theCapacity == metadata->theProcessCount);
355
356 debugs(54, 7, "queue " << id << " reader: " << localReader().id);
357 }
358
359 bool
360 Ipc::MultiQueue::validProcessId(const int processId) const
361 {
362 return metadata->theProcessIdOffset <= processId &&
363 processId < metadata->theProcessIdOffset + metadata->theProcessCount;
364 }
365
366 const Ipc::OneToOneUniQueue &
367 Ipc::MultiQueue::oneToOneQueue(const int fromProcessId, const int toProcessId) const
368 {
369 assert(validProcessId(fromProcessId));
370 assert(validProcessId(toProcessId));
371 const int fromIndex = fromProcessId - metadata->theProcessIdOffset;
372 const int toIndex = toProcessId - metadata->theProcessIdOffset;
373 const int index = fromIndex * metadata->theProcessCount + toIndex;
374 return (*queues)[index];
375 }
376
377 const Ipc::QueueReader &
378 Ipc::MultiQueue::reader(const int processId) const
379 {
380 assert(validProcessId(processId));
381 const int index = processId - metadata->theProcessIdOffset;
382 return readers->theReaders[index];
383 }
384
385 const Ipc::OneToOneUniQueue &
386 Ipc::MultiQueue::inQueue(const int remoteProcessId) const
387 {
388 return oneToOneQueue(remoteProcessId, theLocalProcessId);
389 }
390
391 const Ipc::OneToOneUniQueue &
392 Ipc::MultiQueue::outQueue(const int remoteProcessId) const
393 {
394 return oneToOneQueue(theLocalProcessId, remoteProcessId);
395 }
396
397 const Ipc::QueueReader &
398 Ipc::MultiQueue::localReader() const
399 {
400 return reader(theLocalProcessId);
401 }
402
403 const Ipc::QueueReader &
404 Ipc::MultiQueue::remoteReader(const int processId) const
405 {
406 return reader(processId);
407 }
408
409 int
410 Ipc::MultiQueue::remotesCount() const
411 {
412 return metadata->theProcessCount;
413 }
414
415 int
416 Ipc::MultiQueue::remotesIdOffset() const
417 {
418 return metadata->theProcessIdOffset;
419 }
420
421 Ipc::MultiQueue::Metadata::Metadata(const int aProcessCount, const int aProcessIdOffset):
422 theProcessCount(aProcessCount), theProcessIdOffset(aProcessIdOffset)
423 {
424 Must(theProcessCount > 0);
425 }
426
427 Ipc::MultiQueue::Owner::Owner(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity):
428 metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), processCount, processIdOffset)),
429 queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), processCount*processCount, maxItemSize, capacity)),
430 readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), processCount))
431 {
432 }
433
434 Ipc::MultiQueue::Owner::~Owner()
435 {
436 delete metadataOwner;
437 delete queuesOwner;
438 delete readersOwner;
439 }
440