Changeset 73231 in webkit


Timestamp: Dec 2, 2010 7:41:43 PM
Author: dpranke@chromium.org
Message:

2010-12-02 Dirk Pranke <dpranke@chromium.org>

Reviewed by Tony Chang.

nrwt multiprocessing - move logic back into run_webkit_tests

This change moves a bunch of logic that I had put into
message_broker back into run_webkit_tests, in a slightly
different format. WorkerMessageBroker needed to become less aware of
the logic the TestRunner class uses, and more generic.
Eventually the MessageBroker will only do generic messaging and
thread/process-pooling, and (almost) all of the
run-webkit-tests-specific logic will be moved to
run_webkit_tests.py and dump_render_tree_thread.py.

The biggest changes are that the Broker can now start a single
worker, but the responsibility for starting all of them is pushed
back to the TestRunner (Manager), and the logic for checking if
the threads are done or wedged is moved back to TestRunner. We
also remove WorkerMessageBroker.cleanup (not needed) and
cancel_workers (they have to be cancelled individually).

The message_broker is now encapsulated inside
TestRunner._run_tests(); it only needs to exist while actually
running the tests.

Also, delete a bunch of tests in message_broker_unittest that no
longer make much sense.

This patch depends on bug 50372.

https://bugs.webkit.org/show_bug.cgi?id=50374

  • Scripts/webkitpy/layout_tests/layout_package/dump_render_tree_thread.py:
  • Scripts/webkitpy/layout_tests/layout_package/message_broker.py:
  • Scripts/webkitpy/layout_tests/layout_package/message_broker_unittest.py:
  • Scripts/webkitpy/layout_tests/run_webkit_tests.py:
  • Scripts/webkitpy/layout_tests/run_webkit_tests_unittest.py:
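The division of responsibilities described in the message above can be sketched roughly as follows. This is a minimal illustration in Python 3, not the code from the changeset: SketchBroker, SketchManager, and wait_until_cancelled are hypothetical names, and only WorkerState mirrors a class that this change actually adds. The broker starts and cancels one worker at a time, while the manager owns the loop over workers and keeps the broker alive only for the duration of the run.

```python
import threading


class WorkerState:
    """Per-worker record kept by the manager (same fields as the
    WorkerState class added in this change)."""

    def __init__(self, name, number, thread):
        self.name = name
        self.number = number
        self.thread = thread


class SketchBroker:
    """Hypothetical broker: starts and cancels workers one at a time."""

    def __init__(self):
        self._cancel_events = {}

    def start_worker(self, worker_number, work):
        name = 'worker/%d' % worker_number
        cancel_event = threading.Event()
        thread = threading.Thread(target=work, args=(cancel_event,),
                                  name=name)
        self._cancel_events[name] = cancel_event
        thread.start()
        return thread

    def cancel_worker(self, worker_name):
        # Best-effort: the worker may still be running when this returns.
        self._cancel_events[worker_name].set()


class SketchManager:
    """Hypothetical manager: loops over workers and owns the broker."""

    def __init__(self, num_workers):
        self._num_workers = num_workers
        self._workers = {}

    def run(self, work):
        broker = SketchBroker()  # exists only while the run is in progress
        for number in range(self._num_workers):
            thread = broker.start_worker(number, work)
            self._workers[thread.name] = WorkerState(thread.name, number,
                                                     thread)
        # Cancel each worker by name; there is no bulk cancel call.
        for worker_name in self._workers:
            broker.cancel_worker(worker_name)
        for state in self._workers.values():
            state.thread.join()
        return sorted(self._workers)


def wait_until_cancelled(cancel_event):
    # Stand-in for real work: block until the manager cancels us.
    cancel_event.wait(timeout=5.0)


print(SketchManager(num_workers=2).run(wait_until_cancelled))
```

The sketch cancels the workers immediately only to keep the example short; in the changeset, cancellation happens per worker when a KeyboardInterrupt reaches the manager.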
Location: trunk/WebKitTools
Files: 5 edited

  • trunk/WebKitTools/ChangeLog

    r73228 r73231  
     1 2010-12-02  Dirk Pranke  <dpranke@chromium.org>
     2
     3        Reviewed by Tony Chang.
     4
     5        nrwt multiprocessing - move logic back into run_webkit_tests
     6
     7        This change moves a bunch of logic that I had put into
     8        message_broker back into run_webkit_tests, in a slightly
     9        different format. WorkerMessageBroker needed to become less aware of
     10        the logic the TestRunner class uses, and more generic.
     11        Eventually the MessageBroker will only do generic messaging and
     12        thread/process-pooling, and (almost) all of the
     13        run-webkit-tests-specific logic will be moved to
     14        run_webkit_tests.py and dump_render_tree_thread.py.
     15       
     16        The biggest changes are that the Broker can now start a single
     17        worker, but the responsibility for starting all of them is pushed
     18        back to the TestRunner (Manager), and the logic for checking if
     19        the threads are done or wedged is moved back to TestRunner. We
     20        also remove WorkerMessageBroker.cleanup (not needed) and
     21        cancel_workers (they have to be cancelled individually).
     22       
     23        The  message_broker is now encapsulated inside
     24        TestRunner._run_tests(); it only needs to exist while actually
     25        running the tests.
     26
     27        Also, delete a bunch of tests in message_broker_unittest that no
     28        longer make much sense.
     29
     30        This patch depends on bug 50372.
     31
     32        https://bugs.webkit.org/show_bug.cgi?id=50374
     33
     34        * Scripts/webkitpy/layout_tests/layout_package/dump_render_tree_thread.py:
     35        * Scripts/webkitpy/layout_tests/layout_package/message_broker.py:
     36        * Scripts/webkitpy/layout_tests/layout_package/message_broker_unittest.py:
     37        * Scripts/webkitpy/layout_tests/run_webkit_tests.py:
     38        * Scripts/webkitpy/layout_tests/run_webkit_tests_unittest.py:
     39
    140 2010-12-02  Hayato Ito  <hayato@chromium.org>
    241
  • trunk/WebKitTools/Scripts/webkitpy/layout_tests/layout_package/message_broker.py

    r73222 r73231  
    6565        self._port = port
    6666        self._options = options
    67         self._num_workers = int(self._options.child_processes)
    6867
    69         # This maps worker names to their TestShellThread objects.
     68        # This maps worker_names to TestShellThreads
    7069        self._threads = {}
    7170
    72     def start_workers(self, test_runner):
    73         """Starts up the pool of workers for running the tests.
     71    def start_worker(self, test_runner, worker_number):
     72        """Start a worker with the given index number.
    7473
    75         Args:
    76             test_runner: a handle to the manager/TestRunner object
    77         """
    78         self._test_runner = test_runner
    79         for worker_number in xrange(self._num_workers):
    80             thread = self.start_worker(worker_number)
    81             self._threads[thread.name()] = thread
    82         return self._threads.values()
    83 
    84     def start_worker(self, worker_number):
    85         # FIXME: Replace with something that isn't a thread.
     74        Returns the actual TestShellThread object."""
     75        # FIXME: Remove dependencies on test_runner.
     76        # FIXME: Replace with something that isn't a thread, and return
     77        # the name of the worker, not the thread itself. We need to return
     78        # the thread itself for now to allow TestRunner to access the object
     79        # directly to read shared state.
     80        thread = dump_render_tree_thread.TestShellThread(self._port,
     81            self._options, worker_number, test_runner._current_filename_queue,
     82            test_runner._result_queue)
     83        self._threads[thread.name()] = thread
    8684        # Note: Don't start() the thread! If we did, it would actually
    8785        # create another thread and start executing it, and we'd no longer
    8886        # be single-threaded.
    89         return dump_render_tree_thread.TestShellThread(self._port,
    90             self._options, worker_number,
    91             self._test_runner._current_filename_queue,
    92             self._test_runner._result_queue)
     87        return thread
    9388
    94     def run_message_loop(self):
    95         """Loop processing messages until done."""
     89    def cancel_worker(self, worker_name):
     90        """Attempt to cancel a worker (best-effort). The worker may still be
     91        running after this call returns."""
     92        self._threads[worker_name].cancel()
     93
     94    def log_wedged_worker(self, worker_name):
     95        """Log information about the given worker's state."""
    9696        raise NotImplementedError
    9797
    98     def cancel_workers(self):
    99         """Cancel/interrupt any workers that are still alive."""
    100         pass
    101 
    102     def cleanup(self):
    103         """Perform any necessary cleanup on shutdown."""
    104         pass
     98    def run_message_loop(self, test_runner):
     99        """Loop processing messages until done."""
     100        # FIXME: eventually we'll need a message loop that the workers
     101        # can also call.
     102        raise NotImplementedError
    105103
    106104
    107105 class _InlineBroker(_WorkerMessageBroker):
    108     def run_message_loop(self):
     106    def run_message_loop(self, test_runner):
    109107        thread = self._threads.values()[0]
    110         thread.run_in_main_thread(self._test_runner,
    111                                   self._test_runner._current_result_summary)
    112         self._test_runner.update()
     108        thread.run_in_main_thread(test_runner,
     109                                  test_runner._current_result_summary)
     110
     111    def log_wedged_worker(self, worker_name):
     112        raise AssertionError('_InlineBroker.log_wedged_worker() called')
    113113
    114114
    115115 class _MultiThreadedBroker(_WorkerMessageBroker):
    116     def start_worker(self, worker_number):
    117         thread = _WorkerMessageBroker.start_worker(self, worker_number)
     116    def start_worker(self, test_runner, worker_number):
     117        thread = _WorkerMessageBroker.start_worker(self, test_runner,
     118                                                   worker_number)
     119        # Unlike the base implementation, here we actually want to start
     120        # the thread.
    118121        thread.start()
    119122        return thread
    120123
    121     def run_message_loop(self):
    122         # Loop through all the threads waiting for them to finish.
    123         some_thread_is_alive = True
    124         while some_thread_is_alive:
    125             some_thread_is_alive = False
    126             t = time.time()
    127             for thread in self._threads.values():
    128                 exception_info = thread.exception_info()
    129                 if exception_info is not None:
    130                     # Re-raise the thread's exception here to make it
    131                     # clear that testing was aborted. Otherwise,
    132                     # the tests that did not run would be assumed
    133                     # to have passed.
    134                     raise exception_info[0], exception_info[1], exception_info[2]
     124    def run_message_loop(self, test_runner):
     125        # FIXME: Remove the dependencies on test_runner. Checking on workers
     126        # should be done via a timer firing.
     127        test_runner._check_on_workers()
    135128
    136                 if thread.isAlive():
    137                     some_thread_is_alive = True
    138                     next_timeout = thread.next_timeout()
    139                     if next_timeout and t > next_timeout:
    140                         log_wedged_worker(thread.name(), thread.id())
    141                         thread.clear_next_timeout()
     129    def log_wedged_worker(self, worker_name):
     130        thread = self._threads[worker_name]
     131        stack = self._find_thread_stack(thread.id())
     132        assert(stack is not None)
     133        _log.error("")
     134        _log.error("%s (tid %d) is wedged" % (worker_name, thread.id()))
     135        self._log_stack(stack)
     136        _log.error("")
    142137
    143             self._test_runner.update()
     138    def _find_thread_stack(self, id):
     139        """Returns a stack object that can be used to dump a stack trace for
     140        the given thread id (or None if the id is not found)."""
     141        for thread_id, stack in sys._current_frames().items():
     142            if thread_id == id:
     143                return stack
     144        return None
    144145
    145             if some_thread_is_alive:
    146                 time.sleep(0.01)
    147 
    148     def cancel_workers(self):
    149         for thread in self._threads.values():
    150             thread.cancel()
    151 
    152 
    153 def log_wedged_worker(name, id):
    154     """Log information about the given worker state."""
    155     stack = _find_thread_stack(id)
    156     assert(stack is not None)
    157     _log.error("")
    158     _log.error("%s (tid %d) is wedged" % (name, id))
    159     _log_stack(stack)
    160     _log.error("")
    161 
    162 
    163 def _find_thread_stack(id):
    164     """Returns a stack object that can be used to dump a stack trace for
    165     the given thread id (or None if the id is not found)."""
    166     for thread_id, stack in sys._current_frames().items():
    167         if thread_id == id:
    168             return stack
    169     return None
    170 
    171 
    172 def _log_stack(stack):
    173     """Log a stack trace to log.error()."""
    174     for filename, lineno, name, line in traceback.extract_stack(stack):
    175         _log.error('File: "%s", line %d, in %s' % (filename, lineno, name))
    176         if line:
    177             _log.error('  %s' % line.strip())
     146    def _log_stack(self, stack):
     147        """Log a stack trace to log.error()."""
     148        for filename, lineno, name, line in traceback.extract_stack(stack):
     149            _log.error('File: "%s", line %d, in %s' % (filename, lineno, name))
     150            if line:
     151                _log.error('  %s' % line.strip())
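The wedged-worker logging in the hunk above relies on two standard-library calls: sys._current_frames(), which maps thread ids to their current stack frames, and traceback.extract_stack(), which turns a frame into (filename, lineno, name, line) entries. The following is a small standalone sketch of that technique in Python 3. The function names find_thread_stack, format_stack, and blocked_worker are illustrative assumptions and not part of webkitpy, and the sketch collects the lines in a list instead of sending them to _log.error().

```python
import sys
import threading
import traceback


def find_thread_stack(thread_id):
    """Return the current frame of the thread with this id, or None."""
    return sys._current_frames().get(thread_id)


def format_stack(stack):
    """Return the lines that would be logged for this frame."""
    lines = []
    for filename, lineno, name, line in traceback.extract_stack(stack):
        lines.append('File: "%s", line %d, in %s' % (filename, lineno, name))
        if line:
            lines.append('  %s' % line.strip())
    return lines


def blocked_worker(started, release):
    # Hypothetical worker that signals it is running, then blocks.
    started.set()
    release.wait(timeout=5.0)


started = threading.Event()
release = threading.Event()
worker = threading.Thread(target=blocked_worker, args=(started, release))
worker.start()
started.wait()

# Take the snapshot while the worker is still inside blocked_worker().
stack = find_thread_stack(worker.ident)
report = format_stack(stack) if stack is not None else []

release.set()
worker.join()
print('\n'.join(report))
```

A dictionary lookup replaces the explicit loop over sys._current_frames().items() used in the diff; both return None when the id is not present.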
  • trunk/WebKitTools/Scripts/webkitpy/layout_tests/layout_package/message_broker_unittest.py

    r73222 r73231  
    4343 import message_broker
    4444
     45 # FIXME: Boy do we need a lot more tests here ...
    4546
    46 class TestThread(threading.Thread):
    47     def __init__(self, started_queue, stopping_queue):
    48         threading.Thread.__init__(self)
    49         self._id = None
    50         self._started_queue = started_queue
    51         self._stopping_queue = stopping_queue
    52         self._timeout = False
    53         self._timeout_queue = Queue.Queue()
    54         self._exception_info = None
    5547
    56     def id(self):
    57         return self._id
     48 class TestThreadStacks(unittest.TestCase):
     49    class Thread(threading.Thread):
     50        def __init__(self, started_queue, stopping_queue):
     51            threading.Thread.__init__(self)
     52            self._id = None
     53            self._started_queue = started_queue
     54            self._stopping_queue = stopping_queue
    5855
    59     def name(self):
    60         return 'worker/0'
     56        def id(self):
     57            return self._id
    6158
    62     def run(self):
    63         self._covered_run()
     59        def name(self):
     60            return 'worker/0'
    6461
    65     def _covered_run(self):
    66         # FIXME: this is a separate routine to work around a bug
    67         # in coverage: see http://bitbucket.org/ned/coveragepy/issue/85.
    68         self._id = thread.get_ident()
    69         try:
     62        def run(self):
     63            self._id = thread.get_ident()
    7064            self._started_queue.put('')
    7165            msg = self._stopping_queue.get()
    72             if msg == 'KeyboardInterrupt':
    73                 raise KeyboardInterrupt
    74             elif msg == 'Exception':
    75                 raise ValueError()
    76             elif msg == 'Timeout':
    77                 self._timeout = True
    78                 self._timeout_queue.get()
    79         except:
    80             self._exception_info = sys.exc_info()
    8166
    82     def exception_info(self):
    83         return self._exception_info
     67    def make_broker(self):
     68        options = mocktool.MockOptions()
     69        return message_broker._MultiThreadedBroker(port=None,
     70                                                     options=options)
    8471
    85     def next_timeout(self):
    86         if self._timeout:
    87             self._timeout_queue.put('done')
    88             return time.time() - 10
    89         return time.time()
    90 
    91     def clear_next_timeout(self):
    92         self._next_timeout = None
    93 
    94 class TestHandler(logging.Handler):
    95     def __init__(self, astream):
    96         logging.Handler.__init__(self)
    97         self._stream = astream
    98 
    99     def emit(self, record):
    100         self._stream.write(self.format(record))
    101 
    102 
    103 class MultiThreadedBrokerTest(unittest.TestCase):
    104     class MockTestRunner(object):
    105         def __init__(self):
    106             pass
    107 
    108         def __del__(self):
    109             pass
    110 
    111         def update(self):
    112             pass
    113 
    114     def run_one_thread(self, msg):
    115         runner = self.MockTestRunner()
    116         port = None
    117         options = mocktool.MockOptions(child_processes='1')
    118         starting_queue = Queue.Queue()
    119         stopping_queue = Queue.Queue()
    120         broker = message_broker._MultiThreadedBroker(port, options)
    121         broker._test_runner = runner
    122         child_thread = TestThread(starting_queue, stopping_queue)
    123         name = child_thread.name()
    124         broker._threads[name] = child_thread
    125         child_thread.start()
    126         started_msg = starting_queue.get()
    127         stopping_queue.put(msg)
    128         return broker.run_message_loop()
    129 
    130     def test_basic(self):
    131         interrupted = self.run_one_thread('')
    132         self.assertFalse(interrupted)
    133 
    134     def test_interrupt(self):
    135         self.assertRaises(KeyboardInterrupt, self.run_one_thread, 'KeyboardInterrupt')
    136 
    137     def test_timeout(self):
    138         oc = outputcapture.OutputCapture()
    139         oc.capture_output()
    140         interrupted = self.run_one_thread('Timeout')
    141         self.assertFalse(interrupted)
    142         oc.restore_output()
    143 
    144     def test_exception(self):
    145         self.assertRaises(ValueError, self.run_one_thread, 'Exception')
    146 
    147 
    148 class Test(unittest.TestCase):
    14972    def test_find_thread_stack_found(self):
     73        broker = self.make_broker()
    15074        id, stack = sys._current_frames().items()[0]
    151         found_stack = message_broker._find_thread_stack(id)
     75        found_stack = broker._find_thread_stack(id)
    15276        self.assertNotEqual(found_stack, None)
    15377
    15478    def test_find_thread_stack_not_found(self):
    155         found_stack = message_broker._find_thread_stack(0)
     79        broker = self.make_broker()
     80        found_stack = broker._find_thread_stack(0)
    15681        self.assertEqual(found_stack, None)
    15782
    15883    def test_log_wedged_worker(self):
     84        broker = self.make_broker()
    15985        oc = outputcapture.OutputCapture()
    16086        oc.capture_output()
    161         logger = message_broker._log
    162         astream = array_stream.ArrayStream()
    163         handler = TestHandler(astream)
    164         logger.addHandler(handler)
    16587
    16688        starting_queue = Queue.Queue()
    16789        stopping_queue = Queue.Queue()
    168         child_thread = TestThread(starting_queue, stopping_queue)
     90        child_thread = TestThreadStacks.Thread(starting_queue, stopping_queue)
    16991        child_thread.start()
     92        broker._threads[child_thread.name()] = child_thread
    17093        msg = starting_queue.get()
    17194
    172         message_broker.log_wedged_worker(child_thread.name(),
    173                                          child_thread.id())
     95        broker.log_wedged_worker(child_thread.name())
    17496        stopping_queue.put('')
    17597        child_thread.join(timeout=1.0)
    17698
    177         self.assertFalse(astream.empty())
    17899        self.assertFalse(child_thread.isAlive())
    179100        oc.restore_output()
  • trunk/WebKitTools/Scripts/webkitpy/layout_tests/run_webkit_tests.py

    r73222 r73231  
    229229
    230230
     231 class WorkerState(object):
     232    """A class for the TestRunner/manager to use to track the current state
     233    of the workers."""
     234    def __init__(self, name, number, thread):
     235        self.name = name
     236        self.number = number
     237        self.thread = thread
     238
     239
    231240 class TestRunner:
    232241    """A class for managing running a series of tests on a series of layout
     
    241250    DEFAULT_TEST_TIMEOUT_MS = 6 * 1000
    242251
    243     def __init__(self, port, options, printer, message_broker):
     252    def __init__(self, port, options, printer):
    244253        """Initialize test runner data structures.
    245254
     
    248257          options: a dictionary of command line options
    249258          printer: a Printer object to record updates to.
    250           message_broker: object used to communicate with workers.
    251259        """
    252260        self._port = port
    253261        self._options = options
    254262        self._printer = printer
    255         self._message_broker = message_broker
     263
     264        # This maps worker names to the state we are tracking for each of them.
     265        self._workers = {}
    256266
    257267        # disable wss server. need to install pyOpenSSL on buildbots.
     
    587597        """
    588598
     599        self._workers = {}
     600
    589601        self._printer.print_update('Sharding tests ...')
    590602        num_workers = self._num_workers()
    591603        test_lists = self._shard_tests(file_list,
    592604            num_workers > 1 and not self._options.experimental_fully_parallel)
     605
     606        broker = message_broker.get(self._port, self._options)
     607        self._message_broker = broker
     608
    593609        filename_queue = Queue.Queue()
    594610        for item in test_lists:
     
    597613        self._printer.print_update('Starting %s ...' %
    598614                                   grammar.pluralize('worker', num_workers))
    599         message_broker = self._message_broker
    600615        self._current_filename_queue = filename_queue
    601616        self._current_result_summary = result_summary
    602617
    603         if not self._options.dry_run:
    604             threads = message_broker.start_workers(self)
    605         else:
    606             threads = []
     618        for worker_number in xrange(num_workers):
     619            thread = broker.start_worker(self, worker_number)
     620            w = WorkerState(thread.name(), worker_number, thread)
     621            self._workers[thread.name()] = w
    607622
    608623        self._printer.print_update("Starting testing ...")
     
    610625        if not self._options.dry_run:
    611626            try:
    612                 message_broker.run_message_loop()
     627                broker.run_message_loop(self)
    613628            except KeyboardInterrupt:
    614629                _log.info("Interrupted, exiting")
    615                 message_broker.cancel_workers()
     630                for worker_name in self._workers.keys():
     631                    broker.cancel_worker(worker_name)
    616632                keyboard_interrupted = True
    617633            except:
     
    621637
    622638        thread_timings, test_timings, individual_test_timings = \
    623             self._collect_timing_info(threads)
     639            self._collect_timing_info(self._workers)
     640        self._message_broker = None
    624641
    625642        return (keyboard_interrupted, thread_timings, test_timings,
    626643                individual_test_timings)
    627644
    628     def update(self):
    629         self.update_summary(self._current_result_summary)
    630 
    631     def _collect_timing_info(self, threads):
     645    def _check_on_workers(self):
     646        """Returns True iff all the workers have either completed or wedged."""
     647
     648        # Loop through all the threads waiting for them to finish.
     649        some_thread_is_alive = True
     650        while some_thread_is_alive:
     651            some_thread_is_alive = False
     652            t = time.time()
     653            for worker in self._workers.values():
     654                thread = worker.thread
     655                exception_info = thread.exception_info()
     656                if exception_info is not None:
     657                    # Re-raise the thread's exception here to make it
     658                    # clear that testing was aborted. Otherwise,
     659                    # the tests that did not run would be assumed
     660                    # to have passed.
     661                    raise exception_info[0], exception_info[1], exception_info[2]
     662
     663                if thread.isAlive():
     664                    some_thread_is_alive = True
     665                    next_timeout = thread.next_timeout()
     666                    if next_timeout and t > next_timeout:
     667                        self._message_broker.log_wedged_worker(worker.name)
     668                        thread.clear_next_timeout()
     669
     670            self.update_summary(self._current_result_summary)
     671
     672            if some_thread_is_alive:
     673                time.sleep(0.01)
     674
     675    def _collect_timing_info(self, workers):
    632676        test_timings = {}
    633677        individual_test_timings = []
    634678        thread_timings = []
    635679
    636         for thread in threads:
    637             thread_timings.append({'name': thread.getName(),
     680        for w in workers.values():
     681            thread = w.thread
     682            thread_timings.append({'name': thread.name(),
    638683                                   'num_tests': thread.get_num_tests(),
    639684                                   'total_time': thread.get_total_time()})
     
    10071052        """Prints the run times for slow, timeout and crash tests.
    10081053        Args:
    1009           individual_test_timings: List of dump_render_tree_thread.TestStats
    1010               for all tests.
     1054          individual_test_timings: List of TestStats for all tests.
    10111055          result_summary: summary object for test run
    10121056        """
     
    12961340        return 0
    12971341
    1298     broker = message_broker.get(port, options)
    1299 
    13001342    # We wrap any parts of the run that are slow or likely to raise exceptions
    13011343    # in a try/finally to ensure that we clean up the logging configuration.
    13021344    num_unexpected_results = -1
    13031345    try:
    1304         test_runner = TestRunner(port, options, printer, broker)
     1346        test_runner = TestRunner(port, options, printer)
    13051347        test_runner._print_config()
    13061348
     
    13311373                       num_unexpected_results)
    13321374    finally:
    1333         broker.cleanup()
    13341375        printer.cleanup()
    13351376
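The polling loop that this change moves into TestRunner._check_on_workers() can be illustrated with a reduced Python 3 sketch. SketchWorker and check_on_workers are hypothetical names, and the timeout and wedged-worker logging are left out to keep it short. The diff uses the Python 2 form "raise type, value, traceback"; the closest Python 3 spelling is value.with_traceback(traceback), which is what the sketch uses. Note also that the docstring in the diff says the method returns True, while the loop as shown ends without an explicit return; the sketch likewise returns nothing.

```python
import sys
import threading
import time


class SketchWorker(threading.Thread):
    """Hypothetical worker that records an exception instead of losing it."""

    def __init__(self, work):
        threading.Thread.__init__(self)
        self._work = work
        self._exception_info = None

    def run(self):
        try:
            self._work()
        except BaseException:
            self._exception_info = sys.exc_info()

    def exception_info(self):
        return self._exception_info


def check_on_workers(workers, poll_interval=0.01):
    """Poll until every worker has finished, re-raising a worker failure."""
    some_worker_is_alive = True
    while some_worker_is_alive:
        some_worker_is_alive = False
        for worker in workers:
            # Sample liveness first so that a failure recorded just before
            # the thread exits is still seen below. This ordering is a
            # choice of the sketch; the diff checks the exception first.
            alive = worker.is_alive()
            exception_info = worker.exception_info()
            if exception_info is not None:
                exc_type, exc_value, exc_tb = exception_info
                # Re-raise in the manager so the run is clearly aborted.
                raise exc_value.with_traceback(exc_tb)
            if alive:
                some_worker_is_alive = True
        if some_worker_is_alive:
            time.sleep(poll_interval)


def failing_work():
    raise ValueError('layout test harness failed')


worker = SketchWorker(failing_work)
worker.start()
try:
    check_on_workers([worker])
except ValueError as error:
    print('re-raised in manager: %s' % error)
```

Re-raising in the manager matters for the reason given in the diff's comment: otherwise the tests that did not run would be assumed to have passed.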
  • trunk/WebKitTools/Scripts/webkitpy/layout_tests/run_webkit_tests_unittest.py

    r73222 r73231  
    490490
    491491        runner = run_webkit_tests.TestRunner(port=mock_port, options=Mock(),
    492             printer=Mock(), message_broker=Mock())
     492                                             printer=Mock())
    493493        expected_html = u"""<html>
    494494  <head>
     
    508508        # put the http tests first in the queue.
    509509        runner = TestRunnerWrapper(port=Mock(), options=Mock(),
    510             printer=Mock(), message_broker=Mock())
     510                                   printer=Mock())
    511511
    512512        test_list = [