]> git.ipfire.org Git - thirdparty/squid.git/blob - src/icp_v2.cc
Support ICP and HTCP _servers_ sharing listening sockets.
[thirdparty/squid.git] / src / icp_v2.cc
1 /*
2 * DEBUG: section 12 Internet Cache Protocol (ICP)
3 * AUTHOR: Duane Wessels
4 *
5 * SQUID Web Proxy Cache http://www.squid-cache.org/
6 * ----------------------------------------------------------
7 *
8 * Squid is the result of efforts by numerous individuals from
9 * the Internet community; see the CONTRIBUTORS file for full
10 * details. Many organizations have provided support for Squid's
11 * development; see the SPONSORS file for full details. Squid is
12 * Copyrighted (C) 2001 by the Regents of the University of
13 * California; see the COPYRIGHT file for full details. Squid
14 * incorporates software developed and/or copyrighted by other
15 * sources; see the CREDITS file for full details.
16 *
17 * This program is free software; you can redistribute it and/or modify
18 * it under the terms of the GNU General Public License as published by
19 * the Free Software Foundation; either version 2 of the License, or
20 * (at your option) any later version.
21 *
22 * This program is distributed in the hope that it will be useful,
23 * but WITHOUT ANY WARRANTY; without even the implied warranty of
24 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
25 * GNU General Public License for more details.
26 *
27 * You should have received a copy of the GNU General Public License
28 * along with this program; if not, write to the Free Software
29 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
30 *
31 */
32
33 /**
34 \defgroup ServerProtocolICPInternal2 ICPv2 Internals
35 \ingroup ServerProtocolICPAPI
36 */
37
38 #include "squid.h"
39 #include "Store.h"
40 #include "comm.h"
41 #include "ICP.h"
42 #include "HttpRequest.h"
43 #include "acl/FilledChecklist.h"
44 #include "acl/Acl.h"
45 #include "AccessLogEntry.h"
46 #include "wordlist.h"
47 #include "SquidTime.h"
48 #include "SwapDir.h"
49 #include "icmp/net_db.h"
50 #include "ip/IpAddress.h"
51 #include "ipc/StartListening.h"
52 #include "rfc1738.h"
53
54 /// dials icpIncomingConnectionOpened call
55 class IcpListeningStartedDialer: public CallDialer,
56 public Ipc::StartListeningCb
57 {
58 public:
59 typedef void (*Handler)(int fd, int errNo, IpAddress& addr);
60 IcpListeningStartedDialer(Handler aHandler, IpAddress& anAddr):
61 handler(aHandler), addr(anAddr) {}
62
63 virtual void print(std::ostream &os) const { startPrint(os) <<
64 ", address=" << addr << ')'; }
65
66 virtual bool canDial(AsyncCall &) const { return true; }
67 virtual void dial(AsyncCall &) { (handler)(fd, errNo, addr); }
68
69 public:
70 Handler handler;
71 IpAddress addr;
72 };
73
/// Handler dialed with the result of the incoming ICP listening request
/// issued by icpConnectionsOpen().
static void icpIncomingConnectionOpened(int fd, int errNo, IpAddress& addr);

/// \ingroup ServerProtocolICPInternal2
/// Arguments: peer address, log code, message length, URL, delay.
static void icpLogIcp(const IpAddress &, log_type, int, const char *, int);

/// \ingroup ServerProtocolICPInternal2
/// Arguments: socket descriptor, sender address, packet buffer, packet length.
static void icpHandleIcpV2(int, IpAddress &, char *, int);

/// \ingroup ServerProtocolICPInternal2
/// Arguments: packet buffer, direction (SENT or RECV), packet length, delay.
static void icpCount(void *, int, size_t, int);

/**
 \ingroup ServerProtocolICPInternal2
 * IcpQueueHead is global so comm_incoming() knows whether or not
 * to call icpUdpSendQueue.
 *
 * NOTE(review): the variable is declared static, so it is only visible
 * inside this file; the sentence above may be out of date.
 * It points at the oldest queued message, or is NULL when the queue is empty.
 */
static icpUdpData *IcpQueueHead = NULL;
/// \ingroup ServerProtocolICPInternal2
/// Most recently queued message; only meaningful while IcpQueueHead is not NULL.
static icpUdpData *IcpQueueTail = NULL;

