]> git.ipfire.org Git - thirdparty/squid.git/blob - src/mgr/StoreToCommWriter.cc
SourceFormat Enforcement
[thirdparty/squid.git] / src / mgr / StoreToCommWriter.cc
1 /*
2 * DEBUG: section 16 Cache Manager API
3 *
4 */
5
6 #include "squid.h"
7 #include "base/TextException.h"
8 #include "comm/Connection.h"
9 #include "comm/Write.h"
10 #include "CommCalls.h"
11 #include "ipc/FdNotes.h"
12 #include "mgr/StoreToCommWriter.h"
13 #include "Store.h"
14 #include "StoreClient.h"
15
// NOTE(review): presumably emits the cbdata (callback-data) bookkeeping
// definitions for Mgr::StoreToCommWriter -- confirm against the macro itself.
16 CBDATA_NAMESPACED_CLASS_INIT(Mgr, StoreToCommWriter);
17
18 Mgr::StoreToCommWriter::StoreToCommWriter(const Comm::ConnectionPointer &conn, StoreEntry* anEntry):
19 AsyncJob("Mgr::StoreToCommWriter"),
20 clientConnection(conn), entry(anEntry), sc(NULL), writeOffset(0), closer(NULL)
21 {
22 debugs(16, 6, HERE << clientConnection);
23 closer = asyncCall(16, 5, "Mgr::StoreToCommWriter::noteCommClosed",
24 CommCbMemFunT<StoreToCommWriter, CommCloseCbParams>(this, &StoreToCommWriter::noteCommClosed));
25 comm_add_close_handler(clientConnection->fd, closer);
26 }
27
28 Mgr::StoreToCommWriter::~StoreToCommWriter()
29 {
30 debugs(16, 6, HERE);
31 assert(!entry);
32 assert(!sc);
33 close();
34 }
35
36 /// closes our copy of the client HTTP connection socket
37 void
38 Mgr::StoreToCommWriter::close()
39 {
40 if (Comm::IsConnOpen(clientConnection)) {
41 if (closer != NULL) {
42 comm_remove_close_handler(clientConnection->fd, closer);
43 closer = NULL;
44 }
45 clientConnection->close();
46 }
47 }
48
49 void
50 Mgr::StoreToCommWriter::start()
51 {
52 debugs(16, 6, HERE);
53 Must(Comm::IsConnOpen(clientConnection));
54 Must(entry != NULL);
55 entry->registerAbort(&StoreToCommWriter::Abort, this);
56 sc = storeClientListAdd(entry, this);
57 Must(sc != NULL);
58
59 // initiate the receive-from-store, write-to-comm sequence
60 scheduleStoreCopy();
61 }
62
63 void
64 Mgr::StoreToCommWriter::scheduleStoreCopy()
65 {
66 debugs(16, 6, HERE);
67 Must(entry != NULL);
68 Must(sc != NULL);
69 StoreIOBuffer readBuf(sizeof(buffer), writeOffset, buffer);
70 storeClientCopy(sc, entry, readBuf, &NoteStoreCopied, this);
71 }
72
73 void
74 Mgr::StoreToCommWriter::NoteStoreCopied(void* data, StoreIOBuffer ioBuf)
75 {
76 Must(data != NULL);
77 // make sync Store call async to get async call protections and features
78 StoreToCommWriter* writer = static_cast<StoreToCommWriter*>(data);
79 typedef UnaryMemFunT<StoreToCommWriter, StoreIOBuffer> MyDialer;
80 AsyncCall::Pointer call =
81 asyncCall(16, 5, "Mgr::StoreToCommWriter::noteStoreCopied",
82 MyDialer(writer, &StoreToCommWriter::noteStoreCopied, ioBuf));
83 ScheduleCallHere(call);
84 }
85
86 void
87 Mgr::StoreToCommWriter::noteStoreCopied(StoreIOBuffer ioBuf)
88 {
89 debugs(16, 6, HERE);
90 Must(!ioBuf.flags.error);
91 if (ioBuf.length > 0)
92 scheduleCommWrite(ioBuf); // write received action results to client
93 else
94 Must(doneAll()); // otherwise, why would Store call us with no data?
95 }
96
97 void
98 Mgr::StoreToCommWriter::scheduleCommWrite(const StoreIOBuffer& ioBuf)
99 {
100 debugs(16, 6, HERE);
101 Must(Comm::IsConnOpen(clientConnection));
102 Must(ioBuf.data != NULL);
103 // write filled buffer
104 typedef CommCbMemFunT<StoreToCommWriter, CommIoCbParams> MyDialer;
105 AsyncCall::Pointer writer =
106 asyncCall(16, 5, "Mgr::StoreToCommWriter::noteCommWrote",
107 MyDialer(this, &StoreToCommWriter::noteCommWrote));
108 Comm::Write(clientConnection, ioBuf.data, ioBuf.length, writer, NULL);
109 }
110
111 void
112 Mgr::StoreToCommWriter::noteCommWrote(const CommIoCbParams& params)
113 {
114 debugs(16, 6, HERE);
115 Must(params.flag == COMM_OK);
116 Must(clientConnection != NULL && params.fd == clientConnection->fd);
117 Must(params.size != 0);
118 writeOffset += params.size;
119 if (!doneAll())
120 scheduleStoreCopy(); // retrieve the next data portion
121 }
122
123 void
124 Mgr::StoreToCommWriter::noteCommClosed(const CommCloseCbParams& params)
125 {
126 debugs(16, 6, HERE);
127 Must(!Comm::IsConnOpen(clientConnection));
128 mustStop("commClosed");
129 }
130
131 void
132 Mgr::StoreToCommWriter::swanSong()
133 {
134 debugs(16, 6, HERE);
135 if (entry != NULL) {
136 if (sc != NULL) {
137 storeUnregister(sc, entry, this);
138 sc = NULL;
139 }
140 entry->unregisterAbort();
141 entry->unlock();
142 entry = NULL;
143 }
144 close();
145 }
146
147 bool
148 Mgr::StoreToCommWriter::doneAll() const
149 {
150 return entry &&
151 entry->store_status == STORE_OK && // the action is over
152 writeOffset >= entry->objectLen(); // we wrote all the results
153 }
154
155 void
156 Mgr::StoreToCommWriter::Abort(void* param)
157 {
158 StoreToCommWriter* mgrWriter = static_cast<StoreToCommWriter*>(param);
159 if (Comm::IsConnOpen(mgrWriter->clientConnection))
160 mgrWriter->clientConnection->close();
161 }