Merge bitcoin/bitcoin#33141: test: Remove polling loop from test_runner (take 2)

fa4885ef2f test: Remove polling loop from test_runner (MarcoFalke)

Pull request description:

  (This picks up my prior attempt from https://github.com/bitcoin/bitcoin/pull/13384)

  Currently, the test_runner uses a `time.sleep` before polling to check whether any tests have completed. This is largely fine when running a few tests, or when the tests take a long time.

  However, when running many fast tests, these sleeps can accumulate and leave the CPU idle for no reason.

  A trivial improvement would be to only sleep when really needed:

  ```diff
  diff --git a/test/functional/test_runner.py b/test/functional/test_runner.py
  index 7c8c15f391..1d9f28cee4 100755
  --- a/test/functional/test_runner.py
  +++ b/test/functional/test_runner.py
  @@ -747,7 +747,6 @@ class TestHandler:
           dot_count = 0
           while True:
               # Return all procs that have finished, if any. Otherwise sleep until there is one.
  -            time.sleep(.5)
               ret = []
               for job in self.jobs:
                   (name, start_time, proc, testdir, log_out, log_err) = job
  @@ -771,6 +770,7 @@ class TestHandler:
                       ret.append((TestResult(name, status, int(time.time() - start_time)), testdir, stdout, stderr, skip_reason))
               if ret:
                   return ret
  +            time.sleep(.5)
               if self.use_term_control:
                   print('.', end='', flush=True)
               dot_count += 1
  ```

  However, ideally there is no sleep at all. So do that by using a `ThreadPoolExecutor`.
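
  The pattern, roughly, is to submit one small waiter per test subprocess to a `ThreadPoolExecutor` and then block in `concurrent.futures.wait(..., return_when=FIRST_COMPLETED)` until something finishes, instead of sleeping and re-polling. Below is a minimal standalone sketch of that pattern, not the test_runner code itself; the dummy sleep commands are made up for illustration:

  ```python
  import subprocess
  import sys
  from concurrent import futures

  # Hypothetical stand-in for the functional tests: a few short-lived subprocesses.
  cmds = [[sys.executable, "-c", f"import time; time.sleep({d})"] for d in (0.2, 0.4, 0.6)]

  executor = futures.ThreadPoolExecutor(max_workers=len(cmds))
  jobs = {}
  for cmd in cmds:
      proc = subprocess.Popen(cmd)
      # Each waiter blocks in Popen.wait(); its future completes when the process exits.
      jobs[executor.submit(proc.wait)] = proc

  while jobs:
      # Wake up as soon as any subprocess finishes, instead of polling on a timer.
      done, _ = futures.wait(jobs.keys(), return_when=futures.FIRST_COMPLETED)
      for fut in done:
          proc = jobs.pop(fut)
          print("finished:", proc.args, "returncode:", fut.result())

  executor.shutdown()
  ```

  The actual change still passes a 0.5 s timeout to `futures.wait` so the progress dots can keep printing while jobs run.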

  This can be tested via something like:

  ```
  time ./bld-cmake/test/functional/test_runner.py $(for i in {1..200}; do echo -n "tool_rpcauth "; done) -j 200
  ```

  The result should show:

  * Current `master` is the slowest
  * The "sleep patch" from above is a bit faster (1.5x improvement)
  * This pull request is the fastest (2x improvement)

ACKs for top commit:
  achow101:
    ACK fa4885ef2f
  l0rinc:
    tested ACK fa4885ef2f
  Eunovo:
    ReACK fa4885ef2f

Tree-SHA512: f097636c5d9e005781012d8e20c2886cd9968544d4d555b1d2e28982d420ff63fec15cfabb6bd30e4d3c389b8b8350a1ddad721cceaf4b7760cad38b95160175
Commit 176fac0f16 by Ava Chow, 2025-09-11 13:05:13 -07:00
1 changed file with 46 additions and 33 deletions


```diff
diff --git a/test/functional/test_runner.py b/test/functional/test_runner.py
--- a/test/functional/test_runner.py
+++ b/test/functional/test_runner.py
@@ -14,6 +14,7 @@ For a description of arguments recognized by test scripts, see
 import argparse
 from collections import deque
+from concurrent import futures
 import configparser
 import csv
 import datetime
@@ -706,15 +707,15 @@ class TestHandler:
     """
     Trigger the test scripts passed in via the list.
     """
     def __init__(self, *, num_tests_parallel, tests_dir, tmpdir, test_list, flags, use_term_control):
         assert num_tests_parallel >= 1
+        self.executor = futures.ThreadPoolExecutor(max_workers=num_tests_parallel)
         self.num_jobs = num_tests_parallel
         self.tests_dir = tests_dir
         self.tmpdir = tmpdir
         self.test_list = test_list
         self.flags = flags
-        self.jobs = []
+        self.jobs = {}
         self.use_term_control = use_term_control
     def done(self):
@@ -731,47 +732,59 @@ class TestHandler:
             test_argv = test.split()
             testdir = "{}/{}_{}".format(self.tmpdir, re.sub(".py$", "", test_argv[0]), portseed)
             tmpdir_arg = ["--tmpdir={}".format(testdir)]
-            self.jobs.append((test,
-                              time.time(),
-                              subprocess.Popen([sys.executable, self.tests_dir + test_argv[0]] + test_argv[1:] + self.flags + portseed_arg + tmpdir_arg,
-                                               text=True,
-                                               stdout=log_stdout,
-                                               stderr=log_stderr),
-                              testdir,
-                              log_stdout,
-                              log_stderr))
+            def proc_wait(task):
+                task[2].wait()
+                return task
+            task = [
+                test,
+                time.time(),
+                subprocess.Popen(
+                    [sys.executable, self.tests_dir + test_argv[0]] + test_argv[1:] + self.flags + portseed_arg + tmpdir_arg,
+                    text=True,
+                    stdout=log_stdout,
+                    stderr=log_stderr,
+                ),
+                testdir,
+                log_stdout,
+                log_stderr,
+            ]
+            fut = self.executor.submit(proc_wait, task)
+            self.jobs[fut] = test
         if not self.jobs:
             raise IndexError('pop from empty list')
         # Print remaining running jobs when all jobs have been started.
         if not self.test_list:
-            print("Remaining jobs: [{}]".format(", ".join(j[0] for j in self.jobs)))
+            print("Remaining jobs: [{}]".format(", ".join(sorted(self.jobs.values()))))
         dot_count = 0
         while True:
             # Return all procs that have finished, if any. Otherwise sleep until there is one.
-            time.sleep(.5)
+            procs = futures.wait(self.jobs.keys(), timeout=.5, return_when=futures.FIRST_COMPLETED)
+            self.jobs = {fut: self.jobs[fut] for fut in procs.not_done}
             ret = []
-            for job in self.jobs:
-                (name, start_time, proc, testdir, log_out, log_err) = job
-                if proc.poll() is not None:
-                    log_out.seek(0), log_err.seek(0)
-                    [stdout, stderr] = [log_file.read().decode('utf-8') for log_file in (log_out, log_err)]
-                    log_out.close(), log_err.close()
-                    skip_reason = None
-                    if proc.returncode == TEST_EXIT_PASSED and stderr == "":
-                        status = "Passed"
-                    elif proc.returncode == TEST_EXIT_SKIPPED:
-                        status = "Skipped"
-                        skip_reason = re.search(r"Test Skipped: (.*)", stdout).group(1)
-                    else:
-                        status = "Failed"
-                    self.jobs.remove(job)
-                    if self.use_term_control:
-                        clearline = '\r' + (' ' * dot_count) + '\r'
-                        print(clearline, end='', flush=True)
-                        dot_count = 0
-                    ret.append((TestResult(name, status, int(time.time() - start_time)), testdir, stdout, stderr, skip_reason))
+            for job in procs.done:
+                (name, start_time, proc, testdir, log_out, log_err) = job.result()
+                log_out.seek(0), log_err.seek(0)
+                [stdout, stderr] = [log_file.read().decode('utf-8') for log_file in (log_out, log_err)]
+                log_out.close(), log_err.close()
+                skip_reason = None
+                if proc.returncode == TEST_EXIT_PASSED and stderr == "":
+                    status = "Passed"
+                elif proc.returncode == TEST_EXIT_SKIPPED:
+                    status = "Skipped"
+                    skip_reason = re.search(r"Test Skipped: (.*)", stdout).group(1)
+                else:
+                    status = "Failed"
+                if self.use_term_control:
+                    clearline = '\r' + (' ' * dot_count) + '\r'
+                    print(clearline, end='', flush=True)
+                    dot_count = 0
+                ret.append((TestResult(name, status, int(time.time() - start_time)), testdir, stdout, stderr, skip_reason))
             if ret:
                 return ret
             if self.use_term_control:
```