Package cherrypy :: Package test :: Module test_states
[hide private]
[frames | no frames]

Source Code for Module cherrypy.test.test_states

  1  import os 
  2  import signal 
  3  import socket 
  4  import sys 
  5  import time 
  6  import unittest 
  7  import warnings 
  8   
  9  import cherrypy 
 10  import cherrypy.process.servers 
 11  from cherrypy._cpcompat import BadStatusLine, ntob 
 12  from cherrypy.test import helper 
 13   
 14  engine = cherrypy.engine 
 15  thisdir = os.path.join(os.getcwd(), os.path.dirname(__file__)) 
 16   
 17   
class Dependency:
    """Bus listener that records engine lifecycle events for the tests.

    After ``subscribe()`` the instance tracks:

    * ``running``    -- True after a 'start' event, False after 'stop'.
    * ``startcount`` -- number of 'start' events seen.
    * ``gracecount`` -- number of 'graceful' events seen.
    * ``threads``    -- dict keyed by the ids passed to 'start_thread'
      that have not yet been passed to 'stop_thread'.
    """

    def __init__(self, bus):
        """Remember *bus* and reset all counters; does not subscribe yet."""
        self.bus = bus
        self.running = False
        self.startcount = 0
        self.gracecount = 0
        self.threads = {}

    def subscribe(self):
        """Register this object's handlers on the bus channels it tracks."""
        self.bus.subscribe('start', self.start)
        self.bus.subscribe('stop', self.stop)
        self.bus.subscribe('graceful', self.graceful)
        self.bus.subscribe('start_thread', self.startthread)
        self.bus.subscribe('stop_thread', self.stopthread)

    def start(self):
        """Handle 'start': mark as running and count the start."""
        self.running = True
        self.startcount += 1

    def stop(self):
        """Handle 'stop': mark as not running."""
        self.running = False

    def graceful(self):
        """Handle 'graceful': count the graceful restart."""
        self.gracecount += 1

    def startthread(self, thread_id):
        """Handle 'start_thread': record *thread_id* as live."""
        self.threads[thread_id] = None

    def stopthread(self, thread_id):
        """Handle 'stop_thread': forget *thread_id*.

        Uses ``pop`` with a default so that a stop notification for a
        thread that was never recorded is ignored rather than raising
        KeyError inside the bus.
        """
        self.threads.pop(thread_id, None)

# Module-level listener: setup_server() subscribes it to the engine and the
# state tests assert on its counters.
db_connection = Dependency(engine)


def setup_server():
    """Mount the test application and subscribe ``db_connection``.

    Defines a small ``Root`` app whose handlers are requested by the tests
    in this module, mounts it, applies the test configuration and then
    subscribes the module-level ``db_connection`` listener.
    """

    class Root:
        """Page handlers requested by the state tests."""

        def index(self):
            return "Hello World"
        index.exposed = True

        def ctrlc(self):
            # Simulate Ctrl-C arriving inside a page handler.
            raise KeyboardInterrupt()
        ctrlc.exposed = True

        def graceful(self):
            engine.graceful()
            # NOTE: the spelling below is asserted verbatim by
            # test_1_Restart, so it must not be "corrected" here alone.
            return "app was (gracefully) restarted succesfully"
        graceful.exposed = True

        def block_explicit(self):
            # Spin until the response is flagged as timed out, then clear
            # the flag and answer normally.
            while True:
                if cherrypy.response.timed_out:
                    cherrypy.response.timed_out = False
                    return "broken!"
                time.sleep(0.01)
        block_explicit.exposed = True

        def block_implicit(self):
            # Sleep 0.5s without checking the timed_out flag.
            time.sleep(0.5)
            return "response.timeout = %s" % cherrypy.response.timeout
        block_implicit.exposed = True

    cherrypy.tree.mount(Root())
    cherrypy.config.update({
        'environment': 'test_suite',
        'engine.deadlock_poll_freq': 0.1,
    })

    db_connection.subscribe()


# ------------ Enough helpers. Time for real live test cases. ------------ #


