Files
kapow/poc/bin/kapow
2019-08-02 10:34:27 +02:00

449 lines
15 KiB
Python
Executable File

#!/usr/bin/env python
#
# Copyright 2019 Banco Bilbao Vizcaya Argentaria, S.A.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from urllib.parse import urlparse
from itertools import repeat
from uuid import uuid4
import asyncio
import io
import logging
import os
import shlex
import ssl
import sys
from aiohttp import web, StreamReader
import click
import requests
# Module-wide logger for Kapow!.
log = logging.getLogger('kapow')
# Create a dedicated event loop and install it as the current one; the
# `server` command and `start_background_tasks` in this file run on it.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
########################################################################
# Resource Management #
########################################################################
# Registry of in-flight connections keyed by handler id.  `handle_route`
# adds an entry when a request arrives and deletes it once the command
# has finished; `get_field` / `set_field` look connections up here.
CONNECTIONS = {}
class Connection:
    """
    Manages the lifecycle of a Kapow! connection.

    Behaves like a memory for the "fields" available in HTTP
    connections: `get` reads ``request/...`` fields from the wrapped
    request, `set` stores ``response/...`` fields, and `build_response`
    turns what was stored into the response object to return.
    """

    def __init__(self, request):
        """Wrap `request` and start with an empty 200 response."""
        self._stream = None         # StreamResponse, once streaming started
        self._body = io.BytesIO()   # accumulated non-streamed body
        self._status = 200
        self._headers = {}
        self._cookies = {}
        self.request = request

    async def get(self, key):
        """
        Get the content of the field `key`.

        Supported keys: ``request/method``, ``request/body``,
        ``request/path``, ``request/host``, ``request/matches/<name>``,
        ``request/params/<name>``, ``request/headers/<name>``,
        ``request/cookies/<name>``, ``request/form``,
        ``request/form/<name>``, ``request/files`` and
        ``request/files/<name>/(filename|content)``.

        Returns bytes, except for ``request/body`` which returns the
        request's content object untouched.  Raises ValueError for an
        unknown key; lookup errors raised by the request's own mappings
        propagate unchanged.
        """
        res = urlparse(key)
        segments = res.path.split('/')

        def nth(n):
            """Return the nth element in a path."""
            return segments[n]

        if res.path == 'request/method':
            return self.request.method.encode('utf-8')
        elif res.path == 'request/body':
            return self.request.content
        elif res.path == 'request/path':
            return self.request.path.encode('utf-8')
        elif res.path == 'request/host':
            return self.request.host.encode('utf-8')
        elif res.path.startswith('request/matches/'):
            return self.request.match_info[nth(2)].encode('utf-8')
        elif res.path.startswith('request/params/'):
            return self.request.rel_url.query[nth(2)].encode('utf-8')
        elif res.path.startswith('request/headers/'):
            return self.request.headers[nth(2)].encode('utf-8')
        elif res.path.startswith('request/cookies/'):
            return self.request.cookies[nth(2)].encode('utf-8')
        elif res.path == 'request/form':
            # Newline-separated names of every submitted form field.
            data = await self.request.post()
            names = [fieldname.encode('utf-8') for fieldname in data]
            return b'\n'.join(names)
        elif res.path.startswith('request/form/'):
            return (await self.request.post())[nth(2)].encode('utf-8')
        elif res.path == 'request/files':
            # Newline-separated names of the fields that carry an upload.
            data = await self.request.post()
            names = [fieldname.encode('utf-8')
                     for fieldname, field in data.items()
                     if hasattr(field, 'filename')]
            return b'\n'.join(names)
        elif res.path.startswith('request/files/'):
            if len(segments) < 4:
                # Without the trailing selector there is nothing to
                # return; report it like any other unsupported path
                # instead of leaking an IndexError.
                raise ValueError(f'Unknown path {res.path!r}')
            name = nth(2)
            content = nth(3)  # filename / content
            field = (await self.request.post())[name]
            if content == 'filename':
                try:
                    return field.filename.encode('utf-8')
                except Exception:
                    # Best effort: not an upload field.
                    return b''
            elif content == 'content':
                try:
                    # The same file object may be handed out on every
                    # call; rewind so a repeated read returns the data
                    # again rather than an empty string.
                    field.file.seek(0)
                    return field.file.read()
                except Exception:
                    # Best effort: not an upload field.
                    return b''
            else:
                raise ValueError(f'Unknown content type {content!r}')
        else:
            raise ValueError('Unknown path')

    async def set(self, key, content):
        """
        Set the field `key` with the value in `content`.

        `content` must offer awaitable ``read()`` (and ``readany()`` for
        ``response/stream``).  Supported keys: ``response/status``,
        ``response/body``, ``response/headers/<name>``,
        ``response/cookies/<name>`` and ``response/stream``.

        Raises ValueError for an unknown key.
        """
        res = urlparse(key)

        def nth(n):
            """Return the nth element in a path."""
            return res.path.split('/')[n]

        if res.path == 'response/status':
            self._status = int((await content.read()).decode('utf-8'))
        elif res.path == 'response/body':
            self._body.write(await content.read())
        elif res.path.startswith('response/headers/'):
            clean = (await content.read()).rstrip(b'\n').decode('utf-8')
            self._headers[nth(2)] = clean
        elif res.path.startswith('response/cookies/'):
            clean = (await content.read()).rstrip(b'\n').decode('utf-8')
            self._cookies[nth(2)] = clean
        elif res.path == 'response/stream':
            if self._stream is None:
                # First write: status, headers and cookies collected so
                # far are sent now.  No explicit reason is given so the
                # reason phrase follows the status instead of always
                # being "OK".
                self._stream = web.StreamResponse(status=self._status,
                                                  headers=self._headers)
                for name, value in self._cookies.items():
                    self._stream.set_cookie(name, value)
                await self._stream.prepare(self.request)
            chunk = await content.readany()
            while chunk:
                await self._stream.write(chunk)
                chunk = await content.readany()
        else:
            raise ValueError(f'Unknown path {res.path!r}')

    async def append(self, key, content):
        """Append to field `key` the value in `content`."""
        raise NotImplementedError()

    async def build_response(self):
        """Return the appropriate aiohttp.web.*Response."""
        if self._stream is None:
            # Nothing was streamed: build a regular response from the
            # buffered body, status, headers and cookies.
            response = web.Response(body=self._body.getvalue(),
                                    status=self._status,
                                    headers=self._headers)
            for name, value in self._cookies.items():
                response.set_cookie(name, value)
            return response
        else:
            # Streaming already started: just finish the stream.
            await self._stream.write_eof()
            return self._stream
