Note that we are leveraging nix-shell to provide portable dependency handling. Co-authored-by: Roberto Abdelkader Martínez Pérez <robertomartinezp@gmail.com>
813 lines
28 KiB
Plaintext
Executable File
813 lines
28 KiB
Plaintext
Executable File
#! /usr/bin/env nix-shell
|
|
#! nix-shell -i python3.7 -p python37 python37Packages.aiohttp python37Packages.requests python37Packages.click
|
|
#
|
|
# TODO: maybe add an option (cli) to supply the external address
|
|
|
|
#
|
|
# Copyright 2019 Banco Bilbao Vizcaya Argentaria, S.A.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
#
|
|
|
|
from collections import namedtuple
|
|
from urllib.parse import urlparse
|
|
from uuid import uuid4
|
|
import asyncio
|
|
import binascii
|
|
import contextlib
|
|
import datetime
|
|
import io
|
|
import ipaddress
|
|
import json
|
|
import logging
|
|
import os
|
|
import shlex
|
|
import ssl
|
|
import sys
|
|
import tempfile
|
|
import uuid
|
|
|
|
from aiohttp import web, StreamReader
|
|
from aiohttp.web_urldispatcher import UrlDispatcher
|
|
from cryptography.hazmat.backends import default_backend
|
|
from cryptography.hazmat.primitives.asymmetric import rsa
|
|
from cryptography.hazmat.primitives import hashes
|
|
from cryptography.hazmat.primitives import serialization
|
|
from cryptography import x509
|
|
from cryptography.x509.oid import NameOID
|
|
import click
|
|
import requests
|
|
|
|
|
|
# Module-wide logger.
log = logging.getLogger('kapow')

# A dedicated event loop is created at import time and installed as the
# current loop; the `server` command drives it.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# Default base URLs of the control and data APIs.
KAPOW_CONTROL_URL = "https://localhost:8081"
KAPOW_DATA_URL = "http://localhost:8082"
|
|
|
|
########################################################################
|
|
# HTTPS Management #
|
|
########################################################################
|
|
|
|
def generate_ssl_cert(name, alt=None):
    """
    Generate a self-signed certificate together with its RSA private key.

    `name` is used as the certificate common name. When `alt` is given
    the certificate carries it as subject alternative name (as an IP
    address if it parses as one, as a DNS name otherwise) and is
    marked for server authentication. Without `alt` the certificate
    is marked for client authentication.

    Return a `(key_bytes, crt_bytes)` tuple, both PEM encoded. The key
    is not encrypted.

    """
    # Generate our key
    key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048,
    )

    # For a self-signed certificate the subject and issuer are always
    # the same.
    subject = issuer = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, name),
    ])

    # Take the timestamp once so both validity bounds share the same base.
    now = datetime.datetime.utcnow()
    cert = (
        x509.CertificateBuilder()
        .subject_name(subject)
        .issuer_name(issuer)
        .public_key(key.public_key())
        .serial_number(x509.random_serial_number())
        .not_valid_before(now)
        .not_valid_after(now + datetime.timedelta(days=3650))
    )

    if alt is not None:
        try:
            san = x509.IPAddress(ipaddress.ip_address(alt))
        except ValueError:
            # Not an IP literal: treat the alternative name as a hostname.
            san = x509.DNSName(alt)
        cert = cert.add_extension(
            x509.SubjectAlternativeName([san]),
            critical=True,
        )
        usage = x509.oid.ExtendedKeyUsageOID.SERVER_AUTH
    else:
        usage = x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH

    cert = cert.add_extension(
        x509.ExtendedKeyUsage([usage]),
        critical=True,
    )

    cert = cert.sign(key, hashes.SHA256())

    key_bytes = key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption(),
    )
    crt_bytes = cert.public_bytes(serialization.Encoding.PEM)

    return (key_bytes, crt_bytes)
