]>
Commit | Line | Data |
---|---|---|
10cefb7b | 1 | /* |
2 | * $Id$ | |
3 | * | |
4 | * DEBUG: section 54 Interprocess Communication | |
5 | * | |
6 | */ | |
7 | ||
8 | ||
9 | #include "config.h" | |
10 | #include "ipc/Coordinator.h" | |
11 | ||
12 | ||
13 | CBDATA_NAMESPACED_CLASS_INIT(Ipc, Coordinator); | |
a2c48c98 | 14 | Ipc::Coordinator* Ipc::Coordinator::TheInstance = NULL; |
10cefb7b | 15 | |
16 | ||
17 | Ipc::Coordinator::Coordinator(): | |
ba568924 | 18 | Port(coordinatorAddr) |
10cefb7b | 19 | { |
20 | } | |
21 | ||
22 | void Ipc::Coordinator::start() | |
23 | { | |
ba568924 | 24 | Port::start(); |
10cefb7b | 25 | } |
26 | ||
1bac0258 | 27 | Ipc::StrandCoord* Ipc::Coordinator::findStrand(int kidId) |
10cefb7b | 28 | { |
1bac0258 AR |
29 | typedef Strands::iterator SI; |
30 | for (SI iter = strands.begin(); iter != strands.end(); ++iter) { | |
10cefb7b | 31 | if (iter->kidId == kidId) |
32 | return &(*iter); | |
33 | } | |
34 | return NULL; | |
35 | } | |
36 | ||
1bac0258 | 37 | void Ipc::Coordinator::registerStrand(const StrandCoord& strand) |
10cefb7b | 38 | { |
1bac0258 | 39 | if (StrandCoord* found = findStrand(strand.kidId)) |
10cefb7b | 40 | *found = strand; |
41 | else | |
42 | strands.push_back(strand); | |
43 | } | |
44 | ||
1bac0258 | 45 | void Ipc::Coordinator::receive(const TypedMsgHdr& message) |
10cefb7b | 46 | { |
47 | switch (message.type()) { | |
ba568924 | 48 | case mtRegistration: |
10cefb7b | 49 | debugs(54, 6, HERE << "Registration request"); |
1bac0258 | 50 | handleRegistrationRequest(StrandCoord(message)); |
10cefb7b | 51 | break; |
52 | ||
7230e9c7 AR |
53 | case mtDescriptorGet: |
54 | debugs(54, 6, HERE << "Descriptor get request"); | |
55 | handleDescriptorGet(Descriptor(message)); | |
56 | break; | |
57 | ||
10cefb7b | 58 | default: |
7230e9c7 | 59 | debugs(54, 1, HERE << "Unhandled message type: " << message.type()); |
10cefb7b | 60 | break; |
61 | } | |
62 | } | |
63 | ||
1bac0258 | 64 | void Ipc::Coordinator::handleRegistrationRequest(const StrandCoord& strand) |
10cefb7b | 65 | { |
ba568924 AR |
66 | registerStrand(strand); |
67 | ||
68 | // send back an acknowledgement; TODO: remove as not needed? | |
1bac0258 AR |
69 | TypedMsgHdr message; |
70 | strand.pack(message); | |
71 | SendMessage(MakeAddr(strandAddrPfx, strand.kidId), message); | |
10cefb7b | 72 | } |
a2c48c98 | 73 | |
7230e9c7 AR |
74 | void Ipc::Coordinator::handleDescriptorGet(const Descriptor& request) |
75 | { | |
76 | // XXX: hack: create descriptor here | |
77 | char buffer[64]; | |
78 | snprintf(buffer, sizeof(buffer), "/tmp/squid_shared_file.txt"); | |
79 | static int fd = -1; | |
80 | if (fd < 0) { | |
81 | fd = open(buffer, O_CREAT | O_RDWR | O_APPEND, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH); | |
82 | int n = snprintf(buffer, sizeof(buffer), "coord: created %d\n", fd); | |
83 | ssize_t bytes = write(fd, buffer, n); | |
84 | Must(bytes == n); | |
85 | debugs(54, 6, "Created FD " << fd << " for kid" << request.fromKid); | |
86 | } else { | |
87 | int n = snprintf(buffer, sizeof(buffer), "coord: updated %d\n", fd); | |
88 | ssize_t bytes = write(fd, buffer, n); | |
89 | Must(bytes == n); | |
90 | } | |
91 | ||
92 | debugs(54, 6, "Sending FD " << fd << " to kid" << request.fromKid); | |
93 | ||
94 | Descriptor response(-1, fd); | |
95 | TypedMsgHdr message; | |
96 | response.pack(message); | |
97 | SendMessage(MakeAddr(strandAddrPfx, request.fromKid), message); | |
98 | ||
99 | // XXX: close(fd); fd should be opened until the message has not reached rec iver | |
100 | } | |
101 | ||
a2c48c98 AR |
102 | void Ipc::Coordinator::broadcastSignal(int sig) const |
103 | { | |
1bac0258 AR |
104 | typedef Strands::const_iterator SCI; |
105 | for (SCI iter = strands.begin(); iter != strands.end(); ++iter) { | |
a2c48c98 AR |
106 | debugs(54, 5, HERE << "signal " << sig << " to kid" << iter->kidId << |
107 | ", PID=" << iter->pid); | |
108 | kill(iter->pid, sig); | |
109 | } | |
110 | } | |
111 | ||
112 | Ipc::Coordinator* Ipc::Coordinator::Instance() | |
113 | { | |
114 | if (!TheInstance) | |
115 | TheInstance = new Coordinator; | |
116 | // XXX: if the Coordinator job quits, this pointer will become invalid | |
117 | // we could make Coordinator death fatal, except during exit, but since | |
118 | // Strands do not re-register, even process death would be pointless. | |
119 | return TheInstance; | |
120 | } |