Files
pancho horrillo b7b55d2f3b test(poc): Secure Control API using cross-pinning mTLS
Note that we are leveraging nix-shell to provide portable dependency
handling.

Co-authored-by: Roberto Abdelkader Martínez Pérez <robertomartinezp@gmail.com>
2021-03-12 17:00:53 +01:00

813 lines
28 KiB
Plaintext
Executable File

#! /usr/bin/env nix-shell
#! nix-shell -i python3.7 -p python37 python37Packages.aiohttp python37Packages.requests python37Packages.click
#
# TODO: maybe add an option (cli) to supply the external address
#
# Copyright 2019 Banco Bilbao Vizcaya Argentaria, S.A.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from collections import namedtuple
from urllib.parse import urlparse
from uuid import uuid4
import asyncio
import binascii
import contextlib
import datetime
import io
import ipaddress
import json
import logging
import os
import shlex
import ssl
import sys
import tempfile
import uuid
from aiohttp import web, StreamReader
from aiohttp.web_urldispatcher import UrlDispatcher
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography import x509
from cryptography.x509.oid import NameOID
import click
import requests
# Module-wide logger.
log = logging.getLogger('kapow')
# A dedicated event loop is created at import time and installed as the
# current loop; the `server` command and `start_background_tasks` use it.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# Default base URLs of the Control and Data APIs.  KAPOW_CONTROL_URL is
# reassigned by `start_kapow_server` from the reachable address it is given.
KAPOW_CONTROL_URL="https://localhost:8081"
KAPOW_DATA_URL="http://localhost:8082"
########################################################################
# HTTPS Management #
########################################################################
def generate_ssl_cert(name, alt=None):
    """
    Generate a self-signed RSA certificate together with its private key.

    `name` is used as the certificate common name (subject and issuer).

    When `alt` is given the certificate is restricted to server
    authentication and carries `alt` as its subjectAltName: as an IP
    address if `alt` parses as one, as a DNS name otherwise.  When `alt`
    is None the certificate is restricted to client authentication.

    Return a `(key_bytes, crt_bytes)` tuple, both PEM encoded; the key is
    not encrypted.
    """
    # Generate our key
    key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048,
    )

    # Various details about who we are. For a self-signed certificate the
    # subject and issuer are always the same.
    subject = issuer = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, name),
    ])

    # Use a single timestamp so both ends of the validity window agree.
    now = datetime.datetime.utcnow()
    cert = (
        x509.CertificateBuilder()
        .subject_name(subject)
        .issuer_name(issuer)
        .public_key(key.public_key())
        .serial_number(x509.random_serial_number())
        .not_valid_before(now)
        .not_valid_after(now + datetime.timedelta(days=3650))
    )

    if alt is not None:
        try:
            general_name = x509.IPAddress(ipaddress.ip_address(alt))
        except ValueError:
            # Not an IP literal: treat it as a host name.
            general_name = x509.DNSName(alt)
        cert = cert.add_extension(
            x509.SubjectAlternativeName([general_name]),
            critical=True,
        )
        usage = x509.oid.ExtendedKeyUsageOID.SERVER_AUTH
    else:
        usage = x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH

    cert = cert.add_extension(
        x509.ExtendedKeyUsage([usage]),
        critical=True,
    )
    cert = cert.sign(key, hashes.SHA256())

    key_bytes = key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption(),
    )
    crt_bytes = cert.public_bytes(serialization.Encoding.PEM)
    return (key_bytes, crt_bytes)
