#! /usr/bin/env nix-shell
#! nix-shell -i python3.7 -p python37 python37Packages.aiohttp python37Packages.requests python37Packages.click
#
# TODO: maybe add an option (cli) to supply the external address
#
# Copyright 2019 Banco Bilbao Vizcaya Argentaria, S.A.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from collections import namedtuple
from urllib.parse import urlparse
from uuid import uuid4
import asyncio
import binascii
import contextlib
import datetime
import io
import ipaddress
import json
import logging
import os
import shlex
import ssl
import sys
import tempfile
import uuid

from aiohttp import web, StreamReader
from aiohttp.web_urldispatcher import UrlDispatcher
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography import x509
from cryptography.x509.oid import NameOID
import click
import requests


log = logging.getLogger('kapow')

# A dedicated event loop is created at import time; the `server` command
# drives it and `start_background_tasks` schedules the init scripts on it.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# Defaults; KAPOW_CONTROL_URL is overwritten by `start_kapow_server` with the
# externally reachable control address.
KAPOW_CONTROL_URL = "https://localhost:8081"
KAPOW_DATA_URL = "http://localhost:8082"


########################################################################
#                           HTTPS Management                           #
########################################################################

def generate_ssl_cert(name, alt=None):
    """
    Generate a self-signed certificate and its RSA private key.

    :param name: Common Name used for both subject and issuer.
    :param alt: Optional subject alternative name.  When given, it is
        added as an IP address SAN if it parses as one and as a DNS name
        SAN otherwise, and the certificate is marked for server
        authentication.  When omitted, the certificate is marked for
        client authentication and carries no SAN.
    :return: ``(key_bytes, crt_bytes)``, both PEM encoded; the key is
        written unencrypted.
    """
    # The backend is passed explicitly because older `cryptography` releases
    # require it; newer ones accept and ignore it.
    backend = default_backend()

    # Generate our key
    key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048,
        backend=backend,
    )

    # Various details about who we are. For a self-signed certificate the
    # subject and issuer are always the same.
    subject = issuer = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, name),
    ])

    now = datetime.datetime.utcnow()
    builder = (
        x509.CertificateBuilder()
        .subject_name(subject)
        .issuer_name(issuer)
        .public_key(key.public_key())
        .serial_number(x509.random_serial_number())
        .not_valid_before(now)
        .not_valid_after(now + datetime.timedelta(days=3650))
    )

    if alt is not None:
        try:
            general_name = x509.IPAddress(ipaddress.ip_address(alt))
        except ValueError:
            # Not an IP literal: treat it as a host name.
            general_name = x509.DNSName(alt)
        builder = builder.add_extension(
            x509.SubjectAlternativeName([general_name]),
            critical=True,
        )
        usage = x509.oid.ExtendedKeyUsageOID.SERVER_AUTH
    else:
        usage = x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH

    builder = builder.add_extension(
        x509.ExtendedKeyUsage([usage]),
        critical=True,
    )

    cert = builder.sign(key, hashes.SHA256(), backend)

    key_bytes = key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption(),
    )
    crt_bytes = cert.public_bytes(serialization.Encoding.PEM)

    return (key_bytes, crt_bytes)


########################################################################
#                         Resource Management                          #
########################################################################

# Live connections, keyed by the handler id exported to the route command.
CONNECTIONS = {}


