Merge remote-tracking branch 'origin/develop' into server-as-subcommand
This commit is contained in:
Executable
+397
@@ -0,0 +1,397 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
#
|
||||
# Copyright 2019 Banco Bilbao Vizcaya Argentaria, S.A.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
from urllib.parse import urlparse
|
||||
from uuid import uuid4
|
||||
import asyncio
|
||||
import io
|
||||
import logging
|
||||
import os
|
||||
import shlex
|
||||
import sys
|
||||
|
||||
from aiohttp import web, StreamReader
|
||||
import click
|
||||
import requests
|
||||
|
||||
|
||||
log = logging.getLogger('kapow')
|
||||
|
||||
|
||||
########################################################################
|
||||
# Resource Management #
|
||||
########################################################################
|
||||
|
||||
|
||||
CONNECTIONS = {}
|
||||
|
||||
|
||||
class Connection:
|
||||
"""
|
||||
Manages the lifecycle of a Kapow! connection.
|
||||
|
||||
Behaves like a memory for the "fields" available in HTTP
|
||||
connections.
|
||||
|
||||
"""
|
||||
def __init__(self, request):
|
||||
self._stream = None
|
||||
self._body = io.BytesIO()
|
||||
self._status = 200
|
||||
self._headers = dict()
|
||||
self._cookies = dict()
|
||||
|
||||
self.request = request
|
||||
|
||||
async def get(self, key):
|
||||
"""Get the content of the field `key`."""
|
||||
res = urlparse(key)
|
||||
|
||||
def nth(n):
|
||||
"""Return the nth element in a path."""
|
||||
return res.path.split('/')[n]
|
||||
|
||||
if res.path == 'request/method':
|
||||
return self.request.method.encode('utf-8')
|
||||
elif res.path == 'request/body':
|
||||
return self.request.content
|
||||
elif res.path == 'request/path':
|
||||
return self.request.path.encode('utf-8')
|
||||
elif res.path.startswith('request/matches/'):
|
||||
return self.request.match_info[nth(2)].encode('utf-8')
|
||||
elif res.path.startswith('request/params/'):
|
||||
return self.request.rel_url.query[nth(2)].encode('utf-8')
|
||||
elif res.path.startswith('request/headers/'):
|
||||
return self.request.headers[nth(2)].encode('utf-8')
|
||||
elif res.path.startswith('request/cookies/'):
|
||||
return self.request.cookies[nth(2)].encode('utf-8')
|
||||
elif res.path.startswith('request/form/'):
|
||||
return (await self.request.post())[nth(2)].encode('utf-8')
|
||||
elif res.path.startswith('request/files/'):
|
||||
name = nth(2)
|
||||
content = nth(3) # filename / content
|
||||
field = (await self.request.post())[name]
|
||||
if content == 'filename':
|
||||
try:
|
||||
return field.filename.encode('utf-8')
|
||||
except Exception:
|
||||
return b''
|
||||
elif content == 'content':
|
||||
try:
|
||||
return field.file.read()
|
||||
except Exception:
|
||||
return b''
|
||||
else:
|
||||
raise ValueError(f'Unknown content type {content!r}')
|
||||
else:
|
||||
raise ValueError('Unknown path')
|
||||
|
||||
async def set(self, key, content):
|
||||
"""Set the field `key` with the value in `content`."""
|
||||
res = urlparse(key)
|
||||
|
||||
def nth(n):
|
||||
return res.path.split('/')[n]
|
||||
|
||||
if res.path == 'response/status':
|
||||
self._status = int((await content.read()).decode('utf-8'))
|
||||
elif res.path == 'response/body':
|
||||
self._body.write(await content.read())
|
||||
elif res.path.startswith('response/headers/'):
|
||||
clean = (await content.read()).rstrip(b'\n').decode('utf-8')
|
||||
self._headers[nth(2)] = clean
|
||||
elif res.path.startswith('response/cookies/'):
|
||||
clean = (await content.read()).rstrip(b'\n').decode('utf-8')
|
||||
self._cookies[nth(2)] = clean
|
||||
elif res.path == 'response/stream':
|
||||
if self._stream is None:
|
||||
self._stream = web.StreamResponse(status=self._status,
|
||||
reason="OK",
|
||||
headers=self._headers)
|
||||
for name, value in self._cookies.items():
|
||||
self._stream.set_cookie(name, value)
|
||||
await self._stream.prepare(self.request)
|
||||
|
||||
chunk = await content.readany()
|
||||
while chunk:
|
||||
await self._stream.write(chunk)
|
||||
chunk = await content.readany()
|
||||
else:
|
||||
raise ValueError(f'Unknown path {res.path!r}')
|
||||
|
||||
async def append(self, key, content):
|
||||
"""Append to field `key` the value in `content`."""
|
||||
raise NotImplementedError()
|
||||
|
||||
async def build_response(self):
|
||||
"""Return the appropriate aiohttp.web.*Response."""
|
||||
if self._stream is None:
|
||||
response = web.Response(body=self._body.getvalue(),
|
||||
status=self._status,
|
||||
headers=self._headers)
|
||||
for name, value in self._cookies.items():
|
||||
response.set_cookie(name, value)
|
||||
return response
|
||||
else:
|
||||
await self._stream.write_eof()
|
||||
return self._stream
|
||||
|
||||
|
||||
async def get_field(request):
|
||||
"""Get the value of some HTTP field in the given connection."""
|
||||
id = request.match_info["id"]
|
||||
field = request.match_info["field"]
|
||||
|
||||
try:
|
||||
connection = CONNECTIONS[id]
|
||||
except KeyError:
|
||||
response = web.HTTPNotFound()
|
||||
else:
|
||||
content = await connection.get(field)
|
||||
|
||||
if isinstance(content, StreamReader):
|
||||
response = web.StreamResponse(status=200, reason="OK")
|
||||
await response.prepare(request)
|
||||
|
||||
chunk = await content.readany()
|
||||
while chunk:
|
||||
await response.write(chunk)
|
||||
chunk = await content.readany()
|
||||
|
||||
await response.write_eof()
|
||||
else:
|
||||
response = web.Response(body=content)
|
||||
|
||||
return response
|
||||
|
||||
|
||||
async def set_field(request):
|
||||
"""Set the value of some HTTP field in the given connection."""
|
||||
id = request.match_info["id"]
|
||||
field = request.match_info["field"]
|
||||
|
||||
try:
|
||||
connection = CONNECTIONS[id]
|
||||
except KeyError:
|
||||
response = web.HTTPNotFound()
|
||||
else:
|
||||
try:
|
||||
await connection.set(field, request.content)
|
||||
except ConnectionResetError:
|
||||
# Raised when trying to write to an already-closed stream.
|
||||
request.transport.close()
|
||||
else:
|
||||
response = web.Response(body=b'')
|
||||
|
||||
return response
|
||||
|
||||
|
||||
async def append_field(request):
|
||||
pass
|
||||
|
||||
|
||||
########################################################################
|
||||
# Endpoint Execution #
|
||||
########################################################################
|
||||
|
||||
|
||||
def handle_route(entrypoint, command):
|
||||
"""
|
||||
Return an aiohttp route handler that will execute entrypoint and
|
||||
command in order to manage a Kapow! route.
|
||||
|
||||
"""
|
||||
async def _handle(request):
|
||||
# Register a new connection to Kapow!
|
||||
id = "CONN_" + str(uuid4()).replace('-', '_')
|
||||
connection = CONNECTIONS[id] = Connection(request)
|
||||
|
||||
# Run entrypoint + command passing the connection id
|
||||
executable, *params = shlex.split(entrypoint)
|
||||
args = ' '.join([executable]
|
||||
+ [shlex.quote(token) for token in params]
|
||||
+ [shlex.quote(command)])
|
||||
try:
|
||||
shell_task = await asyncio.create_subprocess_shell(
|
||||
args,
|
||||
env={**os.environ,
|
||||
"KAPOW_URL": "http://localhost:8080",
|
||||
"KAPOW_HANDLER_ID": id
|
||||
},
|
||||
stdin=asyncio.subprocess.DEVNULL)
|
||||
|
||||
await shell_task.wait()
|
||||
except:
|
||||
raise
|
||||
else:
|
||||
# Respond when the command finish
|
||||
return await connection.build_response()
|
||||
finally:
|
||||
del CONNECTIONS[id]
|
||||
|
||||
return _handle
|
||||
|
||||
|
||||
########################################################################
|
||||
# Route Management #
|
||||
########################################################################
|
||||
|
||||
async def get_routes(request):
|
||||
"""Return the list of registered routes."""
|
||||
return web.json_response(list(request.app.router))
|
||||
|
||||
|
||||
async def append_route(request):
|
||||
"""Create a new Kapow! route."""
|
||||
request.app.router._frozen = False
|
||||
content = await request.json()
|
||||
name = "ROUTE_" + str(uuid4()).replace('-', '_')
|
||||
request.app.router.add_route(content["method"],
|
||||
content["url_pattern"],
|
||||
handle_route(content["entrypoint"],
|
||||
content["command"]),
|
||||
name=name)
|
||||
print(f'Route created {content["method"]} {content["url_pattern"]}')
|
||||
return web.json_response(name)
|
||||
|
||||
|
||||
async def delete_route(request):
|
||||
"""Delete the given Kapow! route."""
|
||||
id = request.match_info["id"]
|
||||
route = request.app.router._named_resources.pop(id)
|
||||
request.app.router._resources.remove(route)
|
||||
print(f'Route deleted {id}')
|
||||
return web.json_response(id)
|
||||
|
||||
|
||||
########################################################################
|
||||
# aiohttp webapp #
|
||||
########################################################################
|
||||
|
||||
|
||||
async def run_init_script(app, scripts):
|
||||
"""
|
||||
Run the init script if given, then wait for the shell to finish.
|
||||
|
||||
"""
|
||||
if not scripts:
|
||||
# No script given
|
||||
cmd = "/bin/bash"
|
||||
else:
|
||||
filenames = " ".join(shlex.quote(f) for f in scripts)
|
||||
cmd = f"/bin/bash --init-file <(cat {filenames})"
|
||||
|
||||
shell_task = await asyncio.create_subprocess_shell(
|
||||
cmd,
|
||||
executable="/bin/bash",
|
||||
env={**os.environ,
|
||||
"KAPOW_URL": "http://localhost:8080"
|
||||
})
|
||||
|
||||
await shell_task.wait()
|
||||
await app.cleanup()
|
||||
os._exit(shell_task.returncode)
|
||||
|
||||
|
||||
async def start_background_tasks(app):
|
||||
loop = asyncio.get_running_loop()
|
||||
app["debug_tasks"] = loop.create_task(run_init_script(app, app["scripts"]))
|
||||
|
||||
|
||||
def start_kapow_server(scripts):
|
||||
app = web.Application(client_max_size=1024**3)
|
||||
app.add_routes([
|
||||
# Control API
|
||||
web.get('/routes', get_routes),
|
||||
web.post('/routes', append_route), # TODO: return route index
|
||||
# web.put('/routes', insert_route), # TODO: return route index
|
||||
web.delete('/routes/{id}', delete_route),
|
||||
|
||||
# Data API
|
||||
web.get('/handlers/{id}/{field:.*}', get_field),
|
||||
web.put('/handlers/{id}/{field:.*}', set_field),
|
||||
])
|
||||
app["scripts"] = scripts
|
||||
app.on_startup.append(start_background_tasks)
|
||||
web.run_app(app)
|
||||
|
||||
|
||||
########################################################################
|
||||
# Command Lineagement #
|
||||
########################################################################
|
||||
|
||||
|
||||
@click.group()
|
||||
@click.pass_context
|
||||
def kapow(ctx):
|
||||
"""Start aiohttp app."""
|
||||
pass
|
||||
|
||||
|
||||
@kapow.command()
|
||||
@click.argument("scripts", nargs=-1)
|
||||
def server(scripts):
|
||||
start_kapow_server(scripts)
|
||||
|
||||
@kapow.group()
|
||||
def route():
|
||||
pass
|
||||
|
||||
|
||||
@route.command()
|
||||
@click.option("-c", "--command", nargs=1)
|
||||
@click.option("-e", "--entrypoint", default="/bin/sh -c")
|
||||
@click.option("-X", "--method", default="GET")
|
||||
@click.option("--url", envvar='KAPOW_URL')
|
||||
@click.argument("url_pattern", nargs=1)
|
||||
@click.argument("command_file", required=False)
|
||||
def add(url_pattern, entrypoint, command, method, url, command_file):
|
||||
if command:
|
||||
# Command is given inline
|
||||
source = command
|
||||
elif command_file is None:
|
||||
# No command
|
||||
source = ""
|
||||
elif command_file == '-':
|
||||
# Read commands from stdin
|
||||
source = sys.stdin.read()
|
||||
else:
|
||||
# Read commands from a file
|
||||
with open(command_file, 'r', encoding='utf-8') as handler:
|
||||
source = handler.read()
|
||||
|
||||
response = requests.post(f"{url}/routes",
|
||||
json={"method": method,
|
||||
"url_pattern": url_pattern,
|
||||
"entrypoint": entrypoint,
|
||||
"command": source})
|
||||
response.raise_for_status()
|
||||
print(response.json())
|
||||
|
||||
|
||||
@route.command()
|
||||
@click.option("--url", envvar='KAPOW_URL')
|
||||
@click.argument("route-id")
|
||||
def remove(route_id, url):
|
||||
response = requests.delete(f"{url}/routes/{route_id}")
|
||||
response.raise_for_status()
|
||||
print(response.json())
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
kapow()
|
||||
Reference in New Issue
Block a user