1 """A library of helper functions for the CherryPy test suite."""
2
3 import datetime
4 import logging
5 log = logging.getLogger(__name__)
6 import os
7 thisdir = os.path.abspath(os.path.dirname(__file__))
8 serverpem = os.path.join(os.getcwd(), thisdir, 'test.pem')
9
10 import re
11 import sys
12 import time
13 import warnings
14
15 import cherrypy
16 from cherrypy._cpcompat import basestring, copyitems, HTTPSConnection, ntob
17 from cherrypy.lib import httputil
18 from cherrypy.lib import gctools
19 from cherrypy.lib.reprconf import unrepr
20 from cherrypy.test import webtest
21
22
23 if sys.version_info < (2, 7):
24 import cherrypy._cpcompat_subprocess as subprocess
25 else:
26 import subprocess
27
28 import nose
29
30 _testconfig = None
31
32
34 global _testconfig
35 if _testconfig is None:
36 conf = {
37 'scheme': 'http',
38 'protocol': "HTTP/1.1",
39 'port': 54583,
40 'host': '127.0.0.1',
41 'validate': False,
42 'conquer': False,
43 'server': 'wsgi',
44 }
45 try:
46 import testconfig
47 _conf = testconfig.config.get('supervisor', None)
48 if _conf is not None:
49 for k, v in _conf.items():
50 if isinstance(v, basestring):
51 _conf[k] = unrepr(v)
52 conf.update(_conf)
53 except ImportError:
54 pass
55 _testconfig = conf
56 conf = _testconfig.copy()
57 conf.update(overconf)
58
59 return conf
60
61
63
64 """Base class for modeling and controlling servers during testing."""
65
67 for k, v in kwargs.items():
68 if k == 'port':
69 setattr(self, k, int(v))
70 setattr(self, k, v)
71
72
def log_to_stderr(msg, level):
    """Write ``msg`` followed by ``os.linesep`` to ``sys.stderr``.

    ``level`` is accepted but not used.  The value returned by
    ``sys.stderr.write`` is passed back to the caller.
    """
    return sys.stderr.write(msg + os.linesep)