########################################################################
# Resource Management #
########################################################################
# Live connections keyed by handler id ("CONN_..."); entries are added and
# removed by the route handler built in `handle_route`.
CONNECTIONS = {}
class Connection:
    """
    Manages the lifecycle of a Kapow! connection.

    Behaves like a memory for the "fields" available in HTTP
    connections.
    """

    def __init__(self, request):
        """Wrap `request`, starting with an empty 200 response."""
        self._stream = None
        self._body = io.BytesIO()
        self._status = 200
        self._headers = dict()
        self._cookies = dict()
        self.request = request

    @staticmethod
    def _segment(path, n):
        """
        Return the nth '/'-separated element of `path`.

        Raise ValueError (instead of IndexError) when the path is too
        short, so callers can report it as an invalid resource path.
        """
        parts = path.split('/')
        if n >= len(parts):
            raise ValueError(f'Incomplete path {path!r}')
        return parts[n]

    async def get(self, key):
        """
        Get the content of the field `key`.

        Return bytes, except for `request/body` which returns the
        request content stream as is.

        Raise ValueError for unknown or incomplete paths and KeyError
        when the requested item (header, cookie, param...) is missing.
        """
        path = urlparse(key).path
        request = self.request

        if path == 'request/method':
            return request.method.encode('utf-8')
        if path == 'request/body':
            return request.content
        if path == 'request/path':
            return request.path.encode('utf-8')
        if path == 'request/host':
            return request.host.encode('utf-8')
        if path.startswith('request/matches/'):
            return request.match_info[self._segment(path, 2)].encode('utf-8')
        if path.startswith('request/params/'):
            return request.rel_url.query[self._segment(path, 2)].encode('utf-8')
        if path.startswith('request/headers/'):
            return request.headers[self._segment(path, 2)].encode('utf-8')
        if path.startswith('request/cookies/'):
            return request.cookies[self._segment(path, 2)].encode('utf-8')
        if path == 'request/form':
            data = await request.post()
            names = [fieldname.encode('utf-8')
                     for fieldname, _ in data.items()]
            return b'\n'.join(names)
        if path.startswith('request/form/'):
            return (await request.post())[self._segment(path, 2)].encode('utf-8')
        if path == 'request/files':
            data = await request.post()
            # Only uploaded files expose a `filename` attribute.
            names = [fieldname.encode('utf-8')
                     for fieldname, field in data.items()
                     if hasattr(field, 'filename')]
            return b'\n'.join(names)
        if path.startswith('request/files/'):
            name = self._segment(path, 2)
            content = self._segment(path, 3)  # filename / content
            field = (await request.post())[name]
            if content == 'filename':
                try:
                    return field.filename.encode('utf-8')
                except Exception:
                    # Best effort: a non-file field yields an empty value.
                    return b''
            if content == 'content':
                try:
                    return field.file.read()
                except Exception:
                    # Best effort: a non-file field yields an empty value.
                    return b''
            raise ValueError(f'Unknown content type {content!r}')
        raise ValueError('Unknown path')

    async def set(self, key, content):
        """
        Set the field `key` with the value in `content`.

        `content` is a stream offering `read()` / `readany()` coroutines.
        Raise ValueError for unknown paths (and for a non numeric status).
        """
        path = urlparse(key).path

        if path == 'response/status':
            self._status = int((await content.read()).decode('utf-8'))
        elif path == 'response/body':
            self._body.write(await content.read())
        elif path.startswith('response/headers/'):
            clean = (await content.read()).rstrip(b'\n').decode('utf-8')
            self._headers[self._segment(path, 2)] = clean
        elif path.startswith('response/cookies/'):
            clean = (await content.read()).rstrip(b'\n').decode('utf-8')
            self._cookies[self._segment(path, 2)] = clean
        elif path == 'response/stream':
            if self._stream is None:
                # No explicit reason: let aiohttp derive the phrase from
                # the status instead of announcing "OK" for every status.
                self._stream = web.StreamResponse(status=self._status,
                                                  headers=self._headers)
                for name, value in self._cookies.items():
                    self._stream.set_cookie(name, value)
                await self._stream.prepare(self.request)

            chunk = await content.readany()
            while chunk:
                await self._stream.write(chunk)
                chunk = await content.readany()
        else:
            raise ValueError(f'Unknown path {path!r}')

    async def append(self, key, content):
        """Append to field `key` the value in `content`."""
        raise NotImplementedError()

    async def build_response(self):
        """Return the appropriate aiohttp.web.*Response."""
        if self._stream is not None:
            # Streaming already started: just finish it.
            await self._stream.write_eof()
            return self._stream

        response = web.Response(body=self._body.getvalue(),
                                status=self._status,
                                headers=self._headers)
        for name, value in self._cookies.items():
            response.set_cookie(name, value)
        return response
