#!/usr/bin/env python
#
# Copyright 2019 Banco Bilbao Vizcaya Argentaria, S.A.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from urllib.parse import urlparse
from uuid import uuid4
import asyncio
import io
import logging
import os
import shlex
import sys

from aiohttp import web, StreamReader


log = logging.getLogger('kapow')

# URL exported to child processes through the KAPOW_URL environment
# variable.  NOTE(review): presumably matches the default host/port used by
# `web.run_app` in `kapow()` -- confirm if the listening address changes.
KAPOW_URL = "http://localhost:8080/kapow"


########################################################################
#                         Resource Management                          #
########################################################################

# Live connections, indexed by the connection id generated in
# `handle_route`.
CONNECTIONS = {}


async def _copy_stream(reader, response):
    """Write every chunk read from `reader` into `response`.

    Stops at the first empty chunk returned by `reader.readany()`.
    """
    chunk = await reader.readany()
    while chunk:
        await response.write(chunk)
        chunk = await reader.readany()


class Connection:
    """
    Manages the lifecycle of a Kapow! connection.

    Behaves like a memory for the "fields" available in HTTP
    connections.

    """
    def __init__(self, request):
        """Wrap `request` and start with an empty 200 response."""
        # Set once 'response/stream' is written; from then on the response
        # is streamed instead of buffered in `_body`.
        self._stream = None
        self._body = io.BytesIO()
        self._status = 200
        self._headers = dict()
        self._cookies = dict()

        self.request = request

    async def get(self, key):
        """Get the content of the field `key`.

        Returns `bytes` for every field except 'request/body', which
        returns the request's content stream itself.

        Raises `ValueError` when `key` does not name a known field, and
        whatever lookup error the underlying mapping raises when the
        requested item does not exist.

        """
        path = urlparse(key).path
        # Path components, e.g. 'request/header/Accept' ->
        # ['request', 'header', 'Accept'].
        parts = path.split('/')

        if path == 'request/method':
            return self.request.method.encode('utf-8')
        elif path == 'request/body':
            return self.request.content
        elif path == 'request/path':
            return self.request.path.encode('utf-8')
        elif path.startswith('request/match/'):
            return self.request.match_info[parts[2]].encode('utf-8')
        elif path.startswith('request/param/'):
            return self.request.rel_url.query[parts[2]].encode('utf-8')
        elif path.startswith('request/header/'):
            return self.request.headers[parts[2]].encode('utf-8')
        elif path.startswith('request/cookie/'):
            return self.request.cookies[parts[2]].encode('utf-8')
        elif path.startswith('request/form/'):
            return (await self.request.post())[parts[2]].encode('utf-8')
        elif path.startswith('request/file/'):
            name = parts[2]
            content = parts[3]  # filename / content
            field = (await self.request.post())[name]
            if content == 'filename':
                # Deliberately best-effort: any failure yields an empty
                # value instead of an error.
                try:
                    return field.filename.encode('utf-8')
                except Exception:
                    return b''
            elif content == 'content':
                # Deliberately best-effort, as above.
                try:
                    return field.file.read()
                except Exception:
                    return b''
            else:
                raise ValueError(f'Unknown content type {content!r}')
        else:
            raise ValueError('Unknown path')

    async def set(self, key, content):
        """Set the field `key` with the value in `content`.

        `content` is a stream offering `read()` and `readany()`
        coroutines.  Raises `ValueError` when `key` does not name a known
        response field.

        """
        path = urlparse(key).path
        parts = path.split('/')

        if path == 'response/status':
            self._status = int((await content.read()).decode('utf-8'))
        elif path == 'response/body':
            self._body.write(await content.read())
        elif path.startswith('response/header/'):
            clean = (await content.read()).rstrip(b'\n').decode('utf-8')
            self._headers[parts[2]] = clean
        elif path.startswith('response/cookie/'):
            clean = (await content.read()).rstrip(b'\n').decode('utf-8')
            self._cookies[parts[2]] = clean
        elif path == 'response/stream':
            if self._stream is None:
                # No explicit `reason`: aiohttp derives it from the status
                # code, so a non-200 status is no longer labelled "OK".
                self._stream = web.StreamResponse(status=self._status,
                                                  headers=self._headers)
                for name, value in self._cookies.items():
                    self._stream.set_cookie(name, value)
                await self._stream.prepare(self.request)

            await _copy_stream(content, self._stream)
        else:
            raise ValueError(f'Unknown path {path!r}')

    async def append(self, key, content):
        """Append to field `key` the value in `content`."""
        raise NotImplementedError()

    async def build_response(self):
        """Return the appropriate aiohttp.web.*Response.

        A buffered `web.Response` is built from the stored status, headers,
        cookies and body unless streaming already started, in which case
        the stream is finished and returned.

        """
        if self._stream is not None:
            await self._stream.write_eof()
            return self._stream

        response = web.Response(body=self._body.getvalue(),
                                status=self._status,
                                headers=self._headers)
        for name, value in self._cookies.items():
            response.set_cookie(name, value)
        return response