class Connection:
    """
    Manages the lifecycle of a Kapow! connection.

    Behaves like a memory for the "fields" available in HTTP
    connections: request fields are read from the wrapped aiohttp
    request, response fields are accumulated until `build_response`.
    """

    def __init__(self, request):
        self._stream = None          # StreamResponse once streaming started
        self._body = io.BytesIO()    # buffered (non-streamed) response body
        self._status = 200
        self._headers = dict()
        self._cookies = dict()

        self.request = request

    async def get(self, key):
        """
        Get the content of the field `key`.

        :return: ``bytes``, or the request's content stream for
            ``request/body``.
        :raises ValueError: if the path is unknown or incomplete.
        :raises KeyError: if the requested item does not exist.
        """
        res = urlparse(key)
        parts = res.path.split('/')

        def nth(n):
            """Return the nth element in a path."""
            try:
                return parts[n]
            except IndexError:
                # Report a truncated path as a bad path instead of letting
                # an IndexError escape to the caller.
                raise ValueError(f'Incomplete path {res.path!r}') from None

        if res.path == 'request/method':
            return self.request.method.encode('utf-8')
        elif res.path == 'request/body':
            return self.request.content
        elif res.path == 'request/path':
            return self.request.path.encode('utf-8')
        elif res.path == 'request/host':
            return self.request.host.encode('utf-8')
        elif res.path.startswith('request/matches/'):
            return self.request.match_info[nth(2)].encode('utf-8')
        elif res.path.startswith('request/params/'):
            return self.request.rel_url.query[nth(2)].encode('utf-8')
        elif res.path.startswith('request/headers/'):
            return self.request.headers[nth(2)].encode('utf-8')
        elif res.path.startswith('request/cookies/'):
            return self.request.cookies[nth(2)].encode('utf-8')
        elif res.path == 'request/form':
            data = await self.request.post()
            names = [fieldname.encode('utf-8') for fieldname in data]
            return b'\n'.join(names)
        elif res.path.startswith('request/form/'):
            return (await self.request.post())[nth(2)].encode('utf-8')
        elif res.path == 'request/files':
            data = await self.request.post()
            # Only fields exposing a `filename` attribute count as files.
            files = [fieldname.encode('utf-8')
                     for fieldname, field in data.items()
                     if hasattr(field, 'filename')]
            return b'\n'.join(files)
        elif res.path.startswith('request/files/'):
            name = nth(2)
            content = nth(3)  # filename / content
            field = (await self.request.post())[name]
            if content == 'filename':
                try:
                    return field.filename.encode('utf-8')
                except Exception:
                    # Best effort: a field without a filename yields nothing.
                    return b''
            elif content == 'content':
                try:
                    return field.file.read()
                except Exception:
                    # Best effort: a field without a file yields nothing.
                    return b''
            else:
                raise ValueError(f'Unknown content type {content!r}')
        else:
            raise ValueError('Unknown path')

    async def set(self, key, content):
        """
        Set the field `key` with the value in `content`.

        :param content: stream providing the new value (``read`` /
            ``readany`` are awaited on it).
        :raises ValueError: if the path is unknown or the value cannot be
            decoded/parsed.
        """
        res = urlparse(key)

        def nth(n):
            return res.path.split('/')[n]

        if res.path == 'response/status':
            self._status = int((await content.read()).decode('utf-8'))
        elif res.path == 'response/body':
            self._body.write(await content.read())
        elif res.path.startswith('response/headers/'):
            clean = (await content.read()).rstrip(b'\n').decode('utf-8')
            self._headers[nth(2)] = clean
        elif res.path.startswith('response/cookies/'):
            clean = (await content.read()).rstrip(b'\n').decode('utf-8')
            self._cookies[nth(2)] = clean
        elif res.path == 'response/stream':
            if self._stream is None:
                # Status, headers and cookies are frozen at the first write.
                # No explicit reason is given so aiohttp derives the phrase
                # matching the status instead of always sending "OK".
                self._stream = web.StreamResponse(status=self._status,
                                                  headers=self._headers)
                for name, value in self._cookies.items():
                    self._stream.set_cookie(name, value)
                await self._stream.prepare(self.request)

            chunk = await content.readany()
            while chunk:
                await self._stream.write(chunk)
                chunk = await content.readany()
        else:
            raise ValueError(f'Unknown path {res.path!r}')

    async def append(self, key, content):
        """Append to field `key` the value in `content`."""
        raise NotImplementedError()

    async def build_response(self):
        """Return the appropriate aiohttp.web.*Response."""
        if self._stream is None:
            response = web.Response(body=self._body.getvalue(),
                                    status=self._status,
                                    headers=self._headers)
            for name, value in self._cookies.items():
                response.set_cookie(name, value)
            return response
        else:
            # Streaming already started: just terminate the stream.
            await self._stream.write_eof()
            return self._stream


