From: Eric Bollengier Date: Fri, 5 Mar 2021 14:55:38 +0000 (+0100) Subject: Add SDPacketCheck in the FileDaemon resource to control the network flow X-Git-Tag: Release-11.3.2~633 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=86292a5c5859bd1a3af4a80c1ad7dd04c7631aa7;p=thirdparty%2Fbacula.git Add SDPacketCheck in the FileDaemon resource to control the network flow --- diff --git a/bacula/src/filed/backup.c b/bacula/src/filed/backup.c index 1c4973446..e0fb6c7e1 100644 --- a/bacula/src/filed/backup.c +++ b/bacula/src/filed/backup.c @@ -67,7 +67,11 @@ bool blast_data_to_storage_daemon(JCR *jcr, char *addr) { BSOCK *sd; bool ok = true; - // TODO landonf: Allow user to specify encryption algorithm + + /* Initialize the poll manager if we have the configuration set */ + if (me->sd_packet_check) { + jcr->sd_packet_mgr = New(bnet_poll_manager(me->sd_packet_check)); + } sd = jcr->store_bsock; @@ -226,6 +230,7 @@ bool blast_data_to_storage_daemon(JCR *jcr, char *addr) crypto_session_end(jcr); + bdelete_and_null(jcr->sd_packet_mgr); Dmsg1(100, "end blast_data ok=%d\n", ok); return ok; @@ -781,6 +786,9 @@ static int send_data(bctx_t &bctx, int stream) if (!process_and_send_data(bctx)) { goto err; } + if (jcr->sd_packet_mgr) { + jcr->sd_packet_mgr->send(jcr, sd); // Send a POLL request if needed + } } /* end while read file data */ goto finish_sending; diff --git a/bacula/src/filed/filed.h b/bacula/src/filed/filed.h index a40bfeb60..6f4305db9 100644 --- a/bacula/src/filed/filed.h +++ b/bacula/src/filed/filed.h @@ -86,6 +86,27 @@ typedef struct { int bacula_client_test_metric2; } fdstatmetrics_t; +class bnet_poll_manager: public SMARTALLOC +{ +private: + pthread_cond_t m_cond; + pthread_mutex_t m_mutex; + int32_t m_check_done; /* small state machine to sync the two threads */ + + int32_t m_count; /* Current value, at 0 we send the POLL */ + int32_t m_check; /* configuration file value */ + +public: + bnet_poll_manager(int32_t val); + ~bnet_poll_manager(); + + void init(int32_t val); + void destroy(); + void send(JCR *jcr, BSOCK *sd); + void recv(JCR *jcr, const char *msg); +}; + + void allow_os_suspensions(); void prevent_os_suspensions(); bool update_permanent_stats(void *data); diff --git a/bacula/src/filed/filed_conf.c b/bacula/src/filed/filed_conf.c index 1720d4e74..bfd79aebb 100644 --- a/bacula/src/filed/filed_conf.c +++ b/bacula/src/filed/filed_conf.c @@ -118,6 +118,7 @@ static RES_ITEM cli_items[] = { {"CommCompression", store_bool, ITEM(res_client.comm_compression), 0, ITEM_DEFAULT, true}, {"DisableCommand", store_alist_str, ITEM(res_client.disable_cmds), 0, 0, 0}, {"MaximumJobErrorCount", store_pint32, ITEM(res_client.max_job_errors), 0, ITEM_DEFAULT, 1000}, + {"SDPacketCheck", store_pint32, ITEM(res_client.sd_packet_check), 0, ITEM_DEFAULT, 0}, #if BEEF {"DedupIndexDirectory", store_dir, ITEM(res_client.dedup_index_dir), 0, 0, 0}, /* deprecated */ {"EnableClientRehydration", store_bool, ITEM(res_client.allow_dedup_cache), 0, ITEM_DEFAULT, false}, diff --git a/bacula/src/filed/filed_conf.h b/bacula/src/filed/filed_conf.h index 0d3e380e8..cc32c7fd8 100644 --- a/bacula/src/filed/filed_conf.h +++ b/bacula/src/filed/filed_conf.h @@ -136,6 +136,7 @@ struct CLIENT { utime_t heartbeat_interval; /* Interval to send heartbeats */ uint32_t max_network_buffer_size; /* max network buf size */ uint32_t max_job_errors; /* Maximum number of errors tolerated by the client to fail the job */ + int32_t sd_packet_check; /* Send a POLL request every X data packets */ bool comm_compression; /* Enable comm line compression */ bool pki_sign; /* Enable Data Integrity Verification via Digital Signatures */ bool pki_encrypt; /* Enable Data Encryption */ diff --git a/bacula/src/filed/heartbeat.c b/bacula/src/filed/heartbeat.c index e8a355fbc..f57fa705d 100644 --- a/bacula/src/filed/heartbeat.c +++ b/bacula/src/filed/heartbeat.c @@ -107,6 +107,7 @@ extern "C" void *sd_heartbeat_thread(void *arg) Dmsg2(100, "Got m=%d BNET_SIG %d from SD\n", m, sd->msglen); } else { Dmsg3(100, "Got m=%d msglen=%d bytes from SD. MSG=%s\n", m, sd->msglen, sd->msg); + jcr->sd_packet_mgr->recv(jcr, sd->msg); // Might be to ack a POLL request } } Dmsg2(200, "wait_intr=%d stop=%d\n", n, sd->is_stop()); diff --git a/bacula/src/filed/job.c b/bacula/src/filed/job.c index 337efdc13..51c4d1754 100644 --- a/bacula/src/filed/job.c +++ b/bacula/src/filed/job.c @@ -3493,3 +3493,82 @@ int response(JCR *jcr, BSOCK *sd, char *resp, const char *cmd) } return 0; } + +/* + * Small helper to manager a POLL request when the heartbeat is started + * When we send POLL, we get a OK message, but if the heartbeat is started + * the message is discarded. The following class is doing the synchronization + * between the two threads after a POLL message. + */ +bnet_poll_manager::bnet_poll_manager(int32_t val) +{ + pthread_cond_init(&m_cond, NULL); + pthread_mutex_init(&m_mutex, NULL); + init(val); +} + +/* Call one time per job in blast_data_to_storage_daemon() */ +void bnet_poll_manager::init(int32_t val) +{ + m_check = val; /* Value of the config file */ + m_count = val; /* Current value */ + m_check_done = 0; /* small state machine to sync the two threads */ +} + +bnet_poll_manager::~bnet_poll_manager() +{ + destroy(); +} + +void bnet_poll_manager::destroy() +{ + pthread_cond_destroy(&m_cond); + pthread_mutex_destroy(&m_mutex); +} + +/* Send a POLL and get the answer every X packets, called in save_data() */ +void bnet_poll_manager::send(JCR *jcr, BSOCK *sd) +{ + int32_t val = m_count; + + if (val == 0) { + Dmsg1(DT_NETWORK|10, "Request a POLL after %d packets...\n", m_check); + m_check_done = 1; /* We sent the request */ + sd->signal(BNET_POLL); + + struct timespec t; + + P(m_mutex); + do { + t.tv_sec = time(NULL) + 5; + t.tv_nsec = 0; + pthread_cond_timedwait(&m_cond, &m_mutex, &t); + } while (m_check_done == 1 && !jcr->is_canceled()); + V(m_mutex); + + if (m_check_done == 2) { + Dmsg0(DT_NETWORK|10, "Got it\n"); + } + m_check_done = 0; + } + + val--; + + if (val < 0) { /* Initialization or loop found */ + val = m_check; + } + + m_count = val; +} + +/* Called from the heartbeat thread */ +void bnet_poll_manager::recv(JCR *jcr, const char *msg) +{ + if (m_check_done == 1 && strncmp(msg, "2000 OK\n", 8) == 0) { + Dmsg0(DT_NETWORK|10, "Wake up the other thread after POLL\n"); + P(m_mutex); + m_check_done = 2; + pthread_cond_signal(&m_cond); + V(m_mutex); + } +} diff --git a/bacula/src/jcr.h b/bacula/src/jcr.h index af457ace9..099746789 100644 --- a/bacula/src/jcr.h +++ b/bacula/src/jcr.h @@ -160,6 +160,7 @@ class htable; class BACL; class BXATTR; class snapshot_manager; +class bnet_poll_manager; struct CRYPTO_CTX { bool pki_sign; /* Enable PKI Signatures? */ @@ -448,6 +449,9 @@ public: uint32_t EndFile; uint32_t StartBlock; uint32_t EndBlock; + + bnet_poll_manager *sd_packet_mgr; /* Manage POLL requests to control the flow */ + int32_t sd_dedup; /* set if SD has dedup */ int32_t sd_hash; /* SD hash type */ int32_t sd_hash_size; /* SD hash size */