async def get_field(request):
    """
    Get the value of some HTTP field in the given connection.

    Responds 404 when the handler id or the requested item is unknown,
    400 when the resource path is invalid, and otherwise 200 with the
    field content (streamed when the field is the request body stream).
    """
    handler_id = request.match_info["id"]
    field = request.match_info["field"]

    try:
        connection = CONNECTIONS[handler_id]
    except KeyError:
        return web.json_response(data=error_body("Handler ID Not Found"), status=404, reason="Not Found")

    try:
        content = await connection.get(field)
    except (ValueError, IndexError):
        # IndexError: the path is too short to name an item
        # (e.g. `request/files/<name>` without `/content`).
        return web.json_response(data=error_body("Invalid Resource Path"), status=400, reason="Bad Request")
    except KeyError:
        return web.json_response(data=error_body("Resource Item Not Found"), status=404, reason="Not Found")

    if not isinstance(content, StreamReader):
        return web.Response(body=content)

    # Relay the stream chunk by chunk instead of buffering it.
    response = web.StreamResponse(status=200, reason="OK")
    await response.prepare(request)
    chunk = await content.readany()
    while chunk:
        await response.write(chunk)
        chunk = await content.readany()
    await response.write_eof()
    return response
async def set_field(request):
    """
    Set the value of some HTTP field in the given connection.

    Responds 404 when the handler id is unknown, 400 when the connection
    rejects the path or value with ValueError, and 200 with an empty body
    otherwise.
    """
    handler_id = request.match_info["id"]
    field = request.match_info["field"]

    try:
        connection = CONNECTIONS[handler_id]
    except KeyError:
        return web.json_response(data=error_body("Handler ID Not Found"), status=404, reason="Not Found")

    try:
        await connection.set(field, request.content)
    except ValueError:
        # ValueError comes from `Connection.set`, not from the lookup above.
        return web.json_response(data=error_body("Invalid Resource Path"), status=400, reason="Bad Request")
    except ConnectionResetError:
        # Raised when trying to write to an already-closed stream.
        request.transport.close()

    # Always return a response object, also after a reset connection,
    # so the handler never fails on an unbound local.
    return web.Response(body=b'')
async def append_field(request):
    """Placeholder for the append operation; does nothing and returns None."""
    return None
########################################################################
# Endpoint Execution #
########################################################################
def handle_route(entrypoint, command):
    """
    Return an aiohttp route handler that will execute entrypoint and
    command in order to manage a Kapow! route.
    """
    async def _handle(request):
        # Build the command line first: a malformed entrypoint then fails
        # before a connection is registered, so nothing is left behind
        # in CONNECTIONS.
        executable, *params = shlex.split(entrypoint)
        args = ' '.join([executable,
                         *(shlex.quote(token) for token in params),
                         shlex.quote(command)])

        # Register a new connection to Kapow!
        handler_id = "CONN_" + str(uuid4()).replace('-', '_')
        connection = CONNECTIONS[handler_id] = Connection(request)

        try:
            # Run entrypoint + command passing the connection id
            shell_task = await asyncio.create_subprocess_shell(
                args,
                env={**os.environ,
                     "KAPOW_DATA_URL": KAPOW_DATA_URL,
                     "KAPOW_HANDLER_ID": handler_id,
                     },
                stdin=asyncio.subprocess.DEVNULL)
            await shell_task.wait()
            # Respond when the command finishes
            return await connection.build_response()
        finally:
            # The connection is only valid while the command runs.
            del CONNECTIONS[handler_id]

    return _handle
