]> git.ipfire.org Git - thirdparty/squid.git/blob - src/ipc/Queue.cc
Merged from parent (trunk r11763, v3.2.0.12+).
[thirdparty/squid.git] / src / ipc / Queue.cc
1 /*
2 * $Id$
3 *
4 * DEBUG: section 54 Interprocess Communication
5 *
6 */
7
8 #include "config.h"
9 #include "base/TextException.h"
10 #include "Debug.h"
11 #include "globals.h"
12 #include "ipc/Queue.h"
13
14 /// constructs Metadata ID from parent queue ID
15 static String
16 MetadataId(String id)
17 {
18 id.append("__metadata");
19 return id;
20 }
21
22 /// constructs one-to-one queues ID from parent queue ID
23 static String
24 QueuesId(String id)
25 {
26 id.append("__queues");
27 return id;
28 }
29
30 /// constructs QueueReaders ID from parent queue ID
31 static String
32 ReadersId(String id)
33 {
34 id.append("__readers");
35 return id;
36 }
37
38
39 /* QueueReader */
40
41 InstanceIdDefinitions(Ipc::QueueReader, "ipcQR");
42
43 Ipc::QueueReader::QueueReader(): popBlocked(1), popSignal(0),
44 rateLimit(0), balance(0)
45 {
46 debugs(54, 7, HERE << "constructed " << id);
47 }
48
49 /* QueueReaders */
50
51 Ipc::QueueReaders::QueueReaders(const int aCapacity): theCapacity(aCapacity)
52 {
53 Must(theCapacity > 0);
54 new (theReaders) QueueReader[theCapacity];
55 }
56
57 size_t
58 Ipc::QueueReaders::sharedMemorySize() const
59 {
60 return SharedMemorySize(theCapacity);
61 }
62
63 size_t
64 Ipc::QueueReaders::SharedMemorySize(const int capacity)
65 {
66 return sizeof(QueueReaders) + sizeof(QueueReader) * capacity;
67 }
68
69
70 // OneToOneUniQueue
71
72 Ipc::OneToOneUniQueue::OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity):
73 theIn(0), theOut(0), theSize(0), theMaxItemSize(aMaxItemSize),
74 theCapacity(aCapacity)
75 {
76 Must(theMaxItemSize > 0);
77 Must(theCapacity > 0);
78 }
79
80 int
81 Ipc::OneToOneUniQueue::Bytes2Items(const unsigned int maxItemSize, int size)
82 {
83 assert(maxItemSize > 0);
84 size -= sizeof(OneToOneUniQueue);
85 return size >= 0 ? size / maxItemSize : 0;
86 }
87
88 int
89 Ipc::OneToOneUniQueue::Items2Bytes(const unsigned int maxItemSize, const int size)
90 {
91 assert(size >= 0);
92 return sizeof(OneToOneUniQueue) + maxItemSize * size;
93 }
94
95
96 /* OneToOneUniQueues */
97
98 Ipc::OneToOneUniQueues::OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity): theCapacity(aCapacity)
99 {
100 Must(theCapacity > 0);
101 for (int i = 0; i < theCapacity; ++i)
102 new (&(*this)[i]) OneToOneUniQueue(maxItemSize, queueCapacity);
103 }
104
105 size_t
106 Ipc::OneToOneUniQueues::sharedMemorySize() const
107 {
108 return sizeof(*this) + theCapacity * front().sharedMemorySize();
109 }
110
111 size_t
112 Ipc::OneToOneUniQueues::SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity)
113 {
114 const int queueSize =
115 OneToOneUniQueue::Items2Bytes(maxItemSize, queueCapacity);
116 return sizeof(OneToOneUniQueues) + queueSize * capacity;
117 }
118
119 const Ipc::OneToOneUniQueue &
120 Ipc::OneToOneUniQueues::operator [](const int index) const
121 {
122 Must(0 <= index && index < theCapacity);
123 const size_t queueSize = index ? front().sharedMemorySize() : 0;
124 const char *const queue =
125 reinterpret_cast<const char *>(this) + sizeof(*this) + index * queueSize;
126 return *reinterpret_cast<const OneToOneUniQueue *>(queue);
127 }
128
129
130 // FewToFewBiQueue
131
132 Ipc::FewToFewBiQueue::Owner *
133 Ipc::FewToFewBiQueue::Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity)
134 {
135 return new Owner(id, groupASize, groupAIdOffset, groupBSize, groupBIdOffset, maxItemSize, capacity);
136 }
137
138 Ipc::FewToFewBiQueue::FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId):
139 metadata(shm_old(Metadata)(MetadataId(id).termedBuf())),
140 queues(shm_old(OneToOneUniQueues)(QueuesId(id).termedBuf())),
141 readers(shm_old(QueueReaders)(ReadersId(id).termedBuf())),
142 theLocalGroup(aLocalGroup), theLocalProcessId(aLocalProcessId),
143 theLastPopProcessId(readers->theCapacity)
144 {
145 Must(queues->theCapacity == metadata->theGroupASize * metadata->theGroupBSize * 2);
146 Must(readers->theCapacity == metadata->theGroupASize + metadata->theGroupBSize);
147
148 const QueueReader &localReader = reader(theLocalGroup, theLocalProcessId);
149 debugs(54, 7, HERE << "queue " << id << " reader: " << localReader.id);
150 }
151
152 bool
153 Ipc::FewToFewBiQueue::validProcessId(const Group group, const int processId) const
154 {
155 switch (group) {
156 case groupA:
157 return metadata->theGroupAIdOffset <= processId &&
158 processId < metadata->theGroupAIdOffset + metadata->theGroupASize;
159 case groupB:
160 return metadata->theGroupBIdOffset <= processId &&
161 processId < metadata->theGroupBIdOffset + metadata->theGroupBSize;
162 }
163 return false;
164 }
165
166 int
167 Ipc::FewToFewBiQueue::oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
168 {
169 Must(fromGroup != toGroup);
170 assert(validProcessId(fromGroup, fromProcessId));
171 assert(validProcessId(toGroup, toProcessId));
172 int index1;
173 int index2;
174 int offset;
175 if (fromGroup == groupA) {
176 index1 = fromProcessId - metadata->theGroupAIdOffset;
177 index2 = toProcessId - metadata->theGroupBIdOffset;
178 offset = 0;
179 } else {
180 index1 = toProcessId - metadata->theGroupAIdOffset;
181 index2 = fromProcessId - metadata->theGroupBIdOffset;
182 offset = metadata->theGroupASize * metadata->theGroupBSize;
183 }
184 const int index = offset + index1 * metadata->theGroupBSize + index2;
185 return index;
186 }
187
188 Ipc::OneToOneUniQueue &
189 Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId)
190 {
191 return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)];
192 }
193
194 const Ipc::OneToOneUniQueue &
195 Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
196 {
197 return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)];
198 }
199
200 int
201 Ipc::FewToFewBiQueue::readerIndex(const Group group, const int processId) const
202 {
203 Must(validProcessId(group, processId));
204 return group == groupA ?
205 processId - metadata->theGroupAIdOffset :
206 metadata->theGroupASize + processId - metadata->theGroupBIdOffset;
207 }
208
209 Ipc::QueueReader &
210 Ipc::FewToFewBiQueue::reader(const Group group, const int processId)
211 {
212 return readers->theReaders[readerIndex(group, processId)];
213 }
214
215 const Ipc::QueueReader &
216 Ipc::FewToFewBiQueue::reader(const Group group, const int processId) const
217 {
218 return readers->theReaders[readerIndex(group, processId)];
219 }
220
221 void
222 Ipc::FewToFewBiQueue::clearReaderSignal(const int remoteProcessId)
223 {
224 QueueReader &localReader = reader(theLocalGroup, theLocalProcessId);
225 debugs(54, 7, HERE << "reader: " << localReader.id);
226
227 Must(validProcessId(remoteGroup(), remoteProcessId));
228 localReader.clearSignal();
229
230 // we got a hint; we could reposition iteration to try popping from the
231 // remoteProcessId queue first; but it does not seem to help much and might
232 // introduce some bias so we do not do that for now:
233 // theLastPopProcessId = remoteProcessId;
234 }
235
236 bool
237 Ipc::FewToFewBiQueue::popReady() const
238 {
239 // mimic FewToFewBiQueue::pop() but quit just before popping
240 int popProcessId = theLastPopProcessId; // preserve for future pop()
241 for (int i = 0; i < remoteGroupSize(); ++i) {
242 if (++popProcessId >= remoteGroupIdOffset() + remoteGroupSize())
243 popProcessId = remoteGroupIdOffset();
244 const OneToOneUniQueue &queue = oneToOneQueue(remoteGroup(), popProcessId, theLocalGroup, theLocalProcessId);
245 if (!queue.empty())
246 return true;
247 }
248 return false; // most likely, no process had anything to pop
249 }
250
251 Ipc::QueueReader::Balance &
252 Ipc::FewToFewBiQueue::localBalance()
253 {
254 QueueReader &r = reader(theLocalGroup, theLocalProcessId);
255 return r.balance;
256 }
257
258 Ipc::QueueReader::Rate &
259 Ipc::FewToFewBiQueue::localRateLimit()
260 {
261 QueueReader &r = reader(theLocalGroup, theLocalProcessId);
262 return r.rateLimit;
263 }
264
265 Ipc::FewToFewBiQueue::Metadata::Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset):
266 theGroupASize(aGroupASize), theGroupAIdOffset(aGroupAIdOffset),
267 theGroupBSize(aGroupBSize), theGroupBIdOffset(aGroupBIdOffset)
268 {
269 Must(theGroupASize > 0);
270 Must(theGroupBSize > 0);
271 }
272
273 Ipc::FewToFewBiQueue::Owner::Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity):
274 metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), groupASize, groupAIdOffset, groupBSize, groupBIdOffset)),
275 queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), groupASize*groupBSize*2, maxItemSize, capacity)),
276 readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), groupASize+groupBSize))
277 {
278 }
279
280 Ipc::FewToFewBiQueue::Owner::~Owner()
281 {
282 delete metadataOwner;
283 delete queuesOwner;
284 delete readersOwner;
285 }