]>
Commit | Line | Data |
---|---|---|
948230de DK |
1 | /* |
2 | * $Id$ | |
3 | * | |
4 | * DEBUG: section 54 Interprocess Communication | |
5 | * | |
6 | */ | |
7 | ||
8 | #include "config.h" | |
fa61cefe AR |
9 | #include "base/TextException.h" |
10 | #include "Debug.h" | |
11 | #include "globals.h" | |
948230de DK |
12 | #include "ipc/Queue.h" |
13 | ||
f5591061 | 14 | /// constructs Metadata ID from parent queue ID |
948230de | 15 | static String |
f5591061 | 16 | MetadataId(String id) |
948230de | 17 | { |
f5591061 | 18 | id.append("__metadata"); |
948230de DK |
19 | return id; |
20 | } | |
21 | ||
f5591061 | 22 | /// constructs one-to-one queues ID from parent queue ID |
fa61cefe | 23 | static String |
f5591061 DK |
24 | QueuesId(String id) |
25 | { | |
26 | id.append("__queues"); | |
27 | return id; | |
28 | } | |
29 | ||
30 | /// constructs QueueReaders ID from parent queue ID | |
31 | static String | |
32 | ReadersId(String id) | |
fa61cefe AR |
33 | { |
34 | id.append("__readers"); | |
35 | return id; | |
36 | } | |
37 | ||
38 | ||
39 | /* QueueReader */ | |
40 | ||
15cdbc7c | 41 | InstanceIdDefinitions(Ipc::QueueReader, "ipcQR"); |
fa61cefe | 42 | |
15cdbc7c | 43 | Ipc::QueueReader::QueueReader(): popBlocked(1), popSignal(0) |
fa61cefe AR |
44 | { |
45 | debugs(54, 7, HERE << "constructed " << id); | |
46 | } | |
47 | ||
68353d5a DK |
48 | /* QueueReaders */ |
49 | ||
15cdbc7c | 50 | Ipc::QueueReaders::QueueReaders(const int aCapacity): theCapacity(aCapacity) |
68353d5a DK |
51 | { |
52 | Must(theCapacity > 0); | |
53 | new (theReaders) QueueReader[theCapacity]; | |
54 | } | |
55 | ||
56 | size_t | |
15cdbc7c | 57 | Ipc::QueueReaders::sharedMemorySize() const |
68353d5a DK |
58 | { |
59 | return SharedMemorySize(theCapacity); | |
60 | } | |
61 | ||
62 | size_t | |
15cdbc7c | 63 | Ipc::QueueReaders::SharedMemorySize(const int capacity) |
68353d5a DK |
64 | { |
65 | return sizeof(QueueReaders) + sizeof(QueueReader) * capacity; | |
66 | } | |
67 | ||
948230de DK |
68 | |
69 | // OneToOneUniQueue | |
70 | ||
f5591061 | 71 | Ipc::OneToOneUniQueue::OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity): |
9199139f AR |
72 | theIn(0), theOut(0), theSize(0), theMaxItemSize(aMaxItemSize), |
73 | theCapacity(aCapacity) | |
fa61cefe | 74 | { |
f5591061 DK |
75 | Must(theMaxItemSize > 0); |
76 | Must(theCapacity > 0); | |
fa61cefe AR |
77 | } |
78 | ||
948230de | 79 | int |
15cdbc7c | 80 | Ipc::OneToOneUniQueue::Bytes2Items(const unsigned int maxItemSize, int size) |
948230de DK |
81 | { |
82 | assert(maxItemSize > 0); | |
f5591061 | 83 | size -= sizeof(OneToOneUniQueue); |
948230de DK |
84 | return size >= 0 ? size / maxItemSize : 0; |
85 | } | |
86 | ||
87 | int | |
15cdbc7c | 88 | Ipc::OneToOneUniQueue::Items2Bytes(const unsigned int maxItemSize, const int size) |
948230de DK |
89 | { |
90 | assert(size >= 0); | |
f5591061 | 91 | return sizeof(OneToOneUniQueue) + maxItemSize * size; |
948230de DK |
92 | } |
93 | ||
68353d5a | 94 | |
f5591061 | 95 | /* OneToOneUniQueues */ |
948230de | 96 | |
f5591061 | 97 | Ipc::OneToOneUniQueues::OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity): theCapacity(aCapacity) |
fa61cefe | 98 | { |
f5591061 DK |
99 | Must(theCapacity > 0); |
100 | for (int i = 0; i < theCapacity; ++i) | |
101 | new (&(*this)[i]) OneToOneUniQueue(maxItemSize, queueCapacity); | |
68353d5a DK |
102 | } |
103 | ||
104 | size_t | |
f5591061 | 105 | Ipc::OneToOneUniQueues::sharedMemorySize() const |
68353d5a | 106 | { |
f5591061 | 107 | return sizeof(*this) + theCapacity * front().sharedMemorySize(); |
fa61cefe AR |
108 | } |
109 | ||
f5591061 DK |
110 | size_t |
111 | Ipc::OneToOneUniQueues::SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity) | |
fa61cefe | 112 | { |
f5591061 DK |
113 | const int queueSize = |
114 | OneToOneUniQueue::Items2Bytes(maxItemSize, queueCapacity); | |
115 | return sizeof(OneToOneUniQueues) + queueSize * capacity; | |
fa61cefe AR |
116 | } |
117 | ||
f5591061 DK |
118 | const Ipc::OneToOneUniQueue & |
119 | Ipc::OneToOneUniQueues::operator [](const int index) const | |
fa61cefe | 120 | { |
f5591061 DK |
121 | Must(0 <= index && index < theCapacity); |
122 | const size_t queueSize = index ? front().sharedMemorySize() : 0; | |
123 | const char *const queue = | |
124 | reinterpret_cast<const char *>(this) + sizeof(*this) + index * queueSize; | |
125 | return *reinterpret_cast<const OneToOneUniQueue *>(queue); | |
fa61cefe AR |
126 | } |
127 | ||
948230de | 128 | |
f5591061 | 129 | // FewToFewBiQueue |
948230de | 130 | |
f5591061 DK |
131 | Ipc::FewToFewBiQueue::Owner * |
132 | Ipc::FewToFewBiQueue::Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity) | |
68353d5a | 133 | { |
f5591061 | 134 | return new Owner(id, groupASize, groupAIdOffset, groupBSize, groupBIdOffset, maxItemSize, capacity); |
68353d5a DK |
135 | } |
136 | ||
f5591061 | 137 | Ipc::FewToFewBiQueue::FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId): |
9199139f AR |
138 | metadata(shm_old(Metadata)(MetadataId(id).termedBuf())), |
139 | queues(shm_old(OneToOneUniQueues)(QueuesId(id).termedBuf())), | |
140 | readers(shm_old(QueueReaders)(ReadersId(id).termedBuf())), | |
141 | theLocalGroup(aLocalGroup), theLocalProcessId(aLocalProcessId), | |
142 | theLastPopProcessId(readers->theCapacity) | |
948230de | 143 | { |
f5591061 DK |
144 | Must(queues->theCapacity == metadata->theGroupASize * metadata->theGroupBSize * 2); |
145 | Must(readers->theCapacity == metadata->theGroupASize + metadata->theGroupBSize); | |
68353d5a | 146 | |
f5591061 DK |
147 | const QueueReader &localReader = reader(theLocalGroup, theLocalProcessId); |
148 | debugs(54, 7, HERE << "queue " << id << " reader: " << localReader.id); | |
948230de DK |
149 | } |
150 | ||
f5591061 DK |
151 | bool |
152 | Ipc::FewToFewBiQueue::validProcessId(const Group group, const int processId) const | |
153 | { | |
154 | switch (group) { | |
155 | case groupA: | |
156 | return metadata->theGroupAIdOffset <= processId && | |
9199139f | 157 | processId < metadata->theGroupAIdOffset + metadata->theGroupASize; |
f5591061 DK |
158 | case groupB: |
159 | return metadata->theGroupBIdOffset <= processId && | |
9199139f | 160 | processId < metadata->theGroupBIdOffset + metadata->theGroupBSize; |
f5591061 DK |
161 | } |
162 | return false; | |
163 | } | |
164 | ||
0a11e039 AR |
165 | int |
166 | Ipc::FewToFewBiQueue::oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const | |
f5591061 DK |
167 | { |
168 | Must(fromGroup != toGroup); | |
0a11e039 AR |
169 | assert(validProcessId(fromGroup, fromProcessId)); |
170 | assert(validProcessId(toGroup, toProcessId)); | |
f5591061 DK |
171 | int index1; |
172 | int index2; | |
173 | int offset; | |
174 | if (fromGroup == groupA) { | |
175 | index1 = fromProcessId - metadata->theGroupAIdOffset; | |
176 | index2 = toProcessId - metadata->theGroupBIdOffset; | |
177 | offset = 0; | |
178 | } else { | |
179 | index1 = toProcessId - metadata->theGroupAIdOffset; | |
180 | index2 = fromProcessId - metadata->theGroupBIdOffset; | |
181 | offset = metadata->theGroupASize * metadata->theGroupBSize; | |
182 | } | |
183 | const int index = offset + index1 * metadata->theGroupBSize + index2; | |
0a11e039 AR |
184 | return index; |
185 | } | |
186 | ||
187 | Ipc::OneToOneUniQueue & | |
188 | Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) | |
189 | { | |
190 | return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)]; | |
191 | } | |
192 | ||
193 | const Ipc::OneToOneUniQueue & | |
194 | Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const | |
195 | { | |
196 | return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)]; | |
948230de DK |
197 | } |
198 | ||
f5591061 DK |
199 | Ipc::QueueReader & |
200 | Ipc::FewToFewBiQueue::reader(const Group group, const int processId) | |
948230de | 201 | { |
f5591061 DK |
202 | Must(validProcessId(group, processId)); |
203 | const int index = group == groupA ? | |
9199139f AR |
204 | processId - metadata->theGroupAIdOffset : |
205 | metadata->theGroupASize + processId - metadata->theGroupBIdOffset; | |
f5591061 | 206 | return readers->theReaders[index]; |
948230de | 207 | } |
fa61cefe AR |
208 | |
209 | void | |
f5591061 | 210 | Ipc::FewToFewBiQueue::clearReaderSignal(const int remoteProcessId) |
fa61cefe | 211 | { |
f5591061 DK |
212 | QueueReader &localReader = reader(theLocalGroup, theLocalProcessId); |
213 | debugs(54, 7, HERE << "reader: " << localReader.id); | |
fa61cefe | 214 | |
f5591061 DK |
215 | Must(validProcessId(remoteGroup(), remoteProcessId)); |
216 | localReader.clearSignal(); | |
fa61cefe AR |
217 | |
218 | // we got a hint; we could reposition iteration to try popping from the | |
f5591061 | 219 | // remoteProcessId queue first; but it does not seem to help much and might |
fa61cefe | 220 | // introduce some bias so we do not do that for now: |
f5591061 | 221 | // theLastPopProcessId = remoteProcessId; |
68353d5a DK |
222 | } |
223 | ||
f5591061 | 224 | Ipc::FewToFewBiQueue::Metadata::Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset): |
9199139f AR |
225 | theGroupASize(aGroupASize), theGroupAIdOffset(aGroupAIdOffset), |
226 | theGroupBSize(aGroupBSize), theGroupBIdOffset(aGroupBIdOffset) | |
f5591061 DK |
227 | { |
228 | Must(theGroupASize > 0); | |
229 | Must(theGroupBSize > 0); | |
230 | } | |
231 | ||
232 | Ipc::FewToFewBiQueue::Owner::Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity): | |
9199139f AR |
233 | metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), groupASize, groupAIdOffset, groupBSize, groupBIdOffset)), |
234 | queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), groupASize*groupBSize*2, maxItemSize, capacity)), | |
235 | readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), groupASize+groupBSize)) | |
68353d5a | 236 | { |
68353d5a DK |
237 | } |
238 | ||
f5591061 | 239 | Ipc::FewToFewBiQueue::Owner::~Owner() |
68353d5a | 240 | { |
f5591061 DK |
241 | delete metadataOwner; |
242 | delete queuesOwner; | |
68353d5a | 243 | delete readersOwner; |
fa61cefe | 244 | } |