#
INCLUDE_FILES = ../baconfig.h ../bacula.h ../bc_types.h \
../config.h ../jcr.h ../version.h \
- address_conf.h alist.h attr.h base64.h \
+ address_conf.h alist.h attr.h base64.h bsockcore.h \
berrno.h bits.h bjson.h bpipe.h breg.h bregex.h \
bsock.h btime.h btimers.h crypto.h dlist.h \
flist.h fnmatch.h guid_to_name.h htable.h lex.h \
cram-md5.c crc32.c crypto.c daemon.c edit.c fnmatch.c \
guid_to_name.c hmac.c jcr.c lex.c lz4.c alist.c dlist.c \
md5.c message.c mem_pool.c openssl.c \
- plugins.c priv.c queue.c bregex.c \
+ plugins.c priv.c queue.c bregex.c bsockcore.c \
runscript.c rwlock.c scan.c sellist.c serial.c sha1.c sha2.c \
signal.c smartall.c rblist.c tls.c tree.c \
util.c var.c watchdog.c workq.c btimers.c \
$(RMF) sellist.o
$(CXX) $(DEFS) $(DEBUG) -c $(CPPFLAGS) -I$(srcdir) -I$(basedir) $(DINCLUDE) $(CFLAGS) sellist.c
-
xml_test: Makefile xml.o
$(RMF) xml.o
$(CXX) -DTEST_PROG $(DEFS) $(DEBUG) -c $(CPPFLAGS) -I$(srcdir) -I$(basedir) $(DINCLUDE) $(CFLAGS) xml.c
$(RMF) ini.o
$(CXX) $(DEFS) $(DEBUG) -c $(CPPFLAGS) -I$(srcdir) -I$(basedir) $(DINCLUDE) $(CFLAGS) ini.c
+bsockcore_test: Makefile bsockcore.o
+ $(RMF) bsockcore.o
+ $(CXX) -DTEST_PROGRAM $(DEFS) $(DEBUG) -c $(CPPFLAGS) -I$(srcdir) -I$(basedir) $(DINCLUDE) $(CFLAGS) bsockcore.c
+ $(LIBTOOL_LINK) $(CXX) $(LDFLAGS) -L. -o $@ bsockcore.o $(DLIB) -lbac -lm $(LIBS)
+ $(RMF) bsockcore.o
+ $(CXX) $(DEFS) $(DEBUG) -c $(CPPFLAGS) -I$(srcdir) -I$(basedir) $(DINCLUDE) $(CFLAGS) bsockcore.c
+
+bsock_test: Makefile bsock.o
+ $(RMF) bsock.o
+ $(CXX) -DTEST_PROGRAM $(DEFS) $(DEBUG) -c $(CPPFLAGS) -I$(srcdir) -I$(basedir) $(DINCLUDE) $(CFLAGS) bsock.c
+ $(LIBTOOL_LINK) $(CXX) $(LDFLAGS) -L. -o $@ bsock.o $(DLIB) -lbac -lm $(LIBS)
+ $(RMF) bsock.o
+ $(CXX) $(DEFS) $(DEBUG) -c $(CPPFLAGS) -I$(srcdir) -I$(basedir) $(DINCLUDE) $(CFLAGS) bsock.c
+
install-includes:
$(MKDIR) $(DESTDIR)/$(includedir)/bacula
for I in $(INCLUDE_FILES); do \
static pthread_mutex_t ip_mutex = PTHREAD_MUTEX_INITIALIZER;
#endif
-/*
- * Read a nbytes from the network.
- * It is possible that the total bytes require in several
- * read requests
- */
-
-int32_t read_nbytes(BSOCK * bsock, char *ptr, int32_t nbytes)
-{
- int32_t nleft, nread;
-
-#ifdef HAVE_TLS
- if (bsock->tls) {
- /* TLS enabled */
- return (tls_bsock_readn(bsock, ptr, nbytes));
- }
-#endif /* HAVE_TLS */
-
- nleft = nbytes;
- while (nleft > 0) {
- errno = 0;
- nread = socketRead(bsock->m_fd, ptr, nleft);
- if (bsock->is_timed_out() || bsock->is_terminated()) {
- return -1;
- }
-
-#ifdef HAVE_WIN32
- /*
- * We simulate errno on Windows for a socket
- * error in order to handle errors correctly.
- */
- if (nread == SOCKET_ERROR) {
- DWORD err = WSAGetLastError();
- nread = -1;
- if (err == WSAEINTR) {
- errno = EINTR;
- } else if (err == WSAEWOULDBLOCK) {
- errno = EAGAIN;
- } else {
- errno = EIO; /* some other error */
- }
- }
-#endif
-
- if (nread == -1) {
- if (errno == EINTR) {
- continue;
- }
- if (errno == EAGAIN) {
- bmicrosleep(0, 20000); /* try again in 20ms */
- continue;
- }
- }
- if (nread <= 0) {
- return -1; /* error, or EOF */
- }
- nleft -= nread;
- ptr += nread;
- if (bsock->use_bwlimit()) {
- bsock->control_bwlimit(nread);
- }
- }
- return nbytes - nleft; /* return >= 0 */
-}
-
-/*
- * Write nbytes to the network.
- * It may require several writes.
- */
-
-int32_t write_nbytes(BSOCK * bsock, char *ptr, int32_t nbytes)
-{
- int32_t nleft, nwritten;
-
- if (bsock->is_spooling()) {
- nwritten = fwrite(ptr, 1, nbytes, bsock->m_spool_fd);
- if (nwritten != nbytes) {
- berrno be;
- bsock->b_errno = errno;
- Qmsg3(bsock->jcr(), M_FATAL, 0, _("Attr spool write error. wrote=%d wanted=%d bytes. ERR=%s\n"),
- nbytes, nwritten, be.bstrerror());
- Dmsg2(400, "nwritten=%d nbytes=%d.\n", nwritten, nbytes);
- errno = bsock->b_errno;
- return -1;
- }
- return nbytes;
- }
-
-#ifdef HAVE_TLS
- if (bsock->tls) {
- /* TLS enabled */
- return (tls_bsock_writen(bsock, ptr, nbytes));
- }
-#endif /* HAVE_TLS */
-
- nleft = nbytes;
- while (nleft > 0) {
- do {
- errno = 0;
- nwritten = socketWrite(bsock->m_fd, ptr, nleft);
- if (bsock->is_timed_out() || bsock->is_terminated()) {
- return -1;
- }
-
-#ifdef HAVE_WIN32
- /*
- * We simulate errno on Windows for a socket
- * error in order to handle errors correctly.
- */
- if (nwritten == SOCKET_ERROR) {
- DWORD err = WSAGetLastError();
- nwritten = -1;
- if (err == WSAEINTR) {
- errno = EINTR;
- } else if (err == WSAEWOULDBLOCK) {
- errno = EAGAIN;
- } else {
- errno = EIO; /* some other error */
- }
- }
-#endif
-
- } while (nwritten == -1 && errno == EINTR);
- /*
- * If connection is non-blocking, we will get EAGAIN, so
- * use select()/poll to keep from consuming all the CPU
- * and try again.
- */
- if (nwritten == -1 && errno == EAGAIN) {
- fd_wait_data(bsock->m_fd, WAIT_WRITE, 1, 0);
- continue;
- }
- if (nwritten <= 0) {
- return -1; /* error */
- }
- nleft -= nwritten;
- ptr += nwritten;
- if (bsock->use_bwlimit()) {
- bsock->control_bwlimit(nwritten);
- }
- }
- return nbytes - nleft;
-}
-
/*
* Establish a TLS connection -- server side
* Returns: true on success
#endif
#if defined(HAVE_GETADDRINFO)
-/*
+/*
* getaddrinfo.c - Simple example of using getaddrinfo(3) function.
- *
+ *
* Michal Ludvig <michal@logix.cz> (c) 2002, 2003
* http://www.logix.cz/michal/devel/
*
* License: public domain.
*/
-const char *resolv_host(int family, const char *host, dlist *addr_list)
-{
+const char *resolv_host(int family, const char *host, dlist *addr_list)
+{
IPADDR *ipaddr;
struct addrinfo hints, *res, *rp;
int errcode;
void *ptr;
memset (&hints, 0, sizeof(hints));
- hints.ai_family = family;
- hints.ai_socktype = SOCK_STREAM;
+ hints.ai_family = family;
+ hints.ai_socktype = SOCK_STREAM;
//hints.ai_flags |= AI_CANONNAME;
errcode = getaddrinfo (host, NULL, &hints, &res);
for (rp=res; res; res=res->ai_next) {
//inet_ntop (res->ai_family, res->ai_addr->sa_data, addrstr, 100);
switch (res->ai_family) {
- case AF_INET:
+ case AF_INET:
ipaddr = New(IPADDR(rp->ai_addr->sa_family));
ipaddr->set_type(IPADDR::R_MULTIPLE);
ptr = &((struct sockaddr_in *) res->ai_addr)->sin_addr;
ipaddr->set_addr4((in_addr *)ptr);
- break;
+ break;
#if defined(HAVE_IPV6)
- case AF_INET6:
+ case AF_INET6:
ipaddr = New(IPADDR(rp->ai_addr->sa_family));
ipaddr->set_type(IPADDR::R_MULTIPLE);
ptr = &((struct sockaddr_in6 *) res->ai_addr)->sin6_addr;
ipaddr->set_addr6((in6_addr *)ptr);
- break;
-#endif
- default:
- continue;
- }
+ break;
+#endif
+ default:
+ continue;
+ }
//inet_ntop (res->ai_family, ptr, addrstr, 100);
//Pmsg3(000, "IPv%d address: %s (%s)\n", res->ai_family == PF_INET6 ? 6 : 4,
// addrstr, res->ai_canonname);
addr_list->append(ipaddr);
- }
+ }
freeaddrinfo(rp);
- return NULL;
-}
+ return NULL;
+}
-#else
+#else
/*
* Get human readable error for gethostbyname()
addr->set_addr4(&inaddr);
addr_list->append(addr);
#ifdef HAVE_IPV6
- } else if (inet_pton(AF_INET6, host, &inaddr6) == 1) {
+ } else if (inet_pton(AF_INET6, host, &inaddr6) == 1) {
addr = New(IPADDR(AF_INET6));
addr->set_type(IPADDR::R_MULTIPLE);
addr->set_addr6(&inaddr6);
addr_list->append(addr);
#endif
- } else {
+ } else {
if (family != 0) {
errmsg = resolv_host(family, host, addr_list);
if (errmsg) {
static char buf[30];
switch (msglen) {
case BNET_EOD:
- return "BNET_EOD"; /* End of data stream, new data may follow */
+ return "BNET_EOD"; /* End of data stream, new data may follow */
case BNET_EOD_POLL:
- return "BNET_EOD_POLL"; /* End of data and poll all in one */
+ return "BNET_EOD_POLL"; /* End of data and poll all in one */
case BNET_STATUS:
- return "BNET_STATUS"; /* Send full status */
+ return "BNET_STATUS"; /* Send full status */
case BNET_TERMINATE:
- return "BNET_TERMINATE"; /* Conversation terminated, doing close() */
+ return "BNET_TERMINATE"; /* Conversation terminated, doing close() */
case BNET_POLL:
- return "BNET_POLL"; /* Poll request, I'm hanging on a read */
+ return "BNET_POLL"; /* Poll request, I'm hanging on a read */
case BNET_HEARTBEAT:
- return "BNET_HEARTBEAT"; /* Heartbeat Response requested */
+ return "BNET_HEARTBEAT"; /* Heartbeat Response requested */
case BNET_HB_RESPONSE:
- return "BNET_HB_RESPONSE"; /* Only response permited to HB */
+ return "BNET_HB_RESPONSE"; /* Only response permited to HB */
case BNET_BTIME:
- return "BNET_BTIME"; /* Send UTC btime */
+ return "BNET_BTIME"; /* Send UTC btime */
case BNET_BREAK:
- return "BNET_BREAK"; /* Stop current command -- ctl-c */
+ return "BNET_BREAK"; /* Stop current command -- ctl-c */
case BNET_START_SELECT:
- return "BNET_START_SELECT"; /* Start of a selection list */
+ return "BNET_START_SELECT"; /* Start of a selection list */
case BNET_END_SELECT:
- return "BNET_END_SELECT"; /* End of a select list */
+ return "BNET_END_SELECT"; /* End of a select list */
case BNET_INVALID_CMD:
- return "BNET_INVALID_CMD"; /* Invalid command sent */
+ return "BNET_INVALID_CMD"; /* Invalid command sent */
case BNET_CMD_FAILED:
- return "BNET_CMD_FAILED"; /* Command failed */
+ return "BNET_CMD_FAILED"; /* Command failed */
case BNET_CMD_OK:
- return "BNET_CMD_OK"; /* Command succeeded */
+ return "BNET_CMD_OK"; /* Command succeeded */
case BNET_CMD_BEGIN:
- return "BNET_CMD_BEGIN"; /* Start command execution */
+ return "BNET_CMD_BEGIN"; /* Start command execution */
case BNET_MSGS_PENDING:
- return "BNET_MSGS_PENDING"; /* Messages pending */
+ return "BNET_MSGS_PENDING"; /* Messages pending */
case BNET_MAIN_PROMPT:
- return "BNET_MAIN_PROMPT"; /* Server ready and waiting */
+ return "BNET_MAIN_PROMPT"; /* Server ready and waiting */
case BNET_SELECT_INPUT:
- return "BNET_SELECT_INPUT"; /* Return selection input */
+ return "BNET_SELECT_INPUT"; /* Return selection input */
case BNET_WARNING_MSG:
- return "BNET_WARNING_MSG"; /* Warning message */
+ return "BNET_WARNING_MSG"; /* Warning message */
case BNET_ERROR_MSG:
- return "BNET_ERROR_MSG"; /* Error message -- command failed */
+ return "BNET_ERROR_MSG"; /* Error message -- command failed */
case BNET_INFO_MSG:
- return "BNET_INFO_MSG"; /* Info message -- status line */
+ return "BNET_INFO_MSG"; /* Info message -- status line */
case BNET_RUN_CMD:
- return "BNET_RUN_CMD"; /* Run command follows */
+ return "BNET_RUN_CMD"; /* Run command follows */
case BNET_YESNO:
- return "BNET_YESNO"; /* Request yes no response */
+ return "BNET_YESNO"; /* Request yes no response */
case BNET_START_RTREE:
- return "BNET_START_RTREE"; /* Start restore tree mode */
+ return "BNET_START_RTREE"; /* Start restore tree mode */
case BNET_END_RTREE:
- return "BNET_END_RTREE"; /* End restore tree mode */
+ return "BNET_END_RTREE"; /* End restore tree mode */
case BNET_SUB_PROMPT:
- return "BNET_SUB_PROMPT"; /* Indicate we are at a subprompt */
+ return "BNET_SUB_PROMPT"; /* Indicate we are at a subprompt */
case BNET_TEXT_INPUT:
- return "BNET_TEXT_INPUT"; /* Get text input from user */
+ return "BNET_TEXT_INPUT"; /* Get text input from user */
case BNET_EXT_TERMINATE:
return "BNET_EXT_TERMINATE"; /* A Terminate condition has been met and
- already reported somewhere else */
- case BNET_FDCALLED :
- return "BNET_FDCALLED"; /* The FD should keep the connection for a new job */
+ already reported somewhere else */
+ case BNET_FDCALLED:
+ return "BNET_FDCALLED"; /* The FD should keep the connection for a new job */
default:
bsnprintf(buf, sizeof(buf), _("Unknown sig %d"), (int)msglen);
return buf;
}
}
-/* Initialize internal socket structure.
- * This probably should be done in bsock.c
- */
-BSOCK *init_bsock(JCR *jcr, int sockfd, const char *who,
- const char *host, int port, struct sockaddr *client_addr)
-{
- Dmsg4(100, "socket=%d who=%s host=%s port=%d\n", sockfd, who, host, port);
- BSOCK *bsock = (BSOCK *)malloc(sizeof(BSOCK));
- bmemzero(bsock, sizeof(BSOCK));
- bsock->m_master=bsock; /* don't use set_master() here */
- bsock->m_fd = sockfd;
- bsock->tls = NULL;
- bsock->errors = 0;
- bsock->m_blocking = 1;
- bsock->pout_msg_no = &bsock->out_msg_no;
- bsock->uninstall_send_hook_cb();
- bsock->msg = get_pool_memory(PM_BSOCK);
- bsock->cmsg = get_pool_memory(PM_BSOCK);
- bsock->errmsg = get_pool_memory(PM_MESSAGE);
- bsock->set_who(bstrdup(who));
- bsock->set_host(bstrdup(host));
- bsock->set_port(port);
- bmemzero(&bsock->peer_addr, sizeof(bsock->peer_addr));
- memcpy(&bsock->client_addr, client_addr, sizeof(bsock->client_addr));
- bsock->timeout = BSOCK_TIMEOUT;
- bsock->set_jcr(jcr);
- return bsock;
-}
-
-BSOCK *dup_bsock(BSOCK *osock)
-{
- BSOCK *bsock = (BSOCK *)malloc(sizeof(BSOCK));
- osock->set_locking();
- memcpy(bsock, osock, sizeof(BSOCK));
- bsock->msg = get_pool_memory(PM_BSOCK);
- bsock->cmsg = get_pool_memory(PM_BSOCK);
- bsock->errmsg = get_pool_memory(PM_MESSAGE);
- if (osock->who()) {
- bsock->set_who(bstrdup(osock->who()));
- }
- if (osock->host()) {
- bsock->set_host(bstrdup(osock->host()));
- }
- if (osock->src_addr) {
- bsock->src_addr = New( IPADDR( *(osock->src_addr)) );
- }
- bsock->set_duped();
- bsock->set_master(osock);
- return bsock;
-}
-
int set_socket_errno(int sockstat)
{
#ifdef HAVE_WIN32
/*
* Network Utility Routines
*
+ * The new code inherit common functions from BSOCKCORE class
+ * and implement BSOCK/Bacula specific protocols and data flow.
+ *
* Written by Kern Sibbald
+ *
+ * Major refactoring of BSOCK code written by:
+ *
+ * Radosław Korzeniewski, MMXVIII
+ * radoslaw@korzeniewski.net, radekk@inteos.pl
+ * Inteos Sp. z o.o. http://www.inteos.pl/
+ *
*/
#include "bacula.h"
#include <netdb.h>
#include <netinet/tcp.h>
-#if !defined(ENODATA) /* not defined on BSD systems */
-#define ENODATA EPIPE
-#endif
-
-#if !defined(SOL_TCP) /* Not defined on some systems */
-#define SOL_TCP IPPROTO_TCP
-#endif
-
-#ifdef HAVE_WIN32
-#include <mswsock.h>
-#define socketRead(fd, buf, len) ::recv(fd, buf, len, 0)
-#define socketWrite(fd, buf, len) ::send(fd, buf, len, 0)
-#define socketClose(fd) ::closesocket(fd)
-static void win_close_wait(int fd);
-#ifndef SOCK_CLOEXEC
-#define SOCK_CLOEXEC 0
-#endif
-#else
-#define socketRead(fd, buf, len) ::read(fd, buf, len)
-#define socketWrite(fd, buf, len) ::write(fd, buf, len)
-#define socketClose(fd) ::close(fd)
-#endif
+#define BSOCK_DEBUG_LVL 900
+
+/* Commands sent to Director */
+static char hello[] = "Hello %s calling\n";
+/* Response from Director */
+static char OKhello[] = "1000 OK:";
-/* TODO: We are flooded by tickets about lost console messages. We probably
- * need to store them in a specific place, but waiting for that, we can
- * discard them. See #3615 for example.
- */
-#define isJobMessage(jcr) (jcr && jcr->JobId != 0)
/*
- * make a nice dump of a message
+ * BSOCK default constructor - initializes object.
*/
-void dump_bsock_msg(int sock, uint32_t msgno, const char *what, uint32_t rc, int32_t pktsize, uint32_t flags, POOLMEM *msg, int32_t msglen)
-{
- char buf[54];
- bool is_ascii;
- int dbglvl = DT_ASX;
-
- if (msglen<0) {
- Dmsg4(dbglvl, "%s %d:%d SIGNAL=%s\n", what, sock, msgno, bnet_sig_to_ascii(msglen));
- // data
- smartdump(msg, msglen, buf, sizeof(buf)-9, &is_ascii);
- if (is_ascii) {
- Dmsg5(dbglvl, "%s %d:%d len=%d \"%s\"\n", what, sock, msgno, msglen, buf);
- } else {
- Dmsg5(dbglvl, "%s %d:%d len=%d %s\n", what, sock, msgno, msglen, buf);
- }
- }
-}
-
-
-BSOCKCallback::BSOCKCallback()
-{
-}
-
-BSOCKCallback::~BSOCKCallback()
+BSOCK::BSOCK()
{
-}
+ init();
+};
+/*
+ * BSOCK special constructor initializes object and sets proper socked descriptor.
+ */
+BSOCK::BSOCK(int sockfd):
+ BSOCKCORE(),
+ m_spool_fd(NULL),
+ cmsg(NULL),
+ m_data_end(0),
+ m_last_data_end(0),
+ m_FileIndex(0),
+ m_lastFileIndex(0),
+ m_spool(false),
+ m_compress(false),
+ m_CommBytes(0),
+ m_CommCompressedBytes(0)
+{
+ init();
+ m_terminated = false;
+ m_closed = false;
+ m_fd = sockfd;
+};
/*
- * This is a non-class BSOCK "constructor" because we want to
- * call the Bacula smartalloc routines instead of new.
+ * BSOCK default destructor.
*/
-BSOCK *new_bsock()
+BSOCK::~BSOCK()
{
- BSOCK *bsock = (BSOCK *)malloc(sizeof(BSOCK));
- bsock->init();
- return bsock;
-}
+ Dmsg0(BSOCK_DEBUG_LVL, "BSOCK::~BSOCK()\n");
+ _destroy();
+};
+/*
+ * BSOCK initialization method handles bsock specific variables.
+ */
void BSOCK::init()
{
- memset(this, 0, sizeof(BSOCK));
- m_master = this;
- set_closed();
- set_terminated();
- m_blocking = 1;
- pout_msg_no = &out_msg_no;
- uninstall_send_hook_cb();
- msg = get_pool_memory(PM_BSOCK);
- cmsg = get_pool_memory(PM_BSOCK);
- errmsg = get_pool_memory(PM_MESSAGE);
+ /* the BSOCKCORE::init() is executed in base class constructor */
timeout = BSOCK_TIMEOUT;
-}
-
-void BSOCK::free_tls()
-{
- free_tls_connection(this->tls);
- this->tls = NULL;
+ m_spool_fd = NULL;
+ cmsg = get_pool_memory(PM_BSOCK);
}
/*
- * Try to connect to host for max_retry_time at retry_time intervals.
- * Note, you must have called the constructor prior to calling
- * this routine.
+ * BSOCK private destroy method releases bsock specific variables.
*/
-bool BSOCK::connect(JCR * jcr, int retry_interval, utime_t max_retry_time,
- utime_t heart_beat,
- const char *name, char *host, char *service, int port,
- int verbose)
+void BSOCK::_destroy()
{
- bool ok = false;
- int i;
- int fatal = 0;
- time_t begin_time = time(NULL);
- time_t now;
- btimer_t *tid = NULL;
-
- /* Try to trap out of OS call when time expires */
- if (max_retry_time) {
- tid = start_thread_timer(jcr, pthread_self(), (uint32_t)max_retry_time);
- }
-
- for (i = 0; !open(jcr, name, host, service, port, heart_beat, &fatal);
- i -= retry_interval) {
- berrno be;
- if (fatal || (jcr && job_canceled(jcr))) {
- goto bail_out;
- }
- Dmsg4(50, "Unable to connect to %s on %s:%d. ERR=%s\n",
- name, host, port, be.bstrerror());
- if (i < 0) {
- i = 60 * 5; /* complain again in 5 minutes */
- if (verbose)
- Qmsg4(jcr, M_WARNING, 0, _(
- "Could not connect to %s on %s:%d. ERR=%s\n"
- "Retrying ...\n"), name, host, port, be.bstrerror());
- }
- bmicrosleep(retry_interval, 0);
- now = time(NULL);
- if (begin_time + max_retry_time <= now) {
- Qmsg4(jcr, M_FATAL, 0, _("Unable to connect to %s on %s:%d. ERR=%s\n"),
- name, host, port, be.bstrerror());
- goto bail_out;
- }
- }
- ok = true;
-
-bail_out:
- if (tid) {
- stop_thread_timer(tid);
+ Dmsg0(BSOCK_DEBUG_LVL, "BSOCK::_destroy()\n");
+ if (cmsg) {
+ free_pool_memory(cmsg);
+ cmsg = NULL;
}
- return ok;
-}
+};
/*
- * Finish initialization of the packet structure.
+ * Authenticate Director
*/
-void BSOCK::fin_init(JCR * jcr, int sockfd, const char *who, const char *host, int port,
- struct sockaddr *lclient_addr)
+bool BSOCK::authenticate_director(const char *name, const char *password,
+ TLS_CONTEXT *tls_ctx, char *errmsg, int errmsg_len)
{
- Dmsg3(100, "who=%s host=%s port=%d\n", who, host, port);
- m_fd = sockfd;
- if (m_who) {
- free(m_who);
- }
- if (m_host) {
- free(m_host);
- }
- set_who(bstrdup(who));
- set_host(bstrdup(host));
- set_port(port);
- memcpy(&client_addr, lclient_addr, sizeof(client_addr));
- set_jcr(jcr);
-}
+ int tls_local_need = BNET_TLS_NONE;
+ int tls_remote_need = BNET_TLS_NONE;
+ int compatible = true;
+ char bashed_name[MAX_NAME_LENGTH];
+ BSOCK *dir = this; /* for readability */
-/*
- * Copy the address from the configuration dlist that gets passed in
- */
-void BSOCK::set_source_address(dlist *src_addr_list)
-{
- IPADDR *addr = NULL;
+ *errmsg = 0;
+ /*
+ * Send my name to the Director then do authentication
+ */
- // delete the object we already have, if it's allocated
- if (src_addr) {
- free( src_addr);
- src_addr = NULL;
- }
+ /* Timeout Hello after 15 secs */
+ dir->start_timer(15);
+ dir->fsend(hello, bashed_name);
- if (src_addr_list) {
- addr = (IPADDR*) src_addr_list->first();
- src_addr = New( IPADDR(*addr));
+ if (get_tls_enable(tls_ctx)) {
+ tls_local_need = get_tls_enable(tls_ctx) ? BNET_TLS_REQUIRED : BNET_TLS_OK;
}
-}
-/*
- * Open a TCP connection to the server
- * Returns NULL
- * Returns BSOCK * pointer on success
- */
-bool BSOCK::open(JCR *jcr, const char *name, char *host, char *service,
- int port, utime_t heart_beat, int *fatal)
-{
- int sockfd = -1;
- dlist *addr_list;
- IPADDR *ipaddr;
- bool connected = false;
- int turnon = 1;
- const char *errstr;
- int save_errno = 0;
+ /* respond to Dir challenge */
+ if (!cram_md5_respond(dir, password, &tls_remote_need, &compatible) ||
+ /* Now challenge dir */
+ !cram_md5_challenge(dir, password, tls_local_need, compatible)) {
+ bsnprintf(errmsg, errmsg_len, _("Director authorization error at \"%s:%d\"\n"),
+ dir->host(), dir->port());
+ goto bail_out;
+ }
- /*
- * Fill in the structure serv_addr with the address of
- * the server that we want to connect with.
- */
- if ((addr_list = bnet_host2ipaddrs(host, 0, &errstr)) == NULL) {
- /* Note errstr is not malloc'ed */
- Qmsg2(jcr, M_ERROR, 0, _("gethostbyname() for host \"%s\" failed: ERR=%s\n"),
- host, errstr);
- Dmsg2(100, "bnet_host2ipaddrs() for host %s failed: ERR=%s\n",
- host, errstr);
- *fatal = 1;
- return false;
+ /* Verify that the remote host is willing to meet our TLS requirements */
+ if (tls_remote_need < tls_local_need && tls_local_need != BNET_TLS_OK && tls_remote_need != BNET_TLS_OK) {
+ bsnprintf(errmsg, errmsg_len, _("Authorization error:"
+ " Remote server at \"%s:%d\" did not advertise required TLS support.\n"),
+ dir->host(), dir->port());
+ goto bail_out;
}
- remove_duplicate_addresses(addr_list);
- foreach_dlist(ipaddr, addr_list) {
- ipaddr->set_port_net(htons(port));
- char allbuf[256 * 10];
- char curbuf[256];
- Dmsg2(100, "Current %sAll %s\n",
- ipaddr->build_address_str(curbuf, sizeof(curbuf)),
- build_addresses_str(addr_list, allbuf, sizeof(allbuf)));
- /* Open a TCP socket */
- if ((sockfd = socket(ipaddr->get_family(), SOCK_STREAM|SOCK_CLOEXEC, 0)) < 0) {
- berrno be;
- save_errno = errno;
- switch (errno) {
-#ifdef EAFNOSUPPORT
- case EAFNOSUPPORT:
- /*
- * The name lookup of the host returned an address in a protocol family
- * we don't support. Suppress the error and try the next address.
- */
- break;
-#endif
-#ifdef EPROTONOSUPPORT
- /* See above comments */
- case EPROTONOSUPPORT:
- break;
-#endif
-#ifdef EPROTOTYPE
- /* See above comments */
- case EPROTOTYPE:
- break;
-#endif
- default:
- *fatal = 1;
- Qmsg3(jcr, M_ERROR, 0, _("Socket open error. proto=%d port=%d. ERR=%s\n"),
- ipaddr->get_family(), ipaddr->get_port_host_order(), be.bstrerror());
- Pmsg3(300, _("Socket open error. proto=%d port=%d. ERR=%s\n"),
- ipaddr->get_family(), ipaddr->get_port_host_order(), be.bstrerror());
- break;
- }
- continue;
- }
+ /* Verify that we are willing to meet the remote host's requirements */
+ if (tls_remote_need > tls_local_need && tls_local_need != BNET_TLS_OK && tls_remote_need != BNET_TLS_OK) {
+ bsnprintf(errmsg, errmsg_len, _("Authorization error with Director at \"%s:%d\":"
+ " Remote server requires TLS.\n"),
+ dir->host(), dir->port());
- /* Bind to the source address if it is set */
- if (src_addr) {
- if (bind(sockfd, src_addr->get_sockaddr(), src_addr->get_sockaddr_len()) < 0) {
- berrno be;
- save_errno = errno;
- *fatal = 1;
- Qmsg2(jcr, M_ERROR, 0, _("Source address bind error. proto=%d. ERR=%s\n"),
- src_addr->get_family(), be.bstrerror() );
- Pmsg2(000, _("Source address bind error. proto=%d. ERR=%s\n"),
- src_addr->get_family(), be.bstrerror() );
- if (sockfd >= 0) socketClose(sockfd);
- continue;
- }
- }
+ goto bail_out;
+ }
- /*
- * Keep socket from timing out from inactivity
- */
- if (setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE, (sockopt_val_t)&turnon, sizeof(turnon)) < 0) {
- berrno be;
- Qmsg1(jcr, M_WARNING, 0, _("Cannot set SO_KEEPALIVE on socket: %s\n"),
- be.bstrerror());
- }
-#if defined(TCP_KEEPIDLE)
- if (heart_beat) {
- int opt = heart_beat;
- if (setsockopt(sockfd, SOL_TCP, TCP_KEEPIDLE, (sockopt_val_t)&opt, sizeof(opt)) < 0) {
- berrno be;
- Qmsg1(jcr, M_WARNING, 0, _("Cannot set TCP_KEEPIDLE on socket: %s\n"),
- be.bstrerror());
+ /* Is TLS Enabled? */
+ if (have_tls) {
+ if (tls_local_need >= BNET_TLS_OK && tls_remote_need >= BNET_TLS_OK) {
+ /* Engage TLS! Full Speed Ahead! */
+ if (!bnet_tls_client(tls_ctx, dir, NULL)) {
+ bsnprintf(errmsg, errmsg_len, _("TLS negotiation failed with Director at \"%s:%d\"\n"),
+ dir->host(), dir->port());
+ goto bail_out;
}
}
-#endif
-
- /* connect to server */
- if (::connect(sockfd, ipaddr->get_sockaddr(), ipaddr->get_sockaddr_len()) < 0) {
- save_errno = errno;
- if (sockfd >= 0) socketClose(sockfd);
- continue;
- }
- *fatal = 0;
- connected = true;
- break;
}
- if (!connected) {
- berrno be;
- free_addresses(addr_list);
- errno = save_errno | b_errno_win32;
- Dmsg4(50, "Could not connect to server %s %s:%d. ERR=%s\n",
- name, host, port, be.bstrerror());
+ Dmsg1(6, ">dird: %s", dir->msg);
+ if (dir->recv() <= 0) {
+ dir->stop_timer();
+ bsnprintf(errmsg, errmsg_len, _("Bad errmsg to Hello command: ERR=%s\n"
+ "The Director at \"%s:%d\" may not be running.\n"),
+ dir->bstrerror(), dir->host(), dir->port());
return false;
}
- /*
- * Keep socket from timing out from inactivity
- * Do this a second time out of paranoia
- */
- if (setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE, (sockopt_val_t)&turnon, sizeof(turnon)) < 0) {
- berrno be;
- Qmsg1(jcr, M_WARNING, 0, _("Cannot set SO_KEEPALIVE on socket: %s\n"),
- be.bstrerror());
- }
- fin_init(jcr, sockfd, name, host, port, ipaddr->get_sockaddr());
- free_addresses(addr_list);
-
- /* Clean the packet a bit */
- m_closed = false;
- m_duped = false;
- m_spool = false;
- m_use_locking = false;
- m_timed_out = false;
- m_terminated = false;
- m_suppress_error_msgs = false;
- errors = 0;
- m_blocking = 0;
-
- Dmsg3(50, "OK connected to server %s %s:%d.\n",
- name, host, port);
+ dir->stop_timer();
+ Dmsg1(10, "<dird: %s", dir->msg);
+ if (strncmp(dir->msg, OKhello, sizeof(OKhello)-1) != 0) {
+ bsnprintf(errmsg, errmsg_len, _("Director at \"%s:%d\" rejected Hello command\n"),
+ dir->host(), dir->port());
+ return false;
+ } else {
+ bsnprintf(errmsg, errmsg_len, "%s", dir->msg);
+ }
return true;
+
+bail_out:
+ dir->stop_timer();
+ bsnprintf(errmsg, errmsg_len, _("Authorization error with Director at \"%s:%d\"\n"
+ "Most likely the passwords do not agree.\n"
+ "If you are using TLS, there may have been a certificate validation error during the TLS handshake.\n"
+ "For help, please see: " MANUAL_AUTH_URL "\n"),
+ dir->host(), dir->port());
+ return false;
}
/*
- * Force read/write to use locking
+ * Send a message over the network. Everything is sent in one
+ * write request, but depending on the mode you are using
+ * there will be either two or three read requests done.
+ * Read 1: 32 bits, gets essentially the packet length, but may
+ * have the upper bits set to indicate compression or
+ * an extended header packet.
+ * Read 2: 32 bits, this read is done only of BNET_HDR_EXTEND is set.
+ * In this case the top 16 bits of this 32 bit word are reserved
+ * for flags and the lower 16 bits for data. This word will be
+ * stored in the field "flags" in the BSOCKCORE packet.
+ * Read 2 or 3: depending on if Read 2 is done. This is the data.
+ *
+ * For normal comm line compression, the whole data packet is compressed
+ * but not the msglen (first read).
+ * To do data compression rather than full comm line compression, prior to
+ * call send(flags) where the lower 32 bits is the offset to the data to
+ * be compressed. The top 32 bits are reserved for flags that can be
+ * set. The are:
+ * BNET_IS_CMD We are sending a command
+ * BNET_OFFSET An offset is specified (this implies data compression)
+ * BNET_NOCOMPRESS Inhibit normal comm line compression
+ * BNET_DATACOMPRESSED The data using the specified offset was
+ * compressed, and normal comm line compression will
+ * not be done.
+ * If any of the above bits are set, then BNET_HDR_EXTEND will be set
+ * in the top bits of msglen, and the full set of flags + the offset
+ * will be passed as a 32 bit word just after the msglen, and then
+ * followed by any data that is either compressed or not.
+ *
+ * Note, neither comm line nor data compression is not
+ * guaranteed since it may result in more data, in which case, the
+ * record is sent uncompressed and there will be no offset.
+ * On the receive side, if BNET_OFFSET is set, then the data is compressed.
+ *
+ * Returns: false on failure
+ * true on success
*/
-bool BSOCK::set_locking()
+bool BSOCK::send(int aflags)
{
- int stat;
- if (m_use_locking) {
- return true; /* already set */
- }
- pm_rmutex = &m_rmutex;
- pm_wmutex = &m_wmutex;
- if ((stat = pthread_mutex_init(pm_rmutex, NULL)) != 0) {
- berrno be;
- Qmsg(m_jcr, M_FATAL, 0, _("Could not init bsock read mutex. ERR=%s\n"),
- be.bstrerror(stat));
+ int32_t rc;
+ int32_t pktsiz;
+ int32_t *hdrptr;
+ int offset;
+ int hdrsiz;
+ bool ok = true;
+ int32_t save_msglen;
+ POOLMEM *save_msg;
+ bool compressed;
+ bool locked = false;
+
+ if (is_closed()) {
+ if (!m_suppress_error_msgs) {
+ Qmsg0(m_jcr, M_ERROR, 0, _("Socket is closed\n"));
+ }
return false;
}
- if ((stat = pthread_mutex_init(pm_wmutex, NULL)) != 0) {
- berrno be;
- Qmsg(m_jcr, M_FATAL, 0, _("Could not init bsock write mutex. ERR=%s\n"),
- be.bstrerror(stat));
+ if (errors) {
+ if (!m_suppress_error_msgs) {
+ Qmsg4(m_jcr, M_ERROR, 0, _("Socket has errors=%d on call to %s:%s:%d\n"),
+ errors, m_who, m_host, m_port);
+ }
return false;
}
- if ((stat = pthread_mutex_init(&m_mmutex, NULL)) != 0) {
- berrno be;
- Qmsg(m_jcr, M_FATAL, 0, _("Could not init bsock attribute mutex. ERR=%s\n"),
- be.bstrerror(stat));
+ if (is_terminated()) {
+ if (!m_suppress_error_msgs) {
+ Qmsg4(m_jcr, M_ERROR, 0, _("Bsock send while terminated=%d on call to %s:%s:%d\n"),
+ is_terminated(), m_who, m_host, m_port);
+ }
return false;
}
- m_use_locking = true;
- return true;
-}
-void BSOCK::clear_locking()
-{
- if (!m_use_locking || m_duped) {
- return;
- }
- m_use_locking = false;
- pthread_mutex_destroy(pm_rmutex);
- pthread_mutex_destroy(pm_wmutex);
- pthread_mutex_destroy(&m_mmutex);
- pm_rmutex = NULL;
- pm_wmutex = NULL;
- return;
-}
-
-/*
- * Do comm line compression (LZ4) of a bsock message.
- * Returns: true if the compression was done
- * false if no compression was done
- * The "offset" defines where to start compressing the message. This
- * allows passing "header" information uncompressed and the actual
- * data part compressed.
- *
- * Note, we don't compress lines less than 20 characters because we
- * want to save at least 10 characters otherwise compression doesn't
- * help enough to warrant doing the decompression.
- */
-bool BSOCK::comm_compress()
-{
- bool compress = false;
- bool compressed = false;
- int offset = m_flags & 0xFF;
-
- /*
- * Enable compress if allowed and not spooling and the
- * message is long enough (>20) to get some reasonable savings.
- */
- if (msglen > 20) {
- compress = can_compress() && !is_spooling();
- }
- m_CommBytes += msglen; /* uncompressed bytes */
- Dmsg4(DT_NETWORK|200, "can_compress=%d compress=%d CommBytes=%lld CommCompresedBytes=%lld\n",
- can_compress(), compress, m_CommBytes, m_CommCompressedBytes);
- if (compress) {
- int clen;
- int need_size;
-
- ASSERT2(offset <= msglen, "Comm offset bigger than message\n");
- ASSERT2(offset < 255, "Offset greater than 254\n");
- need_size = LZ4_compressBound(msglen);
- if (need_size >= ((int32_t)sizeof_pool_memory(cmsg))) {
- cmsg = realloc_pool_memory(cmsg, need_size + 100);
- }
- msglen -= offset;
- msg += offset;
- cmsg += offset;
- clen = LZ4_compress_default(msg, cmsg, msglen, msglen);
- //Dmsg2(000, "clen=%d msglen=%d\n", clen, msglen);
- /* Compression should save at least 10 characters */
- if (clen > 0 && clen + 10 <= msglen) {
-
-#ifdef xxx_debug
- /* Debug code -- decompress and compare */
- int blen, rlen, olen;
- olen = msglen;
- POOLMEM *rmsg = get_pool_memory(PM_BSOCK);
- blen = sizeof_pool_memory(msg) * 2;
- if (blen >= sizeof_pool_memory(rmsg)) {
- rmsg = realloc_pool_memory(rmsg, blen);
- }
- rlen = LZ4_decompress_safe(cmsg, rmsg, clen, blen);
- //Dmsg4(000, "blen=%d clen=%d olen=%d rlen=%d\n", blen, clen, olen, rlen);
- ASSERT(olen == rlen);
- ASSERT(memcmp(msg, rmsg, olen) == 0);
- free_pool_memory(rmsg);
- /* end Debug code */
-#endif
-
- msg = cmsg;
- msglen = clen;
- compressed = true;
- }
- msglen += offset;
- msg -= offset;
- cmsg -= offset;
- }
- m_CommCompressedBytes += msglen;
- return compressed;
-}
-
-
-/*
- * Send a message over the network. Everything is sent in one
- * write request, but depending on the mode you are using
- * there will be either two or three read requests done.
- * Read 1: 32 bits, gets essentially the packet length, but may
- * have the upper bits set to indicate compression or
- * an extended header packet.
- * Read 2: 32 bits, this read is done only of BNET_HDR_EXTEND is set.
- * In this case the top 16 bits of this 32 bit word are reserved
- * for flags and the lower 16 bits for data. This word will be
- * stored in the field "flags" in the BSOCK packet.
- * Read 2 or 3: depending on if Read 2 is done. This is the data.
- *
- * For normal comm line compression, the whole data packet is compressed
- * but not the msglen (first read).
- * To do data compression rather than full comm line compression, prior to
- * call send(flags) where the lower 32 bits is the offset to the data to
- * be compressed. The top 32 bits are reserved for flags that can be
- * set. The are:
- * BNET_IS_CMD We are sending a command
- * BNET_OFFSET An offset is specified (this implies data compression)
- * BNET_NOCOMPRESS Inhibit normal comm line compression
- * BNET_DATACOMPRESSED The data using the specified offset was
- * compressed, and normal comm line compression will
- * not be done.
- * If any of the above bits are set, then BNET_HDR_EXTEND will be set
- * in the top bits of msglen, and the full set of flags + the offset
- * will be passed as a 32 bit word just after the msglen, and then
- * followed by any data that is either compressed or not.
- *
- * Note, neither comm line nor data compression is not
- * guaranteed since it may result in more data, in which case, the
- * record is sent uncompressed and there will be no offset.
- * On the receive side, if BNET_OFFSET is set, then the data is compressed.
- *
- * Returns: false on failure
- * true on success
- */
-bool BSOCK::send(int aflags)
-{
- int32_t rc;
- int32_t pktsiz;
- int32_t *hdrptr;
- int offset;
- int hdrsiz;
- bool ok = true;
- int32_t save_msglen;
- POOLMEM *save_msg;
- bool compressed;
- bool locked = false;
-
- if (is_closed()) {
- if (!m_suppress_error_msgs) {
- /* TODO: We probably need to store them in a specific place see mantis #3615 */
- if (isJobMessage(m_jcr)) {
- Qmsg0(m_jcr, M_ERROR, 0, _("Socket is closed\n"));
- } else {
- Dmsg0(100, "Socket is closed\n");
- }
- }
- return false;
- }
- if (errors) {
- if (!m_suppress_error_msgs) {
- /* TODO: We probably need to store them in a specific place */
- if (isJobMessage(m_jcr)) {
- Qmsg4(m_jcr, M_ERROR, 0, _("Socket has errors=%d on call to %s:%s:%d\n"),
- errors, m_who, m_host, m_port);
- } else {
- Dmsg4(100, "Socket has errors=%d on call to %s:%s:%d\n",
- errors, m_who, m_host, m_port);
- }
- }
- return false;
- }
- if (is_terminated()) {
- if (!m_suppress_error_msgs) {
- Qmsg4(m_jcr, M_ERROR, 0, _("Bsock send while terminated=%d on call to %s:%s:%d\n"),
- is_terminated(), m_who, m_host, m_port);
- }
- return false;
- }
-
- if (msglen > 4000000) {
- if (!m_suppress_error_msgs) {
- Qmsg4(m_jcr, M_ERROR, 0,
- _("Socket has insane msglen=%d on call to %s:%s:%d\n"),
- msglen, m_who, m_host, m_port);
- }
- return false;
+ if (msglen > 4000000) {
+ if (!m_suppress_error_msgs) {
+ Qmsg4(m_jcr, M_ERROR, 0,
+ _("Socket has insane msglen=%d on call to %s:%s:%d\n"),
+ msglen, m_who, m_host, m_port);
+ }
+ return false;
}
if (send_hook_cb) {
timer_start = watchdog_time; /* start timer */
clear_timed_out();
/* Full I/O done in one write */
- rc = write_nbytes(this, (char *)hdrptr, pktsiz);
+ rc = write_nbytes((char *)hdrptr, pktsiz);
if (chk_dbglvl(DT_NETWORK|1900)) dump_bsock_msg(m_fd, *pout_msg_no, "SEND", rc, msglen, m_flags, save_msg, save_msglen);
timer_start = 0; /* clear timer */
if (rc != pktsiz) {
}
if (rc < 0) {
if (!m_suppress_error_msgs) {
- /* TODO: We probably need to store them in a specific place see Mantis #3615 */
- if (isJobMessage(m_jcr)) {
- Qmsg5(m_jcr, M_ERROR, 0,
- _("Write error sending %d bytes to %s:%s:%d: ERR=%s\n"),
- pktsiz, m_who,
- m_host, m_port, this->bstrerror());
- } else {
- Dmsg5(100, "Write error sending %d bytes to %s:%s:%d: ERR=%s\n",
- pktsiz, m_who,
- m_host, m_port, this->bstrerror());
- }
+ Qmsg5(m_jcr, M_ERROR, 0,
+ _("Write error sending %d bytes to %s:%s:%d: ERR=%s\n"),
+ pktsiz, m_who,
+ m_host, m_port, this->bstrerror());
}
} else {
Qmsg5(m_jcr, M_ERROR, 0,
return ok;
}
-/*
- * Format and send a message
- * Returns: false on error
- * true on success
- */
-bool BSOCK::fsend(const char *fmt, ...)
-{
- va_list arg_ptr;
- int maxlen;
-
- if (is_null(this)) {
- return false; /* do not seg fault */
- }
- if (errors || is_terminated() || is_closed()) {
- return false;
- }
- /* This probably won't work, but we vsnprintf, then if we
- * get a negative length or a length greater than our buffer
- * (depending on which library is used), the printf was truncated, so
- * get a bigger buffer and try again.
- */
- for (;;) {
- maxlen = sizeof_pool_memory(msg) - 1;
- va_start(arg_ptr, fmt);
- msglen = bvsnprintf(msg, maxlen, fmt, arg_ptr);
- va_end(arg_ptr);
- if (msglen >= 0 && msglen < (maxlen - 5)) {
- break;
- }
- msg = realloc_pool_memory(msg, maxlen + maxlen / 2);
- }
- return send();
-}
-
/*
* Receive a message from the other end. Each message consists of
* two packets. The first is a header that contains the size
timer_start = watchdog_time; /* set start wait time */
clear_timed_out();
/* get data size -- in int32_t */
- if ((nbytes = read_nbytes(this, (char *)&pktsiz, sizeof(int32_t))) <= 0) {
+ if ((nbytes = read_nbytes((char *)&pktsiz, sizeof(int32_t))) <= 0) {
timer_start = 0; /* clear timer */
/* probably pipe broken because client died */
if (errno == 0) {
if (pktsiz > 0 && (pktsiz & BNET_HDR_EXTEND)) {
timer_start = watchdog_time; /* set start wait time */
clear_timed_out();
- if ((nbytes = read_nbytes(this, (char *)&m_flags, sizeof(int32_t))) <= 0) {
+ if ((nbytes = read_nbytes((char *)&m_flags, sizeof(int32_t))) <= 0) {
timer_start = 0; /* clear timer */
/* probably pipe broken because client died */
if (errno == 0) {
timer_start = watchdog_time; /* set start wait time */
clear_timed_out();
/* now read the actual data */
- if ((nbytes = read_nbytes(this, msg, pktsiz)) <= 0) {
+ if ((nbytes = read_nbytes(msg, pktsiz)) <= 0) {
timer_start = 0; /* clear timer */
if (errno == 0) {
b_errno = ENODATA;
}
/*
- * Return the string for the error that occurred
- * on the socket. Only the first error is retained.
+ * Open a TCP connection to the server
+ * Returns NULL
+ * Returns BSOCKCORE * pointer on success
*/
-const char *BSOCK::bstrerror()
-{
- berrno be;
- if (errmsg == NULL) {
- errmsg = get_pool_memory(PM_MESSAGE);
- }
- if (b_errno == 0) {
- pm_strcpy(errmsg, "I/O Error");
- } else {
- pm_strcpy(errmsg, be.bstrerror(b_errno));
- }
- return errmsg;
-}
-
-int BSOCK::get_peer(char *buf, socklen_t buflen)
+bool BSOCK::open(JCR *jcr, const char *name, char *host, char *service,
+ int port, utime_t heart_beat, int *fatal)
{
-#if !defined(HAVE_WIN32)
- if (peer_addr.sin_family == 0) {
- socklen_t salen = sizeof(peer_addr);
- int rval = (getpeername)(m_fd, (struct sockaddr *)&peer_addr, &salen);
- if (rval < 0) return rval;
- }
- if (!inet_ntop(peer_addr.sin_family, &peer_addr.sin_addr, buf, buflen))
- return -1;
-
- return 0;
-#else
- return -1;
-#endif
-}
+ bool status = BSOCKCORE::open(jcr, name, host, service, port, heart_beat, fatal);
+ m_spool = false;
+ return status;
+};
/*
- * Set the network buffer size, suggested size is in size.
- * Actual size obtained is returned in bs->msglen
+ * Do comm line compression (LZ4) of a bsock message.
+ * Returns: true if the compression was done
+ * false if no compression was done
+ * The "offset" defines where to start compressing the message. This
+ * allows passing "header" information uncompressed and the actual
+ * data part compressed.
*
- * Returns: false on failure
- * true on success
+ * Note, we don't compress lines less than 20 characters because we
+ * want to save at least 10 characters otherwise compression doesn't
+ * help enough to warrant doing the decompression.
*/
-bool BSOCK::set_buffer_size(uint32_t size, int rw)
+bool BSOCK::comm_compress()
{
- uint32_t dbuf_size, start_size;
-
-#if defined(IP_TOS) && defined(IPTOS_THROUGHPUT)
- int opt;
- opt = IPTOS_THROUGHPUT;
- setsockopt(m_fd, IPPROTO_IP, IP_TOS, (sockopt_val_t)&opt, sizeof(opt));
-#endif
-
- if (size != 0) {
- dbuf_size = size;
- } else {
- dbuf_size = DEFAULT_NETWORK_BUFFER_SIZE;
- }
- start_size = dbuf_size;
- if ((msg = realloc_pool_memory(msg, dbuf_size + 100)) == NULL) {
- Qmsg0(get_jcr(), M_FATAL, 0, _("Could not malloc BSOCK data buffer\n"));
- return false;
- }
+ bool compress = false;
+ bool compressed = false;
+ int offset = m_flags & 0xFF;
/*
- * If user has not set the size, use the OS default -- i.e. do not
- * try to set it. This allows sys admins to set the size they
- * want in the OS, and Bacula will comply. See bug #1493
+ * Enable compress if allowed and not spooling and the
+ * message is long enough (>20) to get some reasonable savings.
*/
- if (size == 0) {
- msglen = dbuf_size;
- return true;
+ if (msglen > 20) {
+ compress = can_compress() && !is_spooling();
}
+ m_CommBytes += msglen; /* uncompressed bytes */
+ Dmsg4(DT_NETWORK|200, "can_compress=%d compress=%d CommBytes=%lld CommCompresedBytes=%lld\n",
+ can_compress(), compress, m_CommBytes, m_CommCompressedBytes);
+ if (compress) {
+ int clen;
+ int need_size;
- if (rw & BNET_SETBUF_READ) {
- while ((dbuf_size > TAPE_BSIZE) && (setsockopt(m_fd, SOL_SOCKET,
- SO_RCVBUF, (sockopt_val_t) & dbuf_size, sizeof(dbuf_size)) < 0)) {
- berrno be;
- Qmsg1(get_jcr(), M_ERROR, 0, _("sockopt error: %s\n"), be.bstrerror());
- dbuf_size -= TAPE_BSIZE;
- }
- Dmsg1(200, "set network buffer size=%d\n", dbuf_size);
- if (dbuf_size != start_size) {
- Qmsg1(get_jcr(), M_WARNING, 0,
- _("Warning network buffer = %d bytes not max size.\n"), dbuf_size);
- }
- }
- if (size != 0) {
- dbuf_size = size;
- } else {
- dbuf_size = DEFAULT_NETWORK_BUFFER_SIZE;
- }
- start_size = dbuf_size;
- if (rw & BNET_SETBUF_WRITE) {
- while ((dbuf_size > TAPE_BSIZE) && (setsockopt(m_fd, SOL_SOCKET,
- SO_SNDBUF, (sockopt_val_t) & dbuf_size, sizeof(dbuf_size)) < 0)) {
- berrno be;
- Qmsg1(get_jcr(), M_ERROR, 0, _("sockopt error: %s\n"), be.bstrerror());
- dbuf_size -= TAPE_BSIZE;
- }
- Dmsg1(900, "set network buffer size=%d\n", dbuf_size);
- if (dbuf_size != start_size) {
- Qmsg1(get_jcr(), M_WARNING, 0,
- _("Warning network buffer = %d bytes not max size.\n"), dbuf_size);
+ ASSERT2(offset <= msglen, "Comm offset bigger than message\n");
+ ASSERT2(offset < 255, "Offset greater than 254\n");
+ need_size = LZ4_compressBound(msglen);
+ if (need_size >= ((int32_t)sizeof_pool_memory(cmsg))) {
+ cmsg = realloc_pool_memory(cmsg, need_size + 100);
}
- }
-
- msglen = dbuf_size;
- return true;
-}
-
-/*
- * Set socket non-blocking
- * Returns previous socket flag
- */
-int BSOCK::set_nonblocking()
-{
-#ifndef HAVE_WIN32
- int oflags;
-
- /* Get current flags */
- if ((oflags = fcntl(m_fd, F_GETFL, 0)) < 0) {
- berrno be;
- Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_GETFL error. ERR=%s\n"), be.bstrerror());
- }
-
- /* Set O_NONBLOCK flag */
- if ((fcntl(m_fd, F_SETFL, oflags|O_NONBLOCK)) < 0) {
- berrno be;
- Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_SETFL error. ERR=%s\n"), be.bstrerror());
- }
-
- m_blocking = 0;
- return oflags;
-#else
- int flags;
- u_long ioctlArg = 1;
-
- flags = m_blocking;
- ioctlsocket(m_fd, FIONBIO, &ioctlArg);
- m_blocking = 0;
-
- return flags;
-#endif
-}
-
-/*
- * Set socket blocking
- * Returns previous socket flags
- */
-int BSOCK::set_blocking()
-{
-#ifndef HAVE_WIN32
- int oflags;
- /* Get current flags */
- if ((oflags = fcntl(m_fd, F_GETFL, 0)) < 0) {
- berrno be;
- Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_GETFL error. ERR=%s\n"), be.bstrerror());
- }
-
- /* Set O_NONBLOCK flag */
- if ((fcntl(m_fd, F_SETFL, oflags & ~O_NONBLOCK)) < 0) {
- berrno be;
- Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_SETFL error. ERR=%s\n"), be.bstrerror());
- }
-
- m_blocking = 1;
- return oflags;
-#else
- int flags;
- u_long ioctlArg = 0;
-
- flags = m_blocking;
- ioctlsocket(m_fd, FIONBIO, &ioctlArg);
- m_blocking = 1;
-
- return flags;
-#endif
-}
-
-void BSOCK::set_killable(bool killable)
-{
- if (m_jcr) {
- m_jcr->set_killable(killable);
- }
-}
-
-/*
- * Restores socket flags
- */
-void BSOCK::restore_blocking (int flags)
-{
-#ifndef HAVE_WIN32
- if ((fcntl(m_fd, F_SETFL, flags)) < 0) {
- berrno be;
- Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_SETFL error. ERR=%s\n"), be.bstrerror());
- }
-
- m_blocking = (flags & O_NONBLOCK) ? true : false;
-#else
- u_long ioctlArg = flags;
-
- ioctlsocket(m_fd, FIONBIO, &ioctlArg);
- m_blocking = 1;
-#endif
-}
+ msglen -= offset;
+ msg += offset;
+ cmsg += offset;
+ clen = LZ4_compress_default(msg, cmsg, msglen, msglen);
+ //Dmsg2(000, "clen=%d msglen=%d\n", clen, msglen);
+ /* Compression should save at least 10 characters */
+ if (clen > 0 && clen + 10 <= msglen) {
-/*
- * Wait for a specified time for data to appear on
- * the BSOCK connection.
- *
- * Returns: 1 if data available
- * 0 if timeout
- * -1 if error
- */
-int BSOCK::wait_data(int sec, int msec)
-{
- for (;;) {
- switch (fd_wait_data(m_fd, WAIT_READ, sec, msec)) {
- case 0: /* timeout */
- b_errno = 0;
- return 0;
- case -1:
- b_errno = errno;
- if (errno == EINTR) {
- continue;
- }
- return -1; /* error return */
- default:
- b_errno = 0;
-#ifdef HAVE_TLS
- if (this->tls && !tls_bsock_probe(this)) {
- continue; /* false alarm, maybe a session key negotiation in progress on the socket */
+#ifdef xxx_debug
+ /* Debug code -- decompress and compare */
+ int blen, rlen, olen;
+ olen = msglen;
+ POOLMEM *rmsg = get_pool_memory(PM_BSOCK);
+ blen = sizeof_pool_memory(msg) * 2;
+ if (blen >= sizeof_pool_memory(rmsg)) {
+ rmsg = realloc_pool_memory(rmsg, blen);
}
-#endif
- return 1;
- }
- }
-}
-
-/*
- * As above, but returns on interrupt
- */
-int BSOCK::wait_data_intr(int sec, int msec)
-{
- switch (fd_wait_data(m_fd, WAIT_READ, sec, msec)) {
- case 0: /* timeout */
- b_errno = 0;
- return 0;
- case -1:
- b_errno = errno;
- return -1; /* error return */
- default:
- b_errno = 0;
-#ifdef HAVE_TLS
- if (this->tls && !tls_bsock_probe(this)) {
- /* maybe a session key negotiation waked up the socket */
- return 0;
- }
-#endif
- break;
- }
- return 1;
-}
-
-/*
- * This routine closes the current BSOCK.
- * It does not delete the socket packet
- * resources, which are released int
- * bsock->destroy().
- */
-#ifndef SHUT_RDWR
-#define SHUT_RDWR 2
+ rlen = LZ4_decompress_safe(cmsg, rmsg, clen, blen);
+ //Dmsg4(000, "blen=%d clen=%d olen=%d rlen=%d\n", blen, clen, olen, rlen);
+ ASSERT(olen == rlen);
+ ASSERT(memcmp(msg, rmsg, olen) == 0);
+ free_pool_memory(rmsg);
+ /* end Debug code */
#endif
-/*
- * The JCR is canceled, set terminate for chained BSOCKs starting from master
- */
-void BSOCK::cancel()
-{
- master_lock();
- for (BSOCK *next = m_master; next != NULL; next = next->m_next) {
- if (!next->m_closed) {
- next->m_terminated = true;
- next->m_timed_out = true;
+ msg = cmsg;
+ msglen = clen;
+ compressed = true;
}
+ msglen += offset;
+ msg -= offset;
+ cmsg -= offset;
}
- master_unlock();
+ m_CommCompressedBytes += msglen;
+ return compressed;
}
/*
* Note, this routine closes the socket, but leaves the
* bsock memory in place.
* every thread is responsible of closing and destroying its own duped or not
- * duped BSOCK
+ * duped BSOCKCORE
*/
void BSOCK::close()
{
- BSOCK *bsock = this;
-
- if (bsock->is_closed()) {
- return;
- }
- if (!m_duped) {
- clear_locking();
- }
- bsock->set_closed();
- bsock->set_terminated();
- if (!bsock->m_duped) {
- /* Shutdown tls cleanly. */
- if (bsock->tls) {
- tls_bsock_shutdown(bsock);
- free_tls_connection(bsock->tls);
- bsock->tls = NULL;
- }
-
-#ifdef HAVE_WIN32
- if (!bsock->is_timed_out()) {
- win_close_wait(bsock->m_fd); /* Ensure that data is not discarded */
- }
-#else
- if (bsock->is_timed_out()) {
- shutdown(bsock->m_fd, SHUT_RDWR); /* discard any pending I/O */
- }
-#endif
- /* On Windows this discards data if we did not do a close_wait() */
- socketClose(bsock->m_fd); /* normal close */
- }
+ Dmsg0(BSOCK_DEBUG_LVL, "BSOCK::close()\n");
+ BSOCKCORE::close();
return;
}
/*
- * Destroy the socket (i.e. release all resources)
+ * Write nbytes to the network.
+ * It may require several writes.
*/
-void BSOCK::_destroy()
+
+int32_t BSOCK::write_nbytes(char *ptr, int32_t nbytes)
{
- this->close(); /* Ensure that socket is closed */
+ int32_t nwritten;
- if (msg) {
- free_pool_memory(msg);
- msg = NULL;
- } else {
- ASSERT2(1 == 0, "Two calls to destroy socket"); /* double destroy */
- }
- if (cmsg) {
- free_pool_memory(cmsg);
- cmsg = NULL;
- }
- if (errmsg) {
- free_pool_memory(errmsg);
- errmsg = NULL;
- }
- if (m_who) {
- free(m_who);
- m_who = NULL;
- }
- if (m_host) {
- free(m_host);
- m_host = NULL;
- }
- if (src_addr) {
- free(src_addr);
- src_addr = NULL;
+ if (is_spooling()) {
+ nwritten = fwrite(ptr, 1, nbytes, m_spool_fd);
+ if (nwritten != nbytes) {
+ berrno be;
+ b_errno = errno;
+ Qmsg3(jcr(), M_FATAL, 0, _("Attr spool write error. wrote=%d wanted=%d bytes. ERR=%s\n"),
+ nbytes, nwritten, be.bstrerror());
+ Dmsg2(400, "nwritten=%d nbytes=%d.\n", nwritten, nbytes);
+ errno = b_errno;
+ return -1;
+ }
+ return nbytes;
}
- free(this);
+
+ /* reuse base code */
+ return BSOCKCORE::write_nbytes(ptr, nbytes);
}
/*
- * Destroy the socket (i.e. release all resources)
- * including duped sockets.
- * should not be called from duped BSOCK
+ * This is a non-class BSOCK "constructor" because we want to
+ * call the Bacula smartalloc routines instead of new.
*/
-void BSOCK::destroy()
+BSOCK *new_bsock()
{
- ASSERTD(reinterpret_cast<uintptr_t>(m_next) != 0xaaaaaaaaaaaaaaaa, "BSOCK::destroy() already called\n")
- ASSERTD(this == m_master, "BSOCK::destroy() called by a non master BSOCK\n")
- ASSERTD(!m_duped, "BSOCK::destroy() called by a duped BSOCK\n")
- /* I'm the master I must destroy() all the duped BSOCKs */
- master_lock();
- BSOCK *ahead;
- for (BSOCK *next = m_next; next != NULL; next = ahead) {
- ahead = next->m_next;
- next->_destroy();
- }
- master_unlock();
- _destroy();
+ BSOCK *bsock = New(BSOCK);
+ return bsock;
}
-/* Commands sent to Director */
-static char hello[] = "Hello %s calling\n";
-/* Response from Director */
-static char OKhello[] = "1000 OK:";
-
-/*
- * Authenticate Director
+/* Initialize internal socket structure.
+ * This probably should be done in bsock.c
*/
-bool BSOCK::authenticate_director(const char *name, const char *password,
- TLS_CONTEXT *tls_ctx, char *errmsg, int errmsg_len)
-{
- int tls_local_need = BNET_TLS_NONE;
- int tls_remote_need = BNET_TLS_NONE;
- int compatible = true;
- char bashed_name[MAX_NAME_LENGTH];
- BSOCK *dir = this; /* for readability */
-
- *errmsg = 0;
- /*
- * Send my name to the Director then do authentication
- */
-
- /* Timeout Hello after 15 secs */
- dir->start_timer(15);
- dir->fsend(hello, bashed_name);
-
- if (get_tls_enable(tls_ctx)) {
- tls_local_need = get_tls_enable(tls_ctx) ? BNET_TLS_REQUIRED : BNET_TLS_OK;
- }
-
- /* respond to Dir challenge */
- if (!cram_md5_respond(dir, password, &tls_remote_need, &compatible) ||
- /* Now challenge dir */
- !cram_md5_challenge(dir, password, tls_local_need, compatible)) {
- bsnprintf(errmsg, errmsg_len, _("Director authorization error at \"%s:%d\"\n"),
- dir->host(), dir->port());
- goto bail_out;
- }
-
- /* Verify that the remote host is willing to meet our TLS requirements */
- if (tls_remote_need < tls_local_need && tls_local_need != BNET_TLS_OK && tls_remote_need != BNET_TLS_OK) {
- bsnprintf(errmsg, errmsg_len, _("Authorization error:"
- " Remote server at \"%s:%d\" did not advertise required TLS support.\n"),
- dir->host(), dir->port());
- goto bail_out;
- }
-
- /* Verify that we are willing to meet the remote host's requirements */
- if (tls_remote_need > tls_local_need && tls_local_need != BNET_TLS_OK && tls_remote_need != BNET_TLS_OK) {
- bsnprintf(errmsg, errmsg_len, _("Authorization error with Director at \"%s:%d\":"
- " Remote server requires TLS.\n"),
- dir->host(), dir->port());
-
- goto bail_out;
- }
-
- /* Is TLS Enabled? */
- if (have_tls) {
- if (tls_local_need >= BNET_TLS_OK && tls_remote_need >= BNET_TLS_OK) {
- /* Engage TLS! Full Speed Ahead! */
- if (!bnet_tls_client(tls_ctx, dir, NULL)) {
- bsnprintf(errmsg, errmsg_len, _("TLS negotiation failed with Director at \"%s:%d\"\n"),
- dir->host(), dir->port());
- goto bail_out;
- }
- }
- }
-
- Dmsg1(6, ">dird: %s", dir->msg);
- if (dir->recv() <= 0) {
- dir->stop_timer();
- bsnprintf(errmsg, errmsg_len, _("Bad errmsg to Hello command: ERR=%s\n"
- "The Director at \"%s:%d\" may not be running.\n"),
- dir->bstrerror(), dir->host(), dir->port());
- return false;
- }
-
- dir->stop_timer();
- Dmsg1(10, "<dird: %s", dir->msg);
- if (strncmp(dir->msg, OKhello, sizeof(OKhello)-1) != 0) {
- bsnprintf(errmsg, errmsg_len, _("Director at \"%s:%d\" rejected Hello command\n"),
- dir->host(), dir->port());
- return false;
- } else {
- bsnprintf(errmsg, errmsg_len, "%s", dir->msg);
- }
- return true;
-
-bail_out:
- dir->stop_timer();
- bsnprintf(errmsg, errmsg_len, _("Authorization error with Director at \"%s:%d\"\n"
- "Most likely the passwords do not agree.\n"
- "If you are using TLS, there may have been a certificate validation error during the TLS handshake.\n"
- "For help, please see: " MANUAL_AUTH_URL "\n"),
- dir->host(), dir->port());
- return false;
+BSOCK *init_bsock(JCR *jcr, int sockfd, const char *who,
+ const char *host, int port, struct sockaddr *client_addr)
+{
+ Dmsg4(100, "socket=%d who=%s host=%s port=%d\n", sockfd, who, host, port);
+ BSOCK *bsock = New(BSOCK(sockfd));
+ bsock->m_master = bsock; /* don't use set_master() here */
+ bsock->set_who(bstrdup(who));
+ bsock->set_host(bstrdup(host));
+ bsock->set_port(port);
+ bmemzero(&bsock->peer_addr, sizeof(bsock->peer_addr));
+ memcpy(&bsock->client_addr, client_addr, sizeof(bsock->client_addr));
+ bsock->set_jcr(jcr);
+ return bsock;
}
-/* Try to limit the bandwidth of a network connection
- */
-void BSOCK::control_bwlimit(int bytes)
+BSOCK *dup_bsock(BSOCK *osock)
{
- btime_t now, temp;
- if (bytes == 0) {
- return;
- }
-
- now = get_current_btime(); /* microseconds */
- temp = now - m_last_tick; /* microseconds */
-
- m_nb_bytes += bytes;
-
- if (temp < 0 || temp > 10000000) { /* Take care of clock problems (>10s) or back in time */
- m_nb_bytes = bytes;
- m_last_tick = now;
- return;
- }
+ POOLMEM *cmsg;
+ POOLMEM *msg;
+ POOLMEM *errmsg;
- /* Less than 0.1ms since the last call, see the next time */
- if (temp < 100) {
- return;
+ osock->set_locking();
+ BSOCK *bsock = New(BSOCK);
+ // save already allocated variables
+ msg = bsock->msg;
+ cmsg = bsock->cmsg;
+ errmsg = bsock->errmsg;
+ // with this we make virtually the same job as with memcpy()
+ *bsock = *osock;
+ // restore saved variables
+ bsock->msg = msg;
+ bsock->cmsg = cmsg;
+ bsock->errmsg = errmsg;
+ if (osock->who()) {
+ bsock->set_who(bstrdup(osock->who()));
}
-
- /* Remove what was authorised to be written in temp us */
- m_nb_bytes -= (int64_t)(temp * ((double)m_bwlimit / 1000000.0));
-
- if (m_nb_bytes < 0) {
- m_nb_bytes = 0;
+ if (osock->host()) {
+ bsock->set_host(bstrdup(osock->host()));
}
-
- /* What exceed should be converted in sleep time */
- int64_t usec_sleep = (int64_t)(m_nb_bytes /((double)m_bwlimit / 1000000.0));
- if (usec_sleep > 100) {
- bmicrosleep(usec_sleep/1000000, usec_sleep%1000000); /* TODO: Check that bmicrosleep slept enough or sleep again */
- m_last_tick = get_current_btime();
- m_nb_bytes = 0;
- } else {
- m_last_tick = now;
+ if (osock->src_addr) {
+ bsock->src_addr = New( IPADDR( *(osock->src_addr)) );
}
+ bsock->set_duped();
+ bsock->set_master(osock);
+ return bsock;
}
-#ifdef HAVE_WIN32
-/*
- * closesocket is supposed to do a graceful disconnect under Window
- * but it doesn't. Comments on http://msdn.microsoft.com/en-us/li
- * confirm this behaviour. DisconnectEx is required instead, but
- * that function needs to be retrieved via WS IOCTL
- */
-static void
-win_close_wait(int fd)
-{
- int ret;
- GUID disconnectex_guid = WSAID_DISCONNECTEX;
- DWORD bytes_returned;
- LPFN_DISCONNECTEX DisconnectEx;
- ret = WSAIoctl(fd, SIO_GET_EXTENSION_FUNCTION_POINTER, &disconnectex_guid, sizeof(disconnectex_guid), &DisconnectEx, sizeof(DisconnectEx), &bytes_returned, NULL, NULL);
- Dmsg1(100, "WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER, WSAID_DISCONNECTEX) ret = %d\n", ret);
- if (!ret) {
- DisconnectEx(fd, NULL, 0, 0);
- }
-}
+#ifndef TEST_PROGRAM
+#define TEST_PROGRAM_A
#endif
+
+void BSOCK::dump()
+{
+#ifdef TEST_PROGRAM
+ char ed1[50];
+ BSOCKCORE::dump();
+ Pmsg1(-1, "BSOCK::dump(): %p\n", this);
+ Pmsg1(-1, "\tm_spool_fd: %p\n", m_spool_fd);
+ Pmsg1(-1, "\tcmsg: %p\n", cmsg);
+ Pmsg1(-1, "\tm_data_end: %s\n", edit_int64(m_data_end, ed1));
+ Pmsg1(-1, "\tm_last_data_end: %s\n", edit_int64(m_last_data_end, ed1));
+ Pmsg1(-1, "\tm_FileIndex: %s\n", edit_int64(m_FileIndex, ed1));
+ Pmsg1(-1, "\tm_lastFileIndex: %s\n", edit_int64(m_lastFileIndex, ed1));
+ Pmsg1(-1, "\tm_spool: %s\n", m_spool?"true":"false");
+ Pmsg1(-1, "\tm_compress: %s\n", m_compress?"true":"false");
+ Pmsg1(-1, "\tm_CommBytes: %s\n", edit_uint64(m_CommBytes, ed1));
+ Pmsg1(-1, "\tm_CommCompressedBytes: %s\n", edit_uint64(m_CommCompressedBytes, ed1));
+#endif
+};
+
+#ifdef TEST_PROGRAM
+
+void terminate(int sig){};
+void free_my_jcr(JCR *jcr){
+ /* TODO: handle full JCR free */
+ free_jcr(jcr);
+};
+
+int main()
+{
+ BSOCK *bs;
+ BSOCK *bsdup;
+ pid_t pid;
+ int rc;
+ char *host = (char*)"localhost";
+ char *name = (char*)"Test";
+ JCR *jcr;
+
+ debug_level = 500;
+ my_name_is(0, NULL, "bsock_test");
+ init_signals(terminate);
+ lmgr_init_thread(); /* initialize the lockmanager stack */
+
+ jcr = new_jcr(sizeof(JCR), NULL);
+ bs = New(BSOCK);
+ Pmsg0(0, "Initialize ...\n");
+ bs->set_jcr(jcr);
+ bs->dump();
+
+ pid = fork();
+ if (0 == pid){
+ Pmsg0(0, "prepare to execute netcat\n");
+ rc = execl("/bin/netcat", "netcat", "-p", "20000", "-l", NULL);
+ Pmsg1(0, "Error executing netcat: %s\n", strerror(rc));
+ exit(1);
+ }
+ Pmsg1(0, "after fork: %d\n", pid);
+ bmicrosleep(1, 0);
+ if (bs->connect(jcr, 1, 10, 0, name, host, NULL, 20000, 0)) {
+ /* connected */
+ Pmsg0(0, "connected ...\n");
+ bs->dump();
+ Pmsg0(0, "duped BSOCK\n");
+ bsdup = dup_bsock(bs);
+ bsdup->dump();
+ Pmsg0(0, "orignal after duped\n");
+ bs->dump();
+ } else {
+ Pmsg1(0, "connection error: %s\n", bs->bstrerror());
+ }
+ kill(pid, SIGTERM);
+ delete(bs);
+ free_my_jcr(jcr);
+ lmgr_cleanup_main();
+ // sm_dump(false);
+};
+#endif /* TEST_PROGRAM */
* Note, the old non-class code is in bnet.c, and the
* new class code associated with this file is in bsock.c
*
+ * The new code inherit common functions from BSOCKCORE class
+ * and implement BSOCK/Bacula specific protocols and data flow.
+ *
* Kern Sibbald, May MM
*
+ * Major refactoring of BSOCK code written by:
+ *
+ * Radosław Korzeniewski, MMXVIII
+ * radoslaw@korzeniewski.net, radekk@inteos.pl
+ * Inteos Sp. z o.o. http://www.inteos.pl/
+ *
* Zero msglen from other end indicates soft eof (usually
* end of some binary data stream, but not end of conversation).
*
#ifndef __BSOCK_H_
#define __BSOCK_H_
-struct btimer_t; /* forward reference */
-class BSOCK;
-/* Effectively disable the bsock time out */
#define BSOCK_TIMEOUT 3600 * 24 * 200; /* default 200 days */
-btimer_t *start_bsock_timer(BSOCK *bs, uint32_t wait);
-void stop_bsock_timer(btimer_t *wid);
-
-class BSOCKCallback {
-public:
- BSOCKCallback();
- virtual ~BSOCKCallback();
- virtual bool bsock_send_cb() = 0;
-};
-class BSOCK {
-/*
- * Note, keep this public part before the private otherwise
- * bat breaks on some systems such as RedHat.
- */
+class BSOCK: public BSOCKCORE {
public:
- uint64_t read_seqno; /* read sequence number */
- POOLMEM *msg; /* message pool buffer */
- POOLMEM *cmsg; /* Compress buffer */
- POOLMEM *errmsg; /* edited error message */
- RES *res; /* Resource to which we are connected */
FILE *m_spool_fd; /* spooling file */
- TLS_CONNECTION *tls; /* associated tls connection */
- IPADDR *src_addr; /* IP address to source connections from */
- uint32_t in_msg_no; /* input message number */
- uint32_t out_msg_no; /* output message number */
- uint32_t *pout_msg_no; /* pointer to the above */
- int32_t msglen; /* message length */
- volatile time_t timer_start; /* time started read/write */
- volatile time_t timeout; /* timeout BSOCK after this interval */
- int m_fd; /* socket file descriptor */
- int b_errno; /* bsock errno */
- int m_blocking; /* blocking state (0 = nonblocking, 1 = blocking) */
- volatile int errors; /* incremented for each error on socket */
- volatile bool m_suppress_error_msgs; /* set to suppress error messages */
-
- struct sockaddr client_addr; /* client's IP address */
- struct sockaddr_in peer_addr; /* peer's IP address */
-
- /* when "installed", send_hook_cb->bsock_send_cb() is called before
- * any ::send().
- */
- BSOCKCallback *send_hook_cb;
+ POOLMEM *cmsg; /* Compress buffer */
private:
- /* m_master is used by "duped" BSOCK to access some attributes of the "parent"
- * thread to have an up2date status (for example when the job is canceled,
- * the "parent" BSOCK is "terminated", but the duped BSOCK is unchanged)
- * In the future more attributes and method could use the "m_master"
- * indirection.
- * master->m_rmutex could replace pm_rmutex, idem for the (w)rite" mutex
- * "m_master->error" should be incremented instead of "error", but
- * this require a lock.
- *
- * USAGE: the parent thread MUST be sure that the child thread have quit
- * before to free the "parent" BSOCK.
- */
- BSOCK *m_next; /* next BSOCK if duped (not actually used) */
- JCR *m_jcr; /* jcr or NULL for error msgs */
- pthread_mutex_t m_rmutex; /* for read locking if use_locking set */
- pthread_mutex_t m_wmutex; /* for write locking if use_locking set */
- mutable pthread_mutex_t m_mmutex; /* when accessing the master/next chain */
- pthread_mutex_t *pm_rmutex; /* Pointer to the read mutex */
- pthread_mutex_t *pm_wmutex; /* Pointer to the write mutex */
- char *m_who; /* Name of daemon to which we are talking */
- char *m_host; /* Host name/IP */
- int m_port; /* desired port */
- btimer_t *m_tid; /* timer id */
boffset_t m_data_end; /* offset of data written */
- boffset_t m_last_data_end; /* offset of last valid data written */
+ boffset_t m_last_data_end; /* offset of last valid data written */
int32_t m_FileIndex; /* attr spool FI */
int32_t m_lastFileIndex; /* last valid attr spool FI */
- uint32_t m_flags; /* Special flags */
- volatile bool m_timed_out: 1; /* timed out in read/write */
- volatile bool m_terminated: 1; /* set when BNET_TERMINATE arrives */
- bool m_closed: 1; /* set when socket is closed */
- bool m_duped: 1; /* set if duped BSOCK */
bool m_spool: 1; /* set for spooling */
bool m_compress: 1; /* set to use comm line compression */
- bool m_use_locking; /* set to use locking (out of a bitfield */
- /* to avoid race conditions) */
-
- int64_t m_bwlimit; /* set to limit bandwidth */
- int64_t m_nb_bytes; /* bytes sent/recv since the last tick */
- btime_t m_last_tick; /* last tick used by bwlimit */
uint64_t m_CommBytes; /* Bytes sent */
uint64_t m_CommCompressedBytes; /* Compressed bytes sent */
- void fin_init(JCR * jcr, int sockfd, const char *who, const char *host, int port,
- struct sockaddr *lclient_addr);
bool open(JCR *jcr, const char *name, char *host, char *service,
int port, utime_t heart_beat, int *fatal);
- void master_lock() const { if (m_use_locking) pP((&m_mmutex)); };
- void master_unlock() const { if (m_use_locking) pV((&m_mmutex)); };
+ void init();
+ void _destroy();
+ int32_t write_nbytes(char *ptr, int32_t nbytes);
public:
- BSOCK *m_master; /* "this" or the "parent" BSOCK if duped */
- /* methods -- in bsock.c */
- void init();
- void free_tls();
- bool connect(JCR * jcr, int retry_interval, utime_t max_retry_time,
- utime_t heart_beat, const char *name, char *host,
- char *service, int port, int verbose);
+ BSOCK();
+ BSOCK(int sockfd);
+ ~BSOCK();
int32_t recv();
bool send() { return send(0); };
bool send(int flags);
- bool fsend(const char*, ...);
bool signal(int signal);
- void close(); /* close connection and destroy packet */
- void _destroy(); /* called by destroy() */
- void destroy(); /* destroy socket packet */
- bool comm_compress(); /* in bsock.c */
- const char *bstrerror(); /* last error on socket */
- int get_peer(char *buf, socklen_t buflen);
+ void close(); /* close connection and destroy packet */
+ bool comm_compress(); /* in bsock.c */
bool despool(void update_attr_spool_size(ssize_t size), ssize_t tsize);
- bool set_buffer_size(uint32_t size, int rw);
- int set_nonblocking();
- int set_blocking();
- void restore_blocking(int flags);
- void set_killable(bool killable);
- int wait_data(int sec, int msec=0);
- int wait_data_intr(int sec, int msec=0);
bool authenticate_director(const char *name, const char *password,
TLS_CONTEXT *tls_ctx, char *response, int response_len);
- bool set_locking(); /* in bsock.c */
- void clear_locking(); /* in bsock.c */
- void set_source_address(dlist *src_addr_list);
- void control_bwlimit(int bytes); /* in bsock.c */
/* Inline functions */
- void suppress_error_messages(bool flag) { m_suppress_error_msgs = flag; };
- void set_jcr(JCR *jcr) { m_jcr = jcr; };
- void set_who(char *who) { m_who = who; };
- void set_host(char *host) { m_host = host; };
- void set_port(int port) { m_port = port; };
- char *who() const { return m_who; };
- char *host() const { return m_host; };
- int port() const { return m_port; };
- JCR *jcr() const { return m_jcr; };
- JCR *get_jcr() const { return m_jcr; };
bool is_spooling() const { return m_spool; };
- bool is_duped() const { return m_duped; };
- bool is_terminated() const { return m_terminated; };
- bool is_timed_out() const { return m_timed_out; };
- bool is_closed() const { return m_closed; };
- bool is_open() const { return !m_closed; };
- bool is_stop() const { return errors || is_terminated() || is_closed(); };
- bool is_error() { errno = b_errno; return errors; };
bool can_compress() const { return m_compress; };
void set_data_end(int32_t FileIndex) {
if (m_spool && FileIndex > m_FileIndex) {
int32_t get_lastFileIndex() { return m_lastFileIndex; };
uint32_t CommBytes() { return m_CommBytes; };
uint32_t CommCompressedBytes() { return m_CommCompressedBytes; };
- void set_bwlimit(int64_t maxspeed) { m_bwlimit = maxspeed; };
- bool use_bwlimit() { return m_bwlimit > 0;};
void set_spooling() { m_spool = true; };
void clear_spooling() { m_spool = false; };
void set_compress() { m_compress = true; };
void clear_compress() { m_compress = false; };
- void set_duped() { m_duped = true; };
- void set_master(BSOCK *master) { master_lock(); m_master = master; m_next = master->m_next; master->m_next = this; master_unlock(); };
- void set_timed_out() { m_timed_out = true; };
- void clear_timed_out() { m_timed_out = false; };
- void set_terminated() { m_terminated = true; };
- void set_closed() { m_closed = true; };
- void start_timer(int sec) { m_tid = start_bsock_timer(this, sec); };
- void stop_timer() { stop_bsock_timer(m_tid); };
- void swap_msgs();
- void install_send_hook_cb(BSOCKCallback *obj) { send_hook_cb=obj; };
- void uninstall_send_hook_cb() { send_hook_cb=NULL; };
- void cancel(); /* call it when JCR is canceled */
-
+ void dump();
};
/*
* for flags, and the low 16 bits are for other info such as
* compressed data offset (BNET_OFFSET)
*/
-#define BNET_IS_CMD (1<<28) /* set for command data */
-#define BNET_OFFSET (1<<27) /* Data compression offset specified */
-#define BNET_NOCOMPRESS (1<<25) /* Disable compression */
-#define BNET_DATACOMPRESSED (1<<24) /* Data compression */
+#define BNET_IS_CMD (1<<28) /* set for command data */
+#define BNET_OFFSET (1<<27) /* Data compression offset specified */
+#define BNET_NOCOMPRESS (1<<25) /* Disable compression */
+#define BNET_DATACOMPRESSED (1<<24) /* Data compression */
#define BNET_SETBUF_READ 1 /* Arg for bnet_set_buffer_size */
#define BNET_SETBUF_WRITE 2 /* Arg for bnet_set_buffer_size */
BNET_CMD_STP_FLOWCTRL = 7 /* backup FD->SD SD must stop sending flowcontrol information */
};
-const char *bnet_cmd_to_name(int val);
-
/*
* TLS enabling values. Value is important for comparison, ie:
* if (tls_remote_need < BNET_TLS_REQUIRED) { ... }
BNET_TLS_REQUIRED = 2 /* TLS is required */
};
-int32_t read_nbytes(BSOCK * bsock, char *ptr, int32_t nbytes);
-int32_t write_nbytes(BSOCK * bsock, char *ptr, int32_t nbytes);
+const char *bnet_cmd_to_name(int val);
BSOCK *new_bsock();
/*
* Completely release the socket packet, and NULL the pointer
*/
-#define free_bsock(a) do{if(a){(a)->close(); (a)->destroy(); (a)=NULL;}} while(0)
+#define free_bsock(a) free_bsockcore(a)
/*
* Does the socket exist and is it open?
*/
-#define is_bsock_open(a) ((a) && (a)->is_open())
+#define is_bsock_open(a) is_bsockcore_open(a)
#endif /* __BSOCK_H_ */
--- /dev/null
+/*
+ Bacula(R) - The Network Backup Solution
+
+ Copyright (C) 2000-2017 Kern Sibbald
+
+ The original author of Bacula is Kern Sibbald, with contributions
+ from many others, a complete list can be found in the file AUTHORS.
+
+ You may use this file and others of this release according to the
+ license defined in the LICENSE file, which includes the Affero General
+ Public License, v3.0 ("AGPLv3") and some additional permissions and
+ terms pursuant to its AGPLv3 Section 7.
+
+ This notice must be preserved when any source code is
+ conveyed and/or propagated.
+
+ Bacula(R) is a registered trademark of Kern Sibbald.
+*/
+/*
+ * Bacula Core Sock Class definition
+ *
+ * Kern Sibbald, May MM
+ *
+ * Major refactoring of BSOCK code written by:
+ *
+ * Radosław Korzeniewski, MMXVIII
+ * radoslaw@korzeniewski.net, radekk@inteos.pl
+ * Inteos Sp. z o.o. http://www.inteos.pl/
+ *
+ * This is a common class for socket network communication derived from
+ * BSOCK class. It acts as a base class for non-Bacula network communication
+ * and as a base class for standard BSOCK implementation. Basically the BSOCK
+ * class did not changed its functionality for any Bacula specific part.
+ * Now you can use a BSOCKCLASS for other network communication.
+ */
+
+#include "bacula.h"
+#include "jcr.h"
+#include <netdb.h>
+#include <netinet/tcp.h>
+
+#define BSOCKCORE_DEBUG_LVL 900
+
+#if !defined(ENODATA) /* not defined on BSD systems */
+#define ENODATA EPIPE
+#endif
+
+#if !defined(SOL_TCP) /* Not defined on some systems */
+#define SOL_TCP IPPROTO_TCP
+#endif
+
+#ifdef HAVE_WIN32
+#include <mswsock.h>
+static void win_close_wait(int fd);
+#ifndef SOCK_CLOEXEC
+#define SOCK_CLOEXEC 0
+#endif
+#endif
+
+/*
+ * make a nice dump of a message
+ */
+void dump_bsock_msg(int sock, uint32_t msgno, const char *what, uint32_t rc, int32_t pktsize, uint32_t flags,
+ POOLMEM *msg, int32_t msglen)
+{
+ char buf[54];
+ bool is_ascii;
+ int dbglvl = DT_ASX;
+
+ if (msglen<0) {
+ Dmsg4(dbglvl, "%s %d:%d SIGNAL=%s\n", what, sock, msgno, bnet_sig_to_ascii(msglen));
+ // data
+ smartdump(msg, msglen, buf, sizeof(buf)-9, &is_ascii);
+ if (is_ascii) {
+ Dmsg5(dbglvl, "%s %d:%d len=%d \"%s\"\n", what, sock, msgno, msglen, buf);
+ } else {
+ Dmsg5(dbglvl, "%s %d:%d len=%d %s\n", what, sock, msgno, msglen, buf);
+ }
+ }
+}
+
+BSOCKCallback::BSOCKCallback()
+{
+}
+
+BSOCKCallback::~BSOCKCallback()
+{
+}
+
+/*
+ * Default constructor does class initialization.
+ */
+BSOCKCORE::BSOCKCORE() :
+ msg(NULL),
+ errmsg(NULL),
+ res(NULL),
+ tls(NULL),
+ src_addr(NULL),
+ read_seqno(0),
+ in_msg_no(0),
+ out_msg_no(0),
+ pout_msg_no(NULL),
+ msglen(0),
+ timer_start(0),
+ timeout(0),
+ m_fd(-1),
+ b_errno(0),
+ m_blocking(0),
+ errors(0),
+ m_suppress_error_msgs(false),
+ send_hook_cb(NULL),
+ m_next(NULL),
+ m_jcr(NULL),
+ pm_rmutex(NULL),
+ pm_wmutex(NULL),
+ m_who(NULL),
+ m_host(NULL),
+ m_port(0),
+ m_tid(NULL),
+ m_flags(0),
+ m_timed_out(false),
+ m_terminated(false),
+ m_closed(false),
+ m_duped(false),
+ m_use_locking(false),
+ m_bwlimit(0),
+ m_nb_bytes(0),
+ m_last_tick(0)
+{
+ pthread_mutex_init(&m_rmutex, NULL);
+ pthread_mutex_init(&m_wmutex, NULL);
+ pthread_mutex_init(&m_mmutex, NULL);
+ bmemzero(&peer_addr, sizeof(peer_addr));
+ bmemzero(&client_addr, sizeof(client_addr));
+ init();
+};
+
+/*
+ * Default destructor releases resources.
+ */
+BSOCKCORE::~BSOCKCORE()
+{
+ Dmsg0(BSOCKCORE_DEBUG_LVL, "BSOCKCORE::~BSOCKCORE()\n");
+ _destroy();
+};
+
+/*
+ * Initialization method.
+ */
+void BSOCKCORE::init()
+{
+ m_master = this;
+ set_closed();
+ set_terminated();
+ m_blocking = 1;
+ msg = get_pool_memory(PM_BSOCK);
+ errmsg = get_pool_memory(PM_MESSAGE);
+ timeout = BSOCKCORE_TIMEOUT;
+ pout_msg_no = &out_msg_no;
+}
+
+void BSOCKCORE::free_tls()
+{
+ free_tls_connection(this->tls);
+ this->tls = NULL;
+}
+
+/*
+ * Try to connect to host for max_retry_time at retry_time intervals.
+ * Note, you must have called the constructor prior to calling
+ * this routine.
+ */
+bool BSOCKCORE::connect(JCR * jcr, int retry_interval, utime_t max_retry_time,
+ utime_t heart_beat,
+ const char *name, char *host, char *service, int port,
+ int verbose)
+{
+ bool ok = false;
+ int i;
+ int fatal = 0;
+ time_t begin_time = time(NULL);
+ time_t now;
+ btimer_t *tid = NULL;
+
+ /* Try to trap out of OS call when time expires */
+ if (max_retry_time) {
+ tid = start_thread_timer(jcr, pthread_self(), (uint32_t)max_retry_time);
+ }
+
+ for (i = 0; !open(jcr, name, host, service, port, heart_beat, &fatal);
+ i -= retry_interval) {
+ berrno be;
+ if (fatal || (jcr && job_canceled(jcr))) {
+ goto bail_out;
+ }
+ Dmsg4(50, "Unable to connect to %s on %s:%d. ERR=%s\n",
+ name, host, port, be.bstrerror());
+ if (i < 0) {
+ i = 60 * 5; /* complain again in 5 minutes */
+ if (verbose)
+ Qmsg4(jcr, M_WARNING, 0, _(
+ "Could not connect to %s on %s:%d. ERR=%s\n"
+ "Retrying ...\n"), name, host, port, be.bstrerror());
+ }
+ bmicrosleep(retry_interval, 0);
+ now = time(NULL);
+ if (begin_time + max_retry_time <= now) {
+ Qmsg4(jcr, M_FATAL, 0, _("Unable to connect to %s on %s:%d. ERR=%s\n"),
+ name, host, port, be.bstrerror());
+ goto bail_out;
+ }
+ }
+ ok = true;
+
+bail_out:
+ if (tid) {
+ stop_thread_timer(tid);
+ }
+ return ok;
+}
+
+/*
+ * Finish initialization of the packet structure.
+ */
+void BSOCKCORE::fin_init(JCR * jcr, int sockfd, const char *who, const char *host, int port,
+ struct sockaddr *lclient_addr)
+{
+ Dmsg3(100, "who=%s host=%s port=%d\n", who, host, port);
+ m_fd = sockfd;
+ if (m_who) {
+ free(m_who);
+ }
+ if (m_host) {
+ free(m_host);
+ }
+ set_who(bstrdup(who));
+ set_host(bstrdup(host));
+ set_port(port);
+ memcpy(&client_addr, lclient_addr, sizeof(client_addr));
+ set_jcr(jcr);
+}
+
+/*
+ * Copy the address from the configuration dlist that gets passed in
+ */
+void BSOCKCORE::set_source_address(dlist *src_addr_list)
+{
+ IPADDR *addr = NULL;
+
+ // delete the object we already have, if it's allocated
+ if (src_addr) {
+ /* TODO: Why free() instead of delete as src_addr is a IPADDR class */
+ free( src_addr);
+ src_addr = NULL;
+ }
+
+ if (src_addr_list) {
+ addr = (IPADDR*) src_addr_list->first();
+ src_addr = New( IPADDR(*addr));
+ }
+}
+
+/*
+ * Open a TCP connection to the server
+ * Returns true when connection was successful or false otherwise.
+ */
+bool BSOCKCORE::open(JCR *jcr, const char *name, char *host, char *service,
+ int port, utime_t heart_beat, int *fatal)
+{
+ int sockfd = -1;
+ dlist *addr_list;
+ IPADDR *ipaddr;
+ bool connected = false;
+ int turnon = 1;
+ const char *errstr;
+ int save_errno = 0;
+
+ /*
+ * Fill in the structure serv_addr with the address of
+ * the server that we want to connect with.
+ */
+ if ((addr_list = bnet_host2ipaddrs(host, 0, &errstr)) == NULL) {
+ /* Note errstr is not malloc'ed */
+ Qmsg2(jcr, M_ERROR, 0, _("gethostbyname() for host \"%s\" failed: ERR=%s\n"),
+ host, errstr);
+ Dmsg2(100, "bnet_host2ipaddrs() for host %s failed: ERR=%s\n",
+ host, errstr);
+ *fatal = 1;
+ return false;
+ }
+
+ remove_duplicate_addresses(addr_list);
+ foreach_dlist(ipaddr, addr_list) {
+ ipaddr->set_port_net(htons(port));
+ char allbuf[256 * 10];
+ char curbuf[256];
+ Dmsg2(100, "Current %sAll %s\n",
+ ipaddr->build_address_str(curbuf, sizeof(curbuf)),
+ build_addresses_str(addr_list, allbuf, sizeof(allbuf)));
+ /* Open a TCP socket */
+ if ((sockfd = socket(ipaddr->get_family(), SOCK_STREAM|SOCK_CLOEXEC, 0)) < 0) {
+ berrno be;
+ save_errno = errno;
+ switch (errno) {
+#ifdef EAFNOSUPPORT
+ case EAFNOSUPPORT:
+ /*
+ * The name lookup of the host returned an address in a protocol family
+ * we don't support. Suppress the error and try the next address.
+ */
+ break;
+#endif
+#ifdef EPROTONOSUPPORT
+ /* See above comments */
+ case EPROTONOSUPPORT:
+ break;
+#endif
+#ifdef EPROTOTYPE
+ /* See above comments */
+ case EPROTOTYPE:
+ break;
+#endif
+ default:
+ *fatal = 1;
+ Qmsg3(jcr, M_ERROR, 0, _("Socket open error. proto=%d port=%d. ERR=%s\n"),
+ ipaddr->get_family(), ipaddr->get_port_host_order(), be.bstrerror());
+ Pmsg3(300, _("Socket open error. proto=%d port=%d. ERR=%s\n"),
+ ipaddr->get_family(), ipaddr->get_port_host_order(), be.bstrerror());
+ break;
+ }
+ continue;
+ }
+
+ /* Bind to the source address if it is set */
+ if (src_addr) {
+ if (bind(sockfd, src_addr->get_sockaddr(), src_addr->get_sockaddr_len()) < 0) {
+ berrno be;
+ save_errno = errno;
+ *fatal = 1;
+ Qmsg2(jcr, M_ERROR, 0, _("Source address bind error. proto=%d. ERR=%s\n"),
+ src_addr->get_family(), be.bstrerror() );
+ Pmsg2(000, _("Source address bind error. proto=%d. ERR=%s\n"),
+ src_addr->get_family(), be.bstrerror() );
+ if (sockfd >= 0) socketClose(sockfd);
+ continue;
+ }
+ }
+
+ /*
+ * Keep socket from timing out from inactivity
+ */
+ if (setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE, (sockopt_val_t)&turnon, sizeof(turnon)) < 0) {
+ berrno be;
+ Qmsg1(jcr, M_WARNING, 0, _("Cannot set SO_KEEPALIVE on socket: %s\n"),
+ be.bstrerror());
+ }
+#if defined(TCP_KEEPIDLE)
+ if (heart_beat) {
+ int opt = heart_beat;
+ if (setsockopt(sockfd, SOL_TCP, TCP_KEEPIDLE, (sockopt_val_t)&opt, sizeof(opt)) < 0) {
+ berrno be;
+ Qmsg1(jcr, M_WARNING, 0, _("Cannot set TCP_KEEPIDLE on socket: %s\n"),
+ be.bstrerror());
+ }
+ }
+#endif
+
+ /* connect to server */
+ if (::connect(sockfd, ipaddr->get_sockaddr(), ipaddr->get_sockaddr_len()) < 0) {
+ save_errno = errno;
+ if (sockfd >= 0) socketClose(sockfd);
+ continue;
+ }
+ *fatal = 0;
+ connected = true;
+ break;
+ }
+
+ if (!connected) {
+ berrno be;
+ free_addresses(addr_list);
+ errno = save_errno | b_errno_win32;
+ Dmsg4(50, "Could not connect to server %s %s:%d. ERR=%s\n",
+ name, host, port, be.bstrerror());
+ return false;
+ }
+ /*
+ * Keep socket from timing out from inactivity
+ * Do this a second time out of paranoia
+ */
+ if (setsockopt(sockfd, SOL_SOCKET, SO_KEEPALIVE, (sockopt_val_t)&turnon, sizeof(turnon)) < 0) {
+ berrno be;
+ Qmsg1(jcr, M_WARNING, 0, _("Cannot set SO_KEEPALIVE on socket: %s\n"),
+ be.bstrerror());
+ }
+ fin_init(jcr, sockfd, name, host, port, ipaddr->get_sockaddr());
+ free_addresses(addr_list);
+
+ /* Clean the packet a bit */
+ m_closed = false;
+ m_duped = false;
+ // Moved to BSOCK m_spool = false;
+ m_use_locking = false;
+ m_timed_out = false;
+ m_terminated = false;
+ m_suppress_error_msgs = false;
+ errors = 0;
+ m_blocking = 0;
+
+ Dmsg3(50, "OK connected to server %s %s:%d.\n",
+ name, host, port);
+
+ return true;
+}
+
+/*
+ * Force read/write to use locking
+ */
+bool BSOCKCORE::set_locking()
+{
+ int stat;
+ if (m_use_locking) {
+ return true; /* already set */
+ }
+ pm_rmutex = &m_rmutex;
+ pm_wmutex = &m_wmutex;
+ if ((stat = pthread_mutex_init(pm_rmutex, NULL)) != 0) {
+ berrno be;
+ Qmsg(m_jcr, M_FATAL, 0, _("Could not init bsockcore read mutex. ERR=%s\n"),
+ be.bstrerror(stat));
+ return false;
+ }
+ if ((stat = pthread_mutex_init(pm_wmutex, NULL)) != 0) {
+ berrno be;
+ Qmsg(m_jcr, M_FATAL, 0, _("Could not init bsockcore write mutex. ERR=%s\n"),
+ be.bstrerror(stat));
+ return false;
+ }
+ if ((stat = pthread_mutex_init(&m_mmutex, NULL)) != 0) {
+ berrno be;
+ Qmsg(m_jcr, M_FATAL, 0, _("Could not init bsockcore attribute mutex. ERR=%s\n"),
+ be.bstrerror(stat));
+ return false;
+ }
+ m_use_locking = true;
+ return true;
+}
+
+void BSOCKCORE::clear_locking()
+{
+ if (!m_use_locking || m_duped) {
+ return;
+ }
+ m_use_locking = false;
+ pthread_mutex_destroy(pm_rmutex);
+ pthread_mutex_destroy(pm_wmutex);
+ pthread_mutex_destroy(&m_mmutex);
+ pm_rmutex = NULL;
+ pm_wmutex = NULL;
+ return;
+}
+
+/*
+ * Send a message over the network. Everything is sent in one write request.
+ *
+ * Returns: false on failure
+ * true on success
+ */
+bool BSOCKCORE::send()
+{
+ int32_t rc;
+ bool ok = true;
+ bool locked = false;
+
+ if (is_closed()) {
+ if (!m_suppress_error_msgs) {
+ Qmsg0(m_jcr, M_ERROR, 0, _("Socket is closed\n"));
+ }
+ return false;
+ }
+ if (errors) {
+ if (!m_suppress_error_msgs) {
+ Qmsg4(m_jcr, M_ERROR, 0, _("Socket has errors=%d on call to %s:%s:%d\n"),
+ errors, m_who, m_host, m_port);
+ }
+ return false;
+ }
+ if (is_terminated()) {
+ if (!m_suppress_error_msgs) {
+ Qmsg4(m_jcr, M_ERROR, 0, _("BSOCKCORE send while terminated=%d on call to %s:%s:%d\n"),
+ is_terminated(), m_who, m_host, m_port);
+ }
+ return false;
+ }
+
+ if (msglen > 4000000) {
+ if (!m_suppress_error_msgs) {
+ Qmsg4(m_jcr, M_ERROR, 0,
+ _("Socket has insane msglen=%d on call to %s:%s:%d\n"),
+ msglen, m_who, m_host, m_port);
+ }
+ return false;
+ }
+
+ if (send_hook_cb) {
+ if (!send_hook_cb->bsock_send_cb()) {
+ Dmsg3(1, "Flowcontrol failure on %s:%s:%d\n", m_who, m_host, m_port);
+ Qmsg3(m_jcr, M_ERROR, 0, _("Flowcontrol failure on %s:%s:%d\n"), m_who, m_host, m_port);
+ return false;
+ }
+ }
+ if (m_use_locking) {
+ pP(pm_wmutex);
+ locked = true;
+ }
+
+ (*pout_msg_no)++; /* increment message number */
+
+ /* send data packet */
+ timer_start = watchdog_time; /* start timer */
+ clear_timed_out();
+ /* Full I/O done in one write */
+ rc = write_nbytes(msg, msglen);
+ if (chk_dbglvl(DT_NETWORK|1900)) dump_bsock_msg(m_fd, *pout_msg_no, "SEND", rc, msglen, m_flags, msg, msglen);
+ timer_start = 0; /* clear timer */
+ if (rc != msglen) {
+ errors++;
+ if (errno == 0) {
+ b_errno = EIO;
+ } else {
+ b_errno = errno;
+ }
+ if (rc < 0) {
+ if (!m_suppress_error_msgs) {
+ Qmsg5(m_jcr, M_ERROR, 0,
+ _("Write error sending %d bytes to %s:%s:%d: ERR=%s\n"),
+ msglen, m_who,
+ m_host, m_port, this->bstrerror());
+ }
+ } else {
+ Qmsg5(m_jcr, M_ERROR, 0,
+ _("Wrote %d bytes to %s:%s:%d, but only %d accepted.\n"),
+ msglen, m_who, m_host, m_port, rc);
+ }
+ ok = false;
+ }
+// Dmsg4(000, "cmpr=%d ext=%d cmd=%d m_flags=0x%x\n", msglen&BNET_COMPRESSED?1:0,
+// msglen&BNET_HDR_EXTEND?1:0, msglen&BNET_CMD_BIT?1:0, m_flags);
+ if (locked) pV(pm_wmutex);
+ return ok;
+}
+
+/*
+ * Format and send a message
+ * Returns: false on error
+ * true on success
+ */
+bool BSOCKCORE::fsend(const char *fmt, ...)
+{
+ va_list arg_ptr;
+ int maxlen;
+
+ if (is_null(this)) {
+ return false; /* do not seg fault */
+ }
+ if (errors || is_terminated() || is_closed()) {
+ return false;
+ }
+ /* This probably won't work, but we vsnprintf, then if we
+ * get a negative length or a length greater than our buffer
+ * (depending on which library is used), the printf was truncated, so
+ * get a bigger buffer and try again.
+ */
+ for (;;) {
+ maxlen = sizeof_pool_memory(msg) - 1;
+ va_start(arg_ptr, fmt);
+ msglen = bvsnprintf(msg, maxlen, fmt, arg_ptr);
+ va_end(arg_ptr);
+ if (msglen >= 0 && msglen < (maxlen - 5)) {
+ break;
+ }
+ msg = realloc_pool_memory(msg, maxlen + maxlen / 2);
+ }
+ return send();
+}
+
+/*
+ * Receive a data from the other end.
+ * The number of expected bytes in len.
+ * Returns number of bytes read (may return zero), the msglen is set accordingly.
+ * Returns -1 on error so msglen will be zero.
+ */
+int32_t BSOCKCORE::recv(int len)
+{
+ /* The method has to be redesigned from scratch */
+ int32_t nbytes;
+ bool locked = false;
+
+ msglen = nbytes = 0;
+ msg[msglen] = 0;
+ if (errors || is_terminated() || is_closed()) {
+ /* error, cannot receive */
+ return -1;
+ }
+
+ if (len > 0) {
+ /* do read only when len > 0 */
+ if (m_use_locking) {
+ pP(pm_rmutex);
+ locked = true;
+ }
+ read_seqno++; /* bump sequence number */
+ timer_start = watchdog_time; /* set start wait time */
+ clear_timed_out();
+ /* Make sure the buffer is big enough + one byte for EOS */
+ if (len >= (int32_t) sizeof_pool_memory(msg)) {
+ msg = realloc_pool_memory(msg, len + 100);
+ }
+ timer_start = watchdog_time; /* set start wait time */
+ clear_timed_out();
+ if ((nbytes = read_nbytes(msg, len)) <= 0) {
+ timer_start = 0; /* clear timer */
+ /* probably pipe broken because client died */
+ if (errno == 0) {
+ b_errno = ENODATA;
+ } else {
+ b_errno = errno;
+ }
+ nbytes = -1;
+ errors++;
+ msglen = 0; /* assume hard EOF received */
+ Qmsg4(m_jcr, M_ERROR, 0, _("Read error from %s:%s:%d: ERR=%s\n"),
+ m_who, m_host, m_port, this->bstrerror());
+ goto bailout;
+ }
+ timer_start = 0; /* clear timer */
+ in_msg_no++;
+ msglen = nbytes;
+ /*
+ * always add a zero by to properly terminate any
+ * string that was send to us. Note, we ensured above that the
+ * buffer is at least one byte longer than the message length.
+ */
+ msg[nbytes] = 0; /* terminate in case it is a string */
+ /*
+ * The following uses *lots* of resources so turn it on only for
+ * serious debugging.
+ */
+ Dsm_check(300);
+ }
+
+bailout:
+ if ((chk_dbglvl(DT_NETWORK|1900))) dump_bsock_msg(m_fd, read_seqno, "GRECV", nbytes, len, m_flags, msg, msglen);
+
+ if (locked) pV(pm_rmutex);
+ return nbytes; /* return actual length of message or -1 */
+}
+
+/*
+ * Return the string for the error that occurred
+ * on the socket. Only the first error is retained.
+ */
+const char *BSOCKCORE::bstrerror()
+{
+ berrno be;
+ if (errmsg == NULL) {
+ errmsg = get_pool_memory(PM_MESSAGE);
+ }
+ if (b_errno == 0) {
+ pm_strcpy(errmsg, "I/O Error");
+ } else {
+ pm_strcpy(errmsg, be.bstrerror(b_errno));
+ }
+ return errmsg;
+}
+
+int BSOCKCORE::get_peer(char *buf, socklen_t buflen)
+{
+#if !defined(HAVE_WIN32)
+ if (peer_addr.sin_family == 0) {
+ socklen_t salen = sizeof(peer_addr);
+ int rval = (getpeername)(m_fd, (struct sockaddr *)&peer_addr, &salen);
+ if (rval < 0) return rval;
+ }
+ if (!inet_ntop(peer_addr.sin_family, &peer_addr.sin_addr, buf, buflen))
+ return -1;
+
+ return 0;
+#else
+ return -1;
+#endif
+}
+
+/*
+ * Set the network buffer size, suggested size is in size.
+ * Actual size obtained is returned in bs->msglen
+ *
+ * Returns: false on failure
+ * true on success
+ */
+bool BSOCKCORE::set_buffer_size(uint32_t size, int rw)
+{
+ uint32_t dbuf_size, start_size;
+
+#if defined(IP_TOS) && defined(IPTOS_THROUGHPUT)
+ int opt;
+ opt = IPTOS_THROUGHPUT;
+ setsockopt(m_fd, IPPROTO_IP, IP_TOS, (sockopt_val_t)&opt, sizeof(opt));
+#endif
+
+ if (size != 0) {
+ dbuf_size = size;
+ } else {
+ dbuf_size = DEFAULT_NETWORK_BUFFER_SIZE;
+ }
+ start_size = dbuf_size;
+ if ((msg = realloc_pool_memory(msg, dbuf_size + 100)) == NULL) {
+ Qmsg0(get_jcr(), M_FATAL, 0, _("Could not malloc BSOCKCORE data buffer\n"));
+ return false;
+ }
+
+ /*
+ * If user has not set the size, use the OS default -- i.e. do not
+ * try to set it. This allows sys admins to set the size they
+ * want in the OS, and Bacula will comply. See bug #1493
+ */
+ if (size == 0) {
+ msglen = dbuf_size;
+ return true;
+ }
+
+ if (rw & BNET_SETBUF_READ) {
+ while ((dbuf_size > TAPE_BSIZE) && (setsockopt(m_fd, SOL_SOCKET,
+ SO_RCVBUF, (sockopt_val_t) & dbuf_size, sizeof(dbuf_size)) < 0)) {
+ berrno be;
+ Qmsg1(get_jcr(), M_ERROR, 0, _("sockopt error: %s\n"), be.bstrerror());
+ dbuf_size -= TAPE_BSIZE;
+ }
+ Dmsg1(200, "set network buffer size=%d\n", dbuf_size);
+ if (dbuf_size != start_size) {
+ Qmsg1(get_jcr(), M_WARNING, 0,
+ _("Warning network buffer = %d bytes not max size.\n"), dbuf_size);
+ }
+ }
+ if (size != 0) {
+ dbuf_size = size;
+ } else {
+ dbuf_size = DEFAULT_NETWORK_BUFFER_SIZE;
+ }
+ start_size = dbuf_size;
+ if (rw & BNET_SETBUF_WRITE) {
+ while ((dbuf_size > TAPE_BSIZE) && (setsockopt(m_fd, SOL_SOCKET,
+ SO_SNDBUF, (sockopt_val_t) & dbuf_size, sizeof(dbuf_size)) < 0)) {
+ berrno be;
+ Qmsg1(get_jcr(), M_ERROR, 0, _("sockopt error: %s\n"), be.bstrerror());
+ dbuf_size -= TAPE_BSIZE;
+ }
+ Dmsg1(900, "set network buffer size=%d\n", dbuf_size);
+ if (dbuf_size != start_size) {
+ Qmsg1(get_jcr(), M_WARNING, 0,
+ _("Warning network buffer = %d bytes not max size.\n"), dbuf_size);
+ }
+ }
+
+ msglen = dbuf_size;
+ return true;
+}
+
+/*
+ * Set socket non-blocking
+ * Returns previous socket flag
+ */
+int BSOCKCORE::set_nonblocking()
+{
+ int oflags;
+
+ /* Get current flags */
+ if ((oflags = fcntl(m_fd, F_GETFL, 0)) < 0) {
+ berrno be;
+ Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_GETFL error. ERR=%s\n"), be.bstrerror());
+ }
+
+ /* Set O_NONBLOCK flag */
+ if ((fcntl(m_fd, F_SETFL, oflags|O_NONBLOCK)) < 0) {
+ berrno be;
+ Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_SETFL error. ERR=%s\n"), be.bstrerror());
+ }
+
+ m_blocking = 0;
+ return oflags;
+}
+
+/*
+ * Set socket blocking
+ * Returns previous socket flags
+ */
+int BSOCKCORE::set_blocking()
+{
+ int oflags;
+ /* Get current flags */
+ if ((oflags = fcntl(m_fd, F_GETFL, 0)) < 0) {
+ berrno be;
+ Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_GETFL error. ERR=%s\n"), be.bstrerror());
+ }
+
+ /* Set O_NONBLOCK flag */
+ if ((fcntl(m_fd, F_SETFL, oflags & ~O_NONBLOCK)) < 0) {
+ berrno be;
+ Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_SETFL error. ERR=%s\n"), be.bstrerror());
+ }
+
+ m_blocking = 1;
+ return oflags;
+}
+
+void BSOCKCORE::set_killable(bool killable)
+{
+ if (m_jcr) {
+ m_jcr->set_killable(killable);
+ }
+}
+
+/*
+ * Restores socket flags
+ */
+void BSOCKCORE::restore_blocking (int flags)
+{
+ if ((fcntl(m_fd, F_SETFL, flags)) < 0) {
+ berrno be;
+ Qmsg1(get_jcr(), M_ABORT, 0, _("fcntl F_SETFL error. ERR=%s\n"), be.bstrerror());
+ }
+
+ m_blocking = (flags & O_NONBLOCK) ? true : false;
+}
+
+/*
+ * Wait for a specified time for data to appear on
+ * the BSOCKCORE connection.
+ *
+ * Returns: 1 if data available
+ * 0 if timeout
+ * -1 if error
+ */
+int BSOCKCORE::wait_data(int sec, int msec)
+{
+ for (;;) {
+ switch (fd_wait_data(m_fd, WAIT_READ, sec, msec)) {
+ case 0: /* timeout */
+ b_errno = 0;
+ return 0;
+ case -1:
+ b_errno = errno;
+ if (errno == EINTR) {
+ continue;
+ }
+ return -1; /* error return */
+ default:
+ b_errno = 0;
+#ifdef HAVE_TLS
+ if (this->tls && !tls_bsock_probe(this)) {
+ continue; /* false alarm, maybe a session key negotiation in progress on the socket */
+ }
+#endif
+ return 1;
+ }
+ }
+}
+
+/*
+ * As above, but returns on interrupt
+ */
+int BSOCKCORE::wait_data_intr(int sec, int msec)
+{
+ switch (fd_wait_data(m_fd, WAIT_READ, sec, msec)) {
+ case 0: /* timeout */
+ b_errno = 0;
+ return 0;
+ case -1:
+ b_errno = errno;
+ return -1; /* error return */
+ default:
+ b_errno = 0;
+#ifdef HAVE_TLS
+ if (this->tls && !tls_bsock_probe(this)) {
+ /* maybe a session key negotiation waked up the socket */
+ return 0;
+ }
+#endif
+ break;
+ }
+ return 1;
+}
+
+/*
+ * This routine closes the current BSOCKCORE.
+ * It does not delete the socket packet
+ * resources, which are released in bsock->destroy().
+ */
+#ifndef SHUT_RDWR
+#define SHUT_RDWR 2
+#endif
+
+/*
+ * The JCR is canceled, set terminate for chained BSOCKCOREs starting from master
+ */
+void BSOCKCORE::cancel()
+{
+ master_lock();
+ for (BSOCKCORE *next = m_master; next != NULL; next = next->m_next) {
+ if (!next->m_closed) {
+ next->m_terminated = true;
+ next->m_timed_out = true;
+ }
+ }
+ master_unlock();
+}
+
+/*
+ * Note, this routine closes the socket, but leaves the
+ * bsockcore memory in place.
+ * every thread is responsible of closing and destroying its own duped or not
+ * duped BSOCKCORE
+ */
+void BSOCKCORE::close()
+{
+ BSOCKCORE *bsock = this;
+
+ Dmsg0(BSOCKCORE_DEBUG_LVL, "BSOCKCORE::close()\n");
+ if (bsock->is_closed()) {
+ return;
+ }
+ if (!m_duped) {
+ clear_locking();
+ }
+ bsock->set_closed();
+ bsock->set_terminated();
+ if (!bsock->m_duped) {
+ /* Shutdown tls cleanly. */
+ if (bsock->tls) {
+ tls_bsock_shutdown(bsock);
+ free_tls_connection(bsock->tls);
+ bsock->tls = NULL;
+ }
+
+#ifdef HAVE_WIN32
+ if (!bsock->is_timed_out()) {
+ win_close_wait(bsock->m_fd); /* Ensure that data is not discarded */
+ }
+#else
+ if (bsock->is_timed_out()) {
+ shutdown(bsock->m_fd, SHUT_RDWR); /* discard any pending I/O */
+ }
+#endif
+ /* On Windows this discards data if we did not do a close_wait() */
+ socketClose(bsock->m_fd); /* normal close */
+ }
+ return;
+}
+
+/*
+ * Destroy the socket (i.e. release all resources)
+ */
+void BSOCKCORE::_destroy()
+{
+ Dmsg0(BSOCKCORE_DEBUG_LVL, "BSOCKCORE::_destroy()\n");
+ this->close(); /* Ensure that socket is closed */
+ if (msg) {
+ free_pool_memory(msg);
+ msg = NULL;
+ } else {
+ ASSERT2(1 == 0, "Two calls to destroy socket"); /* double destroy */
+ }
+ if (errmsg) {
+ free_pool_memory(errmsg);
+ errmsg = NULL;
+ }
+ if (m_who) {
+ free(m_who);
+ m_who = NULL;
+ }
+ if (m_host) {
+ free(m_host);
+ m_host = NULL;
+ }
+ if (src_addr) {
+ free(src_addr);
+ src_addr = NULL;
+ }
+}
+
+/*
+ * Destroy the socket (i.e. release all resources)
+ * including duped sockets.
+ * should not be called from duped BSOCKCORE
+ */
+void BSOCKCORE::destroy()
+{
+ Dmsg0(BSOCKCORE_DEBUG_LVL, "BSOCKCORE::destroy()\n");
+ ASSERTD(reinterpret_cast<uintptr_t>(m_next) != 0xaaaaaaaaaaaaaaaa, "BSOCKCORE::destroy() already called\n")
+ ASSERTD(this == m_master, "BSOCKCORE::destroy() called by a non master BSOCKCORE\n")
+ ASSERTD(!m_duped, "BSOCKCORE::destroy() called by a duped BSOCKCORE\n")
+ /* I'm the master I must destroy() all the duped BSOCKCOREs */
+ master_lock();
+ BSOCKCORE *ahead;
+ for (BSOCKCORE *next = m_next; next != NULL; next = ahead) {
+ ahead = next->m_next;
+ Dmsg1(BSOCKCORE_DEBUG_LVL, "BSOCKCORE::destroy():delete(%p)\n", next);
+ delete(next);
+ }
+ master_unlock();
+ Dmsg0(BSOCKCORE_DEBUG_LVL, "BSOCKCORE::destroy():delete(this)\n");
+ delete(this);
+}
+
+/* Try to limit the bandwidth of a network connection
+ */
+void BSOCKCORE::control_bwlimit(int bytes)
+{
+ btime_t now, temp;
+ if (bytes == 0) {
+ return;
+ }
+
+ now = get_current_btime(); /* microseconds */
+ temp = now - m_last_tick; /* microseconds */
+
+ m_nb_bytes += bytes;
+
+ if (temp < 0 || temp > 10000000) { /* Take care of clock problems (>10s) or back in time */
+ m_nb_bytes = bytes;
+ m_last_tick = now;
+ return;
+ }
+
+ /* Less than 0.1ms since the last call, see the next time */
+ if (temp < 100) {
+ return;
+ }
+
+ /* Remove what was authorised to be written in temp us */
+ m_nb_bytes -= (int64_t)(temp * ((double)m_bwlimit / 1000000.0));
+
+ if (m_nb_bytes < 0) {
+ m_nb_bytes = 0;
+ }
+
+ /* What exceed should be converted in sleep time */
+ int64_t usec_sleep = (int64_t)(m_nb_bytes /((double)m_bwlimit / 1000000.0));
+ if (usec_sleep > 100) {
+ bmicrosleep(usec_sleep/1000000, usec_sleep%1000000); /* TODO: Check that bmicrosleep slept enough or sleep again */
+ m_last_tick = get_current_btime();
+ m_nb_bytes = 0;
+ } else {
+ m_last_tick = now;
+ }
+}
+
+/*
+ * Write nbytes to the network.
+ * It may require several writes.
+ */
+
+int32_t BSOCKCORE::write_nbytes(char *ptr, int32_t nbytes)
+{
+ int32_t nleft, nwritten;
+
+#ifdef HAVE_TLS
+ if (tls) {
+ /* TLS enabled */
+ return (tls_bsock_writen((BSOCK*)this, ptr, nbytes));
+ }
+#endif /* HAVE_TLS */
+
+ nleft = nbytes;
+ while (nleft > 0) {
+ do {
+ errno = 0;
+ nwritten = socketWrite(m_fd, ptr, nleft);
+ if (is_timed_out() || is_terminated()) {
+ return -1;
+ }
+
+#ifdef HAVE_WIN32
+ /*
+ * We simulate errno on Windows for a socket
+ * error in order to handle errors correctly.
+ */
+ if (nwritten == SOCKET_ERROR) {
+ DWORD err = WSAGetLastError();
+ nwritten = -1;
+ if (err == WSAEINTR) {
+ errno = EINTR;
+ } else if (err == WSAEWOULDBLOCK) {
+ errno = EAGAIN;
+ } else {
+ errno = EIO; /* some other error */
+ }
+ }
+#endif
+
+ } while (nwritten == -1 && errno == EINTR);
+ /*
+ * If connection is non-blocking, we will get EAGAIN, so
+ * use select()/poll to keep from consuming all the CPU
+ * and try again.
+ */
+ if (nwritten == -1 && errno == EAGAIN) {
+ fd_wait_data(m_fd, WAIT_WRITE, 1, 0);
+ continue;
+ }
+ if (nwritten <= 0) {
+ return -1; /* error */
+ }
+ nleft -= nwritten;
+ ptr += nwritten;
+ if (use_bwlimit()) {
+ control_bwlimit(nwritten);
+ }
+ }
+ return nbytes - nleft;
+}
+
+/*
+ * Read a nbytes from the network.
+ * It is possible that the total bytes require in several
+ * read requests
+ */
+
+int32_t BSOCKCORE::read_nbytes(char *ptr, int32_t nbytes)
+{
+ int32_t nleft, nread;
+
+#ifdef HAVE_TLS
+ if (tls) {
+ /* TLS enabled */
+ return (tls_bsock_readn((BSOCK*)this, ptr, nbytes));
+ }
+#endif /* HAVE_TLS */
+
+ nleft = nbytes;
+ while (nleft > 0) {
+ errno = 0;
+ nread = socketRead(m_fd, ptr, nleft);
+ if (is_timed_out() || is_terminated()) {
+ return -1;
+ }
+
+#ifdef HAVE_WIN32
+ /*
+ * We simulate errno on Windows for a socket
+ * error in order to handle errors correctly.
+ */
+ if (nread == SOCKET_ERROR) {
+ DWORD err = WSAGetLastError();
+ nread = -1;
+ if (err == WSAEINTR) {
+ errno = EINTR;
+ } else if (err == WSAEWOULDBLOCK) {
+ errno = EAGAIN;
+ } else {
+ errno = EIO; /* some other error */
+ }
+ }
+#endif
+
+ if (nread == -1) {
+ if (errno == EINTR) {
+ continue;
+ }
+ if (errno == EAGAIN) {
+ bmicrosleep(0, 20000); /* try again in 20ms */
+ continue;
+ }
+ }
+ if (nread <= 0) {
+ return -1; /* error, or EOF */
+ }
+ nleft -= nread;
+ ptr += nread;
+ if (use_bwlimit()) {
+ control_bwlimit(nread);
+ }
+ }
+ return nbytes - nleft; /* return >= 0 */
+}
+
+#ifdef HAVE_WIN32
+/*
+ * closesocket is supposed to do a graceful disconnect under Window
+ * but it doesn't. Comments on http://msdn.microsoft.com/en-us/li
+ * confirm this behaviour. DisconnectEx is required instead, but
+ * that function needs to be retrieved via WS IOCTL
+ */
+static void
+win_close_wait(int fd)
+{
+ int ret;
+ GUID disconnectex_guid = WSAID_DISCONNECTEX;
+ DWORD bytes_returned;
+ LPFN_DISCONNECTEX DisconnectEx;
+ ret = WSAIoctl(fd, SIO_GET_EXTENSION_FUNCTION_POINTER, &disconnectex_guid, sizeof(disconnectex_guid), &DisconnectEx, sizeof(DisconnectEx), &bytes_returned, NULL, NULL);
+ Dmsg1(100, "WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER, WSAID_DISCONNECTEX) ret = %d\n", ret);
+ if (!ret) {
+ DisconnectEx(fd, NULL, 0, 0);
+ }
+}
+#endif
+
+#ifndef TEST_PROGRAM
+#define TEST_PROGRAM_A
+#endif
+
+void BSOCKCORE::dump()
+{
+#ifdef TEST_PROGRAM
+ char ed1[50];
+ Pmsg1(-1, "BSOCKCORE::dump(): %p\n", this);
+ Pmsg1(-1, "\tmsg: %p\n", msg);
+ Pmsg1(-1, "\terrmsg: %p\n", errmsg);
+ Pmsg1(-1, "\tres: %p\n", res);
+ Pmsg1(-1, "\ttls: %p\n", tls);
+ Pmsg1(-1, "\tsrc_addr: %p\n", src_addr);
+ Pmsg1(-1, "\tread_seqno: %s\n", edit_uint64(read_seqno, ed1));
+ Pmsg1(-1, "\tin_msg_no: %s\n", edit_uint64(in_msg_no, ed1));
+ Pmsg1(-1, "\tout_msg_no: %s\n", edit_uint64(out_msg_no, ed1));
+ Pmsg1(-1, "\tpout_msg_no: %p\n", pout_msg_no);
+ Pmsg1(-1, "\tmsglen: %s\n", edit_int64(msglen, ed1));
+ Pmsg1(-1, "\ttimer_start: %ld\n", timer_start);
+ Pmsg1(-1, "\ttimeout: %ld\n", timeout);
+ Pmsg1(-1, "\tm_fd: %d\n", m_fd);
+ Pmsg1(-1, "\tb_errno: %d\n", b_errno);
+ Pmsg1(-1, "\tm_blocking: %d\n", m_blocking);
+ Pmsg1(-1, "\terrors: %d\n", errors);
+ Pmsg1(-1, "\tm_suppress_error_msgs: %s\n", m_suppress_error_msgs?"true":"false");
+// Pmsg1(0, "\tclient_addr:{ } struct sockaddr client_addr; /* client's IP address */
+// Pmsg1(0, "\tstruct sockaddr_in peer_addr; /* peer's IP address */
+ Pmsg1(-1, "\tsend_hook_cb: %p\n", send_hook_cb);
+ Pmsg1(-1, "\tm_master: %p\n", m_master);
+ Pmsg1(-1, "\tm_next: %p\n", m_next);
+ Pmsg1(-1, "\tm_jcr: %p\n", m_jcr);
+// pthread_mutex_t m_rmutex; /* for read locking if use_locking set */
+// pthread_mutex_t m_wmutex; /* for write locking if use_locking set */
+// mutable pthread_mutex_t m_mmutex; /* when accessing the master/next chain */
+// pthread_mutex_t *pm_rmutex; /* Pointer to the read mutex */
+// pthread_mutex_t *pm_wmutex; /* Pointer to the write mutex */
+ Pmsg1(-1, "\tm_who: %p\n", m_who);
+ Pmsg1(-1, "\tm_host: %p\n", m_host);
+ Pmsg1(-1, "\tm_port: %d\n", m_port);
+ Pmsg1(-1, "\tm_tid: %p\n", m_tid);
+ Pmsg1(-1, "\tm_flags: %s\n", edit_uint64(m_flags, ed1));
+ Pmsg1(-1, "\tm_timed_out: %s\n", m_timed_out?"true":"false");
+ Pmsg1(-1, "\tm_terminated: %s\n", m_terminated?"true":"false");
+ Pmsg1(-1, "\tm_closed: %s\n", m_closed?"true":"false");
+ Pmsg1(-1, "\tm_duped: %s\n", m_duped?"true":"false");
+ Pmsg1(-1, "\tm_use_locking: %s\n", m_use_locking?"true":"false");
+ Pmsg1(-1, "\tm_bwlimit: %s\n", edit_int64(m_bwlimit, ed1));
+ Pmsg1(-1, "\tm_nb_bytes: %s\n", edit_int64(m_nb_bytes, ed1));
+ Pmsg1(-1, "\tm_last_tick: %s\n", edit_int64(m_last_tick, ed1));
+#endif
+};
+
+#ifdef TEST_PROGRAM
+
+void terminate(int sig){};
+void free_my_jcr(JCR *jcr){
+ /* TODO: handle full JCR free */
+ free_jcr(jcr);
+};
+
+int main()
+{
+ BSOCKCORE *bs;
+ pid_t pid;
+ int rc;
+ char *host = (char*)"localhost";
+ char *name = (char*)"Test";
+ JCR *jcr;
+
+ debug_level = 500;
+ my_name_is(0, NULL, "bsockcore_test");
+ init_signals(terminate);
+ lmgr_init_thread(); /* initialize the lockmanager stack */
+
+ jcr = new_jcr(sizeof(JCR), NULL);
+ bs = New(BSOCKCORE);
+ Pmsg0(0, "Initialize ...\n");
+ bs->set_jcr(jcr);
+ bs->dump();
+
+ pid = fork();
+ if (0 == pid){
+ Pmsg0(0, "prepare to execute netcat\n");
+ rc = execl("/bin/netcat", "netcat", "-p", "20000", "-l", NULL);
+ Pmsg1(0, "Error executing netcat: %s\n", strerror(rc));
+ exit(1);
+ }
+ Pmsg1(0, "after fork: %d\n", pid);
+ bmicrosleep(1, 0);
+ if (bs->connect(jcr, 1, 10, 0, name, host, NULL, 20000, 0)) {
+ /* connected */
+ Pmsg0(0, "connected ...\n");
+ bs->dump();
+ } else {
+ Pmsg1(0, "connection error: %s\n", bs->bstrerror());
+ }
+ kill(pid, SIGTERM);
+ delete(bs);
+ free_my_jcr(jcr);
+ lmgr_cleanup_main();
+ // sm_dump(false);
+};
+#endif /* TEST_PROGRAM */
--- /dev/null
+/*
+ Bacula(R) - The Network Backup Solution
+
+ Copyright (C) 2000-2017 Kern Sibbald
+
+ The original author of Bacula is Kern Sibbald, with contributions
+ from many others, a complete list can be found in the file AUTHORS.
+
+ You may use this file and others of this release according to the
+ license defined in the LICENSE file, which includes the Affero General
+ Public License, v3.0 ("AGPLv3") and some additional permissions and
+ terms pursuant to its AGPLv3 Section 7.
+
+ This notice must be preserved when any source code is
+ conveyed and/or propagated.
+
+ Bacula(R) is a registered trademark of Kern Sibbald.
+*/
+/*
+ * Bacula Core Sock Class definition
+ *
+ * Kern Sibbald, May MM
+ *
+ * Major refactoring of BSOCK code written by:
+ *
+ * Radosław Korzeniewski, MMXVIII
+ * radoslaw@korzeniewski.net, radekk@inteos.pl
+ * Inteos Sp. z o.o. http://www.inteos.pl/
+ *
+ * This is a common class for socket network communication derived from
+ * BSOCK class. It acts as a base class for non-Bacula network communication
+ * and as a base class for standard BSOCK implementation. Basically the BSOCK
+ * class did not changed its functionality for any Bacula specific part.
+ * Now you can use a BSOCKCLASS for other network communication.
+ */
+
+#ifndef __BSOCKCORE_H_
+#define __BSOCKCORE_H_
+
+#define BSOCKCORE_TIMEOUT 3600 * 24 * 5; /* default 5 days */
+
+struct btimer_t; /* forward reference */
+class BSOCKCORE;
+btimer_t *start_bsock_timer(BSOCKCORE *bs, uint32_t wait);
+void stop_bsock_timer(btimer_t *wid);
+void dump_bsock_msg(int sock, uint32_t msgno, const char *what, uint32_t rc, int32_t pktsize, uint32_t flags,
+ POOLMEM *msg, int32_t msglen);
+
+class BSOCKCallback {
+public:
+ BSOCKCallback();
+ virtual ~BSOCKCallback();
+ virtual bool bsock_send_cb() = 0;
+};
+
+class BSOCKCORE: public SMARTALLOC {
+/*
+ * Note, keep this public part before the private otherwise
+ * bat breaks on some systems such as RedHat.
+ */
+public:
+ POOLMEM *msg; /* message pool buffer */
+ POOLMEM *errmsg; /* edited error message */
+ RES *res; /* Resource to which we are connected */
+ TLS_CONNECTION *tls; /* associated tls connection */
+ IPADDR *src_addr; /* IP address to source connections from */
+ uint64_t read_seqno; /* read sequence number */
+ uint32_t in_msg_no; /* input message number */
+ uint32_t out_msg_no; /* output message number */
+ uint32_t *pout_msg_no; /* pointer to the above */
+ int32_t msglen; /* message length */
+ volatile time_t timer_start; /* time started read/write */
+ volatile time_t timeout; /* timeout BSOCKCORE after this interval */
+ int m_fd; /* socket file descriptor */
+ int b_errno; /* bsockcore errno */
+ int m_blocking; /* blocking state (0 = nonblocking, 1 = blocking) */
+ volatile int errors; /* incremented for each error on socket */
+ volatile bool m_suppress_error_msgs; /* set to suppress error messages */
+ /* when "installed", send_hook_cb->bsock_send_cb() is called before
+ * any ::send(). */
+ BSOCKCallback *send_hook_cb;
+ struct sockaddr client_addr; /* client's IP address */
+ struct sockaddr_in peer_addr; /* peer's IP address */
+
+protected:
+ /* m_master is used by "duped" BSOCKCORE to access some attributes of the "parent"
+ * thread to have an up2date status (for example when the job is canceled,
+ * the "parent" BSOCKCORE is "terminated", but the duped BSOCKCORE is unchanged)
+ * In the future more attributes and method could use the "m_master"
+ * indirection.
+ * master->m_rmutex could replace pm_rmutex, idem for the (w)rite" mutex
+ * "m_master->error" should be incremented instead of "error", but
+ * this require a lock.
+ *
+ * USAGE: the parent thread MUST be sure that the child thread have quit
+ * before to free the "parent" BSOCKCORE.
+ */
+ BSOCKCORE *m_next; /* next BSOCKCORE if duped (not actually used) */
+ JCR *m_jcr; /* jcr or NULL for error msgs */
+ pthread_mutex_t m_rmutex; /* for read locking if use_locking set */
+ pthread_mutex_t m_wmutex; /* for write locking if use_locking set */
+ mutable pthread_mutex_t m_mmutex; /* when accessing the master/next chain */
+ pthread_mutex_t *pm_rmutex; /* Pointer to the read mutex */
+ pthread_mutex_t *pm_wmutex; /* Pointer to the write mutex */
+ char *m_who; /* Name of daemon to which we are talking */
+ char *m_host; /* Host name/IP */
+ int m_port; /* desired port */
+ btimer_t *m_tid; /* timer id */
+ uint32_t m_flags; /* Special flags */
+ volatile bool m_timed_out: 1; /* timed out in read/write */
+ volatile bool m_terminated: 1; /* set when BNET_TERMINATE arrives */
+ bool m_closed: 1; /* set when socket is closed */
+ bool m_duped: 1; /* set if duped BSOCKCORE */
+ bool m_use_locking; /* set to use locking (out of a bitfield */
+ /* to avoid race conditions) */
+ int64_t m_bwlimit; /* set to limit bandwidth */
+ int64_t m_nb_bytes; /* bytes sent/recv since the last tick */
+ btime_t m_last_tick; /* last tick used by bwlimit */
+
+ void fin_init(JCR * jcr, int sockfd, const char *who, const char *host, int port,
+ struct sockaddr *lclient_addr);
+ virtual bool open(JCR *jcr, const char *name, char *host, char *service,
+ int port, utime_t heart_beat, int *fatal);
+ void master_lock() const { if (m_use_locking) pP((&m_mmutex)); };
+ void master_unlock() const { if (m_use_locking) pV((&m_mmutex)); };
+ virtual void init();
+ void _destroy(); /* called by destroy() */
+ virtual int32_t write_nbytes(char *ptr, int32_t nbytes);
+ virtual int32_t read_nbytes(char *ptr, int32_t nbytes);
+
+public:
+ BSOCKCORE *m_master; /* "this" or the "parent" BSOCK if duped */
+ /* methods -- in bsockcore.c */
+ BSOCKCORE();
+ virtual ~BSOCKCORE();
+ void free_tls();
+ bool connect(JCR * jcr, int retry_interval, utime_t max_retry_time,
+ utime_t heart_beat, const char *name, char *host,
+ char *service, int port, int verbose);
+ virtual int32_t recv(int len);
+ virtual bool send();
+ bool fsend(const char*, ...);
+ void close(); /* close connection and destroy packet */
+ void destroy(); /* destroy socket packet */
+ const char *bstrerror(); /* last error on socket */
+ int get_peer(char *buf, socklen_t buflen);
+ bool set_buffer_size(uint32_t size, int rw);
+ int set_nonblocking();
+ int set_blocking();
+ void restore_blocking(int flags);
+ void set_killable(bool killable);
+ int wait_data(int sec, int msec=0);
+ int wait_data_intr(int sec, int msec=0);
+ bool set_locking();
+ void clear_locking();
+ void set_source_address(dlist *src_addr_list);
+ void control_bwlimit(int bytes);
+
+ /* Inline functions */
+ void suppress_error_messages(bool flag) { m_suppress_error_msgs = flag; };
+ void set_jcr(JCR *jcr) { m_jcr = jcr; };
+ void set_who(char *who) { m_who = who; };
+ void set_host(char *host) { m_host = host; };
+ void set_port(int port) { m_port = port; };
+ char *who() const { return m_who; };
+ char *host() const { return m_host; };
+ int port() const { return m_port; };
+ JCR *jcr() const { return m_jcr; };
+ JCR *get_jcr() const { return m_jcr; };
+ bool is_duped() const { return m_duped; };
+ bool is_terminated() const { return m_terminated; };
+ bool is_timed_out() const { return m_timed_out; };
+ bool is_closed() const { return m_closed; };
+ bool is_open() const { return !m_closed; };
+ bool is_stop() const { return errors || is_terminated() || is_closed(); };
+ bool is_error() { errno = b_errno; return errors; };
+ void set_bwlimit(int64_t maxspeed) { m_bwlimit = maxspeed; };
+ bool use_bwlimit() { return m_bwlimit > 0;};
+ void set_duped() { m_duped = true; };
+ void set_master(BSOCKCORE *master) {
+ master_lock();
+ m_master = master;
+ m_next = master->m_next;
+ master->m_next = this;
+ master_unlock();
+ };
+ void set_timed_out() { m_timed_out = true; };
+ void clear_timed_out() { m_timed_out = false; };
+ void set_terminated() { m_terminated = true; };
+ void set_closed() { m_closed = true; };
+ void start_timer(int sec) { m_tid = start_bsock_timer(this, sec); };
+ void stop_timer() { stop_bsock_timer(m_tid); };
+ void swap_msgs();
+ void install_send_hook_cb(BSOCKCallback *obj) { send_hook_cb=obj; };
+ void uninstall_send_hook_cb() { send_hook_cb=NULL; };
+ void cancel(); /* call it when JCR is canceled */
+#ifdef HAVE_WIN32
+ int socketRead(int fd, void *buf, size_t len) { return ::recv(fd, (char *)buf, len, 0); };
+ int socketWrite(int fd, void *buf, size_t len) { return ::send(fd, (const char*)buf, len, 0); };
+ int socketClose(int fd) { return ::closesocket(fd); };
+#else
+ int socketRead(int fd, void *buf, size_t len) { return ::read(fd, buf, len); };
+ int socketWrite(int fd, void *buf, size_t len) { return ::write(fd, buf, len); };
+ int socketClose(int fd) { return ::close(fd); };
+#endif
+ void dump();
+};
+
+/*
+ * Completely release the socket packet, and NULL the pointer
+ */
+#define free_bsockcore(a) do{if(a){(a)->destroy(); (a)=NULL;}} while(0)
+
+/*
+ * Does the socket exist and is it open?
+ */
+#define is_bsockcore_open(a) ((a) && (a)->is_open())
+
+#endif /* __BSOCKCORE_H_ */
Public License, v3.0 ("AGPLv3") and some additional permissions and
terms pursuant to its AGPLv3 Section 7.
- This notice must be preserved when any source code is
+ This notice must be preserved when any source code is
conveyed and/or propagated.
Bacula(R) is a registered trademark of Kern Sibbald.
* Returns: btimer_t *(pointer to btimer_t struct) on success
* NULL on failure
*/
-btimer_t *start_bsock_timer(BSOCK *bsock, uint32_t wait)
+btimer_t *_start_bsock_timer(BSOCK *bsock, uint32_t wait)
{
btimer_t *wid;
if (wait <= 0) { /* wait should be > 0 */
return wid;
}
+/*
+ * Start a timer on a BSOCK. kill it after wait seconds.
+ *
+ * Returns: btimer_t *(pointer to btimer_t struct) on success
+ * NULL on failure
+ */
+btimer_t *start_bsock_timer(BSOCKCORE *bsock, uint32_t wait)
+{
+ return _start_bsock_timer((BSOCK*)bsock, wait);
+};
+
+/*
+ * Start a timer on a BSOCK. kill it after wait seconds.
+ *
+ * Returns: btimer_t *(pointer to btimer_t struct) on success
+ * NULL on failure
+ */
+btimer_t *start_bsock_timer(BSOCK *bsock, uint32_t wait)
+{
+ return _start_bsock_timer(bsock, wait);
+};
+
/*
* Stop bsock timer
*/
Public License, v3.0 ("AGPLv3") and some additional permissions and
terms pursuant to its AGPLv3 Section 7.
- This notice must be preserved when any source code is
+ This notice must be preserved when any source code is
conveyed and/or propagated.
Bacula(R) is a registered trademark of Kern Sibbald.
#include "bjson.h"
#include "tls.h"
#include "address_conf.h"
+#include "bsockcore.h"
#include "bsock.h"
#include "workq.h"
#ifndef HAVE_FNMATCH
bool tls_bsock_accept (BSOCK *bsock);
int tls_bsock_writen (BSOCK *bsock, char *ptr, int32_t nbytes);
int tls_bsock_readn (BSOCK *bsock, char *ptr, int32_t nbytes);
-bool tls_bsock_probe (BSOCK *bsock);
+bool tls_bsock_probe (BSOCKCORE *bsock);
#endif /* HAVE_TLS */
bool tls_bsock_connect (BSOCK *bsock);
-void tls_bsock_shutdown (BSOCK *bsock);
+void tls_bsock_shutdown (BSOCKCORE *bsock);
void free_tls_connection (TLS_CONNECTION *tls);
bool get_tls_require (TLS_CONTEXT *ctx);
bool get_tls_enable (TLS_CONTEXT *ctx);
Public License, v3.0 ("AGPLv3") and some additional permissions and
terms pursuant to its AGPLv3 Section 7.
- This notice must be preserved when any source code is
+ This notice must be preserved when any source code is
conveyed and/or propagated.
Bacula(R) is a registered trademark of Kern Sibbald.
char issuer[256];
char subject[256];
- if (err == X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT ||
- err == X509_V_ERR_SELF_SIGNED_CERT_IN_CHAIN)
- {
- Jmsg0(NULL, M_ERROR, 0, _("CA certificate is self signed. With OpenSSL 1.1, enforce basicConstraints = CA:true in the certificate creation to avoid this issue.\n"));
- } else if (err == X509_V_ERR_INVALID_CA) {
- Jmsg0(NULL, M_ERROR, 0, _("CA certificate is invalid (possibly self signed). With OpenSSL 1.1, enforce basicConstraints = CA:true in the certificate creation to avoid this issue.\n"));
- }
-
-
X509_NAME_oneline(X509_get_issuer_name(cert), issuer, 256);
X509_NAME_oneline(X509_get_subject_name(cert), subject, 256);
#endif
/* Use SSL_OP_ALL to turn on all "rather harmless" workarounds that
- * OpenSSL offers
+ * OpenSSL offers
*/
SSL_CTX_set_options(ctx->openssl, SSL_OP_ALL);
/*
* Shutdown TLS_CONNECTION instance
*/
-void tls_bsock_shutdown(BSOCK *bsock)
+void tls_bsock_shutdown(BSOCKCORE *bsock)
{
/*
* SSL_shutdown must be called twice to fully complete the process -
}
/* test if 4 bytes can be read without "blocking" */
-bool tls_bsock_probe(BSOCK *bsock)
+bool tls_bsock_probe(BSOCKCORE *bsock)
{
int32_t pktsiz;
return SSL_peek(bsock->tls->openssl, &pktsiz, sizeof(pktsiz))==sizeof(pktsiz);