Files
kapow/poc/kapow
pancho horrillo dc253f7485 Sync poc to spec
Co-authored-by: Roberto Abdelkader Martínez Pérez <robertomartinezp@gmail.com>
2019-05-28 15:41:54 +02:00

334 lines
11 KiB
Python
Executable File

#!/usr/bin/env python
#
# Copyright 2019 Banco Bilbao Vizcaya Argentaria, S.A.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from urllib.parse import urlparse
from uuid import uuid4
import asyncio
import io
import logging
import os
import shlex
import sys
from aiohttp import web, StreamReader
log = logging.getLogger('kapow')
########################################################################
# Resource Management #
########################################################################
CONNECTIONS = {}
class Connection:
"""
Manages the lifecycle of a Kapow! connection.
Behaves like a memory for the "fields" available in HTTP
connections.
"""
def __init__(self, request):
self._stream = None
self._body = io.BytesIO()
self._status = 200
self._headers = dict()
self._cookies = dict()
self.request = request
async def get(self, key):
"""Get the content of the field `key`."""
res = urlparse(key)
def nth(n):
"""Return the nth element in a path."""
return res.path.split('/')[n]
if res.path == 'request/method':
return self.request.method.encode('utf-8')
elif res.path == 'request/body':
return self.request.content
elif res.path == 'request/path':
return self.request.path.encode('utf-8')
elif res.path.startswith('request/matches/'):
return self.request.match_info[nth(2)].encode('utf-8')
elif res.path.startswith('request/params/'):
return self.request.rel_url.query[nth(2)].encode('utf-8')
elif res.path.startswith('request/headers/'):
return self.request.headers[nth(2)].encode('utf-8')
elif res.path.startswith('request/cookies/'):
return self.request.cookies[nth(2)].encode('utf-8')
elif res.path.startswith('request/form/'):
return (await self.request.post())[nth(2)].encode('utf-8')
elif res.path.startswith('request/files/'):
name = nth(2)
content = nth(3) # filename / content
field = (await self.request.post())[name]
if content == 'filename':
try:
return field.filename.encode('utf-8')
except Exception:
return b''
elif content == 'content':
try:
return field.file.read()
except Exception:
return b''
else:
raise ValueError(f'Unknown content type {content!r}')
else:
raise ValueError('Unknown path')
async def set(self, key, content):
"""Set the field `key` with the value in `content`."""
res = urlparse(key)
def nth(n):
return res.path.split('/')[n]
if res.path == 'response/status':
self._status = int((await content.read()).decode('utf-8'))
elif res.path == 'response/body':
self._body.write(await content.read())
elif res.path.startswith('response/headers/'):
clean = (await content.read()).rstrip(b'\n').decode('utf-8')
self._headers[nth(2)] = clean
elif res.path.startswith('response/cookies/'):
clean = (await content.read()).rstrip(b'\n').decode('utf-8')
self._cookies[nth(2)] = clean
elif res.path == 'response/stream':
if self._stream is None:
self._stream = web.StreamResponse(status=self._status,
reason="OK",
headers=self._headers)
for name, value in self._cookies.items():
self._stream.set_cookie(name, value)
await self._stream.prepare(self.request)
chunk = await content.readany()
while chunk:
await self._stream.write(chunk)
chunk = await content.readany()
else:
raise ValueError(f'Unknown path {res.path!r}')
async def append(self, key, content):
"""Append to field `key` the value in `content`."""
raise NotImplementedError()
async def build_response(self):
"""Return the appropriate aiohttp.web.*Response."""
if self._stream is None:
response = web.Response(body=self._body.getvalue(),
status=self._status,
headers=self._headers)
for name, value in self._cookies.items():
response.set_cookie(name, value)
return response
else:
await self._stream.write_eof()
return self._stream
async def get_field(request):
"""Get the value of some HTTP field in the given connection."""
id = request.match_info["id"]
field = request.match_info["field"]
try:
connection = CONNECTIONS[id]
except KeyError:
response = web.HTTPNotFound()
else:
content = await connection.get(field)
if isinstance(content, StreamReader):
response = web.StreamResponse(status=200, reason="OK")
await response.prepare(request)
chunk = await content.readany()
while chunk:
await response.write(chunk)
chunk = await content.readany()
await response.write_eof()
else:
response = web.Response(body=content)
return response
async def set_field(request):
"""Set the value of some HTTP field in the given connection."""
id = request.match_info["id"]
field = request.match_info["field"]
try:
connection = CONNECTIONS[id]
except KeyError:
response = web.HTTPNotFound()
else:
try:
await connection.set(field, request.content)
except ConnectionResetError:
# Raised when trying to write to an already-closed stream.
request.transport.close()
else:
response = web.Response(body=b'')
return response
async def append_field(request):
pass
########################################################################
# Endpoint Execution #
########################################################################
def handle_route(entrypoint, command):
"""
Return an aiohttp route handler that will execute entrypoint and
command in order to manage a Kapow! route.
"""
async def _handle(request):
# Register a new connection to Kapow!
id = "CONN_" + str(uuid4()).replace('-', '_')
connection = CONNECTIONS[id] = Connection(request)
# Run entrypoint + command passing the connection id
executable, *params = shlex.split(entrypoint)
args = ' '.join([executable]
+ [shlex.quote(token) for token in params]
+ [shlex.quote(command)])
try:
shell_task = await asyncio.create_subprocess_shell(
args,
env={**os.environ,
"KAPOW_URL": "http://localhost:8080",
"KAPOW_HANDLER_ID": id
},
stdin=asyncio.subprocess.DEVNULL)
await shell_task.wait()
except:
raise
else:
# Respond when the command finish
return await connection.build_response()
finally:
del CONNECTIONS[id]
return _handle
########################################################################
# Route Management #
########################################################################
async def get_routes(request):
"""Return the list of registered routes."""
return web.json_response(list(request.app.router))
async def append_route(request):
"""Create a new Kapow! route."""
request.app.router._frozen = False
content = await request.json()
name = "ROUTE_" + str(uuid4()).replace('-', '_')
request.app.router.add_route(content["method"],
content["url_pattern"],
handle_route(content["entrypoint"],
content["command"]),
name=name)
print(f'Route created {content["method"]} {content["url_pattern"]}')
return web.json_response(name)
async def delete_route(request):
"""Delete the given Kapow! route."""
id = request.match_info["id"]
route = request.app.router._named_resources.pop(id)
request.app.router._resources.remove(route)
print(f'Route deleted {id}')
return web.json_response(id)
########################################################################
# aiohttp webapp #
########################################################################
async def run_init_script(app):
"""
Run the init script if given, then wait for the shell to finish.
"""
if len(sys.argv) == 1:
# No script given
cmd = "/bin/bash"
elif len(sys.argv) == 2:
cmd = f"/bin/bash --init-file {sys.argv[1]}"
else:
print(f"Usage: {sys.argv[0]} <init-script>")
os._exit(1)
shell_task = await asyncio.create_subprocess_shell(
cmd,
env={**os.environ,
"KAPOW_URL": "http://localhost:8080"
})
await shell_task.wait()
await app.cleanup()
os._exit(shell_task.returncode)
async def start_background_tasks(app):
loop = asyncio.get_running_loop()
app["debug_tasks"] = loop.create_task(run_init_script(app))
def kapow():
"""Start aiohttp app."""
app = web.Application(client_max_size=1024**3)
app.add_routes([
# Control API
web.get('/routes', get_routes),
web.post('/routes', append_route), # TODO: return route index
# web.put('/routes', insert_route), # TODO: return route index
web.delete('/routes/{id}', delete_route),
# Data API
web.get('/handlers/{id}/{field:.*}', get_field),
web.put('/handlers/{id}/{field:.*}', set_field),
])
app.on_startup.append(start_background_tasks)
web.run_app(app)
if __name__ == '__main__':
kapow()