#!/usr/bin/env python
#
# Copyright 2019 Banco Bilbao Vizcaya Argentaria, S.A.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from urllib.parse import urlparse
from uuid import uuid4
import asyncio
import io
import logging
import os
import shlex
import sys

from aiohttp import web, StreamReader
import click
import requests


log = logging.getLogger('kapow')


########################################################################
#                         Resource Management                          #
########################################################################

# Live connections, keyed by the handler id generated in `handle_route`.
CONNECTIONS = {}


class Connection:
    """
    Manages the lifecycle of a Kapow! connection.

    Behaves like a memory for the "fields" available in HTTP
    connections: request fields are read with `get` and response fields
    are written with `set`.

    """
    def __init__(self, request):
        """Wrap `request` and start with an empty 200 response."""
        # Created lazily on the first write to `response/stream`.
        self._stream = None
        # Accumulates every write to `response/body`.
        self._body = io.BytesIO()
        self._status = 200
        self._headers = dict()
        self._cookies = dict()

        self.request = request

    async def get(self, key):
        """
        Get the content of the field `key`.

        Supported keys are `request/method`, `request/body`,
        `request/path`, `request/host` and the prefixed families
        `request/matches/<name>`, `request/params/<name>`,
        `request/headers/<name>`, `request/cookies/<name>`,
        `request/form/<name>` and
        `request/files/<name>/{filename,content}`.

        Returns UTF-8 encoded bytes, except for `request/body` which
        returns `self.request.content` unread.

        Raises `ValueError` for an unknown path or unknown file
        sub-field; lookups of missing names propagate the underlying
        mapping's error.
        """
        res = urlparse(key)
        path = res.path

        def nth(n):
            """Return the nth element in a path."""
            return path.split('/')[n]

        if path == 'request/method':
            return self.request.method.encode('utf-8')
        elif path == 'request/body':
            return self.request.content
        elif path == 'request/path':
            return self.request.path.encode('utf-8')
        elif path == 'request/host':
            return self.request.host.encode('utf-8')
        elif path.startswith('request/matches/'):
            return self.request.match_info[nth(2)].encode('utf-8')
        elif path.startswith('request/params/'):
            return self.request.rel_url.query[nth(2)].encode('utf-8')
        elif path.startswith('request/headers/'):
            return self.request.headers[nth(2)].encode('utf-8')
        elif path.startswith('request/cookies/'):
            return self.request.cookies[nth(2)].encode('utf-8')
        elif path.startswith('request/form/'):
            return (await self.request.post())[nth(2)].encode('utf-8')
        elif path.startswith('request/files/'):
            name = nth(2)
            content = nth(3)  # filename / content
            field = (await self.request.post())[name]
            # Both lookups are best-effort: any failure yields an empty
            # value instead of an error (presumably because the form
            # field may not be a file upload -- TODO confirm).
            if content == 'filename':
                try:
                    return field.filename.encode('utf-8')
                except Exception:
                    return b''
            elif content == 'content':
                try:
                    return field.file.read()
                except Exception:
                    return b''
            else:
                raise ValueError(f'Unknown content type {content!r}')
        else:
            raise ValueError('Unknown path')

    async def set(self, key, content):
        """
        Set the field `key` with the value in `content`.

        `content` is a stream exposing `read()` and `readany()`
        coroutines (the handlers in this file pass `request.content`).

        Supported keys are `response/status`, `response/body`,
        `response/headers/<name>`, `response/cookies/<name>` and
        `response/stream`.

        Raises `ValueError` for an unknown path.
        """
        res = urlparse(key)
        path = res.path

        def nth(n):
            """Return the nth element in a path."""
            return path.split('/')[n]

        if path == 'response/status':
            self._status = int((await content.read()).decode('utf-8'))
        elif path == 'response/body':
            self._body.write(await content.read())
        elif path.startswith('response/headers/'):
            # Trailing newlines are stripped from the value.
            clean = (await content.read()).rstrip(b'\n').decode('utf-8')
            self._headers[nth(2)] = clean
        elif path.startswith('response/cookies/'):
            clean = (await content.read()).rstrip(b'\n').decode('utf-8')
            self._cookies[nth(2)] = clean
        elif path == 'response/stream':
            if self._stream is None:
                # The status line and headers are sent on first use, so
                # status/headers/cookies set after this point are not
                # applied to the streamed response.
                #
                # No explicit `reason` is passed: aiohttp derives the
                # reason phrase from the status code, so a non-200
                # status is no longer labelled "OK".
                self._stream = web.StreamResponse(status=self._status,
                                                  headers=self._headers)
                for name, value in self._cookies.items():
                    self._stream.set_cookie(name, value)
                await self._stream.prepare(self.request)

            # Forward the incoming data as it arrives.
            chunk = await content.readany()
            while chunk:
                await self._stream.write(chunk)
                chunk = await content.readany()
        else:
            raise ValueError(f'Unknown path {path!r}')

    async def append(self, key, content):
        """Append to field `key` the value in `content`."""
        raise NotImplementedError()

    async def build_response(self):
        """
        Return the appropriate aiohttp.web.*Response.

        If nothing was streamed, a buffered `web.Response` is built from
        the stored body, status, headers and cookies. Otherwise the
        already-prepared stream is terminated and returned.
        """
        if self._stream is None:
            response = web.Response(body=self._body.getvalue(),
                                    status=self._status,
                                    headers=self._headers)
            for name, value in self._cookies.items():
                response.set_cookie(name, value)
            return response
        else:
            await self._stream.write_eof()
            return self._stream


