]> git.ipfire.org Git - thirdparty/squid.git/blob - src/icp_v2.cc
transaction_initiator ACL for detecting various unusual transactions
[thirdparty/squid.git] / src / icp_v2.cc
1 /*
2 * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9 /* DEBUG: section 12 Internet Cache Protocol (ICP) */
10
11 /**
12 \defgroup ServerProtocolICPInternal2 ICPv2 Internals
13 \ingroup ServerProtocolICPAPI
14 */
15
16 #include "squid.h"
17 #include "AccessLogEntry.h"
18 #include "acl/Acl.h"
19 #include "acl/FilledChecklist.h"
20 #include "client_db.h"
21 #include "comm.h"
22 #include "comm/Connection.h"
23 #include "comm/Loops.h"
24 #include "comm/UdpOpenDialer.h"
25 #include "fd.h"
26 #include "HttpRequest.h"
27 #include "icmp/net_db.h"
28 #include "ICP.h"
29 #include "ip/Address.h"
30 #include "ip/tools.h"
31 #include "ipcache.h"
32 #include "md5.h"
33 #include "multicast.h"
34 #include "neighbors.h"
35 #include "refresh.h"
36 #include "rfc1738.h"
37 #include "SquidConfig.h"
38 #include "SquidTime.h"
39 #include "StatCounters.h"
40 #include "Store.h"
41 #include "store_key_md5.h"
42 #include "tools.h"
43 #include "wordlist.h"
44
45 // for tvSubUsec() which should be in SquidTime.h
46 #include "util.h"
47
48 #include <cerrno>
49
50 static void icpIncomingConnectionOpened(const Comm::ConnectionPointer &conn, int errNo);
51
52 /// \ingroup ServerProtocolICPInternal2
53 static void icpLogIcp(const Ip::Address &, const LogTags &, int, const char *, int);
54
55 /// \ingroup ServerProtocolICPInternal2
56 static void icpHandleIcpV2(int, Ip::Address &, char *, int);
57
58 /// \ingroup ServerProtocolICPInternal2
59 static void icpCount(void *, int, size_t, int);
60
61 /**
62 \ingroup ServerProtocolICPInternal2
63 * IcpQueueHead is global so comm_incoming() knows whether or not
64 * to call icpUdpSendQueue.
65 */
66 static icpUdpData *IcpQueueHead = NULL;
67 /// \ingroup ServerProtocolICPInternal2
68 static icpUdpData *IcpQueueTail = NULL;
69
70 /// \ingroup ServerProtocolICPInternal2
71 Comm::ConnectionPointer icpIncomingConn = NULL;
72 /// \ingroup ServerProtocolICPInternal2
73 Comm::ConnectionPointer icpOutgoingConn = NULL;
74
75 /* icp_common_t */
76 _icp_common_t::_icp_common_t() :
77 opcode(ICP_INVALID), version(0), length(0), reqnum(0),
78 flags(0), pad(0), shostid(0)
79 {}
80
81 _icp_common_t::_icp_common_t(char *buf, unsigned int len) :
82 opcode(ICP_INVALID), version(0), reqnum(0), flags(0), pad(0), shostid(0)
83 {
84 if (len < sizeof(_icp_common_t)) {
85 /* mark as invalid */
86 length = len + 1;
87 return;
88 }
89
90 memcpy(this, buf, sizeof(icp_common_t));
91 /*
92 * Convert network order sensitive fields
93 */
94 length = ntohs(length);
95 reqnum = ntohl(reqnum);
96 flags = ntohl(flags);
97 pad = ntohl(pad);
98 }
99
100 icp_opcode
101 _icp_common_t::getOpCode() const
102 {
103 if (opcode > static_cast<char>(icp_opcode::ICP_END))
104 return ICP_INVALID;
105
106 return static_cast<icp_opcode>(opcode);
107 }
108
109 /* ICPState */
110
111 ICPState::ICPState(icp_common_t &aHeader, HttpRequest *aRequest):
112 header(aHeader),
113 request(aRequest),
114 fd(-1),
115 url(NULL)
116 {
117 HTTPMSGLOCK(request);
118 }
119
120 ICPState::~ICPState()
121 {
122 safe_free(url);
123 HTTPMSGUNLOCK(request);
124 }
125
126 /* End ICPState */
127
128 /* ICP2State */
129
130 /// \ingroup ServerProtocolICPInternal2
131 class ICP2State : public ICPState, public StoreClient
132 {
133
134 public:
135 ICP2State(icp_common_t & aHeader, HttpRequest *aRequest):
136 ICPState(aHeader, aRequest),rtt(0),src_rtt(0),flags(0) {}
137
138 ~ICP2State();
139 void created(StoreEntry * newEntry);
140
141 int rtt;
142 int src_rtt;
143 uint32_t flags;
144 };
145
146 ICP2State::~ICP2State()
147 {}
148
149 void
150 ICP2State::created(StoreEntry *newEntry)
151 {
152 StoreEntry *entry = newEntry->isNull () ? NULL : newEntry;
153 debugs(12, 5, "icpHandleIcpV2: OPCODE " << icp_opcode_str[header.opcode]);
154 icp_opcode codeToSend;
155
156 if (icpCheckUdpHit(entry, request)) {
157 codeToSend = ICP_HIT;
158 } else {
159 #if USE_ICMP
160 if (Config.onoff.test_reachability && rtt == 0) {
161 if ((rtt = netdbHostRtt(request->url.host())) == 0)
162 netdbPingSite(request->url.host());
163 }
164 #endif /* USE_ICMP */
165
166 if (icpGetCommonOpcode() != ICP_ERR)
167 codeToSend = icpGetCommonOpcode();
168 else if (Config.onoff.test_reachability && rtt == 0)
169 codeToSend = ICP_MISS_NOFETCH;
170 else
171 codeToSend = ICP_MISS;
172 }
173
174 icpCreateAndSend(codeToSend, flags, url, header.reqnum, src_rtt, fd, from);
175 delete this;
176 }
177
178 /* End ICP2State */
179
180 /// \ingroup ServerProtocolICPInternal2
181 static void
182 icpLogIcp(const Ip::Address &caddr, const LogTags &logcode, int len, const char *url, int delay)
183 {
184 AccessLogEntry::Pointer al = new AccessLogEntry();
185
186 if (LOG_TAG_NONE == logcode.oldType)
187 return;
188
189 if (LOG_ICP_QUERY == logcode.oldType)
190 return;
191
192 clientdbUpdate(caddr, logcode, AnyP::PROTO_ICP, len);
193
194 if (!Config.onoff.log_udp)
195 return;
196
197 al->icp.opcode = ICP_QUERY;
198
199 al->url = url;
200
201 al->cache.caddr = caddr;
202
203 // XXX: move to use icp.clientReply instead
204 al->http.clientReplySz.payloadData = len;
205
206 al->cache.code = logcode;
207
208 al->cache.trTime.tv_sec = delay;
209
210 accessLogLog(al, NULL);
211 }
212
213 /// \ingroup ServerProtocolICPInternal2
214 void
215 icpUdpSendQueue(int fd, void *)
216 {
217 icpUdpData *q;
218
219 while ((q = IcpQueueHead) != NULL) {
220 int delay = tvSubUsec(q->queue_time, current_time);
221 /* increment delay to prevent looping */
222 const int x = icpUdpSend(fd, q->address, (icp_common_t *) q->msg, q->logcode, ++delay);
223 IcpQueueHead = q->next;
224 xfree(q);
225
226 if (x < 0)
227 break;
228 }
229 }
230
231 _icp_common_t *
232 _icp_common_t::createMessage(
233 icp_opcode opcode,
234 int flags,
235 const char *url,
236 int reqnum,
237 int pad)
238 {
239 char *buf = NULL;
240 icp_common_t *headerp = NULL;
241 char *urloffset = NULL;
242 int buf_len;
243 buf_len = sizeof(icp_common_t) + strlen(url) + 1;
244
245 if (opcode == ICP_QUERY)
246 buf_len += sizeof(uint32_t);
247
248 buf = (char *) xcalloc(buf_len, 1);
249
250 headerp = (icp_common_t *) (void *) buf;
251
252 headerp->opcode = (char) opcode;
253
254 headerp->version = ICP_VERSION_CURRENT;
255
256 headerp->length = (uint16_t) htons(buf_len);
257
258 headerp->reqnum = htonl(reqnum);
259
260 headerp->flags = htonl(flags);
261
262 headerp->pad = htonl(pad);
263
264 headerp->shostid = 0;
265
266 urloffset = buf + sizeof(icp_common_t);
267
268 if (opcode == ICP_QUERY)
269 urloffset += sizeof(uint32_t);
270
271 memcpy(urloffset, url, strlen(url));
272
273 return (icp_common_t *)buf;
274 }
275
276 int
277 icpUdpSend(int fd,
278 const Ip::Address &to,
279 icp_common_t * msg,
280 const LogTags &logcode,
281 int delay)
282 {
283 icpUdpData *queue;
284 int x;
285 int len;
286 len = (int) ntohs(msg->length);
287 debugs(12, 5, "icpUdpSend: FD " << fd << " sending " <<
288 icp_opcode_str[msg->opcode] << ", " << len << " bytes to " << to);
289
290 x = comm_udp_sendto(fd, to, msg, len);
291
292 if (x >= 0) {
293 /* successfully written */
294 icpLogIcp(to, logcode, len, (char *) (msg + 1), delay);
295 icpCount(msg, SENT, (size_t) len, delay);
296 safe_free(msg);
297 } else if (0 == delay) {
298 /* send failed, but queue it */
299 queue = (icpUdpData *) xcalloc(1, sizeof(icpUdpData));
300 queue->address = to;
301 queue->msg = msg;
302 queue->len = (int) ntohs(msg->length);
303 queue->queue_time = current_time;
304 queue->logcode = logcode;
305
306 if (IcpQueueHead == NULL) {
307 IcpQueueHead = queue;
308 IcpQueueTail = queue;
309 } else if (IcpQueueTail == IcpQueueHead) {
310 IcpQueueTail = queue;
311 IcpQueueHead->next = queue;
312 } else {
313 IcpQueueTail->next = queue;
314 IcpQueueTail = queue;
315 }
316
317 Comm::SetSelect(fd, COMM_SELECT_WRITE, icpUdpSendQueue, NULL, 0);
318 ++statCounter.icp.replies_queued;
319 } else {
320 /* don't queue it */
321 ++statCounter.icp.replies_dropped;
322 }
323
324 return x;
325 }
326
327 int
328 icpCheckUdpHit(StoreEntry * e, HttpRequest * request)
329 {
330 if (e == NULL)
331 return 0;
332
333 if (!e->validToSend())
334 return 0;
335
336 if (Config.onoff.icp_hit_stale)
337 return 1;
338
339 if (refreshCheckICP(e, request))
340 return 0;
341
342 return 1;
343 }
344
345 /**
346 * This routine selects an ICP opcode for ICP misses.
347 *
348 \retval ICP_ERR no opcode selected here
349 \retval ICP_MISS_NOFETCH store is rebuilding, no fetch is possible yet
350 */
351 icp_opcode
352 icpGetCommonOpcode()
353 {
354 /* if store is rebuilding, return a UDP_MISS_NOFETCH */
355
356 if ((StoreController::store_dirs_rebuilding && opt_reload_hit_only) ||
357 hit_only_mode_until > squid_curtime) {
358 return ICP_MISS_NOFETCH;
359 }
360
361 return ICP_ERR;
362 }
363
364 LogTags
365 icpLogFromICPCode(icp_opcode opcode)
366 {
367 if (opcode == ICP_ERR)
368 return LOG_UDP_INVALID;
369
370 if (opcode == ICP_DENIED)
371 return LOG_UDP_DENIED;
372
373 if (opcode == ICP_HIT)
374 return LOG_UDP_HIT;
375
376 if (opcode == ICP_MISS)
377 return LOG_UDP_MISS;
378
379 if (opcode == ICP_MISS_NOFETCH)
380 return LOG_UDP_MISS_NOFETCH;
381
382 fatal("expected ICP opcode\n");
383
384 return LOG_UDP_INVALID;
385 }
386
387 void
388 icpCreateAndSend(icp_opcode opcode, int flags, char const *url, int reqnum, int pad, int fd, const Ip::Address &from)
389 {
390 icp_common_t *reply = _icp_common_t::createMessage(opcode, flags, url, reqnum, pad);
391 icpUdpSend(fd, from, reply, icpLogFromICPCode(opcode), 0);
392 }
393
394 void
395 icpDenyAccess(Ip::Address &from, char *url, int reqnum, int fd)
396 {
397 debugs(12, 2, "icpDenyAccess: Access Denied for " << from << " by " << AclMatchedName << ".");
398
399 if (clientdbCutoffDenied(from)) {
400 /*
401 * count this DENIED query in the clientdb, even though
402 * we're not sending an ICP reply...
403 */
404 clientdbUpdate(from, LOG_UDP_DENIED, AnyP::PROTO_ICP, 0);
405 } else {
406 icpCreateAndSend(ICP_DENIED, 0, url, reqnum, 0, fd, from);
407 }
408 }
409
410 bool
411 icpAccessAllowed(Ip::Address &from, HttpRequest * icp_request)
412 {
413 /* absent any explicit rules, we deny all */
414 if (!Config.accessList.icp)
415 return false;
416
417 ACLFilledChecklist checklist(Config.accessList.icp, icp_request, NULL);
418 checklist.src_addr = from;
419 checklist.my_addr.setNoAddr();
420 return (checklist.fastCheck() == ACCESS_ALLOWED);
421 }
422
423 char const *
424 icpGetUrlToSend(char *url)
425 {
426 if (strpbrk(url, w_space))
427 return rfc1738_escape(url);
428 else
429 return url;
430 }
431
432 HttpRequest *
433 icpGetRequest(char *url, int reqnum, int fd, Ip::Address &from)
434 {
435 if (strpbrk(url, w_space)) {
436 url = rfc1738_escape(url);
437 icpCreateAndSend(ICP_ERR, 0, rfc1738_escape(url), reqnum, 0, fd, from);
438 return NULL;
439 }
440
441 HttpRequest *result;
442 const MasterXaction::Pointer mx = new MasterXaction(XactionInitiator::initIcp);
443 if ((result = HttpRequest::FromUrl(url, mx)) == NULL)
444 icpCreateAndSend(ICP_ERR, 0, url, reqnum, 0, fd, from);
445
446 return result;
447
448 }
449
450 static void
451 doV2Query(int fd, Ip::Address &from, char *buf, icp_common_t header)
452 {
453 int rtt = 0;
454 int src_rtt = 0;
455 uint32_t flags = 0;
456 /* We have a valid packet */
457 char *url = buf + sizeof(icp_common_t) + sizeof(uint32_t);
458 HttpRequest *icp_request = icpGetRequest(url, header.reqnum, fd, from);
459
460 if (!icp_request)
461 return;
462
463 HTTPMSGLOCK(icp_request);
464
465 if (!icpAccessAllowed(from, icp_request)) {
466 icpDenyAccess(from, url, header.reqnum, fd);
467 HTTPMSGUNLOCK(icp_request);
468 return;
469 }
470 #if USE_ICMP
471 if (header.flags & ICP_FLAG_SRC_RTT) {
472 rtt = netdbHostRtt(icp_request->url.host());
473 int hops = netdbHostHops(icp_request->url.host());
474 src_rtt = ((hops & 0xFFFF) << 16) | (rtt & 0xFFFF);
475
476 if (rtt)
477 flags |= ICP_FLAG_SRC_RTT;
478 }
479 #endif /* USE_ICMP */
480
481 /* The peer is allowed to use this cache */
482 ICP2State *state = new ICP2State(header, icp_request);
483 state->fd = fd;
484 state->from = from;
485 state->url = xstrdup(url);
486 state->flags = flags;
487 state->rtt = rtt;
488 state->src_rtt = src_rtt;
489
490 StoreEntry::getPublic(state, url, Http::METHOD_GET);
491
492 HTTPMSGUNLOCK(icp_request);
493 }
494
495 void
496 _icp_common_t::handleReply(char *buf, Ip::Address &from)
497 {
498 if (neighbors_do_private_keys && reqnum == 0) {
499 debugs(12, DBG_CRITICAL, "icpHandleIcpV2: Neighbor " << from << " returned reqnum = 0");
500 debugs(12, DBG_CRITICAL, "icpHandleIcpV2: Disabling use of private keys");
501 neighbors_do_private_keys = 0;
502 }
503
504 char *url = buf + sizeof(icp_common_t);
505 debugs(12, 3, "icpHandleIcpV2: " << icp_opcode_str[opcode] << " from " << from << " for '" << url << "'");
506
507 const cache_key *key = icpGetCacheKey(url, (int) reqnum);
508 /* call neighborsUdpAck even if ping_status != PING_WAITING */
509 neighborsUdpAck(key, this, from);
510 }
511
512 static void
513 icpHandleIcpV2(int fd, Ip::Address &from, char *buf, int len)
514 {
515 if (len <= 0) {
516 debugs(12, 3, "icpHandleIcpV2: ICP message is too small");
517 return;
518 }
519
520 icp_common_t header(buf, len);
521 /*
522 * Length field should match the number of bytes read
523 */
524
525 if (len != header.length) {
526 debugs(12, 3, "icpHandleIcpV2: ICP message is too small");
527 return;
528 }
529
530 switch (header.opcode) {
531
532 case ICP_QUERY:
533 /* We have a valid packet */
534 doV2Query(fd, from, buf, header);
535 break;
536
537 case ICP_HIT:
538
539 case ICP_DECHO:
540
541 case ICP_MISS:
542
543 case ICP_DENIED:
544
545 case ICP_MISS_NOFETCH:
546 header.handleReply(buf, from);
547 break;
548
549 case ICP_INVALID:
550
551 case ICP_ERR:
552 break;
553
554 default:
555 debugs(12, DBG_CRITICAL, "icpHandleIcpV2: UNKNOWN OPCODE: " << header.opcode << " from " << from);
556
557 break;
558 }
559 }
560
#ifdef ICP_PKT_DUMP
/// Debugging aid: writes every header field and the payload of an ICP
/// packet to the debug log at level 9. Multi-byte fields are converted from
/// network order for display.
/// NOTE(review): only compiled when ICP_PKT_DUMP is defined, while the call
/// in icpHandleUdp() is guarded by ICP_PACKET_DUMP -- confirm which is intended.
static void
icpPktDump(icp_common_t * pkt)
{
    const char *const payload = (char *) pkt + sizeof(icp_common_t);

    debugs(12, 9, "opcode: " << std::setw(3) << pkt->opcode << " " << icp_opcode_str[pkt->opcode]);
    debugs(12, 9, "version: "<< std::left << std::setw(8) << pkt->version);
    debugs(12, 9, "length: "<< std::left << std::setw(8) << ntohs(pkt->length));
    debugs(12, 9, "reqnum: "<< std::left << std::setw(8) << ntohl(pkt->reqnum));
    debugs(12, 9, "flags: "<< std::left << std::hex << std::setw(8) << ntohl(pkt->flags));

    Ip::Address a;
    a = (struct in_addr)pkt->shostid;
    debugs(12, 9, "shostid: " << a );
    debugs(12, 9, "payload: " << payload);
}

