]> git.ipfire.org Git - thirdparty/coreutils.git/commitdiff
shuf: use reservoir-sampling for large or unknown sized inputs
authorAssaf Gordon <assafgordon@gmail.com>
Wed, 6 Mar 2013 23:25:49 +0000 (18:25 -0500)
committerPádraig Brady <P@draigBrady.com>
Mon, 25 Mar 2013 20:07:14 +0000 (20:07 +0000)
Reservoir sampling optimizes selecting K random lines from large or
unknown-sized input: http://en.wikipedia.org/wiki/Reservoir_sampling
Note this also avoids reading any input when -n0 is specified.

* src/shuf.c (main): Use reservoir-sampling when the number of output
lines is known, and the input size is large or unknown.
(input_size): A new function to get the input size for regular files.
(read_input_reservoir_sampling): New function to read lines from input,
keeping only K lines in memory, replacing lines with decreasing prob.
(write_permuted_output_reservoir): New function to output reservoir.
* tests/misc/shuf-reservoir.sh: An expensive_ test using valgrind to
exercise the reservoir-sampling code.
* tests/local.mk: Reference new test.
* NEWS: Mention the improvement.

NEWS
src/shuf.c
tests/local.mk
tests/misc/shuf-reservoir.sh [new file with mode: 0755]

diff --git a/NEWS b/NEWS
index 483669d6e80a3db17fc0b04be3ab632d410c8056..0c2daad3eccbcc20b850ade7218aafd0360f9fd0 100644 (file)
--- a/NEWS
+++ b/NEWS
@@ -25,6 +25,10 @@ GNU coreutils NEWS                                    -*- outline -*-
   inotify for files on those file systems, rather than the default (for unknown
   file system types) of issuing a warning and reverting to polling.
 
+  shuf outputs subsets of large inputs much more efficiently.
+  Reservoir sampling is used to limit memory usage based on the number of
+  outputs, rather than the number of inputs.
+
 ** Build-related
 
   factor now builds on aarch64 based systems [bug introduced in coreutils-8.20]
index 71ac3e60c9868d84f7963ef6f25bb6eb3627e2eb..bbf3a86c273b8fa6fd502ad317fdf08219e64a06 100644 (file)
@@ -25,6 +25,7 @@
 #include "error.h"
 #include "fadvise.h"
 #include "getopt.h"
+#include "linebuffer.h"
 #include "quote.h"
 #include "quotearg.h"
 #include "randint.h"
 
 #define AUTHORS proper_name ("Paul Eggert")
 
+/* For reservoir-sampling, allocate the reservoir lines in batches.  */
+enum { RESERVOIR_LINES_INCREMENT = 1024 };
+
+/* reservoir-sampling introduces CPU overhead for small inputs.
+   So only enable it for inputs >= this limit.
+   This limit was determined using these commands:
+     $ for p in $(seq 7); do src/seq $((10**$p)) > 10p$p.in; done
+     $ for p in $(seq 7); do time shuf-nores -n10 10p$p.in >/dev/null; done
+     $ for p in $(seq 7); do time shuf -n10 10p$p.in >/dev/null; done  .*/
+enum { RESERVOIR_MIN_INPUT = 8192 * 1024 };
+
+
 void
 usage (int status)
 {
@@ -135,6 +148,114 @@ next_line (char *line, char eolbyte, size_t n)
   return p + 1;
 }
 
