]>
git.ipfire.org Git - thirdparty/squid.git/blob - src/ipc/Queue.cc
4 * DEBUG: section 54 Interprocess Communication
9 #include "base/TextException.h"
12 #include "ipc/Queue.h"
14 /// constructs Metadata ID from parent queue ID
18 id
. append ( "__metadata" );
22 /// constructs one-to-one queues ID from parent queue ID
26 id
. append ( "__queues" );
30 /// constructs QueueReaders ID from parent queue ID
34 id
. append ( "__readers" );
41 InstanceIdDefinitions ( Ipc :: QueueReader
, "ipcQR" );
43 Ipc :: QueueReader :: QueueReader (): popBlocked ( 1 ), popSignal ( 0 ),
44 rateLimit ( 0 ), balance ( 0 )
46 debugs ( 54 , 7 , HERE
<< "constructed " << id
);
51 Ipc :: QueueReaders :: QueueReaders ( const int aCapacity
): theCapacity ( aCapacity
)
53 Must ( theCapacity
> 0 );
54 new ( theReaders
) QueueReader
[ theCapacity
];
58 Ipc :: QueueReaders :: sharedMemorySize () const
60 return SharedMemorySize ( theCapacity
);
64 Ipc :: QueueReaders :: SharedMemorySize ( const int capacity
)
66 return sizeof ( QueueReaders
) + sizeof ( QueueReader
) * capacity
;
72 Ipc :: OneToOneUniQueue :: OneToOneUniQueue ( const unsigned int aMaxItemSize
, const int aCapacity
):
73 theIn ( 0 ), theOut ( 0 ), theSize ( 0 ), theMaxItemSize ( aMaxItemSize
),
74 theCapacity ( aCapacity
)
76 Must ( theMaxItemSize
> 0 );
77 Must ( theCapacity
> 0 );
81 Ipc :: OneToOneUniQueue :: Bytes2Items ( const unsigned int maxItemSize
, int size
)
83 assert ( maxItemSize
> 0 );
84 size
-= sizeof ( OneToOneUniQueue
);
85 return size
>= 0 ? size
/ maxItemSize
: 0 ;
89 Ipc :: OneToOneUniQueue :: Items2Bytes ( const unsigned int maxItemSize
, const int size
)
92 return sizeof ( OneToOneUniQueue
) + maxItemSize
* size
;
96 /* OneToOneUniQueues */
98 Ipc :: OneToOneUniQueues :: OneToOneUniQueues ( const int aCapacity
, const unsigned int maxItemSize
, const int queueCapacity
): theCapacity ( aCapacity
)
100 Must ( theCapacity
> 0 );
101 for ( int i
= 0 ; i
< theCapacity
; ++ i
)
102 new (&(* this )[ i
]) OneToOneUniQueue ( maxItemSize
, queueCapacity
);
106 Ipc :: OneToOneUniQueues :: sharedMemorySize () const
108 return sizeof (* this ) + theCapacity
* front (). sharedMemorySize ();
112 Ipc :: OneToOneUniQueues :: SharedMemorySize ( const int capacity
, const unsigned int maxItemSize
, const int queueCapacity
)
114 const int queueSize
=
115 OneToOneUniQueue :: Items2Bytes ( maxItemSize
, queueCapacity
);
116 return sizeof ( OneToOneUniQueues
) + queueSize
* capacity
;
119 const Ipc :: OneToOneUniQueue
&
120 Ipc :: OneToOneUniQueues :: operator []( const int index
) const
122 Must ( 0 <= index
&& index
< theCapacity
);
123 const size_t queueSize
= index
? front (). sharedMemorySize () : 0 ;
124 const char * const queue
=
125 reinterpret_cast < const char *>( this ) + sizeof (* this ) + index
* queueSize
;
126 return * reinterpret_cast < const OneToOneUniQueue
*>( queue
);
132 Ipc :: FewToFewBiQueue :: Owner
*
133 Ipc :: FewToFewBiQueue :: Init ( const String
& id
, const int groupASize
, const int groupAIdOffset
, const int groupBSize
, const int groupBIdOffset
, const unsigned int maxItemSize
, const int capacity
)
135 return new Owner ( id
, groupASize
, groupAIdOffset
, groupBSize
, groupBIdOffset
, maxItemSize
, capacity
);
138 Ipc :: FewToFewBiQueue :: FewToFewBiQueue ( const String
& id
, const Group aLocalGroup
, const int aLocalProcessId
):
139 metadata ( shm_old ( Metadata
)( MetadataId ( id
). termedBuf ())),
140 queues ( shm_old ( OneToOneUniQueues
)( QueuesId ( id
). termedBuf ())),
141 readers ( shm_old ( QueueReaders
)( ReadersId ( id
). termedBuf ())),
142 theLocalGroup ( aLocalGroup
), theLocalProcessId ( aLocalProcessId
),
143 theLastPopProcessId ( readers
-> theCapacity
)
145 Must ( queues
-> theCapacity
== metadata
-> theGroupASize
* metadata
-> theGroupBSize
* 2 );
146 Must ( readers
-> theCapacity
== metadata
-> theGroupASize
+ metadata
-> theGroupBSize
);
148 const QueueReader
& localReader
= reader ( theLocalGroup
, theLocalProcessId
);
149 debugs ( 54 , 7 , HERE
<< "queue " << id
<< " reader: " << localReader
. id
);
153 Ipc :: FewToFewBiQueue :: MaxItemsCount ( const int groupASize
, const int groupBSize
, const int capacity
)
155 return capacity
* groupASize
* groupBSize
* 2 ;
159 Ipc :: FewToFewBiQueue :: validProcessId ( const Group group
, const int processId
) const
163 return metadata
-> theGroupAIdOffset
<= processId
&&
164 processId
< metadata
-> theGroupAIdOffset
+ metadata
-> theGroupASize
;
166 return metadata
-> theGroupBIdOffset
<= processId
&&
167 processId
< metadata
-> theGroupBIdOffset
+ metadata
-> theGroupBSize
;
173 Ipc :: FewToFewBiQueue :: oneToOneQueueIndex ( const Group fromGroup
, const int fromProcessId
, const Group toGroup
, const int toProcessId
) const
175 Must ( fromGroup
!= toGroup
);
176 assert ( validProcessId ( fromGroup
, fromProcessId
));
177 assert ( validProcessId ( toGroup
, toProcessId
));
181 if ( fromGroup
== groupA
) {
182 index1
= fromProcessId
- metadata
-> theGroupAIdOffset
;
183 index2
= toProcessId
- metadata
-> theGroupBIdOffset
;
186 index1
= toProcessId
- metadata
-> theGroupAIdOffset
;
187 index2
= fromProcessId
- metadata
-> theGroupBIdOffset
;
188 offset
= metadata
-> theGroupASize
* metadata
-> theGroupBSize
;
190 const int index
= offset
+ index1
* metadata
-> theGroupBSize
+ index2
;
194 Ipc :: OneToOneUniQueue
&
195 Ipc :: FewToFewBiQueue :: oneToOneQueue ( const Group fromGroup
, const int fromProcessId
, const Group toGroup
, const int toProcessId
)
197 return (* queues
)[ oneToOneQueueIndex ( fromGroup
, fromProcessId
, toGroup
, toProcessId
)];
200 const Ipc :: OneToOneUniQueue
&
201 Ipc :: FewToFewBiQueue :: oneToOneQueue ( const Group fromGroup
, const int fromProcessId
, const Group toGroup
, const int toProcessId
) const
203 return (* queues
)[ oneToOneQueueIndex ( fromGroup
, fromProcessId
, toGroup
, toProcessId
)];
206 /// incoming queue from a given remote process
207 const Ipc :: OneToOneUniQueue
&
208 Ipc :: FewToFewBiQueue :: inQueue ( const int remoteProcessId
) const
210 return oneToOneQueue ( remoteGroup (), remoteProcessId
,
211 theLocalGroup
, theLocalProcessId
);
214 /// outgoing queue to a given remote process
215 const Ipc :: OneToOneUniQueue
&
216 Ipc :: FewToFewBiQueue :: outQueue ( const int remoteProcessId
) const
218 return oneToOneQueue ( theLocalGroup
, theLocalProcessId
,
219 remoteGroup (), remoteProcessId
);
223 Ipc :: FewToFewBiQueue :: readerIndex ( const Group group
, const int processId
) const
225 Must ( validProcessId ( group
, processId
));
226 return group
== groupA
?
227 processId
- metadata
-> theGroupAIdOffset
:
228 metadata
-> theGroupASize
+ processId
- metadata
-> theGroupBIdOffset
;
232 Ipc :: FewToFewBiQueue :: reader ( const Group group
, const int processId
)
234 return readers
-> theReaders
[ readerIndex ( group
, processId
)];
237 const Ipc :: QueueReader
&
238 Ipc :: FewToFewBiQueue :: reader ( const Group group
, const int processId
) const
240 return readers
-> theReaders
[ readerIndex ( group
, processId
)];
244 Ipc :: FewToFewBiQueue :: clearReaderSignal ( const int remoteProcessId
)
246 QueueReader
& localReader
= reader ( theLocalGroup
, theLocalProcessId
);
247 debugs ( 54 , 7 , HERE
<< "reader: " << localReader
. id
);
249 Must ( validProcessId ( remoteGroup (), remoteProcessId
));
250 localReader
. clearSignal ();
252 // we got a hint; we could reposition iteration to try popping from the
253 // remoteProcessId queue first; but it does not seem to help much and might
254 // introduce some bias so we do not do that for now:
255 // theLastPopProcessId = remoteProcessId;
258 Ipc :: QueueReader :: Balance
&
259 Ipc :: FewToFewBiQueue :: localBalance ()
261 QueueReader
& r
= reader ( theLocalGroup
, theLocalProcessId
);
265 const Ipc :: QueueReader :: Balance
&
266 Ipc :: FewToFewBiQueue :: balance ( const int remoteProcessId
) const
268 const QueueReader
& r
= reader ( remoteGroup (), remoteProcessId
);
272 Ipc :: QueueReader :: Rate
&
273 Ipc :: FewToFewBiQueue :: localRateLimit ()
275 QueueReader
& r
= reader ( theLocalGroup
, theLocalProcessId
);
279 const Ipc :: QueueReader :: Rate
&
280 Ipc :: FewToFewBiQueue :: rateLimit ( const int remoteProcessId
) const
282 const QueueReader
& r
= reader ( remoteGroup (), remoteProcessId
);
286 Ipc :: FewToFewBiQueue :: Metadata :: Metadata ( const int aGroupASize
, const int aGroupAIdOffset
, const int aGroupBSize
, const int aGroupBIdOffset
):
287 theGroupASize ( aGroupASize
), theGroupAIdOffset ( aGroupAIdOffset
),
288 theGroupBSize ( aGroupBSize
), theGroupBIdOffset ( aGroupBIdOffset
)
290 Must ( theGroupASize
> 0 );
291 Must ( theGroupBSize
> 0 );
294 Ipc :: FewToFewBiQueue :: Owner :: Owner ( const String
& id
, const int groupASize
, const int groupAIdOffset
, const int groupBSize
, const int groupBIdOffset
, const unsigned int maxItemSize
, const int capacity
):
295 metadataOwner ( shm_new ( Metadata
)( MetadataId ( id
). termedBuf (), groupASize
, groupAIdOffset
, groupBSize
, groupBIdOffset
)),
296 queuesOwner ( shm_new ( OneToOneUniQueues
)( QueuesId ( id
). termedBuf (), groupASize
* groupBSize
* 2 , maxItemSize
, capacity
)),
297 readersOwner ( shm_new ( QueueReaders
)( ReadersId ( id
). termedBuf (), groupASize
+ groupBSize
))
301 Ipc :: FewToFewBiQueue :: Owner ::~ Owner ()
303 delete metadataOwner
;