class ServerStateTests(helper.CPWebCase):
    """Drive the engine through start/stop/graceful/exit and check state.

    The numeric prefixes in the test names order the tests; test_4 and
    test_5 explicitly cope with the engine not having been exited by an
    earlier test.
    """

    setup_server = staticmethod(setup_server)

    def setUp(self):
        cherrypy.server.socket_timeout = 0.1
        self.do_gc_test = False

    def test_0_NormalStateFlow(self):
        """Stop, start, serve a page, stop again, then exit via block()."""
        engine.stop()
        # Our db_connection should not be running
        self.assertEqual(db_connection.running, False)
        self.assertEqual(db_connection.startcount, 1)
        self.assertEqual(len(db_connection.threads), 0)

        # Test server start
        engine.start()
        self.assertEqual(engine.state, engine.states.STARTED)

        host = cherrypy.server.socket_host
        port = cherrypy.server.socket_port
        self.assertRaises(IOError, cherrypy._cpserver.check_port, host, port)

        # The db_connection should be running now
        self.assertEqual(db_connection.running, True)
        self.assertEqual(db_connection.startcount, 2)
        self.assertEqual(len(db_connection.threads), 0)

        self.getPage("/")
        self.assertBody("Hello World")
        self.assertEqual(len(db_connection.threads), 1)

        # Test engine stop. This will also stop the HTTP server.
        engine.stop()
        self.assertEqual(engine.state, engine.states.STOPPED)

        # Verify that our custom stop function was called
        self.assertEqual(db_connection.running, False)
        self.assertEqual(len(db_connection.threads), 0)

        # Block the main thread now and verify that exit() works.
        def exittest():
            self.getPage("/")
            self.assertBody("Hello World")
            engine.exit()
        cherrypy.server.start()
        engine.start_with_callback(exittest)
        engine.block()
        self.assertEqual(engine.state, engine.states.EXITING)

    def test_1_Restart(self):
        """graceful() from the test thread and from inside a page handler."""
        cherrypy.server.start()
        engine.start()

        # The db_connection should be running now
        self.assertEqual(db_connection.running, True)
        grace = db_connection.gracecount

        self.getPage("/")
        self.assertBody("Hello World")
        self.assertEqual(len(db_connection.threads), 1)

        # Test server restart from this thread
        engine.graceful()
        self.assertEqual(engine.state, engine.states.STARTED)
        self.getPage("/")
        self.assertBody("Hello World")
        self.assertEqual(db_connection.running, True)
        self.assertEqual(db_connection.gracecount, grace + 1)
        self.assertEqual(len(db_connection.threads), 1)

        # Test server restart from inside a page handler
        self.getPage("/graceful")
        self.assertEqual(engine.state, engine.states.STARTED)
        self.assertBody("app was (gracefully) restarted succesfully")
        self.assertEqual(db_connection.running, True)
        self.assertEqual(db_connection.gracecount, grace + 2)
        # Since we are requesting synchronously, is only one thread used?
        # Note that the "/graceful" request has been flushed.
        self.assertEqual(len(db_connection.threads), 0)

        engine.stop()
        self.assertEqual(engine.state, engine.states.STOPPED)
        self.assertEqual(db_connection.running, False)
        self.assertEqual(len(db_connection.threads), 0)

    def test_2_KeyboardInterrupt(self):
        """KeyboardInterrupt in the server thread and in a page handler."""
        # Raise a keyboard interrupt in the HTTP server's main thread.
        # We must start the server in this, the main thread
        engine.start()
        cherrypy.server.start()

        self.persistent = True
        try:
            # Make the first request and assert there's no "Connection: close".
            self.getPage("/")
            self.assertStatus('200 OK')
            self.assertBody("Hello World")
            self.assertNoHeader("Connection")

            cherrypy.server.httpserver.interrupt = KeyboardInterrupt
            engine.block()

            self.assertEqual(db_connection.running, False)
            self.assertEqual(len(db_connection.threads), 0)
            self.assertEqual(engine.state, engine.states.EXITING)
        finally:
            self.persistent = False

        # Raise a keyboard interrupt in a page handler; on multithreaded
        # servers, this should occur in one of the worker threads.
        # This should raise a BadStatusLine error, since the worker
        # thread will just die without writing a response.
        engine.start()
        cherrypy.server.start()

        try:
            self.getPage("/ctrlc")
        except BadStatusLine:
            pass
        else:
            print(self.body)
            self.fail("AssertionError: BadStatusLine not raised")

        engine.block()
        self.assertEqual(db_connection.running, False)
        self.assertEqual(len(db_connection.threads), 0)

    def test_3_Deadlocks(self):
        """Responses exceeding response.timeout are detected and broken."""
        cherrypy.config.update({'response.timeout': 0.2})

        engine.start()
        cherrypy.server.start()
        try:
            self.assertNotEqual(engine.timeout_monitor.thread, None)

            # Request a "normal" page.
            self.assertEqual(engine.timeout_monitor.servings, [])
            self.getPage("/")
            self.assertBody("Hello World")
            # request.close is called async.
            while engine.timeout_monitor.servings:
                sys.stdout.write(".")
                time.sleep(0.01)

            # Request a page that explicitly checks itself for deadlock.
            # response.timeout was set to 0.2 secs above.
            self.getPage("/block_explicit")
            self.assertBody("broken!")

            # Request a page that implicitly breaks deadlock.
            # If we deadlock, we want to touch as little code as possible,
            # so we won't even call handle_error, just bail ASAP.
            self.getPage("/block_implicit")
            self.assertStatus(500)
            self.assertInBody("raise cherrypy.TimeoutError()")
        finally:
            engine.exit()

    def test_4_Autoreload(self):
        """Touching the demo script makes the spawned process re-exec."""
        # If test_3 has not been executed, the server won't be stopped,
        # so we'll have to do it.
        if engine.state != engine.states.EXITING:
            engine.exit()

        # Start the demo script in a new process
        p = helper.CPProcess(ssl=(self.scheme.lower() == 'https'))
        p.write_conf(extra='test_case_name: "test_4_Autoreload"')
        p.start(imports='cherrypy.test._test_states_demo')
        try:
            self.getPage("/start")
            start = float(self.body)

            # Give the autoreloader time to cache the file time.
            time.sleep(2)

            # Touch the file
            os.utime(os.path.join(thisdir, "_test_states_demo.py"), None)

            # Give the autoreloader time to re-exec the process
            time.sleep(2)
            host = cherrypy.server.socket_host
            port = cherrypy.server.socket_port
            cherrypy._cpserver.wait_for_occupied_port(host, port)

            self.getPage("/start")
            if not (float(self.body) > start):
                raise AssertionError("start time %s not greater than %s" %
                                     (float(self.body), start))
        finally:
            # Shut down the spawned process
            self.getPage("/exit")
        p.join()

    def test_5_Start_Error(self):
        """A process that errors during start exits with a nonzero code."""
        # If test_3 has not been executed, the server won't be stopped,
        # so we'll have to do it.
        if engine.state != engine.states.EXITING:
            engine.exit()

        # If a process errors during start, it should stop the engine
        # and exit with a non-zero exit code.
        p = helper.CPProcess(ssl=(self.scheme.lower() == 'https'),
                             wait=True)
        # NOTE(review): the listing lost the literal's original whitespace;
        # continuation lines are kept flush-left on the assumption that the
        # text is written into a config file -- confirm against upstream.
        p.write_conf(
            extra="""starterror: True
test_case_name: "test_5_Start_Error"
"""
        )
        p.start(imports='cherrypy.test._test_states_demo')
        if p.exit_code == 0:
            self.fail("Process failed to return nonzero exit code.")
