2 * Copyright (C) 1996-2022 The Squid Software Foundation and contributors
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
9 /* DEBUG: section 77 Delay Pools */
12 \defgroup DelayPoolsInternal Delay Pools Internal
13 \ingroup DelayPoolsAPI
19 #include "client_side_request.h"
20 #include "comm/Connection.h"
21 #include "CommonPool.h"
22 #include "CompositePoolNode.h"
23 #include "ConfigParser.h"
24 #include "DelayBucket.h"
26 #include "DelayPool.h"
27 #include "DelayPools.h"
28 #include "DelaySpec.h"
29 #include "DelayTagged.h"
30 #include "DelayUser.h"
31 #include "DelayVector.h"
33 #include "http/Stream.h"
34 #include "ip/Address.h"
35 #include "MemObject.h"
36 #include "mgr/Registration.h"
37 #include "NullDelayId.h"
38 #include "SquidString.h"
40 #include "StoreClient.h"
42 /// \ingroup DelayPoolsInternal
43 class Aggregate
: public CompositePoolNode
45 MEMPROXY_CLASS(Aggregate
);
48 typedef RefCount
<Aggregate
> Pointer
;
51 virtual DelaySpec
*rate() {return &spec
;}
53 virtual DelaySpec
const *rate() const {return &spec
;}
55 virtual void stats(StoreEntry
* sentry
);
56 virtual void dump(StoreEntry
*entry
) const;
57 virtual void update(int incr
);
60 virtual DelayIdComposite::Pointer
id(CompositeSelectionDetails
&);
64 /// \ingroup DelayPoolsInternal
65 class AggregateId
:public DelayIdComposite
67 MEMPROXY_CLASS(Aggregate::AggregateId
);
70 AggregateId (RefCount
<Aggregate
>);
71 virtual int bytesWanted (int min
, int max
) const;
72 virtual void bytesIn(int qty
);
73 virtual void delayRead(const AsyncCallPointer
&);
76 RefCount
<Aggregate
> theAggregate
;
79 friend class AggregateId
;
81 DelayBucket theBucket
;
85 /// \ingroup DelayPoolsInternal
86 template <class Key
, class Value
>
92 unsigned int size() const;
93 unsigned char findKeyIndex (Key
const key
) const;
94 bool indexUsed (unsigned char const index
) const;
95 unsigned int insert (Key
const key
);
97 #define IND_MAP_SZ 256
99 Key key_map
[IND_MAP_SZ
];
100 Value values
[IND_MAP_SZ
];
103 unsigned int nextMapPosition
;
106 /// \ingroup DelayPoolsInternal
107 class VectorPool
: public CompositePoolNode
109 MEMPROXY_CLASS(VectorPool
);
112 typedef RefCount
<VectorPool
> Pointer
;
113 virtual void dump(StoreEntry
*entry
) const;
114 virtual void parse();
115 virtual void update(int incr
);
116 virtual void stats(StoreEntry
* sentry
);
118 virtual DelayIdComposite::Pointer
id(CompositeSelectionDetails
&);
119 VectorMap
<unsigned char, DelayBucket
> buckets
;
124 bool keyAllocated (unsigned char const key
) const;
125 virtual DelaySpec
*rate() {return &spec
;}
127 virtual DelaySpec
const *rate() const {return &spec
;}
129 virtual char const *label() const = 0;
131 virtual unsigned int makeKey(Ip::Address
&src_addr
) const = 0;
135 /// \ingroup DelayPoolsInternal
136 class Id
:public DelayIdComposite
138 MEMPROXY_CLASS(VectorPool::Id
);
141 Id (RefCount
<VectorPool
>, int);
142 virtual int bytesWanted (int min
, int max
) const;
143 virtual void bytesIn(int qty
);
146 RefCount
<VectorPool
> theVector
;
151 /// \ingroup DelayPoolsInternal
152 class IndividualPool
: public VectorPool
154 MEMPROXY_CLASS(IndividualPool
);
157 virtual char const *label() const {return "Individual";}
158 virtual unsigned int makeKey(Ip::Address
&src_addr
) const;
161 /// \ingroup DelayPoolsInternal
162 class ClassCNetPool
: public VectorPool
164 MEMPROXY_CLASS(ClassCNetPool
);
167 virtual char const *label() const {return "Network";}
168 virtual unsigned int makeKey (Ip::Address
&src_addr
) const;
171 /* don't use remote storage for these */
172 /// \ingroup DelayPoolsInternal
177 bool individualUsed (unsigned int index
)const;
178 unsigned char findHostMapPosition (unsigned char const host
) const;
179 bool individualAllocated (unsigned char host
) const;
180 unsigned char hostPosition (DelaySpec
&rate
, unsigned char const host
);
181 void initHostIndex (DelaySpec
&rate
, unsigned char index
, unsigned char host
);
182 void update (DelaySpec
const &, int incr
);
183 void stats(StoreEntry
*)const;
186 VectorMap
<unsigned char, DelayBucket
> individuals
;
189 /// \ingroup DelayPoolsInternal
190 class ClassCHostPool
: public CompositePoolNode
192 MEMPROXY_CLASS(ClassCHostPool
);
195 typedef RefCount
<ClassCHostPool
> Pointer
;
196 virtual void dump(StoreEntry
*entry
) const;
197 virtual void parse();
198 virtual void update(int incr
);
199 virtual void stats(StoreEntry
* sentry
);
201 virtual DelayIdComposite::Pointer
id(CompositeSelectionDetails
&);
206 bool keyAllocated (unsigned char const key
) const;
207 virtual DelaySpec
*rate() {return &spec
;}
209 virtual DelaySpec
const *rate() const {return &spec
;}
211 virtual char const *label() const {return "Individual";}
213 virtual unsigned int makeKey(Ip::Address
&src_addr
) const;
215 unsigned char makeHostKey(Ip::Address
&src_addr
) const;
218 VectorMap
<unsigned char, ClassCBucket
> buckets
;
222 friend class ClassCHostPool::Id
;
224 /// \ingroup DelayPoolsInternal
225 class Id
:public DelayIdComposite
227 MEMPROXY_CLASS(ClassCHostPool::Id
);
230 Id (RefCount
<ClassCHostPool
>, unsigned char, unsigned char);
231 virtual int bytesWanted (int min
, int max
) const;
232 virtual void bytesIn(int qty
);
235 RefCount
<ClassCHostPool
> theClassCHost
;
236 unsigned char theNet
;
237 unsigned char theHost
;
242 Aggregate::AggregateId::delayRead(const AsyncCall::Pointer
&aRead
)
244 theAggregate
->delayRead(aRead
);
248 CommonPool::Factory(unsigned char _class
, CompositePoolNode::Pointer
& compositeCopy
)
250 CommonPool
*result
= new CommonPool
;
258 compositeCopy
= new Aggregate
;
259 result
->typeLabel
= "1";
263 result
->typeLabel
= "2";
265 DelayVector::Pointer temp
= new DelayVector
;
266 compositeCopy
= temp
.getRaw();
267 temp
->push_back (new Aggregate
);
268 temp
->push_back(new IndividualPool
);
273 result
->typeLabel
= "3";
275 DelayVector::Pointer temp
= new DelayVector
;
276 compositeCopy
= temp
.getRaw();
277 temp
->push_back (new Aggregate
);
278 temp
->push_back (new ClassCNetPool
);
279 temp
->push_back (new ClassCHostPool
);
284 result
->typeLabel
= "4";
286 DelayVector::Pointer temp
= new DelayVector
;
287 compositeCopy
= temp
.getRaw();
288 temp
->push_back (new Aggregate
);
289 temp
->push_back (new ClassCNetPool
);
290 temp
->push_back (new ClassCHostPool
);
292 temp
->push_back (new DelayUser
);
298 result
->typeLabel
= "5";
299 compositeCopy
= new DelayTagged
;
303 fatal ("unknown delay pool class");
310 CommonPool::CommonPool()
314 ClassCBucket::update (DelaySpec
const &rate
, int incr
)
316 /* If we aren't active, don't try to update us ! */
317 assert (rate
.restore_bps
!= -1);
319 for (unsigned int j
= 0; j
< individuals
.size(); ++j
)
320 individuals
.values
[j
].update (rate
, incr
);
324 ClassCBucket::stats(StoreEntry
*sentry
)const
326 for (unsigned int j
= 0; j
< individuals
.size(); ++j
) {
327 assert (individualUsed (j
));
328 storeAppendPrintf(sentry
, " %d:",individuals
.key_map
[j
]);
329 individuals
.values
[j
].stats (sentry
);
334 ClassCBucket::findHostMapPosition (unsigned char const host
) const
336 return individuals
.findKeyIndex(host
);
340 ClassCBucket::individualUsed (unsigned int index
)const
342 return individuals
.indexUsed(index
);
346 ClassCBucket::individualAllocated (unsigned char host
) const
348 return individualUsed(findHostMapPosition (host
));
352 ClassCBucket::hostPosition (DelaySpec
&rate
, unsigned char const host
)
354 if (individualAllocated (host
))
355 return findHostMapPosition(host
);
357 assert (!individualUsed (findHostMapPosition(host
)));
359 unsigned char result
= findHostMapPosition(host
);
361 initHostIndex (rate
, result
, host
);
367 ClassCBucket::initHostIndex (DelaySpec
&rate
, unsigned char index
, unsigned char host
)
369 assert (!individualUsed(index
));
371 unsigned char const newIndex
= individuals
.insert (host
);
373 /* give the bucket a default value */
374 individuals
.values
[newIndex
].init (rate
);
377 Aggregate::Aggregate()
379 theBucket
.init (*rate());
380 DelayPools::registerForUpdates (this);
383 Aggregate::~Aggregate()
385 DelayPools::deregisterForUpdates (this);
389 Aggregate::stats(StoreEntry
* sentry
)
391 rate()->stats (sentry
, "Aggregate");
393 if (rate()->restore_bps
== -1)
396 storeAppendPrintf(sentry
, "\t\tCurrent: ");
398 theBucket
.stats(sentry
);
400 storeAppendPrintf(sentry
, "\n\n");
404 Aggregate::dump(StoreEntry
*entry
) const
406 rate()->dump (entry
);
410 Aggregate::update(int incr
)
412 theBucket
.update(*rate(), incr
);
422 DelayIdComposite::Pointer
423 Aggregate::id(CompositeSelectionDetails
&)
425 if (rate()->restore_bps
!= -1)
426 return new AggregateId (this);
428 return new NullDelayId
;
431 Aggregate::AggregateId::AggregateId(RefCount
<Aggregate
> anAggregate
) : theAggregate(anAggregate
)
435 Aggregate::AggregateId::bytesWanted (int min
, int max
) const
437 return theAggregate
->theBucket
.bytesWanted(min
, max
);
441 Aggregate::AggregateId::bytesIn(int qty
)
443 theAggregate
->theBucket
.bytesIn(qty
);
444 theAggregate
->kickReads();
447 DelayPool
*DelayPools::delay_data
= nullptr;
448 time_t DelayPools::LastUpdate
= 0;
449 unsigned short DelayPools::pools_ (0);
452 DelayPools::RegisterWithCacheManager(void)
454 Mgr::RegisterAction("delay", "Delay Pool Levels", Stats
, 0, 1);
460 LastUpdate
= getCurrentTime();
461 RegisterWithCacheManager();
465 DelayPools::InitDelayData()
470 DelayPools::delay_data
= new DelayPool
[pools()];
472 eventAdd("DelayPools::Update", DelayPools::Update
, nullptr, 1.0, 1);
476 DelayPools::FreeDelayData()
478 delete[] DelayPools::delay_data
;
483 DelayPools::Update(void *)
485 // To prevent stuck transactions, stop updates only after no new transactions can
486 // register (because the pools were disabled) and the last registered transaction is gone.
487 if (!pools() && toUpdate
.empty())
490 eventAdd("DelayPools::Update", Update
, nullptr, 1.0, 1);
492 int incr
= squid_curtime
- LastUpdate
;
497 LastUpdate
= squid_curtime
;
499 std::vector
<Updateable
*>::iterator pos
= toUpdate
.begin();
501 while (pos
!= toUpdate
.end()) {
502 (*pos
)->update(incr
);
508 DelayPools::registerForUpdates(Updateable
*anObject
)
510 /* Assume no doubles */
511 toUpdate
.push_back(anObject
);
515 DelayPools::deregisterForUpdates (Updateable
*anObject
)
517 std::vector
<Updateable
*>::iterator pos
= toUpdate
.begin();
519 while (pos
!= toUpdate
.end() && *pos
!= anObject
) {
523 if (pos
!= toUpdate
.end()) {
524 /* move all objects down one */
525 std::vector
<Updateable
*>::iterator temp
= pos
;
528 while (pos
!= toUpdate
.end()) {
538 std::vector
<Updateable
*> DelayPools::toUpdate
;
541 DelayPools::Stats(StoreEntry
* sentry
)
543 storeAppendPrintf(sentry
, "Delay pools configured: %d\n\n", DelayPools::pools());
545 for (unsigned short i
= 0; i
< DelayPools::pools(); ++i
) {
546 if (DelayPools::delay_data
[i
].theComposite().getRaw()) {
547 storeAppendPrintf(sentry
, "Pool: %d\n\tClass: %s\n\n", i
+ 1, DelayPools::delay_data
[i
].pool
->theClassTypeLabel());
548 DelayPools::delay_data
[i
].theComposite()->stats (sentry
);
550 storeAppendPrintf(sentry
, "\tMisconfigured pool.\n\n");
555 DelayPools::FreePools()
557 if (!DelayPools::pools())
570 DelayPools::pools(unsigned short newPools
)
573 debugs(3, DBG_CRITICAL
, "parse_delay_pool_count: multiple delay_pools lines, aborting all previous delay_pools config");
583 template <class Key
, class Value
>
584 VectorMap
<Key
,Value
>::VectorMap() : nextMapPosition(0)
587 template <class Key
, class Value
>
589 VectorMap
<Key
,Value
>::size() const
591 return nextMapPosition
;
594 template <class Key
, class Value
>
596 VectorMap
<Key
,Value
>::insert (Key
const key
)
598 unsigned char index
= findKeyIndex (key
);
599 assert (!indexUsed(index
));
601 key_map
[index
] = key
;
608 VectorPool::VectorPool()
610 DelayPools::registerForUpdates (this);
613 VectorPool::~VectorPool()
615 DelayPools::deregisterForUpdates (this);
619 VectorPool::stats(StoreEntry
* sentry
)
621 rate()->stats (sentry
, label());
623 if (rate()->restore_bps
== -1) {
624 storeAppendPrintf(sentry
, "\n\n");
628 storeAppendPrintf(sentry
, "\t\tCurrent:");
630 for (unsigned int i
= 0; i
< buckets
.size(); ++i
) {
631 storeAppendPrintf(sentry
, " %d:", buckets
.key_map
[i
]);
632 buckets
.values
[i
].stats(sentry
);
636 storeAppendPrintf(sentry
, " Not used yet.");
638 storeAppendPrintf(sentry
, "\n\n");
642 VectorPool::dump(StoreEntry
*entry
) const
644 rate()->dump (entry
);
648 VectorPool::update(int incr
)
650 if (rate()->restore_bps
== -1)
653 for (unsigned int i
= 0; i
< buckets
.size(); ++i
)
654 buckets
.values
[i
].update (*rate(), incr
);
664 VectorPool::keyAllocated (unsigned char const key
) const
666 return buckets
.indexUsed(buckets
.findKeyIndex (key
));
669 template <class Key
, class Value
>
671 VectorMap
<Key
,Value
>::indexUsed (unsigned char const index
) const
673 return index
< size();
676 /** returns the used position, or the position to allocate */
677 template <class Key
, class Value
>
679 VectorMap
<Key
,Value
>::findKeyIndex (Key
const key
) const
681 for (unsigned int index
= 0; index
< size(); ++index
) {
682 assert(indexUsed(index
));
684 if (key_map
[index
] == key
)
692 DelayIdComposite::Pointer
693 VectorPool::id(CompositeSelectionDetails
&details
)
695 if (rate()->restore_bps
== -1)
696 return new NullDelayId
;
698 /* non-IPv4 are not able to provide IPv4-bitmask for this pool type key. */
699 if ( !details
.src_addr
.isIPv4() )
700 return new NullDelayId
;
702 unsigned int key
= makeKey(details
.src_addr
);
704 if (keyAllocated(key
))
705 return new Id(this, buckets
.findKeyIndex(key
));
707 unsigned char const resultIndex
= buckets
.insert(key
);
709 buckets
.values
[resultIndex
].init(*rate());
711 return new Id(this, resultIndex
);
714 VectorPool::Id::Id(VectorPool::Pointer aPool
, int anIndex
) : theVector (aPool
), theIndex (anIndex
)
718 VectorPool::Id::bytesWanted (int min
, int max
) const
720 return theVector
->buckets
.values
[theIndex
].bytesWanted (min
, max
);
724 VectorPool::Id::bytesIn(int qty
)
726 theVector
->buckets
.values
[theIndex
].bytesIn (qty
);
730 IndividualPool::makeKey(Ip::Address
&src_addr
) const
732 /* IPv4 required for this pool */
733 if ( !src_addr
.isIPv4() )
737 src_addr
.getInAddr(host
);
738 return (ntohl(host
.s_addr
) & 0xff);
742 ClassCNetPool::makeKey(Ip::Address
&src_addr
) const
744 /* IPv4 required for this pool */
745 if ( !src_addr
.isIPv4() )
749 src_addr
.getInAddr(net
);
750 return ( (ntohl(net
.s_addr
) >> 8) & 0xff);
753 ClassCHostPool::ClassCHostPool()
755 DelayPools::registerForUpdates (this);
758 ClassCHostPool::~ClassCHostPool()
760 DelayPools::deregisterForUpdates (this);
764 ClassCHostPool::stats(StoreEntry
* sentry
)
766 rate()->stats (sentry
, label());
768 if (rate()->restore_bps
== -1) {
769 storeAppendPrintf(sentry
, "\n\n");
773 for (unsigned int index
= 0; index
< buckets
.size(); ++index
) {
774 storeAppendPrintf(sentry
, "\t\tCurrent [Network %d]:", buckets
.key_map
[index
]);
775 buckets
.values
[index
].stats (sentry
);
776 storeAppendPrintf(sentry
, "\n");
780 storeAppendPrintf(sentry
, "\t\tCurrent [All networks]: Not used yet.\n");
782 storeAppendPrintf(sentry
, "\n\n");
786 ClassCHostPool::dump(StoreEntry
*entry
) const
788 rate()->dump (entry
);
792 ClassCHostPool::update(int incr
)
794 if (rate()->restore_bps
== -1)
797 for (unsigned int i
= 0; i
< buckets
.size(); ++i
)
798 buckets
.values
[i
].update (*rate(), incr
);
802 ClassCHostPool::parse()
808 ClassCHostPool::keyAllocated (unsigned char const key
) const
810 return buckets
.indexUsed(buckets
.findKeyIndex (key
));
814 ClassCHostPool::makeHostKey(Ip::Address
&src_addr
) const
816 /* IPv4 required for this pool */
817 if ( !src_addr
.isIPv4() )
820 /* Temporary bypass for IPv4-only */
822 src_addr
.getInAddr(host
);
823 return (ntohl(host
.s_addr
) & 0xff);
827 ClassCHostPool::makeKey(Ip::Address
&src_addr
) const
829 /* IPv4 required for this pool */
830 if ( !src_addr
.isIPv4() )
834 src_addr
.getInAddr(net
);
835 return ( (ntohl(net
.s_addr
) >> 8) & 0xff);
838 DelayIdComposite::Pointer
839 ClassCHostPool::id(CompositeSelectionDetails
&details
)
841 if (rate()->restore_bps
== -1)
842 return new NullDelayId
;
844 /* non-IPv4 are not able to provide IPv4-bitmask for this pool type key. */
845 if ( !details
.src_addr
.isIPv4() )
846 return new NullDelayId
;
848 unsigned int key
= makeKey (details
.src_addr
);
850 unsigned char host
= makeHostKey (details
.src_addr
);
852 unsigned char hostIndex
;
854 unsigned char netIndex
;
856 if (keyAllocated (key
))
857 netIndex
= buckets
.findKeyIndex(key
);
859 netIndex
= buckets
.insert (key
);
861 hostIndex
= buckets
.values
[netIndex
].hostPosition (*rate(), host
);
863 return new Id (this, netIndex
, hostIndex
);
866 ClassCHostPool::Id::Id (ClassCHostPool::Pointer aPool
, unsigned char aNet
, unsigned char aHost
) : theClassCHost (aPool
), theNet (aNet
), theHost (aHost
)
870 ClassCHostPool::Id::bytesWanted (int min
, int max
) const
872 return theClassCHost
->buckets
.values
[theNet
].individuals
.values
[theHost
].bytesWanted (min
, max
);
876 ClassCHostPool::Id::bytesIn(int qty
)
878 theClassCHost
->buckets
.values
[theNet
].individuals
.values
[theHost
].bytesIn (qty
);
881 #endif /* USE_DELAY_POOLS */