Changeset 122497 in webkit


Ignore:
Timestamp:
Jul 12, 2012 1:22:46 PM (12 years ago)
Author:
dpranke@chromium.org
Message:

nrwt: reimplement manager_worker_broker in a much simpler form
https://bugs.webkit.org/show_bug.cgi?id=90513

Reviewed by Ojan Vafai.

This is a wholesale replacement of the MessagePool() implementation
and the other classes in manager_worker_broker.py. All of the
BrokerConnection*, Broker*, etc. classes are gone, and there are now
just a MessagePool class and a _Worker class. Happiness ensues.

I'm removing manager_worker_broker_unittest.py as well; we get
nearly complete coverage from the integration tests, and will
get more coverage when test-webkitpy moves to use this as well,
so having unit tests seems like unnecessary overhead. (running
coverage numbers with test-webkitpy shows that pretty much the only
uncovered lines are lines that are only run in the child processes,
which coverage doesn't handle at the moment).

  • Scripts/webkitpy/layout_tests/controllers/manager_worker_broker.py:

(_MessagePool.init):
(_MessagePool.run):
(_MessagePool._start_workers):
(_MessagePool):
(_MessagePool.wait):
(_MessagePool._close):
(_MessagePool._handle_done):
(_MessagePool._can_pickle):
(_MessagePool._loop):
(WorkerException):
(_Message.init):
(_Message.repr):
(_Worker):
(_Worker.init):
(_Worker.terminate):
(_Worker._close):
(_Worker.run):
(_Worker.post):
(_Worker.yield_to_caller):
(_Worker._post):
(_Worker._raise):
(_Worker._set_up_logging):
(_WorkerLogHandler.init):
(_WorkerLogHandler.emit):

  • Scripts/webkitpy/layout_tests/controllers/manager_worker_broker_unittest.py: Removed.
Location:
trunk/Tools
Files:
1 deleted
2 edited

Legend:

Unmodified
Added
Removed
  • trunk/Tools/ChangeLog

    r122486 r122497  
     12012-07-12  Dirk Pranke  <dpranke@chromium.org>
     2
     3        nrwt: reimplement manager_worker_broker in a much simpler form
     4        https://bugs.webkit.org/show_bug.cgi?id=90513
     5
     6        Reviewed by Ojan Vafai.
     7
     8        This is a wholesale replacement of the MessagePool() implementation
     9        and the other classes in manager_worker_broker.py. All of the
     10        BrokerConnection*, Broker*, etc. classes are gone, and there are now
     11        just a MessagePool class and a _Worker class. Happiness ensues.
     12 
     13        I'm removing manager_worker_broker_unittest.py as well; we get
     14        nearly complete coverage from the integration tests, and will
     15        get more coverage when test-webkitpy moves to use this as well,
     16        so having unit tests seems like unnecessary overhead. (running
     17        coverage numbers with test-webkitpy shows that pretty much the only
     18        uncovered lines are lines that are only run in the child processes,
     19        which coverage doesn't handle at the moment).
     20 
     21        * Scripts/webkitpy/layout_tests/controllers/manager_worker_broker.py:
     22        (_MessagePool.__init__):
     23        (_MessagePool.run):
     24        (_MessagePool._start_workers):
     25        (_MessagePool):
     26        (_MessagePool.wait):
     27        (_MessagePool._close):
     28        (_MessagePool._handle_done):
     29        (_MessagePool._can_pickle):
     30        (_MessagePool._loop):
     31        (WorkerException):
     32        (_Message.__init__):
     33        (_Message.__repr__):
     34        (_Worker):
     35        (_Worker.__init__):
     36        (_Worker.terminate):
     37        (_Worker._close):
     38        (_Worker.run):
     39        (_Worker.post):
     40        (_Worker.yield_to_caller):
     41        (_Worker._post):
     42        (_Worker._raise):
     43        (_Worker._set_up_logging):
     44        (_WorkerLogHandler.__init__):
     45        (_WorkerLogHandler.emit):
     46        * Scripts/webkitpy/layout_tests/controllers/manager_worker_broker_unittest.py: Removed.
     47
    1482012-07-12  Tony Chang  <tony@chromium.org>
    249
  • trunk/Tools/Scripts/webkitpy/layout_tests/controllers/manager_worker_broker.py

    r122394 r122497  
    3232import logging
    3333import multiprocessing
    34 import optparse
    35 import os
    3634import Queue
    3735import sys
     
    4644
    4745_log = logging.getLogger(__name__)
    48 
    49 
    50 #
    51 # Topic names for Manager <-> Worker messaging
    52 #
    53 MANAGER_TOPIC = 'managers'
    54 ANY_WORKER_TOPIC = 'workers'
    5546
    5647
     
    6657        self._num_workers = num_workers
    6758        self._worker_startup_delay_secs = worker_startup_delay_secs
    68         self._worker_states = {}
     59        self._workers = []
     60        self._workers_stopped = set()
    6961        self._host = host
    70         self.name = 'manager'
     62        self._name = 'manager'
     63        self._running_inline = (self._num_workers == 1)
     64        if self._running_inline:
     65            self._messages_to_worker = Queue.Queue()
     66            self._messages_to_manager = Queue.Queue()
     67        else:
     68            self._messages_to_worker = multiprocessing.Queue()
     69            self._messages_to_manager = multiprocessing.Queue()
    7170
    7271    def __enter__(self):
     
    7877
    7978    def run(self, shards):
    80         manager_connection = _get_broker(self._num_workers, self, self._worker_factory, self._host)
     79        """Posts a list of messages to the pool and waits for them to complete."""
     80        for message in shards:
     81            self._messages_to_worker.put(_Message(self._name, message[0], message[1:], from_user=True, logs=()))
     82
     83        for _ in xrange(self._num_workers):
     84            self._messages_to_worker.put(_Message(self._name, 'stop', message_args=(), from_user=False, logs=()))
     85
     86        self.wait()
     87
     88    def _start_workers(self):
     89        assert not self._workers
     90        self._workers_stopped = set()
     91        host = None
     92        if self._running_inline or self._can_pickle(self._host):
     93            host = self._host
     94
    8195        for worker_number in xrange(self._num_workers):
    82             worker_connection = manager_connection.start_worker(worker_number)
    83             worker_state = _WorkerState(worker_number, worker_connection)
    84             self._worker_states[worker_connection.name] = worker_state
    85             time.sleep(self._worker_startup_delay_secs)
    86 
    87         messages = list(shards)
    88         for message in messages:
    89             manager_connection.post_message(*message)
    90 
    91         for _ in xrange(self._num_workers):
    92             manager_connection.post_message('stop')
    93 
    94         self.wait(manager_connection)
    95 
    96     def wait(self, manager_connection):
    97         try:
    98             while not self.is_done():
    99                 manager_connection.run_message_loop(delay_secs=1.0)
     96            worker = _Worker(host, self._messages_to_manager, self._messages_to_worker, self._worker_factory, worker_number, self._running_inline, self if self._running_inline else None)
     97            self._workers.append(worker)
     98            worker.start()
     99            if self._worker_startup_delay_secs:
     100                time.sleep(self._worker_startup_delay_secs)
     101
     102    def wait(self):
     103        try:
     104            self._start_workers()
     105            if self._running_inline:
     106                self._workers[0].run()
     107                self._loop(block=False)
     108            else:
     109                self._loop(block=True)
    100110        finally:
    101111            self._close()
    102112
    103     def is_done(self):
    104         worker_states = self._worker_states.values()
    105         return worker_states and all(worker_state.done for worker_state in worker_states)
    106 
    107113    def _close(self):
    108         for worker_state in self._worker_states.values():
    109             if worker_state.worker_connection.is_alive():
    110                 worker_state.worker_connection.cancel()
    111                 _log.debug('Waiting for worker %d to exit' % worker_state.number)
    112                 worker_state.worker_connection.join(5.0)
    113                 if worker_state.worker_connection.is_alive():
    114                     _log.error('Worker %d did not exit in time.' % worker_state.number)
    115 
    116     def handle_done(self, source, log_messages=None):
    117         worker_state = self._worker_states[source]
    118         worker_state.done = True
    119         self._log_messages(log_messages)
    120 
    121     def handle_started_test(self, *args):
    122         self._caller.handle('started_test', *args)
    123 
    124     def handle_finished_test(self, *args):
    125         self._caller.handle('finished_test', *args)
    126 
    127     def handle_finished_test_list(self, *args):
    128         self._caller.handle('finished_test_list', *args)
    129 
    130     def handle_exception(self, *args):
    131         self._handle_worker_exception(*args)
     114        for worker in self._workers:
     115            if worker.is_alive():
     116                worker.terminate()
     117                worker.join()
     118        self._workers = []
     119        if not self._running_inline:
     120            if self._messages_to_worker:
     121                self._messages_to_worker.close()
     122                self._messages_to_worker = None
     123            if self._messages_to_manager:
     124                self._messages_to_manager.close()
     125                self._messages_to_manager = None
    132126
    133127    def _log_messages(self, messages):
    134128        for message in messages:
    135129            logging.root.handle(message)
     130
     131    def _handle_done(self, source):
     132        self._workers_stopped.add(source)
    136133
    137134    @staticmethod
     
    143140        raise WorkerException(str(exception_value))
    144141
    145 
    146 class _WorkerState(object):
    147     def __init__(self, number, worker_connection):
    148         self.worker_connection = worker_connection
    149         self.number = number
    150         self.done = False
    151 
    152     def __repr__(self):
    153         return "_WorkerState(" + str(self.__dict__) + ")"
    154 
    155 
    156 def _get_broker(max_workers, client, worker_factory, host=None):
    157     """Return a connection to a manager/worker message_broker
    158 
    159     Args:
    160         max_workers - max # of workers to run concurrently.
    161         client - BrokerClient implementation to dispatch
    162             replies to.
    163         worker_factory: factory method for creating objects that implement the Worker interface.
    164         host: optional picklable host object that can be passed to workers for testing.
    165     Returns:
    166         A handle to an object that will talk to a message broker configured
    167         for the normal manager/worker communication."""
    168     if max_workers == 1:
    169         queue_class = Queue.Queue
    170         manager_class = _InlineManager
    171     else:
    172         queue_class = multiprocessing.Queue
    173         manager_class = _MultiProcessManager
    174 
    175     broker = _Broker(queue_class)
    176     return manager_class(broker, client, worker_factory, host)
     142    def _can_pickle(self, host):
     143        try:
     144            cPickle.dumps(host)
     145            return True
     146        except TypeError:
     147            return False
     148
     149    def _loop(self, block):
     150        try:
     151            while True:
     152                if len(self._workers_stopped) == len(self._workers):
     153                    block = False
     154                message = self._messages_to_manager.get(block)
     155                self._log_messages(message.logs)
     156                if message.from_user:
     157                    self._caller.handle(message.name, message.src, *message.args)
     158                    continue
     159                method = getattr(self, '_handle_' + message.name)
     160                assert method, 'bad message %s' % repr(message)
     161                method(message.src, *message.args)
     162        except Queue.Empty:
     163            pass
    177164
    178165
     
    182169
    183170
    184 class BrokerClient(object):
    185     """Abstract base class / interface that all message broker clients must
    186     implement. In addition to the methods below, by convention clients
    187     implement routines of the signature type
    188 
    189         handle_MESSAGE_NAME(self, src, ...):
    190 
    191     where MESSAGE_NAME matches the string passed to post_message(), and
    192     src indicates the name of the sender. If the message contains values in
    193     the message body, those will be provided as optparams."""
    194 
    195     def is_done(self):
    196         """Called from inside run_message_loop() to indicate whether to exit."""
    197         raise NotImplementedError
    198 
    199     def name(self):
    200         """Return a name that identifies the client."""
    201         raise NotImplementedError
    202 
    203 
    204 class _Broker(object):
    205     """Brokers provide the basic model of a set of topics. Clients can post a
    206     message to any topic using post_message(), and can process messages on one
    207     topic at a time using run_message_loop()."""
    208 
    209     def __init__(self, queue_maker):
    210         """Args:
    211             queue_maker: a factory method that returns objects implementing a
    212                 Queue interface (put()/get()).
    213         """
    214         self._queue_maker = queue_maker
    215         self._topics = {}
    216 
    217     def __del__(self):
    218         self.cleanup()
    219 
    220     def cleanup(self):
    221         for queue in self._topics.values():
    222             if hasattr(queue, 'close'):
    223                 queue.close()
    224         self._topics = {}
    225 
    226     def add_topic(self, topic_name):
    227         if topic_name not in self._topics:
    228             self._topics[topic_name] = self._queue_maker()
    229 
    230     def _get_queue_for_topic(self, topic_name):
    231         return self._topics[topic_name]
    232 
    233     def post_message(self, client, topic_name, message_name, *message_args):
    234         """Post a message to the appropriate topic name.
    235 
    236         Messages have a name and a tuple of optional arguments. Both must be picklable."""
    237         message = _Message(client.name, topic_name, message_name, message_args)
    238         queue = self._get_queue_for_topic(topic_name)
    239         queue.put(_Message.dumps(message))
    240 
    241     def run_message_loop(self, topic_name, client, delay_secs=None):
    242         """Loop processing messages until client.is_done() or delay passes.
    243 
    244         To run indefinitely, set delay_secs to None."""
    245         assert delay_secs is None or delay_secs > 0
    246         self._run_loop(topic_name, client, block=True, delay_secs=delay_secs)
    247 
    248     def run_all_pending(self, topic_name, client):
    249         """Process messages until client.is_done() or caller would block."""
    250         self._run_loop(topic_name, client, block=False, delay_secs=None)
    251 
    252     def _run_loop(self, topic_name, client, block, delay_secs):
    253         queue = self._get_queue_for_topic(topic_name)
    254         while not client.is_done():
    255             try:
    256                 s = queue.get(block, delay_secs)
    257             except Queue.Empty:
    258                 return
    259             msg = _Message.loads(s)
    260             self._dispatch_message(msg, client)
    261 
    262     def _dispatch_message(self, message, client):
    263         if not hasattr(client, 'handle_' + message.name):
    264             raise ValueError(
    265                "%s: received message '%s' it couldn't handle" %
    266                (client.name, message.name))
    267         optargs = message.args
    268         message_handler = getattr(client, 'handle_' + message.name)
    269         message_handler(message.src, *optargs)
    270 
    271 
    272171class _Message(object):
    273     @staticmethod
    274     def loads(string_value):
    275         obj = cPickle.loads(string_value)
    276         assert(isinstance(obj, _Message))
    277         return obj
    278 
    279     def __init__(self, src, topic_name, message_name, message_args):
     172    def __init__(self, src, message_name, message_args, from_user, logs):
    280173        self.src = src
    281         self.topic_name = topic_name
    282174        self.name = message_name
    283175        self.args = message_args
    284 
    285     def dumps(self):
    286         return cPickle.dumps(self)
     176        self.from_user = from_user
     177        self.logs = logs
    287178
    288179    def __repr__(self):
    289         return ("_Message(from='%s', topic_name='%s', message_name='%s')" %
    290                 (self.src, self.topic_name, self.name))
    291 
    292 
    293 class _BrokerConnection(object):
    294     """_BrokerConnection provides a connection-oriented facade on top of a
    295     Broker, so that callers don't have to repeatedly pass the same topic
    296     names over and over."""
    297 
    298     def __init__(self, broker, client, run_topic, post_topic):
    299         """Create a _BrokerConnection on top of a _Broker. Note that the _Broker
    300         is passed in rather than created so that a single _Broker can be used
    301         by multiple _BrokerConnections."""
    302         self._broker = broker
    303         self._client = client
    304         self._post_topic = post_topic
    305         self._run_topic = run_topic
    306         broker.add_topic(run_topic)
    307         broker.add_topic(post_topic)
    308 
    309     def cleanup(self):
    310         self._broker.cleanup()
    311         self._broker = None
    312 
    313     def run_message_loop(self, delay_secs=None):
    314         self._broker.run_message_loop(self._run_topic, self._client, delay_secs)
    315 
    316     def post_message(self, message_name, *message_args):
    317         self._broker.post_message(self._client, self._post_topic,
    318                                   message_name, *message_args)
    319 
    320     def raise_exception(self, exc_info):
    321         # Since tracebacks aren't picklable, send the extracted stack instead,
    322         # but at least log the full traceback.
    323         exception_type, exception_value, exception_traceback = sys.exc_info()
     180        return '_Message(src=%s, name=%s, args=%s, from_user=%s, logs=%s)' % (self.src, self.name, self.args, self.from_user, self.logs)
     181
     182
     183class _Worker(multiprocessing.Process):
     184    def __init__(self, host, messages_to_manager, messages_to_worker, worker_factory, worker_number, running_inline, manager):
     185        super(_Worker, self).__init__()
     186        self.host = host
     187        self.worker_number = worker_number
     188        self.name = 'worker/%d' % worker_number
     189        self.log_messages = []
     190        self._running_inline = running_inline
     191        self._manager = manager
     192
     193        self._messages_to_manager = messages_to_manager
     194        self._messages_to_worker = messages_to_worker
     195        self._worker = worker_factory(self)
     196        self._logger = None
     197        self._log_handler = None
     198
     199    def terminate(self):
     200        if self._worker:
     201            self._worker.stop()
     202            self._worker = None
     203        if self.is_alive():
     204            super(_Worker, self).terminate()
     205
     206    def _close(self):
     207        if self._log_handler and self._logger:
     208            self._logger.removeHandler(self._log_handler)
     209        self._log_handler = None
     210        self._logger = None
     211
     212    def start(self):
     213        if not self._running_inline:
     214            super(_Worker, self).start()
     215
     216    def run(self):
     217        if not self.host:
     218            self.host = Host()
     219        if not self._running_inline:
     220            self._set_up_logging()
     221
     222        worker = self._worker
     223        exception_msg = ""
     224        _log.debug("%s starting" % self.name)
     225
     226        try:
     227            worker.start()
     228            while True:
     229                message = self._messages_to_worker.get()
     230                if message.from_user:
     231                    worker.handle(message.name, message.src, *message.args)
     232                    self.yield_to_caller()
     233                else:
     234                    assert message.name == 'stop', 'bad message %s' % repr(message)
     235                    break
     236
     237        except Queue.Empty:
     238            assert False, '%s: ran out of messages in worker queue.' % self.name
     239        except KeyboardInterrupt, e:
     240            exception_msg = ", interrupted"
     241            if not self._running_inline:
     242                _log.warning('worker exception')
     243                self._raise(sys.exc_info())
     244            raise
     245        except Exception, e:
     246            exception_msg = ", exception raised: %s" % str(e)
     247            if not self._running_inline:
     248                self._raise(sys.exc_info())
     249            raise
     250        finally:
     251            _log.debug("%s exiting%s" % (self.name, exception_msg))
     252            try:
     253                worker.stop()
     254            finally:
     255                self._post(name='done', args=(), from_user=False)
     256            self._close()
     257
     258    def post(self, name, *args):
     259        self._post(name, args, from_user=True)
     260
     261    def yield_to_caller(self):
     262        if self._running_inline:
     263            self._manager._loop(block=False)
     264
     265    def _post(self, name, args, from_user):
     266        log_messages = self.log_messages
     267        self.log_messages = []
     268        self._messages_to_manager.put(_Message(self.name, name, args, from_user, log_messages))
     269
     270    def _raise(self, exc_info):
     271        # Since tracebacks aren't picklable, send the extracted stack instead.
     272        exception_type, exception_value, exception_traceback = exc_info
    324273        stack_utils.log_traceback(_log.error, exception_traceback)
    325274        stack = traceback.extract_tb(exception_traceback)
    326         self._broker.post_message(self._client, self._post_topic, 'exception', exception_type, exception_value, stack)
    327 
    328 
    329 class AbstractWorker(BrokerClient):
    330     def __init__(self, worker_connection, worker_number):
    331         BrokerClient.__init__(self)
    332         self.worker = None
    333         self._worker_connection = worker_connection
    334         self.worker_number = worker_number
    335         self.name = 'worker/%d' % worker_number
    336         self._done = False
    337         self._canceled = False
    338         self._options = optparse.Values({'verbose': False})
    339         self.host = None
    340 
    341     def is_done(self):
    342         return self._done or self._canceled
    343 
    344     def stop_handling_messages(self):
    345         self._done = True
    346 
    347     def run(self, host):
    348         """Callback for the worker to start executing. Typically does any
    349         remaining initialization and then calls broker_connection.run_message_loop()."""
    350         exception_msg = ""
    351         self.host = host
    352 
    353         self.worker.start()
    354         _log.debug('%s starting' % self.name)
    355 
    356         try:
    357             self._worker_connection.run_message_loop()
    358             if not self.is_done():
    359                 raise AssertionError("%s: ran out of messages in worker queue."
    360                                      % self.name)
    361         except KeyboardInterrupt:
    362             exception_msg = ", interrupted"
    363             self._worker_connection.raise_exception(sys.exc_info())
    364         except:
    365             exception_msg = ", exception raised"
    366             self._worker_connection.raise_exception(sys.exc_info())
    367         finally:
    368             _log.debug("%s done with message loop%s" % (self.name, exception_msg))
    369             try:
    370                 self.worker.stop()
    371             finally:
    372                 # Make sure we post a done so that we can flush the log messages
    373                 # and clean up properly even if we raise an exception in worker.cleanup().
    374                 self._worker_connection.post_message('done')
    375 
    376     def handle_stop(self, source):
    377         self._done = True
    378 
    379     def handle_test_list(self, source, list_name, test_list):
    380         self.worker.handle('test_list', source, list_name, test_list)
    381 
    382     def cancel(self):
    383         """Called when possible to indicate to the worker to stop processing
    384         messages and shut down. Note that workers may be stopped without this
    385         method being called, so clients should not rely solely on this."""
    386         self._canceled = True
    387 
    388     def yield_to_caller(self):
    389         self._worker_connection.yield_to_broker()
    390 
    391     def post(self, *args):
    392         self._worker_connection.post_message(*args)
    393 
    394 
    395 class _ManagerConnection(_BrokerConnection):
    396     def __init__(self, broker, client, worker_factory, host):
    397         _BrokerConnection.__init__(self, broker, client, MANAGER_TOPIC, ANY_WORKER_TOPIC)
    398         self._worker_factory = worker_factory
    399         self._host = host
    400 
    401     def start_worker(self, worker_number):
    402         raise NotImplementedError
    403 
    404 
    405 class _InlineManager(_ManagerConnection):
    406     def __init__(self, broker, client, worker_factory, host):
    407         _ManagerConnection.__init__(self, broker, client, worker_factory, host)
    408         self._inline_worker = None
    409 
    410     def start_worker(self, worker_number):
    411         host = self._host
    412         self._inline_worker = _InlineWorkerConnection(host, self._broker, self._client, self._worker_factory, worker_number)
    413         return self._inline_worker
    414 
    415     def run_message_loop(self, delay_secs=None):
    416         # Note that delay_secs is ignored in this case since we can't easily
    417         # implement it.
    418         self._inline_worker.run()
    419         self._broker.run_all_pending(MANAGER_TOPIC, self._client)
    420 
    421 
    422 class _MultiProcessManager(_ManagerConnection):
    423     def _can_pickle_host(self):
    424         try:
    425             cPickle.dumps(self._host)
    426             return True
    427         except TypeError:
    428             return False
    429 
    430     def start_worker(self, worker_number):
    431         host = None
    432         if self._can_pickle_host():
    433             host = self._host
    434         worker_connection = _MultiProcessWorkerConnection(host, self._broker, self._worker_factory, worker_number)
    435         worker_connection.start()
    436         return worker_connection
    437 
    438 
    439 class _WorkerConnection(_BrokerConnection):
    440     def __init__(self, host, broker, worker_factory, worker_number):
    441         # FIXME: keeping track of the differences between the WorkerConnection, the AbstractWorker, and the
    442         # actual Worker (created by worker_factory) is very confusing, but this all gets better when
    443         # _WorkerConnection and AbstractWorker get merged.
    444         self._client = AbstractWorker(self, worker_number)
    445         self._worker = worker_factory(self._client)
    446         self._client.worker = self._worker
    447         self._host = host
    448         self._log_messages = []
    449         self._logger = None
    450         self._log_handler = None
    451         self.name = self._client.name
    452         _BrokerConnection.__init__(self, broker, self._client, ANY_WORKER_TOPIC, MANAGER_TOPIC)
    453 
    454     def cancel(self):
    455         raise NotImplementedError
    456 
    457     def is_alive(self):
    458         raise NotImplementedError
    459 
    460     def join(self, timeout):
    461         raise NotImplementedError
    462 
    463     # FIXME: rename to yield_to_caller().
    464     def yield_to_broker(self):
    465         pass
    466 
    467     def post_message(self, *args):
    468         # FIXME: This is a hack until we can remove the log_messages arg from the manager.
    469         if args[0] in ('finished_test', 'done'):
    470             log_messages = self._log_messages
    471             self._log_messages = []
    472             args = args + tuple([log_messages])
    473         super(_WorkerConnection, self).post_message(*args)
    474 
    475     def set_up_logging(self):
    476         self._logger = logging.root
     275        self._post(name='worker_exception', args=(exception_type, exception_value, stack), from_user=False)
     276
     277    def _set_up_logging(self):
     278        self._logger = logging.getLogger()
     279
    477280        # The unix multiprocessing implementation clones the MeteredStream log handler
    478281        # into the child process, so we need to remove it to avoid duplicate logging.
    479282        for h in self._logger.handlers:
    480283            # log handlers don't have names until python 2.7.
    481             if getattr(h, 'name', '') == metered_stream.LOG_HANDLER_NAME:
     284            # FIXME: get webkitpy.test.printer from a constant as well.
     285            if getattr(h, 'name', '') in (metered_stream.LOG_HANDLER_NAME, 'webkitpy.test.printer'):
    482286                self._logger.removeHandler(h)
    483287                break
    484         self._logger.setLevel(logging.DEBUG if self._client._options.verbose else logging.INFO)
     288
    485289        self._log_handler = _WorkerLogHandler(self)
    486290        self._logger.addHandler(self._log_handler)
    487 
    488     def clean_up_logging(self):
    489         if self._log_handler and self._logger:
    490             self._logger.removeHandler(self._log_handler)
    491         self._log_handler = None
    492         self._logger = None
    493 
    494 
    495 class _InlineWorkerConnection(_WorkerConnection):
    496     def __init__(self, host, broker, manager_client, worker_factory, worker_number):
    497         _WorkerConnection.__init__(self, host, broker, worker_factory, worker_number)
    498         self._alive = False
    499         self._manager_client = manager_client
    500 
    501     def cancel(self):
    502         self._client.cancel()
    503 
    504     def is_alive(self):
    505         return self._alive
    506 
    507     def join(self, timeout):
    508         assert not self._alive
    509 
    510     def run(self):
    511         self._alive = True
    512         try:
    513             self._client.run(self._host)
    514         finally:
    515             self._alive = False
    516 
    517     def yield_to_broker(self):
    518         self._broker.run_all_pending(MANAGER_TOPIC, self._manager_client)
    519 
    520     def raise_exception(self, exc_info):
    521         # Since the worker is in the same process as the manager, we can
    522         # raise the exception directly, rather than having to send it through
    523         # the queue. This allows us to preserve the traceback, but we log
    524         # it anyway for consistency with the multiprocess case.
    525         exception_type, exception_value, exception_traceback = sys.exc_info()
    526         stack_utils.log_traceback(_log.error, exception_traceback)
    527         raise exception_type, exception_value, exception_traceback
    528 
    529 
    530 class _Process(multiprocessing.Process):
    531     def __init__(self, worker_connection, client):
    532         multiprocessing.Process.__init__(self)
    533         self._worker_connection = worker_connection
    534         self._client = client
    535 
    536     def run(self):
    537         if not self._worker_connection._host:
    538             self._worker_connection._host = Host()
    539         self._worker_connection.run()
    540 
    541 
    542 class _MultiProcessWorkerConnection(_WorkerConnection):
    543     def __init__(self, host, broker, worker_factory, worker_number):
    544         _WorkerConnection.__init__(self, host, broker, worker_factory, worker_number)
    545         self._proc = _Process(self, self._client)
    546 
    547     def cancel(self):
    548         return self._proc.terminate()
    549 
    550     def is_alive(self):
    551         return self._proc.is_alive()
    552 
    553     def join(self, timeout):
    554         return self._proc.join(timeout)
    555 
    556     def start(self):
    557         self._proc.start()
    558 
    559     def run(self):
    560         self.set_up_logging()
    561         try:
    562             self._client.run(self._host)
    563         finally:
    564             self.clean_up_logging()
    565291
    566292
     
    569295        logging.Handler.__init__(self)
    570296        self._worker = worker
    571         self._pid = os.getpid()
    572297
    573298    def emit(self, record):
    574         self._worker._log_messages.append(record)
     299        self._worker.log_messages.append(record)
Note: See TracChangeset for help on using the changeset viewer.