]>
git.ipfire.org Git - thirdparty/squid.git/blob - src/ipc/Queue.cc
2 * DEBUG: section 54 Interprocess Communication
7 #include "base/TextException.h"
10 #include "ipc/Queue.h"
12 /// constructs Metadata ID from parent queue ID
16 id
. append ( "__metadata" );
20 /// constructs one-to-one queues ID from parent queue ID
24 id
. append ( "__queues" );
28 /// constructs QueueReaders ID from parent queue ID
32 id
. append ( "__readers" );
38 InstanceIdDefinitions ( Ipc :: QueueReader
, "ipcQR" );
40 Ipc :: QueueReader :: QueueReader (): popBlocked ( 1 ), popSignal ( 0 ),
41 rateLimit ( 0 ), balance ( 0 )
43 debugs ( 54 , 7 , HERE
<< "constructed " << id
);
48 Ipc :: QueueReaders :: QueueReaders ( const int aCapacity
): theCapacity ( aCapacity
),
49 theReaders ( theCapacity
)
51 Must ( theCapacity
> 0 );
55 Ipc :: QueueReaders :: sharedMemorySize () const
57 return SharedMemorySize ( theCapacity
);
61 Ipc :: QueueReaders :: SharedMemorySize ( const int capacity
)
63 return sizeof ( QueueReaders
) + sizeof ( QueueReader
) * capacity
;
68 Ipc :: OneToOneUniQueue :: OneToOneUniQueue ( const unsigned int aMaxItemSize
, const int aCapacity
):
69 theIn ( 0 ), theOut ( 0 ), theSize ( 0 ), theMaxItemSize ( aMaxItemSize
),
70 theCapacity ( aCapacity
)
72 Must ( theMaxItemSize
> 0 );
73 Must ( theCapacity
> 0 );
77 Ipc :: OneToOneUniQueue :: Bytes2Items ( const unsigned int maxItemSize
, int size
)
79 assert ( maxItemSize
> 0 );
80 size
-= sizeof ( OneToOneUniQueue
);
81 return size
>= 0 ? size
/ maxItemSize
: 0 ;
85 Ipc :: OneToOneUniQueue :: Items2Bytes ( const unsigned int maxItemSize
, const int size
)
88 return sizeof ( OneToOneUniQueue
) + maxItemSize
* size
;
91 /* OneToOneUniQueues */
93 Ipc :: OneToOneUniQueues :: OneToOneUniQueues ( const int aCapacity
, const unsigned int maxItemSize
, const int queueCapacity
): theCapacity ( aCapacity
)
95 Must ( theCapacity
> 0 );
96 for ( int i
= 0 ; i
< theCapacity
; ++ i
)
97 new (&(* this )[ i
]) OneToOneUniQueue ( maxItemSize
, queueCapacity
);
101 Ipc :: OneToOneUniQueues :: sharedMemorySize () const
103 return sizeof (* this ) + theCapacity
* front (). sharedMemorySize ();
107 Ipc :: OneToOneUniQueues :: SharedMemorySize ( const int capacity
, const unsigned int maxItemSize
, const int queueCapacity
)
109 const int queueSize
=
110 OneToOneUniQueue :: Items2Bytes ( maxItemSize
, queueCapacity
);
111 return sizeof ( OneToOneUniQueues
) + queueSize
* capacity
;
114 const Ipc :: OneToOneUniQueue
&
115 Ipc :: OneToOneUniQueues :: operator []( const int index
) const
117 Must ( 0 <= index
&& index
< theCapacity
);
118 const size_t queueSize
= index
? front (). sharedMemorySize () : 0 ;
119 const char * const queue
=
120 reinterpret_cast < const char *>( this ) + sizeof (* this ) + index
* queueSize
;
121 return * reinterpret_cast < const OneToOneUniQueue
*>( queue
);
126 Ipc :: FewToFewBiQueue :: Owner
*
127 Ipc :: FewToFewBiQueue :: Init ( const String
& id
, const int groupASize
, const int groupAIdOffset
, const int groupBSize
, const int groupBIdOffset
, const unsigned int maxItemSize
, const int capacity
)
129 return new Owner ( id
, groupASize
, groupAIdOffset
, groupBSize
, groupBIdOffset
, maxItemSize
, capacity
);
132 Ipc :: FewToFewBiQueue :: FewToFewBiQueue ( const String
& id
, const Group aLocalGroup
, const int aLocalProcessId
):
133 metadata ( shm_old ( Metadata
)( MetadataId ( id
). termedBuf ())),
134 queues ( shm_old ( OneToOneUniQueues
)( QueuesId ( id
). termedBuf ())),
135 readers ( shm_old ( QueueReaders
)( ReadersId ( id
). termedBuf ())),
136 theLocalGroup ( aLocalGroup
), theLocalProcessId ( aLocalProcessId
),
137 theLastPopProcessId ( readers
-> theCapacity
)
139 Must ( queues
-> theCapacity
== metadata
-> theGroupASize
* metadata
-> theGroupBSize
* 2 );
140 Must ( readers
-> theCapacity
== metadata
-> theGroupASize
+ metadata
-> theGroupBSize
);
142 const QueueReader
& localReader
= reader ( theLocalGroup
, theLocalProcessId
);
143 debugs ( 54 , 7 , HERE
<< "queue " << id
<< " reader: " << localReader
. id
);
147 Ipc :: FewToFewBiQueue :: MaxItemsCount ( const int groupASize
, const int groupBSize
, const int capacity
)
149 return capacity
* groupASize
* groupBSize
* 2 ;
153 Ipc :: FewToFewBiQueue :: validProcessId ( const Group group
, const int processId
) const
157 return metadata
-> theGroupAIdOffset
<= processId
&&
158 processId
< metadata
-> theGroupAIdOffset
+ metadata
-> theGroupASize
;
160 return metadata
-> theGroupBIdOffset
<= processId
&&
161 processId
< metadata
-> theGroupBIdOffset
+ metadata
-> theGroupBSize
;
167 Ipc :: FewToFewBiQueue :: oneToOneQueueIndex ( const Group fromGroup
, const int fromProcessId
, const Group toGroup
, const int toProcessId
) const
169 Must ( fromGroup
!= toGroup
);
170 assert ( validProcessId ( fromGroup
, fromProcessId
));
171 assert ( validProcessId ( toGroup
, toProcessId
));
175 if ( fromGroup
== groupA
) {
176 index1
= fromProcessId
- metadata
-> theGroupAIdOffset
;
177 index2
= toProcessId
- metadata
-> theGroupBIdOffset
;
180 index1
= toProcessId
- metadata
-> theGroupAIdOffset
;
181 index2
= fromProcessId
- metadata
-> theGroupBIdOffset
;
182 offset
= metadata
-> theGroupASize
* metadata
-> theGroupBSize
;
184 const int index
= offset
+ index1
* metadata
-> theGroupBSize
+ index2
;
188 Ipc :: OneToOneUniQueue
&
189 Ipc :: FewToFewBiQueue :: oneToOneQueue ( const Group fromGroup
, const int fromProcessId
, const Group toGroup
, const int toProcessId
)
191 return (* queues
)[ oneToOneQueueIndex ( fromGroup
, fromProcessId
, toGroup
, toProcessId
)];
194 const Ipc :: OneToOneUniQueue
&
195 Ipc :: FewToFewBiQueue :: oneToOneQueue ( const Group fromGroup
, const int fromProcessId
, const Group toGroup
, const int toProcessId
) const
197 return (* queues
)[ oneToOneQueueIndex ( fromGroup
, fromProcessId
, toGroup
, toProcessId
)];
200 /// incoming queue from a given remote process
201 const Ipc :: OneToOneUniQueue
&
202 Ipc :: FewToFewBiQueue :: inQueue ( const int remoteProcessId
) const
204 return oneToOneQueue ( remoteGroup (), remoteProcessId
,
205 theLocalGroup
, theLocalProcessId
);
208 /// outgoing queue to a given remote process
209 const Ipc :: OneToOneUniQueue
&
210 Ipc :: FewToFewBiQueue :: outQueue ( const int remoteProcessId
) const
212 return oneToOneQueue ( theLocalGroup
, theLocalProcessId
,
213 remoteGroup (), remoteProcessId
);
217 Ipc :: FewToFewBiQueue :: readerIndex ( const Group group
, const int processId
) const
219 Must ( validProcessId ( group
, processId
));
220 return group
== groupA
?
221 processId
- metadata
-> theGroupAIdOffset
:
222 metadata
-> theGroupASize
+ processId
- metadata
-> theGroupBIdOffset
;
226 Ipc :: FewToFewBiQueue :: reader ( const Group group
, const int processId
)
228 return readers
-> theReaders
[ readerIndex ( group
, processId
)];
231 const Ipc :: QueueReader
&
232 Ipc :: FewToFewBiQueue :: reader ( const Group group
, const int processId
) const
234 return readers
-> theReaders
[ readerIndex ( group
, processId
)];
238 Ipc :: FewToFewBiQueue :: clearReaderSignal ( const int remoteProcessId
)
240 QueueReader
& localReader
= reader ( theLocalGroup
, theLocalProcessId
);
241 debugs ( 54 , 7 , HERE
<< "reader: " << localReader
. id
);
243 Must ( validProcessId ( remoteGroup (), remoteProcessId
));
244 localReader
. clearSignal ();
246 // we got a hint; we could reposition iteration to try popping from the
247 // remoteProcessId queue first; but it does not seem to help much and might
248 // introduce some bias so we do not do that for now:
249 // theLastPopProcessId = remoteProcessId;
252 Ipc :: QueueReader :: Balance
&
253 Ipc :: FewToFewBiQueue :: localBalance ()
255 QueueReader
& r
= reader ( theLocalGroup
, theLocalProcessId
);
259 const Ipc :: QueueReader :: Balance
&
260 Ipc :: FewToFewBiQueue :: balance ( const int remoteProcessId
) const
262 const QueueReader
& r
= reader ( remoteGroup (), remoteProcessId
);
266 Ipc :: QueueReader :: Rate
&
267 Ipc :: FewToFewBiQueue :: localRateLimit ()
269 QueueReader
& r
= reader ( theLocalGroup
, theLocalProcessId
);
273 const Ipc :: QueueReader :: Rate
&
274 Ipc :: FewToFewBiQueue :: rateLimit ( const int remoteProcessId
) const
276 const QueueReader
& r
= reader ( remoteGroup (), remoteProcessId
);
280 Ipc :: FewToFewBiQueue :: Metadata :: Metadata ( const int aGroupASize
, const int aGroupAIdOffset
, const int aGroupBSize
, const int aGroupBIdOffset
):
281 theGroupASize ( aGroupASize
), theGroupAIdOffset ( aGroupAIdOffset
),
282 theGroupBSize ( aGroupBSize
), theGroupBIdOffset ( aGroupBIdOffset
)
284 Must ( theGroupASize
> 0 );
285 Must ( theGroupBSize
> 0 );
288 Ipc :: FewToFewBiQueue :: Owner :: Owner ( const String
& id
, const int groupASize
, const int groupAIdOffset
, const int groupBSize
, const int groupBIdOffset
, const unsigned int maxItemSize
, const int capacity
):
289 metadataOwner ( shm_new ( Metadata
)( MetadataId ( id
). termedBuf (), groupASize
, groupAIdOffset
, groupBSize
, groupBIdOffset
)),
290 queuesOwner ( shm_new ( OneToOneUniQueues
)( QueuesId ( id
). termedBuf (), groupASize
* groupBSize
* 2 , maxItemSize
, capacity
)),
291 readersOwner ( shm_new ( QueueReaders
)( ReadersId ( id
). termedBuf (), groupASize
+ groupBSize
))
295 Ipc :: FewToFewBiQueue :: Owner ::~ Owner ()
297 delete metadataOwner
;