/*
 * Copyright (C) 1996-2015 The Squid Software Foundation and contributors
 *
 * Squid software is distributed under GPLv2+ license and includes
 * contributions from numerous individuals and organizations.
 * Please see the COPYING and CONTRIBUTORS files for details.
 */
9 /* DEBUG: section 12 Internet Cache Protocol (ICP) */
/**
 \defgroup ServerProtocolICPInternal2 ICPv2 Internals
 \ingroup ServerProtocolICPAPI
 */
17 #include "AccessLogEntry.h"
19 #include "acl/FilledChecklist.h"
20 #include "client_db.h"
22 #include "comm/Connection.h"
23 #include "comm/Loops.h"
24 #include "comm/UdpOpenDialer.h"
26 #include "HttpRequest.h"
27 #include "icmp/net_db.h"
29 #include "ip/Address.h"
33 #include "multicast.h"
34 #include "neighbors.h"
37 #include "SquidConfig.h"
38 #include "SquidTime.h"
39 #include "StatCounters.h"
41 #include "store_key_md5.h"
46 // for tvSubUsec() which should be in SquidTime.h
51 static void icpIncomingConnectionOpened(const Comm::ConnectionPointer
&conn
, int errNo
);
53 /// \ingroup ServerProtocolICPInternal2
54 static void icpLogIcp(const Ip::Address
&, LogTags
, int, const char *, int);
56 /// \ingroup ServerProtocolICPInternal2
57 static void icpHandleIcpV2(int, Ip::Address
&, char *, int);
59 /// \ingroup ServerProtocolICPInternal2
60 static void icpCount(void *, int, size_t, int);
63 \ingroup ServerProtocolICPInternal2
64 * IcpQueueHead is global so comm_incoming() knows whether or not
65 * to call icpUdpSendQueue.
67 static icpUdpData
*IcpQueueHead
= NULL
;
68 /// \ingroup ServerProtocolICPInternal2
69 static icpUdpData
*IcpQueueTail
= NULL
;
71 /// \ingroup ServerProtocolICPInternal2
72 Comm::ConnectionPointer icpIncomingConn
= NULL
;
73 /// \ingroup ServerProtocolICPInternal2
74 Comm::ConnectionPointer icpOutgoingConn
= NULL
;
77 _icp_common_t::_icp_common_t() :
78 opcode(ICP_INVALID
), version(0), length(0), reqnum(0),
79 flags(0), pad(0), shostid(0)
82 _icp_common_t::_icp_common_t(char *buf
, unsigned int len
) :
83 opcode(ICP_INVALID
), version(0), reqnum(0), flags(0), pad(0), shostid(0)
85 if (len
< sizeof(_icp_common_t
)) {
91 memcpy(this, buf
, sizeof(icp_common_t
));
93 * Convert network order sensitive fields
95 length
= ntohs(length
);
96 reqnum
= ntohl(reqnum
);
102 _icp_common_t::getOpCode() const
104 if (opcode
> (char)ICP_END
)
107 return (icp_opcode
)opcode
;
/// Creates the per-query state and locks the request (HTTPMSGLOCK) so that it
/// stays valid for the lifetime of this object; the destructor unlocks it.
// NOTE(review): the member-initializer list of this constructor is not part
// of this listing -- restore it from the repository copy before building.
112 ICPState::ICPState(icp_common_t
&aHeader
, HttpRequest
*aRequest
):
118 HTTPMSGLOCK(request
);
/// Drops the request lock taken by the constructor.
// NOTE(review): doV2Query() stores an xstrdup()ed copy in url; the statement
// releasing it is presumably on a line missing from this listing -- confirm.
121 ICPState::~ICPState()
124 HTTPMSGUNLOCK(request
);
131 /// \ingroup ServerProtocolICPInternal2
/// State of one ICPv2 query while its store lookup is pending.
/// doV2Query() creates it and hands it to StoreEntry::getPublic();
/// created() then picks the reply opcode and sends the reply.
// NOTE(review): the declarations of rtt, src_rtt and flags (initialised to 0
// by the constructor below) are not part of this listing.
132 class ICP2State
: public ICPState
, public StoreClient
136 ICP2State(icp_common_t
& aHeader
, HttpRequest
*aRequest
):
137 ICPState(aHeader
, aRequest
),rtt(0),src_rtt(0),flags(0) {}
/// Receives the result of the store lookup for the queried URL.
140 void created(StoreEntry
* newEntry
);
147 ICP2State::~ICP2State()
151 ICP2State::created(StoreEntry
*newEntry
)
153 StoreEntry
*entry
= newEntry
->isNull () ? NULL
: newEntry
;
154 debugs(12, 5, "icpHandleIcpV2: OPCODE " << icp_opcode_str
[header
.opcode
]);
155 icp_opcode codeToSend
;
157 if (icpCheckUdpHit(entry
, request
)) {
158 codeToSend
= ICP_HIT
;
161 if (Config
.onoff
.test_reachability
&& rtt
== 0) {
162 if ((rtt
= netdbHostRtt(request
->GetHost())) == 0)
163 netdbPingSite(request
->GetHost());
165 #endif /* USE_ICMP */
167 if (icpGetCommonOpcode() != ICP_ERR
)
168 codeToSend
= icpGetCommonOpcode();
169 else if (Config
.onoff
.test_reachability
&& rtt
== 0)
170 codeToSend
= ICP_MISS_NOFETCH
;
172 codeToSend
= ICP_MISS
;
175 icpCreateAndSend(codeToSend
, flags
, url
, header
.reqnum
, src_rtt
, fd
, from
);
181 /// \ingroup ServerProtocolICPInternal2
183 icpLogIcp(const Ip::Address
&caddr
, LogTags logcode
, int len
, const char *url
, int delay
)
185 AccessLogEntry::Pointer al
= new AccessLogEntry();
187 if (LOG_TAG_NONE
== logcode
)
190 if (LOG_ICP_QUERY
== logcode
)
193 clientdbUpdate(caddr
, logcode
, AnyP::PROTO_ICP
, len
);
195 if (!Config
.onoff
.log_udp
)
198 al
->icp
.opcode
= ICP_QUERY
;
202 al
->cache
.caddr
= caddr
;
204 // XXX: move to use icp.clientReply instead
205 al
->http
.clientReplySz
.payloadData
= len
;
207 al
->cache
.code
= logcode
;
209 al
->cache
.trTime
.tv_sec
= delay
;
211 accessLogLog(al
, NULL
);
214 /// \ingroup ServerProtocolICPInternal2
216 icpUdpSendQueue(int fd
, void *)
220 while ((q
= IcpQueueHead
) != NULL
) {
221 int delay
= tvSubUsec(q
->queue_time
, current_time
);
222 /* increment delay to prevent looping */
223 const int x
= icpUdpSend(fd
, q
->address
, (icp_common_t
*) q
->msg
, q
->logcode
, ++delay
);
224 IcpQueueHead
= q
->next
;
233 _icp_common_t::createMessage(
241 icp_common_t
*headerp
= NULL
;
242 char *urloffset
= NULL
;
244 buf_len
= sizeof(icp_common_t
) + strlen(url
) + 1;
246 if (opcode
== ICP_QUERY
)
247 buf_len
+= sizeof(uint32_t);
249 buf
= (char *) xcalloc(buf_len
, 1);
251 headerp
= (icp_common_t
*) (void *) buf
;
253 headerp
->opcode
= (char) opcode
;
255 headerp
->version
= ICP_VERSION_CURRENT
;
257 headerp
->length
= (uint16_t) htons(buf_len
);
259 headerp
->reqnum
= htonl(reqnum
);
261 headerp
->flags
= htonl(flags
);
263 headerp
->pad
= htonl(pad
);
265 headerp
->shostid
= 0;
267 urloffset
= buf
+ sizeof(icp_common_t
);
269 if (opcode
== ICP_QUERY
)
270 urloffset
+= sizeof(uint32_t);
272 memcpy(urloffset
, url
, strlen(url
));
274 return (icp_common_t
*)buf
;
279 const Ip::Address
&to
,
287 len
= (int) ntohs(msg
->length
);
288 debugs(12, 5, "icpUdpSend: FD " << fd
<< " sending " <<
289 icp_opcode_str
[msg
->opcode
] << ", " << len
<< " bytes to " << to
);
291 x
= comm_udp_sendto(fd
, to
, msg
, len
);
294 /* successfully written */
295 icpLogIcp(to
, logcode
, len
, (char *) (msg
+ 1), delay
);
296 icpCount(msg
, SENT
, (size_t) len
, delay
);
298 } else if (0 == delay
) {
299 /* send failed, but queue it */
300 queue
= (icpUdpData
*) xcalloc(1, sizeof(icpUdpData
));
303 queue
->len
= (int) ntohs(msg
->length
);
304 queue
->queue_time
= current_time
;
305 queue
->logcode
= logcode
;
307 if (IcpQueueHead
== NULL
) {
308 IcpQueueHead
= queue
;
309 IcpQueueTail
= queue
;
310 } else if (IcpQueueTail
== IcpQueueHead
) {
311 IcpQueueTail
= queue
;
312 IcpQueueHead
->next
= queue
;
314 IcpQueueTail
->next
= queue
;
315 IcpQueueTail
= queue
;
318 Comm::SetSelect(fd
, COMM_SELECT_WRITE
, icpUdpSendQueue
, NULL
, 0);
319 ++statCounter
.icp
.replies_queued
;
322 ++statCounter
.icp
.replies_dropped
;
329 icpCheckUdpHit(StoreEntry
* e
, HttpRequest
* request
)
334 if (!e
->validToSend())
337 if (Config
.onoff
.icp_hit_stale
)
340 if (refreshCheckICP(e
, request
))
347 * This routine selects an ICP opcode for ICP misses.
349 \retval ICP_ERR no opcode selected here
350 \retval ICP_MISS_NOFETCH store is rebuilding, no fetch is possible yet
355 /* if store is rebuilding, return a UDP_MISS_NOFETCH */
357 if ((StoreController::store_dirs_rebuilding
&& opt_reload_hit_only
) ||
358 hit_only_mode_until
> squid_curtime
) {
359 return ICP_MISS_NOFETCH
;
366 icpLogFromICPCode(icp_opcode opcode
)
368 if (opcode
== ICP_ERR
)
369 return LOG_UDP_INVALID
;
371 if (opcode
== ICP_DENIED
)
372 return LOG_UDP_DENIED
;
374 if (opcode
== ICP_HIT
)
377 if (opcode
== ICP_MISS
)
380 if (opcode
== ICP_MISS_NOFETCH
)
381 return LOG_UDP_MISS_NOFETCH
;
383 fatal("expected ICP opcode\n");
385 return LOG_UDP_INVALID
;
389 icpCreateAndSend(icp_opcode opcode
, int flags
, char const *url
, int reqnum
, int pad
, int fd
, const Ip::Address
&from
)
391 icp_common_t
*reply
= _icp_common_t::createMessage(opcode
, flags
, url
, reqnum
, pad
);
392 icpUdpSend(fd
, from
, reply
, icpLogFromICPCode(opcode
), 0);
396 icpDenyAccess(Ip::Address
&from
, char *url
, int reqnum
, int fd
)
398 debugs(12, 2, "icpDenyAccess: Access Denied for " << from
<< " by " << AclMatchedName
<< ".");
400 if (clientdbCutoffDenied(from
)) {
402 * count this DENIED query in the clientdb, even though
403 * we're not sending an ICP reply...
405 clientdbUpdate(from
, LOG_UDP_DENIED
, AnyP::PROTO_ICP
, 0);
407 icpCreateAndSend(ICP_DENIED
, 0, url
, reqnum
, 0, fd
, from
);
412 icpAccessAllowed(Ip::Address
&from
, HttpRequest
* icp_request
)
414 /* absent any explicit rules, we deny all */
415 if (!Config
.accessList
.icp
)
418 ACLFilledChecklist
checklist(Config
.accessList
.icp
, icp_request
, NULL
);
419 checklist
.src_addr
= from
;
420 checklist
.my_addr
.setNoAddr();
421 return (checklist
.fastCheck() == ACCESS_ALLOWED
);
425 icpGetUrlToSend(char *url
)
427 if (strpbrk(url
, w_space
))
428 return rfc1738_escape(url
);
434 icpGetRequest(char *url
, int reqnum
, int fd
, Ip::Address
&from
)
436 if (strpbrk(url
, w_space
)) {
437 url
= rfc1738_escape(url
);
438 icpCreateAndSend(ICP_ERR
, 0, rfc1738_escape(url
), reqnum
, 0, fd
, from
);
444 if ((result
= HttpRequest::CreateFromUrl(url
)) == NULL
)
445 icpCreateAndSend(ICP_ERR
, 0, url
, reqnum
, 0, fd
, from
);
452 doV2Query(int fd
, Ip::Address
&from
, char *buf
, icp_common_t header
)
457 /* We have a valid packet */
458 char *url
= buf
+ sizeof(icp_common_t
) + sizeof(uint32_t);
459 HttpRequest
*icp_request
= icpGetRequest(url
, header
.reqnum
, fd
, from
);
464 HTTPMSGLOCK(icp_request
);
466 if (!icpAccessAllowed(from
, icp_request
)) {
467 icpDenyAccess(from
, url
, header
.reqnum
, fd
);
468 HTTPMSGUNLOCK(icp_request
);
472 if (header
.flags
& ICP_FLAG_SRC_RTT
) {
473 rtt
= netdbHostRtt(icp_request
->GetHost());
474 int hops
= netdbHostHops(icp_request
->GetHost());
475 src_rtt
= ((hops
& 0xFFFF) << 16) | (rtt
& 0xFFFF);
478 flags
|= ICP_FLAG_SRC_RTT
;
480 #endif /* USE_ICMP */
482 /* The peer is allowed to use this cache */
483 ICP2State
*state
= new ICP2State(header
, icp_request
);
486 state
->url
= xstrdup(url
);
487 state
->flags
= flags
;
489 state
->src_rtt
= src_rtt
;
491 StoreEntry::getPublic(state
, url
, Http::METHOD_GET
);
493 HTTPMSGUNLOCK(icp_request
);
497 _icp_common_t::handleReply(char *buf
, Ip::Address
&from
)
499 if (neighbors_do_private_keys
&& reqnum
== 0) {
500 debugs(12, DBG_CRITICAL
, "icpHandleIcpV2: Neighbor " << from
<< " returned reqnum = 0");
501 debugs(12, DBG_CRITICAL
, "icpHandleIcpV2: Disabling use of private keys");
502 neighbors_do_private_keys
= 0;
505 char *url
= buf
+ sizeof(icp_common_t
);
506 debugs(12, 3, "icpHandleIcpV2: " << icp_opcode_str
[opcode
] << " from " << from
<< " for '" << url
<< "'");
508 const cache_key
*key
= icpGetCacheKey(url
, (int) reqnum
);
509 /* call neighborsUdpAck even if ping_status != PING_WAITING */
510 neighborsUdpAck(key
, this, from
);
/// Checks one received ICPv2 datagram and dispatches it by opcode:
/// a query goes to doV2Query(), replies go to the header's handleReply(),
/// and an unrecognised opcode is reported at DBG_CRITICAL.
// NOTE(review): several lines of this function (the first length test, most
// case labels, the break/return statements and all braces) are not part of
// this listing.
514 icpHandleIcpV2(int fd
, Ip::Address
&from
, char *buf
, int len
)
517 debugs(12, 3, "icpHandleIcpV2: ICP message is too small");
// the constructor converts the header fields to host byte order
521 icp_common_t
header(buf
, len
);
523 * Length field should match the number of bytes read
// NOTE(review): this branch fires on any mismatch, including a datagram that
// is longer than its length field, yet logs the same "too small" text as the
// check above -- consider a distinct message.
526 if (len
!= header
.length
) {
527 debugs(12, 3, "icpHandleIcpV2: ICP message is too small");
531 switch (header
.opcode
) {
534 /* We have a valid packet */
535 doV2Query(fd
, from
, buf
, header
);
546 case ICP_MISS_NOFETCH
:
547 header
.handleReply(buf
, from
);
556 debugs(12, DBG_CRITICAL
, "icpHandleIcpV2: UNKNOWN OPCODE: " << header
.opcode
<< " from " << from
);
/// Writes every header field and the payload of one ICP packet to the debug
/// log (section 12, level 9). The multi-byte fields are converted with
/// ntohs()/ntohl() for display, so pkt is expected in network byte order.
// NOTE(review): the declaration of a and any preprocessor guard around this
// function are not part of this listing.
564 icpPktDump(icp_common_t
* pkt
)
568 debugs(12, 9, "opcode: " << std::setw(3) << pkt
->opcode
<< " " << icp_opcode_str
[pkt
->opcode
]);
569 debugs(12, 9, "version: "<< std::left
<< std::setw(8) << pkt
->version
);
570 debugs(12, 9, "length: "<< std::left
<< std::setw(8) << ntohs(pkt
->length
));
571 debugs(12, 9, "reqnum: "<< std::left
<< std::setw(8) << ntohl(pkt
->reqnum
));
572 debugs(12, 9, "flags: "<< std::left
<< std::hex
<< std::setw(8) << ntohl(pkt
->flags
));
573 a
= (struct in_addr
)pkt
->shostid
;
574 debugs(12, 9, "shostid: " << a
);
// the cast binds before the addition: the payload starts right after the header
575 debugs(12, 9, "payload: " << (char *) pkt
+ sizeof(icp_common_t
));
/// Read handler of the ICP sockets. Re-registers itself for the next read
/// event, reads datagrams from sock, counts them through icpCount() and hands
/// each one to the handler for its ICP version. Packets shorter than a
/// header, and packets whose sender equals the local outgoing address, are
/// ignored.
// NOTE(review): the loop around the receive call, the declarations of from,
// len and icp_version, the remaining comm_udp_recvfrom() arguments and any
// platform guards are not part of this listing.
581 icpHandleUdp(int sock
, void *)
583 int *N
= &incoming_sockets_accepted
;
586 LOCAL_ARRAY(char, buf
, SQUID_UDP_SO_RCVBUF
);
589 int max
= INCOMING_UDP_MAX
;
590 Comm::SetSelect(sock
, COMM_SELECT_READ
, icpHandleUdp
, NULL
, 0);
594 len
= comm_udp_recvfrom(sock
,
// the read is limited to one byte less than the buffer holds
596 SQUID_UDP_SO_RCVBUF
- 1,
604 if (ignoreErrno(errno
))
608 /* Some Linux systems seem to set the FD for reading and then
609 * return ECONNREFUSED when sendto() fails and generates an ICMP
610 * port unreachable message. */
611 /* or maybe an EHOSTUNREACH "No route to host" message */
612 if (errno
!= ECONNREFUSED
&& errno
!= EHOSTUNREACH
)
615 debugs(50, DBG_IMPORTANT
, "icpHandleUdp: FD " << sock
<< " recvfrom: " << xstrerror());
621 icpCount(buf
, RECV
, (size_t) len
, 0);
623 debugs(12, 4, "icpHandleUdp: FD " << sock
<< ": received " <<
624 (unsigned long int)len
<< " bytes from " << from
);
626 #ifdef ICP_PACKET_DUMP
631 if ((size_t) len
< sizeof(icp_common_t
)) {
632 debugs(12, 4, "icpHandleUdp: Ignoring too-small UDP packet");
// the ICP version is read straight from the second byte of the packet
636 icp_version
= (int) buf
[1]; /* cheat! */
638 if (icpOutgoingConn
->local
== from
)
639 // ignore ICP packets which loop back (multicast usually)
640 debugs(12, 4, "icpHandleUdp: Ignoring UDP packet sent by myself");
641 else if (icp_version
== ICP_VERSION_2
)
642 icpHandleIcpV2(sock
, from
, buf
, len
);
643 else if (icp_version
== ICP_VERSION_3
)
644 icpHandleIcpV3(sock
, from
, buf
, len
);
646 debugs(12, DBG_IMPORTANT
, "WARNING: Unused ICP version " << icp_version
<<
647 " received from " << from
);
// NOTE(review): the header of this function and the declaration of port are
// not part of this listing; the block starts inside the body.
// The visible code prepares the incoming ICP connection from
// Config.Addrs.udp_incoming and Config.Port.icp and opens it through
// Ipc::StartListening(), with icpIncomingConnectionOpened() as the callback.
// When an outgoing address is configured it also opens a separate outgoing
// socket and installs icpHandleUdp() as its read handler.
656 if ((port
= Config
.Port
.icp
) <= 0)
659 icpIncomingConn
= new Comm::Connection
;
660 icpIncomingConn
->local
= Config
.Addrs
.udp_incoming
;
661 icpIncomingConn
->local
.port(port
);
663 if (!Ip::EnableIpv6
&& !icpIncomingConn
->local
.setIPv4()) {
664 debugs(12, DBG_CRITICAL
, "ERROR: IPv6 is disabled. " << icpIncomingConn
->local
<< " is not an IPv4 address.");
665 fatal("ICP port cannot be opened.");
667 /* split-stack for now requires default IPv4-only ICP */
// the bitwise & is evaluated before the logical &&
668 if (Ip::EnableIpv6
&IPV6_SPECIAL_SPLITSTACK
&& icpIncomingConn
->local
.isAnyAddr()) {
669 icpIncomingConn
->local
.setIPv4();
672 AsyncCall::Pointer call
= asyncCall(12, 2,
673 "icpIncomingConnectionOpened",
674 Comm::UdpOpenDialer(&icpIncomingConnectionOpened
));
676 Ipc::StartListening(SOCK_DGRAM
,
679 Ipc::fdnInIcpSocket
, call
);
// a separate outgoing socket is only set up when an address is configured;
// otherwise icpIncomingConnectionOpened() reuses the incoming connection
681 if ( !Config
.Addrs
.udp_outgoing
.isNoAddr() ) {
682 icpOutgoingConn
= new Comm::Connection
;
683 icpOutgoingConn
->local
= Config
.Addrs
.udp_outgoing
;
684 icpOutgoingConn
->local
.port(port
);
686 if (!Ip::EnableIpv6
&& !icpOutgoingConn
->local
.setIPv4()) {
// NOTE(review): this message uses debug section 49, while the identical
// message for the incoming connection above uses section 12 -- confirm.
687 debugs(49, DBG_CRITICAL
, "ERROR: IPv6 is disabled. " << icpOutgoingConn
->local
<< " is not an IPv4 address.");
688 fatal("ICP port cannot be opened.");
690 /* split-stack for now requires default IPv4-only ICP */
691 if (Ip::EnableIpv6
&IPV6_SPECIAL_SPLITSTACK
&& icpOutgoingConn
->local
.isAnyAddr()) {
692 icpOutgoingConn
->local
.setIPv4();
696 comm_open_listener(SOCK_DGRAM
, IPPROTO_UDP
, icpOutgoingConn
, "Outgoing ICP Port");
699 if (!Comm::IsConnOpen(icpOutgoingConn
))
700 fatal("Cannot open Outgoing ICP Port");
702 debugs(12, DBG_CRITICAL
, "Sending ICP messages from " << icpOutgoingConn
->local
);
704 Comm::SetSelect(icpOutgoingConn
->fd
, COMM_SELECT_READ
, icpHandleUdp
, NULL
, 0);
705 fd_note(icpOutgoingConn
->fd
, "Outgoing ICP socket");
710 icpIncomingConnectionOpened(const Comm::ConnectionPointer
&conn
, int)
712 if (!Comm::IsConnOpen(conn
))
713 fatal("Cannot open ICP Port");
715 Comm::SetSelect(conn
->fd
, COMM_SELECT_READ
, icpHandleUdp
, NULL
, 0);
717 for (const wordlist
*s
= Config
.mcast_group_list
; s
; s
= s
->next
)
718 ipcache_nbgethostbyname(s
->key
, mcastJoinGroups
, NULL
); // XXX: pass the conn for mcastJoinGroups usage.
720 debugs(12, DBG_IMPORTANT
, "Accepting ICP messages on " << conn
->local
);
722 fd_note(conn
->fd
, "Incoming ICP port");
724 if (Config
.Addrs
.udp_outgoing
.isNoAddr()) {
725 icpOutgoingConn
= conn
;
726 debugs(12, DBG_IMPORTANT
, "Sending ICP messages from " << icpOutgoingConn
->local
);
731 * icpConnectionShutdown only closes the 'in' socket if it is
732 * different than the 'out' socket.
735 icpConnectionShutdown(void)
737 if (!Comm::IsConnOpen(icpIncomingConn
))
740 debugs(12, DBG_IMPORTANT
, "Stop receiving ICP on " << icpIncomingConn
->local
);
742 /** Release the 'in' socket for lazy closure.
743 * in and out sockets may be sharing one same FD.
744 * This prevents this function from executing repeatedly.
746 icpIncomingConn
= NULL
;
749 * Normally we only write to the outgoing ICP socket, but
750 * we also have a read handler there to catch messages sent
751 * to that specific interface. During shutdown, we must
752 * disable reading on the outgoing socket.
754 assert(Comm::IsConnOpen(icpOutgoingConn
));
756 Comm::SetSelect(icpOutgoingConn
->fd
, COMM_SELECT_READ
, NULL
, NULL
, 0);
// NOTE(review): the header of this function is not part of this listing; the
// block starts inside the body.
// The visible code stops receiving through icpConnectionShutdown() and then
// drops the reference to the outgoing connection, if one is set.
762 icpConnectionShutdown();
764 if (icpOutgoingConn
!= NULL
) {
765 debugs(12, DBG_IMPORTANT
, "Stop sending ICP from " << icpOutgoingConn
->local
);
766 icpOutgoingConn
= NULL
;
771 icpCount(void *buf
, int which
, size_t len
, int delay
)
773 icp_common_t
*icp
= (icp_common_t
*) buf
;
775 if (len
< sizeof(*icp
))
779 ++statCounter
.icp
.pkts_sent
;
780 kb_incr(&statCounter
.icp
.kbytes_sent
, len
);
782 if (ICP_QUERY
== icp
->opcode
) {
783 ++statCounter
.icp
.queries_sent
;
784 kb_incr(&statCounter
.icp
.q_kbytes_sent
, len
);
786 ++statCounter
.icp
.replies_sent
;
787 kb_incr(&statCounter
.icp
.r_kbytes_sent
, len
);
788 /* this is the sent-reply service time */
789 statCounter
.icp
.replySvcTime
.count(delay
);
792 if (ICP_HIT
== icp
->opcode
)
793 ++statCounter
.icp
.hits_sent
;
794 } else if (RECV
== which
) {
795 ++statCounter
.icp
.pkts_recv
;
796 kb_incr(&statCounter
.icp
.kbytes_recv
, len
);
798 if (ICP_QUERY
== icp
->opcode
) {
799 ++statCounter
.icp
.queries_recv
;
800 kb_incr(&statCounter
.icp
.q_kbytes_recv
, len
);
802 ++statCounter
.icp
.replies_recv
;
803 kb_incr(&statCounter
.icp
.r_kbytes_recv
, len
);
804 /* statCounter.icp.querySvcTime set in clientUpdateCounters */
807 if (ICP_HIT
== icp
->opcode
)
808 ++statCounter
.icp
.hits_recv
;
812 #define N_QUERIED_KEYS 8192
813 #define N_QUERIED_KEYS_MASK 8191
814 static cache_key queried_keys
[N_QUERIED_KEYS
][SQUID_MD5_DIGEST_LENGTH
];
/// Stores a copy of key in the queried_keys slot selected by the current
/// request number, so that icpGetCacheKey() can find it again from the
/// request number a neighbor echoes back.
// NOTE(review): the statements that advance reqnum and return a value are
// not part of this listing. icpGetCacheKey() and handleReply() both treat
// request number 0 as "no private key", so the counter presumably never
// yields 0 -- confirm against the repository copy.
817 icpSetCacheKey(const cache_key
* key
)
819 static int reqnum
= 0;
824 storeKeyCopy(queried_keys
[reqnum
& N_QUERIED_KEYS_MASK
], key
);
830 icpGetCacheKey(const char *url
, int reqnum
)
832 if (neighbors_do_private_keys
&& reqnum
)
833 return queried_keys
[reqnum
& N_QUERIED_KEYS_MASK
];
835 return storeKeyPublic(url
, Http::METHOD_GET
);