]>
Commit | Line | Data |
---|---|---|
1 | /* | |
2 | * Copyright (C) 1996-2020 The Squid Software Foundation and contributors | |
3 | * | |
4 | * Squid software is distributed under GPLv2+ license and includes | |
5 | * contributions from numerous individuals and organizations. | |
6 | * Please see the COPYING and CONTRIBUTORS files for details. | |
7 | */ | |
8 | ||
9 | #include "squid.h" | |
10 | #include "comm.h" | |
11 | #include "comm/Connection.h" | |
12 | #include "comm/ConnOpener.h" | |
13 | #include "comm/Loops.h" | |
14 | #include "comm/Write.h" | |
15 | #include "fatal.h" | |
16 | #include "fde.h" | |
17 | #include "globals.h" // for shutting_down | |
18 | #include "log/CustomLog.h" | |
19 | #include "log/File.h" | |
20 | #include "log/TcpLogger.h" | |
21 | #include "Parsing.h" | |
22 | #include "sbuf/MemBlob.h" | |
23 | #include "SquidConfig.h" | |
24 | #include "SquidTime.h" | |
25 | ||
26 | // a single I/O buffer should be large enough to store any access.log record | |
27 | const size_t Log::TcpLogger::IoBufSize = 2*MAX_URL; | |
28 | ||
29 | // We need at least two buffers because when we write the first buffer, | |
30 | // we have to use the second buffer to accumulate new entries. | |
31 | const size_t Log::TcpLogger::BufferCapacityMin = 2*Log::TcpLogger::IoBufSize; | |
32 | ||
33 | #define MY_DEBUG_SECTION 50 /* Log file handling */ | |
34 | ||
35 | CBDATA_NAMESPACED_CLASS_INIT(Log, TcpLogger); | |
36 | ||
37 | Log::TcpLogger::TcpLogger(size_t bufCap, bool dieOnErr, Ip::Address them): | |
38 | AsyncJob("TcpLogger"), | |
39 | dieOnError(dieOnErr), | |
40 | bufferCapacity(bufCap), | |
41 | bufferedSize(0), | |
42 | flushDebt(0), | |
43 | quitOnEmpty(false), | |
44 | reconnectScheduled(false), | |
45 | writeScheduled(false), | |
46 | conn(NULL), | |
47 | remote(them), | |
48 | connectFailures(0), | |
49 | drops(0) | |
50 | { | |
51 | if (bufferCapacity < BufferCapacityMin) { | |
52 | debugs(MY_DEBUG_SECTION, DBG_IMPORTANT, | |
53 | "WARNING: tcp:" << remote << " logger configured buffer " << | |
54 | "size " << bufferCapacity << " is smaller than the " << | |
55 | BufferCapacityMin << "-byte" << " minimum. " << | |
56 | "Using the minimum instead."); | |
57 | bufferCapacity = BufferCapacityMin; | |
58 | } | |
59 | } | |
60 | ||
61 | Log::TcpLogger::~TcpLogger() | |
62 | { | |
63 | // make sure Comm::Write does not have our buffer pointer | |
64 | assert(!writeScheduled); | |
65 | } | |
66 | ||
67 | void | |
68 | Log::TcpLogger::start() | |
69 | { | |
70 | doConnect(); | |
71 | } | |
72 | ||
73 | bool | |
74 | Log::TcpLogger::doneAll() const | |
75 | { | |
76 | debugs(MY_DEBUG_SECTION, 5, "quitOnEmpty: " << quitOnEmpty << | |
77 | " buffered: " << bufferedSize << | |
78 | " conn: " << conn << ' ' << connectFailures); | |
79 | ||
80 | // we do not quit unless we are told that we may | |
81 | if (!quitOnEmpty) | |
82 | return false; | |
83 | ||
84 | /* We were asked to quit after we are done writing buffers. Are we done? */ | |
85 | ||
86 | // If we have records but are failing to connect, quit. Otherwise, we may | |
87 | // be trying to connect forever due to a [since fixed] misconfiguration! | |
88 | const bool failingToConnect = !conn && connectFailures; | |
89 | if (bufferedSize && !failingToConnect) | |
90 | return false; | |
91 | ||
92 | return AsyncJob::doneAll(); | |
93 | } | |
94 | ||
95 | void | |
96 | Log::TcpLogger::swanSong() | |
97 | { | |
98 | disconnect(); // optional: refcounting should close/delete conn eventually | |
99 | AsyncJob::swanSong(); | |
100 | } | |
101 | ||
102 | void | |
103 | Log::TcpLogger::endGracefully() | |
104 | { | |
105 | // job call protection must end our job if we are done logging current bufs | |
106 | assert(inCall != NULL); | |
107 | quitOnEmpty = true; | |
108 | flush(); | |
109 | } | |
110 | ||
111 | void | |
112 | Log::TcpLogger::flush() | |
113 | { | |
114 | flushDebt = bufferedSize; | |
115 | writeIfNeeded(); | |
116 | } | |
117 | ||
118 | void | |
119 | Log::TcpLogger::logRecord(const char *buf, const size_t len) | |
120 | { | |
121 | appendRecord(buf, len); | |
122 | writeIfNeeded(); | |
123 | } | |
124 | ||
125 | /// starts writing if and only if it is time to write accumulated records | |
126 | void | |
127 | Log::TcpLogger::writeIfNeeded() | |
128 | { | |
129 | // write if an earlier flush command forces us to write or | |
130 | // if we have filled at least one I/O buffer | |
131 | if (flushDebt > 0 || buffers.size() > 1) | |
132 | writeIfPossible(); | |
133 | } | |
134 | ||
135 | /// starts writing if possible | |
136 | void Log::TcpLogger::writeIfPossible() | |
137 | { | |
138 | debugs(MY_DEBUG_SECTION, 7, "guards: " << (!writeScheduled) << | |
139 | (bufferedSize > 0) << (conn != NULL) << | |
140 | (conn != NULL && !fd_table[conn->fd].closing()) << " buffered: " << | |
141 | bufferedSize << '/' << buffers.size()); | |
142 | ||
143 | // XXX: Squid shutdown sequence starts closing our connection before | |
144 | // calling LogfileClose, leading to loss of log records during shutdown. | |
145 | if (!writeScheduled && bufferedSize > 0 && conn != NULL && | |
146 | !fd_table[conn->fd].closing()) { | |
147 | debugs(MY_DEBUG_SECTION, 5, "writing first buffer"); | |
148 | ||
149 | typedef CommCbMemFunT<TcpLogger, CommIoCbParams> WriteDialer; | |
150 | AsyncCall::Pointer callback = JobCallback(MY_DEBUG_SECTION, 5, WriteDialer, this, Log::TcpLogger::writeDone); | |
151 | const MemBlob::Pointer &buffer = buffers.front(); | |
152 | Comm::Write(conn, buffer->mem, buffer->size, callback, NULL); | |
153 | writeScheduled = true; | |
154 | } | |
155 | } | |
156 | ||
157 | /// whether len more bytes can be buffered | |
158 | bool | |
159 | Log::TcpLogger::canFit(const size_t len) const | |
160 | { | |
161 | // TODO: limit reporting frequency in addition to reporting only changes | |
162 | ||
163 | if (bufferedSize+len <= bufferCapacity) { | |
164 | if (drops) { | |
165 | // We can get here if a shorter record accidentally fits after we | |
166 | // started dropping records. When that happens, the following | |
167 | // DBG_IMPORTANT message will mislead admin into thinking that | |
168 | // the problem was resolved (for a brief period of time, until | |
169 | // another record comes in and overflows the buffer). It is | |
170 | // difficult to prevent this without also creating the opposite | |
171 | // problem: A huge record that does not fit and is dropped blocks | |
172 | // subsequent regular records from being buffered until we write. | |
173 | debugs(MY_DEBUG_SECTION, DBG_IMPORTANT, "tcp:" << remote << | |
174 | " logger stops dropping records after " << drops << " drops" << | |
175 | "; current buffer use: " << (bufferedSize+len) << | |
176 | " out of " << bufferCapacity << " bytes"); | |
177 | } | |
178 | return true; | |
179 | } | |
180 | ||
181 | if (!drops || dieOnError) { | |
182 | debugs(MY_DEBUG_SECTION, | |
183 | dieOnError ? DBG_CRITICAL : DBG_IMPORTANT, | |
184 | "tcp:" << remote << " logger " << bufferCapacity << "-byte " << | |
185 | "buffer overflowed; cannot fit " << | |
186 | (bufferedSize+len-bufferCapacity) << " bytes"); | |
187 | } | |
188 | ||
189 | if (dieOnError) | |
190 | fatal("tcp logger buffer overflowed"); | |
191 | ||
192 | if (!drops) { | |
193 | debugs(MY_DEBUG_SECTION, DBG_IMPORTANT, "tcp:" << remote << | |
194 | " logger starts dropping records."); | |
195 | } | |
196 | ||
197 | return false; | |
198 | } | |
199 | ||
200 | /// buffer a record that might exceed IoBufSize | |
201 | void | |
202 | Log::TcpLogger::appendRecord(const char *record, const size_t len) | |
203 | { | |
204 | // they should not happen, but to be safe, let's protect drop start/stop | |
205 | // monitoring algorithm from empty records (which can never be dropped) | |
206 | if (!len) | |
207 | return; | |
208 | ||
209 | if (!canFit(len)) { | |
210 | ++drops; | |
211 | return; | |
212 | } | |
213 | ||
214 | drops = 0; | |
215 | // append without spliting buf, unless it exceeds IoBufSize | |
216 | for (size_t off = 0; off < len; off += IoBufSize) | |
217 | appendChunk(record + off, min(len - off, IoBufSize)); | |
218 | } | |
219 | ||
220 | /// buffer a record chunk without splitting it across buffers | |
221 | void | |
222 | Log::TcpLogger::appendChunk(const char *chunk, const size_t len) | |
223 | { | |
224 | Must(len <= IoBufSize); | |
225 | // add a buffer if there is not one that can accomodate len bytes | |
226 | bool addBuffer = buffers.empty() || | |
227 | (buffers.back()->size+len > IoBufSize); | |
228 | // also add a buffer if there is only one and that one is being written | |
229 | addBuffer = addBuffer || (writeScheduled && buffers.size() == 1); | |
230 | ||
231 | if (addBuffer) { | |
232 | buffers.push_back(new MemBlob(IoBufSize)); | |
233 | debugs(MY_DEBUG_SECTION, 7, "added buffer #" << buffers.size()); | |
234 | } | |
235 | ||
236 | Must(!buffers.empty()); | |
237 | buffers.back()->append(chunk, len); | |
238 | bufferedSize += len; | |
239 | } | |
240 | ||
241 | /// starts [re]connecting to the remote logger | |
242 | void | |
243 | Log::TcpLogger::doConnect() | |
244 | { | |
245 | if (shutting_down) | |
246 | return; | |
247 | ||
248 | debugs(MY_DEBUG_SECTION, 3, "connecting"); | |
249 | Must(!conn); | |
250 | ||
251 | Comm::ConnectionPointer futureConn = new Comm::Connection; | |
252 | futureConn->remote = remote; | |
253 | futureConn->local.setAnyAddr(); | |
254 | if (futureConn->remote.isIPv4()) | |
255 | futureConn->local.setIPv4(); | |
256 | ||
257 | typedef CommCbMemFunT<TcpLogger, CommConnectCbParams> Dialer; | |
258 | AsyncCall::Pointer call = JobCallback(MY_DEBUG_SECTION, 5, Dialer, this, Log::TcpLogger::connectDone); | |
259 | AsyncJob::Start(new Comm::ConnOpener(futureConn, call, 2)); | |
260 | } | |
261 | ||
262 | /// Comm::ConnOpener callback | |
263 | void | |
264 | Log::TcpLogger::connectDone(const CommConnectCbParams ¶ms) | |
265 | { | |
266 | if (params.flag != Comm::OK) { | |
267 | const double delay = 0.5; // seconds | |
268 | if (connectFailures++ % 100 == 0) { | |
269 | debugs(MY_DEBUG_SECTION, DBG_IMPORTANT, "tcp:" << remote << | |
270 | " logger connection attempt #" << connectFailures << | |
271 | " failed. Will keep trying every " << delay << " seconds."); | |
272 | } | |
273 | ||
274 | if (!reconnectScheduled) { | |
275 | reconnectScheduled = true; | |
276 | eventAdd("Log::TcpLogger::DelayedReconnect", | |
277 | Log::TcpLogger::DelayedReconnect, | |
278 | new Pointer(this), 0.5, 0, false); | |
279 | } | |
280 | } else { | |
281 | if (connectFailures > 0) { | |
282 | debugs(MY_DEBUG_SECTION, DBG_IMPORTANT, "tcp:" << remote << | |
283 | " logger connectivity restored after " << | |
284 | (connectFailures+1) << " attempts."); | |
285 | connectFailures = 0; | |
286 | } | |
287 | ||
288 | Must(!conn); | |
289 | conn = params.conn; | |
290 | ||
291 | Must(!closer); | |
292 | typedef CommCbMemFunT<TcpLogger, CommCloseCbParams> Closer; | |
293 | closer = JobCallback(MY_DEBUG_SECTION, 4, Closer, this, Log::TcpLogger::handleClosure); | |
294 | comm_add_close_handler(conn->fd, closer); | |
295 | ||
296 | writeIfNeeded(); | |
297 | } | |
298 | } | |
299 | ||
300 | // XXX: Needed until eventAdd() starts accepting Async calls directly. | |
301 | /// Log::TcpLogger::delayedReconnect() wrapper. | |
302 | void | |
303 | Log::TcpLogger::DelayedReconnect(void *data) | |
304 | { | |
305 | Pointer *ptr = static_cast<Pointer*>(data); | |
306 | assert(ptr); | |
307 | if (TcpLogger *logger = ptr->valid()) { | |
308 | // Get back inside AsyncJob protections by scheduling another call. | |
309 | typedef NullaryMemFunT<TcpLogger> Dialer; | |
310 | AsyncCall::Pointer call = JobCallback(MY_DEBUG_SECTION, 5, Dialer, | |
311 | logger, | |
312 | Log::TcpLogger::delayedReconnect); | |
313 | ScheduleCallHere(call); | |
314 | } | |
315 | delete ptr; | |
316 | } | |
317 | ||
318 | /// "sleep a little before trying to connect again" event callback | |
319 | void | |
320 | Log::TcpLogger::delayedReconnect() | |
321 | { | |
322 | Must(reconnectScheduled); | |
323 | Must(!conn); | |
324 | reconnectScheduled = false; | |
325 | doConnect(); | |
326 | } | |
327 | ||
328 | /// Comm::Write callback | |
329 | void | |
330 | Log::TcpLogger::writeDone(const CommIoCbParams &io) | |
331 | { | |
332 | writeScheduled = false; | |
333 | if (io.flag == Comm::ERR_CLOSING) { | |
334 | debugs(MY_DEBUG_SECTION, 7, "closing"); | |
335 | // do nothing here -- our comm_close_handler will be called to clean up | |
336 | } else if (io.flag != Comm::OK) { | |
337 | debugs(MY_DEBUG_SECTION, 2, "write failure: " << xstrerr(io.xerrno)); | |
338 | // keep the first buffer (the one we failed to write) | |
339 | disconnect(); | |
340 | doConnect(); | |
341 | } else { | |
342 | debugs(MY_DEBUG_SECTION, 5, "write successful"); | |
343 | ||
344 | Must(!buffers.empty()); // we had a buffer to write | |
345 | const MemBlob::Pointer &written = buffers.front(); | |
346 | const size_t writtenSize = static_cast<size_t>(written->size); | |
347 | // and we wrote the whole buffer | |
348 | Must(io.size == writtenSize); | |
349 | Must(bufferedSize >= writtenSize); | |
350 | bufferedSize -= writtenSize; | |
351 | ||
352 | buffers.pop_front(); | |
353 | ||
354 | if (flushDebt > io.size) | |
355 | flushDebt -= io.size; | |
356 | else | |
357 | flushDebt = 0; // wrote everything we owed (or more) | |
358 | ||
359 | writeIfNeeded(); | |
360 | } | |
361 | } | |
362 | ||
363 | /// This is our comm_close_handler. It is called when some external force | |
364 | /// (e.g., reconfigure or shutdown) is closing the connection (rather than us). | |
365 | void | |
366 | Log::TcpLogger::handleClosure(const CommCloseCbParams &) | |
367 | { | |
368 | assert(inCall != NULL); | |
369 | closer = NULL; | |
370 | conn = NULL; | |
371 | // in all current use cases, we should not try to reconnect | |
372 | mustStop("Log::TcpLogger::handleClosure"); | |
373 | } | |
374 | ||
375 | /// close our connection now, without flushing | |
376 | void | |
377 | Log::TcpLogger::disconnect() | |
378 | { | |
379 | if (conn != NULL) { | |
380 | if (closer != NULL) { | |
381 | comm_remove_close_handler(conn->fd, closer); | |
382 | closer = NULL; | |
383 | } | |
384 | conn->close(); | |
385 | conn = NULL; | |
386 | } | |
387 | } | |
388 | ||
389 | /// Converts Logfile into a pointer to a valid TcpLogger job or, | |
390 | /// if the logger job has quit, into a nill pointer | |
391 | Log::TcpLogger * | |
392 | Log::TcpLogger::StillLogging(Logfile *lf) | |
393 | { | |
394 | if (Pointer *pptr = static_cast<Pointer*>(lf->data)) | |
395 | return pptr->get(); // may be nil | |
396 | return NULL; | |
397 | } | |
398 | ||
399 | void | |
400 | Log::TcpLogger::Flush(Logfile * lf) | |
401 | { | |
402 | if (TcpLogger *logger = StillLogging(lf)) | |
403 | logger->flush(); | |
404 | } | |
405 | ||
406 | void | |
407 | Log::TcpLogger::WriteLine(Logfile * lf, const char *buf, size_t len) | |
408 | { | |
409 | if (TcpLogger *logger = StillLogging(lf)) | |
410 | logger->logRecord(buf, len); | |
411 | } | |
412 | ||
413 | void | |
414 | Log::TcpLogger::StartLine(Logfile *) | |
415 | { | |
416 | } | |
417 | ||
418 | void | |
419 | Log::TcpLogger::EndLine(Logfile * lf) | |
420 | { | |
421 | if (!Config.onoff.buffered_logs) | |
422 | Flush(lf); | |
423 | } | |
424 | ||
425 | void | |
426 | Log::TcpLogger::Rotate(Logfile *, const int16_t) | |
427 | { | |
428 | } | |
429 | ||
430 | void | |
431 | Log::TcpLogger::Close(Logfile * lf) | |
432 | { | |
433 | if (TcpLogger *logger = StillLogging(lf)) { | |
434 | debugs(50, 3, "Closing " << logger); | |
435 | typedef NullaryMemFunT<TcpLogger> Dialer; | |
436 | Dialer dialer(logger, &Log::TcpLogger::endGracefully); | |
437 | AsyncCall::Pointer call = asyncCall(50, 3, "Log::TcpLogger::endGracefully", dialer); | |
438 | ScheduleCallHere(call); | |
439 | } | |
440 | delete static_cast<Pointer*>(lf->data); | |
441 | lf->data = NULL; | |
442 | } | |
443 | ||
444 | /* | |
445 | * This code expects the path to be //host:port | |
446 | */ | |
447 | int | |
448 | Log::TcpLogger::Open(Logfile * lf, const char *path, size_t bufsz, int fatalFlag) | |
449 | { | |
450 | assert(!StillLogging(lf)); | |
451 | debugs(5, 3, "Tcp Open called"); | |
452 | ||
453 | Ip::Address addr; | |
454 | ||
455 | if (strncmp(path, "//", 2) == 0) | |
456 | path += 2; | |
457 | char *strAddr = xstrdup(path); | |
458 | if (!GetHostWithPort(strAddr, &addr)) { | |
459 | if (lf->flags.fatal) { | |
460 | fatalf("Invalid TCP logging address '%s'\n", lf->path); | |
461 | } else { | |
462 | debugs(50, DBG_IMPORTANT, "Invalid TCP logging address '" << lf->path << "'"); | |
463 | safe_free(strAddr); | |
464 | return FALSE; | |
465 | } | |
466 | } | |
467 | safe_free(strAddr); | |
468 | ||
469 | TcpLogger *logger = new TcpLogger(bufsz, fatalFlag, addr); | |
470 | lf->data = new Pointer(logger); | |
471 | lf->f_close = &Close; | |
472 | lf->f_linewrite = &WriteLine; | |
473 | lf->f_linestart = &StartLine; | |
474 | lf->f_lineend = &EndLine; | |
475 | lf->f_flush = &Flush; | |
476 | lf->f_rotate = &Rotate; | |
477 | AsyncJob::Start(logger); | |
478 | ||
479 | return 1; | |
480 | } | |
481 |