]> git.ipfire.org Git - thirdparty/squid.git/blob - src/icp_v2.cc
Implement stubs for external_acl, auth/libauth, auth/libacls, time
[thirdparty/squid.git] / src / icp_v2.cc
1 /*
2 * DEBUG: section 12 Internet Cache Protocol (ICP)
3 * AUTHOR: Duane Wessels
4 *
5 * SQUID Web Proxy Cache http://www.squid-cache.org/
6 * ----------------------------------------------------------
7 *
8 * Squid is the result of efforts by numerous individuals from
9 * the Internet community; see the CONTRIBUTORS file for full
10 * details. Many organizations have provided support for Squid's
11 * development; see the SPONSORS file for full details. Squid is
12 * Copyrighted (C) 2001 by the Regents of the University of
13 * California; see the COPYRIGHT file for full details. Squid
14 * incorporates software developed and/or copyrighted by other
15 * sources; see the CREDITS file for full details.
16 *
17 * This program is free software; you can redistribute it and/or modify
18 * it under the terms of the GNU General Public License as published by
19 * the Free Software Foundation; either version 2 of the License, or
20 * (at your option) any later version.
21 *
22 * This program is distributed in the hope that it will be useful,
23 * but WITHOUT ANY WARRANTY; without even the implied warranty of
24 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
25 * GNU General Public License for more details.
26 *
27 * You should have received a copy of the GNU General Public License
28 * along with this program; if not, write to the Free Software
29 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
30 *
31 */
32
33 /**
34 \defgroup ServerProtocolICPInternal2 ICPv2 Internals
35 \ingroup ServerProtocolICPAPI
36 */
37
38 #include "squid.h"
39 #include "AccessLogEntry.h"
40 #include "acl/Acl.h"
41 #include "acl/FilledChecklist.h"
42 #include "client_db.h"
43 #include "comm.h"
44 #include "comm/Connection.h"
45 #include "comm/Loops.h"
46 #include "comm/UdpOpenDialer.h"
47 #include "fd.h"
48 #include "HttpRequest.h"
49 #include "icmp/net_db.h"
50 #include "ICP.h"
51 #include "ip/Address.h"
52 #include "ip/tools.h"
53 #include "ipcache.h"
54 #include "md5.h"
55 #include "multicast.h"
56 #include "neighbors.h"
57 #include "refresh.h"
58 #include "rfc1738.h"
59 #include "SquidConfig.h"
60 #include "SquidTime.h"
61 #include "StatCounters.h"
62 #include "Store.h"
63 #include "store_key_md5.h"
64 #include "SwapDir.h"
65 #include "tools.h"
66 #include "wordlist.h"
67
68 #if HAVE_ERRNO_H
69 #include <errno.h>
70 #endif
71
72 static void icpIncomingConnectionOpened(const Comm::ConnectionPointer &conn, int errNo);
73
74 /// \ingroup ServerProtocolICPInternal2
75 static void icpLogIcp(const Ip::Address &, LogTags, int, const char *, int);
76
77 /// \ingroup ServerProtocolICPInternal2
78 static void icpHandleIcpV2(int, Ip::Address &, char *, int);
79
80 /// \ingroup ServerProtocolICPInternal2
81 static void icpCount(void *, int, size_t, int);
82
83 /**
84 \ingroup ServerProtocolICPInternal2
85 * IcpQueueHead is global so comm_incoming() knows whether or not
86 * to call icpUdpSendQueue.
87 */
88 static icpUdpData *IcpQueueHead = NULL;
89 /// \ingroup ServerProtocolICPInternal2
90 static icpUdpData *IcpQueueTail = NULL;
91
92 /// \ingroup ServerProtocolICPInternal2
93 Comm::ConnectionPointer icpIncomingConn = NULL;
94 /// \ingroup ServerProtocolICPInternal2
95 Comm::ConnectionPointer icpOutgoingConn = NULL;
96
97 /* icp_common_t */
98 _icp_common_t::_icp_common_t() :
99 opcode(ICP_INVALID), version(0), length(0), reqnum(0),
100 flags(0), pad(0), shostid(0)
101 {}
102
103 _icp_common_t::_icp_common_t(char *buf, unsigned int len) :
104 opcode(ICP_INVALID), version(0), reqnum(0), flags(0), pad(0), shostid(0)
105 {
106 if (len < sizeof(_icp_common_t)) {
107 /* mark as invalid */
108 length = len + 1;
109 return;
110 }
111
112 memcpy(this, buf, sizeof(icp_common_t));
113 /*
114 * Convert network order sensitive fields
115 */
116 length = ntohs(length);
117 reqnum = ntohl(reqnum);
118 flags = ntohl(flags);
119 pad = ntohl(pad);
120 }
121
122 icp_opcode
123 _icp_common_t::getOpCode() const
124 {
125 if (opcode > (char)ICP_END)
126 return ICP_INVALID;
127
128 return (icp_opcode)opcode;
129 }
130
131 /* ICPState */
132
133 ICPState::ICPState(icp_common_t &aHeader, HttpRequest *aRequest):
134 header(aHeader),
135 request(aRequest),
136 fd(-1),
137 url(NULL)
138 {
139 HTTPMSGLOCK(request);
140 }
141
142 ICPState::~ICPState()
143 {
144 safe_free(url);
145 HTTPMSGUNLOCK(request);
146 }
147
148 /* End ICPState */
149
150 /* ICP2State */
151
152 /// \ingroup ServerProtocolICPInternal2
153 class ICP2State : public ICPState, public StoreClient
154 {
155
156 public:
157 ICP2State(icp_common_t & aHeader, HttpRequest *aRequest):
158 ICPState(aHeader, aRequest),rtt(0),src_rtt(0),flags(0) {}
159
160 ~ICP2State();
161 void created(StoreEntry * newEntry);
162
163 int rtt;
164 int src_rtt;
165 uint32_t flags;
166 };
167
168 ICP2State::~ICP2State()
169 {}
170
171 void
172 ICP2State::created(StoreEntry *newEntry)
173 {
174 StoreEntry *entry = newEntry->isNull () ? NULL : newEntry;
175 debugs(12, 5, "icpHandleIcpV2: OPCODE " << icp_opcode_str[header.opcode]);
176 icp_opcode codeToSend;
177
178 if (icpCheckUdpHit(entry, request)) {
179 codeToSend = ICP_HIT;
180 } else {
181 #if USE_ICMP
182 if (Config.onoff.test_reachability && rtt == 0) {
183 if ((rtt = netdbHostRtt(request->GetHost())) == 0)
184 netdbPingSite(request->GetHost());
185 }
186 #endif /* USE_ICMP */
187
188 if (icpGetCommonOpcode() != ICP_ERR)
189 codeToSend = icpGetCommonOpcode();
190 else if (Config.onoff.test_reachability && rtt == 0)
191 codeToSend = ICP_MISS_NOFETCH;
192 else
193 codeToSend = ICP_MISS;
194 }
195
196 icpCreateAndSend(codeToSend, flags, url, header.reqnum, src_rtt, fd, from);
197 delete this;
198 }
199
200 /* End ICP2State */
201
202 /// \ingroup ServerProtocolICPInternal2
203 static void
204 icpLogIcp(const Ip::Address &caddr, LogTags logcode, int len, const char *url, int delay)
205 {
206 AccessLogEntry::Pointer al = new AccessLogEntry();
207
208 if (LOG_TAG_NONE == logcode)
209 return;
210
211 if (LOG_ICP_QUERY == logcode)
212 return;
213
214 clientdbUpdate(caddr, logcode, AnyP::PROTO_ICP, len);
215
216 if (!Config.onoff.log_udp)
217 return;
218
219 al->icp.opcode = ICP_QUERY;
220
221 al->url = url;
222
223 al->cache.caddr = caddr;
224
225 // XXX: move to use icp.clientReply instead
226 al->http.adaptedReply.payloadDataSz = len;
227
228 al->cache.code = logcode;
229
230 al->cache.msec = delay;
231
232 accessLogLog(al, NULL);
233 }
234
235 /// \ingroup ServerProtocolICPInternal2
236 void
237 icpUdpSendQueue(int fd, void *unused)
238 {
239 icpUdpData *q;
240
241 while ((q = IcpQueueHead) != NULL) {
242 int delay = tvSubUsec(q->queue_time, current_time);
243 /* increment delay to prevent looping */
244 const int x = icpUdpSend(fd, q->address, (icp_common_t *) q->msg, q->logcode, ++delay);
245 IcpQueueHead = q->next;
246 xfree(q);
247
248 if (x < 0)
249 break;
250 }
251 }
252
253 _icp_common_t *
254 _icp_common_t::createMessage(
255 icp_opcode opcode,
256 int flags,
257 const char *url,
258 int reqnum,
259 int pad)
260 {
261 char *buf = NULL;
262 icp_common_t *headerp = NULL;
263 char *urloffset = NULL;
264 int buf_len;
265 buf_len = sizeof(icp_common_t) + strlen(url) + 1;
266
267 if (opcode == ICP_QUERY)
268 buf_len += sizeof(uint32_t);
269
270 buf = (char *) xcalloc(buf_len, 1);
271
272 headerp = (icp_common_t *) (void *) buf;
273
274 headerp->opcode = (char) opcode;
275
276 headerp->version = ICP_VERSION_CURRENT;
277
278 headerp->length = (uint16_t) htons(buf_len);
279
280 headerp->reqnum = htonl(reqnum);
281
282 headerp->flags = htonl(flags);
283
284 headerp->pad = htonl(pad);
285
286 headerp->shostid = 0;
287
288 urloffset = buf + sizeof(icp_common_t);
289
290 if (opcode == ICP_QUERY)
291 urloffset += sizeof(uint32_t);
292
293 memcpy(urloffset, url, strlen(url));
294
295 return (icp_common_t *)buf;
296 }
297
298 int
299 icpUdpSend(int fd,
300 const Ip::Address &to,
301 icp_common_t * msg,
302 LogTags logcode,
303 int delay)
304 {
305 icpUdpData *queue;
306 int x;
307 int len;
308 len = (int) ntohs(msg->length);
309 debugs(12, 5, "icpUdpSend: FD " << fd << " sending " <<
310 icp_opcode_str[msg->opcode] << ", " << len << " bytes to " << to);
311
312 x = comm_udp_sendto(fd, to, msg, len);
313
314 if (x >= 0) {
315 /* successfully written */
316 icpLogIcp(to, logcode, len, (char *) (msg + 1), delay);
317 icpCount(msg, SENT, (size_t) len, delay);
318 safe_free(msg);
319 } else if (0 == delay) {
320 /* send failed, but queue it */
321 queue = (icpUdpData *) xcalloc(1, sizeof(icpUdpData));
322 queue->address = to;
323 queue->msg = msg;
324 queue->len = (int) ntohs(msg->length);
325 queue->queue_time = current_time;
326 queue->logcode = logcode;
327
328 if (IcpQueueHead == NULL) {
329 IcpQueueHead = queue;
330 IcpQueueTail = queue;
331 } else if (IcpQueueTail == IcpQueueHead) {
332 IcpQueueTail = queue;
333 IcpQueueHead->next = queue;
334 } else {
335 IcpQueueTail->next = queue;
336 IcpQueueTail = queue;
337 }
338
339 Comm::SetSelect(fd, COMM_SELECT_WRITE, icpUdpSendQueue, NULL, 0);
340 ++statCounter.icp.replies_queued;
341 } else {
342 /* don't queue it */
343 ++statCounter.icp.replies_dropped;
344 }
345
346 return x;
347 }
348
349 int
350 icpCheckUdpHit(StoreEntry * e, HttpRequest * request)
351 {
352 if (e == NULL)
353 return 0;
354
355 if (!e->validToSend())
356 return 0;
357
358 if (Config.onoff.icp_hit_stale)
359 return 1;
360
361 if (refreshCheckICP(e, request))
362 return 0;
363
364 return 1;
365 }
366
367 /**
368 * This routine selects an ICP opcode for ICP misses.
369 *
370 \retval ICP_ERR no opcode selected here
371 \retval ICP_MISS_NOFETCH store is rebuilding, no fetch is possible yet
372 */
373 icp_opcode
374 icpGetCommonOpcode()
375 {
376 /* if store is rebuilding, return a UDP_MISS_NOFETCH */
377
378 if ((StoreController::store_dirs_rebuilding && opt_reload_hit_only) ||
379 hit_only_mode_until > squid_curtime) {
380 return ICP_MISS_NOFETCH;
381 }
382
383 return ICP_ERR;
384 }
385
386 LogTags
387 icpLogFromICPCode(icp_opcode opcode)
388 {
389 if (opcode == ICP_ERR)
390 return LOG_UDP_INVALID;
391
392 if (opcode == ICP_DENIED)
393 return LOG_UDP_DENIED;
394
395 if (opcode == ICP_HIT)
396 return LOG_UDP_HIT;
397
398 if (opcode == ICP_MISS)
399 return LOG_UDP_MISS;
400
401 if (opcode == ICP_MISS_NOFETCH)
402 return LOG_UDP_MISS_NOFETCH;
403
404 fatal("expected ICP opcode\n");
405
406 return LOG_UDP_INVALID;
407 }
408
409 void
410 icpCreateAndSend(icp_opcode opcode, int flags, char const *url, int reqnum, int pad, int fd, const Ip::Address &from)
411 {
412 icp_common_t *reply = _icp_common_t::createMessage(opcode, flags, url, reqnum, pad);
413 icpUdpSend(fd, from, reply, icpLogFromICPCode(opcode), 0);
414 }
415
416 void
417 icpDenyAccess(Ip::Address &from, char *url, int reqnum, int fd)
418 {
419 debugs(12, 2, "icpDenyAccess: Access Denied for " << from << " by " << AclMatchedName << ".");
420
421 if (clientdbCutoffDenied(from)) {
422 /*
423 * count this DENIED query in the clientdb, even though
424 * we're not sending an ICP reply...
425 */
426 clientdbUpdate(from, LOG_UDP_DENIED, AnyP::PROTO_ICP, 0);
427 } else {
428 icpCreateAndSend(ICP_DENIED, 0, url, reqnum, 0, fd, from);
429 }
430 }
431
432 bool
433 icpAccessAllowed(Ip::Address &from, HttpRequest * icp_request)
434 {
435 /* absent any explicit rules, we deny all */
436 if (!Config.accessList.icp)
437 return false;
438
439 ACLFilledChecklist checklist(Config.accessList.icp, icp_request, NULL);
440 checklist.src_addr = from;
441 checklist.my_addr.setNoAddr();
442 return (checklist.fastCheck() == ACCESS_ALLOWED);
443 }
444
445 char const *
446 icpGetUrlToSend(char *url)
447 {
448 if (strpbrk(url, w_space))
449 return rfc1738_escape(url);
450 else
451 return url;
452 }
453
454 HttpRequest *
455 icpGetRequest(char *url, int reqnum, int fd, Ip::Address &from)
456 {
457 if (strpbrk(url, w_space)) {
458 url = rfc1738_escape(url);
459 icpCreateAndSend(ICP_ERR, 0, rfc1738_escape(url), reqnum, 0, fd, from);
460 return NULL;
461 }
462
463 HttpRequest *result;
464
465 if ((result = HttpRequest::CreateFromUrl(url)) == NULL)
466 icpCreateAndSend(ICP_ERR, 0, url, reqnum, 0, fd, from);
467
468 return result;
469
470 }
471
472 static void
473 doV2Query(int fd, Ip::Address &from, char *buf, icp_common_t header)
474 {
475 int rtt = 0;
476 int src_rtt = 0;
477 uint32_t flags = 0;
478 /* We have a valid packet */
479 char *url = buf + sizeof(icp_common_t) + sizeof(uint32_t);
480 HttpRequest *icp_request = icpGetRequest(url, header.reqnum, fd, from);
481
482 if (!icp_request)
483 return;
484
485 HTTPMSGLOCK(icp_request);
486
487 if (!icpAccessAllowed(from, icp_request)) {
488 icpDenyAccess(from, url, header.reqnum, fd);
489 HTTPMSGUNLOCK(icp_request);
490 return;
491 }
492 #if USE_ICMP
493 if (header.flags & ICP_FLAG_SRC_RTT) {
494 rtt = netdbHostRtt(icp_request->GetHost());
495 int hops = netdbHostHops(icp_request->GetHost());
496 src_rtt = ((hops & 0xFFFF) << 16) | (rtt & 0xFFFF);
497
498 if (rtt)
499 flags |= ICP_FLAG_SRC_RTT;
500 }
501 #endif /* USE_ICMP */
502
503 /* The peer is allowed to use this cache */
504 ICP2State *state = new ICP2State (header, icp_request);
505
506 state->fd = fd;
507
508 state->from = from;
509
510 state->url = xstrdup (url);
511
512 state->flags = flags;
513
514 state->rtt = rtt;
515
516 state->src_rtt = src_rtt;
517
518 StoreEntry::getPublic (state, url, Http::METHOD_GET);
519
520 HTTPMSGUNLOCK(icp_request);
521 }
522
523 void
524 _icp_common_t::handleReply(char *buf, Ip::Address &from)
525 {
526 if (neighbors_do_private_keys && reqnum == 0) {
527 debugs(12, DBG_CRITICAL, "icpHandleIcpV2: Neighbor " << from << " returned reqnum = 0");
528 debugs(12, DBG_CRITICAL, "icpHandleIcpV2: Disabling use of private keys");
529 neighbors_do_private_keys = 0;
530 }
531
532 char *url = buf + sizeof(icp_common_t);
533 debugs(12, 3, "icpHandleIcpV2: " << icp_opcode_str[opcode] << " from " << from << " for '" << url << "'");
534
535 const cache_key *key = icpGetCacheKey(url, (int) reqnum);
536 /* call neighborsUdpAck even if ping_status != PING_WAITING */
537 neighborsUdpAck(key, this, from);
538 }
539
540 static void
541 icpHandleIcpV2(int fd, Ip::Address &from, char *buf, int len)
542 {
543 if (len <= 0) {
544 debugs(12, 3, "icpHandleIcpV2: ICP message is too small");
545 return;
546 }
547
548 icp_common_t header(buf, len);
549 /*
550 * Length field should match the number of bytes read
551 */
552
553 if (len != header.length) {
554 debugs(12, 3, "icpHandleIcpV2: ICP message is too small");
555 return;
556 }
557
558 switch (header.opcode) {
559
560 case ICP_QUERY:
561 /* We have a valid packet */
562 doV2Query(fd, from, buf, header);
563 break;
564
565 case ICP_HIT:
566
567 case ICP_DECHO:
568
569 case ICP_MISS:
570
571 case ICP_DENIED:
572
573 case ICP_MISS_NOFETCH:
574 header.handleReply(buf, from);
575 break;
576
577 case ICP_INVALID:
578
579 case ICP_ERR:
580 break;
581
582 default:
583 debugs(12, DBG_CRITICAL, "icpHandleIcpV2: UNKNOWN OPCODE: " << header.opcode << " from " << from);
584
585 break;
586 }
587 }
588
589 #ifdef ICP_PKT_DUMP
590 static void
591 icpPktDump(icp_common_t * pkt)
592 {
593 Ip::Address a;
594
595 debugs(12, 9, "opcode: " << std::setw(3) << pkt->opcode << " " << icp_opcode_str[pkt->opcode]);
596 debugs(12, 9, "version: "<< std::left << std::setw(8) << pkt->version);
597 debugs(12, 9, "length: "<< std::left << std::setw(8) << ntohs(pkt->length));
598 debugs(12, 9, "reqnum: "<< std::left << std::setw(8) << ntohl(pkt->reqnum));
599 debugs(12, 9, "flags: "<< std::left << std::hex << std::setw(8) << ntohl(pkt->flags));
600 a = (struct in_addr)pkt->shostid;
601 debugs(12, 9, "shostid: " << a );
602 debugs(12, 9, "payload: " << (char *) pkt + sizeof(icp_common_t));
603 }
604
605 #endif
606
607 void
608 icpHandleUdp(int sock, void *data)
609 {
610 int *N = &incoming_sockets_accepted;
611
612 Ip::Address from;
613 LOCAL_ARRAY(char, buf, SQUID_UDP_SO_RCVBUF);
614 int len;
615 int icp_version;
616 int max = INCOMING_UDP_MAX;
617 Comm::SetSelect(sock, COMM_SELECT_READ, icpHandleUdp, NULL, 0);
618
619 while (max) {
620 --max;
621 len = comm_udp_recvfrom(sock,
622 buf,
623 SQUID_UDP_SO_RCVBUF - 1,
624 0,
625 from);
626
627 if (len == 0)
628 break;
629
630 if (len < 0) {
631 if (ignoreErrno(errno))
632 break;
633
634 #if _SQUID_LINUX_
635 /* Some Linux systems seem to set the FD for reading and then
636 * return ECONNREFUSED when sendto() fails and generates an ICMP
637 * port unreachable message. */
638 /* or maybe an EHOSTUNREACH "No route to host" message */
639 if (errno != ECONNREFUSED && errno != EHOSTUNREACH)
640 #endif
641
642 debugs(50, DBG_IMPORTANT, "icpHandleUdp: FD " << sock << " recvfrom: " << xstrerror());
643
644 break;
645 }
646
647 ++(*N);
648 icpCount(buf, RECV, (size_t) len, 0);
649 buf[len] = '\0';
650 debugs(12, 4, "icpHandleUdp: FD " << sock << ": received " <<
651 (unsigned long int)len << " bytes from " << from);
652
653 #ifdef ICP_PACKET_DUMP
654
655 icpPktDump(buf);
656 #endif
657
658 if ((size_t) len < sizeof(icp_common_t)) {
659 debugs(12, 4, "icpHandleUdp: Ignoring too-small UDP packet");
660 break;
661 }
662
663 icp_version = (int) buf[1]; /* cheat! */
664
665 if (icpOutgoingConn->local == from)
666 // ignore ICP packets which loop back (multicast usually)
667 debugs(12, 4, "icpHandleUdp: Ignoring UDP packet sent by myself");
668 else if (icp_version == ICP_VERSION_2)
669 icpHandleIcpV2(sock, from, buf, len);
670 else if (icp_version == ICP_VERSION_3)
671 icpHandleIcpV3(sock, from, buf, len);
672 else
673 debugs(12, DBG_IMPORTANT, "WARNING: Unused ICP version " << icp_version <<
674 " received from " << from);
675 }
676 }
677
678 void
679 icpOpenPorts(void)
680 {
681 uint16_t port;
682
683 if ((port = Config.Port.icp) <= 0)
684 return;
685
686 icpIncomingConn = new Comm::Connection;
687 icpIncomingConn->local = Config.Addrs.udp_incoming;
688 icpIncomingConn->local.port(port);
689
690 if (!Ip::EnableIpv6 && !icpIncomingConn->local.setIPv4()) {
691 debugs(12, DBG_CRITICAL, "ERROR: IPv6 is disabled. " << icpIncomingConn->local << " is not an IPv4 address.");
692 fatal("ICP port cannot be opened.");
693 }
694 /* split-stack for now requires default IPv4-only ICP */
695 if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && icpIncomingConn->local.isAnyAddr()) {
696 icpIncomingConn->local.setIPv4();
697 }
698
699 AsyncCall::Pointer call = asyncCall(12, 2,
700 "icpIncomingConnectionOpened",
701 Comm::UdpOpenDialer(&icpIncomingConnectionOpened));
702
703 Ipc::StartListening(SOCK_DGRAM,
704 IPPROTO_UDP,
705 icpIncomingConn,
706 Ipc::fdnInIcpSocket, call);
707
708 if ( !Config.Addrs.udp_outgoing.isNoAddr() ) {
709 icpOutgoingConn = new Comm::Connection;
710 icpOutgoingConn->local = Config.Addrs.udp_outgoing;
711 icpOutgoingConn->local.port(port);
712
713 if (!Ip::EnableIpv6 && !icpOutgoingConn->local.setIPv4()) {
714 debugs(49, DBG_CRITICAL, "ERROR: IPv6 is disabled. " << icpOutgoingConn->local << " is not an IPv4 address.");
715 fatal("ICP port cannot be opened.");
716 }
717 /* split-stack for now requires default IPv4-only ICP */
718 if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && icpOutgoingConn->local.isAnyAddr()) {
719 icpOutgoingConn->local.setIPv4();
720 }
721
722 enter_suid();
723 comm_open_listener(SOCK_DGRAM, IPPROTO_UDP, icpOutgoingConn, "Outgoing ICP Port");
724 leave_suid();
725
726 if (!Comm::IsConnOpen(icpOutgoingConn))
727 fatal("Cannot open Outgoing ICP Port");
728
729 debugs(12, DBG_CRITICAL, "Sending ICP messages from " << icpOutgoingConn->local);
730
731 Comm::SetSelect(icpOutgoingConn->fd, COMM_SELECT_READ, icpHandleUdp, NULL, 0);
732 fd_note(icpOutgoingConn->fd, "Outgoing ICP socket");
733 }
734 }
735
736 static void
737 icpIncomingConnectionOpened(const Comm::ConnectionPointer &conn, int errNo)
738 {
739 if (!Comm::IsConnOpen(conn))
740 fatal("Cannot open ICP Port");
741
742 Comm::SetSelect(conn->fd, COMM_SELECT_READ, icpHandleUdp, NULL, 0);
743
744 for (const wordlist *s = Config.mcast_group_list; s; s = s->next)
745 ipcache_nbgethostbyname(s->key, mcastJoinGroups, NULL); // XXX: pass the conn for mcastJoinGroups usage.
746
747 debugs(12, DBG_IMPORTANT, "Accepting ICP messages on " << conn->local);
748
749 fd_note(conn->fd, "Incoming ICP port");
750
751 if (Config.Addrs.udp_outgoing.isNoAddr()) {
752 icpOutgoingConn = conn;
753 debugs(12, DBG_IMPORTANT, "Sending ICP messages from " << icpOutgoingConn->local);
754 }
755 }
756
757 /**
758 * icpConnectionShutdown only closes the 'in' socket if it is
759 * different than the 'out' socket.
760 */
761 void
762 icpConnectionShutdown(void)
763 {
764 if (!Comm::IsConnOpen(icpIncomingConn))
765 return;
766
767 debugs(12, DBG_IMPORTANT, "Stop receiving ICP on " << icpIncomingConn->local);
768
769 /** Release the 'in' socket for lazy closure.
770 * in and out sockets may be sharing one same FD.
771 * This prevents this function from executing repeatedly.
772 */
773 icpIncomingConn = NULL;
774
775 /**
776 * Normally we only write to the outgoing ICP socket, but
777 * we also have a read handler there to catch messages sent
778 * to that specific interface. During shutdown, we must
779 * disable reading on the outgoing socket.
780 */
781 assert(Comm::IsConnOpen(icpOutgoingConn));
782
783 Comm::SetSelect(icpOutgoingConn->fd, COMM_SELECT_READ, NULL, NULL, 0);
784 }
785
786 void
787 icpClosePorts(void)
788 {
789 icpConnectionShutdown();
790
791 if (icpOutgoingConn != NULL) {
792 debugs(12, DBG_IMPORTANT, "Stop sending ICP from " << icpOutgoingConn->local);
793 icpOutgoingConn = NULL;
794 }
795 }
796
797 static void
798 icpCount(void *buf, int which, size_t len, int delay)
799 {
800 icp_common_t *icp = (icp_common_t *) buf;
801
802 if (len < sizeof(*icp))
803 return;
804
805 if (SENT == which) {
806 ++statCounter.icp.pkts_sent;
807 kb_incr(&statCounter.icp.kbytes_sent, len);
808
809 if (ICP_QUERY == icp->opcode) {
810 ++statCounter.icp.queries_sent;
811 kb_incr(&statCounter.icp.q_kbytes_sent, len);
812 } else {
813 ++statCounter.icp.replies_sent;
814 kb_incr(&statCounter.icp.r_kbytes_sent, len);
815 /* this is the sent-reply service time */
816 statCounter.icp.replySvcTime.count(delay);
817 }
818
819 if (ICP_HIT == icp->opcode)
820 ++statCounter.icp.hits_sent;
821 } else if (RECV == which) {
822 ++statCounter.icp.pkts_recv;
823 kb_incr(&statCounter.icp.kbytes_recv, len);
824
825 if (ICP_QUERY == icp->opcode) {
826 ++statCounter.icp.queries_recv;
827 kb_incr(&statCounter.icp.q_kbytes_recv, len);
828 } else {
829 ++statCounter.icp.replies_recv;
830 kb_incr(&statCounter.icp.r_kbytes_recv, len);
831 /* statCounter.icp.querySvcTime set in clientUpdateCounters */
832 }
833
834 if (ICP_HIT == icp->opcode)
835 ++statCounter.icp.hits_recv;
836 }
837 }
838
839 #define N_QUERIED_KEYS 8192
840 #define N_QUERIED_KEYS_MASK 8191
841 static cache_key queried_keys[N_QUERIED_KEYS][SQUID_MD5_DIGEST_LENGTH];
842
843 int
844 icpSetCacheKey(const cache_key * key)
845 {
846 static int reqnum = 0;
847
848 if (++reqnum < 0)
849 reqnum = 1;
850
851 storeKeyCopy(queried_keys[reqnum & N_QUERIED_KEYS_MASK], key);
852
853 return reqnum;
854 }
855
856 const cache_key *
857 icpGetCacheKey(const char *url, int reqnum)
858 {
859 if (neighbors_do_private_keys && reqnum)
860 return queried_keys[reqnum & N_QUERIED_KEYS_MASK];
861
862 return storeKeyPublic(url, Http::METHOD_GET);
863 }