]> git.ipfire.org Git - thirdparty/squid.git/blame_incremental - src/icp_v2.cc
Simplify appending SBuf to String (#2108)
[thirdparty/squid.git] / src / icp_v2.cc
... / ...
CommitLineData
1/*
2 * Copyright (C) 1996-2025 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9/* DEBUG: section 12 Internet Cache Protocol (ICP) */
10
11/**
12 \defgroup ServerProtocolICPInternal2 ICPv2 Internals
13 \ingroup ServerProtocolICPAPI
14 */
15
16#include "squid.h"
17#include "AccessLogEntry.h"
18#include "acl/Acl.h"
19#include "acl/FilledChecklist.h"
20#include "base/AsyncCallbacks.h"
21#include "client_db.h"
22#include "comm.h"
23#include "comm/Connection.h"
24#include "comm/Loops.h"
25#include "fd.h"
26#include "HttpRequest.h"
27#include "icmp/net_db.h"
28#include "ICP.h"
29#include "ip/Address.h"
30#include "ip/tools.h"
31#include "ipc/StartListening.h"
32#include "ipcache.h"
33#include "md5.h"
34#include "multicast.h"
35#include "neighbors.h"
36#include "refresh.h"
37#include "rfc1738.h"
38#include "SquidConfig.h"
39#include "StatCounters.h"
40#include "Store.h"
41#include "store_key_md5.h"
42#include "tools.h"
43#include "wordlist.h"
44
45#include <cerrno>
46
47/// a delayed icpUdpSend() call
48class DelayedUdpSend {
49public:
50 Ip::Address address; ///< remote peer (which may not be a cache_peer)
51 icp_common_t *msg = nullptr; ///< ICP message with network byte order fields
52 DelayedUdpSend *next = nullptr; ///< invasive FIFO queue of delayed ICP messages
53 AccessLogEntryPointer ale; ///< sender's master transaction summary
54 struct timeval queue_time = {}; ///< queuing timestamp
55};
56
57static void icpIncomingConnectionOpened(Ipc::StartListeningAnswer &);
58
59/// \ingroup ServerProtocolICPInternal2
60static void icpLogIcp(const Ip::Address &, const LogTags_ot, int, const char *, const int, AccessLogEntryPointer &);
61
62/// \ingroup ServerProtocolICPInternal2
63static void icpHandleIcpV2(int, Ip::Address &, char *, int);
64
65/// \ingroup ServerProtocolICPInternal2
66static void icpCount(void *, int, size_t, int);
67
68static LogTags_ot icpLogFromICPCode(icp_opcode);
69
70static int icpUdpSend(int fd, const Ip::Address &to, icp_common_t * msg, int delay, AccessLogEntryPointer al);
71
72static void
73icpSyncAle(AccessLogEntryPointer &al, const Ip::Address &caddr, const char *url, int len, int delay)
74{
75 if (!al)
76 al = new AccessLogEntry();
77 al->icp.opcode = ICP_QUERY;
78 al->cache.caddr = caddr;
79 al->url = url;
80 al->setVirginUrlForMissingRequest(al->url);
81 // XXX: move to use icp.clientReply instead
82 al->http.clientReplySz.payloadData = len;
83 al->cache.start_time = current_time;
84 al->cache.start_time.tv_sec -= delay;
85 al->cache.trTime.tv_sec = delay;
86 al->cache.trTime.tv_usec = 0;
87}
88
89/**
90 \ingroup ServerProtocolICPInternal2
91 * IcpQueueHead is global so comm_incoming() knows whether or not
92 * to call icpUdpSendQueue.
93 */
94static DelayedUdpSend *IcpQueueHead = nullptr;
95/// \ingroup ServerProtocolICPInternal2
96static DelayedUdpSend *IcpQueueTail = nullptr;
97
98/// \ingroup ServerProtocolICPInternal2
99Comm::ConnectionPointer icpIncomingConn = nullptr;
100/// \ingroup ServerProtocolICPInternal2
101Comm::ConnectionPointer icpOutgoingConn = nullptr;
102
103/* icp_common_t */
104icp_common_t::icp_common_t() :
105 opcode(ICP_INVALID), version(0), length(0), reqnum(0),
106 flags(0), pad(0), shostid(0)
107{}
108
109icp_common_t::icp_common_t(char *buf, unsigned int len) :
110 opcode(ICP_INVALID), version(0), reqnum(0), flags(0), pad(0), shostid(0)
111{
112 if (len < sizeof(icp_common_t)) {
113 /* mark as invalid */
114 length = len + 1;
115 return;
116 }
117
118 memcpy(this, buf, sizeof(icp_common_t));
119 /*
120 * Convert network order sensitive fields
121 */
122 length = ntohs(length);
123 reqnum = ntohl(reqnum);
124 flags = ntohl(flags);
125 pad = ntohl(pad);
126}
127
128icp_opcode
129icp_common_t::getOpCode() const
130{
131 if (opcode > static_cast<char>(icp_opcode::ICP_END))
132 return ICP_INVALID;
133
134 return static_cast<icp_opcode>(opcode);
135}
136
137/* ICPState */
138
139ICPState::ICPState(icp_common_t &aHeader, HttpRequest *aRequest):
140 header(aHeader),
141 request(aRequest),
142 fd(-1),
143 url(nullptr)
144{
145 HTTPMSGLOCK(request);
146}
147
148ICPState::~ICPState()
149{
150 safe_free(url);
151 HTTPMSGUNLOCK(request);
152}
153
154bool
155ICPState::isHit() const
156{
157 const auto e = storeGetPublic(url, Http::METHOD_GET);
158
159 const auto hit = e && confirmAndPrepHit(*e);
160
161 if (e)
162 e->abandon(__func__);
163
164 return hit;
165}
166
167bool
168ICPState::confirmAndPrepHit(const StoreEntry &e) const
169{
170 if (!e.validToSend())
171 return false;
172
173 if (e.hittingRequiresCollapsing() && !startCollapsingOn(e, false))
174 return false;
175
176 if (!Config.onoff.icp_hit_stale && !didCollapse && refreshCheckICP(&e, request))
177 return false;
178
179 return true;
180}
181
182LogTags *
183ICPState::loggingTags() const
184{
185 // calling icpSyncAle(LOG_TAG_NONE) here would not change cache.code
186 if (!al)
187 al = new AccessLogEntry();
188 return &al->cache.code;
189}
190
191void
192ICPState::fillChecklist(ACLFilledChecklist &checklist) const
193{
194 checklist.setRequest(request);
195 icpSyncAle(al, from, url, 0, 0);
196 checklist.al = al;
197}
198
199/* End ICPState */
200
201/* ICP2State */
202
203/// \ingroup ServerProtocolICPInternal2
204class ICP2State: public ICPState
205{
206
207public:
208 ICP2State(icp_common_t & aHeader, HttpRequest *aRequest):
209 ICPState(aHeader, aRequest),rtt(0),src_rtt(0),flags(0) {}
210
211 ~ICP2State() override;
212
213 int rtt;
214 int src_rtt;
215 uint32_t flags;
216};
217
218ICP2State::~ICP2State()
219{}
220
221/* End ICP2State */
222
223/// updates ALE (if any) and logs the transaction (if needed)
224static void
225icpLogIcp(const Ip::Address &caddr, const LogTags_ot logcode, const int len, const char *url, int delay, AccessLogEntry::Pointer &al)
226{
227 assert(logcode != LOG_TAG_NONE);
228
229 // Optimization: No premature (ALE creation in) icpSyncAle().
230 if (al) {
231 icpSyncAle(al, caddr, url, len, delay);
232 al->cache.code.update(logcode);
233 }
234
235 if (logcode == LOG_ICP_QUERY)
236 return; // we never log queries
237
238 if (!Config.onoff.log_udp) {
239 clientdbUpdate(caddr, al ? al->cache.code : LogTags(logcode), AnyP::PROTO_ICP, len);
240 return;
241 }
242
243 if (!al) {
244 // The above attempt to optimize ALE creation has failed. We do need it.
245 icpSyncAle(al, caddr, url, len, delay);
246 al->cache.code.update(logcode);
247 }
248 clientdbUpdate(caddr, al->cache.code, AnyP::PROTO_ICP, len);
249 accessLogLog(al, nullptr);
250}
251
252/// \ingroup ServerProtocolICPInternal2
253static void
254icpUdpSendQueue(int fd, void *)
255{
256 DelayedUdpSend *q;
257
258 while ((q = IcpQueueHead) != nullptr) {
259 int delay = tvSubUsec(q->queue_time, current_time);
260 /* increment delay to prevent looping */
261 const int x = icpUdpSend(fd, q->address, q->msg, ++delay, q->ale);
262 IcpQueueHead = q->next;
263 delete q;
264
265 if (x < 0)
266 break;
267 }
268}
269
270icp_common_t *
271icp_common_t::CreateMessage(
272 icp_opcode opcode,
273 int flags,
274 const char *url,
275 int reqnum,
276 int pad)
277{
278 char *buf = nullptr;
279 icp_common_t *headerp = nullptr;
280 char *urloffset = nullptr;
281 int buf_len;
282 buf_len = sizeof(icp_common_t) + strlen(url) + 1;
283
284 if (opcode == ICP_QUERY)
285 buf_len += sizeof(uint32_t);
286
287 buf = (char *) xcalloc(buf_len, 1);
288
289 headerp = (icp_common_t *) (void *) buf;
290
291 headerp->opcode = (char) opcode;
292
293 headerp->version = ICP_VERSION_CURRENT;
294
295 headerp->length = (uint16_t) htons(buf_len);
296
297 headerp->reqnum = htonl(reqnum);
298
299 headerp->flags = htonl(flags);
300
301 headerp->pad = htonl(pad);
302
303 headerp->shostid = 0;
304
305 urloffset = buf + sizeof(icp_common_t);
306
307 if (opcode == ICP_QUERY)
308 urloffset += sizeof(uint32_t);
309
310 memcpy(urloffset, url, strlen(url));
311
312 return (icp_common_t *)buf;
313}
314
315// TODO: Move retries to icpCreateAndSend(); the other caller does not retry.
316/// writes the given UDP msg to the socket; queues a retry on the first failure
317/// \returns a negative number on failures
318static int
319icpUdpSend(int fd,
320 const Ip::Address &to,
321 icp_common_t * msg,
322 int delay,
323 AccessLogEntryPointer al)
324{
325 int x;
326 int len;
327 len = (int) ntohs(msg->length);
328 debugs(12, 5, "icpUdpSend: FD " << fd << " sending " <<
329 icp_opcode_str[msg->opcode] << ", " << len << " bytes to " << to);
330
331 x = comm_udp_sendto(fd, to, msg, len);
332
333 if (x >= 0) {
334 /* successfully written */
335 const auto logcode = icpLogFromICPCode(static_cast<icp_opcode>(msg->opcode));
336 icpLogIcp(to, logcode, len, (char *) (msg + 1), delay, al);
337 icpCount(msg, SENT, (size_t) len, delay);
338 safe_free(msg);
339 } else if (0 == delay) {
340 /* send failed, but queue it */
341 const auto queue = new DelayedUdpSend();
342 queue->address = to;
343 queue->msg = msg;
344 queue->queue_time = current_time;
345 queue->ale = al;
346
347 if (IcpQueueHead == nullptr) {
348 IcpQueueHead = queue;
349 IcpQueueTail = queue;
350 } else if (IcpQueueTail == IcpQueueHead) {
351 IcpQueueTail = queue;
352 IcpQueueHead->next = queue;
353 } else {
354 IcpQueueTail->next = queue;
355 IcpQueueTail = queue;
356 }
357
358 Comm::SetSelect(fd, COMM_SELECT_WRITE, icpUdpSendQueue, nullptr, 0);
359 ++statCounter.icp.replies_queued;
360 } else {
361 /* don't queue it */
362 // XXX: safe_free(msg)
363 ++statCounter.icp.replies_dropped;
364 }
365
366 return x;
367}
368
369/**
370 * This routine selects an ICP opcode for ICP misses.
371 *
372 \retval ICP_ERR no opcode selected here
373 \retval ICP_MISS_NOFETCH store is rebuilding, no fetch is possible yet
374 */
375icp_opcode
376icpGetCommonOpcode()
377{
378 /* if store is rebuilding, return a UDP_MISS_NOFETCH */
379
380 if ((StoreController::store_dirs_rebuilding && opt_reload_hit_only) ||
381 hit_only_mode_until > squid_curtime) {
382 return ICP_MISS_NOFETCH;
383 }
384
385 return ICP_ERR;
386}
387
388static LogTags_ot
389icpLogFromICPCode(icp_opcode opcode)
390{
391 if (opcode == ICP_ERR)
392 return LOG_UDP_INVALID;
393
394 if (opcode == ICP_DENIED)
395 return LOG_UDP_DENIED;
396
397 if (opcode == ICP_HIT)
398 return LOG_UDP_HIT;
399
400 if (opcode == ICP_MISS)
401 return LOG_UDP_MISS;
402
403 if (opcode == ICP_MISS_NOFETCH)
404 return LOG_UDP_MISS_NOFETCH;
405
406 if (opcode == ICP_DECHO)
407 return LOG_ICP_QUERY;
408
409 if (opcode == ICP_QUERY)
410 return LOG_ICP_QUERY;
411
412 fatal("expected ICP opcode\n");
413
414 return LOG_UDP_INVALID;
415}
416
417void
418icpCreateAndSend(icp_opcode opcode, int flags, char const *url, int reqnum, int pad, int fd, const Ip::Address &from, AccessLogEntry::Pointer al)
419{
420 // update potentially shared ALE ASAP; the ICP query itself may be delayed
421 if (al)
422 al->cache.code.update(icpLogFromICPCode(opcode));
423 icp_common_t *reply = icp_common_t::CreateMessage(opcode, flags, url, reqnum, pad);
424 icpUdpSend(fd, from, reply, 0, al);
425}
426
427void
428icpDenyAccess(Ip::Address &from, char *url, int reqnum, int fd)
429{
430 if (clientdbCutoffDenied(from)) {
431 /*
432 * count this DENIED query in the clientdb, even though
433 * we're not sending an ICP reply...
434 */
435 clientdbUpdate(from, LogTags(LOG_UDP_DENIED), AnyP::PROTO_ICP, 0);
436 } else {
437 icpCreateAndSend(ICP_DENIED, 0, url, reqnum, 0, fd, from, nullptr);
438 }
439}
440
441bool
442icpAccessAllowed(Ip::Address &from, HttpRequest * icp_request)
443{
444 if (!Config.accessList.icp) {
445 debugs(12, 2, "Access Denied due to lack of ICP access rules.");
446 return false;
447 }
448
449 ACLFilledChecklist checklist(Config.accessList.icp, icp_request);
450 checklist.src_addr = from;
451 checklist.my_addr.setNoAddr();
452 const auto &answer = checklist.fastCheck();
453 if (answer.allowed())
454 return true;
455
456 debugs(12, 2, "Access Denied for " << from << " by " << answer.lastCheckDescription() << ".");
457 return false;
458}
459
460HttpRequest *
461icpGetRequest(char *url, int reqnum, int fd, Ip::Address &from)
462{
463 if (strpbrk(url, w_space)) {
464 url = rfc1738_escape(url);
465 icpCreateAndSend(ICP_ERR, 0, rfc1738_escape(url), reqnum, 0, fd, from, nullptr);
466 return nullptr;
467 }
468
469 const auto mx = MasterXaction::MakePortless<XactionInitiator::initIcp>();
470 auto *result = HttpRequest::FromUrlXXX(url, mx);
471 if (!result)
472 icpCreateAndSend(ICP_ERR, 0, url, reqnum, 0, fd, from, nullptr);
473
474 return result;
475
476}
477
478static void
479doV2Query(int fd, Ip::Address &from, char *buf, icp_common_t header)
480{
481 int rtt = 0;
482 int src_rtt = 0;
483 uint32_t flags = 0;
484 /* We have a valid packet */
485 char *url = buf + sizeof(icp_common_t) + sizeof(uint32_t);
486 HttpRequest *icp_request = icpGetRequest(url, header.reqnum, fd, from);
487
488 if (!icp_request)
489 return;
490
491 HTTPMSGLOCK(icp_request);
492
493 if (!icpAccessAllowed(from, icp_request)) {
494 icpDenyAccess(from, url, header.reqnum, fd);
495 HTTPMSGUNLOCK(icp_request);
496 return;
497 }
498#if USE_ICMP
499 if (header.flags & ICP_FLAG_SRC_RTT) {
500 rtt = netdbHostRtt(icp_request->url.host());
501 int hops = netdbHostHops(icp_request->url.host());
502 src_rtt = ((hops & 0xFFFF) << 16) | (rtt & 0xFFFF);
503
504 if (rtt)
505 flags |= ICP_FLAG_SRC_RTT;
506 }
507#endif /* USE_ICMP */
508
509 /* The peer is allowed to use this cache */
510 ICP2State state(header, icp_request);
511 state.fd = fd;
512 state.from = from;
513 state.url = xstrdup(url);
514 state.flags = flags;
515 state.rtt = rtt;
516 state.src_rtt = src_rtt;
517
518 icp_opcode codeToSend;
519
520 if (state.isHit()) {
521 codeToSend = ICP_HIT;
522 } else {
523#if USE_ICMP
524 if (Config.onoff.test_reachability && state.rtt == 0) {
525 if ((state.rtt = netdbHostRtt(state.request->url.host())) == 0)
526 netdbPingSite(state.request->url.host());
527 }
528#endif /* USE_ICMP */
529
530 if (icpGetCommonOpcode() != ICP_ERR)
531 codeToSend = icpGetCommonOpcode();
532 else if (Config.onoff.test_reachability && rtt == 0)
533 codeToSend = ICP_MISS_NOFETCH;
534 else
535 codeToSend = ICP_MISS;
536 }
537
538 icpCreateAndSend(codeToSend, flags, url, header.reqnum, src_rtt, fd, from, state.al);
539
540 HTTPMSGUNLOCK(icp_request);
541}
542
543void
544icp_common_t::handleReply(char *buf, Ip::Address &from)
545{
546 if (neighbors_do_private_keys && reqnum == 0) {
547 debugs(12, DBG_CRITICAL, "icpHandleIcpV2: Neighbor " << from << " returned reqnum = 0");
548 debugs(12, DBG_CRITICAL, "icpHandleIcpV2: Disabling use of private keys");
549 neighbors_do_private_keys = 0;
550 }
551
552 char *url = buf + sizeof(icp_common_t);
553 debugs(12, 3, "icpHandleIcpV2: " << icp_opcode_str[opcode] << " from " << from << " for '" << url << "'");
554
555 const cache_key *key = icpGetCacheKey(url, (int) reqnum);
556 /* call neighborsUdpAck even if ping_status != PING_WAITING */
557 neighborsUdpAck(key, this, from);
558}
559
560static void
561icpHandleIcpV2(int fd, Ip::Address &from, char *buf, int len)
562{
563 if (len <= 0) {
564 debugs(12, 3, "icpHandleIcpV2: ICP message is too small");
565 return;
566 }
567
568 icp_common_t header(buf, len);
569 /*
570 * Length field should match the number of bytes read
571 */
572
573 if (len != header.length) {
574 debugs(12, 3, "icpHandleIcpV2: ICP message is too small");
575 return;
576 }
577
578 debugs(12, 5, "OPCODE " << icp_opcode_str[header.getOpCode()] << '=' << uint8_t(header.opcode));
579
580 switch (header.opcode) {
581
582 case ICP_QUERY:
583 /* We have a valid packet */
584 doV2Query(fd, from, buf, header);
585 break;
586
587 case ICP_HIT:
588
589 case ICP_DECHO:
590
591 case ICP_MISS:
592
593 case ICP_DENIED:
594
595 case ICP_MISS_NOFETCH:
596 header.handleReply(buf, from);
597 break;
598
599 case ICP_INVALID:
600
601 case ICP_ERR:
602 break;
603
604 default:
605 debugs(12, DBG_CRITICAL, "ERROR: icpHandleIcpV2: Unknown opcode: " << header.opcode << " from " << from);
606
607 break;
608 }
609}
610
611void
612icpHandleUdp(int sock, void *)
613{
614 int *N = &incoming_sockets_accepted;
615
616 Ip::Address from;
617 LOCAL_ARRAY(char, buf, SQUID_UDP_SO_RCVBUF);
618 int len;
619 int icp_version;
620 int max = INCOMING_UDP_MAX;
621 Comm::SetSelect(sock, COMM_SELECT_READ, icpHandleUdp, nullptr, 0);
622
623 while (max) {
624 --max;
625 len = comm_udp_recvfrom(sock,
626 buf,
627 SQUID_UDP_SO_RCVBUF - 1,
628 0,
629 from);
630
631 if (len == 0)
632 break;
633
634 if (len < 0) {
635 int xerrno = errno;
636 if (ignoreErrno(xerrno))
637 break;
638
639#if _SQUID_LINUX_
640 /* Some Linux systems seem to set the FD for reading and then
641 * return ECONNREFUSED when sendto() fails and generates an ICMP
642 * port unreachable message. */
643 /* or maybe an EHOSTUNREACH "No route to host" message */
644 if (xerrno != ECONNREFUSED && xerrno != EHOSTUNREACH)
645#endif
646 debugs(50, DBG_IMPORTANT, "icpHandleUdp: FD " << sock << " recvfrom: " << xstrerr(xerrno));
647
648 break;
649 }
650
651 ++(*N);
652 icpCount(buf, RECV, (size_t) len, 0);
653 buf[len] = '\0';
654 debugs(12, 4, "icpHandleUdp: FD " << sock << ": received " <<
655 (unsigned long int)len << " bytes from " << from);
656
657 if ((size_t) len < sizeof(icp_common_t)) {
658 debugs(12, 4, "icpHandleUdp: Ignoring too-small UDP packet");
659 break;
660 }
661
662 icp_version = (int) buf[1]; /* cheat! */
663
664 if (icpOutgoingConn->local == from)
665 // ignore ICP packets which loop back (multicast usually)
666 debugs(12, 4, "icpHandleUdp: Ignoring UDP packet sent by myself");
667 else if (icp_version == ICP_VERSION_2)
668 icpHandleIcpV2(sock, from, buf, len);
669 else if (icp_version == ICP_VERSION_3)
670 icpHandleIcpV3(sock, from, buf, len);
671 else
672 debugs(12, DBG_IMPORTANT, "WARNING: Unused ICP version " << icp_version <<
673 " received from " << from);
674 }
675}
676
677void
678icpOpenPorts(void)
679{
680 uint16_t port;
681
682 if ((port = Config.Port.icp) <= 0)
683 return;
684
685 icpIncomingConn = new Comm::Connection;
686 icpIncomingConn->local = Config.Addrs.udp_incoming;
687 icpIncomingConn->local.port(port);
688
689 if (!Ip::EnableIpv6 && !icpIncomingConn->local.setIPv4()) {
690 debugs(12, DBG_CRITICAL, "ERROR: IPv6 is disabled. " << icpIncomingConn->local << " is not an IPv4 address.");
691 fatal("ICP port cannot be opened.");
692 }
693 /* split-stack for now requires default IPv4-only ICP */
694 if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && icpIncomingConn->local.isAnyAddr()) {
695 icpIncomingConn->local.setIPv4();
696 }
697
698 auto call = asyncCallbackFun(12, 2, icpIncomingConnectionOpened);
699 Ipc::StartListening(SOCK_DGRAM,
700 IPPROTO_UDP,
701 icpIncomingConn,
702 Ipc::fdnInIcpSocket, call);
703
704 if ( !Config.Addrs.udp_outgoing.isNoAddr() ) {
705 icpOutgoingConn = new Comm::Connection;
706 icpOutgoingConn->local = Config.Addrs.udp_outgoing;
707 icpOutgoingConn->local.port(port);
708
709 if (!Ip::EnableIpv6 && !icpOutgoingConn->local.setIPv4()) {
710 debugs(49, DBG_CRITICAL, "ERROR: IPv6 is disabled. " << icpOutgoingConn->local << " is not an IPv4 address.");
711 fatal("ICP port cannot be opened.");
712 }
713 /* split-stack for now requires default IPv4-only ICP */
714 if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && icpOutgoingConn->local.isAnyAddr()) {
715 icpOutgoingConn->local.setIPv4();
716 }
717
718 enter_suid();
719 comm_open_listener(SOCK_DGRAM, IPPROTO_UDP, icpOutgoingConn, "Outgoing ICP Port");
720 leave_suid();
721
722 if (!Comm::IsConnOpen(icpOutgoingConn))
723 fatal("Cannot open Outgoing ICP Port");
724
725 debugs(12, DBG_CRITICAL, "Sending ICP messages from " << icpOutgoingConn->local);
726
727 Comm::SetSelect(icpOutgoingConn->fd, COMM_SELECT_READ, icpHandleUdp, nullptr, 0);
728 fd_note(icpOutgoingConn->fd, "Outgoing ICP socket");
729 }
730}
731
732static void
733icpIncomingConnectionOpened(Ipc::StartListeningAnswer &answer)
734{
735 const auto &conn = answer.conn;
736
737 if (!Comm::IsConnOpen(conn))
738 fatal("Cannot open ICP Port");
739
740 Comm::SetSelect(conn->fd, COMM_SELECT_READ, icpHandleUdp, nullptr, 0);
741
742 for (const wordlist *s = Config.mcast_group_list; s; s = s->next)
743 ipcache_nbgethostbyname(s->key, mcastJoinGroups, nullptr); // XXX: pass the conn for mcastJoinGroups usage.
744
745 debugs(12, DBG_IMPORTANT, "Accepting ICP messages on " << conn->local);
746
747 fd_note(conn->fd, "Incoming ICP port");
748
749 if (Config.Addrs.udp_outgoing.isNoAddr()) {
750 icpOutgoingConn = conn;
751 debugs(12, DBG_IMPORTANT, "Sending ICP messages from " << icpOutgoingConn->local);
752 }
753}
754
755/**
756 * icpConnectionShutdown only closes the 'in' socket if it is
757 * different than the 'out' socket.
758 */
759void
760icpConnectionShutdown(void)
761{
762 if (!Comm::IsConnOpen(icpIncomingConn))
763 return;
764
765 debugs(12, DBG_IMPORTANT, "Stop receiving ICP on " << icpIncomingConn->local);
766
767 /** Release the 'in' socket for lazy closure.
768 * in and out sockets may be sharing one same FD.
769 * This prevents this function from executing repeatedly.
770 */
771 icpIncomingConn = nullptr;
772
773 /**
774 * Normally we only write to the outgoing ICP socket, but
775 * we also have a read handler there to catch messages sent
776 * to that specific interface. During shutdown, we must
777 * disable reading on the outgoing socket.
778 */
779 assert(Comm::IsConnOpen(icpOutgoingConn));
780
781 Comm::SetSelect(icpOutgoingConn->fd, COMM_SELECT_READ, nullptr, nullptr, 0);
782}
783
784void
785icpClosePorts(void)
786{
787 icpConnectionShutdown();
788
789 if (icpOutgoingConn != nullptr) {
790 debugs(12, DBG_IMPORTANT, "Stop sending ICP from " << icpOutgoingConn->local);
791 icpOutgoingConn = nullptr;
792 }
793}
794
795static void
796icpCount(void *buf, int which, size_t len, int delay)
797{
798 icp_common_t *icp = (icp_common_t *) buf;
799
800 if (len < sizeof(*icp))
801 return;
802
803 if (SENT == which) {
804 ++statCounter.icp.pkts_sent;
805 statCounter.icp.kbytes_sent += len;
806
807 if (ICP_QUERY == icp->opcode) {
808 ++statCounter.icp.queries_sent;
809 statCounter.icp.q_kbytes_sent += len;
810 } else {
811 ++statCounter.icp.replies_sent;
812 statCounter.icp.r_kbytes_sent += len;
813 /* this is the sent-reply service time */
814 statCounter.icp.replySvcTime.count(delay);
815 }
816
817 if (ICP_HIT == icp->opcode)
818 ++statCounter.icp.hits_sent;
819 } else if (RECV == which) {
820 ++statCounter.icp.pkts_recv;
821 statCounter.icp.kbytes_recv += len;
822
823 if (ICP_QUERY == icp->opcode) {
824 ++statCounter.icp.queries_recv;
825 statCounter.icp.q_kbytes_recv += len;
826 } else {
827 ++statCounter.icp.replies_recv;
828 statCounter.icp.r_kbytes_recv += len;
829 /* statCounter.icp.querySvcTime set in clientUpdateCounters */
830 }
831
832 if (ICP_HIT == icp->opcode)
833 ++statCounter.icp.hits_recv;
834 }
835}
836
837#define N_QUERIED_KEYS 8192
838#define N_QUERIED_KEYS_MASK 8191
839static cache_key queried_keys[N_QUERIED_KEYS][SQUID_MD5_DIGEST_LENGTH];
840
841int
842icpSetCacheKey(const cache_key * key)
843{
844 static int reqnum = 0;
845
846 if (++reqnum < 0)
847 reqnum = 1;
848
849 storeKeyCopy(queried_keys[reqnum & N_QUERIED_KEYS_MASK], key);
850
851 return reqnum;
852}
853
854const cache_key *
855icpGetCacheKey(const char *url, int reqnum)
856{
857 if (neighbors_do_private_keys && reqnum)
858 return queried_keys[reqnum & N_QUERIED_KEYS_MASK];
859
860 return storeKeyPublic(url, Http::METHOD_GET);
861}
862