]> git.ipfire.org Git - thirdparty/squid.git/blame - src/ipc/Queue.cc
SourceFormat Enforcement
[thirdparty/squid.git] / src / ipc / Queue.cc
CommitLineData
948230de
DK
1/*
2 * $Id$
3 *
4 * DEBUG: section 54 Interprocess Communication
5 *
6 */
7
f7f3304a 8#include "squid.h"
fa61cefe
AR
9#include "base/TextException.h"
10#include "Debug.h"
11#include "globals.h"
948230de
DK
12#include "ipc/Queue.h"
13
f5591061 14/// constructs Metadata ID from parent queue ID
948230de 15static String
f5591061 16MetadataId(String id)
948230de 17{
f5591061 18 id.append("__metadata");
948230de
DK
19 return id;
20}
21
f5591061 22/// constructs one-to-one queues ID from parent queue ID
fa61cefe 23static String
f5591061
DK
24QueuesId(String id)
25{
26 id.append("__queues");
27 return id;
28}
29
30/// constructs QueueReaders ID from parent queue ID
31static String
32ReadersId(String id)
fa61cefe
AR
33{
34 id.append("__readers");
35 return id;
36}
37
fa61cefe
AR
38/* QueueReader */
39
15cdbc7c 40InstanceIdDefinitions(Ipc::QueueReader, "ipcQR");
fa61cefe 41
df881a0f
AR
42Ipc::QueueReader::QueueReader(): popBlocked(1), popSignal(0),
43 rateLimit(0), balance(0)
fa61cefe
AR
44{
45 debugs(54, 7, HERE << "constructed " << id);
46}
47
68353d5a
DK
48/* QueueReaders */
49
15cdbc7c 50Ipc::QueueReaders::QueueReaders(const int aCapacity): theCapacity(aCapacity)
68353d5a
DK
51{
52 Must(theCapacity > 0);
2cde0303
FC
53 theReaders=new QueueReader[theCapacity];
54}
55
56Ipc::QueueReaders::~QueueReaders()
57{
58 delete[] theReaders;
68353d5a
DK
59}
60
61size_t
15cdbc7c 62Ipc::QueueReaders::sharedMemorySize() const
68353d5a
DK
63{
64 return SharedMemorySize(theCapacity);
65}
66
67size_t
15cdbc7c 68Ipc::QueueReaders::SharedMemorySize(const int capacity)
68353d5a
DK
69{
70 return sizeof(QueueReaders) + sizeof(QueueReader) * capacity;
71}
72
948230de
DK
73// OneToOneUniQueue
74
f5591061 75Ipc::OneToOneUniQueue::OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity):
9199139f
AR
76 theIn(0), theOut(0), theSize(0), theMaxItemSize(aMaxItemSize),
77 theCapacity(aCapacity)
fa61cefe 78{
f5591061
DK
79 Must(theMaxItemSize > 0);
80 Must(theCapacity > 0);
fa61cefe
AR
81}
82
948230de 83int
15cdbc7c 84Ipc::OneToOneUniQueue::Bytes2Items(const unsigned int maxItemSize, int size)
948230de
DK
85{
86 assert(maxItemSize > 0);
f5591061 87 size -= sizeof(OneToOneUniQueue);
948230de
DK
88 return size >= 0 ? size / maxItemSize : 0;
89}
90
91int
15cdbc7c 92Ipc::OneToOneUniQueue::Items2Bytes(const unsigned int maxItemSize, const int size)
948230de
DK
93{
94 assert(size >= 0);
f5591061 95 return sizeof(OneToOneUniQueue) + maxItemSize * size;
948230de
DK
96}
97
f5591061 98/* OneToOneUniQueues */
948230de 99
f5591061 100Ipc::OneToOneUniQueues::OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity): theCapacity(aCapacity)
fa61cefe 101{
f5591061
DK
102 Must(theCapacity > 0);
103 for (int i = 0; i < theCapacity; ++i)
104 new (&(*this)[i]) OneToOneUniQueue(maxItemSize, queueCapacity);
68353d5a
DK
105}
106
107size_t
f5591061 108Ipc::OneToOneUniQueues::sharedMemorySize() const
68353d5a 109{
f5591061 110 return sizeof(*this) + theCapacity * front().sharedMemorySize();
fa61cefe
AR
111}
112
f5591061
DK
113size_t
114Ipc::OneToOneUniQueues::SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity)
fa61cefe 115{
f5591061
DK
116 const int queueSize =
117 OneToOneUniQueue::Items2Bytes(maxItemSize, queueCapacity);
118 return sizeof(OneToOneUniQueues) + queueSize * capacity;
fa61cefe
AR
119}
120
f5591061
DK
121const Ipc::OneToOneUniQueue &
122Ipc::OneToOneUniQueues::operator [](const int index) const
fa61cefe 123{
f5591061
DK
124 Must(0 <= index && index < theCapacity);
125 const size_t queueSize = index ? front().sharedMemorySize() : 0;
126 const char *const queue =
127 reinterpret_cast<const char *>(this) + sizeof(*this) + index * queueSize;
128 return *reinterpret_cast<const OneToOneUniQueue *>(queue);
fa61cefe
AR
129}
130
f5591061 131// FewToFewBiQueue
948230de 132
f5591061
DK
133Ipc::FewToFewBiQueue::Owner *
134Ipc::FewToFewBiQueue::Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity)
68353d5a 135{
f5591061 136 return new Owner(id, groupASize, groupAIdOffset, groupBSize, groupBIdOffset, maxItemSize, capacity);
68353d5a
DK
137}
138
f5591061 139Ipc::FewToFewBiQueue::FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId):
9199139f
AR
140 metadata(shm_old(Metadata)(MetadataId(id).termedBuf())),
141 queues(shm_old(OneToOneUniQueues)(QueuesId(id).termedBuf())),
142 readers(shm_old(QueueReaders)(ReadersId(id).termedBuf())),
143 theLocalGroup(aLocalGroup), theLocalProcessId(aLocalProcessId),
144 theLastPopProcessId(readers->theCapacity)
948230de 145{
f5591061
DK
146 Must(queues->theCapacity == metadata->theGroupASize * metadata->theGroupBSize * 2);
147 Must(readers->theCapacity == metadata->theGroupASize + metadata->theGroupBSize);
68353d5a 148
f5591061
DK
149 const QueueReader &localReader = reader(theLocalGroup, theLocalProcessId);
150 debugs(54, 7, HERE << "queue " << id << " reader: " << localReader.id);
948230de
DK
151}
152
ea2cdeb6
DK
153int
154Ipc::FewToFewBiQueue::MaxItemsCount(const int groupASize, const int groupBSize, const int capacity)
155{
156 return capacity * groupASize * groupBSize * 2;
157}
158
f5591061
DK
159bool
160Ipc::FewToFewBiQueue::validProcessId(const Group group, const int processId) const
161{
162 switch (group) {
163 case groupA:
164 return metadata->theGroupAIdOffset <= processId &&
9199139f 165 processId < metadata->theGroupAIdOffset + metadata->theGroupASize;
f5591061
DK
166 case groupB:
167 return metadata->theGroupBIdOffset <= processId &&
9199139f 168 processId < metadata->theGroupBIdOffset + metadata->theGroupBSize;
f5591061
DK
169 }
170 return false;
171}
172
0a11e039
AR
173int
174Ipc::FewToFewBiQueue::oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
f5591061
DK
175{
176 Must(fromGroup != toGroup);
0a11e039
AR
177 assert(validProcessId(fromGroup, fromProcessId));
178 assert(validProcessId(toGroup, toProcessId));
f5591061
DK
179 int index1;
180 int index2;
181 int offset;
182 if (fromGroup == groupA) {
183 index1 = fromProcessId - metadata->theGroupAIdOffset;
184 index2 = toProcessId - metadata->theGroupBIdOffset;
185 offset = 0;
186 } else {
187 index1 = toProcessId - metadata->theGroupAIdOffset;
188 index2 = fromProcessId - metadata->theGroupBIdOffset;
189 offset = metadata->theGroupASize * metadata->theGroupBSize;
190 }
191 const int index = offset + index1 * metadata->theGroupBSize + index2;
0a11e039
AR
192 return index;
193}
194
195Ipc::OneToOneUniQueue &
196Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId)
197{
198 return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)];
199}
200
201const Ipc::OneToOneUniQueue &
202Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
203{
204 return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)];
948230de
DK
205}
206
55939a01
AR
207/// incoming queue from a given remote process
208const Ipc::OneToOneUniQueue &
209Ipc::FewToFewBiQueue::inQueue(const int remoteProcessId) const
210{
211 return oneToOneQueue(remoteGroup(), remoteProcessId,
212 theLocalGroup, theLocalProcessId);
213}
214
215/// outgoing queue to a given remote process
216const Ipc::OneToOneUniQueue &
217Ipc::FewToFewBiQueue::outQueue(const int remoteProcessId) const
218{
219 return oneToOneQueue(theLocalGroup, theLocalProcessId,
220 remoteGroup(), remoteProcessId);
221}
222
df881a0f
AR
223int
224Ipc::FewToFewBiQueue::readerIndex(const Group group, const int processId) const
225{
226 Must(validProcessId(group, processId));
227 return group == groupA ?
228 processId - metadata->theGroupAIdOffset :
229 metadata->theGroupASize + processId - metadata->theGroupBIdOffset;
230}
231
f5591061
DK
232Ipc::QueueReader &
233Ipc::FewToFewBiQueue::reader(const Group group, const int processId)
948230de 234{
df881a0f
AR
235 return readers->theReaders[readerIndex(group, processId)];
236}
237
238const Ipc::QueueReader &
239Ipc::FewToFewBiQueue::reader(const Group group, const int processId) const
240{
241 return readers->theReaders[readerIndex(group, processId)];
948230de 242}
fa61cefe
AR
243
244void
f5591061 245Ipc::FewToFewBiQueue::clearReaderSignal(const int remoteProcessId)
fa61cefe 246{
f5591061
DK
247 QueueReader &localReader = reader(theLocalGroup, theLocalProcessId);
248 debugs(54, 7, HERE << "reader: " << localReader.id);
fa61cefe 249
f5591061
DK
250 Must(validProcessId(remoteGroup(), remoteProcessId));
251 localReader.clearSignal();
fa61cefe
AR
252
253 // we got a hint; we could reposition iteration to try popping from the
f5591061 254 // remoteProcessId queue first; but it does not seem to help much and might
fa61cefe 255 // introduce some bias so we do not do that for now:
f5591061 256 // theLastPopProcessId = remoteProcessId;
68353d5a
DK
257}
258
df881a0f
AR
259Ipc::QueueReader::Balance &
260Ipc::FewToFewBiQueue::localBalance()
261{
262 QueueReader &r = reader(theLocalGroup, theLocalProcessId);
263 return r.balance;
264}
265
55939a01
AR
266const Ipc::QueueReader::Balance &
267Ipc::FewToFewBiQueue::balance(const int remoteProcessId) const
268{
269 const QueueReader &r = reader(remoteGroup(), remoteProcessId);
270 return r.balance;
271}
272
df881a0f
AR
273Ipc::QueueReader::Rate &
274Ipc::FewToFewBiQueue::localRateLimit()
275{
276 QueueReader &r = reader(theLocalGroup, theLocalProcessId);
277 return r.rateLimit;
55939a01
AR
278}
279
280const Ipc::QueueReader::Rate &
281Ipc::FewToFewBiQueue::rateLimit(const int remoteProcessId) const
282{
283 const QueueReader &r = reader(remoteGroup(), remoteProcessId);
284 return r.rateLimit;
df881a0f
AR
285}
286
f5591061 287Ipc::FewToFewBiQueue::Metadata::Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset):
9199139f
AR
288 theGroupASize(aGroupASize), theGroupAIdOffset(aGroupAIdOffset),
289 theGroupBSize(aGroupBSize), theGroupBIdOffset(aGroupBIdOffset)
f5591061
DK
290{
291 Must(theGroupASize > 0);
292 Must(theGroupBSize > 0);
293}
294
295Ipc::FewToFewBiQueue::Owner::Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity):
9199139f
AR
296 metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), groupASize, groupAIdOffset, groupBSize, groupBIdOffset)),
297 queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), groupASize*groupBSize*2, maxItemSize, capacity)),
298 readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), groupASize+groupBSize))
68353d5a 299{
68353d5a
DK
300}
301
f5591061 302Ipc::FewToFewBiQueue::Owner::~Owner()
68353d5a 303{
f5591061
DK
304 delete metadataOwner;
305 delete queuesOwner;
68353d5a 306 delete readersOwner;
fa61cefe 307}