########################################################################
# Route Management #
########################################################################
def error_body(reason):
    """Build the JSON-serialisable body used by every error response."""
    return dict(reason=reason)
def get_routes(app):
    """Build the handler that lists the routes stored in `app`."""
    async def _get_routes(request):
        """Return the list of registered routes."""
        listing = []
        for position, route in enumerate(app["user_routes"]):
            listing.append({"index": position,
                            "method": route.method,
                            "id": route.id,
                            "url_pattern": route.path,
                            "entrypoint": route.entrypoint,
                            "command": route.command})
        return web.json_response(listing)

    return _get_routes
def get_route(app):
    """Build the handler that returns a single route stored in `app`."""
    async def _get_route(request):
        """Return requested registered route."""
        wanted = request.match_info["id"]
        for position, route in enumerate(app["user_routes"]):
            if route.id != wanted:
                continue
            return web.json_response({"index": position,
                                      "method": route.method,
                                      "id": route.id,
                                      "url_pattern": route.path,
                                      "entrypoint": route.entrypoint,
                                      "command": route.command})
        # Loop exhausted without a match.
        return web.json_response(data=error_body("Route Not Found"), status=404, reason="Not Found")

    return _get_route
def insert_route(app):
    """Build the handler that inserts a route into `app` at a given index."""
    async def _insert_route(request):
        """
        Insert a new Kapow! route.

        Responds 400 on malformed JSON, 422 when the route description is
        invalid (missing/negative index, missing url_pattern, non-object
        body, route rejected by the router) and 201 with the created
        route otherwise.
        """
        try:
            content = await request.json()
        except ValueError:
            return web.json_response(data=error_body("Malformed JSON"), status=400, reason="Bad Request")

        try:
            index = int(content["index"])
            if index < 0:
                # Explicit check: `assert` would vanish under `python -O`.
                raise ValueError("index must not be negative")
            # Past-the-end indexes append; report the real position.
            index = min(index, len(app["user_routes"]))
            method = content.get("method", "GET")
            entrypoint = content.get("entrypoint", "/bin/sh -c")
            command = content.get("command", "")
            route = KapowRoute(method=method,
                               path=content["url_pattern"],
                               id="ROUTE_" + str(uuid4()).replace('-', '_'),
                               entrypoint=entrypoint,
                               command=command,
                               handler=handle_route(entrypoint, command))
            app.change_routes((app["user_routes"][:index]
                               + [route]
                               + app["user_routes"][index:]))
        except (InvalidRouteError, KeyError, ValueError, TypeError, AttributeError):
            # TypeError/AttributeError: the JSON body is not an object.
            return web.json_response(data=error_body("Invalid Route"), status=422, reason="Unprocessable Entity")

        app["user_routes"].insert(index, route)
        return web.json_response({"id": route.id,
                                  "method": route.method,
                                  "url_pattern": route.path,
                                  "entrypoint": route.entrypoint,
                                  "command": route.command,
                                  "index": index}, status=201)

    return _insert_route
def append_route(app):
    """Build the handler that appends a route to the ones stored in `app`."""
    async def _append_route(request):
        """
        Append a new Kapow! route.

        Responds 400 on malformed JSON, 422 when the route description is
        invalid (missing url_pattern, non-object body, route rejected by
        the router) and 201 with the created route otherwise.
        """
        try:
            content = await request.json()
        except ValueError:
            return web.json_response(data=error_body("Malformed JSON"), status=400, reason="Bad Request")

        try:
            method = content.get("method", "GET")
            entrypoint = content.get("entrypoint", "/bin/sh -c")
            command = content.get("command", "")
            route = KapowRoute(method=method,
                               path=content["url_pattern"],
                               id="ROUTE_" + str(uuid4()).replace('-', '_'),
                               entrypoint=entrypoint,
                               command=command,
                               handler=handle_route(entrypoint, command))
            app.change_routes(app["user_routes"] + [route])
        except (InvalidRouteError, KeyError, AttributeError, TypeError):
            # AttributeError/TypeError: the JSON body is not an object.
            return web.json_response(data=error_body("Invalid Route"), status=422, reason="Unprocessable Entity")

        app["user_routes"].append(route)
        return web.json_response({"id": route.id,
                                  "method": route.method,
                                  "url_pattern": route.path,
                                  "entrypoint": route.entrypoint,
                                  "command": route.command,
                                  "index": len(app["user_routes"])-1},
                                 status=201)

    return _append_route
