# reported and enqueued by the worker threads.
my $queue :shared = new Thread::Queue or die "Could not create new, empty queue. $!";;
-# Array to store all currently running worker objects.
+# Hash to store all currently running worker objects and their corresponding files.
# (Does not include the socket thread)
-my @running_workers;
+my %running_workers;
+
+# Variable to store if the workers should pause or compute.
+my $workers_pause :shared = 0;
# Check if guardian should be daemonized or keep in the foreground.
unless (defined($cmdargs{"foreground"})) {
# Infinite loop.
while(1) {
- # Check for any events and perform them, if there
- # is a least one.
- if ($watcher->read) {
- my @message = ();
-
- # Obtain fileposition from hash.
- my $fileposition = $file_positions{$file};
-
- # Open the file.
- open (FILE, $file) or die "Could not open $file. $!";
+	# Check if the workers should pause or perform their desired work.
+ if ($workers_pause) {
+ # Wait 1 second until the next check.
+ sleep(1);
+ } else {
+		# Check for any events and perform them, if there
+		# is at least one.
+ if ($watcher->read) {
+ my @message = ();
- # Seek to the last known position.
- seek (FILE, $fileposition, 0);
+ # Obtain fileposition from hash.
+ my $fileposition = $file_positions{$file};
- # Get the log message.
- while (my $line = <FILE>) {
- # Remove any newlines.
- chomp $line;
+ # Open the file.
+ open (FILE, $file) or die "Could not open $file. $!";
- # Add all lines to the message array.
- push (@message, $line);
- }
+ # Seek to the last known position.
+ seek (FILE, $fileposition, 0);
- {
- # Lock shared hash.
- lock(%file_positions);
+ # Get the log message.
+ while (my $line = <FILE>) {
+ # Remove any newlines.
+ chomp $line;
- # Update fileposition.
- $file_positions{$file} = tell(FILE);
- }
-
- # Close file.
- close(FILE);
+ # Add all lines to the message array.
+ push (@message, $line);
+ }
- # Send filename and message to the parser,
- # which will return if any actions have to be performed.
- my @actions = &Guardian::Parser::Parser("$parser", @message);
+ {
+ # Lock shared hash.
+ lock(%file_positions);
- # Send the action to the main process and put it into
- # the queue.
- if (@actions) {
- # Lock the queue.
- lock($queue);
+ # Update fileposition.
+ $file_positions{$file} = tell(FILE);
+ }
- # Loop through the actions array, and perform
- # every single action.
- foreach my $action (@actions) {
- # Prevent from enqueuing empty actions.
- if (defined($action)) {
- # Put the required action into the queue.
- $queue->enqueue($action);
+ # Close file.
+ close(FILE);
+
+ # Send filename and message to the parser,
+ # which will return if any actions have to be performed.
+ my @actions = &Guardian::Parser::Parser("$parser", @message);
+
+ # Send the action to the main process and put it into
+ # the queue.
+ if (@actions) {
+ # Lock the queue.
+ lock($queue);
+
+ # Loop through the actions array, and perform
+ # every single action.
+ foreach my $action (@actions) {
+						# Prevent enqueuing empty actions.
+ if (defined($action)) {
+ # Put the required action into the queue.
+ $queue->enqueue($action);
+ }
}
}
+ } else {
+			# Sleep briefly until the next round of the loop starts.
+			# NOTE(review): core sleep() truncates sub-second values, so
+			# sleep(0.01) returns immediately unless Time::HiRes is loaded — verify.
+ sleep(0.01);
}
- } else {
- # Sleep for 10ms until the next round of the loop will start.
- sleep(0.01);
}
}
}
# Loop through the hash which contains the monitored files and start
# a worker thread for each single one.
foreach my $file (keys %monitored_files) {
- $logger->Log("debug", "Starting worker thread for $file");
- # Create worker thread for the file.
- push @running_workers, threads->create(\&Worker,$file);
+		# Check if a worker is already running for this file.
+ # If not, start the worker.
+ unless (exists($running_workers{$file})) {
+ $logger->Log("debug", "Starting worker thread for $file");
+
+ # Create worker thread for the file.
+ $running_workers{$file} = threads->create(\&Worker,$file);
+ }
}
}
#
sub StopWorkers () {
# Loop through all running workers.
- foreach my $worker (@running_workers) {
- # Send the worker the "KILL" signal and detach the
- # thread so perl can do an automatically clean-up.
- $worker->kill('KILL');
+ foreach my $worker (keys %running_workers) {
+ # Determine if the worker should be stopped.
+		# This happens if the file should no longer be monitored.
+ unless(exists($monitored_files{$worker})) {
+ $logger->Log("debug", "Stopping worker thread for $worker");
+
+ # Send a "KILL" signal to the worker.
+ $running_workers{$worker}->kill('KILL');
+
+ # Remove worker from hash of running workers.
+ delete($running_workers{$worker});
+ }
}
- $logger->Log("debug", "All workers are stopped now...");
+
+	# Log a message once no worker threads remain running.
+ if (! keys(%running_workers)) {
+ $logger->Log("debug", "All workers have been stopped...");
+ }
+
+ # Return nothing.
+ return;
+}
+
+#
+## Function to pause all running workers.
+#
+## This function is used to pause all currently running workers.
+#
+sub PauseWorkers() {
+ # Set workers_pause variable to "1".
+	# All workers will sleep until the variable has been set to "0".
+ $workers_pause = 1;
+
+ # Log paused workers.
+ $logger->Log("debug", "All workers have been paused...");
+
+ # Return nothing.
+ return;
+}
+
+#
+## Function to continue all running workers.
+#
+## This function is used to continue all paused workers.
+#
+sub ResumeWorkers() {
+	# Set the workers_pause variable to "0" - the workers will continue
+	# their work again.
+ $workers_pause = 0;
+
+ # Log continued workers.
+ $logger->Log("debug", "All workers are working again...");
+
+ # Return nothing.
+ return;
}
#
# Log reload.
$logger->Log("info", "Reload configuration...");
- # Stop all running workers.
- &StopWorkers();
+ # Pause all running workers.
+ &PauseWorkers();
# Re-read configuration file.
%mainsettings = &Guardian::Config::UseConfig($cmdargs{"config"});
# Re-generate hash of monitored files.
%monitored_files = &Guardian::Base::GenerateMonitoredFiles(\%mainsettings, \%monitored_files);
- # Restart the worker threads.
+ # Stop workers if they are not needed anymore.
+ &StopWorkers();
+
+ # Start new worker threads if required.
&StartWorkers();
+
+ # Resume workers.
+ &ResumeWorkers();
+
+ # Return nothing.
+ return;
}
#
#
sub Logrotate () {
# Stop all running workers.
- &StopWorkers();
+ &PauseWorkers();
{
# Lock shared hash.
}
# Restart all worker threads.
- &StartWorkers();
+ &ResumeWorkers();
+
+ # Return nothing.
+ return;
}
#
# Log shutdown.
$logger->Log("info", "Shutting down...");
+ # Reset hash of monitored files.
+ %monitored_files = ();
+
# Stop all workers.
&StopWorkers();