4 * DEBUG: section 54 Interprocess Communication
12 #include "ipc/Messages.h"
14 #include "ipc/TypedMsgHdr.h"
15 #include "ipc/StartListening.h"
16 #include "ipc/SharedListen.h"
19 /// holds information necessary to handle JoinListen response
20 class PendingOpenRequest
23 Ipc::OpenListenerParams params
; ///< actual comm_open_sharedListen() parameters
24 AsyncCall::Pointer callback
; // who to notify
27 /// maps ID assigned at request time to the response callback
28 typedef std::map
<int, PendingOpenRequest
> SharedListenRequestMap
;
29 static SharedListenRequestMap TheSharedListenRequestMap
;
32 AddToMap(const PendingOpenRequest
&por
)
34 // find unused ID using linear seach; there should not be many entries
35 for (int id
= 0; true; ++id
) {
36 if (TheSharedListenRequestMap
.find(id
) == TheSharedListenRequestMap
.end()) {
37 TheSharedListenRequestMap
[id
] = por
;
41 assert(false); // not reached
45 Ipc::OpenListenerParams::OpenListenerParams()
47 xmemset(this, 0, sizeof(*this));
51 Ipc::OpenListenerParams::operator <(const OpenListenerParams
&p
) const
53 if (sock_type
!= p
.sock_type
)
54 return sock_type
< p
.sock_type
;
57 return proto
< p
.proto
;
59 // ignore flags and fdNote differences because they do not affect binding
61 return addr
.compareWhole(p
.addr
) < 0;
66 Ipc::SharedListenRequest::SharedListenRequest(): requestorId(-1), mapId(-1)
68 // caller will then set public data members
71 Ipc::SharedListenRequest::SharedListenRequest(const TypedMsgHdr
&hdrMsg
)
73 hdrMsg
.getData(mtSharedListenRequest
, this, sizeof(*this));
76 void Ipc::SharedListenRequest::pack(TypedMsgHdr
&hdrMsg
) const
78 hdrMsg
.putData(mtSharedListenRequest
, this, sizeof(*this));
82 Ipc::SharedListenResponse::SharedListenResponse(int aFd
, int anErrNo
, int aMapId
):
83 fd(aFd
), errNo(anErrNo
), mapId(aMapId
)
87 Ipc::SharedListenResponse::SharedListenResponse(const TypedMsgHdr
&hdrMsg
):
88 fd(-1), errNo(0), mapId(-1)
90 hdrMsg
.getData(mtSharedListenResponse
, this, sizeof(*this));
94 void Ipc::SharedListenResponse::pack(TypedMsgHdr
&hdrMsg
) const
96 hdrMsg
.putData(mtSharedListenResponse
, this, sizeof(*this));
101 void Ipc::JoinSharedListen(const OpenListenerParams
¶ms
,
102 AsyncCall::Pointer
&callback
)
104 PendingOpenRequest por
;
106 por
.callback
= callback
;
108 SharedListenRequest request
;
109 request
.requestorId
= KidIdentifier
;
110 request
.params
= por
.params
;
111 request
.mapId
= AddToMap(por
);
113 debugs(54, 3, HERE
<< "getting listening FD for " << request
.params
.addr
<<
114 " mapId=" << request
.mapId
);
117 request
.pack(message
);
118 SendMessage(coordinatorAddr
, message
);
121 void Ipc::SharedListenJoined(const SharedListenResponse
&response
)
123 const int fd
= response
.fd
;
125 debugs(54, 3, HERE
<< "got listening FD " << fd
<< " errNo=" <<
126 response
.errNo
<< " mapId=" << response
.mapId
);
128 Must(TheSharedListenRequestMap
.find(response
.mapId
) != TheSharedListenRequestMap
.end());
129 PendingOpenRequest por
= TheSharedListenRequestMap
[response
.mapId
];
130 Must(por
.callback
!= NULL
);
131 TheSharedListenRequestMap
.erase(response
.mapId
);
134 OpenListenerParams
&p
= por
.params
;
135 struct addrinfo
*AI
= NULL
;
136 p
.addr
.GetAddrInfo(AI
);
137 AI
->ai_socktype
= p
.sock_type
;
138 AI
->ai_protocol
= p
.proto
;
139 comm_import_opened(fd
, p
.addr
, p
.flags
, FdNote(p
.fdNote
), AI
);
140 p
.addr
.FreeAddrInfo(AI
);
143 StartListeningCb
*cbd
=
144 dynamic_cast<StartListeningCb
*>(por
.callback
->getDialer());
147 cbd
->errNo
= response
.errNo
;
148 ScheduleCallHere(por
.callback
);