]>
Commit | Line | Data |
---|---|---|
948230de DK |
1 | /* |
2 | * $Id$ | |
3 | * | |
4 | * DEBUG: section 54 Interprocess Communication | |
5 | * | |
6 | */ | |
7 | ||
f7f3304a | 8 | #include "squid.h" |
fa61cefe AR |
9 | #include "base/TextException.h" |
10 | #include "Debug.h" | |
11 | #include "globals.h" | |
948230de DK |
12 | #include "ipc/Queue.h" |
13 | ||
f5591061 | 14 | /// constructs Metadata ID from parent queue ID |
948230de | 15 | static String |
f5591061 | 16 | MetadataId(String id) |
948230de | 17 | { |
f5591061 | 18 | id.append("__metadata"); |
948230de DK |
19 | return id; |
20 | } | |
21 | ||
f5591061 | 22 | /// constructs one-to-one queues ID from parent queue ID |
fa61cefe | 23 | static String |
f5591061 DK |
24 | QueuesId(String id) |
25 | { | |
26 | id.append("__queues"); | |
27 | return id; | |
28 | } | |
29 | ||
30 | /// constructs QueueReaders ID from parent queue ID | |
31 | static String | |
32 | ReadersId(String id) | |
fa61cefe AR |
33 | { |
34 | id.append("__readers"); | |
35 | return id; | |
36 | } | |
37 | ||
fa61cefe AR |
38 | /* QueueReader */ |
39 | ||
15cdbc7c | 40 | InstanceIdDefinitions(Ipc::QueueReader, "ipcQR"); |
fa61cefe | 41 | |
df881a0f AR |
42 | Ipc::QueueReader::QueueReader(): popBlocked(1), popSignal(0), |
43 | rateLimit(0), balance(0) | |
fa61cefe AR |
44 | { |
45 | debugs(54, 7, HERE << "constructed " << id); | |
46 | } | |
47 | ||
68353d5a DK |
48 | /* QueueReaders */ |
49 | ||
15cdbc7c | 50 | Ipc::QueueReaders::QueueReaders(const int aCapacity): theCapacity(aCapacity) |
68353d5a DK |
51 | { |
52 | Must(theCapacity > 0); | |
2cde0303 FC |
53 | theReaders=new QueueReader[theCapacity]; |
54 | } | |
55 | ||
56 | Ipc::QueueReaders::~QueueReaders() | |
57 | { | |
58 | delete[] theReaders; | |
68353d5a DK |
59 | } |
60 | ||
61 | size_t | |
15cdbc7c | 62 | Ipc::QueueReaders::sharedMemorySize() const |
68353d5a DK |
63 | { |
64 | return SharedMemorySize(theCapacity); | |
65 | } | |
66 | ||
67 | size_t | |
15cdbc7c | 68 | Ipc::QueueReaders::SharedMemorySize(const int capacity) |
68353d5a DK |
69 | { |
70 | return sizeof(QueueReaders) + sizeof(QueueReader) * capacity; | |
71 | } | |
72 | ||
948230de DK |
73 | // OneToOneUniQueue |
74 | ||
f5591061 | 75 | Ipc::OneToOneUniQueue::OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity): |
9199139f AR |
76 | theIn(0), theOut(0), theSize(0), theMaxItemSize(aMaxItemSize), |
77 | theCapacity(aCapacity) | |
fa61cefe | 78 | { |
f5591061 DK |
79 | Must(theMaxItemSize > 0); |
80 | Must(theCapacity > 0); | |
fa61cefe AR |
81 | } |
82 | ||
948230de | 83 | int |
15cdbc7c | 84 | Ipc::OneToOneUniQueue::Bytes2Items(const unsigned int maxItemSize, int size) |
948230de DK |
85 | { |
86 | assert(maxItemSize > 0); | |
f5591061 | 87 | size -= sizeof(OneToOneUniQueue); |
948230de DK |
88 | return size >= 0 ? size / maxItemSize : 0; |
89 | } | |
90 | ||
91 | int | |
15cdbc7c | 92 | Ipc::OneToOneUniQueue::Items2Bytes(const unsigned int maxItemSize, const int size) |
948230de DK |
93 | { |
94 | assert(size >= 0); | |
f5591061 | 95 | return sizeof(OneToOneUniQueue) + maxItemSize * size; |
948230de DK |
96 | } |
97 | ||
f5591061 | 98 | /* OneToOneUniQueues */ |
948230de | 99 | |
f5591061 | 100 | Ipc::OneToOneUniQueues::OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity): theCapacity(aCapacity) |
fa61cefe | 101 | { |
f5591061 DK |
102 | Must(theCapacity > 0); |
103 | for (int i = 0; i < theCapacity; ++i) | |
104 | new (&(*this)[i]) OneToOneUniQueue(maxItemSize, queueCapacity); | |
68353d5a DK |
105 | } |
106 | ||
107 | size_t | |
f5591061 | 108 | Ipc::OneToOneUniQueues::sharedMemorySize() const |
68353d5a | 109 | { |
f5591061 | 110 | return sizeof(*this) + theCapacity * front().sharedMemorySize(); |
fa61cefe AR |
111 | } |
112 | ||
f5591061 DK |
113 | size_t |
114 | Ipc::OneToOneUniQueues::SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity) | |
fa61cefe | 115 | { |
f5591061 DK |
116 | const int queueSize = |
117 | OneToOneUniQueue::Items2Bytes(maxItemSize, queueCapacity); | |
118 | return sizeof(OneToOneUniQueues) + queueSize * capacity; | |
fa61cefe AR |
119 | } |
120 | ||
f5591061 DK |
121 | const Ipc::OneToOneUniQueue & |
122 | Ipc::OneToOneUniQueues::operator [](const int index) const | |
fa61cefe | 123 | { |
f5591061 DK |
124 | Must(0 <= index && index < theCapacity); |
125 | const size_t queueSize = index ? front().sharedMemorySize() : 0; | |
126 | const char *const queue = | |
127 | reinterpret_cast<const char *>(this) + sizeof(*this) + index * queueSize; | |
128 | return *reinterpret_cast<const OneToOneUniQueue *>(queue); | |
fa61cefe AR |
129 | } |
130 | ||
f5591061 | 131 | // FewToFewBiQueue |
948230de | 132 | |
f5591061 DK |
133 | Ipc::FewToFewBiQueue::Owner * |
134 | Ipc::FewToFewBiQueue::Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity) | |
68353d5a | 135 | { |
f5591061 | 136 | return new Owner(id, groupASize, groupAIdOffset, groupBSize, groupBIdOffset, maxItemSize, capacity); |
68353d5a DK |
137 | } |
138 | ||
f5591061 | 139 | Ipc::FewToFewBiQueue::FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId): |
9199139f AR |
140 | metadata(shm_old(Metadata)(MetadataId(id).termedBuf())), |
141 | queues(shm_old(OneToOneUniQueues)(QueuesId(id).termedBuf())), | |
142 | readers(shm_old(QueueReaders)(ReadersId(id).termedBuf())), | |
143 | theLocalGroup(aLocalGroup), theLocalProcessId(aLocalProcessId), | |
144 | theLastPopProcessId(readers->theCapacity) | |
948230de | 145 | { |
f5591061 DK |
146 | Must(queues->theCapacity == metadata->theGroupASize * metadata->theGroupBSize * 2); |
147 | Must(readers->theCapacity == metadata->theGroupASize + metadata->theGroupBSize); | |
68353d5a | 148 | |
f5591061 DK |
149 | const QueueReader &localReader = reader(theLocalGroup, theLocalProcessId); |
150 | debugs(54, 7, HERE << "queue " << id << " reader: " << localReader.id); | |
948230de DK |
151 | } |
152 | ||
ea2cdeb6 DK |
153 | int |
154 | Ipc::FewToFewBiQueue::MaxItemsCount(const int groupASize, const int groupBSize, const int capacity) | |
155 | { | |
156 | return capacity * groupASize * groupBSize * 2; | |
157 | } | |
158 | ||
f5591061 DK |
159 | bool |
160 | Ipc::FewToFewBiQueue::validProcessId(const Group group, const int processId) const | |
161 | { | |
162 | switch (group) { | |
163 | case groupA: | |
164 | return metadata->theGroupAIdOffset <= processId && | |
9199139f | 165 | processId < metadata->theGroupAIdOffset + metadata->theGroupASize; |
f5591061 DK |
166 | case groupB: |
167 | return metadata->theGroupBIdOffset <= processId && | |
9199139f | 168 | processId < metadata->theGroupBIdOffset + metadata->theGroupBSize; |
f5591061 DK |
169 | } |
170 | return false; | |
171 | } | |
172 | ||
0a11e039 AR |
173 | int |
174 | Ipc::FewToFewBiQueue::oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const | |
f5591061 DK |
175 | { |
176 | Must(fromGroup != toGroup); | |
0a11e039 AR |
177 | assert(validProcessId(fromGroup, fromProcessId)); |
178 | assert(validProcessId(toGroup, toProcessId)); | |
f5591061 DK |
179 | int index1; |
180 | int index2; | |
181 | int offset; | |
182 | if (fromGroup == groupA) { | |
183 | index1 = fromProcessId - metadata->theGroupAIdOffset; | |
184 | index2 = toProcessId - metadata->theGroupBIdOffset; | |
185 | offset = 0; | |
186 | } else { | |
187 | index1 = toProcessId - metadata->theGroupAIdOffset; | |
188 | index2 = fromProcessId - metadata->theGroupBIdOffset; | |
189 | offset = metadata->theGroupASize * metadata->theGroupBSize; | |
190 | } | |
191 | const int index = offset + index1 * metadata->theGroupBSize + index2; | |
0a11e039 AR |
192 | return index; |
193 | } | |
194 | ||
195 | Ipc::OneToOneUniQueue & | |
196 | Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) | |
197 | { | |
198 | return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)]; | |
199 | } | |
200 | ||
201 | const Ipc::OneToOneUniQueue & | |
202 | Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const | |
203 | { | |
204 | return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)]; | |
948230de DK |
205 | } |
206 | ||
55939a01 AR |
207 | /// incoming queue from a given remote process |
208 | const Ipc::OneToOneUniQueue & | |
209 | Ipc::FewToFewBiQueue::inQueue(const int remoteProcessId) const | |
210 | { | |
211 | return oneToOneQueue(remoteGroup(), remoteProcessId, | |
212 | theLocalGroup, theLocalProcessId); | |
213 | } | |
214 | ||
215 | /// outgoing queue to a given remote process | |
216 | const Ipc::OneToOneUniQueue & | |
217 | Ipc::FewToFewBiQueue::outQueue(const int remoteProcessId) const | |
218 | { | |
219 | return oneToOneQueue(theLocalGroup, theLocalProcessId, | |
220 | remoteGroup(), remoteProcessId); | |
221 | } | |
222 | ||
df881a0f AR |
223 | int |
224 | Ipc::FewToFewBiQueue::readerIndex(const Group group, const int processId) const | |
225 | { | |
226 | Must(validProcessId(group, processId)); | |
227 | return group == groupA ? | |
228 | processId - metadata->theGroupAIdOffset : | |
229 | metadata->theGroupASize + processId - metadata->theGroupBIdOffset; | |
230 | } | |
231 | ||
f5591061 DK |
232 | Ipc::QueueReader & |
233 | Ipc::FewToFewBiQueue::reader(const Group group, const int processId) | |
948230de | 234 | { |
df881a0f AR |
235 | return readers->theReaders[readerIndex(group, processId)]; |
236 | } | |
237 | ||
238 | const Ipc::QueueReader & | |
239 | Ipc::FewToFewBiQueue::reader(const Group group, const int processId) const | |
240 | { | |
241 | return readers->theReaders[readerIndex(group, processId)]; | |
948230de | 242 | } |
fa61cefe AR |
243 | |
244 | void | |
f5591061 | 245 | Ipc::FewToFewBiQueue::clearReaderSignal(const int remoteProcessId) |
fa61cefe | 246 | { |
f5591061 DK |
247 | QueueReader &localReader = reader(theLocalGroup, theLocalProcessId); |
248 | debugs(54, 7, HERE << "reader: " << localReader.id); | |
fa61cefe | 249 | |
f5591061 DK |
250 | Must(validProcessId(remoteGroup(), remoteProcessId)); |
251 | localReader.clearSignal(); | |
fa61cefe AR |
252 | |
253 | // we got a hint; we could reposition iteration to try popping from the | |
f5591061 | 254 | // remoteProcessId queue first; but it does not seem to help much and might |
fa61cefe | 255 | // introduce some bias so we do not do that for now: |
f5591061 | 256 | // theLastPopProcessId = remoteProcessId; |
68353d5a DK |
257 | } |
258 | ||
df881a0f AR |
259 | Ipc::QueueReader::Balance & |
260 | Ipc::FewToFewBiQueue::localBalance() | |
261 | { | |
262 | QueueReader &r = reader(theLocalGroup, theLocalProcessId); | |
263 | return r.balance; | |
264 | } | |
265 | ||
55939a01 AR |
266 | const Ipc::QueueReader::Balance & |
267 | Ipc::FewToFewBiQueue::balance(const int remoteProcessId) const | |
268 | { | |
269 | const QueueReader &r = reader(remoteGroup(), remoteProcessId); | |
270 | return r.balance; | |
271 | } | |
272 | ||
df881a0f AR |
273 | Ipc::QueueReader::Rate & |
274 | Ipc::FewToFewBiQueue::localRateLimit() | |
275 | { | |
276 | QueueReader &r = reader(theLocalGroup, theLocalProcessId); | |
277 | return r.rateLimit; | |
55939a01 AR |
278 | } |
279 | ||
280 | const Ipc::QueueReader::Rate & | |
281 | Ipc::FewToFewBiQueue::rateLimit(const int remoteProcessId) const | |
282 | { | |
283 | const QueueReader &r = reader(remoteGroup(), remoteProcessId); | |
284 | return r.rateLimit; | |
df881a0f AR |
285 | } |
286 | ||
f5591061 | 287 | Ipc::FewToFewBiQueue::Metadata::Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset): |
9199139f AR |
288 | theGroupASize(aGroupASize), theGroupAIdOffset(aGroupAIdOffset), |
289 | theGroupBSize(aGroupBSize), theGroupBIdOffset(aGroupBIdOffset) | |
f5591061 DK |
290 | { |
291 | Must(theGroupASize > 0); | |
292 | Must(theGroupBSize > 0); | |
293 | } | |
294 | ||
295 | Ipc::FewToFewBiQueue::Owner::Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity): | |
9199139f AR |
296 | metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), groupASize, groupAIdOffset, groupBSize, groupBIdOffset)), |
297 | queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), groupASize*groupBSize*2, maxItemSize, capacity)), | |
298 | readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), groupASize+groupBSize)) | |
68353d5a | 299 | { |
68353d5a DK |
300 | } |
301 | ||
f5591061 | 302 | Ipc::FewToFewBiQueue::Owner::~Owner() |
68353d5a | 303 | { |
f5591061 DK |
304 | delete metadataOwner; |
305 | delete queuesOwner; | |
68353d5a | 306 | delete readersOwner; |
fa61cefe | 307 | } |