]> git.ipfire.org Git - thirdparty/squid.git/blob - src/ipc/Queue.cc
merge from SslServerCertValidator r12332
[thirdparty/squid.git] / src / ipc / Queue.cc
1 /*
2 * DEBUG: section 54 Interprocess Communication
3 *
4 */
5
6 #include "squid.h"
7 #include "base/TextException.h"
8 #include "Debug.h"
9 #include "globals.h"
10 #include "ipc/Queue.h"
11
12 /// constructs Metadata ID from parent queue ID
13 static String
14 MetadataId(String id)
15 {
16 id.append("__metadata");
17 return id;
18 }
19
20 /// constructs one-to-one queues ID from parent queue ID
21 static String
22 QueuesId(String id)
23 {
24 id.append("__queues");
25 return id;
26 }
27
28 /// constructs QueueReaders ID from parent queue ID
29 static String
30 ReadersId(String id)
31 {
32 id.append("__readers");
33 return id;
34 }
35
36 /* QueueReader */
37
38 InstanceIdDefinitions(Ipc::QueueReader, "ipcQR");
39
40 Ipc::QueueReader::QueueReader(): popBlocked(1), popSignal(0),
41 rateLimit(0), balance(0)
42 {
43 debugs(54, 7, HERE << "constructed " << id);
44 }
45
46 /* QueueReaders */
47
48 Ipc::QueueReaders::QueueReaders(const int aCapacity): theCapacity(aCapacity),
49 theReaders(theCapacity)
50 {
51 Must(theCapacity > 0);
52 }
53
54 size_t
55 Ipc::QueueReaders::sharedMemorySize() const
56 {
57 return SharedMemorySize(theCapacity);
58 }
59
60 size_t
61 Ipc::QueueReaders::SharedMemorySize(const int capacity)
62 {
63 return sizeof(QueueReaders) + sizeof(QueueReader) * capacity;
64 }
65
66 // OneToOneUniQueue
67
68 Ipc::OneToOneUniQueue::OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity):
69 theIn(0), theOut(0), theSize(0), theMaxItemSize(aMaxItemSize),
70 theCapacity(aCapacity)
71 {
72 Must(theMaxItemSize > 0);
73 Must(theCapacity > 0);
74 }
75
76 int
77 Ipc::OneToOneUniQueue::Bytes2Items(const unsigned int maxItemSize, int size)
78 {
79 assert(maxItemSize > 0);
80 size -= sizeof(OneToOneUniQueue);
81 return size >= 0 ? size / maxItemSize : 0;
82 }
83
84 int
85 Ipc::OneToOneUniQueue::Items2Bytes(const unsigned int maxItemSize, const int size)
86 {
87 assert(size >= 0);
88 return sizeof(OneToOneUniQueue) + maxItemSize * size;
89 }
90
91 /* OneToOneUniQueues */
92
93 Ipc::OneToOneUniQueues::OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity): theCapacity(aCapacity)
94 {
95 Must(theCapacity > 0);
96 for (int i = 0; i < theCapacity; ++i)
97 new (&(*this)[i]) OneToOneUniQueue(maxItemSize, queueCapacity);
98 }
99
100 size_t
101 Ipc::OneToOneUniQueues::sharedMemorySize() const
102 {
103 return sizeof(*this) + theCapacity * front().sharedMemorySize();
104 }
105
106 size_t
107 Ipc::OneToOneUniQueues::SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity)
108 {
109 const int queueSize =
110 OneToOneUniQueue::Items2Bytes(maxItemSize, queueCapacity);
111 return sizeof(OneToOneUniQueues) + queueSize * capacity;
112 }
113
114 const Ipc::OneToOneUniQueue &
115 Ipc::OneToOneUniQueues::operator [](const int index) const
116 {
117 Must(0 <= index && index < theCapacity);
118 const size_t queueSize = index ? front().sharedMemorySize() : 0;
119 const char *const queue =
120 reinterpret_cast<const char *>(this) + sizeof(*this) + index * queueSize;
121 return *reinterpret_cast<const OneToOneUniQueue *>(queue);
122 }
123
124 // FewToFewBiQueue
125
126 Ipc::FewToFewBiQueue::Owner *
127 Ipc::FewToFewBiQueue::Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity)
128 {
129 return new Owner(id, groupASize, groupAIdOffset, groupBSize, groupBIdOffset, maxItemSize, capacity);
130 }
131
132 Ipc::FewToFewBiQueue::FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId):
133 metadata(shm_old(Metadata)(MetadataId(id).termedBuf())),
134 queues(shm_old(OneToOneUniQueues)(QueuesId(id).termedBuf())),
135 readers(shm_old(QueueReaders)(ReadersId(id).termedBuf())),
136 theLocalGroup(aLocalGroup), theLocalProcessId(aLocalProcessId),
137 theLastPopProcessId(readers->theCapacity)
138 {
139 Must(queues->theCapacity == metadata->theGroupASize * metadata->theGroupBSize * 2);
140 Must(readers->theCapacity == metadata->theGroupASize + metadata->theGroupBSize);
141
142 const QueueReader &localReader = reader(theLocalGroup, theLocalProcessId);
143 debugs(54, 7, HERE << "queue " << id << " reader: " << localReader.id);
144 }
145
146 int
147 Ipc::FewToFewBiQueue::MaxItemsCount(const int groupASize, const int groupBSize, const int capacity)
148 {
149 return capacity * groupASize * groupBSize * 2;
150 }
151
152 bool
153 Ipc::FewToFewBiQueue::validProcessId(const Group group, const int processId) const
154 {
155 switch (group) {
156 case groupA:
157 return metadata->theGroupAIdOffset <= processId &&
158 processId < metadata->theGroupAIdOffset + metadata->theGroupASize;
159 case groupB:
160 return metadata->theGroupBIdOffset <= processId &&
161 processId < metadata->theGroupBIdOffset + metadata->theGroupBSize;
162 }
163 return false;
164 }
165
166 int
167 Ipc::FewToFewBiQueue::oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
168 {
169 Must(fromGroup != toGroup);
170 assert(validProcessId(fromGroup, fromProcessId));
171 assert(validProcessId(toGroup, toProcessId));
172 int index1;
173 int index2;
174 int offset;
175 if (fromGroup == groupA) {
176 index1 = fromProcessId - metadata->theGroupAIdOffset;
177 index2 = toProcessId - metadata->theGroupBIdOffset;
178 offset = 0;
179 } else {
180 index1 = toProcessId - metadata->theGroupAIdOffset;
181 index2 = fromProcessId - metadata->theGroupBIdOffset;
182 offset = metadata->theGroupASize * metadata->theGroupBSize;
183 }
184 const int index = offset + index1 * metadata->theGroupBSize + index2;
185 return index;
186 }
187
188 Ipc::OneToOneUniQueue &
189 Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId)
190 {
191 return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)];
192 }
193
194 const Ipc::OneToOneUniQueue &
195 Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
196 {
197 return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)];
198 }
199
200 /// incoming queue from a given remote process
201 const Ipc::OneToOneUniQueue &
202 Ipc::FewToFewBiQueue::inQueue(const int remoteProcessId) const
203 {
204 return oneToOneQueue(remoteGroup(), remoteProcessId,
205 theLocalGroup, theLocalProcessId);
206 }
207
208 /// outgoing queue to a given remote process
209 const Ipc::OneToOneUniQueue &
210 Ipc::FewToFewBiQueue::outQueue(const int remoteProcessId) const
211 {
212 return oneToOneQueue(theLocalGroup, theLocalProcessId,
213 remoteGroup(), remoteProcessId);
214 }
215
216 int
217 Ipc::FewToFewBiQueue::readerIndex(const Group group, const int processId) const
218 {
219 Must(validProcessId(group, processId));
220 return group == groupA ?
221 processId - metadata->theGroupAIdOffset :
222 metadata->theGroupASize + processId - metadata->theGroupBIdOffset;
223 }
224
225 Ipc::QueueReader &
226 Ipc::FewToFewBiQueue::reader(const Group group, const int processId)
227 {
228 return readers->theReaders[readerIndex(group, processId)];
229 }
230
231 const Ipc::QueueReader &
232 Ipc::FewToFewBiQueue::reader(const Group group, const int processId) const
233 {
234 return readers->theReaders[readerIndex(group, processId)];
235 }
236
237 void
238 Ipc::FewToFewBiQueue::clearReaderSignal(const int remoteProcessId)
239 {
240 QueueReader &localReader = reader(theLocalGroup, theLocalProcessId);
241 debugs(54, 7, HERE << "reader: " << localReader.id);
242
243 Must(validProcessId(remoteGroup(), remoteProcessId));
244 localReader.clearSignal();
245
246 // we got a hint; we could reposition iteration to try popping from the
247 // remoteProcessId queue first; but it does not seem to help much and might
248 // introduce some bias so we do not do that for now:
249 // theLastPopProcessId = remoteProcessId;
250 }
251
252 Ipc::QueueReader::Balance &
253 Ipc::FewToFewBiQueue::localBalance()
254 {
255 QueueReader &r = reader(theLocalGroup, theLocalProcessId);
256 return r.balance;
257 }
258
259 const Ipc::QueueReader::Balance &
260 Ipc::FewToFewBiQueue::balance(const int remoteProcessId) const
261 {
262 const QueueReader &r = reader(remoteGroup(), remoteProcessId);
263 return r.balance;
264 }
265
266 Ipc::QueueReader::Rate &
267 Ipc::FewToFewBiQueue::localRateLimit()
268 {
269 QueueReader &r = reader(theLocalGroup, theLocalProcessId);
270 return r.rateLimit;
271 }
272
273 const Ipc::QueueReader::Rate &
274 Ipc::FewToFewBiQueue::rateLimit(const int remoteProcessId) const
275 {
276 const QueueReader &r = reader(remoteGroup(), remoteProcessId);
277 return r.rateLimit;
278 }
279
280 Ipc::FewToFewBiQueue::Metadata::Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset):
281 theGroupASize(aGroupASize), theGroupAIdOffset(aGroupAIdOffset),
282 theGroupBSize(aGroupBSize), theGroupBIdOffset(aGroupBIdOffset)
283 {
284 Must(theGroupASize > 0);
285 Must(theGroupBSize > 0);
286 }
287
288 Ipc::FewToFewBiQueue::Owner::Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity):
289 metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), groupASize, groupAIdOffset, groupBSize, groupBIdOffset)),
290 queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), groupASize*groupBSize*2, maxItemSize, capacity)),
291 readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), groupASize+groupBSize))
292 {
293 }
294
295 Ipc::FewToFewBiQueue::Owner::~Owner()
296 {
297 delete metadataOwner;
298 delete queuesOwner;
299 delete readersOwner;
300 }