74
75
77
78 """Base class for modeling/controlling servers which run in the same
79 process.
80
81 When the server side runs in a different process, start/stop can dump all
82 state between each test module easily. When the server side runs in the
83 same process as the client, however, we have to do a bit more work to
84 ensure config and mounted apps are reset between tests.
85 """
86
87 using_apache = False
88 using_wsgi = False
89
104
105
106 - def start(self, modulename=None):
116
118 """Tell the server about any apps which the setup functions mounted."""
119 pass
120
131
132
134
135 """Server supervisor for the builtin HTTP server."""
136
137 httpserver_class = "cherrypy._cpnative_server.CPHTTPServer"
138 using_apache = False
139 using_wsgi = False
140
142 return "Builtin HTTP Server on %s:%s" % (self.host, self.port)
143
144
146
147 """Server supervisor for the builtin WSGI server."""
148
149 httpserver_class = "cherrypy._cpwsgi_server.CPWSGIServer"
150 using_apache = False
151 using_wsgi = True
152
154 return "Builtin WSGI Server on %s:%s" % (self.host, self.port)
155
157 """Hook a new WSGI app into the origin server."""
158 cherrypy.server.httpserver.wsgi_app = self.get_app()
159
161 """Obtain a new (decorated) WSGI app to hook into the origin server."""
162 if app is None:
163 app = cherrypy.tree
164
165 if self.conquer:
166 try:
167 import wsgiconq
168 except ImportError:
169 warnings.warn(
170 "Error importing wsgiconq. pyconquer will not run.")
171 else:
172 app = wsgiconq.WSGILogger(app, c_calls=True)
173
174 if self.validate:
175 try:
176 from wsgiref import validate
177 except ImportError:
178 warnings.warn(
179 "Error importing wsgiref. The validator will not run.")
180 else:
181
182 app = validate.validator(app)
183
184 return app
185
186
192
193
200
201
205
206
210
211
215
216
220
221
223
224 script_name = ""
225 scheme = "http"
226
227 available_servers = {'wsgi': LocalWSGISupervisor,
228 'wsgi_u': get_wsgi_u_supervisor,
229 'native': NativeServerSupervisor,
230 'cpmodpy': get_cpmodpy_supervisor,
231 'modpygw': get_modpygw_supervisor,
232 'modwsgi': get_modwsgi_supervisor,
233 'modfcgid': get_modfcgid_supervisor,
234 'modfastcgi': get_modfastcgi_supervisor,
235 }
236 default_server = "wsgi"
237
239 v = sys.version.split()[0]
240 log.info("Python version used to run this test script: %s" % v)
241 log.info("CherryPy version: %s" % cherrypy.__version__)
242 if supervisor.scheme == "https":
243 ssl = " (ssl)"
244 else:
245 ssl = ""
246 log.info("HTTP server version: %s%s" % (supervisor.protocol, ssl))
247 log.info("PID: %s" % os.getpid())
248
249 cherrypy.server.using_apache = supervisor.using_apache
250 cherrypy.server.using_wsgi = supervisor.using_wsgi
251
252 if sys.platform[:4] == 'java':
253 cherrypy.config.update({'server.nodelay': False})
254
255 if isinstance(conf, basestring):
256 parser = cherrypy.lib.reprconf.Parser()
257 conf = parser.dict_from_file(conf).get('global', {})
258 else:
259 conf = conf or {}
260 baseconf = conf.copy()
261 baseconf.update({'server.socket_host': supervisor.host,
262 'server.socket_port': supervisor.port,
263 'server.protocol_version': supervisor.protocol,
264 'environment': "test_suite",
265 })
266 if supervisor.scheme == "https":
267
268 baseconf['server.ssl_certificate'] = serverpem
269 baseconf['server.ssl_private_key'] = serverpem
270
271
272
273
274
275
276
277 if supervisor.scheme == "https":
278 webtest.WebCase.HTTP_CONN = HTTPSConnection
279 return baseconf
280 _setup_server = classmethod(_setup_server)
281
311 setup_class = classmethod(setup_class)
312
314 ''
315 if hasattr(cls, 'setup_server'):
316 cls.supervisor.stop()
317 teardown_class = classmethod(teardown_class)
318
319 do_gc_test = False
320
325
326
327 test_gc.compat_co_firstlineno = getattr(
328 sys, 'maxint', None) or float('inf')
329
332
342
345
def getPage(self, url, headers=None, method="GET", body=None,
            protocol=None):
    """Open the url, resolving it against ``self.script_name`` first.

    When ``self.script_name`` is non-empty, ``url`` is joined onto it
    with ``httputil.urljoin``.  The request itself is delegated to
    ``webtest.WebCase.getPage`` and its result is returned unchanged.
    """
    target = url
    if self.script_name:
        target = httputil.urljoin(self.script_name, url)
    return webtest.WebCase.getPage(
        self, target, headers, method, body, protocol)
353
def skip(self, msg='skipped '):
    """Skip the running test by raising ``nose.SkipTest`` carrying ``msg``."""
    skip_signal = nose.SkipTest(msg)
    raise skip_signal
356
def assertErrorPage(self, status, message=None, pattern=''):
    """Check that the response body is the built-in error page.

    The expected page is rendered with
    ``cherrypy._cperror.get_error_page(status, message=message)`` and
    matched against ``self.body``; the ``<pre id="traceback">`` element
    is allowed to hold arbitrary text, which is captured.

    ``pattern`` is then applied to the captured traceback text:
    ``None`` requires the traceback to be empty, any other value is
    escaped with ``re.escape`` and searched for literally.  Every
    failure is reported through ``self._handlewebError``.
    """
    expected = cherrypy._cperror.get_error_page(status, message=message)

    def _quoted(fragment):
        # Byte-encode the fragment, then make it regex-safe.
        return re.escape(ntob(fragment))

    # Swap the empty traceback element for one with a match-all group so
    # the body comparison ignores (but captures) the traceback text.
    empty_tb = _quoted('<pre id="traceback"></pre>')
    capturing_tb = (_quoted('<pre id="traceback">') + ntob('(.*)') +
                    _quoted('</pre>'))
    body_regex = re.escape(expected).replace(empty_tb, capturing_tb)

    found = re.match(body_regex, self.body, re.DOTALL)
    if found is None:
        self._handlewebError(
            'Error page does not match; expected:\n' + expected)
        return

    if pattern is None:
        # None means the page must not carry any traceback text.
        if found.group(1):
            self._handlewebError('Error page contains traceback')
        return

    wanted = ntob(re.escape(pattern), self.encoding)
    if not re.search(wanted, found.group(1)):
        msg = 'Error page does not contain %s in traceback'
        self._handlewebError(msg % repr(pattern))
