From: Radosław Korzeniewski Date: Fri, 10 Aug 2018 10:34:05 +0000 (+0200) Subject: Refactoring of BSOCK and introducing BSOCKCORE. X-Git-Tag: Release-9.2.1~4 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=c3c124adbe784a55bb31c66401630b2d22768baf;p=thirdparty%2Fbacula.git Refactoring of BSOCK and introducing BSOCKCORE. A BSOCKCORE class was designed as a code split from previous BSOCK class and as a base class for new BSOCK. The new BSOCK class should retain virtually the same API as the old one. This was achieved (not)as a simple class variables and methods split between two new classes. All the methods and code which implement Bacula communication protocol goes to the new BSOCK. This includes the following functionalities: communication line compression, data spooling and authentication. All the rest as a generic functions and methods goes to BSOCKCORE. --- diff --git a/bacula/src/lib/Makefile.in b/bacula/src/lib/Makefile.in index 56b6d36fb..5d73bd289 100644 --- a/bacula/src/lib/Makefile.in +++ b/bacula/src/lib/Makefile.in @@ -32,7 +32,7 @@ dummy: # INCLUDE_FILES = ../baconfig.h ../bacula.h ../bc_types.h \ ../config.h ../jcr.h ../version.h \ - address_conf.h alist.h attr.h base64.h \ + address_conf.h alist.h attr.h base64.h bsockcore.h \ berrno.h bits.h bjson.h bpipe.h breg.h bregex.h \ bsock.h btime.h btimers.h crypto.h dlist.h \ flist.h fnmatch.h guid_to_name.h htable.h lex.h \ @@ -51,7 +51,7 @@ LIBBAC_SRCS = attr.c base64.c berrno.c bsys.c binflate.c bget_msg.c \ cram-md5.c crc32.c crypto.c daemon.c edit.c fnmatch.c \ guid_to_name.c hmac.c jcr.c lex.c lz4.c alist.c dlist.c \ md5.c message.c mem_pool.c openssl.c \ - plugins.c priv.c queue.c bregex.c \ + plugins.c priv.c queue.c bregex.c bsockcore.c \ runscript.c rwlock.c scan.c sellist.c serial.c sha1.c sha2.c \ signal.c smartall.c rblist.c tls.c tree.c \ util.c var.c watchdog.c workq.c btimers.c \ @@ -203,7 +203,6 @@ sellist_test: Makefile sellist.c $(RMF) sellist.o $(CXX) $(DEFS) $(DEBUG) -c $(CPPFLAGS) -I$(srcdir) -I$(basedir) $(DINCLUDE) $(CFLAGS) sellist.c - xml_test: Makefile xml.o $(RMF) xml.o $(CXX) -DTEST_PROG $(DEFS) $(DEBUG) -c $(CPPFLAGS) -I$(srcdir) -I$(basedir) $(DINCLUDE) $(CFLAGS) xml.c @@ -239,6 +238,20 @@ ini: Makefile ini.o $(RMF) ini.o $(CXX) $(DEFS) $(DEBUG) -c $(CPPFLAGS) -I$(srcdir) -I$(basedir) $(DINCLUDE) $(CFLAGS) ini.c +bsockcore_test: Makefile bsockcore.o + $(RMF) bsockcore.o + $(CXX) -DTEST_PROGRAM $(DEFS) $(DEBUG) -c $(CPPFLAGS) -I$(srcdir) -I$(basedir) $(DINCLUDE) $(CFLAGS) bsockcore.c + $(LIBTOOL_LINK) $(CXX) $(LDFLAGS) -L. -o $@ bsockcore.o $(DLIB) -lbac -lm $(LIBS) + $(RMF) bsockcore.o + $(CXX) $(DEFS) $(DEBUG) -c $(CPPFLAGS) -I$(srcdir) -I$(basedir) $(DINCLUDE) $(CFLAGS) bsockcore.c + +bsock_test: Makefile bsock.o + $(RMF) bsock.o + $(CXX) -DTEST_PROGRAM $(DEFS) $(DEBUG) -c $(CPPFLAGS) -I$(srcdir) -I$(basedir) $(DINCLUDE) $(CFLAGS) bsock.c + $(LIBTOOL_LINK) $(CXX) $(LDFLAGS) -L. -o $@ bsock.o $(DLIB) -lbac -lm $(LIBS) + $(RMF) bsock.o + $(CXX) $(DEFS) $(DEBUG) -c $(CPPFLAGS) -I$(srcdir) -I$(basedir) $(DINCLUDE) $(CFLAGS) bsock.c + install-includes: $(MKDIR) $(DESTDIR)/$(includedir)/bacula for I in $(INCLUDE_FILES); do \ diff --git a/bacula/src/lib/bnet.c b/bacula/src/lib/bnet.c index 584388d16..0d2ad257e 100644 --- a/bacula/src/lib/bnet.c +++ b/bacula/src/lib/bnet.c @@ -51,149 +51,6 @@ static pthread_mutex_t ip_mutex = PTHREAD_MUTEX_INITIALIZER; #endif -/* - * Read a nbytes from the network. - * It is possible that the total bytes require in several - * read requests - */ - -int32_t read_nbytes(BSOCK * bsock, char *ptr, int32_t nbytes) -{ - int32_t nleft, nread; - -#ifdef HAVE_TLS - if (bsock->tls) { - /* TLS enabled */ - return (tls_bsock_readn(bsock, ptr, nbytes)); - } -#endif /* HAVE_TLS */ - - nleft = nbytes; - while (nleft > 0) { - errno = 0; - nread = socketRead(bsock->m_fd, ptr, nleft); - if (bsock->is_timed_out() || bsock->is_terminated()) { - return -1; - } - -#ifdef HAVE_WIN32 - /* - * We simulate errno on Windows for a socket - * error in order to handle errors correctly. - */ - if (nread == SOCKET_ERROR) { - DWORD err = WSAGetLastError(); - nread = -1; - if (err == WSAEINTR) { - errno = EINTR; - } else if (err == WSAEWOULDBLOCK) { - errno = EAGAIN; - } else { - errno = EIO; /* some other error */ - } - } -#endif - - if (nread == -1) { - if (errno == EINTR) { - continue; - } - if (errno == EAGAIN) { - bmicrosleep(0, 20000); /* try again in 20ms */ - continue; - } - } - if (nread <= 0) { - return -1; /* error, or EOF */ - } - nleft -= nread; - ptr += nread; - if (bsock->use_bwlimit()) { - bsock->control_bwlimit(nread); - } - } - return nbytes - nleft; /* return >= 0 */ -} - -/* - * Write nbytes to the network. - * It may require several writes. - */ - -int32_t write_nbytes(BSOCK * bsock, char *ptr, int32_t nbytes) -{ - int32_t nleft, nwritten; - - if (bsock->is_spooling()) { - nwritten = fwrite(ptr, 1, nbytes, bsock->m_spool_fd); - if (nwritten != nbytes) { - berrno be; - bsock->b_errno = errno; - Qmsg3(bsock->jcr(), M_FATAL, 0, _("Attr spool write error. wrote=%d wanted=%d bytes. ERR=%s\n"), - nbytes, nwritten, be.bstrerror()); - Dmsg2(400, "nwritten=%d nbytes=%d.\n", nwritten, nbytes); - errno = bsock->b_errno; - return -1; - } - return nbytes; - } - -#ifdef HAVE_TLS - if (bsock->tls) { - /* TLS enabled */ - return (tls_bsock_writen(bsock, ptr, nbytes)); - } -#endif /* HAVE_TLS */ - - nleft = nbytes; - while (nleft > 0) { - do { - errno = 0; - nwritten = socketWrite(bsock->m_fd, ptr, nleft); - if (bsock->is_timed_out() || bsock->is_terminated()) { - return -1; - } - -#ifdef HAVE_WIN32 - /* - * We simulate errno on Windows for a socket - * error in order to handle errors correctly. - */ - if (nwritten == SOCKET_ERROR) { - DWORD err = WSAGetLastError(); - nwritten = -1; - if (err == WSAEINTR) { - errno = EINTR; - } else if (err == WSAEWOULDBLOCK) { - errno = EAGAIN; - } else { - errno = EIO; /* some other error */ - } - } -#endif - - } while (nwritten == -1 && errno == EINTR); - /* - * If connection is non-blocking, we will get EAGAIN, so - * use select()/poll to keep from consuming all the CPU - * and try again. - */ - if (nwritten == -1 && errno == EAGAIN) { - fd_wait_data(bsock->m_fd, WAIT_WRITE, 1, 0); - continue; - } - if (nwritten <= 0) { - return -1; /* error */ - } - nleft -= nwritten; - ptr += nwritten; - if (bsock->use_bwlimit()) { - bsock->control_bwlimit(nwritten); - } - } - return nbytes - nleft; -} - /* * Establish a TLS connection -- server side * Returns: true on success @@ -321,16 +178,16 @@ bool bnet_tls_client(TLS_CONTEXT *ctx, BSOCK * bsock, alist *verify_list) #endif #if defined(HAVE_GETADDRINFO) -/* +/* * getaddrinfo.c - Simple example of using getaddrinfo(3) function. - * + * * Michal Ludvig (c) 2002, 2003 * http://www.logix.cz/michal/devel/ * * License: public domain. */ -const char *resolv_host(int family, const char *host, dlist *addr_list) -{ +const char *resolv_host(int family, const char *host, dlist *addr_list) +{ IPADDR *ipaddr; struct addrinfo hints, *res, *rp; int errcode; @@ -338,8 +195,8 @@ const char *resolv_host(int family, const char *host, dlist *addr_list) void *ptr; memset (&hints, 0, sizeof(hints)); - hints.ai_family = family; - hints.ai_socktype = SOCK_STREAM; + hints.ai_family = family; + hints.ai_socktype = SOCK_STREAM; //hints.ai_flags |= AI_CANONNAME; errcode = getaddrinfo (host, NULL, &hints, &res); @@ -348,33 +205,33 @@ const char *resolv_host(int family, const char *host, dlist *addr_list) for (rp=res; res; res=res->ai_next) { //inet_ntop (res->ai_family, res->ai_addr->sa_data, addrstr, 100); switch (res->ai_family) { - case AF_INET: + case AF_INET: ipaddr = New(IPADDR(rp->ai_addr->sa_family)); ipaddr->set_type(IPADDR::R_MULTIPLE); ptr = &((struct sockaddr_in *) res->ai_addr)->sin_addr; ipaddr->set_addr4((in_addr *)ptr); - break; + break; #if defined(HAVE_IPV6) - case AF_INET6: + case AF_INET6: ipaddr = New(IPADDR(rp->ai_addr->sa_family)); ipaddr->set_type(IPADDR::R_MULTIPLE); ptr = &((struct sockaddr_in6 *) res->ai_addr)->sin6_addr; ipaddr->set_addr6((in6_addr *)ptr); - break; -#endif - default: - continue; - } + break; +#endif + default: + continue; + } //inet_ntop (res->ai_family, ptr, addrstr, 100); //Pmsg3(000, "IPv%d address: %s (%s)\n", res->ai_family == PF_INET6 ? 6 : 4, // addrstr, res->ai_canonname); addr_list->append(ipaddr); - } + } freeaddrinfo(rp); - return NULL; -} + return NULL; +} -#else +#else /* * Get human readable error for gethostbyname() @@ -484,13 +341,13 @@ dlist *bnet_host2ipaddrs(const char *host, int family, const char **errstr) addr->set_addr4(&inaddr); addr_list->append(addr); #ifdef HAVE_IPV6 - } else if (inet_pton(AF_INET6, host, &inaddr6) == 1) { + } else if (inet_pton(AF_INET6, host, &inaddr6) == 1) { addr = New(IPADDR(AF_INET6)); addr->set_type(IPADDR::R_MULTIPLE); addr->set_addr6(&inaddr6); addr_list->append(addr); #endif - } else { + } else { if (family != 0) { errmsg = resolv_host(family, host, addr_list); if (errmsg) { @@ -527,121 +384,70 @@ const char *bnet_sig_to_ascii(int32_t msglen) static char buf[30]; switch (msglen) { case BNET_EOD: - return "BNET_EOD"; /* End of data stream, new data may follow */ + return "BNET_EOD"; /* End of data stream, new data may follow */ case BNET_EOD_POLL: - return "BNET_EOD_POLL"; /* End of data and poll all in one */ + return "BNET_EOD_POLL"; /* End of data and poll all in one */ case BNET_STATUS: - return "BNET_STATUS"; /* Send full status */ + return "BNET_STATUS"; /* Send full status */ case BNET_TERMINATE: - return "BNET_TERMINATE"; /* Conversation terminated, doing close() */ + return "BNET_TERMINATE"; /* Conversation terminated, doing close() */ case BNET_POLL: - return "BNET_POLL"; /* Poll request, I'm hanging on a read */ + return "BNET_POLL"; /* Poll request, I'm hanging on a read */ case BNET_HEARTBEAT: - return "BNET_HEARTBEAT"; /* Heartbeat Response requested */ + return "BNET_HEARTBEAT"; /* Heartbeat Response requested */ case BNET_HB_RESPONSE: - return "BNET_HB_RESPONSE"; /* Only response permited to HB */ + return "BNET_HB_RESPONSE"; /* Only response permited to HB */ case BNET_BTIME: - return "BNET_BTIME"; /* Send UTC btime */ + return "BNET_BTIME"; /* Send UTC btime */ case BNET_BREAK: - return "BNET_BREAK"; /* Stop current command -- ctl-c */ + return "BNET_BREAK"; /* Stop current command -- ctl-c */ case BNET_START_SELECT: - return "BNET_START_SELECT"; /* Start of a selection list */ + return "BNET_START_SELECT"; /* Start of a selection list */ case BNET_END_SELECT: - return "BNET_END_SELECT"; /* End of a select list */ + return "BNET_END_SELECT"; /* End of a select list */ case BNET_INVALID_CMD: - return "BNET_INVALID_CMD"; /* Invalid command sent */ + return "BNET_INVALID_CMD"; /* Invalid command sent */ case BNET_CMD_FAILED: - return "BNET_CMD_FAILED"; /* Command failed */ + return "BNET_CMD_FAILED"; /* Command failed */ case BNET_CMD_OK: - return "BNET_CMD_OK"; /* Command succeeded */ + return "BNET_CMD_OK"; /* Command succeeded */ case BNET_CMD_BEGIN: - return "BNET_CMD_BEGIN"; /* Start command execution */ + return "BNET_CMD_BEGIN"; /* Start command execution */ case BNET_MSGS_PENDING: - return "BNET_MSGS_PENDING"; /* Messages pending */ + return "BNET_MSGS_PENDING"; /* Messages pending */ case BNET_MAIN_PROMPT: - return "BNET_MAIN_PROMPT"; /* Server ready and waiting */ + return "BNET_MAIN_PROMPT"; /* Server ready and waiting */ case BNET_SELECT_INPUT: - return "BNET_SELECT_INPUT"; /* Return selection input */ + return "BNET_SELECT_INPUT"; /* Return selection input */ case BNET_WARNING_MSG: - return "BNET_WARNING_MSG"; /* Warning message */ + return "BNET_WARNING_MSG"; /* Warning message */ case BNET_ERROR_MSG: - return "BNET_ERROR_MSG"; /* Error message -- command failed */ + return "BNET_ERROR_MSG"; /* Error message -- command failed */ case BNET_INFO_MSG: - return "BNET_INFO_MSG"; /* Info message -- status line */ + return "BNET_INFO_MSG"; /* Info message -- status line */ case BNET_RUN_CMD: - return "BNET_RUN_CMD"; /* Run command follows */ + return "BNET_RUN_CMD"; /* Run command follows */ case BNET_YESNO: - return "BNET_YESNO"; /* Request yes no response */ + return "BNET_YESNO"; /* Request yes no response */ case BNET_START_RTREE: - return "BNET_START_RTREE"; /* Start restore tree mode */ + return "BNET_START_RTREE"; /* Start restore tree mode */ case BNET_END_RTREE: - return "BNET_END_RTREE"; /* End restore tree mode */ + return "BNET_END_RTREE"; /* End restore tree mode */ case BNET_SUB_PROMPT: - return "BNET_SUB_PROMPT"; /* Indicate we are at a subprompt */ + return "BNET_SUB_PROMPT"; /* Indicate we are at a subprompt */ case BNET_TEXT_INPUT: - return "BNET_TEXT_INPUT"; /* Get text input from user */ + return "BNET_TEXT_INPUT"; /* Get text input from user */ case BNET_EXT_TERMINATE: return "BNET_EXT_TERMINATE"; /* A Terminate condition has been met and - already reported somewhere else */ - case BNET_FDCALLED : - return "BNET_FDCALLED"; /* The FD should keep the connection for a new job */ + already reported somewhere else */ + case BNET_FDCALLED: + return "BNET_FDCALLED"; /* The FD should keep the connection for a new job */ default: bsnprintf(buf, sizeof(buf), _("Unknown sig %d"), (int)msglen); return buf; } } -/* Initialize internal socket structure. - * This probably should be done in bsock.c - */ -BSOCK *init_bsock(JCR *jcr, int sockfd, const char *who, - const char *host, int port, struct sockaddr *client_addr) -{ - Dmsg4(100, "socket=%d who=%s host=%s port=%d\n", sockfd, who, host, port); - BSOCK *bsock = (BSOCK *)malloc(sizeof(BSOCK)); - bmemzero(bsock, sizeof(BSOCK)); - bsock->m_master=bsock; /* don't use set_master() here */ - bsock->m_fd = sockfd; - bsock->tls = NULL; - bsock->errors = 0; - bsock->m_blocking = 1; - bsock->pout_msg_no = &bsock->out_msg_no; - bsock->uninstall_send_hook_cb(); - bsock->msg = get_pool_memory(PM_BSOCK); - bsock->cmsg = get_pool_memory(PM_BSOCK); - bsock->errmsg = get_pool_memory(PM_MESSAGE); - bsock->set_who(bstrdup(who)); - bsock->set_host(bstrdup(host)); - bsock->set_port(port); - bmemzero(&bsock->peer_addr, sizeof(bsock->peer_addr)); - memcpy(&bsock->client_addr, client_addr, sizeof(bsock->client_addr)); - bsock->timeout = BSOCK_TIMEOUT; - bsock->set_jcr(jcr); - return bsock; -} - -BSOCK *dup_bsock(BSOCK *osock) -{ - BSOCK *bsock = (BSOCK *)malloc(sizeof(BSOCK)); - osock->set_locking(); - memcpy(bsock, osock, sizeof(BSOCK)); - bsock->msg = get_pool_memory(PM_BSOCK); - bsock->cmsg = get_pool_memory(PM_BSOCK); - bsock->errmsg = get_pool_memory(PM_MESSAGE); - if (osock->who()) { - bsock->set_who(bstrdup(osock->who())); - } - if (osock->host()) { - bsock->set_host(bstrdup(osock->host())); - } - if (osock->src_addr) { - bsock->src_addr = New( IPADDR( *(osock->src_addr)) ); - } - bsock->set_duped(); - bsock->set_master(osock); - return bsock; -} - int set_socket_errno(int sockstat) { #ifdef HAVE_WIN32 diff --git a/bacula/src/lib/bsock.c b/bacula/src/lib/bsock.c index e3481c34e..a6e4d98c3 100644 --- a/bacula/src/lib/bsock.c +++ b/bacula/src/lib/bsock.c @@ -19,7 +19,17 @@ /* * Network Utility Routines * + * The new code inherit common functions from BSOCKCORE class + * and implement BSOCK/Bacula specific protocols and data flow. + * * Written by Kern Sibbald + * + * Major refactoring of BSOCK code written by: + * + * Radosław Korzeniewski, MMXVIII + * radoslaw@korzeniewski.net, radekk@inteos.pl + * Inteos Sp. z o.o. http://www.inteos.pl/ + * */ #include "bacula.h" @@ -28,560 +38,249 @@ #include #include -#if !defined(ENODATA) /* not defined on BSD systems */ -#define ENODATA EPIPE -#endif - -#if !defined(SOL_TCP) /* Not defined on some systems */ -#define SOL_TCP IPPROTO_TCP -#endif - -#ifdef HAVE_WIN32 -#include -#define socketRead(fd, buf, len) ::recv(fd, buf, len, 0) -#define socketWrite(fd, buf, len) ::send(fd, buf, len, 0) -#define socketClose(fd) ::closesocket(fd) -static void win_close_wait(int fd); -#ifndef SOCK_CLOEXEC -#define SOCK_CLOEXEC 0 -#endif -#else -#define socketRead(fd, buf, len) ::read(fd, buf, len) -#define socketWrite(fd, buf, len) ::write(fd, buf, len) -#define socketClose(fd) ::close(fd) -#endif +#define BSOCK_DEBUG_LVL 900 + +/* Commands sent to Director */ +static char hello[] = "Hello %s calling\n"; +/* Response from Director */ +static char OKhello[] = "1000 OK:"; -/* TODO: We are flooded by tickets about lost console messages. We probably - * need to store them in a specific place, but waiting for that, we can - * discard them. See #3615 for example. - */ -#define isJobMessage(jcr) (jcr && jcr->JobId != 0) /* - * make a nice dump of a message + * BSOCK default constructor - initializes object. */ -void dump_bsock_msg(int sock, uint32_t msgno, const char *what, uint32_t rc, int32_t pktsize, uint32_t flags, POOLMEM *msg, int32_t msglen) -{ - char buf[54]; - bool is_ascii; - int dbglvl = DT_ASX; - - if (msglen<0) { - Dmsg4(dbglvl, "%s %d:%d SIGNAL=%s\n", what, sock, msgno, bnet_sig_to_ascii(msglen)); - // data - smartdump(msg, msglen, buf, sizeof(buf)-9, &is_ascii); - if (is_ascii) { - Dmsg5(dbglvl, "%s %d:%d len=%d \"%s\"\n", what, sock, msgno, msglen, buf); - } else { - Dmsg5(dbglvl, "%s %d:%d len=%d %s\n", what, sock, msgno, msglen, buf); - } - } -} - - -BSOCKCallback::BSOCKCallback() -{ -} - -BSOCKCallback::~BSOCKCallback() +BSOCK::BSOCK() { -} + init(); +}; +/* + * BSOCK special constructor initializes object and sets proper socked descriptor. + */ +BSOCK::BSOCK(int sockfd): + BSOCKCORE(), + m_spool_fd(NULL), + cmsg(NULL), + m_data_end(0), + m_last_data_end(0), + m_FileIndex(0), + m_lastFileIndex(0), + m_spool(false), + m_compress(false), + m_CommBytes(0), + m_CommCompressedBytes(0) +{ + init(); + m_terminated = false; + m_closed = false; + m_fd = sockfd; +}; /* - * This is a non-class BSOCK "constructor" because we want to - * call the Bacula smartalloc routines instead of new. + * BSOCK default destructor. */ -BSOCK *new_bsock() +BSOCK::~BSOCK() { - BSOCK *bsock = (BSOCK *)malloc(sizeof(BSOCK)); - bsock->init(); - return bsock; -} + Dmsg0(BSOCK_DEBUG_LVL, "BSOCK::~BSOCK()\n"); + _destroy(); +}; +/* + * BSOCK initialization method handles bsock specific variables. + */ void BSOCK::init() { - memset(this, 0, sizeof(BSOCK)); - m_master = this; - set_closed(); - set_terminated(); - m_blocking = 1; - pout_msg_no = &out_msg_no; - uninstall_send_hook_cb(); - msg = get_pool_memory(PM_BSOCK); - cmsg = get_pool_memory(PM_BSOCK); - errmsg = get_pool_memory(PM_MESSAGE); + /* the BSOCKCORE::init() is executed in base class constructor */ timeout = BSOCK_TIMEOUT; -} - -void BSOCK::free_tls() -{ - free_tls_connection(this->tls); - this->tls = NULL; + m_spool_fd = NULL; + cmsg = get_pool_memory(PM_BSOCK); } /* - * Try to connect to host for max_retry_time at retry_time intervals. - * Note, you must have called the constructor prior to calling - * this routine. + * BSOCK private destroy method releases bsock specific variables. */ -bool BSOCK::connect(JCR * jcr, int retry_interval, utime_t max_retry_time, - utime_t heart_beat, - const char *name, char *host, char *service, int port, - int verbose) +void BSOCK::_destroy() { - bool ok = false; - int i; - int fatal = 0; - time_t begin_time = time(NULL); - time_t now; - btimer_t *tid = NULL; - - /* Try to trap out of OS call when time expires */ - if (max_retry_time) { - tid = start_thread_timer(jcr, pthread_self(), (uint32_t)max_retry_time); - } - - for (i = 0; !open(jcr, name, host, service, port, heart_beat, &fatal); - i -= retry_interval) { - berrno be; - if (fatal || (jcr && job_canceled(jcr))) { - goto bail_out; - } - Dmsg4(50, "Unable to connect to %s on %s:%d. ERR=%s\n", - name, host, port, be.bstrerror()); - if (i < 0) { - i = 60 * 5; /* complain again in 5 minutes */ - if (verbose) - Qmsg4(jcr, M_WARNING, 0, _( - "Could not connect to %s on %s:%d. ERR=%s\n" - "Retrying ...\n"), name, host, port, be.bstrerror()); - } - bmicrosleep(retry_interval, 0); - now = time(NULL); - if (begin_time + max_retry_time <= now) { - Qmsg4(jcr, M_FATAL, 0, _("Unable to connect to %s on %s:%d. ERR=%s\n"), - name, host, port, be.bstrerror()); - goto bail_out; - } - } - ok = true; - -bail_out: - if (tid) { - stop_thread_timer(tid); + Dmsg0(BSOCK_DEBUG_LVL, "BSOCK::_destroy()\n"); + if (cmsg) { + free_pool_memory(cmsg); + cmsg = NULL; } - return ok; -} +}; /* - * Finish initialization of the packet structure. + * Authenticate Director */ -void BSOCK::fin_init(JCR * jcr, int sockfd, const char *who, const char *host, int port, - struct sockaddr *lclient_addr) +bool BSOCK::authenticate_director(const char *name, const char *password, + TLS_CONTEXT *tls_ctx, char *errmsg, int errmsg_len) { - Dmsg3(100, "who=%s host=%s port=%d\n", who, host, port); - m_fd = sockfd; - if (m_who) { - free(m_who); - } - if (m_host) { - free(m_host); - } - set_who(bstrdup(who)); - set_host(bstrdup(host)); - set_port(port); - memcpy(&client_addr, lclient_addr, sizeof(client_addr)); - set_jcr(jcr); -} + int tls_local_need = BNET_TLS_NONE; + int tls_remote_need = BNET_TLS_NONE; + int compatible = true; + char bashed_name[MAX_NAME_LENGTH]; + BSOCK *dir = this; /* for readability */ -/* - * Copy the address from the configuration dlist that gets passed in - */ -void BSOCK::set_source_address(dlist *src_addr_list) -{ - IPADDR *addr = NULL; + *errmsg = 0; + /* + * Send my name to the Director then do authentication + */ - // delete the object we already have, if it's allocated - if (src_addr) { - free( src_addr); - src_addr = NULL; - } + /* Timeout Hello after 15 secs */ + dir->start_timer(15); + dir->fsend(hello, bashed_name); - if (src_addr_list) { - addr = (IPADDR*) src_addr_list->first(); - src_addr = New( IPADDR(*addr)); + if (get_tls_enable(tls_ctx)) { + tls_local_need = get_tls_enable(tls_ctx) ? BNET_TLS_REQUIRED : BNET_TLS_OK; } -} -/* - * Open a TCP connection to the server - * Returns NULL - * Returns BSOCK * pointer on success - */ -bool BSOCK::open(JCR *jcr, const char *name, char *host, char *service, - int port, utime_t heart_beat, int *fatal) -{ - int sockfd = -1; - dlist *addr_list; - IPADDR *ipaddr; - bool connected = false; - int turnon = 1; - const char *errstr; - int save_errno = 0; + /* respond to Dir challenge */ + if (!cram_md5_respond(dir, password, &tls_remote_need, &compatible) || + /* Now challenge dir */ + !cram_md5_challenge(dir, password, tls_local_need, compatible)) { + bsnprintf(errmsg, errmsg_len, _("Director authorization error at \"%s:%d\"\n"), + dir->host(), dir->port()); + goto bail_out; + } - /* - * Fill in the structure serv_addr with the address of - * the server that we want to connect with. - */ - if ((addr_list = bnet_host2ipaddrs(host, 0, &errstr)) == NULL) { - /* Note errstr is not malloc'ed */ - Qmsg2(jcr, M_ERROR, 0, _("gethostbyname() for host \"%s\" failed: ERR=%s\n"), - host, errstr); - Dmsg2(100, "bnet_host2ipaddrs() for host %s failed: ERR=%s\n", - host, errstr); - *fatal = 1; - return false; + /* Verify that the remote host is willing to meet our TLS requirements */ + if (tls_remote_need < tls_local_need && tls_local_need != BNET_TLS_OK && tls_remote_need != BNET_TLS_OK) { + bsnprintf(errmsg, errmsg_len, _("Authorization error:" + " Remote server at \"%s:%d\" did not advertise required TLS support.\n"), + dir->host(), dir->port()); + goto bail_out; } - remove_duplicate_addresses(addr_list); - foreach_dlist(ipaddr, addr_list) { - ipaddr->set_port_net(htons(port)); - char allbuf[256 * 10]; - char curbuf[256]; - Dmsg2(100, "Current %sAll %s\n", - ipaddr->build_address_str(curbuf, sizeof(curbuf)), - build_addresses_str(addr_list, allbuf, sizeof(allbuf))); - /* Open a TCP socket */ - if ((sockfd = socket(ipaddr->get_family(), SOCK_STREAM|SOCK_CLOEXEC, 0)) < 0) { - berrno be; - save_errno = errno; - switch (errno) { -#ifdef EAFNOSUPPORT - case EAFNOSUPPORT: - /* - * The name lookup of the host returned an address in a protocol family - * we don't support. Suppress the error and try the next address. - */ - break; -#endif -#ifdef EPROTONOSUPPORT - /* See above comments */ - case EPROTONOSUPPORT: - break; -#endif -#ifdef EPROTOTYPE - /* See above comments */ - case EPROTOTYPE: - break; -#endif - default: - *fatal = 1; - Qmsg3(jcr, M_ERROR, 0, _("Socket open error. proto=%d port=%d. ERR=%s\n"), - ipaddr->get_family(), ipaddr->get_port_host_order(), be.bstrerror()); - Pmsg3(300, _("Socket open error. proto=%d port=%d. ERR=%s\n"), - ipaddr->get_family(), ipaddr->get_port_host_order(), be.bstrerror()); - break; - } - continue; - } + /* Verify that we are willing to meet the remote host's requirements */ + if (tls_remote_need > tls_local_need && tls_local_need != BNET_TLS_OK && tls_remote_need != BNET_TLS_OK) { + bsnprintf(errmsg, errmsg_len, _("Authorization error with Director at \"%s:%d\":" + " Remote server requires TLS.\n"), + dir->host(), dir->port()); - /* Bind to the source address if it is set */ - if (src_addr) { - if (bind(sockfd, src_addr->get_sockaddr(), src_addr->get_sockaddr_len()) < 0) { - berrno be; - save_errno = errno; - *fatal = 1; - Qmsg2(jcr, M_ERROR, 0, _("Source address bind error. proto=%d. ERR=%s\n"), - src_addr->get_family(), be.bstrerror() ); - Pmsg2(000, _("Source address bind error. proto=%d. ERR=%s\n"), - src_addr->get_family(), be.bstrerror() ); - if (sockfd >= 0) socketClose(sockfd); - continue; - } - } + goto bail_out; + } - /* - * Keep socket from timing out from inactivity - */ - if (setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE, (sockopt_val_t)&turnon, sizeof(turnon)) < 0) { - berrno be; - Qmsg1(jcr, M_WARNING, 0, _("Cannot set SO_KEEPALIVE on socket: %s\n"), - be.bstrerror()); - } -#if defined(TCP_KEEPIDLE) - if (heart_beat) { - int opt = heart_beat; - if (setsockopt(sockfd, SOL_TCP, TCP_KEEPIDLE, (sockopt_val_t)&opt, sizeof(opt)) < 0) { - berrno be; - Qmsg1(jcr, M_WARNING, 0, _("Cannot set TCP_KEEPIDLE on socket: %s\n"), - be.bstrerror()); + /* Is TLS Enabled? */ + if (have_tls) { + if (tls_local_need >= BNET_TLS_OK && tls_remote_need >= BNET_TLS_OK) { + /* Engage TLS! Full Speed Ahead! */ + if (!bnet_tls_client(tls_ctx, dir, NULL)) { + bsnprintf(errmsg, errmsg_len, _("TLS negotiation failed with Director at \"%s:%d\"\n"), + dir->host(), dir->port()); + goto bail_out; } } -#endif - - /* connect to server */ - if (::connect(sockfd, ipaddr->get_sockaddr(), ipaddr->get_sockaddr_len()) < 0) { - save_errno = errno; - if (sockfd >= 0) socketClose(sockfd); - continue; - } - *fatal = 0; - connected = true; - break; } - if (!connected) { - berrno be; - free_addresses(addr_list); - errno = save_errno | b_errno_win32; - Dmsg4(50, "Could not connect to server %s %s:%d. ERR=%s\n", - name, host, port, be.bstrerror()); + Dmsg1(6, ">dird: %s", dir->msg); + if (dir->recv() <= 0) { + dir->stop_timer(); + bsnprintf(errmsg, errmsg_len, _("Bad errmsg to Hello command: ERR=%s\n" + "The Director at \"%s:%d\" may not be running.\n"), + dir->bstrerror(), dir->host(), dir->port()); return false; } - /* - * Keep socket from timing out from inactivity - * Do this a second time out of paranoia - */ - if (setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE, (sockopt_val_t)&turnon, sizeof(turnon)) < 0) { - berrno be; - Qmsg1(jcr, M_WARNING, 0, _("Cannot set SO_KEEPALIVE on socket: %s\n"), - be.bstrerror()); - } - fin_init(jcr, sockfd, name, host, port, ipaddr->get_sockaddr()); - free_addresses(addr_list); - - /* Clean the packet a bit */ - m_closed = false; - m_duped = false; - m_spool = false; - m_use_locking = false; - m_timed_out = false; - m_terminated = false; - m_suppress_error_msgs = false; - errors = 0; - m_blocking = 0; - - Dmsg3(50, "OK connected to server %s %s:%d.\n", - name, host, port); + dir->stop_timer(); + Dmsg1(10, "msg); + if (strncmp(dir->msg, OKhello, sizeof(OKhello)-1) != 0) { + bsnprintf(errmsg, errmsg_len, _("Director at \"%s:%d\" rejected Hello command\n"), + dir->host(), dir->port()); + return false; + } else { + bsnprintf(errmsg, errmsg_len, "%s", dir->msg); + } return true; + +bail_out: + dir->stop_timer(); + bsnprintf(errmsg, errmsg_len, _("Authorization error with Director at \"%s:%d\"\n" + "Most likely the passwords do not agree.\n" + "If you are using TLS, there may have been a certificate validation error during the TLS handshake.\n" + "For help, please see: " MANUAL_AUTH_URL "\n"), + dir->host(), dir->port()); + return false; } /* - * Force read/write to use locking + * Send a message over the network. Everything is sent in one + * write request, but depending on the mode you are using + * there will be either two or three read requests done. + * Read 1: 32 bits, gets essentially the packet length, but may + * have the upper bits set to indicate compression or + * an extended header packet. + * Read 2: 32 bits, this read is done only of BNET_HDR_EXTEND is set. + * In this case the top 16 bits of this 32 bit word are reserved + * for flags and the lower 16 bits for data. This word will be + * stored in the field "flags" in the BSOCKCORE packet. + * Read 2 or 3: depending on if Read 2 is done. This is the data. + * + * For normal comm line compression, the whole data packet is compressed + * but not the msglen (first read). + * To do data compression rather than full comm line compression, prior to + * call send(flags) where the lower 32 bits is the offset to the data to + * be compressed. The top 32 bits are reserved for flags that can be + * set. The are: + * BNET_IS_CMD We are sending a command + * BNET_OFFSET An offset is specified (this implies data compression) + * BNET_NOCOMPRESS Inhibit normal comm line compression + * BNET_DATACOMPRESSED The data using the specified offset was + * compressed, and normal comm line compression will + * not be done. + * If any of the above bits are set, then BNET_HDR_EXTEND will be set + * in the top bits of msglen, and the full set of flags + the offset + * will be passed as a 32 bit word just after the msglen, and then + * followed by any data that is either compressed or not. + * + * Note, neither comm line nor data compression is not + * guaranteed since it may result in more data, in which case, the + * record is sent uncompressed and there will be no offset. + * On the receive side, if BNET_OFFSET is set, then the data is compressed. + * + * Returns: false on failure + * true on success */ -bool BSOCK::set_locking() +bool BSOCK::send(int aflags) { - int stat; - if (m_use_locking) { - return true; /* already set */ - } - pm_rmutex = &m_rmutex; - pm_wmutex = &m_wmutex; - if ((stat = pthread_mutex_init(pm_rmutex, NULL)) != 0) { - berrno be; - Qmsg(m_jcr, M_FATAL, 0, _("Could not init bsock read mutex. ERR=%s\n"), - be.bstrerror(stat)); + int32_t rc; + int32_t pktsiz; + int32_t *hdrptr; + int offset; + int hdrsiz; + bool ok = true; + int32_t save_msglen; + POOLMEM *save_msg; + bool compressed; + bool locked = false; + + if (is_closed()) { + if (!m_suppress_error_msgs) { + Qmsg0(m_jcr, M_ERROR, 0, _("Socket is closed\n")); + } return false; } - if ((stat = pthread_mutex_init(pm_wmutex, NULL)) != 0) { - berrno be; - Qmsg(m_jcr, M_FATAL, 0, _("Could not init bsock write mutex. ERR=%s\n"), - be.bstrerror(stat)); + if (errors) { + if (!m_suppress_error_msgs) { + Qmsg4(m_jcr, M_ERROR, 0, _("Socket has errors=%d on call to %s:%s:%d\n"), + errors, m_who, m_host, m_port); + } return false; } - if ((stat = pthread_mutex_init(&m_mmutex, NULL)) != 0) { - berrno be; - Qmsg(m_jcr, M_FATAL, 0, _("Could not init bsock attribute mutex. ERR=%s\n"), - be.bstrerror(stat)); + if (is_terminated()) { + if (!m_suppress_error_msgs) { + Qmsg4(m_jcr, M_ERROR, 0, _("Bsock send while terminated=%d on call to %s:%s:%d\n"), + is_terminated(), m_who, m_host, m_port); + } return false; } - m_use_locking = true; - return true; -} -void BSOCK::clear_locking() -{ - if (!m_use_locking || m_duped) { - return; - } - m_use_locking = false; - pthread_mutex_destroy(pm_rmutex); - pthread_mutex_destroy(pm_wmutex); - pthread_mutex_destroy(&m_mmutex); - pm_rmutex = NULL; - pm_wmutex = NULL; - return; -} - -/* - * Do comm line compression (LZ4) of a bsock message. - * Returns: true if the compression was done - * false if no compression was done - * The "offset" defines where to start compressing the message. This - * allows passing "header" information uncompressed and the actual - * data part compressed. - * - * Note, we don't compress lines less than 20 characters because we - * want to save at least 10 characters otherwise compression doesn't - * help enough to warrant doing the decompression. - */ -bool BSOCK::comm_compress() -{ - bool compress = false; - bool compressed = false; - int offset = m_flags & 0xFF; - - /* - * Enable compress if allowed and not spooling and the - * message is long enough (>20) to get some reasonable savings. - */ - if (msglen > 20) { - compress = can_compress() && !is_spooling(); - } - m_CommBytes += msglen; /* uncompressed bytes */ - Dmsg4(DT_NETWORK|200, "can_compress=%d compress=%d CommBytes=%lld CommCompresedBytes=%lld\n", - can_compress(), compress, m_CommBytes, m_CommCompressedBytes); - if (compress) { - int clen; - int need_size; - - ASSERT2(offset <= msglen, "Comm offset bigger than message\n"); - ASSERT2(offset < 255, "Offset greater than 254\n"); - need_size = LZ4_compressBound(msglen); - if (need_size >= ((int32_t)sizeof_pool_memory(cmsg))) { - cmsg = realloc_pool_memory(cmsg, need_size + 100); - } - msglen -= offset; - msg += offset; - cmsg += offset; - clen = LZ4_compress_default(msg, cmsg, msglen, msglen); - //Dmsg2(000, "clen=%d msglen=%d\n", clen, msglen); - /* Compression should save at least 10 characters */ - if (clen > 0 && clen + 10 <= msglen) { - -#ifdef xxx_debug - /* Debug code -- decompress and compare */ - int blen, rlen, olen; - olen = msglen; - POOLMEM *rmsg = get_pool_memory(PM_BSOCK); - blen = sizeof_pool_memory(msg) * 2; - if (blen >= sizeof_pool_memory(rmsg)) { - rmsg = realloc_pool_memory(rmsg, blen); - } - rlen = LZ4_decompress_safe(cmsg, rmsg, clen, blen); - //Dmsg4(000, "blen=%d clen=%d olen=%d rlen=%d\n", blen, clen, olen, rlen); - ASSERT(olen == rlen); - ASSERT(memcmp(msg, rmsg, olen) == 0); - free_pool_memory(rmsg); - /* end Debug code */ -#endif - - msg = cmsg; - msglen = clen; - compressed = true; - } - msglen += offset; - msg -= offset; - cmsg -= offset; - } - m_CommCompressedBytes += msglen; - return compressed; -} - - -/* - * Send a message over the network. Everything is sent in one - * write request, but depending on the mode you are using - * there will be either two or three read requests done. - * Read 1: 32 bits, gets essentially the packet length, but may - * have the upper bits set to indicate compression or - * an extended header packet. - * Read 2: 32 bits, this read is done only of BNET_HDR_EXTEND is set. - * In this case the top 16 bits of this 32 bit word are reserved - * for flags and the lower 16 bits for data. This word will be - * stored in the field "flags" in the BSOCK packet. - * Read 2 or 3: depending on if Read 2 is done. This is the data. - * - * For normal comm line compression, the whole data packet is compressed - * but not the msglen (first read). - * To do data compression rather than full comm line compression, prior to - * call send(flags) where the lower 32 bits is the offset to the data to - * be compressed. The top 32 bits are reserved for flags that can be - * set. The are: - * BNET_IS_CMD We are sending a command - * BNET_OFFSET An offset is specified (this implies data compression) - * BNET_NOCOMPRESS Inhibit normal comm line compression - * BNET_DATACOMPRESSED The data using the specified offset was - * compressed, and normal comm line compression will - * not be done. - * If any of the above bits are set, then BNET_HDR_EXTEND will be set - * in the top bits of msglen, and the full set of flags + the offset - * will be passed as a 32 bit word just after the msglen, and then - * followed by any data that is either compressed or not. - * - * Note, neither comm line nor data compression is not - * guaranteed since it may result in more data, in which case, the - * record is sent uncompressed and there will be no offset. - * On the receive side, if BNET_OFFSET is set, then the data is compressed. - * - * Returns: false on failure - * true on success - */ -bool BSOCK::send(int aflags) -{ - int32_t rc; - int32_t pktsiz; - int32_t *hdrptr; - int offset; - int hdrsiz; - bool ok = true; - int32_t save_msglen; - POOLMEM *save_msg; - bool compressed; - bool locked = false; - - if (is_closed()) { - if (!m_suppress_error_msgs) { - /* TODO: We probably need to store them in a specific place see mantis #3615 */ - if (isJobMessage(m_jcr)) { - Qmsg0(m_jcr, M_ERROR, 0, _("Socket is closed\n")); - } else { - Dmsg0(100, "Socket is closed\n"); - } - } - return false; - } - if (errors) { - if (!m_suppress_error_msgs) { - /* TODO: We probably need to store them in a specific place */ - if (isJobMessage(m_jcr)) { - Qmsg4(m_jcr, M_ERROR, 0, _("Socket has errors=%d on call to %s:%s:%d\n"), - errors, m_who, m_host, m_port); - } else { - Dmsg4(100, "Socket has errors=%d on call to %s:%s:%d\n", - errors, m_who, m_host, m_port); - } - } - return false; - } - if (is_terminated()) { - if (!m_suppress_error_msgs) { - Qmsg4(m_jcr, M_ERROR, 0, _("Bsock send while terminated=%d on call to %s:%s:%d\n"), - is_terminated(), m_who, m_host, m_port); - } - return false; - } - - if (msglen > 4000000) { - if (!m_suppress_error_msgs) { - Qmsg4(m_jcr, M_ERROR, 0, - _("Socket has insane msglen=%d on call to %s:%s:%d\n"), - msglen, m_who, m_host, m_port); - } - return false; + if (msglen > 4000000) { + if (!m_suppress_error_msgs) { + Qmsg4(m_jcr, M_ERROR, 0, + _("Socket has insane msglen=%d on call to %s:%s:%d\n"), + msglen, m_who, m_host, m_port); + } + return false; } if (send_hook_cb) { @@ -660,7 +359,7 @@ bool BSOCK::send(int aflags) timer_start = watchdog_time; /* start timer */ clear_timed_out(); /* Full I/O done in one write */ - rc = write_nbytes(this, (char *)hdrptr, pktsiz); + rc = write_nbytes((char *)hdrptr, pktsiz); if (chk_dbglvl(DT_NETWORK|1900)) dump_bsock_msg(m_fd, *pout_msg_no, "SEND", rc, msglen, m_flags, save_msg, save_msglen); timer_start = 0; /* clear timer */ if (rc != pktsiz) { @@ -672,17 +371,10 @@ bool BSOCK::send(int aflags) } if (rc < 0) { if (!m_suppress_error_msgs) { - /* TODO: We probably need to store them in a specific place see Mantis #3615 */ - if (isJobMessage(m_jcr)) { - Qmsg5(m_jcr, M_ERROR, 0, - _("Write error sending %d bytes to %s:%s:%d: ERR=%s\n"), - pktsiz, m_who, - m_host, m_port, this->bstrerror()); - } else { - Dmsg5(100, "Write error sending %d bytes to %s:%s:%d: ERR=%s\n", - pktsiz, m_who, - m_host, m_port, this->bstrerror()); - } + Qmsg5(m_jcr, M_ERROR, 0, + _("Write error sending %d bytes to %s:%s:%d: ERR=%s\n"), + pktsiz, m_who, + m_host, m_port, this->bstrerror()); } } else { Qmsg5(m_jcr, M_ERROR, 0, @@ -699,40 +391,6 @@ bool BSOCK::send(int aflags) return ok; } -/* - * Format and send a message - * Returns: false on error - * true on success - */ -bool BSOCK::fsend(const char *fmt, ...) -{ - va_list arg_ptr; - int maxlen; - - if (is_null(this)) { - return false; /* do not seg fault */ - } - if (errors || is_terminated() || is_closed()) { - return false; - } - /* This probably won't work, but we vsnprintf, then if we - * get a negative length or a length greater than our buffer - * (depending on which library is used), the printf was truncated, so - * get a bigger buffer and try again. - */ - for (;;) { - maxlen = sizeof_pool_memory(msg) - 1; - va_start(arg_ptr, fmt); - msglen = bvsnprintf(msg, maxlen, fmt, arg_ptr); - va_end(arg_ptr); - if (msglen >= 0 && msglen < (maxlen - 5)) { - break; - } - msg = realloc_pool_memory(msg, maxlen + maxlen / 2); - } - return send(); -} - /* * Receive a message from the other end. Each message consists of * two packets. The first is a header that contains the size @@ -774,7 +432,7 @@ int32_t BSOCK::recv() timer_start = watchdog_time; /* set start wait time */ clear_timed_out(); /* get data size -- in int32_t */ - if ((nbytes = read_nbytes(this, (char *)&pktsiz, sizeof(int32_t))) <= 0) { + if ((nbytes = read_nbytes((char *)&pktsiz, sizeof(int32_t))) <= 0) { timer_start = 0; /* clear timer */ /* probably pipe broken because client died */ if (errno == 0) { @@ -802,7 +460,7 @@ int32_t BSOCK::recv() if (pktsiz > 0 && (pktsiz & BNET_HDR_EXTEND)) { timer_start = watchdog_time; /* set start wait time */ clear_timed_out(); - if ((nbytes = read_nbytes(this, (char *)&m_flags, sizeof(int32_t))) <= 0) { + if ((nbytes = read_nbytes((char *)&m_flags, sizeof(int32_t))) <= 0) { timer_start = 0; /* clear timer */ /* probably pipe broken because client died */ if (errno == 0) { @@ -873,7 +531,7 @@ int32_t BSOCK::recv() timer_start = watchdog_time; /* set start wait time */ clear_timed_out(); /* now read the actual data */ - if ((nbytes = read_nbytes(this, msg, pktsiz)) <= 0) { + if ((nbytes = read_nbytes(msg, pktsiz)) <= 0) { timer_start = 0; /* clear timer */ if (errno == 0) { b_errno = ENODATA; @@ -1038,552 +696,271 @@ bool BSOCK::despool(void update_attr_spool_size(ssize_t size), ssize_t tsize) } /* - * Return the string for the error that occurred - * on the socket. Only the first error is retained. + * Open a TCP connection to the server + * Returns NULL + * Returns BSOCKCORE * pointer on success */ -const char *BSOCK::bstrerror() -{ - berrno be; - if (errmsg == NULL) { - errmsg = get_pool_memory(PM_MESSAGE); - } - if (b_errno == 0) { - pm_strcpy(errmsg, "I/O Error"); - } else { - pm_strcpy(errmsg, be.bstrerror(b_errno)); - } - return errmsg; -} - -int BSOCK::get_peer(char *buf, socklen_t buflen) +bool BSOCK::open(JCR *jcr, const char *name, char *host, char *service, + int port, utime_t heart_beat, int *fatal) { -#if !defined(HAVE_WIN32) - if (peer_addr.sin_family == 0) { - socklen_t salen = sizeof(peer_addr); - int rval = (getpeername)(m_fd, (struct sockaddr *)&peer_addr, &salen); - if (rval < 0) return rval; - } - if (!inet_ntop(peer_addr.sin_family, &peer_addr.sin_addr, buf, buflen)) - return -1; - - return 0; -#else - return -1; -#endif -} + bool status = BSOCKCORE::open(jcr, name, host, service, port, heart_beat, fatal); + m_spool = false; + return status; +}; /* - * Set the network buffer size, suggested size is in size. - * Actual size obtained is returned in bs->msglen + * Do comm line compression (LZ4) of a bsock message. + * Returns: true if the compression was done + * false if no compression was done + * The "offset" defines where to start compressing the message. This + * allows passing "header" information uncompressed and the actual + * data part compressed. * - * Returns: false on failure - * true on success + * Note, we don't compress lines less than 20 characters because we + * want to save at least 10 characters otherwise compression doesn't + * help enough to warrant doing the decompression. */ -bool BSOCK::set_buffer_size(uint32_t size, int rw) +bool BSOCK::comm_compress() { - uint32_t dbuf_size, start_size; - -#if defined(IP_TOS) && defined(IPTOS_THROUGHPUT) - int opt; - opt = IPTOS_THROUGHPUT; - setsockopt(m_fd, IPPROTO_IP, IP_TOS, (sockopt_val_t)&opt, sizeof(opt)); -#endif - - if (size != 0) { - dbuf_size = size; - } else { - dbuf_size = DEFAULT_NETWORK_BUFFER_SIZE; - } - start_size = dbuf_size; - if ((msg = realloc_pool_memory(msg, dbuf_size + 100)) == NULL) { - Qmsg0(get_jcr(), M_FATAL, 0, _("Could not malloc BSOCK data buffer\n")); - return false; - } + bool compress = false; + bool compressed = false; + int offset = m_flags & 0xFF; /* - * If user has not set the size, use the OS default -- i.e. do not - * try to set it. This allows sys admins to set the size they - * want in the OS, and Bacula will comply. See bug #1493 + * Enable compress if allowed and not spooling and the + * message is long enough (>20) to get some reasonable savings. */ - if (size == 0) { - msglen = dbuf_size; - return true; + if (msglen > 20) { + compress = can_compress() && !is_spooling(); } + m_CommBytes += msglen; /* uncompressed bytes */ + Dmsg4(DT_NETWORK|200, "can_compress=%d compress=%d CommBytes=%lld CommCompresedBytes=%lld\n", + can_compress(), compress, m_CommBytes, m_CommCompressedBytes); + if (compress) { + int clen; + int need_size; - if (rw & BNET_SETBUF_READ) { - while ((dbuf_size > TAPE_BSIZE) && (setsockopt(m_fd, SOL_SOCKET, - SO_RCVBUF, (sockopt_val_t) & dbuf_size, sizeof(dbuf_size)) < 0)) { - berrno be; - Qmsg1(get_jcr(), M_ERROR, 0, _("sockopt error: %s\n"), be.bstrerror()); - dbuf_size -= TAPE_BSIZE; - } - Dmsg1(200, "set network buffer size=%d\n", dbuf_size); - if (dbuf_size != start_size) { - Qmsg1(get_jcr(), M_WARNING, 0, - _("Warning network buffer = %d bytes not max size.\n"), dbuf_size); - } - } - if (size != 0) { - dbuf_size = size; - } else { - dbuf_size = DEFAULT_NETWORK_BUFFER_SIZE; - } - start_size = dbuf_size; - if (rw & BNET_SETBUF_WRITE) { - while ((dbuf_size > TAPE_BSIZE) && (setsockopt(m_fd, SOL_SOCKET, - SO_SNDBUF, (sockopt_val_t) & dbuf_size, sizeof(dbuf_size)) < 0)) { - berrno be; - Qmsg1(get_jcr(), M_ERROR, 0, _("sockopt error: %s\n"), be.bstrerror()); - dbuf_size -= TAPE_BSIZE; - } - Dmsg1(900, "set network buffer size=%d\n", dbuf_size); - if (dbuf_size != start_size) { - Qmsg1(get_jcr(), M_WARNING, 0, - _("Warning network buffer = %d bytes not max size.\n"), dbuf_size); + ASSERT2(offset <= msglen, "Comm offset bigger than message\n"); + ASSERT2(offset < 255, "Offset greater than 254\n"); + need_size = LZ4_compressBound(msglen); + if (need_size >= ((int32_t)sizeof_pool_memory(cmsg))) { + cmsg = realloc_pool_memory(cmsg, need_size + 100); } - } - - msglen = dbuf_size; - return true; -} - -/* - * Set socket non-blocking - * Returns previous socket flag - */ -int BSOCK::set_nonblocking() -{ -#ifndef HAVE_WIN32 - int oflags; - - /* Get current flags */ - if ((oflags = fcntl(m_fd, F_GETFL, 0)) < 0) { - berrno be; - Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_GETFL error. ERR=%s\n"), be.bstrerror()); - } - - /* Set O_NONBLOCK flag */ - if ((fcntl(m_fd, F_SETFL, oflags|O_NONBLOCK)) < 0) { - berrno be; - Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_SETFL error. ERR=%s\n"), be.bstrerror()); - } - - m_blocking = 0; - return oflags; -#else - int flags; - u_long ioctlArg = 1; - - flags = m_blocking; - ioctlsocket(m_fd, FIONBIO, &ioctlArg); - m_blocking = 0; - - return flags; -#endif -} - -/* - * Set socket blocking - * Returns previous socket flags - */ -int BSOCK::set_blocking() -{ -#ifndef HAVE_WIN32 - int oflags; - /* Get current flags */ - if ((oflags = fcntl(m_fd, F_GETFL, 0)) < 0) { - berrno be; - Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_GETFL error. ERR=%s\n"), be.bstrerror()); - } - - /* Set O_NONBLOCK flag */ - if ((fcntl(m_fd, F_SETFL, oflags & ~O_NONBLOCK)) < 0) { - berrno be; - Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_SETFL error. ERR=%s\n"), be.bstrerror()); - } - - m_blocking = 1; - return oflags; -#else - int flags; - u_long ioctlArg = 0; - - flags = m_blocking; - ioctlsocket(m_fd, FIONBIO, &ioctlArg); - m_blocking = 1; - - return flags; -#endif -} - -void BSOCK::set_killable(bool killable) -{ - if (m_jcr) { - m_jcr->set_killable(killable); - } -} - -/* - * Restores socket flags - */ -void BSOCK::restore_blocking (int flags) -{ -#ifndef HAVE_WIN32 - if ((fcntl(m_fd, F_SETFL, flags)) < 0) { - berrno be; - Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_SETFL error. ERR=%s\n"), be.bstrerror()); - } - - m_blocking = (flags & O_NONBLOCK) ? true : false; -#else - u_long ioctlArg = flags; - - ioctlsocket(m_fd, FIONBIO, &ioctlArg); - m_blocking = 1; -#endif -} + msglen -= offset; + msg += offset; + cmsg += offset; + clen = LZ4_compress_default(msg, cmsg, msglen, msglen); + //Dmsg2(000, "clen=%d msglen=%d\n", clen, msglen); + /* Compression should save at least 10 characters */ + if (clen > 0 && clen + 10 <= msglen) { -/* - * Wait for a specified time for data to appear on - * the BSOCK connection. - * - * Returns: 1 if data available - * 0 if timeout - * -1 if error - */ -int BSOCK::wait_data(int sec, int msec) -{ - for (;;) { - switch (fd_wait_data(m_fd, WAIT_READ, sec, msec)) { - case 0: /* timeout */ - b_errno = 0; - return 0; - case -1: - b_errno = errno; - if (errno == EINTR) { - continue; - } - return -1; /* error return */ - default: - b_errno = 0; -#ifdef HAVE_TLS - if (this->tls && !tls_bsock_probe(this)) { - continue; /* false alarm, maybe a session key negotiation in progress on the socket */ +#ifdef xxx_debug + /* Debug code -- decompress and compare */ + int blen, rlen, olen; + olen = msglen; + POOLMEM *rmsg = get_pool_memory(PM_BSOCK); + blen = sizeof_pool_memory(msg) * 2; + if (blen >= sizeof_pool_memory(rmsg)) { + rmsg = realloc_pool_memory(rmsg, blen); } -#endif - return 1; - } - } -} - -/* - * As above, but returns on interrupt - */ -int BSOCK::wait_data_intr(int sec, int msec) -{ - switch (fd_wait_data(m_fd, WAIT_READ, sec, msec)) { - case 0: /* timeout */ - b_errno = 0; - return 0; - case -1: - b_errno = errno; - return -1; /* error return */ - default: - b_errno = 0; -#ifdef HAVE_TLS - if (this->tls && !tls_bsock_probe(this)) { - /* maybe a session key negotiation waked up the socket */ - return 0; - } -#endif - break; - } - return 1; -} - -/* - * This routine closes the current BSOCK. - * It does not delete the socket packet - * resources, which are released int - * bsock->destroy(). - */ -#ifndef SHUT_RDWR -#define SHUT_RDWR 2 + rlen = LZ4_decompress_safe(cmsg, rmsg, clen, blen); + //Dmsg4(000, "blen=%d clen=%d olen=%d rlen=%d\n", blen, clen, olen, rlen); + ASSERT(olen == rlen); + ASSERT(memcmp(msg, rmsg, olen) == 0); + free_pool_memory(rmsg); + /* end Debug code */ #endif -/* - * The JCR is canceled, set terminate for chained BSOCKs starting from master - */ -void BSOCK::cancel() -{ - master_lock(); - for (BSOCK *next = m_master; next != NULL; next = next->m_next) { - if (!next->m_closed) { - next->m_terminated = true; - next->m_timed_out = true; + msg = cmsg; + msglen = clen; + compressed = true; } + msglen += offset; + msg -= offset; + cmsg -= offset; } - master_unlock(); + m_CommCompressedBytes += msglen; + return compressed; } /* * Note, this routine closes the socket, but leaves the * bsock memory in place. * every thread is responsible of closing and destroying its own duped or not - * duped BSOCK + * duped BSOCKCORE */ void BSOCK::close() { - BSOCK *bsock = this; - - if (bsock->is_closed()) { - return; - } - if (!m_duped) { - clear_locking(); - } - bsock->set_closed(); - bsock->set_terminated(); - if (!bsock->m_duped) { - /* Shutdown tls cleanly. */ - if (bsock->tls) { - tls_bsock_shutdown(bsock); - free_tls_connection(bsock->tls); - bsock->tls = NULL; - } - -#ifdef HAVE_WIN32 - if (!bsock->is_timed_out()) { - win_close_wait(bsock->m_fd); /* Ensure that data is not discarded */ - } -#else - if (bsock->is_timed_out()) { - shutdown(bsock->m_fd, SHUT_RDWR); /* discard any pending I/O */ - } -#endif - /* On Windows this discards data if we did not do a close_wait() */ - socketClose(bsock->m_fd); /* normal close */ - } + Dmsg0(BSOCK_DEBUG_LVL, "BSOCK::close()\n"); + BSOCKCORE::close(); return; } /* - * Destroy the socket (i.e. release all resources) + * Write nbytes to the network. + * It may require several writes. */ -void BSOCK::_destroy() + +int32_t BSOCK::write_nbytes(char *ptr, int32_t nbytes) { - this->close(); /* Ensure that socket is closed */ + int32_t nwritten; - if (msg) { - free_pool_memory(msg); - msg = NULL; - } else { - ASSERT2(1 == 0, "Two calls to destroy socket"); /* double destroy */ - } - if (cmsg) { - free_pool_memory(cmsg); - cmsg = NULL; - } - if (errmsg) { - free_pool_memory(errmsg); - errmsg = NULL; - } - if (m_who) { - free(m_who); - m_who = NULL; - } - if (m_host) { - free(m_host); - m_host = NULL; - } - if (src_addr) { - free(src_addr); - src_addr = NULL; + if (is_spooling()) { + nwritten = fwrite(ptr, 1, nbytes, m_spool_fd); + if (nwritten != nbytes) { + berrno be; + b_errno = errno; + Qmsg3(jcr(), M_FATAL, 0, _("Attr spool write error. wrote=%d wanted=%d bytes. ERR=%s\n"), + nbytes, nwritten, be.bstrerror()); + Dmsg2(400, "nwritten=%d nbytes=%d.\n", nwritten, nbytes); + errno = b_errno; + return -1; + } + return nbytes; } - free(this); + + /* reuse base code */ + return BSOCKCORE::write_nbytes(ptr, nbytes); } /* - * Destroy the socket (i.e. release all resources) - * including duped sockets. - * should not be called from duped BSOCK + * This is a non-class BSOCK "constructor" because we want to + * call the Bacula smartalloc routines instead of new. */ -void BSOCK::destroy() +BSOCK *new_bsock() { - ASSERTD(reinterpret_cast(m_next) != 0xaaaaaaaaaaaaaaaa, "BSOCK::destroy() already called\n") - ASSERTD(this == m_master, "BSOCK::destroy() called by a non master BSOCK\n") - ASSERTD(!m_duped, "BSOCK::destroy() called by a duped BSOCK\n") - /* I'm the master I must destroy() all the duped BSOCKs */ - master_lock(); - BSOCK *ahead; - for (BSOCK *next = m_next; next != NULL; next = ahead) { - ahead = next->m_next; - next->_destroy(); - } - master_unlock(); - _destroy(); + BSOCK *bsock = New(BSOCK); + return bsock; } -/* Commands sent to Director */ -static char hello[] = "Hello %s calling\n"; -/* Response from Director */ -static char OKhello[] = "1000 OK:"; - -/* - * Authenticate Director +/* Initialize internal socket structure. + * This probably should be done in bsock.c */ -bool BSOCK::authenticate_director(const char *name, const char *password, - TLS_CONTEXT *tls_ctx, char *errmsg, int errmsg_len) -{ - int tls_local_need = BNET_TLS_NONE; - int tls_remote_need = BNET_TLS_NONE; - int compatible = true; - char bashed_name[MAX_NAME_LENGTH]; - BSOCK *dir = this; /* for readability */ - - *errmsg = 0; - /* - * Send my name to the Director then do authentication - */ - - /* Timeout Hello after 15 secs */ - dir->start_timer(15); - dir->fsend(hello, bashed_name); - - if (get_tls_enable(tls_ctx)) { - tls_local_need = get_tls_enable(tls_ctx) ? BNET_TLS_REQUIRED : BNET_TLS_OK; - } - - /* respond to Dir challenge */ - if (!cram_md5_respond(dir, password, &tls_remote_need, &compatible) || - /* Now challenge dir */ - !cram_md5_challenge(dir, password, tls_local_need, compatible)) { - bsnprintf(errmsg, errmsg_len, _("Director authorization error at \"%s:%d\"\n"), - dir->host(), dir->port()); - goto bail_out; - } - - /* Verify that the remote host is willing to meet our TLS requirements */ - if (tls_remote_need < tls_local_need && tls_local_need != BNET_TLS_OK && tls_remote_need != BNET_TLS_OK) { - bsnprintf(errmsg, errmsg_len, _("Authorization error:" - " Remote server at \"%s:%d\" did not advertise required TLS support.\n"), - dir->host(), dir->port()); - goto bail_out; - } - - /* Verify that we are willing to meet the remote host's requirements */ - if (tls_remote_need > tls_local_need && tls_local_need != BNET_TLS_OK && tls_remote_need != BNET_TLS_OK) { - bsnprintf(errmsg, errmsg_len, _("Authorization error with Director at \"%s:%d\":" - " Remote server requires TLS.\n"), - dir->host(), dir->port()); - - goto bail_out; - } - - /* Is TLS Enabled? */ - if (have_tls) { - if (tls_local_need >= BNET_TLS_OK && tls_remote_need >= BNET_TLS_OK) { - /* Engage TLS! Full Speed Ahead! */ - if (!bnet_tls_client(tls_ctx, dir, NULL)) { - bsnprintf(errmsg, errmsg_len, _("TLS negotiation failed with Director at \"%s:%d\"\n"), - dir->host(), dir->port()); - goto bail_out; - } - } - } - - Dmsg1(6, ">dird: %s", dir->msg); - if (dir->recv() <= 0) { - dir->stop_timer(); - bsnprintf(errmsg, errmsg_len, _("Bad errmsg to Hello command: ERR=%s\n" - "The Director at \"%s:%d\" may not be running.\n"), - dir->bstrerror(), dir->host(), dir->port()); - return false; - } - - dir->stop_timer(); - Dmsg1(10, "msg); - if (strncmp(dir->msg, OKhello, sizeof(OKhello)-1) != 0) { - bsnprintf(errmsg, errmsg_len, _("Director at \"%s:%d\" rejected Hello command\n"), - dir->host(), dir->port()); - return false; - } else { - bsnprintf(errmsg, errmsg_len, "%s", dir->msg); - } - return true; - -bail_out: - dir->stop_timer(); - bsnprintf(errmsg, errmsg_len, _("Authorization error with Director at \"%s:%d\"\n" - "Most likely the passwords do not agree.\n" - "If you are using TLS, there may have been a certificate validation error during the TLS handshake.\n" - "For help, please see: " MANUAL_AUTH_URL "\n"), - dir->host(), dir->port()); - return false; +BSOCK *init_bsock(JCR *jcr, int sockfd, const char *who, + const char *host, int port, struct sockaddr *client_addr) +{ + Dmsg4(100, "socket=%d who=%s host=%s port=%d\n", sockfd, who, host, port); + BSOCK *bsock = New(BSOCK(sockfd)); + bsock->m_master = bsock; /* don't use set_master() here */ + bsock->set_who(bstrdup(who)); + bsock->set_host(bstrdup(host)); + bsock->set_port(port); + bmemzero(&bsock->peer_addr, sizeof(bsock->peer_addr)); + memcpy(&bsock->client_addr, client_addr, sizeof(bsock->client_addr)); + bsock->set_jcr(jcr); + return bsock; } -/* Try to limit the bandwidth of a network connection - */ -void BSOCK::control_bwlimit(int bytes) +BSOCK *dup_bsock(BSOCK *osock) { - btime_t now, temp; - if (bytes == 0) { - return; - } - - now = get_current_btime(); /* microseconds */ - temp = now - m_last_tick; /* microseconds */ - - m_nb_bytes += bytes; - - if (temp < 0 || temp > 10000000) { /* Take care of clock problems (>10s) or back in time */ - m_nb_bytes = bytes; - m_last_tick = now; - return; - } + POOLMEM *cmsg; + POOLMEM *msg; + POOLMEM *errmsg; - /* Less than 0.1ms since the last call, see the next time */ - if (temp < 100) { - return; + osock->set_locking(); + BSOCK *bsock = New(BSOCK); + // save already allocated variables + msg = bsock->msg; + cmsg = bsock->cmsg; + errmsg = bsock->errmsg; + // with this we make virtually the same job as with memcpy() + *bsock = *osock; + // restore saved variables + bsock->msg = msg; + bsock->cmsg = cmsg; + bsock->errmsg = errmsg; + if (osock->who()) { + bsock->set_who(bstrdup(osock->who())); } - - /* Remove what was authorised to be written in temp us */ - m_nb_bytes -= (int64_t)(temp * ((double)m_bwlimit / 1000000.0)); - - if (m_nb_bytes < 0) { - m_nb_bytes = 0; + if (osock->host()) { + bsock->set_host(bstrdup(osock->host())); } - - /* What exceed should be converted in sleep time */ - int64_t usec_sleep = (int64_t)(m_nb_bytes /((double)m_bwlimit / 1000000.0)); - if (usec_sleep > 100) { - bmicrosleep(usec_sleep/1000000, usec_sleep%1000000); /* TODO: Check that bmicrosleep slept enough or sleep again */ - m_last_tick = get_current_btime(); - m_nb_bytes = 0; - } else { - m_last_tick = now; + if (osock->src_addr) { + bsock->src_addr = New( IPADDR( *(osock->src_addr)) ); } + bsock->set_duped(); + bsock->set_master(osock); + return bsock; } -#ifdef HAVE_WIN32 -/* - * closesocket is supposed to do a graceful disconnect under Window - * but it doesn't. Comments on http://msdn.microsoft.com/en-us/li - * confirm this behaviour. DisconnectEx is required instead, but - * that function needs to be retrieved via WS IOCTL - */ -static void -win_close_wait(int fd) -{ - int ret; - GUID disconnectex_guid = WSAID_DISCONNECTEX; - DWORD bytes_returned; - LPFN_DISCONNECTEX DisconnectEx; - ret = WSAIoctl(fd, SIO_GET_EXTENSION_FUNCTION_POINTER, &disconnectex_guid, sizeof(disconnectex_guid), &DisconnectEx, sizeof(DisconnectEx), &bytes_returned, NULL, NULL); - Dmsg1(100, "WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER, WSAID_DISCONNECTEX) ret = %d\n", ret); - if (!ret) { - DisconnectEx(fd, NULL, 0, 0); - } -} +#ifndef TEST_PROGRAM +#define TEST_PROGRAM_A #endif + +void BSOCK::dump() +{ +#ifdef TEST_PROGRAM + char ed1[50]; + BSOCKCORE::dump(); + Pmsg1(-1, "BSOCK::dump(): %p\n", this); + Pmsg1(-1, "\tm_spool_fd: %p\n", m_spool_fd); + Pmsg1(-1, "\tcmsg: %p\n", cmsg); + Pmsg1(-1, "\tm_data_end: %s\n", edit_int64(m_data_end, ed1)); + Pmsg1(-1, "\tm_last_data_end: %s\n", edit_int64(m_last_data_end, ed1)); + Pmsg1(-1, "\tm_FileIndex: %s\n", edit_int64(m_FileIndex, ed1)); + Pmsg1(-1, "\tm_lastFileIndex: %s\n", edit_int64(m_lastFileIndex, ed1)); + Pmsg1(-1, "\tm_spool: %s\n", m_spool?"true":"false"); + Pmsg1(-1, "\tm_compress: %s\n", m_compress?"true":"false"); + Pmsg1(-1, "\tm_CommBytes: %s\n", edit_uint64(m_CommBytes, ed1)); + Pmsg1(-1, "\tm_CommCompressedBytes: %s\n", edit_uint64(m_CommCompressedBytes, ed1)); +#endif +}; + +#ifdef TEST_PROGRAM + +void terminate(int sig){}; +void free_my_jcr(JCR *jcr){ + /* TODO: handle full JCR free */ + free_jcr(jcr); +}; + +int main() +{ + BSOCK *bs; + BSOCK *bsdup; + pid_t pid; + int rc; + char *host = (char*)"localhost"; + char *name = (char*)"Test"; + JCR *jcr; + + debug_level = 500; + my_name_is(0, NULL, "bsock_test"); + init_signals(terminate); + lmgr_init_thread(); /* initialize the lockmanager stack */ + + jcr = new_jcr(sizeof(JCR), NULL); + bs = New(BSOCK); + Pmsg0(0, "Initialize ...\n"); + bs->set_jcr(jcr); + bs->dump(); + + pid = fork(); + if (0 == pid){ + Pmsg0(0, "prepare to execute netcat\n"); + rc = execl("/bin/netcat", "netcat", "-p", "20000", "-l", NULL); + Pmsg1(0, "Error executing netcat: %s\n", strerror(rc)); + exit(1); + } + Pmsg1(0, "after fork: %d\n", pid); + bmicrosleep(1, 0); + if (bs->connect(jcr, 1, 10, 0, name, host, NULL, 20000, 0)) { + /* connected */ + Pmsg0(0, "connected ...\n"); + bs->dump(); + Pmsg0(0, "duped BSOCK\n"); + bsdup = dup_bsock(bs); + bsdup->dump(); + Pmsg0(0, "orignal after duped\n"); + bs->dump(); + } else { + Pmsg1(0, "connection error: %s\n", bs->bstrerror()); + } + kill(pid, SIGTERM); + delete(bs); + free_my_jcr(jcr); + lmgr_cleanup_main(); + // sm_dump(false); +}; +#endif /* TEST_PROGRAM */ diff --git a/bacula/src/lib/bsock.h b/bacula/src/lib/bsock.h index b18b9bcee..9ea4ec1f8 100644 --- a/bacula/src/lib/bsock.h +++ b/bacula/src/lib/bsock.h @@ -21,8 +21,17 @@ * Note, the old non-class code is in bnet.c, and the * new class code associated with this file is in bsock.c * + * The new code inherit common functions from BSOCKCORE class + * and implement BSOCK/Bacula specific protocols and data flow. + * * Kern Sibbald, May MM * + * Major refactoring of BSOCK code written by: + * + * Radosław Korzeniewski, MMXVIII + * radoslaw@korzeniewski.net, radekk@inteos.pl + * Inteos Sp. z o.o. http://www.inteos.pl/ + * * Zero msglen from other end indicates soft eof (usually * end of some binary data stream, but not end of conversation). * @@ -33,158 +42,45 @@ #ifndef __BSOCK_H_ #define __BSOCK_H_ -struct btimer_t; /* forward reference */ -class BSOCK; -/* Effectively disable the bsock time out */ #define BSOCK_TIMEOUT 3600 * 24 * 200; /* default 200 days */ -btimer_t *start_bsock_timer(BSOCK *bs, uint32_t wait); -void stop_bsock_timer(btimer_t *wid); - -class BSOCKCallback { -public: - BSOCKCallback(); - virtual ~BSOCKCallback(); - virtual bool bsock_send_cb() = 0; -}; -class BSOCK { -/* - * Note, keep this public part before the private otherwise - * bat breaks on some systems such as RedHat. - */ +class BSOCK: public BSOCKCORE { public: - uint64_t read_seqno; /* read sequence number */ - POOLMEM *msg; /* message pool buffer */ - POOLMEM *cmsg; /* Compress buffer */ - POOLMEM *errmsg; /* edited error message */ - RES *res; /* Resource to which we are connected */ FILE *m_spool_fd; /* spooling file */ - TLS_CONNECTION *tls; /* associated tls connection */ - IPADDR *src_addr; /* IP address to source connections from */ - uint32_t in_msg_no; /* input message number */ - uint32_t out_msg_no; /* output message number */ - uint32_t *pout_msg_no; /* pointer to the above */ - int32_t msglen; /* message length */ - volatile time_t timer_start; /* time started read/write */ - volatile time_t timeout; /* timeout BSOCK after this interval */ - int m_fd; /* socket file descriptor */ - int b_errno; /* bsock errno */ - int m_blocking; /* blocking state (0 = nonblocking, 1 = blocking) */ - volatile int errors; /* incremented for each error on socket */ - volatile bool m_suppress_error_msgs; /* set to suppress error messages */ - - struct sockaddr client_addr; /* client's IP address */ - struct sockaddr_in peer_addr; /* peer's IP address */ - - /* when "installed", send_hook_cb->bsock_send_cb() is called before - * any ::send(). - */ - BSOCKCallback *send_hook_cb; + POOLMEM *cmsg; /* Compress buffer */ private: - /* m_master is used by "duped" BSOCK to access some attributes of the "parent" - * thread to have an up2date status (for example when the job is canceled, - * the "parent" BSOCK is "terminated", but the duped BSOCK is unchanged) - * In the future more attributes and method could use the "m_master" - * indirection. - * master->m_rmutex could replace pm_rmutex, idem for the (w)rite" mutex - * "m_master->error" should be incremented instead of "error", but - * this require a lock. - * - * USAGE: the parent thread MUST be sure that the child thread have quit - * before to free the "parent" BSOCK. - */ - BSOCK *m_next; /* next BSOCK if duped (not actually used) */ - JCR *m_jcr; /* jcr or NULL for error msgs */ - pthread_mutex_t m_rmutex; /* for read locking if use_locking set */ - pthread_mutex_t m_wmutex; /* for write locking if use_locking set */ - mutable pthread_mutex_t m_mmutex; /* when accessing the master/next chain */ - pthread_mutex_t *pm_rmutex; /* Pointer to the read mutex */ - pthread_mutex_t *pm_wmutex; /* Pointer to the write mutex */ - char *m_who; /* Name of daemon to which we are talking */ - char *m_host; /* Host name/IP */ - int m_port; /* desired port */ - btimer_t *m_tid; /* timer id */ boffset_t m_data_end; /* offset of data written */ - boffset_t m_last_data_end; /* offset of last valid data written */ + boffset_t m_last_data_end; /* offset of last valid data written */ int32_t m_FileIndex; /* attr spool FI */ int32_t m_lastFileIndex; /* last valid attr spool FI */ - uint32_t m_flags; /* Special flags */ - volatile bool m_timed_out: 1; /* timed out in read/write */ - volatile bool m_terminated: 1; /* set when BNET_TERMINATE arrives */ - bool m_closed: 1; /* set when socket is closed */ - bool m_duped: 1; /* set if duped BSOCK */ bool m_spool: 1; /* set for spooling */ bool m_compress: 1; /* set to use comm line compression */ - bool m_use_locking; /* set to use locking (out of a bitfield */ - /* to avoid race conditions) */ - - int64_t m_bwlimit; /* set to limit bandwidth */ - int64_t m_nb_bytes; /* bytes sent/recv since the last tick */ - btime_t m_last_tick; /* last tick used by bwlimit */ uint64_t m_CommBytes; /* Bytes sent */ uint64_t m_CommCompressedBytes; /* Compressed bytes sent */ - void fin_init(JCR * jcr, int sockfd, const char *who, const char *host, int port, - struct sockaddr *lclient_addr); bool open(JCR *jcr, const char *name, char *host, char *service, int port, utime_t heart_beat, int *fatal); - void master_lock() const { if (m_use_locking) pP((&m_mmutex)); }; - void master_unlock() const { if (m_use_locking) pV((&m_mmutex)); }; + void init(); + void _destroy(); + int32_t write_nbytes(char *ptr, int32_t nbytes); public: - BSOCK *m_master; /* "this" or the "parent" BSOCK if duped */ - /* methods -- in bsock.c */ - void init(); - void free_tls(); - bool connect(JCR * jcr, int retry_interval, utime_t max_retry_time, - utime_t heart_beat, const char *name, char *host, - char *service, int port, int verbose); + BSOCK(); + BSOCK(int sockfd); + ~BSOCK(); int32_t recv(); bool send() { return send(0); }; bool send(int flags); - bool fsend(const char*, ...); bool signal(int signal); - void close(); /* close connection and destroy packet */ - void _destroy(); /* called by destroy() */ - void destroy(); /* destroy socket packet */ - bool comm_compress(); /* in bsock.c */ - const char *bstrerror(); /* last error on socket */ - int get_peer(char *buf, socklen_t buflen); + void close(); /* close connection and destroy packet */ + bool comm_compress(); /* in bsock.c */ bool despool(void update_attr_spool_size(ssize_t size), ssize_t tsize); - bool set_buffer_size(uint32_t size, int rw); - int set_nonblocking(); - int set_blocking(); - void restore_blocking(int flags); - void set_killable(bool killable); - int wait_data(int sec, int msec=0); - int wait_data_intr(int sec, int msec=0); bool authenticate_director(const char *name, const char *password, TLS_CONTEXT *tls_ctx, char *response, int response_len); - bool set_locking(); /* in bsock.c */ - void clear_locking(); /* in bsock.c */ - void set_source_address(dlist *src_addr_list); - void control_bwlimit(int bytes); /* in bsock.c */ /* Inline functions */ - void suppress_error_messages(bool flag) { m_suppress_error_msgs = flag; }; - void set_jcr(JCR *jcr) { m_jcr = jcr; }; - void set_who(char *who) { m_who = who; }; - void set_host(char *host) { m_host = host; }; - void set_port(int port) { m_port = port; }; - char *who() const { return m_who; }; - char *host() const { return m_host; }; - int port() const { return m_port; }; - JCR *jcr() const { return m_jcr; }; - JCR *get_jcr() const { return m_jcr; }; bool is_spooling() const { return m_spool; }; - bool is_duped() const { return m_duped; }; - bool is_terminated() const { return m_terminated; }; - bool is_timed_out() const { return m_timed_out; }; - bool is_closed() const { return m_closed; }; - bool is_open() const { return !m_closed; }; - bool is_stop() const { return errors || is_terminated() || is_closed(); }; - bool is_error() { errno = b_errno; return errors; }; bool can_compress() const { return m_compress; }; void set_data_end(int32_t FileIndex) { if (m_spool && FileIndex > m_FileIndex) { @@ -198,25 +94,11 @@ public: int32_t get_lastFileIndex() { return m_lastFileIndex; }; uint32_t CommBytes() { return m_CommBytes; }; uint32_t CommCompressedBytes() { return m_CommCompressedBytes; }; - void set_bwlimit(int64_t maxspeed) { m_bwlimit = maxspeed; }; - bool use_bwlimit() { return m_bwlimit > 0;}; void set_spooling() { m_spool = true; }; void clear_spooling() { m_spool = false; }; void set_compress() { m_compress = true; }; void clear_compress() { m_compress = false; }; - void set_duped() { m_duped = true; }; - void set_master(BSOCK *master) { master_lock(); m_master = master; m_next = master->m_next; master->m_next = this; master_unlock(); }; - void set_timed_out() { m_timed_out = true; }; - void clear_timed_out() { m_timed_out = false; }; - void set_terminated() { m_terminated = true; }; - void set_closed() { m_closed = true; }; - void start_timer(int sec) { m_tid = start_bsock_timer(this, sec); }; - void stop_timer() { stop_bsock_timer(m_tid); }; - void swap_msgs(); - void install_send_hook_cb(BSOCKCallback *obj) { send_hook_cb=obj; }; - void uninstall_send_hook_cb() { send_hook_cb=NULL; }; - void cancel(); /* call it when JCR is canceled */ - + void dump(); }; /* @@ -272,10 +154,10 @@ enum { * for flags, and the low 16 bits are for other info such as * compressed data offset (BNET_OFFSET) */ -#define BNET_IS_CMD (1<<28) /* set for command data */ -#define BNET_OFFSET (1<<27) /* Data compression offset specified */ -#define BNET_NOCOMPRESS (1<<25) /* Disable compression */ -#define BNET_DATACOMPRESSED (1<<24) /* Data compression */ +#define BNET_IS_CMD (1<<28) /* set for command data */ +#define BNET_OFFSET (1<<27) /* Data compression offset specified */ +#define BNET_NOCOMPRESS (1<<25) /* Disable compression */ +#define BNET_DATACOMPRESSED (1<<24) /* Data compression */ #define BNET_SETBUF_READ 1 /* Arg for bnet_set_buffer_size */ #define BNET_SETBUF_WRITE 2 /* Arg for bnet_set_buffer_size */ @@ -310,8 +192,6 @@ enum { BNET_CMD_STP_FLOWCTRL = 7 /* backup FD->SD SD must stop sending flowcontrol information */ }; -const char *bnet_cmd_to_name(int val); - /* * TLS enabling values. Value is important for comparison, ie: * if (tls_remote_need < BNET_TLS_REQUIRED) { ... } @@ -322,18 +202,17 @@ enum { BNET_TLS_REQUIRED = 2 /* TLS is required */ }; -int32_t read_nbytes(BSOCK * bsock, char *ptr, int32_t nbytes); -int32_t write_nbytes(BSOCK * bsock, char *ptr, int32_t nbytes); +const char *bnet_cmd_to_name(int val); BSOCK *new_bsock(); /* * Completely release the socket packet, and NULL the pointer */ -#define free_bsock(a) do{if(a){(a)->close(); (a)->destroy(); (a)=NULL;}} while(0) +#define free_bsock(a) free_bsockcore(a) /* * Does the socket exist and is it open? */ -#define is_bsock_open(a) ((a) && (a)->is_open()) +#define is_bsock_open(a) is_bsockcore_open(a) #endif /* __BSOCK_H_ */ diff --git a/bacula/src/lib/bsockcore.c b/bacula/src/lib/bsockcore.c new file mode 100644 index 000000000..5efbb98e8 --- /dev/null +++ b/bacula/src/lib/bsockcore.c @@ -0,0 +1,1312 @@ +/* + Bacula(R) - The Network Backup Solution + + Copyright (C) 2000-2017 Kern Sibbald + + The original author of Bacula is Kern Sibbald, with contributions + from many others, a complete list can be found in the file AUTHORS. + + You may use this file and others of this release according to the + license defined in the LICENSE file, which includes the Affero General + Public License, v3.0 ("AGPLv3") and some additional permissions and + terms pursuant to its AGPLv3 Section 7. + + This notice must be preserved when any source code is + conveyed and/or propagated. + + Bacula(R) is a registered trademark of Kern Sibbald. +*/ +/* + * Bacula Core Sock Class definition + * + * Kern Sibbald, May MM + * + * Major refactoring of BSOCK code written by: + * + * Radosław Korzeniewski, MMXVIII + * radoslaw@korzeniewski.net, radekk@inteos.pl + * Inteos Sp. z o.o. http://www.inteos.pl/ + * + * This is a common class for socket network communication derived from + * BSOCK class. It acts as a base class for non-Bacula network communication + * and as a base class for standard BSOCK implementation. Basically the BSOCK + * class did not changed its functionality for any Bacula specific part. + * Now you can use a BSOCKCLASS for other network communication. + */ + +#include "bacula.h" +#include "jcr.h" +#include +#include + +#define BSOCKCORE_DEBUG_LVL 900 + +#if !defined(ENODATA) /* not defined on BSD systems */ +#define ENODATA EPIPE +#endif + +#if !defined(SOL_TCP) /* Not defined on some systems */ +#define SOL_TCP IPPROTO_TCP +#endif + +#ifdef HAVE_WIN32 +#include +static void win_close_wait(int fd); +#ifndef SOCK_CLOEXEC +#define SOCK_CLOEXEC 0 +#endif +#endif + +/* + * make a nice dump of a message + */ +void dump_bsock_msg(int sock, uint32_t msgno, const char *what, uint32_t rc, int32_t pktsize, uint32_t flags, + POOLMEM *msg, int32_t msglen) +{ + char buf[54]; + bool is_ascii; + int dbglvl = DT_ASX; + + if (msglen<0) { + Dmsg4(dbglvl, "%s %d:%d SIGNAL=%s\n", what, sock, msgno, bnet_sig_to_ascii(msglen)); + // data + smartdump(msg, msglen, buf, sizeof(buf)-9, &is_ascii); + if (is_ascii) { + Dmsg5(dbglvl, "%s %d:%d len=%d \"%s\"\n", what, sock, msgno, msglen, buf); + } else { + Dmsg5(dbglvl, "%s %d:%d len=%d %s\n", what, sock, msgno, msglen, buf); + } + } +} + +BSOCKCallback::BSOCKCallback() +{ +} + +BSOCKCallback::~BSOCKCallback() +{ +} + +/* + * Default constructor does class initialization. + */ +BSOCKCORE::BSOCKCORE() : + msg(NULL), + errmsg(NULL), + res(NULL), + tls(NULL), + src_addr(NULL), + read_seqno(0), + in_msg_no(0), + out_msg_no(0), + pout_msg_no(NULL), + msglen(0), + timer_start(0), + timeout(0), + m_fd(-1), + b_errno(0), + m_blocking(0), + errors(0), + m_suppress_error_msgs(false), + send_hook_cb(NULL), + m_next(NULL), + m_jcr(NULL), + pm_rmutex(NULL), + pm_wmutex(NULL), + m_who(NULL), + m_host(NULL), + m_port(0), + m_tid(NULL), + m_flags(0), + m_timed_out(false), + m_terminated(false), + m_closed(false), + m_duped(false), + m_use_locking(false), + m_bwlimit(0), + m_nb_bytes(0), + m_last_tick(0) +{ + pthread_mutex_init(&m_rmutex, NULL); + pthread_mutex_init(&m_wmutex, NULL); + pthread_mutex_init(&m_mmutex, NULL); + bmemzero(&peer_addr, sizeof(peer_addr)); + bmemzero(&client_addr, sizeof(client_addr)); + init(); +}; + +/* + * Default destructor releases resources. + */ +BSOCKCORE::~BSOCKCORE() +{ + Dmsg0(BSOCKCORE_DEBUG_LVL, "BSOCKCORE::~BSOCKCORE()\n"); + _destroy(); +}; + +/* + * Initialization method. + */ +void BSOCKCORE::init() +{ + m_master = this; + set_closed(); + set_terminated(); + m_blocking = 1; + msg = get_pool_memory(PM_BSOCK); + errmsg = get_pool_memory(PM_MESSAGE); + timeout = BSOCKCORE_TIMEOUT; + pout_msg_no = &out_msg_no; +} + +void BSOCKCORE::free_tls() +{ + free_tls_connection(this->tls); + this->tls = NULL; +} + +/* + * Try to connect to host for max_retry_time at retry_time intervals. + * Note, you must have called the constructor prior to calling + * this routine. + */ +bool BSOCKCORE::connect(JCR * jcr, int retry_interval, utime_t max_retry_time, + utime_t heart_beat, + const char *name, char *host, char *service, int port, + int verbose) +{ + bool ok = false; + int i; + int fatal = 0; + time_t begin_time = time(NULL); + time_t now; + btimer_t *tid = NULL; + + /* Try to trap out of OS call when time expires */ + if (max_retry_time) { + tid = start_thread_timer(jcr, pthread_self(), (uint32_t)max_retry_time); + } + + for (i = 0; !open(jcr, name, host, service, port, heart_beat, &fatal); + i -= retry_interval) { + berrno be; + if (fatal || (jcr && job_canceled(jcr))) { + goto bail_out; + } + Dmsg4(50, "Unable to connect to %s on %s:%d. ERR=%s\n", + name, host, port, be.bstrerror()); + if (i < 0) { + i = 60 * 5; /* complain again in 5 minutes */ + if (verbose) + Qmsg4(jcr, M_WARNING, 0, _( + "Could not connect to %s on %s:%d. ERR=%s\n" + "Retrying ...\n"), name, host, port, be.bstrerror()); + } + bmicrosleep(retry_interval, 0); + now = time(NULL); + if (begin_time + max_retry_time <= now) { + Qmsg4(jcr, M_FATAL, 0, _("Unable to connect to %s on %s:%d. ERR=%s\n"), + name, host, port, be.bstrerror()); + goto bail_out; + } + } + ok = true; + +bail_out: + if (tid) { + stop_thread_timer(tid); + } + return ok; +} + +/* + * Finish initialization of the packet structure. + */ +void BSOCKCORE::fin_init(JCR * jcr, int sockfd, const char *who, const char *host, int port, + struct sockaddr *lclient_addr) +{ + Dmsg3(100, "who=%s host=%s port=%d\n", who, host, port); + m_fd = sockfd; + if (m_who) { + free(m_who); + } + if (m_host) { + free(m_host); + } + set_who(bstrdup(who)); + set_host(bstrdup(host)); + set_port(port); + memcpy(&client_addr, lclient_addr, sizeof(client_addr)); + set_jcr(jcr); +} + +/* + * Copy the address from the configuration dlist that gets passed in + */ +void BSOCKCORE::set_source_address(dlist *src_addr_list) +{ + IPADDR *addr = NULL; + + // delete the object we already have, if it's allocated + if (src_addr) { + /* TODO: Why free() instead of delete as src_addr is a IPADDR class */ + free( src_addr); + src_addr = NULL; + } + + if (src_addr_list) { + addr = (IPADDR*) src_addr_list->first(); + src_addr = New( IPADDR(*addr)); + } +} + +/* + * Open a TCP connection to the server + * Returns true when connection was successful or false otherwise. + */ +bool BSOCKCORE::open(JCR *jcr, const char *name, char *host, char *service, + int port, utime_t heart_beat, int *fatal) +{ + int sockfd = -1; + dlist *addr_list; + IPADDR *ipaddr; + bool connected = false; + int turnon = 1; + const char *errstr; + int save_errno = 0; + + /* + * Fill in the structure serv_addr with the address of + * the server that we want to connect with. + */ + if ((addr_list = bnet_host2ipaddrs(host, 0, &errstr)) == NULL) { + /* Note errstr is not malloc'ed */ + Qmsg2(jcr, M_ERROR, 0, _("gethostbyname() for host \"%s\" failed: ERR=%s\n"), + host, errstr); + Dmsg2(100, "bnet_host2ipaddrs() for host %s failed: ERR=%s\n", + host, errstr); + *fatal = 1; + return false; + } + + remove_duplicate_addresses(addr_list); + foreach_dlist(ipaddr, addr_list) { + ipaddr->set_port_net(htons(port)); + char allbuf[256 * 10]; + char curbuf[256]; + Dmsg2(100, "Current %sAll %s\n", + ipaddr->build_address_str(curbuf, sizeof(curbuf)), + build_addresses_str(addr_list, allbuf, sizeof(allbuf))); + /* Open a TCP socket */ + if ((sockfd = socket(ipaddr->get_family(), SOCK_STREAM|SOCK_CLOEXEC, 0)) < 0) { + berrno be; + save_errno = errno; + switch (errno) { +#ifdef EAFNOSUPPORT + case EAFNOSUPPORT: + /* + * The name lookup of the host returned an address in a protocol family + * we don't support. Suppress the error and try the next address. + */ + break; +#endif +#ifdef EPROTONOSUPPORT + /* See above comments */ + case EPROTONOSUPPORT: + break; +#endif +#ifdef EPROTOTYPE + /* See above comments */ + case EPROTOTYPE: + break; +#endif + default: + *fatal = 1; + Qmsg3(jcr, M_ERROR, 0, _("Socket open error. proto=%d port=%d. ERR=%s\n"), + ipaddr->get_family(), ipaddr->get_port_host_order(), be.bstrerror()); + Pmsg3(300, _("Socket open error. proto=%d port=%d. ERR=%s\n"), + ipaddr->get_family(), ipaddr->get_port_host_order(), be.bstrerror()); + break; + } + continue; + } + + /* Bind to the source address if it is set */ + if (src_addr) { + if (bind(sockfd, src_addr->get_sockaddr(), src_addr->get_sockaddr_len()) < 0) { + berrno be; + save_errno = errno; + *fatal = 1; + Qmsg2(jcr, M_ERROR, 0, _("Source address bind error. proto=%d. ERR=%s\n"), + src_addr->get_family(), be.bstrerror() ); + Pmsg2(000, _("Source address bind error. proto=%d. ERR=%s\n"), + src_addr->get_family(), be.bstrerror() ); + if (sockfd >= 0) socketClose(sockfd); + continue; + } + } + + /* + * Keep socket from timing out from inactivity + */ + if (setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE, (sockopt_val_t)&turnon, sizeof(turnon)) < 0) { + berrno be; + Qmsg1(jcr, M_WARNING, 0, _("Cannot set SO_KEEPALIVE on socket: %s\n"), + be.bstrerror()); + } +#if defined(TCP_KEEPIDLE) + if (heart_beat) { + int opt = heart_beat; + if (setsockopt(sockfd, SOL_TCP, TCP_KEEPIDLE, (sockopt_val_t)&opt, sizeof(opt)) < 0) { + berrno be; + Qmsg1(jcr, M_WARNING, 0, _("Cannot set TCP_KEEPIDLE on socket: %s\n"), + be.bstrerror()); + } + } +#endif + + /* connect to server */ + if (::connect(sockfd, ipaddr->get_sockaddr(), ipaddr->get_sockaddr_len()) < 0) { + save_errno = errno; + if (sockfd >= 0) socketClose(sockfd); + continue; + } + *fatal = 0; + connected = true; + break; + } + + if (!connected) { + berrno be; + free_addresses(addr_list); + errno = save_errno | b_errno_win32; + Dmsg4(50, "Could not connect to server %s %s:%d. ERR=%s\n", + name, host, port, be.bstrerror()); + return false; + } + /* + * Keep socket from timing out from inactivity + * Do this a second time out of paranoia + */ + if (setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE, (sockopt_val_t)&turnon, sizeof(turnon)) < 0) { + berrno be; + Qmsg1(jcr, M_WARNING, 0, _("Cannot set SO_KEEPALIVE on socket: %s\n"), + be.bstrerror()); + } + fin_init(jcr, sockfd, name, host, port, ipaddr->get_sockaddr()); + free_addresses(addr_list); + + /* Clean the packet a bit */ + m_closed = false; + m_duped = false; + // Moved to BSOCK m_spool = false; + m_use_locking = false; + m_timed_out = false; + m_terminated = false; + m_suppress_error_msgs = false; + errors = 0; + m_blocking = 0; + + Dmsg3(50, "OK connected to server %s %s:%d.\n", + name, host, port); + + return true; +} + +/* + * Force read/write to use locking + */ +bool BSOCKCORE::set_locking() +{ + int stat; + if (m_use_locking) { + return true; /* already set */ + } + pm_rmutex = &m_rmutex; + pm_wmutex = &m_wmutex; + if ((stat = pthread_mutex_init(pm_rmutex, NULL)) != 0) { + berrno be; + Qmsg(m_jcr, M_FATAL, 0, _("Could not init bsockcore read mutex. ERR=%s\n"), + be.bstrerror(stat)); + return false; + } + if ((stat = pthread_mutex_init(pm_wmutex, NULL)) != 0) { + berrno be; + Qmsg(m_jcr, M_FATAL, 0, _("Could not init bsockcore write mutex. ERR=%s\n"), + be.bstrerror(stat)); + return false; + } + if ((stat = pthread_mutex_init(&m_mmutex, NULL)) != 0) { + berrno be; + Qmsg(m_jcr, M_FATAL, 0, _("Could not init bsockcore attribute mutex. ERR=%s\n"), + be.bstrerror(stat)); + return false; + } + m_use_locking = true; + return true; +} + +void BSOCKCORE::clear_locking() +{ + if (!m_use_locking || m_duped) { + return; + } + m_use_locking = false; + pthread_mutex_destroy(pm_rmutex); + pthread_mutex_destroy(pm_wmutex); + pthread_mutex_destroy(&m_mmutex); + pm_rmutex = NULL; + pm_wmutex = NULL; + return; +} + +/* + * Send a message over the network. Everything is sent in one write request. + * + * Returns: false on failure + * true on success + */ +bool BSOCKCORE::send() +{ + int32_t rc; + bool ok = true; + bool locked = false; + + if (is_closed()) { + if (!m_suppress_error_msgs) { + Qmsg0(m_jcr, M_ERROR, 0, _("Socket is closed\n")); + } + return false; + } + if (errors) { + if (!m_suppress_error_msgs) { + Qmsg4(m_jcr, M_ERROR, 0, _("Socket has errors=%d on call to %s:%s:%d\n"), + errors, m_who, m_host, m_port); + } + return false; + } + if (is_terminated()) { + if (!m_suppress_error_msgs) { + Qmsg4(m_jcr, M_ERROR, 0, _("BSOCKCORE send while terminated=%d on call to %s:%s:%d\n"), + is_terminated(), m_who, m_host, m_port); + } + return false; + } + + if (msglen > 4000000) { + if (!m_suppress_error_msgs) { + Qmsg4(m_jcr, M_ERROR, 0, + _("Socket has insane msglen=%d on call to %s:%s:%d\n"), + msglen, m_who, m_host, m_port); + } + return false; + } + + if (send_hook_cb) { + if (!send_hook_cb->bsock_send_cb()) { + Dmsg3(1, "Flowcontrol failure on %s:%s:%d\n", m_who, m_host, m_port); + Qmsg3(m_jcr, M_ERROR, 0, _("Flowcontrol failure on %s:%s:%d\n"), m_who, m_host, m_port); + return false; + } + } + if (m_use_locking) { + pP(pm_wmutex); + locked = true; + } + + (*pout_msg_no)++; /* increment message number */ + + /* send data packet */ + timer_start = watchdog_time; /* start timer */ + clear_timed_out(); + /* Full I/O done in one write */ + rc = write_nbytes(msg, msglen); + if (chk_dbglvl(DT_NETWORK|1900)) dump_bsock_msg(m_fd, *pout_msg_no, "SEND", rc, msglen, m_flags, msg, msglen); + timer_start = 0; /* clear timer */ + if (rc != msglen) { + errors++; + if (errno == 0) { + b_errno = EIO; + } else { + b_errno = errno; + } + if (rc < 0) { + if (!m_suppress_error_msgs) { + Qmsg5(m_jcr, M_ERROR, 0, + _("Write error sending %d bytes to %s:%s:%d: ERR=%s\n"), + msglen, m_who, + m_host, m_port, this->bstrerror()); + } + } else { + Qmsg5(m_jcr, M_ERROR, 0, + _("Wrote %d bytes to %s:%s:%d, but only %d accepted.\n"), + msglen, m_who, m_host, m_port, rc); + } + ok = false; + } +// Dmsg4(000, "cmpr=%d ext=%d cmd=%d m_flags=0x%x\n", msglen&BNET_COMPRESSED?1:0, +// msglen&BNET_HDR_EXTEND?1:0, msglen&BNET_CMD_BIT?1:0, m_flags); + if (locked) pV(pm_wmutex); + return ok; +} + +/* + * Format and send a message + * Returns: false on error + * true on success + */ +bool BSOCKCORE::fsend(const char *fmt, ...) +{ + va_list arg_ptr; + int maxlen; + + if (is_null(this)) { + return false; /* do not seg fault */ + } + if (errors || is_terminated() || is_closed()) { + return false; + } + /* This probably won't work, but we vsnprintf, then if we + * get a negative length or a length greater than our buffer + * (depending on which library is used), the printf was truncated, so + * get a bigger buffer and try again. + */ + for (;;) { + maxlen = sizeof_pool_memory(msg) - 1; + va_start(arg_ptr, fmt); + msglen = bvsnprintf(msg, maxlen, fmt, arg_ptr); + va_end(arg_ptr); + if (msglen >= 0 && msglen < (maxlen - 5)) { + break; + } + msg = realloc_pool_memory(msg, maxlen + maxlen / 2); + } + return send(); +} + +/* + * Receive a data from the other end. + * The number of expected bytes in len. + * Returns number of bytes read (may return zero), the msglen is set accordingly. + * Returns -1 on error so msglen will be zero. + */ +int32_t BSOCKCORE::recv(int len) +{ + /* The method has to be redesigned from scratch */ + int32_t nbytes; + bool locked = false; + + msglen = nbytes = 0; + msg[msglen] = 0; + if (errors || is_terminated() || is_closed()) { + /* error, cannot receive */ + return -1; + } + + if (len > 0) { + /* do read only when len > 0 */ + if (m_use_locking) { + pP(pm_rmutex); + locked = true; + } + read_seqno++; /* bump sequence number */ + timer_start = watchdog_time; /* set start wait time */ + clear_timed_out(); + /* Make sure the buffer is big enough + one byte for EOS */ + if (len >= (int32_t) sizeof_pool_memory(msg)) { + msg = realloc_pool_memory(msg, len + 100); + } + timer_start = watchdog_time; /* set start wait time */ + clear_timed_out(); + if ((nbytes = read_nbytes(msg, len)) <= 0) { + timer_start = 0; /* clear timer */ + /* probably pipe broken because client died */ + if (errno == 0) { + b_errno = ENODATA; + } else { + b_errno = errno; + } + nbytes = -1; + errors++; + msglen = 0; /* assume hard EOF received */ + Qmsg4(m_jcr, M_ERROR, 0, _("Read error from %s:%s:%d: ERR=%s\n"), + m_who, m_host, m_port, this->bstrerror()); + goto bailout; + } + timer_start = 0; /* clear timer */ + in_msg_no++; + msglen = nbytes; + /* + * always add a zero by to properly terminate any + * string that was send to us. Note, we ensured above that the + * buffer is at least one byte longer than the message length. + */ + msg[nbytes] = 0; /* terminate in case it is a string */ + /* + * The following uses *lots* of resources so turn it on only for + * serious debugging. + */ + Dsm_check(300); + } + +bailout: + if ((chk_dbglvl(DT_NETWORK|1900))) dump_bsock_msg(m_fd, read_seqno, "GRECV", nbytes, len, m_flags, msg, msglen); + + if (locked) pV(pm_rmutex); + return nbytes; /* return actual length of message or -1 */ +} + +/* + * Return the string for the error that occurred + * on the socket. Only the first error is retained. + */ +const char *BSOCKCORE::bstrerror() +{ + berrno be; + if (errmsg == NULL) { + errmsg = get_pool_memory(PM_MESSAGE); + } + if (b_errno == 0) { + pm_strcpy(errmsg, "I/O Error"); + } else { + pm_strcpy(errmsg, be.bstrerror(b_errno)); + } + return errmsg; +} + +int BSOCKCORE::get_peer(char *buf, socklen_t buflen) +{ +#if !defined(HAVE_WIN32) + if (peer_addr.sin_family == 0) { + socklen_t salen = sizeof(peer_addr); + int rval = (getpeername)(m_fd, (struct sockaddr *)&peer_addr, &salen); + if (rval < 0) return rval; + } + if (!inet_ntop(peer_addr.sin_family, &peer_addr.sin_addr, buf, buflen)) + return -1; + + return 0; +#else + return -1; +#endif +} + +/* + * Set the network buffer size, suggested size is in size. + * Actual size obtained is returned in bs->msglen + * + * Returns: false on failure + * true on success + */ +bool BSOCKCORE::set_buffer_size(uint32_t size, int rw) +{ + uint32_t dbuf_size, start_size; + +#if defined(IP_TOS) && defined(IPTOS_THROUGHPUT) + int opt; + opt = IPTOS_THROUGHPUT; + setsockopt(m_fd, IPPROTO_IP, IP_TOS, (sockopt_val_t)&opt, sizeof(opt)); +#endif + + if (size != 0) { + dbuf_size = size; + } else { + dbuf_size = DEFAULT_NETWORK_BUFFER_SIZE; + } + start_size = dbuf_size; + if ((msg = realloc_pool_memory(msg, dbuf_size + 100)) == NULL) { + Qmsg0(get_jcr(), M_FATAL, 0, _("Could not malloc BSOCKCORE data buffer\n")); + return false; + } + + /* + * If user has not set the size, use the OS default -- i.e. do not + * try to set it. This allows sys admins to set the size they + * want in the OS, and Bacula will comply. See bug #1493 + */ + if (size == 0) { + msglen = dbuf_size; + return true; + } + + if (rw & BNET_SETBUF_READ) { + while ((dbuf_size > TAPE_BSIZE) && (setsockopt(m_fd, SOL_SOCKET, + SO_RCVBUF, (sockopt_val_t) & dbuf_size, sizeof(dbuf_size)) < 0)) { + berrno be; + Qmsg1(get_jcr(), M_ERROR, 0, _("sockopt error: %s\n"), be.bstrerror()); + dbuf_size -= TAPE_BSIZE; + } + Dmsg1(200, "set network buffer size=%d\n", dbuf_size); + if (dbuf_size != start_size) { + Qmsg1(get_jcr(), M_WARNING, 0, + _("Warning network buffer = %d bytes not max size.\n"), dbuf_size); + } + } + if (size != 0) { + dbuf_size = size; + } else { + dbuf_size = DEFAULT_NETWORK_BUFFER_SIZE; + } + start_size = dbuf_size; + if (rw & BNET_SETBUF_WRITE) { + while ((dbuf_size > TAPE_BSIZE) && (setsockopt(m_fd, SOL_SOCKET, + SO_SNDBUF, (sockopt_val_t) & dbuf_size, sizeof(dbuf_size)) < 0)) { + berrno be; + Qmsg1(get_jcr(), M_ERROR, 0, _("sockopt error: %s\n"), be.bstrerror()); + dbuf_size -= TAPE_BSIZE; + } + Dmsg1(900, "set network buffer size=%d\n", dbuf_size); + if (dbuf_size != start_size) { + Qmsg1(get_jcr(), M_WARNING, 0, + _("Warning network buffer = %d bytes not max size.\n"), dbuf_size); + } + } + + msglen = dbuf_size; + return true; +} + +/* + * Set socket non-blocking + * Returns previous socket flag + */ +int BSOCKCORE::set_nonblocking() +{ + int oflags; + + /* Get current flags */ + if ((oflags = fcntl(m_fd, F_GETFL, 0)) < 0) { + berrno be; + Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_GETFL error. ERR=%s\n"), be.bstrerror()); + } + + /* Set O_NONBLOCK flag */ + if ((fcntl(m_fd, F_SETFL, oflags|O_NONBLOCK)) < 0) { + berrno be; + Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_SETFL error. ERR=%s\n"), be.bstrerror()); + } + + m_blocking = 0; + return oflags; +} + +/* + * Set socket blocking + * Returns previous socket flags + */ +int BSOCKCORE::set_blocking() +{ + int oflags; + /* Get current flags */ + if ((oflags = fcntl(m_fd, F_GETFL, 0)) < 0) { + berrno be; + Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_GETFL error. ERR=%s\n"), be.bstrerror()); + } + + /* Set O_NONBLOCK flag */ + if ((fcntl(m_fd, F_SETFL, oflags & ~O_NONBLOCK)) < 0) { + berrno be; + Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_SETFL error. ERR=%s\n"), be.bstrerror()); + } + + m_blocking = 1; + return oflags; +} + +void BSOCKCORE::set_killable(bool killable) +{ + if (m_jcr) { + m_jcr->set_killable(killable); + } +} + +/* + * Restores socket flags + */ +void BSOCKCORE::restore_blocking (int flags) +{ + if ((fcntl(m_fd, F_SETFL, flags)) < 0) { + berrno be; + Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_SETFL error. ERR=%s\n"), be.bstrerror()); + } + + m_blocking = (flags & O_NONBLOCK) ? true : false; +} + +/* + * Wait for a specified time for data to appear on + * the BSOCKCORE connection. + * + * Returns: 1 if data available + * 0 if timeout + * -1 if error + */ +int BSOCKCORE::wait_data(int sec, int msec) +{ + for (;;) { + switch (fd_wait_data(m_fd, WAIT_READ, sec, msec)) { + case 0: /* timeout */ + b_errno = 0; + return 0; + case -1: + b_errno = errno; + if (errno == EINTR) { + continue; + } + return -1; /* error return */ + default: + b_errno = 0; +#ifdef HAVE_TLS + if (this->tls && !tls_bsock_probe(this)) { + continue; /* false alarm, maybe a session key negotiation in progress on the socket */ + } +#endif + return 1; + } + } +} + +/* + * As above, but returns on interrupt + */ +int BSOCKCORE::wait_data_intr(int sec, int msec) +{ + switch (fd_wait_data(m_fd, WAIT_READ, sec, msec)) { + case 0: /* timeout */ + b_errno = 0; + return 0; + case -1: + b_errno = errno; + return -1; /* error return */ + default: + b_errno = 0; +#ifdef HAVE_TLS + if (this->tls && !tls_bsock_probe(this)) { + /* maybe a session key negotiation waked up the socket */ + return 0; + } +#endif + break; + } + return 1; +} + +/* + * This routine closes the current BSOCKCORE. + * It does not delete the socket packet + * resources, which are released in bsock->destroy(). + */ +#ifndef SHUT_RDWR +#define SHUT_RDWR 2 +#endif + +/* + * The JCR is canceled, set terminate for chained BSOCKCOREs starting from master + */ +void BSOCKCORE::cancel() +{ + master_lock(); + for (BSOCKCORE *next = m_master; next != NULL; next = next->m_next) { + if (!next->m_closed) { + next->m_terminated = true; + next->m_timed_out = true; + } + } + master_unlock(); +} + +/* + * Note, this routine closes the socket, but leaves the + * bsockcore memory in place. + * every thread is responsible of closing and destroying its own duped or not + * duped BSOCKCORE + */ +void BSOCKCORE::close() +{ + BSOCKCORE *bsock = this; + + Dmsg0(BSOCKCORE_DEBUG_LVL, "BSOCKCORE::close()\n"); + if (bsock->is_closed()) { + return; + } + if (!m_duped) { + clear_locking(); + } + bsock->set_closed(); + bsock->set_terminated(); + if (!bsock->m_duped) { + /* Shutdown tls cleanly. */ + if (bsock->tls) { + tls_bsock_shutdown(bsock); + free_tls_connection(bsock->tls); + bsock->tls = NULL; + } + +#ifdef HAVE_WIN32 + if (!bsock->is_timed_out()) { + win_close_wait(bsock->m_fd); /* Ensure that data is not discarded */ + } +#else + if (bsock->is_timed_out()) { + shutdown(bsock->m_fd, SHUT_RDWR); /* discard any pending I/O */ + } +#endif + /* On Windows this discards data if we did not do a close_wait() */ + socketClose(bsock->m_fd); /* normal close */ + } + return; +} + +/* + * Destroy the socket (i.e. release all resources) + */ +void BSOCKCORE::_destroy() +{ + Dmsg0(BSOCKCORE_DEBUG_LVL, "BSOCKCORE::_destroy()\n"); + this->close(); /* Ensure that socket is closed */ + if (msg) { + free_pool_memory(msg); + msg = NULL; + } else { + ASSERT2(1 == 0, "Two calls to destroy socket"); /* double destroy */ + } + if (errmsg) { + free_pool_memory(errmsg); + errmsg = NULL; + } + if (m_who) { + free(m_who); + m_who = NULL; + } + if (m_host) { + free(m_host); + m_host = NULL; + } + if (src_addr) { + free(src_addr); + src_addr = NULL; + } +} + +/* + * Destroy the socket (i.e. release all resources) + * including duped sockets. + * should not be called from duped BSOCKCORE + */ +void BSOCKCORE::destroy() +{ + Dmsg0(BSOCKCORE_DEBUG_LVL, "BSOCKCORE::destroy()\n"); + ASSERTD(reinterpret_cast(m_next) != 0xaaaaaaaaaaaaaaaa, "BSOCKCORE::destroy() already called\n") + ASSERTD(this == m_master, "BSOCKCORE::destroy() called by a non master BSOCKCORE\n") + ASSERTD(!m_duped, "BSOCKCORE::destroy() called by a duped BSOCKCORE\n") + /* I'm the master I must destroy() all the duped BSOCKCOREs */ + master_lock(); + BSOCKCORE *ahead; + for (BSOCKCORE *next = m_next; next != NULL; next = ahead) { + ahead = next->m_next; + Dmsg1(BSOCKCORE_DEBUG_LVL, "BSOCKCORE::destroy():delete(%p)\n", next); + delete(next); + } + master_unlock(); + Dmsg0(BSOCKCORE_DEBUG_LVL, "BSOCKCORE::destroy():delete(this)\n"); + delete(this); +} + +/* Try to limit the bandwidth of a network connection + */ +void BSOCKCORE::control_bwlimit(int bytes) +{ + btime_t now, temp; + if (bytes == 0) { + return; + } + + now = get_current_btime(); /* microseconds */ + temp = now - m_last_tick; /* microseconds */ + + m_nb_bytes += bytes; + + if (temp < 0 || temp > 10000000) { /* Take care of clock problems (>10s) or back in time */ + m_nb_bytes = bytes; + m_last_tick = now; + return; + } + + /* Less than 0.1ms since the last call, see the next time */ + if (temp < 100) { + return; + } + + /* Remove what was authorised to be written in temp us */ + m_nb_bytes -= (int64_t)(temp * ((double)m_bwlimit / 1000000.0)); + + if (m_nb_bytes < 0) { + m_nb_bytes = 0; + } + + /* What exceed should be converted in sleep time */ + int64_t usec_sleep = (int64_t)(m_nb_bytes /((double)m_bwlimit / 1000000.0)); + if (usec_sleep > 100) { + bmicrosleep(usec_sleep/1000000, usec_sleep%1000000); /* TODO: Check that bmicrosleep slept enough or sleep again */ + m_last_tick = get_current_btime(); + m_nb_bytes = 0; + } else { + m_last_tick = now; + } +} + +/* + * Write nbytes to the network. + * It may require several writes. + */ + +int32_t BSOCKCORE::write_nbytes(char *ptr, int32_t nbytes) +{ + int32_t nleft, nwritten; + +#ifdef HAVE_TLS + if (tls) { + /* TLS enabled */ + return (tls_bsock_writen((BSOCK*)this, ptr, nbytes)); + } +#endif /* HAVE_TLS */ + + nleft = nbytes; + while (nleft > 0) { + do { + errno = 0; + nwritten = socketWrite(m_fd, ptr, nleft); + if (is_timed_out() || is_terminated()) { + return -1; + } + +#ifdef HAVE_WIN32 + /* + * We simulate errno on Windows for a socket + * error in order to handle errors correctly. + */ + if (nwritten == SOCKET_ERROR) { + DWORD err = WSAGetLastError(); + nwritten = -1; + if (err == WSAEINTR) { + errno = EINTR; + } else if (err == WSAEWOULDBLOCK) { + errno = EAGAIN; + } else { + errno = EIO; /* some other error */ + } + } +#endif + + } while (nwritten == -1 && errno == EINTR); + /* + * If connection is non-blocking, we will get EAGAIN, so + * use select()/poll to keep from consuming all the CPU + * and try again. + */ + if (nwritten == -1 && errno == EAGAIN) { + fd_wait_data(m_fd, WAIT_WRITE, 1, 0); + continue; + } + if (nwritten <= 0) { + return -1; /* error */ + } + nleft -= nwritten; + ptr += nwritten; + if (use_bwlimit()) { + control_bwlimit(nwritten); + } + } + return nbytes - nleft; +} + +/* + * Read a nbytes from the network. + * It is possible that the total bytes require in several + * read requests + */ + +int32_t BSOCKCORE::read_nbytes(char *ptr, int32_t nbytes) +{ + int32_t nleft, nread; + +#ifdef HAVE_TLS + if (tls) { + /* TLS enabled */ + return (tls_bsock_readn((BSOCK*)this, ptr, nbytes)); + } +#endif /* HAVE_TLS */ + + nleft = nbytes; + while (nleft > 0) { + errno = 0; + nread = socketRead(m_fd, ptr, nleft); + if (is_timed_out() || is_terminated()) { + return -1; + } + +#ifdef HAVE_WIN32 + /* + * We simulate errno on Windows for a socket + * error in order to handle errors correctly. + */ + if (nread == SOCKET_ERROR) { + DWORD err = WSAGetLastError(); + nread = -1; + if (err == WSAEINTR) { + errno = EINTR; + } else if (err == WSAEWOULDBLOCK) { + errno = EAGAIN; + } else { + errno = EIO; /* some other error */ + } + } +#endif + + if (nread == -1) { + if (errno == EINTR) { + continue; + } + if (errno == EAGAIN) { + bmicrosleep(0, 20000); /* try again in 20ms */ + continue; + } + } + if (nread <= 0) { + return -1; /* error, or EOF */ + } + nleft -= nread; + ptr += nread; + if (use_bwlimit()) { + control_bwlimit(nread); + } + } + return nbytes - nleft; /* return >= 0 */ +} + +#ifdef HAVE_WIN32 +/* + * closesocket is supposed to do a graceful disconnect under Window + * but it doesn't. Comments on http://msdn.microsoft.com/en-us/li + * confirm this behaviour. DisconnectEx is required instead, but + * that function needs to be retrieved via WS IOCTL + */ +static void +win_close_wait(int fd) +{ + int ret; + GUID disconnectex_guid = WSAID_DISCONNECTEX; + DWORD bytes_returned; + LPFN_DISCONNECTEX DisconnectEx; + ret = WSAIoctl(fd, SIO_GET_EXTENSION_FUNCTION_POINTER, &disconnectex_guid, sizeof(disconnectex_guid), &DisconnectEx, sizeof(DisconnectEx), &bytes_returned, NULL, NULL); + Dmsg1(100, "WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER, WSAID_DISCONNECTEX) ret = %d\n", ret); + if (!ret) { + DisconnectEx(fd, NULL, 0, 0); + } +} +#endif + +#ifndef TEST_PROGRAM +#define TEST_PROGRAM_A +#endif + +void BSOCKCORE::dump() +{ +#ifdef TEST_PROGRAM + char ed1[50]; + Pmsg1(-1, "BSOCKCORE::dump(): %p\n", this); + Pmsg1(-1, "\tmsg: %p\n", msg); + Pmsg1(-1, "\terrmsg: %p\n", errmsg); + Pmsg1(-1, "\tres: %p\n", res); + Pmsg1(-1, "\ttls: %p\n", tls); + Pmsg1(-1, "\tsrc_addr: %p\n", src_addr); + Pmsg1(-1, "\tread_seqno: %s\n", edit_uint64(read_seqno, ed1)); + Pmsg1(-1, "\tin_msg_no: %s\n", edit_uint64(in_msg_no, ed1)); + Pmsg1(-1, "\tout_msg_no: %s\n", edit_uint64(out_msg_no, ed1)); + Pmsg1(-1, "\tpout_msg_no: %p\n", pout_msg_no); + Pmsg1(-1, "\tmsglen: %s\n", edit_int64(msglen, ed1)); + Pmsg1(-1, "\ttimer_start: %ld\n", timer_start); + Pmsg1(-1, "\ttimeout: %ld\n", timeout); + Pmsg1(-1, "\tm_fd: %d\n", m_fd); + Pmsg1(-1, "\tb_errno: %d\n", b_errno); + Pmsg1(-1, "\tm_blocking: %d\n", m_blocking); + Pmsg1(-1, "\terrors: %d\n", errors); + Pmsg1(-1, "\tm_suppress_error_msgs: %s\n", m_suppress_error_msgs?"true":"false"); +// Pmsg1(0, "\tclient_addr:{ } struct sockaddr client_addr; /* client's IP address */ +// Pmsg1(0, "\tstruct sockaddr_in peer_addr; /* peer's IP address */ + Pmsg1(-1, "\tsend_hook_cb: %p\n", send_hook_cb); + Pmsg1(-1, "\tm_master: %p\n", m_master); + Pmsg1(-1, "\tm_next: %p\n", m_next); + Pmsg1(-1, "\tm_jcr: %p\n", m_jcr); +// pthread_mutex_t m_rmutex; /* for read locking if use_locking set */ +// pthread_mutex_t m_wmutex; /* for write locking if use_locking set */ +// mutable pthread_mutex_t m_mmutex; /* when accessing the master/next chain */ +// pthread_mutex_t *pm_rmutex; /* Pointer to the read mutex */ +// pthread_mutex_t *pm_wmutex; /* Pointer to the write mutex */ + Pmsg1(-1, "\tm_who: %p\n", m_who); + Pmsg1(-1, "\tm_host: %p\n", m_host); + Pmsg1(-1, "\tm_port: %d\n", m_port); + Pmsg1(-1, "\tm_tid: %p\n", m_tid); + Pmsg1(-1, "\tm_flags: %s\n", edit_uint64(m_flags, ed1)); + Pmsg1(-1, "\tm_timed_out: %s\n", m_timed_out?"true":"false"); + Pmsg1(-1, "\tm_terminated: %s\n", m_terminated?"true":"false"); + Pmsg1(-1, "\tm_closed: %s\n", m_closed?"true":"false"); + Pmsg1(-1, "\tm_duped: %s\n", m_duped?"true":"false"); + Pmsg1(-1, "\tm_use_locking: %s\n", m_use_locking?"true":"false"); + Pmsg1(-1, "\tm_bwlimit: %s\n", edit_int64(m_bwlimit, ed1)); + Pmsg1(-1, "\tm_nb_bytes: %s\n", edit_int64(m_nb_bytes, ed1)); + Pmsg1(-1, "\tm_last_tick: %s\n", edit_int64(m_last_tick, ed1)); +#endif +}; + +#ifdef TEST_PROGRAM + +void terminate(int sig){}; +void free_my_jcr(JCR *jcr){ + /* TODO: handle full JCR free */ + free_jcr(jcr); +}; + +int main() +{ + BSOCKCORE *bs; + pid_t pid; + int rc; + char *host = (char*)"localhost"; + char *name = (char*)"Test"; + JCR *jcr; + + debug_level = 500; + my_name_is(0, NULL, "bsockcore_test"); + init_signals(terminate); + lmgr_init_thread(); /* initialize the lockmanager stack */ + + jcr = new_jcr(sizeof(JCR), NULL); + bs = New(BSOCKCORE); + Pmsg0(0, "Initialize ...\n"); + bs->set_jcr(jcr); + bs->dump(); + + pid = fork(); + if (0 == pid){ + Pmsg0(0, "prepare to execute netcat\n"); + rc = execl("/bin/netcat", "netcat", "-p", "20000", "-l", NULL); + Pmsg1(0, "Error executing netcat: %s\n", strerror(rc)); + exit(1); + } + Pmsg1(0, "after fork: %d\n", pid); + bmicrosleep(1, 0); + if (bs->connect(jcr, 1, 10, 0, name, host, NULL, 20000, 0)) { + /* connected */ + Pmsg0(0, "connected ...\n"); + bs->dump(); + } else { + Pmsg1(0, "connection error: %s\n", bs->bstrerror()); + } + kill(pid, SIGTERM); + delete(bs); + free_my_jcr(jcr); + lmgr_cleanup_main(); + // sm_dump(false); +}; +#endif /* TEST_PROGRAM */ diff --git a/bacula/src/lib/bsockcore.h b/bacula/src/lib/bsockcore.h new file mode 100644 index 000000000..662baf318 --- /dev/null +++ b/bacula/src/lib/bsockcore.h @@ -0,0 +1,219 @@ +/* + Bacula(R) - The Network Backup Solution + + Copyright (C) 2000-2017 Kern Sibbald + + The original author of Bacula is Kern Sibbald, with contributions + from many others, a complete list can be found in the file AUTHORS. + + You may use this file and others of this release according to the + license defined in the LICENSE file, which includes the Affero General + Public License, v3.0 ("AGPLv3") and some additional permissions and + terms pursuant to its AGPLv3 Section 7. + + This notice must be preserved when any source code is + conveyed and/or propagated. + + Bacula(R) is a registered trademark of Kern Sibbald. +*/ +/* + * Bacula Core Sock Class definition + * + * Kern Sibbald, May MM + * + * Major refactoring of BSOCK code written by: + * + * Radosław Korzeniewski, MMXVIII + * radoslaw@korzeniewski.net, radekk@inteos.pl + * Inteos Sp. z o.o. http://www.inteos.pl/ + * + * This is a common class for socket network communication derived from + * BSOCK class. It acts as a base class for non-Bacula network communication + * and as a base class for standard BSOCK implementation. Basically the BSOCK + * class did not changed its functionality for any Bacula specific part. + * Now you can use a BSOCKCLASS for other network communication. + */ + +#ifndef __BSOCKCORE_H_ +#define __BSOCKCORE_H_ + +#define BSOCKCORE_TIMEOUT 3600 * 24 * 5; /* default 5 days */ + +struct btimer_t; /* forward reference */ +class BSOCKCORE; +btimer_t *start_bsock_timer(BSOCKCORE *bs, uint32_t wait); +void stop_bsock_timer(btimer_t *wid); +void dump_bsock_msg(int sock, uint32_t msgno, const char *what, uint32_t rc, int32_t pktsize, uint32_t flags, + POOLMEM *msg, int32_t msglen); + +class BSOCKCallback { +public: + BSOCKCallback(); + virtual ~BSOCKCallback(); + virtual bool bsock_send_cb() = 0; +}; + +class BSOCKCORE: public SMARTALLOC { +/* + * Note, keep this public part before the private otherwise + * bat breaks on some systems such as RedHat. + */ +public: + POOLMEM *msg; /* message pool buffer */ + POOLMEM *errmsg; /* edited error message */ + RES *res; /* Resource to which we are connected */ + TLS_CONNECTION *tls; /* associated tls connection */ + IPADDR *src_addr; /* IP address to source connections from */ + uint64_t read_seqno; /* read sequence number */ + uint32_t in_msg_no; /* input message number */ + uint32_t out_msg_no; /* output message number */ + uint32_t *pout_msg_no; /* pointer to the above */ + int32_t msglen; /* message length */ + volatile time_t timer_start; /* time started read/write */ + volatile time_t timeout; /* timeout BSOCKCORE after this interval */ + int m_fd; /* socket file descriptor */ + int b_errno; /* bsockcore errno */ + int m_blocking; /* blocking state (0 = nonblocking, 1 = blocking) */ + volatile int errors; /* incremented for each error on socket */ + volatile bool m_suppress_error_msgs; /* set to suppress error messages */ + /* when "installed", send_hook_cb->bsock_send_cb() is called before + * any ::send(). */ + BSOCKCallback *send_hook_cb; + struct sockaddr client_addr; /* client's IP address */ + struct sockaddr_in peer_addr; /* peer's IP address */ + +protected: + /* m_master is used by "duped" BSOCKCORE to access some attributes of the "parent" + * thread to have an up2date status (for example when the job is canceled, + * the "parent" BSOCKCORE is "terminated", but the duped BSOCKCORE is unchanged) + * In the future more attributes and method could use the "m_master" + * indirection. + * master->m_rmutex could replace pm_rmutex, idem for the (w)rite" mutex + * "m_master->error" should be incremented instead of "error", but + * this require a lock. + * + * USAGE: the parent thread MUST be sure that the child thread have quit + * before to free the "parent" BSOCKCORE. + */ + BSOCKCORE *m_next; /* next BSOCKCORE if duped (not actually used) */ + JCR *m_jcr; /* jcr or NULL for error msgs */ + pthread_mutex_t m_rmutex; /* for read locking if use_locking set */ + pthread_mutex_t m_wmutex; /* for write locking if use_locking set */ + mutable pthread_mutex_t m_mmutex; /* when accessing the master/next chain */ + pthread_mutex_t *pm_rmutex; /* Pointer to the read mutex */ + pthread_mutex_t *pm_wmutex; /* Pointer to the write mutex */ + char *m_who; /* Name of daemon to which we are talking */ + char *m_host; /* Host name/IP */ + int m_port; /* desired port */ + btimer_t *m_tid; /* timer id */ + uint32_t m_flags; /* Special flags */ + volatile bool m_timed_out: 1; /* timed out in read/write */ + volatile bool m_terminated: 1; /* set when BNET_TERMINATE arrives */ + bool m_closed: 1; /* set when socket is closed */ + bool m_duped: 1; /* set if duped BSOCKCORE */ + bool m_use_locking; /* set to use locking (out of a bitfield */ + /* to avoid race conditions) */ + int64_t m_bwlimit; /* set to limit bandwidth */ + int64_t m_nb_bytes; /* bytes sent/recv since the last tick */ + btime_t m_last_tick; /* last tick used by bwlimit */ + + void fin_init(JCR * jcr, int sockfd, const char *who, const char *host, int port, + struct sockaddr *lclient_addr); + virtual bool open(JCR *jcr, const char *name, char *host, char *service, + int port, utime_t heart_beat, int *fatal); + void master_lock() const { if (m_use_locking) pP((&m_mmutex)); }; + void master_unlock() const { if (m_use_locking) pV((&m_mmutex)); }; + virtual void init(); + void _destroy(); /* called by destroy() */ + virtual int32_t write_nbytes(char *ptr, int32_t nbytes); + virtual int32_t read_nbytes(char *ptr, int32_t nbytes); + +public: + BSOCKCORE *m_master; /* "this" or the "parent" BSOCK if duped */ + /* methods -- in bsockcore.c */ + BSOCKCORE(); + virtual ~BSOCKCORE(); + void free_tls(); + bool connect(JCR * jcr, int retry_interval, utime_t max_retry_time, + utime_t heart_beat, const char *name, char *host, + char *service, int port, int verbose); + virtual int32_t recv(int len); + virtual bool send(); + bool fsend(const char*, ...); + void close(); /* close connection and destroy packet */ + void destroy(); /* destroy socket packet */ + const char *bstrerror(); /* last error on socket */ + int get_peer(char *buf, socklen_t buflen); + bool set_buffer_size(uint32_t size, int rw); + int set_nonblocking(); + int set_blocking(); + void restore_blocking(int flags); + void set_killable(bool killable); + int wait_data(int sec, int msec=0); + int wait_data_intr(int sec, int msec=0); + bool set_locking(); + void clear_locking(); + void set_source_address(dlist *src_addr_list); + void control_bwlimit(int bytes); + + /* Inline functions */ + void suppress_error_messages(bool flag) { m_suppress_error_msgs = flag; }; + void set_jcr(JCR *jcr) { m_jcr = jcr; }; + void set_who(char *who) { m_who = who; }; + void set_host(char *host) { m_host = host; }; + void set_port(int port) { m_port = port; }; + char *who() const { return m_who; }; + char *host() const { return m_host; }; + int port() const { return m_port; }; + JCR *jcr() const { return m_jcr; }; + JCR *get_jcr() const { return m_jcr; }; + bool is_duped() const { return m_duped; }; + bool is_terminated() const { return m_terminated; }; + bool is_timed_out() const { return m_timed_out; }; + bool is_closed() const { return m_closed; }; + bool is_open() const { return !m_closed; }; + bool is_stop() const { return errors || is_terminated() || is_closed(); }; + bool is_error() { errno = b_errno; return errors; }; + void set_bwlimit(int64_t maxspeed) { m_bwlimit = maxspeed; }; + bool use_bwlimit() { return m_bwlimit > 0;}; + void set_duped() { m_duped = true; }; + void set_master(BSOCKCORE *master) { + master_lock(); + m_master = master; + m_next = master->m_next; + master->m_next = this; + master_unlock(); + }; + void set_timed_out() { m_timed_out = true; }; + void clear_timed_out() { m_timed_out = false; }; + void set_terminated() { m_terminated = true; }; + void set_closed() { m_closed = true; }; + void start_timer(int sec) { m_tid = start_bsock_timer(this, sec); }; + void stop_timer() { stop_bsock_timer(m_tid); }; + void swap_msgs(); + void install_send_hook_cb(BSOCKCallback *obj) { send_hook_cb=obj; }; + void uninstall_send_hook_cb() { send_hook_cb=NULL; }; + void cancel(); /* call it when JCR is canceled */ +#ifdef HAVE_WIN32 + int socketRead(int fd, void *buf, size_t len) { return ::recv(fd, (char *)buf, len, 0); }; + int socketWrite(int fd, void *buf, size_t len) { return ::send(fd, (const char*)buf, len, 0); }; + int socketClose(int fd) { return ::closesocket(fd); }; +#else + int socketRead(int fd, void *buf, size_t len) { return ::read(fd, buf, len); }; + int socketWrite(int fd, void *buf, size_t len) { return ::write(fd, buf, len); }; + int socketClose(int fd) { return ::close(fd); }; +#endif + void dump(); +}; + +/* + * Completely release the socket packet, and NULL the pointer + */ +#define free_bsockcore(a) do{if(a){(a)->destroy(); (a)=NULL;}} while(0) + +/* + * Does the socket exist and is it open? + */ +#define is_bsockcore_open(a) ((a) && (a)->is_open()) + +#endif /* __BSOCKCORE_H_ */ diff --git a/bacula/src/lib/btimers.c b/bacula/src/lib/btimers.c index ff04d4c4f..64bbdcf40 100644 --- a/bacula/src/lib/btimers.c +++ b/bacula/src/lib/btimers.c @@ -11,7 +11,7 @@ Public License, v3.0 ("AGPLv3") and some additional permissions and terms pursuant to its AGPLv3 Section 7. - This notice must be preserved when any source code is + This notice must be preserved when any source code is conveyed and/or propagated. Bacula(R) is a registered trademark of Kern Sibbald. @@ -153,7 +153,7 @@ btimer_t *start_thread_timer(JCR *jcr, pthread_t tid, uint32_t wait) * Returns: btimer_t *(pointer to btimer_t struct) on success * NULL on failure */ -btimer_t *start_bsock_timer(BSOCK *bsock, uint32_t wait) +btimer_t *_start_bsock_timer(BSOCK *bsock, uint32_t wait) { btimer_t *wid; if (wait <= 0) { /* wait should be > 0 */ @@ -179,6 +179,28 @@ btimer_t *start_bsock_timer(BSOCK *bsock, uint32_t wait) return wid; } +/* + * Start a timer on a BSOCK. kill it after wait seconds. + * + * Returns: btimer_t *(pointer to btimer_t struct) on success + * NULL on failure + */ +btimer_t *start_bsock_timer(BSOCKCORE *bsock, uint32_t wait) +{ + return _start_bsock_timer((BSOCK*)bsock, wait); +}; + +/* + * Start a timer on a BSOCK. kill it after wait seconds. + * + * Returns: btimer_t *(pointer to btimer_t struct) on success + * NULL on failure + */ +btimer_t *start_bsock_timer(BSOCK *bsock, uint32_t wait) +{ + return _start_bsock_timer(bsock, wait); +}; + /* * Stop bsock timer */ diff --git a/bacula/src/lib/lib.h b/bacula/src/lib/lib.h index 88a55cac3..11f970c60 100644 --- a/bacula/src/lib/lib.h +++ b/bacula/src/lib/lib.h @@ -11,7 +11,7 @@ Public License, v3.0 ("AGPLv3") and some additional permissions and terms pursuant to its AGPLv3 Section 7. - This notice must be preserved when any source code is + This notice must be preserved when any source code is conveyed and/or propagated. Bacula(R) is a registered trademark of Kern Sibbald. @@ -47,6 +47,7 @@ #include "bjson.h" #include "tls.h" #include "address_conf.h" +#include "bsockcore.h" #include "bsock.h" #include "workq.h" #ifndef HAVE_FNMATCH diff --git a/bacula/src/lib/protos.h b/bacula/src/lib/protos.h index e9730a6b0..939546be2 100644 --- a/bacula/src/lib/protos.h +++ b/bacula/src/lib/protos.h @@ -338,10 +338,10 @@ TLS_CONNECTION *new_tls_connection (TLS_CONTEXT *ctx, int fd); bool tls_bsock_accept (BSOCK *bsock); int tls_bsock_writen (BSOCK *bsock, char *ptr, int32_t nbytes); int tls_bsock_readn (BSOCK *bsock, char *ptr, int32_t nbytes); -bool tls_bsock_probe (BSOCK *bsock); +bool tls_bsock_probe (BSOCKCORE *bsock); #endif /* HAVE_TLS */ bool tls_bsock_connect (BSOCK *bsock); -void tls_bsock_shutdown (BSOCK *bsock); +void tls_bsock_shutdown (BSOCKCORE *bsock); void free_tls_connection (TLS_CONNECTION *tls); bool get_tls_require (TLS_CONTEXT *ctx); bool get_tls_enable (TLS_CONTEXT *ctx); diff --git a/bacula/src/lib/tls.c b/bacula/src/lib/tls.c index 9634dc951..f607533bb 100644 --- a/bacula/src/lib/tls.c +++ b/bacula/src/lib/tls.c @@ -11,7 +11,7 @@ Public License, v3.0 ("AGPLv3") and some additional permissions and terms pursuant to its AGPLv3 Section 7. - This notice must be preserved when any source code is + This notice must be preserved when any source code is conveyed and/or propagated. Bacula(R) is a registered trademark of Kern Sibbald. @@ -79,15 +79,6 @@ static int openssl_verify_peer(int ok, X509_STORE_CTX *store) char issuer[256]; char subject[256]; - if (err == X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT || - err == X509_V_ERR_SELF_SIGNED_CERT_IN_CHAIN) - { - Jmsg0(NULL, M_ERROR, 0, _("CA certificate is self signed. With OpenSSL 1.1, enforce basicConstraints = CA:true in the certificate creation to avoid this issue.\n")); - } else if (err == X509_V_ERR_INVALID_CA) { - Jmsg0(NULL, M_ERROR, 0, _("CA certificate is invalid (possibly self signed). With OpenSSL 1.1, enforce basicConstraints = CA:true in the certificate creation to avoid this issue.\n")); - } - - X509_NAME_oneline(X509_get_issuer_name(cert), issuer, 256); X509_NAME_oneline(X509_get_subject_name(cert), subject, 256); @@ -136,7 +127,7 @@ TLS_CONTEXT *new_tls_context(const char *ca_certfile, const char *ca_certdir, #endif /* Use SSL_OP_ALL to turn on all "rather harmless" workarounds that - * OpenSSL offers + * OpenSSL offers */ SSL_CTX_set_options(ctx->openssl, SSL_OP_ALL); @@ -580,7 +571,7 @@ bool tls_bsock_accept(BSOCK *bsock) /* * Shutdown TLS_CONNECTION instance */ -void tls_bsock_shutdown(BSOCK *bsock) +void tls_bsock_shutdown(BSOCKCORE *bsock) { /* * SSL_shutdown must be called twice to fully complete the process - @@ -734,7 +725,7 @@ int tls_bsock_readn(BSOCK *bsock, char *ptr, int32_t nbytes) } /* test if 4 bytes can be read without "blocking" */ -bool tls_bsock_probe(BSOCK *bsock) +bool tls_bsock_probe(BSOCKCORE *bsock) { int32_t pktsiz; return SSL_peek(bsock->tls->openssl, &pktsiz, sizeof(pktsiz))==sizeof(pktsiz);