]>
git.ipfire.org Git - thirdparty/squid.git/blob - src/mgr/StoreToCommWriter.cc
4 * DEBUG: section 16 Cache Manager API
9 #include "base/TextException.h"
10 #include "comm/Connection.h"
11 #include "CommCalls.h"
12 #include "comm/Write.h"
13 #include "ipc/FdNotes.h"
14 #include "mgr/StoreToCommWriter.h"
15 #include "StoreClient.h"
19 CBDATA_NAMESPACED_CLASS_INIT(Mgr
, StoreToCommWriter
);
22 Mgr::StoreToCommWriter::StoreToCommWriter(const Comm::ConnectionPointer
&conn
, StoreEntry
* anEntry
):
23 AsyncJob("Mgr::StoreToCommWriter"),
24 clientConnection(conn
), entry(anEntry
), sc(NULL
), writeOffset(0), closer(NULL
)
26 debugs(16, 6, HERE
<< clientConnection
);
27 closer
= asyncCall(16, 5, "Mgr::StoreToCommWriter::noteCommClosed",
28 CommCbMemFunT
<StoreToCommWriter
, CommCloseCbParams
>(this, &StoreToCommWriter::noteCommClosed
));
29 comm_add_close_handler(clientConnection
->fd
, closer
);
32 Mgr::StoreToCommWriter::~StoreToCommWriter()
40 /// closes our copy of the client HTTP connection socket
42 Mgr::StoreToCommWriter::close()
44 if (Comm::IsConnOpen(clientConnection
)) {
46 comm_remove_close_handler(clientConnection
->fd
, closer
);
49 clientConnection
->close();
54 Mgr::StoreToCommWriter::start()
57 Must(Comm::IsConnOpen(clientConnection
));
59 entry
->registerAbort(&StoreToCommWriter::Abort
, this);
60 sc
= storeClientListAdd(entry
, this);
63 // initiate the receive-from-store, write-to-comm sequence
68 Mgr::StoreToCommWriter::scheduleStoreCopy()
73 StoreIOBuffer
readBuf(sizeof(buffer
), writeOffset
, buffer
);
74 storeClientCopy(sc
, entry
, readBuf
, &NoteStoreCopied
, this);
78 Mgr::StoreToCommWriter::NoteStoreCopied(void* data
, StoreIOBuffer ioBuf
)
81 // make sync Store call async to get async call protections and features
82 StoreToCommWriter
* writer
= static_cast<StoreToCommWriter
*>(data
);
83 typedef UnaryMemFunT
<StoreToCommWriter
, StoreIOBuffer
> MyDialer
;
84 AsyncCall::Pointer call
=
85 asyncCall(16, 5, "Mgr::StoreToCommWriter::noteStoreCopied",
86 MyDialer(writer
, &StoreToCommWriter::noteStoreCopied
, ioBuf
));
87 ScheduleCallHere(call
);
91 Mgr::StoreToCommWriter::noteStoreCopied(StoreIOBuffer ioBuf
)
94 Must(!ioBuf
.flags
.error
);
96 scheduleCommWrite(ioBuf
); // write received action results to client
98 Must(doneAll()); // otherwise, why would Store call us with no data?
102 Mgr::StoreToCommWriter::scheduleCommWrite(const StoreIOBuffer
& ioBuf
)
105 Must(Comm::IsConnOpen(clientConnection
));
106 Must(ioBuf
.data
!= NULL
);
107 // write filled buffer
108 typedef CommCbMemFunT
<StoreToCommWriter
, CommIoCbParams
> MyDialer
;
109 AsyncCall::Pointer writer
=
110 asyncCall(16, 5, "Mgr::StoreToCommWriter::noteCommWrote",
111 MyDialer(this, &StoreToCommWriter::noteCommWrote
));
112 Comm::Write(clientConnection
, ioBuf
.data
, ioBuf
.length
, writer
, NULL
);
116 Mgr::StoreToCommWriter::noteCommWrote(const CommIoCbParams
& params
)
119 Must(params
.flag
== COMM_OK
);
120 Must(clientConnection
!= NULL
&& params
.fd
== clientConnection
->fd
);
121 Must(params
.size
!= 0);
122 writeOffset
+= params
.size
;
124 scheduleStoreCopy(); // retrieve the next data portion
128 Mgr::StoreToCommWriter::noteCommClosed(const CommCloseCbParams
& params
)
131 Must(!Comm::IsConnOpen(clientConnection
));
132 mustStop("commClosed");
136 Mgr::StoreToCommWriter::swanSong()
141 storeUnregister(sc
, entry
, this);
144 entry
->unregisterAbort();
152 Mgr::StoreToCommWriter::doneAll() const
155 entry
->store_status
== STORE_OK
&& // the action is over
156 writeOffset
>= entry
->objectLen(); // we wrote all the results
160 Mgr::StoreToCommWriter::Abort(void* param
)
162 StoreToCommWriter
* mgrWriter
= static_cast<StoreToCommWriter
*>(param
);
163 if (Comm::IsConnOpen(mgrWriter
->clientConnection
))
164 mgrWriter
->clientConnection
->close();
167 Comm::ConnectionPointer
168 Mgr::ImportHttpFdIntoComm(int fd
)
170 Comm::ConnectionPointer result
= new Comm::Connection();
171 struct sockaddr_in addr
;
172 socklen_t len
= sizeof(addr
);
173 if (getsockname(fd
, reinterpret_cast<sockaddr
*>(&addr
), &len
) == 0) {
175 result
->local
= addr
;
176 struct addrinfo
* addr_info
= NULL
;
177 result
->local
.GetAddrInfo(addr_info
);
178 addr_info
->ai_socktype
= SOCK_STREAM
;
179 addr_info
->ai_protocol
= IPPROTO_TCP
;
180 comm_import_opened(result
, Ipc::FdNote(Ipc::fdnHttpSocket
), addr_info
);
181 result
->local
.FreeAddrInfo(addr_info
);
183 debugs(16, DBG_CRITICAL
, HERE
<< "ERROR: FD " << fd
<< ' ' << xstrerror());