]>
Commit | Line | Data |
---|---|---|
f7f3304a | 1 | #include "squid.h" |
9a0a18de | 2 | #if USE_DELAY_POOLS |
ec41b64c AJ |
3 | #include "ClientInfo.h" |
4 | #endif | |
b0388924 | 5 | #include "comm/Connection.h" |
ec41b64c AJ |
6 | #include "comm/IoCallback.h" |
7 | #include "comm/Write.h" | |
8 | #include "fde.h" | |
582c2af2 | 9 | #include "globals.h" |
e4f1fdae | 10 | #include "StatCounters.h" |
ec41b64c AJ |
11 | #include "SquidTime.h" |
12 | #include "MemBuf.h" | |
582c2af2 FC |
13 | #include "profiler/Profiler.h" |
14 | #include "protos.h" | |
ec41b64c AJ |
15 | |
16 | void | |
b0388924 | 17 | Comm::Write(const Comm::ConnectionPointer &conn, MemBuf *mb, AsyncCall::Pointer &callback) |
ec41b64c | 18 | { |
b0388924 | 19 | Comm::Write(conn, mb->buf, mb->size, callback, mb->freeFunc()); |
ec41b64c AJ |
20 | } |
21 | ||
22 | void | |
b0388924 | 23 | Comm::Write(const Comm::ConnectionPointer &conn, const char *buf, int size, AsyncCall::Pointer &callback, FREE * free_func) |
ec41b64c | 24 | { |
b0388924 | 25 | debugs(5, 5, HERE << conn << ": sz " << size << ": asynCall " << callback); |
ec41b64c AJ |
26 | |
27 | /* Make sure we are open, not closing, and not writing */ | |
b0388924 AJ |
28 | assert(fd_table[conn->fd].flags.open); |
29 | assert(!fd_table[conn->fd].closing()); | |
30 | Comm::IoCallback *ccb = COMMIO_FD_WRITECB(conn->fd); | |
ec41b64c AJ |
31 | assert(!ccb->active()); |
32 | ||
b0388924 AJ |
33 | fd_table[conn->fd].writeStart = squid_curtime; |
34 | ccb->conn = conn; | |
ec41b64c AJ |
35 | /* Queue the write */ |
36 | ccb->setCallback(IOCB_WRITE, callback, (char *)buf, free_func, size); | |
37 | ccb->selectOrQueueWrite(); | |
38 | } | |
39 | ||
40 | /** Write to FD. | |
41 | * This function is used by the lowest level of IO loop which only has access to FD numbers. | |
b0388924 | 42 | * We have to use the comm iocb_table to map FD numbers to waiting data and Comm::Connections. |
ec41b64c AJ |
43 | * Once the write has been concluded we schedule the waiting call with success/fail results. |
44 | */ | |
45 | void | |
46 | Comm::HandleWrite(int fd, void *data) | |
47 | { | |
48 | Comm::IoCallback *state = static_cast<Comm::IoCallback *>(data); | |
49 | int len = 0; | |
50 | int nleft; | |
51 | ||
b0388924 | 52 | assert(state->conn != NULL && state->conn->fd == fd); |
ec41b64c AJ |
53 | |
54 | PROF_start(commHandleWrite); | |
b0388924 | 55 | debugs(5, 5, HERE << state->conn << ": off " << |
ec41b64c AJ |
56 | (long int) state->offset << ", sz " << (long int) state->size << "."); |
57 | ||
58 | nleft = state->size - state->offset; | |
59 | ||
9a0a18de | 60 | #if USE_DELAY_POOLS |
ec41b64c AJ |
61 | ClientInfo * clientInfo=fd_table[fd].clientInfo; |
62 | ||
63 | if (clientInfo && !clientInfo->writeLimitingActive) | |
64 | clientInfo = NULL; // we only care about quota limits here | |
65 | ||
66 | if (clientInfo) { | |
67 | assert(clientInfo->selectWaiting); | |
68 | clientInfo->selectWaiting = false; | |
69 | ||
70 | assert(clientInfo->hasQueue()); | |
71 | assert(clientInfo->quotaPeekFd() == fd); | |
72 | clientInfo->quotaDequeue(); // we will write or requeue below | |
73 | ||
f31be4b0 | 74 | if (nleft > 0) { |
ec41b64c AJ |
75 | const int quota = clientInfo->quotaForDequed(); |
76 | if (!quota) { // if no write quota left, queue this fd | |
77 | state->quotaQueueReserv = clientInfo->quotaEnqueue(fd); | |
78 | clientInfo->kickQuotaQueue(); | |
79 | PROF_stop(commHandleWrite); | |
80 | return; | |
81 | } | |
82 | ||
83 | const int nleft_corrected = min(nleft, quota); | |
84 | if (nleft != nleft_corrected) { | |
b0388924 | 85 | debugs(5, 5, HERE << state->conn << " writes only " << |
ec41b64c AJ |
86 | nleft_corrected << " out of " << nleft); |
87 | nleft = nleft_corrected; | |
88 | } | |
89 | ||
90 | } | |
91 | } | |
9a0a18de | 92 | #endif /* USE_DELAY_POOLS */ |
ec41b64c AJ |
93 | |
94 | /* actually WRITE data */ | |
95 | len = FD_WRITE_METHOD(fd, state->buf + state->offset, nleft); | |
96 | debugs(5, 5, HERE << "write() returns " << len); | |
97 | ||
9a0a18de | 98 | #if USE_DELAY_POOLS |
ec41b64c AJ |
99 | if (clientInfo) { |
100 | if (len > 0) { | |
101 | /* we wrote data - drain them from bucket */ | |
102 | clientInfo->bucketSize -= len; | |
103 | if (clientInfo->bucketSize < 0.0) { | |
e0236918 | 104 | debugs(5, DBG_IMPORTANT, HERE << "drained too much"); // should not happen |
ec41b64c AJ |
105 | clientInfo->bucketSize = 0; |
106 | } | |
107 | } | |
108 | ||
109 | // even if we wrote nothing, we were served; give others a chance | |
110 | clientInfo->kickQuotaQueue(); | |
111 | } | |
9a0a18de | 112 | #endif /* USE_DELAY_POOLS */ |
ec41b64c AJ |
113 | |
114 | fd_bytes(fd, len, FD_WRITE); | |
e4f1fdae | 115 | ++statCounter.syscalls.sock.writes; |
ec41b64c AJ |
116 | // After each successful partial write, |
117 | // reset fde::writeStart to the current time. | |
118 | fd_table[fd].writeStart = squid_curtime; | |
119 | ||
120 | if (len == 0) { | |
121 | /* Note we even call write if nleft == 0 */ | |
122 | /* We're done */ | |
123 | if (nleft != 0) | |
124 | debugs(5, DBG_IMPORTANT, "FD " << fd << " write failure: connection closed with " << nleft << " bytes remaining."); | |
125 | ||
126 | state->finish(nleft ? COMM_ERROR : COMM_OK, errno); | |
127 | } else if (len < 0) { | |
128 | /* An error */ | |
129 | if (fd_table[fd].flags.socket_eof) { | |
130 | debugs(50, 2, HERE << "FD " << fd << " write failure: " << xstrerror() << "."); | |
131 | state->finish(nleft ? COMM_ERROR : COMM_OK, errno); | |
132 | } else if (ignoreErrno(errno)) { | |
133 | debugs(50, 9, HERE << "FD " << fd << " write failure: " << xstrerror() << "."); | |
134 | state->selectOrQueueWrite(); | |
135 | } else { | |
136 | debugs(50, 2, HERE << "FD " << fd << " write failure: " << xstrerror() << "."); | |
137 | state->finish(nleft ? COMM_ERROR : COMM_OK, errno); | |
138 | } | |
139 | } else { | |
140 | /* A successful write, continue */ | |
141 | state->offset += len; | |
142 | ||
143 | if (state->offset < state->size) { | |
144 | /* Not done, reinstall the write handler and write some more */ | |
145 | state->selectOrQueueWrite(); | |
146 | } else { | |
147 | state->finish(nleft ? COMM_OK : COMM_ERROR, errno); | |
148 | } | |
149 | } | |
150 | ||
151 | PROF_stop(commHandleWrite); | |
152 | } |