]>
Commit | Line | Data |
---|---|---|
bbc27441 AJ |
1 | /* |
2 | * Copyright (C) 1996-2014 The Squid Software Foundation and contributors | |
3 | * | |
4 | * Squid software is distributed under GPLv2+ license and includes | |
5 | * contributions from numerous individuals and organizations. | |
6 | * Please see the COPYING and CONTRIBUTORS files for details. | |
7 | */ | |
8 | ||
fb0c2f17 NH |
9 | #include "squid.h" |
10 | #include "comm.h" | |
fb0c2f17 | 11 | #include "comm/Connection.h" |
602d9612 | 12 | #include "comm/ConnOpener.h" |
fb0c2f17 NH |
13 | #include "comm/Loops.h" |
14 | #include "comm/Write.h" | |
15 | #include "fde.h" | |
16 | #include "globals.h" // for shutting_down | |
17 | #include "log/CustomLog.h" | |
18 | #include "log/File.h" | |
19 | #include "log/TcpLogger.h" | |
20 | #include "MemBlob.h" | |
21 | #include "Parsing.h" | |
22 | #include "SquidConfig.h" | |
23 | #include "SquidTime.h" | |
24 | ||
25 | // a single I/O buffer should be large enough to store any access.log record | |
26 | const size_t Log::TcpLogger::IoBufSize = 2*MAX_URL; | |
27 | ||
28 | // We need at least two buffers because when we write the first buffer, | |
29 | // we have to use the second buffer to accumulate new entries. | |
30 | const size_t Log::TcpLogger::BufferCapacityMin = 2*Log::TcpLogger::IoBufSize; | |
31 | ||
32 | #define MY_DEBUG_SECTION 50 /* Log file handling */ | |
33 | ||
34 | CBDATA_NAMESPACED_CLASS_INIT(Log, TcpLogger); | |
35 | ||
36 | Log::TcpLogger::TcpLogger(size_t bufCap, bool dieOnErr, Ip::Address them): | |
b4bb4694 A |
37 | AsyncJob("TcpLogger"), |
38 | dieOnError(dieOnErr), | |
39 | bufferCapacity(bufCap), | |
40 | bufferedSize(0), | |
41 | flushDebt(0), | |
42 | quitOnEmpty(false), | |
43 | reconnectScheduled(false), | |
44 | writeScheduled(false), | |
45 | conn(NULL), | |
46 | remote(them), | |
47 | connectFailures(0), | |
48 | drops(0) | |
fb0c2f17 NH |
49 | { |
50 | if (bufferCapacity < BufferCapacityMin) { | |
51 | debugs(MY_DEBUG_SECTION, DBG_IMPORTANT, | |
52 | "WARNING: tcp:" << remote << " logger configured buffer " << | |
53 | "size " << bufferCapacity << " is smaller than the " << | |
54 | BufferCapacityMin << "-byte" << " minimum. " << | |
55 | "Using the minimum instead."); | |
56 | bufferCapacity = BufferCapacityMin; | |
57 | } | |
58 | } | |
59 | ||
60 | Log::TcpLogger::~TcpLogger() | |
61 | { | |
62 | // make sure Comm::Write does not have our buffer pointer | |
63 | assert(!writeScheduled); | |
64 | } | |
65 | ||
66 | void | |
67 | Log::TcpLogger::start() | |
68 | { | |
d4c669ca | 69 | doConnect(); |
fb0c2f17 NH |
70 | } |
71 | ||
72 | bool | |
73 | Log::TcpLogger::doneAll() const | |
74 | { | |
75 | debugs(MY_DEBUG_SECTION, 5, "quitOnEmpty: " << quitOnEmpty << | |
76 | " buffered: " << bufferedSize << | |
77 | " conn: " << conn << ' ' << connectFailures); | |
78 | ||
79 | // we do not quit unless we are told that we may | |
80 | if (!quitOnEmpty) | |
81 | return false; | |
82 | ||
83 | /* We were asked to quit after we are done writing buffers. Are we done? */ | |
84 | ||
85 | // If we have records but are failing to connect, quit. Otherwise, we may | |
86 | // be trying to connect forever due to a [since fixed] misconfiguration! | |
87 | const bool failingToConnect = !conn && connectFailures; | |
88 | if (bufferedSize && !failingToConnect) | |
89 | return false; | |
90 | ||
91 | return AsyncJob::doneAll(); | |
92 | } | |
93 | ||
94 | void | |
95 | Log::TcpLogger::swanSong() | |
96 | { | |
97 | disconnect(); // optional: refcounting should close/delete conn eventually | |
98 | AsyncJob::swanSong(); | |
99 | } | |
100 | ||
101 | void | |
102 | Log::TcpLogger::endGracefully() | |
103 | { | |
104 | // job call protection must end our job if we are done logging current bufs | |
105 | assert(inCall != NULL); | |
106 | quitOnEmpty = true; | |
107 | flush(); | |
108 | } | |
109 | ||
110 | void | |
111 | Log::TcpLogger::flush() | |
112 | { | |
113 | flushDebt = bufferedSize; | |
114 | writeIfNeeded(); | |
115 | } | |
116 | ||
117 | void | |
118 | Log::TcpLogger::logRecord(const char *buf, const size_t len) | |
119 | { | |
120 | appendRecord(buf, len); | |
121 | writeIfNeeded(); | |
122 | } | |
123 | ||
124 | /// starts writing if and only if it is time to write accumulated records | |
125 | void | |
126 | Log::TcpLogger::writeIfNeeded() | |
127 | { | |
128 | // write if an earlier flush command forces us to write or | |
129 | // if we have filled at least one I/O buffer | |
130 | if (flushDebt > 0 || buffers.size() > 1) | |
131 | writeIfPossible(); | |
132 | } | |
133 | ||
134 | /// starts writing if possible | |
135 | void Log::TcpLogger::writeIfPossible() | |
136 | { | |
137 | debugs(MY_DEBUG_SECTION, 7, "guards: " << (!writeScheduled) << | |
138 | (bufferedSize > 0) << (conn != NULL) << | |
139 | (conn != NULL && !fd_table[conn->fd].closing()) << " buffered: " << | |
140 | bufferedSize << '/' << buffers.size()); | |
141 | ||
142 | // XXX: Squid shutdown sequence starts closing our connection before | |
143 | // calling LogfileClose, leading to loss of log records during shutdown. | |
144 | if (!writeScheduled && bufferedSize > 0 && conn != NULL && | |
b4bb4694 | 145 | !fd_table[conn->fd].closing()) { |
fb0c2f17 NH |
146 | debugs(MY_DEBUG_SECTION, 5, "writing first buffer"); |
147 | ||
148 | typedef CommCbMemFunT<TcpLogger, CommIoCbParams> WriteDialer; | |
149 | AsyncCall::Pointer callback = JobCallback(MY_DEBUG_SECTION, 5, WriteDialer, this, Log::TcpLogger::writeDone); | |
150 | const MemBlob::Pointer &buffer = buffers.front(); | |
151 | Comm::Write(conn, buffer->mem, buffer->size, callback, NULL); | |
152 | writeScheduled = true; | |
153 | } | |
154 | } | |
155 | ||
156 | /// whether len more bytes can be buffered | |
157 | bool | |
158 | Log::TcpLogger::canFit(const size_t len) const | |
159 | { | |
160 | // TODO: limit reporting frequency in addition to reporting only changes | |
161 | ||
162 | if (bufferedSize+len <= bufferCapacity) { | |
163 | if (drops) { | |
164 | // We can get here if a shorter record accidentally fits after we | |
165 | // started dropping records. When that happens, the following | |
166 | // DBG_IMPORTANT message will mislead admin into thinking that | |
167 | // the problem was resolved (for a brief period of time, until | |
168 | // another record comes in and overflows the buffer). It is | |
169 | // difficult to prevent this without also creating the opposite | |
170 | // problem: A huge record that does not fit and is dropped blocks | |
171 | // subsequent regular records from being buffered until we write. | |
172 | debugs(MY_DEBUG_SECTION, DBG_IMPORTANT, "tcp:" << remote << | |
b4bb4694 A |
173 | " logger stops dropping records after " << drops << " drops" << |
174 | "; current buffer use: " << (bufferedSize+len) << | |
175 | " out of " << bufferCapacity << " bytes"); | |
fb0c2f17 NH |
176 | } |
177 | return true; | |
178 | } | |
179 | ||
180 | if (!drops || dieOnError) { | |
181 | debugs(MY_DEBUG_SECTION, | |
182 | dieOnError ? DBG_CRITICAL : DBG_IMPORTANT, | |
183 | "tcp:" << remote << " logger " << bufferCapacity << "-byte " << | |
184 | "buffer overflowed; cannot fit " << | |
185 | (bufferedSize+len-bufferCapacity) << " bytes"); | |
186 | } | |
187 | ||
188 | if (dieOnError) | |
189 | fatal("tcp logger buffer overflowed"); | |
190 | ||
191 | if (!drops) { | |
192 | debugs(MY_DEBUG_SECTION, DBG_IMPORTANT, "tcp:" << remote << | |
193 | " logger starts dropping records."); | |
194 | } | |
195 | ||
196 | return false; | |
197 | } | |
198 | ||
199 | /// buffer a record that might exceed IoBufSize | |
200 | void | |
201 | Log::TcpLogger::appendRecord(const char *record, const size_t len) | |
202 | { | |
203 | // they should not happen, but to be safe, let's protect drop start/stop | |
204 | // monitoring algorithm from empty records (which can never be dropped) | |
205 | if (!len) | |
206 | return; | |
207 | ||
208 | if (!canFit(len)) { | |
209 | ++drops; | |
210 | return; | |
211 | } | |
212 | ||
213 | drops = 0; | |
214 | // append without spliting buf, unless it exceeds IoBufSize | |
215 | for (size_t off = 0; off < len; off += IoBufSize) | |
216 | appendChunk(record + off, min(len - off, IoBufSize)); | |
217 | } | |
218 | ||
219 | /// buffer a record chunk without splitting it across buffers | |
220 | void | |
221 | Log::TcpLogger::appendChunk(const char *chunk, const size_t len) | |
222 | { | |
223 | Must(len <= IoBufSize); | |
224 | // add a buffer if there is not one that can accomodate len bytes | |
225 | bool addBuffer = buffers.empty() || | |
226 | (buffers.back()->size+len > IoBufSize); | |
227 | // also add a buffer if there is only one and that one is being written | |
228 | addBuffer = addBuffer || (writeScheduled && buffers.size() == 1); | |
229 | ||
230 | if (addBuffer) { | |
231 | buffers.push_back(new MemBlob(IoBufSize)); | |
232 | debugs(MY_DEBUG_SECTION, 7, "added buffer #" << buffers.size()); | |
233 | } | |
234 | ||
235 | Must(!buffers.empty()); | |
236 | buffers.back()->append(chunk, len); | |
237 | bufferedSize += len; | |
238 | } | |
239 | ||
240 | /// starts [re]connecting to the remote logger | |
241 | void | |
d4c669ca | 242 | Log::TcpLogger::doConnect() |
fb0c2f17 NH |
243 | { |
244 | if (shutting_down) | |
245 | return; | |
246 | ||
247 | debugs(MY_DEBUG_SECTION, 3, "connecting"); | |
248 | Must(!conn); | |
249 | ||
250 | Comm::ConnectionPointer futureConn = new Comm::Connection; | |
251 | futureConn->remote = remote; | |
4dd643d5 AJ |
252 | futureConn->local.setAnyAddr(); |
253 | if (futureConn->remote.isIPv4()) | |
254 | futureConn->local.setIPv4(); | |
fb0c2f17 NH |
255 | |
256 | typedef CommCbMemFunT<TcpLogger, CommConnectCbParams> Dialer; | |
257 | AsyncCall::Pointer call = JobCallback(MY_DEBUG_SECTION, 5, Dialer, this, Log::TcpLogger::connectDone); | |
258 | AsyncJob::Start(new Comm::ConnOpener(futureConn, call, 2)); | |
259 | } | |
260 | ||
261 | /// Comm::ConnOpener callback | |
262 | void | |
263 | Log::TcpLogger::connectDone(const CommConnectCbParams ¶ms) | |
264 | { | |
c8407295 | 265 | if (params.flag != Comm::OK) { |
fb0c2f17 NH |
266 | const double delay = 0.5; // seconds |
267 | if (connectFailures++ % 100 == 0) { | |
268 | debugs(MY_DEBUG_SECTION, DBG_IMPORTANT, "tcp:" << remote << | |
269 | " logger connection attempt #" << connectFailures << | |
270 | " failed. Will keep trying every " << delay << " seconds."); | |
271 | } | |
272 | ||
273 | if (!reconnectScheduled) { | |
274 | reconnectScheduled = true; | |
275 | eventAdd("Log::TcpLogger::DelayedReconnect", | |
276 | Log::TcpLogger::DelayedReconnect, | |
277 | new Pointer(this), 0.5, 0, false); | |
278 | } | |
279 | } else { | |
280 | if (connectFailures > 0) { | |
281 | debugs(MY_DEBUG_SECTION, DBG_IMPORTANT, "tcp:" << remote << | |
282 | " logger connectivity restored after " << | |
283 | (connectFailures+1) << " attempts."); | |
284 | connectFailures = 0; | |
285 | } | |
286 | ||
287 | Must(!conn); | |
288 | conn = params.conn; | |
289 | ||
290 | Must(!closer); | |
291 | typedef CommCbMemFunT<TcpLogger, CommCloseCbParams> Closer; | |
292 | closer = JobCallback(MY_DEBUG_SECTION, 4, Closer, this, Log::TcpLogger::handleClosure); | |
293 | comm_add_close_handler(conn->fd, closer); | |
294 | ||
295 | writeIfNeeded(); | |
296 | } | |
297 | } | |
298 | ||
299 | // XXX: Needed until eventAdd() starts accepting Async calls directly. | |
300 | /// Log::TcpLogger::delayedReconnect() wrapper. | |
301 | void | |
302 | Log::TcpLogger::DelayedReconnect(void *data) | |
303 | { | |
304 | Pointer *ptr = static_cast<Pointer*>(data); | |
305 | assert(ptr); | |
306 | if (TcpLogger *logger = ptr->valid()) { | |
307 | // Get back inside AsyncJob protections by scheduling another call. | |
308 | typedef NullaryMemFunT<TcpLogger> Dialer; | |
309 | AsyncCall::Pointer call = JobCallback(MY_DEBUG_SECTION, 5, Dialer, | |
310 | logger, | |
311 | Log::TcpLogger::delayedReconnect); | |
312 | ScheduleCallHere(call); | |
313 | } | |
314 | delete ptr; | |
315 | } | |
316 | ||
317 | /// "sleep a little before trying to connect again" event callback | |
318 | void | |
b4bb4694 A |
319 | Log::TcpLogger::delayedReconnect() |
320 | { | |
fb0c2f17 NH |
321 | Must(reconnectScheduled); |
322 | Must(!conn); | |
323 | reconnectScheduled = false; | |
d4c669ca | 324 | doConnect(); |
fb0c2f17 NH |
325 | } |
326 | ||
327 | /// Comm::Write callback | |
328 | void | |
329 | Log::TcpLogger::writeDone(const CommIoCbParams &io) | |
330 | { | |
331 | writeScheduled = false; | |
c8407295 | 332 | if (io.flag == Comm::ERR_CLOSING) { |
fb0c2f17 NH |
333 | debugs(MY_DEBUG_SECTION, 7, "closing"); |
334 | // do nothing here -- our comm_close_handler will be called to clean up | |
c8407295 | 335 | } else if (io.flag != Comm::OK) { |
fb0c2f17 NH |
336 | debugs(MY_DEBUG_SECTION, 2, "write failure: " << xstrerr(io.xerrno)); |
337 | // keep the first buffer (the one we failed to write) | |
338 | disconnect(); | |
d4c669ca | 339 | doConnect(); |
fb0c2f17 NH |
340 | } else { |
341 | debugs(MY_DEBUG_SECTION, 5, "write successful"); | |
342 | ||
343 | Must(!buffers.empty()); // we had a buffer to write | |
344 | const MemBlob::Pointer &written = buffers.front(); | |
345 | const size_t writtenSize = static_cast<size_t>(written->size); | |
346 | // and we wrote the whole buffer | |
16b1fd57 | 347 | Must(io.size == writtenSize); |
fb0c2f17 NH |
348 | Must(bufferedSize >= writtenSize); |
349 | bufferedSize -= writtenSize; | |
350 | ||
351 | buffers.pop_front(); | |
352 | ||
353 | if (flushDebt > io.size) | |
354 | flushDebt -= io.size; | |
355 | else | |
356 | flushDebt = 0; // wrote everything we owed (or more) | |
357 | ||
358 | writeIfNeeded(); | |
359 | } | |
360 | } | |
361 | ||
b4bb4694 | 362 | /// This is our comm_close_handler. It is called when some external force |
fb0c2f17 NH |
363 | /// (e.g., reconfigure or shutdown) is closing the connection (rather than us). |
364 | void | |
365 | Log::TcpLogger::handleClosure(const CommCloseCbParams &io) | |
366 | { | |
367 | assert(inCall != NULL); | |
368 | closer = NULL; | |
369 | conn = NULL; | |
370 | // in all current use cases, we should not try to reconnect | |
371 | mustStop("Log::TcpLogger::handleClosure"); | |
372 | } | |
373 | ||
374 | /// close our connection now, without flushing | |
375 | void | |
376 | Log::TcpLogger::disconnect() | |
377 | { | |
378 | if (conn != NULL) { | |
379 | if (closer != NULL) { | |
380 | comm_remove_close_handler(conn->fd, closer); | |
381 | closer = NULL; | |
382 | } | |
383 | conn->close(); | |
384 | conn = NULL; | |
385 | } | |
386 | } | |
387 | ||
b4bb4694 | 388 | /// Converts Logfile into a pointer to a valid TcpLogger job or, |
fb0c2f17 NH |
389 | /// if the logger job has quit, into a nill pointer |
390 | Log::TcpLogger * | |
391 | Log::TcpLogger::StillLogging(Logfile *lf) | |
392 | { | |
393 | if (Pointer *pptr = static_cast<Pointer*>(lf->data)) | |
394 | return pptr->get(); // may be nil | |
395 | return NULL; | |
396 | } | |
397 | ||
398 | void | |
399 | Log::TcpLogger::Flush(Logfile * lf) | |
400 | { | |
401 | if (TcpLogger *logger = StillLogging(lf)) | |
402 | logger->flush(); | |
403 | } | |
404 | ||
405 | void | |
406 | Log::TcpLogger::WriteLine(Logfile * lf, const char *buf, size_t len) | |
407 | { | |
408 | if (TcpLogger *logger = StillLogging(lf)) | |
409 | logger->logRecord(buf, len); | |
410 | } | |
411 | ||
412 | void | |
413 | Log::TcpLogger::StartLine(Logfile * lf) | |
414 | { | |
415 | } | |
416 | ||
417 | void | |
418 | Log::TcpLogger::EndLine(Logfile * lf) | |
419 | { | |
420 | if (!Config.onoff.buffered_logs) | |
421 | Flush(lf); | |
422 | } | |
423 | ||
424 | void | |
425 | Log::TcpLogger::Rotate(Logfile * lf) | |
426 | { | |
427 | } | |
428 | ||
429 | void | |
430 | Log::TcpLogger::Close(Logfile * lf) | |
431 | { | |
432 | if (TcpLogger *logger = StillLogging(lf)) { | |
433 | debugs(50, 3, "Closing " << logger); | |
434 | typedef NullaryMemFunT<TcpLogger> Dialer; | |
435 | Dialer dialer(logger, &Log::TcpLogger::endGracefully); | |
436 | AsyncCall::Pointer call = asyncCall(50, 3, "Log::TcpLogger::endGracefully", dialer); | |
437 | ScheduleCallHere(call); | |
438 | } | |
439 | delete static_cast<Pointer*>(lf->data); | |
440 | lf->data = NULL; | |
441 | } | |
442 | ||
443 | /* | |
444 | * This code expects the path to be //host:port | |
445 | */ | |
446 | int | |
447 | Log::TcpLogger::Open(Logfile * lf, const char *path, size_t bufsz, int fatalFlag) | |
448 | { | |
449 | assert(!StillLogging(lf)); | |
450 | debugs(5, 3, "Tcp Open called"); | |
451 | ||
452 | Ip::Address addr; | |
453 | ||
454 | if (strncmp(path, "//", 2) == 0) | |
455 | path += 2; | |
456 | char *strAddr = xstrdup(path); | |
457 | if (!GetHostWithPort(strAddr, &addr)) { | |
458 | if (lf->flags.fatal) { | |
459 | fatalf("Invalid TCP logging address '%s'\n", lf->path); | |
460 | } else { | |
461 | debugs(50, DBG_IMPORTANT, "Invalid TCP logging address '" << lf->path << "'"); | |
462 | safe_free(strAddr); | |
463 | return FALSE; | |
464 | } | |
465 | } | |
466 | safe_free(strAddr); | |
467 | ||
468 | TcpLogger *logger = new TcpLogger(bufsz, fatalFlag, addr); | |
469 | lf->data = new Pointer(logger); | |
470 | lf->f_close = &Close; | |
471 | lf->f_linewrite = &WriteLine; | |
472 | lf->f_linestart = &StartLine; | |
473 | lf->f_lineend = &EndLine; | |
474 | lf->f_flush = &Flush; | |
475 | lf->f_rotate = &Rotate; | |
476 | AsyncJob::Start(logger); | |
477 | ||
478 | return 1; | |
479 | } |