304 305
class PluginTests(helper.CPWebCase):
    """Tests for engine plugins that need a separately spawned process."""

    def test_daemonize(self):
        """A daemonized child keeps serving after its parent has exited."""
        if os.name != 'posix':
            return self.skip("skipped (not on posix) ")
        self.HOST = '127.0.0.1'
        self.PORT = 8081
        # Spawn the process and wait, when this returns, the original process
        # is finished. If it daemonized properly, we should still be able
        # to access pages.
        p = helper.CPProcess(ssl=(self.scheme.lower() == 'https'),
                             wait=True, daemonize=True,
                             socket_host='127.0.0.1',
                             socket_port=8081)
        p.write_conf(
            extra='test_case_name: "test_daemonize"')
        p.start(imports='cherrypy.test._test_states_demo')
        try:
            # Just get the pid of the daemonization process.
            self.getPage("/pid")
            self.assertStatus(200)
            page_pid = int(self.body)
            self.assertEqual(page_pid, p.get_pid())
        finally:
            # Shut down the spawned process
            self.getPage("/exit")
        p.join()

        # Wait until here to test the exit code because we want to ensure
        # that we wait for the daemon to finish running before we fail.
        if p.exit_code != 0:
            self.fail("Daemonized parent process failed to exit cleanly.")