|
|
|
|
|
|
########################################################################
|
|
# Resource Management #
|
|
########################################################################
|
|
|
|
|
|
# Registry of live connections, keyed by handler ID.
CONNECTIONS = dict()
|
|
|
|
|
|
class Connection:
    """
    Manages the lifecycle of a Kapow! connection.

    Behaves like a memory for the "fields" available in HTTP
    connections: request fields are read with `get` and response
    fields are written with `set`.

    """
    def __init__(self, request):
        """Wrap `request`, starting from an empty 200 response."""
        # Created lazily on the first write to 'response/stream'.
        self._stream = None
        self._body = io.BytesIO()
        self._status = 200
        self._headers = dict()
        self._cookies = dict()

        self.request = request

    async def get(self, key):
        """
        Get the content of the field `key`.

        Return bytes, except for 'request/body' which returns the
        request content stream itself.

        Raise `ValueError` when the path is unknown or incomplete and
        `KeyError` when the requested item does not exist.

        """
        path = urlparse(key).path
        parts = path.split('/')

        if path == 'request/method':
            return self.request.method.encode('utf-8')
        elif path == 'request/body':
            return self.request.content
        elif path == 'request/path':
            return self.request.path.encode('utf-8')
        elif path == 'request/host':
            return self.request.host.encode('utf-8')
        elif path.startswith('request/matches/'):
            return self.request.match_info[parts[2]].encode('utf-8')
        elif path.startswith('request/params/'):
            return self.request.rel_url.query[parts[2]].encode('utf-8')
        elif path.startswith('request/headers/'):
            return self.request.headers[parts[2]].encode('utf-8')
        elif path.startswith('request/cookies/'):
            return self.request.cookies[parts[2]].encode('utf-8')
        elif path == 'request/form':
            data = await self.request.post()
            names = [fieldname.encode('utf-8') for fieldname in data.keys()]
            return b'\n'.join(names)
        elif path.startswith('request/form/'):
            return (await self.request.post())[parts[2]].encode('utf-8')
        elif path == 'request/files':
            data = await self.request.post()
            files = [fieldname.encode('utf-8')
                     for fieldname, field in data.items()
                     if hasattr(field, 'filename')]
            return b'\n'.join(files)
        elif path.startswith('request/files/'):
            # Expected shape: request/files/<name>/<filename|content>.
            # A missing selector is reported as ValueError so callers
            # can treat it like any other invalid path.
            if len(parts) < 4:
                raise ValueError(f'Incomplete file path {path!r}')
            name = parts[2]
            content = parts[3]
            field = (await self.request.post())[name]
            if content == 'filename':
                try:
                    return field.filename.encode('utf-8')
                except Exception:
                    # Best effort: the field is not a file upload.
                    return b''
            elif content == 'content':
                try:
                    return field.file.read()
                except Exception:
                    # Best effort: the field is not a file upload.
                    return b''
            else:
                raise ValueError(f'Unknown content type {content!r}')
        else:
            raise ValueError('Unknown path')

    async def set(self, key, content):
        """
        Set the field `key` with the value read from `content`.

        `content` is a stream exposing `read()` and `readany()`.

        Raise `ValueError` when the path is unknown or when the status
        is not an integer.

        """
        path = urlparse(key).path
        parts = path.split('/')

        if path == 'response/status':
            self._status = int((await content.read()).decode('utf-8'))
        elif path == 'response/body':
            self._body.write(await content.read())
        elif path.startswith('response/headers/'):
            clean = (await content.read()).rstrip(b'\n').decode('utf-8')
            self._headers[parts[2]] = clean
        elif path.startswith('response/cookies/'):
            clean = (await content.read()).rstrip(b'\n').decode('utf-8')
            self._cookies[parts[2]] = clean
        elif path == 'response/stream':
            if self._stream is None:
                # No explicit reason: aiohttp derives the phrase from
                # the status instead of always announcing "OK".
                self._stream = web.StreamResponse(status=self._status,
                                                  headers=self._headers)
                for name, value in self._cookies.items():
                    self._stream.set_cookie(name, value)
                await self._stream.prepare(self.request)

            chunk = await content.readany()
            while chunk:
                await self._stream.write(chunk)
                chunk = await content.readany()
        else:
            raise ValueError(f'Unknown path {path!r}')

    async def append(self, key, content):
        """Append to field `key` the value in `content` (not implemented)."""
        raise NotImplementedError()

    async def build_response(self):
        """
        Return the appropriate aiohttp.web.*Response.

        If streaming has started the stream is finished and returned,
        otherwise a regular response is built from the stored body,
        status, headers and cookies.

        """
        if self._stream is None:
            response = web.Response(body=self._body.getvalue(),
                                    status=self._status,
                                    headers=self._headers)
            for name, value in self._cookies.items():
                response.set_cookie(name, value)
            return response
        else:
            await self._stream.write_eof()
            return self._stream
