WIP Streaming. Added '.' in grammar regex

This commit is contained in:
Roberto Abdelkader Martínez Pérez
2019-03-28 13:06:11 +01:00
parent c1b90a2608
commit 7de537db8b
+57 -14
View File
@@ -1,4 +1,7 @@
#!/usr/bin/env python
from aiohttp import web from aiohttp import web
from collections import defaultdict
from dataclasses import dataclass from dataclasses import dataclass
from shlex import quote as shell_quote from shlex import quote as shell_quote
from string import Template from string import Template
@@ -31,7 +34,7 @@ multi_method = delimitedList(method, delim="|", combine=True)
method_spec = Combine(Literal('*') | multi_method) method_spec = Combine(Literal('*') | multi_method)
# Pattern # Pattern
regex = Word(alphas + nums + '\\+*,[]-_')(name="regex") regex = Word(alphas + nums + '\\+*,.[]-_')(name="regex")
symbol = Word(alphas)(name="symbol") symbol = Word(alphas)(name="symbol")
p_pattern = Combine('/{' + symbol + Optional(':' + regex) + '}') p_pattern = Combine('/{' + symbol + Optional(':' + regex) + '}')
p_path = Word('/', alphas + nums + '$-_.+!*\'(),') p_path = Word('/', alphas + nums + '$-_.+!*\'(),')
@@ -50,6 +53,10 @@ endpoint = (Optional(method_spec + Suppress(White()),
kapow_program = OneOrMore(endpoint) kapow_program = OneOrMore(endpoint)
########################################################################
# Resources #
########################################################################
@dataclass @dataclass
class ResourceManager: class ResourceManager:
shell_repr: str shell_repr: str
@@ -57,38 +64,43 @@ class ResourceManager:
async def get_value(context, path): async def get_value(context, path):
def nrd(n):
return path.split('/', n)[-1]
if path == 'request/method': if path == 'request/method':
return context['request'].method.encode('utf-8') return context['request'].method.encode('utf-8')
elif path == 'request/path': elif path == 'request/path':
return context['request'].path.encode('utf-8') return context['request'].path.encode('utf-8')
elif path.startswith('request/match'): elif path.startswith('request/match'):
key = path.split('/', 2)[-1] return context['request'].match_info[nrd(2)].encode('utf-8')
return context['request'].match_info[key].encode('utf-8')
elif path.startswith('request/param'): elif path.startswith('request/param'):
key = path.split('/', 2)[-1] return context['request'].rel_url.query[nrd(2)].encode('utf-8')
return context['request'].rel_url.query[key].encode('utf-8')
elif path.startswith('request/header'): elif path.startswith('request/header'):
key = path.split('/', 2)[-1] return context['request'].headers[nrd(2)].encode('utf-8')
return context['request'].headers[key].encode('utf-8')
elif path.startswith('request/form'): elif path.startswith('request/form'):
key = path.split('/', 2)[-1] return (await context['request'].post())[nrd(2)].encode('utf-8')
return (await context['request'].post())[key].encode('utf-8')
elif path == 'request/body': elif path == 'request/body':
return await context['request'].read() return await context['request'].read()
else: else:
raise ValueError(f'Unknown path {path!r}') raise ValueError(f'Unknown path {path!r}')
async def set_value(context, path, value): async def set_value(context, path, value):
def nrd(n):
return path.split('/', n)[-1]
if path == 'response/status': if path == 'response/status':
context['response_status'] = int(value.decode('utf-8')) context['response_status'] = int(value.decode('utf-8'))
elif path == 'response/body': elif path == 'response/body':
context['response_body'].write(value) context['response_body'].write(value)
elif path == 'response/body':
context['response_stream'].write(value)
elif path.startswith('response/header/'): elif path.startswith('response/header/'):
key = path.split('/', 2)[-1] context['response_headers'][nrd(2)] = value.rstrip(b'\n').decode('utf-8')
context['response_headers'][key] = value.rstrip(b'\n').decode('utf-8')
else: else:
raise ValueError(f'Unknown path {path!r}') raise ValueError(f'Unknown path {path!r}')
def is_readable(path): def is_readable(path):
return path.startswith('request/') return path.startswith('request/')
@@ -116,6 +128,31 @@ def get_manager(resource, context):
filename = tempfile.mktemp() filename = tempfile.mktemp()
os.mkfifo(filename) os.mkfifo(filename)
if path == 'response/stream':
async def manage_fifo():
initialized = False
try:
async with aiofiles.open(filename, 'rb') as fifo:
while True:
print("Reading stream")
chunk = await fifo.read(128)
if chunk:
if not initialized:
# Give a chance to other coroutines
# to write changes to response
# (headers, etc)
await asyncio.sleep(0)
response = web.StreamResponse(status=200)
context["stream"] = response
await response.prepare(context["request"])
initialized = True
print("initialized!")
await response.write(b"hola")
else:
break
finally:
os.unlink(filename)
else:
async def manage_fifo(): async def manage_fifo():
try: try:
if is_readable(path): if is_readable(path):
@@ -184,7 +221,7 @@ class KapowTemplate(Template):
for v in resources.values()] for v in resources.values()]
await shell_task.wait() # Run the subshell process await shell_task.wait() # Run the subshell process
done, pending = await asyncio.wait(manager_tasks, timeout=0) # Managers commit changes done, pending = await asyncio.wait(manager_tasks, timeout=1) # XXX: Managers commit changes
if pending: if pending:
# print(f"Warning: Resources not consumed ({len(pending)})") # print(f"Warning: Resources not consumed ({len(pending)})")
for task in pending: for task in pending:
@@ -198,13 +235,19 @@ def create_runner(code):
def create_context(request): def create_context(request):
context = dict() context = dict()
context["request"] = request context["request"] = request
context["stream"] = None
context["response_body"] = io.BytesIO() context["response_body"] = io.BytesIO()
context["response_status"] = 200 context["response_status"] = 200
context["response_headers"] = dict() context["response_headers"] = dict()
return context return context
def response_from_context(context): async def response_from_context(context):
if context["stream"]:
print("is stream!")
await context["stream"].write_eof()
return context["stream"]
else:
return web.Response( return web.Response(
body=context["response_body"].getbuffer(), body=context["response_body"].getbuffer(),
status=context["response_status"], status=context["response_status"],
@@ -216,7 +259,7 @@ def generate_endpoint(code):
context = create_context(request) context = create_context(request)
runner = create_runner(code) runner = create_runner(code)
await runner(context) # Will change context await runner(context) # Will change context
return response_from_context(context) return (await response_from_context(context))
return endpoint return endpoint