]>
Commit | Line | Data |
---|---|---|
10cefb7b | 1 | /* |
2 | * $Id$ | |
3 | * | |
4 | * DEBUG: section 54 Interprocess Communication | |
5 | * | |
6 | */ | |
7 | ||
8 | ||
9 | #include "config.h" | |
0d0bce6a | 10 | #include "comm.h" |
10cefb7b | 11 | #include "ipc/Coordinator.h" |
0d0bce6a AR |
12 | #include "ipc/FdNotes.h" |
13 | #include "ipc/SharedListen.h" | |
10cefb7b | 14 | |
15 | ||
// NOTE(review): presumably emits the cbdata support code for Ipc::Coordinator;
// confirm against the macro definition, which is not visible in this file.
CBDATA_NAMESPACED_CLASS_INIT(Ipc, Coordinator);
/// The Coordinator object returned by Instance(); stays NULL until
/// Instance() allocates it on first use.
Ipc::Coordinator* Ipc::Coordinator::TheInstance = NULL;
10cefb7b | 18 | |
19 | ||
20 | Ipc::Coordinator::Coordinator(): | |
5667a628 | 21 | Port(coordinatorAddr) |
10cefb7b | 22 | { |
23 | } | |
24 | ||
25 | void Ipc::Coordinator::start() | |
26 | { | |
ba568924 | 27 | Port::start(); |
10cefb7b | 28 | } |
29 | ||
1bac0258 | 30 | Ipc::StrandCoord* Ipc::Coordinator::findStrand(int kidId) |
10cefb7b | 31 | { |
1bac0258 AR |
32 | typedef Strands::iterator SI; |
33 | for (SI iter = strands.begin(); iter != strands.end(); ++iter) { | |
10cefb7b | 34 | if (iter->kidId == kidId) |
35 | return &(*iter); | |
36 | } | |
37 | return NULL; | |
38 | } | |
39 | ||
1bac0258 | 40 | void Ipc::Coordinator::registerStrand(const StrandCoord& strand) |
10cefb7b | 41 | { |
1bac0258 | 42 | if (StrandCoord* found = findStrand(strand.kidId)) |
10cefb7b | 43 | *found = strand; |
44 | else | |
45 | strands.push_back(strand); | |
46 | } | |
47 | ||
1bac0258 | 48 | void Ipc::Coordinator::receive(const TypedMsgHdr& message) |
10cefb7b | 49 | { |
50 | switch (message.type()) { | |
ba568924 | 51 | case mtRegistration: |
10cefb7b | 52 | debugs(54, 6, HERE << "Registration request"); |
1bac0258 | 53 | handleRegistrationRequest(StrandCoord(message)); |
10cefb7b | 54 | break; |
55 | ||
0d0bce6a AR |
56 | case mtSharedListenRequest: |
57 | debugs(54, 6, HERE << "Shared listen request"); | |
58 | handleSharedListenRequest(SharedListenRequest(message)); | |
59 | break; | |
60 | ||
10cefb7b | 61 | default: |
7230e9c7 | 62 | debugs(54, 1, HERE << "Unhandled message type: " << message.type()); |
10cefb7b | 63 | break; |
64 | } | |
65 | } | |
66 | ||
1bac0258 | 67 | void Ipc::Coordinator::handleRegistrationRequest(const StrandCoord& strand) |
10cefb7b | 68 | { |
ba568924 AR |
69 | registerStrand(strand); |
70 | ||
71 | // send back an acknowledgement; TODO: remove as not needed? | |
1bac0258 AR |
72 | TypedMsgHdr message; |
73 | strand.pack(message); | |
74 | SendMessage(MakeAddr(strandAddrPfx, strand.kidId), message); | |
10cefb7b | 75 | } |
a2c48c98 | 76 | |
0d0bce6a AR |
77 | void |
78 | Ipc::Coordinator::handleSharedListenRequest(const SharedListenRequest& request) | |
79 | { | |
80 | debugs(54, 4, HERE << "kid" << request.requestorId << | |
5667a628 | 81 | " needs shared listen FD for " << request.params.addr); |
0d0bce6a AR |
82 | Listeners::const_iterator i = listeners.find(request.params); |
83 | int errNo = 0; | |
84 | const int sock = (i != listeners.end()) ? | |
5667a628 | 85 | i->second : openListenSocket(request, errNo); |
0d0bce6a AR |
86 | |
87 | debugs(54, 3, HERE << "sending shared listen FD " << sock << " for " << | |
5667a628 AR |
88 | request.params.addr << " to kid" << request.requestorId << |
89 | " mapId=" << request.mapId); | |
0d0bce6a AR |
90 | |
91 | SharedListenResponse response(sock, errNo, request.mapId); | |
92 | TypedMsgHdr message; | |
93 | response.pack(message); | |
94 | SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message); | |
95 | } | |
96 | ||
97 | int | |
98 | Ipc::Coordinator::openListenSocket(const SharedListenRequest& request, | |
5667a628 | 99 | int &errNo) |
0d0bce6a AR |
100 | { |
101 | const OpenListenerParams &p = request.params; | |
102 | ||
103 | debugs(54, 6, HERE << "opening listen FD at " << p.addr << " for kid" << | |
5667a628 | 104 | request.requestorId); |
0d0bce6a | 105 | |
a67d2b2e | 106 | Ip::Address addr = p.addr; // comm_open_listener may modify it |
0d0bce6a AR |
107 | |
108 | enter_suid(); | |
109 | const int sock = comm_open_listener(p.sock_type, p.proto, addr, p.flags, | |
5667a628 | 110 | FdNote(p.fdNote)); |
0d0bce6a AR |
111 | errNo = (sock >= 0) ? 0 : errno; |
112 | leave_suid(); | |
113 | ||
114 | // cache positive results | |
115 | if (sock >= 0) | |
116 | listeners[request.params] = sock; | |
117 | ||
118 | return sock; | |
119 | } | |
120 | ||
a2c48c98 AR |
121 | void Ipc::Coordinator::broadcastSignal(int sig) const |
122 | { | |
1bac0258 AR |
123 | typedef Strands::const_iterator SCI; |
124 | for (SCI iter = strands.begin(); iter != strands.end(); ++iter) { | |
a2c48c98 | 125 | debugs(54, 5, HERE << "signal " << sig << " to kid" << iter->kidId << |
5667a628 | 126 | ", PID=" << iter->pid); |
a2c48c98 AR |
127 | kill(iter->pid, sig); |
128 | } | |
129 | } | |
130 | ||
131 | Ipc::Coordinator* Ipc::Coordinator::Instance() | |
132 | { | |
133 | if (!TheInstance) | |
134 | TheInstance = new Coordinator; | |
135 | // XXX: if the Coordinator job quits, this pointer will become invalid | |
136 | // we could make Coordinator death fatal, except during exit, but since | |
137 | // Strands do not re-register, even process death would be pointless. | |
138 | return TheInstance; | |
139 | } |