]> git.ipfire.org Git - thirdparty/squid.git/blob - src/ipc/Strand.cc
SMP Cache Manager, Phase2 implementation.
[thirdparty/squid.git] / src / ipc / Strand.cc
1 /*
2 * $Id$
3 *
4 * DEBUG: section 54 Interprocess Communication
5 *
6 */
7
8 #include "config.h"
9 #include "base/TextException.h"
10 #include "ipc/Strand.h"
11 #include "ipc/StrandCoord.h"
12 #include "ipc/Messages.h"
13 #include "ipc/SharedListen.h"
14 #include "ipc/Kids.h"
15 #include "mgr/Request.h"
16 #include "mgr/Response.h"
17 #include "mgr/Forwarder.h"
18 #include "CacheManager.h"
19
20
21 CBDATA_NAMESPACED_CLASS_INIT(Ipc, Strand);
22
23
24 Ipc::Strand::Strand():
25 Port(MakeAddr(strandAddrPfx, KidIdentifier)),
26 isRegistered(false)
27 {
28 }
29
30 void Ipc::Strand::start()
31 {
32 Port::start();
33 registerSelf();
34 }
35
36 void Ipc::Strand::registerSelf()
37 {
38 debugs(54, 6, HERE);
39 Must(!isRegistered);
40 TypedMsgHdr message;
41 StrandCoord(KidIdentifier, getpid()).pack(message);
42 SendMessage(coordinatorAddr, message);
43 setTimeout(6, "Ipc::Strand::timeoutHandler"); // TODO: make 6 configurable?
44 }
45
46 void Ipc::Strand::receive(const TypedMsgHdr &message)
47 {
48 debugs(54, 6, HERE << message.type());
49 switch (message.type()) {
50
51 case mtRegistration:
52 handleRegistrationResponse(StrandCoord(message));
53 break;
54
55 case mtSharedListenResponse:
56 SharedListenJoined(SharedListenResponse(message));
57 break;
58
59 case mtCacheMgrRequest:
60 handleCacheMgrRequest(Mgr::Request(message));
61 break;
62
63 case mtCacheMgrResponse:
64 handleCacheMgrResponse(Mgr::Response(message));
65 break;
66
67 default:
68 debugs(54, 1, HERE << "Unhandled message type: " << message.type());
69 break;
70 }
71 }
72
73 void Ipc::Strand::handleRegistrationResponse(const StrandCoord &strand)
74 {
75 // handle registration response from the coordinator; it could be stale
76 if (strand.kidId == KidIdentifier && strand.pid == getpid()) {
77 debugs(54, 6, "kid" << KidIdentifier << " registered");
78 clearTimeout(); // we are done
79 } else {
80 // could be an ACK to the registration message of our dead predecessor
81 debugs(54, 6, "kid" << KidIdentifier << " is not yet registered");
82 // keep listening, with a timeout
83 }
84 }
85
86 void Ipc::Strand::handleCacheMgrRequest(const Mgr::Request& request)
87 {
88 Mgr::Action::Pointer action =
89 CacheManager::GetInstance()->createRequestedAction(request.params);
90 action->respond(request);
91 }
92
93 void Ipc::Strand::handleCacheMgrResponse(const Mgr::Response& response)
94 {
95 Mgr::Forwarder::HandleRemoteAck(response.requestId);
96 }
97
98 void Ipc::Strand::timedout()
99 {
100 debugs(54, 6, HERE << isRegistered);
101 if (!isRegistered)
102 fatalf("kid%d registration timed out", KidIdentifier);
103 }