Changeset 78506 in webkit
- Timestamp: Feb 14, 2011 2:13:15 PM (13 years ago)
- Location: trunk/Tools
- Files: 5 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/Tools/ChangeLog
r78502 r78506
       1  2011-02-14 Dirk Pranke <dpranke@chromium.org>
       2
       3  Reviewed by Tony Chang.
       4
       5  nrwt multiprocessing: add code to handle interrupts and wedged
       6  threads.
       7  https://bugs.webkit.org/show_bug.cgi?id=54072
       8
       9  * Scripts/webkitpy/layout_tests/layout_package/manager_worker_broker.py:
      10  Adds the cancel(), is_alive(), join(), and log_wedged_worker()
      11  methods to the WorkerConnection class
      12
      13  * Scripts/webkitpy/layout_tests/layout_package/manager_worker_broker_unittest.py:
      14  * Scripts/webkitpy/layout_tests/layout_package/test_runner2.py:
      15  * Scripts/webkitpy/layout_tests/layout_package/worker.py:
      16  Adds the cancel() method to the Worker class
      17
   1  18  2011-02-14 Dirk Pranke <dpranke@chromium.org>
   2  19
-
trunk/Tools/Scripts/webkitpy/layout_tests/layout_package/manager_worker_broker.py
r78398 r78506 53 53 54 54 # Handle Python < 2.6 where multiprocessing isn't available. 55 #56 # _Multiprocessing_Process is needed so that _MultiProcessWorker57 # can be defined with or without multiprocessing.58 55 try: 59 56 import multiprocessing 60 _Multiprocessing_Process = multiprocessing.Process61 57 except ImportError: 62 58 multiprocessing = None 63 _Multiprocessing_Process = threading.Thread 64 65 59 60 61 from webkitpy.common.system import stack_utils 66 62 from webkitpy.layout_tests import port 67 63 from webkitpy.layout_tests.layout_package import message_broker2 … … 220 216 ANY_WORKER_TOPIC, MANAGER_TOPIC) 221 217 218 def cancel(self): 219 raise NotImplementedError 220 221 def is_alive(self): 222 raise NotImplementedError 223 224 def join(self, timeout): 225 raise NotImplementedError 226 227 def log_wedged_worker(self, test_name): 228 raise NotImplementedError 229 222 230 def yield_to_broker(self): 223 231 pass … … 227 235 def __init__(self, broker, port, manager_client, worker_class, worker_number): 228 236 _WorkerConnection.__init__(self, broker, worker_class, worker_number, port._options) 237 self._alive = False 229 238 self._port = port 230 239 self._manager_client = manager_client 231 240 241 def cancel(self): 242 self._client.cancel() 243 244 def is_alive(self): 245 return self._alive 246 247 def join(self, timeout): 248 assert not self._alive 249 250 def log_wedged_worker(self, test_name): 251 assert False, "_InlineWorkerConnection.log_wedged_worker() called" 252 232 253 def run(self): 254 self._alive = True 233 255 self._client.run(self._port) 256 self._alive = False 234 257 235 258 def yield_to_broker(self): … … 244 267 self._client = client 245 268 269 def cancel(self): 270 return self._client.cancel() 271 272 def log_wedged_worker(self, test_name): 273 stack_utils.log_thread_state(_log.error, self._client.name(), self.ident, " is wedged on test %s" % test_name) 274 246 275 def run(self): 247 276 # FIXME: We can remove this once everyone is 
on 2.6. … … 256 285 self._thread = _Thread(self, port, self._client) 257 286 287 def cancel(self): 288 return self._thread.cancel() 289 290 def is_alive(self): 291 # FIXME: Change this to is_alive once everyone is on 2.6. 292 return self._thread.isAlive() 293 294 def join(self, timeout): 295 return self._thread.join(timeout) 296 297 def log_wedged_worker(self, test_name): 298 return self._thread.log_wedged_worker(test_name) 299 258 300 def start(self): 259 301 self._thread.start() 260 302 261 303 262 class _Process(_Multiprocessing_Process): 263 def __init__(self, worker_connection, platform_name, options, client): 264 _Multiprocessing_Process.__init__(self) 265 self._worker_connection = worker_connection 266 self._platform_name = platform_name 267 self._options = options 268 self._client = client 269 270 def run(self): 271 logging.basicConfig() 272 port_obj = port.get(self._platform_name, self._options) 273 self._client.run(port_obj) 304 if multiprocessing: 305 306 class _Process(multiprocessing.Process): 307 def __init__(self, worker_connection, platform_name, options, client): 308 multiprocessing.Process.__init__(self) 309 self._worker_connection = worker_connection 310 self._platform_name = platform_name 311 self._options = options 312 self._client = client 313 314 def log_wedged_worker(self, test_name): 315 _log.error("%s (pid %d) is wedged on test %s" % (self.name, self.pid, test_name)) 316 317 def run(self): 318 logging.basicConfig() 319 port_obj = port.get(self._platform_name, self._options) 320 self._client.run(port_obj) 274 321 275 322 … … 279 326 self._proc = _Process(self, platform_name, options, self._client) 280 327 328 def cancel(self): 329 return self._proc.terminate() 330 331 def is_alive(self): 332 return self._proc.is_alive() 333 334 def join(self, timeout): 335 return self._proc.join(timeout) 336 337 def log_wedged_worker(self, test_name): 338 return self._proc.log_wedged_worker(test_name) 339 281 340 def start(self): 282 341 self._proc.start() -
trunk/Tools/Scripts/webkitpy/layout_tests/layout_package/manager_worker_broker_unittest.py
r78398 r78506 45 45 46 46 47 class TestWorker(manager_worker_broker.AbstractWorker): 48 def __init__(self, broker_connection, worker_number, options): 49 self._broker_connection = broker_connection 50 self._options = options 51 self._worker_number = worker_number 52 self._name = 'TestWorker/%d' % worker_number 53 self._stopped = False 54 55 def handle_stop(self, src): 56 self._stopped = True 57 58 def handle_test(self, src, an_int, a_str): 59 assert an_int == 1 60 assert a_str == "hello, world" 61 self._broker_connection.post_message('test', 2, 'hi, everybody') 62 63 def is_done(self): 64 return self._stopped 65 66 def name(self): 67 return self._name 68 69 def start(self): 70 pass 71 72 def run(self, port): 73 try: 74 self._broker_connection.run_message_loop() 75 self._broker_connection.yield_to_broker() 76 self._broker_connection.post_message('done') 77 except Exception, e: 78 self._broker_connection.post_message('exception', (type(e), str(e), None)) 47 def worker_maker(starting_queue=None, stopping_queue=None): 48 class _TestWorker(manager_worker_broker.AbstractWorker): 49 def __init__(self, broker_connection, worker_number, options): 50 self._broker_connection = broker_connection 51 self._options = options 52 self._worker_number = worker_number 53 self._name = 'TestWorker/%d' % worker_number 54 self._stopped = False 55 self._canceled = False 56 self._starting_queue = starting_queue 57 self._stopping_queue = stopping_queue 58 59 def handle_stop(self, src): 60 self._stopped = True 61 62 def handle_test(self, src, an_int, a_str): 63 assert an_int == 1 64 assert a_str == "hello, world" 65 self._broker_connection.post_message('test', 2, 'hi, everybody') 66 67 def is_done(self): 68 return self._stopped or self._canceled 69 70 def name(self): 71 return self._name 72 73 def cancel(self): 74 self._canceled = True 75 76 def run(self, port): 77 if self._starting_queue: 78 self._starting_queue.put('') 79 if self._stopping_queue: 80 self._stopping_queue.get() 81 try: 82 
self._broker_connection.run_message_loop() 83 self._broker_connection.yield_to_broker() 84 self._broker_connection.post_message('done') 85 except Exception, e: 86 self._broker_connection.post_message('exception', (type(e), str(e), None)) 87 return _TestWorker 79 88 80 89 … … 86 95 87 96 88 def make_broker(manager, worker_model ):97 def make_broker(manager, worker_model, starting_queue=None, stopping_queue=None): 89 98 options = get_options(worker_model) 90 99 return manager_worker_broker.get(port.get("test"), options, manager, 91 TestWorker)100 worker_maker(starting_queue, stopping_queue)) 92 101 93 102 … … 113 122 contract all implementations must follow.""" 114 123 115 #116 # Methods to implement the Manager side of the ClientInterface117 #118 124 def name(self): 119 125 return 'Tester' … … 122 128 return self._done 123 129 124 #125 # Handlers for the messages the TestWorker may send.126 #127 130 def handle_done(self, src): 128 131 self._done = True … … 136 139 self._done = True 137 140 138 #139 # Testing helper methods140 #141 141 def setUp(self): 142 142 self._an_int = None … … 147 147 self._worker_model = None 148 148 149 def make_broker(self): 150 self._broker = make_broker(self, self._worker_model) 151 152 # 153 # Actual unit tests 154 # 149 def make_broker(self, starting_queue=None, stopping_queue=None): 150 self._broker = make_broker(self, self._worker_model, starting_queue, 151 stopping_queue) 152 153 def test_cancel(self): 154 self.make_broker() 155 worker = self._broker.start_worker(0) 156 worker.cancel() 157 self._broker.post_message('test', 1, 'hello, world') 158 worker.join(0.5) 159 self.assertFalse(worker.is_alive()) 160 155 161 def test_done(self): 156 if not self._worker_model:157 return158 162 self.make_broker() 159 163 worker = self._broker.start_worker(0) … … 161 165 self._broker.post_message('stop') 162 166 self._broker.run_message_loop() 167 worker.join(0.5) 168 self.assertFalse(worker.is_alive()) 163 169 self.assertTrue(self.is_done()) 164 
170 self.assertEqual(self._an_int, 2) 165 171 self.assertEqual(self._a_str, 'hi, everybody') 166 172 173 def test_log_wedged_worker(self): 174 starting_queue = self.queue() 175 stopping_queue = self.queue() 176 self.make_broker(starting_queue, stopping_queue) 177 oc = outputcapture.OutputCapture() 178 oc.capture_output() 179 try: 180 worker = self._broker.start_worker(0) 181 starting_queue.get() 182 worker.log_wedged_worker('test_name') 183 stopping_queue.put('') 184 self._broker.post_message('stop') 185 self._broker.run_message_loop() 186 worker.join(0.5) 187 self.assertFalse(worker.is_alive()) 188 self.assertTrue(self.is_done()) 189 finally: 190 oc.restore_output() 191 167 192 def test_unknown_message(self): 168 if not self._worker_model:169 return170 193 self.make_broker() 171 194 worker = self._broker.start_worker(0) 172 195 self._broker.post_message('unknown') 173 196 self._broker.run_message_loop() 197 worker.join(0.5) 174 198 175 199 self.assertTrue(self.is_done()) 200 self.assertFalse(worker.is_alive()) 176 201 self.assertEquals(self._exception[0], ValueError) 177 202 self.assertEquals(self._exception[1], … … 184 209 self._worker_model = 'inline' 185 210 186 187 class MultiProcessBrokerTests(_TestsMixin, unittest.TestCase): 188 def setUp(self): 189 _TestsMixin.setUp(self) 190 if multiprocessing: 211 def test_log_wedged_worker(self): 212 self.make_broker() 213 worker = self._broker.start_worker(0) 214 self.assertRaises(AssertionError, worker.log_wedged_worker, None) 215 216 217 if multiprocessing: 218 219 class MultiProcessBrokerTests(_TestsMixin, unittest.TestCase): 220 def setUp(self): 221 _TestsMixin.setUp(self) 191 222 self._worker_model = 'processes' 192 else: 193 self._worker_model = None 194 195 def queue(self): 196 return multiprocessing.Queue() 223 224 def queue(self): 225 return multiprocessing.Queue() 197 226 198 227 … … 201 230 _TestsMixin.setUp(self) 202 231 self._worker_model = 'threads' 232 233 def queue(self): 234 return Queue.Queue() 203 235 
204 236 … … 223 255 self.assertRaises(NotImplementedError, obj.start_worker, 0) 224 256 257 def test_workerconnection_is_abstract(self): 258 # Test that all the base class methods are abstract and have the 259 # signature we expect. 260 broker = make_broker(self, 'inline') 261 obj = manager_worker_broker._WorkerConnection(broker._broker, worker_maker(), 0, None) 262 self.assertRaises(NotImplementedError, obj.cancel) 263 self.assertRaises(NotImplementedError, obj.is_alive) 264 self.assertRaises(NotImplementedError, obj.join, None) 265 self.assertRaises(NotImplementedError, obj.log_wedged_worker, None) 266 225 267 226 268 if __name__ == '__main__': -
trunk/Tools/Scripts/webkitpy/layout_tests/layout_package/test_runner2.py
r78502 r78506 53 53 self.number = number 54 54 self.done = False 55 self.current_test_name = None 56 self.next_timeout = None 57 self.wedged = False 55 58 56 59 def __repr__(self): … … 73 76 74 77 def _worker_is_done(self, worker_state): 75 # FIXME: check if the worker is wedged. 76 return worker_state.done 78 t = time.time() 79 if worker_state.done or worker_state.wedged: 80 return True 81 82 next_timeout = worker_state.next_timeout 83 WEDGE_PADDING = 40.0 84 if next_timeout and t > next_timeout + WEDGE_PADDING: 85 _log.error('') 86 worker_state.worker_connection.log_wedged_worker(worker_state.current_test_name) 87 _log.error('') 88 worker_state.wedged = True 89 return True 90 return False 77 91 78 92 def name(self): … … 132 146 keyboard_interrupted = False 133 147 interrupted = False 134 if not self._options.dry_run: 135 while not self.is_done(): 136 # We loop with a timeout in order to be able to detect wedged threads. 137 manager_connection.run_message_loop(delay_secs=1.0) 138 139 # FIXME: handle exceptions, interrupts. 148 try: 149 if not self._options.dry_run: 150 while not self.is_done(): 151 # We loop with a timeout in order to be able to detect wedged threads. 152 manager_connection.run_message_loop(delay_secs=1.0) 153 154 if any(worker_state.wedged for worker_state in self._worker_states.values()): 155 _log.error('') 156 _log.error('Remaining workers are wedged, bailing out.') 157 _log.error('') 158 else: 159 _log.debug('No wedged threads') 160 161 # Make sure all of the workers have shut down (if possible). 
162 for worker_state in self._worker_states.values(): 163 if not worker_state.wedged and worker_state.worker_connection.is_alive(): 164 worker_state.worker_connection.join(0.5) 165 assert not worker_state.worker_connection.is_alive() 166 167 except KeyboardInterrupt: 168 _log.info("Interrupted, exiting") 169 self._cancel_workers() 170 keyboard_interrupted = True 171 except test_runner.TestRunInterruptedException, e: 172 _log.info(e.reason) 173 self._cancel_workers() 174 interrupted = True 175 except: 176 # Unexpected exception; don't try to clean up workers. 177 _log.info("Exception raised, exiting") 178 raise 179 140 180 141 181 # FIXME: implement stats. … … 147 187 self._group_stats, self._all_results) 148 188 189 def cancel_workers(self): 190 for worker_state in self._worker_states.values(): 191 worker_state.worker_connection.cancel() 192 149 193 def handle_started_test(self, source, test_info, hang_timeout): 150 # FIXME: implement 151 pass 194 worker_state = self._worker_states[source] 195 worker_state.current_test_name = self._port.relative_test_filename(test_info.filename) 196 worker_state.next_timeout = time.time() + hang_timeout 152 197 153 198 def handle_done(self, source): … … 163 208 164 209 def handle_finished_test(self, source, result, elapsed_time): 210 worker_state = self._worker_states[source] 211 worker_state.next_timeout = None 212 worker_state.current_test_name = None 213 214 if worker_state.wedged: 215 # This shouldn't happen if we have our timeouts tuned properly. 216 _log.error("%s unwedged", w.name) 217 165 218 self._update_summary_with_result(self._current_result_summary, result) 166 219 -
trunk/Tools/Scripts/webkitpy/layout_tests/layout_package/worker.py
r78302 r78506 49 49 self._name = 'worker/%d' % worker_number 50 50 self._done = False 51 self._canceled = False 51 52 self._port = None 52 53 … … 54 55 self._port = port 55 56 57 def cancel(self): 58 """Attempt to abort processing (best effort).""" 59 self._canceled = True 60 56 61 def is_done(self): 57 return self._done 62 return self._done or self._canceled 58 63 59 64 def name(self): … … 63 68 self._deferred_init(port) 64 69 70 exception_msg = "" 65 71 _log.debug("%s starting" % self._name) 66 72 67 # FIXME: need to add in error handling, better logging. 68 self._worker_connection.run_message_loop() 69 self._worker_connection.post_message('done') 73 try: 74 self._worker_connection.run_message_loop() 75 if not self.is_done(): 76 raise AssertionError("%s: ran out of messages in worker queue." 77 % self._name) 78 except KeyboardInterrupt: 79 exception_msg = ", interrupted" 80 except: 81 exception_msg = ", exception raised" 82 finally: 83 _log.debug("%s done%s" % (self._name, exception_msg)) 84 if exception_msg: 85 exc_info = sys.exc_info() 86 stack_utils.log_traceback(_log.error, exc_info[2]) 87 # FIXME: Figure out how to send a message with a traceback. 88 self._worker_connection.post_message('exception', 89 (exc_info[0], exc_info[1], None)) 90 self._worker_connection.post_message('done') 70 91 71 92 def handle_test_list(self, src, list_name, test_list):
Note: See TracChangeset for help on using the changeset viewer.