async def get_field(request):
    """Get the value of some HTTP field in the given connection.

    Raises `web.HTTPNotFound` when the connection id is unknown.

    """
    connection_id = request.match_info["id"]
    field = request.match_info["field"]

    try:
        connection = CONNECTIONS[connection_id]
    except KeyError:
        # Raised rather than returned: returning HTTPException instances
        # is deprecated in aiohttp.
        raise web.HTTPNotFound() from None

    content = await connection.get(field)

    if isinstance(content, StreamReader):
        # Stream fields (the request body) are relayed chunk by chunk.
        response = web.StreamResponse(status=200, reason="OK")
        await response.prepare(request)
        await _copy_stream(content, response)
        await response.write_eof()
        return response

    return web.Response(body=content)


async def set_field(request):
    """Set the value of some HTTP field in the given connection.

    Raises `web.HTTPNotFound` when the connection id is unknown.

    """
    connection_id = request.match_info["id"]
    field = request.match_info["field"]

    try:
        connection = CONNECTIONS[connection_id]
    except KeyError:
        raise web.HTTPNotFound() from None

    try:
        await connection.set(field, request.content)
    except ConnectionResetError:
        # Raised when trying to write to an already-closed stream.
        if request.transport is not None:
            request.transport.close()

    # Always hand a response back to aiohttp; previously the reset path
    # fell through with `response` unbound and raised UnboundLocalError.
    return web.Response(body=b'')


async def append_field(request):
    """Placeholder handler for appending to a field.

    Not implemented; its route is commented out in `kapow()`.

    """


########################################################################
#                          Endpoint Execution                          #
########################################################################

def handle_route(entrypoint, command):
    """
    Return an aiohttp route handler that will execute entrypoint and
    command in order to manage a Kapow! route.

    """
    async def _handle(request):
        # Register a new connection to Kapow!
        connection_id = "CONN_" + str(uuid4()).replace('-', '_')
        connection = CONNECTIONS[connection_id] = Connection(request)

        # Run entrypoint + command passing the connection id
        executable, *params = shlex.split(entrypoint)
        args = ' '.join([executable]
                        + [shlex.quote(token) for token in params]
                        + [shlex.quote(command)])
        try:
            shell_task = await asyncio.create_subprocess_shell(
                args,
                env={**os.environ,
                     "KAPOW_URL": KAPOW_URL,
                     "KAPOW_CONNECTION": connection_id
                     },
                stdin=asyncio.subprocess.DEVNULL)

            await shell_task.wait()

            # Respond when the command finish
            return await connection.build_response()
        finally:
            # The connection is unregistered whether or not the command
            # (or building the response) succeeded.
            del CONNECTIONS[connection_id]

    return _handle


########################################################################
#                           Route Management                           #
########################################################################

async def get_routes(request):
    """Return the list of registered routes."""
    return web.json_response(list(request.app.router))


async def create_route(request):
    """Create a new Kapow! route.

    Expects a JSON body with the keys "method", "url_pattern",
    "entrypoint" and "command"; responds with the generated route name.

    """
    # Reset the router's private frozen flag so a route can be added here.
    request.app.router._frozen = False
    content = await request.json()
    name = "ROUTE_" + str(uuid4()).replace('-', '_')
    request.app.router.add_route(
        content["method"],
        content["url_pattern"],
        handle_route(content["entrypoint"],
                     content["command"]),
        name=name)
    print(f'Route created {content["method"]} {content["url_pattern"]}')
    return web.json_response(name)


async def delete_route(request):
    """Delete the given Kapow! route.

    Raises `web.HTTPNotFound` when no route has the given id.

    """
    route_id = request.match_info["id"]
    router = request.app.router
    try:
        resource = router._named_resources.pop(route_id)
    except KeyError:
        # Unknown id: answer 404 instead of failing with a 500.
        raise web.HTTPNotFound() from None
    router._resources.remove(resource)
    print(f'Route deleted {route_id}')
    return web.json_response(route_id)


########################################################################
#                            aiohttp webapp                            #
########################################################################

async def run_init_script(app):
    """
    Run the init script if given, then wait for the shell to finish.

    The process exits with the shell's return code once it ends.

    """
    if len(sys.argv) == 1:
        # No script given
        cmd = "/bin/bash"
    elif len(sys.argv) == 2:
        # Quote the path so spaces or shell metacharacters in it are not
        # interpreted by the shell.
        cmd = f"/bin/bash --init-file {shlex.quote(sys.argv[1])}"
    else:
        print(f"Usage: {sys.argv[0]} [init-script]")
        os._exit(1)

    shell_task = await asyncio.create_subprocess_shell(
        cmd,
        env={**os.environ,
             "KAPOW_URL": KAPOW_URL
             })

    await shell_task.wait()
    await app.cleanup()
    os._exit(shell_task.returncode)


async def start_background_tasks(app):
    """Schedule `run_init_script` as a task on the running event loop."""
    loop = asyncio.get_running_loop()
    app["debug_tasks"] = loop.create_task(run_init_script(app))


def kapow():
    """Start aiohttp app."""
    app = web.Application(client_max_size=1024**3)
    app.add_routes([
        web.get('/kapow/routes', get_routes),
        web.post('/kapow/routes', create_route),
        web.delete('/kapow/routes/{id}', delete_route),
        web.get('/kapow/connections/{id}/{field:.*}', get_field),
        # web.post('/kapow/connections/{id}/{field:.*}', append_field),
        web.put('/kapow/connections/{id}/{field:.*}', set_field),
    ])
    app.on_startup.append(start_background_tasks)
    web.run_app(app)


if __name__ == '__main__':
    kapow()