async def get_field(request):
    """
    Get the value of some HTTP field in the given connection.

    Answers 404 when the handler id is not registered.  A stream value is
    relayed chunk by chunk; any other value is sent as the response body.
    """
    handler_id = request.match_info["id"]
    field_path = request.match_info["field"]

    connection = CONNECTIONS.get(handler_id)
    if connection is None:
        return web.HTTPNotFound()

    content = await connection.get(field_path)
    if not isinstance(content, StreamReader):
        return web.Response(body=content)

    # Stream value: forward whatever is available until the reader is
    # exhausted, then close the response.
    response = web.StreamResponse(status=200, reason="OK")
    await response.prepare(request)
    while True:
        chunk = await content.readany()
        if not chunk:
            break
        await response.write(chunk)
    await response.write_eof()
    return response
async def set_field(request):
    """
    Set the value of some HTTP field in the given connection.

    Answers 404 when the handler id is not registered, otherwise an empty
    200 response once the field has been stored.
    """
    handler_id = request.match_info["id"]
    field = request.match_info["field"]
    try:
        connection = CONNECTIONS[handler_id]
    except KeyError:
        return web.HTTPNotFound()

    try:
        await connection.set(field, request.content)
    except ConnectionResetError:
        # Raised when trying to write to an already-closed stream.
        # Drop this request's connection too so the sender stops
        # pushing data.
        if request.transport is not None:
            request.transport.close()
    # Always hand back a response object: previously the reset path fell
    # through with `response` unbound and crashed with UnboundLocalError.
    return web.Response(body=b'')
async def append_field(request):
    """Placeholder handler for appending to a field; does nothing yet."""
    return None
