Changeset 73255 in webkit


Ignore:
Timestamp:
Dec 3, 2010 2:03:44 AM (13 years ago)
Author:
commit-queue@webkit.org
Message:

2010-12-03 Sheriff Bot <webkit.review.bot@gmail.com>

Unreviewed, rolling out r73231.
http://trac.webkit.org/changeset/73231
https://bugs.webkit.org/show_bug.cgi?id=50443

r73211 seemed to broke Chromium's "Webkit Win (dbg)(2)" bot.
(Requested by yutak on #webkit).

  • Scripts/webkitpy/layout_tests/layout_package/message_broker.py:
  • Scripts/webkitpy/layout_tests/layout_package/message_broker_unittest.py:
  • Scripts/webkitpy/layout_tests/run_webkit_tests.py:
  • Scripts/webkitpy/layout_tests/run_webkit_tests_unittest.py:
Location:
trunk/WebKitTools
Files:
5 edited

Legend:

Unmodified
Added
Removed
  • trunk/WebKitTools/ChangeLog

    r73248 r73255  
     12010-12-03  Sheriff Bot  <webkit.review.bot@gmail.com>
     2
     3        Unreviewed, rolling out r73231.
     4        http://trac.webkit.org/changeset/73231
     5        https://bugs.webkit.org/show_bug.cgi?id=50443
     6
     7        r73211 seemed to broke Chromium's "Webkit Win (dbg)(2)" bot.
     8        (Requested by yutak on #webkit).
     9
     10        * Scripts/webkitpy/layout_tests/layout_package/message_broker.py:
     11        * Scripts/webkitpy/layout_tests/layout_package/message_broker_unittest.py:
     12        * Scripts/webkitpy/layout_tests/run_webkit_tests.py:
     13        * Scripts/webkitpy/layout_tests/run_webkit_tests_unittest.py:
     14
    1152010-12-03  David Levin  <levin@chromium.org>
    216
  • trunk/WebKitTools/Scripts/webkitpy/layout_tests/layout_package/message_broker.py

    r73231 r73255  
    6565        self._port = port
    6666        self._options = options
     67        self._num_workers = int(self._options.child_processes)
    6768
    68         # This maps worker_names to TestShellThreads
     69        # This maps worker names to their TestShellThread objects.
    6970        self._threads = {}
    7071
    71     def start_worker(self, test_runner, worker_number):
    72         """Start a worker with the given index number.
     72    def start_workers(self, test_runner):
     73        """Starts up the pool of workers for running the tests.
    7374
    74         Returns the actual TestShellThread object."""
    75         # FIXME: Remove dependencies on test_runner.
    76         # FIXME: Replace with something that isn't a thread, and return
    77         # the name of the worker, not the thread itself. We need to return
    78         # the thread itself for now to allow TestRunner to access the object
    79         # directly to read shared state.
    80         thread = dump_render_tree_thread.TestShellThread(self._port,
    81             self._options, worker_number, test_runner._current_filename_queue,
    82             test_runner._result_queue)
    83         self._threads[thread.name()] = thread
     75        Args:
     76            test_runner: a handle to the manager/TestRunner object
     77        """
     78        self._test_runner = test_runner
     79        for worker_number in xrange(self._num_workers):
     80            thread = self.start_worker(worker_number)
     81            self._threads[thread.name()] = thread
     82        return self._threads.values()
     83
     84    def start_worker(self, worker_number):
     85        # FIXME: Replace with something that isn't a thread.
    8486        # Note: Don't start() the thread! If we did, it would actually
    8587        # create another thread and start executing it, and we'd no longer
    8688        # be single-threaded.
    87         return thread
     89        return dump_render_tree_thread.TestShellThread(self._port,
     90            self._options, worker_number,
     91            self._test_runner._current_filename_queue,
     92            self._test_runner._result_queue)
    8893
    89     def cancel_worker(self, worker_name):
    90         """Attempt to cancel a worker (best-effort). The worker may still be
    91         running after this call returns."""
    92         self._threads[worker_name].cancel()
    93 
    94     def log_wedged_worker(self, worker_name):
    95         """Log information about the given worker's state."""
     94    def run_message_loop(self):
     95        """Loop processing messages until done."""
    9696        raise NotImplementedError
    9797
    98     def run_message_loop(self, test_runner):
    99         """Loop processing messages until done."""
    100         # FIXME: eventually we'll need a message loop that the workers
    101         # can also call.
    102         raise NotImplementedError
     98    def cancel_workers(self):
     99        """Cancel/interrupt any workers that are still alive."""
     100        pass
     101
     102    def cleanup(self):
     103        """Perform any necessary cleanup on shutdown."""
     104        pass
    103105
    104106
    105107class _InlineBroker(_WorkerMessageBroker):
    106     def run_message_loop(self, test_runner):
     108    def run_message_loop(self):
    107109        thread = self._threads.values()[0]
    108         thread.run_in_main_thread(test_runner,
    109                                   test_runner._current_result_summary)
    110 
    111     def log_wedged_worker(self, worker_name):
    112         raise AssertionError('_InlineBroker.log_wedged_worker() called')
     110        thread.run_in_main_thread(self._test_runner,
     111                                  self._test_runner._current_result_summary)
     112        self._test_runner.update()
    113113
    114114
    115115class _MultiThreadedBroker(_WorkerMessageBroker):
    116     def start_worker(self, test_runner, worker_number):
    117         thread = _WorkerMessageBroker.start_worker(self, test_runner,
    118                                                    worker_number)
    119         # Unlike the base implementation, here we actually want to start
    120         # the thread.
     116    def start_worker(self, worker_number):
     117        thread = _WorkerMessageBroker.start_worker(self, worker_number)
    121118        thread.start()
    122119        return thread
    123120
    124     def run_message_loop(self, test_runner):
    125         # FIXME: Remove the dependencies on test_runner. Checking on workers
    126         # should be done via a timer firing.
    127         test_runner._check_on_workers()
     121    def run_message_loop(self):
     122        # Loop through all the threads waiting for them to finish.
     123        some_thread_is_alive = True
     124        while some_thread_is_alive:
     125            some_thread_is_alive = False
     126            t = time.time()
     127            for thread in self._threads.values():
     128                exception_info = thread.exception_info()
     129                if exception_info is not None:
     130                    # Re-raise the thread's exception here to make it
     131                    # clear that testing was aborted. Otherwise,
     132                    # the tests that did not run would be assumed
     133                    # to have passed.
     134                    raise exception_info[0], exception_info[1], exception_info[2]
    128135
    129     def log_wedged_worker(self, worker_name):
    130         thread = self._threads[worker_name]
    131         stack = self._find_thread_stack(thread.id())
    132         assert(stack is not None)
    133         _log.error("")
    134         _log.error("%s (tid %d) is wedged" % (worker_name, thread.id()))
    135         self._log_stack(stack)
    136         _log.error("")
     136                if thread.isAlive():
     137                    some_thread_is_alive = True
     138                    next_timeout = thread.next_timeout()
     139                    if next_timeout and t > next_timeout:
     140                        log_wedged_worker(thread.name(), thread.id())
     141                        thread.clear_next_timeout()
    137142
    138     def _find_thread_stack(self, id):
    139         """Returns a stack object that can be used to dump a stack trace for
    140         the given thread id (or None if the id is not found)."""
    141         for thread_id, stack in sys._current_frames().items():
    142             if thread_id == id:
    143                 return stack
    144         return None
     143            self._test_runner.update()
    145144
    146     def _log_stack(self, stack):
    147         """Log a stack trace to log.error()."""
    148         for filename, lineno, name, line in traceback.extract_stack(stack):
    149             _log.error('File: "%s", line %d, in %s' % (filename, lineno, name))
    150             if line:
    151                 _log.error('  %s' % line.strip())
     145            if some_thread_is_alive:
     146                time.sleep(0.01)
     147
     148    def cancel_workers(self):
     149        for thread in self._threads.values():
     150            thread.cancel()
     151
     152
     153def log_wedged_worker(name, id):
     154    """Log information about the given worker state."""
     155    stack = _find_thread_stack(id)
     156    assert(stack is not None)
     157    _log.error("")
     158    _log.error("%s (tid %d) is wedged" % (name, id))
     159    _log_stack(stack)
     160    _log.error("")
     161
     162
     163def _find_thread_stack(id):
     164    """Returns a stack object that can be used to dump a stack trace for
     165    the given thread id (or None if the id is not found)."""
     166    for thread_id, stack in sys._current_frames().items():
     167        if thread_id == id:
     168            return stack
     169    return None
     170
     171
     172def _log_stack(stack):
     173    """Log a stack trace to log.error()."""
     174    for filename, lineno, name, line in traceback.extract_stack(stack):
     175        _log.error('File: "%s", line %d, in %s' % (filename, lineno, name))
     176        if line:
     177            _log.error('  %s' % line.strip())
  • trunk/WebKitTools/Scripts/webkitpy/layout_tests/layout_package/message_broker_unittest.py

    r73231 r73255  
    4343import message_broker
    4444
    45 # FIXME: Boy do we need a lot more tests here ...
     45
     46class TestThread(threading.Thread):
     47    def __init__(self, started_queue, stopping_queue):
     48        threading.Thread.__init__(self)
     49        self._id = None
     50        self._started_queue = started_queue
     51        self._stopping_queue = stopping_queue
     52        self._timeout = False
     53        self._timeout_queue = Queue.Queue()
     54        self._exception_info = None
     55
     56    def id(self):
     57        return self._id
     58
     59    def name(self):
     60        return 'worker/0'
     61
     62    def run(self):
     63        self._covered_run()
     64
     65    def _covered_run(self):
     66        # FIXME: this is a separate routine to work around a bug
     67        # in coverage: see http://bitbucket.org/ned/coveragepy/issue/85.
     68        self._id = thread.get_ident()
     69        try:
     70            self._started_queue.put('')
     71            msg = self._stopping_queue.get()
     72            if msg == 'KeyboardInterrupt':
     73                raise KeyboardInterrupt
     74            elif msg == 'Exception':
     75                raise ValueError()
     76            elif msg == 'Timeout':
     77                self._timeout = True
     78                self._timeout_queue.get()
     79        except:
     80            self._exception_info = sys.exc_info()
     81
     82    def exception_info(self):
     83        return self._exception_info
     84
     85    def next_timeout(self):
     86        if self._timeout:
     87            self._timeout_queue.put('done')
     88            return time.time() - 10
     89        return time.time()
     90
     91    def clear_next_timeout(self):
     92        self._next_timeout = None
     93
     94class TestHandler(logging.Handler):
     95    def __init__(self, astream):
     96        logging.Handler.__init__(self)
     97        self._stream = astream
     98
     99    def emit(self, record):
     100        self._stream.write(self.format(record))
    46101
    47102
    48 class TestThreadStacks(unittest.TestCase):
    49     class Thread(threading.Thread):
    50         def __init__(self, started_queue, stopping_queue):
    51             threading.Thread.__init__(self)
    52             self._id = None
    53             self._started_queue = started_queue
    54             self._stopping_queue = stopping_queue
     103class MultiThreadedBrokerTest(unittest.TestCase):
     104    class MockTestRunner(object):
     105        def __init__(self):
     106            pass
    55107
    56         def id(self):
    57             return self._id
     108        def __del__(self):
     109            pass
    58110
    59         def name(self):
    60             return 'worker/0'
     111        def update(self):
     112            pass
    61113
    62         def run(self):
    63             self._id = thread.get_ident()
    64             self._started_queue.put('')
    65             msg = self._stopping_queue.get()
     114    def run_one_thread(self, msg):
     115        runner = self.MockTestRunner()
     116        port = None
     117        options = mocktool.MockOptions(child_processes='1')
     118        starting_queue = Queue.Queue()
     119        stopping_queue = Queue.Queue()
     120        broker = message_broker._MultiThreadedBroker(port, options)
     121        broker._test_runner = runner
     122        child_thread = TestThread(starting_queue, stopping_queue)
     123        name = child_thread.name()
     124        broker._threads[name] = child_thread
     125        child_thread.start()
     126        started_msg = starting_queue.get()
     127        stopping_queue.put(msg)
     128        return broker.run_message_loop()
    66129
    67     def make_broker(self):
    68         options = mocktool.MockOptions()
    69         return message_broker._MultiThreadedBroker(port=None,
    70                                                      options=options)
     130    def test_basic(self):
     131        interrupted = self.run_one_thread('')
     132        self.assertFalse(interrupted)
    71133
     134    def test_interrupt(self):
     135        self.assertRaises(KeyboardInterrupt, self.run_one_thread, 'KeyboardInterrupt')
     136
     137    def test_timeout(self):
     138        oc = outputcapture.OutputCapture()
     139        oc.capture_output()
     140        interrupted = self.run_one_thread('Timeout')
     141        self.assertFalse(interrupted)
     142        oc.restore_output()
     143
     144    def test_exception(self):
     145        self.assertRaises(ValueError, self.run_one_thread, 'Exception')
     146
     147
     148class Test(unittest.TestCase):
    72149    def test_find_thread_stack_found(self):
    73         broker = self.make_broker()
    74150        id, stack = sys._current_frames().items()[0]
    75         found_stack = broker._find_thread_stack(id)
     151        found_stack = message_broker._find_thread_stack(id)
    76152        self.assertNotEqual(found_stack, None)
    77153
    78154    def test_find_thread_stack_not_found(self):
    79         broker = self.make_broker()
    80         found_stack = broker._find_thread_stack(0)
     155        found_stack = message_broker._find_thread_stack(0)
    81156        self.assertEqual(found_stack, None)
    82157
    83158    def test_log_wedged_worker(self):
    84         broker = self.make_broker()
    85159        oc = outputcapture.OutputCapture()
    86160        oc.capture_output()
     161        logger = message_broker._log
     162        astream = array_stream.ArrayStream()
     163        handler = TestHandler(astream)
     164        logger.addHandler(handler)
    87165
    88166        starting_queue = Queue.Queue()
    89167        stopping_queue = Queue.Queue()
    90         child_thread = TestThreadStacks.Thread(starting_queue, stopping_queue)
     168        child_thread = TestThread(starting_queue, stopping_queue)
    91169        child_thread.start()
    92         broker._threads[child_thread.name()] = child_thread
    93170        msg = starting_queue.get()
    94171
    95         broker.log_wedged_worker(child_thread.name())
     172        message_broker.log_wedged_worker(child_thread.name(),
     173                                         child_thread.id())
    96174        stopping_queue.put('')
    97175        child_thread.join(timeout=1.0)
    98176
     177        self.assertFalse(astream.empty())
    99178        self.assertFalse(child_thread.isAlive())
    100179        oc.restore_output()
  • trunk/WebKitTools/Scripts/webkitpy/layout_tests/run_webkit_tests.py

    r73231 r73255  
    229229
    230230
    231 class WorkerState(object):
    232     """A class for the TestRunner/manager to use to track the current state
    233     of the workers."""
    234     def __init__(self, name, number, thread):
    235         self.name = name
    236         self.number = number
    237         self.thread = thread
    238 
    239 
    240231class TestRunner:
    241232    """A class for managing running a series of tests on a series of layout
     
    250241    DEFAULT_TEST_TIMEOUT_MS = 6 * 1000
    251242
    252     def __init__(self, port, options, printer):
     243    def __init__(self, port, options, printer, message_broker):
    253244        """Initialize test runner data structures.
    254245
     
    257248          options: a dictionary of command line options
    258249          printer: a Printer object to record updates to.
     250          message_broker: object used to communicate with workers.
    259251        """
    260252        self._port = port
    261253        self._options = options
    262254        self._printer = printer
    263 
    264         # This maps worker names to the state we are tracking for each of them.
    265         self._workers = {}
     255        self._message_broker = message_broker
    266256
    267257        # disable wss server. need to install pyOpenSSL on buildbots.
     
    597587        """
    598588
    599         self._workers = {}
    600 
    601589        self._printer.print_update('Sharding tests ...')
    602590        num_workers = self._num_workers()
    603591        test_lists = self._shard_tests(file_list,
    604592            num_workers > 1 and not self._options.experimental_fully_parallel)
    605 
    606         broker = message_broker.get(self._port, self._options)
    607         self._message_broker = broker
    608 
    609593        filename_queue = Queue.Queue()
    610594        for item in test_lists:
     
    613597        self._printer.print_update('Starting %s ...' %
    614598                                   grammar.pluralize('worker', num_workers))
     599        message_broker = self._message_broker
    615600        self._current_filename_queue = filename_queue
    616601        self._current_result_summary = result_summary
    617602
    618         for worker_number in xrange(num_workers):
    619             thread = broker.start_worker(self, worker_number)
    620             w = WorkerState(thread.name(), worker_number, thread)
    621             self._workers[thread.name()] = w
     603        if not self._options.dry_run:
     604            threads = message_broker.start_workers(self)
     605        else:
     606            threads = []
    622607
    623608        self._printer.print_update("Starting testing ...")
     
    625610        if not self._options.dry_run:
    626611            try:
    627                 broker.run_message_loop(self)
     612                message_broker.run_message_loop()
    628613            except KeyboardInterrupt:
    629614                _log.info("Interrupted, exiting")
    630                 for worker_name in self._workers.keys():
    631                     broker.cancel_worker(worker_name)
     615                message_broker.cancel_workers()
    632616                keyboard_interrupted = True
    633617            except:
     
    637621
    638622        thread_timings, test_timings, individual_test_timings = \
    639             self._collect_timing_info(self._workers)
    640         self._message_broker = None
     623            self._collect_timing_info(threads)
    641624
    642625        return (keyboard_interrupted, thread_timings, test_timings,
    643626                individual_test_timings)
    644627
    645     def _check_on_workers(self):
    646         """Returns True iff all the workers have either completed or wedged."""
    647 
    648         # Loop through all the threads waiting for them to finish.
    649         some_thread_is_alive = True
    650         while some_thread_is_alive:
    651             some_thread_is_alive = False
    652             t = time.time()
    653             for worker in self._workers.values():
    654                 thread = worker.thread
    655                 exception_info = thread.exception_info()
    656                 if exception_info is not None:
    657                     # Re-raise the thread's exception here to make it
    658                     # clear that testing was aborted. Otherwise,
    659                     # the tests that did not run would be assumed
    660                     # to have passed.
    661                     raise exception_info[0], exception_info[1], exception_info[2]
    662 
    663                 if thread.isAlive():
    664                     some_thread_is_alive = True
    665                     next_timeout = thread.next_timeout()
    666                     if next_timeout and t > next_timeout:
    667                         self._message_broker.log_wedged_worker(worker.name)
    668                         thread.clear_next_timeout()
    669 
    670             self.update_summary(self._current_result_summary)
    671 
    672             if some_thread_is_alive:
    673                 time.sleep(0.01)
    674 
    675     def _collect_timing_info(self, workers):
     628    def update(self):
     629        self.update_summary(self._current_result_summary)
     630
     631    def _collect_timing_info(self, threads):
    676632        test_timings = {}
    677633        individual_test_timings = []
    678634        thread_timings = []
    679635
    680         for w in workers.values():
    681             thread = w.thread
    682             thread_timings.append({'name': thread.name(),
     636        for thread in threads:
     637            thread_timings.append({'name': thread.getName(),
    683638                                   'num_tests': thread.get_num_tests(),
    684639                                   'total_time': thread.get_total_time()})
     
    10521007        """Prints the run times for slow, timeout and crash tests.
    10531008        Args:
    1054           individual_test_timings: List of TestStats for all tests.
     1009          individual_test_timings: List of dump_render_tree_thread.TestStats
     1010              for all tests.
    10551011          result_summary: summary object for test run
    10561012        """
     
    13401296        return 0
    13411297
     1298    broker = message_broker.get(port, options)
     1299
    13421300    # We wrap any parts of the run that are slow or likely to raise exceptions
    13431301    # in a try/finally to ensure that we clean up the logging configuration.
    13441302    num_unexpected_results = -1
    13451303    try:
    1346         test_runner = TestRunner(port, options, printer)
     1304        test_runner = TestRunner(port, options, printer, broker)
    13471305        test_runner._print_config()
    13481306
     
    13731331                       num_unexpected_results)
    13741332    finally:
     1333        broker.cleanup()
    13751334        printer.cleanup()
    13761335
  • trunk/WebKitTools/Scripts/webkitpy/layout_tests/run_webkit_tests_unittest.py

    r73231 r73255  
    490490
    491491        runner = run_webkit_tests.TestRunner(port=mock_port, options=Mock(),
    492                                              printer=Mock())
     492            printer=Mock(), message_broker=Mock())
    493493        expected_html = u"""<html>
    494494  <head>
     
    508508        # put the http tests first in the queue.
    509509        runner = TestRunnerWrapper(port=Mock(), options=Mock(),
    510                                    printer=Mock())
     510            printer=Mock(), message_broker=Mock())
    511511
    512512        test_list = [
Note: See TracChangeset for help on using the changeset viewer.