Changeset 78398 in webkit


Ignore:
Timestamp:
Feb 11, 2011 5:42:00 PM (13 years ago)
Author:
dpranke@chromium.org
Message:

2011-02-11 Dirk Pranke <dpranke@chromium.org>

Reviewed by Tony Chang.

This patch adds to NRWT most of the support needed to run the new
message-based workers in separate threads or processes. The code
isn't fully complete yet because we don't support cancel() or
is_alive().

https://bugs.webkit.org/show_bug.cgi?id=54070

  • Scripts/webkitpy/layout_tests/layout_package/manager_worker_broker.py:
  • Scripts/webkitpy/layout_tests/layout_package/manager_worker_broker_unittest.py:
  • Scripts/webkitpy/layout_tests/layout_package/test_runner2.py:
  • Scripts/webkitpy/layout_tests/port/base.py:
  • Scripts/webkitpy/layout_tests/port/mock_drt.py:
  • Scripts/webkitpy/layout_tests/run_webkit_tests_unittest.py:
Location:
trunk/Tools
Files:
7 edited

Legend:

Unmodified
Added
Removed
  • trunk/Tools/ChangeLog

    r78359 r78398  
     12011-02-11  Dirk Pranke  <dpranke@chromium.org>
     2
     3        Reviewed by Tony Chang.
     4
     5        This patch adds to NRWT most of the support needed to run the new
     6        message-based workers in separate threads or processes. The code
     7        isn't fully complete yet because we don't support cancel() or
     8        is_alive().
     9
     10        https://bugs.webkit.org/show_bug.cgi?id=54070
     11
     12        * Scripts/webkitpy/layout_tests/layout_package/manager_worker_broker.py:
     13        * Scripts/webkitpy/layout_tests/layout_package/manager_worker_broker_unittest.py:
     14        * Scripts/webkitpy/layout_tests/layout_package/test_runner2.py:
     15        * Scripts/webkitpy/layout_tests/port/base.py:
     16        * Scripts/webkitpy/layout_tests/port/mock_drt.py:
     17        * Scripts/webkitpy/layout_tests/run_webkit_tests_unittest.py:
     18
    1192011-02-11  Sailesh Agrawal  <sail@chromium.org>
    220
  • trunk/Tools/Scripts/webkitpy/layout_tests/layout_package/manager_worker_broker.py

    r78302 r78398  
    4747import optparse
    4848import Queue
     49import thread
    4950import threading
     51import time
     52
    5053
    5154# Handle Python < 2.6 where multiprocessing isn't available.
     55#
     56# _Multiprocessing_Process is needed so that _MultiProcessWorker
     57# can be defined with or without multiprocessing.
    5258try:
    5359    import multiprocessing
     60    _Multiprocessing_Process = multiprocessing.Process
    5461except ImportError:
    5562    multiprocessing = None
    56 
    57 
     63    _Multiprocessing_Process = threading.Thread
     64
     65
     66from webkitpy.layout_tests import port
    5867from webkitpy.layout_tests.layout_package import message_broker2
    5968
     
    165174
    166175    def start_worker(self, worker_number):
    167         self._inline_worker = _InlineWorker(self._broker, self._port, self._client,
    168             self._worker_class, worker_number)
     176        self._inline_worker = _InlineWorkerConnection(self._broker, self._port,
     177            self._client, self._worker_class, worker_number)
    169178        return self._inline_worker
    170179
     
    178187class _ThreadedManager(_ManagerConnection):
    179188    def __init__(self, broker, port, options, client, worker_class):
    180         raise NotImplementedError
     189        _ManagerConnection.__init__(self, broker, options, client, worker_class)
     190        self._port = port
     191
     192    def start_worker(self, worker_number):
     193        worker_connection = _ThreadedWorkerConnection(self._broker, self._port,
     194            self._worker_class, worker_number)
     195        worker_connection.start()
     196        return worker_connection
    181197
    182198
    183199class _MultiProcessManager(_ManagerConnection):
    184200    def __init__(self, broker, port, options, client, worker_class):
    185         raise NotImplementedError
     201        # Note that this class does not keep a handle to the actual port
     202        # object, because it isn't Picklable. Instead it keeps the port
     203        # name and recreates the port in the child process from the name
     204        # and options.
     205        _ManagerConnection.__init__(self, broker, options, client, worker_class)
     206        self._platform_name = port.real_name()
     207
     208    def start_worker(self, worker_number):
     209        worker_connection = _MultiProcessWorkerConnection(self._broker, self._platform_name,
     210            self._worker_class, worker_number, self._options)
     211        worker_connection.start()
     212        return worker_connection
    186213
    187214
     
    193220                                                  ANY_WORKER_TOPIC, MANAGER_TOPIC)
    194221
    195     def run(self):
    196         raise NotImplementedError
    197 
    198222    def yield_to_broker(self):
    199223        pass
    200224
    201225
    202 class _InlineWorker(_WorkerConnection):
     226class _InlineWorkerConnection(_WorkerConnection):
    203227    def __init__(self, broker, port, manager_client, worker_class, worker_number):
    204228        _WorkerConnection.__init__(self, broker, worker_class, worker_number, port._options)
     
    211235    def yield_to_broker(self):
    212236        self._broker.run_all_pending(MANAGER_TOPIC, self._manager_client)
     237
     238
     239class _Thread(threading.Thread):
     240    def __init__(self, worker_connection, port, client):
     241        threading.Thread.__init__(self)
     242        self._worker_connection = worker_connection
     243        self._port = port
     244        self._client = client
     245
     246    def run(self):
     247        # FIXME: We can remove this once everyone is on 2.6.
     248        if not hasattr(self, 'ident'):
     249            self.ident = thread.get_ident()
     250        self._client.run(self._port)
     251
     252
     253class _ThreadedWorkerConnection(_WorkerConnection):
     254    def __init__(self, broker, port, worker_class, worker_number):
     255        _WorkerConnection.__init__(self, broker, worker_class, worker_number, port._options)
     256        self._thread = _Thread(self, port, self._client)
     257
     258    def start(self):
     259        self._thread.start()
     260
     261
     262class _Process(_Multiprocessing_Process):
     263    def __init__(self, worker_connection, platform_name, options, client):
     264        _Multiprocessing_Process.__init__(self)
     265        self._worker_connection = worker_connection
     266        self._platform_name = platform_name
     267        self._options = options
     268        self._client = client
     269
     270    def run(self):
     271        logging.basicConfig()
     272        port_obj = port.get(self._platform_name, self._options)
     273        self._client.run(port_obj)
     274
     275
     276class _MultiProcessWorkerConnection(_WorkerConnection):
     277    def __init__(self, broker, platform_name, worker_class, worker_number, options):
     278        _WorkerConnection.__init__(self, broker, worker_class, worker_number, options)
     279        self._proc = _Process(self, platform_name, options, self._client)
     280
     281    def start(self):
     282        self._proc.start()
  • trunk/Tools/Scripts/webkitpy/layout_tests/layout_package/manager_worker_broker_unittest.py

    r78302 r78398  
    2828
    2929import optparse
     30import Queue
    3031import sys
    3132import unittest
     
    4445
    4546
    46 class TestWorker(object):
    47     pass
     47class TestWorker(manager_worker_broker.AbstractWorker):
     48    def __init__(self, broker_connection, worker_number, options):
     49        self._broker_connection = broker_connection
     50        self._options = options
     51        self._worker_number = worker_number
     52        self._name = 'TestWorker/%d' % worker_number
     53        self._stopped = False
     54
     55    def handle_stop(self, src):
     56        self._stopped = True
     57
     58    def handle_test(self, src, an_int, a_str):
     59        assert an_int == 1
     60        assert a_str == "hello, world"
     61        self._broker_connection.post_message('test', 2, 'hi, everybody')
     62
     63    def is_done(self):
     64        return self._stopped
     65
     66    def name(self):
     67        return self._name
     68
     69    def start(self):
     70        pass
     71
     72    def run(self, port):
     73        try:
     74            self._broker_connection.run_message_loop()
     75            self._broker_connection.yield_to_broker()
     76            self._broker_connection.post_message('done')
     77        except Exception, e:
     78            self._broker_connection.post_message('exception', (type(e), str(e), None))
    4879
    4980
     
    6697
    6798    def test_get__threads(self):
    68         self.assertRaises(NotImplementedError, make_broker, self, 'threads')
     99        self.assertTrue(make_broker(self, 'threads') is not None)
    69100
    70101    def test_get__processes(self):
    71102        if multiprocessing:
    72             self.assertRaises(NotImplementedError, make_broker, self, 'processes')
     103            self.assertTrue(make_broker(self, 'processes') is not None)
    73104        else:
    74105            self.assertRaises(ValueError, make_broker, self, 'processes')
     
    77108        self.assertRaises(ValueError, make_broker, self, 'unknown')
    78109
     110
     111class _TestsMixin(object):
     112    """Mixin class that implements a series of tests to enforce the
     113    contract all implementations must follow."""
     114
     115    #
     116    # Methods to implement the Manager side of the ClientInterface
     117    #
     118    def name(self):
     119        return 'Tester'
     120
     121    def is_done(self):
     122        return self._done
     123
     124    #
     125    # Handlers for the messages the TestWorker may send.
     126    #
     127    def handle_done(self, src):
     128        self._done = True
     129
     130    def handle_test(self, src, an_int, a_str):
     131        self._an_int = an_int
     132        self._a_str = a_str
     133
     134    def handle_exception(self, src, exc_info):
     135        self._exception = exc_info
     136        self._done = True
     137
     138    #
     139    # Testing helper methods
     140    #
     141    def setUp(self):
     142        self._an_int = None
     143        self._a_str = None
     144        self._broker = None
     145        self._done = False
     146        self._exception = None
     147        self._worker_model = None
     148
     149    def make_broker(self):
     150        self._broker = make_broker(self, self._worker_model)
     151
     152    #
     153    # Actual unit tests
     154    #
     155    def test_done(self):
     156        if not self._worker_model:
     157            return
     158        self.make_broker()
     159        worker = self._broker.start_worker(0)
     160        self._broker.post_message('test', 1, 'hello, world')
     161        self._broker.post_message('stop')
     162        self._broker.run_message_loop()
     163        self.assertTrue(self.is_done())
     164        self.assertEqual(self._an_int, 2)
     165        self.assertEqual(self._a_str, 'hi, everybody')
     166
     167    def test_unknown_message(self):
     168        if not self._worker_model:
     169            return
     170        self.make_broker()
     171        worker = self._broker.start_worker(0)
     172        self._broker.post_message('unknown')
     173        self._broker.run_message_loop()
     174
     175        self.assertTrue(self.is_done())
     176        self.assertEquals(self._exception[0], ValueError)
     177        self.assertEquals(self._exception[1],
     178            "TestWorker/0: received message 'unknown' it couldn't handle")
     179
     180
     181class InlineBrokerTests(_TestsMixin, unittest.TestCase):
     182    def setUp(self):
     183        _TestsMixin.setUp(self)
     184        self._worker_model = 'inline'
     185
     186
     187class MultiProcessBrokerTests(_TestsMixin, unittest.TestCase):
     188    def setUp(self):
     189        _TestsMixin.setUp(self)
     190        if multiprocessing:
     191            self._worker_model = 'processes'
     192        else:
     193            self._worker_model = None
     194
     195    def queue(self):
     196        return multiprocessing.Queue()
     197
     198
     199class ThreadedBrokerTests(_TestsMixin, unittest.TestCase):
     200    def setUp(self):
     201        _TestsMixin.setUp(self)
     202        self._worker_model = 'threads'
     203
     204
     205class FunctionsTest(unittest.TestCase):
    79206    def test_runtime_options(self):
    80207        option_list = manager_worker_broker.runtime_options()
     
    83210        self.assertTrue(options)
    84211
    85 # FIXME: Add in unit tests for the managers, coverage tests for the interfaces.
     212
     213class InterfaceTest(unittest.TestCase):
     214    # These tests mostly exist to pacify coverage.
     215
     216    # FIXME: There must be a better way to do this and also verify
     217    # that classes do implement every abstract method in an interface.
     218    def test_managerconnection_is_abstract(self):
     219        # Test that all the base class methods are abstract and have the
     220        # signature we expect.
     221        broker = make_broker(self, 'inline')
     222        obj = manager_worker_broker._ManagerConnection(broker._broker, None, self, None)
     223        self.assertRaises(NotImplementedError, obj.start_worker, 0)
    86224
    87225
  • trunk/Tools/Scripts/webkitpy/layout_tests/layout_package/test_runner2.py

    r78302 r78398  
    4646class TestRunner2(test_runner.TestRunner):
    4747    def __init__(self, port, options, printer):
    48         if options.worker_model in ('threads', 'processes'):
    49             raise ValueError('--worker-model=%s not supported yet' % options.worker_model)
    50 
    5148        test_runner.TestRunner.__init__(self, port, options, printer)
    5249        self._all_results = []
  • trunk/Tools/Scripts/webkitpy/layout_tests/port/base.py

    r78175 r78398  
    468468        return 'cpu'
    469469
     470    def real_name(self):
     471        """Returns the actual name of the port, not the delegate's."""
     472        return self.name()
     473
    470474    def get_option(self, name, default_value=None):
    471475        # FIXME: Eventually we should not have to do a test for
  • trunk/Tools/Scripts/webkitpy/layout_tests/port/mock_drt.py

    r77431 r78398  
    5454            kwargs['port_name'] = kwargs['port_name'][len(prefix):]
    5555        self.__delegate = factory.get(**kwargs)
     56        self.__real_name = prefix + self.__delegate.name()
     57
     58    def real_name(self):
     59        return self.__real_name
    5660
    5761    def __getattr__(self, name):
     
    272276
    273277
    274 
    275278if __name__ == '__main__':
    276279    fs = filesystem.FileSystem()
  • trunk/Tools/Scripts/webkitpy/layout_tests/run_webkit_tests_unittest.py

    r78302 r78398  
    468468
    469469    def test_worker_model__processes(self):
    470         self.assertRaises(ValueError, logging_run, ['--worker-model', 'processes'])
     470        if compare_version(sys, '2.6')[0] >= 0:
     471            self.assertTrue(passing_run(['--worker-model', 'processes']))
    471472
    472473    def test_worker_model__threads(self):
    473         self.assertRaises(ValueError, logging_run, ['--worker-model', 'threads'])
     474        self.assertTrue(passing_run(['--worker-model', 'threads']))
    474475
    475476    def test_worker_model__unknown(self):
Note: See TracChangeset for help on using the changeset viewer.