]> git.ipfire.org Git - thirdparty/squid.git/blob - src/log/TcpLogger.cc
Source Format Enforcement (#763)
[thirdparty/squid.git] / src / log / TcpLogger.cc
1 /*
2 * Copyright (C) 1996-2021 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9 #include "squid.h"
10 #include "comm.h"
11 #include "comm/Connection.h"
12 #include "comm/ConnOpener.h"
13 #include "comm/Loops.h"
14 #include "comm/Write.h"
15 #include "fatal.h"
16 #include "fde.h"
17 #include "globals.h" // for shutting_down
18 #include "log/CustomLog.h"
19 #include "log/File.h"
20 #include "log/TcpLogger.h"
21 #include "Parsing.h"
22 #include "sbuf/MemBlob.h"
23 #include "SquidConfig.h"
24 #include "SquidTime.h"
25
26 // a single I/O buffer should be large enough to store any access.log record
27 const size_t Log::TcpLogger::IoBufSize = 2*MAX_URL;
28
29 // We need at least two buffers because when we write the first buffer,
30 // we have to use the second buffer to accumulate new entries.
31 const size_t Log::TcpLogger::BufferCapacityMin = 2*Log::TcpLogger::IoBufSize;
32
33 #define MY_DEBUG_SECTION 50 /* Log file handling */
34
35 CBDATA_NAMESPACED_CLASS_INIT(Log, TcpLogger);
36
37 Log::TcpLogger::TcpLogger(size_t bufCap, bool dieOnErr, Ip::Address them):
38 AsyncJob("TcpLogger"),
39 dieOnError(dieOnErr),
40 bufferCapacity(bufCap),
41 bufferedSize(0),
42 flushDebt(0),
43 quitOnEmpty(false),
44 reconnectScheduled(false),
45 writeScheduled(false),
46 conn(NULL),
47 remote(them),
48 connectFailures(0),
49 drops(0)
50 {
51 if (bufferCapacity < BufferCapacityMin) {
52 debugs(MY_DEBUG_SECTION, DBG_IMPORTANT,
53 "WARNING: tcp:" << remote << " logger configured buffer " <<
54 "size " << bufferCapacity << " is smaller than the " <<
55 BufferCapacityMin << "-byte" << " minimum. " <<
56 "Using the minimum instead.");
57 bufferCapacity = BufferCapacityMin;
58 }
59 }
60
61 Log::TcpLogger::~TcpLogger()
62 {
63 // make sure Comm::Write does not have our buffer pointer
64 assert(!writeScheduled);
65 }
66
67 void
68 Log::TcpLogger::start()
69 {
70 doConnect();
71 }
72
73 bool
74 Log::TcpLogger::doneAll() const
75 {
76 debugs(MY_DEBUG_SECTION, 5, "quitOnEmpty: " << quitOnEmpty <<
77 " buffered: " << bufferedSize <<
78 " conn: " << conn << ' ' << connectFailures);
79
80 // we do not quit unless we are told that we may
81 if (!quitOnEmpty)
82 return false;
83
84 /* We were asked to quit after we are done writing buffers. Are we done? */
85
86 // If we have records but are failing to connect, quit. Otherwise, we may
87 // be trying to connect forever due to a [since fixed] misconfiguration!
88 const bool failingToConnect = !conn && connectFailures;
89 if (bufferedSize && !failingToConnect)
90 return false;
91
92 return AsyncJob::doneAll();
93 }
94
95 void
96 Log::TcpLogger::swanSong()
97 {
98 disconnect(); // optional: refcounting should close/delete conn eventually
99 AsyncJob::swanSong();
100 }
101
102 void
103 Log::TcpLogger::endGracefully()
104 {
105 // job call protection must end our job if we are done logging current bufs
106 assert(inCall != NULL);
107 quitOnEmpty = true;
108 flush();
109 }
110
111 void
112 Log::TcpLogger::flush()
113 {
114 flushDebt = bufferedSize;
115 writeIfNeeded();
116 }
117
118 void
119 Log::TcpLogger::logRecord(const char *buf, const size_t len)
120 {
121 appendRecord(buf, len);
122 writeIfNeeded();
123 }
124
125 /// starts writing if and only if it is time to write accumulated records
126 void
127 Log::TcpLogger::writeIfNeeded()
128 {
129 // write if an earlier flush command forces us to write or
130 // if we have filled at least one I/O buffer
131 if (flushDebt > 0 || buffers.size() > 1)
132 writeIfPossible();
133 }
134
135 /// starts writing if possible
136 void Log::TcpLogger::writeIfPossible()
137 {
138 debugs(MY_DEBUG_SECTION, 7, "guards: " << (!writeScheduled) <<
139 (bufferedSize > 0) << (conn != NULL) <<
140 (conn != NULL && !fd_table[conn->fd].closing()) << " buffered: " <<
141 bufferedSize << '/' << buffers.size());
142
143 // XXX: Squid shutdown sequence starts closing our connection before
144 // calling LogfileClose, leading to loss of log records during shutdown.
145 if (!writeScheduled && bufferedSize > 0 && conn != NULL &&
146 !fd_table[conn->fd].closing()) {
147 debugs(MY_DEBUG_SECTION, 5, "writing first buffer");
148
149 typedef CommCbMemFunT<TcpLogger, CommIoCbParams> WriteDialer;
150 AsyncCall::Pointer callback = JobCallback(MY_DEBUG_SECTION, 5, WriteDialer, this, Log::TcpLogger::writeDone);
151 const MemBlob::Pointer &buffer = buffers.front();
152 Comm::Write(conn, buffer->mem, buffer->size, callback, NULL);
153 writeScheduled = true;
154 }
155 }
156
157 /// whether len more bytes can be buffered
158 bool
159 Log::TcpLogger::canFit(const size_t len) const
160 {
161 // TODO: limit reporting frequency in addition to reporting only changes
162
163 if (bufferedSize+len <= bufferCapacity) {
164 if (drops) {
165 // We can get here if a shorter record accidentally fits after we
166 // started dropping records. When that happens, the following
167 // DBG_IMPORTANT message will mislead admin into thinking that
168 // the problem was resolved (for a brief period of time, until
169 // another record comes in and overflows the buffer). It is
170 // difficult to prevent this without also creating the opposite
171 // problem: A huge record that does not fit and is dropped blocks
172 // subsequent regular records from being buffered until we write.
173 debugs(MY_DEBUG_SECTION, DBG_IMPORTANT, "tcp:" << remote <<
174 " logger stops dropping records after " << drops << " drops" <<
175 "; current buffer use: " << (bufferedSize+len) <<
176 " out of " << bufferCapacity << " bytes");
177 }
178 return true;
179 }
180
181 if (!drops || dieOnError) {
182 debugs(MY_DEBUG_SECTION,
183 dieOnError ? DBG_CRITICAL : DBG_IMPORTANT,
184 "tcp:" << remote << " logger " << bufferCapacity << "-byte " <<
185 "buffer overflowed; cannot fit " <<
186 (bufferedSize+len-bufferCapacity) << " bytes");
187 }
188
189 if (dieOnError)
190 fatal("tcp logger buffer overflowed");
191
192 if (!drops) {
193 debugs(MY_DEBUG_SECTION, DBG_IMPORTANT, "tcp:" << remote <<
194 " logger starts dropping records.");
195 }
196
197 return false;
198 }
199
200 /// buffer a record that might exceed IoBufSize
201 void
202 Log::TcpLogger::appendRecord(const char *record, const size_t len)
203 {
204 // they should not happen, but to be safe, let's protect drop start/stop
205 // monitoring algorithm from empty records (which can never be dropped)
206 if (!len)
207 return;
208
209 if (!canFit(len)) {
210 ++drops;
211 return;
212 }
213
214 drops = 0;
215 // append without splitting buf, unless it exceeds IoBufSize
216 for (size_t off = 0; off < len; off += IoBufSize)
217 appendChunk(record + off, min(len - off, IoBufSize));
218 }
219
220 /// buffer a record chunk without splitting it across buffers
221 void
222 Log::TcpLogger::appendChunk(const char *chunk, const size_t len)
223 {
224 Must(len <= IoBufSize);
225 // add a buffer if there is not one that can accommodate len bytes
226 bool addBuffer = buffers.empty() ||
227 (buffers.back()->size+len > IoBufSize);
228 // also add a buffer if there is only one and that one is being written
229 addBuffer = addBuffer || (writeScheduled && buffers.size() == 1);
230
231 if (addBuffer) {
232 buffers.push_back(new MemBlob(IoBufSize));
233 debugs(MY_DEBUG_SECTION, 7, "added buffer #" << buffers.size());
234 }
235
236 Must(!buffers.empty());
237 buffers.back()->append(chunk, len);
238 bufferedSize += len;
239 }
240
241 /// starts [re]connecting to the remote logger
242 void
243 Log::TcpLogger::doConnect()
244 {
245 if (shutting_down)
246 return;
247
248 debugs(MY_DEBUG_SECTION, 3, "connecting");
249 Must(!conn);
250
251 Comm::ConnectionPointer futureConn = new Comm::Connection;
252 futureConn->remote = remote;
253 futureConn->local.setAnyAddr();
254 if (futureConn->remote.isIPv4())
255 futureConn->local.setIPv4();
256
257 typedef CommCbMemFunT<TcpLogger, CommConnectCbParams> Dialer;
258 AsyncCall::Pointer call = JobCallback(MY_DEBUG_SECTION, 5, Dialer, this, Log::TcpLogger::connectDone);
259 AsyncJob::Start(new Comm::ConnOpener(futureConn, call, 2));
260 }
261
262 /// Comm::ConnOpener callback
263 void
264 Log::TcpLogger::connectDone(const CommConnectCbParams &params)
265 {
266 if (params.flag != Comm::OK) {
267 const double delay = 0.5; // seconds
268 if (connectFailures++ % 100 == 0) {
269 debugs(MY_DEBUG_SECTION, DBG_IMPORTANT, "tcp:" << remote <<
270 " logger connection attempt #" << connectFailures <<
271 " failed. Will keep trying every " << delay << " seconds.");
272 }
273
274 if (!reconnectScheduled) {
275 reconnectScheduled = true;
276 eventAdd("Log::TcpLogger::DelayedReconnect",
277 Log::TcpLogger::DelayedReconnect,
278 new Pointer(this), 0.5, 0, false);
279 }
280 } else {
281 if (connectFailures > 0) {
282 debugs(MY_DEBUG_SECTION, DBG_IMPORTANT, "tcp:" << remote <<
283 " logger connectivity restored after " <<
284 (connectFailures+1) << " attempts.");
285 connectFailures = 0;
286 }
287
288 Must(!conn);
289 conn = params.conn;
290
291 Must(!closer);
292 typedef CommCbMemFunT<TcpLogger, CommCloseCbParams> Closer;
293 closer = JobCallback(MY_DEBUG_SECTION, 4, Closer, this, Log::TcpLogger::handleClosure);
294 comm_add_close_handler(conn->fd, closer);
295
296 writeIfNeeded();
297 }
298 }
299
300 // XXX: Needed until eventAdd() starts accepting Async calls directly.
301 /// Log::TcpLogger::delayedReconnect() wrapper.
302 void
303 Log::TcpLogger::DelayedReconnect(void *data)
304 {
305 Pointer *ptr = static_cast<Pointer*>(data);
306 assert(ptr);
307 if (TcpLogger *logger = ptr->valid()) {
308 // Get back inside AsyncJob protections by scheduling another call.
309 typedef NullaryMemFunT<TcpLogger> Dialer;
310 AsyncCall::Pointer call = JobCallback(MY_DEBUG_SECTION, 5, Dialer,
311 logger,
312 Log::TcpLogger::delayedReconnect);
313 ScheduleCallHere(call);
314 }
315 delete ptr;
316 }
317
318 /// "sleep a little before trying to connect again" event callback
319 void
320 Log::TcpLogger::delayedReconnect()
321 {
322 Must(reconnectScheduled);
323 Must(!conn);
324 reconnectScheduled = false;
325 doConnect();
326 }
327
328 /// Comm::Write callback
329 void
330 Log::TcpLogger::writeDone(const CommIoCbParams &io)
331 {
332 writeScheduled = false;
333 if (io.flag == Comm::ERR_CLOSING) {
334 debugs(MY_DEBUG_SECTION, 7, "closing");
335 // do nothing here -- our comm_close_handler will be called to clean up
336 } else if (io.flag != Comm::OK) {
337 debugs(MY_DEBUG_SECTION, 2, "write failure: " << xstrerr(io.xerrno));
338 // keep the first buffer (the one we failed to write)
339 disconnect();
340 doConnect();
341 } else {
342 debugs(MY_DEBUG_SECTION, 5, "write successful");
343
344 Must(!buffers.empty()); // we had a buffer to write
345 const MemBlob::Pointer &written = buffers.front();
346 const size_t writtenSize = static_cast<size_t>(written->size);
347 // and we wrote the whole buffer
348 Must(io.size == writtenSize);
349 Must(bufferedSize >= writtenSize);
350 bufferedSize -= writtenSize;
351
352 buffers.pop_front();
353
354 if (flushDebt > io.size)
355 flushDebt -= io.size;
356 else
357 flushDebt = 0; // wrote everything we owed (or more)
358
359 writeIfNeeded();
360 }
361 }
362
363 /// This is our comm_close_handler. It is called when some external force
364 /// (e.g., reconfigure or shutdown) is closing the connection (rather than us).
365 void
366 Log::TcpLogger::handleClosure(const CommCloseCbParams &)
367 {
368 assert(inCall != NULL);
369 closer = NULL;
370 conn = NULL;
371 // in all current use cases, we should not try to reconnect
372 mustStop("Log::TcpLogger::handleClosure");
373 }
374
375 /// close our connection now, without flushing
376 void
377 Log::TcpLogger::disconnect()
378 {
379 if (conn != NULL) {
380 if (closer != NULL) {
381 comm_remove_close_handler(conn->fd, closer);
382 closer = NULL;
383 }
384 conn->close();
385 conn = NULL;
386 }
387 }
388
389 /// Converts Logfile into a pointer to a valid TcpLogger job or,
390 /// if the logger job has quit, into a nill pointer
391 Log::TcpLogger *
392 Log::TcpLogger::StillLogging(Logfile *lf)
393 {
394 if (Pointer *pptr = static_cast<Pointer*>(lf->data))
395 return pptr->get(); // may be nil
396 return NULL;
397 }
398
399 void
400 Log::TcpLogger::Flush(Logfile * lf)
401 {
402 if (TcpLogger *logger = StillLogging(lf))
403 logger->flush();
404 }
405
406 void
407 Log::TcpLogger::WriteLine(Logfile * lf, const char *buf, size_t len)
408 {
409 if (TcpLogger *logger = StillLogging(lf))
410 logger->logRecord(buf, len);
411 }
412
413 void
414 Log::TcpLogger::StartLine(Logfile *)
415 {
416 }
417
418 void
419 Log::TcpLogger::EndLine(Logfile * lf)
420 {
421 if (!Config.onoff.buffered_logs)
422 Flush(lf);
423 }
424
425 void
426 Log::TcpLogger::Rotate(Logfile *, const int16_t)
427 {
428 }
429
430 void
431 Log::TcpLogger::Close(Logfile * lf)
432 {
433 if (TcpLogger *logger = StillLogging(lf)) {
434 debugs(50, 3, "Closing " << logger);
435 typedef NullaryMemFunT<TcpLogger> Dialer;
436 Dialer dialer(logger, &Log::TcpLogger::endGracefully);
437 AsyncCall::Pointer call = asyncCall(50, 3, "Log::TcpLogger::endGracefully", dialer);
438 ScheduleCallHere(call);
439 }
440 delete static_cast<Pointer*>(lf->data);
441 lf->data = NULL;
442 }
443
444 /*
445 * This code expects the path to be //host:port
446 */
447 int
448 Log::TcpLogger::Open(Logfile * lf, const char *path, size_t bufsz, int fatalFlag)
449 {
450 assert(!StillLogging(lf));
451 debugs(5, 3, "Tcp Open called");
452
453 Ip::Address addr;
454
455 if (strncmp(path, "//", 2) == 0)
456 path += 2;
457 char *strAddr = xstrdup(path);
458 if (!GetHostWithPort(strAddr, &addr)) {
459 if (lf->flags.fatal) {
460 fatalf("Invalid TCP logging address '%s'\n", lf->path);
461 } else {
462 debugs(50, DBG_IMPORTANT, "Invalid TCP logging address '" << lf->path << "'");
463 safe_free(strAddr);
464 return FALSE;
465 }
466 }
467 safe_free(strAddr);
468
469 TcpLogger *logger = new TcpLogger(bufsz, fatalFlag, addr);
470 lf->data = new Pointer(logger);
471 lf->f_close = &Close;
472 lf->f_linewrite = &WriteLine;
473 lf->f_linestart = &StartLine;
474 lf->f_lineend = &EndLine;
475 lf->f_flush = &Flush;
476 lf->f_rotate = &Rotate;
477 AsyncJob::Start(logger);
478
479 return 1;
480 }
481