Changeset 122394 in webkit


Ignore:
Timestamp:
Jul 11, 2012 5:45:58 PM (12 years ago)
Author:
dpranke@chromium.org
Message:

nrwt: add a MessagePool abstraction that the manager will call to replace the broker
https://bugs.webkit.org/show_bug.cgi?id=90511

Reviewed by Ojan Vafai.

This change introduces the new MessagePool abstraction that will
replace the classes in manager_worker_broker. It is a minimal
interface that tries to follow the conventions in
multiprocessing.Pool and concurrent.futures ... it provides a
context manager and a run() method that sends N messages to M
worker processes (starting workers as necessary) and waits for
them all to complete, handling cleanup as necessary. The caller
is responsible for providing a handle() method to handle
messages received from the workers.

This interface basically hides all of the multiprocessing logic from
the manager class.

The initial implementation of MessagePool is a simple shim
around the existing broker classes; a subsequent change will
replace all the other classes with a much simpler
implementation.

No additional tests are provided for now; existing tests should
provide adequate coverage, and I will add new unit tests for the
MessagePool class when I replace the existing implementation.

  • Scripts/webkitpy/layout_tests/controllers/manager.py:

(TestRunInterruptedException.__reduce__):
(Manager.__init__):
(Manager._run_tests):
(Manager._run_tests.instead):
(Manager.handle):
(Manager._handle_started_test):
(Manager._handle_finished_test_list):
(Manager._handle_finished_test):

  • Scripts/webkitpy/layout_tests/controllers/manager_worker_broker.py:

(get):
(_MessagePool):
(_MessagePool.__init__):
(_MessagePool.__enter__):
(_MessagePool.__exit__):
(_MessagePool.run):
(_MessagePool.wait):
(_MessagePool.is_done):
(_MessagePool._worker_is_done):
(_MessagePool._close):
(_MessagePool.handle_done):
(_MessagePool.handle_started_test):
(_MessagePool.handle_finished_test):
(_MessagePool.handle_finished_test_list):
(_MessagePool.handle_exception):
(_MessagePool._log_messages):
(_MessagePool._handle_worker_exception):
(_WorkerState):
(_WorkerState.for):
(_WorkerState.__init__):
(_WorkerState.__repr__):
(_get_broker):

  • Scripts/webkitpy/layout_tests/controllers/manager_worker_broker_unittest.py:

(make_broker):

Location:
trunk/Tools
Files:
4 edited

Legend:

