2 * Copyright (C) 1996-2021 The Squid Software Foundation and contributors
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
11 #include "comm/Connection.h"
12 #include "comm/ConnOpener.h"
13 #include "comm/Loops.h"
14 #include "comm/Write.h"
17 #include "globals.h" // for shutting_down
18 #include "log/CustomLog.h"
20 #include "log/TcpLogger.h"
22 #include "sbuf/MemBlob.h"
23 #include "SquidConfig.h"
24 #include "SquidTime.h"
26 // a single I/O buffer should be large enough to store any access.log record
27 const size_t Log::TcpLogger::IoBufSize
= 2*MAX_URL
;
29 // We need at least two buffers because when we write the first buffer,
30 // we have to use the second buffer to accumulate new entries.
31 const size_t Log::TcpLogger::BufferCapacityMin
= 2*Log::TcpLogger::IoBufSize
;
33 #define MY_DEBUG_SECTION 50 /* Log file handling */
35 CBDATA_NAMESPACED_CLASS_INIT(Log
, TcpLogger
);
37 Log::TcpLogger::TcpLogger(size_t bufCap
, bool dieOnErr
, Ip::Address them
):
38 AsyncJob("TcpLogger"),
40 bufferCapacity(bufCap
),
44 reconnectScheduled(false),
45 writeScheduled(false),
51 if (bufferCapacity
< BufferCapacityMin
) {
52 debugs(MY_DEBUG_SECTION
, DBG_IMPORTANT
,
53 "WARNING: tcp:" << remote
<< " logger configured buffer " <<
54 "size " << bufferCapacity
<< " is smaller than the " <<
55 BufferCapacityMin
<< "-byte" << " minimum. " <<
56 "Using the minimum instead.");
57 bufferCapacity
= BufferCapacityMin
;
61 Log::TcpLogger::~TcpLogger()
63 // make sure Comm::Write does not have our buffer pointer
64 assert(!writeScheduled
);
68 Log::TcpLogger::start()
74 Log::TcpLogger::doneAll() const
76 debugs(MY_DEBUG_SECTION
, 5, "quitOnEmpty: " << quitOnEmpty
<<
77 " buffered: " << bufferedSize
<<
78 " conn: " << conn
<< ' ' << connectFailures
);
80 // we do not quit unless we are told that we may
84 /* We were asked to quit after we are done writing buffers. Are we done? */
86 // If we have records but are failing to connect, quit. Otherwise, we may
87 // be trying to connect forever due to a [since fixed] misconfiguration!
88 const bool failingToConnect
= !conn
&& connectFailures
;
89 if (bufferedSize
&& !failingToConnect
)
92 return AsyncJob::doneAll();
96 Log::TcpLogger::swanSong()
98 disconnect(); // optional: refcounting should close/delete conn eventually
103 Log::TcpLogger::endGracefully()
105 // job call protection must end our job if we are done logging current bufs
106 assert(inCall
!= NULL
);
112 Log::TcpLogger::flush()
114 flushDebt
= bufferedSize
;
119 Log::TcpLogger::logRecord(const char *buf
, const size_t len
)
121 appendRecord(buf
, len
);
125 /// starts writing if and only if it is time to write accumulated records
127 Log::TcpLogger::writeIfNeeded()
129 // write if an earlier flush command forces us to write or
130 // if we have filled at least one I/O buffer
131 if (flushDebt
> 0 || buffers
.size() > 1)
135 /// starts writing if possible
136 void Log::TcpLogger::writeIfPossible()
138 debugs(MY_DEBUG_SECTION
, 7, "guards: " << (!writeScheduled
) <<
139 (bufferedSize
> 0) << (conn
!= NULL
) <<
140 (conn
!= NULL
&& !fd_table
[conn
->fd
].closing()) << " buffered: " <<
141 bufferedSize
<< '/' << buffers
.size());
143 // XXX: Squid shutdown sequence starts closing our connection before
144 // calling LogfileClose, leading to loss of log records during shutdown.
145 if (!writeScheduled
&& bufferedSize
> 0 && conn
!= NULL
&&
146 !fd_table
[conn
->fd
].closing()) {
147 debugs(MY_DEBUG_SECTION
, 5, "writing first buffer");
149 typedef CommCbMemFunT
<TcpLogger
, CommIoCbParams
> WriteDialer
;
150 AsyncCall::Pointer callback
= JobCallback(MY_DEBUG_SECTION
, 5, WriteDialer
, this, Log::TcpLogger::writeDone
);
151 const MemBlob::Pointer
&buffer
= buffers
.front();
152 Comm::Write(conn
, buffer
->mem
, buffer
->size
, callback
, NULL
);
153 writeScheduled
= true;
157 /// whether len more bytes can be buffered
159 Log::TcpLogger::canFit(const size_t len
) const
161 // TODO: limit reporting frequency in addition to reporting only changes
163 if (bufferedSize
+len
<= bufferCapacity
) {
165 // We can get here if a shorter record accidentally fits after we
166 // started dropping records. When that happens, the following
167 // DBG_IMPORTANT message will mislead admin into thinking that
168 // the problem was resolved (for a brief period of time, until
169 // another record comes in and overflows the buffer). It is
170 // difficult to prevent this without also creating the opposite
171 // problem: A huge record that does not fit and is dropped blocks
172 // subsequent regular records from being buffered until we write.
173 debugs(MY_DEBUG_SECTION
, DBG_IMPORTANT
, "tcp:" << remote
<<
174 " logger stops dropping records after " << drops
<< " drops" <<
175 "; current buffer use: " << (bufferedSize
+len
) <<
176 " out of " << bufferCapacity
<< " bytes");
181 if (!drops
|| dieOnError
) {
182 debugs(MY_DEBUG_SECTION
,
183 dieOnError
? DBG_CRITICAL
: DBG_IMPORTANT
,
184 "tcp:" << remote
<< " logger " << bufferCapacity
<< "-byte " <<
185 "buffer overflowed; cannot fit " <<
186 (bufferedSize
+len
-bufferCapacity
) << " bytes");
190 fatal("tcp logger buffer overflowed");
193 debugs(MY_DEBUG_SECTION
, DBG_IMPORTANT
, "tcp:" << remote
<<
194 " logger starts dropping records.");
200 /// buffer a record that might exceed IoBufSize
202 Log::TcpLogger::appendRecord(const char *record
, const size_t len
)
204 // they should not happen, but to be safe, let's protect drop start/stop
205 // monitoring algorithm from empty records (which can never be dropped)
215 // append without splitting buf, unless it exceeds IoBufSize
216 for (size_t off
= 0; off
< len
; off
+= IoBufSize
)
217 appendChunk(record
+ off
, min(len
- off
, IoBufSize
));
220 /// buffer a record chunk without splitting it across buffers
222 Log::TcpLogger::appendChunk(const char *chunk
, const size_t len
)
224 Must(len
<= IoBufSize
);
225 // add a buffer if there is not one that can accommodate len bytes
226 bool addBuffer
= buffers
.empty() ||
227 (buffers
.back()->size
+len
> IoBufSize
);
228 // also add a buffer if there is only one and that one is being written
229 addBuffer
= addBuffer
|| (writeScheduled
&& buffers
.size() == 1);
232 buffers
.push_back(new MemBlob(IoBufSize
));
233 debugs(MY_DEBUG_SECTION
, 7, "added buffer #" << buffers
.size());
236 Must(!buffers
.empty());
237 buffers
.back()->append(chunk
, len
);
241 /// starts [re]connecting to the remote logger
243 Log::TcpLogger::doConnect()
248 debugs(MY_DEBUG_SECTION
, 3, "connecting");
251 Comm::ConnectionPointer futureConn
= new Comm::Connection
;
252 futureConn
->remote
= remote
;
253 futureConn
->local
.setAnyAddr();
254 if (futureConn
->remote
.isIPv4())
255 futureConn
->local
.setIPv4();
257 typedef CommCbMemFunT
<TcpLogger
, CommConnectCbParams
> Dialer
;
258 AsyncCall::Pointer call
= JobCallback(MY_DEBUG_SECTION
, 5, Dialer
, this, Log::TcpLogger::connectDone
);
259 AsyncJob::Start(new Comm::ConnOpener(futureConn
, call
, 2));
262 /// Comm::ConnOpener callback
264 Log::TcpLogger::connectDone(const CommConnectCbParams
¶ms
)
266 if (params
.flag
!= Comm::OK
) {
267 const double delay
= 0.5; // seconds
268 if (connectFailures
++ % 100 == 0) {
269 debugs(MY_DEBUG_SECTION
, DBG_IMPORTANT
, "tcp:" << remote
<<
270 " logger connection attempt #" << connectFailures
<<
271 " failed. Will keep trying every " << delay
<< " seconds.");
274 if (!reconnectScheduled
) {
275 reconnectScheduled
= true;
276 eventAdd("Log::TcpLogger::DelayedReconnect",
277 Log::TcpLogger::DelayedReconnect
,
278 new Pointer(this), 0.5, 0, false);
281 if (connectFailures
> 0) {
282 debugs(MY_DEBUG_SECTION
, DBG_IMPORTANT
, "tcp:" << remote
<<
283 " logger connectivity restored after " <<
284 (connectFailures
+1) << " attempts.");
292 typedef CommCbMemFunT
<TcpLogger
, CommCloseCbParams
> Closer
;
293 closer
= JobCallback(MY_DEBUG_SECTION
, 4, Closer
, this, Log::TcpLogger::handleClosure
);
294 comm_add_close_handler(conn
->fd
, closer
);
300 // XXX: Needed until eventAdd() starts accepting Async calls directly.
301 /// Log::TcpLogger::delayedReconnect() wrapper.
303 Log::TcpLogger::DelayedReconnect(void *data
)
305 Pointer
*ptr
= static_cast<Pointer
*>(data
);
307 if (TcpLogger
*logger
= ptr
->valid()) {
308 // Get back inside AsyncJob protections by scheduling another call.
309 typedef NullaryMemFunT
<TcpLogger
> Dialer
;
310 AsyncCall::Pointer call
= JobCallback(MY_DEBUG_SECTION
, 5, Dialer
,
312 Log::TcpLogger::delayedReconnect
);
313 ScheduleCallHere(call
);
318 /// "sleep a little before trying to connect again" event callback
320 Log::TcpLogger::delayedReconnect()
322 Must(reconnectScheduled
);
324 reconnectScheduled
= false;
328 /// Comm::Write callback
330 Log::TcpLogger::writeDone(const CommIoCbParams
&io
)
332 writeScheduled
= false;
333 if (io
.flag
== Comm::ERR_CLOSING
) {
334 debugs(MY_DEBUG_SECTION
, 7, "closing");
335 // do nothing here -- our comm_close_handler will be called to clean up
336 } else if (io
.flag
!= Comm::OK
) {
337 debugs(MY_DEBUG_SECTION
, 2, "write failure: " << xstrerr(io
.xerrno
));
338 // keep the first buffer (the one we failed to write)
342 debugs(MY_DEBUG_SECTION
, 5, "write successful");
344 Must(!buffers
.empty()); // we had a buffer to write
345 const MemBlob::Pointer
&written
= buffers
.front();
346 const size_t writtenSize
= static_cast<size_t>(written
->size
);
347 // and we wrote the whole buffer
348 Must(io
.size
== writtenSize
);
349 Must(bufferedSize
>= writtenSize
);
350 bufferedSize
-= writtenSize
;
354 if (flushDebt
> io
.size
)
355 flushDebt
-= io
.size
;
357 flushDebt
= 0; // wrote everything we owed (or more)
363 /// This is our comm_close_handler. It is called when some external force
364 /// (e.g., reconfigure or shutdown) is closing the connection (rather than us).
366 Log::TcpLogger::handleClosure(const CommCloseCbParams
&)
368 assert(inCall
!= NULL
);
371 // in all current use cases, we should not try to reconnect
372 mustStop("Log::TcpLogger::handleClosure");
375 /// close our connection now, without flushing
377 Log::TcpLogger::disconnect()
380 if (closer
!= NULL
) {
381 comm_remove_close_handler(conn
->fd
, closer
);
389 /// Converts Logfile into a pointer to a valid TcpLogger job or,
390 /// if the logger job has quit, into a nill pointer
392 Log::TcpLogger::StillLogging(Logfile
*lf
)
394 if (Pointer
*pptr
= static_cast<Pointer
*>(lf
->data
))
395 return pptr
->get(); // may be nil
400 Log::TcpLogger::Flush(Logfile
* lf
)
402 if (TcpLogger
*logger
= StillLogging(lf
))
407 Log::TcpLogger::WriteLine(Logfile
* lf
, const char *buf
, size_t len
)
409 if (TcpLogger
*logger
= StillLogging(lf
))
410 logger
->logRecord(buf
, len
);
414 Log::TcpLogger::StartLine(Logfile
*)
419 Log::TcpLogger::EndLine(Logfile
* lf
)
421 if (!Config
.onoff
.buffered_logs
)
426 Log::TcpLogger::Rotate(Logfile
*, const int16_t)
431 Log::TcpLogger::Close(Logfile
* lf
)
433 if (TcpLogger
*logger
= StillLogging(lf
)) {
434 debugs(50, 3, "Closing " << logger
);
435 typedef NullaryMemFunT
<TcpLogger
> Dialer
;
436 Dialer
dialer(logger
, &Log::TcpLogger::endGracefully
);
437 AsyncCall::Pointer call
= asyncCall(50, 3, "Log::TcpLogger::endGracefully", dialer
);
438 ScheduleCallHere(call
);
440 delete static_cast<Pointer
*>(lf
->data
);
445 * This code expects the path to be //host:port
448 Log::TcpLogger::Open(Logfile
* lf
, const char *path
, size_t bufsz
, int fatalFlag
)
450 assert(!StillLogging(lf
));
451 debugs(5, 3, "Tcp Open called");
455 if (strncmp(path
, "//", 2) == 0)
457 char *strAddr
= xstrdup(path
);
458 if (!GetHostWithPort(strAddr
, &addr
)) {
459 if (lf
->flags
.fatal
) {
460 fatalf("Invalid TCP logging address '%s'\n", lf
->path
);
462 debugs(50, DBG_IMPORTANT
, "Invalid TCP logging address '" << lf
->path
<< "'");
469 TcpLogger
*logger
= new TcpLogger(bufsz
, fatalFlag
, addr
);
470 lf
->data
= new Pointer(logger
);
471 lf
->f_close
= &Close
;
472 lf
->f_linewrite
= &WriteLine
;
473 lf
->f_linestart
= &StartLine
;
474 lf
->f_lineend
= &EndLine
;
475 lf
->f_flush
= &Flush
;
476 lf
->f_rotate
= &Rotate
;
477 AsyncJob::Start(logger
);