|
|
|
|
|
|
async def get_field(request):
    """
    Get the value of some HTTP field in the given connection.

    Responds 404 when the handler ID or the requested item is unknown
    and 400 when the field path is invalid or incomplete. Stream
    values are relayed chunk by chunk; any other value is sent as the
    response body.

    """
    handler_id = request.match_info["id"]
    field = request.match_info["field"]

    try:
        connection = CONNECTIONS[handler_id]
    except KeyError:
        return web.json_response(data=error_body("Handler ID Not Found"), status=404, reason="Not Found")

    try:
        content = await connection.get(field)
    except (ValueError, IndexError):
        # IndexError: the path lacks a required component.
        return web.json_response(data=error_body("Invalid Resource Path"), status=400, reason="Bad Request")
    except KeyError:
        return web.json_response(data=error_body("Resource Item Not Found"), status=404, reason="Not Found")

    if not isinstance(content, StreamReader):
        return web.Response(body=content)

    response = web.StreamResponse(status=200, reason="OK")
    await response.prepare(request)

    chunk = await content.readany()
    while chunk:
        await response.write(chunk)
        chunk = await content.readany()

    await response.write_eof()
    return response
|
|
|
|
|
|
async def set_field(request):
    """
    Set the value of some HTTP field in the given connection.

    Responds 404 when the handler ID is unknown and 400 when the
    connection rejects the field path or value. If the target stream
    is already closed, the transport of this request is closed too.

    """
    handler_id = request.match_info["id"]
    field = request.match_info["field"]

    try:
        connection = CONNECTIONS[handler_id]
    except KeyError:
        return web.json_response(data=error_body("Handler ID Not Found"), status=404, reason="Not Found")

    try:
        await connection.set(field, request.content)
    except ValueError:
        # Raised by the connection for unknown paths or invalid values.
        return web.json_response(data=error_body("Invalid Resource Path"), status=400, reason="Bad Request")
    except ConnectionResetError:
        # Raised when trying to write to an already-closed stream.
        request.transport.close()

    # Also returned after the transport was closed above, so the
    # handler always hands a response object back to aiohttp.
    return web.Response(body=b'')
|
|
|
|
|
|
async def append_field(request):
    """Placeholder for appending to a field; currently does nothing."""
    return None
|
|
|
|
|
|
########################################################################
|
|
# Endpoint Execution #
|
|
########################################################################
|
|
|
|
|
|
def handle_route(entrypoint, command):
    """
    Return an aiohttp route handler that will execute entrypoint and
    command in order to manage a Kapow! route.

    For every request the handler registers a new connection, runs
    `entrypoint` followed by `command` through the shell with
    KAPOW_DATA_URL and KAPOW_HANDLER_ID in the environment, waits for
    the process to finish and answers with the response built from
    the connection.

    """
    async def _handle(request):
        # Register a new connection to Kapow!
        handler_id = "CONN_" + str(uuid4()).replace('-', '_')
        connection = CONNECTIONS[handler_id] = Connection(request)

        # Everything after registration runs under `finally` so the
        # connection is unregistered even if the entrypoint cannot be
        # parsed or the process cannot be spawned.
        try:
            # Run entrypoint + command passing the connection id
            executable, *params = shlex.split(entrypoint)
            args = ' '.join([executable]
                            + [shlex.quote(token) for token in params]
                            + [shlex.quote(command)])
            shell_task = await asyncio.create_subprocess_shell(
                args,
                env={**os.environ,
                     "KAPOW_DATA_URL": KAPOW_DATA_URL,
                     "KAPOW_HANDLER_ID": handler_id,
                     },
                stdin=asyncio.subprocess.DEVNULL)

            await shell_task.wait()

            # Respond when the command finishes
            return await connection.build_response()
        finally:
            del CONNECTIONS[handler_id]

    return _handle
