]> git.ipfire.org Git - thirdparty/squid.git/blame - src/ipc/Queue.cc
Docs: Copyright updates for 2018 (#114)
[thirdparty/squid.git] / src / ipc / Queue.cc
CommitLineData
948230de 1/*
5b74111a 2 * Copyright (C) 1996-2018 The Squid Software Foundation and contributors
948230de 3 *
bbc27441
AJ
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
948230de
DK
7 */
8
bbc27441
AJ
9/* DEBUG: section 54 Interprocess Communication */
10
f7f3304a 11#include "squid.h"
fa61cefe
AR
12#include "base/TextException.h"
13#include "Debug.h"
14#include "globals.h"
948230de
DK
15#include "ipc/Queue.h"
16
807feb1d
DK
17#include <limits>
18
f5591061 19/// constructs Metadata ID from parent queue ID
948230de 20static String
f5591061 21MetadataId(String id)
948230de 22{
f5591061 23 id.append("__metadata");
948230de
DK
24 return id;
25}
26
f5591061 27/// constructs one-to-one queues ID from parent queue ID
fa61cefe 28static String
f5591061
DK
29QueuesId(String id)
30{
31 id.append("__queues");
32 return id;
33}
34
35/// constructs QueueReaders ID from parent queue ID
36static String
37ReadersId(String id)
fa61cefe
AR
38{
39 id.append("__readers");
40 return id;
41}
42
fa61cefe
AR
43/* QueueReader */
44
15cdbc7c 45InstanceIdDefinitions(Ipc::QueueReader, "ipcQR");
fa61cefe 46
6c6a656a 47Ipc::QueueReader::QueueReader(): popBlocked(true), popSignal(false),
f53969cc 48 rateLimit(0), balance(0)
fa61cefe
AR
49{
50 debugs(54, 7, HERE << "constructed " << id);
51}
52
68353d5a
DK
53/* QueueReaders */
54
3a8c5551 55Ipc::QueueReaders::QueueReaders(const int aCapacity): theCapacity(aCapacity),
f53969cc 56 theReaders(theCapacity)
68353d5a
DK
57{
58 Must(theCapacity > 0);
68353d5a
DK
59}
60
61size_t
15cdbc7c 62Ipc::QueueReaders::sharedMemorySize() const
68353d5a
DK
63{
64 return SharedMemorySize(theCapacity);
65}
66
67size_t
15cdbc7c 68Ipc::QueueReaders::SharedMemorySize(const int capacity)
68353d5a
DK
69{
70 return sizeof(QueueReaders) + sizeof(QueueReader) * capacity;
71}
72
948230de
DK
73// OneToOneUniQueue
74
f5591061 75Ipc::OneToOneUniQueue::OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity):
f53969cc
SM
76 theIn(0), theOut(0), theSize(0), theMaxItemSize(aMaxItemSize),
77 theCapacity(aCapacity)
fa61cefe 78{
f5591061
DK
79 Must(theMaxItemSize > 0);
80 Must(theCapacity > 0);
fa61cefe
AR
81}
82
948230de 83int
15cdbc7c 84Ipc::OneToOneUniQueue::Bytes2Items(const unsigned int maxItemSize, int size)
948230de
DK
85{
86 assert(maxItemSize > 0);
f5591061 87 size -= sizeof(OneToOneUniQueue);
948230de
DK
88 return size >= 0 ? size / maxItemSize : 0;
89}
90
91int
15cdbc7c 92Ipc::OneToOneUniQueue::Items2Bytes(const unsigned int maxItemSize, const int size)
948230de
DK
93{
94 assert(size >= 0);
f5591061 95 return sizeof(OneToOneUniQueue) + maxItemSize * size;
948230de
DK
96}
97
f5591061 98/* OneToOneUniQueues */
948230de 99
f5591061 100Ipc::OneToOneUniQueues::OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity): theCapacity(aCapacity)
fa61cefe 101{
f5591061
DK
102 Must(theCapacity > 0);
103 for (int i = 0; i < theCapacity; ++i)
104 new (&(*this)[i]) OneToOneUniQueue(maxItemSize, queueCapacity);
68353d5a
DK
105}
106
107size_t
f5591061 108Ipc::OneToOneUniQueues::sharedMemorySize() const
68353d5a 109{
f5591061 110 return sizeof(*this) + theCapacity * front().sharedMemorySize();
fa61cefe
AR
111}
112
f5591061
DK
113size_t
114Ipc::OneToOneUniQueues::SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity)
fa61cefe 115{
f5591061
DK
116 const int queueSize =
117 OneToOneUniQueue::Items2Bytes(maxItemSize, queueCapacity);
118 return sizeof(OneToOneUniQueues) + queueSize * capacity;
fa61cefe
AR
119}
120
f5591061
DK
121const Ipc::OneToOneUniQueue &
122Ipc::OneToOneUniQueues::operator [](const int index) const
fa61cefe 123{
f5591061
DK
124 Must(0 <= index && index < theCapacity);
125 const size_t queueSize = index ? front().sharedMemorySize() : 0;
126 const char *const queue =
127 reinterpret_cast<const char *>(this) + sizeof(*this) + index * queueSize;
128 return *reinterpret_cast<const OneToOneUniQueue *>(queue);
fa61cefe
AR
129}
130
807feb1d
DK
131// BaseMultiQueue
132
133Ipc::BaseMultiQueue::BaseMultiQueue(const int aLocalProcessId):
f53969cc
SM
134 theLocalProcessId(aLocalProcessId),
135 theLastPopProcessId(std::numeric_limits<int>::max() - 1)
807feb1d
DK
136{
137}
138
139void
ced8def3 140Ipc::BaseMultiQueue::clearReaderSignal(const int /*remoteProcessId*/)
807feb1d
DK
141{
142 QueueReader &reader = localReader();
539283df 143 debugs(54, 7, "reader: " << reader.id);
807feb1d
DK
144
145 reader.clearSignal();
146
147 // we got a hint; we could reposition iteration to try popping from the
148 // remoteProcessId queue first; but it does not seem to help much and might
149 // introduce some bias so we do not do that for now:
150 // theLastPopProcessId = remoteProcessId;
151}
152
153const Ipc::QueueReader::Balance &
154Ipc::BaseMultiQueue::balance(const int remoteProcessId) const
155{
156 const QueueReader &r = remoteReader(remoteProcessId);
157 return r.balance;
158}
159
160const Ipc::QueueReader::Rate &
161Ipc::BaseMultiQueue::rateLimit(const int remoteProcessId) const
162{
163 const QueueReader &r = remoteReader(remoteProcessId);
164 return r.rateLimit;
165}
166
167Ipc::OneToOneUniQueue &
9d4e9cfb
AR
168Ipc::BaseMultiQueue::inQueue(const int remoteProcessId)
169{
807feb1d
DK
170 const OneToOneUniQueue &queue =
171 const_cast<const BaseMultiQueue *>(this)->inQueue(remoteProcessId);
172 return const_cast<OneToOneUniQueue &>(queue);
173}
174
175Ipc::OneToOneUniQueue &
9d4e9cfb
AR
176Ipc::BaseMultiQueue::outQueue(const int remoteProcessId)
177{
807feb1d
DK
178 const OneToOneUniQueue &queue =
179 const_cast<const BaseMultiQueue *>(this)->outQueue(remoteProcessId);
180 return const_cast<OneToOneUniQueue &>(queue);
181}
182
183Ipc::QueueReader &
9d4e9cfb
AR
184Ipc::BaseMultiQueue::localReader()
185{
807feb1d
DK
186 const QueueReader &reader =
187 const_cast<const BaseMultiQueue *>(this)->localReader();
188 return const_cast<QueueReader &>(reader);
189}
190
191Ipc::QueueReader &
9d4e9cfb
AR
192Ipc::BaseMultiQueue::remoteReader(const int remoteProcessId)
193{
807feb1d
DK
194 const QueueReader &reader =
195 const_cast<const BaseMultiQueue *>(this)->remoteReader(remoteProcessId);
196 return const_cast<QueueReader &>(reader);
197}
198
f5591061 199// FewToFewBiQueue
948230de 200
f5591061
DK
201Ipc::FewToFewBiQueue::Owner *
202Ipc::FewToFewBiQueue::Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity)
68353d5a 203{
f5591061 204 return new Owner(id, groupASize, groupAIdOffset, groupBSize, groupBIdOffset, maxItemSize, capacity);
68353d5a
DK
205}
206
f5591061 207Ipc::FewToFewBiQueue::FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId):
f53969cc
SM
208 BaseMultiQueue(aLocalProcessId),
209 metadata(shm_old(Metadata)(MetadataId(id).termedBuf())),
210 queues(shm_old(OneToOneUniQueues)(QueuesId(id).termedBuf())),
211 readers(shm_old(QueueReaders)(ReadersId(id).termedBuf())),
212 theLocalGroup(aLocalGroup)
948230de 213{
f5591061
DK
214 Must(queues->theCapacity == metadata->theGroupASize * metadata->theGroupBSize * 2);
215 Must(readers->theCapacity == metadata->theGroupASize + metadata->theGroupBSize);
68353d5a 216
539283df 217 debugs(54, 7, "queue " << id << " reader: " << localReader().id);
948230de
DK
218}
219
ea2cdeb6
DK
220int
221Ipc::FewToFewBiQueue::MaxItemsCount(const int groupASize, const int groupBSize, const int capacity)
222{
223 return capacity * groupASize * groupBSize * 2;
224}
225
f5591061
DK
226bool
227Ipc::FewToFewBiQueue::validProcessId(const Group group, const int processId) const
228{
229 switch (group) {
230 case groupA:
231 return metadata->theGroupAIdOffset <= processId &&
9199139f 232 processId < metadata->theGroupAIdOffset + metadata->theGroupASize;
f5591061
DK
233 case groupB:
234 return metadata->theGroupBIdOffset <= processId &&
9199139f 235 processId < metadata->theGroupBIdOffset + metadata->theGroupBSize;
f5591061
DK
236 }
237 return false;
238}
239
0a11e039
AR
240int
241Ipc::FewToFewBiQueue::oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
f5591061
DK
242{
243 Must(fromGroup != toGroup);
0a11e039
AR
244 assert(validProcessId(fromGroup, fromProcessId));
245 assert(validProcessId(toGroup, toProcessId));
f5591061
DK
246 int index1;
247 int index2;
248 int offset;
249 if (fromGroup == groupA) {
250 index1 = fromProcessId - metadata->theGroupAIdOffset;
251 index2 = toProcessId - metadata->theGroupBIdOffset;
252 offset = 0;
253 } else {
254 index1 = toProcessId - metadata->theGroupAIdOffset;
255 index2 = fromProcessId - metadata->theGroupBIdOffset;
256 offset = metadata->theGroupASize * metadata->theGroupBSize;
257 }
258 const int index = offset + index1 * metadata->theGroupBSize + index2;
0a11e039
AR
259 return index;
260}
261
0a11e039
AR
262const Ipc::OneToOneUniQueue &
263Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
264{
265 return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)];
948230de
DK
266}
267
55939a01
AR
268const Ipc::OneToOneUniQueue &
269Ipc::FewToFewBiQueue::inQueue(const int remoteProcessId) const
270{
271 return oneToOneQueue(remoteGroup(), remoteProcessId,
272 theLocalGroup, theLocalProcessId);
273}
274
55939a01
AR
275const Ipc::OneToOneUniQueue &
276Ipc::FewToFewBiQueue::outQueue(const int remoteProcessId) const
277{
278 return oneToOneQueue(theLocalGroup, theLocalProcessId,
279 remoteGroup(), remoteProcessId);
280}
281
df881a0f
AR
282int
283Ipc::FewToFewBiQueue::readerIndex(const Group group, const int processId) const
284{
285 Must(validProcessId(group, processId));
286 return group == groupA ?
287 processId - metadata->theGroupAIdOffset :
288 metadata->theGroupASize + processId - metadata->theGroupBIdOffset;
289}
290
807feb1d
DK
291const Ipc::QueueReader &
292Ipc::FewToFewBiQueue::localReader() const
948230de 293{
807feb1d 294 return readers->theReaders[readerIndex(theLocalGroup, theLocalProcessId)];
df881a0f
AR
295}
296
297const Ipc::QueueReader &
807feb1d 298Ipc::FewToFewBiQueue::remoteReader(const int processId) const
df881a0f 299{
807feb1d 300 return readers->theReaders[readerIndex(remoteGroup(), processId)];
948230de 301}
fa61cefe 302
807feb1d
DK
303int
304Ipc::FewToFewBiQueue::remotesCount() const
fa61cefe 305{
807feb1d 306 return theLocalGroup == groupA ? metadata->theGroupBSize :
9d4e9cfb 307 metadata->theGroupASize;
807feb1d 308}
fa61cefe 309
807feb1d
DK
310int
311Ipc::FewToFewBiQueue::remotesIdOffset() const
312{
313 return theLocalGroup == groupA ? metadata->theGroupBIdOffset :
9d4e9cfb 314 metadata->theGroupAIdOffset;
807feb1d 315}
fa61cefe 316
807feb1d 317Ipc::FewToFewBiQueue::Metadata::Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset):
f53969cc
SM
318 theGroupASize(aGroupASize), theGroupAIdOffset(aGroupAIdOffset),
319 theGroupBSize(aGroupBSize), theGroupBIdOffset(aGroupBIdOffset)
807feb1d
DK
320{
321 Must(theGroupASize > 0);
322 Must(theGroupBSize > 0);
323}
324
325Ipc::FewToFewBiQueue::Owner::Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity):
f53969cc
SM
326 metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), groupASize, groupAIdOffset, groupBSize, groupBIdOffset)),
327 queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), groupASize*groupBSize*2, maxItemSize, capacity)),
328 readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), groupASize+groupBSize))
807feb1d 329{
68353d5a
DK
330}
331
807feb1d 332Ipc::FewToFewBiQueue::Owner::~Owner()
df881a0f 333{
807feb1d
DK
334 delete metadataOwner;
335 delete queuesOwner;
336 delete readersOwner;
df881a0f
AR
337}
338
807feb1d
DK
339// MultiQueue
340
341Ipc::MultiQueue::Owner *
342Ipc::MultiQueue::Init(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity)
55939a01 343{
807feb1d 344 return new Owner(id, processCount, processIdOffset, maxItemSize, capacity);
55939a01
AR
345}
346
807feb1d 347Ipc::MultiQueue::MultiQueue(const String &id, const int localProcessId):
f53969cc
SM
348 BaseMultiQueue(localProcessId),
349 metadata(shm_old(Metadata)(MetadataId(id).termedBuf())),
350 queues(shm_old(OneToOneUniQueues)(QueuesId(id).termedBuf())),
351 readers(shm_old(QueueReaders)(ReadersId(id).termedBuf()))
df881a0f 352{
807feb1d
DK
353 Must(queues->theCapacity == metadata->theProcessCount * metadata->theProcessCount);
354 Must(readers->theCapacity == metadata->theProcessCount);
355
539283df 356 debugs(54, 7, "queue " << id << " reader: " << localReader().id);
55939a01
AR
357}
358
807feb1d
DK
359bool
360Ipc::MultiQueue::validProcessId(const int processId) const
55939a01 361{
807feb1d 362 return metadata->theProcessIdOffset <= processId &&
9d4e9cfb 363 processId < metadata->theProcessIdOffset + metadata->theProcessCount;
df881a0f
AR
364}
365
807feb1d
DK
366const Ipc::OneToOneUniQueue &
367Ipc::MultiQueue::oneToOneQueue(const int fromProcessId, const int toProcessId) const
f5591061 368{
807feb1d
DK
369 assert(validProcessId(fromProcessId));
370 assert(validProcessId(toProcessId));
371 const int fromIndex = fromProcessId - metadata->theProcessIdOffset;
372 const int toIndex = toProcessId - metadata->theProcessIdOffset;
373 const int index = fromIndex * metadata->theProcessCount + toIndex;
374 return (*queues)[index];
f5591061
DK
375}
376
807feb1d
DK
377const Ipc::QueueReader &
378Ipc::MultiQueue::reader(const int processId) const
68353d5a 379{
807feb1d
DK
380 assert(validProcessId(processId));
381 const int index = processId - metadata->theProcessIdOffset;
382 return readers->theReaders[index];
68353d5a
DK
383}
384
807feb1d
DK
385const Ipc::OneToOneUniQueue &
386Ipc::MultiQueue::inQueue(const int remoteProcessId) const
387{
388 return oneToOneQueue(remoteProcessId, theLocalProcessId);
389}
390
391const Ipc::OneToOneUniQueue &
392Ipc::MultiQueue::outQueue(const int remoteProcessId) const
393{
394 return oneToOneQueue(theLocalProcessId, remoteProcessId);
395}
396
397const Ipc::QueueReader &
398Ipc::MultiQueue::localReader() const
399{
400 return reader(theLocalProcessId);
401}
402
403const Ipc::QueueReader &
404Ipc::MultiQueue::remoteReader(const int processId) const
405{
406 return reader(processId);
407}
408
409int
410Ipc::MultiQueue::remotesCount() const
411{
412 return metadata->theProcessCount;
413}
414
415int
416Ipc::MultiQueue::remotesIdOffset() const
417{
418 return metadata->theProcessIdOffset;
419}
420
421Ipc::MultiQueue::Metadata::Metadata(const int aProcessCount, const int aProcessIdOffset):
f53969cc 422 theProcessCount(aProcessCount), theProcessIdOffset(aProcessIdOffset)
807feb1d
DK
423{
424 Must(theProcessCount > 0);
425}
426
427Ipc::MultiQueue::Owner::Owner(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity):
f53969cc
SM
428 metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), processCount, processIdOffset)),
429 queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), processCount*processCount, maxItemSize, capacity)),
430 readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), processCount))
807feb1d
DK
431{
432}
433
434Ipc::MultiQueue::Owner::~Owner()
68353d5a 435{
f5591061
DK
436 delete metadataOwner;
437 delete queuesOwner;
68353d5a 438 delete readersOwner;
fa61cefe 439}
f53969cc 440