]>
git.ipfire.org Git - thirdparty/squid.git/blob - src/ipc/Coordinator.cc
4 * DEBUG: section 54 Interprocess Communication
11 #include "ipc/Coordinator.h"
12 #include "ipc/FdNotes.h"
13 #include "ipc/SharedListen.h"
16 CBDATA_NAMESPACED_CLASS_INIT(Ipc
, Coordinator
);
17 Ipc::Coordinator
* Ipc::Coordinator::TheInstance
= NULL
;
20 Ipc::Coordinator::Coordinator():
25 void Ipc::Coordinator::start()
30 Ipc::StrandCoord
* Ipc::Coordinator::findStrand(int kidId
)
32 typedef Strands::iterator SI
;
33 for (SI iter
= strands
.begin(); iter
!= strands
.end(); ++iter
) {
34 if (iter
->kidId
== kidId
)
40 void Ipc::Coordinator::registerStrand(const StrandCoord
& strand
)
42 if (StrandCoord
* found
= findStrand(strand
.kidId
))
45 strands
.push_back(strand
);
48 void Ipc::Coordinator::receive(const TypedMsgHdr
& message
)
50 switch (message
.type()) {
52 debugs(54, 6, HERE
<< "Registration request");
53 handleRegistrationRequest(StrandCoord(message
));
56 case mtSharedListenRequest
:
57 debugs(54, 6, HERE
<< "Shared listen request");
58 handleSharedListenRequest(SharedListenRequest(message
));
62 debugs(54, 1, HERE
<< "Unhandled message type: " << message
.type());
67 void Ipc::Coordinator::handleRegistrationRequest(const StrandCoord
& strand
)
69 registerStrand(strand
);
71 // send back an acknowledgement; TODO: remove as not needed?
74 SendMessage(MakeAddr(strandAddrPfx
, strand
.kidId
), message
);
78 Ipc::Coordinator::handleSharedListenRequest(const SharedListenRequest
& request
)
80 debugs(54, 4, HERE
<< "kid" << request
.requestorId
<<
81 " needs shared listen FD for " << request
.params
.addr
);
82 Listeners::const_iterator i
= listeners
.find(request
.params
);
84 const int sock
= (i
!= listeners
.end()) ?
85 i
->second
: openListenSocket(request
, errNo
);
87 debugs(54, 3, HERE
<< "sending shared listen FD " << sock
<< " for " <<
88 request
.params
.addr
<< " to kid" << request
.requestorId
<<
89 " mapId=" << request
.mapId
);
91 SharedListenResponse
response(sock
, errNo
, request
.mapId
);
93 response
.pack(message
);
94 SendMessage(MakeAddr(strandAddrPfx
, request
.requestorId
), message
);
98 Ipc::Coordinator::openListenSocket(const SharedListenRequest
& request
,
101 const OpenListenerParams
&p
= request
.params
;
103 debugs(54, 6, HERE
<< "opening listen FD at " << p
.addr
<< " for kid" <<
104 request
.requestorId
);
106 IpAddress addr
= p
.addr
; // comm_open_listener may modify it
109 const int sock
= comm_open_listener(p
.sock_type
, p
.proto
, addr
, p
.flags
,
111 errNo
= (sock
>= 0) ? 0 : errno
;
114 // cache positive results
116 listeners
[request
.params
] = sock
;
121 void Ipc::Coordinator::broadcastSignal(int sig
) const
123 typedef Strands::const_iterator SCI
;
124 for (SCI iter
= strands
.begin(); iter
!= strands
.end(); ++iter
) {
125 debugs(54, 5, HERE
<< "signal " << sig
<< " to kid" << iter
->kidId
<<
126 ", PID=" << iter
->pid
);
127 kill(iter
->pid
, sig
);
131 Ipc::Coordinator
* Ipc::Coordinator::Instance()
134 TheInstance
= new Coordinator
;
135 // XXX: if the Coordinator job quits, this pointer will become invalid
136 // we could make Coordinator death fatal, except during exit, but since
137 // Strands do not re-register, even process death would be pointless.