|
|
|
|
|
|
########################################################################
|
|
# Route Management #
|
|
########################################################################
|
|
|
|
|
|
def error_body(reason):
    """Build the JSON-serializable body used by error responses."""
    body = dict(reason=reason)
    return body
|
|
|
|
def get_routes(app):
    """Return a handler listing the routes stored in app["user_routes"]."""
    async def _get_routes(request):
        """Return the list of registered routes."""
        listing = []
        for position, entry in enumerate(app["user_routes"]):
            listing.append({
                "index": position,
                "method": entry.method,
                "id": entry.id,
                "url_pattern": entry.path,
                "entrypoint": entry.entrypoint,
                "command": entry.command,
            })
        return web.json_response(listing)

    return _get_routes
|
|
|
|
|
|
def get_route(app):
    """Return a handler that looks up one route of app["user_routes"] by ID."""
    async def _get_route(request):
        """Return requested registered route."""
        wanted = request.match_info["id"]
        for position, entry in enumerate(app["user_routes"]):
            if entry.id != wanted:
                continue
            found = {
                "index": position,
                "method": entry.method,
                "id": entry.id,
                "url_pattern": entry.path,
                "entrypoint": entry.entrypoint,
                "command": entry.command,
            }
            return web.json_response(found)

        # The loop finished without finding a matching ID.
        return web.json_response(data=error_body("Route Not Found"), status=404, reason="Not Found")

    return _get_route
|
|
|
|
|
|
def insert_route(app):
    """
    Return a handler that inserts a route at a given position.

    The handler expects a JSON object with "index" and "url_pattern"
    and optional "method", "entrypoint" and "command". It answers 400
    for malformed JSON, 422 for an invalid route description and 201
    with the stored route otherwise.

    """
    async def _insert_route(request):
        """Insert a new Kapow! route."""
        try:
            content = await request.json()
        except ValueError:
            return web.json_response(data=error_body("Malformed JSON"), status=400, reason="Bad Request")

        if not isinstance(content, dict):
            # Valid JSON that is not an object cannot describe a route.
            return web.json_response(data=error_body("Invalid Route"), status=422, reason="Unprocessable Entity")

        try:
            index = int(content["index"])
            # Explicit check instead of `assert`, which is stripped
            # when Python runs with -O.
            if index < 0:
                raise ValueError("index must not be negative")
            # Inserting past the end appends; clamp so the reported
            # index is the real position of the route.
            index = min(index, len(app["user_routes"]))
            method = content.get("method", "GET")
            entrypoint = content.get("entrypoint", "/bin/sh -c")
            command = content.get("command", "")
            route = KapowRoute(method=method,
                               path=content["url_pattern"],
                               id="ROUTE_" + str(uuid4()).replace('-', '_'),
                               entrypoint=entrypoint,
                               command=command,
                               handler=handle_route(entrypoint, command))
            app.change_routes((app["user_routes"][:index]
                               + [route]
                               + app["user_routes"][index:]))
        except (InvalidRouteError, KeyError, TypeError, ValueError):
            # TypeError: "index" holds a value int() cannot convert.
            return web.json_response(data=error_body("Invalid Route"), status=422, reason="Unprocessable Entity")
        else:
            app["user_routes"].insert(index, route)
            return web.json_response({"id": route.id,
                                      "method": route.method,
                                      "url_pattern": route.path,
                                      "entrypoint": route.entrypoint,
                                      "command": route.command,
                                      "index": index}, status=201)

    return _insert_route
