2 * Copyright (C) 1996-2019 The Squid Software Foundation and contributors
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
9 /* DEBUG: section 54 Interprocess Communication */
12 #include "base/TextException.h"
14 #include "comm/Connection.h"
17 #include "ipc/Messages.h"
19 #include "ipc/SharedListen.h"
20 #include "ipc/StartListening.h"
21 #include "ipc/TypedMsgHdr.h"
27 /// holds information necessary to handle JoinListen response
28 class PendingOpenRequest
31 Ipc::OpenListenerParams params
; ///< actual comm_open_sharedListen() parameters
32 AsyncCall::Pointer callback
; // who to notify
35 /// maps ID assigned at request time to the response callback
36 typedef std::map
<int, PendingOpenRequest
> SharedListenRequestMap
;
37 static SharedListenRequestMap TheSharedListenRequestMap
;
39 /// accumulates delayed requests until they are ready to be sent, in FIFO order
40 typedef std::list
<PendingOpenRequest
> DelayedSharedListenRequests
;
41 static DelayedSharedListenRequests TheDelayedRequests
;
44 AddToMap(const PendingOpenRequest
&por
)
46 // find unused ID using linear seach; there should not be many entries
47 for (int id
= 0; true; ++id
) {
48 if (TheSharedListenRequestMap
.find(id
) == TheSharedListenRequestMap
.end()) {
49 TheSharedListenRequestMap
[id
] = por
;
53 assert(false); // not reached
58 Ipc::OpenListenerParams::operator <(const OpenListenerParams
&p
) const
60 if (sock_type
!= p
.sock_type
)
61 return sock_type
< p
.sock_type
;
64 return proto
< p
.proto
;
66 // ignore flags and fdNote differences because they do not affect binding
68 return addr
.compareWhole(p
.addr
) < 0;
71 Ipc::SharedListenRequest::SharedListenRequest(): requestorId(-1), mapId(-1)
73 // caller will then set public data members
76 Ipc::SharedListenRequest::SharedListenRequest(const TypedMsgHdr
&hdrMsg
)
78 hdrMsg
.checkType(mtSharedListenRequest
);
82 void Ipc::SharedListenRequest::pack(TypedMsgHdr
&hdrMsg
) const
84 hdrMsg
.setType(mtSharedListenRequest
);
88 Ipc::SharedListenResponse::SharedListenResponse(int aFd
, int anErrNo
, int aMapId
):
89 fd(aFd
), errNo(anErrNo
), mapId(aMapId
)
93 Ipc::SharedListenResponse::SharedListenResponse(const TypedMsgHdr
&hdrMsg
):
94 fd(-1), errNo(0), mapId(-1)
96 hdrMsg
.checkType(mtSharedListenResponse
);
99 // other conn details are passed in OpenListenerParams and filled out by SharedListenJoin()
102 void Ipc::SharedListenResponse::pack(TypedMsgHdr
&hdrMsg
) const
104 hdrMsg
.setType(mtSharedListenResponse
);
105 hdrMsg
.putPod(*this);
110 SendSharedListenRequest(const PendingOpenRequest
&por
)
112 Ipc::SharedListenRequest request
;
113 request
.requestorId
= KidIdentifier
;
114 request
.params
= por
.params
;
115 request
.mapId
= AddToMap(por
);
117 debugs(54, 3, "getting listening FD for " << request
.params
.addr
<<
118 " mapId=" << request
.mapId
);
120 Ipc::TypedMsgHdr message
;
121 request
.pack(message
);
122 SendMessage(Ipc::Port::CoordinatorAddr(), message
);
128 if (TheDelayedRequests
.empty())
129 return; // no pending requests to resume
131 debugs(54, 3, "resuming with " << TheSharedListenRequestMap
.size() <<
132 " active + " << TheDelayedRequests
.size() << " delayed requests");
134 SendSharedListenRequest(*TheDelayedRequests
.begin());
135 TheDelayedRequests
.pop_front();
139 Ipc::JoinSharedListen(const OpenListenerParams
¶ms
, AsyncCall::Pointer
&cb
)
141 PendingOpenRequest por
;
145 const DelayedSharedListenRequests::size_type concurrencyLimit
= 1;
146 if (TheSharedListenRequestMap
.size() >= concurrencyLimit
) {
147 debugs(54, 3, "waiting for " << TheSharedListenRequestMap
.size() <<
148 " active + " << TheDelayedRequests
.size() << " delayed requests");
149 TheDelayedRequests
.push_back(por
);
151 SendSharedListenRequest(por
);
155 void Ipc::SharedListenJoined(const SharedListenResponse
&response
)
157 // Dont debugs c fully since only FD is filled right now.
158 debugs(54, 3, "got listening FD " << response
.fd
<< " errNo=" <<
159 response
.errNo
<< " mapId=" << response
.mapId
<< " with " <<
160 TheSharedListenRequestMap
.size() << " active + " <<
161 TheDelayedRequests
.size() << " delayed requests");
163 Must(TheSharedListenRequestMap
.find(response
.mapId
) != TheSharedListenRequestMap
.end());
164 PendingOpenRequest por
= TheSharedListenRequestMap
[response
.mapId
];
165 Must(por
.callback
!= NULL
);
166 TheSharedListenRequestMap
.erase(response
.mapId
);
168 StartListeningCb
*cbd
= dynamic_cast<StartListeningCb
*>(por
.callback
->getDialer());
169 assert(cbd
&& cbd
->conn
!= NULL
);
170 Must(cbd
&& cbd
->conn
!= NULL
);
171 cbd
->conn
->fd
= response
.fd
;
173 if (Comm::IsConnOpen(cbd
->conn
)) {
174 OpenListenerParams
&p
= por
.params
;
175 cbd
->conn
->local
= p
.addr
;
176 cbd
->conn
->flags
= p
.flags
;
177 // XXX: leave the comm AI stuff to comm_import_opened()?
178 struct addrinfo
*AI
= NULL
;
179 p
.addr
.getAddrInfo(AI
);
180 AI
->ai_socktype
= p
.sock_type
;
181 AI
->ai_protocol
= p
.proto
;
182 comm_import_opened(cbd
->conn
, FdNote(p
.fdNote
), AI
);
183 Ip::Address::FreeAddr(AI
);
186 cbd
->errNo
= response
.errNo
;
187 cbd
->handlerSubscription
= por
.params
.handlerSubscription
;
188 ScheduleCallHere(por
.callback
);
190 kickDelayedRequest();