#include "dolog.hh"
#endif
-void CircularWriteBuffer::write(const std::string& str)
+bool CircularWriteBuffer::hasRoomFor(const std::string& str) const
{
- if(d_buffer.size() + 2 + str.size() > d_buffer.capacity())
- flush();
+ if (d_buffer.size() + 2 + str.size() > d_buffer.capacity()) {
+ return false;
+ }
- if(d_buffer.size() + 2 + str.size() > d_buffer.capacity())
- throw std::runtime_error("Full!");
+ return true;
+}
+
+bool CircularWriteBuffer::write(const std::string& str)
+{
+ if (!hasRoomFor(str)) {
+ return false;
+ }
uint16_t len = htons(str.size());
- char* ptr = (char*)&len;
+ const char* ptr = reinterpret_cast<const char*>(&len);
d_buffer.insert(d_buffer.end(), ptr, ptr + 2);
d_buffer.insert(d_buffer.end(), str.begin(), str.end());
+
+ return true;
}
-void CircularWriteBuffer::flush()
+bool CircularWriteBuffer::flush(int fd)
{
- if(d_buffer.empty()) // not optional, we report EOF otherwise
- return;
+ if (d_buffer.empty()) {
+ // not optional, we report EOF otherwise
+ return false;
+ }
auto arr1 = d_buffer.array_one();
auto arr2 = d_buffer.array_two();
struct iovec iov[2];
- int pos=0;
- size_t total=0;
+ int pos = 0;
+ size_t total = 0;
for(const auto& arr : {arr1, arr2}) {
if(arr.second) {
iov[pos].iov_base = arr.first;
}
}
- int res = writev(d_fd, iov, pos);
- if(res < 0) {
- throw std::runtime_error("Couldn't flush a thing: "+stringerror());
- }
- if(!res) {
- throw std::runtime_error("EOF");
+ ssize_t res = 0;
+ do {
+ res = writev(fd, iov, pos);
+
+ if (res < 0) {
+ if (errno == EINTR) {
+ continue;
+ }
+
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ return false;
+ }
+
+ /* we can't be sure we haven't sent a partial message,
+ and we don't want to send the remaining part after reconnecting */
+ d_buffer.clear();
+ throw std::runtime_error("Couldn't flush a thing: " + stringerror());
+ }
+ else if (!res) {
+ /* we can't be sure we haven't sent a partial message,
+ and we don't want to send the remaining part after reconnecting */
+ d_buffer.clear();
+ throw std::runtime_error("EOF");
+ }
}
+ while (res < 0);
+
// cout<<"Flushed "<<res<<" bytes out of " << total <<endl;
if (static_cast<size_t>(res) == d_buffer.size()) {
d_buffer.clear();
}
else {
- while(res--) {
+ while (res--) {
d_buffer.pop_front();
}
}
+
+ return true;
}
-RemoteLogger::RemoteLogger(const ComboAddress& remote, uint16_t timeout, uint64_t maxQueuedBytes, uint8_t reconnectWaitTime, bool asyncConnect): d_remote(remote), d_maxQueuedBytes(maxQueuedBytes), d_timeout(timeout), d_reconnectWaitTime(reconnectWaitTime), d_asyncConnect(asyncConnect)
+RemoteLogger::RemoteLogger(const ComboAddress& remote, uint16_t timeout, uint64_t maxQueuedBytes, uint8_t reconnectWaitTime, bool asyncConnect): d_writer(maxQueuedBytes), d_remote(remote), d_maxQueuedBytes(maxQueuedBytes), d_timeout(timeout), d_reconnectWaitTime(reconnectWaitTime), d_asyncConnect(asyncConnect)
{
if (!d_asyncConnect) {
- if(reconnect())
- d_writer = make_unique<CircularWriteBuffer>(d_socket, d_maxQueuedBytes);
+ reconnect();
}
+
d_thread = std::thread(&RemoteLogger::maintenanceThread, this);
}
bool RemoteLogger::reconnect()
{
- if (d_socket >= 0) {
- close(d_socket);
- d_socket = -1;
- }
try {
- d_socket = SSocket(d_remote.sin4.sin_family, SOCK_STREAM, 0);
- setNonBlocking(d_socket);
- SConnectWithTimeout(d_socket, d_remote, d_timeout);
+ auto newSock = make_unique<Socket>(d_remote.sin4.sin_family, SOCK_STREAM, 0);
+ newSock->setNonBlocking();
+ newSock->connect(d_remote, d_timeout);
+
+ {
+ /* we are now successfully connected, time to take the lock and update the
+ socket */
+ std::unique_lock<std::mutex> lock(d_mutex);
+ d_socket = std::move(newSock);
+ }
}
- catch(const std::exception& e) {
+ catch (const std::exception& e) {
#ifdef WE_ARE_RECURSOR
g_log<<Logger::Warning<<"Error connecting to remote logger "<<d_remote.toStringWithPort()<<": "<<e.what()<<std::endl;
#else
warnlog("Error connecting to remote logger %s: %s", d_remote.toStringWithPort(), e.what());
#endif
+
return false;
}
return true;
void RemoteLogger::queueData(const std::string& data)
{
- if(!d_writer) {
- ++d_drops;
- return;
- }
std::unique_lock<std::mutex> lock(d_mutex);
- if(d_writer) {
+
+ if (!d_writer.hasRoomFor(data)) {
+ /* not connected, queue is full, just drop */
+ if (!d_socket) {
+ ++d_drops;
+ return;
+ }
try {
- d_writer->write(data);
- ++d_queued;
+ /* we try to flush some data */
+ if (!d_writer.flush(d_socket->getHandle())) {
+ /* but failed, let's just drop */
+ ++d_drops;
+ return;
+ }
+
+ /* see if we freed enough data */
+ if (!d_writer.hasRoomFor(data)) {
+ /* we didn't */
+ ++d_drops;
+ return;
+ }
}
catch(const std::exception& e) {
// cout << "Got exception writing: "<<e.what()<<endl;
++d_drops;
- d_writer.reset();
- close(d_socket);
- d_socket = -1;
+ d_socket.reset();
+ return;
}
}
-}
+ d_writer.write(data);
+ ++d_queued;
+}
void RemoteLogger::maintenanceThread()
try
#endif
setThreadName(threadName);
- for(;;) {
- if(d_exiting)
+ for (;;) {
+ if (d_exiting) {
break;
+ }
- if(d_writer) {
- std::unique_lock<std::mutex> lock(d_mutex);
- if(d_writer) { // check if it is still set
- // cout<<"Flush"<<endl;
- try {
- d_writer->flush();
+ bool connected = true;
+ if (d_socket == nullptr) {
+ // if it was unset, it will remain so, we are the only ones setting it!
+ connected = reconnect();
+ }
+
+ /* we will just go to sleep if the reconnection just failed */
+ if (connected) {
+ try {
+ /* we don't want to take the lock while trying to reconnect */
+ std::unique_lock<std::mutex> lock(d_mutex);
+ if (d_socket) { // check if it is set
+ d_writer.flush(d_socket->getHandle());
}
- catch(std::exception& e) {
- // cout<<"Flush failed!"<<endl;
- d_writer.reset();
- close(d_socket);
- d_socket = -1;
+ else {
+ connected = false;
}
}
+ catch(const std::exception& e) {
+ d_socket.reset();
+ connected = false;
+ }
+
+ if (!connected) {
+ /* let's try to reconnect right away, we are about to sleep anyway */
+ reconnect();
+ }
}
- else if(reconnect()) { // if it was zero, it will remain zero, we are the only ones setting it!
- std::unique_lock<std::mutex> lock(d_mutex);
- d_writer = make_unique<CircularWriteBuffer>(d_socket, d_maxQueuedBytes);
- }
+
sleep(d_reconnectWaitTime);
}
}
RemoteLogger::~RemoteLogger()
{
d_exiting = true;
- if (d_socket >= 0) {
- close(d_socket);
- d_socket = -1;
- }
d_thread.join();
}