|
|
|
|
|
|
def append_route(app):
    """
    Return a handler that appends a route after the existing ones.

    The handler expects a JSON object with "url_pattern" and optional
    "method", "entrypoint" and "command". It answers 400 for malformed
    JSON, 422 for an invalid route description and 201 with the stored
    route otherwise.

    """
    async def _append_route(request):
        """Append a new Kapow! route."""
        try:
            content = await request.json()
        except ValueError:
            return web.json_response(data=error_body("Malformed JSON"), status=400, reason="Bad Request")

        if not isinstance(content, dict):
            # Valid JSON that is not an object cannot describe a route.
            return web.json_response(data=error_body("Invalid Route"), status=422, reason="Unprocessable Entity")

        try:
            method = content.get("method", "GET")
            entrypoint = content.get("entrypoint", "/bin/sh -c")
            command = content.get("command", "")
            route = KapowRoute(method=method,
                               path=content["url_pattern"],
                               id="ROUTE_" + str(uuid4()).replace('-', '_'),
                               entrypoint=entrypoint,
                               command=command,
                               handler=handle_route(entrypoint, command))
            app.change_routes(app["user_routes"] + [route])
        except (InvalidRouteError, KeyError):
            return web.json_response(data=error_body("Invalid Route"), status=422, reason="Unprocessable Entity")
        else:
            app["user_routes"].append(route)
            return web.json_response({"id": route.id,
                                      "method": route.method,
                                      "url_pattern": route.path,
                                      "entrypoint": route.entrypoint,
                                      "command": route.command,
                                      "index": len(app["user_routes"]) - 1},
                                     status=201)

    return _append_route
|
|
|
|
|
|
def delete_route(app):
    """Return a handler that removes a route of app["user_routes"] by ID."""
    async def _delete_route(request):
        """Delete the given Kapow! route."""
        target = request.match_info["id"]
        remaining = [entry for entry in app["user_routes"] if entry.id != target]

        # Same length means nothing was filtered out: unknown ID.
        if len(remaining) == len(app["user_routes"]):
            return web.json_response(data=error_body("Route Not Found"), status=404, reason="Not Found")

        app.change_routes(remaining)
        app["user_routes"] = remaining
        return web.Response(status=204, reason="No Content")

    return _delete_route
|
|
|
|
|
|
########################################################################
|
|
# aiohttp webapp #
|
|
########################################################################
|
|
|
|
async def report_result(proc):
    """Wait for `proc` to terminate, then print its exit code."""
    await proc.communicate()
    exit_code = proc.returncode
    print(f"Process exited with code {exit_code}")
|
|
|
|
|
|
async def run_init_script(app, scripts, interactive):
    """
    Launch every init script with the control credentials in its
    environment.

    Each script is started without waiting for it; a background task
    reports its exit code once it finishes. Errors while starting a
    script are printed and the remaining scripts are still launched.
    `interactive` is accepted but not used here.

    """
    for path in scripts:
        try:
            child_env = dict(os.environ)
            child_env["KAPOW_CONTROL_CLIENT_CERT"] = app["client_cert"]
            child_env["KAPOW_CONTROL_CLIENT_KEY"] = app["client_key"]
            child_env["KAPOW_CONTROL_SERVER_CERT"] = app["server_cert"]
            child_env["KAPOW_CONTROL_URL"] = KAPOW_CONTROL_URL
            process = await asyncio.create_subprocess_exec(path, env=child_env)
        except Exception as exc:
            print(exc)
        else:
            asyncio.create_task(report_result(process))
|
|
|
|
|
|
|
|
class InvalidRouteError(Exception):
    """Raised when a set of routes cannot be registered in the router."""
|
|
|
|
|
|
class DynamicApplication(web.Application):
    """
    A wrapper around `aiohttp.web.Application` allowing changing routes
    dynamically.

    This is not safe as mentioned here:
    https://github.com/aio-libs/aiohttp/issues/3238.

    On the other hand this is a PoC anyway...

    """
    def change_routes(self, routes):
        """
        Replace the application router with one built from `routes`.

        The current router is only swapped when every route registers
        successfully; otherwise `InvalidRouteError` is raised and the
        application keeps its previous router.

        """
        fresh_router = UrlDispatcher()
        try:
            for entry in routes:
                fresh_router.add_route(entry.method,
                                       entry.path,
                                       entry.handler,
                                       name=entry.id)
        except Exception as exc:
            raise InvalidRouteError("Invalid route") from exc

        self._router = fresh_router
        # An already frozen application needs its new router frozen too.
        if self._frozen:
            self._router.freeze()
