git.ipfire.org Git - thirdparty/squid.git/blob - src/mgr/StoreToCommWriter.cc
Preserve caller context across (and improve) deferred reads (#1025)
[thirdparty/squid.git] / src / mgr / StoreToCommWriter.cc
1 /*
2 * Copyright (C) 1996-2022 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9 /* DEBUG: section 16 Cache Manager API */
10
11 #include "squid.h"
12 #include "base/AsyncCbdataCalls.h"
13 #include "base/TextException.h"
14 #include "comm.h"
15 #include "comm/Connection.h"
16 #include "comm/Write.h"
17 #include "CommCalls.h"
18 #include "ipc/FdNotes.h"
19 #include "mgr/StoreToCommWriter.h"
20 #include "Store.h"
21 #include "StoreClient.h"
22
// NOTE(review): presumably emits the cbdata bookkeeping definitions for
// Mgr::StoreToCommWriter; the macro itself is defined elsewhere -- confirm.
23 CBDATA_NAMESPACED_CLASS_INIT(Mgr, StoreToCommWriter);
24
25 Mgr::StoreToCommWriter::StoreToCommWriter(const Comm::ConnectionPointer &conn, StoreEntry* anEntry):
26 AsyncJob("Mgr::StoreToCommWriter"),
27 clientConnection(conn), entry(anEntry), sc(NULL), writeOffset(0), closer(NULL)
28 {
29 debugs(16, 6, clientConnection);
30 closer = asyncCall(16, 5, "Mgr::StoreToCommWriter::noteCommClosed",
31 CommCbMemFunT<StoreToCommWriter, CommCloseCbParams>(this, &StoreToCommWriter::noteCommClosed));
32 comm_add_close_handler(clientConnection->fd, closer);
33 }
34
/// Checks that the Store entry and Store client pointers have already been
/// reset (swanSong() resets both) and then closes the client connection
/// if it is still open.
35 Mgr::StoreToCommWriter::~StoreToCommWriter()
36 {
37 debugs(16, 6, MYNAME);
// a non-nil entry or sc at this point is treated as a programming error
38 assert(!entry);
39 assert(!sc);
// close() does nothing unless the connection is still open
40 close();
41 }
42
43 /// closes our copy of the client HTTP connection socket
44 void
45 Mgr::StoreToCommWriter::close()
46 {
47 if (Comm::IsConnOpen(clientConnection)) {
48 if (closer != NULL) {
49 comm_remove_close_handler(clientConnection->fd, closer);
50 closer = NULL;
51 }
52 clientConnection->close();
53 }
54 }
55
56 void
57 Mgr::StoreToCommWriter::start()
58 {
59 debugs(16, 6, MYNAME);
60 Must(Comm::IsConnOpen(clientConnection));
61 Must(entry != NULL);
62 AsyncCall::Pointer call = asyncCall(16, 4, "StoreToCommWriter::Abort", cbdataDialer(&StoreToCommWriter::HandleStoreAbort, this));
63 entry->registerAbortCallback(call);
64 sc = storeClientListAdd(entry, this);
65 Must(sc != NULL);
66
67 // initiate the receive-from-store, write-to-comm sequence
68 scheduleStoreCopy();
69 }
70
71 void
72 Mgr::StoreToCommWriter::scheduleStoreCopy()
73 {
74 debugs(16, 6, MYNAME);
75 Must(entry != NULL);
76 Must(sc != NULL);
77 StoreIOBuffer readBuf(sizeof(buffer), writeOffset, buffer);
78 storeClientCopy(sc, entry, readBuf, &NoteStoreCopied, this);
79 }
80
81 void
82 Mgr::StoreToCommWriter::NoteStoreCopied(void* data, StoreIOBuffer ioBuf)
83 {
84 Must(data != NULL);
85 // make sync Store call async to get async call protections and features
86 StoreToCommWriter* writer = static_cast<StoreToCommWriter*>(data);
87 typedef UnaryMemFunT<StoreToCommWriter, StoreIOBuffer> MyDialer;
88 AsyncCall::Pointer call =
89 asyncCall(16, 5, "Mgr::StoreToCommWriter::noteStoreCopied",
90 MyDialer(writer, &StoreToCommWriter::noteStoreCopied, ioBuf));
91 ScheduleCallHere(call);
92 }
93
94 void
95 Mgr::StoreToCommWriter::noteStoreCopied(StoreIOBuffer ioBuf)
96 {
97 debugs(16, 6, MYNAME);
98 Must(!ioBuf.flags.error);
99 if (ioBuf.length > 0)
100 scheduleCommWrite(ioBuf); // write received action results to client
101 else
102 Must(doneAll()); // otherwise, why would Store call us with no data?
103 }
104
105 void
106 Mgr::StoreToCommWriter::scheduleCommWrite(const StoreIOBuffer& ioBuf)
107 {
108 debugs(16, 6, MYNAME);
109 Must(Comm::IsConnOpen(clientConnection));
110 Must(ioBuf.data != NULL);
111 // write filled buffer
112 typedef CommCbMemFunT<StoreToCommWriter, CommIoCbParams> MyDialer;
113 AsyncCall::Pointer writer =
114 asyncCall(16, 5, "Mgr::StoreToCommWriter::noteCommWrote",
115 MyDialer(this, &StoreToCommWriter::noteCommWrote));
116 Comm::Write(clientConnection, ioBuf.data, ioBuf.length, writer, NULL);
117 }
118
119 void
120 Mgr::StoreToCommWriter::noteCommWrote(const CommIoCbParams& params)
121 {
122 debugs(16, 6, MYNAME);
123 Must(params.flag == Comm::OK);
124 Must(clientConnection != NULL && params.fd == clientConnection->fd);
125 Must(params.size != 0);
126 writeOffset += params.size;
127 if (!doneAll())
128 scheduleStoreCopy(); // retrieve the next data portion
129 }
130
131 void
132 Mgr::StoreToCommWriter::noteCommClosed(const CommCloseCbParams &)
133 {
134 debugs(16, 6, MYNAME);
135 if (clientConnection) {
136 clientConnection->noteClosure();
137 clientConnection = nullptr;
138 }
139 closer = nullptr;
140 mustStop("commClosed");
141 }
142
143 void
144 Mgr::StoreToCommWriter::swanSong()
145 {
146 debugs(16, 6, MYNAME);
147 if (entry != NULL) {
148 if (sc != NULL) {
149 storeUnregister(sc, entry, this);
150 sc = NULL;
151 }
152 entry->unregisterAbortCallback("StoreToCommWriter done");
153 entry->unlock("Mgr::StoreToCommWriter::swanSong");
154 entry = NULL;
155 }
156 close();
157 }
158
159 bool
160 Mgr::StoreToCommWriter::doneAll() const
161 {
162 return entry &&
163 entry->store_status == STORE_OK && // the action is over
164 writeOffset >= entry->objectLen(); // we wrote all the results
165 }
166
167 void
168 Mgr::StoreToCommWriter::HandleStoreAbort(StoreToCommWriter *mgrWriter)
169 {
170 if (Comm::IsConnOpen(mgrWriter->clientConnection))
171 mgrWriter->clientConnection->close();
172 }
173