git.ipfire.org Git - thirdparty/squid.git/blob - src/ipc/Strand.cc
Merge 3p2-rock.
[thirdparty/squid.git] / src / ipc / Strand.cc
1 /*
2 * $Id$
3 *
4 * DEBUG: section 54 Interprocess Communication
5 *
6 */
7
8 #include "config.h"
9 #include "base/TextException.h"
10 #include "ipc/Strand.h"
11 #include "ipc/StrandCoord.h"
12 #include "ipc/Messages.h"
13 #include "ipc/SharedListen.h"
14 #include "ipc/Kids.h"
15 #include "mgr/Request.h"
16 #include "mgr/Response.h"
17 #include "mgr/Forwarder.h"
18 #include "DiskIO/IpcIo/IpcIoFile.h" /* XXX: scope boundary violation */
19 #include "SwapDir.h" /* XXX: scope boundary violation */
20 #include "CacheManager.h"
21
22
23 CBDATA_NAMESPACED_CLASS_INIT(Ipc, Strand);
24
25
26 Ipc::Strand::Strand():
27 Port(MakeAddr(strandAddrPfx, KidIdentifier)),
28 isRegistered(false)
29 {
30 }
31
32 void Ipc::Strand::start()
33 {
34 Port::start();
35 registerSelf();
36 }
37
38 void Ipc::Strand::registerSelf()
39 {
40 debugs(54, 6, HERE);
41 Must(!isRegistered);
42
43 HereIamMessage ann(StrandCoord(KidIdentifier, getpid()));
44
45 // announce that we are responsible for our cache_dir if needed
46 // XXX: misplaced
47 if (IamDiskProcess()) {
48 const int myDisk = KidIdentifier % Config.cacheSwap.n_processes;
49 if (const SwapDir *sd = dynamic_cast<const SwapDir*>(INDEXSD(myDisk))) {
50 ann.strand.tag = sd->path;
51 ann.strand.tag.append("/rock"); // XXX: scope boundary violation
52 }
53 }
54
55 TypedMsgHdr message;
56 ann.pack(message);
57 SendMessage(coordinatorAddr, message);
58 setTimeout(6, "Ipc::Strand::timeoutHandler"); // TODO: make 6 configurable?
59 }
60
61 void Ipc::Strand::receive(const TypedMsgHdr &message)
62 {
63 debugs(54, 6, HERE << message.type());
64 switch (message.type()) {
65
66 case mtRegistration:
67 handleRegistrationResponse(HereIamMessage(message));
68 break;
69
70 case mtSharedListenResponse:
71 SharedListenJoined(SharedListenResponse(message));
72 break;
73
74 case mtIpcIoRequest:
75 IpcIoFile::HandleRequest(IpcIoRequest(message));
76 break;
77
78 case mtIpcIoResponse:
79 IpcIoFile::HandleResponse(message);
80 break;
81
82 case mtCacheMgrRequest:
83 handleCacheMgrRequest(Mgr::Request(message));
84 break;
85
86 case mtCacheMgrResponse:
87 handleCacheMgrResponse(Mgr::Response(message));
88 break;
89
90 default:
91 debugs(54, 1, HERE << "Unhandled message type: " << message.type());
92 break;
93 }
94 }
95
96 void Ipc::Strand::handleRegistrationResponse(const HereIamMessage &msg)
97 {
98 // handle registration response from the coordinator; it could be stale
99 if (msg.strand.kidId == KidIdentifier && msg.strand.pid == getpid()) {
100 debugs(54, 6, "kid" << KidIdentifier << " registered");
101 clearTimeout(); // we are done
102 } else {
103 // could be an ACK to the registration message of our dead predecessor
104 debugs(54, 6, "kid" << KidIdentifier << " is not yet registered");
105 // keep listening, with a timeout
106 }
107 }
108
109 void Ipc::Strand::handleCacheMgrRequest(const Mgr::Request& request)
110 {
111 Mgr::Action::Pointer action =
112 CacheManager::GetInstance()->createRequestedAction(request.params);
113 action->respond(request);
114 }
115
116 void Ipc::Strand::handleCacheMgrResponse(const Mgr::Response& response)
117 {
118 Mgr::Forwarder::HandleRemoteAck(response.requestId);
119 }
120
121 void Ipc::Strand::timedout()
122 {
123 debugs(54, 6, HERE << isRegistered);
124 if (!isRegistered)
125 fatalf("kid%d registration timed out", KidIdentifier);
126 }