]> git.ipfire.org Git - thirdparty/squid.git/blame - src/ipc/Queue.cc
Renamed squid.h to squid-old.h and config.h to squid.h
[thirdparty/squid.git] / src / ipc / Queue.cc
CommitLineData
948230de
DK
1/*
2 * $Id$
3 *
4 * DEBUG: section 54 Interprocess Communication
5 *
6 */
7
f7f3304a 8#include "squid.h"
fa61cefe
AR
9#include "base/TextException.h"
10#include "Debug.h"
11#include "globals.h"
948230de
DK
12#include "ipc/Queue.h"
13
f5591061 14/// constructs Metadata ID from parent queue ID
948230de 15static String
f5591061 16MetadataId(String id)
948230de 17{
f5591061 18 id.append("__metadata");
948230de
DK
19 return id;
20}
21
f5591061 22/// constructs one-to-one queues ID from parent queue ID
fa61cefe 23static String
f5591061
DK
24QueuesId(String id)
25{
26 id.append("__queues");
27 return id;
28}
29
30/// constructs QueueReaders ID from parent queue ID
31static String
32ReadersId(String id)
fa61cefe
AR
33{
34 id.append("__readers");
35 return id;
36}
37
38
39/* QueueReader */
40
15cdbc7c 41InstanceIdDefinitions(Ipc::QueueReader, "ipcQR");
fa61cefe 42
df881a0f
AR
43Ipc::QueueReader::QueueReader(): popBlocked(1), popSignal(0),
44 rateLimit(0), balance(0)
fa61cefe
AR
45{
46 debugs(54, 7, HERE << "constructed " << id);
47}
48
68353d5a
DK
49/* QueueReaders */
50
15cdbc7c 51Ipc::QueueReaders::QueueReaders(const int aCapacity): theCapacity(aCapacity)
68353d5a
DK
52{
53 Must(theCapacity > 0);
54 new (theReaders) QueueReader[theCapacity];
55}
56
57size_t
15cdbc7c 58Ipc::QueueReaders::sharedMemorySize() const
68353d5a
DK
59{
60 return SharedMemorySize(theCapacity);
61}
62
63size_t
15cdbc7c 64Ipc::QueueReaders::SharedMemorySize(const int capacity)
68353d5a
DK
65{
66 return sizeof(QueueReaders) + sizeof(QueueReader) * capacity;
67}
68
948230de
DK
69
70// OneToOneUniQueue
71
f5591061 72Ipc::OneToOneUniQueue::OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity):
9199139f
AR
73 theIn(0), theOut(0), theSize(0), theMaxItemSize(aMaxItemSize),
74 theCapacity(aCapacity)
fa61cefe 75{
f5591061
DK
76 Must(theMaxItemSize > 0);
77 Must(theCapacity > 0);
fa61cefe
AR
78}
79
948230de 80int
15cdbc7c 81Ipc::OneToOneUniQueue::Bytes2Items(const unsigned int maxItemSize, int size)
948230de
DK
82{
83 assert(maxItemSize > 0);
f5591061 84 size -= sizeof(OneToOneUniQueue);
948230de
DK
85 return size >= 0 ? size / maxItemSize : 0;
86}
87
88int
15cdbc7c 89Ipc::OneToOneUniQueue::Items2Bytes(const unsigned int maxItemSize, const int size)
948230de
DK
90{
91 assert(size >= 0);
f5591061 92 return sizeof(OneToOneUniQueue) + maxItemSize * size;
948230de
DK
93}
94
68353d5a 95
f5591061 96/* OneToOneUniQueues */
948230de 97
f5591061 98Ipc::OneToOneUniQueues::OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity): theCapacity(aCapacity)
fa61cefe 99{
f5591061
DK
100 Must(theCapacity > 0);
101 for (int i = 0; i < theCapacity; ++i)
102 new (&(*this)[i]) OneToOneUniQueue(maxItemSize, queueCapacity);
68353d5a
DK
103}
104
105size_t
f5591061 106Ipc::OneToOneUniQueues::sharedMemorySize() const
68353d5a 107{
f5591061 108 return sizeof(*this) + theCapacity * front().sharedMemorySize();
fa61cefe
AR
109}
110
f5591061
DK
111size_t
112Ipc::OneToOneUniQueues::SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity)
fa61cefe 113{
f5591061
DK
114 const int queueSize =
115 OneToOneUniQueue::Items2Bytes(maxItemSize, queueCapacity);
116 return sizeof(OneToOneUniQueues) + queueSize * capacity;
fa61cefe
AR
117}
118
f5591061
DK
119const Ipc::OneToOneUniQueue &
120Ipc::OneToOneUniQueues::operator [](const int index) const
fa61cefe 121{
f5591061
DK
122 Must(0 <= index && index < theCapacity);
123 const size_t queueSize = index ? front().sharedMemorySize() : 0;
124 const char *const queue =
125 reinterpret_cast<const char *>(this) + sizeof(*this) + index * queueSize;
126 return *reinterpret_cast<const OneToOneUniQueue *>(queue);
fa61cefe
AR
127}
128
948230de 129
f5591061 130// FewToFewBiQueue
948230de 131
f5591061
DK
132Ipc::FewToFewBiQueue::Owner *
133Ipc::FewToFewBiQueue::Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity)
68353d5a 134{
f5591061 135 return new Owner(id, groupASize, groupAIdOffset, groupBSize, groupBIdOffset, maxItemSize, capacity);
68353d5a
DK
136}
137
f5591061 138Ipc::FewToFewBiQueue::FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId):
9199139f
AR
139 metadata(shm_old(Metadata)(MetadataId(id).termedBuf())),
140 queues(shm_old(OneToOneUniQueues)(QueuesId(id).termedBuf())),
141 readers(shm_old(QueueReaders)(ReadersId(id).termedBuf())),
142 theLocalGroup(aLocalGroup), theLocalProcessId(aLocalProcessId),
143 theLastPopProcessId(readers->theCapacity)
948230de 144{
f5591061
DK
145 Must(queues->theCapacity == metadata->theGroupASize * metadata->theGroupBSize * 2);
146 Must(readers->theCapacity == metadata->theGroupASize + metadata->theGroupBSize);
68353d5a 147
f5591061
DK
148 const QueueReader &localReader = reader(theLocalGroup, theLocalProcessId);
149 debugs(54, 7, HERE << "queue " << id << " reader: " << localReader.id);
948230de
DK
150}
151
ea2cdeb6
DK
152int
153Ipc::FewToFewBiQueue::MaxItemsCount(const int groupASize, const int groupBSize, const int capacity)
154{
155 return capacity * groupASize * groupBSize * 2;
156}
157
f5591061
DK
158bool
159Ipc::FewToFewBiQueue::validProcessId(const Group group, const int processId) const
160{
161 switch (group) {
162 case groupA:
163 return metadata->theGroupAIdOffset <= processId &&
9199139f 164 processId < metadata->theGroupAIdOffset + metadata->theGroupASize;
f5591061
DK
165 case groupB:
166 return metadata->theGroupBIdOffset <= processId &&
9199139f 167 processId < metadata->theGroupBIdOffset + metadata->theGroupBSize;
f5591061
DK
168 }
169 return false;
170}
171
0a11e039
AR
172int
173Ipc::FewToFewBiQueue::oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
f5591061
DK
174{
175 Must(fromGroup != toGroup);
0a11e039
AR
176 assert(validProcessId(fromGroup, fromProcessId));
177 assert(validProcessId(toGroup, toProcessId));
f5591061
DK
178 int index1;
179 int index2;
180 int offset;
181 if (fromGroup == groupA) {
182 index1 = fromProcessId - metadata->theGroupAIdOffset;
183 index2 = toProcessId - metadata->theGroupBIdOffset;
184 offset = 0;
185 } else {
186 index1 = toProcessId - metadata->theGroupAIdOffset;
187 index2 = fromProcessId - metadata->theGroupBIdOffset;
188 offset = metadata->theGroupASize * metadata->theGroupBSize;
189 }
190 const int index = offset + index1 * metadata->theGroupBSize + index2;
0a11e039
AR
191 return index;
192}
193
194Ipc::OneToOneUniQueue &
195Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId)
196{
197 return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)];
198}
199
200const Ipc::OneToOneUniQueue &
201Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
202{
203 return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)];
948230de
DK
204}
205
55939a01
AR
206/// incoming queue from a given remote process
207const Ipc::OneToOneUniQueue &
208Ipc::FewToFewBiQueue::inQueue(const int remoteProcessId) const
209{
210 return oneToOneQueue(remoteGroup(), remoteProcessId,
211 theLocalGroup, theLocalProcessId);
212}
213
214/// outgoing queue to a given remote process
215const Ipc::OneToOneUniQueue &
216Ipc::FewToFewBiQueue::outQueue(const int remoteProcessId) const
217{
218 return oneToOneQueue(theLocalGroup, theLocalProcessId,
219 remoteGroup(), remoteProcessId);
220}
221
df881a0f
AR
222int
223Ipc::FewToFewBiQueue::readerIndex(const Group group, const int processId) const
224{
225 Must(validProcessId(group, processId));
226 return group == groupA ?
227 processId - metadata->theGroupAIdOffset :
228 metadata->theGroupASize + processId - metadata->theGroupBIdOffset;
229}
230
f5591061
DK
231Ipc::QueueReader &
232Ipc::FewToFewBiQueue::reader(const Group group, const int processId)
948230de 233{
df881a0f
AR
234 return readers->theReaders[readerIndex(group, processId)];
235}
236
237const Ipc::QueueReader &
238Ipc::FewToFewBiQueue::reader(const Group group, const int processId) const
239{
240 return readers->theReaders[readerIndex(group, processId)];
948230de 241}
fa61cefe
AR
242
243void
f5591061 244Ipc::FewToFewBiQueue::clearReaderSignal(const int remoteProcessId)
fa61cefe 245{
f5591061
DK
246 QueueReader &localReader = reader(theLocalGroup, theLocalProcessId);
247 debugs(54, 7, HERE << "reader: " << localReader.id);
fa61cefe 248
f5591061
DK
249 Must(validProcessId(remoteGroup(), remoteProcessId));
250 localReader.clearSignal();
fa61cefe
AR
251
252 // we got a hint; we could reposition iteration to try popping from the
f5591061 253 // remoteProcessId queue first; but it does not seem to help much and might
fa61cefe 254 // introduce some bias so we do not do that for now:
f5591061 255 // theLastPopProcessId = remoteProcessId;
68353d5a
DK
256}
257
df881a0f
AR
258Ipc::QueueReader::Balance &
259Ipc::FewToFewBiQueue::localBalance()
260{
261 QueueReader &r = reader(theLocalGroup, theLocalProcessId);
262 return r.balance;
263}
264
55939a01
AR
265const Ipc::QueueReader::Balance &
266Ipc::FewToFewBiQueue::balance(const int remoteProcessId) const
267{
268 const QueueReader &r = reader(remoteGroup(), remoteProcessId);
269 return r.balance;
270}
271
df881a0f
AR
272Ipc::QueueReader::Rate &
273Ipc::FewToFewBiQueue::localRateLimit()
274{
275 QueueReader &r = reader(theLocalGroup, theLocalProcessId);
276 return r.rateLimit;
55939a01
AR
277}
278
279const Ipc::QueueReader::Rate &
280Ipc::FewToFewBiQueue::rateLimit(const int remoteProcessId) const
281{
282 const QueueReader &r = reader(remoteGroup(), remoteProcessId);
283 return r.rateLimit;
df881a0f
AR
284}
285
f5591061 286Ipc::FewToFewBiQueue::Metadata::Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset):
9199139f
AR
287 theGroupASize(aGroupASize), theGroupAIdOffset(aGroupAIdOffset),
288 theGroupBSize(aGroupBSize), theGroupBIdOffset(aGroupBIdOffset)
f5591061
DK
289{
290 Must(theGroupASize > 0);
291 Must(theGroupBSize > 0);
292}
293
294Ipc::FewToFewBiQueue::Owner::Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity):
9199139f
AR
295 metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), groupASize, groupAIdOffset, groupBSize, groupBIdOffset)),
296 queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), groupASize*groupBSize*2, maxItemSize, capacity)),
297 readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), groupASize+groupBSize))
68353d5a 298{
68353d5a
DK
299}
300
f5591061 301Ipc::FewToFewBiQueue::Owner::~Owner()
68353d5a 302{
f5591061
DK
303 delete metadataOwner;
304 delete queuesOwner;
68353d5a 305 delete readersOwner;
fa61cefe 306}