]>
Commit | Line | Data |
---|---|---|
948230de | 1 | /* |
5b74111a | 2 | * Copyright (C) 1996-2018 The Squid Software Foundation and contributors |
948230de | 3 | * |
bbc27441 AJ |
4 | * Squid software is distributed under GPLv2+ license and includes |
5 | * contributions from numerous individuals and organizations. | |
6 | * Please see the COPYING and CONTRIBUTORS files for details. | |
948230de DK |
7 | */ |
8 | ||
bbc27441 AJ |
9 | /* DEBUG: section 54 Interprocess Communication */ |
10 | ||
f7f3304a | 11 | #include "squid.h" |
fa61cefe AR |
12 | #include "base/TextException.h" |
13 | #include "Debug.h" | |
14 | #include "globals.h" | |
948230de DK |
15 | #include "ipc/Queue.h" |
16 | ||
807feb1d DK |
17 | #include <limits> |
18 | ||
f5591061 | 19 | /// constructs Metadata ID from parent queue ID |
948230de | 20 | static String |
f5591061 | 21 | MetadataId(String id) |
948230de | 22 | { |
f5591061 | 23 | id.append("__metadata"); |
948230de DK |
24 | return id; |
25 | } | |
26 | ||
f5591061 | 27 | /// constructs one-to-one queues ID from parent queue ID |
fa61cefe | 28 | static String |
f5591061 DK |
29 | QueuesId(String id) |
30 | { | |
31 | id.append("__queues"); | |
32 | return id; | |
33 | } | |
34 | ||
35 | /// constructs QueueReaders ID from parent queue ID | |
36 | static String | |
37 | ReadersId(String id) | |
fa61cefe AR |
38 | { |
39 | id.append("__readers"); | |
40 | return id; | |
41 | } | |
42 | ||
fa61cefe AR |
43 | /* QueueReader */ |
44 | ||
15cdbc7c | 45 | InstanceIdDefinitions(Ipc::QueueReader, "ipcQR"); |
fa61cefe | 46 | |
6c6a656a | 47 | Ipc::QueueReader::QueueReader(): popBlocked(true), popSignal(false), |
f53969cc | 48 | rateLimit(0), balance(0) |
fa61cefe AR |
49 | { |
50 | debugs(54, 7, HERE << "constructed " << id); | |
51 | } | |
52 | ||
68353d5a DK |
53 | /* QueueReaders */ |
54 | ||
3a8c5551 | 55 | Ipc::QueueReaders::QueueReaders(const int aCapacity): theCapacity(aCapacity), |
f53969cc | 56 | theReaders(theCapacity) |
68353d5a DK |
57 | { |
58 | Must(theCapacity > 0); | |
68353d5a DK |
59 | } |
60 | ||
61 | size_t | |
15cdbc7c | 62 | Ipc::QueueReaders::sharedMemorySize() const |
68353d5a DK |
63 | { |
64 | return SharedMemorySize(theCapacity); | |
65 | } | |
66 | ||
67 | size_t | |
15cdbc7c | 68 | Ipc::QueueReaders::SharedMemorySize(const int capacity) |
68353d5a DK |
69 | { |
70 | return sizeof(QueueReaders) + sizeof(QueueReader) * capacity; | |
71 | } | |
72 | ||
948230de DK |
73 | // OneToOneUniQueue |
74 | ||
f5591061 | 75 | Ipc::OneToOneUniQueue::OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity): |
f53969cc SM |
76 | theIn(0), theOut(0), theSize(0), theMaxItemSize(aMaxItemSize), |
77 | theCapacity(aCapacity) | |
fa61cefe | 78 | { |
f5591061 DK |
79 | Must(theMaxItemSize > 0); |
80 | Must(theCapacity > 0); | |
fa61cefe AR |
81 | } |
82 | ||
948230de | 83 | int |
15cdbc7c | 84 | Ipc::OneToOneUniQueue::Bytes2Items(const unsigned int maxItemSize, int size) |
948230de DK |
85 | { |
86 | assert(maxItemSize > 0); | |
f5591061 | 87 | size -= sizeof(OneToOneUniQueue); |
948230de DK |
88 | return size >= 0 ? size / maxItemSize : 0; |
89 | } | |
90 | ||
91 | int | |
15cdbc7c | 92 | Ipc::OneToOneUniQueue::Items2Bytes(const unsigned int maxItemSize, const int size) |
948230de DK |
93 | { |
94 | assert(size >= 0); | |
f5591061 | 95 | return sizeof(OneToOneUniQueue) + maxItemSize * size; |
948230de DK |
96 | } |
97 | ||
f5591061 | 98 | /* OneToOneUniQueues */ |
948230de | 99 | |
f5591061 | 100 | Ipc::OneToOneUniQueues::OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity): theCapacity(aCapacity) |
fa61cefe | 101 | { |
f5591061 DK |
102 | Must(theCapacity > 0); |
103 | for (int i = 0; i < theCapacity; ++i) | |
104 | new (&(*this)[i]) OneToOneUniQueue(maxItemSize, queueCapacity); | |
68353d5a DK |
105 | } |
106 | ||
107 | size_t | |
f5591061 | 108 | Ipc::OneToOneUniQueues::sharedMemorySize() const |
68353d5a | 109 | { |
f5591061 | 110 | return sizeof(*this) + theCapacity * front().sharedMemorySize(); |
fa61cefe AR |
111 | } |
112 | ||
f5591061 DK |
113 | size_t |
114 | Ipc::OneToOneUniQueues::SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity) | |
fa61cefe | 115 | { |
f5591061 DK |
116 | const int queueSize = |
117 | OneToOneUniQueue::Items2Bytes(maxItemSize, queueCapacity); | |
118 | return sizeof(OneToOneUniQueues) + queueSize * capacity; | |
fa61cefe AR |
119 | } |
120 | ||
f5591061 DK |
121 | const Ipc::OneToOneUniQueue & |
122 | Ipc::OneToOneUniQueues::operator [](const int index) const | |
fa61cefe | 123 | { |
f5591061 DK |
124 | Must(0 <= index && index < theCapacity); |
125 | const size_t queueSize = index ? front().sharedMemorySize() : 0; | |
126 | const char *const queue = | |
127 | reinterpret_cast<const char *>(this) + sizeof(*this) + index * queueSize; | |
128 | return *reinterpret_cast<const OneToOneUniQueue *>(queue); | |
fa61cefe AR |
129 | } |
130 | ||
807feb1d DK |
131 | // BaseMultiQueue |
132 | ||
133 | Ipc::BaseMultiQueue::BaseMultiQueue(const int aLocalProcessId): | |
f53969cc SM |
134 | theLocalProcessId(aLocalProcessId), |
135 | theLastPopProcessId(std::numeric_limits<int>::max() - 1) | |
807feb1d DK |
136 | { |
137 | } | |
138 | ||
139 | void | |
ced8def3 | 140 | Ipc::BaseMultiQueue::clearReaderSignal(const int /*remoteProcessId*/) |
807feb1d DK |
141 | { |
142 | QueueReader &reader = localReader(); | |
539283df | 143 | debugs(54, 7, "reader: " << reader.id); |
807feb1d DK |
144 | |
145 | reader.clearSignal(); | |
146 | ||
147 | // we got a hint; we could reposition iteration to try popping from the | |
148 | // remoteProcessId queue first; but it does not seem to help much and might | |
149 | // introduce some bias so we do not do that for now: | |
150 | // theLastPopProcessId = remoteProcessId; | |
151 | } | |
152 | ||
153 | const Ipc::QueueReader::Balance & | |
154 | Ipc::BaseMultiQueue::balance(const int remoteProcessId) const | |
155 | { | |
156 | const QueueReader &r = remoteReader(remoteProcessId); | |
157 | return r.balance; | |
158 | } | |
159 | ||
160 | const Ipc::QueueReader::Rate & | |
161 | Ipc::BaseMultiQueue::rateLimit(const int remoteProcessId) const | |
162 | { | |
163 | const QueueReader &r = remoteReader(remoteProcessId); | |
164 | return r.rateLimit; | |
165 | } | |
166 | ||
167 | Ipc::OneToOneUniQueue & | |
9d4e9cfb AR |
168 | Ipc::BaseMultiQueue::inQueue(const int remoteProcessId) |
169 | { | |
807feb1d DK |
170 | const OneToOneUniQueue &queue = |
171 | const_cast<const BaseMultiQueue *>(this)->inQueue(remoteProcessId); | |
172 | return const_cast<OneToOneUniQueue &>(queue); | |
173 | } | |
174 | ||
175 | Ipc::OneToOneUniQueue & | |
9d4e9cfb AR |
176 | Ipc::BaseMultiQueue::outQueue(const int remoteProcessId) |
177 | { | |
807feb1d DK |
178 | const OneToOneUniQueue &queue = |
179 | const_cast<const BaseMultiQueue *>(this)->outQueue(remoteProcessId); | |
180 | return const_cast<OneToOneUniQueue &>(queue); | |
181 | } | |
182 | ||
183 | Ipc::QueueReader & | |
9d4e9cfb AR |
184 | Ipc::BaseMultiQueue::localReader() |
185 | { | |
807feb1d DK |
186 | const QueueReader &reader = |
187 | const_cast<const BaseMultiQueue *>(this)->localReader(); | |
188 | return const_cast<QueueReader &>(reader); | |
189 | } | |
190 | ||
191 | Ipc::QueueReader & | |
9d4e9cfb AR |
192 | Ipc::BaseMultiQueue::remoteReader(const int remoteProcessId) |
193 | { | |
807feb1d DK |
194 | const QueueReader &reader = |
195 | const_cast<const BaseMultiQueue *>(this)->remoteReader(remoteProcessId); | |
196 | return const_cast<QueueReader &>(reader); | |
197 | } | |
198 | ||
f5591061 | 199 | // FewToFewBiQueue |
948230de | 200 | |
f5591061 DK |
201 | Ipc::FewToFewBiQueue::Owner * |
202 | Ipc::FewToFewBiQueue::Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity) | |
68353d5a | 203 | { |
f5591061 | 204 | return new Owner(id, groupASize, groupAIdOffset, groupBSize, groupBIdOffset, maxItemSize, capacity); |
68353d5a DK |
205 | } |
206 | ||
f5591061 | 207 | Ipc::FewToFewBiQueue::FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId): |
f53969cc SM |
208 | BaseMultiQueue(aLocalProcessId), |
209 | metadata(shm_old(Metadata)(MetadataId(id).termedBuf())), | |
210 | queues(shm_old(OneToOneUniQueues)(QueuesId(id).termedBuf())), | |
211 | readers(shm_old(QueueReaders)(ReadersId(id).termedBuf())), | |
212 | theLocalGroup(aLocalGroup) | |
948230de | 213 | { |
f5591061 DK |
214 | Must(queues->theCapacity == metadata->theGroupASize * metadata->theGroupBSize * 2); |
215 | Must(readers->theCapacity == metadata->theGroupASize + metadata->theGroupBSize); | |
68353d5a | 216 | |
539283df | 217 | debugs(54, 7, "queue " << id << " reader: " << localReader().id); |
948230de DK |
218 | } |
219 | ||
ea2cdeb6 DK |
220 | int |
221 | Ipc::FewToFewBiQueue::MaxItemsCount(const int groupASize, const int groupBSize, const int capacity) | |
222 | { | |
223 | return capacity * groupASize * groupBSize * 2; | |
224 | } | |
225 | ||
f5591061 DK |
226 | bool |
227 | Ipc::FewToFewBiQueue::validProcessId(const Group group, const int processId) const | |
228 | { | |
229 | switch (group) { | |
230 | case groupA: | |
231 | return metadata->theGroupAIdOffset <= processId && | |
9199139f | 232 | processId < metadata->theGroupAIdOffset + metadata->theGroupASize; |
f5591061 DK |
233 | case groupB: |
234 | return metadata->theGroupBIdOffset <= processId && | |
9199139f | 235 | processId < metadata->theGroupBIdOffset + metadata->theGroupBSize; |
f5591061 DK |
236 | } |
237 | return false; | |
238 | } | |
239 | ||
0a11e039 AR |
240 | int |
241 | Ipc::FewToFewBiQueue::oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const | |
f5591061 DK |
242 | { |
243 | Must(fromGroup != toGroup); | |
0a11e039 AR |
244 | assert(validProcessId(fromGroup, fromProcessId)); |
245 | assert(validProcessId(toGroup, toProcessId)); | |
f5591061 DK |
246 | int index1; |
247 | int index2; | |
248 | int offset; | |
249 | if (fromGroup == groupA) { | |
250 | index1 = fromProcessId - metadata->theGroupAIdOffset; | |
251 | index2 = toProcessId - metadata->theGroupBIdOffset; | |
252 | offset = 0; | |
253 | } else { | |
254 | index1 = toProcessId - metadata->theGroupAIdOffset; | |
255 | index2 = fromProcessId - metadata->theGroupBIdOffset; | |
256 | offset = metadata->theGroupASize * metadata->theGroupBSize; | |
257 | } | |
258 | const int index = offset + index1 * metadata->theGroupBSize + index2; | |
0a11e039 AR |
259 | return index; |
260 | } | |
261 | ||
0a11e039 AR |
262 | const Ipc::OneToOneUniQueue & |
263 | Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const | |
264 | { | |
265 | return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)]; | |
948230de DK |
266 | } |
267 | ||
55939a01 AR |
268 | const Ipc::OneToOneUniQueue & |
269 | Ipc::FewToFewBiQueue::inQueue(const int remoteProcessId) const | |
270 | { | |
271 | return oneToOneQueue(remoteGroup(), remoteProcessId, | |
272 | theLocalGroup, theLocalProcessId); | |
273 | } | |
274 | ||
55939a01 AR |
275 | const Ipc::OneToOneUniQueue & |
276 | Ipc::FewToFewBiQueue::outQueue(const int remoteProcessId) const | |
277 | { | |
278 | return oneToOneQueue(theLocalGroup, theLocalProcessId, | |
279 | remoteGroup(), remoteProcessId); | |
280 | } | |
281 | ||
df881a0f AR |
282 | int |
283 | Ipc::FewToFewBiQueue::readerIndex(const Group group, const int processId) const | |
284 | { | |
285 | Must(validProcessId(group, processId)); | |
286 | return group == groupA ? | |
287 | processId - metadata->theGroupAIdOffset : | |
288 | metadata->theGroupASize + processId - metadata->theGroupBIdOffset; | |
289 | } | |
290 | ||
807feb1d DK |
291 | const Ipc::QueueReader & |
292 | Ipc::FewToFewBiQueue::localReader() const | |
948230de | 293 | { |
807feb1d | 294 | return readers->theReaders[readerIndex(theLocalGroup, theLocalProcessId)]; |
df881a0f AR |
295 | } |
296 | ||
297 | const Ipc::QueueReader & | |
807feb1d | 298 | Ipc::FewToFewBiQueue::remoteReader(const int processId) const |
df881a0f | 299 | { |
807feb1d | 300 | return readers->theReaders[readerIndex(remoteGroup(), processId)]; |
948230de | 301 | } |
fa61cefe | 302 | |
807feb1d DK |
303 | int |
304 | Ipc::FewToFewBiQueue::remotesCount() const | |
fa61cefe | 305 | { |
807feb1d | 306 | return theLocalGroup == groupA ? metadata->theGroupBSize : |
9d4e9cfb | 307 | metadata->theGroupASize; |
807feb1d | 308 | } |
fa61cefe | 309 | |
807feb1d DK |
310 | int |
311 | Ipc::FewToFewBiQueue::remotesIdOffset() const | |
312 | { | |
313 | return theLocalGroup == groupA ? metadata->theGroupBIdOffset : | |
9d4e9cfb | 314 | metadata->theGroupAIdOffset; |
807feb1d | 315 | } |
fa61cefe | 316 | |
807feb1d | 317 | Ipc::FewToFewBiQueue::Metadata::Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset): |
f53969cc SM |
318 | theGroupASize(aGroupASize), theGroupAIdOffset(aGroupAIdOffset), |
319 | theGroupBSize(aGroupBSize), theGroupBIdOffset(aGroupBIdOffset) | |
807feb1d DK |
320 | { |
321 | Must(theGroupASize > 0); | |
322 | Must(theGroupBSize > 0); | |
323 | } | |
324 | ||
325 | Ipc::FewToFewBiQueue::Owner::Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity): | |
f53969cc SM |
326 | metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), groupASize, groupAIdOffset, groupBSize, groupBIdOffset)), |
327 | queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), groupASize*groupBSize*2, maxItemSize, capacity)), | |
328 | readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), groupASize+groupBSize)) | |
807feb1d | 329 | { |
68353d5a DK |
330 | } |
331 | ||
807feb1d | 332 | Ipc::FewToFewBiQueue::Owner::~Owner() |
df881a0f | 333 | { |
807feb1d DK |
334 | delete metadataOwner; |
335 | delete queuesOwner; | |
336 | delete readersOwner; | |
df881a0f AR |
337 | } |
338 | ||
807feb1d DK |
339 | // MultiQueue |
340 | ||
341 | Ipc::MultiQueue::Owner * | |
342 | Ipc::MultiQueue::Init(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity) | |
55939a01 | 343 | { |
807feb1d | 344 | return new Owner(id, processCount, processIdOffset, maxItemSize, capacity); |
55939a01 AR |
345 | } |
346 | ||
807feb1d | 347 | Ipc::MultiQueue::MultiQueue(const String &id, const int localProcessId): |
f53969cc SM |
348 | BaseMultiQueue(localProcessId), |
349 | metadata(shm_old(Metadata)(MetadataId(id).termedBuf())), | |
350 | queues(shm_old(OneToOneUniQueues)(QueuesId(id).termedBuf())), | |
351 | readers(shm_old(QueueReaders)(ReadersId(id).termedBuf())) | |
df881a0f | 352 | { |
807feb1d DK |
353 | Must(queues->theCapacity == metadata->theProcessCount * metadata->theProcessCount); |
354 | Must(readers->theCapacity == metadata->theProcessCount); | |
355 | ||
539283df | 356 | debugs(54, 7, "queue " << id << " reader: " << localReader().id); |
55939a01 AR |
357 | } |
358 | ||
807feb1d DK |
359 | bool |
360 | Ipc::MultiQueue::validProcessId(const int processId) const | |
55939a01 | 361 | { |
807feb1d | 362 | return metadata->theProcessIdOffset <= processId && |
9d4e9cfb | 363 | processId < metadata->theProcessIdOffset + metadata->theProcessCount; |
df881a0f AR |
364 | } |
365 | ||
807feb1d DK |
366 | const Ipc::OneToOneUniQueue & |
367 | Ipc::MultiQueue::oneToOneQueue(const int fromProcessId, const int toProcessId) const | |
f5591061 | 368 | { |
807feb1d DK |
369 | assert(validProcessId(fromProcessId)); |
370 | assert(validProcessId(toProcessId)); | |
371 | const int fromIndex = fromProcessId - metadata->theProcessIdOffset; | |
372 | const int toIndex = toProcessId - metadata->theProcessIdOffset; | |
373 | const int index = fromIndex * metadata->theProcessCount + toIndex; | |
374 | return (*queues)[index]; | |
f5591061 DK |
375 | } |
376 | ||
807feb1d DK |
377 | const Ipc::QueueReader & |
378 | Ipc::MultiQueue::reader(const int processId) const | |
68353d5a | 379 | { |
807feb1d DK |
380 | assert(validProcessId(processId)); |
381 | const int index = processId - metadata->theProcessIdOffset; | |
382 | return readers->theReaders[index]; | |
68353d5a DK |
383 | } |
384 | ||
807feb1d DK |
385 | const Ipc::OneToOneUniQueue & |
386 | Ipc::MultiQueue::inQueue(const int remoteProcessId) const | |
387 | { | |
388 | return oneToOneQueue(remoteProcessId, theLocalProcessId); | |
389 | } | |
390 | ||
391 | const Ipc::OneToOneUniQueue & | |
392 | Ipc::MultiQueue::outQueue(const int remoteProcessId) const | |
393 | { | |
394 | return oneToOneQueue(theLocalProcessId, remoteProcessId); | |
395 | } | |
396 | ||
397 | const Ipc::QueueReader & | |
398 | Ipc::MultiQueue::localReader() const | |
399 | { | |
400 | return reader(theLocalProcessId); | |
401 | } | |
402 | ||
403 | const Ipc::QueueReader & | |
404 | Ipc::MultiQueue::remoteReader(const int processId) const | |
405 | { | |
406 | return reader(processId); | |
407 | } | |
408 | ||
409 | int | |
410 | Ipc::MultiQueue::remotesCount() const | |
411 | { | |
412 | return metadata->theProcessCount; | |
413 | } | |
414 | ||
415 | int | |
416 | Ipc::MultiQueue::remotesIdOffset() const | |
417 | { | |
418 | return metadata->theProcessIdOffset; | |
419 | } | |
420 | ||
421 | Ipc::MultiQueue::Metadata::Metadata(const int aProcessCount, const int aProcessIdOffset): | |
f53969cc | 422 | theProcessCount(aProcessCount), theProcessIdOffset(aProcessIdOffset) |
807feb1d DK |
423 | { |
424 | Must(theProcessCount > 0); | |
425 | } | |
426 | ||
427 | Ipc::MultiQueue::Owner::Owner(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity): | |
f53969cc SM |
428 | metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), processCount, processIdOffset)), |
429 | queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), processCount*processCount, maxItemSize, capacity)), | |
430 | readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), processCount)) | |
807feb1d DK |
431 | { |
432 | } | |
433 | ||
434 | Ipc::MultiQueue::Owner::~Owner() | |
68353d5a | 435 | { |
f5591061 DK |
436 | delete metadataOwner; |
437 | delete queuesOwner; | |
68353d5a | 438 | delete readersOwner; |
fa61cefe | 439 | } |
f53969cc | 440 |