async def get_field(request):
    """
    Get the value of some HTTP field in the given connection.

    Responds 404 for an unknown handler id or missing item, 400 for an
    invalid resource path, and otherwise the field content (streamed when
    the field is the request body stream).
    """
    handler_id = request.match_info["id"]
    field = request.match_info["field"]

    try:
        connection = CONNECTIONS[handler_id]
    except KeyError:
        return web.json_response(data=error_body("Handler ID Not Found"),
                                 status=404, reason="Not Found")

    try:
        content = await connection.get(field)
    except ValueError:
        return web.json_response(data=error_body("Invalid Resource Path"),
                                 status=400, reason="Bad Request")
    except KeyError:
        return web.json_response(data=error_body("Resource Item Not Found"),
                                 status=404, reason="Not Found")

    if isinstance(content, StreamReader):
        response = web.StreamResponse(status=200, reason="OK")
        await response.prepare(request)

        chunk = await content.readany()
        while chunk:
            await response.write(chunk)
            chunk = await content.readany()

        await response.write_eof()
    else:
        response = web.Response(body=content)

    return response


async def set_field(request):
    """
    Set the value of some HTTP field in the given connection.

    Responds 404 for an unknown handler id, 400 when the connection
    rejects the path or value, and an empty 200 otherwise.
    """
    handler_id = request.match_info["id"]
    field = request.match_info["field"]

    try:
        connection = CONNECTIONS[handler_id]
    except KeyError:
        return web.json_response(data=error_body("Handler ID Not Found"),
                                 status=404, reason="Not Found")

    try:
        await connection.set(field, request.content)
    except ValueError:
        # Raised by Connection.set for unknown paths or unparsable values.
        return web.json_response(data=error_body("Invalid Resource Path"),
                                 status=400, reason="Bad Request")
    except ConnectionResetError:
        # Raised when trying to write to an already-closed stream.
        request.transport.close()

    # Always hand aiohttp a response object, including after the transport
    # was closed above, so the handler never fails on an unbound name.
    return web.Response(body=b'')


async def append_field(request):
    """Placeholder for appending to a field; not implemented.

    It is not registered on the data application built in
    `start_kapow_server`.
    """
    pass


########################################################################
#                          Endpoint Execution                          #
########################################################################

def handle_route(entrypoint, command):
    """
    Return an aiohttp route handler that will execute entrypoint and
    command in order to manage a Kapow! route.

    The handler registers a `Connection` in `CONNECTIONS`, runs
    ``entrypoint command`` through the shell with ``KAPOW_DATA_URL`` and
    ``KAPOW_HANDLER_ID`` in its environment, waits for it to exit, and
    answers with the response the command built.
    """
    async def _handle(request):
        # Register a new connection to Kapow!
        handler_id = "CONN_" + str(uuid4()).replace('-', '_')
        connection = CONNECTIONS[handler_id] = Connection(request)

        try:
            # Run entrypoint + command passing the connection id
            executable, *params = shlex.split(entrypoint)
            args = ' '.join([executable]
                            + [shlex.quote(token) for token in params]
                            + [shlex.quote(command)])

            shell_task = await asyncio.create_subprocess_shell(
                args,
                env={**os.environ,
                     "KAPOW_DATA_URL": KAPOW_DATA_URL,
                     "KAPOW_HANDLER_ID": handler_id},
                stdin=asyncio.subprocess.DEVNULL)

            await shell_task.wait()

            # Respond when the command finish
            return await connection.build_response()
        finally:
            # The connection must not outlive the request, whatever happens.
            del CONNECTIONS[handler_id]

    return _handle


########################################################################
#                           Route Management                           #
########################################################################

def error_body(reason):
    """Return the JSON-serializable body used for error responses."""
    return {"reason": reason}


def _route_from_json(content):
    """
    Build a `KapowRoute` from a decoded JSON request body.

    ``method``, ``entrypoint`` and ``command`` are optional; a missing
    ``url_pattern`` raises `KeyError`.  A body that is not a JSON object
    raises `AttributeError` or `TypeError`.
    """
    method = content.get("method", "GET")
    entrypoint = content.get("entrypoint", "/bin/sh -c")
    command = content.get("command", "")
    return KapowRoute(method=method,
                      path=content["url_pattern"],
                      id="ROUTE_" + str(uuid4()).replace('-', '_'),
                      entrypoint=entrypoint,
                      command=command,
                      handler=handle_route(entrypoint, command))