+/* Return the size of the input if possible or OFF_T_MAX if not.  */
+
+static off_t
+input_size (void)
+{
+  off_t file_size;
+
+  struct stat stat_buf;
+  if (fstat (STDIN_FILENO, &stat_buf) != 0)
+    return OFF_T_MAX;
+  if (usable_st_size (&stat_buf))
+    file_size = stat_buf.st_size;
+  else
+    return OFF_T_MAX;
+
+  off_t input_offset = lseek (STDIN_FILENO, 0, SEEK_CUR);
+  if (input_offset < 0)
+    return OFF_T_MAX;
+
+  file_size -= input_offset;
+
+  return file_size;
+}
+
+/* Read all lines and store up to K permuted lines in *OUT_RSRV.
+   Return the number of lines read, up to a maximum of K.  */
+
+static size_t
+read_input_reservoir_sampling (FILE *in, char eolbyte, size_t k,
+                               struct randint_source *s,
+                               struct linebuffer **out_rsrv)
+{
+  randint n_lines = 0;
+  size_t n_alloc_lines = MIN (k, RESERVOIR_LINES_INCREMENT);
+  struct linebuffer *line = NULL;
+  struct linebuffer *rsrv;
+
+  rsrv = xcalloc (n_alloc_lines, sizeof (struct linebuffer));
+
+  /* Fill the first K lines, directly into the reservoir.  */
+  while (n_lines < k
+         && (line =
+             readlinebuffer_delim (&rsrv[n_lines], in, eolbyte)) != NULL)
+    {
+      n_lines++;
+
+      /* Enlarge reservoir.  */
+      if (n_lines >= n_alloc_lines)
+        {
+          n_alloc_lines += RESERVOIR_LINES_INCREMENT;
+          rsrv = xnrealloc (rsrv, n_alloc_lines, sizeof (struct linebuffer));
+          memset (&rsrv[n_lines], 0,
+                  RESERVOIR_LINES_INCREMENT * sizeof (struct linebuffer));
+        }
+    }
+
+  /* last line wasn't NULL - so there may be more lines to read.  */
+  if (line != NULL)
+    {
+      struct linebuffer dummy;
+      initbuffer (&dummy);  /* space for lines not put in reservoir.  */
+
+      /* Choose the fate of the next line, with decreasing probability (as
+         n_lines increases in size).
+
+         If the line will be used, store it directly in the reservoir.
+         Otherwise, store it in dummy space.
+
+         With 'struct linebuffer', storing into existing buffer will reduce
+         re-allocations (will only re-allocate if the new line is longer than
+         the currently allocated space).  */
+      do
+        {
+          randint j = randint_choose (s, n_lines + 1);  /* 0 .. n_lines.  */
+          line = (j < k) ? (&rsrv[j]) : (&dummy);
+        }
+      while (readlinebuffer_delim (line, in, eolbyte) != NULL && n_lines++);
+
+      if (! n_lines)
+        error (EXIT_FAILURE, EOVERFLOW, _("too many input lines"));
+
+      freebuffer (&dummy);
+    }
+
+  /* no more input lines, or an input error.  */
+  if (ferror (in))
+    error (EXIT_FAILURE, errno, _("read error"));
+
+  *out_rsrv = rsrv;
+  return MIN (k, n_lines);
+}
+
+static int
+write_permuted_output_reservoir (size_t n_lines, struct linebuffer *lines,
+                                 size_t const *permutation)
+{
+  size_t i;
+
+  for (i = 0; i < n_lines; i++)
+    {
+      const struct linebuffer *p = &lines[permutation[i]];
+      if (fwrite (p->buffer, sizeof (char), p->length, stdout) != p->length)
+        return -1;
+    }
+
+  return 0;
+}
+
 /* Read data from file IN.  Input lines are delimited by EOLBYTE;
    silently append a trailing EOLBYTE if the file ends in some other
    byte.  Store a pointer to the resulting array of lines into *PLINE.
@@ -152,6 +273,15 @@ read_input (FILE *in, char eolbyte, char ***pline)
   size_t i;
   size_t n_lines;
 
+  /* TODO: We should limit the amount of data read here,
+     to less than RESERVOIR_MIN_INPUT.  I.E. adjust fread_file() to support
+     taking a byte limit.  We'd then need to ensure we handle a line spanning
+     this boundary.  With that in place we could set use_reservoir_sampling
+     when used==RESERVOIR_MIN_INPUT, and have read_input_reservoir_sampling()
+     call a wrapper function to populate a linebuffer from the internal pline
+     or if none left, stdin.  Doing that would give better performance by
+     avoiding the reservoir CPU overhead when reading < RESERVOIR_MIN_INPUT
+     from a pipe, and allow us to dispense with the input_size() function.  */
   if (!(buf = fread_file (in, &used)))
     error (EXIT_FAILURE, errno, _("read error"));
 
@@ -174,7 +304,7 @@ read_input (FILE *in, char eolbyte, char ***pline)
 }
 
 static int
