]>
git.ipfire.org Git - thirdparty/squid.git/blob - src/ipc/Strand.cc
4 * DEBUG: section 54 Interprocess Communication
9 #include "base/TextException.h"
10 #include "ipc/Strand.h"
11 #include "ipc/StrandCoord.h"
12 #include "ipc/Messages.h"
13 #include "ipc/SharedListen.h"
15 #include "mgr/Request.h"
16 #include "mgr/Response.h"
17 #include "mgr/Forwarder.h"
18 #include "DiskIO/IpcIo/IpcIoFile.h" /* XXX: scope boundary violation */
19 #include "SwapDir.h" /* XXX: scope boundary violation */
20 #include "CacheManager.h"
23 CBDATA_NAMESPACED_CLASS_INIT(Ipc
, Strand
);
26 Ipc::Strand::Strand():
27 Port(MakeAddr(strandAddrPfx
, KidIdentifier
)),
32 void Ipc::Strand::start()
38 void Ipc::Strand::registerSelf()
43 HereIamMessage
ann(StrandCoord(KidIdentifier
, getpid()));
45 // announce that we are responsible for our cache_dir if needed
47 if (IamDiskProcess()) {
48 const int myDisk
= KidIdentifier
% Config
.cacheSwap
.n_processes
;
49 if (const SwapDir
*sd
= dynamic_cast<const SwapDir
*>(INDEXSD(myDisk
))) {
50 ann
.strand
.tag
= sd
->path
;
51 ann
.strand
.tag
.append("/rock"); // XXX: scope boundary violation
57 SendMessage(coordinatorAddr
, message
);
58 setTimeout(6, "Ipc::Strand::timeoutHandler"); // TODO: make 6 configurable?
61 void Ipc::Strand::receive(const TypedMsgHdr
&message
)
63 debugs(54, 6, HERE
<< message
.type());
64 switch (message
.type()) {
67 handleRegistrationResponse(HereIamMessage(message
));
70 case mtSharedListenResponse
:
71 SharedListenJoined(SharedListenResponse(message
));
75 IpcIoFile::HandleRequest(IpcIoRequest(message
));
79 IpcIoFile::HandleResponse(message
);
82 case mtCacheMgrRequest
:
83 handleCacheMgrRequest(Mgr::Request(message
));
86 case mtCacheMgrResponse
:
87 handleCacheMgrResponse(Mgr::Response(message
));
91 debugs(54, 1, HERE
<< "Unhandled message type: " << message
.type());
96 void Ipc::Strand::handleRegistrationResponse(const HereIamMessage
&msg
)
98 // handle registration response from the coordinator; it could be stale
99 if (msg
.strand
.kidId
== KidIdentifier
&& msg
.strand
.pid
== getpid()) {
100 debugs(54, 6, "kid" << KidIdentifier
<< " registered");
101 clearTimeout(); // we are done
103 // could be an ACK to the registration message of our dead predecessor
104 debugs(54, 6, "kid" << KidIdentifier
<< " is not yet registered");
105 // keep listening, with a timeout
109 void Ipc::Strand::handleCacheMgrRequest(const Mgr::Request
& request
)
111 Mgr::Action::Pointer action
=
112 CacheManager::GetInstance()->createRequestedAction(request
.params
);
113 action
->respond(request
);
116 void Ipc::Strand::handleCacheMgrResponse(const Mgr::Response
& response
)
118 Mgr::Forwarder::HandleRemoteAck(response
.requestId
);
121 void Ipc::Strand::timedout()
123 debugs(54, 6, HERE
<< isRegistered
);
125 fatalf("kid%d registration timed out", KidIdentifier
);