git.ipfire.org Git - thirdparty/squid.git/blob - src/comm/Write.cc
Policy: use USE_* from code wrappers and ENABLE_* for conditionals.
[thirdparty/squid.git] / src / comm / Write.cc
1 #include "config.h"
2 #if USE_DELAY_POOLS
3 #include "ClientInfo.h"
4 #endif
5 #include "comm/IoCallback.h"
6 #include "comm/Write.h"
7 #include "fde.h"
8 #include "SquidTime.h"
9 #include "MemBuf.h"
10
11 void
12 Comm::Write(int fd, MemBuf *mb, AsyncCall::Pointer &callback)
13 {
14 Comm::Write(fd, mb->buf, mb->size, callback, mb->freeFunc());
15 }
16
17 void
18 Comm::Write(int fd, const char *buf, int size, AsyncCall::Pointer &callback, FREE * free_func)
19 {
20 debugs(5, 5, HERE << "FD " << fd << ": sz " << size << ": asynCall " << callback);
21
22 /* Make sure we are open, not closing, and not writing */
23 assert(fd_table[fd].flags.open);
24 assert(!fd_table[fd].closing());
25 Comm::IoCallback *ccb = COMMIO_FD_WRITECB(fd);
26 assert(!ccb->active());
27
28 fd_table[fd].writeStart = squid_curtime;
29 /* Queue the write */
30 ccb->setCallback(IOCB_WRITE, callback, (char *)buf, free_func, size);
31 ccb->selectOrQueueWrite();
32 }
33
34 /** Write to FD.
35 * This function is used by the lowest level of IO loop which only has access to FD numbers.
36 * We have to use the comm iocb_table to map FD numbers to waiting data.
37 * Once the write has been concluded we schedule the waiting call with success/fail results.
38 */
39 void
40 Comm::HandleWrite(int fd, void *data)
41 {
42 Comm::IoCallback *state = static_cast<Comm::IoCallback *>(data);
43 int len = 0;
44 int nleft;
45
46 assert(state->fd == fd);
47
48 PROF_start(commHandleWrite);
49 debugs(5, 5, HERE << "FD " << state->fd << ": off " <<
50 (long int) state->offset << ", sz " << (long int) state->size << ".");
51
52 nleft = state->size - state->offset;
53
54 #if USE_DELAY_POOLS
55 ClientInfo * clientInfo=fd_table[fd].clientInfo;
56
57 if (clientInfo && !clientInfo->writeLimitingActive)
58 clientInfo = NULL; // we only care about quota limits here
59
60 if (clientInfo) {
61 assert(clientInfo->selectWaiting);
62 clientInfo->selectWaiting = false;
63
64 assert(clientInfo->hasQueue());
65 assert(clientInfo->quotaPeekFd() == fd);
66 clientInfo->quotaDequeue(); // we will write or requeue below
67
68 if (nleft > 0) {
69 const int quota = clientInfo->quotaForDequed();
70 if (!quota) { // if no write quota left, queue this fd
71 state->quotaQueueReserv = clientInfo->quotaEnqueue(fd);
72 clientInfo->kickQuotaQueue();
73 PROF_stop(commHandleWrite);
74 return;
75 }
76
77 const int nleft_corrected = min(nleft, quota);
78 if (nleft != nleft_corrected) {
79 debugs(5, 5, HERE << "FD " << fd << " writes only " <<
80 nleft_corrected << " out of " << nleft);
81 nleft = nleft_corrected;
82 }
83
84 }
85 }
86 #endif /* USE_DELAY_POOLS */
87
88 /* actually WRITE data */
89 len = FD_WRITE_METHOD(fd, state->buf + state->offset, nleft);
90 debugs(5, 5, HERE << "write() returns " << len);
91
92 #if USE_DELAY_POOLS
93 if (clientInfo) {
94 if (len > 0) {
95 /* we wrote data - drain them from bucket */
96 clientInfo->bucketSize -= len;
97 if (clientInfo->bucketSize < 0.0) {
98 debugs(5,1, HERE << "drained too much"); // should not happen
99 clientInfo->bucketSize = 0;
100 }
101 }
102
103 // even if we wrote nothing, we were served; give others a chance
104 clientInfo->kickQuotaQueue();
105 }
106 #endif /* USE_DELAY_POOLS */
107
108 fd_bytes(fd, len, FD_WRITE);
109 statCounter.syscalls.sock.writes++;
110 // After each successful partial write,
111 // reset fde::writeStart to the current time.
112 fd_table[fd].writeStart = squid_curtime;
113
114 if (len == 0) {
115 /* Note we even call write if nleft == 0 */
116 /* We're done */
117 if (nleft != 0)
118 debugs(5, DBG_IMPORTANT, "FD " << fd << " write failure: connection closed with " << nleft << " bytes remaining.");
119
120 state->finish(nleft ? COMM_ERROR : COMM_OK, errno);
121 } else if (len < 0) {
122 /* An error */
123 if (fd_table[fd].flags.socket_eof) {
124 debugs(50, 2, HERE << "FD " << fd << " write failure: " << xstrerror() << ".");
125 state->finish(nleft ? COMM_ERROR : COMM_OK, errno);
126 } else if (ignoreErrno(errno)) {
127 debugs(50, 9, HERE << "FD " << fd << " write failure: " << xstrerror() << ".");
128 state->selectOrQueueWrite();
129 } else {
130 debugs(50, 2, HERE << "FD " << fd << " write failure: " << xstrerror() << ".");
131 state->finish(nleft ? COMM_ERROR : COMM_OK, errno);
132 }
133 } else {
134 /* A successful write, continue */
135 state->offset += len;
136
137 if (state->offset < state->size) {
138 /* Not done, reinstall the write handler and write some more */
139 state->selectOrQueueWrite();
140 } else {
141 state->finish(nleft ? COMM_OK : COMM_ERROR, errno);
142 }
143 }
144
145 PROF_stop(commHandleWrite);
146 }