// git.ipfire.org Git - thirdparty/squid.git/blob - src/mgr/StoreToCommWriter.cc
/*
 * Copyright (C) 1996-2023 The Squid Software Foundation and contributors
 *
 * Squid software is distributed under GPLv2+ license and includes
 * contributions from numerous individuals and organizations.
 * Please see the COPYING and CONTRIBUTORS files for details.
 */

/* DEBUG: section 16    Cache Manager API */
12 #include "base/AsyncCbdataCalls.h"
13 #include "base/TextException.h"
15 #include "comm/Connection.h"
16 #include "comm/Write.h"
17 #include "CommCalls.h"
18 #include "ipc/FdNotes.h"
19 #include "mgr/StoreToCommWriter.h"
21 #include "StoreClient.h"
23 Mgr::StoreToCommWriter::StoreToCommWriter(const Comm::ConnectionPointer
&conn
, StoreEntry
* anEntry
):
24 AsyncJob("Mgr::StoreToCommWriter"),
25 clientConnection(conn
), entry(anEntry
), sc(nullptr), writeOffset(0), closer(nullptr)
27 debugs(16, 6, clientConnection
);
28 closer
= asyncCall(16, 5, "Mgr::StoreToCommWriter::noteCommClosed",
29 CommCbMemFunT
<StoreToCommWriter
, CommCloseCbParams
>(this, &StoreToCommWriter::noteCommClosed
));
30 comm_add_close_handler(clientConnection
->fd
, closer
);
33 Mgr::StoreToCommWriter::~StoreToCommWriter()
35 debugs(16, 6, MYNAME
);
41 /// closes our copy of the client HTTP connection socket
43 Mgr::StoreToCommWriter::close()
45 if (Comm::IsConnOpen(clientConnection
)) {
46 if (closer
!= nullptr) {
47 comm_remove_close_handler(clientConnection
->fd
, closer
);
50 clientConnection
->close();
55 Mgr::StoreToCommWriter::start()
57 debugs(16, 6, MYNAME
);
58 Must(Comm::IsConnOpen(clientConnection
));
59 Must(entry
!= nullptr);
60 AsyncCall::Pointer call
= asyncCall(16, 4, "StoreToCommWriter::Abort", cbdataDialer(&StoreToCommWriter::HandleStoreAbort
, this));
61 entry
->registerAbortCallback(call
);
62 sc
= storeClientListAdd(entry
, this);
65 // initiate the receive-from-store, write-to-comm sequence
70 Mgr::StoreToCommWriter::scheduleStoreCopy()
72 debugs(16, 6, MYNAME
);
73 Must(entry
!= nullptr);
75 StoreIOBuffer
readBuf(sizeof(buffer
), writeOffset
, buffer
);
76 storeClientCopy(sc
, entry
, readBuf
, &NoteStoreCopied
, this);
80 Mgr::StoreToCommWriter::NoteStoreCopied(void* data
, StoreIOBuffer ioBuf
)
82 Must(data
!= nullptr);
83 // make sync Store call async to get async call protections and features
84 StoreToCommWriter
* writer
= static_cast<StoreToCommWriter
*>(data
);
85 typedef UnaryMemFunT
<StoreToCommWriter
, StoreIOBuffer
> MyDialer
;
86 AsyncCall::Pointer call
=
87 asyncCall(16, 5, "Mgr::StoreToCommWriter::noteStoreCopied",
88 MyDialer(writer
, &StoreToCommWriter::noteStoreCopied
, ioBuf
));
89 ScheduleCallHere(call
);
93 Mgr::StoreToCommWriter::noteStoreCopied(StoreIOBuffer ioBuf
)
95 debugs(16, 6, MYNAME
);
96 Must(!ioBuf
.flags
.error
);
98 scheduleCommWrite(ioBuf
); // write received action results to client
100 Must(doneAll()); // otherwise, why would Store call us with no data?
104 Mgr::StoreToCommWriter::scheduleCommWrite(const StoreIOBuffer
& ioBuf
)
106 debugs(16, 6, MYNAME
);
107 Must(Comm::IsConnOpen(clientConnection
));
108 Must(ioBuf
.data
!= nullptr);
109 // write filled buffer
110 typedef CommCbMemFunT
<StoreToCommWriter
, CommIoCbParams
> MyDialer
;
111 AsyncCall::Pointer writer
=
112 asyncCall(16, 5, "Mgr::StoreToCommWriter::noteCommWrote",
113 MyDialer(this, &StoreToCommWriter::noteCommWrote
));
114 Comm::Write(clientConnection
, ioBuf
.data
, ioBuf
.length
, writer
, nullptr);
118 Mgr::StoreToCommWriter::noteCommWrote(const CommIoCbParams
& params
)
120 debugs(16, 6, MYNAME
);
121 Must(params
.flag
== Comm::OK
);
122 Must(clientConnection
!= nullptr && params
.fd
== clientConnection
->fd
);
123 Must(params
.size
!= 0);
124 writeOffset
+= params
.size
;
126 scheduleStoreCopy(); // retrieve the next data portion
130 Mgr::StoreToCommWriter::noteCommClosed(const CommCloseCbParams
&)
132 debugs(16, 6, MYNAME
);
133 if (clientConnection
) {
134 clientConnection
->noteClosure();
135 clientConnection
= nullptr;
138 mustStop("commClosed");
142 Mgr::StoreToCommWriter::swanSong()
144 debugs(16, 6, MYNAME
);
145 if (entry
!= nullptr) {
147 storeUnregister(sc
, entry
, this);
150 entry
->unregisterAbortCallback("StoreToCommWriter done");
151 entry
->unlock("Mgr::StoreToCommWriter::swanSong");
158 Mgr::StoreToCommWriter::doneAll() const
161 entry
->store_status
== STORE_OK
&& // the action is over
162 writeOffset
>= entry
->objectLen(); // we wrote all the results
166 Mgr::StoreToCommWriter::HandleStoreAbort(StoreToCommWriter
*mgrWriter
)
168 if (Comm::IsConnOpen(mgrWriter
->clientConnection
))
169 mgrWriter
->clientConnection
->close();