def get_routes(app):
    """Return a handler listing the routes stored in ``app["user_routes"]``."""
    async def _get_routes(request):
        """Return the list of registered routes."""
        data = [{"index": idx,
                 "method": r.method,
                 "id": r.id,
                 "url_pattern": r.path,
                 "entrypoint": r.entrypoint,
                 "command": r.command}
                for idx, r in enumerate(app["user_routes"])]
        return web.json_response(data)
    return _get_routes


def get_route(app):
    """Return a handler describing a single route looked up by id."""
    async def _get_route(request):
        """Return requested registered route."""
        route_id = request.match_info["id"]
        for idx, r in enumerate(app["user_routes"]):
            if r.id == route_id:
                return web.json_response({"index": idx,
                                          "method": r.method,
                                          "id": r.id,
                                          "url_pattern": r.path,
                                          "entrypoint": r.entrypoint,
                                          "command": r.command})
        # No route matched (also covers an empty route list).
        return web.json_response(data=error_body("Route Not Found"),
                                 status=404, reason="Not Found")
    return _get_route


def insert_route(app):
    """Return a handler inserting a route at a caller-supplied index."""
    async def _insert_route(request):
        """Insert a new Kapow! route."""
        try:
            content = await request.json()
        except ValueError:
            return web.json_response(data=error_body("Malformed JSON"),
                                     status=400, reason="Bad Request")

        try:
            index = int(content["index"])
            # Explicit check instead of `assert`, which is stripped by -O.
            if index < 0:
                raise ValueError("index must be non-negative")
            route = _route_from_json(content)
            app.change_routes((app["user_routes"][:index]
                               + [route]
                               + app["user_routes"][index:]))
        except (InvalidRouteError, KeyError, ValueError,
                TypeError, AttributeError):
            # TypeError/AttributeError: body is valid JSON but not an object
            # (or carries a non-numeric index such as null).
            return web.json_response(data=error_body("Invalid Route"),
                                     status=422,
                                     reason="Unprocessable Entity")

        # The router accepted the new table; record it.
        app["user_routes"].insert(index, route)
        return web.json_response({"id": route.id,
                                  "method": route.method,
                                  "url_pattern": route.path,
                                  "entrypoint": route.entrypoint,
                                  "command": route.command,
                                  "index": index}, status=201)
    return _insert_route


def append_route(app):
    """Return a handler appending a route after the existing ones."""
    async def _append_route(request):
        """Append a new Kapow! route."""
        try:
            content = await request.json()
        except ValueError:
            return web.json_response(data=error_body("Malformed JSON"),
                                     status=400, reason="Bad Request")

        try:
            route = _route_from_json(content)
            app.change_routes(app["user_routes"] + [route])
        except (InvalidRouteError, KeyError, TypeError, AttributeError):
            # TypeError/AttributeError: body is valid JSON but not an object.
            return web.json_response(data=error_body("Invalid Route"),
                                     status=422,
                                     reason="Unprocessable Entity")

        # The router accepted the new table; record it.
        app["user_routes"].append(route)
        return web.json_response({"id": route.id,
                                  "method": route.method,
                                  "url_pattern": route.path,
                                  "entrypoint": route.entrypoint,
                                  "command": route.command,
                                  "index": len(app["user_routes"]) - 1},
                                 status=201)
    return _append_route


def delete_route(app):
    """Return a handler removing the route with the requested id."""
    async def _delete_route(request):
        """Delete the given Kapow! route."""
        route_id = request.match_info["id"]
        routes = [r for r in app["user_routes"] if r.id != route_id]
        if len(routes) == len(app["user_routes"]):
            # Nothing was filtered out, so the id is unknown.
            return web.json_response(data=error_body("Route Not Found"),
                                     status=404, reason="Not Found")
        else:
            app.change_routes(routes)
            app["user_routes"] = routes
            return web.Response(status=204, reason="No Content")
    return _delete_route


########################################################################
#                            aiohttp webapp                            #
########################################################################

async def report_result(proc):
    """Wait for `proc` to finish and print its exit code."""
    await proc.communicate()
    print(f"Process exited with code {proc.returncode}")


