diff --git a/poc/kapow b/poc/kapow index e7a30b8..9cc3eba 100755 --- a/poc/kapow +++ b/poc/kapow @@ -40,7 +40,7 @@ CONNECTIONS = {} class Connection: """ - Manages the lifecycle of a kapow connection. + Manages the lifecycle of a Kapow! connection. Behaves like a memory for the "fields" available in HTTP connections. @@ -150,15 +150,16 @@ class Connection: async def get_field(request): + """Get the value of some HTTP field in the given connection.""" id = request.match_info["id"] - resource = request.match_info["resource"] + field = request.match_info["field"] try: connection = CONNECTIONS[id] except KeyError: response = web.HTTPNotFound() else: - content = await connection.get(resource) + content = await connection.get(field) if isinstance(content, StreamReader): response = web.StreamResponse(status=200, reason="OK") @@ -177,8 +178,9 @@ async def get_field(request): async def set_field(request): + """Set the value of some HTTP field in the given connection.""" id = request.match_info["id"] - resource = request.match_info["resource"] + field = request.match_info["field"] try: connection = CONNECTIONS[id] @@ -186,7 +188,7 @@ async def set_field(request): response = web.HTTPNotFound() else: try: - await connection.set(resource, request.content) + await connection.set(field, request.content) except ConnectionResetError: # Raised when trying to write to an already-closed stream. request.transport.close() @@ -206,28 +208,40 @@ async def append_field(request): def handle_route(entrypoint, command): + """ + Return an aiohttp route handler that will execute entrypoint and + command in order to manage a Kapow! route. + + """ async def _handle(request): + # Register a new connection to Kapow! id = "CONN_" + str(uuid4()).replace('-', '_') connection = CONNECTIONS[id] = Connection(request) + # Run entrypoint + command passing the connection id executable, *params = shlex.split(entrypoint) - args = ' '.join([executable] + [shlex.quote(token) for token in params] + [shlex.quote(command)]) + args = ' '.join([executable] + + [shlex.quote(token) for token in params] + + [shlex.quote(command)]) + try: + shell_task = await asyncio.create_subprocess_shell( + args, + env={**os.environ, + "KAPOW_URL": "http://localhost:8080/kapow", + "KAPOW_CONNECTION": id, + "PATH": ":".join([os.path.join(os.path.dirname(os.path.realpath(__file__)), "bin"), + os.environ["PATH"]]), + }, + stdin=asyncio.subprocess.DEVNULL) - # Run the source - shell_task = await asyncio.create_subprocess_shell( - args, - env={**os.environ, - "KAPOW_URL": "http://localhost:8080/kapow", - "KAPOW_CONNECTION": id, - "PATH": ":".join([os.path.join(os.path.dirname(os.path.realpath(__file__)), "bin"), - os.environ["PATH"]]), - }, - stdin=asyncio.subprocess.DEVNULL) - await shell_task.wait() - - del CONNECTIONS[id] - - return await connection.build_response() + await shell_task.wait() + except: + raise + else: + # Respond when the command finish + return await connection.build_response() + finally: + del CONNECTIONS[id] return _handle @@ -237,10 +251,12 @@ def handle_route(entrypoint, command): ######################################################################## async def get_routes(request): + """Return the list of registered routes.""" return web.json_response(list(request.app.router)) async def create_route(request): + """Create a new Kapow! route.""" request.app.router._frozen = False content = await request.json() name = "ROUTE_" + str(uuid4()).replace('-', '_') @@ -254,6 +270,7 @@ async def create_route(request): async def delete_route(request): + """Delete the given Kapow! route.""" id = request.match_info["id"] route = request.app.router._named_resources.pop(id) request.app.router._resources.remove(route) @@ -267,13 +284,18 @@ async def delete_route(request): async def run_init_script(app): + """ + Run the init script if given, then wait for the shell to finish. + + """ if len(sys.argv) == 1: # No script given cmd = "/bin/bash" elif len(sys.argv) == 2: cmd = f"/bin/bash --init-file {sys.argv[1]}" else: - raise RuntimeError("Invalid parameter count.") + print(f"Usage: sys.argv[0] ") + os._exit(1) binpath = os.path.join(os.path.dirname(os.path.realpath(__file__)), "bin") shell_task = await asyncio.create_subprocess_shell( @@ -293,19 +315,20 @@ async def start_background_tasks(app): app["debug_tasks"] = loop.create_task(run_init_script(app)) -def main(): +def kapow(): + """Start aiohttp app.""" app = web.Application(client_max_size=1024**3) app.add_routes([ web.get('/kapow/routes', get_routes), web.post('/kapow/routes', create_route), web.delete('/kapow/routes/{id}', delete_route), - web.get('/kapow/connections/{id}/{resource:.*}', get_field), - # web.post('/kapow/connections/{id}/{resource:.*}', append_resource), - web.put('/kapow/connections/{id}/{resource:.*}', set_field), + web.get('/kapow/connections/{id}/{field:.*}', get_field), + # web.post('/kapow/connections/{id}/{field:.*}', append_field), + web.put('/kapow/connections/{id}/{field:.*}', set_field), ]) app.on_startup.append(start_background_tasks) web.run_app(app) if __name__ == '__main__': - main() + kapow()