From: Stefan Schantl Date: Tue, 24 Nov 2015 13:09:09 +0000 (+0100) Subject: Allow to dynamically start/stop worker threads. X-Git-Tag: 2.0~71 X-Git-Url: http://git.ipfire.org/?p=people%2Fstevee%2Fguardian.git;a=commitdiff_plain;h=9c74e9bbd848b6deda01e65d4edb043758df5fc3 Allow to dynamically start/stop worker threads. All worker threads now dynamically can be started or stopped by calling the responsible functions (StartWorkers, StopWorkers). Signed-off-by: Stefan Schantl --- diff --git a/guardian b/guardian index 2928773..c72a9dc 100644 --- a/guardian +++ b/guardian @@ -73,6 +73,10 @@ my %mainsettings = &Guardian::Config::UseConfig($cmdargs{"config"}); # reported and enqueued by the worker threads. my $queue :shared = new Thread::Queue or die "Could not create new, empty queue. $!\n";; +# Array to store all currently running worker objects. +# (Does not include the socket thread) +my @running_workers; + # Call Init function to initzialize guardian. &Init(); @@ -110,15 +114,8 @@ sub Init () { # Setup IPC mechanism via Socket in an own thread. threads->create(\&Socket); - # Loop through the array of which files should be monitored and - # create a worker thread for each single one. - foreach my $monitored_file (@monitored_files) { - # Check if the file exists and is readable. - if (-r "$monitored_file") { - # Create worker thread for the file. - threads->create(\&Worker,$monitored_file); - } - } + # Start worker threads. + &StartWorkers(); } # @@ -142,7 +139,10 @@ sub Init () { ## shared event queue. # sub Worker ($) { - my $file = @_[0]; + my $file = $_[0]; + + # Signal handler to kill worker. + $SIG{'KILL'} = sub { threads->exit(); }; # Get the fileposition. my $fileposition = &Init_fileposition("$file"); @@ -153,43 +153,53 @@ sub Worker ($) { # Monitor the specified file. $watcher->watch("$file", IN_MODIFY) or die "Could not monitor $file. $!\n"; - # Get all notifications. - while ($watcher->read) { - my @message = (); + # Switch watcher into non-blocking mode. + $watcher->blocking(0); - # Open the file. - open (FILE, $file) or die "Could not open $file. $!\n"; + # Infinite loop. + while(1) { + # Check for any events and perform them, if there + # is a least one. + if ($watcher->read) { + my @message = (); - # Seek to the last known position. - seek (FILE, $fileposition, 0); + # Open the file. + open (FILE, $file) or die "Could not open $file. $!\n"; - # Get the log message. - while (my $line = ) { - # Remove any newlines. - chomp $line; + # Seek to the last known position. + seek (FILE, $fileposition, 0); - # Add all lines to the message array. - push (@message, $line); - } + # Get the log message. + while (my $line = ) { + # Remove any newlines. + chomp $line; - # Update fileposition. - $fileposition = tell(FILE); + # Add all lines to the message array. + push (@message, $line); + } - # Close file. - close(FILE); + # Update fileposition. + $fileposition = tell(FILE); - # Send filename and message to the parser, - # which will return if an action has to be performed. - my @action = &Guardian::Parser::Parser("$file", @message); + # Close file. + close(FILE); - # Send the action to the main process and put it into - # the queue. - if (@action) { - # Lock the queue. - lock($queue); + # Send filename and message to the parser, + # which will return if an action has to be performed. + my @action = &Guardian::Parser::Parser("$file", @message); - # Put the required action into the queue. - $queue->enqueue(@action); + # Send the action to the main process and put it into + # the queue. + if (@action) { + # Lock the queue. + lock($queue); + + # Put the required action into the queue. + $queue->enqueue(@action); + } + } else { + # Sleep for 10ms until the next round of the loop will start. + sleep(0.01); } } } @@ -278,6 +288,40 @@ sub SignalHandler { $SIG{QUIT} = \&Shutdown; } +# +## Function to start the workers (threads) for all monitored files. +# +## This function will loop through the array of monitored files and will +## spawn an own thread based worker for each file. Every created worker will +## be added to the array of running workers. +# +sub StartWorkers () { + # Loop through the array of which files should be monitored and + # create a worker thread for each single one. + foreach my $monitored_file (@monitored_files) { + # Check if the file exists and is readable. + if (-r "$monitored_file") { + # Create worker thread for the file. + push @running_workers, threads->create(\&Worker,$monitored_file); + } + } +} + +# +## Function to stop all running workers. +# +## This function is used to stop all currently running workers and will be +## called when reloading or shutting down guardian. +# +sub StopWorkers () { + # Loop through all running workers. + foreach my $worker (@running_workers) { + # Send the worker the "KILL" signal and detach the + # thread so perl can do an automatically clean-up. + $worker->kill('KILL')->detach(); + } +} + # ## Shutdown function. # @@ -285,6 +329,9 @@ sub SignalHandler { ## by the signal handler when recieving INT (2), QUIT (3) and TERM (15) signals. # sub Shutdown () { + # Stop all workers. + &StopWorkers(); + # Remove socket file on exit. &Guardian::Socket::RemoveSocketFile();