]> git.ipfire.org Git - thirdparty/squid.git/blob - src/ipc/Strand.cc
Merge from trunk
[thirdparty/squid.git] / src / ipc / Strand.cc
1 /*
2 * $Id$
3 *
4 * DEBUG: section 54 Interprocess Communication
5 *
6 */
7
8 #include "config.h"
9 #include "base/Subscription.h"
10 #include "base/TextException.h"
11 #include "comm/Connection.h"
12 #include "ipc/Strand.h"
13 #include "ipc/StrandCoord.h"
14 #include "ipc/Messages.h"
15 #include "ipc/SharedListen.h"
16 #include "ipc/Kids.h"
17 #include "mgr/Request.h"
18 #include "mgr/Response.h"
19 #include "mgr/Forwarder.h"
20 #include "CacheManager.h"
21
22 CBDATA_NAMESPACED_CLASS_INIT(Ipc, Strand);
23
24
25 Ipc::Strand::Strand():
26 Port(MakeAddr(strandAddrPfx, KidIdentifier)),
27 isRegistered(false)
28 {
29 }
30
31 void Ipc::Strand::start()
32 {
33 Port::start();
34 registerSelf();
35 }
36
37 void Ipc::Strand::registerSelf()
38 {
39 debugs(54, 6, HERE);
40 Must(!isRegistered);
41 TypedMsgHdr message;
42 StrandCoord(KidIdentifier, getpid()).pack(message);
43 SendMessage(coordinatorAddr, message);
44 setTimeout(6, "Ipc::Strand::timeoutHandler"); // TODO: make 6 configurable?
45 }
46
47 void Ipc::Strand::receive(const TypedMsgHdr &message)
48 {
49 debugs(54, 6, HERE << message.type());
50 switch (message.type()) {
51
52 case mtRegistration:
53 handleRegistrationResponse(StrandCoord(message));
54 break;
55
56 case mtSharedListenResponse:
57 SharedListenJoined(SharedListenResponse(message));
58 break;
59
60 case mtCacheMgrRequest:
61 handleCacheMgrRequest(Mgr::Request(message));
62 break;
63
64 case mtCacheMgrResponse:
65 handleCacheMgrResponse(Mgr::Response(message));
66 break;
67
68 default:
69 debugs(54, 1, HERE << "Unhandled message type: " << message.type());
70 break;
71 }
72 }
73
74 void Ipc::Strand::handleRegistrationResponse(const StrandCoord &strand)
75 {
76 // handle registration response from the coordinator; it could be stale
77 if (strand.kidId == KidIdentifier && strand.pid == getpid()) {
78 debugs(54, 6, "kid" << KidIdentifier << " registered");
79 clearTimeout(); // we are done
80 } else {
81 // could be an ACK to the registration message of our dead predecessor
82 debugs(54, 6, "kid" << KidIdentifier << " is not yet registered");
83 // keep listening, with a timeout
84 }
85 }
86
87 void Ipc::Strand::handleCacheMgrRequest(const Mgr::Request& request)
88 {
89 Mgr::Action::Pointer action =
90 CacheManager::GetInstance()->createRequestedAction(request.params);
91 action->respond(request);
92 }
93
94 void Ipc::Strand::handleCacheMgrResponse(const Mgr::Response& response)
95 {
96 Mgr::Forwarder::HandleRemoteAck(response.requestId);
97 }
98
99 void Ipc::Strand::timedout()
100 {
101 debugs(54, 6, HERE << isRegistered);
102 if (!isRegistered)
103 fatalf("kid%d registration timed out", KidIdentifier);
104 }