/*
 * Copyright (C) 1996-2016 The Squid Software Foundation and contributors
 *
 * Squid software is distributed under GPLv2+ license and includes
 * contributions from numerous individuals and organizations.
 * Please see the COPYING and CONTRIBUTORS files for details.
 */
9 /* DEBUG: section 54 Interprocess Communication */
12 #include "base/TextException.h"
14 #include "comm/Connection.h"
17 #include "ipc/Messages.h"
19 #include "ipc/SharedListen.h"
20 #include "ipc/StartListening.h"
21 #include "ipc/TypedMsgHdr.h"
27 /// holds information necessary to handle JoinListen response
28 class PendingOpenRequest
31 Ipc::OpenListenerParams params
; ///< actual comm_open_sharedListen() parameters
32 AsyncCall::Pointer callback
; // who to notify
35 /// maps ID assigned at request time to the response callback
36 typedef std::map
<int, PendingOpenRequest
> SharedListenRequestMap
;
37 static SharedListenRequestMap TheSharedListenRequestMap
;
39 /// accumulates delayed requests until they are ready to be sent, in FIFO order
40 typedef std::list
<PendingOpenRequest
> DelayedSharedListenRequests
;
41 static DelayedSharedListenRequests TheDelayedRequests
;
44 AddToMap(const PendingOpenRequest
&por
)
46 // find unused ID using linear seach; there should not be many entries
47 for (int id
= 0; true; ++id
) {
48 if (TheSharedListenRequestMap
.find(id
) == TheSharedListenRequestMap
.end()) {
49 TheSharedListenRequestMap
[id
] = por
;
53 assert(false); // not reached
57 Ipc::OpenListenerParams::OpenListenerParams()
59 memset(this, 0, sizeof(*this));
63 Ipc::OpenListenerParams::operator <(const OpenListenerParams
&p
) const
65 if (sock_type
!= p
.sock_type
)
66 return sock_type
< p
.sock_type
;
69 return proto
< p
.proto
;
71 // ignore flags and fdNote differences because they do not affect binding
73 return addr
.compareWhole(p
.addr
) < 0;
76 Ipc::SharedListenRequest::SharedListenRequest(): requestorId(-1), mapId(-1)
78 // caller will then set public data members
/// Constructs a request from a typed IPC message, after calling
/// checkType(mtSharedListenRequest) on that message.
// NOTE(review): no member is assigned after the type check in this view.
// The SharedListenResponse constructor in this file follows its own check
// with hdrMsg.getPod(*this); presumably the same call belongs here --
// confirm against the complete file.
81 Ipc::SharedListenRequest::SharedListenRequest(const TypedMsgHdr
&hdrMsg
)
83 hdrMsg
.checkType(mtSharedListenRequest
);
/// Prepares hdrMsg to carry this request by setting its type to
/// mtSharedListenRequest.
// NOTE(review): no payload is written in this view.
// SharedListenResponse::pack() in this file follows setType() with
// hdrMsg.putPod(*this); presumably the same call belongs here -- confirm
// against the complete file.
87 void Ipc::SharedListenRequest::pack(TypedMsgHdr
&hdrMsg
) const
89 hdrMsg
.setType(mtSharedListenRequest
);
93 Ipc::SharedListenResponse::SharedListenResponse(int aFd
, int anErrNo
, int aMapId
):
94 fd(aFd
), errNo(anErrNo
), mapId(aMapId
)
/// Constructs a response from a typed IPC message. Members start as
/// fd=-1, errNo=0, mapId=-1; the message is then passed through
/// checkType(mtSharedListenResponse) and *this is handed to getPod().
// NOTE(review): the embedded line numbering skips a line between getPod()
// and the trailing comment, so a statement may be missing from this view.
// Presumably fd needs separate handling, because a descriptor number copied
// as plain data is only meaningful in the sending process -- confirm
// against the complete file.
98 Ipc::SharedListenResponse::SharedListenResponse(const TypedMsgHdr
&hdrMsg
):
99 fd(-1), errNo(0), mapId(-1)
101 hdrMsg
.checkType(mtSharedListenResponse
);
102 hdrMsg
.getPod(*this);
104 // other conn details are passed in OpenListenerParams and filled out by SharedListenJoined()
/// Prepares hdrMsg to carry this response: sets its type to
/// mtSharedListenResponse and hands *this to putPod().
// NOTE(review): only the raw object image is written in this view; whether
// the descriptor in fd is also attached to the message is not visible here
// -- confirm against the complete file.
107 void Ipc::SharedListenResponse::pack(TypedMsgHdr
&hdrMsg
) const
109 hdrMsg
.setType(mtSharedListenResponse
);
110 hdrMsg
.putPod(*this);
115 SendSharedListenRequest(const PendingOpenRequest
&por
)
117 Ipc::SharedListenRequest request
;
118 request
.requestorId
= KidIdentifier
;
119 request
.params
= por
.params
;
120 request
.mapId
= AddToMap(por
);
122 debugs(54, 3, "getting listening FD for " << request
.params
.addr
<<
123 " mapId=" << request
.mapId
);
125 Ipc::TypedMsgHdr message
;
126 request
.pack(message
);
127 SendMessage(Ipc::Port::CoordinatorAddr(), message
);
133 if (TheDelayedRequests
.empty())
134 return; // no pending requests to resume
136 debugs(54, 3, "resuming with " << TheSharedListenRequestMap
.size() <<
137 " active + " << TheDelayedRequests
.size() << " delayed requests");
139 SendSharedListenRequest(*TheDelayedRequests
.begin());
140 TheDelayedRequests
.pop_front();
144 Ipc::JoinSharedListen(const OpenListenerParams
¶ms
, AsyncCall::Pointer
&cb
)
146 PendingOpenRequest por
;
150 const DelayedSharedListenRequests::size_type concurrencyLimit
= 1;
151 if (TheSharedListenRequestMap
.size() >= concurrencyLimit
) {
152 debugs(54, 3, "waiting for " << TheSharedListenRequestMap
.size() <<
153 " active + " << TheDelayedRequests
.size() << " delayed requests");
154 TheDelayedRequests
.push_back(por
);
156 SendSharedListenRequest(por
);
160 void Ipc::SharedListenJoined(const SharedListenResponse
&response
)
162 // Dont debugs c fully since only FD is filled right now.
163 debugs(54, 3, "got listening FD " << response
.fd
<< " errNo=" <<
164 response
.errNo
<< " mapId=" << response
.mapId
<< " with " <<
165 TheSharedListenRequestMap
.size() << " active + " <<
166 TheDelayedRequests
.size() << " delayed requests");
168 Must(TheSharedListenRequestMap
.find(response
.mapId
) != TheSharedListenRequestMap
.end());
169 PendingOpenRequest por
= TheSharedListenRequestMap
[response
.mapId
];
170 Must(por
.callback
!= NULL
);
171 TheSharedListenRequestMap
.erase(response
.mapId
);
173 StartListeningCb
*cbd
= dynamic_cast<StartListeningCb
*>(por
.callback
->getDialer());
174 assert(cbd
&& cbd
->conn
!= NULL
);
175 Must(cbd
&& cbd
->conn
!= NULL
);
176 cbd
->conn
->fd
= response
.fd
;
178 if (Comm::IsConnOpen(cbd
->conn
)) {
179 OpenListenerParams
&p
= por
.params
;
180 cbd
->conn
->local
= p
.addr
;
181 cbd
->conn
->flags
= p
.flags
;
182 // XXX: leave the comm AI stuff to comm_import_opened()?
183 struct addrinfo
*AI
= NULL
;
184 p
.addr
.getAddrInfo(AI
);
185 AI
->ai_socktype
= p
.sock_type
;
186 AI
->ai_protocol
= p
.proto
;
187 comm_import_opened(cbd
->conn
, FdNote(p
.fdNote
), AI
);
188 Ip::Address::FreeAddr(AI
);
191 cbd
->errNo
= response
.errNo
;
192 cbd
->handlerSubscription
= por
.params
.handlerSubscription
;
193 ScheduleCallHere(por
.callback
);
195 kickDelayedRequest();