/// \ingroup ServerProtocolICPInternal2
/// Local address of the outgoing ICP socket; its IPv4 form is written into
/// the shostid field of every message built by createMessage().
IpAddress theOutICPAddr;
96
97 /* icp_common_t */
98 _icp_common_t::_icp_common_t() : opcode(ICP_INVALID), version(0), length(0), reqnum(0), flags(0), pad(0), shostid(0)
99 {}
100
101 _icp_common_t::_icp_common_t(char *buf, unsigned int len)
102 {
103 if (len < sizeof(_icp_common_t)) {
104 /* mark as invalid */
105 length = len + 1;
106 return;
107 }
108
109 xmemcpy(this, buf, sizeof(icp_common_t));
110 /*
111 * Convert network order sensitive fields
112 */
113 length = ntohs(length);
114 reqnum = ntohl(reqnum);
115 flags = ntohl(flags);
116 pad = ntohl(pad);
117 }
118
119 icp_opcode
120 _icp_common_t::getOpCode() const
121 {
122 if (opcode > (char)ICP_END)
123 return ICP_INVALID;
124
125 return (icp_opcode)opcode;
126 }
127
128 /* ICPState */
129
130 ICPState::ICPState(icp_common_t &aHeader, HttpRequest *aRequest):
131 header(aHeader),
132 request(HTTPMSGLOCK(aRequest)),
133 fd(-1),
134 url(NULL)
135 {}
136
137 ICPState::~ICPState()
138 {
139 safe_free(url);
140 HTTPMSGUNLOCK(request);
141 }
142
143
144 /* End ICPState */
145
146 /* ICP2State */
147
148 /// \ingroup ServerProtocolICPInternal2
149 class ICP2State : public ICPState, public StoreClient
150 {
151
152 public:
153 ICP2State(icp_common_t & aHeader, HttpRequest *aRequest):
154 ICPState(aHeader, aRequest),rtt(0),src_rtt(0),flags(0) {}
155
156 ~ICP2State();
157 void created(StoreEntry * newEntry);
158
159 int rtt;
160 int src_rtt;
161 u_int32_t flags;
162 };
163
164 ICP2State::~ICP2State()
165 {}
166
167 void
168 ICP2State::created(StoreEntry *newEntry)
169 {
170 StoreEntry *entry = newEntry->isNull () ? NULL : newEntry;
171 debugs(12, 5, "icpHandleIcpV2: OPCODE " << icp_opcode_str[header.opcode]);
172 icp_opcode codeToSend;
173
174 if (icpCheckUdpHit(entry, request)) {
175 codeToSend = ICP_HIT;
176 } else {
177 #if USE_ICMP
178 if (Config.onoff.test_reachability && rtt == 0) {
179 if ((rtt = netdbHostRtt(request->GetHost())) == 0)
180 netdbPingSite(request->GetHost());
181 }
182 #endif /* USE_ICMP */
183
184 if (icpGetCommonOpcode() != ICP_ERR)
185 codeToSend = icpGetCommonOpcode();
186 else if (Config.onoff.test_reachability && rtt == 0)
187 codeToSend = ICP_MISS_NOFETCH;
188 else
189 codeToSend = ICP_MISS;
190 }
191
192 icpCreateAndSend(codeToSend, flags, url, header.reqnum, src_rtt, fd, from);
193 delete this;
194 }
195
196 /* End ICP2State */
197
198 /// \ingroup ServerProtocolICPInternal2
199 static void
200 icpLogIcp(const IpAddress &caddr, log_type logcode, int len, const char *url, int delay)
201 {
202 AccessLogEntry al;
203
204 if (LOG_TAG_NONE == logcode)
205 return;
206
207 if (LOG_ICP_QUERY == logcode)
208 return;
209
210 clientdbUpdate(caddr, logcode, PROTO_ICP, len);
211
212 if (!Config.onoff.log_udp)
213 return;
214
215 al.icp.opcode = ICP_QUERY;
216
217 al.url = url;
218
219 al.cache.caddr = caddr;
220
221 al.cache.replySize = len;
222
223 al.cache.code = logcode;
224
225 al.cache.msec = delay;
226
227 accessLogLog(&al, NULL);
228 }
229
230 /// \ingroup ServerProtocolICPInternal2
231 void
232 icpUdpSendQueue(int fd, void *unused)
233 {
234 icpUdpData *q;
235 int x;
236 int delay;
237
238 while ((q = IcpQueueHead) != NULL) {
239 delay = tvSubUsec(q->queue_time, current_time);
240 /* increment delay to prevent looping */
241 x = icpUdpSend(fd, q->address, (icp_common_t *) q->msg, q->logcode, ++delay);
242 IcpQueueHead = q->next;
243 safe_free(q);
244
245 if (x < 0)
246 break;
247 }
248 }
249
250 _icp_common_t *
251 _icp_common_t::createMessage(
252 icp_opcode opcode,
253 int flags,
254 const char *url,
255 int reqnum,
256 int pad)
257 {
258 char *buf = NULL;
259 icp_common_t *headerp = NULL;
260 char *urloffset = NULL;
261 int buf_len;
262 buf_len = sizeof(icp_common_t) + strlen(url) + 1;
263
264 if (opcode == ICP_QUERY)
265 buf_len += sizeof(u_int32_t);
266
267 buf = (char *) xcalloc(buf_len, 1);
268
269 headerp = (icp_common_t *) (void *) buf;
270
271 headerp->opcode = (char) opcode;
272
273 headerp->version = ICP_VERSION_CURRENT;
274
275 headerp->length = (u_int16_t) htons(buf_len);
276
277 headerp->reqnum = htonl(reqnum);
278
279 headerp->flags = htonl(flags);
280
281 headerp->pad = htonl(pad);
282
283 theOutICPAddr.GetInAddr( *((struct in_addr*)&headerp->shostid) );
284
285 urloffset = buf + sizeof(icp_common_t);
286
287 if (opcode == ICP_QUERY)
288 urloffset += sizeof(u_int32_t);
289
290 xmemcpy(urloffset, url, strlen(url));
291
292 return (icp_common_t *)buf;
293 }
294
295 int
296 icpUdpSend(int fd,
297 const IpAddress &to,
298 icp_common_t * msg,
299 log_type logcode,
300 int delay)
301 {
302 icpUdpData *queue;
303 int x;
304 int len;
305 len = (int) ntohs(msg->length);
306 debugs(12, 5, "icpUdpSend: FD " << fd << " sending " <<
307 icp_opcode_str[msg->opcode] << ", " << len << " bytes to " << to);
308
309 x = comm_udp_sendto(fd, to, msg, len);
310
311 if (x >= 0) {
312 /* successfully written */
313 icpLogIcp(to, logcode, len, (char *) (msg + 1), delay);
314 icpCount(msg, SENT, (size_t) len, delay);
315 safe_free(msg);
316 } else if (0 == delay) {
317 /* send failed, but queue it */
318 queue = (icpUdpData *) xcalloc(1, sizeof(icpUdpData));
319 queue->address = to;
320 queue->msg = msg;
321 queue->len = (int) ntohs(msg->length);
322 queue->queue_time = current_time;
323 queue->logcode = logcode;
324
325 if (IcpQueueHead == NULL) {
326 IcpQueueHead = queue;
327 IcpQueueTail = queue;
328 } else if (IcpQueueTail == IcpQueueHead) {
329 IcpQueueTail = queue;
330 IcpQueueHead->next = queue;
331 } else {
332 IcpQueueTail->next = queue;
333 IcpQueueTail = queue;
334 }
335
336 commSetSelect(fd, COMM_SELECT_WRITE, icpUdpSendQueue, NULL, 0);
337 statCounter.icp.replies_queued++;
338 } else {
339 /* don't queue it */
340 statCounter.icp.replies_dropped++;
341 }
342
343 return x;
344 }
345
346 int
347 icpCheckUdpHit(StoreEntry * e, HttpRequest * request)
348 {
349 if (e == NULL)
350 return 0;
351
352 if (!e->validToSend())
353 return 0;
354
355 if (Config.onoff.icp_hit_stale)
356 return 1;
357
358 if (refreshCheckICP(e, request))
359 return 0;
360
361 return 1;
362 }
363
364 /**
365 * This routine selects an ICP opcode for ICP misses.
366 *
367 \retval ICP_ERR no opcode selected here
368 \retval ICP_MISS_NOFETCH store is rebuilding, no fetch is possible yet
369 */
370 icp_opcode
371 icpGetCommonOpcode()
372 {
373 /* if store is rebuilding, return a UDP_MISS_NOFETCH */
374
375 if ((StoreController::store_dirs_rebuilding && opt_reload_hit_only) ||
376 hit_only_mode_until > squid_curtime) {
377 return ICP_MISS_NOFETCH;
378 }
379
380 return ICP_ERR;
381 }
382
383 log_type
384 icpLogFromICPCode(icp_opcode opcode)
385 {
386 if (opcode == ICP_ERR)
387 return LOG_UDP_INVALID;
388
389 if (opcode == ICP_DENIED)
390 return LOG_UDP_DENIED;
391
392 if (opcode == ICP_HIT)
393 return LOG_UDP_HIT;
394
395 if (opcode == ICP_MISS)
396 return LOG_UDP_MISS;
397
398 if (opcode == ICP_MISS_NOFETCH)
399 return LOG_UDP_MISS_NOFETCH;
400
401 fatal("expected ICP opcode\n");
402
403 return LOG_UDP_INVALID;
404 }
405
406 void
407 icpCreateAndSend(icp_opcode opcode, int flags, char const *url, int reqnum, int pad, int fd, const IpAddress &from)
408 {
409 icp_common_t *reply = _icp_common_t::createMessage(opcode, flags, url, reqnum, pad);
410 icpUdpSend(fd, from, reply, icpLogFromICPCode(opcode), 0);
411 }
412
413 void
414 icpDenyAccess(IpAddress &from, char *url, int reqnum, int fd)
415 {
416 debugs(12, 2, "icpDenyAccess: Access Denied for " << from << " by " << AclMatchedName << ".");
417
418 if (clientdbCutoffDenied(from)) {
419 /*
420 * count this DENIED query in the clientdb, even though
421 * we're not sending an ICP reply...
422 */
423 clientdbUpdate(from, LOG_UDP_DENIED, PROTO_ICP, 0);
424 } else {
425 icpCreateAndSend(ICP_DENIED, 0, url, reqnum, 0, fd, from);
426 }
427 }
428
429 int
430 icpAccessAllowed(IpAddress &from, HttpRequest * icp_request)
431 {
432 /* absent an explicit allow, we deny all */
433 if (!Config.accessList.icp)
434 return 0;
435
436 ACLFilledChecklist checklist(Config.accessList.icp, icp_request, NULL);
437 checklist.src_addr = from;
438 checklist.my_addr.SetNoAddr();
439 int result = checklist.fastCheck();
440 return result;
441 }
442
443 char const *
444 icpGetUrlToSend(char *url)
445 {
446 if (strpbrk(url, w_space))
447 return rfc1738_escape(url);
448 else
449 return url;
450 }
451
452 HttpRequest *
453 icpGetRequest(char *url, int reqnum, int fd, IpAddress &from)
454 {
455 if (strpbrk(url, w_space)) {
456 url = rfc1738_escape(url);
457 icpCreateAndSend(ICP_ERR, 0, rfc1738_escape(url), reqnum, 0, fd, from);
458 return NULL;
459 }
460
461 HttpRequest *result;
462
463 if ((result = HttpRequest::CreateFromUrl(url)) == NULL)
464 icpCreateAndSend(ICP_ERR, 0, url, reqnum, 0, fd, from);
465
466 return result;
467
468 }
469
470 static void
471 doV2Query(int fd, IpAddress &from, char *buf, icp_common_t header)
472 {
473 int rtt = 0;
474 int src_rtt = 0;
475 u_int32_t flags = 0;
476 /* We have a valid packet */
477 char *url = buf + sizeof(icp_common_t) + sizeof(u_int32_t);
478 HttpRequest *icp_request = icpGetRequest(url, header.reqnum, fd, from);
479
480 if (!icp_request)
481 return;
482
483 HTTPMSGLOCK(icp_request);
484
485 if (!icpAccessAllowed(from, icp_request)) {
486 icpDenyAccess(from, url, header.reqnum, fd);
487 HTTPMSGUNLOCK(icp_request);
488 return;
489 }
490 #if USE_ICMP
491 if (header.flags & ICP_FLAG_SRC_RTT) {
492 rtt = netdbHostRtt(icp_request->GetHost());
493 int hops = netdbHostHops(icp_request->GetHost());
494 src_rtt = ((hops & 0xFFFF) << 16) | (rtt & 0xFFFF);
495
496 if (rtt)
497 flags |= ICP_FLAG_SRC_RTT;
498 }
499 #endif /* USE_ICMP */
500
501 /* The peer is allowed to use this cache */
502 ICP2State *state = new ICP2State (header, icp_request);
503
504 state->fd = fd;
505
506 state->from = from;
507
508 state->url = xstrdup (url);
509
510 state->flags = flags;
511
512 state->rtt = rtt;
513
514 state->src_rtt = src_rtt;
515
516 StoreEntry::getPublic (state, url, METHOD_GET);
517
518 HTTPMSGUNLOCK(icp_request);
519 }
520
521 void
522 _icp_common_t::handleReply(char *buf, IpAddress &from)
523 {
524 if (neighbors_do_private_keys && reqnum == 0) {
525 debugs(12, 0, "icpHandleIcpV2: Neighbor " << from << " returned reqnum = 0");
526 debugs(12, 0, "icpHandleIcpV2: Disabling use of private keys");
527 neighbors_do_private_keys = 0;
528 }
529
530 char *url = buf + sizeof(icp_common_t);
531 debugs(12, 3, "icpHandleIcpV2: " << icp_opcode_str[opcode] << " from " << from << " for '" << url << "'");
532
533 const cache_key *key = icpGetCacheKey(url, (int) reqnum);
534 /* call neighborsUdpAck even if ping_status != PING_WAITING */
535 neighborsUdpAck(key, this, from);
536 }
537
538 static void
539 icpHandleIcpV2(int fd, IpAddress &from, char *buf, int len)
540 {
541 if (len <= 0) {
542 debugs(12, 3, "icpHandleIcpV2: ICP message is too small");
543 return;
544 }
545
546 icp_common_t header(buf, len);
547 /*
548 * Length field should match the number of bytes read
549 */
550
551 if (len != header.length) {
552 debugs(12, 3, "icpHandleIcpV2: ICP message is too small");
553 return;
554 }
555
556 switch (header.opcode) {
557
558 case ICP_QUERY:
559 /* We have a valid packet */
560 doV2Query(fd, from, buf, header);
561 break;
562
563 case ICP_HIT:
564
565 case ICP_DECHO:
566
567 case ICP_MISS:
568
569 case ICP_DENIED:
570
571 case ICP_MISS_NOFETCH:
572 header.handleReply(buf, from);
573 break;
574
575 case ICP_INVALID:
576
577 case ICP_ERR:
578 break;
579
580 default:
581 debugs(12, 0, "icpHandleIcpV2: UNKNOWN OPCODE: " << header.opcode << " from " << from);
582
583 break;
584 }
585 }
586
#ifdef ICP_PKT_DUMP
/*
 * Debugging aid: prints the header fields and payload of an ICP packet
 * at debug level 9 of section 12. The multi-byte fields are converted
 * with ntohs()/ntohl(), so pkt is expected in network byte order.
 *
 * NOTE(review): this definition is guarded by ICP_PKT_DUMP while the
 * call in icpHandleUdp() is guarded by ICP_PACKET_DUMP and passes a
 * char buffer; confirm that this code still builds before enabling it.
 */