-write_permuted_output (size_t n_lines, char * const *line, size_t lo_input,
+write_permuted_output (size_t n_lines, char *const *line, size_t lo_input,
                        size_t const *permutation, char eolbyte)
 {
   size_t i;
@@ -182,7 +312,7 @@ write_permuted_output (size_t n_lines, char * const *line, size_t lo_input,
   if (line)
     for (i = 0; i < n_lines; i++)
       {
-        char * const *p = line + permutation[i];
+        char *const *p = line + permutation[i];
         size_t len = p[1] - p[0];
         if (fwrite (p[0], sizeof *p[0], len, stdout) != len)
           return -1;
@@ -209,14 +339,17 @@ main (int argc, char **argv)
   char *random_source = NULL;
   char eolbyte = '\n';
   char **input_lines = NULL;
+  bool use_reservoir_sampling = false;
 
   int optc;
   int n_operands;
   char **operand;
   size_t n_lines;
-  char **line;
+  char **line = NULL;
+  struct linebuffer *reservoir = NULL;
   struct randint_source *randint_source;
   size_t *permutation;
+  int i;
 
   initialize_main (&argc, &argv);
   set_program_name (argv[0]);
@@ -341,17 +474,35 @@ main (int argc, char **argv)
 
       fadvise (stdin, FADVISE_SEQUENTIAL);
 
-      n_lines = read_input (stdin, eolbyte, &input_lines);
-      line = input_lines;
+      if (head_lines != SIZE_MAX && input_size () > RESERVOIR_MIN_INPUT)
+        {
+          use_reservoir_sampling = true;
+          n_lines = SIZE_MAX;   /* unknown number of input lines, for now.  */
+        }
+      else
+        {
+          n_lines = read_input (stdin, eolbyte, &input_lines);
+          line = input_lines;
+        }
     }
 
   head_lines = MIN (head_lines, n_lines);
 
   randint_source = randint_all_new (random_source,
+                                    use_reservoir_sampling ? SIZE_MAX :
                                     randperm_bound (head_lines, n_lines));
   if (! randint_source)
     error (EXIT_FAILURE, errno, "%s", quotearg_colon (random_source));
 
+  if (use_reservoir_sampling)
+    {
+      /* Instead of reading the entire file into 'line',
+         use reservoir-sampling to store just "head_lines" random lines.  */
+      n_lines = read_input_reservoir_sampling (stdin, eolbyte, head_lines,
+                                               randint_source, &reservoir);
+      head_lines = n_lines;
+    }
+
   /* Close stdin now, rather than earlier, so that randint_all_new
      doesn't have to worry about opening something other than
      stdin.  */
@@ -363,8 +514,13 @@ main (int argc, char **argv)
 
   if (outfile && ! freopen (outfile, "w", stdout))
     error (EXIT_FAILURE, errno, "%s", quotearg_colon (outfile));
-  if (write_permuted_output (head_lines, line, lo_input, permutation, eolbyte)
-      != 0)
+
+  if (use_reservoir_sampling)
+    i = write_permuted_output_reservoir (n_lines, reservoir, permutation);
+  else
+    i = write_permuted_output (head_lines, line, lo_input,
+                               permutation, eolbyte);
+  if (i != 0)
     error (EXIT_FAILURE, errno, _("write error"));
 
 #ifdef lint
@@ -375,6 +531,13 @@ main (int argc, char **argv)
       free (input_lines[0]);
       free (input_lines);
     }
+  if (reservoir)
+    {
+      size_t j;
+      for (j = 0; j < n_lines; ++j)
+        freebuffer (&reservoir[j]);
+      free (reservoir);
+    }
 #endif
 
   return EXIT_SUCCESS;
index 607ddc4d9014a2a1e5f8e2d9c135e60fc8b48d05..dc87ef4916d11377120598ef3435fb46259b9409 100644 (file)
@@ -313,6 +313,7 @@ all_tests =                                 \
   tests/misc/shred-passes.sh                   \
   tests/misc/shred-remove.sh                   \
   tests/misc/shuf.sh                           \
+  tests/misc/shuf-reservoir.sh                 \
   tests/misc/sort.pl                           \
   tests/misc/sort-benchmark-random.sh          \
   tests/misc/sort-compress.sh                  \
diff --git a/tests/misc/shuf-reservoir.sh b/tests/misc/shuf-reservoir.sh
new file mode 100755 (executable)
index 0000000..e971c59
--- /dev/null
@@ -0,0 +1,69 @@
+#!/bin/sh
+# Exercise shuf's reservoir-sampling code
+# NOTE:
+#  These tests do not check valid randomness,
+#  they just check memory allocation related code.
+
+# Copyright (C) 2013 Free Software Foundation, Inc.
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+. "${srcdir=.}/tests/init.sh"; path_prepend_ ./src
+print_ver_ shuf
+expensive_
+require_valgrind_
+
+# Run "shuf" with specific number of input lines and output lines
+# Check the output for expected number of lines.
+run_shuf_n()
+{
+  INPUT_LINES="$1"
+  OUTPUT_LINES="$2"
+
+  # Critical memory-related bugs will cause a segfault here
+  # (with varying numbres of input/output lines)
+  seq "$INPUT_LINES" | valgrind --leak-check=full --error-exitcode=1 \
+  shuf -n "$OUTPUT_LINES" -o "out_${INPUT_LINES}_${OUTPUT_LINES}" || return 1
+
+  EXPECTED_LINES="$OUTPUT_LINES"
+  test "$INPUT_LINES" -lt "$OUTPUT_LINES" && EXPECTED_LINES="$INPUT_LINES"
+
+  # There is no sure way to verify shuffled output (as it is random).
+  # Ensure we have the correct number of all numeric lines non duplicated lines.
+  GOOD_LINES=$(grep '^[0-9][0-9]*$' "out_${INPUT_LINES}_${OUTPUT_LINES}" |
+               sort -un | wc -l) || framework_failure_
+  LINES=$(wc -l < "out_${INPUT_LINES}_${OUTPUT_LINES}") || framework_failure_
+
+  test "$EXPECTED_LINES" -eq "$GOOD_LINES" || return 1
+  test "$EXPECTED_LINES" -eq "$LINES" || return 1
+
+  return 0
+}
+
+# Test multiple combinations of input lines and output lines.
+# (e.g. small number of input lines and large number of output lines,
+#  and vice-versa. Also, each reservoir allocation uses a 1024-lines batch,
+#  so test 1023/1024/1025 and related values).
+TEST_LINES="0 1 5 1023 1024 1025 3071 3072 3073"
+
+for IN_N in $TEST_LINES; do
+  for OUT_N in $TEST_LINES; do
+    run_shuf_n "$IN_N" "$OUT_N" || {
+      fail=1
+      echo "shuf-reservoir-sampling failed with IN_N=$IN_N OUT_N=$OUT_N" >&2;
+    }
+  done
+done
+
+Exit $fail