async def run_init_script(app, scripts, interactive):
    """
    Run the init script if given, then wait for the shell to finish.

    Each script is started with the control API credentials and URL in
    its environment; failures to start are printed and do not stop the
    remaining scripts.  `interactive` is accepted but not used here.
    """
    for script in scripts:
        try:
            result = await asyncio.create_subprocess_exec(
                script,
                env={**os.environ,
                     "KAPOW_CONTROL_CLIENT_CERT": app["client_cert"],
                     "KAPOW_CONTROL_CLIENT_KEY": app["client_key"],
                     "KAPOW_CONTROL_SERVER_CERT": app["server_cert"],
                     "KAPOW_CONTROL_URL": KAPOW_CONTROL_URL,
                     })
        except Exception as exc:
            print(exc)
        else:
            # Report the exit code in the background; do not block the loop.
            asyncio.create_task(report_result(result))


class InvalidRouteError(Exception):
    """Raised when a route table cannot be registered in the router."""
    pass


class DynamicApplication(web.Application):
    """
    A wrapper around `aiohttp.web.Application` allowing changing routes
    dynamically.

    This is not safe as mentioned here:
    https://github.com/aio-libs/aiohttp/issues/3238.

    On the other hand this is a PoC anyway...
    """
    def change_routes(self, routes):
        """
        Replace the whole routing table with `routes`.

        A fresh router is built first, so the current one stays untouched
        when any route is rejected.

        :raises InvalidRouteError: if a route cannot be added.
        """
        router = UrlDispatcher()
        try:
            for route in routes:
                router.add_route(route.method,
                                 route.path,
                                 route.handler,
                                 name=route.id)
        except Exception as exc:
            raise InvalidRouteError("Invalid route") from exc
        else:
            self._router = router
            if self._frozen:
                self._router.freeze()


KapowRoute = namedtuple('KapowRoute',
                        ('method',
                         'path',
                         'id',
                         'entrypoint',
                         'command',
                         'handler'))


async def start_background_tasks(app):
    """aiohttp startup hook: schedule the init scripts on the module loop."""
    app["debug_tasks"] = loop.create_task(
        run_init_script(app, app["scripts"], app["interactive"]))


def reduce_addr(addr):
    """Drop the port part from an `addr:port` string (IPv6 aware)"""
    addr, *_ = addr.rsplit(':', 1)
    if addr.startswith('[') and addr.endswith(']'):
        return addr[1:-1]
    else:
        return addr


def _split_bind(bind):
    """
    Split an ``addr:port`` bind string into ``(host, port)``.

    Only the last colon separates the port, so bracketed IPv6 literals
    such as ``[::1]:8080`` are handled (brackets are removed).

    :raises ValueError: if the port is missing or not an integer.
    """
    _, _, port = bind.rpartition(':')
    return reduce_addr(bind), int(port)


