]>
git.ipfire.org Git - thirdparty/squid.git/blob - src/ipc/Queue.cc
4 * DEBUG: section 54 Interprocess Communication
9 #include "base/TextException.h"
12 #include "ipc/Queue.h"
14 /// constructs shared segment ID from parent queue ID and child queue index
16 QueueId(String id
, const int idx
)
19 id
.append(xitoa(idx
));
23 /// constructs QueueReader ID from parent queue ID
27 id
.append("__readers");
34 InstanceIdDefinitions(QueueReader
, "ipcQR");
36 QueueReader::QueueReader(): popBlocked(1), popSignal(0)
38 debugs(54, 7, HERE
<< "constructed " << id
);
44 OneToOneUniQueue::OneToOneUniQueue(const String
&id
, const unsigned int maxItemSize
, const int capacity
):
45 shm(id
.termedBuf()), reader_(NULL
)
47 const int sharedSize
= Items2Bytes(maxItemSize
, capacity
);
48 shm
.create(sharedSize
);
49 shared
= new (shm
.reserve(sharedSize
)) Shared(maxItemSize
, capacity
);
52 OneToOneUniQueue::OneToOneUniQueue(const String
&id
): shm(id
.termedBuf()),
56 shared
= reinterpret_cast<Shared
*>(shm
.mem());
58 const int mySharedSize
=
59 Items2Bytes(shared
->theMaxItemSize
, shared
->theCapacity
);
60 assert(shared
== reinterpret_cast<Shared
*>(shm
.reserve(mySharedSize
)));
64 OneToOneUniQueue::reader(QueueReader
*aReader
)
66 Must(!reader_
&& aReader
);
71 OneToOneUniQueue::Bytes2Items(const unsigned int maxItemSize
, int size
)
73 assert(maxItemSize
> 0);
74 size
-= sizeof(Shared
);
75 return size
>= 0 ? size
/ maxItemSize
: 0;
79 OneToOneUniQueue::Items2Bytes(const unsigned int maxItemSize
, const int size
)
82 return sizeof(Shared
) + maxItemSize
* size
;
85 OneToOneUniQueue::Shared::Shared(const unsigned int aMaxItemSize
, const int aCapacity
):
86 theIn(0), theOut(0), theSize(0), theMaxItemSize(aMaxItemSize
),
87 theCapacity(aCapacity
)
92 OneToOneUniQueue::reader()
101 OneToOneBiQueue::OneToOneBiQueue(const String
&id
, const unsigned int maxItemSize
, const int capacity
):
102 popQueue(new OneToOneUniQueue(QueueId(id
, 1), maxItemSize
, capacity
)),
103 pushQueue(new OneToOneUniQueue(QueueId(id
, 2), maxItemSize
, capacity
))
107 OneToOneBiQueue::OneToOneBiQueue(const String
&id
):
108 popQueue(new OneToOneUniQueue(QueueId(id
, 2))),
109 pushQueue(new OneToOneUniQueue(QueueId(id
, 1)))
114 OneToOneBiQueue::readers(QueueReader
*r1
, QueueReader
*r2
)
116 popQueue
->reader(r1
);
117 pushQueue
->reader(r2
);
121 OneToOneBiQueue::clearReaderSignal()
123 debugs(54, 7, HERE
<< "reader: " << &popQueue
->reader());
124 popQueue
->reader().clearSignal();
130 FewToOneBiQueue::FewToOneBiQueue(const String
&id
, const int aWorkerCount
, const unsigned int maxItemSize
, const int capacity
):
131 theLastPopWorker(0), theWorkerCount(aWorkerCount
),
132 shm(ReaderId(id
).termedBuf()),
135 // create a new segment for the local and remote queue readers
136 // TODO: all our queues and readers should use a single segment
137 shm
.create((theWorkerCount
+1)*sizeof(QueueReader
));
138 reader
= new (shm
.reserve(sizeof(QueueReader
))) QueueReader
;
139 debugs(54, 7, HERE
<< "disker " << id
<< " reader: " << reader
->id
);
141 assert(theWorkerCount
>= 0);
142 biQueues
.reserve(theWorkerCount
);
143 for (int i
= 0; i
< theWorkerCount
; ++i
) {
144 OneToOneBiQueue
*const biQueue
=
145 new OneToOneBiQueue(QueueId(id
, i
), maxItemSize
, capacity
);
146 QueueReader
*remoteReader
=
147 new (shm
.reserve(sizeof(QueueReader
))) QueueReader
;
148 biQueue
->readers(reader
, remoteReader
);
149 biQueues
.push_back(biQueue
);
154 FewToOneBiQueue::Attach(const String
&id
, const int workerId
)
156 // XXX: remove this leak. By refcounting Ipc::Mem::Segments? By creating a global FewToOneBiQueue for each worker?
157 Ipc::Mem::Segment
*shmPtr
= new Ipc::Mem::Segment(ReaderId(id
).termedBuf());
159 Ipc::Mem::Segment
&shm
= *shmPtr
;
161 assert(shm
.size() >= static_cast<off_t
>((1 + workerId
+1)*sizeof(QueueReader
)));
162 QueueReader
*readers
= reinterpret_cast<QueueReader
*>(shm
.mem());
163 QueueReader
*remoteReader
= &readers
[0];
164 debugs(54, 7, HERE
<< "disker " << id
<< " reader: " << remoteReader
->id
);
165 QueueReader
*localReader
= &readers
[workerId
+1];
166 debugs(54, 7, HERE
<< "local " << id
<< " reader: " << localReader
->id
);
168 OneToOneBiQueue
*const biQueue
=
169 new OneToOneBiQueue(QueueId(id
, workerId
));
170 biQueue
->readers(localReader
, remoteReader
);
174 FewToOneBiQueue::~FewToOneBiQueue()
176 for (int i
= 0; i
< theWorkerCount
; ++i
)
180 bool FewToOneBiQueue::validWorkerId(const int workerId
) const
182 return 0 <= workerId
&& workerId
< theWorkerCount
;
186 FewToOneBiQueue::clearReaderSignal(int workerId
)
188 debugs(54, 7, HERE
<< "reader: " << reader
->id
);
190 assert(validWorkerId(workerId
));
191 reader
->clearSignal();
193 // we got a hint; we could reposition iteration to try popping from the
194 // workerId queue first; but it does not seem to help much and might
195 // introduce some bias so we do not do that for now:
196 // theLastPopWorker = (workerId + theWorkerCount - 1) % theWorkerCount;