|
|
|
|
|
|
# Immutable description of a user route plus the handler serving it.
KapowRoute = namedtuple(
    'KapowRoute',
    ['method', 'path', 'id', 'entrypoint', 'command', 'handler'],
)
|
|
|
|
|
|
async def start_background_tasks(app):
    """Schedule the init scripts on the module loop as a background task."""
    # `loop` is only read here, so no `global` declaration is required.
    init_scripts = run_init_script(app, app["scripts"], app["interactive"])
    app["debug_tasks"] = loop.create_task(init_scripts)
|
|
|
|
|
|
def reduce_addr(addr):
    """
    Drop the port part from an `addr:port` string (IPv6 aware).

    Bracketed IPv6 literals are returned without the brackets, with or
    without a trailing port: both `[::1]:8081` and `[::1]` give `::1`.
    A string without any port is returned unchanged.

    """
    if addr.startswith('['):
        host, closing, rest = addr[1:].partition(']')
        # Only a well-formed literal, alone or followed by `:port`, is
        # unwrapped here; anything else falls through to the generic path.
        if closing and (not rest or rest.startswith(':')):
            return host

    host, *_ = addr.rsplit(':', 1)
    if host.startswith('[') and host.endswith(']'):
        return host[1:-1]
    return host
|
|
|
|
|
|
def _split_bind(bind):
    """Split a `host:port` bind string into `(host, port)`, IPv6 aware."""
    # Split on the last colon only, so IPv6 hosts keep their own colons.
    host, port = bind.rsplit(':', 1)
    if host.startswith('[') and host.endswith(']'):
        host = host[1:-1]
    return host, int(port)


async def start_kapow_server(user_bind,
                             control_bind,
                             data_bind,
                             scripts,
                             certfile=None,
                             interactive=False,
                             keyfile=None,
                             control_reachable_addr="localhost:8081"):
    """
    Start the three Kapow! HTTP servers: user, control and data.

    `user_bind`, `control_bind` and `data_bind` are `host:port`
    strings (IPv6 hosts in brackets). The user server uses TLS only
    when both `certfile` and `keyfile` are given. The control server
    always uses TLS with freshly generated self-signed certificates
    and requires a client certificate; those credentials are stored
    in the control application for the init `scripts`.
    `control_reachable_addr` sets the global KAPOW_CONTROL_URL and the
    alternative name of the control server certificate.

    """
    global KAPOW_CONTROL_URL
    KAPOW_CONTROL_URL = f"https://{control_reachable_addr}"

    #
    # USER
    #
    user_app = DynamicApplication(client_max_size=1024**3)
    user_app["user_routes"] = list()  # [KapowRoute]
    user_runner = web.AppRunner(user_app)
    await user_runner.setup()

    ssl_context = None
    if certfile and keyfile:
        ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        ssl_context.load_cert_chain(certfile, keyfile)

    user_ip, user_port = _split_bind(user_bind)
    user_site = web.TCPSite(user_runner, user_ip, user_port,
                            ssl_context=ssl_context)
    await user_site.start()

    #
    # CONTROL
    #
    alternate_name = reduce_addr(control_reachable_addr)
    srv_key_bytes, srv_crt_bytes = generate_ssl_cert("control", alternate_name)
    cli_key_bytes, cli_crt_bytes = generate_ssl_cert("control")

    context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    # The ssl module loads certificates from paths, so the generated
    # material is written to temporary files just long enough to load it.
    with tempfile.NamedTemporaryFile(suffix=".pem", delete=True) as pem_file, \
         tempfile.NamedTemporaryFile(suffix=".key", delete=True) as key_file, \
         tempfile.NamedTemporaryFile(suffix=".pem", delete=True) as cli_crt_file:
        pem_file.write(srv_crt_bytes)
        pem_file.flush()
        key_file.write(srv_key_bytes)
        key_file.flush()
        cli_crt_file.write(cli_crt_bytes)
        cli_crt_file.flush()

        # Require clients to present the generated client certificate.
        context.verify_mode = ssl.CERT_REQUIRED
        context.load_cert_chain(pem_file.name, key_file.name)
        context.load_verify_locations(cafile=cli_crt_file.name)

    control_app = web.Application(client_max_size=1024**3)
    control_app.add_routes([
        web.get('/routes', get_routes(user_app)),
        web.get('/routes/{id}', get_route(user_app)),
        web.post('/routes', append_route(user_app)),
        web.put('/routes', insert_route(user_app)),
        web.delete('/routes/{id}', delete_route(user_app)),
    ])
    control_app["scripts"] = scripts
    control_app["client_cert"] = cli_crt_bytes
    control_app["client_key"] = cli_key_bytes
    control_app["server_cert"] = srv_crt_bytes
    control_app["interactive"] = interactive
    control_app.on_startup.append(start_background_tasks)

    control_runner = web.AppRunner(control_app)

    await control_runner.setup()

    control_ip, control_port = _split_bind(control_bind)
    control_site = web.TCPSite(control_runner, control_ip,
                               control_port, ssl_context=context)
    await control_site.start()

    #
    # DATA
    #
    data_app = web.Application(client_max_size=1024**3)
    data_app.add_routes([
        # Data API
        web.get('/handlers/{id}/{field:.*}', get_field),
        web.put('/handlers/{id}/{field:.*}', set_field),
    ])

    data_runner = web.AppRunner(data_app)

    await data_runner.setup()
    data_ip, data_port = _split_bind(data_bind)
    data_site = web.TCPSite(data_runner, data_ip, data_port)
    await data_site.start()
