From bac2d500f92d4cc1c19cef8c9a3d6717b963d657 Mon Sep 17 00:00:00 2001 From: Stefan Schantl Date: Fri, 29 Jul 2016 10:34:50 +0200 Subject: [PATCH] Pause/Continue worker threads instead of restarting them. When reloading the configuration or receiving a logrotate event, the running workers now will be paused and afterwards continued. This will save system resources, because they do not need to get killed and restarted anymore. Signed-off-by: Stefan Schantl --- guardian.in | 200 ++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 139 insertions(+), 61 deletions(-) diff --git a/guardian.in b/guardian.in index c52b832..7671714 100644 --- a/guardian.in +++ b/guardian.in @@ -102,9 +102,12 @@ my %file_positions :shared = (); # reported and enqueued by the worker threads. my $queue :shared = new Thread::Queue or die "Could not create new, empty queue. $!";; -# Array to store all currently running worker objects. +# Hash to store all currently running worker objects and their corresponding files. # (Does not include the socket thread) -my @running_workers; +my %running_workers; + +# Variable to store if the workers should pause or continue. +my $workers_pause :shared = 0; # Check if guardian should be daemonized or keep in the foreground. unless (defined($cmdargs{"foreground"})) { @@ -212,63 +215,69 @@ sub Worker ($) { # Infinite loop. while(1) { - # Check for any events and perform them, if there - # is a least one. - if ($watcher->read) { - my @message = (); - - # Obtain fileposition from hash. - my $fileposition = $file_positions{$file}; - - # Open the file. - open (FILE, $file) or die "Could not open $file. $!"; + # Check if the workers should pause or perform their desired work. + if ($workers_pause) { + # Wait 1 second until the next check. + sleep(1); + } else { + # Check for any events and perform them, if there + # is at least one. + if ($watcher->read) { + my @message = (); - # Seek to the last known position. 
- seek (FILE, $fileposition, 0); + # Obtain fileposition from hash. + my $fileposition = $file_positions{$file}; - # Get the log message. - while (my $line = <FILE>) { - # Remove any newlines. - chomp $line; + # Open the file. + open (FILE, $file) or die "Could not open $file. $!"; - # Add all lines to the message array. - push (@message, $line); - } + # Seek to the last known position. + seek (FILE, $fileposition, 0); { - # Lock shared hash. lock(%file_positions); + # Get the log message. + while (my $line = <FILE>) { + # Remove any newlines. + chomp $line; - # Update fileposition. - $file_positions{$file} = tell(FILE); - } - - # Close file. - close(FILE); + # Add all lines to the message array. + push (@message, $line); } - # Send filename and message to the parser, - # which will return if any actions have to be performed. - my @actions = &Guardian::Parser::Parser("$parser", @message); + { + # Lock shared hash. + lock(%file_positions); - # Send the action to the main process and put it into - # the queue. - if (@actions) { - # Lock the queue. - lock($queue); + # Update fileposition. + $file_positions{$file} = tell(FILE); } - # Loop through the actions array, and perform - # every single action. - foreach my $action (@actions) { - # Prevent from enqueuing empty actions. - if (defined($action)) { - # Put the required action into the queue. - $queue->enqueue($action); + # Close file. + close(FILE); + + # Send filename and message to the parser, + # which will return if any actions have to be performed. + my @actions = &Guardian::Parser::Parser("$parser", @message); + + # Send the action to the main process and put it into + # the queue. + if (@actions) { + # Lock the queue. + lock($queue); + + # Loop through the actions array, and perform + # every single action. + foreach my $action (@actions) { + # Prevent from enqueuing empty actions. + if (defined($action)) { + # Put the required action into the queue. 
+ $queue->enqueue($action); + } } } + } else { + # Sleep for 10ms until the next round of the loop will start. + sleep(0.01); } - } else { - # Sleep for 10ms until the next round of the loop will start. - sleep(0.01); } } } @@ -350,9 +359,14 @@ sub StartWorkers () { # Loop through the hash which contains the monitored files and start # a worker thread for each single one. foreach my $file (keys %monitored_files) { - $logger->Log("debug", "Starting worker thread for $file"); - # Create worker thread for the file. - push @running_workers, threads->create(\&Worker,$file); + # Check if a worker is already running for this file. + # If not, start the worker. + unless (exists($running_workers{$file})) { + $logger->Log("debug", "Starting worker thread for $file"); + + # Create worker thread for the file. + $running_workers{$file} = threads->create(\&Worker,$file); + } } } @@ -364,12 +378,61 @@ sub StartWorkers () { # sub StopWorkers () { # Loop through all running workers. - foreach my $worker (@running_workers) { - # Send the worker the "KILL" signal and detach the - # thread so perl can do an automatically clean-up. - $worker->kill('KILL'); + foreach my $worker (keys %running_workers) { + # Determine if the worker should be stopped. + # This happens if the file is no longer monitored. + unless(exists($monitored_files{$worker})) { + $logger->Log("debug", "Stopping worker thread for $worker"); + + # Send a "KILL" signal to the worker. + $running_workers{$worker}->kill('KILL'); + + # Remove worker from hash of running workers. + delete($running_workers{$worker}); + } } - $logger->Log("debug", "All workers are stopped now..."); + + # Check if any worker threads are still running. + if (! keys(%running_workers)) { + $logger->Log("debug", "All workers have been stopped..."); + } + + # Return nothing. + return; +} + +# +## Function to pause all running workers. +# +## This function is used to pause all currently running workers. 
+# +sub PauseWorkers() { + # Set workers_pause variable to "1". + # All workers will sleep until the variable has been set to "0". + $workers_pause = 1; + + # Log paused workers. + $logger->Log("debug", "All workers have been paused..."); + + # Return nothing. + return; +} + +# +## Function to continue all running workers. +# +## This function is used to continue all paused workers. +# +sub ResumeWorkers() { + # Set workers_pause variable to "0" - they will continue their work + # again. + $workers_pause = 0; + + # Log continued workers. + $logger->Log("debug", "All workers are working again..."); + + # Return nothing. + return; } # @@ -383,8 +446,8 @@ sub Reload () { # Log reload. $logger->Log("info", "Reload configuration..."); - # Stop all running workers. - &StopWorkers(); + # Pause all running workers. + &PauseWorkers(); # Re-read configuration file. %mainsettings = &Guardian::Config::UseConfig($cmdargs{"config"}); @@ -404,8 +467,17 @@ sub Reload () { # Re-generate hash of monitored files. %monitored_files = &Guardian::Base::GenerateMonitoredFiles(\%mainsettings, \%monitored_files); - # Restart the worker threads. + # Stop workers if they are not needed anymore. + &StopWorkers(); + + # Start new worker threads if required. &StartWorkers(); + + # Resume workers. + &ResumeWorkers(); + + # Return nothing. + return; } # @@ -436,7 +508,7 @@ sub ReloadIgnoreList () { # sub Logrotate () { # Stop all running workers. - &StopWorkers(); + &PauseWorkers(); { # Lock shared hash. @@ -467,7 +539,10 @@ sub Logrotate () { } # Restart all worker threads. - &StartWorkers(); + &ResumeWorkers(); + + # Return nothing. + return; } # @@ -480,6 +555,9 @@ sub Shutdown () { # Log shutdown. $logger->Log("info", "Shutting down..."); + # Reset hash of monitored files. + %monitored_files = (); + # Stop all workers. &StopWorkers(); -- 2.39.2