]> git.ipfire.org Git - thirdparty/squid.git/blob - src/ipc/SharedListen.cc
Merge from trunk
[thirdparty/squid.git] / src / ipc / SharedListen.cc
1 /*
2 * $Id$
3 *
4 * DEBUG: section 54 Interprocess Communication
5 *
6 */
7
#include "config.h"
#include "base/TextException.h"
#include "comm.h"
#include "comm/Connection.h"
#include "ipc/Port.h"
#include "ipc/Messages.h"
#include "ipc/Kids.h"
#include "ipc/TypedMsgHdr.h"
#include "ipc/StartListening.h"
#include "ipc/SharedListen.h"

#include <map>
#include <new>
20
21 /// holds information necessary to handle JoinListen response
22 class PendingOpenRequest
23 {
24 public:
25 Ipc::OpenListenerParams params; ///< actual comm_open_sharedListen() parameters
26 AsyncCall::Pointer callback; // who to notify
27 };
28
29 /// maps ID assigned at request time to the response callback
30 typedef std::map<int, PendingOpenRequest> SharedListenRequestMap;
31 static SharedListenRequestMap TheSharedListenRequestMap;
32
33 static int
34 AddToMap(const PendingOpenRequest &por)
35 {
36 // find unused ID using linear seach; there should not be many entries
37 for (int id = 0; true; ++id) {
38 if (TheSharedListenRequestMap.find(id) == TheSharedListenRequestMap.end()) {
39 TheSharedListenRequestMap[id] = por;
40 return id;
41 }
42 }
43 assert(false); // not reached
44 return -1;
45 }
46
47 Ipc::OpenListenerParams::OpenListenerParams()
48 {
49 xmemset(this, 0, sizeof(*this));
50 }
51
52 bool
53 Ipc::OpenListenerParams::operator <(const OpenListenerParams &p) const
54 {
55 if (sock_type != p.sock_type)
56 return sock_type < p.sock_type;
57
58 if (proto != p.proto)
59 return proto < p.proto;
60
61 // ignore flags and fdNote differences because they do not affect binding
62
63 return addr.compareWhole(p.addr) < 0;
64 }
65
66
67
68 Ipc::SharedListenRequest::SharedListenRequest(): requestorId(-1), mapId(-1)
69 {
70 // caller will then set public data members
71 }
72
73 Ipc::SharedListenRequest::SharedListenRequest(const TypedMsgHdr &hdrMsg)
74 {
75 hdrMsg.getData(mtSharedListenRequest, this, sizeof(*this));
76 }
77
78 void Ipc::SharedListenRequest::pack(TypedMsgHdr &hdrMsg) const
79 {
80 hdrMsg.putData(mtSharedListenRequest, this, sizeof(*this));
81 }
82
83
84 Ipc::SharedListenResponse::SharedListenResponse(const Comm::ConnectionPointer &c, int anErrNo, int aMapId):
85 conn(c), errNo(anErrNo), mapId(aMapId)
86 {
87 }
88
89 Ipc::SharedListenResponse::SharedListenResponse(const TypedMsgHdr &hdrMsg):
90 conn(NULL), errNo(0), mapId(-1)
91 {
92 hdrMsg.getData(mtSharedListenResponse, this, sizeof(*this));
93 conn = new Comm::Connection;
94 conn->fd = hdrMsg.getFd();
95 // other conn details are passed in OpenListenerParams and filled out by SharedListenJoin()
96 }
97
98 void Ipc::SharedListenResponse::pack(TypedMsgHdr &hdrMsg) const
99 {
100 hdrMsg.putData(mtSharedListenResponse, this, sizeof(*this));
101 hdrMsg.putFd(conn->fd);
102 }
103
104
105 void Ipc::JoinSharedListen(const OpenListenerParams &params,
106 AsyncCall::Pointer &callback)
107 {
108 PendingOpenRequest por;
109 por.params = params;
110 por.callback = callback;
111
112 SharedListenRequest request;
113 request.requestorId = KidIdentifier;
114 request.params = por.params;
115 request.mapId = AddToMap(por);
116
117 debugs(54, 3, HERE << "getting listening FD for " << request.params.addr <<
118 " mapId=" << request.mapId);
119
120 TypedMsgHdr message;
121 request.pack(message);
122 SendMessage(coordinatorAddr, message);
123 }
124
125 void Ipc::SharedListenJoined(const SharedListenResponse &response)
126 {
127 Comm::ConnectionPointer c = response.conn;
128
129 // Dont debugs c fully since only FD is filled right now.
130 debugs(54, 3, HERE << "got listening FD " << c->fd << " errNo=" <<
131 response.errNo << " mapId=" << response.mapId);
132
133 Must(TheSharedListenRequestMap.find(response.mapId) != TheSharedListenRequestMap.end());
134 PendingOpenRequest por = TheSharedListenRequestMap[response.mapId];
135 Must(por.callback != NULL);
136 TheSharedListenRequestMap.erase(response.mapId);
137
138 if (Comm::IsConnOpen(c)) {
139 OpenListenerParams &p = por.params;
140 c->local = p.addr;
141 c->flags = p.flags;
142 // XXX: leave the comm AI stuff to comm_import_opened()?
143 struct addrinfo *AI = NULL;
144 p.addr.GetAddrInfo(AI);
145 AI->ai_socktype = p.sock_type;
146 AI->ai_protocol = p.proto;
147 comm_import_opened(c, FdNote(p.fdNote), AI);
148 p.addr.FreeAddrInfo(AI);
149 }
150
151 StartListeningCb *cbd = dynamic_cast<StartListeningCb*>(por.callback->getDialer());
152 Must(cbd);
153 cbd->conn = c;
154 cbd->errNo = response.errNo;
155 cbd->handlerSubscription = por.params.handlerSubscription;
156 ScheduleCallHere(por.callback);
157 }