]> git.ipfire.org Git - thirdparty/squid.git/blob - src/comm/Read.cc
Update the Comm:: API for read(2)
[thirdparty/squid.git] / src / comm / Read.cc
1 /*
2 * DEBUG: section 05 Socket Functions
3 */
4 #include "squid.h"
5 #include "comm.h"
6 #include "comm_internal.h"
7 #include "CommCalls.h"
8 #include "comm/IoCallback.h"
9 #include "comm/Loops.h"
10 #include "comm/Read.h"
11 #include "Debug.h"
12 #include "fd.h"
13 #include "fde.h"
14 #include "SBuf.h"
15 #include "StatCounters.h"
16 //#include "tools.h"
17
18 // Does comm check this fd for read readiness?
19 // Note that when comm is not monitoring, there can be a pending callback
20 // call, which may resume comm monitoring once fired.
21 bool
22 Comm::MonitorsRead(int fd)
23 {
24 assert(isOpen(fd) && COMMIO_FD_READCB(fd));
25 // Being active is usually the same as monitoring because we always
26 // start monitoring the FD when we configure Comm::IoCallback for I/O
27 // and we usually configure Comm::IoCallback for I/O when we starting
28 // monitoring a FD for reading.
29 return COMMIO_FD_READCB(fd)->active();
30 }
31
32 void
33 Comm::Read(const Comm::ConnectionPointer &conn, AsyncCall::Pointer &callback)
34 {
35 // TODO: move comm_read_base() internals into here
36 // when comm_read() char* API is no longer needed
37 comm_read_base(conn, NULL, 0, callback);
38 }
39
40 /**
41 * Queue a read.
42 * If a buffer is given the callback is scheduled when the read
43 * completes, on error, or on file descriptor close.
44 *
45 * If no buffer (NULL) is given the callback is scheduled when
46 * the socket FD is ready for a read(2)/recv(2).
47 */
48 void
49 comm_read_base(const Comm::ConnectionPointer &conn, char *buf, int size, AsyncCall::Pointer &callback)
50 {
51 debugs(5, 5, "comm_read, queueing read for " << conn << "; asynCall " << callback);
52
53 /* Make sure we are open and not closing */
54 assert(Comm::IsConnOpen(conn));
55 assert(!fd_table[conn->fd].closing());
56 Comm::IoCallback *ccb = COMMIO_FD_READCB(conn->fd);
57
58 // Make sure we are either not reading or just passively monitoring.
59 // Active/passive conflicts are OK and simply cancel passive monitoring.
60 if (ccb->active()) {
61 // if the assertion below fails, we have an active comm_read conflict
62 assert(fd_table[conn->fd].halfClosedReader != NULL);
63 commStopHalfClosedMonitor(conn->fd);
64 assert(!ccb->active());
65 }
66 ccb->conn = conn;
67
68 /* Queue the read */
69 ccb->setCallback(Comm::IOCB_READ, callback, (char *)buf, NULL, size);
70 Comm::SetSelect(conn->fd, COMM_SELECT_READ, Comm::HandleRead, ccb, 0);
71 }
72
73 comm_err_t
74 Comm::ReadNow(CommIoCbParams &params, SBuf &buf)
75 {
76 /* Attempt a read */
77 ++ statCounter.syscalls.sock.reads;
78 const SBuf::size_type sz = buf.spaceSize();
79 char *theBuf = buf.rawSpace(sz);
80 errno = 0;
81 const int retval = FD_READ_METHOD(params.conn->fd, theBuf, sz);
82 params.xerrno = errno;
83
84 debugs(5, 3, params.conn << ", size " << sz << ", retval " << retval << ", errno " << params.xerrno);
85
86 if (retval > 0) { // data read most common case
87 buf.append(theBuf, retval);
88 fd_bytes(params.conn->fd, retval, FD_READ);
89 params.flag = COMM_OK;
90 params.size = retval;
91
92 } else if (retval == 0) { // remote closure (somewhat less) common
93 // Note - read 0 == socket EOF, which is a valid read.
94 params.flag = COMM_EOF;
95
96 } else if (retval < 0) { // connection errors are worst-case
97 debugs(5, 3, params.conn << " COMM_ERROR: " << xstrerr(params.xerrno));
98 if (ignoreErrno(params.xerrno))
99 params.flag = COMM_INPROGRESS;
100 else
101 params.flag = COMM_ERROR;
102 }
103
104 return params.flag;
105 }
106
107 /**
108 * Handle an FD which is ready for read(2).
109 *
110 * If there is no provided buffer to fill call the callback.
111 *
112 * Otherwise attempt a read into the provided buffer.
113 * If the read attempt succeeds or fails, call the callback.
114 * Else, wait for another IO notification.
115 */
116 void
117 Comm::HandleRead(int fd, void *data)
118 {
119 Comm::IoCallback *ccb = (Comm::IoCallback *) data;
120
121 assert(data == COMMIO_FD_READCB(fd));
122 assert(ccb->active());
123
124 // without a buffer, just call back
125 if (!ccb->buf) {
126 ccb->finish(COMM_OK, 0);
127 return;
128 }
129
130 /* For legacy callers : Attempt a read */
131 // Keep in sync with Comm::ReadNow()!
132 ++ statCounter.syscalls.sock.reads;
133 errno = 0;
134 int retval = FD_READ_METHOD(fd, ccb->buf, ccb->size);
135 debugs(5, 3, "FD " << fd << ", size " << ccb->size << ", retval " << retval << ", errno " << errno);
136
137 /* See if we read anything */
138 /* Note - read 0 == socket EOF, which is a valid read */
139 if (retval >= 0) {
140 fd_bytes(fd, retval, FD_READ);
141 ccb->offset = retval;
142 ccb->finish(COMM_OK, errno);
143 return;
144
145 } else if (retval < 0 && !ignoreErrno(errno)) {
146 debugs(5, 3, "comm_read_try: scheduling COMM_ERROR");
147 ccb->offset = 0;
148 ccb->finish(COMM_ERROR, errno);
149 return;
150 };
151
152
153 /* Nope, register for some more IO */
154 Comm::SetSelect(fd, COMM_SELECT_READ, Comm::HandleRead, data, 0);
155 }
156
157 /**
158 * Cancel a pending read. Assert that we have the right parameters,
159 * and that there are no pending read events!
160 *
161 * XXX: We do not assert that there are no pending read events and
162 * with async calls it becomes even more difficult.
163 * The whole interface should be reworked to do callback->cancel()
164 * instead of searching for places where the callback may be stored and
165 * updating the state of those places.
166 *
167 * AHC Don't call the comm handlers?
168 */
169 void
170 comm_read_cancel(int fd, IOCB *callback, void *data)
171 {
172 if (!isOpen(fd)) {
173 debugs(5, 4, "fails: FD " << fd << " closed");
174 return;
175 }
176
177 Comm::IoCallback *cb = COMMIO_FD_READCB(fd);
178 // TODO: is "active" == "monitors FD"?
179 if (!cb->active()) {
180 debugs(5, 4, "fails: FD " << fd << " inactive");
181 return;
182 }
183
184 typedef CommCbFunPtrCallT<CommIoCbPtrFun> Call;
185 Call *call = dynamic_cast<Call*>(cb->callback.getRaw());
186 if (!call) {
187 debugs(5, 4, "fails: FD " << fd << " lacks callback");
188 return;
189 }
190
191 call->cancel("old comm_read_cancel");
192
193 typedef CommIoCbParams Params;
194 const Params &params = GetCommParams<Params>(cb->callback);
195
196 /* Ok, we can be reasonably sure we won't lose any data here! */
197 assert(call->dialer.handler == callback);
198 assert(params.data == data);
199
200 /* Delete the callback */
201 cb->cancel("old comm_read_cancel");
202
203 /* And the IO event */
204 Comm::SetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0);
205 }
206
207 void
208 Comm::ReadCancel(int fd, AsyncCall::Pointer &callback)
209 {
210 callback->cancel("comm_read_cancel");
211
212 if (!isOpen(fd)) {
213 debugs(5, 4, "fails: FD " << fd << " closed");
214 return;
215 }
216
217 Comm::IoCallback *cb = COMMIO_FD_READCB(fd);
218
219 if (!cb->active()) {
220 debugs(5, 4, "fails: FD " << fd << " inactive");
221 return;
222 }
223
224 AsyncCall::Pointer call = cb->callback;
225
226 /* Ok, we can be reasonably sure we won't lose any data here! */
227 assert(call == callback);
228
229 /* Delete the callback */
230 cb->cancel("comm_read_cancel");
231
232 /* And the IO event */
233 Comm::SetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0);
234 }