static void
icpPktDump(icp_common_t * pkt)
{

    IpAddress a;

    debugs(12, 9, "opcode: " << std::setw(3) << pkt->opcode << " " << icp_opcode_str[pkt->opcode]);
    debugs(12, 9, "version: "<< std::left << std::setw(8) << pkt->version);
    debugs(12, 9, "length: "<< std::left << std::setw(8) << ntohs(pkt->length));
    debugs(12, 9, "reqnum: "<< std::left << std::setw(8) << ntohl(pkt->reqnum));
    debugs(12, 9, "flags: "<< std::left << std::hex << std::setw(8) << ntohl(pkt->flags));
    /* shostid holds the sender address written by createMessage() */
    a = (struct in_addr)pkt->shostid;
    debugs(12, 9, "shostid: " << a );
    /* the payload starts right after the common header */
    debugs(12, 9, "payload: " << (char *) pkt + sizeof(icp_common_t));
}

#endif
605
606 void
607 icpHandleUdp(int sock, void *data)
608 {
609 int *N = &incoming_sockets_accepted;
610
611 IpAddress from;
612 LOCAL_ARRAY(char, buf, SQUID_UDP_SO_RCVBUF);
613 int len;
614 int icp_version;
615 int max = INCOMING_ICP_MAX;
616 commSetSelect(sock, COMM_SELECT_READ, icpHandleUdp, NULL, 0);
617
618 while (max--) {
619 len = comm_udp_recvfrom(sock,
620 buf,
621 SQUID_UDP_SO_RCVBUF - 1,
622 0,
623 from);
624
625 if (len == 0)
626 break;
627
628 if (len < 0) {
629 if (ignoreErrno(errno))
630 break;
631
632 #ifdef _SQUID_LINUX_
633 /* Some Linux systems seem to set the FD for reading and then
634 * return ECONNREFUSED when sendto() fails and generates an ICMP
635 * port unreachable message. */
636 /* or maybe an EHOSTUNREACH "No route to host" message */
637 if (errno != ECONNREFUSED && errno != EHOSTUNREACH)
638 #endif
639
640 debugs(50, 1, "icpHandleUdp: FD " << sock << " recvfrom: " << xstrerror());
641
642 break;
643 }
644
645 (*N)++;
646 icpCount(buf, RECV, (size_t) len, 0);
647 buf[len] = '\0';
648 debugs(12, 4, "icpHandleUdp: FD " << sock << ": received " <<
649 (unsigned long int)len << " bytes from " << from);
650
651 #ifdef ICP_PACKET_DUMP
652
653 icpPktDump(buf);
654 #endif
655
656 if ((size_t) len < sizeof(icp_common_t)) {
657 debugs(12, 4, "icpHandleUdp: Ignoring too-small UDP packet");
658 break;
659 }
660
661 icp_version = (int) buf[1]; /* cheat! */
662
663 if (icp_version == ICP_VERSION_2)
664 icpHandleIcpV2(sock, from, buf, len);
665 else if (icp_version == ICP_VERSION_3)
666 icpHandleIcpV3(sock, from, buf, len);
667 else
668 debugs(12, 1, "WARNING: Unused ICP version " << icp_version <<
669 " received from " << from);
670 }
671 }
672
673 void
674 icpConnectionsOpen(void)
675 {
676 u_int16_t port;
677
678 IpAddress addr;
679
680 struct addrinfo *xai = NULL;
681 int x;
682
683 if ((port = Config.Port.icp) <= 0)
684 return;
685
686 addr = Config.Addrs.udp_incoming;
687 addr.SetPort(port);
688
689 AsyncCall::Pointer call = asyncCall(12, 2,
690 "icpIncomingConnectionOpened",
691 IcpListeningStartedDialer(&icpIncomingConnectionOpened, addr));
692
693 Ipc::StartListening(SOCK_DGRAM,
694 IPPROTO_UDP,
695 addr,
696 COMM_NONBLOCKING,
697 Ipc::fdnInIcpSocket, call);
698
699 addr.SetEmpty(); // clear for next use.
700 addr = Config.Addrs.udp_outgoing;
701 if ( !addr.IsNoAddr() ) {
702 enter_suid();
703 addr.SetPort(port);
704 theOutIcpConnection = comm_open_listener(SOCK_DGRAM,
705 IPPROTO_UDP,
706 addr,
707 COMM_NONBLOCKING,
708 "ICP Port");
709 leave_suid();
710
711 if (theOutIcpConnection < 0)
712 fatal("Cannot open Outgoing ICP Port");
713
714 commSetSelect(theOutIcpConnection,
715 COMM_SELECT_READ,
716 icpHandleUdp,
717 NULL,
718 0);
719
720 debugs(12, 1, "Outgoing ICP messages on port " << addr.GetPort() << ", FD " << theOutIcpConnection << ".");
721
722 fd_note(theOutIcpConnection, "Outgoing ICP socket");
723 }
724
725 theOutICPAddr.SetEmpty();
726
727 theOutICPAddr.InitAddrInfo(xai);
728
729 x = getsockname(theOutIcpConnection, xai->ai_addr, &xai->ai_addrlen);
730
731 if (x < 0)
732 debugs(50, 1, "theOutIcpConnection FD " << theOutIcpConnection << ": getsockname: " << xstrerror());
733 else
734 theOutICPAddr = *xai;
735
736 theOutICPAddr.FreeAddrInfo(xai);
737 }
738
739 static void
740 icpIncomingConnectionOpened(int fd, int errNo, IpAddress& addr)
741 {
742 theInIcpConnection = fd;
743
744 if (theInIcpConnection < 0)
745 fatal("Cannot open ICP Port");
746
747 commSetSelect(theInIcpConnection,
748 COMM_SELECT_READ,
749 icpHandleUdp,
750 NULL,
751 0);
752
753 for (const wordlist *s = Config.mcast_group_list; s; s = s->next)
754 ipcache_nbgethostbyname(s->key, mcastJoinGroups, NULL);
755
756 debugs(12, 1, "Accepting ICP messages at " << addr << ", FD " << theInIcpConnection << ".");
757
758 fd_note(theInIcpConnection, "Incoming ICP socket");
759
760 if (Config.Addrs.udp_outgoing.IsNoAddr())
761 theOutIcpConnection = theInIcpConnection;
762 }
763
764 /**
765 * icpConnectionShutdown only closes the 'in' socket if it is
766 * different than the 'out' socket.
767 */
768 void
769 icpConnectionShutdown(void)
770 {
771 if (theInIcpConnection < 0)
772 return;
773
774 if (theInIcpConnection != theOutIcpConnection) {
775 debugs(12, 1, "FD " << theInIcpConnection << " Closing ICP connection");
776 comm_close(theInIcpConnection);
777 }
778
779 /**
780 * Here we set 'theInIcpConnection' to -1 even though the ICP 'in'
781 * and 'out' sockets might be just one FD. This prevents this
782 * function from executing repeatedly. When we are really ready to
783 * exit or restart, main will comm_close the 'out' descriptor.
784 */
785 theInIcpConnection = -1;
786
787 /**
788 * Normally we only write to the outgoing ICP socket, but
789 * we also have a read handler there to catch messages sent
790 * to that specific interface. During shutdown, we must
791 * disable reading on the outgoing socket.
792 */
793 assert(theOutIcpConnection > -1);
794
795 commSetSelect(theOutIcpConnection, COMM_SELECT_READ, NULL, NULL, 0);
796 }
797
798 void
799 icpConnectionClose(void)
800 {
801 icpConnectionShutdown();
802
803 if (theOutIcpConnection > -1) {
804 debugs(12, 1, "FD " << theOutIcpConnection << " Closing ICP connection");
805 comm_close(theOutIcpConnection);
806 theOutIcpConnection = -1;
807 }
808 }
809
810 static void
811 icpCount(void *buf, int which, size_t len, int delay)
812 {
813 icp_common_t *icp = (icp_common_t *) buf;
814
815 if (len < sizeof(*icp))
816 return;
817
818 if (SENT == which) {
819 statCounter.icp.pkts_sent++;
820 kb_incr(&statCounter.icp.kbytes_sent, len);
821
822 if (ICP_QUERY == icp->opcode) {
823 statCounter.icp.queries_sent++;
824 kb_incr(&statCounter.icp.q_kbytes_sent, len);
825 } else {
826 statCounter.icp.replies_sent++;
827 kb_incr(&statCounter.icp.r_kbytes_sent, len);
828 /* this is the sent-reply service time */
829 statHistCount(&statCounter.icp.reply_svc_time, delay);
830 }
831
832 if (ICP_HIT == icp->opcode)
833 statCounter.icp.hits_sent++;
834 } else if (RECV == which) {
835 statCounter.icp.pkts_recv++;
836 kb_incr(&statCounter.icp.kbytes_recv, len);
837
838 if (ICP_QUERY == icp->opcode) {
839 statCounter.icp.queries_recv++;
840 kb_incr(&statCounter.icp.q_kbytes_recv, len);
841 } else {
842 statCounter.icp.replies_recv++;
843 kb_incr(&statCounter.icp.r_kbytes_recv, len);
844 /* statCounter.icp.query_svc_time set in clientUpdateCounters */
845 }
846
847 if (ICP_HIT == icp->opcode)
848 statCounter.icp.hits_recv++;
849 }
850 }
851
852 #define N_QUERIED_KEYS 8192
853 #define N_QUERIED_KEYS_MASK 8191
854 static cache_key queried_keys[N_QUERIED_KEYS][SQUID_MD5_DIGEST_LENGTH];
855
856 int
857 icpSetCacheKey(const cache_key * key)
858 {
859 static int reqnum = 0;
860
861 if (++reqnum < 0)
862 reqnum = 1;
863
864 storeKeyCopy(queried_keys[reqnum & N_QUERIED_KEYS_MASK], key);
865
866 return reqnum;
867 }
868
869 const cache_key *
870 icpGetCacheKey(const char *url, int reqnum)
871 {
872 if (neighbors_do_private_keys && reqnum)
873 return queried_keys[reqnum & N_QUERIED_KEYS_MASK];
874
875 return storeKeyPublic(url, METHOD_GET);
876 }