git.ipfire.org Git - thirdparty/squid.git/blob - src/ipc/Strand.cc
Support a "shared listen" concept when multiple concurrent processes listen
[thirdparty/squid.git] / src / ipc / Strand.cc
1 /*
2 * $Id$
3 *
4 * DEBUG: section 54 Interprocess Communication
5 *
6 */
7
8 #include "config.h"
9 #include "ipc/Strand.h"
10 #include "ipc/Messages.h"
11 #include "ipc/SharedListen.h"
12 #include "ipc/Kids.h"
13
14
// NOTE(review): presumably sets up cbdata support for Ipc::Strand -- confirm against the macro definition
15 CBDATA_NAMESPACED_CLASS_INIT(Ipc, Strand);
16
17
18 Ipc::Strand::Strand():
19 Port(MakeAddr(strandAddrPfx, KidIdentifier)),
20 isRegistered(false)
21 {
22 }
23
24 void Ipc::Strand::start()
25 {
26 Port::start();
27 registerSelf();
28 }
29
30 void Ipc::Strand::registerSelf()
31 {
32 debugs(54, 6, HERE);
33 Must(!isRegistered);
34 TypedMsgHdr message;
35 StrandCoord(KidIdentifier, getpid()).pack(message);
36 SendMessage(coordinatorAddr, message);
37 setTimeout(6, "Ipc::Strand::timeoutHandler"); // TODO: make 6 configurable?
38 }
39
40 void Ipc::Strand::receive(const TypedMsgHdr &message)
41 {
42 debugs(54, 6, HERE << message.type());
43 switch (message.type()) {
44
45 case mtRegistration:
46 handleRegistrationResponse(StrandCoord(message));
47 break;
48
49 case mtSharedListenResponse:
50 SharedListenJoined(SharedListenResponse(message));
51 break;
52
53 case mtDescriptorPut:
54 putDescriptor(Descriptor(message));
55 break;
56
57 default:
58 debugs(54, 1, HERE << "Unhandled message type: " << message.type());
59 break;
60 }
61 }
62
63 void Ipc::Strand::handleRegistrationResponse(const StrandCoord &strand)
64 {
65 // handle registration response from the coordinator; it could be stale
66 if (strand.kidId == KidIdentifier && strand.pid == getpid()) {
67 debugs(54, 6, "kid" << KidIdentifier << " registered");
68 clearTimeout(); // we are done
69
70 debugs(54, 6, HERE << "requesting FD");
71 Descriptor request(KidIdentifier, -1);
72 TypedMsgHdr message;
73 request.pack(message);
74 SendMessage(coordinatorAddr, message);
75 } else {
76 // could be an ACK to the registration message of our dead predecessor
77 debugs(54, 6, "kid" << KidIdentifier << " is not yet registered");
78 // keep listening, with a timeout
79 }
80 }
81
82 /// receive descriptor we asked for
83 void Ipc::Strand::putDescriptor(const Descriptor &message)
84 {
85 debugs(54, 6, HERE << "got FD " << message.fd);
86 char buffer[64];
87 const int n = snprintf(buffer, sizeof(buffer), "strand: kid%d wrote using FD %d\n", KidIdentifier, message.fd);
88 ssize_t bytes = write(message.fd, buffer, n);
89 Must(bytes == n);
90 }
91
92 void Ipc::Strand::timedout()
93 {
94 debugs(54, 6, HERE << isRegistered);
95 if (!isRegistered)
96 fatalf("kid%d registration timed out", KidIdentifier);
97 }