]>
Commit | Line | Data |
---|---|---|
948230de | 1 | /* |
77b1029d | 2 | * Copyright (C) 1996-2020 The Squid Software Foundation and contributors |
948230de | 3 | * |
bbc27441 AJ |
4 | * Squid software is distributed under GPLv2+ license and includes |
5 | * contributions from numerous individuals and organizations. | |
6 | * Please see the COPYING and CONTRIBUTORS files for details. | |
948230de DK |
7 | */ |
8 | ||
bbc27441 AJ |
9 | /* DEBUG: section 54 Interprocess Communication */ |
10 | ||
f7f3304a | 11 | #include "squid.h" |
fa61cefe AR |
12 | #include "base/TextException.h" |
13 | #include "Debug.h" | |
14 | #include "globals.h" | |
948230de DK |
15 | #include "ipc/Queue.h" |
16 | ||
807feb1d DK |
17 | #include <limits> |
18 | ||
f5591061 | 19 | /// constructs Metadata ID from parent queue ID |
948230de | 20 | static String |
f5591061 | 21 | MetadataId(String id) |
948230de | 22 | { |
f5591061 | 23 | id.append("__metadata"); |
948230de DK |
24 | return id; |
25 | } | |
26 | ||
f5591061 | 27 | /// constructs one-to-one queues ID from parent queue ID |
fa61cefe | 28 | static String |
f5591061 DK |
29 | QueuesId(String id) |
30 | { | |
31 | id.append("__queues"); | |
32 | return id; | |
33 | } | |
34 | ||
35 | /// constructs QueueReaders ID from parent queue ID | |
36 | static String | |
37 | ReadersId(String id) | |
fa61cefe AR |
38 | { |
39 | id.append("__readers"); | |
40 | return id; | |
41 | } | |
42 | ||
fa61cefe AR |
43 | /* QueueReader */ |
44 | ||
15cdbc7c | 45 | InstanceIdDefinitions(Ipc::QueueReader, "ipcQR"); |
fa61cefe | 46 | |
6c6a656a | 47 | Ipc::QueueReader::QueueReader(): popBlocked(true), popSignal(false), |
f53969cc | 48 | rateLimit(0), balance(0) |
fa61cefe AR |
49 | { |
50 | debugs(54, 7, HERE << "constructed " << id); | |
51 | } | |
52 | ||
68353d5a DK |
53 | /* QueueReaders */ |
54 | ||
3a8c5551 | 55 | Ipc::QueueReaders::QueueReaders(const int aCapacity): theCapacity(aCapacity), |
f53969cc | 56 | theReaders(theCapacity) |
68353d5a DK |
57 | { |
58 | Must(theCapacity > 0); | |
68353d5a DK |
59 | } |
60 | ||
61 | size_t | |
15cdbc7c | 62 | Ipc::QueueReaders::sharedMemorySize() const |
68353d5a DK |
63 | { |
64 | return SharedMemorySize(theCapacity); | |
65 | } | |
66 | ||
67 | size_t | |
15cdbc7c | 68 | Ipc::QueueReaders::SharedMemorySize(const int capacity) |
68353d5a DK |
69 | { |
70 | return sizeof(QueueReaders) + sizeof(QueueReader) * capacity; | |
71 | } | |
72 | ||
948230de DK |
73 | // OneToOneUniQueue |
74 | ||
f5591061 | 75 | Ipc::OneToOneUniQueue::OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity): |
f53969cc SM |
76 | theIn(0), theOut(0), theSize(0), theMaxItemSize(aMaxItemSize), |
77 | theCapacity(aCapacity) | |
fa61cefe | 78 | { |
f5591061 DK |
79 | Must(theMaxItemSize > 0); |
80 | Must(theCapacity > 0); | |
fa61cefe AR |
81 | } |
82 | ||
948230de | 83 | int |
15cdbc7c | 84 | Ipc::OneToOneUniQueue::Bytes2Items(const unsigned int maxItemSize, int size) |
948230de DK |
85 | { |
86 | assert(maxItemSize > 0); | |
f5591061 | 87 | size -= sizeof(OneToOneUniQueue); |
948230de DK |
88 | return size >= 0 ? size / maxItemSize : 0; |
89 | } | |
90 | ||
91 | int | |
15cdbc7c | 92 | Ipc::OneToOneUniQueue::Items2Bytes(const unsigned int maxItemSize, const int size) |
948230de DK |
93 | { |
94 | assert(size >= 0); | |
f5591061 | 95 | return sizeof(OneToOneUniQueue) + maxItemSize * size; |
948230de DK |
96 | } |
97 | ||
d8ee9e8d EB |
98 | /// start state reporting (by reporting queue parameters) |
99 | /// The labels reflect whether the caller owns theIn or theOut data member and, | |
100 | /// hence, cannot report the other value reliably. | |
101 | void | |
102 | Ipc::OneToOneUniQueue::statOpen(std::ostream &os, const char *inLabel, const char *outLabel, const uint32_t count) const | |
103 | { | |
104 | os << "{ size: " << count << | |
70ac5b29 | 105 | ", capacity: " << theCapacity << |
106 | ", " << inLabel << ": " << theIn << | |
107 | ", " << outLabel << ": " << theOut; | |
d8ee9e8d EB |
108 | } |
109 | ||
110 | /// end state reporting started by statOpen() | |
111 | void | |
112 | Ipc::OneToOneUniQueue::statClose(std::ostream &os) const | |
113 | { | |
114 | os << "}\n"; | |
115 | } | |
116 | ||
f5591061 | 117 | /* OneToOneUniQueues */ |
948230de | 118 | |
f5591061 | 119 | Ipc::OneToOneUniQueues::OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity): theCapacity(aCapacity) |
fa61cefe | 120 | { |
f5591061 DK |
121 | Must(theCapacity > 0); |
122 | for (int i = 0; i < theCapacity; ++i) | |
123 | new (&(*this)[i]) OneToOneUniQueue(maxItemSize, queueCapacity); | |
68353d5a DK |
124 | } |
125 | ||
126 | size_t | |
f5591061 | 127 | Ipc::OneToOneUniQueues::sharedMemorySize() const |
68353d5a | 128 | { |
f5591061 | 129 | return sizeof(*this) + theCapacity * front().sharedMemorySize(); |
fa61cefe AR |
130 | } |
131 | ||
f5591061 DK |
132 | size_t |
133 | Ipc::OneToOneUniQueues::SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity) | |
fa61cefe | 134 | { |
f5591061 DK |
135 | const int queueSize = |
136 | OneToOneUniQueue::Items2Bytes(maxItemSize, queueCapacity); | |
137 | return sizeof(OneToOneUniQueues) + queueSize * capacity; | |
fa61cefe AR |
138 | } |
139 | ||
f5591061 DK |
140 | const Ipc::OneToOneUniQueue & |
141 | Ipc::OneToOneUniQueues::operator [](const int index) const | |
fa61cefe | 142 | { |
f5591061 DK |
143 | Must(0 <= index && index < theCapacity); |
144 | const size_t queueSize = index ? front().sharedMemorySize() : 0; | |
145 | const char *const queue = | |
146 | reinterpret_cast<const char *>(this) + sizeof(*this) + index * queueSize; | |
147 | return *reinterpret_cast<const OneToOneUniQueue *>(queue); | |
fa61cefe AR |
148 | } |
149 | ||
807feb1d DK |
150 | // BaseMultiQueue |
151 | ||
152 | Ipc::BaseMultiQueue::BaseMultiQueue(const int aLocalProcessId): | |
f53969cc SM |
153 | theLocalProcessId(aLocalProcessId), |
154 | theLastPopProcessId(std::numeric_limits<int>::max() - 1) | |
807feb1d DK |
155 | { |
156 | } | |
157 | ||
158 | void | |
ced8def3 | 159 | Ipc::BaseMultiQueue::clearReaderSignal(const int /*remoteProcessId*/) |
807feb1d DK |
160 | { |
161 | QueueReader &reader = localReader(); | |
539283df | 162 | debugs(54, 7, "reader: " << reader.id); |
807feb1d DK |
163 | |
164 | reader.clearSignal(); | |
165 | ||
166 | // we got a hint; we could reposition iteration to try popping from the | |
167 | // remoteProcessId queue first; but it does not seem to help much and might | |
168 | // introduce some bias so we do not do that for now: | |
169 | // theLastPopProcessId = remoteProcessId; | |
170 | } | |
171 | ||
172 | const Ipc::QueueReader::Balance & | |
173 | Ipc::BaseMultiQueue::balance(const int remoteProcessId) const | |
174 | { | |
175 | const QueueReader &r = remoteReader(remoteProcessId); | |
176 | return r.balance; | |
177 | } | |
178 | ||
179 | const Ipc::QueueReader::Rate & | |
180 | Ipc::BaseMultiQueue::rateLimit(const int remoteProcessId) const | |
181 | { | |
182 | const QueueReader &r = remoteReader(remoteProcessId); | |
183 | return r.rateLimit; | |
184 | } | |
185 | ||
186 | Ipc::OneToOneUniQueue & | |
9d4e9cfb AR |
187 | Ipc::BaseMultiQueue::inQueue(const int remoteProcessId) |
188 | { | |
807feb1d DK |
189 | const OneToOneUniQueue &queue = |
190 | const_cast<const BaseMultiQueue *>(this)->inQueue(remoteProcessId); | |
191 | return const_cast<OneToOneUniQueue &>(queue); | |
192 | } | |
193 | ||
194 | Ipc::OneToOneUniQueue & | |
9d4e9cfb AR |
195 | Ipc::BaseMultiQueue::outQueue(const int remoteProcessId) |
196 | { | |
807feb1d DK |
197 | const OneToOneUniQueue &queue = |
198 | const_cast<const BaseMultiQueue *>(this)->outQueue(remoteProcessId); | |
199 | return const_cast<OneToOneUniQueue &>(queue); | |
200 | } | |
201 | ||
202 | Ipc::QueueReader & | |
9d4e9cfb AR |
203 | Ipc::BaseMultiQueue::localReader() |
204 | { | |
807feb1d DK |
205 | const QueueReader &reader = |
206 | const_cast<const BaseMultiQueue *>(this)->localReader(); | |
207 | return const_cast<QueueReader &>(reader); | |
208 | } | |
209 | ||
210 | Ipc::QueueReader & | |
9d4e9cfb AR |
211 | Ipc::BaseMultiQueue::remoteReader(const int remoteProcessId) |
212 | { | |
807feb1d DK |
213 | const QueueReader &reader = |
214 | const_cast<const BaseMultiQueue *>(this)->remoteReader(remoteProcessId); | |
215 | return const_cast<QueueReader &>(reader); | |
216 | } | |
217 | ||
f5591061 | 218 | // FewToFewBiQueue |
948230de | 219 | |
f5591061 DK |
220 | Ipc::FewToFewBiQueue::Owner * |
221 | Ipc::FewToFewBiQueue::Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity) | |
68353d5a | 222 | { |
f5591061 | 223 | return new Owner(id, groupASize, groupAIdOffset, groupBSize, groupBIdOffset, maxItemSize, capacity); |
68353d5a DK |
224 | } |
225 | ||
f5591061 | 226 | Ipc::FewToFewBiQueue::FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId): |
f53969cc SM |
227 | BaseMultiQueue(aLocalProcessId), |
228 | metadata(shm_old(Metadata)(MetadataId(id).termedBuf())), | |
229 | queues(shm_old(OneToOneUniQueues)(QueuesId(id).termedBuf())), | |
230 | readers(shm_old(QueueReaders)(ReadersId(id).termedBuf())), | |
231 | theLocalGroup(aLocalGroup) | |
948230de | 232 | { |
f5591061 DK |
233 | Must(queues->theCapacity == metadata->theGroupASize * metadata->theGroupBSize * 2); |
234 | Must(readers->theCapacity == metadata->theGroupASize + metadata->theGroupBSize); | |
68353d5a | 235 | |
539283df | 236 | debugs(54, 7, "queue " << id << " reader: " << localReader().id); |
948230de DK |
237 | } |
238 | ||
ea2cdeb6 DK |
239 | int |
240 | Ipc::FewToFewBiQueue::MaxItemsCount(const int groupASize, const int groupBSize, const int capacity) | |
241 | { | |
242 | return capacity * groupASize * groupBSize * 2; | |
243 | } | |
244 | ||
f5591061 DK |
245 | bool |
246 | Ipc::FewToFewBiQueue::validProcessId(const Group group, const int processId) const | |
247 | { | |
248 | switch (group) { | |
249 | case groupA: | |
250 | return metadata->theGroupAIdOffset <= processId && | |
9199139f | 251 | processId < metadata->theGroupAIdOffset + metadata->theGroupASize; |
f5591061 DK |
252 | case groupB: |
253 | return metadata->theGroupBIdOffset <= processId && | |
9199139f | 254 | processId < metadata->theGroupBIdOffset + metadata->theGroupBSize; |
f5591061 DK |
255 | } |
256 | return false; | |
257 | } | |
258 | ||
0a11e039 AR |
259 | int |
260 | Ipc::FewToFewBiQueue::oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const | |
f5591061 DK |
261 | { |
262 | Must(fromGroup != toGroup); | |
0a11e039 AR |
263 | assert(validProcessId(fromGroup, fromProcessId)); |
264 | assert(validProcessId(toGroup, toProcessId)); | |
f5591061 DK |
265 | int index1; |
266 | int index2; | |
267 | int offset; | |
268 | if (fromGroup == groupA) { | |
269 | index1 = fromProcessId - metadata->theGroupAIdOffset; | |
270 | index2 = toProcessId - metadata->theGroupBIdOffset; | |
271 | offset = 0; | |
272 | } else { | |
273 | index1 = toProcessId - metadata->theGroupAIdOffset; | |
274 | index2 = fromProcessId - metadata->theGroupBIdOffset; | |
275 | offset = metadata->theGroupASize * metadata->theGroupBSize; | |
276 | } | |
277 | const int index = offset + index1 * metadata->theGroupBSize + index2; | |
0a11e039 AR |
278 | return index; |
279 | } | |
280 | ||
0a11e039 AR |
281 | const Ipc::OneToOneUniQueue & |
282 | Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const | |
283 | { | |
284 | return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)]; | |
948230de DK |
285 | } |
286 | ||
55939a01 AR |
287 | const Ipc::OneToOneUniQueue & |
288 | Ipc::FewToFewBiQueue::inQueue(const int remoteProcessId) const | |
289 | { | |
290 | return oneToOneQueue(remoteGroup(), remoteProcessId, | |
291 | theLocalGroup, theLocalProcessId); | |
292 | } | |
293 | ||
55939a01 AR |
294 | const Ipc::OneToOneUniQueue & |
295 | Ipc::FewToFewBiQueue::outQueue(const int remoteProcessId) const | |
296 | { | |
297 | return oneToOneQueue(theLocalGroup, theLocalProcessId, | |
298 | remoteGroup(), remoteProcessId); | |
299 | } | |
300 | ||
df881a0f AR |
301 | int |
302 | Ipc::FewToFewBiQueue::readerIndex(const Group group, const int processId) const | |
303 | { | |
304 | Must(validProcessId(group, processId)); | |
305 | return group == groupA ? | |
306 | processId - metadata->theGroupAIdOffset : | |
307 | metadata->theGroupASize + processId - metadata->theGroupBIdOffset; | |
308 | } | |
309 | ||
807feb1d DK |
310 | const Ipc::QueueReader & |
311 | Ipc::FewToFewBiQueue::localReader() const | |
948230de | 312 | { |
807feb1d | 313 | return readers->theReaders[readerIndex(theLocalGroup, theLocalProcessId)]; |
df881a0f AR |
314 | } |
315 | ||
316 | const Ipc::QueueReader & | |
807feb1d | 317 | Ipc::FewToFewBiQueue::remoteReader(const int processId) const |
df881a0f | 318 | { |
807feb1d | 319 | return readers->theReaders[readerIndex(remoteGroup(), processId)]; |
948230de | 320 | } |
fa61cefe | 321 | |
807feb1d DK |
322 | int |
323 | Ipc::FewToFewBiQueue::remotesCount() const | |
fa61cefe | 324 | { |
807feb1d | 325 | return theLocalGroup == groupA ? metadata->theGroupBSize : |
9d4e9cfb | 326 | metadata->theGroupASize; |
807feb1d | 327 | } |
fa61cefe | 328 | |
807feb1d DK |
329 | int |
330 | Ipc::FewToFewBiQueue::remotesIdOffset() const | |
331 | { | |
332 | return theLocalGroup == groupA ? metadata->theGroupBIdOffset : | |
9d4e9cfb | 333 | metadata->theGroupAIdOffset; |
807feb1d | 334 | } |
fa61cefe | 335 | |
807feb1d | 336 | Ipc::FewToFewBiQueue::Metadata::Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset): |
f53969cc SM |
337 | theGroupASize(aGroupASize), theGroupAIdOffset(aGroupAIdOffset), |
338 | theGroupBSize(aGroupBSize), theGroupBIdOffset(aGroupBIdOffset) | |
807feb1d DK |
339 | { |
340 | Must(theGroupASize > 0); | |
341 | Must(theGroupBSize > 0); | |
342 | } | |
343 | ||
344 | Ipc::FewToFewBiQueue::Owner::Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity): | |
f53969cc SM |
345 | metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), groupASize, groupAIdOffset, groupBSize, groupBIdOffset)), |
346 | queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), groupASize*groupBSize*2, maxItemSize, capacity)), | |
347 | readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), groupASize+groupBSize)) | |
807feb1d | 348 | { |
68353d5a DK |
349 | } |
350 | ||
807feb1d | 351 | Ipc::FewToFewBiQueue::Owner::~Owner() |
df881a0f | 352 | { |
807feb1d DK |
353 | delete metadataOwner; |
354 | delete queuesOwner; | |
355 | delete readersOwner; | |
df881a0f AR |
356 | } |
357 | ||
807feb1d DK |
358 | // MultiQueue |
359 | ||
360 | Ipc::MultiQueue::Owner * | |
361 | Ipc::MultiQueue::Init(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity) | |
55939a01 | 362 | { |
807feb1d | 363 | return new Owner(id, processCount, processIdOffset, maxItemSize, capacity); |
55939a01 AR |
364 | } |
365 | ||
807feb1d | 366 | Ipc::MultiQueue::MultiQueue(const String &id, const int localProcessId): |
f53969cc SM |
367 | BaseMultiQueue(localProcessId), |
368 | metadata(shm_old(Metadata)(MetadataId(id).termedBuf())), | |
369 | queues(shm_old(OneToOneUniQueues)(QueuesId(id).termedBuf())), | |
370 | readers(shm_old(QueueReaders)(ReadersId(id).termedBuf())) | |
df881a0f | 371 | { |
807feb1d DK |
372 | Must(queues->theCapacity == metadata->theProcessCount * metadata->theProcessCount); |
373 | Must(readers->theCapacity == metadata->theProcessCount); | |
374 | ||
539283df | 375 | debugs(54, 7, "queue " << id << " reader: " << localReader().id); |
55939a01 AR |
376 | } |
377 | ||
807feb1d DK |
378 | bool |
379 | Ipc::MultiQueue::validProcessId(const int processId) const | |
55939a01 | 380 | { |
807feb1d | 381 | return metadata->theProcessIdOffset <= processId && |
9d4e9cfb | 382 | processId < metadata->theProcessIdOffset + metadata->theProcessCount; |
df881a0f AR |
383 | } |
384 | ||
807feb1d DK |
385 | const Ipc::OneToOneUniQueue & |
386 | Ipc::MultiQueue::oneToOneQueue(const int fromProcessId, const int toProcessId) const | |
f5591061 | 387 | { |
807feb1d DK |
388 | assert(validProcessId(fromProcessId)); |
389 | assert(validProcessId(toProcessId)); | |
390 | const int fromIndex = fromProcessId - metadata->theProcessIdOffset; | |
391 | const int toIndex = toProcessId - metadata->theProcessIdOffset; | |
392 | const int index = fromIndex * metadata->theProcessCount + toIndex; | |
393 | return (*queues)[index]; | |
f5591061 DK |
394 | } |
395 | ||
807feb1d DK |
396 | const Ipc::QueueReader & |
397 | Ipc::MultiQueue::reader(const int processId) const | |
68353d5a | 398 | { |
807feb1d DK |
399 | assert(validProcessId(processId)); |
400 | const int index = processId - metadata->theProcessIdOffset; | |
401 | return readers->theReaders[index]; | |
68353d5a DK |
402 | } |
403 | ||
807feb1d DK |
404 | const Ipc::OneToOneUniQueue & |
405 | Ipc::MultiQueue::inQueue(const int remoteProcessId) const | |
406 | { | |
407 | return oneToOneQueue(remoteProcessId, theLocalProcessId); | |
408 | } | |
409 | ||
410 | const Ipc::OneToOneUniQueue & | |
411 | Ipc::MultiQueue::outQueue(const int remoteProcessId) const | |
412 | { | |
413 | return oneToOneQueue(theLocalProcessId, remoteProcessId); | |
414 | } | |
415 | ||
416 | const Ipc::QueueReader & | |
417 | Ipc::MultiQueue::localReader() const | |
418 | { | |
419 | return reader(theLocalProcessId); | |
420 | } | |
421 | ||
422 | const Ipc::QueueReader & | |
423 | Ipc::MultiQueue::remoteReader(const int processId) const | |
424 | { | |
425 | return reader(processId); | |
426 | } | |
427 | ||
428 | int | |
429 | Ipc::MultiQueue::remotesCount() const | |
430 | { | |
431 | return metadata->theProcessCount; | |
432 | } | |
433 | ||
434 | int | |
435 | Ipc::MultiQueue::remotesIdOffset() const | |
436 | { | |
437 | return metadata->theProcessIdOffset; | |
438 | } | |
439 | ||
440 | Ipc::MultiQueue::Metadata::Metadata(const int aProcessCount, const int aProcessIdOffset): | |
f53969cc | 441 | theProcessCount(aProcessCount), theProcessIdOffset(aProcessIdOffset) |
807feb1d DK |
442 | { |
443 | Must(theProcessCount > 0); | |
444 | } | |
445 | ||
446 | Ipc::MultiQueue::Owner::Owner(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity): | |
f53969cc SM |
447 | metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), processCount, processIdOffset)), |
448 | queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), processCount*processCount, maxItemSize, capacity)), | |
449 | readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), processCount)) | |
807feb1d DK |
450 | { |
451 | } | |
452 | ||
453 | Ipc::MultiQueue::Owner::~Owner() | |
68353d5a | 454 | { |
f5591061 DK |
455 | delete metadataOwner; |
456 | delete queuesOwner; | |
68353d5a | 457 | delete readersOwner; |
fa61cefe | 458 | } |
f53969cc | 459 |