Changeset 122394 in webkit
- Timestamp:
- Jul 11, 2012 5:45:58 PM (12 years ago)
- Location:
- trunk/Tools
- Files:
-
- 4 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/Tools/ChangeLog
r122391 r122394 1 2012-07-03 Dirk Pranke <dpranke@chromium.org> 2 3 nrwt: add a MessagePool abstraction that the manager will call to replace the broker 4 https://bugs.webkit.org/show_bug.cgi?id=90511 5 6 Reviewed by Ojan Vafai. 7 8 This change introduces the new MessagePool abstraction that will 9 replace the classes in manager_worker_broker. It is a minimal 10 interface that tries to follow the conventions in 11 multiprocessing.Pool and concurrent.futures ... it provides a 12 context manager and a run() method that sends N messages to M 13 worker processes (starting workers as necessary) and waits for 14 them all to complete, handling cleanup as necessary. The caller 15 is responsible for providing a handle() method to handle 16 messages received from the workers. 17 18 This interface basically hides all of the multiprocessing logic from 19 the manager class. 20 21 The initial implementation of MessagePool is a simple shim 22 around the existing broker classes; a subsequent change will 23 replace all the other classes with a much simpler 24 implementation. 25 26 No additional tests are provided for now; existing tests should 27 provide adequate coverage, and I will add new unit tests for the 28 MessagePool class when I replace the existing implementation. 
29 30 * Scripts/webkitpy/layout_tests/controllers/manager.py: 31 (TestRunInterruptedException.__reduce__): 32 (Manager.__init__): 33 (Manager._run_tests): 34 (Manager._run_tests.instead): 35 (Manager.handle): 36 (Manager._handle_started_test): 37 (Manager._handle_finished_test_list): 38 (Manager._handle_finished_test): 39 * Scripts/webkitpy/layout_tests/controllers/manager_worker_broker.py: 40 (get): 41 (_MessagePool): 42 (_MessagePool.__init__): 43 (_MessagePool.__enter__): 44 (_MessagePool.__exit__): 45 (_MessagePool.run): 46 (_MessagePool.wait): 47 (_MessagePool.is_done): 48 (_MessagePool._worker_is_done): 49 (_MessagePool._close): 50 (_MessagePool.handle_done): 51 (_MessagePool.handle_started_test): 52 (_MessagePool.handle_finished_test): 53 (_MessagePool.handle_finished_test_list): 54 (_MessagePool.handle_exception): 55 (_MessagePool._log_messages): 56 (_MessagePool._handle_worker_exception): 57 (_WorkerState): 58 (_WorkerState.for): 59 (_WorkerState.__init__): 60 (_WorkerState.__repr__): 61 (_get_broker): 62 * Scripts/webkitpy/layout_tests/controllers/manager_worker_broker_unittest.py: 63 (make_broker): 64 1 65 2012-07-11 Simon Fraser <simon.fraser@apple.com> 2 66 -
trunk/Tools/Scripts/webkitpy/layout_tests/controllers/manager.py
r122384 r122394 267 267 268 268 269 # Export this so callers don't need to know about manager_worker_broker. 269 270 WorkerException = manager_worker_broker.WorkerException 270 271 … … 301 302 self._options = options 302 303 self._printer = printer 303 self._message_broker = None304 304 self._expectations = None 305 305 … … 329 329 self._all_results = [] 330 330 self._group_stats = {} 331 self._worker_stats = {} 331 332 self._current_result_summary = None 332 333 # This maps worker names to the state we are tracking for each of them.334 self._worker_states = {}335 336 self.name = 'manager'337 333 338 334 def collect_tests(self, args): … … 747 743 self._all_results = [] 748 744 self._group_stats = {} 749 self._worker_stat es = {}745 self._worker_stats = {} 750 746 751 747 keyboard_interrupted = False 752 748 interrupted = False 753 thread_timings = []754 749 755 750 self._printer.print_update('Sharding tests ...') … … 778 773 779 774 if self._options.dry_run: 780 return (keyboard_interrupted, interrupted, thread_timings, self._group_stats, self._all_results)775 return (keyboard_interrupted, interrupted, self._worker_stats.values(), self._group_stats, self._all_results) 781 776 782 777 self._printer.print_update('Starting %s ...' % grammar.pluralize('worker', num_workers)) 783 for worker_number in xrange(num_workers):784 worker_connection = manager_connection.start_worker(worker_number)785 worker_state = _WorkerState(worker_number, worker_connection)786 self._worker_states[worker_connection.name] = worker_state787 788 time.sleep(self._port.worker_startup_delay_secs())789 790 self._printer.print_update("Starting testing ...")791 for shard in all_shards:792 # FIXME: Change 'test_list' to 'shard', make sharding public.793 manager_connection.post_message('test_list', shard.name, shard.test_inputs)794 795 # We post one 'stop' message for each worker. 
Because the stop message796 # are sent after all of the tests, and because each worker will stop797 # reading messsages after receiving a stop, we can be sure each798 # worker will get a stop message and hence they will all shut down.799 for _ in xrange(num_workers):800 manager_connection.post_message('stop')801 778 802 779 try: 803 while not self.is_done(): 804 manager_connection.run_message_loop(delay_secs=1.0) 805 806 # Make sure all of the workers have shut down (if possible). 807 for worker_state in self._worker_states.values(): 808 if worker_state.worker_connection.is_alive(): 809 _log.debug('Waiting for worker %d to exit' % worker_state.number) 810 worker_state.worker_connection.join(5.0) 811 if worker_state.worker_connection.is_alive(): 812 _log.error('Worker %d did not exit in time.' % worker_state.number) 813 780 with manager_worker_broker.get(self, worker_factory, num_workers, self._port.worker_startup_delay_secs(), self._port.host) as pool: 781 pool.run(('test_list', shard.name, shard.test_inputs) for shard in all_shards) 814 782 except KeyboardInterrupt: 815 783 self._printer.flush() 816 784 self._printer.write('Interrupted, exiting ...') 817 self.cancel_workers()818 785 keyboard_interrupted = True 819 786 except TestRunInterruptedException, e: 820 787 _log.warning(e.reason) 821 self.cancel_workers()822 788 interrupted = True 823 except WorkerException:824 self.cancel_workers()825 raise826 789 except: 827 # Unexpected exception; don't try to clean up workers.828 790 _log.error("Exception raised, exiting") 829 self.cancel_workers()830 791 raise 831 792 finally: 832 manager_connection.cleanup()833 793 self.stop_servers_with_lock() 834 794 835 thread_timings = [worker_state.stats for worker_state in self._worker_states.values()]836 837 795 # FIXME: should this be a class instead of a tuple? 
838 return (interrupted, keyboard_interrupted, thread_timings, self._group_stats, self._all_results)796 return (interrupted, keyboard_interrupted, self._worker_stats.values(), self._group_stats, self._all_results) 839 797 840 798 def results_directory(self): … … 1445 1403 self._port.show_results_html_file(results_filename) 1446 1404 1447 1448 def is_done(self): 1449 worker_states = self._worker_states.values() 1450 return worker_states and all(self._worker_is_done(worker_state) for worker_state in worker_states) 1451 1452 # FIXME: Inline this function. 1453 def _worker_is_done(self, worker_state): 1454 return worker_state.done 1455 1456 def cancel_workers(self): 1457 for worker_state in self._worker_states.values(): 1458 worker_state.worker_connection.cancel() 1459 1460 def handle_started_test(self, source, test_info, hang_timeout): 1461 worker_state = self._worker_states[source] 1462 worker_state.current_test_name = test_info.test_name 1463 worker_state.next_timeout = time.time() + hang_timeout 1464 1465 def handle_done(self, source, log_messages=None): 1466 worker_state = self._worker_states[source] 1467 worker_state.done = True 1468 self._log_messages(log_messages) 1469 1470 def handle_exception(self, source, exception_type, exception_value, stack): 1471 if exception_type in (KeyboardInterrupt, TestRunInterruptedException): 1472 raise exception_type(exception_value) 1473 _log.error("%s raised %s('%s'):" % ( 1474 source, 1475 exception_value.__class__.__name__, 1476 str(exception_value))) 1477 self._log_worker_stack(stack) 1478 raise WorkerException(str(exception_value)) 1479 1480 def handle_finished_test_list(self, source, list_name, num_tests, elapsed_time): 1405 def handle(self, name, source, *args): 1406 method = getattr(self, '_handle_' + name) 1407 if method: 1408 return method(source, *args) 1409 raise AssertionError('unknown message %s received from %s, args=%s' % (name, source, repr(args))) 1410 1411 def _handle_started_test(self, worker_name, 
test_input, test_timeout_sec): 1412 # FIXME: log that we've started another test. 1413 pass 1414 1415 def _handle_finished_test_list(self, worker_name, list_name, num_tests, elapsed_time): 1481 1416 self._group_stats[list_name] = (num_tests, elapsed_time) 1482 1417 … … 1493 1428 self.stop_servers_with_lock() 1494 1429 1495 def handle_finished_test(self, source, result, elapsed_time, log_messages=None): 1496 worker_state = self._worker_states[source] 1497 worker_state.next_timeout = None 1498 worker_state.current_test_name = None 1499 worker_state.stats['total_time'] += elapsed_time 1500 worker_state.stats['num_tests'] += 1 1501 1502 self._log_messages(log_messages) 1430 def _handle_finished_test(self, worker_name, result, elapsed_time, log_messages=[]): 1431 self._worker_stats.setdefault(worker_name, {'name': worker_name, 'num_tests': 0, 'total_time': 0}) 1432 self._worker_stats[worker_name]['total_time'] += elapsed_time 1433 self._worker_stats[worker_name]['num_tests'] += 1 1503 1434 self._all_results.append(result) 1504 1435 self._update_summary_with_result(self._current_result_summary, result) 1505 1506 def _log_messages(self, messages):1507 for message in messages:1508 logging.root.handle(message)1509 1510 def _log_worker_stack(self, stack):1511 webkitpydir = self._port.path_from_webkit_base('Tools', 'Scripts', 'webkitpy') + self._filesystem.sep1512 for filename, line_number, function_name, text in stack:1513 if filename.startswith(webkitpydir):1514 filename = filename.replace(webkitpydir, '')1515 _log.error(' %s:%u (in %s)' % (filename, line_number, function_name))1516 _log.error(' %s' % text)1517 1436 1518 1437 … … 1564 1483 1565 1484 return [tryint(chunk) for chunk in re.split('(\d+)', string_to_split)] 1566 1567 1568 class _WorkerState(object):1569 """A class for the manager to use to track the current state of the workers."""1570 def __init__(self, number, worker_connection):1571 self.worker_connection = worker_connection1572 self.number = number1573 
self.done = False1574 self.current_test_name = None1575 self.next_timeout = None1576 self.stats = {}1577 self.stats['name'] = worker_connection.name1578 self.stats['num_tests'] = 01579 self.stats['total_time'] = 01580 1581 def __repr__(self):1582 return "_WorkerState(" + str(self.__dict__) + ")" -
trunk/Tools/Scripts/webkitpy/layout_tests/controllers/manager_worker_broker.py
r122384 r122394 27 27 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 28 28 29 """Module for handling messages and concurrency for run-webkit-tests. 30 31 This module implements a message broker that connects the manager to the 32 workers: it provides a messaging abstraction and message loops (building on 33 top of message_broker), and handles starting workers by launching processes. 34 35 There are a lot of classes and objects involved in a fully connected system. 36 They interact more or less like: 37 38 Manager --> _InlineManager ---> _InlineWorker <-> Worker 39 ^ \ / ^ 40 | v v | 41 \----------------------- Broker ----------------/ 42 43 The broker simply distributes messages onto topics (named queues); the actual 44 queues themselves are provided by the caller, as the queue's implementation 45 requirements varies vary depending on the desired concurrency model 46 (none/threads/processes). 47 48 In order for shared-nothing messaging between processing to be possible, 49 Messages must be picklable. 50 51 The module defines one interface and two classes. Callers of this package 52 must implement the BrokerClient interface, and most callers will create 53 _BrokerConnections as well as Brokers. 54 55 The classes relate to each other as: 56 57 BrokerClient ------> _BrokerConnection 58 ^ | 59 | v 60 \---------------- _Broker 61 62 (The BrokerClient never calls broker directly after it is created, only 63 _BrokerConnection. _BrokerConnection passes a reference to BrokerClient to 64 _Broker, and _Broker only invokes that reference, never talking directly to 65 BrokerConnection). 
66 """ 29 """Module for handling messages and concurrency for run-webkit-tests.""" 67 30 68 31 import cPickle … … 73 36 import Queue 74 37 import sys 38 import time 75 39 import traceback 76 40 … … 91 55 92 56 93 def get(max_workers, client, worker_factory, host=None): 57 def get(caller, worker_factory, num_workers, worker_startup_delay_secs=0.0, host=None): 58 """Returns an object that exposes a run() method that takes a list of test shards and runs them in parallel.""" 59 return _MessagePool(caller, worker_factory, num_workers, worker_startup_delay_secs, host) 60 61 62 class _MessagePool(object): 63 def __init__(self, caller, worker_factory, num_workers, worker_startup_delay_secs=0.0, host=None): 64 self._caller = caller 65 self._worker_factory = worker_factory 66 self._num_workers = num_workers 67 self._worker_startup_delay_secs = worker_startup_delay_secs 68 self._worker_states = {} 69 self._host = host 70 self.name = 'manager' 71 72 def __enter__(self): 73 return self 74 75 def __exit__(self, exc_type, exc_value, traceback): 76 self._close() 77 return False 78 79 def run(self, shards): 80 manager_connection = _get_broker(self._num_workers, self, self._worker_factory, self._host) 81 for worker_number in xrange(self._num_workers): 82 worker_connection = manager_connection.start_worker(worker_number) 83 worker_state = _WorkerState(worker_number, worker_connection) 84 self._worker_states[worker_connection.name] = worker_state 85 time.sleep(self._worker_startup_delay_secs) 86 87 messages = list(shards) 88 for message in messages: 89 manager_connection.post_message(*message) 90 91 for _ in xrange(self._num_workers): 92 manager_connection.post_message('stop') 93 94 self.wait(manager_connection) 95 96 def wait(self, manager_connection): 97 try: 98 while not self.is_done(): 99 manager_connection.run_message_loop(delay_secs=1.0) 100 finally: 101 self._close() 102 103 def is_done(self): 104 worker_states = self._worker_states.values() 105 return worker_states and 
all(worker_state.done for worker_state in worker_states) 106 107 def _close(self): 108 for worker_state in self._worker_states.values(): 109 if worker_state.worker_connection.is_alive(): 110 worker_state.worker_connection.cancel() 111 _log.debug('Waiting for worker %d to exit' % worker_state.number) 112 worker_state.worker_connection.join(5.0) 113 if worker_state.worker_connection.is_alive(): 114 _log.error('Worker %d did not exit in time.' % worker_state.number) 115 116 def handle_done(self, source, log_messages=None): 117 worker_state = self._worker_states[source] 118 worker_state.done = True 119 self._log_messages(log_messages) 120 121 def handle_started_test(self, *args): 122 self._caller.handle('started_test', *args) 123 124 def handle_finished_test(self, *args): 125 self._caller.handle('finished_test', *args) 126 127 def handle_finished_test_list(self, *args): 128 self._caller.handle('finished_test_list', *args) 129 130 def handle_exception(self, *args): 131 self._handle_worker_exception(*args) 132 133 def _log_messages(self, messages): 134 for message in messages: 135 logging.root.handle(message) 136 137 @staticmethod 138 def _handle_worker_exception(source, exception_type, exception_value, stack): 139 if exception_type == KeyboardInterrupt: 140 raise exception_type(exception_value) 141 _log.error("%s raised %s('%s'):" % ( 142 source, exception_value.__class__.__name__, str(exception_value))) 143 raise WorkerException(str(exception_value)) 144 145 146 class _WorkerState(object): 147 def __init__(self, number, worker_connection): 148 self.worker_connection = worker_connection 149 self.number = number 150 self.done = False 151 152 def __repr__(self): 153 return "_WorkerState(" + str(self.__dict__) + ")" 154 155 156 def _get_broker(max_workers, client, worker_factory, host=None): 94 157 """Return a connection to a manager/worker message_broker 95 158 … … 398 461 raise NotImplementedError 399 462 463 # FIXME: rename to yield_to_caller(). 
400 464 def yield_to_broker(self): 401 465 pass -
trunk/Tools/Scripts/webkitpy/layout_tests/controllers/manager_worker_broker_unittest.py
r122384 r122394 52 52 starting_queue = start_queue 53 53 stopping_queue = stop_queue 54 return manager_worker_broker. get(max_workers, manager, _TestWorker)54 return manager_worker_broker._get_broker(max_workers, manager, _TestWorker) 55 55 56 56
Note: See TracChangeset
for help on using the changeset viewer.