########################################################################
# Endpoint Execution #
########################################################################
def handle_route(entrypoint, command):
    """
    Return an aiohttp route handler that will execute entrypoint and
    command in order to manage a Kapow! route.

    `entrypoint` is a shell-like string (split with `shlex`) and
    `command` is passed to it as one extra, quoted argument.
    """
    async def _handle(request):
        # Register a new connection to Kapow!
        handler_id = "CONN_" + str(uuid4()).replace('-', '_')
        connection = CONNECTIONS[handler_id] = Connection(request)
        try:
            # Run entrypoint + command passing the connection id.
            # Parsing happens inside the try block so that a malformed
            # entrypoint cannot leave the connection registered forever.
            executable, *params = shlex.split(entrypoint)
            args = ' '.join([executable]
                            + [shlex.quote(token) for token in params]
                            + [shlex.quote(command)])
            shell_task = await asyncio.create_subprocess_shell(
                args,
                env={**os.environ,
                     "KAPOW_URL": "http://localhost:8081",
                     "KAPOW_HANDLER_ID": handler_id,
                     },
                stdin=asyncio.subprocess.DEVNULL)
            await shell_task.wait()
            # Respond when the command finishes.
            return await connection.build_response()
        finally:
            del CONNECTIONS[handler_id]
    return _handle
########################################################################
# Route Management #
########################################################################
def get_routes(app):
    """Build the control-API handler that lists what `app.router` holds."""
    async def _get_routes(request):
        """Return the list of registered routes."""
        registered = list(app.router)
        return web.json_response(registered)
    return _get_routes
def append_route(app):
    """Build the control-API handler that registers new routes on `app`."""
    async def _append_route(request):
        """Create a new Kapow! route."""
        # Clear the router's private frozen flag before adding a route
        # (presumably because the app is already running -- confirm).
        app.router._frozen = False
        spec = await request.json()
        route_name = "ROUTE_" + str(uuid4()).replace('-', '_')
        method = spec["method"]
        url_pattern = spec["url_pattern"]
        handler = handle_route(spec["entrypoint"], spec["command"])
        app.router.add_route(method, url_pattern, handler, name=route_name)
        print(f'Route created {method} {url_pattern}')
        return web.json_response(route_name)
    return _append_route
def delete_route(app):
    """Build the control-API handler that removes routes from `app`."""
    async def _delete_route(request):
        """
        Delete the given Kapow! route.

        Answers 404 when no route with that id is registered, in line
        with how the data API treats unknown handler ids.
        """
        route_id = request.match_info["id"]
        try:
            resource = app.router._named_resources.pop(route_id)
        except KeyError:
            # Unknown id: report it instead of failing with a 500.
            return web.HTTPNotFound()
        app.router._resources.remove(resource)
        print(f'Route deleted {route_id}')
        return web.json_response(route_id)
    return _delete_route
########################################################################
# aiohttp webapp #
########################################################################
async def run_init_script(app, scripts):
    """
    Run the init script if given, then wait for the shell to finish.

    Starts bash (using the given scripts, concatenated, as its init
    file), waits for it to exit, cleans up `app` and terminates the
    whole process with the shell's return code.
    """
    if scripts:
        # `cat` glues the scripts together; the `<(echo)` after each one
        # adds a newline so consecutive files do not run into each other.
        pieces = []
        for filename in scripts:
            pieces.append(shlex.quote(filename))
            pieces.append("<(echo)")
        cmd = "/bin/bash --init-file <(cat " + " ".join(pieces) + ")"
    else:
        # No script given
        cmd = "/bin/bash"

    child_env = dict(os.environ)
    child_env["KAPOW_URL"] = "http://localhost:8081"

    shell_task = await asyncio.create_subprocess_shell(
        cmd,
        executable="/bin/bash",
        env=child_env)
    await shell_task.wait()
    await app.cleanup()
    # Hard exit: the process ends here with the shell's status.
    os._exit(shell_task.returncode)
async def start_background_tasks(app):
    """Schedule the init-script shell on the module loop at app startup."""
    init_task = loop.create_task(run_init_script(app, app["scripts"]))
    # Keep a reference to the task on the app.
    app["debug_tasks"] = init_task
