]> git.ipfire.org Git - thirdparty/squid.git/blob - src/ipc/Coordinator.cc
Merge from trunk
[thirdparty/squid.git] / src / ipc / Coordinator.cc
1 /*
2 * $Id$
3 *
4 * DEBUG: section 54 Interprocess Communication
5 *
6 */
7
8
9 #include "config.h"
10 #include "base/Subscription.h"
11 #include "comm.h"
12 #include "comm/Connection.h"
13 #include "ipc/Coordinator.h"
14 #include "ipc/FdNotes.h"
15 #include "ipc/SharedListen.h"
16
17
18 CBDATA_NAMESPACED_CLASS_INIT(Ipc, Coordinator);
19 Ipc::Coordinator* Ipc::Coordinator::TheInstance = NULL;
20
21
22 Ipc::Coordinator::Coordinator():
23 Port(coordinatorAddr)
24 {
25 }
26
27 void Ipc::Coordinator::start()
28 {
29 Port::start();
30 }
31
32 Ipc::StrandCoord* Ipc::Coordinator::findStrand(int kidId)
33 {
34 typedef Strands::iterator SI;
35 for (SI iter = strands.begin(); iter != strands.end(); ++iter) {
36 if (iter->kidId == kidId)
37 return &(*iter);
38 }
39 return NULL;
40 }
41
42 void Ipc::Coordinator::registerStrand(const StrandCoord& strand)
43 {
44 if (StrandCoord* found = findStrand(strand.kidId))
45 *found = strand;
46 else
47 strands.push_back(strand);
48 }
49
50 void Ipc::Coordinator::receive(const TypedMsgHdr& message)
51 {
52 switch (message.type()) {
53 case mtRegistration:
54 debugs(54, 6, HERE << "Registration request");
55 handleRegistrationRequest(StrandCoord(message));
56 break;
57
58 case mtSharedListenRequest:
59 debugs(54, 6, HERE << "Shared listen request");
60 handleSharedListenRequest(SharedListenRequest(message));
61 break;
62
63 default:
64 debugs(54, 1, HERE << "Unhandled message type: " << message.type());
65 break;
66 }
67 }
68
69 void Ipc::Coordinator::handleRegistrationRequest(const StrandCoord& strand)
70 {
71 registerStrand(strand);
72
73 // send back an acknowledgement; TODO: remove as not needed?
74 TypedMsgHdr message;
75 strand.pack(message);
76 SendMessage(MakeAddr(strandAddrPfx, strand.kidId), message);
77 }
78
79 void
80 Ipc::Coordinator::handleSharedListenRequest(const SharedListenRequest& request)
81 {
82 debugs(54, 4, HERE << "kid" << request.requestorId <<
83 " needs shared listen FD for " << request.params.addr);
84 Listeners::const_iterator i = listeners.find(request.params);
85 int errNo = 0;
86 const Comm::ConnectionPointer c = (i != listeners.end()) ?
87 i->second : openListenSocket(request, errNo);
88
89 debugs(54, 3, HERE << "sending shared listen " << c << " for " <<
90 request.params.addr << " to kid" << request.requestorId <<
91 " mapId=" << request.mapId);
92
93 SharedListenResponse response(c, errNo, request.mapId);
94 TypedMsgHdr message;
95 response.pack(message);
96 SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
97 }
98
99 Comm::ConnectionPointer
100 Ipc::Coordinator::openListenSocket(const SharedListenRequest& request,
101 int &errNo)
102 {
103 const OpenListenerParams &p = request.params;
104
105 debugs(54, 6, HERE << "opening listen FD at " << p.addr << " for kid" <<
106 request.requestorId);
107
108 Comm::ConnectionPointer conn = new Comm::Connection;
109 conn->local = p.addr; // comm_open_listener may modify it
110 conn->flags = p.flags;
111
112 enter_suid();
113 comm_open_listener(p.sock_type, p.proto, conn, FdNote(p.fdNote));
114 errNo = Comm::IsConnOpen(conn) ? 0 : errno;
115 leave_suid();
116
117 debugs(54, 6, HERE << "tried listening on " << conn << " for kid" <<
118 request.requestorId);
119
120 // cache positive results
121 if (Comm::IsConnOpen(conn))
122 listeners[request.params] = conn;
123
124 return conn;
125 }
126
127 void Ipc::Coordinator::broadcastSignal(int sig) const
128 {
129 typedef Strands::const_iterator SCI;
130 for (SCI iter = strands.begin(); iter != strands.end(); ++iter) {
131 debugs(54, 5, HERE << "signal " << sig << " to kid" << iter->kidId <<
132 ", PID=" << iter->pid);
133 kill(iter->pid, sig);
134 }
135 }
136
137 Ipc::Coordinator* Ipc::Coordinator::Instance()
138 {
139 if (!TheInstance)
140 TheInstance = new Coordinator;
141 // XXX: if the Coordinator job quits, this pointer will become invalid
142 // we could make Coordinator death fatal, except during exit, but since
143 // Strands do not re-register, even process death would be pointless.
144 return TheInstance;
145 }