Unmodified
Added
Removed
  • trunk/Tools/ChangeLog

    r122391 r122394  
     12012-07-03  Dirk Pranke  <dpranke@chromium.org>
     2
     3        nrwt: add a MessagePool abstraction that the manager will call to replace the broker
     4        https://bugs.webkit.org/show_bug.cgi?id=90511
     5
     6        Reviewed by Ojan Vafai.
     7
     8        This change introduces the new MessagePool abstraction that will
     9        replace the classes in manager_worker_broker. It is a minimal
     10        interface that tries to follow the conventions in
     11        multiprocessing.Pool and concurrency.futures ... it provides a
     12        context manager and a run() method that sends N messages to M
     13        workers processes (starting workers as necessary) and waits for
     14        them all to complete, handling cleanup as necessary. The caller
     15        is responsible for providing a handle() method to handle
     16        messages received from the workers.
     17       
     18        This interface basically hides all of the multiprocessing logic from
     19        the manager class.
     20
     21        The initial implementation of MessagePool is a simple shim
     22        around the existing broker classes; a subsequent change will
     23        replace all the other classes with a much simpler
     24        implementation.
     25
     26        No additional tests are provided for now; existing tests should
     27        provide adequate coverage, and I will add new unit tests for the
     28        MessagePool class when I replace the existing implementation.
     29
     30        * Scripts/webkitpy/layout_tests/controllers/manager.py:
     31        (TestRunInterruptedException.__reduce__):
     32        (Manager.__init__):
     33        (Manager._run_tests):
     34        (Manager._run_tests.instead):
     35        (Manager.handle):
     36        (Manager._handle_started_test):
     37        (Manager._handle_finished_test_list):
     38        (Manager._handle_finished_test):
     39        * Scripts/webkitpy/layout_tests/controllers/manager_worker_broker.py:
     40        (get):
     41        (_MessagePool):
     42        (_MessagePool.__init__):
     43        (_MessagePool.__enter__):
     44        (_MessagePool.__exit__):
     45        (_MessagePool.run):
     46        (_MessagePool.wait):
     47        (_MessagePool.is_done):
     48        (_MessagePool._worker_is_done):
     49        (_MessagePool._close):
     50        (_MessagePool.handle_done):
     51        (_MessagePool.handle_started_test):
     52        (_MessagePool.handle_finished_test):
     53        (_MessagePool.handle_finished_test_list):
     54        (_MessagePool.handle_exception):
     55        (_MessagePool._log_messages):
     56        (_MessagePool._handle_worker_exception):
     57        (_WorkerState):
     58        (_WorkerState.for):
     59        (_WorkerState.__init__):
     60        (_WorkerState.__repr__):
     61        (_get_broker):
     62        * Scripts/webkitpy/layout_tests/controllers/manager_worker_broker_unittest.py:
     63        (make_broker):
     64
    1652012-07-11  Simon Fraser  <simon.fraser@apple.com>
    266
  • trunk/Tools/Scripts/webkitpy/layout_tests/controllers/manager.py

    r122384 r122394  
    267267
    268268
     269# Export this so callers don't need to know about manager_worker_broker.
    269270WorkerException = manager_worker_broker.WorkerException
    270271
     
    301302        self._options = options
    302303        self._printer = printer
    303         self._message_broker = None
    304304        self._expectations = None
    305305
     
    329329        self._all_results = []
    330330        self._group_stats = {}
     331        self._worker_stats = {}
    331332        self._current_result_summary = None
    332 
    333         # This maps worker names to the state we are tracking for each of them.
    334         self._worker_states = {}
    335 
    336         self.name = 'manager'
    337333
    338334    def collect_tests(self, args):
     
    747743        self._all_results = []
    748744        self._group_stats = {}
    749         self._worker_states = {}
     745        self._worker_stats = {}
    750746
    751747        keyboard_interrupted = False
    752748        interrupted = False
    753         thread_timings = []
    754749
    755750        self._printer.print_update('Sharding tests ...')
     
    778773
    779774        if self._options.dry_run:
    780             return (keyboard_interrupted, interrupted, thread_timings, self._group_stats, self._all_results)
     775            return (keyboard_interrupted, interrupted, self._worker_stats.values(), self._group_stats, self._all_results)
    781776
    782777        self._printer.print_update('Starting %s ...' % grammar.pluralize('worker', num_workers))
    783         for worker_number in xrange(num_workers):
    784             worker_connection = manager_connection.start_worker(worker_number)
    785             worker_state = _WorkerState(worker_number, worker_connection)
    786             self._worker_states[worker_connection.name] = worker_state
    787 
    788             time.sleep(self._port.worker_startup_delay_secs())
    789 
    790         self._printer.print_update("Starting testing ...")
    791         for shard in all_shards:
    792             # FIXME: Change 'test_list' to 'shard', make sharding public.
    793             manager_connection.post_message('test_list', shard.name, shard.test_inputs)
    794 
    795         # We post one 'stop' message for each worker. Because the stop message
    796         # are sent after all of the tests, and because each worker will stop
    797         # reading messsages after receiving a stop, we can be sure each
    798         # worker will get a stop message and hence they will all shut down.
    799         for _ in xrange(num_workers):
    800             manager_connection.post_message('stop')
    801778
    802779        try:
    803             while not self.is_done():
    804                 manager_connection.run_message_loop(delay_secs=1.0)
    805 
    806             # Make sure all of the workers have shut down (if possible).
    807             for worker_state in self._worker_states.values():
    808                 if worker_state.worker_connection.is_alive():
    809                     _log.debug('Waiting for worker %d to exit' % worker_state.number)
    810                     worker_state.worker_connection.join(5.0)
    811                     if worker_state.worker_connection.is_alive():
    812                         _log.error('Worker %d did not exit in time.' % worker_state.number)
    813 
     780            with manager_worker_broker.get(self, worker_factory, num_workers, self._port.worker_startup_delay_secs(), self._port.host) as pool:
     781                pool.run(('test_list', shard.name, shard.test_inputs) for shard in all_shards)
    814782        except KeyboardInterrupt:
    815783            self._printer.flush()
    816784            self._printer.write('Interrupted, exiting ...')
    817             self.cancel_workers()
    818785            keyboard_interrupted = True
    819786        except TestRunInterruptedException, e:
    820787            _log.warning(e.reason)
    821             self.cancel_workers()
    822788            interrupted = True
    823         except WorkerException:
    824             self.cancel_workers()
    825             raise
    826789        except:
    827             # Unexpected exception; don't try to clean up workers.
    828790            _log.error("Exception raised, exiting")
    829             self.cancel_workers()
    830791            raise
    831792        finally:
    832             manager_connection.cleanup()
    833793            self.stop_servers_with_lock()
    834794
    835         thread_timings = [worker_state.stats for worker_state in self._worker_states.values()]
    836 
    837795        # FIXME: should this be a class instead of a tuple?
    838         return (interrupted, keyboard_interrupted, thread_timings, self._group_stats, self._all_results)
     796        return (interrupted, keyboard_interrupted, self._worker_stats.values(), self._group_stats, self._all_results)
    839797
    840798    def results_directory(self):
     
    14451403        self._port.show_results_html_file(results_filename)
    14461404
    1447 
    1448     def is_done(self):
    1449         worker_states = self._worker_states.values()
    1450         return worker_states and all(self._worker_is_done(worker_state) for worker_state in worker_states)
    1451 
    1452     # FIXME: Inline this function.
    1453     def _worker_is_done(self, worker_state):
    1454         return worker_state.done
    1455 
    1456     def cancel_workers(self):
    1457         for worker_state in self._worker_states.values():
    1458             worker_state.worker_connection.cancel()
    1459 
    1460     def handle_started_test(self, source, test_info, hang_timeout):
    1461         worker_state = self._worker_states[source]
    1462         worker_state.current_test_name = test_info.test_name
    1463         worker_state.next_timeout = time.time() + hang_timeout
    1464 
    1465     def handle_done(self, source, log_messages=None):
    1466         worker_state = self._worker_states[source]
    1467         worker_state.done = True
    1468         self._log_messages(log_messages)
    1469 
    1470     def handle_exception(self, source, exception_type, exception_value, stack):
    1471         if exception_type in (KeyboardInterrupt, TestRunInterruptedException):
    1472             raise exception_type(exception_value)
    1473         _log.error("%s raised %s('%s'):" % (
    1474                    source,
    1475                    exception_value.__class__.__name__,
    1476                    str(exception_value)))
    1477         self._log_worker_stack(stack)
    1478         raise WorkerException(str(exception_value))
    1479 
    1480     def handle_finished_test_list(self, source, list_name, num_tests, elapsed_time):
     1405    def handle(self, name, source, *args):
     1406        method = getattr(self, '_handle_' + name)
     1407        if method:
     1408            return method(source, *args)
     1409        raise AssertionError('unknown message %s received from %s, args=%s' % (name, source, repr(args)))
     1410
     1411    def _handle_started_test(self, worker_name, test_input, test_timeout_sec):
     1412        # FIXME: log that we've started another test.
     1413        pass
     1414
     1415    def _handle_finished_test_list(self, worker_name, list_name, num_tests, elapsed_time):
    14811416        self._group_stats[list_name] = (num_tests, elapsed_time)
    14821417
     
    14931428                self.stop_servers_with_lock()
    14941429
    1495     def handle_finished_test(self, source, result, elapsed_time, log_messages=None):
    1496         worker_state = self._worker_states[source]
    1497         worker_state.next_timeout = None
    1498         worker_state.current_test_name = None
    1499         worker_state.stats['total_time'] += elapsed_time
    1500         worker_state.stats['num_tests'] += 1
    1501 
    1502         self._log_messages(log_messages)
     1430    def _handle_finished_test(self, worker_name, result, elapsed_time, log_messages=[]):
     1431        self._worker_stats.setdefault(worker_name, {'name': worker_name, 'num_tests': 0, 'total_time': 0})
     1432        self._worker_stats[worker_name]['total_time'] += elapsed_time
     1433        self._worker_stats[worker_name]['num_tests'] += 1
    15031434        self._all_results.append(result)
    15041435        self._update_summary_with_result(self._current_result_summary, result)
    1505 
    1506     def _log_messages(self, messages):
    1507         for message in messages:
    1508             logging.root.handle(message)
    1509 
    1510     def _log_worker_stack(self, stack):
    1511         webkitpydir = self._port.path_from_webkit_base('Tools', 'Scripts', 'webkitpy') + self._filesystem.sep
    1512         for filename, line_number, function_name, text in stack:
    1513             if filename.startswith(webkitpydir):
    1514                 filename = filename.replace(webkitpydir, '')
    1515             _log.error('  %s:%u (in %s)' % (filename, line_number, function_name))
    1516             _log.error('    %s' % text)
    15171436
    15181437
     
    15641483
    15651484    return [tryint(chunk) for chunk in re.split('(\d+)', string_to_split)]
    1566 
    1567 
    1568 class _WorkerState(object):
    1569     """A class for the manager to use to track the current state of the workers."""
    1570     def __init__(self, number, worker_connection):
    1571         self.worker_connection = worker_connection
    1572         self.number = number
    1573         self.done = False
    1574         self.current_test_name = None
    1575         self.next_timeout = None
    1576         self.stats = {}
    1577         self.stats['name'] = worker_connection.name
    1578         self.stats['num_tests'] = 0
    1579         self.stats['total_time'] = 0
    1580 
    1581     def __repr__(self):
    1582         return "_WorkerState(" + str(self.__dict__) + ")"
  • trunk/Tools/Scripts/webkitpy/layout_tests/controllers/manager_worker_broker.py

    r122384 r122394  
    2727# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
    2828
    29 """Module for handling messages and concurrency for run-webkit-tests.
    30 
    31 This module implements a message broker that connects the manager to the
    32 workers: it provides a messaging abstraction and message loops (building on
    33 top of message_broker), and handles starting workers by launching processes.
    34 
    35 There are a lot of classes and objects involved in a fully connected system.
    36 They interact more or less like:
    37 
    38   Manager  -->  _InlineManager ---> _InlineWorker <-> Worker
    39      ^                    \               /              ^
    40      |                     v             v               |
    41      \-----------------------  Broker   ----------------/
    42 
    43 The broker simply distributes messages onto topics (named queues); the actual
    44 queues themselves are provided by the caller, as the queue's implementation
    45 requirements varies vary depending on the desired concurrency model
    46 (none/threads/processes).
    47 
    48 In order for shared-nothing messaging between processing to be possible,
    49 Messages must be picklable.
    50 
    51 The module defines one interface and two classes. Callers of this package
    52 must implement the BrokerClient interface, and most callers will create
    53 _BrokerConnections as well as Brokers.
    54 
    55 The classes relate to each other as:
    56 
    57     BrokerClient   ------>    _BrokerConnection
    58          ^                         |
    59          |                         v
    60          \----------------      _Broker
    61 
    62 (The BrokerClient never calls broker directly after it is created, only
    63 _BrokerConnection.  _BrokerConnection passes a reference to BrokerClient to
    64 _Broker, and _Broker only invokes that reference, never talking directly to
    65 BrokerConnection).
    66 """
     29"""Module for handling messages and concurrency for run-webkit-tests."""
    6730
    6831import cPickle
     
    7336import Queue
    7437import sys
     38import time
    7539import traceback
    7640
     
    9155
    9256
    93 def get(max_workers, client, worker_factory, host=None):
     57def get(caller, worker_factory, num_workers, worker_startup_delay_secs=0.0, host=None):
     58    """Returns an object that exposes a run() method that takes a list of test shards and runs them in parallel."""
     59    return _MessagePool(caller, worker_factory, num_workers, worker_startup_delay_secs, host)
     60
     61
     62class _MessagePool(object):
     63    def __init__(self, caller, worker_factory, num_workers, worker_startup_delay_secs=0.0, host=None):
     64        self._caller = caller
     65        self._worker_factory = worker_factory
     66        self._num_workers = num_workers
     67        self._worker_startup_delay_secs = worker_startup_delay_secs
     68        self._worker_states = {}
     69        self._host = host
     70        self.name = 'manager'
     71
     72    def __enter__(self):
     73        return self
     74
     75    def __exit__(self, exc_type, exc_value, traceback):
     76        self._close()
     77        return False
     78
     79    def run(self, shards):
     80        manager_connection = _get_broker(self._num_workers, self, self._worker_factory, self._host)
     81        for worker_number in xrange(self._num_workers):
     82            worker_connection = manager_connection.start_worker(worker_number)
     83            worker_state = _WorkerState(worker_number, worker_connection)
     84            self._worker_states[worker_connection.name] = worker_state
     85            time.sleep(self._worker_startup_delay_secs)
     86
     87        messages = list(shards)
     88        for message in messages:
     89            manager_connection.post_message(*message)
     90
     91        for _ in xrange(self._num_workers):
     92            manager_connection.post_message('stop')
     93
     94        self.wait(manager_connection)
     95
     96    def wait(self, manager_connection):
     97        try:
     98            while not self.is_done():
     99                manager_connection.run_message_loop(delay_secs=1.0)
     100        finally:
     101            self._close()
     102
     103    def is_done(self):
     104        worker_states = self._worker_states.values()
     105        return worker_states and all(worker_state.done for worker_state in worker_states)
     106
     107    def _close(self):
     108        for worker_state in self._worker_states.values():
     109            if worker_state.worker_connection.is_alive():
     110                worker_state.worker_connection.cancel()
     111                _log.debug('Waiting for worker %d to exit' % worker_state.number)
     112                worker_state.worker_connection.join(5.0)
     113                if worker_state.worker_connection.is_alive():
     114                    _log.error('Worker %d did not exit in time.' % worker_state.number)
     115
     116    def handle_done(self, source, log_messages=None):
     117        worker_state = self._worker_states[source]
     118        worker_state.done = True
     119        self._log_messages(log_messages)
     120
     121    def handle_started_test(self, *args):
     122        self._caller.handle('started_test', *args)
     123
     124    def handle_finished_test(self, *args):
     125        self._caller.handle('finished_test', *args)
     126
     127    def handle_finished_test_list(self, *args):
     128        self._caller.handle('finished_test_list', *args)
     129
     130    def handle_exception(self, *args):
     131        self._handle_worker_exception(*args)
     132
     133    def _log_messages(self, messages):
     134        for message in messages:
     135            logging.root.handle(message)
     136
     137    @staticmethod
     138    def _handle_worker_exception(source, exception_type, exception_value, stack):
     139        if exception_type == KeyboardInterrupt:
     140            raise exception_type(exception_value)
     141        _log.error("%s raised %s('%s'):" % (
     142                   source, exception_value.__class__.__name__, str(exception_value)))
     143        raise WorkerException(str(exception_value))
     144
     145
     146class _WorkerState(object):
     147    def __init__(self, number, worker_connection):
     148        self.worker_connection = worker_connection
     149        self.number = number
     150        self.done = False
     151
     152    def __repr__(self):
     153        return "_WorkerState(" + str(self.__dict__) + ")"
     154
     155
     156def _get_broker(max_workers, client, worker_factory, host=None):
    94157    """Return a connection to a manager/worker message_broker
    95158
     
    398461        raise NotImplementedError
    399462
     463    # FIXME: rename to yield_to_caller().
    400464    def yield_to_broker(self):
    401465        pass
  • trunk/Tools/Scripts/webkitpy/layout_tests/controllers/manager_worker_broker_unittest.py

    r122384 r122394  
    5252    starting_queue = start_queue
    5353    stopping_queue = stop_queue
    54     return manager_worker_broker.get(max_workers, manager, _TestWorker)
     54    return manager_worker_broker._get_broker(max_workers, manager, _TestWorker)
    5555
    5656
Note: See TracChangeset for help on using the changeset viewer.