338 339
class SignalHandlingTests(helper.CPWebCase):
    """Send SIGHUP/SIGTERM to spawned demo processes and check the result."""

    def test_SIGHUP_tty(self):
        """SIGHUP sent to an undaemonized process lets it be joined."""
        # When not daemonized, SIGHUP should shut down the server.
        try:
            from signal import SIGHUP
        except ImportError:
            return self.skip("skipped (no SIGHUP) ")

        # Spawn the process.
        p = helper.CPProcess(ssl=(self.scheme.lower() == 'https'))
        p.write_conf(
            extra='test_case_name: "test_SIGHUP_tty"')
        p.start(imports='cherrypy.test._test_states_demo')
        # Send a SIGHUP
        os.kill(p.get_pid(), SIGHUP)
        # This might hang if things aren't working right, but meh.
        p.join()

    def test_SIGHUP_daemonized(self):
        """SIGHUP sent to a daemonized process yields a different pid."""
        # When daemonized, SIGHUP should restart the server.
        try:
            from signal import SIGHUP
        except ImportError:
            return self.skip("skipped (no SIGHUP) ")

        if os.name != 'posix':
            return self.skip("skipped (not on posix) ")

        # Spawn the process and wait, when this returns, the original process
        # is finished. If it daemonized properly, we should still be able
        # to access pages.
        p = helper.CPProcess(ssl=(self.scheme.lower() == 'https'),
                             wait=True, daemonize=True)
        p.write_conf(
            extra='test_case_name: "test_SIGHUP_daemonized"')
        p.start(imports='cherrypy.test._test_states_demo')

        pid = p.get_pid()
        try:
            # Send a SIGHUP
            os.kill(pid, SIGHUP)
            # Give the server some time to restart
            time.sleep(2)
            self.getPage("/pid")
            self.assertStatus(200)
            new_pid = int(self.body)
            self.assertNotEqual(new_pid, pid)
        finally:
            # Shut down the spawned process
            self.getPage("/exit")
        p.join()

    def _require_signal_and_kill(self, signal_name):
        """Skip the calling test unless *signal_name* and os.kill exist."""
        if not hasattr(signal, signal_name):
            self.skip("skipped (no %s)" % signal_name)

        if not hasattr(os, 'kill'):
            self.skip("skipped (no os.kill)")

    def test_SIGTERM(self):
        "SIGTERM should shut down the server whether daemonized or not."
        self._require_signal_and_kill('SIGTERM')

        # Spawn a normal, undaemonized process.
        p = helper.CPProcess(ssl=(self.scheme.lower() == 'https'))
        p.write_conf(
            extra='test_case_name: "test_SIGTERM"')
        p.start(imports='cherrypy.test._test_states_demo')
        # Send a SIGTERM
        os.kill(p.get_pid(), signal.SIGTERM)
        # This might hang if things aren't working right, but meh.
        p.join()

        if os.name == 'posix':
            # Spawn a daemonized process and test again.
            p = helper.CPProcess(ssl=(self.scheme.lower() == 'https'),
                                 wait=True, daemonize=True)
            p.write_conf(
                extra='test_case_name: "test_SIGTERM_2"')
            p.start(imports='cherrypy.test._test_states_demo')
            # Send a SIGTERM
            os.kill(p.get_pid(), signal.SIGTERM)
            # This might hang if things aren't working right, but meh.
            p.join()

    # NOTE(review): the ``def`` line for this method was missing from the
    # extracted listing; the name is taken from the test_case_name string
    # in its body -- confirm against upstream.
    def test_signal_handler_unsubscribe(self):
        """With ``unsubsig`` set, the pre-existing SIGTERM handler runs."""
        self._require_signal_and_kill('SIGTERM')

        # Although Windows has `os.kill` and SIGTERM is defined, the
        # platform does not implement signals and sending SIGTERM
        # will result in a forced termination of the process.
        # Therefore, this test is not suitable for Windows.
        if os.name == 'nt':
            self.skip("SIGTERM not available")

        # Spawn a normal, undaemonized process.
        # NOTE(review): the listing lost the literal's original whitespace;
        # continuation lines are kept flush-left on the assumption that the
        # text is written into a config file -- confirm against upstream.
        p = helper.CPProcess(ssl=(self.scheme.lower() == 'https'))
        p.write_conf(
            extra="""unsubsig: True
test_case_name: "test_signal_handler_unsubscribe"
""")
        p.start(imports='cherrypy.test._test_states_demo')
        # Ask the process to quit
        os.kill(p.get_pid(), signal.SIGTERM)
        # This might hang if things aren't working right, but meh.
        p.join()

        # Assert the old handler ran. Read the log inside a ``with`` so the
        # file handle is closed even when the index lookup fails.
        with open(p.error_log, 'rb') as error_log:
            target_line = error_log.readlines()[-10]
        if ntob("I am an old SIGTERM handler.") not in target_line:
            self.fail("Old SIGTERM handler did not run.\n%r" % target_line)