def delete_route(app):
    """Build the handler that removes a route from the ones stored in `app`."""
    async def _delete_route(request):
        """Delete the given Kapow! route."""
        target = request.match_info["id"]
        current = app["user_routes"]
        remaining = [route for route in current if route.id != target]

        # Same length means no route carried the requested id.
        if len(remaining) == len(current):
            return web.json_response(data=error_body("Route Not Found"), status=404, reason="Not Found")

        app.change_routes(remaining)
        app["user_routes"] = remaining
        return web.Response(status=204, reason="No Content")

    return _delete_route
########################################################################
# aiohttp webapp #
########################################################################
async def report_result(proc):
    """Wait for `proc` to finish and print its exit code."""
    await proc.communicate()
    exit_code = proc.returncode
    print(f"Process exited with code {exit_code}")
async def run_init_script(app, scripts, interactive):
    """
    Launch every init script with the Control API credentials in its
    environment.

    Each script is started in turn; a background task reports its exit
    code once it finishes.  A script that cannot be started has its error
    printed and is skipped.  `interactive` is accepted but not used here.
    """
    for script in scripts:
        try:
            child_env = dict(os.environ)
            child_env["KAPOW_CONTROL_CLIENT_CERT"] = app["client_cert"]
            child_env["KAPOW_CONTROL_CLIENT_KEY"] = app["client_key"]
            child_env["KAPOW_CONTROL_SERVER_CERT"] = app["server_cert"]
            child_env["KAPOW_CONTROL_URL"] = KAPOW_CONTROL_URL
            proc = await asyncio.create_subprocess_exec(script, env=child_env)
        except Exception as exc:
            print(exc)
            continue
        asyncio.create_task(report_result(proc))
class InvalidRouteError(Exception):
    """Raised when a set of routes cannot be installed in the router."""
class DynamicApplication(web.Application):
    """
    An `aiohttp.web.Application` whose routes can be swapped at runtime.

    aiohttp does not consider this safe, see
    https://github.com/aio-libs/aiohttp/issues/3238.
    It is tolerated here because this is a PoC.
    """

    def change_routes(self, routes):
        """
        Replace the whole routing table with one built from `routes`.

        The new router is only installed when every route was accepted;
        otherwise InvalidRouteError is raised and the current router is
        left in place.
        """
        fresh = UrlDispatcher()
        try:
            for entry in routes:
                fresh.add_route(entry.method,
                                entry.path,
                                entry.handler,
                                name=entry.id)
        except Exception as exc:
            raise InvalidRouteError("Invalid route") from exc

        self._router = fresh
        if self._frozen:
            # Keep the replacement router in the same frozen state.
            self._router.freeze()
# Immutable description of a user route together with its aiohttp handler.
KapowRoute = namedtuple(
    'KapowRoute',
    ['method', 'path', 'id', 'entrypoint', 'command', 'handler'],
)
async def start_background_tasks(app):
    """Startup hook: schedule the init scripts on the module event loop."""
    init_scripts = run_init_script(app, app["scripts"], app["interactive"])
    app["debug_tasks"] = loop.create_task(init_scripts)
