Changeset 78506 in webkit


Ignore:
Timestamp:
Feb 14, 2011 2:13:15 PM (13 years ago)
Author:
dpranke@chromium.org
Message:

2011-02-14 Dirk Pranke <dpranke@chromium.org>

Reviewed by Tony Chang.

nrwt multiprocessing: add code to handle interrupts and wedged
threads.
https://bugs.webkit.org/show_bug.cgi?id=54072

  • Scripts/webkitpy/layout_tests/layout_package/manager_worker_broker.py: Adds the cancel(), is_alive(), join(), and log_wedged_worker() methods to the WorkerConnection class.
  • Scripts/webkitpy/layout_tests/layout_package/manager_worker_broker_unittest.py:
  • Scripts/webkitpy/layout_tests/layout_package/test_runner2.py:
  • Scripts/webkitpy/layout_tests/layout_package/worker.py: Adds the cancel() method to the Worker class.
Location:
trunk/Tools
Files:
5 edited

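Legend (the original color coding is lost in this plain-text capture, so read each diff line's line-number gutter instead; each file's header, e.g. "r78502 r78506", names the old and new revisions):

Unmodified: the line carries both its old- and new-revision line numbers, run together (e.g. "5353" = old line 53, new line 53).
Added: the line carries only a new-revision line number, indented one column further than a removed line's number (e.g. "     59").
Removed: the line carries only an old-revision line number (e.g. "    55 #").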
  • trunk/Tools/ChangeLog

    r78502 r78506  
     12011-02-14  Dirk Pranke  <dpranke@chromium.org>
     2
     3        Reviewed by Tony Chang.
     4
     5        nrwt multiprocessing: add code to handle interrupts and wedged
     6        threads.
     7        https://bugs.webkit.org/show_bug.cgi?id=54072
     8
     9        * Scripts/webkitpy/layout_tests/layout_package/manager_worker_broker.py:
     10        Adds the cancel(), is_alive(), join(), and log_wedged_worker()
     11        methods to the WorkerConnection class
     12
     13        * Scripts/webkitpy/layout_tests/layout_package/manager_worker_broker_unittest.py:
     14        * Scripts/webkitpy/layout_tests/layout_package/test_runner2.py:
     15        * Scripts/webkitpy/layout_tests/layout_package/worker.py:
     16        Adds the cancel() method to the Worker class
     17
    1182011-02-14  Dirk Pranke  <dpranke@chromium.org>
    219
  • trunk/Tools/Scripts/webkitpy/layout_tests/layout_package/manager_worker_broker.py

    r78398 r78506  
    5353
    5454# Handle Python < 2.6 where multiprocessing isn't available.
    55 #
    56 # _Multiprocessing_Process is needed so that _MultiProcessWorker
    57 # can be defined with or without multiprocessing.
    5855try:
    5956    import multiprocessing
    60     _Multiprocessing_Process = multiprocessing.Process
    6157except ImportError:
    6258    multiprocessing = None
    63     _Multiprocessing_Process = threading.Thread
    64 
    65 
     59
     60
     61from webkitpy.common.system import stack_utils
    6662from webkitpy.layout_tests import port
    6763from webkitpy.layout_tests.layout_package import message_broker2
     
    220216                                                  ANY_WORKER_TOPIC, MANAGER_TOPIC)
    221217
     218    def cancel(self):
     219        raise NotImplementedError
     220
     221    def is_alive(self):
     222        raise NotImplementedError
     223
     224    def join(self, timeout):
     225        raise NotImplementedError
     226
     227    def log_wedged_worker(self, test_name):
     228        raise NotImplementedError
     229
    222230    def yield_to_broker(self):
    223231        pass
     
    227235    def __init__(self, broker, port, manager_client, worker_class, worker_number):
    228236        _WorkerConnection.__init__(self, broker, worker_class, worker_number, port._options)
     237        self._alive = False
    229238        self._port = port
    230239        self._manager_client = manager_client
    231240
     241    def cancel(self):
     242        self._client.cancel()
     243
     244    def is_alive(self):
     245        return self._alive
     246
     247    def join(self, timeout):
     248        assert not self._alive
     249
     250    def log_wedged_worker(self, test_name):
     251        assert False, "_InlineWorkerConnection.log_wedged_worker() called"
     252
    232253    def run(self):
     254        self._alive = True
    233255        self._client.run(self._port)
     256        self._alive = False
    234257
    235258    def yield_to_broker(self):
     
    244267        self._client = client
    245268
     269    def cancel(self):
     270        return self._client.cancel()
     271
     272    def log_wedged_worker(self, test_name):
     273        stack_utils.log_thread_state(_log.error, self._client.name(), self.ident, " is wedged on test %s" % test_name)
     274
    246275    def run(self):
    247276        # FIXME: We can remove this once everyone is on 2.6.
     
    256285        self._thread = _Thread(self, port, self._client)
    257286
     287    def cancel(self):
     288        return self._thread.cancel()
     289
     290    def is_alive(self):
     291        # FIXME: Change this to is_alive once everyone is on 2.6.
     292        return self._thread.isAlive()
     293
     294    def join(self, timeout):
     295        return self._thread.join(timeout)
     296
     297    def log_wedged_worker(self, test_name):
     298        return self._thread.log_wedged_worker(test_name)
     299
    258300    def start(self):
    259301        self._thread.start()
    260302
    261303
    262 class _Process(_Multiprocessing_Process):
    263     def __init__(self, worker_connection, platform_name, options, client):
    264         _Multiprocessing_Process.__init__(self)
    265         self._worker_connection = worker_connection
    266         self._platform_name = platform_name
    267         self._options = options
    268         self._client = client
    269 
    270     def run(self):
    271         logging.basicConfig()
    272         port_obj = port.get(self._platform_name, self._options)
    273         self._client.run(port_obj)
     304if multiprocessing:
     305
     306    class _Process(multiprocessing.Process):
     307        def __init__(self, worker_connection, platform_name, options, client):
     308            multiprocessing.Process.__init__(self)
     309            self._worker_connection = worker_connection
     310            self._platform_name = platform_name
     311            self._options = options
     312            self._client = client
     313
     314        def log_wedged_worker(self, test_name):
     315            _log.error("%s (pid %d) is wedged on test %s" % (self.name, self.pid, test_name))
     316
     317        def run(self):
     318            logging.basicConfig()
     319            port_obj = port.get(self._platform_name, self._options)
     320            self._client.run(port_obj)
    274321
    275322
     
    279326        self._proc = _Process(self, platform_name, options, self._client)
    280327
     328    def cancel(self):
     329        return self._proc.terminate()
     330
     331    def is_alive(self):
     332        return self._proc.is_alive()
     333
     334    def join(self, timeout):
     335        return self._proc.join(timeout)
     336
     337    def log_wedged_worker(self, test_name):
     338        return self._proc.log_wedged_worker(test_name)
     339
    281340    def start(self):
    282341        self._proc.start()
  • trunk/Tools/Scripts/webkitpy/layout_tests/layout_package/manager_worker_broker_unittest.py

    r78398 r78506  
    4545
    4646
    47 class TestWorker(manager_worker_broker.AbstractWorker):
    48     def __init__(self, broker_connection, worker_number, options):
    49         self._broker_connection = broker_connection
    50         self._options = options
    51         self._worker_number = worker_number
    52         self._name = 'TestWorker/%d' % worker_number
    53         self._stopped = False
    54 
    55     def handle_stop(self, src):
    56         self._stopped = True
    57 
    58     def handle_test(self, src, an_int, a_str):
    59         assert an_int == 1
    60         assert a_str == "hello, world"
    61         self._broker_connection.post_message('test', 2, 'hi, everybody')
    62 
    63     def is_done(self):
    64         return self._stopped
    65 
    66     def name(self):
    67         return self._name
    68 
    69     def start(self):
    70         pass
    71 
    72     def run(self, port):
    73         try:
    74             self._broker_connection.run_message_loop()
    75             self._broker_connection.yield_to_broker()
    76             self._broker_connection.post_message('done')
    77         except Exception, e:
    78             self._broker_connection.post_message('exception', (type(e), str(e), None))
     47def worker_maker(starting_queue=None, stopping_queue=None):
     48    class _TestWorker(manager_worker_broker.AbstractWorker):
     49        def __init__(self, broker_connection, worker_number, options):
     50            self._broker_connection = broker_connection
     51            self._options = options
     52            self._worker_number = worker_number
     53            self._name = 'TestWorker/%d' % worker_number
     54            self._stopped = False
     55            self._canceled = False
     56            self._starting_queue = starting_queue
     57            self._stopping_queue = stopping_queue
     58
     59        def handle_stop(self, src):
     60            self._stopped = True
     61
     62        def handle_test(self, src, an_int, a_str):
     63            assert an_int == 1
     64            assert a_str == "hello, world"
     65            self._broker_connection.post_message('test', 2, 'hi, everybody')
     66
     67        def is_done(self):
     68            return self._stopped or self._canceled
     69
     70        def name(self):
     71            return self._name
     72
     73        def cancel(self):
     74            self._canceled = True
     75
     76        def run(self, port):
     77            if self._starting_queue:
     78                self._starting_queue.put('')
     79            if self._stopping_queue:
     80                self._stopping_queue.get()
     81            try:
     82                self._broker_connection.run_message_loop()
     83                self._broker_connection.yield_to_broker()
     84                self._broker_connection.post_message('done')
     85            except Exception, e:
     86                self._broker_connection.post_message('exception', (type(e), str(e), None))
     87    return _TestWorker
    7988
    8089
     
    8695
    8796
    88 def make_broker(manager, worker_model):
     97def make_broker(manager, worker_model, starting_queue=None, stopping_queue=None):
    8998    options = get_options(worker_model)
    9099    return manager_worker_broker.get(port.get("test"), options, manager,
    91                                      TestWorker)
     100                                     worker_maker(starting_queue, stopping_queue))
    92101
    93102
     
    113122    contract all implementations must follow."""
    114123
    115     #
    116     # Methods to implement the Manager side of the ClientInterface
    117     #
    118124    def name(self):
    119125        return 'Tester'
     
    122128        return self._done
    123129
    124     #
    125     # Handlers for the messages the TestWorker may send.
    126     #
    127130    def handle_done(self, src):
    128131        self._done = True
     
    136139        self._done = True
    137140
    138     #
    139     # Testing helper methods
    140     #
    141141    def setUp(self):
    142142        self._an_int = None
     
    147147        self._worker_model = None
    148148
    149     def make_broker(self):
    150         self._broker = make_broker(self, self._worker_model)
    151 
    152     #
    153     # Actual unit tests
    154     #
     149    def make_broker(self, starting_queue=None, stopping_queue=None):
     150        self._broker = make_broker(self, self._worker_model, starting_queue,
     151                                   stopping_queue)
     152
     153    def test_cancel(self):
     154        self.make_broker()
     155        worker = self._broker.start_worker(0)
     156        worker.cancel()
     157        self._broker.post_message('test', 1, 'hello, world')
     158        worker.join(0.5)
     159        self.assertFalse(worker.is_alive())
     160
    155161    def test_done(self):
    156         if not self._worker_model:
    157             return
    158162        self.make_broker()
    159163        worker = self._broker.start_worker(0)
     
    161165        self._broker.post_message('stop')
    162166        self._broker.run_message_loop()
     167        worker.join(0.5)
     168        self.assertFalse(worker.is_alive())
    163169        self.assertTrue(self.is_done())
    164170        self.assertEqual(self._an_int, 2)
    165171        self.assertEqual(self._a_str, 'hi, everybody')
    166172
     173    def test_log_wedged_worker(self):
     174        starting_queue = self.queue()
     175        stopping_queue = self.queue()
     176        self.make_broker(starting_queue, stopping_queue)
     177        oc = outputcapture.OutputCapture()
     178        oc.capture_output()
     179        try:
     180            worker = self._broker.start_worker(0)
     181            starting_queue.get()
     182            worker.log_wedged_worker('test_name')
     183            stopping_queue.put('')
     184            self._broker.post_message('stop')
     185            self._broker.run_message_loop()
     186            worker.join(0.5)
     187            self.assertFalse(worker.is_alive())
     188            self.assertTrue(self.is_done())
     189        finally:
     190            oc.restore_output()
     191
    167192    def test_unknown_message(self):
    168         if not self._worker_model:
    169             return
    170193        self.make_broker()
    171194        worker = self._broker.start_worker(0)
    172195        self._broker.post_message('unknown')
    173196        self._broker.run_message_loop()
     197        worker.join(0.5)
    174198
    175199        self.assertTrue(self.is_done())
     200        self.assertFalse(worker.is_alive())
    176201        self.assertEquals(self._exception[0], ValueError)
    177202        self.assertEquals(self._exception[1],
     
    184209        self._worker_model = 'inline'
    185210
    186 
    187 class MultiProcessBrokerTests(_TestsMixin, unittest.TestCase):
    188     def setUp(self):
    189         _TestsMixin.setUp(self)
    190         if multiprocessing:
     211    def test_log_wedged_worker(self):
     212        self.make_broker()
     213        worker = self._broker.start_worker(0)
     214        self.assertRaises(AssertionError, worker.log_wedged_worker, None)
     215
     216
     217if multiprocessing:
     218
     219    class MultiProcessBrokerTests(_TestsMixin, unittest.TestCase):
     220        def setUp(self):
     221            _TestsMixin.setUp(self)
    191222            self._worker_model = 'processes'
    192         else:
    193             self._worker_model = None
    194 
    195     def queue(self):
    196         return multiprocessing.Queue()
     223
     224        def queue(self):
     225            return multiprocessing.Queue()
    197226
    198227
     
    201230        _TestsMixin.setUp(self)
    202231        self._worker_model = 'threads'
     232
     233    def queue(self):
     234        return Queue.Queue()
    203235
    204236
     
    223255        self.assertRaises(NotImplementedError, obj.start_worker, 0)
    224256
     257    def test_workerconnection_is_abstract(self):
     258        # Test that all the base class methods are abstract and have the
     259        # signature we expect.
     260        broker = make_broker(self, 'inline')
     261        obj = manager_worker_broker._WorkerConnection(broker._broker, worker_maker(), 0, None)
     262        self.assertRaises(NotImplementedError, obj.cancel)
     263        self.assertRaises(NotImplementedError, obj.is_alive)
     264        self.assertRaises(NotImplementedError, obj.join, None)
     265        self.assertRaises(NotImplementedError, obj.log_wedged_worker, None)
     266
    225267
    226268if __name__ == '__main__':
  • trunk/Tools/Scripts/webkitpy/layout_tests/layout_package/test_runner2.py

    r78502 r78506  
    5353        self.number = number
    5454        self.done = False
     55        self.current_test_name = None
     56        self.next_timeout = None
     57        self.wedged = False
    5558
    5659    def __repr__(self):
     
    7376
    7477    def _worker_is_done(self, worker_state):
    75         # FIXME: check if the worker is wedged.
    76         return worker_state.done
     78        t = time.time()
     79        if worker_state.done or worker_state.wedged:
     80            return True
     81
     82        next_timeout = worker_state.next_timeout
     83        WEDGE_PADDING = 40.0
     84        if next_timeout and t > next_timeout + WEDGE_PADDING:
     85            _log.error('')
     86            worker_state.worker_connection.log_wedged_worker(worker_state.current_test_name)
     87            _log.error('')
     88            worker_state.wedged = True
     89            return True
     90        return False
    7791
    7892    def name(self):
     
    132146        keyboard_interrupted = False
    133147        interrupted = False
    134         if not self._options.dry_run:
    135             while not self.is_done():
    136                 # We loop with a timeout in order to be able to detect wedged threads.
    137                 manager_connection.run_message_loop(delay_secs=1.0)
    138 
    139         # FIXME: handle exceptions, interrupts.
     148        try:
     149            if not self._options.dry_run:
     150                while not self.is_done():
     151                    # We loop with a timeout in order to be able to detect wedged threads.
     152                    manager_connection.run_message_loop(delay_secs=1.0)
     153
     154                if any(worker_state.wedged for worker_state in self._worker_states.values()):
     155                    _log.error('')
     156                    _log.error('Remaining workers are wedged, bailing out.')
     157                    _log.error('')
     158                else:
     159                    _log.debug('No wedged threads')
     160
     161                # Make sure all of the workers have shut down (if possible).
     162                for worker_state in self._worker_states.values():
     163                    if not worker_state.wedged and worker_state.worker_connection.is_alive():
     164                        worker_state.worker_connection.join(0.5)
     165                        assert not worker_state.worker_connection.is_alive()
     166
     167        except KeyboardInterrupt:
     168            _log.info("Interrupted, exiting")
     169            self._cancel_workers()
     170            keyboard_interrupted = True
     171        except test_runner.TestRunInterruptedException, e:
     172            _log.info(e.reason)
     173            self._cancel_workers()
     174            interrupted = True
     175        except:
     176            # Unexpected exception; don't try to clean up workers.
     177            _log.info("Exception raised, exiting")
     178            raise
     179
    140180
    141181        # FIXME: implement stats.
     
    147187                self._group_stats, self._all_results)
    148188
     189    def cancel_workers(self):
     190        for worker_state in self._worker_states.values():
     191            worker_state.worker_connection.cancel()
     192
    149193    def handle_started_test(self, source, test_info, hang_timeout):
    150         # FIXME: implement
    151         pass
     194        worker_state = self._worker_states[source]
     195        worker_state.current_test_name = self._port.relative_test_filename(test_info.filename)
     196        worker_state.next_timeout = time.time() + hang_timeout
    152197
    153198    def handle_done(self, source):
     
    163208
    164209    def handle_finished_test(self, source, result, elapsed_time):
     210        worker_state = self._worker_states[source]
     211        worker_state.next_timeout = None
     212        worker_state.current_test_name = None
     213
     214        if worker_state.wedged:
     215            # This shouldn't happen if we have our timeouts tuned properly.
     216            _log.error("%s unwedged", w.name)
     217
    165218        self._update_summary_with_result(self._current_result_summary, result)
    166219
  • trunk/Tools/Scripts/webkitpy/layout_tests/layout_package/worker.py

    r78302 r78506  
    4949        self._name = 'worker/%d' % worker_number
    5050        self._done = False
     51        self._canceled = False
    5152        self._port = None
    5253
     
    5455        self._port = port
    5556
     57    def cancel(self):
     58        """Attempt to abort processing (best effort)."""
     59        self._canceled = True
     60
    5661    def is_done(self):
    57         return self._done
     62        return self._done or self._canceled
    5863
    5964    def name(self):
     
    6368        self._deferred_init(port)
    6469
     70        exception_msg = ""
    6571        _log.debug("%s starting" % self._name)
    6672
    67         # FIXME: need to add in error handling, better logging.
    68         self._worker_connection.run_message_loop()
    69         self._worker_connection.post_message('done')
     73        try:
     74            self._worker_connection.run_message_loop()
     75            if not self.is_done():
     76                raise AssertionError("%s: ran out of messages in worker queue."
     77                                     % self._name)
     78        except KeyboardInterrupt:
     79            exception_msg = ", interrupted"
     80        except:
     81            exception_msg = ", exception raised"
     82        finally:
     83            _log.debug("%s done%s" % (self._name, exception_msg))
     84            if exception_msg:
     85                exc_info = sys.exc_info()
     86                stack_utils.log_traceback(_log.error, exc_info[2])
     87                # FIXME: Figure out how to send a message with a traceback.
     88                self._worker_connection.post_message('exception',
     89                    (exc_info[0], exc_info[1], None))
     90            self._worker_connection.post_message('done')
    7091
    7192    def handle_test_list(self, src, list_name, test_list):
Note: See TracChangeset for help on using the changeset viewer.