async def start_kapow_server(user_bind, control_bind, data_bind, scripts,
                             certfile=None, interactive=False, keyfile=None,
                             control_reachable_addr="localhost:8081"):
    """
    Start the three Kapow! HTTP servers: user, control and data.

    :param user_bind: ``addr:port`` for the user-facing routes.
    :param control_bind: ``addr:port`` for the control API (mutual TLS
        with freshly generated self-signed certificates).
    :param data_bind: ``addr:port`` for the data API (plain HTTP).
    :param scripts: init scripts started once the control app starts.
    :param certfile: optional TLS certificate for the user server.
    :param interactive: stored on the control app for the init scripts.
    :param keyfile: optional TLS key for the user server.
    :param control_reachable_addr: ``addr:port`` clients use to reach the
        control API; also used as the server certificate's alternative
        name.
    """
    global KAPOW_CONTROL_URL
    KAPOW_CONTROL_URL = f"https://{control_reachable_addr}"

    #
    # USER
    #
    user_app = DynamicApplication(client_max_size=1024**3)
    user_app["user_routes"] = list()  # [KapowRoute]
    user_runner = web.AppRunner(user_app)
    await user_runner.setup()

    ssl_context = None
    if certfile and keyfile:
        ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        ssl_context.load_cert_chain(certfile, keyfile)

    user_ip, user_port = _split_bind(user_bind)
    user_site = web.TCPSite(user_runner, user_ip, user_port,
                            ssl_context=ssl_context)
    await user_site.start()

    #
    # CONTROL
    #
    alternate_name = reduce_addr(control_reachable_addr)
    srv_key_bytes, srv_crt_bytes = generate_ssl_cert("control", alternate_name)
    cli_key_bytes, cli_crt_bytes = generate_ssl_cert("control")

    context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)

    # The ssl module loads certificates from file paths, so the generated
    # PEM data is written to temporary files for the duration of the load.
    with tempfile.NamedTemporaryFile(suffix=".pem", delete=True) as pem_file, \
            tempfile.NamedTemporaryFile(suffix=".key", delete=True) as key_file, \
            tempfile.NamedTemporaryFile(suffix=".pem", delete=True) as cli_crt_file:
        pem_file.write(srv_crt_bytes)
        pem_file.flush()
        key_file.write(srv_key_bytes)
        key_file.flush()
        cli_crt_file.write(cli_crt_bytes)
        cli_crt_file.flush()

        # Require a client certificate and trust only the generated one.
        context.verify_mode = ssl.CERT_REQUIRED
        context.load_cert_chain(pem_file.name, key_file.name)
        context.load_verify_locations(cafile=cli_crt_file.name)

    control_app = web.Application(client_max_size=1024**3)
    control_app.add_routes([
        web.get('/routes', get_routes(user_app)),
        web.get('/routes/{id}', get_route(user_app)),
        web.post('/routes', append_route(user_app)),
        web.put('/routes', insert_route(user_app)),
        web.delete('/routes/{id}', delete_route(user_app)),
    ])
    control_app["scripts"] = scripts
    control_app["client_cert"] = cli_crt_bytes
    control_app["client_key"] = cli_key_bytes
    control_app["server_cert"] = srv_crt_bytes
    control_app["interactive"] = interactive
    control_app.on_startup.append(start_background_tasks)

    control_runner = web.AppRunner(control_app)
    await control_runner.setup()

    control_ip, control_port = _split_bind(control_bind)
    control_site = web.TCPSite(control_runner, control_ip, control_port,
                               ssl_context=context)
    await control_site.start()

    #
    # DATA
    #
    data_app = web.Application(client_max_size=1024**3)
    data_app.add_routes([
        # Data API
        web.get('/handlers/{id}/{field:.*}', get_field),
        web.put('/handlers/{id}/{field:.*}', set_field),
    ])

    data_runner = web.AppRunner(data_app)
    await data_runner.setup()

    data_ip, data_port = _split_bind(data_bind)
    data_site = web.TCPSite(data_runner, data_ip, data_port)
    await data_site.start()


########################################################################
#                             Command Line                             #
########################################################################

@click.group()
@click.pass_context
def kapow(ctx):
    """Start aiohttp app."""
    pass


# Starts the user, control and data servers on the module event loop and
# then keeps the loop running.
@kapow.command(help="Start a Kapow! server")
@click.option("--certfile", default=None)
@click.option("--keyfile", default=None)
@click.option("--bind", default="0.0.0.0:8080")
@click.option("--control-bind", default="0.0.0.0:8081")
@click.option("--data-bind", default="0.0.0.0:8082")
@click.option("--control-reachable-addr", default="localhost:8081")
@click.option("-i", "--interactive", is_flag=True)
@click.argument("scripts", nargs=-1)
def server(certfile, keyfile, bind, interactive, scripts,
           control_reachable_addr, control_bind, data_bind):
    # TLS on the user server needs both halves of the key pair.
    if bool(certfile) ^ bool(keyfile):
        print("For SSL both 'certfile' and 'keyfile' should be provided.")
        sys.exit(1)

    loop.run_until_complete(
        start_kapow_server(bind,
                           control_bind,
                           data_bind,
                           scripts,
                           certfile,
                           interactive,
                           keyfile,
                           control_reachable_addr))
    loop.run_forever()


@kapow.group(help="Manage current server HTTP routes")
def route():
    pass


@contextlib.contextmanager
def kapow_control_certs():
    """
    Yield a `requests.Session` configured for the control API.

    The server certificate, client certificate and client key are read
    from the ``KAPOW_CONTROL_SERVER_CERT``, ``KAPOW_CONTROL_CLIENT_CERT``
    and ``KAPOW_CONTROL_CLIENT_KEY`` environment variables and written to
    temporary files that only exist while the context is active.

    :raises KeyError: if any of the environment variables is missing.
    """
    with tempfile.NamedTemporaryFile(suffix='.crt', encoding='utf-8', mode='w') as srv_cert, \
            tempfile.NamedTemporaryFile(suffix='.crt', encoding='utf-8', mode='w') as cli_cert, \
            tempfile.NamedTemporaryFile(suffix='.key', encoding='utf-8', mode='w') as cli_key:
        srv_cert.write(os.environ["KAPOW_CONTROL_SERVER_CERT"])
        srv_cert.file.flush()
        cli_cert.write(os.environ["KAPOW_CONTROL_CLIENT_CERT"])
        cli_cert.file.flush()
        cli_key.write(os.environ["KAPOW_CONTROL_CLIENT_KEY"])
        cli_key.file.flush()

        session = requests.Session()
        session.verify = srv_cert.name
        session.cert = (cli_cert.name, cli_key.name)
        yield session