def reduce_addr(addr):
    """
    Drop the port part from an `addr:port` string (IPv6 aware).

    Brackets around an IPv6 literal are removed.  A bracketed literal
    given without any port (e.g. `[::1]`) is returned unbracketed instead
    of being cut at its last colon.
    """
    if addr.startswith('[') and addr.endswith(']'):
        # Bracketed literal with no port: nothing to drop but the brackets.
        return addr[1:-1]

    host = addr.rsplit(':', 1)[0]
    if host.startswith('[') and host.endswith(']'):
        return host[1:-1]
    return host
def _split_bind(bind):
    """Split a `host:port` bind string into `(host, port)` (IPv6 aware)."""
    host, port = bind.rsplit(':', 1)
    if host.startswith('[') and host.endswith(']'):
        host = host[1:-1]
    return host, int(port)


async def start_kapow_server(user_bind,
                             control_bind,
                             data_bind,
                             scripts,
                             certfile=None,
                             interactive=False,
                             keyfile=None,
                             control_reachable_addr="localhost:8081"):
    """
    Start the three Kapow! servers: user, control and data.

    * `user_bind`, `control_bind`, `data_bind`: `host:port` strings the
      respective servers listen on (`[v6addr]:port` for IPv6).
    * `scripts`: init scripts launched once the control server starts.
    * `certfile` / `keyfile`: when both are given the user server is
      served over TLS with them.
    * `interactive`: stored in the control application for the init
      script runner.
    * `control_reachable_addr`: `host:port` under which clients reach the
      control server; it becomes KAPOW_CONTROL_URL and the subjectAltName
      of the generated control server certificate.

    The control server requires a client certificate and only trusts the
    client certificate generated here, which is handed to the init
    scripts through the control application.

    Raises ValueError when a bind string is not of the `host:port` form.
    """
    global KAPOW_CONTROL_URL
    KAPOW_CONTROL_URL = f"https://{control_reachable_addr}"

    # Validate every bind address before anything starts listening.
    user_ip, user_port = _split_bind(user_bind)
    control_ip, control_port = _split_bind(control_bind)
    data_ip, data_port = _split_bind(data_bind)

    #
    # USER
    #
    user_app = DynamicApplication(client_max_size=1024**3)
    user_app["user_routes"] = list()  # [KapowRoute]
    user_runner = web.AppRunner(user_app)
    await user_runner.setup()

    ssl_context = None
    if certfile and keyfile:
        ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        ssl_context.load_cert_chain(certfile, keyfile)

    user_site = web.TCPSite(user_runner, user_ip, user_port,
                            ssl_context=ssl_context)
    await user_site.start()

    #
    # CONTROL
    #
    alternate_name = reduce_addr(control_reachable_addr)
    srv_key_bytes, srv_crt_bytes = generate_ssl_cert("control", alternate_name)
    cli_key_bytes, cli_crt_bytes = generate_ssl_cert("control")

    context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    # The ssl module loads certificates from files only, so the freshly
    # generated material goes through short-lived temporary files.
    with tempfile.NamedTemporaryFile(suffix=".pem", delete=True) as pem_file, \
         tempfile.NamedTemporaryFile(suffix=".key", delete=True) as key_file, \
         tempfile.NamedTemporaryFile(suffix=".pem", delete=True) as cli_crt_file:
        pem_file.write(srv_crt_bytes)
        pem_file.flush()
        key_file.write(srv_key_bytes)
        key_file.flush()
        cli_crt_file.write(cli_crt_bytes)
        cli_crt_file.flush()

        # Require a client certificate and trust only the generated one.
        context.verify_mode = ssl.CERT_REQUIRED
        context.load_cert_chain(pem_file.name, key_file.name)
        context.load_verify_locations(cafile=cli_crt_file.name)

    control_app = web.Application(client_max_size=1024**3)
    control_app.add_routes([
        web.get('/routes', get_routes(user_app)),
        web.get('/routes/{id}', get_route(user_app)),
        web.post('/routes', append_route(user_app)),
        web.put('/routes', insert_route(user_app)),
        web.delete('/routes/{id}', delete_route(user_app)),
    ])
    control_app["scripts"] = scripts
    control_app["client_cert"] = cli_crt_bytes
    control_app["client_key"] = cli_key_bytes
    control_app["server_cert"] = srv_crt_bytes
    control_app["interactive"] = interactive
    control_app.on_startup.append(start_background_tasks)

    control_runner = web.AppRunner(control_app)
    await control_runner.setup()
    control_site = web.TCPSite(control_runner, control_ip,
                               control_port, ssl_context=context)
    await control_site.start()

    #
    # DATA
    #
    data_app = web.Application(client_max_size=1024**3)
    data_app.add_routes([
        # Data API
        web.get('/handlers/{id}/{field:.*}', get_field),
        web.put('/handlers/{id}/{field:.*}', set_field),
    ])

    data_runner = web.AppRunner(data_app)
    await data_runner.setup()
    data_site = web.TCPSite(data_runner, data_ip, data_port)
    await data_site.start()
