From 7de537db8bf991882b3f2b0b755d20658091bfc3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roberto=20Abdelkader=20Mart=C3=ADnez=20P=C3=A9rez?= Date: Thu, 28 Mar 2019 13:06:11 +0100 Subject: [PATCH] WIP Streaming. Added '.' in grammar regex --- kapow.py | 101 +++++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 72 insertions(+), 29 deletions(-) diff --git a/kapow.py b/kapow.py index 13d2bae..8490fb8 100644 --- a/kapow.py +++ b/kapow.py @@ -1,4 +1,7 @@ +#!/usr/bin/env python + from aiohttp import web +from collections import defaultdict from dataclasses import dataclass from shlex import quote as shell_quote from string import Template @@ -31,7 +34,7 @@ multi_method = delimitedList(method, delim="|", combine=True) method_spec = Combine(Literal('*') | multi_method) # Pattern -regex = Word(alphas + nums + '\\+*,[]-_')(name="regex") +regex = Word(alphas + nums + '\\+*,.[]-_')(name="regex") symbol = Word(alphas)(name="symbol") p_pattern = Combine('/{' + symbol + Optional(':' + regex) + '}') p_path = Word('/', alphas + nums + '$-_.+!*\'(),') @@ -50,6 +53,10 @@ endpoint = (Optional(method_spec + Suppress(White()), kapow_program = OneOrMore(endpoint) +######################################################################## +# Resources # +######################################################################## + @dataclass class ResourceManager: shell_repr: str @@ -57,38 +64,43 @@ class ResourceManager: async def get_value(context, path): + def nrd(n): + return path.split('/', n)[-1] + if path == 'request/method': return context['request'].method.encode('utf-8') elif path == 'request/path': return context['request'].path.encode('utf-8') elif path.startswith('request/match'): - key = path.split('/', 2)[-1] - return context['request'].match_info[key].encode('utf-8') + return context['request'].match_info[nrd(2)].encode('utf-8') elif path.startswith('request/param'): - key = path.split('/', 2)[-1] - return context['request'].rel_url.query[key].encode('utf-8') + return context['request'].rel_url.query[nrd(2)].encode('utf-8') elif path.startswith('request/header'): - key = path.split('/', 2)[-1] - return context['request'].headers[key].encode('utf-8') + return context['request'].headers[nrd(2)].encode('utf-8') elif path.startswith('request/form'): - key = path.split('/', 2)[-1] - return (await context['request'].post())[key].encode('utf-8') + return (await context['request'].post())[nrd(2)].encode('utf-8') elif path == 'request/body': return await context['request'].read() else: raise ValueError(f'Unknown path {path!r}') + async def set_value(context, path, value): + def nrd(n): + return path.split('/', n)[-1] + if path == 'response/status': context['response_status'] = int(value.decode('utf-8')) elif path == 'response/body': context['response_body'].write(value) + elif path == 'response/body': + context['response_stream'].write(value) elif path.startswith('response/header/'): - key = path.split('/', 2)[-1] - context['response_headers'][key] = value.rstrip(b'\n').decode('utf-8') + context['response_headers'][nrd(2)] = value.rstrip(b'\n').decode('utf-8') else: raise ValueError(f'Unknown path {path!r}') + def is_readable(path): return path.startswith('request/') @@ -116,18 +128,43 @@ def get_manager(resource, context): filename = tempfile.mktemp() os.mkfifo(filename) - async def manage_fifo(): - try: - if is_readable(path): - async with aiofiles.open(filename, 'wb') as fifo: - await fifo.write(await get_value(context, path)) - elif is_writable(path): + if path == 'response/stream': + async def manage_fifo(): + initialized = False + try: async with aiofiles.open(filename, 'rb') as fifo: - await set_value(context, path, await fifo.read()) - else: - raise RuntimeError('WTF!') - finally: - os.unlink(filename) + while True: + print("Reading stream") + chunk = await fifo.read(128) + if chunk: + if not initialized: + # Give a chance to other coroutines + # to write changes to response + # (headers, etc) + await asyncio.sleep(0) + response = web.StreamResponse(status=200) + context["stream"] = response + await response.prepare(context["request"]) + initialized = True + print("initialized!") + await response.write(b"hola") + else: + break + finally: + os.unlink(filename) + else: + async def manage_fifo(): + try: + if is_readable(path): + async with aiofiles.open(filename, 'wb') as fifo: + await fifo.write(await get_value(context, path)) + elif is_writable(path): + async with aiofiles.open(filename, 'rb') as fifo: + await set_value(context, path, await fifo.read()) + else: + raise RuntimeError('WTF!') + finally: + os.unlink(filename) yield ResourceManager( shell_repr=shell_quote(filename), @@ -184,7 +221,7 @@ class KapowTemplate(Template): for v in resources.values()] await shell_task.wait() # Run the subshell process - done, pending = await asyncio.wait(manager_tasks, timeout=0) # Managers commit changes + done, pending = await asyncio.wait(manager_tasks, timeout=1) # XXX: Managers commit changes if pending: # print(f"Warning: Resources not consumed ({len(pending)})") for task in pending: @@ -198,17 +235,23 @@ def create_runner(code): def create_context(request): context = dict() context["request"] = request + context["stream"] = None context["response_body"] = io.BytesIO() context["response_status"] = 200 context["response_headers"] = dict() return context -def response_from_context(context): - return web.Response( - body=context["response_body"].getbuffer(), - status=context["response_status"], - headers=context["response_headers"]) +async def response_from_context(context): + if context["stream"]: + print("is stream!") + await context["stream"].write_eof() + return context["stream"] + else: + return web.Response( + body=context["response_body"].getbuffer(), + status=context["response_status"], + headers=context["response_headers"]) def generate_endpoint(code): @@ -216,7 +259,7 @@ def generate_endpoint(code): context = create_context(request) runner = create_runner(code) await runner(context) # Will change context - return response_from_context(context) + return (await response_from_context(context)) return endpoint