async def start_kapow_server(bind, scripts, certfile=None, keyfile=None):
    """
    Start the two HTTP servers that make up Kapow!.

    The user server listens on `bind` ("host:port"), with TLS when both
    `certfile` and `keyfile` are given.  The control server listens on
    127.0.0.1:8081, exposes the route and handler APIs and, on startup,
    launches the init shell for `scripts`.
    """
    # User-facing server.
    user_app = web.Application(client_max_size=1024**3)
    user_runner = web.AppRunner(user_app)
    await user_runner.setup()

    if certfile and keyfile:
        ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        ssl_context.load_cert_chain(certfile, keyfile)
    else:
        ssl_context = None

    host, port_text = bind.split(':')
    user_site = web.TCPSite(user_runner, host, int(port_text),
                            ssl_context=ssl_context)
    await user_site.start()

    # Control server.
    control_routes = [
        # Control API
        web.get('/routes', get_routes(user_app)),
        web.post('/routes', append_route(user_app)),  # TODO: return route index
        # web.put('/routes', insert_route(user_app)),  # TODO: return route index
        web.delete('/routes/{id}', delete_route(user_app)),
        # Data API
        web.get('/handlers/{id}/{field:.*}', get_field),
        web.put('/handlers/{id}/{field:.*}', set_field),
    ]
    control_app = web.Application(client_max_size=1024**3)
    control_app.add_routes(control_routes)
    control_app["scripts"] = scripts
    control_app.on_startup.append(start_background_tasks)

    control_runner = web.AppRunner(control_app)
    await control_runner.setup()
    control_site = web.TCPSite(control_runner, '127.0.0.1', 8081)
    await control_site.start()
########################################################################
# Command Line #
########################################################################
@click.group()
@click.pass_context
def kapow(ctx):
    """Start aiohttp app."""
    # Root command group: it only hosts the `server` and `route`
    # subcommands, so there is nothing to do here.  The docstring is kept
    # as is because click displays it as help text.
@kapow.command()
@click.option("--certfile", default=None)
@click.option("--keyfile", default=None)
@click.option("--bind", default="0.0.0.0:8080")
@click.argument("scripts", nargs=-1)
def server(certfile, keyfile, bind, scripts):
    # Start the Kapow! servers and keep the event loop running.
    # TLS needs both files: giving only one of them is an error.
    has_cert = bool(certfile)
    has_key = bool(keyfile)
    if has_cert != has_key:
        print("For SSL both 'certfile' and 'keyfile' should be provided.")
        sys.exit(1)
    startup = start_kapow_server(bind, scripts, certfile, keyfile)
    loop.run_until_complete(startup)
    loop.run_forever()
@kapow.group()
def route():
    # Container group for the `add` and `remove` subcommands; it has no
    # behaviour of its own.
    return None
@route.command()
@click.option("-c", "--command", nargs=1)
@click.option("-e", "--entrypoint", default="/bin/sh -c")
@click.option("-X", "--method", default="GET")
@click.option("--url", envvar='KAPOW_URL')
@click.argument("url_pattern", nargs=1)
@click.argument("command_file", required=False)
def add(url_pattern, entrypoint, command, method, url, command_file):
    # Register a route through the control API and print the server's
    # JSON answer.  The command source is, in order of preference: the
    # inline -c value, nothing at all, stdin ("-"), or a file.
    source = command
    if not source:
        if command_file is None:
            # No command
            source = ""
        elif command_file == '-':
            # Read commands from stdin
            source = sys.stdin.read()
        else:
            # Read commands from a file
            with open(command_file, 'r', encoding='utf-8') as script:
                source = script.read()

    payload = {
        "method": method,
        "url_pattern": url_pattern,
        "entrypoint": entrypoint,
        "command": source,
    }
    response = requests.post(f"{url}/routes", json=payload)
    response.raise_for_status()
    print(response.json())
@route.command()
@click.option("--url", envvar='KAPOW_URL')
@click.argument("route-id")
def remove(route_id, url):
    # Ask the control API to delete the route and print its JSON answer;
    # an HTTP error status raises.
    endpoint = f"{url}/routes/{route_id}"
    response = requests.delete(endpoint)
    response.raise_for_status()
    print(response.json())
# Run the click command-line interface when executed as a script.
if __name__ == '__main__':
    kapow()