Pause/Continue worker threads instead of restarting them.
authorStefan Schantl <stefan.schantl@ipfire.org>
Fri, 29 Jul 2016 08:34:50 +0000 (10:34 +0200)
committerStefan Schantl <stefan.schantl@ipfire.org>
Fri, 29 Jul 2016 08:38:25 +0000 (10:38 +0200)
When reloading the configuration or recieving a logrotate event,
the running workers now will be paused and afterwards continued.

This will save system ressources, because they do not need get killed and
restarted anymore.

Signed-off-by: Stefan Schantl <stefan.schantl@ipfire.org>
guardian.in

index c52b832..7671714 100644 (file)
@@ -102,9 +102,12 @@ my %file_positions :shared = ();
 # reported and enqueued by the worker threads.
 my $queue :shared = new Thread::Queue or die "Could not create new, empty queue. $!";;
 
-# Array to store all currently running worker objects.
+# Hash to store all currently running worker objects and their corresponding files.
 # (Does not include the socket thread)
-my @running_workers;
+my %running_workers;
+
+# Variable to store if the workers should pause or compute.
+my $workers_pause :shared = 0;
 
 # Check if guardian should be daemonized or keep in the foreground.
 unless (defined($cmdargs{"foreground"})) {
@@ -212,63 +215,69 @@ sub Worker ($) {
 
        # Infinite loop.
        while(1) {
-               # Check for any events and perform them, if there
-               # is a least one.
-               if ($watcher->read) {
-                       my @message = ();
-
-                       # Obtain fileposition from hash.
-                       my $fileposition = $file_positions{$file};
-
-                       # Open the file.
-                       open (FILE, $file) or die "Could not open $file. $!";
+               # Check if the workers should pause or perform it's desired work.
+               if ($workers_pause) {
+                       # Wait 1 second until the next check.
+                       sleep(1);
+               } else {
+                       # Check for any events and perform them, if there
+                       # is a least one.
+                       if ($watcher->read) {
+                               my @message = ();
 
-                       # Seek to the last known position.
-                       seek (FILE, $fileposition, 0);
+                               # Obtain fileposition from hash.
+                               my $fileposition = $file_positions{$file};
 
-                       # Get the log message.
-                       while (my $line = <FILE>) {
-                               # Remove any newlines.
-                               chomp $line;
+                               # Open the file.
+                               open (FILE, $file) or die "Could not open $file. $!";
 
-                               # Add all lines to the message array.
-                               push (@message, $line);
-                       }
+                               # Seek to the last known position.
+                               seek (FILE, $fileposition, 0);
 
-                       {
-                               # Lock shared hash.
-                               lock(%file_positions);
+                               # Get the log message.
+                               while (my $line = <FILE>) {
+                                       # Remove any newlines.
+                                       chomp $line;
 
-                               # Update fileposition.
-                               $file_positions{$file} = tell(FILE);
-                       }
-
-                       # Close file.
-                       close(FILE);
+                                       # Add all lines to the message array.
+                                       push (@message, $line);
+                               }
 
-                       # Send filename and message to the parser,
-                       # which will return if any actions have to be performed.
-                       my @actions = &Guardian::Parser::Parser("$parser", @message);
+                               {
+                                       # Lock shared hash.
+                                       lock(%file_positions);
 
-                       # Send the action to the main process and put it into
-                       # the queue.
-                       if (@actions) {
-                               # Lock the queue.
-                               lock($queue);
+                                       # Update fileposition.
+                                       $file_positions{$file} = tell(FILE);
+                               }
 
-                               # Loop through the actions array, and perform
-                               # every single action.
-                               foreach my $action (@actions) {
-                                       # Prevent from enqueuing empty actions.
-                                       if (defined($action)) {
-                                               # Put the required action into the queue.
-                                               $queue->enqueue($action);
+                               # Close file.
+                               close(FILE);
+
+                               # Send filename and message to the parser,
+                               # which will return if any actions have to be performed.
+                               my @actions = &Guardian::Parser::Parser("$parser", @message);
+
+                               # Send the action to the main process and put it into
+                               # the queue.
+                               if (@actions) {
+                                       # Lock the queue.
+                                       lock($queue);
+
+                                       # Loop through the actions array, and perform
+                                       # every single action.
+                                       foreach my $action (@actions) {
+                                               # Prevent from enqueuing empty actions.
+                                               if (defined($action)) {
+                                                       # Put the required action into the queue.
+                                                       $queue->enqueue($action);
+                                               }
                                        }
                                }
+                       } else {
+                               # Sleep for 10ms until the next round of the loop will start.
+                               sleep(0.01);
                        }
-               } else {
-                       # Sleep for 10ms until the next round of the loop will start.
-                       sleep(0.01);
                }
        }
 }
@@ -350,9 +359,14 @@ sub StartWorkers () {
        # Loop through the hash which contains the monitored files and start
        # a worker thread for each single one.
        foreach my $file (keys %monitored_files) {
-               $logger->Log("debug", "Starting worker thread for $file");
-               # Create worker thread for the file.
-               push @running_workers, threads->create(\&Worker,$file);
+               # Check if an worker allready is running for this file.
+               # If not, start the worker.
+               unless (exists($running_workers{$file})) {
+                       $logger->Log("debug", "Starting worker thread for $file");
+
+                       # Create worker thread for the file.
+                       $running_workers{$file} = threads->create(\&Worker,$file);
+               }
        }
 }
 
@@ -364,12 +378,61 @@ sub StartWorkers () {
 #
 sub StopWorkers () {
        # Loop through all running workers.
-       foreach my $worker (@running_workers) {
-               # Send the worker the "KILL" signal and detach the
-               # thread so perl can do an automatically clean-up.
-               $worker->kill('KILL');
+       foreach my $worker (keys %running_workers) {
+               # Determine if the worker should be stopped.
+               # This happen if the file should not be longer monitored.
+               unless(exists($monitored_files{$worker})) {
+                       $logger->Log("debug", "Stopping worker thread for $worker");
+
+                       # Send a "KILL" signal to the worker.
+                       $running_workers{$worker}->kill('KILL');
+
+                       # Remove worker from hash of running workers.
+                       delete($running_workers{$worker});
+               }
        }
-       $logger->Log("debug", "All workers are stopped now...");
+
+       # Get amount of currently running worker threads.
+       if (! keys(%running_workers)) {
+               $logger->Log("debug", "All workers have been stopped...");
+       }
+
+       # Return nothing.
+       return;
+}
+
+#
+## Function to pause all running workers.
+#
+## This function is used to pause all currently running workers.
+#
+sub PauseWorkers() {
+       # Set workers_pause variable to "1".
+       # All workers will be sleep until the variable has been set to "0".
+       $workers_pause = 1;
+
+       # Log paused workers.
+       $logger->Log("debug", "All workers have been paused...");
+
+       # Return nothing.
+       return;
+}
+
+#
+## Function to continue all running workers.
+#
+## This function is used to continue all paused workers.
+#
+sub ResumeWorkers() {
+       # Set workers_suspend variable to "0" - they will continue their work
+       # again.
+       $workers_pause = 0;
+
+       # Log continued workers.
+       $logger->Log("debug", "All workers are working again...");
+
+       # Return nothing.
+       return;
 }
 
 #
@@ -383,8 +446,8 @@ sub Reload () {
        # Log reload.
        $logger->Log("info", "Reload configuration...");
 
-       # Stop all running workers.
-       &StopWorkers();
+       # Pause all running workers.
+       &PauseWorkers();
 
        # Re-read configuration file.
        %mainsettings = &Guardian::Config::UseConfig($cmdargs{"config"});
@@ -404,8 +467,17 @@ sub Reload () {
        # Re-generate hash of monitored files.
        %monitored_files = &Guardian::Base::GenerateMonitoredFiles(\%mainsettings, \%monitored_files);
 
-       # Restart the worker threads.
+       # Stop workers if they are not needed anymore.
+       &StopWorkers();
+
+       # Start new worker threads if required.
        &StartWorkers();
+
+       # Resume workers.
+       &ResumeWorkers();
+
+       # Return nothing.
+       return;
 }
 
 #
@@ -436,7 +508,7 @@ sub ReloadIgnoreList () {
 #
 sub Logrotate () {
        # Stop all running workers.
-       &StopWorkers();
+       &PauseWorkers();
 
        {
        # Lock shared hash.
@@ -467,7 +539,10 @@ sub Logrotate () {
        }
 
        # Restart all worker threads.
-       &StartWorkers();
+       &ResumeWorkers();
+
+       # Return nothing.
+       return;
 }
 
 #
@@ -480,6 +555,9 @@ sub Shutdown () {
        # Log shutdown.
        $logger->Log("info", "Shutting down...");
 
+       # Reset hash of monitored files.
+       %monitored_files = ();
+
        # Stop all workers.
        &StopWorkers();