async def get_field(request):
    """
    Get the value of some HTTP field in the given connection.

    Responds 404 when the handler id is not registered. A field backed
    by a `StreamReader` is relayed chunk by chunk; any other value is
    sent as a regular response body.
    """
    handler_id = request.match_info["id"]
    field = request.match_info["field"]

    try:
        connection = CONNECTIONS[handler_id]
    except KeyError:
        raise web.HTTPNotFound()

    content = await connection.get(field)

    if isinstance(content, StreamReader):
        response = web.StreamResponse(status=200, reason="OK")
        await response.prepare(request)

        chunk = await content.readany()
        while chunk:
            await response.write(chunk)
            chunk = await content.readany()

        await response.write_eof()
    else:
        response = web.Response(body=content)

    return response


async def set_field(request):
    """
    Set the value of some HTTP field in the given connection.

    Responds 404 when the handler id is not registered and with an empty
    body on success.
    """
    handler_id = request.match_info["id"]
    field = request.match_info["field"]

    try:
        connection = CONNECTIONS[handler_id]
    except KeyError:
        raise web.HTTPNotFound()

    try:
        await connection.set(field, request.content)
    except ConnectionResetError:
        # Raised when trying to write to an already-closed stream.
        request.transport.close()
        # A response object must still be returned; previously this path
        # fell through to an unbound `response` and raised
        # UnboundLocalError. With the transport closed it is unlikely to
        # reach the peer.
        return web.Response(status=500)

    return web.Response(body=b'')


async def append_field(request):
    """Placeholder for appending to a field; not implemented yet."""
    pass


########################################################################
#                          Endpoint Execution                          #
########################################################################

def handle_route(entrypoint, command):
    """
    Return an aiohttp route handler that will execute entrypoint and
    command in order to manage a Kapow! route.

    """
    async def _handle(request):
        # Register a new connection to Kapow!
        handler_id = "CONN_" + str(uuid4()).replace('-', '_')
        connection = CONNECTIONS[handler_id] = Connection(request)

        try:
            # Run entrypoint + command passing the connection id.
            # Built inside the `try` so that a malformed entrypoint
            # cannot leave the connection registered forever.
            executable, *params = shlex.split(entrypoint)
            args = ' '.join([executable]
                            + [shlex.quote(token) for token in params]
                            + [shlex.quote(command)])

            shell_task = await asyncio.create_subprocess_shell(
                args,
                env={**os.environ,
                     "KAPOW_URL": "http://localhost:8080",
                     "KAPOW_HANDLER_ID": handler_id
                     },
                stdin=asyncio.subprocess.DEVNULL)

            await shell_task.wait()

            # Respond when the command finish
            return await connection.build_response()
        finally:
            # Always unregister, whether the command succeeded or not.
            del CONNECTIONS[handler_id]

    return _handle


########################################################################
#                           Route Management                           #
########################################################################

async def get_routes(request):
    """Return the list of registered routes."""
    return web.json_response(list(request.app.router))


async def append_route(request):
    """
    Create a new Kapow! route.

    Expects a JSON body with the keys `method`, `url_pattern`,
    `entrypoint` and `command`. Responds with the generated route name.
    """
    # NOTE: relies on aiohttp private state to allow adding routes after
    # the application has started.
    request.app.router._frozen = False
    content = await request.json()
    name = "ROUTE_" + str(uuid4()).replace('-', '_')
    request.app.router.add_route(content["method"],
                                 content["url_pattern"],
                                 handle_route(content["entrypoint"],
                                              content["command"]),
                                 name=name)
    print(f'Route created {content["method"]} {content["url_pattern"]}')
    return web.json_response(name)


