Changeset 122497 in WebKit
- Timestamp:
- Jul 12, 2012 1:22:46 PM (12 years ago)
- Location:
- trunk/Tools
- Files:
-
- 1 deleted
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/Tools/ChangeLog
r122486 r122497 1 2012-07-12 Dirk Pranke <dpranke@chromium.org> 2 3 nrwt: reimplement manager_worker_broker in a much simpler form 4 https://bugs.webkit.org/show_bug.cgi?id=90513 5 6 Reviewed by Ojan Vafai. 7 8 This is a wholesale replacement of the MessagePool() implementation 9 and the other classes in manager_worker_broker.py. All of the 10 BrokerConnection*, Broker*, etc. classes are gone, and there are now 11 just a MessagePool class and a _Worker class. Happiness ensues. 12 13 I'm removing manager_worker_broker_unittest.py as well; we get 14 nearly complete coverage from the integration tests, and will 15 get more coverage when test-webkitpy moves to use this as well, 16 so having unit tests seems like unnecessary overhead. (running 17 coverage numbers with test-webkitpy shows that pretty much the only 18 uncovered lines are lines that are only run in the child processes, 19 which coverage doesn't handle at the moment). 20 21 * Scripts/webkitpy/layout_tests/controllers/manager_worker_broker.py: 22 (_MessagePool.__init__): 23 (_MessagePool.run): 24 (_MessagePool._start_workers): 25 (_MessagePool): 26 (_MessagePool.wait): 27 (_MessagePool._close): 28 (_MessagePool._handle_done): 29 (_MessagePool._can_pickle): 30 (_MessagePool._loop): 31 (WorkerException): 32 (_Message.__init__): 33 (_Message.__repr__): 34 (_Worker): 35 (_Worker.__init__): 36 (_Worker.terminate): 37 (_Worker._close): 38 (_Worker.run): 39 (_Worker.post): 40 (_Worker.yield_to_caller): 41 (_Worker._post): 42 (_Worker._raise): 43 (_Worker._set_up_logging): 44 (_WorkerLogHandler.__init__): 45 (_WorkerLogHandler.emit): 46 * Scripts/webkitpy/layout_tests/controllers/manager_worker_broker_unittest.py: Removed. 47 1 48 2012-07-12 Tony Chang <tony@chromium.org> 2 49 -
trunk/Tools/Scripts/webkitpy/layout_tests/controllers/manager_worker_broker.py
r122394 r122497 32 32 import logging 33 33 import multiprocessing 34 import optparse35 import os36 34 import Queue 37 35 import sys … … 46 44 47 45 _log = logging.getLogger(__name__) 48 49 50 #51 # Topic names for Manager <-> Worker messaging52 #53 MANAGER_TOPIC = 'managers'54 ANY_WORKER_TOPIC = 'workers'55 46 56 47 … … 66 57 self._num_workers = num_workers 67 58 self._worker_startup_delay_secs = worker_startup_delay_secs 68 self._worker_states = {} 59 self._workers = [] 60 self._workers_stopped = set() 69 61 self._host = host 70 self.name = 'manager' 62 self._name = 'manager' 63 self._running_inline = (self._num_workers == 1) 64 if self._running_inline: 65 self._messages_to_worker = Queue.Queue() 66 self._messages_to_manager = Queue.Queue() 67 else: 68 self._messages_to_worker = multiprocessing.Queue() 69 self._messages_to_manager = multiprocessing.Queue() 71 70 72 71 def __enter__(self): … … 78 77 79 78 def run(self, shards): 80 manager_connection = _get_broker(self._num_workers, self, self._worker_factory, self._host) 79 """Posts a list of messages to the pool and waits for them to complete.""" 80 for message in shards: 81 self._messages_to_worker.put(_Message(self._name, message[0], message[1:], from_user=True, logs=())) 82 83 for _ in xrange(self._num_workers): 84 self._messages_to_worker.put(_Message(self._name, 'stop', message_args=(), from_user=False, logs=())) 85 86 self.wait() 87 88 def _start_workers(self): 89 assert not self._workers 90 self._workers_stopped = set() 91 host = None 92 if self._running_inline or self._can_pickle(self._host): 93 host = self._host 94 81 95 for worker_number in xrange(self._num_workers): 82 worker_connection = manager_connection.start_worker(worker_number) 83 worker_state = _WorkerState(worker_number, worker_connection) 84 self._worker_states[worker_connection.name] = worker_state 85 time.sleep(self._worker_startup_delay_secs) 86 87 messages = list(shards) 88 for message in messages: 89 
manager_connection.post_message(*message) 90 91 for _ in xrange(self._num_workers): 92 manager_connection.post_message('stop') 93 94 self.wait(manager_connection) 95 96 def wait(self, manager_connection): 97 try: 98 while not self.is_done(): 99 manager_connection.run_message_loop(delay_secs=1.0) 96 worker = _Worker(host, self._messages_to_manager, self._messages_to_worker, self._worker_factory, worker_number, self._running_inline, self if self._running_inline else None) 97 self._workers.append(worker) 98 worker.start() 99 if self._worker_startup_delay_secs: 100 time.sleep(self._worker_startup_delay_secs) 101 102 def wait(self): 103 try: 104 self._start_workers() 105 if self._running_inline: 106 self._workers[0].run() 107 self._loop(block=False) 108 else: 109 self._loop(block=True) 100 110 finally: 101 111 self._close() 102 112 103 def is_done(self):104 worker_states = self._worker_states.values()105 return worker_states and all(worker_state.done for worker_state in worker_states)106 107 113 def _close(self): 108 for worker_state in self._worker_states.values(): 109 if worker_state.worker_connection.is_alive(): 110 worker_state.worker_connection.cancel() 111 _log.debug('Waiting for worker %d to exit' % worker_state.number) 112 worker_state.worker_connection.join(5.0) 113 if worker_state.worker_connection.is_alive(): 114 _log.error('Worker %d did not exit in time.' 
% worker_state.number) 115 116 def handle_done(self, source, log_messages=None): 117 worker_state = self._worker_states[source] 118 worker_state.done = True 119 self._log_messages(log_messages) 120 121 def handle_started_test(self, *args): 122 self._caller.handle('started_test', *args) 123 124 def handle_finished_test(self, *args): 125 self._caller.handle('finished_test', *args) 126 127 def handle_finished_test_list(self, *args): 128 self._caller.handle('finished_test_list', *args) 129 130 def handle_exception(self, *args): 131 self._handle_worker_exception(*args) 114 for worker in self._workers: 115 if worker.is_alive(): 116 worker.terminate() 117 worker.join() 118 self._workers = [] 119 if not self._running_inline: 120 if self._messages_to_worker: 121 self._messages_to_worker.close() 122 self._messages_to_worker = None 123 if self._messages_to_manager: 124 self._messages_to_manager.close() 125 self._messages_to_manager = None 132 126 133 127 def _log_messages(self, messages): 134 128 for message in messages: 135 129 logging.root.handle(message) 130 131 def _handle_done(self, source): 132 self._workers_stopped.add(source) 136 133 137 134 @staticmethod … … 143 140 raise WorkerException(str(exception_value)) 144 141 145 146 class _WorkerState(object): 147 def __init__(self, number, worker_connection): 148 self.worker_connection = worker_connection 149 self.number = number 150 self.done = False 151 152 def __repr__(self): 153 return "_WorkerState(" + str(self.__dict__) + ")" 154 155 156 def _get_broker(max_workers, client, worker_factory, host=None): 157 """Return a connection to a manager/worker message_broker 158 159 Args: 160 max_workers - max # of workers to run concurrently. 161 client - BrokerClient implementation to dispatch 162 replies to. 163 worker_factory: factory method for creating objects that implement the Worker interface. 164 host: optional picklable host object that can be passed to workers for testing. 
165 Returns: 166 A handle to an object that will talk to a message broker configured 167 for the normal manager/worker communication.""" 168 if max_workers == 1: 169 queue_class = Queue.Queue 170 manager_class = _InlineManager 171 else: 172 queue_class = multiprocessing.Queue 173 manager_class = _MultiProcessManager 174 175 broker = _Broker(queue_class) 176 return manager_class(broker, client, worker_factory, host) 142 def _can_pickle(self, host): 143 try: 144 cPickle.dumps(host) 145 return True 146 except TypeError: 147 return False 148 149 def _loop(self, block): 150 try: 151 while True: 152 if len(self._workers_stopped) == len(self._workers): 153 block = False 154 message = self._messages_to_manager.get(block) 155 self._log_messages(message.logs) 156 if message.from_user: 157 self._caller.handle(message.name, message.src, *message.args) 158 continue 159 method = getattr(self, '_handle_' + message.name) 160 assert method, 'bad message %s' % repr(message) 161 method(message.src, *message.args) 162 except Queue.Empty: 163 pass 177 164 178 165 … … 182 169 183 170 184 class BrokerClient(object):185 """Abstract base class / interface that all message broker clients must186 implement. In addition to the methods below, by convention clients187 implement routines of the signature type188 189 handle_MESSAGE_NAME(self, src, ...):190 191 where MESSAGE_NAME matches the string passed to post_message(), and192 src indicates the name of the sender. If the message contains values in193 the message body, those will be provided as optparams."""194 195 def is_done(self):196 """Called from inside run_message_loop() to indicate whether to exit."""197 raise NotImplementedError198 199 def name(self):200 """Return a name that identifies the client."""201 raise NotImplementedError202 203 204 class _Broker(object):205 """Brokers provide the basic model of a set of topics. 
Clients can post a206 message to any topic using post_message(), and can process messages on one207 topic at a time using run_message_loop()."""208 209 def __init__(self, queue_maker):210 """Args:211 queue_maker: a factory method that returns objects implementing a212 Queue interface (put()/get()).213 """214 self._queue_maker = queue_maker215 self._topics = {}216 217 def __del__(self):218 self.cleanup()219 220 def cleanup(self):221 for queue in self._topics.values():222 if hasattr(queue, 'close'):223 queue.close()224 self._topics = {}225 226 def add_topic(self, topic_name):227 if topic_name not in self._topics:228 self._topics[topic_name] = self._queue_maker()229 230 def _get_queue_for_topic(self, topic_name):231 return self._topics[topic_name]232 233 def post_message(self, client, topic_name, message_name, *message_args):234 """Post a message to the appropriate topic name.235 236 Messages have a name and a tuple of optional arguments. Both must be picklable."""237 message = _Message(client.name, topic_name, message_name, message_args)238 queue = self._get_queue_for_topic(topic_name)239 queue.put(_Message.dumps(message))240 241 def run_message_loop(self, topic_name, client, delay_secs=None):242 """Loop processing messages until client.is_done() or delay passes.243 244 To run indefinitely, set delay_secs to None."""245 assert delay_secs is None or delay_secs > 0246 self._run_loop(topic_name, client, block=True, delay_secs=delay_secs)247 248 def run_all_pending(self, topic_name, client):249 """Process messages until client.is_done() or caller would block."""250 self._run_loop(topic_name, client, block=False, delay_secs=None)251 252 def _run_loop(self, topic_name, client, block, delay_secs):253 queue = self._get_queue_for_topic(topic_name)254 while not client.is_done():255 try:256 s = queue.get(block, delay_secs)257 except Queue.Empty:258 return259 msg = _Message.loads(s)260 self._dispatch_message(msg, client)261 262 def _dispatch_message(self, message, client):263 if 
not hasattr(client, 'handle_' + message.name):264 raise ValueError(265 "%s: received message '%s' it couldn't handle" %266 (client.name, message.name))267 optargs = message.args268 message_handler = getattr(client, 'handle_' + message.name)269 message_handler(message.src, *optargs)270 271 272 171 class _Message(object): 273 @staticmethod 274 def loads(string_value): 275 obj = cPickle.loads(string_value) 276 assert(isinstance(obj, _Message)) 277 return obj 278 279 def __init__(self, src, topic_name, message_name, message_args): 172 def __init__(self, src, message_name, message_args, from_user, logs): 280 173 self.src = src 281 self.topic_name = topic_name282 174 self.name = message_name 283 175 self.args = message_args 284 285 def dumps(self): 286 return cPickle.dumps(self) 176 self.from_user = from_user 177 self.logs = logs 287 178 288 179 def __repr__(self): 289 return ("_Message(from='%s', topic_name='%s', message_name='%s')" % 290 (self.src, self.topic_name, self.name)) 291 292 293 class _BrokerConnection(object): 294 """_BrokerConnection provides a connection-oriented facade on top of a 295 Broker, so that callers don't have to repeatedly pass the same topic 296 names over and over.""" 297 298 def __init__(self, broker, client, run_topic, post_topic): 299 """Create a _BrokerConnection on top of a _Broker. 
Note that the _Broker 300 is passed in rather than created so that a single _Broker can be used 301 by multiple _BrokerConnections.""" 302 self._broker = broker 303 self._client = client 304 self._post_topic = post_topic 305 self._run_topic = run_topic 306 broker.add_topic(run_topic) 307 broker.add_topic(post_topic) 308 309 def cleanup(self): 310 self._broker.cleanup() 311 self._broker = None 312 313 def run_message_loop(self, delay_secs=None): 314 self._broker.run_message_loop(self._run_topic, self._client, delay_secs) 315 316 def post_message(self, message_name, *message_args): 317 self._broker.post_message(self._client, self._post_topic, 318 message_name, *message_args) 319 320 def raise_exception(self, exc_info): 321 # Since tracebacks aren't picklable, send the extracted stack instead, 322 # but at least log the full traceback. 323 exception_type, exception_value, exception_traceback = sys.exc_info() 180 return '_Message(src=%s, name=%s, args=%s, from_user=%s, logs=%s)' % (self.src, self.name, self.args, self.from_user, self.logs) 181 182 183 class _Worker(multiprocessing.Process): 184 def __init__(self, host, messages_to_manager, messages_to_worker, worker_factory, worker_number, running_inline, manager): 185 super(_Worker, self).__init__() 186 self.host = host 187 self.worker_number = worker_number 188 self.name = 'worker/%d' % worker_number 189 self.log_messages = [] 190 self._running_inline = running_inline 191 self._manager = manager 192 193 self._messages_to_manager = messages_to_manager 194 self._messages_to_worker = messages_to_worker 195 self._worker = worker_factory(self) 196 self._logger = None 197 self._log_handler = None 198 199 def terminate(self): 200 if self._worker: 201 self._worker.stop() 202 self._worker = None 203 if self.is_alive(): 204 super(_Worker, self).terminate() 205 206 def _close(self): 207 if self._log_handler and self._logger: 208 self._logger.removeHandler(self._log_handler) 209 self._log_handler = None 210 self._logger = None 
211 212 def start(self): 213 if not self._running_inline: 214 super(_Worker, self).start() 215 216 def run(self): 217 if not self.host: 218 self.host = Host() 219 if not self._running_inline: 220 self._set_up_logging() 221 222 worker = self._worker 223 exception_msg = "" 224 _log.debug("%s starting" % self.name) 225 226 try: 227 worker.start() 228 while True: 229 message = self._messages_to_worker.get() 230 if message.from_user: 231 worker.handle(message.name, message.src, *message.args) 232 self.yield_to_caller() 233 else: 234 assert message.name == 'stop', 'bad message %s' % repr(message) 235 break 236 237 except Queue.Empty: 238 assert False, '%s: ran out of messages in worker queue.' % self.name 239 except KeyboardInterrupt, e: 240 exception_msg = ", interrupted" 241 if not self._running_inline: 242 _log.warning('worker exception') 243 self._raise(sys.exc_info()) 244 raise 245 except Exception, e: 246 exception_msg = ", exception raised: %s" % str(e) 247 if not self._running_inline: 248 self._raise(sys.exc_info()) 249 raise 250 finally: 251 _log.debug("%s exiting%s" % (self.name, exception_msg)) 252 try: 253 worker.stop() 254 finally: 255 self._post(name='done', args=(), from_user=False) 256 self._close() 257 258 def post(self, name, *args): 259 self._post(name, args, from_user=True) 260 261 def yield_to_caller(self): 262 if self._running_inline: 263 self._manager._loop(block=False) 264 265 def _post(self, name, args, from_user): 266 log_messages = self.log_messages 267 self.log_messages = [] 268 self._messages_to_manager.put(_Message(self.name, name, args, from_user, log_messages)) 269 270 def _raise(self, exc_info): 271 # Since tracebacks aren't picklable, send the extracted stack instead. 
272 exception_type, exception_value, exception_traceback = exc_info 324 273 stack_utils.log_traceback(_log.error, exception_traceback) 325 274 stack = traceback.extract_tb(exception_traceback) 326 self._broker.post_message(self._client, self._post_topic, 'exception', exception_type, exception_value, stack) 327 328 329 class AbstractWorker(BrokerClient): 330 def __init__(self, worker_connection, worker_number): 331 BrokerClient.__init__(self) 332 self.worker = None 333 self._worker_connection = worker_connection 334 self.worker_number = worker_number 335 self.name = 'worker/%d' % worker_number 336 self._done = False 337 self._canceled = False 338 self._options = optparse.Values({'verbose': False}) 339 self.host = None 340 341 def is_done(self): 342 return self._done or self._canceled 343 344 def stop_handling_messages(self): 345 self._done = True 346 347 def run(self, host): 348 """Callback for the worker to start executing. Typically does any 349 remaining initialization and then calls broker_connection.run_message_loop().""" 350 exception_msg = "" 351 self.host = host 352 353 self.worker.start() 354 _log.debug('%s starting' % self.name) 355 356 try: 357 self._worker_connection.run_message_loop() 358 if not self.is_done(): 359 raise AssertionError("%s: ran out of messages in worker queue." 360 % self.name) 361 except KeyboardInterrupt: 362 exception_msg = ", interrupted" 363 self._worker_connection.raise_exception(sys.exc_info()) 364 except: 365 exception_msg = ", exception raised" 366 self._worker_connection.raise_exception(sys.exc_info()) 367 finally: 368 _log.debug("%s done with message loop%s" % (self.name, exception_msg)) 369 try: 370 self.worker.stop() 371 finally: 372 # Make sure we post a done so that we can flush the log messages 373 # and clean up properly even if we raise an exception in worker.cleanup(). 
374 self._worker_connection.post_message('done') 375 376 def handle_stop(self, source): 377 self._done = True 378 379 def handle_test_list(self, source, list_name, test_list): 380 self.worker.handle('test_list', source, list_name, test_list) 381 382 def cancel(self): 383 """Called when possible to indicate to the worker to stop processing 384 messages and shut down. Note that workers may be stopped without this 385 method being called, so clients should not rely solely on this.""" 386 self._canceled = True 387 388 def yield_to_caller(self): 389 self._worker_connection.yield_to_broker() 390 391 def post(self, *args): 392 self._worker_connection.post_message(*args) 393 394 395 class _ManagerConnection(_BrokerConnection): 396 def __init__(self, broker, client, worker_factory, host): 397 _BrokerConnection.__init__(self, broker, client, MANAGER_TOPIC, ANY_WORKER_TOPIC) 398 self._worker_factory = worker_factory 399 self._host = host 400 401 def start_worker(self, worker_number): 402 raise NotImplementedError 403 404 405 class _InlineManager(_ManagerConnection): 406 def __init__(self, broker, client, worker_factory, host): 407 _ManagerConnection.__init__(self, broker, client, worker_factory, host) 408 self._inline_worker = None 409 410 def start_worker(self, worker_number): 411 host = self._host 412 self._inline_worker = _InlineWorkerConnection(host, self._broker, self._client, self._worker_factory, worker_number) 413 return self._inline_worker 414 415 def run_message_loop(self, delay_secs=None): 416 # Note that delay_secs is ignored in this case since we can't easily 417 # implement it. 
418 self._inline_worker.run() 419 self._broker.run_all_pending(MANAGER_TOPIC, self._client) 420 421 422 class _MultiProcessManager(_ManagerConnection): 423 def _can_pickle_host(self): 424 try: 425 cPickle.dumps(self._host) 426 return True 427 except TypeError: 428 return False 429 430 def start_worker(self, worker_number): 431 host = None 432 if self._can_pickle_host(): 433 host = self._host 434 worker_connection = _MultiProcessWorkerConnection(host, self._broker, self._worker_factory, worker_number) 435 worker_connection.start() 436 return worker_connection 437 438 439 class _WorkerConnection(_BrokerConnection): 440 def __init__(self, host, broker, worker_factory, worker_number): 441 # FIXME: keeping track of the differences between the WorkerConnection, the AbstractWorker, and the 442 # actual Worker (created by worker_factory) is very confusing, but this all gets better when 443 # _WorkerConnection and AbstractWorker get merged. 444 self._client = AbstractWorker(self, worker_number) 445 self._worker = worker_factory(self._client) 446 self._client.worker = self._worker 447 self._host = host 448 self._log_messages = [] 449 self._logger = None 450 self._log_handler = None 451 self.name = self._client.name 452 _BrokerConnection.__init__(self, broker, self._client, ANY_WORKER_TOPIC, MANAGER_TOPIC) 453 454 def cancel(self): 455 raise NotImplementedError 456 457 def is_alive(self): 458 raise NotImplementedError 459 460 def join(self, timeout): 461 raise NotImplementedError 462 463 # FIXME: rename to yield_to_caller(). 464 def yield_to_broker(self): 465 pass 466 467 def post_message(self, *args): 468 # FIXME: This is a hack until we can remove the log_messages arg from the manager. 
469 if args[0] in ('finished_test', 'done'): 470 log_messages = self._log_messages 471 self._log_messages = [] 472 args = args + tuple([log_messages]) 473 super(_WorkerConnection, self).post_message(*args) 474 475 def set_up_logging(self): 476 self._logger = logging.root 275 self._post(name='worker_exception', args=(exception_type, exception_value, stack), from_user=False) 276 277 def _set_up_logging(self): 278 self._logger = logging.getLogger() 279 477 280 # The unix multiprocessing implementation clones the MeteredStream log handler 478 281 # into the child process, so we need to remove it to avoid duplicate logging. 479 282 for h in self._logger.handlers: 480 283 # log handlers don't have names until python 2.7. 481 if getattr(h, 'name', '') == metered_stream.LOG_HANDLER_NAME: 284 # FIXME: get webkitpy.test.printer from a constant as well. 285 if getattr(h, 'name', '') in (metered_stream.LOG_HANDLER_NAME, 'webkitpy.test.printer'): 482 286 self._logger.removeHandler(h) 483 287 break 484 self._logger.setLevel(logging.DEBUG if self._client._options.verbose else logging.INFO) 288 485 289 self._log_handler = _WorkerLogHandler(self) 486 290 self._logger.addHandler(self._log_handler) 487 488 def clean_up_logging(self):489 if self._log_handler and self._logger:490 self._logger.removeHandler(self._log_handler)491 self._log_handler = None492 self._logger = None493 494 495 class _InlineWorkerConnection(_WorkerConnection):496 def __init__(self, host, broker, manager_client, worker_factory, worker_number):497 _WorkerConnection.__init__(self, host, broker, worker_factory, worker_number)498 self._alive = False499 self._manager_client = manager_client500 501 def cancel(self):502 self._client.cancel()503 504 def is_alive(self):505 return self._alive506 507 def join(self, timeout):508 assert not self._alive509 510 def run(self):511 self._alive = True512 try:513 self._client.run(self._host)514 finally:515 self._alive = False516 517 def yield_to_broker(self):518 
self._broker.run_all_pending(MANAGER_TOPIC, self._manager_client)519 520 def raise_exception(self, exc_info):521 # Since the worker is in the same process as the manager, we can522 # raise the exception directly, rather than having to send it through523 # the queue. This allows us to preserve the traceback, but we log524 # it anyway for consistency with the multiprocess case.525 exception_type, exception_value, exception_traceback = sys.exc_info()526 stack_utils.log_traceback(_log.error, exception_traceback)527 raise exception_type, exception_value, exception_traceback528 529 530 class _Process(multiprocessing.Process):531 def __init__(self, worker_connection, client):532 multiprocessing.Process.__init__(self)533 self._worker_connection = worker_connection534 self._client = client535 536 def run(self):537 if not self._worker_connection._host:538 self._worker_connection._host = Host()539 self._worker_connection.run()540 541 542 class _MultiProcessWorkerConnection(_WorkerConnection):543 def __init__(self, host, broker, worker_factory, worker_number):544 _WorkerConnection.__init__(self, host, broker, worker_factory, worker_number)545 self._proc = _Process(self, self._client)546 547 def cancel(self):548 return self._proc.terminate()549 550 def is_alive(self):551 return self._proc.is_alive()552 553 def join(self, timeout):554 return self._proc.join(timeout)555 556 def start(self):557 self._proc.start()558 559 def run(self):560 self.set_up_logging()561 try:562 self._client.run(self._host)563 finally:564 self.clean_up_logging()565 291 566 292 … … 569 295 logging.Handler.__init__(self) 570 296 self._worker = worker 571 self._pid = os.getpid()572 297 573 298 def emit(self, record): 574 self._worker. _log_messages.append(record)299 self._worker.log_messages.append(record)
Note: See TracChangeset
for help on using the changeset viewer.