]> git.ipfire.org Git - thirdparty/squid.git/blob - src/ipc/Queue.cc
Improve support for clang compilers
[thirdparty/squid.git] / src / ipc / Queue.cc
1 /*
2 * $Id$
3 *
4 * DEBUG: section 54 Interprocess Communication
5 *
6 */
7
8 #include "squid.h"
9 #include "base/TextException.h"
10 #include "Debug.h"
11 #include "globals.h"
12 #include "ipc/Queue.h"
13
14 /// constructs Metadata ID from parent queue ID
15 static String
16 MetadataId(String id)
17 {
18 id.append("__metadata");
19 return id;
20 }
21
22 /// constructs one-to-one queues ID from parent queue ID
23 static String
24 QueuesId(String id)
25 {
26 id.append("__queues");
27 return id;
28 }
29
30 /// constructs QueueReaders ID from parent queue ID
31 static String
32 ReadersId(String id)
33 {
34 id.append("__readers");
35 return id;
36 }
37
38
39 /* QueueReader */
40
41 InstanceIdDefinitions(Ipc::QueueReader, "ipcQR");
42
43 Ipc::QueueReader::QueueReader(): popBlocked(1), popSignal(0),
44 rateLimit(0), balance(0)
45 {
46 debugs(54, 7, HERE << "constructed " << id);
47 }
48
49 /* QueueReaders */
50
51 Ipc::QueueReaders::QueueReaders(const int aCapacity): theCapacity(aCapacity)
52 {
53 Must(theCapacity > 0);
54 theReaders=new QueueReader[theCapacity];
55 }
56
57 Ipc::QueueReaders::~QueueReaders()
58 {
59 delete[] theReaders;
60 }
61
62 size_t
63 Ipc::QueueReaders::sharedMemorySize() const
64 {
65 return SharedMemorySize(theCapacity);
66 }
67
68 size_t
69 Ipc::QueueReaders::SharedMemorySize(const int capacity)
70 {
71 return sizeof(QueueReaders) + sizeof(QueueReader) * capacity;
72 }
73
74
75 // OneToOneUniQueue
76
77 Ipc::OneToOneUniQueue::OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity):
78 theIn(0), theOut(0), theSize(0), theMaxItemSize(aMaxItemSize),
79 theCapacity(aCapacity)
80 {
81 Must(theMaxItemSize > 0);
82 Must(theCapacity > 0);
83 }
84
85 int
86 Ipc::OneToOneUniQueue::Bytes2Items(const unsigned int maxItemSize, int size)
87 {
88 assert(maxItemSize > 0);
89 size -= sizeof(OneToOneUniQueue);
90 return size >= 0 ? size / maxItemSize : 0;
91 }
92
93 int
94 Ipc::OneToOneUniQueue::Items2Bytes(const unsigned int maxItemSize, const int size)
95 {
96 assert(size >= 0);
97 return sizeof(OneToOneUniQueue) + maxItemSize * size;
98 }
99
100
101 /* OneToOneUniQueues */
102
103 Ipc::OneToOneUniQueues::OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity): theCapacity(aCapacity)
104 {
105 Must(theCapacity > 0);
106 for (int i = 0; i < theCapacity; ++i)
107 new (&(*this)[i]) OneToOneUniQueue(maxItemSize, queueCapacity);
108 }
109
110 size_t
111 Ipc::OneToOneUniQueues::sharedMemorySize() const
112 {
113 return sizeof(*this) + theCapacity * front().sharedMemorySize();
114 }
115
116 size_t
117 Ipc::OneToOneUniQueues::SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity)
118 {
119 const int queueSize =
120 OneToOneUniQueue::Items2Bytes(maxItemSize, queueCapacity);
121 return sizeof(OneToOneUniQueues) + queueSize * capacity;
122 }
123
124 const Ipc::OneToOneUniQueue &
125 Ipc::OneToOneUniQueues::operator [](const int index) const
126 {
127 Must(0 <= index && index < theCapacity);
128 const size_t queueSize = index ? front().sharedMemorySize() : 0;
129 const char *const queue =
130 reinterpret_cast<const char *>(this) + sizeof(*this) + index * queueSize;
131 return *reinterpret_cast<const OneToOneUniQueue *>(queue);
132 }
133
134
135 // FewToFewBiQueue
136
137 Ipc::FewToFewBiQueue::Owner *
138 Ipc::FewToFewBiQueue::Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity)
139 {
140 return new Owner(id, groupASize, groupAIdOffset, groupBSize, groupBIdOffset, maxItemSize, capacity);
141 }
142
143 Ipc::FewToFewBiQueue::FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId):
144 metadata(shm_old(Metadata)(MetadataId(id).termedBuf())),
145 queues(shm_old(OneToOneUniQueues)(QueuesId(id).termedBuf())),
146 readers(shm_old(QueueReaders)(ReadersId(id).termedBuf())),
147 theLocalGroup(aLocalGroup), theLocalProcessId(aLocalProcessId),
148 theLastPopProcessId(readers->theCapacity)
149 {
150 Must(queues->theCapacity == metadata->theGroupASize * metadata->theGroupBSize * 2);
151 Must(readers->theCapacity == metadata->theGroupASize + metadata->theGroupBSize);
152
153 const QueueReader &localReader = reader(theLocalGroup, theLocalProcessId);
154 debugs(54, 7, HERE << "queue " << id << " reader: " << localReader.id);
155 }
156
157 int
158 Ipc::FewToFewBiQueue::MaxItemsCount(const int groupASize, const int groupBSize, const int capacity)
159 {
160 return capacity * groupASize * groupBSize * 2;
161 }
162
163 bool
164 Ipc::FewToFewBiQueue::validProcessId(const Group group, const int processId) const
165 {
166 switch (group) {
167 case groupA:
168 return metadata->theGroupAIdOffset <= processId &&
169 processId < metadata->theGroupAIdOffset + metadata->theGroupASize;
170 case groupB:
171 return metadata->theGroupBIdOffset <= processId &&
172 processId < metadata->theGroupBIdOffset + metadata->theGroupBSize;
173 }
174 return false;
175 }
176
177 int
178 Ipc::FewToFewBiQueue::oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
179 {
180 Must(fromGroup != toGroup);
181 assert(validProcessId(fromGroup, fromProcessId));
182 assert(validProcessId(toGroup, toProcessId));
183 int index1;
184 int index2;
185 int offset;
186 if (fromGroup == groupA) {
187 index1 = fromProcessId - metadata->theGroupAIdOffset;
188 index2 = toProcessId - metadata->theGroupBIdOffset;
189 offset = 0;
190 } else {
191 index1 = toProcessId - metadata->theGroupAIdOffset;
192 index2 = fromProcessId - metadata->theGroupBIdOffset;
193 offset = metadata->theGroupASize * metadata->theGroupBSize;
194 }
195 const int index = offset + index1 * metadata->theGroupBSize + index2;
196 return index;
197 }
198
199 Ipc::OneToOneUniQueue &
200 Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId)
201 {
202 return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)];
203 }
204
205 const Ipc::OneToOneUniQueue &
206 Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
207 {
208 return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)];
209 }
210
211 /// incoming queue from a given remote process
212 const Ipc::OneToOneUniQueue &
213 Ipc::FewToFewBiQueue::inQueue(const int remoteProcessId) const
214 {
215 return oneToOneQueue(remoteGroup(), remoteProcessId,
216 theLocalGroup, theLocalProcessId);
217 }
218
219 /// outgoing queue to a given remote process
220 const Ipc::OneToOneUniQueue &
221 Ipc::FewToFewBiQueue::outQueue(const int remoteProcessId) const
222 {
223 return oneToOneQueue(theLocalGroup, theLocalProcessId,
224 remoteGroup(), remoteProcessId);
225 }
226
227 int
228 Ipc::FewToFewBiQueue::readerIndex(const Group group, const int processId) const
229 {
230 Must(validProcessId(group, processId));
231 return group == groupA ?
232 processId - metadata->theGroupAIdOffset :
233 metadata->theGroupASize + processId - metadata->theGroupBIdOffset;
234 }
235
236 Ipc::QueueReader &
237 Ipc::FewToFewBiQueue::reader(const Group group, const int processId)
238 {
239 return readers->theReaders[readerIndex(group, processId)];
240 }
241
242 const Ipc::QueueReader &
243 Ipc::FewToFewBiQueue::reader(const Group group, const int processId) const
244 {
245 return readers->theReaders[readerIndex(group, processId)];
246 }
247
248 void
249 Ipc::FewToFewBiQueue::clearReaderSignal(const int remoteProcessId)
250 {
251 QueueReader &localReader = reader(theLocalGroup, theLocalProcessId);
252 debugs(54, 7, HERE << "reader: " << localReader.id);
253
254 Must(validProcessId(remoteGroup(), remoteProcessId));
255 localReader.clearSignal();
256
257 // we got a hint; we could reposition iteration to try popping from the
258 // remoteProcessId queue first; but it does not seem to help much and might
259 // introduce some bias so we do not do that for now:
260 // theLastPopProcessId = remoteProcessId;
261 }
262
263 Ipc::QueueReader::Balance &
264 Ipc::FewToFewBiQueue::localBalance()
265 {
266 QueueReader &r = reader(theLocalGroup, theLocalProcessId);
267 return r.balance;
268 }
269
270 const Ipc::QueueReader::Balance &
271 Ipc::FewToFewBiQueue::balance(const int remoteProcessId) const
272 {
273 const QueueReader &r = reader(remoteGroup(), remoteProcessId);
274 return r.balance;
275 }
276
277 Ipc::QueueReader::Rate &
278 Ipc::FewToFewBiQueue::localRateLimit()
279 {
280 QueueReader &r = reader(theLocalGroup, theLocalProcessId);
281 return r.rateLimit;
282 }
283
284 const Ipc::QueueReader::Rate &
285 Ipc::FewToFewBiQueue::rateLimit(const int remoteProcessId) const
286 {
287 const QueueReader &r = reader(remoteGroup(), remoteProcessId);
288 return r.rateLimit;
289 }
290
291 Ipc::FewToFewBiQueue::Metadata::Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset):
292 theGroupASize(aGroupASize), theGroupAIdOffset(aGroupAIdOffset),
293 theGroupBSize(aGroupBSize), theGroupBIdOffset(aGroupBIdOffset)
294 {
295 Must(theGroupASize > 0);
296 Must(theGroupBSize > 0);
297 }
298
299 Ipc::FewToFewBiQueue::Owner::Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity):
300 metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), groupASize, groupAIdOffset, groupBSize, groupBIdOffset)),
301 queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), groupASize*groupBSize*2, maxItemSize, capacity)),
302 readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), groupASize+groupBSize))
303 {
304 }
305
306 Ipc::FewToFewBiQueue::Owner::~Owner()
307 {
308 delete metadataOwner;
309 delete queuesOwner;
310 delete readersOwner;
311 }