Source code for httpsrv

'''
Httpsrv is a simple HTTP server for API mocking during automated testing
'''
import json

from threading import Thread
from http.server import HTTPServer, BaseHTTPRequestHandler


[docs]class PendingRequestsLeftException(Exception): ''' Raises when server has pending reques expectations by calling the :func:`Server.assert_no_pending` method ''' pass
class _Expectation: def __init__(self, method, path, headers, text, json): self.method = method self.path = path self.headers = headers or {} self.bytes = text.encode('utf-8') if text else None self.json = json def matches(self, method, path, headers, bytes): return (self.method == method and self._match_path(path) and self._match_headers(headers) and self._match_body(bytes)) def _match_path(self, path): return self.path == path if self.path else True def _match_headers(self, headers): for name, value in self.headers.items(): if not (name in headers and value == headers[name]): return False return True def _match_body(self, bytes): if not self.json: return bytes == self.bytes if self.bytes else True try: parsed = json.loads(bytes.decode('utf8')) return self.json == parsed except ValueError: return False class _Response: def __init__(self, code=200, headers=None, bytes=None): self.code = code self.headers = headers or {} self.bytes = bytes
[docs]class Rule: ''' Expectation rule — defines expected request parameters and response values :type method: str :param method: expected request method: ``'GET'``, ``'POST'``, etc. Can take any custom string :type path: str :param path: expected path including query parameters, e.g. ``'/users?name=John%20Doe'`` if ommited any path will do :type headers: dict :param headers: dictionary of expected request headers :type text: str :param text: expected request body text :type json: dict :param json: request json to expect. If ommited any json will match, if present text param will be ignored ''' def __init__(self, method, path, headers, text, json): self._expectation = _Expectation(method, path, headers, text, json) self.response = None
[docs] def status(self, status, headers=None): ''' Respond with given status and no content :type status: int :param status: status code to return :type headers: dict :param headers: dictionary of headers to add to response :returns: itself :rtype: Rule ''' self.response = _Response(status, headers) return self
[docs] def text(self, text, status=200, headers=None): ''' Respond with given status and text content :type text: str :param text: text to return :type status: int :param status: status code to return :type headers: dict :param headers: dictionary of headers to add to response :returns: itself :rtype: Rule ''' self.response = _Response(status, headers, text.encode('utf8')) return self
[docs] def json(self, json_doc, status=200, headers=None): ''' Respond with given status and JSON content. Will also set ``'Content-Type'`` to ``'applicaion/json'`` if header is not specified explicitly :type json_doc: dict :param json_doc: dictionary to respond with converting to JSON string :type status: int :param status: status code to return :type headers: dict :param headers: dictionary of headers to add to response ''' headers = headers or {} if 'content-type' not in headers: headers['content-type'] = 'application/json' return self.text(json.dumps(json_doc), status, headers)
[docs] def matches(self, method, path, headers, bytes=None): ''' Checks if rule matches given request parameters :type method: str :param method: HTTP method, e.g. ``'GET'``, ``'POST'``, etc. Can take any custom string :type path: str :param path: request path including query parameters, e.g. ``'/users?name=John%20Doe'`` :type bytes: bytes :param bytes: request body :returns: ``True`` if this rule matches given params :rtype: bool ''' return self._expectation.matches(method, path, headers, bytes)
@property def method(self): ''' Method name this rule will respond to :returns: epected method name :rtype: str ''' return self._expectation.method
[docs]class Server: ''' Tunable HTTP server running in a parallel thread. Please note that `this server is not thread-safe` which should not cause any troubles in common use-cases due to python single-threaded nature. :type port: int :param port: port this server will listen to after :func:`Server.start` is called ''' def __init__(self, port): self._port = port self._rules = [] self._always_rules = [] self._thread = None self._server = None self._handler = None self.running = False
[docs] def always(self, method, path=None, headers=None, text=None, json=None): ''' Sends response every time matching parameters are found util :func:`Server.reset` is called :type method: str :param method: request method: ``'GET'``, ``'POST'``, etc. can be some custom string :type path: str :param path: request path including query parameters :type headers: dict :param headers: dictionary of headers to expect. If omitted any headers will do :type text: str :param text: request text to expect. If ommited any text will match :type json: dict :param json: request json to expect. If ommited any json will match, if present text param will be ignored :rtype: Rule :returns: newly created expectation rule ''' rule = Rule(method, path, headers, text, json) return self._add_rule_to(rule, self._always_rules)
# pylint: disable=invalid-name
[docs] def on(self, method, path=None, headers=None, text=None, json=None): ''' Sends response to matching parameters one time and removes it from list of expectations :type method: str :param method: request method: ``'GET'``, ``'POST'``, etc. can be some custom string :type path: str :param path: request path including query parameters :type headers: dict :param headers: dictionary of headers to expect. If omitted any headers will do :type text: str :param text: request text to expect. If ommited any text will match :type json: dict :param json: request json to expect. If ommited any json will match, if present text param will be ignored :rtype: Rule :returns: newly created expectation rule ''' rule = Rule(method, path, headers, text, json) return self._add_rule_to(rule, self._rules)
# pylint: enable=invalid-name def _add_rule_to(self, rule, rules): rules.append(rule) if rule.method not in self._handler.known_methods: self._handler.add_method(rule.method) return rule
[docs] def start(self): ''' Starts a server on the port provided in the :class:`Server` constructor in a separate thread :rtype: Server :returns: server instance for chaining ''' self._handler = _create_handler_class(self._rules, self._always_rules) self._server = HTTPServer(('', self._port), self._handler) self._thread = Thread(target=self._server.serve_forever, daemon=True) self._thread.start() self.running = True return self
[docs] def stop(self): ''' Shuts the server down and waits for server thread to join ''' self._server.shutdown() self._server.server_close() self._thread.join() self.running = False
[docs] def reset(self): ''' Clears the server expectations. Useful for resetting the server to its default state in ``teardDown()`` test method instead of time-consuming restart procedure ''' self._rules.clear() self._always_rules.clear()
[docs] def assert_no_pending(self, target_rule=None): ''' Raises a :class:`PendingRequestsLeftException` error if server has target rule non-resolved. When target_rule argument is ommitted raises if server has any pending expectations. Useful in ``tearDown()`` test method to verify that test had correct expectations :type target_rule: Rule :param target_rule: will raise if this rule is left pending :raises: :class:`PendingRequestsLeftException` ''' if target_rule: if target_rule in self._rules: raise PendingRequestsLeftException() elif self._rules: raise PendingRequestsLeftException()
def _create_handler_class(rules, always_rules): class _Handler(BaseHTTPRequestHandler): known_methods = set() @classmethod def add_method(cls, method): ''' Adds a handler function for HTTP method provided ''' if method in cls.known_methods: return func = lambda self: cls._handle(self, method) setattr(cls, 'do_' + method, func) cls.known_methods.add(method) def _read_body(self): if 'content-length' in self.headers: length = int(self.headers['content-length']) return self.rfile.read(length) if length > 0 else None return None def _respond(self, response): self.send_response(response.code) for key, value in response.headers.items(): self.send_header(key, value) self.end_headers() if response.bytes: self.wfile.write(response.bytes) def _handle(self, method): body = self._read_body() rule = self._respond_with_rules(method, body, rules) if rule: rules.remove(rule) return always_rule = self._respond_with_rules(method, body, always_rules) if always_rule: return return self.send_error( 500, 'No matching rule found for ' + self.requestline + ' body ' + str(body)) def _respond_with_rules(self, method, body, rules): matching_rules = [r for r in rules if r.matches(method, self.path, dict(self.headers), body)] if matching_rules: rule = matching_rules[0] self._respond(rule.response) return rule return None for rule in rules: _Handler.add_method(rule.method) return _Handler