def _read_command_source(command, command_file):
    # Pick the route command text: an inline command wins, then no command
    # at all, then stdin ('-'), and finally the contents of a file.
    if command:
        return command
    if command_file is None:
        return ""
    if command_file == '-':
        return sys.stdin.read()
    with open(command_file, 'r', encoding='utf-8') as stream:
        return stream.read()


# Registers a new route through the control API and prints the JSON answer.
@route.command("add")
@click.option("-c", "--command", nargs=1)
@click.option("-e", "--entrypoint", default="/bin/sh -c")
@click.option("-X", "--method", default="GET")
@click.option("--url", envvar='KAPOW_CONTROL_URL', default=KAPOW_CONTROL_URL)
@click.argument("url_pattern", nargs=1)
@click.argument("command_file", required=False)
def route_add(url_pattern, entrypoint, command, method, url, command_file):
    with kapow_control_certs() as session:
        source = _read_command_source(command, command_file)
        payload = {"method": method,
                   "url_pattern": url_pattern,
                   "entrypoint": entrypoint,
                   "command": source}
        response = session.post(f"{url}/routes", json=payload)
        response.raise_for_status()
        print(json.dumps(response.json(), indent=2))


# Deletes the route with the given id through the control API.
@route.command("remove")
@click.option("--url", envvar='KAPOW_CONTROL_URL', default=KAPOW_CONTROL_URL)
@click.argument("route-id")
def route_remove(route_id, url):
    with kapow_control_certs() as session:
        response = session.delete(f"{url}/routes/{route_id}")
        response.raise_for_status()


# Prints every route, or only the requested one, as indented JSON.
@route.command("list")
@click.option("--url", envvar='KAPOW_CONTROL_URL', default=KAPOW_CONTROL_URL)
@click.argument("route-id", nargs=1, required=False, default=None)
def route_list(route_id, url):
    with kapow_control_certs() as session:
        target = f"{url}/routes"
        if route_id is not None:
            target = f"{url}/routes/{route_id}"
        response = session.get(target)
        response.raise_for_status()
        print(json.dumps(response.json(), indent=2))


# Sends VALUE (or stdin when VALUE is omitted) to a handler field of the
# data API.  A connection error is deliberately not reported as a failure.
@kapow.command("set", help="Set data from the current context")
@click.option("--url", envvar='KAPOW_DATA_URL', default=KAPOW_DATA_URL)
@click.option("--handler-id", envvar='KAPOW_HANDLER_ID')
@click.argument("path", nargs=1)
@click.argument("value", required=False)
def kapow_set(url, handler_id, path, value):
    data = sys.stdin.buffer if value is None else value.encode('utf-8')

    try:
        response = requests.put(f"{url}/handlers/{handler_id}{path}",
                                data=data)
    except requests.exceptions.ConnectionError:
        return False

    response.raise_for_status()


# Fetches a handler field from the data API and copies it to stdout chunk
# by chunk.  A connection error is deliberately not reported as a failure.
@kapow.command("get", help="Get data from the current context")
@click.option("--url", envvar='KAPOW_DATA_URL', default=KAPOW_DATA_URL)
@click.option("--handler-id", envvar='KAPOW_HANDLER_ID')
@click.argument("path", nargs=1)
def kapow_get(url, handler_id, path):
    try:
        response = requests.get(f"{url}/handlers/{handler_id}{path}",
                                stream=True)
        response.raise_for_status()
    except requests.exceptions.ConnectionError:
        return False

    write = sys.stdout.buffer.write
    for chunk in response.iter_content(chunk_size=None):
        write(chunk)


if __name__ == '__main__':
    kapow()