]>
Commit | Line | Data |
---|---|---|
948230de DK |
1 | /* |
2 | * $Id$ | |
3 | * | |
4 | * DEBUG: section 54 Interprocess Communication | |
5 | * | |
6 | */ | |
7 | ||
f7f3304a | 8 | #include "squid.h" |
fa61cefe AR |
9 | #include "base/TextException.h" |
10 | #include "Debug.h" | |
11 | #include "globals.h" | |
948230de DK |
12 | #include "ipc/Queue.h" |
13 | ||
f5591061 | 14 | /// constructs Metadata ID from parent queue ID |
948230de | 15 | static String |
f5591061 | 16 | MetadataId(String id) |
948230de | 17 | { |
f5591061 | 18 | id.append("__metadata"); |
948230de DK |
19 | return id; |
20 | } | |
21 | ||
f5591061 | 22 | /// constructs one-to-one queues ID from parent queue ID |
fa61cefe | 23 | static String |
f5591061 DK |
24 | QueuesId(String id) |
25 | { | |
26 | id.append("__queues"); | |
27 | return id; | |
28 | } | |
29 | ||
30 | /// constructs QueueReaders ID from parent queue ID | |
31 | static String | |
32 | ReadersId(String id) | |
fa61cefe AR |
33 | { |
34 | id.append("__readers"); | |
35 | return id; | |
36 | } | |
37 | ||
38 | ||
39 | /* QueueReader */ | |
40 | ||
15cdbc7c | 41 | InstanceIdDefinitions(Ipc::QueueReader, "ipcQR"); |
fa61cefe | 42 | |
df881a0f AR |
43 | Ipc::QueueReader::QueueReader(): popBlocked(1), popSignal(0), |
44 | rateLimit(0), balance(0) | |
fa61cefe AR |
45 | { |
46 | debugs(54, 7, HERE << "constructed " << id); | |
47 | } | |
48 | ||
68353d5a DK |
49 | /* QueueReaders */ |
50 | ||
15cdbc7c | 51 | Ipc::QueueReaders::QueueReaders(const int aCapacity): theCapacity(aCapacity) |
68353d5a DK |
52 | { |
53 | Must(theCapacity > 0); | |
54 | new (theReaders) QueueReader[theCapacity]; | |
55 | } | |
56 | ||
57 | size_t | |
15cdbc7c | 58 | Ipc::QueueReaders::sharedMemorySize() const |
68353d5a DK |
59 | { |
60 | return SharedMemorySize(theCapacity); | |
61 | } | |
62 | ||
63 | size_t | |
15cdbc7c | 64 | Ipc::QueueReaders::SharedMemorySize(const int capacity) |
68353d5a DK |
65 | { |
66 | return sizeof(QueueReaders) + sizeof(QueueReader) * capacity; | |
67 | } | |
68 | ||
948230de DK |
69 | |
70 | // OneToOneUniQueue | |
71 | ||
f5591061 | 72 | Ipc::OneToOneUniQueue::OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity): |
9199139f AR |
73 | theIn(0), theOut(0), theSize(0), theMaxItemSize(aMaxItemSize), |
74 | theCapacity(aCapacity) | |
fa61cefe | 75 | { |
f5591061 DK |
76 | Must(theMaxItemSize > 0); |
77 | Must(theCapacity > 0); | |
fa61cefe AR |
78 | } |
79 | ||
948230de | 80 | int |
15cdbc7c | 81 | Ipc::OneToOneUniQueue::Bytes2Items(const unsigned int maxItemSize, int size) |
948230de DK |
82 | { |
83 | assert(maxItemSize > 0); | |
f5591061 | 84 | size -= sizeof(OneToOneUniQueue); |
948230de DK |
85 | return size >= 0 ? size / maxItemSize : 0; |
86 | } | |
87 | ||
88 | int | |
15cdbc7c | 89 | Ipc::OneToOneUniQueue::Items2Bytes(const unsigned int maxItemSize, const int size) |
948230de DK |
90 | { |
91 | assert(size >= 0); | |
f5591061 | 92 | return sizeof(OneToOneUniQueue) + maxItemSize * size; |
948230de DK |
93 | } |
94 | ||
68353d5a | 95 | |
f5591061 | 96 | /* OneToOneUniQueues */ |
948230de | 97 | |
f5591061 | 98 | Ipc::OneToOneUniQueues::OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity): theCapacity(aCapacity) |
fa61cefe | 99 | { |
f5591061 DK |
100 | Must(theCapacity > 0); |
101 | for (int i = 0; i < theCapacity; ++i) | |
102 | new (&(*this)[i]) OneToOneUniQueue(maxItemSize, queueCapacity); | |
68353d5a DK |
103 | } |
104 | ||
105 | size_t | |
f5591061 | 106 | Ipc::OneToOneUniQueues::sharedMemorySize() const |
68353d5a | 107 | { |
f5591061 | 108 | return sizeof(*this) + theCapacity * front().sharedMemorySize(); |
fa61cefe AR |
109 | } |
110 | ||
f5591061 DK |
111 | size_t |
112 | Ipc::OneToOneUniQueues::SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity) | |
fa61cefe | 113 | { |
f5591061 DK |
114 | const int queueSize = |
115 | OneToOneUniQueue::Items2Bytes(maxItemSize, queueCapacity); | |
116 | return sizeof(OneToOneUniQueues) + queueSize * capacity; | |
fa61cefe AR |
117 | } |
118 | ||
f5591061 DK |
119 | const Ipc::OneToOneUniQueue & |
120 | Ipc::OneToOneUniQueues::operator [](const int index) const | |
fa61cefe | 121 | { |
f5591061 DK |
122 | Must(0 <= index && index < theCapacity); |
123 | const size_t queueSize = index ? front().sharedMemorySize() : 0; | |
124 | const char *const queue = | |
125 | reinterpret_cast<const char *>(this) + sizeof(*this) + index * queueSize; | |
126 | return *reinterpret_cast<const OneToOneUniQueue *>(queue); | |
fa61cefe AR |
127 | } |
128 | ||
948230de | 129 | |
f5591061 | 130 | // FewToFewBiQueue |
948230de | 131 | |
f5591061 DK |
132 | Ipc::FewToFewBiQueue::Owner * |
133 | Ipc::FewToFewBiQueue::Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity) | |
68353d5a | 134 | { |
f5591061 | 135 | return new Owner(id, groupASize, groupAIdOffset, groupBSize, groupBIdOffset, maxItemSize, capacity); |
68353d5a DK |
136 | } |
137 | ||
f5591061 | 138 | Ipc::FewToFewBiQueue::FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId): |
9199139f AR |
139 | metadata(shm_old(Metadata)(MetadataId(id).termedBuf())), |
140 | queues(shm_old(OneToOneUniQueues)(QueuesId(id).termedBuf())), | |
141 | readers(shm_old(QueueReaders)(ReadersId(id).termedBuf())), | |
142 | theLocalGroup(aLocalGroup), theLocalProcessId(aLocalProcessId), | |
143 | theLastPopProcessId(readers->theCapacity) | |
948230de | 144 | { |
f5591061 DK |
145 | Must(queues->theCapacity == metadata->theGroupASize * metadata->theGroupBSize * 2); |
146 | Must(readers->theCapacity == metadata->theGroupASize + metadata->theGroupBSize); | |
68353d5a | 147 | |
f5591061 DK |
148 | const QueueReader &localReader = reader(theLocalGroup, theLocalProcessId); |
149 | debugs(54, 7, HERE << "queue " << id << " reader: " << localReader.id); | |
948230de DK |
150 | } |
151 | ||
ea2cdeb6 DK |
152 | int |
153 | Ipc::FewToFewBiQueue::MaxItemsCount(const int groupASize, const int groupBSize, const int capacity) | |
154 | { | |
155 | return capacity * groupASize * groupBSize * 2; | |
156 | } | |
157 | ||
f5591061 DK |
158 | bool |
159 | Ipc::FewToFewBiQueue::validProcessId(const Group group, const int processId) const | |
160 | { | |
161 | switch (group) { | |
162 | case groupA: | |
163 | return metadata->theGroupAIdOffset <= processId && | |
9199139f | 164 | processId < metadata->theGroupAIdOffset + metadata->theGroupASize; |
f5591061 DK |
165 | case groupB: |
166 | return metadata->theGroupBIdOffset <= processId && | |
9199139f | 167 | processId < metadata->theGroupBIdOffset + metadata->theGroupBSize; |
f5591061 DK |
168 | } |
169 | return false; | |
170 | } | |
171 | ||
0a11e039 AR |
172 | int |
173 | Ipc::FewToFewBiQueue::oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const | |
f5591061 DK |
174 | { |
175 | Must(fromGroup != toGroup); | |
0a11e039 AR |
176 | assert(validProcessId(fromGroup, fromProcessId)); |
177 | assert(validProcessId(toGroup, toProcessId)); | |
f5591061 DK |
178 | int index1; |
179 | int index2; | |
180 | int offset; | |
181 | if (fromGroup == groupA) { | |
182 | index1 = fromProcessId - metadata->theGroupAIdOffset; | |
183 | index2 = toProcessId - metadata->theGroupBIdOffset; | |
184 | offset = 0; | |
185 | } else { | |
186 | index1 = toProcessId - metadata->theGroupAIdOffset; | |
187 | index2 = fromProcessId - metadata->theGroupBIdOffset; | |
188 | offset = metadata->theGroupASize * metadata->theGroupBSize; | |
189 | } | |
190 | const int index = offset + index1 * metadata->theGroupBSize + index2; | |
0a11e039 AR |
191 | return index; |
192 | } | |
193 | ||
194 | Ipc::OneToOneUniQueue & | |
195 | Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) | |
196 | { | |
197 | return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)]; | |
198 | } | |
199 | ||
200 | const Ipc::OneToOneUniQueue & | |
201 | Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const | |
202 | { | |
203 | return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)]; | |
948230de DK |
204 | } |
205 | ||
55939a01 AR |
206 | /// incoming queue from a given remote process |
207 | const Ipc::OneToOneUniQueue & | |
208 | Ipc::FewToFewBiQueue::inQueue(const int remoteProcessId) const | |
209 | { | |
210 | return oneToOneQueue(remoteGroup(), remoteProcessId, | |
211 | theLocalGroup, theLocalProcessId); | |
212 | } | |
213 | ||
214 | /// outgoing queue to a given remote process | |
215 | const Ipc::OneToOneUniQueue & | |
216 | Ipc::FewToFewBiQueue::outQueue(const int remoteProcessId) const | |
217 | { | |
218 | return oneToOneQueue(theLocalGroup, theLocalProcessId, | |
219 | remoteGroup(), remoteProcessId); | |
220 | } | |
221 | ||
df881a0f AR |
222 | int |
223 | Ipc::FewToFewBiQueue::readerIndex(const Group group, const int processId) const | |
224 | { | |
225 | Must(validProcessId(group, processId)); | |
226 | return group == groupA ? | |
227 | processId - metadata->theGroupAIdOffset : | |
228 | metadata->theGroupASize + processId - metadata->theGroupBIdOffset; | |
229 | } | |
230 | ||
f5591061 DK |
231 | Ipc::QueueReader & |
232 | Ipc::FewToFewBiQueue::reader(const Group group, const int processId) | |
948230de | 233 | { |
df881a0f AR |
234 | return readers->theReaders[readerIndex(group, processId)]; |
235 | } | |
236 | ||
237 | const Ipc::QueueReader & | |
238 | Ipc::FewToFewBiQueue::reader(const Group group, const int processId) const | |
239 | { | |
240 | return readers->theReaders[readerIndex(group, processId)]; | |
948230de | 241 | } |
fa61cefe AR |
242 | |
243 | void | |
f5591061 | 244 | Ipc::FewToFewBiQueue::clearReaderSignal(const int remoteProcessId) |
fa61cefe | 245 | { |
f5591061 DK |
246 | QueueReader &localReader = reader(theLocalGroup, theLocalProcessId); |
247 | debugs(54, 7, HERE << "reader: " << localReader.id); | |
fa61cefe | 248 | |
f5591061 DK |
249 | Must(validProcessId(remoteGroup(), remoteProcessId)); |
250 | localReader.clearSignal(); | |
fa61cefe AR |
251 | |
252 | // we got a hint; we could reposition iteration to try popping from the | |
f5591061 | 253 | // remoteProcessId queue first; but it does not seem to help much and might |
fa61cefe | 254 | // introduce some bias so we do not do that for now: |
f5591061 | 255 | // theLastPopProcessId = remoteProcessId; |
68353d5a DK |
256 | } |
257 | ||
df881a0f AR |
258 | Ipc::QueueReader::Balance & |
259 | Ipc::FewToFewBiQueue::localBalance() | |
260 | { | |
261 | QueueReader &r = reader(theLocalGroup, theLocalProcessId); | |
262 | return r.balance; | |
263 | } | |
264 | ||
55939a01 AR |
265 | const Ipc::QueueReader::Balance & |
266 | Ipc::FewToFewBiQueue::balance(const int remoteProcessId) const | |
267 | { | |
268 | const QueueReader &r = reader(remoteGroup(), remoteProcessId); | |
269 | return r.balance; | |
270 | } | |
271 | ||
df881a0f AR |
272 | Ipc::QueueReader::Rate & |
273 | Ipc::FewToFewBiQueue::localRateLimit() | |
274 | { | |
275 | QueueReader &r = reader(theLocalGroup, theLocalProcessId); | |
276 | return r.rateLimit; | |
55939a01 AR |
277 | } |
278 | ||
279 | const Ipc::QueueReader::Rate & | |
280 | Ipc::FewToFewBiQueue::rateLimit(const int remoteProcessId) const | |
281 | { | |
282 | const QueueReader &r = reader(remoteGroup(), remoteProcessId); | |
283 | return r.rateLimit; | |
df881a0f AR |
284 | } |
285 | ||
f5591061 | 286 | Ipc::FewToFewBiQueue::Metadata::Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset): |
9199139f AR |
287 | theGroupASize(aGroupASize), theGroupAIdOffset(aGroupAIdOffset), |
288 | theGroupBSize(aGroupBSize), theGroupBIdOffset(aGroupBIdOffset) | |
f5591061 DK |
289 | { |
290 | Must(theGroupASize > 0); | |
291 | Must(theGroupBSize > 0); | |
292 | } | |
293 | ||
294 | Ipc::FewToFewBiQueue::Owner::Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity): | |
9199139f AR |
295 | metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), groupASize, groupAIdOffset, groupBSize, groupBIdOffset)), |
296 | queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), groupASize*groupBSize*2, maxItemSize, capacity)), | |
297 | readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), groupASize+groupBSize)) | |
68353d5a | 298 | { |
68353d5a DK |
299 | } |
300 | ||
f5591061 | 301 | Ipc::FewToFewBiQueue::Owner::~Owner() |
68353d5a | 302 | { |
f5591061 DK |
303 | delete metadataOwner; |
304 | delete queuesOwner; | |
68353d5a | 305 | delete readersOwner; |
fa61cefe | 306 | } |