]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
gh-115532 Add kde_random() to the statistic module (#118210)
authorRaymond Hettinger <rhettinger@users.noreply.github.com>
Sat, 4 May 2024 04:13:36 +0000 (23:13 -0500)
committerGitHub <noreply@github.com>
Sat, 4 May 2024 04:13:36 +0000 (23:13 -0500)
Doc/library/statistics.rst
Doc/whatsnew/3.13.rst
Lib/statistics.py
Lib/test/test_statistics.py

index cc72396964342e53f91cb410fe0d2ed7fe008d2d..d5a316e45ee3e20d49f20513f262ad3669ffb3e7 100644 (file)
@@ -77,6 +77,7 @@ or sample.
 :func:`geometric_mean`   Geometric mean of data.
 :func:`harmonic_mean`    Harmonic mean of data.
 :func:`kde`              Estimate the probability density distribution of the data.
+:func:`kde_random`       Random sampling from the PDF generated by kde().
 :func:`median`           Median (middle value) of data.
 :func:`median_low`       Low median of data.
 :func:`median_high`      High median of data.
@@ -311,6 +312,30 @@ However, for reading convenience, most of the examples show sorted sequences.
    .. versionadded:: 3.13
 
 
+.. function:: kde_random(data, h, kernel='normal', *, seed=None)
+
+   Return a function that makes a random selection from the estimated
+   probability density function produced by ``kde(data, h, kernel)``.
+
+   Providing a *seed* allows reproducible selections. In the future, the
+   values may change slightly as more accurate kernel inverse CDF estimates
+   are implemented.  The seed may be an integer, float, str, or bytes.
+
+   A :exc:`StatisticsError` will be raised if the *data* sequence is empty.
+
+   Continuing the example for :func:`kde`, we can use
+   :func:`kde_random` to generate new random selections from an
+   estimated probability density function:
+
+      >>> data = [-2.1, -1.3, -0.4, 1.9, 5.1, 6.2]
+      >>> rand = kde_random(data, h=1.5, seed=8675309)
+      >>> new_selections = [rand() for i in range(10)]
+      >>> [round(x, 1) for x in new_selections]
+      [0.7, 6.2, 1.2, 6.9, 7.0, 1.8, 2.5, -0.5, -1.8, 5.6]
+
+   .. versionadded:: 3.13
+
+
 .. function:: median(data)
 
    Return the median (middle value) of numeric data, using the common "mean of
@@ -1148,65 +1173,6 @@ The final prediction goes to the largest posterior. This is known as the
   'female'
 
 
-Sampling from kernel density estimation
-***************************************
-
-The :func:`kde()` function creates a continuous probability density
-function from discrete samples.  Some applications need a way to make
-random selections from that distribution.
-
-The technique is to pick a sample from a bandwidth scaled kernel
-function and recenter the result around a randomly chosen point from
-the input data.  This can be done with any kernel that has a known or
-accurately approximated inverse cumulative distribution function.
-
-.. testcode::
-
-   from random import choice, random, seed
-   from math import sqrt, log, pi, tan, asin, cos, acos
-   from statistics import NormalDist
-
-   kernel_invcdfs = {
-       'normal': NormalDist().inv_cdf,
-       'logistic': lambda p: log(p / (1 - p)),
-       'sigmoid': lambda p: log(tan(p * pi/2)),
-       'rectangular': lambda p: 2*p - 1,
-       'triangular': lambda p: sqrt(2*p) - 1 if p < 0.5 else 1 - sqrt(2 - 2*p),
-       'parabolic': lambda p: 2 * cos((acos(2*p-1) + pi) / 3),
-       'cosine': lambda p: 2*asin(2*p - 1)/pi,
-   }
-
-   def kde_random(data, h, kernel='normal'):
-       'Return a function that samples from kde() smoothed data.'
-       kernel_invcdf = kernel_invcdfs[kernel]
-       def rand():
-           return h * kernel_invcdf(random()) + choice(data)
-       return rand
-
-For example:
-
-.. doctest::
-
-   >>> discrete_samples = [-2.1, -1.3, -0.4, 1.9, 5.1, 6.2]
-   >>> rand = kde_random(discrete_samples, h=1.5)
-   >>> seed(8675309)
-   >>> selections = [rand() for i in range(10)]
-   >>> [round(x, 1) for x in selections]
-   [4.7, 7.4, 1.2, 7.8, 6.9, -1.3, 5.8, 0.2, -1.4, 5.7]
-
-.. testcode::
-    :hide:
-
-    from statistics import kde
-    from math import isclose
-
-    # Verify that cdf / invcdf will round trip
-    xarr = [i/100 for i in range(-100, 101)]
-    for kernel, invcdf in kernel_invcdfs.items():
-        cdf = kde([0.0], h=1.0, kernel=kernel, cumulative=True)
-        for x in xarr:
-            assert isclose(invcdf(cdf(x)), x, abs_tol=1E-9)
-
 ..
    # This modelines must appear within the last ten lines of the file.
    kate: indent-width 3; remove-trailing-space on; replace-tabs on; encoding utf-8;
index d996cf6c4d9b52bf38506b413461b27adbf20337..269a7cc985ad198489d816b6897e81850f444b15 100644 (file)
@@ -745,7 +745,8 @@ statistics
 
 * Add :func:`statistics.kde` for kernel density estimation.
   This makes it possible to estimate a continuous probability density function
-  from a fixed number of discrete samples.
+  from a fixed number of discrete samples.  Also added :func:`statistics.kde_random`
+  for sampling from the estimated probability density function.
   (Contributed by Raymond Hettinger in :gh:`115863`.)
 
 .. _whatsnew313-subprocess:
index fc00891b083dc37d6627a12b6fb827400ba63d3f..f3ce2d8b6b442a5d2145882cefa6e9e43e6ff6d4 100644 (file)
@@ -113,6 +113,7 @@ __all__ = [
     'geometric_mean',
     'harmonic_mean',
     'kde',
+    'kde_random',
     'linear_regression',
     'mean',
     'median',
@@ -138,12 +139,13 @@ from decimal import Decimal
 from itertools import count, groupby, repeat
 from bisect import bisect_left, bisect_right
 from math import hypot, sqrt, fabs, exp, erf, tau, log, fsum, sumprod
-from math import isfinite, isinf, pi, cos, sin, cosh, atan
+from math import isfinite, isinf, pi, cos, sin, tan, cosh, asin, atan, acos
 from functools import reduce
 from operator import itemgetter
 from collections import Counter, namedtuple, defaultdict
 
 _SQRT2 = sqrt(2.0)
+_random = random
 
 # === Exceptions ===
 
@@ -978,11 +980,9 @@ def kde(data, h, kernel='normal', *, cumulative=False):
             return sum(K((x - x_i) / h) for x_i in data) / (n * h)
 
         def cdf(x):
-
             n = len(data)
             return sum(W((x - x_i) / h) for x_i in data) / n
 
-
     else:
 
         sample = sorted(data)
@@ -1078,6 +1078,7 @@ def quantiles(data, *, n=4, method='exclusive'):
         if ld == 1:
             return data * (n - 1)
         raise StatisticsError('must have at least one data point')
+
     if method == 'inclusive':
         m = ld - 1
         result = []
@@ -1086,6 +1087,7 @@ def quantiles(data, *, n=4, method='exclusive'):
             interpolated = (data[j] * (n - delta) + data[j + 1] * delta) / n
             result.append(interpolated)
         return result
+
     if method == 'exclusive':
         m = ld + 1
         result = []
@@ -1096,6 +1098,7 @@ def quantiles(data, *, n=4, method='exclusive'):
             interpolated = (data[j - 1] * (n - delta) + data[j] * delta) / n
             result.append(interpolated)
         return result
+
     raise ValueError(f'Unknown method: {method!r}')
 
 
@@ -1709,3 +1712,97 @@ class NormalDist:
 
     def __setstate__(self, state):
         self._mu, self._sigma = state
+
+
+## kde_random() ##############################################################
+
+def _newton_raphson(f_inv_estimate, f, f_prime, tolerance=1e-12):
+    def f_inv(y):
+        "Return x such that f(x) ≈ y within the specified tolerance."
+        x = f_inv_estimate(y)
+        while abs(diff := f(x) - y) > tolerance:
+            x -= diff / f_prime(x)
+        return x
+    return f_inv
+
+def _quartic_invcdf_estimate(p):
+    sign, p = (1.0, p) if p <= 1/2 else (-1.0, 1.0 - p)
+    x = (2.0 * p) ** 0.4258865685331 - 1.0
+    if p >= 0.004 < 0.499:
+        x += 0.026818732 * sin(7.101753784 * p + 2.73230839482953)
+    return x * sign
+
+_quartic_invcdf = _newton_raphson(
+    f_inv_estimate = _quartic_invcdf_estimate,
+    f = lambda t: 3/16 * t**5 - 5/8 * t**3 + 15/16 * t + 1/2,
+    f_prime = lambda t: 15/16 * (1.0 - t * t) ** 2)
+
+def _triweight_invcdf_estimate(p):
+    sign, p = (1.0, p) if p <= 1/2 else (-1.0, 1.0 - p)
+    x = (2.0 * p) ** 0.3400218741872791 - 1.0
+    return x * sign
+
+_triweight_invcdf = _newton_raphson(
+    f_inv_estimate = _triweight_invcdf_estimate,
+    f = lambda t: 35/32 * (-1/7*t**7 + 3/5*t**5 - t**3 + t) + 1/2,
+    f_prime = lambda t: 35/32 * (1.0 - t * t) ** 3)
+
+_kernel_invcdfs = {
+    'normal': NormalDist().inv_cdf,
+    'logistic': lambda p: log(p / (1 - p)),
+    'sigmoid': lambda p: log(tan(p * pi/2)),
+    'rectangular': lambda p: 2*p - 1,
+    'parabolic': lambda p: 2 * cos((acos(2*p-1) + pi) / 3),
+    'quartic': _quartic_invcdf,
+    'triweight': _triweight_invcdf,
+    'triangular': lambda p: sqrt(2*p) - 1 if p < 1/2 else 1 - sqrt(2 - 2*p),
+    'cosine': lambda p: 2 * asin(2*p - 1) / pi,
+}
+_kernel_invcdfs['gauss'] = _kernel_invcdfs['normal']
+_kernel_invcdfs['uniform'] = _kernel_invcdfs['rectangular']
+_kernel_invcdfs['epanechnikov'] = _kernel_invcdfs['parabolic']
+_kernel_invcdfs['biweight'] = _kernel_invcdfs['quartic']
+
+def kde_random(data, h, kernel='normal', *, seed=None):
+    """Return a function that makes a random selection from the estimated
+    probability density function created by kde(data, h, kernel).
+
+    Providing a *seed* allows reproducible selections within a single
+    thread.  The seed may be an integer, float, str, or bytes.
+
+    A StatisticsError will be raised if the *data* sequence is empty.
+
+    Example:
+
+    >>> data = [-2.1, -1.3, -0.4, 1.9, 5.1, 6.2]
+    >>> rand = kde_random(data, h=1.5, seed=8675309)
+    >>> new_selections = [rand() for i in range(10)]
+    >>> [round(x, 1) for x in new_selections]
+    [0.7, 6.2, 1.2, 6.9, 7.0, 1.8, 2.5, -0.5, -1.8, 5.6]
+
+    """
+    n = len(data)
+    if not n:
+        raise StatisticsError('Empty data sequence')
+
+    if not isinstance(data[0], (int, float)):
+        raise TypeError('Data sequence must contain ints or floats')
+
+    if h <= 0.0:
+        raise StatisticsError(f'Bandwidth h must be positive, not {h=!r}')
+
+    try:
+        kernel_invcdf = _kernel_invcdfs[kernel]
+    except KeyError:
+        raise StatisticsError(f'Unknown kernel name: {kernel!r}')
+
+    prng = _random.Random(seed)
+    random = prng.random
+    choice = prng.choice
+
+    def rand():
+        return choice(data) + h * kernel_invcdf(random())
+
+    rand.__doc__ = f'Random KDE selection with {h=!r} and {kernel=!r}'
+
+    return rand
index 204787a88a9c5fb2b468cd64dbb96ed37c641b56..fe6c59c30dae283c53a460db0f6d5dfa4f49d945 100644 (file)
@@ -2426,6 +2426,86 @@ class TestKDE(unittest.TestCase):
         self.assertEqual(f_hat(-1.0), 1/2)
         self.assertEqual(f_hat(1.0), 1/2)
 
+    def test_kde_kernel_invcdfs(self):
+        kernel_invcdfs = statistics._kernel_invcdfs
+        kde = statistics.kde
+
+        # Verify that cdf / invcdf will round trip
+        xarr = [i/100 for i in range(-100, 101)]
+        for kernel, invcdf in kernel_invcdfs.items():
+            with self.subTest(kernel=kernel):
+                cdf = kde([0.0], h=1.0, kernel=kernel, cumulative=True)
+                for x in xarr:
+                    self.assertAlmostEqual(invcdf(cdf(x)), x, places=5)
+
+    def test_kde_random(self):
+        kde_random = statistics.kde_random
+        StatisticsError = statistics.StatisticsError
+        kernels = ['normal', 'gauss', 'logistic', 'sigmoid', 'rectangular',
+                   'uniform', 'triangular', 'parabolic', 'epanechnikov',
+                   'quartic', 'biweight', 'triweight', 'cosine']
+        sample = [-2.1, -1.3, -0.4, 1.9, 5.1, 6.2]
+
+        # Smoke test
+
+        for kernel in kernels:
+            with self.subTest(kernel=kernel):
+                rand = kde_random(sample, h=1.5, kernel=kernel)
+                selections = [rand() for i in range(10)]
+
+        # Check error cases
+
+        with self.assertRaises(StatisticsError):
+            kde_random([], h=1.0)                       # Empty dataset
+        with self.assertRaises(TypeError):
+            kde_random(['abc', 'def'], 1.5)             # Non-numeric data
+        with self.assertRaises(TypeError):
+            kde_random(iter(sample), 1.5)               # Data is not a sequence
+        with self.assertRaises(StatisticsError):
+            kde_random(sample, h=0.0)                   # Zero bandwidth
+        with self.assertRaises(StatisticsError):
+            kde_random(sample, h=0.0)                   # Negative bandwidth
+        with self.assertRaises(TypeError):
+            kde_random(sample, h='str')                 # Wrong bandwidth type
+        with self.assertRaises(StatisticsError):
+            kde_random(sample, h=1.0, kernel='bogus')   # Invalid kernel
+
+        # Test name and docstring of the generated function
+
+        h = 1.5
+        kernel = 'cosine'
+        prng = kde_random(sample, h, kernel)
+        self.assertEqual(prng.__name__, 'rand')
+        self.assertIn(kernel, prng.__doc__)
+        self.assertIn(repr(h), prng.__doc__)
+
+        # Approximate distribution test: Compare a random sample to the expected distribution
+
+        data = [-2.1, -1.3, -0.4, 1.9, 5.1, 6.2, 7.8, 14.3, 15.1, 15.3, 15.8, 17.0]
+        n = 1_000_000
+        h = 1.75
+        dx = 0.1
+
+        def p_expected(x):
+            return F_hat(x + dx) - F_hat(x - dx)
+
+        def p_observed(x):
+            # P(x-dx <= X < x+dx) / (2*dx)
+            i = bisect.bisect_left(big_sample, x - dx)
+            j = bisect.bisect_right(big_sample, x + dx)
+            return (j - i) / len(big_sample)
+
+        for kernel in kernels:
+            with self.subTest(kernel=kernel):
+
+                F_hat = statistics.kde(data, h, kernel, cumulative=True)
+                rand = kde_random(data, h, kernel, seed=8675309**2)
+                big_sample = sorted([rand() for i in range(n)])
+
+                for x in range(-40, 190):
+                    x /= 10
+                    self.assertTrue(math.isclose(p_observed(x), p_expected(x), abs_tol=0.001))
+
 
 class TestQuantiles(unittest.TestCase):