]> git.ipfire.org Git - thirdparty/squid.git/blob - src/mgr/StoreToCommWriter.cc
5764ec7e53a0eb8e59d252ae9d0603be01422c14
[thirdparty/squid.git] / src / mgr / StoreToCommWriter.cc
1 /*
2 * $Id$
3 *
4 * DEBUG: section 16 Cache Manager API
5 *
6 */
7
8 #include "squid.h"
9 #include "base/TextException.h"
10 #include "comm/Connection.h"
11 #include "CommCalls.h"
12 #include "comm/Write.h"
13 #include "ipc/FdNotes.h"
14 #include "mgr/StoreToCommWriter.h"
15 #include "StoreClient.h"
16 #include "Store.h"
17
18
/// NOTE(review): presumably supplies the cbdata bookkeeping for
/// Mgr::StoreToCommWriter, whose address is handed to Store as raw callback
/// data elsewhere in this file -- confirm against the macro definition.
19 CBDATA_NAMESPACED_CLASS_INIT(Mgr, StoreToCommWriter);
20
21
22 Mgr::StoreToCommWriter::StoreToCommWriter(const Comm::ConnectionPointer &conn, StoreEntry* anEntry):
23 AsyncJob("Mgr::StoreToCommWriter"),
24 clientConnection(conn), entry(anEntry), sc(NULL), writeOffset(0), closer(NULL)
25 {
26 debugs(16, 6, HERE << clientConnection);
27 closer = asyncCall(16, 5, "Mgr::StoreToCommWriter::noteCommClosed",
28 CommCbMemFunT<StoreToCommWriter, CommCloseCbParams>(this, &StoreToCommWriter::noteCommClosed));
29 comm_add_close_handler(clientConnection->fd, closer);
30 }
31
32 Mgr::StoreToCommWriter::~StoreToCommWriter()
33 {
34 debugs(16, 6, HERE);
35 assert(!entry);
36 assert(!sc);
37 close();
38 }
39
40 /// closes our copy of the client HTTP connection socket
41 void
42 Mgr::StoreToCommWriter::close()
43 {
44 if (Comm::IsConnOpen(clientConnection)) {
45 if (closer != NULL) {
46 comm_remove_close_handler(clientConnection->fd, closer);
47 closer = NULL;
48 }
49 clientConnection->close();
50 }
51 }
52
53 void
54 Mgr::StoreToCommWriter::start()
55 {
56 debugs(16, 6, HERE);
57 Must(Comm::IsConnOpen(clientConnection));
58 Must(entry != NULL);
59 entry->registerAbort(&StoreToCommWriter::Abort, this);
60 sc = storeClientListAdd(entry, this);
61 Must(sc != NULL);
62
63 // initiate the receive-from-store, write-to-comm sequence
64 scheduleStoreCopy();
65 }
66
67 void
68 Mgr::StoreToCommWriter::scheduleStoreCopy()
69 {
70 debugs(16, 6, HERE);
71 Must(entry != NULL);
72 Must(sc != NULL);
73 StoreIOBuffer readBuf(sizeof(buffer), writeOffset, buffer);
74 storeClientCopy(sc, entry, readBuf, &NoteStoreCopied, this);
75 }
76
77 void
78 Mgr::StoreToCommWriter::NoteStoreCopied(void* data, StoreIOBuffer ioBuf)
79 {
80 Must(data != NULL);
81 // make sync Store call async to get async call protections and features
82 StoreToCommWriter* writer = static_cast<StoreToCommWriter*>(data);
83 typedef UnaryMemFunT<StoreToCommWriter, StoreIOBuffer> MyDialer;
84 AsyncCall::Pointer call =
85 asyncCall(16, 5, "Mgr::StoreToCommWriter::noteStoreCopied",
86 MyDialer(writer, &StoreToCommWriter::noteStoreCopied, ioBuf));
87 ScheduleCallHere(call);
88 }
89
90 void
91 Mgr::StoreToCommWriter::noteStoreCopied(StoreIOBuffer ioBuf)
92 {
93 debugs(16, 6, HERE);
94 Must(!ioBuf.flags.error);
95 if (ioBuf.length > 0)
96 scheduleCommWrite(ioBuf); // write received action results to client
97 else
98 Must(doneAll()); // otherwise, why would Store call us with no data?
99 }
100
101 void
102 Mgr::StoreToCommWriter::scheduleCommWrite(const StoreIOBuffer& ioBuf)
103 {
104 debugs(16, 6, HERE);
105 Must(Comm::IsConnOpen(clientConnection));
106 Must(ioBuf.data != NULL);
107 // write filled buffer
108 typedef CommCbMemFunT<StoreToCommWriter, CommIoCbParams> MyDialer;
109 AsyncCall::Pointer writer =
110 asyncCall(16, 5, "Mgr::StoreToCommWriter::noteCommWrote",
111 MyDialer(this, &StoreToCommWriter::noteCommWrote));
112 Comm::Write(clientConnection, ioBuf.data, ioBuf.length, writer, NULL);
113 }
114
115 void
116 Mgr::StoreToCommWriter::noteCommWrote(const CommIoCbParams& params)
117 {
118 debugs(16, 6, HERE);
119 Must(params.flag == COMM_OK);
120 Must(clientConnection != NULL && params.fd == clientConnection->fd);
121 Must(params.size != 0);
122 writeOffset += params.size;
123 if (!doneAll())
124 scheduleStoreCopy(); // retrieve the next data portion
125 }
126
127 void
128 Mgr::StoreToCommWriter::noteCommClosed(const CommCloseCbParams& params)
129 {
130 debugs(16, 6, HERE);
131 Must(!Comm::IsConnOpen(clientConnection));
132 mustStop("commClosed");
133 }
134
135 void
136 Mgr::StoreToCommWriter::swanSong()
137 {
138 debugs(16, 6, HERE);
139 if (entry != NULL) {
140 if (sc != NULL) {
141 storeUnregister(sc, entry, this);
142 sc = NULL;
143 }
144 entry->unregisterAbort();
145 entry->unlock();
146 entry = NULL;
147 }
148 close();
149 }
150
151 bool
152 Mgr::StoreToCommWriter::doneAll() const
153 {
154 return entry &&
155 entry->store_status == STORE_OK && // the action is over
156 writeOffset >= entry->objectLen(); // we wrote all the results
157 }
158
159 void
160 Mgr::StoreToCommWriter::Abort(void* param)
161 {
162 StoreToCommWriter* mgrWriter = static_cast<StoreToCommWriter*>(param);
163 if (Comm::IsConnOpen(mgrWriter->clientConnection))
164 mgrWriter->clientConnection->close();
165 }