########################################################################
# Command Line #
########################################################################
@click.group()
@click.pass_context
def kapow(ctx):
    """Start aiohttp app."""
    # Root command group: the subcommands registered on it do the work.
@kapow.command(help="Start a Kapow! server")
@click.option("--certfile", default=None)
@click.option("--keyfile", default=None)
@click.option("--bind", default="0.0.0.0:8080")
@click.option("--control-bind", default="0.0.0.0:8081")
@click.option("--data-bind", default="0.0.0.0:8082")
@click.option("--control-reachable-addr", default="localhost:8081")
@click.option("-i", "--interactive", is_flag=True)
@click.argument("scripts", nargs=-1)
def server(certfile, keyfile, bind, interactive, scripts,
           control_reachable_addr, control_bind, data_bind):
    """Start the user, control and data servers and serve forever."""
    # TLS needs both halves: reject a certificate without key or vice versa.
    if bool(certfile) != bool(keyfile):
        print("For SSL both 'certfile' and 'keyfile' should be provided.")
        sys.exit(1)

    startup = start_kapow_server(bind,
                                 control_bind,
                                 data_bind,
                                 scripts,
                                 certfile=certfile,
                                 interactive=interactive,
                                 keyfile=keyfile,
                                 control_reachable_addr=control_reachable_addr)
    loop.run_until_complete(startup)
    loop.run_forever()
@kapow.group(help="Manage current server HTTP routes")
def route():
    """Command group holding the `add`, `remove` and `list` subcommands."""
@contextlib.contextmanager
def kapow_control_certs():
    """
    Yield a `requests` session set up for the mutual-TLS Control API.

    The server certificate, client certificate and client key are read
    from the KAPOW_CONTROL_SERVER_CERT, KAPOW_CONTROL_CLIENT_CERT and
    KAPOW_CONTROL_CLIENT_KEY environment variables (KeyError if one is
    missing) and written to temporary files, because `requests` takes
    them as file paths.  The files only exist, and the session is only
    usable, inside the `with` block; the session is closed on exit.
    """
    with tempfile.NamedTemporaryFile(suffix='.crt', encoding='utf-8', mode='w') as srv_cert, \
         tempfile.NamedTemporaryFile(suffix='.crt', encoding='utf-8', mode='w') as cli_cert, \
         tempfile.NamedTemporaryFile(suffix='.key', encoding='utf-8', mode='w') as cli_key:
        materials = ((srv_cert, "KAPOW_CONTROL_SERVER_CERT"),
                     (cli_cert, "KAPOW_CONTROL_CLIENT_CERT"),
                     (cli_key, "KAPOW_CONTROL_CLIENT_KEY"))
        for handle, variable in materials:
            handle.write(os.environ[variable])
            # Flush so the content is on disk when `requests` opens the path.
            handle.flush()

        # Context-managed so the session's connections are released.
        with requests.Session() as session:
            session.verify = srv_cert.name
            session.cert = (cli_cert.name, cli_key.name)
            yield session
