from contextlib import ExitStack, contextmanager
from threading import Thread
from test.support import LOOPBACK_TIMEOUT
-from time import sleep, time
+from time import time
from unittest.mock import patch
            for line in self.logcat_process.stdout:
                self.logcat_queue.put(line.rstrip("\n"))
            self.logcat_process.stdout.close()
-        Thread(target=logcat_thread).start()
+        self.logcat_thread = Thread(target=logcat_thread)
+        self.logcat_thread.start()
        from ctypes import CDLL, c_char_p, c_int
        android_log_write = getattr(CDLL("liblog.so"), "__android_log_write")
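        # Per the NDK logging docs, __android_log_write has the C signature
        # int __android_log_write(int prio, const char *tag, const char *text),
        # which is what the c_int and c_char_p imports above correspond to.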
    def tearDown(self):
        self.logcat_process.terminate()
        self.logcat_process.wait(LOOPBACK_TIMEOUT)
+        self.logcat_thread.join(LOOPBACK_TIMEOUT)
    @contextmanager
    def unbuffered(self, stream):
):
stream.write(obj)
+
+class TestAndroidRateLimit(unittest.TestCase):
    def test_rate_limit(self):
        # https://cs.android.com/android/platform/superproject/+/android-14.0.0_r1:system/logging/liblog/include/log/log_read.h;l=39
        PER_MESSAGE_OVERHEAD = 28
            1024 - PER_MESSAGE_OVERHEAD - len(tag) - len(message.format(0))
        ) + "\n"
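        # The padding above sizes each message so that the tag, the text, and
        # the per-message overhead together come to 1 KB, so each write below
        # consumes one kilobyte of the rate limiter's budget.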
+        # To avoid depending on the performance of the test device, we mock the
+        # passage of time.
+        mock_now = time()
+
+        def mock_time():
+            # Avoid division by zero by simulating a small delay.
+            mock_sleep(0.0001)
+            return mock_now
+
+        def mock_sleep(duration):
+            nonlocal mock_now
+            mock_now += duration
+
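        # The sleep/time patches below route _android_support's timekeeping to
        # the simulated clock above, so any delay the rate limiter asks for
        # advances mock_now instead of actually blocking the test.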
        # See _android_support.py. The default values of these parameters work
        # well across a wide range of devices, but we'll use smaller values to
        # ensure a quick and reliable test that doesn't flood the log too much.
        with (
            patch("_android_support.MAX_BYTES_PER_SECOND", MAX_KB_PER_SECOND * 1024),
            patch("_android_support.BUCKET_SIZE", BUCKET_KB * 1024),
+            patch("_android_support.sleep", mock_sleep),
+            patch("_android_support.time", mock_time),
        ):
            # Make sure the token bucket is full.
-            sleep(BUCKET_KB / MAX_KB_PER_SECOND)
+            stream.write("Initial message to reset _prev_write_time")
+            mock_sleep(BUCKET_KB / MAX_KB_PER_SECOND)
            line_num = 0
            # Write BUCKET_KB messages, and return the rate at which they were
            # accepted in KB per second.
            def write_bucketful():
                nonlocal line_num
-                start = time()
+                start = mock_time()
                max_line_num = line_num + BUCKET_KB
                while line_num < max_line_num:
                    stream.write(message.format(line_num))
                    line_num += 1
-                return BUCKET_KB / (time() - start)
+                return BUCKET_KB / (mock_time() - start)
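            # Because both time() and sleep() are mocked, the rate measured by
            # write_bucketful() depends only on the delays the rate limiter
            # requests (plus 0.1 ms per time() call), not on device speed.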
            # The first bucketful should be written with minimal delay. The
            # factor of 2 here is not arbitrary: it verifies that the system can
            )
            # Once the token bucket refills, we should go back to full speed.
-            sleep(BUCKET_KB / MAX_KB_PER_SECOND)
+            mock_sleep(BUCKET_KB / MAX_KB_PER_SECOND)
            self.assertGreater(write_bucketful(), MAX_KB_PER_SECOND * 2)
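
For context, the values patched above (MAX_BYTES_PER_SECOND and BUCKET_SIZE) configure the token-bucket rate limiter in _android_support.py, and "_prev_write_time" in the initial message refers to its bookkeeping. The sketch below is only an illustrative model of that behaviour, reusing the names visible in this diff rather than the module's real internals; it shows why the test expects one bucketful at full speed, a steady MAX_KB_PER_SECOND after that, and full speed again once the bucket refills.

# Illustrative token-bucket sketch (not the actual _android_support code).
from time import sleep, time

MAX_BYTES_PER_SECOND = 1024 * 1024   # steady-state refill rate (bytes/second)
BUCKET_SIZE = 100 * 1024             # maximum burst allowed at full speed (bytes)

_prev_write_time = time()            # when the bucket level was last updated
_bucket_level = 0                    # bytes currently charged to the bucket


def rate_limited_write(write, text):
    """Call write(text), sleeping first if the byte budget is exhausted."""
    global _prev_write_time, _bucket_level

    # Drain the bucket for the time that has passed since the last write.
    now = time()
    _bucket_level = max(
        0, _bucket_level - (now - _prev_write_time) * MAX_BYTES_PER_SECOND
    )
    _prev_write_time = now

    # Charge this write to the bucket; sleep off any overflow before writing.
    _bucket_level += len(text.encode("UTF-8"))
    if _bucket_level > BUCKET_SIZE:
        sleep((_bucket_level - BUCKET_SIZE) / MAX_BYTES_PER_SECOND)
    write(text)

Writing one bucketful into an idle limiter triggers no sleeps; every byte after that is slowed to the refill rate until another idle period lets the bucket drain, matching the fast/slow/fast pattern the test asserts.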