]> git.ipfire.org Git - thirdparty/squid.git/blob - src/ipc/Queue.cc
Optimization: Make read requests in [Rock] IpcIo bypass max-swap-rate limit.
[thirdparty/squid.git] / src / ipc / Queue.cc
1 /*
2 * $Id$
3 *
4 * DEBUG: section 54 Interprocess Communication
5 *
6 */
7
8 #include "config.h"
9 #include "base/TextException.h"
10 #include "Debug.h"
11 #include "globals.h"
12 #include "ipc/Queue.h"
13
14 /// constructs Metadata ID from parent queue ID
15 static String
16 MetadataId(String id)
17 {
18 id.append("__metadata");
19 return id;
20 }
21
22 /// constructs one-to-one queues ID from parent queue ID
23 static String
24 QueuesId(String id)
25 {
26 id.append("__queues");
27 return id;
28 }
29
30 /// constructs QueueReaders ID from parent queue ID
31 static String
32 ReadersId(String id)
33 {
34 id.append("__readers");
35 return id;
36 }
37
38
39 /* QueueReader */
40
41 InstanceIdDefinitions(Ipc::QueueReader, "ipcQR");
42
43 Ipc::QueueReader::QueueReader(): popBlocked(1), popSignal(0),
44 rateLimit(0), balance(0)
45 {
46 debugs(54, 7, HERE << "constructed " << id);
47 }
48
49 /* QueueReaders */
50
51 Ipc::QueueReaders::QueueReaders(const int aCapacity): theCapacity(aCapacity)
52 {
53 Must(theCapacity > 0);
54 new (theReaders) QueueReader[theCapacity];
55 }
56
57 size_t
58 Ipc::QueueReaders::sharedMemorySize() const
59 {
60 return SharedMemorySize(theCapacity);
61 }
62
63 size_t
64 Ipc::QueueReaders::SharedMemorySize(const int capacity)
65 {
66 return sizeof(QueueReaders) + sizeof(QueueReader) * capacity;
67 }
68
69
70 // OneToOneUniQueue
71
72 Ipc::OneToOneUniQueue::OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity):
73 theIn(0), theOut(0), theSize(0), theMaxItemSize(aMaxItemSize),
74 theCapacity(aCapacity)
75 {
76 Must(theMaxItemSize > 0);
77 Must(theCapacity > 0);
78 }
79
80 int
81 Ipc::OneToOneUniQueue::Bytes2Items(const unsigned int maxItemSize, int size)
82 {
83 assert(maxItemSize > 0);
84 size -= sizeof(OneToOneUniQueue);
85 return size >= 0 ? size / maxItemSize : 0;
86 }
87
88 int
89 Ipc::OneToOneUniQueue::Items2Bytes(const unsigned int maxItemSize, const int size)
90 {
91 assert(size >= 0);
92 return sizeof(OneToOneUniQueue) + maxItemSize * size;
93 }
94
95
96 /* OneToOneUniQueues */
97
98 Ipc::OneToOneUniQueues::OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity): theCapacity(aCapacity)
99 {
100 Must(theCapacity > 0);
101 for (int i = 0; i < theCapacity; ++i)
102 new (&(*this)[i]) OneToOneUniQueue(maxItemSize, queueCapacity);
103 }
104
105 size_t
106 Ipc::OneToOneUniQueues::sharedMemorySize() const
107 {
108 return sizeof(*this) + theCapacity * front().sharedMemorySize();
109 }
110
111 size_t
112 Ipc::OneToOneUniQueues::SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity)
113 {
114 const int queueSize =
115 OneToOneUniQueue::Items2Bytes(maxItemSize, queueCapacity);
116 return sizeof(OneToOneUniQueues) + queueSize * capacity;
117 }
118
119 const Ipc::OneToOneUniQueue &
120 Ipc::OneToOneUniQueues::operator [](const int index) const
121 {
122 Must(0 <= index && index < theCapacity);
123 const size_t queueSize = index ? front().sharedMemorySize() : 0;
124 const char *const queue =
125 reinterpret_cast<const char *>(this) + sizeof(*this) + index * queueSize;
126 return *reinterpret_cast<const OneToOneUniQueue *>(queue);
127 }
128
129
130 // FewToFewBiQueue
131
132 Ipc::FewToFewBiQueue::Owner *
133 Ipc::FewToFewBiQueue::Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity)
134 {
135 return new Owner(id, groupASize, groupAIdOffset, groupBSize, groupBIdOffset, maxItemSize, capacity);
136 }
137
138 Ipc::FewToFewBiQueue::FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId):
139 metadata(shm_old(Metadata)(MetadataId(id).termedBuf())),
140 queues(shm_old(OneToOneUniQueues)(QueuesId(id).termedBuf())),
141 readers(shm_old(QueueReaders)(ReadersId(id).termedBuf())),
142 theLocalGroup(aLocalGroup), theLocalProcessId(aLocalProcessId),
143 theLastPopProcessId(readers->theCapacity)
144 {
145 Must(queues->theCapacity == metadata->theGroupASize * metadata->theGroupBSize * 2);
146 Must(readers->theCapacity == metadata->theGroupASize + metadata->theGroupBSize);
147
148 const QueueReader &localReader = reader(theLocalGroup, theLocalProcessId);
149 debugs(54, 7, HERE << "queue " << id << " reader: " << localReader.id);
150 }
151
152 int
153 Ipc::FewToFewBiQueue::MaxItemsCount(const int groupASize, const int groupBSize, const int capacity)
154 {
155 return capacity * groupASize * groupBSize * 2;
156 }
157
158 bool
159 Ipc::FewToFewBiQueue::validProcessId(const Group group, const int processId) const
160 {
161 switch (group) {
162 case groupA:
163 return metadata->theGroupAIdOffset <= processId &&
164 processId < metadata->theGroupAIdOffset + metadata->theGroupASize;
165 case groupB:
166 return metadata->theGroupBIdOffset <= processId &&
167 processId < metadata->theGroupBIdOffset + metadata->theGroupBSize;
168 }
169 return false;
170 }
171
172 int
173 Ipc::FewToFewBiQueue::oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
174 {
175 Must(fromGroup != toGroup);
176 assert(validProcessId(fromGroup, fromProcessId));
177 assert(validProcessId(toGroup, toProcessId));
178 int index1;
179 int index2;
180 int offset;
181 if (fromGroup == groupA) {
182 index1 = fromProcessId - metadata->theGroupAIdOffset;
183 index2 = toProcessId - metadata->theGroupBIdOffset;
184 offset = 0;
185 } else {
186 index1 = toProcessId - metadata->theGroupAIdOffset;
187 index2 = fromProcessId - metadata->theGroupBIdOffset;
188 offset = metadata->theGroupASize * metadata->theGroupBSize;
189 }
190 const int index = offset + index1 * metadata->theGroupBSize + index2;
191 return index;
192 }
193
194 Ipc::OneToOneUniQueue &
195 Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId)
196 {
197 return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)];
198 }
199
200 const Ipc::OneToOneUniQueue &
201 Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
202 {
203 return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)];
204 }
205
206 /// incoming queue from a given remote process
207 const Ipc::OneToOneUniQueue &
208 Ipc::FewToFewBiQueue::inQueue(const int remoteProcessId) const
209 {
210 return oneToOneQueue(remoteGroup(), remoteProcessId,
211 theLocalGroup, theLocalProcessId);
212 }
213
214 /// outgoing queue to a given remote process
215 const Ipc::OneToOneUniQueue &
216 Ipc::FewToFewBiQueue::outQueue(const int remoteProcessId) const
217 {
218 return oneToOneQueue(theLocalGroup, theLocalProcessId,
219 remoteGroup(), remoteProcessId);
220 }
221
222 int
223 Ipc::FewToFewBiQueue::readerIndex(const Group group, const int processId) const
224 {
225 Must(validProcessId(group, processId));
226 return group == groupA ?
227 processId - metadata->theGroupAIdOffset :
228 metadata->theGroupASize + processId - metadata->theGroupBIdOffset;
229 }
230
231 Ipc::QueueReader &
232 Ipc::FewToFewBiQueue::reader(const Group group, const int processId)
233 {
234 return readers->theReaders[readerIndex(group, processId)];
235 }
236
237 const Ipc::QueueReader &
238 Ipc::FewToFewBiQueue::reader(const Group group, const int processId) const
239 {
240 return readers->theReaders[readerIndex(group, processId)];
241 }
242
243 void
244 Ipc::FewToFewBiQueue::clearReaderSignal(const int remoteProcessId)
245 {
246 QueueReader &localReader = reader(theLocalGroup, theLocalProcessId);
247 debugs(54, 7, HERE << "reader: " << localReader.id);
248
249 Must(validProcessId(remoteGroup(), remoteProcessId));
250 localReader.clearSignal();
251
252 // we got a hint; we could reposition iteration to try popping from the
253 // remoteProcessId queue first; but it does not seem to help much and might
254 // introduce some bias so we do not do that for now:
255 // theLastPopProcessId = remoteProcessId;
256 }
257
258 Ipc::QueueReader::Balance &
259 Ipc::FewToFewBiQueue::localBalance()
260 {
261 QueueReader &r = reader(theLocalGroup, theLocalProcessId);
262 return r.balance;
263 }
264
265 const Ipc::QueueReader::Balance &
266 Ipc::FewToFewBiQueue::balance(const int remoteProcessId) const
267 {
268 const QueueReader &r = reader(remoteGroup(), remoteProcessId);
269 return r.balance;
270 }
271
272 Ipc::QueueReader::Rate &
273 Ipc::FewToFewBiQueue::localRateLimit()
274 {
275 QueueReader &r = reader(theLocalGroup, theLocalProcessId);
276 return r.rateLimit;
277 }
278
279 const Ipc::QueueReader::Rate &
280 Ipc::FewToFewBiQueue::rateLimit(const int remoteProcessId) const
281 {
282 const QueueReader &r = reader(remoteGroup(), remoteProcessId);
283 return r.rateLimit;
284 }
285
286 Ipc::FewToFewBiQueue::Metadata::Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset):
287 theGroupASize(aGroupASize), theGroupAIdOffset(aGroupAIdOffset),
288 theGroupBSize(aGroupBSize), theGroupBIdOffset(aGroupBIdOffset)
289 {
290 Must(theGroupASize > 0);
291 Must(theGroupBSize > 0);
292 }
293
294 Ipc::FewToFewBiQueue::Owner::Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity):
295 metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), groupASize, groupAIdOffset, groupBSize, groupBIdOffset)),
296 queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), groupASize*groupBSize*2, maxItemSize, capacity)),
297 readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), groupASize+groupBSize))
298 {
299 }
300
301 Ipc::FewToFewBiQueue::Owner::~Owner()
302 {
303 delete metadataOwner;
304 delete queuesOwner;
305 delete readersOwner;
306 }