Debug levels and closing hanging fifos.

This commit is contained in:
Roberto Abdelkader Martínez Pérez
2019-04-22 07:47:50 +02:00
parent d5ee3ca506
commit acc178949d
+154 -30
View File
@@ -10,8 +10,12 @@ from string import Template
import asyncio import asyncio
import contextlib import contextlib
import io import io
import logging
import os import os
import sys
import tempfile import tempfile
import threading
import traceback
from aiohttp import web from aiohttp import web
from pyparsing import alphas, nums, White from pyparsing import alphas, nums, White
@@ -21,6 +25,8 @@ from pyparsing import OneOrMore, Optional, delimitedList
import aiofiles import aiofiles
import click import click
log = logging.getLogger('kapow')
######################################################################## ########################################################################
# Parser # # Parser #
######################################################################## ########################################################################
@@ -65,18 +71,27 @@ KAPOW_PROGRAM = OneOrMore(CODE_EP | PATH_EP)
@dataclass @dataclass
class ResourceManager: class ResourceManager:
"""A resource exposed to the subshell.""" """A resource exposed to the subshell."""
#: Kapow resource representation
kapow_repr: str
#: Representation of the resource that can be understood by the shell #: Representation of the resource that can be understood by the shell
shell_repr: str shell_repr: str
#: Coroutine capable of managing the resource internally #: Coroutine capable of managing the resource internally
coro: object coro: object
#: Path to readed fifo. Needs to be written for coro to release.
#: XXX: Use proper fifo async instead
fifo_path: str = None
#: Fifo direction 'read'/'write'
fifo_direction: str = None
async def get_value(context, path): async def get_value(context, path):
"""Return the value of an http resource.""" """Return the value of an http resource."""
def nrd(n): def nrd(n):
"""Return the nrd element in a path.""" """Return the nrd element in a path."""
return path.split('/', n)[-1] return path.split('/')[n]
try:
if path == 'request/method': if path == 'request/method':
return context['request'].method.encode('utf-8') return context['request'].method.encode('utf-8')
elif path == 'request/path': elif path == 'request/path':
@@ -87,12 +102,32 @@ async def get_value(context, path):
return context['request'].rel_url.query[nrd(2)].encode('utf-8') return context['request'].rel_url.query[nrd(2)].encode('utf-8')
elif path.startswith('request/header'): elif path.startswith('request/header'):
return context['request'].headers[nrd(2)].encode('utf-8') return context['request'].headers[nrd(2)].encode('utf-8')
elif path.startswith('request/cookie'):
return context['request'].cookies[nrd(2)].encode('utf-8')
elif path.startswith('request/form'): elif path.startswith('request/form'):
return (await context['request'].post())[nrd(2)].encode('utf-8') return (await context['request'].post())[nrd(2)].encode('utf-8')
elif path.startswith('request/file'):
name = nrd(2)
content = nrd(3) # filename / content
field = (await context['request'].post())[name]
if content == 'filename':
try:
return field.filename.encode('utf-8')
except:
return b''
elif content == 'content':
try:
return field.file.read()
except:
return b''
else:
raise ValueError(f'Unknown content type {content!r}')
elif path == 'request/body': elif path == 'request/body':
return await context['request'].read() return await context['request'].read()
else: else:
raise ValueError(f'Unknown path {path!r}') raise ValueError(f'Unknown path {path!r}')
except KeyError:
return b''
async def set_value(context, path, value): async def set_value(context, path, value):
@@ -103,8 +138,11 @@ async def set_value(context, path, value):
append semantics. Non file-like resources are just set. append semantics. Non file-like resources are just set.
""" """
if not value:
return
def nrd(n): def nrd(n):
return path.split('/', n)[-1] return path.split('/')[n]
if path == 'response/status': if path == 'response/status':
context['response_status'] = int(value.decode('utf-8')) context['response_status'] = int(value.decode('utf-8'))
@@ -115,6 +153,9 @@ async def set_value(context, path, value):
elif path.startswith('response/header/'): elif path.startswith('response/header/'):
clean = value.rstrip(b'\n').decode('utf-8') clean = value.rstrip(b'\n').decode('utf-8')
context['response_headers'][nrd(2)] = clean context['response_headers'][nrd(2)] = clean
elif path.startswith('response/cookie/'):
clean = value.rstrip(b'\n').decode('utf-8')
context['response_cookies'][nrd(2)] = clean
else: else:
raise ValueError(f'Unknown path {path!r}') raise ValueError(f'Unknown path {path!r}')
@@ -132,7 +173,11 @@ def get_manager(resource, context):
Return an async context manager capable of manage the given Return an async context manager capable of manage the given
resource. resource.
""" """
try:
view, path = resource.split(':') view, path = resource.split(':')
except:
log.error(f"Invalid resource %r", resource)
raise
@contextlib.asynccontextmanager @contextlib.asynccontextmanager
async def manager(): async def manager():
@@ -156,6 +201,7 @@ def get_manager(resource, context):
else: else:
value = await get_value(context, path) value = await get_value(context, path)
yield ResourceManager( yield ResourceManager(
kapow_repr=resource,
shell_repr=value.decode('utf-8'), shell_repr=value.decode('utf-8'),
coro=asyncio.sleep(0)) coro=asyncio.sleep(0))
elif view == 'value': elif view == 'value':
@@ -164,6 +210,7 @@ def get_manager(resource, context):
else: else:
value = await get_value(context, path) value = await get_value(context, path)
yield ResourceManager( yield ResourceManager(
kapow_repr=resource,
shell_repr=shell_quote(value.decode('utf-8')), shell_repr=shell_quote(value.decode('utf-8')),
coro=asyncio.sleep(0)) coro=asyncio.sleep(0))
elif view == 'fifo': elif view == 'fifo':
@@ -180,7 +227,7 @@ def get_manager(resource, context):
if path.endswith('/lines'): if path.endswith('/lines'):
chunk = await fifo.readline() chunk = await fifo.readline()
else: else:
chunk = await fifo.read(128) chunk = await fifo.read(1024*10)
if chunk: if chunk:
if not initialized: if not initialized:
# Give a chance to other coroutines # Give a chance to other coroutines
@@ -191,6 +238,8 @@ def get_manager(resource, context):
status=200, status=200,
headers=context["response_headers"], headers=context["response_headers"],
reason="OK") reason="OK")
for name, value in context["response_cookies"]:
response.set_cookie(name, value)
context["stream"] = response context["stream"] = response
await response.prepare(context["request"]) await response.prepare(context["request"])
initialized = True initialized = True
@@ -207,14 +256,23 @@ def get_manager(resource, context):
await fifo.write(await get_value(context, path)) await fifo.write(await get_value(context, path))
elif is_writable(path): elif is_writable(path):
async with aiofiles.open(filename, 'rb') as fifo: async with aiofiles.open(filename, 'rb') as fifo:
await set_value(context, path, await fifo.read()) buf = io.BytesIO()
while True:
chunk = await fifo.read(128)
if not chunk:
break
buf.write(chunk)
await set_value(context, path, buf.getvalue())
else: else:
raise RuntimeError('WTF!') raise RuntimeError('WTF!')
finally: finally:
os.unlink(filename) os.unlink(filename)
yield ResourceManager( yield ResourceManager(
kapow_repr=resource,
shell_repr=shell_quote(filename), shell_repr=shell_quote(filename),
coro=manage_fifo()) coro=manage_fifo(),
fifo_path=filename,
fifo_direction='read' if is_readable(path) else 'write')
elif view == 'file': elif view == 'file':
with tempfile.NamedTemporaryFile(mode='w+b', buffering=0) as tmp: with tempfile.NamedTemporaryFile(mode='w+b', buffering=0) as tmp:
if is_readable(path): if is_readable(path):
@@ -223,6 +281,7 @@ def get_manager(resource, context):
tmp.flush() tmp.flush()
yield ResourceManager( yield ResourceManager(
kapow_repr=resource,
shell_repr=shell_quote(tmp.name), shell_repr=shell_quote(tmp.name),
coro=asyncio.sleep(0)) coro=asyncio.sleep(0))
@@ -247,30 +306,55 @@ class KapowTemplate(Template):
# Initialize all resources creating a mapping # Initialize all resources creating a mapping
resources = dict() # resource: (shell_repr, manager) resources = dict() # resource: (shell_repr, manager)
for match in self.pattern.findall(self.template): for match in self.pattern.findall(self.template):
_, resource, *_ = match delim, resource, *rest = match
if not resource: # When is braced
resource = rest[0]
if delim and not resource and rest == ['', '']:
# Escaped
continue
if resource not in resources: if resource not in resources:
try:
manager = get_manager(resource, context) manager = get_manager(resource, context)
except:
log.error(f"Invalid match %r, %r, %r", delim, resource, rest)
raise
resources[resource] = await stack.enter_async_context(manager()) resources[resource] = await stack.enter_async_context(manager())
code = self.substitute(**{k: v.shell_repr code = self.substitute(**{k: v.shell_repr
for k, v in resources.items()}) for k, v in resources.items()})
# print('-'*80) log.debug("Creating tasks")
# print(code) manager_tasks = {asyncio.create_task(v.coro): v
# print('-'*80) for k, v in resources.items()}
manager_tasks = [asyncio.create_task(v.coro)
for v in resources.values()]
await asyncio.sleep(0) await asyncio.sleep(0)
shell_task = await asyncio.create_subprocess_shell(code) log.debug("Creating subprocess")
shell_task = await asyncio.create_subprocess_shell(
code,
executable=os.environ.get('SHELL', '/bin/sh'))
log.debug("Waiting for subprocess")
await shell_task.wait() # Run the subshell process await shell_task.wait() # Run the subshell process
# XXX: Managers commit changes
_, pending = await asyncio.wait(manager_tasks, timeout=1) done, pending = await asyncio.wait(manager_tasks.keys(), timeout=0.1)
if pending: if pending:
# print(f"Warning: Resources not consumed ({len(pending)})")
for task in pending: for task in pending:
task.cancel() resource = manager_tasks[task]
if resource.fifo_path is not None:
log.debug(f"Trying to stop %s", resource.kapow_repr)
if resource.fifo_direction == 'write':
os.system(f"echo -n > {resource.fifo_path} &")
elif resource.fifo_direction == 'read':
os.system(f"cat {resource.fifo_path} > /dev/null &")
else:
raise ValueError("Unknown direction")
else:
log.debug(f"Non-fifo resource pending!! %s", resource.kapow_repr)
log.debug("Waiting for pending resources...")
await asyncio.wait(pending)
await asyncio.sleep(0) await asyncio.sleep(0)
@@ -282,6 +366,7 @@ def create_context(request):
context["response_body"] = io.BytesIO() context["response_body"] = io.BytesIO()
context["response_status"] = 200 context["response_status"] = 200
context["response_headers"] = dict() context["response_headers"] = dict()
context["response_cookies"] = dict()
return context return context
@@ -294,6 +379,7 @@ async def response_from_context(context):
body = context["response_body"].getvalue() body = context["response_body"].getvalue()
status = context["response_status"] status = context["response_status"]
headers = context["response_headers"] headers = context["response_headers"]
cookies = context["response_cookies"]
# Content-Type guessing (for demo only) # Content-Type guessing (for demo only)
if "Content-Type" not in headers: if "Content-Type" not in headers:
@@ -304,15 +390,26 @@ async def response_from_context(context):
else: else:
headers["Content-Type"] = "text/plain" headers["Content-Type"] = "text/plain"
return web.Response(body=body, status=status, headers=headers) response = web.Response(body=body, status=status, headers=headers)
for name, value in cookies.items():
response.set_cookie(name, value)
return response
def generate_endpoint(code): def generate_endpoint(code, path=None):
"""Return an aiohttp-endpoint coroutine to run kapow `code`.""" """Return an aiohttp-endpoint coroutine to run kapow `code`."""
async def endpoint(request): async def endpoint(request):
context = create_context(request) context = create_context(request)
log.debug("Running endpoint %r", path)
try:
await KapowTemplate(code).run(context) # Will change context await KapowTemplate(code).run(context) # Will change context
return await response_from_context(context) except:
log.exception("Template crashed!")
log.debug("Endpoint finished, creating response %r", path)
response = await response_from_context(context)
log.debug("Responding %r", path)
return response
return endpoint return endpoint
@@ -336,41 +433,68 @@ def path_server(path):
def register_code_endpoint(app, methods, pattern, code): def register_code_endpoint(app, methods, pattern, code):
"""Register all needed endpoints for the defined endpoint code.""" """Register all needed endpoints for the defined endpoint code."""
print(f"Registering [code] methods={methods!r} pattern={pattern!r}") endpoint = generate_endpoint(code, pattern)
endpoint = generate_endpoint(code)
for method in methods: # May be '*' for method in methods: # May be '*'
app.add_routes([web.route(method, pattern, endpoint)]) app.add_routes([web.route(method, pattern, endpoint)])
def register_path_endpoint(app, methods, pattern, path): def register_path_endpoint(app, methods, pattern, path):
"""Register all needed endpoints for the defined file.""" """Register all needed endpoints for the defined file."""
print(f"Registering [path] methods={methods!r} pattern={pattern!r}") for method in methods:
for method in methods: # May be '*' if method != 'GET':
app.add_routes([web.route(method, pattern, path_server(path))]) raise ValueError("Invalid method for serving files.")
else:
app.add_routes([web.static(pattern, path)])
async def debug_tasks():
while True:
await asyncio.sleep(1)
log.debug("Tasks: %s | Threads: %s",
len(asyncio.Task.all_tasks()),
threading.active_count())
async def start_background_tasks(app):
app["debug_tasks"] = app.loop.create_task(debug_tasks())
@click.command() @click.command()
@click.option('--expression', '-e') @click.option('--expression', '-e')
@click.option('--verbose', '-v', count=True)
@click.argument('program', type=click.File(), required=False) @click.argument('program', type=click.File(), required=False)
@click.pass_context @click.pass_context
def main(ctx, program, expression): def main(ctx, program, verbose, expression):
"""Run the kapow server with the given command-line parameters.""" """Run the kapow server with the given command-line parameters."""
if program is None and expression is None: if program is None and expression is None:
click.echo(ctx.get_help()) program = sys.stdin
ctx.exit()
app = web.Application(client_max_size=1024*1024*1024)
if verbose == 0:
_print = lambda _: None
logging.basicConfig(stream=sys.stderr, level=logging.ERROR)
else:
_print = lambda s: print(s, file=sys.stderr)
if verbose == 1:
logging.basicConfig(stream=sys.stderr, level=logging.INFO)
else:
logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)
if verbose > 2:
app.on_startup.append(start_background_tasks)
source = expression if program is None else program.read() source = expression if program is None else program.read()
app = web.Application()
for ep, _, _ in KAPOW_PROGRAM.scanString(source): for ep, _, _ in KAPOW_PROGRAM.scanString(source):
methods = ep.method.asList()[0].split('|') methods = ep.method.asList()[0].split('|')
pattern = ''.join(ep.urlpattern) pattern = ''.join(ep.urlpattern)
if ep.body: if ep.body:
log.info(f"Registering [code] methods=%r pattern=%r", methods, pattern)
register_code_endpoint(app, methods, pattern, ep.body) register_code_endpoint(app, methods, pattern, ep.body)
else: else:
log.info(f"Registering [path] methods=%r pattern=%r", methods, pattern)
register_path_endpoint(app, methods, pattern, ep.path) register_path_endpoint(app, methods, pattern, ep.path)
web.run_app(app) web.run_app(app, print=_print)
if __name__ == '__main__': if __name__ == '__main__':
main() main()