]> git.ipfire.org Git - thirdparty/squid.git/blob - src/ipc/Coordinator.cc
Removed Descriptor-related code used for testing fd passing via UDS messages.
[thirdparty/squid.git] / src / ipc / Coordinator.cc
1 /*
2 * $Id$
3 *
4 * DEBUG: section 54 Interprocess Communication
5 *
6 */
7
8
9 #include "config.h"
10 #include "comm.h"
11 #include "ipc/Coordinator.h"
12 #include "ipc/FdNotes.h"
13 #include "ipc/SharedListen.h"
14
15
16 CBDATA_NAMESPACED_CLASS_INIT(Ipc, Coordinator);
17 Ipc::Coordinator* Ipc::Coordinator::TheInstance = NULL;
18
19
20 Ipc::Coordinator::Coordinator():
21 Port(coordinatorAddr)
22 {
23 }
24
25 void Ipc::Coordinator::start()
26 {
27 Port::start();
28 }
29
30 Ipc::StrandCoord* Ipc::Coordinator::findStrand(int kidId)
31 {
32 typedef Strands::iterator SI;
33 for (SI iter = strands.begin(); iter != strands.end(); ++iter) {
34 if (iter->kidId == kidId)
35 return &(*iter);
36 }
37 return NULL;
38 }
39
40 void Ipc::Coordinator::registerStrand(const StrandCoord& strand)
41 {
42 if (StrandCoord* found = findStrand(strand.kidId))
43 *found = strand;
44 else
45 strands.push_back(strand);
46 }
47
48 void Ipc::Coordinator::receive(const TypedMsgHdr& message)
49 {
50 switch (message.type()) {
51 case mtRegistration:
52 debugs(54, 6, HERE << "Registration request");
53 handleRegistrationRequest(StrandCoord(message));
54 break;
55
56 case mtSharedListenRequest:
57 debugs(54, 6, HERE << "Shared listen request");
58 handleSharedListenRequest(SharedListenRequest(message));
59 break;
60
61 default:
62 debugs(54, 1, HERE << "Unhandled message type: " << message.type());
63 break;
64 }
65 }
66
67 void Ipc::Coordinator::handleRegistrationRequest(const StrandCoord& strand)
68 {
69 registerStrand(strand);
70
71 // send back an acknowledgement; TODO: remove as not needed?
72 TypedMsgHdr message;
73 strand.pack(message);
74 SendMessage(MakeAddr(strandAddrPfx, strand.kidId), message);
75 }
76
77 void
78 Ipc::Coordinator::handleSharedListenRequest(const SharedListenRequest& request)
79 {
80 debugs(54, 4, HERE << "kid" << request.requestorId <<
81 " needs shared listen FD for " << request.params.addr);
82 Listeners::const_iterator i = listeners.find(request.params);
83 int errNo = 0;
84 const int sock = (i != listeners.end()) ?
85 i->second : openListenSocket(request, errNo);
86
87 debugs(54, 3, HERE << "sending shared listen FD " << sock << " for " <<
88 request.params.addr << " to kid" << request.requestorId <<
89 " mapId=" << request.mapId);
90
91 SharedListenResponse response(sock, errNo, request.mapId);
92 TypedMsgHdr message;
93 response.pack(message);
94 SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
95 }
96
97 int
98 Ipc::Coordinator::openListenSocket(const SharedListenRequest& request,
99 int &errNo)
100 {
101 const OpenListenerParams &p = request.params;
102
103 debugs(54, 6, HERE << "opening listen FD at " << p.addr << " for kid" <<
104 request.requestorId);
105
106 IpAddress addr = p.addr; // comm_open_listener may modify it
107
108 enter_suid();
109 const int sock = comm_open_listener(p.sock_type, p.proto, addr, p.flags,
110 FdNote(p.fdNote));
111 errNo = (sock >= 0) ? 0 : errno;
112 leave_suid();
113
114 // cache positive results
115 if (sock >= 0)
116 listeners[request.params] = sock;
117
118 return sock;
119 }
120
121 void Ipc::Coordinator::broadcastSignal(int sig) const
122 {
123 typedef Strands::const_iterator SCI;
124 for (SCI iter = strands.begin(); iter != strands.end(); ++iter) {
125 debugs(54, 5, HERE << "signal " << sig << " to kid" << iter->kidId <<
126 ", PID=" << iter->pid);
127 kill(iter->pid, sig);
128 }
129 }
130
131 Ipc::Coordinator* Ipc::Coordinator::Instance()
132 {
133 if (!TheInstance)
134 TheInstance = new Coordinator;
135 // XXX: if the Coordinator job quits, this pointer will become invalid
136 // we could make Coordinator death fatal, except during exit, but since
137 // Strands do not re-register, even process death would be pointless.
138 return TheInstance;
139 }