Changeset 73255 in webkit
- Timestamp:
- Dec 3, 2010 2:03:44 AM (13 years ago)
- Location:
- trunk/WebKitTools
- Files:
-
- 5 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/WebKitTools/ChangeLog
r73248 r73255 1 2010-12-03 Sheriff Bot <webkit.review.bot@gmail.com> 2 3 Unreviewed, rolling out r73231. 4 http://trac.webkit.org/changeset/73231 5 https://bugs.webkit.org/show_bug.cgi?id=50443 6 7 r73211 seemed to break Chromium's "Webkit Win (dbg)(2)" bot. 8 (Requested by yutak on #webkit). 9 10 * Scripts/webkitpy/layout_tests/layout_package/message_broker.py: 11 * Scripts/webkitpy/layout_tests/layout_package/message_broker_unittest.py: 12 * Scripts/webkitpy/layout_tests/run_webkit_tests.py: 13 * Scripts/webkitpy/layout_tests/run_webkit_tests_unittest.py: 14 1 15 2010-12-03 David Levin <levin@chromium.org> 2 16 -
trunk/WebKitTools/Scripts/webkitpy/layout_tests/layout_package/message_broker.py
r73231 r73255 65 65 self._port = port 66 66 self._options = options 67 self._num_workers = int(self._options.child_processes) 67 68 68 # This maps worker _names to TestShellThreads69 # This maps worker names to their TestShellThread objects. 69 70 self._threads = {} 70 71 71 def start_worker (self, test_runner, worker_number):72 """Start a worker with the given index number.72 def start_workers(self, test_runner): 73 """Starts up the pool of workers for running the tests. 73 74 74 Returns the actual TestShellThread object.""" 75 # FIXME: Remove dependencies on test_runner. 76 # FIXME: Replace with something that isn't a thread, and return 77 # the name of the worker, not the thread itself. We need to return 78 # the thread itself for now to allow TestRunner to access the object 79 # directly to read shared state. 80 thread = dump_render_tree_thread.TestShellThread(self._port, 81 self._options, worker_number, test_runner._current_filename_queue, 82 test_runner._result_queue) 83 self._threads[thread.name()] = thread 75 Args: 76 test_runner: a handle to the manager/TestRunner object 77 """ 78 self._test_runner = test_runner 79 for worker_number in xrange(self._num_workers): 80 thread = self.start_worker(worker_number) 81 self._threads[thread.name()] = thread 82 return self._threads.values() 83 84 def start_worker(self, worker_number): 85 # FIXME: Replace with something that isn't a thread. 84 86 # Note: Don't start() the thread! If we did, it would actually 85 87 # create another thread and start executing it, and we'd no longer 86 88 # be single-threaded. 87 return thread 89 return dump_render_tree_thread.TestShellThread(self._port, 90 self._options, worker_number, 91 self._test_runner._current_filename_queue, 92 self._test_runner._result_queue) 88 93 89 def cancel_worker(self, worker_name): 90 """Attempt to cancel a worker (best-effort). 
The worker may still be 91 running after this call returns.""" 92 self._threads[worker_name].cancel() 93 94 def log_wedged_worker(self, worker_name): 95 """Log information about the given worker's state.""" 94 def run_message_loop(self): 95 """Loop processing messages until done.""" 96 96 raise NotImplementedError 97 97 98 def run_message_loop(self, test_runner): 99 """Loop processing messages until done.""" 100 # FIXME: eventually we'll need a message loop that the workers 101 # can also call. 102 raise NotImplementedError 98 def cancel_workers(self): 99 """Cancel/interrupt any workers that are still alive.""" 100 pass 101 102 def cleanup(self): 103 """Perform any necessary cleanup on shutdown.""" 104 pass 103 105 104 106 105 107 class _InlineBroker(_WorkerMessageBroker): 106 def run_message_loop(self , test_runner):108 def run_message_loop(self): 107 109 thread = self._threads.values()[0] 108 thread.run_in_main_thread(test_runner, 109 test_runner._current_result_summary) 110 111 def log_wedged_worker(self, worker_name): 112 raise AssertionError('_InlineBroker.log_wedged_worker() called') 110 thread.run_in_main_thread(self._test_runner, 111 self._test_runner._current_result_summary) 112 self._test_runner.update() 113 113 114 114 115 115 class _MultiThreadedBroker(_WorkerMessageBroker): 116 def start_worker(self, test_runner, worker_number): 117 thread = _WorkerMessageBroker.start_worker(self, test_runner, 118 worker_number) 119 # Unlike the base implementation, here we actually want to start 120 # the thread. 116 def start_worker(self, worker_number): 117 thread = _WorkerMessageBroker.start_worker(self, worker_number) 121 118 thread.start() 122 119 return thread 123 120 124 def run_message_loop(self, test_runner): 125 # FIXME: Remove the dependencies on test_runner. Checking on workers 126 # should be done via a timer firing. 127 test_runner._check_on_workers() 121 def run_message_loop(self): 122 # Loop through all the threads waiting for them to finish. 
123 some_thread_is_alive = True 124 while some_thread_is_alive: 125 some_thread_is_alive = False 126 t = time.time() 127 for thread in self._threads.values(): 128 exception_info = thread.exception_info() 129 if exception_info is not None: 130 # Re-raise the thread's exception here to make it 131 # clear that testing was aborted. Otherwise, 132 # the tests that did not run would be assumed 133 # to have passed. 134 raise exception_info[0], exception_info[1], exception_info[2] 128 135 129 def log_wedged_worker(self, worker_name): 130 thread = self._threads[worker_name] 131 stack = self._find_thread_stack(thread.id()) 132 assert(stack is not None) 133 _log.error("") 134 _log.error("%s (tid %d) is wedged" % (worker_name, thread.id())) 135 self._log_stack(stack) 136 _log.error("") 136 if thread.isAlive(): 137 some_thread_is_alive = True 138 next_timeout = thread.next_timeout() 139 if next_timeout and t > next_timeout: 140 log_wedged_worker(thread.name(), thread.id()) 141 thread.clear_next_timeout() 137 142 138 def _find_thread_stack(self, id): 139 """Returns a stack object that can be used to dump a stack trace for 140 the given thread id (or None if the id is not found).""" 141 for thread_id, stack in sys._current_frames().items(): 142 if thread_id == id: 143 return stack 144 return None 143 self._test_runner.update() 145 144 146 def _log_stack(self, stack): 147 """Log a stack trace to log.error().""" 148 for filename, lineno, name, line in traceback.extract_stack(stack): 149 _log.error('File: "%s", line %d, in %s' % (filename, lineno, name)) 150 if line: 151 _log.error(' %s' % line.strip()) 145 if some_thread_is_alive: 146 time.sleep(0.01) 147 148 def cancel_workers(self): 149 for thread in self._threads.values(): 150 thread.cancel() 151 152 153 def log_wedged_worker(name, id): 154 """Log information about the given worker state.""" 155 stack = _find_thread_stack(id) 156 assert(stack is not None) 157 _log.error("") 158 _log.error("%s (tid %d) is wedged" % (name, id)) 
159 _log_stack(stack) 160 _log.error("") 161 162 163 def _find_thread_stack(id): 164 """Returns a stack object that can be used to dump a stack trace for 165 the given thread id (or None if the id is not found).""" 166 for thread_id, stack in sys._current_frames().items(): 167 if thread_id == id: 168 return stack 169 return None 170 171 172 def _log_stack(stack): 173 """Log a stack trace to log.error().""" 174 for filename, lineno, name, line in traceback.extract_stack(stack): 175 _log.error('File: "%s", line %d, in %s' % (filename, lineno, name)) 176 if line: 177 _log.error(' %s' % line.strip()) -
trunk/WebKitTools/Scripts/webkitpy/layout_tests/layout_package/message_broker_unittest.py
r73231 r73255 43 43 import message_broker 44 44 45 # FIXME: Boy do we need a lot more tests here ... 45 46 class TestThread(threading.Thread): 47 def __init__(self, started_queue, stopping_queue): 48 threading.Thread.__init__(self) 49 self._id = None 50 self._started_queue = started_queue 51 self._stopping_queue = stopping_queue 52 self._timeout = False 53 self._timeout_queue = Queue.Queue() 54 self._exception_info = None 55 56 def id(self): 57 return self._id 58 59 def name(self): 60 return 'worker/0' 61 62 def run(self): 63 self._covered_run() 64 65 def _covered_run(self): 66 # FIXME: this is a separate routine to work around a bug 67 # in coverage: see http://bitbucket.org/ned/coveragepy/issue/85. 68 self._id = thread.get_ident() 69 try: 70 self._started_queue.put('') 71 msg = self._stopping_queue.get() 72 if msg == 'KeyboardInterrupt': 73 raise KeyboardInterrupt 74 elif msg == 'Exception': 75 raise ValueError() 76 elif msg == 'Timeout': 77 self._timeout = True 78 self._timeout_queue.get() 79 except: 80 self._exception_info = sys.exc_info() 81 82 def exception_info(self): 83 return self._exception_info 84 85 def next_timeout(self): 86 if self._timeout: 87 self._timeout_queue.put('done') 88 return time.time() - 10 89 return time.time() 90 91 def clear_next_timeout(self): 92 self._next_timeout = None 93 94 class TestHandler(logging.Handler): 95 def __init__(self, astream): 96 logging.Handler.__init__(self) 97 self._stream = astream 98 99 def emit(self, record): 100 self._stream.write(self.format(record)) 46 101 47 102 48 class TestThreadStacks(unittest.TestCase): 49 class Thread(threading.Thread): 50 def __init__(self, started_queue, stopping_queue): 51 threading.Thread.__init__(self) 52 self._id = None 53 self._started_queue = started_queue 54 self._stopping_queue = stopping_queue 103 class MultiThreadedBrokerTest(unittest.TestCase): 104 class MockTestRunner(object): 105 def __init__(self): 106 pass 55 107 56 def id(self):57 return self._id108 def __del__(self): 
109 pass 58 110 59 def name(self):60 return 'worker/0'111 def update(self): 112 pass 61 113 62 def run(self): 63 self._id = thread.get_ident() 64 self._started_queue.put('') 65 msg = self._stopping_queue.get() 114 def run_one_thread(self, msg): 115 runner = self.MockTestRunner() 116 port = None 117 options = mocktool.MockOptions(child_processes='1') 118 starting_queue = Queue.Queue() 119 stopping_queue = Queue.Queue() 120 broker = message_broker._MultiThreadedBroker(port, options) 121 broker._test_runner = runner 122 child_thread = TestThread(starting_queue, stopping_queue) 123 name = child_thread.name() 124 broker._threads[name] = child_thread 125 child_thread.start() 126 started_msg = starting_queue.get() 127 stopping_queue.put(msg) 128 return broker.run_message_loop() 66 129 67 def make_broker(self): 68 options = mocktool.MockOptions() 69 return message_broker._MultiThreadedBroker(port=None, 70 options=options) 130 def test_basic(self): 131 interrupted = self.run_one_thread('') 132 self.assertFalse(interrupted) 71 133 134 def test_interrupt(self): 135 self.assertRaises(KeyboardInterrupt, self.run_one_thread, 'KeyboardInterrupt') 136 137 def test_timeout(self): 138 oc = outputcapture.OutputCapture() 139 oc.capture_output() 140 interrupted = self.run_one_thread('Timeout') 141 self.assertFalse(interrupted) 142 oc.restore_output() 143 144 def test_exception(self): 145 self.assertRaises(ValueError, self.run_one_thread, 'Exception') 146 147 148 class Test(unittest.TestCase): 72 149 def test_find_thread_stack_found(self): 73 broker = self.make_broker()74 150 id, stack = sys._current_frames().items()[0] 75 found_stack = broker._find_thread_stack(id)151 found_stack = message_broker._find_thread_stack(id) 76 152 self.assertNotEqual(found_stack, None) 77 153 78 154 def test_find_thread_stack_not_found(self): 79 broker = self.make_broker() 80 found_stack = broker._find_thread_stack(0) 155 found_stack = message_broker._find_thread_stack(0) 81 156 
self.assertEqual(found_stack, None) 82 157 83 158 def test_log_wedged_worker(self): 84 broker = self.make_broker()85 159 oc = outputcapture.OutputCapture() 86 160 oc.capture_output() 161 logger = message_broker._log 162 astream = array_stream.ArrayStream() 163 handler = TestHandler(astream) 164 logger.addHandler(handler) 87 165 88 166 starting_queue = Queue.Queue() 89 167 stopping_queue = Queue.Queue() 90 child_thread = TestThread Stacks.Thread(starting_queue, stopping_queue)168 child_thread = TestThread(starting_queue, stopping_queue) 91 169 child_thread.start() 92 broker._threads[child_thread.name()] = child_thread93 170 msg = starting_queue.get() 94 171 95 broker.log_wedged_worker(child_thread.name()) 172 message_broker.log_wedged_worker(child_thread.name(), 173 child_thread.id()) 96 174 stopping_queue.put('') 97 175 child_thread.join(timeout=1.0) 98 176 177 self.assertFalse(astream.empty()) 99 178 self.assertFalse(child_thread.isAlive()) 100 179 oc.restore_output() -
trunk/WebKitTools/Scripts/webkitpy/layout_tests/run_webkit_tests.py
r73231 r73255 229 229 230 230 231 class WorkerState(object):232 """A class for the TestRunner/manager to use to track the current state233 of the workers."""234 def __init__(self, name, number, thread):235 self.name = name236 self.number = number237 self.thread = thread238 239 240 231 class TestRunner: 241 232 """A class for managing running a series of tests on a series of layout … … 250 241 DEFAULT_TEST_TIMEOUT_MS = 6 * 1000 251 242 252 def __init__(self, port, options, printer ):243 def __init__(self, port, options, printer, message_broker): 253 244 """Initialize test runner data structures. 254 245 … … 257 248 options: a dictionary of command line options 258 249 printer: a Printer object to record updates to. 250 message_broker: object used to communicate with workers. 259 251 """ 260 252 self._port = port 261 253 self._options = options 262 254 self._printer = printer 263 264 # This maps worker names to the state we are tracking for each of them. 265 self._workers = {} 255 self._message_broker = message_broker 266 256 267 257 # disable wss server. need to install pyOpenSSL on buildbots. … … 597 587 """ 598 588 599 self._workers = {}600 601 589 self._printer.print_update('Sharding tests ...') 602 590 num_workers = self._num_workers() 603 591 test_lists = self._shard_tests(file_list, 604 592 num_workers > 1 and not self._options.experimental_fully_parallel) 605 606 broker = message_broker.get(self._port, self._options)607 self._message_broker = broker608 609 593 filename_queue = Queue.Queue() 610 594 for item in test_lists: … … 613 597 self._printer.print_update('Starting %s ...' 
% 614 598 grammar.pluralize('worker', num_workers)) 599 message_broker = self._message_broker 615 600 self._current_filename_queue = filename_queue 616 601 self._current_result_summary = result_summary 617 602 618 for worker_number in xrange(num_workers):619 thread = broker.start_worker(self, worker_number)620 w = WorkerState(thread.name(), worker_number, thread)621 self._workers[thread.name()] = w603 if not self._options.dry_run: 604 threads = message_broker.start_workers(self) 605 else: 606 threads = [] 622 607 623 608 self._printer.print_update("Starting testing ...") … … 625 610 if not self._options.dry_run: 626 611 try: 627 broker.run_message_loop(self)612 message_broker.run_message_loop() 628 613 except KeyboardInterrupt: 629 614 _log.info("Interrupted, exiting") 630 for worker_name in self._workers.keys(): 631 broker.cancel_worker(worker_name) 615 message_broker.cancel_workers() 632 616 keyboard_interrupted = True 633 617 except: … … 637 621 638 622 thread_timings, test_timings, individual_test_timings = \ 639 self._collect_timing_info(self._workers) 640 self._message_broker = None 623 self._collect_timing_info(threads) 641 624 642 625 return (keyboard_interrupted, thread_timings, test_timings, 643 626 individual_test_timings) 644 627 645 def _check_on_workers(self): 646 """Returns True iff all the workers have either completed or wedged.""" 647 648 # Loop through all the threads waiting for them to finish. 649 some_thread_is_alive = True 650 while some_thread_is_alive: 651 some_thread_is_alive = False 652 t = time.time() 653 for worker in self._workers.values(): 654 thread = worker.thread 655 exception_info = thread.exception_info() 656 if exception_info is not None: 657 # Re-raise the thread's exception here to make it 658 # clear that testing was aborted. Otherwise, 659 # the tests that did not run would be assumed 660 # to have passed. 
661 raise exception_info[0], exception_info[1], exception_info[2] 662 663 if thread.isAlive(): 664 some_thread_is_alive = True 665 next_timeout = thread.next_timeout() 666 if next_timeout and t > next_timeout: 667 self._message_broker.log_wedged_worker(worker.name) 668 thread.clear_next_timeout() 669 670 self.update_summary(self._current_result_summary) 671 672 if some_thread_is_alive: 673 time.sleep(0.01) 674 675 def _collect_timing_info(self, workers): 628 def update(self): 629 self.update_summary(self._current_result_summary) 630 631 def _collect_timing_info(self, threads): 676 632 test_timings = {} 677 633 individual_test_timings = [] 678 634 thread_timings = [] 679 635 680 for w in workers.values(): 681 thread = w.thread 682 thread_timings.append({'name': thread.name(), 636 for thread in threads: 637 thread_timings.append({'name': thread.getName(), 683 638 'num_tests': thread.get_num_tests(), 684 639 'total_time': thread.get_total_time()}) … … 1052 1007 """Prints the run times for slow, timeout and crash tests. 1053 1008 Args: 1054 individual_test_timings: List of TestStats for all tests. 1009 individual_test_timings: List of dump_render_tree_thread.TestStats 1010 for all tests. 1055 1011 result_summary: summary object for test run 1056 1012 """ … … 1340 1296 return 0 1341 1297 1298 broker = message_broker.get(port, options) 1299 1342 1300 # We wrap any parts of the run that are slow or likely to raise exceptions 1343 1301 # in a try/finally to ensure that we clean up the logging configuration. 1344 1302 num_unexpected_results = -1 1345 1303 try: 1346 test_runner = TestRunner(port, options, printer )1304 test_runner = TestRunner(port, options, printer, broker) 1347 1305 test_runner._print_config() 1348 1306 … … 1373 1331 num_unexpected_results) 1374 1332 finally: 1333 broker.cleanup() 1375 1334 printer.cleanup() 1376 1335 -
trunk/WebKitTools/Scripts/webkitpy/layout_tests/run_webkit_tests_unittest.py
r73231 r73255 490 490 491 491 runner = run_webkit_tests.TestRunner(port=mock_port, options=Mock(), 492 printer=Mock())492 printer=Mock(), message_broker=Mock()) 493 493 expected_html = u"""<html> 494 494 <head> … … 508 508 # put the http tests first in the queue. 509 509 runner = TestRunnerWrapper(port=Mock(), options=Mock(), 510 printer=Mock())510 printer=Mock(), message_broker=Mock()) 511 511 512 512 test_list = [
Note: See TracChangeset for help on using the changeset viewer.