Files
kapow/poc/bin/kapow
pancho horrillo 3af5def406 poc: make error_body(reason) return a JSON with extraneous fields
We then ensure that the cucumber steps only test the
presence of the required fields, not just comparing it byte-wise.

Co-authored-by: Roberto Abdelkader Martínez Pérez <robertomartinezp@gmail.com>
2019-11-14 17:40:47 +01:00

640 lines
22 KiB
Python
Executable File

#!/usr/bin/env python
#
# Copyright 2019 Banco Bilbao Vizcaya Argentaria, S.A.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from collections import namedtuple
from urllib.parse import urlparse
from uuid import uuid4
import asyncio
import io
import json
import logging
import os
import shlex
import ssl
import sys
from aiohttp import web, StreamReader
from aiohttp.web_urldispatcher import UrlDispatcher
import click
import requests
log = logging.getLogger('kapow')
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
########################################################################
# Resource Management #
########################################################################
CONNECTIONS = {}
class Connection:
"""
Manages the lifecycle of a Kapow! connection.
Behaves like a memory for the "fields" available in HTTP
connections.
"""
def __init__(self, request):
self._stream = None
self._body = io.BytesIO()
self._status = 200
self._headers = dict()
self._cookies = dict()
self.request = request
async def get(self, key):
"""Get the content of the field `key`."""
res = urlparse(key)
def nth(n):
"""Return the nth element in a path."""
return res.path.split('/')[n]
if res.path == 'request/method':
return self.request.method.encode('utf-8')
elif res.path == 'request/body':
return self.request.content
elif res.path == 'request/path':
return self.request.path.encode('utf-8')
elif res.path == 'request/host':
return self.request.host.encode('utf-8')
elif res.path.startswith('request/matches/'):
return self.request.match_info[nth(2)].encode('utf-8')
elif res.path.startswith('request/params/'):
return self.request.rel_url.query[nth(2)].encode('utf-8')
elif res.path.startswith('request/headers/'):
return self.request.headers[nth(2)].encode('utf-8')
elif res.path.startswith('request/cookies/'):
return self.request.cookies[nth(2)].encode('utf-8')
elif res.path == 'request/form':
data = await self.request.post()
files = [fieldname.encode('utf-8')
for fieldname, field in data.items()]
return b'\n'.join(files)
elif res.path.startswith('request/form/'):
return (await self.request.post())[nth(2)].encode('utf-8')
elif res.path == 'request/files':
data = await self.request.post()
files = [fieldname.encode('utf-8')
for fieldname, field in data.items()
if hasattr(field, 'filename')]
return b'\n'.join(files)
elif res.path.startswith('request/files/'):
name = nth(2)
content = nth(3) # filename / content
field = (await self.request.post())[name]
if content == 'filename':
try:
return field.filename.encode('utf-8')
except Exception:
return b''
elif content == 'content':
try:
return field.file.read()
except Exception:
return b''
else:
raise ValueError(f'Unknown content type {content!r}')
else:
raise ValueError('Unknown path')
async def set(self, key, content):
"""Set the field `key` with the value in `content`."""
res = urlparse(key)
def nth(n):
return res.path.split('/')[n]
if res.path == 'response/status':
self._status = int((await content.read()).decode('utf-8'))
elif res.path == 'response/body':
self._body.write(await content.read())
elif res.path.startswith('response/headers/'):
clean = (await content.read()).rstrip(b'\n').decode('utf-8')
self._headers[nth(2)] = clean
elif res.path.startswith('response/cookies/'):
clean = (await content.read()).rstrip(b'\n').decode('utf-8')
self._cookies[nth(2)] = clean
elif res.path == 'response/stream':
if self._stream is None:
self._stream = web.StreamResponse(status=self._status,
reason="OK",
headers=self._headers)
for name, value in self._cookies.items():
self._stream.set_cookie(name, value)
await self._stream.prepare(self.request)
chunk = await content.readany()
while chunk:
await self._stream.write(chunk)
chunk = await content.readany()
else:
raise ValueError(f'Unknown path {res.path!r}')
async def append(self, key, content):
"""Append to field `key` the value in `content`."""
raise NotImplementedError()
async def build_response(self):
"""Return the appropriate aiohttp.web.*Response."""
if self._stream is None:
response = web.Response(body=self._body.getvalue(),
status=self._status,
headers=self._headers)
for name, value in self._cookies.items():
response.set_cookie(name, value)
return response
else:
await self._stream.write_eof()
return self._stream
async def get_field(request):
"""Get the value of some HTTP field in the given connection."""
id = request.match_info["id"]
field = request.match_info["field"]
try:
connection = CONNECTIONS[id]
except KeyError:
response = web.json_response(data=error_body("Handler ID Not Found"), status=404, reason="Not Found")
else:
try:
content = await connection.get(field)
except ValueError:
return web.json_response(data=error_body("Invalid Resource Path"), status=400, reason="Bad Request")
except KeyError:
return web.json_response(data=error_body("Resource Item Not Found"), status=404, reason="Not Found")
if isinstance(content, StreamReader):
response = web.StreamResponse(status=200, reason="OK")
await response.prepare(request)
chunk = await content.readany()
while chunk:
await response.write(chunk)
chunk = await content.readany()
await response.write_eof()
else:
response = web.Response(body=content)
return response
async def set_field(request):
"""Set the value of some HTTP field in the given connection."""
id = request.match_info["id"]
field = request.match_info["field"]
try:
connection = CONNECTIONS[id]
except ValueError:
return web.json_response(data=error_body("Invalid Resource Path"), status=400, reason="Bad Request")
except KeyError:
response = web.json_response(data=error_body("Handler ID Not Found"), status=404, reason="Not Found")
else:
try:
await connection.set(field, request.content)
except ConnectionResetError:
# Raised when trying to write to an already-closed stream.
request.transport.close()
else:
response = web.Response(body=b'')
return response
async def append_field(request):
pass
########################################################################
# Endpoint Execution #
########################################################################
def handle_route(entrypoint, command):
"""
Return an aiohttp route handler that will execute entrypoint and
command in order to manage a Kapow! route.
"""
async def _handle(request):
# Register a new connection to Kapow!
id = "CONN_" + str(uuid4()).replace('-', '_')
connection = CONNECTIONS[id] = Connection(request)
# Run entrypoint + command passing the connection id
executable, *params = shlex.split(entrypoint)
args = ' '.join([executable]
+ [shlex.quote(token) for token in params]
+ [shlex.quote(command)])
try:
shell_task = await asyncio.create_subprocess_shell(
args,
env={**os.environ,
"KAPOW_URL": "http://localhost:8081",
"KAPOW_HANDLER_ID": id
},
stdin=asyncio.subprocess.DEVNULL)
await shell_task.wait()
except:
raise
else:
# Respond when the command finish
return await connection.build_response()
finally:
del CONNECTIONS[id]
return _handle
########################################################################
# Route Management #
########################################################################
def error_body(reason):
return {"reason": reason, "foo": "bar"}
def get_routes(app):
async def _get_routes(request):
"""Return the list of registered routes."""
data = [{"index": idx,
"method": r.method,
"id": r.id,
"url_pattern": r.path,
"entrypoint": r.entrypoint,
"command": r.command}
for idx, r in enumerate(app["user_routes"])]
return web.json_response(data)
return _get_routes
def get_route(app):
async def _get_route(request):
"""Return requested registered route."""
id = request.match_info["id"]
for idx, r in enumerate(app["user_routes"]):
if r.id == id:
return web.json_response({"index": idx,
"method": r.method,
"id": r.id,
"url_pattern": r.path,
"entrypoint": r.entrypoint,
"command": r.command})
else:
return web.json_response(data=error_body("Route Not Found"), status=404, reason="Not Found")
return _get_route
def insert_route(app):
async def _insert_route(request):
"""Insert a new Kapow! route."""
try:
content = await request.json()
except ValueError:
return web.json_response(data=error_body("Malformed JSON"), status=400, reason="Bad Request")
try:
index = int(content["index"])
assert index >= 0
method = content.get("method", "GET")
entrypoint = content.get("entrypoint", "/bin/sh -c")
command = content.get("command", "")
route = KapowRoute(method=method,
path=content["url_pattern"],
id="ROUTE_" + str(uuid4()).replace('-', '_'),
entrypoint=entrypoint,
command=command,
handler=handle_route(entrypoint, command))
app.change_routes((app["user_routes"][:index]
+ [route]
+ app["user_routes"][index:]))
except (InvalidRouteError, KeyError, AssertionError, ValueError) as exc:
return web.json_response(data=error_body("Invalid Route"), status=422, reason="Unprocessable Entity")
else:
app["user_routes"].insert(index, route)
return web.json_response({"id": route.id,
"method": route.method,
"url_pattern": route.path,
"entrypoint": route.entrypoint,
"command": route.command,
"index": index}, status=201)
return _insert_route
def append_route(app):
async def _append_route(request):
"""Append a new Kapow! route."""
try:
content = await request.json()
except ValueError as exc:
return web.json_response(data=error_body("Malformed JSON"), status=400, reason="Bad Request")
try:
method = content.get("method", "GET")
entrypoint = content.get("entrypoint", "/bin/sh -c")
command = content.get("command", "")
route = KapowRoute(method=method,
path=content["url_pattern"],
id="ROUTE_" + str(uuid4()).replace('-', '_'),
entrypoint=entrypoint,
command=command,
handler=handle_route(entrypoint, command))
app.change_routes(app["user_routes"] + [route])
except (InvalidRouteError, KeyError) as exc:
return web.json_response(data=error_body("Invalid Route"), status=422, reason="Unprocessable Entity")
else:
app["user_routes"].append(route)
return web.json_response({"id": route.id,
"method": route.method,
"url_pattern": route.path,
"entrypoint": route.entrypoint,
"command": route.command,
"index": len(app["user_routes"])-1},
status=201)
return _append_route
def delete_route(app):
async def _delete_route(request):
"""Delete the given Kapow! route."""
id = request.match_info["id"]
routes = [r for r in app["user_routes"] if r.id != id]
if len(routes) == len(app["user_routes"]):
return web.json_response(data=error_body("Route Not Found"), status=404, reason="Not Found")
else:
app.change_routes(routes)
app["user_routes"] = routes
return web.Response(status=204, reason="No Content")
return _delete_route
########################################################################
# aiohttp webapp #
########################################################################
async def run_init_script(app, scripts, interactive):
"""
Run the init script if given, then wait for the shell to finish.
"""
if not scripts:
# No script given
if not interactive:
return
else:
cmd = "/bin/bash"
else:
def build_filenames():
for filename in scripts:
yield shlex.quote(filename)
yield "<(echo)"
filenames = " ".join(build_filenames())
if interactive:
cmd = f"/bin/bash --init-file <(cat {filenames})"
else:
cmd = f"/bin/bash <(cat {filenames})"
shell_task = await asyncio.create_subprocess_shell(
cmd,
executable="/bin/bash",
env={**os.environ,
"KAPOW_URL": "http://localhost:8081"
})
await shell_task.wait()
if interactive:
await app.cleanup()
os._exit(shell_task.returncode)
class InvalidRouteError(Exception):
pass
class DynamicApplication(web.Application):
"""
A wrapper around `aiohttp.web.Application` allowing changing routes
dynamically.
This is not safe as mentioned here:
https://github.com/aio-libs/aiohttp/issues/3238.
On the other hand this is a PoC anyway...
"""
def change_routes(self, routes):
router = UrlDispatcher()
try:
for route in routes:
router.add_route(route.method,
route.path,
route.handler,
name=route.id)
except Exception as exc:
raise InvalidRouteError("Invalid route") from exc
else:
self._router = router
if self._frozen:
self._router.freeze()
KapowRoute = namedtuple('KapowRoute',
('method',
'path',
'id',
'entrypoint',
'command',
'handler'))
async def start_background_tasks(app):
global loop
app["debug_tasks"] = loop.create_task(run_init_script(app, app["scripts"], app["interactive"]))
async def start_kapow_server(bind, scripts, certfile=None, interactive=False, keyfile=None):
user_app = DynamicApplication(client_max_size=1024**3)
user_app["user_routes"] = list() # [KapowRoute]
user_runner = web.AppRunner(user_app)
await user_runner.setup()
ssl_context = None
if certfile and keyfile:
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ssl_context.load_cert_chain(certfile, keyfile)
ip, port = bind.split(':')
user_site = web.TCPSite(user_runner, ip, int(port), ssl_context=ssl_context)
await user_site.start()
control_app = web.Application(client_max_size=1024**3)
control_app.add_routes([
# Control API
web.get('/routes', get_routes(user_app)),
web.get('/routes/{id}', get_route(user_app)),
web.post('/routes', append_route(user_app)),
web.put('/routes', insert_route(user_app)),
web.delete('/routes/{id}', delete_route(user_app)),
# Data API
web.get('/handlers/{id}/{field:.*}', get_field),
web.put('/handlers/{id}/{field:.*}', set_field),
])
control_app["scripts"] = scripts
control_app["interactive"] = interactive
control_app.on_startup.append(start_background_tasks)
control_runner = web.AppRunner(control_app)
await control_runner.setup()
control_site = web.TCPSite(control_runner, '127.0.0.1', 8081)
await control_site.start()
########################################################################
# Command Line #
########################################################################
@click.group()
@click.pass_context
def kapow(ctx):
"""Start aiohttp app."""
pass
@kapow.command(help="Start a Kapow! server")
@click.option("--certfile", default=None)
@click.option("--keyfile", default=None)
@click.option("--bind", default="0.0.0.0:8080")
@click.option("-i", "--interactive", is_flag=True)
@click.argument("scripts", nargs=-1)
def server(certfile, keyfile, bind, interactive, scripts):
if bool(certfile) ^ bool(keyfile):
print("For SSL both 'certfile' and 'keyfile' should be provided.")
sys.exit(1)
loop.run_until_complete(start_kapow_server(bind, scripts, certfile, interactive, keyfile))
loop.run_forever()
@kapow.group(help="Manage current server HTTP routes")
def route():
pass
@route.command("add")
@click.option("-c", "--command", nargs=1)
@click.option("-e", "--entrypoint", default="/bin/sh -c")
@click.option("-X", "--method", default="GET")
@click.option("--url", envvar='KAPOW_URL')
@click.argument("url_pattern", nargs=1)
@click.argument("command_file", required=False)
def route_add(url_pattern, entrypoint, command, method, url, command_file):
if command:
# Command is given inline
source = command
elif command_file is None:
# No command
source = ""
elif command_file == '-':
# Read commands from stdin
source = sys.stdin.read()
else:
# Read commands from a file
with open(command_file, 'r', encoding='utf-8') as handler:
source = handler.read()
response = requests.post(f"{url}/routes",
json={"method": method,
"url_pattern": url_pattern,
"entrypoint": entrypoint,
"command": source})
response.raise_for_status()
print(json.dumps(response.json(), indent=2))
@route.command("remove")
@click.option("--url", envvar='KAPOW_URL')
@click.argument("route-id")
def route_remove(route_id, url):
response = requests.delete(f"{url}/routes/{route_id}")
response.raise_for_status()
@route.command("list")
@click.option("--url", envvar='KAPOW_URL')
@click.argument("route-id", nargs=1, required=False, default=None)
def route_list(route_id, url):
if route_id is None:
response = requests.get(f"{url}/routes")
else:
response = requests.get(f"{url}/routes/{route_id}")
response.raise_for_status()
print(json.dumps(response.json(), indent=2))
@kapow.command("set", help="Set data from the current context")
@click.option("--url", envvar='KAPOW_URL')
@click.option("--handler-id", envvar='KAPOW_HANDLER_ID')
@click.argument("path", nargs=1)
@click.argument("value", required=False)
def kapow_set(url, handler_id, path, value):
if value is None:
data = sys.stdin.buffer
else:
data = value.encode('utf-8')
try:
response = requests.put(f"{url}/handlers/{handler_id}{path}",
data=data)
except requests.exceptions.ConnectionError:
return False
else:
response.raise_for_status()
@kapow.command("get", help="Get data from the current context")
@click.option("--url", envvar='KAPOW_URL')
@click.option("--handler-id", envvar='KAPOW_HANDLER_ID')
@click.argument("path", nargs=1)
def kapow_get(url, handler_id, path):
try:
response = requests.get(f"{url}/handlers/{handler_id}{path}",
stream=True)
response.raise_for_status()
except requests.exceptions.ConnectionError:
return False
else:
for chunk in response.iter_content(chunk_size=None):
sys.stdout.buffer.write(chunk)
if __name__ == '__main__':
kapow()