]> git.ipfire.org Git - thirdparty/squid.git/blame - src/ipc/Queue.cc
Source Format Enforcement (#763)
[thirdparty/squid.git] / src / ipc / Queue.cc
CommitLineData
948230de 1/*
f70aedc4 2 * Copyright (C) 1996-2021 The Squid Software Foundation and contributors
948230de 3 *
bbc27441
AJ
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
948230de
DK
7 */
8
bbc27441
AJ
9/* DEBUG: section 54 Interprocess Communication */
10
f7f3304a 11#include "squid.h"
fa61cefe
AR
12#include "base/TextException.h"
13#include "Debug.h"
14#include "globals.h"
948230de
DK
15#include "ipc/Queue.h"
16
807feb1d
DK
17#include <limits>
18
f5591061 19/// constructs Metadata ID from parent queue ID
948230de 20static String
f5591061 21MetadataId(String id)
948230de 22{
f5591061 23 id.append("__metadata");
948230de
DK
24 return id;
25}
26
f5591061 27/// constructs one-to-one queues ID from parent queue ID
fa61cefe 28static String
f5591061
DK
29QueuesId(String id)
30{
31 id.append("__queues");
32 return id;
33}
34
35/// constructs QueueReaders ID from parent queue ID
36static String
37ReadersId(String id)
fa61cefe
AR
38{
39 id.append("__readers");
40 return id;
41}
42
fa61cefe
AR
43/* QueueReader */
44
15cdbc7c 45InstanceIdDefinitions(Ipc::QueueReader, "ipcQR");
fa61cefe 46
6c6a656a 47Ipc::QueueReader::QueueReader(): popBlocked(true), popSignal(false),
f53969cc 48 rateLimit(0), balance(0)
fa61cefe
AR
49{
50 debugs(54, 7, HERE << "constructed " << id);
51}
52
68353d5a
DK
53/* QueueReaders */
54
3a8c5551 55Ipc::QueueReaders::QueueReaders(const int aCapacity): theCapacity(aCapacity),
f53969cc 56 theReaders(theCapacity)
68353d5a
DK
57{
58 Must(theCapacity > 0);
68353d5a
DK
59}
60
61size_t
15cdbc7c 62Ipc::QueueReaders::sharedMemorySize() const
68353d5a
DK
63{
64 return SharedMemorySize(theCapacity);
65}
66
67size_t
15cdbc7c 68Ipc::QueueReaders::SharedMemorySize(const int capacity)
68353d5a
DK
69{
70 return sizeof(QueueReaders) + sizeof(QueueReader) * capacity;
71}
72
948230de
DK
73// OneToOneUniQueue
74
f5591061 75Ipc::OneToOneUniQueue::OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity):
f53969cc
SM
76 theIn(0), theOut(0), theSize(0), theMaxItemSize(aMaxItemSize),
77 theCapacity(aCapacity)
fa61cefe 78{
f5591061
DK
79 Must(theMaxItemSize > 0);
80 Must(theCapacity > 0);
fa61cefe
AR
81}
82
948230de 83int
15cdbc7c 84Ipc::OneToOneUniQueue::Bytes2Items(const unsigned int maxItemSize, int size)
948230de
DK
85{
86 assert(maxItemSize > 0);
f5591061 87 size -= sizeof(OneToOneUniQueue);
948230de
DK
88 return size >= 0 ? size / maxItemSize : 0;
89}
90
91int
15cdbc7c 92Ipc::OneToOneUniQueue::Items2Bytes(const unsigned int maxItemSize, const int size)
948230de
DK
93{
94 assert(size >= 0);
f5591061 95 return sizeof(OneToOneUniQueue) + maxItemSize * size;
948230de
DK
96}
97
d8ee9e8d
EB
98/// start state reporting (by reporting queue parameters)
99/// The labels reflect whether the caller owns theIn or theOut data member and,
100/// hence, cannot report the other value reliably.
101void
102Ipc::OneToOneUniQueue::statOpen(std::ostream &os, const char *inLabel, const char *outLabel, const uint32_t count) const
103{
104 os << "{ size: " << count <<
70ac5b29 105 ", capacity: " << theCapacity <<
106 ", " << inLabel << ": " << theIn <<
107 ", " << outLabel << ": " << theOut;
d8ee9e8d
EB
108}
109
110/// end state reporting started by statOpen()
111void
112Ipc::OneToOneUniQueue::statClose(std::ostream &os) const
113{
114 os << "}\n";
115}
116
f5591061 117/* OneToOneUniQueues */
948230de 118
f5591061 119Ipc::OneToOneUniQueues::OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity): theCapacity(aCapacity)
fa61cefe 120{
f5591061
DK
121 Must(theCapacity > 0);
122 for (int i = 0; i < theCapacity; ++i)
123 new (&(*this)[i]) OneToOneUniQueue(maxItemSize, queueCapacity);
68353d5a
DK
124}
125
126size_t
f5591061 127Ipc::OneToOneUniQueues::sharedMemorySize() const
68353d5a 128{
f5591061 129 return sizeof(*this) + theCapacity * front().sharedMemorySize();
fa61cefe
AR
130}
131
f5591061
DK
132size_t
133Ipc::OneToOneUniQueues::SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity)
fa61cefe 134{
f5591061
DK
135 const int queueSize =
136 OneToOneUniQueue::Items2Bytes(maxItemSize, queueCapacity);
137 return sizeof(OneToOneUniQueues) + queueSize * capacity;
fa61cefe
AR
138}
139
f5591061
DK
140const Ipc::OneToOneUniQueue &
141Ipc::OneToOneUniQueues::operator [](const int index) const
fa61cefe 142{
f5591061
DK
143 Must(0 <= index && index < theCapacity);
144 const size_t queueSize = index ? front().sharedMemorySize() : 0;
145 const char *const queue =
146 reinterpret_cast<const char *>(this) + sizeof(*this) + index * queueSize;
147 return *reinterpret_cast<const OneToOneUniQueue *>(queue);
fa61cefe
AR
148}
149
807feb1d
DK
150// BaseMultiQueue
151
152Ipc::BaseMultiQueue::BaseMultiQueue(const int aLocalProcessId):
f53969cc
SM
153 theLocalProcessId(aLocalProcessId),
154 theLastPopProcessId(std::numeric_limits<int>::max() - 1)
807feb1d
DK
155{
156}
157
158void
ced8def3 159Ipc::BaseMultiQueue::clearReaderSignal(const int /*remoteProcessId*/)
807feb1d
DK
160{
161 QueueReader &reader = localReader();
539283df 162 debugs(54, 7, "reader: " << reader.id);
807feb1d
DK
163
164 reader.clearSignal();
165
166 // we got a hint; we could reposition iteration to try popping from the
167 // remoteProcessId queue first; but it does not seem to help much and might
168 // introduce some bias so we do not do that for now:
169 // theLastPopProcessId = remoteProcessId;
170}
171
172const Ipc::QueueReader::Balance &
173Ipc::BaseMultiQueue::balance(const int remoteProcessId) const
174{
175 const QueueReader &r = remoteReader(remoteProcessId);
176 return r.balance;
177}
178
179const Ipc::QueueReader::Rate &
180Ipc::BaseMultiQueue::rateLimit(const int remoteProcessId) const
181{
182 const QueueReader &r = remoteReader(remoteProcessId);
183 return r.rateLimit;
184}
185
186Ipc::OneToOneUniQueue &
9d4e9cfb
AR
187Ipc::BaseMultiQueue::inQueue(const int remoteProcessId)
188{
807feb1d
DK
189 const OneToOneUniQueue &queue =
190 const_cast<const BaseMultiQueue *>(this)->inQueue(remoteProcessId);
191 return const_cast<OneToOneUniQueue &>(queue);
192}
193
194Ipc::OneToOneUniQueue &
9d4e9cfb
AR
195Ipc::BaseMultiQueue::outQueue(const int remoteProcessId)
196{
807feb1d
DK
197 const OneToOneUniQueue &queue =
198 const_cast<const BaseMultiQueue *>(this)->outQueue(remoteProcessId);
199 return const_cast<OneToOneUniQueue &>(queue);
200}
201
202Ipc::QueueReader &
9d4e9cfb
AR
203Ipc::BaseMultiQueue::localReader()
204{
807feb1d
DK
205 const QueueReader &reader =
206 const_cast<const BaseMultiQueue *>(this)->localReader();
207 return const_cast<QueueReader &>(reader);
208}
209
210Ipc::QueueReader &
9d4e9cfb
AR
211Ipc::BaseMultiQueue::remoteReader(const int remoteProcessId)
212{
807feb1d
DK
213 const QueueReader &reader =
214 const_cast<const BaseMultiQueue *>(this)->remoteReader(remoteProcessId);
215 return const_cast<QueueReader &>(reader);
216}
217
f5591061 218// FewToFewBiQueue
948230de 219
f5591061
DK
220Ipc::FewToFewBiQueue::Owner *
221Ipc::FewToFewBiQueue::Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity)
68353d5a 222{
f5591061 223 return new Owner(id, groupASize, groupAIdOffset, groupBSize, groupBIdOffset, maxItemSize, capacity);
68353d5a
DK
224}
225
f5591061 226Ipc::FewToFewBiQueue::FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId):
f53969cc
SM
227 BaseMultiQueue(aLocalProcessId),
228 metadata(shm_old(Metadata)(MetadataId(id).termedBuf())),
229 queues(shm_old(OneToOneUniQueues)(QueuesId(id).termedBuf())),
230 readers(shm_old(QueueReaders)(ReadersId(id).termedBuf())),
231 theLocalGroup(aLocalGroup)
948230de 232{
f5591061
DK
233 Must(queues->theCapacity == metadata->theGroupASize * metadata->theGroupBSize * 2);
234 Must(readers->theCapacity == metadata->theGroupASize + metadata->theGroupBSize);
68353d5a 235
539283df 236 debugs(54, 7, "queue " << id << " reader: " << localReader().id);
948230de
DK
237}
238
ea2cdeb6
DK
239int
240Ipc::FewToFewBiQueue::MaxItemsCount(const int groupASize, const int groupBSize, const int capacity)
241{
242 return capacity * groupASize * groupBSize * 2;
243}
244
f5591061
DK
245bool
246Ipc::FewToFewBiQueue::validProcessId(const Group group, const int processId) const
247{
248 switch (group) {
249 case groupA:
250 return metadata->theGroupAIdOffset <= processId &&
9199139f 251 processId < metadata->theGroupAIdOffset + metadata->theGroupASize;
f5591061
DK
252 case groupB:
253 return metadata->theGroupBIdOffset <= processId &&
9199139f 254 processId < metadata->theGroupBIdOffset + metadata->theGroupBSize;
f5591061
DK
255 }
256 return false;
257}
258
0a11e039
AR
259int
260Ipc::FewToFewBiQueue::oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
f5591061
DK
261{
262 Must(fromGroup != toGroup);
0a11e039
AR
263 assert(validProcessId(fromGroup, fromProcessId));
264 assert(validProcessId(toGroup, toProcessId));
f5591061
DK
265 int index1;
266 int index2;
267 int offset;
268 if (fromGroup == groupA) {
269 index1 = fromProcessId - metadata->theGroupAIdOffset;
270 index2 = toProcessId - metadata->theGroupBIdOffset;
271 offset = 0;
272 } else {
273 index1 = toProcessId - metadata->theGroupAIdOffset;
274 index2 = fromProcessId - metadata->theGroupBIdOffset;
275 offset = metadata->theGroupASize * metadata->theGroupBSize;
276 }
277 const int index = offset + index1 * metadata->theGroupBSize + index2;
0a11e039
AR
278 return index;
279}
280
0a11e039
AR
281const Ipc::OneToOneUniQueue &
282Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
283{
284 return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)];
948230de
DK
285}
286
55939a01
AR
287const Ipc::OneToOneUniQueue &
288Ipc::FewToFewBiQueue::inQueue(const int remoteProcessId) const
289{
290 return oneToOneQueue(remoteGroup(), remoteProcessId,
291 theLocalGroup, theLocalProcessId);
292}
293
55939a01
AR
294const Ipc::OneToOneUniQueue &
295Ipc::FewToFewBiQueue::outQueue(const int remoteProcessId) const
296{
297 return oneToOneQueue(theLocalGroup, theLocalProcessId,
298 remoteGroup(), remoteProcessId);
299}
300
df881a0f
AR
301int
302Ipc::FewToFewBiQueue::readerIndex(const Group group, const int processId) const
303{
304 Must(validProcessId(group, processId));
305 return group == groupA ?
306 processId - metadata->theGroupAIdOffset :
307 metadata->theGroupASize + processId - metadata->theGroupBIdOffset;
308}
309
807feb1d
DK
310const Ipc::QueueReader &
311Ipc::FewToFewBiQueue::localReader() const
948230de 312{
807feb1d 313 return readers->theReaders[readerIndex(theLocalGroup, theLocalProcessId)];
df881a0f
AR
314}
315
316const Ipc::QueueReader &
807feb1d 317Ipc::FewToFewBiQueue::remoteReader(const int processId) const
df881a0f 318{
807feb1d 319 return readers->theReaders[readerIndex(remoteGroup(), processId)];
948230de 320}
fa61cefe 321
807feb1d
DK
322int
323Ipc::FewToFewBiQueue::remotesCount() const
fa61cefe 324{
807feb1d 325 return theLocalGroup == groupA ? metadata->theGroupBSize :
9d4e9cfb 326 metadata->theGroupASize;
807feb1d 327}
fa61cefe 328
807feb1d
DK
329int
330Ipc::FewToFewBiQueue::remotesIdOffset() const
331{
332 return theLocalGroup == groupA ? metadata->theGroupBIdOffset :
9d4e9cfb 333 metadata->theGroupAIdOffset;
807feb1d 334}
fa61cefe 335
807feb1d 336Ipc::FewToFewBiQueue::Metadata::Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset):
f53969cc
SM
337 theGroupASize(aGroupASize), theGroupAIdOffset(aGroupAIdOffset),
338 theGroupBSize(aGroupBSize), theGroupBIdOffset(aGroupBIdOffset)
807feb1d
DK
339{
340 Must(theGroupASize > 0);
341 Must(theGroupBSize > 0);
342}
343
344Ipc::FewToFewBiQueue::Owner::Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity):
f53969cc
SM
345 metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), groupASize, groupAIdOffset, groupBSize, groupBIdOffset)),
346 queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), groupASize*groupBSize*2, maxItemSize, capacity)),
347 readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), groupASize+groupBSize))
807feb1d 348{
68353d5a
DK
349}
350
807feb1d 351Ipc::FewToFewBiQueue::Owner::~Owner()
df881a0f 352{
807feb1d
DK
353 delete metadataOwner;
354 delete queuesOwner;
355 delete readersOwner;
df881a0f
AR
356}
357
807feb1d
DK
358// MultiQueue
359
360Ipc::MultiQueue::Owner *
361Ipc::MultiQueue::Init(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity)
55939a01 362{
807feb1d 363 return new Owner(id, processCount, processIdOffset, maxItemSize, capacity);
55939a01
AR
364}
365
807feb1d 366Ipc::MultiQueue::MultiQueue(const String &id, const int localProcessId):
f53969cc
SM
367 BaseMultiQueue(localProcessId),
368 metadata(shm_old(Metadata)(MetadataId(id).termedBuf())),
369 queues(shm_old(OneToOneUniQueues)(QueuesId(id).termedBuf())),
370 readers(shm_old(QueueReaders)(ReadersId(id).termedBuf()))
df881a0f 371{
807feb1d
DK
372 Must(queues->theCapacity == metadata->theProcessCount * metadata->theProcessCount);
373 Must(readers->theCapacity == metadata->theProcessCount);
374
539283df 375 debugs(54, 7, "queue " << id << " reader: " << localReader().id);
55939a01
AR
376}
377
807feb1d
DK
378bool
379Ipc::MultiQueue::validProcessId(const int processId) const
55939a01 380{
807feb1d 381 return metadata->theProcessIdOffset <= processId &&
9d4e9cfb 382 processId < metadata->theProcessIdOffset + metadata->theProcessCount;
df881a0f
AR
383}
384
807feb1d
DK
385const Ipc::OneToOneUniQueue &
386Ipc::MultiQueue::oneToOneQueue(const int fromProcessId, const int toProcessId) const
f5591061 387{
807feb1d
DK
388 assert(validProcessId(fromProcessId));
389 assert(validProcessId(toProcessId));
390 const int fromIndex = fromProcessId - metadata->theProcessIdOffset;
391 const int toIndex = toProcessId - metadata->theProcessIdOffset;
392 const int index = fromIndex * metadata->theProcessCount + toIndex;
393 return (*queues)[index];
f5591061
DK
394}
395
807feb1d
DK
396const Ipc::QueueReader &
397Ipc::MultiQueue::reader(const int processId) const
68353d5a 398{
807feb1d
DK
399 assert(validProcessId(processId));
400 const int index = processId - metadata->theProcessIdOffset;
401 return readers->theReaders[index];
68353d5a
DK
402}
403
807feb1d
DK
404const Ipc::OneToOneUniQueue &
405Ipc::MultiQueue::inQueue(const int remoteProcessId) const
406{
407 return oneToOneQueue(remoteProcessId, theLocalProcessId);
408}
409
410const Ipc::OneToOneUniQueue &
411Ipc::MultiQueue::outQueue(const int remoteProcessId) const
412{
413 return oneToOneQueue(theLocalProcessId, remoteProcessId);
414}
415
416const Ipc::QueueReader &
417Ipc::MultiQueue::localReader() const
418{
419 return reader(theLocalProcessId);
420}
421
422const Ipc::QueueReader &
423Ipc::MultiQueue::remoteReader(const int processId) const
424{
425 return reader(processId);
426}
427
428int
429Ipc::MultiQueue::remotesCount() const
430{
431 return metadata->theProcessCount;
432}
433
434int
435Ipc::MultiQueue::remotesIdOffset() const
436{
437 return metadata->theProcessIdOffset;
438}
439
440Ipc::MultiQueue::Metadata::Metadata(const int aProcessCount, const int aProcessIdOffset):
f53969cc 441 theProcessCount(aProcessCount), theProcessIdOffset(aProcessIdOffset)
807feb1d
DK
442{
443 Must(theProcessCount > 0);
444}
445
446Ipc::MultiQueue::Owner::Owner(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity):
f53969cc
SM
447 metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), processCount, processIdOffset)),
448 queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), processCount*processCount, maxItemSize, capacity)),
449 readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), processCount))
807feb1d
DK
450{
451}
452
453Ipc::MultiQueue::Owner::~Owner()
68353d5a 454{
f5591061
DK
455 delete metadataOwner;
456 delete queuesOwner;
68353d5a 457 delete readersOwner;
fa61cefe 458}
f53969cc 459