|
|
|
|
|
|
########################################################################
|
|
# Command Line #
|
|
########################################################################
|
|
|
|
|
|
@click.group()
@click.pass_context
def kapow(ctx):
    """Start aiohttp app."""
    # Nothing to do at group level; the subcommands do the work.
    return None
|
|
|
|
|
|
@kapow.command(help="Start a Kapow! server")
@click.option("--certfile", default=None)
@click.option("--keyfile", default=None)
@click.option("--bind", default="0.0.0.0:8080")
@click.option("--control-bind", default="0.0.0.0:8081")
@click.option("--data-bind", default="0.0.0.0:8082")
@click.option("--control-reachable-addr", default="localhost:8081")
@click.option("-i", "--interactive", is_flag=True)
@click.argument("scripts", nargs=-1)
def server(certfile, keyfile, bind, interactive, scripts,
           control_reachable_addr, control_bind, data_bind):
    # TLS for the user server needs both files; exactly one is an error.
    has_cert = bool(certfile)
    has_key = bool(keyfile)
    if has_cert != has_key:
        print("For SSL both 'certfile' and 'keyfile' should be provided.")
        sys.exit(1)

    startup = start_kapow_server(bind,
                                 control_bind,
                                 data_bind,
                                 scripts,
                                 certfile,
                                 interactive,
                                 keyfile,
                                 control_reachable_addr)
    loop.run_until_complete(startup)
    # Keep serving until the process is interrupted.
    loop.run_forever()
|
|
|
|
@kapow.group(help="Manage current server HTTP routes")
def route():
    # Group container only; see the `add`, `remove` and `list` subcommands.
    return None
|
|
|
|
|
|
@contextlib.contextmanager
def kapow_control_certs():
    """
    Yield a `requests` session set up for the control API.

    The server certificate, client certificate and client key are read
    from the KAPOW_CONTROL_SERVER_CERT, KAPOW_CONTROL_CLIENT_CERT and
    KAPOW_CONTROL_CLIENT_KEY environment variables (`KeyError` if one
    is missing) and written to temporary files that exist only inside
    the `with` block. The session is closed on exit.

    """
    with tempfile.NamedTemporaryFile(suffix='.crt', encoding='utf-8', mode='w') as srv_cert, \
         tempfile.NamedTemporaryFile(suffix='.crt', encoding='utf-8', mode='w') as cli_cert, \
         tempfile.NamedTemporaryFile(suffix='.key', encoding='utf-8', mode='w') as cli_key:
        srv_cert.write(os.environ["KAPOW_CONTROL_SERVER_CERT"])
        srv_cert.file.flush()
        cli_cert.write(os.environ["KAPOW_CONTROL_CLIENT_CERT"])
        cli_cert.file.flush()
        cli_key.write(os.environ["KAPOW_CONTROL_CLIENT_KEY"])
        cli_key.file.flush()

        # Session as context manager: pooled connections are released
        # when the caller is done.
        with requests.Session() as session:
            session.verify = srv_cert.name
            session.cert = (cli_cert.name, cli_key.name)
            yield session