391
392 date_tolerance = 2
393
395 """Assert abs(dt1 - dt2) is within Y seconds."""
396 if seconds is None:
397 seconds = self.date_tolerance
398
399 if dt1 > dt2:
400 diff = dt1 - dt2
401 else:
402 diff = dt2 - dt1
403 if not diff < datetime.timedelta(seconds=seconds):
404 raise AssertionError('%r and %r are not within %r seconds.' %
405 (dt1, dt2, seconds))
406
407
414
415
416
417
419
420 pid_file = os.path.join(thisdir, 'test.pid')
421 config_file = os.path.join(thisdir, 'test.conf')
422 config_template = """[global]
423 server.socket_host: '%(host)s'
424 server.socket_port: %(port)s
425 checker.on: False
426 log.screen: False
427 log.error_file: r'%(error_log)s'
428 log.access_file: r'%(access_log)s'
429 %(ssl)s
430 %(extra)s
431 """
432 error_log = os.path.join(thisdir, 'test.error.log')
433 access_log = os.path.join(thisdir, 'test.access.log')
434
435 - def __init__(self, wait=False, daemonize=False, ssl=False,
436 socket_host=None, socket_port=None):
442
444 if self.ssl:
445 serverpem = os.path.join(thisdir, 'test.pem')
446 ssl = """
447 server.ssl_certificate: r'%s'
448 server.ssl_private_key: r'%s'
449 """ % (serverpem, serverpem)
450 else:
451 ssl = ""
452
453 conf = self.config_template % {
454 'host': self.host,
455 'port': self.port,
456 'error_log': self.error_log,
457 'access_log': self.access_log,
458 'ssl': ssl,
459 'extra': extra,
460 }
461 f = open(self.config_file, 'wb')
462 f.write(ntob(conf, 'utf-8'))
463 f.close()
464
def start(self, imports=None):
    """Start cherryd in a subprocess.

    ``imports`` may be a single module name or a list/tuple of names;
    every truthy entry is passed to cherryd with ``-i``.  ``-d`` is
    appended when ``self.daemonize`` is set.

    The child is launched with ``sys.executable`` and stored on
    ``self._proc``.  When ``self.wait`` is truthy the call blocks until
    the child exits and records its status in ``self.exit_code``;
    otherwise it blocks until ``self.host``/``self.port`` is occupied.
    """
    cherrypy._cpserver.wait_for_free_port(self.host, self.port)

    command = ['/usr/sbin/cherryd']
    command.extend(('-c', self.config_file))
    command.extend(('-p', self.pid_file))

    modules = imports if isinstance(imports, (list, tuple)) else [imports]
    for module_name in modules:
        if module_name:
            command.extend(('-i', module_name))

    if self.daemonize:
        command.append('-d')

    child_env = os.environ.copy()
    # Put the directory two levels above this module first on
    # PYTHONPATH (presumably so the child imports this same cherrypy
    # package -- confirm against the package layout).
    package_root = os.path.abspath(os.path.join(thisdir, '..', '..'))
    inherited_path = child_env.get('PYTHONPATH', '')
    if inherited_path:
        child_env['PYTHONPATH'] = os.pathsep.join(
            (package_root, inherited_path))
    else:
        child_env['PYTHONPATH'] = package_root

    self._proc = subprocess.Popen([sys.executable] + command, env=child_env)
    if self.wait:
        self.exit_code = self._proc.wait()
    else:
        cherrypy._cpserver.wait_for_occupied_port(self.host, self.port)

    # Fixed pause after launch; a daemonized child gets the longer one.
    time.sleep(2 if self.daemonize else 1)
506
508 if self.daemonize:
509 return int(open(self.pid_file, 'rb').read())
510 return self._proc.pid
511
513 """Wait for the process to exit."""
514 if self.daemonize:
515 return self._join_daemon()
516 self._proc.wait()
517
519 try:
520 try:
521
522 os.wait()
523 except AttributeError:
524
525 try:
526 pid = self.get_pid()
527 except IOError:
528
529 pass
530 else:
531 os.waitpid(pid, 0)
532 except OSError:
533 x = sys.exc_info()[1]
534 if x.args != (10, 'No child processes'):
535 raise
536