#endif
578
579 void
580 icpHandleUdp(int sock, void *)
581 {
582 int *N = &incoming_sockets_accepted;
583
584 Ip::Address from;
585 LOCAL_ARRAY(char, buf, SQUID_UDP_SO_RCVBUF);
586 int len;
587 int icp_version;
588 int max = INCOMING_UDP_MAX;
589 Comm::SetSelect(sock, COMM_SELECT_READ, icpHandleUdp, NULL, 0);
590
591 while (max) {
592 --max;
593 len = comm_udp_recvfrom(sock,
594 buf,
595 SQUID_UDP_SO_RCVBUF - 1,
596 0,
597 from);
598
599 if (len == 0)
600 break;
601
602 if (len < 0) {
603 int xerrno = errno;
604 if (ignoreErrno(xerrno))
605 break;
606
607 #if _SQUID_LINUX_
608 /* Some Linux systems seem to set the FD for reading and then
609 * return ECONNREFUSED when sendto() fails and generates an ICMP
610 * port unreachable message. */
611 /* or maybe an EHOSTUNREACH "No route to host" message */
612 if (xerrno != ECONNREFUSED && xerrno != EHOSTUNREACH)
613 #endif
614 debugs(50, DBG_IMPORTANT, "icpHandleUdp: FD " << sock << " recvfrom: " << xstrerr(xerrno));
615
616 break;
617 }
618
619 ++(*N);
620 icpCount(buf, RECV, (size_t) len, 0);
621 buf[len] = '\0';
622 debugs(12, 4, "icpHandleUdp: FD " << sock << ": received " <<
623 (unsigned long int)len << " bytes from " << from);
624
625 #ifdef ICP_PACKET_DUMP
626
627 icpPktDump(buf);
628 #endif
629
630 if ((size_t) len < sizeof(icp_common_t)) {
631 debugs(12, 4, "icpHandleUdp: Ignoring too-small UDP packet");
632 break;
633 }
634
635 icp_version = (int) buf[1]; /* cheat! */
636
637 if (icpOutgoingConn->local == from)
638 // ignore ICP packets which loop back (multicast usually)
639 debugs(12, 4, "icpHandleUdp: Ignoring UDP packet sent by myself");
640 else if (icp_version == ICP_VERSION_2)
641 icpHandleIcpV2(sock, from, buf, len);
642 else if (icp_version == ICP_VERSION_3)
643 icpHandleIcpV3(sock, from, buf, len);
644 else
645 debugs(12, DBG_IMPORTANT, "WARNING: Unused ICP version " << icp_version <<
646 " received from " << from);
647 }
648 }
649
650 void
651 icpOpenPorts(void)
652 {
653 uint16_t port;
654
655 if ((port = Config.Port.icp) <= 0)
656 return;
657
658 icpIncomingConn = new Comm::Connection;
659 icpIncomingConn->local = Config.Addrs.udp_incoming;
660 icpIncomingConn->local.port(port);
661
662 if (!Ip::EnableIpv6 && !icpIncomingConn->local.setIPv4()) {
663 debugs(12, DBG_CRITICAL, "ERROR: IPv6 is disabled. " << icpIncomingConn->local << " is not an IPv4 address.");
664 fatal("ICP port cannot be opened.");
665 }
666 /* split-stack for now requires default IPv4-only ICP */
667 if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && icpIncomingConn->local.isAnyAddr()) {
668 icpIncomingConn->local.setIPv4();
669 }
670
671 AsyncCall::Pointer call = asyncCall(12, 2,
672 "icpIncomingConnectionOpened",
673 Comm::UdpOpenDialer(&icpIncomingConnectionOpened));
674
675 Ipc::StartListening(SOCK_DGRAM,
676 IPPROTO_UDP,
677 icpIncomingConn,
678 Ipc::fdnInIcpSocket, call);
679
680 if ( !Config.Addrs.udp_outgoing.isNoAddr() ) {
681 icpOutgoingConn = new Comm::Connection;
682 icpOutgoingConn->local = Config.Addrs.udp_outgoing;
683 icpOutgoingConn->local.port(port);
684
685 if (!Ip::EnableIpv6 && !icpOutgoingConn->local.setIPv4()) {
686 debugs(49, DBG_CRITICAL, "ERROR: IPv6 is disabled. " << icpOutgoingConn->local << " is not an IPv4 address.");
687 fatal("ICP port cannot be opened.");
688 }
689 /* split-stack for now requires default IPv4-only ICP */
690 if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && icpOutgoingConn->local.isAnyAddr()) {
691 icpOutgoingConn->local.setIPv4();
692 }
693
694 enter_suid();
695 comm_open_listener(SOCK_DGRAM, IPPROTO_UDP, icpOutgoingConn, "Outgoing ICP Port");
696 leave_suid();
697
698 if (!Comm::IsConnOpen(icpOutgoingConn))
699 fatal("Cannot open Outgoing ICP Port");
700
701 debugs(12, DBG_CRITICAL, "Sending ICP messages from " << icpOutgoingConn->local);
702
703 Comm::SetSelect(icpOutgoingConn->fd, COMM_SELECT_READ, icpHandleUdp, NULL, 0);
704 fd_note(icpOutgoingConn->fd, "Outgoing ICP socket");
705 }
706 }
707
708 static void
709 icpIncomingConnectionOpened(const Comm::ConnectionPointer &conn, int)
710 {
711 if (!Comm::IsConnOpen(conn))
712 fatal("Cannot open ICP Port");
713
714 Comm::SetSelect(conn->fd, COMM_SELECT_READ, icpHandleUdp, NULL, 0);
715
716 for (const wordlist *s = Config.mcast_group_list; s; s = s->next)
717 ipcache_nbgethostbyname(s->key, mcastJoinGroups, NULL); // XXX: pass the conn for mcastJoinGroups usage.
718
719 debugs(12, DBG_IMPORTANT, "Accepting ICP messages on " << conn->local);
720
721 fd_note(conn->fd, "Incoming ICP port");
722
723 if (Config.Addrs.udp_outgoing.isNoAddr()) {
724 icpOutgoingConn = conn;
725 debugs(12, DBG_IMPORTANT, "Sending ICP messages from " << icpOutgoingConn->local);
726 }
727 }
728
729 /**
730 * icpConnectionShutdown only closes the 'in' socket if it is
731 * different than the 'out' socket.
732 */
733 void
734 icpConnectionShutdown(void)
735 {
736 if (!Comm::IsConnOpen(icpIncomingConn))
737 return;
738
739 debugs(12, DBG_IMPORTANT, "Stop receiving ICP on " << icpIncomingConn->local);
740
741 /** Release the 'in' socket for lazy closure.
742 * in and out sockets may be sharing one same FD.
743 * This prevents this function from executing repeatedly.
744 */
745 icpIncomingConn = NULL;
746
747 /**
748 * Normally we only write to the outgoing ICP socket, but
749 * we also have a read handler there to catch messages sent
750 * to that specific interface. During shutdown, we must
751 * disable reading on the outgoing socket.
752 */
753 assert(Comm::IsConnOpen(icpOutgoingConn));
754
755 Comm::SetSelect(icpOutgoingConn->fd, COMM_SELECT_READ, NULL, NULL, 0);
756 }
757
758 void
759 icpClosePorts(void)
760 {
761 icpConnectionShutdown();
762
763 if (icpOutgoingConn != NULL) {
764 debugs(12, DBG_IMPORTANT, "Stop sending ICP from " << icpOutgoingConn->local);
765 icpOutgoingConn = NULL;
766 }
767 }
768
769 static void
770 icpCount(void *buf, int which, size_t len, int delay)
771 {
772 icp_common_t *icp = (icp_common_t *) buf;
773
774 if (len < sizeof(*icp))
775 return;
776
777 if (SENT == which) {
778 ++statCounter.icp.pkts_sent;
779 statCounter.icp.kbytes_sent += len;
780
781 if (ICP_QUERY == icp->opcode) {
782 ++statCounter.icp.queries_sent;
783 statCounter.icp.q_kbytes_sent += len;
784 } else {
785 ++statCounter.icp.replies_sent;
786 statCounter.icp.r_kbytes_sent += len;
787 /* this is the sent-reply service time */
788 statCounter.icp.replySvcTime.count(delay);
789 }
790
791 if (ICP_HIT == icp->opcode)
792 ++statCounter.icp.hits_sent;
793 } else if (RECV == which) {
794 ++statCounter.icp.pkts_recv;
795 statCounter.icp.kbytes_recv += len;
796
797 if (ICP_QUERY == icp->opcode) {
798 ++statCounter.icp.queries_recv;
799 statCounter.icp.q_kbytes_recv += len;
800 } else {
801 ++statCounter.icp.replies_recv;
802 statCounter.icp.r_kbytes_recv += len;
803 /* statCounter.icp.querySvcTime set in clientUpdateCounters */
804 }
805
806 if (ICP_HIT == icp->opcode)
807 ++statCounter.icp.hits_recv;
808 }
809 }
810
811 #define N_QUERIED_KEYS 8192
812 #define N_QUERIED_KEYS_MASK 8191
813 static cache_key queried_keys[N_QUERIED_KEYS][SQUID_MD5_DIGEST_LENGTH];
814
815 int
816 icpSetCacheKey(const cache_key * key)
817 {
818 static int reqnum = 0;
819
820 if (++reqnum < 0)
821 reqnum = 1;
822
823 storeKeyCopy(queried_keys[reqnum & N_QUERIED_KEYS_MASK], key);
824
825 return reqnum;
826 }
827
828 const cache_key *
829 icpGetCacheKey(const char *url, int reqnum)
830 {
831 if (neighbors_do_private_keys && reqnum)
832 return queried_keys[reqnum & N_QUERIED_KEYS_MASK];
833
834 return storeKeyPublic(url, Http::METHOD_GET);
835 }
836