]> git.ipfire.org Git - thirdparty/squid.git/blob - src/mgr/StoreToCommWriter.cc
merge from trunk
[thirdparty/squid.git] / src / mgr / StoreToCommWriter.cc
1 /*
2 * $Id$
3 *
4 * DEBUG: section 16 Cache Manager API
5 *
6 */
7
8 #include "config.h"
9 #include "base/TextException.h"
10 #include "CommCalls.h"
11 #include "comm/Write.h"
12 #include "ipc/FdNotes.h"
13 #include "mgr/StoreToCommWriter.h"
14 #include "StoreClient.h"
15 #include "Store.h"
16
17
18 CBDATA_NAMESPACED_CLASS_INIT(Mgr, StoreToCommWriter);
19
20
21 Mgr::StoreToCommWriter::StoreToCommWriter(int aFd, StoreEntry* anEntry):
22 AsyncJob("Mgr::StoreToCommWriter"),
23 fd(aFd), entry(anEntry), sc(NULL), writeOffset(0), closer(NULL)
24 {
25 debugs(16, 6, HERE << "FD " << fd);
26 closer = asyncCall(16, 5, "Mgr::StoreToCommWriter::noteCommClosed",
27 CommCbMemFunT<StoreToCommWriter, CommCloseCbParams>(this, &StoreToCommWriter::noteCommClosed));
28 comm_add_close_handler(fd, closer);
29 }
30
31 Mgr::StoreToCommWriter::~StoreToCommWriter()
32 {
33 debugs(16, 6, HERE);
34 assert(!entry);
35 assert(!sc);
36 close();
37 }
38
39 /// closes our copy of the client HTTP connection socket
40 void
41 Mgr::StoreToCommWriter::close()
42 {
43 if (fd >= 0) {
44 if (closer != NULL) {
45 comm_remove_close_handler(fd, closer);
46 closer = NULL;
47 }
48 comm_close(fd);
49 fd = -1;
50 }
51 }
52
53 void
54 Mgr::StoreToCommWriter::start()
55 {
56 debugs(16, 6, HERE);
57 Must(fd >= 0);
58 Must(entry != NULL);
59 entry->registerAbort(&StoreToCommWriter::Abort, this);
60 sc = storeClientListAdd(entry, this);
61 Must(sc != NULL);
62
63 // initiate the receive-from-store, write-to-comm sequence
64 scheduleStoreCopy();
65 }
66
67 void
68 Mgr::StoreToCommWriter::scheduleStoreCopy()
69 {
70 debugs(16, 6, HERE);
71 Must(entry != NULL);
72 Must(sc != NULL);
73 StoreIOBuffer readBuf(sizeof(buffer), writeOffset, buffer);
74 storeClientCopy(sc, entry, readBuf, &NoteStoreCopied, this);
75 }
76
77 void
78 Mgr::StoreToCommWriter::NoteStoreCopied(void* data, StoreIOBuffer ioBuf)
79 {
80 Must(data != NULL);
81 // make sync Store call async to get async call protections and features
82 StoreToCommWriter* writer = static_cast<StoreToCommWriter*>(data);
83 typedef UnaryMemFunT<StoreToCommWriter, StoreIOBuffer> MyDialer;
84 AsyncCall::Pointer call =
85 asyncCall(16, 5, "Mgr::StoreToCommWriter::noteStoreCopied",
86 MyDialer(writer, &StoreToCommWriter::noteStoreCopied, ioBuf));
87 ScheduleCallHere(call);
88 }
89
90 void
91 Mgr::StoreToCommWriter::noteStoreCopied(StoreIOBuffer ioBuf)
92 {
93 debugs(16, 6, HERE);
94 Must(!ioBuf.flags.error);
95 if (ioBuf.length > 0)
96 scheduleCommWrite(ioBuf); // write received action results to client
97 else
98 Must(doneAll()); // otherwise, why would Store call us with no data?
99 }
100
101 void
102 Mgr::StoreToCommWriter::scheduleCommWrite(const StoreIOBuffer& ioBuf)
103 {
104 debugs(16, 6, HERE);
105 Must(fd >= 0);
106 Must(ioBuf.data != NULL);
107 // write filled buffer
108 typedef CommCbMemFunT<StoreToCommWriter, CommIoCbParams> MyDialer;
109 AsyncCall::Pointer writer =
110 asyncCall(16, 5, "Mgr::StoreToCommWriter::noteCommWrote",
111 MyDialer(this, &StoreToCommWriter::noteCommWrote));
112 Comm::Write(fd, ioBuf.data, ioBuf.length, writer, NULL);
113 }
114
115 void
116 Mgr::StoreToCommWriter::noteCommWrote(const CommIoCbParams& params)
117 {
118 debugs(16, 6, HERE);
119 Must(params.flag == COMM_OK);
120 Must(params.fd == fd);
121 Must(params.size != 0);
122 writeOffset += params.size;
123 if (!doneAll())
124 scheduleStoreCopy(); // retrieve the next data portion
125 }
126
127 void
128 Mgr::StoreToCommWriter::noteCommClosed(const CommCloseCbParams& params)
129 {
130 debugs(16, 6, HERE);
131 Must(fd == params.fd);
132 fd = -1;
133 mustStop("commClosed");
134 }
135
136 void
137 Mgr::StoreToCommWriter::swanSong()
138 {
139 debugs(16, 6, HERE);
140 if (entry != NULL) {
141 if (sc != NULL) {
142 storeUnregister(sc, entry, this);
143 sc = NULL;
144 }
145 entry->unregisterAbort();
146 entry->unlock();
147 entry = NULL;
148 }
149 close();
150 }
151
152 bool
153 Mgr::StoreToCommWriter::doneAll() const
154 {
155 return entry &&
156 entry->store_status == STORE_OK && // the action is over
157 writeOffset >= entry->objectLen(); // we wrote all the results
158 }
159
160 void
161 Mgr::StoreToCommWriter::Abort(void* param)
162 {
163 StoreToCommWriter* mgrWriter = static_cast<StoreToCommWriter*>(param);
164 if (mgrWriter->fd >= 0)
165 comm_close(mgrWriter->fd);
166 }