/* DEBUG: section 54    Interprocess Communication */
#include "base/TextException.h"
#include "comm/Connection.h"
#include "ipc/Messages.h"
#include "ipc/TypedMsgHdr.h"
#include "ipc/StartListening.h"
#include "ipc/SharedListen.h"

#include <map>
21 /// holds information necessary to handle JoinListen response
22 class PendingOpenRequest
25 Ipc::OpenListenerParams params
; ///< actual comm_open_sharedListen() parameters
26 AsyncCall::Pointer callback
; // who to notify
29 /// maps ID assigned at request time to the response callback
30 typedef std::map
<int, PendingOpenRequest
> SharedListenRequestMap
;
31 static SharedListenRequestMap TheSharedListenRequestMap
;
34 AddToMap(const PendingOpenRequest
&por
)
36 // find unused ID using linear seach; there should not be many entries
37 for (int id
= 0; true; ++id
) {
38 if (TheSharedListenRequestMap
.find(id
) == TheSharedListenRequestMap
.end()) {
39 TheSharedListenRequestMap
[id
] = por
;
43 assert(false); // not reached
47 Ipc::OpenListenerParams::OpenListenerParams()
49 xmemset(this, 0, sizeof(*this));
53 Ipc::OpenListenerParams::operator <(const OpenListenerParams
&p
) const
55 if (sock_type
!= p
.sock_type
)
56 return sock_type
< p
.sock_type
;
59 return proto
< p
.proto
;
61 // ignore flags and fdNote differences because they do not affect binding
63 return addr
.compareWhole(p
.addr
) < 0;
68 Ipc::SharedListenRequest::SharedListenRequest(): requestorId(-1), mapId(-1)
70 // caller will then set public data members
73 Ipc::SharedListenRequest::SharedListenRequest(const TypedMsgHdr
&hdrMsg
)
75 hdrMsg
.getData(mtSharedListenRequest
, this, sizeof(*this));
78 void Ipc::SharedListenRequest::pack(TypedMsgHdr
&hdrMsg
) const
80 hdrMsg
.putData(mtSharedListenRequest
, this, sizeof(*this));
84 Ipc::SharedListenResponse::SharedListenResponse(const Comm::ConnectionPointer
&c
, int anErrNo
, int aMapId
):
85 conn(c
), errNo(anErrNo
), mapId(aMapId
)
89 Ipc::SharedListenResponse::SharedListenResponse(const TypedMsgHdr
&hdrMsg
):
90 conn(NULL
), errNo(0), mapId(-1)
92 hdrMsg
.getData(mtSharedListenResponse
, this, sizeof(*this));
93 conn
= new Comm::Connection
;
94 conn
->fd
= hdrMsg
.getFd();
95 // other conn details are passed in OpenListenerParams and filled out by SharedListenJoin()
98 void Ipc::SharedListenResponse::pack(TypedMsgHdr
&hdrMsg
) const
100 hdrMsg
.putData(mtSharedListenResponse
, this, sizeof(*this));
101 hdrMsg
.putFd(conn
->fd
);
105 void Ipc::JoinSharedListen(const OpenListenerParams
¶ms
,
106 AsyncCall::Pointer
&callback
)
108 PendingOpenRequest por
;
110 por
.callback
= callback
;
112 SharedListenRequest request
;
113 request
.requestorId
= KidIdentifier
;
114 request
.params
= por
.params
;
115 request
.mapId
= AddToMap(por
);
117 debugs(54, 3, HERE
<< "getting listening FD for " << request
.params
.addr
<<
118 " mapId=" << request
.mapId
);
121 request
.pack(message
);
122 SendMessage(coordinatorAddr
, message
);
125 void Ipc::SharedListenJoined(const SharedListenResponse
&response
)
127 Comm::ConnectionPointer c
= response
.conn
;
129 // Dont debugs c fully since only FD is filled right now.
130 debugs(54, 3, HERE
<< "got listening FD " << c
->fd
<< " errNo=" <<
131 response
.errNo
<< " mapId=" << response
.mapId
);
133 Must(TheSharedListenRequestMap
.find(response
.mapId
) != TheSharedListenRequestMap
.end());
134 PendingOpenRequest por
= TheSharedListenRequestMap
[response
.mapId
];
135 Must(por
.callback
!= NULL
);
136 TheSharedListenRequestMap
.erase(response
.mapId
);
138 if (Comm::IsConnOpen(c
)) {
139 OpenListenerParams
&p
= por
.params
;
142 // XXX: leave the comm AI stuff to comm_import_opened()?
143 struct addrinfo
*AI
= NULL
;
144 p
.addr
.GetAddrInfo(AI
);
145 AI
->ai_socktype
= p
.sock_type
;
146 AI
->ai_protocol
= p
.proto
;
147 comm_import_opened(c
, FdNote(p
.fdNote
), AI
);
148 p
.addr
.FreeAddrInfo(AI
);
151 StartListeningCb
*cbd
= dynamic_cast<StartListeningCb
*>(por
.callback
->getDialer());
154 cbd
->errNo
= response
.errNo
;
155 cbd
->handlerSubscription
= por
.params
.handlerSubscription
;
156 ScheduleCallHere(por
.callback
);