452 453
class WaitTests(unittest.TestCase):
    """Checks for ``cherrypy.process.servers.wait_for_occupied_port``."""

    # NOTE(review): the ``def`` line for this method was missing from the
    # extracted listing and its name does not appear elsewhere in the file;
    # the name below is reconstructed from the function under test --
    # confirm against upstream.
    def test_wait_for_occupied_port_INADDR_ANY(self):
        """
        Wait on INADDR_ANY should not raise IOError

        In cases where the loopback interface does not exist, CherryPy cannot
        effectively determine if a port binding to INADDR_ANY was effected.
        In this situation, CherryPy should assume that it failed to detect
        the binding (not that the binding failed) and only warn that it could
        not verify it.
        """
        # At such a time that CherryPy can reliably determine one or more
        # viable IP addresses of the host, this test may be removed.

        # Simulate the behavior we observe when no loopback interface is
        # present by: finding a port that's not occupied, then wait on it.

        free_port = self.find_free_port()

        servers = cherrypy.process.servers

        def with_shorter_timeouts(func):
            """
            A context where occupied_port_timeout is much smaller to speed
            test runs.
            """
            orig_timeout = servers.occupied_port_timeout
            servers.occupied_port_timeout = .07
            try:
                func()
            finally:
                # Always restore the module-level timeout for other tests.
                servers.occupied_port_timeout = orig_timeout

        def do_waiting():
            # Wait on the free port that's unbound
            with warnings.catch_warnings(record=True) as w:
                # Make sure the warning is recorded even if an identical
                # one was already emitted earlier in this process.
                warnings.simplefilter('always')
                servers.wait_for_occupied_port('0.0.0.0', free_port)
                self.assertEqual(len(w), 1)
                self.assertTrue(isinstance(w[0], warnings.WarningMessage))
                self.assertTrue(
                    'Unable to verify that the server is bound on ' in str(w[0]))

            # The wait should still raise an IO error if INADDR_ANY was
            # not supplied.
            self.assertRaises(IOError, servers.wait_for_occupied_port,
                              '127.0.0.1', free_port)

        with_shorter_timeouts(do_waiting)

    def find_free_port(self):
        "Find a free port by binding to port 0 then unbinding."
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.bind(('', 0))
            return sock.getsockname()[1]
        finally:
            # Release the socket even if bind()/getsockname() raised.
            sock.close()
512