async def delete_route(request):
    """
    Delete the given Kapow! route.

    Responds 404 when no route with that id exists, otherwise with the
    id of the removed route.
    """
    route_id = request.match_info["id"]
    # NOTE: relies on aiohttp private state; the router has no public
    # API for removing resources.
    try:
        route = request.app.router._named_resources.pop(route_id)
    except KeyError:
        # Unknown id: answer 404 rather than failing with a 500.
        raise web.HTTPNotFound()
    request.app.router._resources.remove(route)
    print(f'Route deleted {route_id}')
    return web.json_response(route_id)


########################################################################
#                            aiohttp webapp                            #
########################################################################

async def run_init_script(app, scripts):
    """
    Run the init script if given, then wait for the shell to finish.

    When the shell exits the application is cleaned up and the whole
    process terminates with the shell's return code.
    """
    if not scripts:
        # No script given
        cmd = "/bin/bash"
    else:
        # Feed every script to bash as one init file via process
        # substitution.
        filenames = " ".join(shlex.quote(f) for f in scripts)
        cmd = f"/bin/bash --init-file <(cat {filenames})"

    shell_task = await asyncio.create_subprocess_shell(
        cmd,
        executable="/bin/bash",
        env={**os.environ,
             "KAPOW_URL": "http://localhost:8080"
             })

    await shell_task.wait()
    await app.cleanup()
    os._exit(shell_task.returncode)


async def start_background_tasks(app):
    """Schedule the init shell as a background task on app startup."""
    loop = asyncio.get_running_loop()
    app["debug_tasks"] = loop.create_task(
        run_init_script(app, app["scripts"]))


def start_kapow_server(scripts):
    """Build the aiohttp application and run it until it stops."""
    app = web.Application(client_max_size=1024**3)
    app.add_routes([
        # Control API
        web.get('/routes', get_routes),
        web.post('/routes', append_route),  # TODO: return route index
        # web.put('/routes', insert_route),  # TODO: return route index
        web.delete('/routes/{id}', delete_route),

        # Data API
        web.get('/handlers/{id}/{field:.*}', get_field),
        web.put('/handlers/{id}/{field:.*}', set_field),
    ])
    app["scripts"] = scripts
    app.on_startup.append(start_background_tasks)
    web.run_app(app)


########################################################################
#                             Command Line                             #
########################################################################

# NOTE: click prints command docstrings as `--help` text, so the
# commands below are documented with comments to keep the CLI output
# unchanged.

@click.group()
@click.pass_context
def kapow(ctx):
    """Start aiohttp app."""
    pass


# `kapow server [SCRIPTS...]`: start the server, using the given scripts
# as the init file of the interactive shell.
@kapow.command()
@click.argument("scripts", nargs=-1)
def server(scripts):
    start_kapow_server(scripts)


# `kapow route ...`: group holding the route management sub-commands.
@kapow.group()
def route():
    pass


# `kapow route add`: register a route on the server at `--url`. The
# command source is taken, in order of precedence, from `-c`, from stdin
# when COMMAND_FILE is `-`, or from the COMMAND_FILE path; it is empty
# when none is given.
@route.command()
@click.option("-c", "--command", nargs=1)
@click.option("-e", "--entrypoint", default="/bin/sh -c")
@click.option("-X", "--method", default="GET")
@click.option("--url", envvar='KAPOW_URL')
@click.argument("url_pattern", nargs=1)
@click.argument("command_file", required=False)
def add(url_pattern, entrypoint, command, method, url, command_file):
    if command:
        # Command is given inline
        source = command
    elif command_file is None:
        # No command
        source = ""
    elif command_file == '-':
        # Read commands from stdin
        source = sys.stdin.read()
    else:
        # Read commands from a file
        with open(command_file, 'r', encoding='utf-8') as handler:
            source = handler.read()

    response = requests.post(f"{url}/routes",
                             json={"method": method,
                                   "url_pattern": url_pattern,
                                   "entrypoint": entrypoint,
                                   "command": source})
    response.raise_for_status()
    print(response.json())


# `kapow route remove ROUTE-ID`: delete a route from the server at
# `--url`.
@route.command()
@click.option("--url", envvar='KAPOW_URL')
@click.argument("route-id")
def remove(route_id, url):
    response = requests.delete(f"{url}/routes/{route_id}")
    response.raise_for_status()
    print(response.json())


if __name__ == '__main__':
    kapow()