|
|
|
|
|
|
@route.command("add")
@click.option("-c", "--command", nargs=1)
@click.option("-e", "--entrypoint", default="/bin/sh -c")
@click.option("-X", "--method", default="GET")
@click.option("--url", envvar='KAPOW_CONTROL_URL', default=KAPOW_CONTROL_URL)
@click.argument("url_pattern", nargs=1)
@click.argument("command_file", required=False)
def route_add(url_pattern, entrypoint, command, method, url, command_file):
    """Register a route, taking its command inline, from a file or stdin."""
    with kapow_control_certs() as session:
        if command:
            # Command is given inline
            source = command
        elif command_file is None:
            # No command
            source = ""
        elif command_file == '-':
            # Read commands from stdin
            source = sys.stdin.read()
        else:
            # Read commands from a file
            with open(command_file, 'r', encoding='utf-8') as script:
                source = script.read()

        payload = {"method": method,
                   "url_pattern": url_pattern,
                   "entrypoint": entrypoint,
                   "command": source}
        response = session.post(f"{url}/routes", json=payload)
        response.raise_for_status()
        print(json.dumps(response.json(), indent=2))
|
|
|
|
|
|
@route.command("remove")
@click.option("--url", envvar='KAPOW_CONTROL_URL', default=KAPOW_CONTROL_URL)
@click.argument("route-id")
def route_remove(route_id, url):
    """Delete the route `route_id` through the control API."""
    with kapow_control_certs() as session:
        target = f"{url}/routes/{route_id}"
        response = session.delete(target)
        response.raise_for_status()
|
|
|
|
|
|
@route.command("list")
@click.option("--url", envvar='KAPOW_CONTROL_URL', default=KAPOW_CONTROL_URL)
@click.argument("route-id", nargs=1, required=False, default=None)
def route_list(route_id, url):
    """Print all routes, or only `route_id` when given, as indented JSON."""
    with kapow_control_certs() as session:
        target = f"{url}/routes" if route_id is None else f"{url}/routes/{route_id}"
        response = session.get(target)
        response.raise_for_status()
        print(json.dumps(response.json(), indent=2))
|
|
|
|
|
|
@kapow.command("set", help="Set data from the current context")
@click.option("--url", envvar='KAPOW_DATA_URL', default=KAPOW_DATA_URL)
@click.option("--handler-id", envvar='KAPOW_HANDLER_ID')
@click.argument("path", nargs=1)
@click.argument("value", required=False)
def kapow_set(url, handler_id, path, value):
    # Without an explicit value the payload is streamed from stdin.
    payload = sys.stdin.buffer if value is None else value.encode('utf-8')
    target = f"{url}/handlers/{handler_id}{path}"

    try:
        response = requests.put(target, data=payload)
    except requests.exceptions.ConnectionError:
        return False

    response.raise_for_status()
|
|
|
|
|
|
@kapow.command("get", help="Get data from the current context")
@click.option("--url", envvar='KAPOW_DATA_URL', default=KAPOW_DATA_URL)
@click.option("--handler-id", envvar='KAPOW_HANDLER_ID')
@click.argument("path", nargs=1)
def kapow_get(url, handler_id, path):
    target = f"{url}/handlers/{handler_id}{path}"
    try:
        response = requests.get(target, stream=True)
        response.raise_for_status()
    except requests.exceptions.ConnectionError:
        return False

    # Relay the body to stdout as raw bytes, chunk by chunk.
    emit = sys.stdout.buffer.write
    for piece in response.iter_content(chunk_size=None):
        emit(piece)
|
|
|
|
|
|
# Script entry point: hand control to the click command group.
if __name__ == "__main__":
    kapow()
|