3 #include "comm/Connection.h"
4 #include "comm/ConnOpener.h"
5 #include "comm/Loops.h"
6 #include "comm/Write.h"
8 #include "globals.h" // for shutting_down
9 #include "log/CustomLog.h"
11 #include "log/TcpLogger.h"
14 #include "SquidConfig.h"
15 #include "SquidTime.h"
17 // a single I/O buffer should be large enough to store any access.log record
18 const size_t Log::TcpLogger::IoBufSize
= 2*MAX_URL
;
20 // We need at least two buffers because when we write the first buffer,
21 // we have to use the second buffer to accumulate new entries.
22 const size_t Log::TcpLogger::BufferCapacityMin
= 2*Log::TcpLogger::IoBufSize
;
24 #define MY_DEBUG_SECTION 50 /* Log file handling */
26 CBDATA_NAMESPACED_CLASS_INIT(Log
, TcpLogger
);
28 Log::TcpLogger::TcpLogger(size_t bufCap
, bool dieOnErr
, Ip::Address them
):
29 AsyncJob("TcpLogger"),
31 bufferCapacity(bufCap
),
35 reconnectScheduled(false),
36 writeScheduled(false),
42 if (bufferCapacity
< BufferCapacityMin
) {
43 debugs(MY_DEBUG_SECTION
, DBG_IMPORTANT
,
44 "WARNING: tcp:" << remote
<< " logger configured buffer " <<
45 "size " << bufferCapacity
<< " is smaller than the " <<
46 BufferCapacityMin
<< "-byte" << " minimum. " <<
47 "Using the minimum instead.");
48 bufferCapacity
= BufferCapacityMin
;
52 Log::TcpLogger::~TcpLogger()
54 // make sure Comm::Write does not have our buffer pointer
55 assert(!writeScheduled
);
59 Log::TcpLogger::start()
65 Log::TcpLogger::doneAll() const
67 debugs(MY_DEBUG_SECTION
, 5, "quitOnEmpty: " << quitOnEmpty
<<
68 " buffered: " << bufferedSize
<<
69 " conn: " << conn
<< ' ' << connectFailures
);
71 // we do not quit unless we are told that we may
75 /* We were asked to quit after we are done writing buffers. Are we done? */
77 // If we have records but are failing to connect, quit. Otherwise, we may
78 // be trying to connect forever due to a [since fixed] misconfiguration!
79 const bool failingToConnect
= !conn
&& connectFailures
;
80 if (bufferedSize
&& !failingToConnect
)
83 return AsyncJob::doneAll();
87 Log::TcpLogger::swanSong()
89 disconnect(); // optional: refcounting should close/delete conn eventually
94 Log::TcpLogger::endGracefully()
96 // job call protection must end our job if we are done logging current bufs
97 assert(inCall
!= NULL
);
103 Log::TcpLogger::flush()
105 flushDebt
= bufferedSize
;
110 Log::TcpLogger::logRecord(const char *buf
, const size_t len
)
112 appendRecord(buf
, len
);
116 /// starts writing if and only if it is time to write accumulated records
118 Log::TcpLogger::writeIfNeeded()
120 // write if an earlier flush command forces us to write or
121 // if we have filled at least one I/O buffer
122 if (flushDebt
> 0 || buffers
.size() > 1)
126 /// starts writing if possible
127 void Log::TcpLogger::writeIfPossible()
129 debugs(MY_DEBUG_SECTION
, 7, "guards: " << (!writeScheduled
) <<
130 (bufferedSize
> 0) << (conn
!= NULL
) <<
131 (conn
!= NULL
&& !fd_table
[conn
->fd
].closing()) << " buffered: " <<
132 bufferedSize
<< '/' << buffers
.size());
134 // XXX: Squid shutdown sequence starts closing our connection before
135 // calling LogfileClose, leading to loss of log records during shutdown.
136 if (!writeScheduled
&& bufferedSize
> 0 && conn
!= NULL
&&
137 !fd_table
[conn
->fd
].closing()) {
138 debugs(MY_DEBUG_SECTION
, 5, "writing first buffer");
140 typedef CommCbMemFunT
<TcpLogger
, CommIoCbParams
> WriteDialer
;
141 AsyncCall::Pointer callback
= JobCallback(MY_DEBUG_SECTION
, 5, WriteDialer
, this, Log::TcpLogger::writeDone
);
142 const MemBlob::Pointer
&buffer
= buffers
.front();
143 Comm::Write(conn
, buffer
->mem
, buffer
->size
, callback
, NULL
);
144 writeScheduled
= true;
148 /// whether len more bytes can be buffered
150 Log::TcpLogger::canFit(const size_t len
) const
152 // TODO: limit reporting frequency in addition to reporting only changes
154 if (bufferedSize
+len
<= bufferCapacity
) {
156 // We can get here if a shorter record accidentally fits after we
157 // started dropping records. When that happens, the following
158 // DBG_IMPORTANT message will mislead admin into thinking that
159 // the problem was resolved (for a brief period of time, until
160 // another record comes in and overflows the buffer). It is
161 // difficult to prevent this without also creating the opposite
162 // problem: A huge record that does not fit and is dropped blocks
163 // subsequent regular records from being buffered until we write.
164 debugs(MY_DEBUG_SECTION
, DBG_IMPORTANT
, "tcp:" << remote
<<
165 " logger stops dropping records after " << drops
<< " drops" <<
166 "; current buffer use: " << (bufferedSize
+len
) <<
167 " out of " << bufferCapacity
<< " bytes");
172 if (!drops
|| dieOnError
) {
173 debugs(MY_DEBUG_SECTION
,
174 dieOnError
? DBG_CRITICAL
: DBG_IMPORTANT
,
175 "tcp:" << remote
<< " logger " << bufferCapacity
<< "-byte " <<
176 "buffer overflowed; cannot fit " <<
177 (bufferedSize
+len
-bufferCapacity
) << " bytes");
181 fatal("tcp logger buffer overflowed");
184 debugs(MY_DEBUG_SECTION
, DBG_IMPORTANT
, "tcp:" << remote
<<
185 " logger starts dropping records.");
191 /// buffer a record that might exceed IoBufSize
193 Log::TcpLogger::appendRecord(const char *record
, const size_t len
)
195 // they should not happen, but to be safe, let's protect drop start/stop
196 // monitoring algorithm from empty records (which can never be dropped)
206 // append without spliting buf, unless it exceeds IoBufSize
207 for (size_t off
= 0; off
< len
; off
+= IoBufSize
)
208 appendChunk(record
+ off
, min(len
- off
, IoBufSize
));
211 /// buffer a record chunk without splitting it across buffers
213 Log::TcpLogger::appendChunk(const char *chunk
, const size_t len
)
215 Must(len
<= IoBufSize
);
216 // add a buffer if there is not one that can accomodate len bytes
217 bool addBuffer
= buffers
.empty() ||
218 (buffers
.back()->size
+len
> IoBufSize
);
219 // also add a buffer if there is only one and that one is being written
220 addBuffer
= addBuffer
|| (writeScheduled
&& buffers
.size() == 1);
223 buffers
.push_back(new MemBlob(IoBufSize
));
224 debugs(MY_DEBUG_SECTION
, 7, "added buffer #" << buffers
.size());
227 Must(!buffers
.empty());
228 buffers
.back()->append(chunk
, len
);
232 /// starts [re]connecting to the remote logger
234 Log::TcpLogger::connect()
239 debugs(MY_DEBUG_SECTION
, 3, "connecting");
242 Comm::ConnectionPointer futureConn
= new Comm::Connection
;
243 futureConn
->remote
= remote
;
244 futureConn
->local
.setAnyAddr();
245 if (futureConn
->remote
.isIPv4())
246 futureConn
->local
.setIPv4();
248 typedef CommCbMemFunT
<TcpLogger
, CommConnectCbParams
> Dialer
;
249 AsyncCall::Pointer call
= JobCallback(MY_DEBUG_SECTION
, 5, Dialer
, this, Log::TcpLogger::connectDone
);
250 AsyncJob::Start(new Comm::ConnOpener(futureConn
, call
, 2));
253 /// Comm::ConnOpener callback
255 Log::TcpLogger::connectDone(const CommConnectCbParams
¶ms
)
257 if (params
.flag
!= COMM_OK
) {
258 const double delay
= 0.5; // seconds
259 if (connectFailures
++ % 100 == 0) {
260 debugs(MY_DEBUG_SECTION
, DBG_IMPORTANT
, "tcp:" << remote
<<
261 " logger connection attempt #" << connectFailures
<<
262 " failed. Will keep trying every " << delay
<< " seconds.");
265 if (!reconnectScheduled
) {
266 reconnectScheduled
= true;
267 eventAdd("Log::TcpLogger::DelayedReconnect",
268 Log::TcpLogger::DelayedReconnect
,
269 new Pointer(this), 0.5, 0, false);
272 if (connectFailures
> 0) {
273 debugs(MY_DEBUG_SECTION
, DBG_IMPORTANT
, "tcp:" << remote
<<
274 " logger connectivity restored after " <<
275 (connectFailures
+1) << " attempts.");
283 typedef CommCbMemFunT
<TcpLogger
, CommCloseCbParams
> Closer
;
284 closer
= JobCallback(MY_DEBUG_SECTION
, 4, Closer
, this, Log::TcpLogger::handleClosure
);
285 comm_add_close_handler(conn
->fd
, closer
);
291 // XXX: Needed until eventAdd() starts accepting Async calls directly.
292 /// Log::TcpLogger::delayedReconnect() wrapper.
294 Log::TcpLogger::DelayedReconnect(void *data
)
296 Pointer
*ptr
= static_cast<Pointer
*>(data
);
298 if (TcpLogger
*logger
= ptr
->valid()) {
299 // Get back inside AsyncJob protections by scheduling another call.
300 typedef NullaryMemFunT
<TcpLogger
> Dialer
;
301 AsyncCall::Pointer call
= JobCallback(MY_DEBUG_SECTION
, 5, Dialer
,
303 Log::TcpLogger::delayedReconnect
);
304 ScheduleCallHere(call
);
309 /// "sleep a little before trying to connect again" event callback
311 Log::TcpLogger::delayedReconnect()
313 Must(reconnectScheduled
);
315 reconnectScheduled
= false;
319 /// Comm::Write callback
321 Log::TcpLogger::writeDone(const CommIoCbParams
&io
)
323 writeScheduled
= false;
324 if (io
.flag
== COMM_ERR_CLOSING
) {
325 debugs(MY_DEBUG_SECTION
, 7, "closing");
326 // do nothing here -- our comm_close_handler will be called to clean up
327 } else if (io
.flag
!= COMM_OK
) {
328 debugs(MY_DEBUG_SECTION
, 2, "write failure: " << xstrerr(io
.xerrno
));
329 // keep the first buffer (the one we failed to write)
333 debugs(MY_DEBUG_SECTION
, 5, "write successful");
335 Must(!buffers
.empty()); // we had a buffer to write
336 const MemBlob::Pointer
&written
= buffers
.front();
337 const size_t writtenSize
= static_cast<size_t>(written
->size
);
338 // and we wrote the whole buffer
339 Must(io
.size
== writtenSize
);
340 Must(bufferedSize
>= writtenSize
);
341 bufferedSize
-= writtenSize
;
345 if (flushDebt
> io
.size
)
346 flushDebt
-= io
.size
;
348 flushDebt
= 0; // wrote everything we owed (or more)
354 /// This is our comm_close_handler. It is called when some external force
355 /// (e.g., reconfigure or shutdown) is closing the connection (rather than us).
357 Log::TcpLogger::handleClosure(const CommCloseCbParams
&io
)
359 assert(inCall
!= NULL
);
362 // in all current use cases, we should not try to reconnect
363 mustStop("Log::TcpLogger::handleClosure");
366 /// close our connection now, without flushing
368 Log::TcpLogger::disconnect()
371 if (closer
!= NULL
) {
372 comm_remove_close_handler(conn
->fd
, closer
);
380 /// Converts Logfile into a pointer to a valid TcpLogger job or,
381 /// if the logger job has quit, into a nill pointer
383 Log::TcpLogger::StillLogging(Logfile
*lf
)
385 if (Pointer
*pptr
= static_cast<Pointer
*>(lf
->data
))
386 return pptr
->get(); // may be nil
391 Log::TcpLogger::Flush(Logfile
* lf
)
393 if (TcpLogger
*logger
= StillLogging(lf
))
398 Log::TcpLogger::WriteLine(Logfile
* lf
, const char *buf
, size_t len
)
400 if (TcpLogger
*logger
= StillLogging(lf
))
401 logger
->logRecord(buf
, len
);
405 Log::TcpLogger::StartLine(Logfile
* lf
)
410 Log::TcpLogger::EndLine(Logfile
* lf
)
412 if (!Config
.onoff
.buffered_logs
)
417 Log::TcpLogger::Rotate(Logfile
* lf
)
422 Log::TcpLogger::Close(Logfile
* lf
)
424 if (TcpLogger
*logger
= StillLogging(lf
)) {
425 debugs(50, 3, "Closing " << logger
);
426 typedef NullaryMemFunT
<TcpLogger
> Dialer
;
427 Dialer
dialer(logger
, &Log::TcpLogger::endGracefully
);
428 AsyncCall::Pointer call
= asyncCall(50, 3, "Log::TcpLogger::endGracefully", dialer
);
429 ScheduleCallHere(call
);
431 delete static_cast<Pointer
*>(lf
->data
);
436 * This code expects the path to be //host:port
439 Log::TcpLogger::Open(Logfile
* lf
, const char *path
, size_t bufsz
, int fatalFlag
)
441 assert(!StillLogging(lf
));
442 debugs(5, 3, "Tcp Open called");
446 if (strncmp(path
, "//", 2) == 0)
448 char *strAddr
= xstrdup(path
);
449 if (!GetHostWithPort(strAddr
, &addr
)) {
450 if (lf
->flags
.fatal
) {
451 fatalf("Invalid TCP logging address '%s'\n", lf
->path
);
453 debugs(50, DBG_IMPORTANT
, "Invalid TCP logging address '" << lf
->path
<< "'");
460 TcpLogger
*logger
= new TcpLogger(bufsz
, fatalFlag
, addr
);
461 lf
->data
= new Pointer(logger
);
462 lf
->f_close
= &Close
;
463 lf
->f_linewrite
= &WriteLine
;
464 lf
->f_linestart
= &StartLine
;
465 lf
->f_lineend
= &EndLine
;
466 lf
->f_flush
= &Flush
;
467 lf
->f_rotate
= &Rotate
;
468 AsyncJob::Start(logger
);