]> git.ipfire.org Git - thirdparty/squid.git/blob - src/ipc/Queue.cc
SMP Caching: Core changes, IPC primitives, Shared memory cache, and Rock Store
[thirdparty/squid.git] / src / ipc / Queue.cc
1 /*
2 * $Id$
3 *
4 * DEBUG: section 54 Interprocess Communication
5 *
6 */
7
8 #include "config.h"
9 #include "base/TextException.h"
10 #include "Debug.h"
11 #include "globals.h"
12 #include "ipc/Queue.h"
13
14 /// constructs Metadata ID from parent queue ID
15 static String
16 MetadataId(String id)
17 {
18 id.append("__metadata");
19 return id;
20 }
21
22 /// constructs one-to-one queues ID from parent queue ID
23 static String
24 QueuesId(String id)
25 {
26 id.append("__queues");
27 return id;
28 }
29
30 /// constructs QueueReaders ID from parent queue ID
31 static String
32 ReadersId(String id)
33 {
34 id.append("__readers");
35 return id;
36 }
37
38
39 /* QueueReader */
40
41 InstanceIdDefinitions(Ipc::QueueReader, "ipcQR");
42
43 Ipc::QueueReader::QueueReader(): popBlocked(1), popSignal(0)
44 {
45 debugs(54, 7, HERE << "constructed " << id);
46 }
47
48 /* QueueReaders */
49
50 Ipc::QueueReaders::QueueReaders(const int aCapacity): theCapacity(aCapacity)
51 {
52 Must(theCapacity > 0);
53 new (theReaders) QueueReader[theCapacity];
54 }
55
56 size_t
57 Ipc::QueueReaders::sharedMemorySize() const
58 {
59 return SharedMemorySize(theCapacity);
60 }
61
62 size_t
63 Ipc::QueueReaders::SharedMemorySize(const int capacity)
64 {
65 return sizeof(QueueReaders) + sizeof(QueueReader) * capacity;
66 }
67
68
69 // OneToOneUniQueue
70
71 Ipc::OneToOneUniQueue::OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity):
72 theIn(0), theOut(0), theSize(0), theMaxItemSize(aMaxItemSize),
73 theCapacity(aCapacity)
74 {
75 Must(theMaxItemSize > 0);
76 Must(theCapacity > 0);
77 }
78
79 int
80 Ipc::OneToOneUniQueue::Bytes2Items(const unsigned int maxItemSize, int size)
81 {
82 assert(maxItemSize > 0);
83 size -= sizeof(OneToOneUniQueue);
84 return size >= 0 ? size / maxItemSize : 0;
85 }
86
87 int
88 Ipc::OneToOneUniQueue::Items2Bytes(const unsigned int maxItemSize, const int size)
89 {
90 assert(size >= 0);
91 return sizeof(OneToOneUniQueue) + maxItemSize * size;
92 }
93
94
95 /* OneToOneUniQueues */
96
97 Ipc::OneToOneUniQueues::OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity): theCapacity(aCapacity)
98 {
99 Must(theCapacity > 0);
100 for (int i = 0; i < theCapacity; ++i)
101 new (&(*this)[i]) OneToOneUniQueue(maxItemSize, queueCapacity);
102 }
103
104 size_t
105 Ipc::OneToOneUniQueues::sharedMemorySize() const
106 {
107 return sizeof(*this) + theCapacity * front().sharedMemorySize();
108 }
109
110 size_t
111 Ipc::OneToOneUniQueues::SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity)
112 {
113 const int queueSize =
114 OneToOneUniQueue::Items2Bytes(maxItemSize, queueCapacity);
115 return sizeof(OneToOneUniQueues) + queueSize * capacity;
116 }
117
118 const Ipc::OneToOneUniQueue &
119 Ipc::OneToOneUniQueues::operator [](const int index) const
120 {
121 Must(0 <= index && index < theCapacity);
122 const size_t queueSize = index ? front().sharedMemorySize() : 0;
123 const char *const queue =
124 reinterpret_cast<const char *>(this) + sizeof(*this) + index * queueSize;
125 return *reinterpret_cast<const OneToOneUniQueue *>(queue);
126 }
127
128
129 // FewToFewBiQueue
130
131 Ipc::FewToFewBiQueue::Owner *
132 Ipc::FewToFewBiQueue::Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity)
133 {
134 return new Owner(id, groupASize, groupAIdOffset, groupBSize, groupBIdOffset, maxItemSize, capacity);
135 }
136
137 Ipc::FewToFewBiQueue::FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId):
138 metadata(shm_old(Metadata)(MetadataId(id).termedBuf())),
139 queues(shm_old(OneToOneUniQueues)(QueuesId(id).termedBuf())),
140 readers(shm_old(QueueReaders)(ReadersId(id).termedBuf())),
141 theLocalGroup(aLocalGroup), theLocalProcessId(aLocalProcessId),
142 theLastPopProcessId(readers->theCapacity)
143 {
144 Must(queues->theCapacity == metadata->theGroupASize * metadata->theGroupBSize * 2);
145 Must(readers->theCapacity == metadata->theGroupASize + metadata->theGroupBSize);
146
147 const QueueReader &localReader = reader(theLocalGroup, theLocalProcessId);
148 debugs(54, 7, HERE << "queue " << id << " reader: " << localReader.id);
149 }
150
151 bool
152 Ipc::FewToFewBiQueue::validProcessId(const Group group, const int processId) const
153 {
154 switch (group) {
155 case groupA:
156 return metadata->theGroupAIdOffset <= processId &&
157 processId < metadata->theGroupAIdOffset + metadata->theGroupASize;
158 case groupB:
159 return metadata->theGroupBIdOffset <= processId &&
160 processId < metadata->theGroupBIdOffset + metadata->theGroupBSize;
161 }
162 return false;
163 }
164
165 int
166 Ipc::FewToFewBiQueue::oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
167 {
168 Must(fromGroup != toGroup);
169 assert(validProcessId(fromGroup, fromProcessId));
170 assert(validProcessId(toGroup, toProcessId));
171 int index1;
172 int index2;
173 int offset;
174 if (fromGroup == groupA) {
175 index1 = fromProcessId - metadata->theGroupAIdOffset;
176 index2 = toProcessId - metadata->theGroupBIdOffset;
177 offset = 0;
178 } else {
179 index1 = toProcessId - metadata->theGroupAIdOffset;
180 index2 = fromProcessId - metadata->theGroupBIdOffset;
181 offset = metadata->theGroupASize * metadata->theGroupBSize;
182 }
183 const int index = offset + index1 * metadata->theGroupBSize + index2;
184 return index;
185 }
186
187 Ipc::OneToOneUniQueue &
188 Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId)
189 {
190 return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)];
191 }
192
193 const Ipc::OneToOneUniQueue &
194 Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
195 {
196 return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)];
197 }
198
199 Ipc::QueueReader &
200 Ipc::FewToFewBiQueue::reader(const Group group, const int processId)
201 {
202 Must(validProcessId(group, processId));
203 const int index = group == groupA ?
204 processId - metadata->theGroupAIdOffset :
205 metadata->theGroupASize + processId - metadata->theGroupBIdOffset;
206 return readers->theReaders[index];
207 }
208
209 void
210 Ipc::FewToFewBiQueue::clearReaderSignal(const int remoteProcessId)
211 {
212 QueueReader &localReader = reader(theLocalGroup, theLocalProcessId);
213 debugs(54, 7, HERE << "reader: " << localReader.id);
214
215 Must(validProcessId(remoteGroup(), remoteProcessId));
216 localReader.clearSignal();
217
218 // we got a hint; we could reposition iteration to try popping from the
219 // remoteProcessId queue first; but it does not seem to help much and might
220 // introduce some bias so we do not do that for now:
221 // theLastPopProcessId = remoteProcessId;
222 }
223
224 Ipc::FewToFewBiQueue::Metadata::Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset):
225 theGroupASize(aGroupASize), theGroupAIdOffset(aGroupAIdOffset),
226 theGroupBSize(aGroupBSize), theGroupBIdOffset(aGroupBIdOffset)
227 {
228 Must(theGroupASize > 0);
229 Must(theGroupBSize > 0);
230 }
231
232 Ipc::FewToFewBiQueue::Owner::Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity):
233 metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), groupASize, groupAIdOffset, groupBSize, groupBIdOffset)),
234 queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), groupASize*groupBSize*2, maxItemSize, capacity)),
235 readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), groupASize+groupBSize))
236 {
237 }
238
239 Ipc::FewToFewBiQueue::Owner::~Owner()
240 {
241 delete metadataOwner;
242 delete queuesOwner;
243 delete readersOwner;
244 }