#!/usr/bin/env python # # Copyright 2019 Banco Bilbao Vizcaya Argentaria, S.A. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # from urllib.parse import urlparse from uuid import uuid4 import asyncio import io import logging import os import shlex import sys from aiohttp import web, StreamReader log = logging.getLogger('kapow') ######################################################################## # Resource Management # ######################################################################## CONNECTIONS = {} class ConnectionHandler: def __init__(self, request): self._stream = None self._body = io.BytesIO() self._status = 200 self._headers = dict() self._cookies = dict() self.request = request async def get(self, key): res = urlparse(key) def nrd(n): """Return the nrd element in a path.""" return res.path.split('/')[n] if res.path == 'request/method': return self.request.method.encode('utf-8') elif res.path == 'request/body': return self.request.content elif res.path == 'request/path': return self.request.path.encode('utf-8') elif res.path.startswith('request/match/'): return self.request.match_info[nrd(2)].encode('utf-8') elif res.path.startswith('request/param/'): return self.request.rel_url.query[nrd(2)].encode('utf-8') elif res.path.startswith('request/header/'): return self.request.headers[nrd(2)].encode('utf-8') elif res.path.startswith('request/cookie/'): return self.request.cookies[nrd(2)].encode('utf-8') elif res.path.startswith('request/form/'): return (await self.request.post())[nrd(2)].encode('utf-8') elif res.path.startswith('request/file/'): name = nrd(2) content = nrd(3) # filename / content field = (await self.request.post())[name] if content == 'filename': try: return field.filename.encode('utf-8') except Exception: return b'' elif content == 'content': try: return field.file.read() except Exception: return b'' else: raise ValueError(f'Unknown content type {content!r}') else: raise ValueError('Unknown path') async def set(self, key, content): res = urlparse(key) def nrd(n): return res.path.split('/')[n] if res.path == 'response/status': self._status = int((await content.read()).decode('utf-8')) elif res.path == 'response/body': self._body.write(await content.read()) elif res.path.startswith('response/header/'): clean = (await content.read()).rstrip(b'\n').decode('utf-8') self._headers[nrd(2)] = clean elif res.path.startswith('response/cookie/'): clean = (await content.read()).rstrip(b'\n').decode('utf-8') self._headers[nrd(2)] = clean elif res.path == 'response/stream': if self._stream is None: self._stream = web.StreamResponse(status=self._status, reason="OK", headers=self._headers) for name, value in self._cookies: self._stream.set_cookie(name, value) await self._stream.prepare(self.request) chunk = await content.readany() while chunk: await self._stream.write(chunk) chunk = await content.readany() else: raise ValueError(f'Unknown path {res.path!r}') async def append(self, key, value): raise NotImplementedError() async def build_response(self): if self._stream is None: response = web.Response(body=self._body.getvalue(), status=self._status, headers=self._headers) for name, value in self._cookies.items(): response.set_cookie(name, value) return response else: await self._stream.write_eof() return self._stream async def get_resource(request): id = request.match_info["id"] resource = request.match_info["resource"] maybe_content = await CONNECTIONS[id].get(resource) if isinstance(maybe_content, StreamReader): content = maybe_content response = web.StreamResponse(status=200, reason="OK") await response.prepare(request) chunk = await content.readany() while chunk: await response.write(chunk) chunk = await content.readany() await response.write_eof() return response else: body = maybe_content return web.Response(body=body) async def set_resource(request): id = request.match_info["id"] resource = request.match_info["resource"] await CONNECTIONS[id].set(resource, request.content) return web.Response(body=b'') async def append_resource(request): pass ######################################################################## # Endpoint Execution # ######################################################################## def handle_route(entrypoint, command): async def _handle(request): id = "CONN_" + str(uuid4()).replace('-', '_') connection = CONNECTIONS[id] = ConnectionHandler(request) executable, *params = shlex.split(entrypoint) args = ' '.join([executable] + [shlex.quote(token) for token in params] + [shlex.quote(command)]) # Run the source shell_task = await asyncio.create_subprocess_shell( args, env={**os.environ, "KAPOW_URL": "http://localhost:8080/kapow", "KAPOW_CONNECTION": id, "PATH": ":".join([os.path.join(os.path.dirname(os.path.realpath(__file__)), "bin"), os.environ["PATH"]]), }, stdin=asyncio.subprocess.DEVNULL) await shell_task.wait() del CONNECTIONS[id] return await connection.build_response() return _handle ######################################################################## # Route Management # ######################################################################## async def get_routes(request): return web.json_response(list(request.app.router)) async def create_route(request): request.app.router._frozen = False content = await request.json() name = "ROUTE_" + str(uuid4()).replace('-', '_') request.app.router.add_route(content["method"], content["url_pattern"], handle_route(content["entrypoint"], content["command"]), name=name) return web.json_response(name) async def delete_route(request): id = request.match_info["id"] route = request.app.router._named_resources.pop(id) request.app.router._resources.remove(route) return web.json_response(id) ######################################################################## # aiohttp webapp # ######################################################################## async def run_init_script(): if len(sys.argv) < 2: raise RuntimeError("Script file is mandatory.") shell_task = await asyncio.create_subprocess_shell( f"/bin/bash --init-file {sys.argv[1]}", # script, # executable=os.environ.get('SHELL', '/bin/sh'), env={**os.environ, "KAPOW_URL": "http://localhost:8080/kapow", "PATH": ":".join([os.path.join(os.path.dirname(os.path.realpath(__file__)), "bin"), os.environ["PATH"]]), }) await shell_task.wait() async def start_background_tasks(app): app["debug_tasks"] = app.loop.create_task(run_init_script()) def main(): app = web.Application(client_max_size=1024**3) app.add_routes([ web.get('/kapow/routes', get_routes), web.post('/kapow/routes', create_route), web.delete('/kapow/routes/{id}', delete_route), web.get('/kapow/connections/{id}/{resource:.*}', get_resource), # web.post('/kapow/connections/{id}/{resource:.*}', append_resource), web.put('/kapow/connections/{id}/{resource:.*}', set_resource), ]) app.on_startup.append(start_background_tasks) web.run_app(app) if __name__ == '__main__': main()