From ad6ffb6bc44fefe1df60c4ce4a282471c0f18b84 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roberto=20Abdelkader=20Mart=C3=ADnez=20P=C3=A9rez?= Date: Tue, 30 Apr 2019 09:32:16 +0200 Subject: [PATCH] Graceful exit when shell finish. --- poc/kapow | 82 +++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 55 insertions(+), 27 deletions(-) diff --git a/poc/kapow b/poc/kapow index 41d8808..3a22268 100755 --- a/poc/kapow +++ b/poc/kapow @@ -34,6 +34,7 @@ log = logging.getLogger('kapow') # Resource Management # ######################################################################## + CONNECTIONS = {} @@ -140,31 +141,48 @@ class ConnectionHandler: async def get_resource(request): id = request.match_info["id"] resource = request.match_info["resource"] - maybe_content = await CONNECTIONS[id].get(resource) - if isinstance(maybe_content, StreamReader): - content = maybe_content - response = web.StreamResponse(status=200, reason="OK") - await response.prepare(request) - chunk = await content.readany() - while chunk: - await response.write(chunk) - chunk = await content.readany() - await response.write_eof() - return response + + try: + connection = CONNECTIONS[id] + except KeyError: + response = web.HTTPNotFound() else: - body = maybe_content - return web.Response(body=body) + content = await connection.get(resource) + + if isinstance(content, StreamReader): + response = web.StreamResponse(status=200, reason="OK") + await response.prepare(request) + + chunk = await content.readany() + while chunk: + await response.write(chunk) + chunk = await content.readany() + + await response.write_eof() + else: + response = web.Response(body=content) + + return response async def set_resource(request): id = request.match_info["id"] resource = request.match_info["resource"] + try: - await CONNECTIONS[id].set(resource, request.content) - except ConnectionResetError: - # Raised when trying to write to an already-closed stream. - request.transport.close() - return web.Response(body=b'') + connection = CONNECTIONS[id] + except KeyError: + response = web.HTTPNotFound() + else: + try: + await connection.set(resource, request.content) + except ConnectionResetError: + # Raised when trying to write to an already-closed stream. + request.transport.close() + else: + response = web.Response(body=b'') + + return response async def append_resource(request): @@ -220,6 +238,7 @@ async def create_route(request): handle_route(content["entrypoint"], content["command"]), name=name) + print(f'Route created {content["method"]} {content["url_pattern"]}') return web.json_response(name) @@ -227,6 +246,7 @@ async def delete_route(request): id = request.match_info["id"] route = request.app.router._named_resources.pop(id) request.app.router._resources.remove(route) + print(f'Route deleted {id}') return web.json_response(id) @@ -234,24 +254,32 @@ async def delete_route(request): # aiohttp webapp # ######################################################################## -async def run_init_script(): - if len(sys.argv) < 2: - raise RuntimeError("Script file is mandatory.") +async def run_init_script(app): + if len(sys.argv) == 1: + # No script given + cmd = "/bin/bash" + elif len(sys.argv) == 2: + cmd = f"/bin/bash --init-file {sys.argv[1]}" + else: + raise RuntimeError("Invalid parameter count.") + + binpath = os.path.join(os.path.dirname(os.path.realpath(__file__)), "bin") shell_task = await asyncio.create_subprocess_shell( - f"/bin/bash --init-file {sys.argv[1]}", - # script, - # executable=os.environ.get('SHELL', '/bin/sh'), + cmd, env={**os.environ, "KAPOW_URL": "http://localhost:8080/kapow", - "PATH": ":".join([os.path.join(os.path.dirname(os.path.realpath(__file__)), "bin"), - os.environ["PATH"]]), + "PATH": ":".join([binpath, os.environ["PATH"]]), }) + await shell_task.wait() + await app.cleanup() + os._exit(shell_task.returncode) async def start_background_tasks(app): - app["debug_tasks"] = app.loop.create_task(run_init_script()) + loop = asyncio.get_running_loop() + app["debug_tasks"] = loop.create_task(run_init_script(app)) def main():