@route.command("add")
@click.option("-c", "--command", nargs=1)
@click.option("-e", "--entrypoint", default="/bin/sh -c")
@click.option("-X", "--method", default="GET")
@click.option("--url", envvar='KAPOW_CONTROL_URL', default=KAPOW_CONTROL_URL)
@click.argument("url_pattern", nargs=1)
@click.argument("command_file", required=False)
def route_add(url_pattern, entrypoint, command, method, url, command_file):
    # Append a route through the Control API and print the created route.
    # The route command comes from `-c`, from COMMAND_FILE ('-' meaning
    # stdin) or is empty, in that order of precedence.
    with kapow_control_certs() as session:
        if command:
            # Command is given inline
            source = command
        elif command_file is None:
            # No command
            source = ""
        elif command_file == '-':
            # Read commands from stdin
            source = sys.stdin.read()
        else:
            # Read commands from a file
            with open(command_file, 'r', encoding='utf-8') as script:
                source = script.read()

        payload = {"method": method,
                   "url_pattern": url_pattern,
                   "entrypoint": entrypoint,
                   "command": source}
        response = session.post(f"{url}/routes", json=payload)
        response.raise_for_status()
        print(json.dumps(response.json(), indent=2))
@route.command("remove")
@click.option("--url", envvar='KAPOW_CONTROL_URL', default=KAPOW_CONTROL_URL)
@click.argument("route-id")
def route_remove(route_id, url):
    # Delete the route through the Control API; HTTP errors are raised.
    endpoint = f"{url}/routes/{route_id}"
    with kapow_control_certs() as session:
        session.delete(endpoint).raise_for_status()
@route.command("list")
@click.option("--url", envvar='KAPOW_CONTROL_URL', default=KAPOW_CONTROL_URL)
@click.argument("route-id", nargs=1, required=False, default=None)
def route_list(route_id, url):
    # Print every route, or only ROUTE-ID when it is given, as indented JSON.
    if route_id is None:
        endpoint = f"{url}/routes"
    else:
        endpoint = f"{url}/routes/{route_id}"

    with kapow_control_certs() as session:
        response = session.get(endpoint)
        response.raise_for_status()
        print(json.dumps(response.json(), indent=2))
@kapow.command("set", help="Set data from the current context")
@click.option("--url", envvar='KAPOW_DATA_URL', default=KAPOW_DATA_URL)
@click.option("--handler-id", envvar='KAPOW_HANDLER_ID')
@click.argument("path", nargs=1)
@click.argument("value", required=False)
def kapow_set(url, handler_id, path, value):
    """
    PUT `value` (or stdin when no value is given) to the field `path`
    of the handler through the Data API.
    """
    # Without an explicit value the raw stdin stream is uploaded.
    payload = sys.stdin.buffer if value is None else value.encode('utf-8')
    endpoint = f"{url}/handlers/{handler_id}{path}"

    try:
        response = requests.put(endpoint, data=payload)
    except requests.exceptions.ConnectionError:
        # Connection problems are deliberately silent.
        return False
    response.raise_for_status()
@kapow.command("get", help="Get data from the current context")
@click.option("--url", envvar='KAPOW_DATA_URL', default=KAPOW_DATA_URL)
@click.option("--handler-id", envvar='KAPOW_HANDLER_ID')
@click.argument("path", nargs=1)
def kapow_get(url, handler_id, path):
    """
    GET the field `path` of the handler through the Data API and copy
    the raw bytes to stdout as they arrive.
    """
    endpoint = f"{url}/handlers/{handler_id}{path}"

    try:
        response = requests.get(endpoint, stream=True)
        response.raise_for_status()
    except requests.exceptions.ConnectionError:
        # Connection problems are deliberately silent.
        return False

    out = sys.stdout.buffer
    for chunk in response.iter_content(chunk_size=None):
        out.write(chunk)
# Run the click command group when the file is executed as a script.
if __name__ == '__main__':
    kapow()