Files
pancho horrillo ab50721f69 test(spec): Control Server API secured via cross-pinning mTLS
. We are now leveraging nix for portable dependency handling.
. There are now three types of tests: client, server and end-to-end.
. server tests exercise the actual kapow server being tested, while the
requests are performed using the test steps.
. client tests exercise the actual kapow client being tested, while the
requests are served using the test steps.
. e2e tests exercise the actual kapow program in its dual role of client
and server (¡como tiene que ser!).

Co-authored-by: Roberto Abdelkader Martínez Pérez <robertomartinezp@gmail.com>
2021-03-12 17:09:50 +01:00

659 lines
23 KiB
Python

#
# Copyright 2019 Banco Bilbao Vizcaya Argentaria, S.A.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from contextlib import suppress, contextmanager
from multiprocessing.pool import ThreadPool
from time import sleep
import datetime
import http.server
import ipaddress
import json
import logging
import os
import shlex
import signal
import socket
import ssl
import subprocess
import sys
import tempfile
import threading
import time
from comparedict import is_subset
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography import x509
from cryptography.x509.oid import NameOID, ExtensionOID
from environconfig import EnvironConfig, StringVar, IntVar, BooleanVar
from requests import exceptions as requests_exceptions
import jsonexample
import requests
# Ordinal words accepted by the "{order} route" steps, mapped to list indexes.
WORD2POS = dict(first=0, second=1, last=-1)
# Directory holding this steps module; helper scripts are resolved against it.
HERE = os.path.dirname(__file__)
class Env(EnvironConfig):
    # Settings for the spec tests. Each attribute is an environconfig
    # variable with the default shown below.

    # Command line used to start the Kapow! server under test.
    KAPOW_SERVER_CMD = StringVar(default="kapow server")

    # Base URL and TCP port of the Control API.
    KAPOW_CONTROL_URL = StringVar(default="https://localhost:8081")
    KAPOW_CONTROL_PORT = IntVar(default=8081)

    # Base URL of the Data API.
    KAPOW_DATA_URL = StringVar(default="http://localhost:8082")

    # Base URL of the User Interface.
    KAPOW_USER_URL = StringVar(default="http://localhost:8080")

    # NOTE(review): not referenced by any step visible in this file.
    KAPOW_CONTROL_TOKEN = StringVar(default="TEST-SPEC-CONTROL-TOKEN")

    # Number of polling attempts (10 ms apart) that run_kapow_server makes
    # while waiting for the control port to open.
    KAPOW_BOOT_TIMEOUT = IntVar(default=3000)

    # When true, verbose HTTP/logging output is enabled at import time.
    KAPOW_DEBUG_TESTS = BooleanVar(default=False)
if Env.KAPOW_DEBUG_TESTS:
    # Enable debugging at http.client level (requests -> urllib3 ->
    # http.client). You will see the REQUEST, including HEADERS and DATA,
    # and the RESPONSE with HEADERS but without DATA. The only thing
    # missing will be the response body, which is not logged.
    #
    # This module uses f-strings, so it can only ever be imported by
    # Python 3; the old ``httplib`` fallback was unreachable and is gone.
    import http.client as http_client

    http_client.HTTPConnection.debuglevel = 1

    # Logging must be initialised, otherwise no debug output is shown.
    logging.basicConfig()
    logging.getLogger().setLevel(logging.DEBUG)
    requests_log = logging.getLogger("requests.packages.urllib3")
    requests_log.setLevel(logging.DEBUG)
    requests_log.propagate = True
def generate_ssl_cert(subject_name, alternate_name):
    """Create a self-signed certificate together with its private key.

    :param subject_name: Common Name used for both subject and issuer. When
        it ends with ``_server`` the certificate is marked for server
        authentication, otherwise for client authentication.
    :param alternate_name: DNS name placed in the Subject Alternative Name
        extension.
    :returns: ``(key_bytes, crt_bytes)``, both PEM encoded. The key is an
        unencrypted 4096-bit RSA key in TraditionalOpenSSL format.
    """
    # Generate our key
    key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=4096,
    )

    # For a self-signed certificate the subject and issuer are always the
    # same.
    subject = issuer = x509.Name([
        x509.NameAttribute(NameOID.COMMON_NAME, subject_name),
    ])

    if subject_name.endswith('_server'):
        usage = x509.oid.ExtendedKeyUsageOID.SERVER_AUTH
    else:
        usage = x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH

    # Take a single timezone-aware timestamp so both validity bounds are
    # derived from the same instant (``datetime.utcnow()`` is deprecated).
    now = datetime.datetime.now(datetime.timezone.utc)

    builder = (
        x509.CertificateBuilder()
        .subject_name(subject)
        .issuer_name(issuer)
        .public_key(key.public_key())
        .serial_number(x509.random_serial_number())
        .not_valid_before(now)
        # Our certificate will be valid for 10 days
        .not_valid_after(now + datetime.timedelta(days=10))
        .add_extension(
            x509.SubjectAlternativeName([x509.DNSName(alternate_name)]),
            critical=True,
        )
        .add_extension(
            x509.ExtendedKeyUsage([usage]),
            critical=True,
        )
    )

    # Sign our certificate with our private key
    cert = builder.sign(key, hashes.SHA256())

    key_bytes = key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption()
    )
    crt_bytes = cert.public_bytes(serialization.Encoding.PEM)
    return (key_bytes, crt_bytes)
@contextmanager
def mtls_client(context):
    """Yield a ``requests`` session set up for mutual TLS with the Control API.

    The server certificate, client certificate and client key are read from
    ``context.init_script_environ`` and written to temporary files, because
    the session is configured with file paths. The files only exist while
    the ``with`` block using this context manager is active.

    The session is closed on exit so its pooled connections do not leak from
    one step to the next.
    """
    with tempfile.NamedTemporaryFile(suffix='.crt', encoding='utf-8', mode='w') as srv_cert, \
         tempfile.NamedTemporaryFile(suffix='.crt', encoding='utf-8', mode='w') as cli_cert, \
         tempfile.NamedTemporaryFile(suffix='.key', encoding='utf-8', mode='w') as cli_key:
        srv_cert.write(context.init_script_environ["KAPOW_CONTROL_SERVER_CERT"])
        srv_cert.file.flush()
        cli_cert.write(context.init_script_environ["KAPOW_CONTROL_CLIENT_CERT"])
        cli_cert.file.flush()
        cli_key.write(context.init_script_environ["KAPOW_CONTROL_CLIENT_KEY"])
        cli_key.file.flush()

        with requests.Session() as session:
            # Pin the server certificate and present our client certificate.
            session.verify = srv_cert.name
            session.cert = (cli_cert.name, cli_key.name)
            yield session
def is_port_open(port):
    """Return True when a TCP connection to 127.0.0.1:``port`` succeeds."""
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    with probe:
        status = probe.connect_ex(('127.0.0.1', port))
    return status == 0
def run_kapow_server(context, extra_args=""):
    """Start the Kapow! server and wait until its control port is open.

    The server command comes from ``Env.KAPOW_SERVER_CMD`` followed by
    ``extra_args`` (both shell-split) and the path to ``get_environment.py``.
    The process handle is stored in ``context.server``. After the port
    opens, a JSON document is read from the FIFO at
    ``context.init_script_fifo_path`` and stored in
    ``context.init_script_environ``.
    """
    assert (not is_port_open(Env.KAPOW_CONTROL_PORT)), "Another process is already bound"

    argv = shlex.split(Env.KAPOW_SERVER_CMD)
    argv = argv + shlex.split(extra_args)
    argv = argv + [os.path.join(HERE, "get_environment.py")]

    child_env = {'SPECTEST_FIFO': context.init_script_fifo_path}
    child_env.update(os.environ)

    context.server = subprocess.Popen(
        argv,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        env=child_env,
        shell=False)

    # Poll the control port until it accepts connections or we run out of
    # attempts.
    reachable = False
    attempts_left = Env.KAPOW_BOOT_TIMEOUT
    while attempts_left > 0:
        attempts_left -= 1
        with suppress(requests_exceptions.ConnectionError):
            if is_port_open(Env.KAPOW_CONTROL_PORT):
                reachable = True
                break
        sleep(.01)
    assert reachable, "API is unreachable after KAPOW_BOOT_TIMEOUT"

    # Fetch the init script environment through the FIFO.
    with open(context.init_script_fifo_path, 'r') as fifo:
        context.init_script_environ = json.load(fifo)
@given('I have a just started Kapow! server')
@given('I have a running Kapow! server')
def step_impl(context):
    """Start a Kapow! server without any extra command line arguments."""
    run_kapow_server(context, extra_args="")
@given(u'I launch the server with the following extra arguments')
def step_impl(context):
    """Start a Kapow! server, passing the step text as extra arguments."""
    run_kapow_server(context, extra_args=context.text)
@when('I request a route listing without providing a Control Access Token')
def step_impl(context):
    """GET ``/routes`` from the Control API over mTLS; keep the response."""
    with mtls_client(context) as session:
        context.response = session.get(f"{Env.KAPOW_CONTROL_URL}/routes")
@when('I request a route listing without providing an empty Control Access Token')
def step_impl(context):
    """List the routes over mTLS and store the reply in ``context.response``."""
    with mtls_client(context) as session:
        listing = session.get(f"{Env.KAPOW_CONTROL_URL}/routes")
        context.response = listing
@when(u'I request a route listing providing a bad Control Access Token')
def step_impl(context):
    """Request the route listing over mTLS; the request adds no headers."""
    with mtls_client(context) as session:
        context.response = session.get(f"{Env.KAPOW_CONTROL_URL}/routes")
@when('I request a routes listing')
def step_impl(context):
    """Fetch the route listing from the Control API using the mTLS client."""
    with mtls_client(context) as session:
        listing = session.get(f"{Env.KAPOW_CONTROL_URL}/routes")
        context.response = listing
@given('I have a Kapow! server with the following routes')
def step_impl(context):
    """Start the server and register one route per row of the step table.

    Each row is posted to ``/routes`` as a JSON object keyed by the table
    headings; any error status aborts the step.
    """
    run_kapow_server(context)

    table_missing = not hasattr(context, 'table')
    if table_missing:
        raise RuntimeError("A table must be set for this step.")

    with mtls_client(context) as session:
        for row in context.table:
            routes_url = f"{Env.KAPOW_CONTROL_URL}/routes"
            route = {heading: row[heading] for heading in row.headings}
            reply = session.post(routes_url, json=route)
            reply.raise_for_status()
@given('I have a Kapow! server with the following testing routes')
def step_impl(context):
    """Start the server and register testing routes from the step table.

    Every route gets an ``entrypoint`` that runs ``testinghandler.py`` with
    the current Python interpreter and the handler FIFO path; columns of
    the table row are merged on top and may override it.
    """
    run_kapow_server(context)

    if not hasattr(context, 'table'):
        raise RuntimeError("A table must be set for this step.")

    with mtls_client(context) as session:
        for row in context.table:
            # Quote every part, including the interpreter path, so a path
            # containing spaces still yields a valid command line.
            entrypoint = " ".join(
                [shlex.quote(sys.executable),
                 shlex.quote(os.path.join(HERE, "testinghandler.py")),
                 shlex.quote(context.handler_fifo_path)])  # Created in before_scenario
            response = session.post(
                f"{Env.KAPOW_CONTROL_URL}/routes",
                json={"entrypoint": entrypoint,
                      **{h: row[h] for h in row.headings}})
            response.raise_for_status()
def testing_request(context, request_fn):
# Run the request in background
context.testing_request = ThreadPool(processes=1).apply_async(request_fn)
# Block until the handler connects and give us its pid and the
# handler_id
with open(context.handler_fifo_path, 'r') as fifo:
(context.testing_handler_pid,
context.testing_handler_id) = fifo.readline().rstrip('\n').split(';')
@when('I send a request to the testing route "{path}"')
def step_impl(context, path):
    """Send a background GET to ``path`` on the user URL.

    The request runs through ``testing_request``; its eventual result is a
    response object, or ``None`` when the request failed.
    """
    def _request():
        try:
            return requests.get(f"{Env.KAPOW_USER_URL}{path}", stream=False)
        except Exception:
            # Best effort: a failed request is reported as None. Unlike a
            # bare ``except``, this lets KeyboardInterrupt/SystemExit through.
            return None

    testing_request(context, _request)
@when('I release the testing request')
def step_impl(context):
    """Send SIGTERM to the testing handler and collect the pending response."""
    handler_pid = int(context.testing_handler_pid)
    os.kill(handler_pid, signal.SIGTERM)
    context.testing_handler_pid = None
    pending = context.testing_request
    context.testing_response = pending.get()
@when('I append the route')
def step_impl(context):
    """POST the step text to ``/routes`` as a JSON document over mTLS."""
    with mtls_client(context) as session:
        reply = session.post(
            f"{Env.KAPOW_CONTROL_URL}/routes",
            data=context.text,
            headers={"Content-Type": "application/json"})
        context.response = reply
@then('I get {code} as response code')
def step_impl(context, code):
    """Check that the stored response has the expected status code."""
    actual = context.response.status_code
    assert actual == int(code), f"Got {actual} instead"
@then('I get {code} as response code in the testing request')
def step_impl(context, code):
    """Check the status code of the testing request's response."""
    actual = context.testing_response.status_code
    assert actual == int(code), f"Got {actual} instead"
@then('the response header "{header_name}" contains "{value}"')
def step_impl(context, header_name, value):
    """Compare the part of a response header before the first ``;``."""
    raw_header = context.response.headers.get(header_name, "")
    main_part = raw_header.split(';')[0]
    assert main_part == value, f"Got {context.response.headers.get(header_name)} instead"
@then('the testing response header {header_name} contains {value}')
def step_impl(context, header_name, value):
    """Check that a testing response header equals the expected value."""
    found = context.testing_response.headers.get(header_name)
    assert found == value, f"Got {context.testing_response.headers.get(header_name)} instead"
@then('I get "{reason}" as response reason phrase')
def step_impl(context, reason):
    """Check the reason phrase of the stored response."""
    actual = context.response.reason
    assert actual == reason, f"Got {actual} instead"
@then('I get the following response body')
def step_impl(context):
    """Check that the step text (a JSON example) is a subset of the body."""
    expected = jsonexample.loads(context.text)
    received = context.response.json()
    assert is_subset(expected, received)
@then('I get the following response raw body')
def step_impl(context):
    """Check that the response text is exactly the step text."""
    matches = context.text == context.response.text
    assert matches, f"{context.text!r} != {context.response.text!r}"
@when('I delete the route with id "{id}"')
def step_impl(context, id):
    """DELETE ``/routes/{id}`` over mTLS and keep the response."""
    with mtls_client(context) as session:
        target = f"{Env.KAPOW_CONTROL_URL}/routes/{id}"
        context.response = session.delete(target)
@when('I insert the route')
def step_impl(context):
    """PUT the step text to ``/routes`` as a JSON document over mTLS."""
    with mtls_client(context) as session:
        reply = session.put(
            f"{Env.KAPOW_CONTROL_URL}/routes",
            headers={"Content-Type": "application/json"},
            data=context.text)
        context.response = reply
@when('I try to append with this malformed JSON document')
def step_impl(context):
    """POST the step text, unmodified, to ``/routes`` with a JSON content type."""
    with mtls_client(context) as session:
        reply = session.post(
            f"{Env.KAPOW_CONTROL_URL}/routes",
            headers={"Content-Type": "application/json"},
            data=context.text)
        context.response = reply
@when('I delete the {order} route')
def step_impl(context, order):
    """Delete the route at the position named by ``order``.

    ``order`` must be a key of ``WORD2POS``. The current listing is fetched
    first to find the id of the route at that position.

    :raises ValueError: if ``order`` is not a known position word.
    """
    if order not in WORD2POS:
        # Fail with a clear message instead of indexing the listing with
        # None further down.
        raise ValueError(f"Unknown route position {order!r}")
    idx = WORD2POS[order]

    with mtls_client(context) as session:
        routes = session.get(f"{Env.KAPOW_CONTROL_URL}/routes")
        route_id = routes.json()[idx]["id"]
        context.response = session.delete(
            f"{Env.KAPOW_CONTROL_URL}/routes/{route_id}")
@when('I try to insert with this JSON document')
def step_impl(context):
    """PUT the step text to ``/routes`` with a JSON content type over mTLS."""
    with mtls_client(context) as session:
        reply = session.put(
            f"{Env.KAPOW_CONTROL_URL}/routes",
            headers={"Content-Type": "application/json"},
            data=context.text)
        context.response = reply
@when('I get the route with id "{id}"')
def step_impl(context, id):
    """GET ``/routes/{id}`` over mTLS and keep the response."""
    with mtls_client(context) as session:
        target = f"{Env.KAPOW_CONTROL_URL}/routes/{id}"
        context.response = session.get(target)
@when('I get the {order} route')
def step_impl(context, order):
    """Fetch the route at the position named by ``order``.

    ``order`` must be a key of ``WORD2POS``. The current listing is fetched
    first to find the id of the route at that position.

    :raises ValueError: if ``order`` is not a known position word.
    """
    if order not in WORD2POS:
        # Fail with a clear message instead of indexing the listing with
        # None further down.
        raise ValueError(f"Unknown route position {order!r}")
    idx = WORD2POS[order]

    with mtls_client(context) as session:
        routes = session.get(f"{Env.KAPOW_CONTROL_URL}/routes")
        route_id = routes.json()[idx]["id"]
        context.response = session.get(
            f"{Env.KAPOW_CONTROL_URL}/routes/{route_id}")
@when('I get the resource "{resource}"')
@when('I get the resource "{resource}" for the handler with id "{handler_id}"')
def step_impl(context, resource, handler_id=None):
    """GET a handler resource from the Data API.

    When no ``handler_id`` is given, the id of the current testing handler
    (``context.testing_handler_id``) is used.
    """
    target_id = context.testing_handler_id if handler_id is None else handler_id
    resource_url = f"{Env.KAPOW_DATA_URL}/handlers/{target_id}{resource}"
    context.response = requests.get(resource_url)
@when('I set the resource "{resource}" with value "{value}"')
def step_impl(context, resource, value):
    """PUT ``value`` (UTF-8 encoded) into a resource of the testing handler."""
    resource_url = f"{Env.KAPOW_DATA_URL}/handlers/{context.testing_handler_id}{resource}"
    payload = value.encode("utf-8")
    context.response = requests.put(resource_url, data=payload)
@when('I send a request to the testing route "{path}" adding')
def step_impl(context, path):
    """Send a background GET to ``path`` with extra request fields.

    Each table row has ``fieldType`` (``header``, ``cookie``, ``body`` or
    ``parameter``), ``name`` and ``value``. For repeated entries the first
    value wins, because fields are stored with ``setdefault``.

    :raises RuntimeError: if the step has no table.
    """
    if not hasattr(context, 'table'):
        raise RuntimeError("A table must be set for this step.")

    params = {
        "headers": {},
        "cookies": {},
        "params": {},
    }
    setters = {
        "header": params["headers"].setdefault,
        "cookie": params["cookies"].setdefault,
        # The body has no name; it becomes the request's ``data``.
        "body": lambda _, v: params.setdefault("data", v.encode("utf-8")),
        "parameter": params["params"].setdefault,
    }

    for row in context.table:
        setters[row["fieldType"]](row["name"], row["value"])

    def _request():
        try:
            return requests.get(f"{Env.KAPOW_USER_URL}{path}",
                                stream=False,
                                **params)
        except Exception:
            # Best effort: a failed request is reported as None. Unlike a
            # bare ``except``, this lets KeyboardInterrupt/SystemExit through.
            return None

    testing_request(context, _request)
@then('I get the value "{value}" for the response "{fieldType}" named "{elementName}" in the testing request')
def step_impl(context, value, fieldType, elementName):
    """Check a header, cookie or the body of the testing response.

    :raises ValueError: if ``fieldType`` is not ``header``, ``cookie`` or
        ``body``.
    """
    if fieldType == "header":
        actual = context.testing_response.headers.get(elementName)
    elif fieldType == "cookie":
        actual = context.testing_response.cookies.get(elementName)
    elif fieldType == "body":
        actual = context.testing_response.text
    else:
        # This was a plain string before, so the placeholder was never
        # interpolated.
        raise ValueError(f"Unknown fieldtype {fieldType!r}")

    assert actual == value, f"Expecting {fieldType} {elementName!r} to be {value!r}, got {actual!r} instead"
@given('a test HTTPS server on the {port} port')
def step_impl(context, port):
    """Start a background HTTPS server standing in for a Kapow! API.

    ``port`` is ``control`` (8081) or ``data`` (8082). The server stores
    each incoming request handler in ``context.request_response``, sets
    ``context.request_ready`` and then blocks until
    ``context.response_ready`` is set by a later step.

    Freshly generated server and client key/certificate pairs are stored in
    ``context.srv_key``/``context.srv_crt`` and
    ``context.cli_key``/``context.cli_crt``.

    :raises ValueError: if ``port`` is neither ``control`` nor ``data``.
    """
    context.request_ready = threading.Event()
    context.response_ready = threading.Event()

    class SaveResponseHandler(http.server.BaseHTTPRequestHandler):
        def do_verb(self):
            # Hand the request over to the steps and wait for them to
            # finish writing the response.
            context.request_response = self
            context.request_ready.set()
            context.response_ready.wait()

        do_GET = do_verb
        do_POST = do_verb
        do_PUT = do_verb
        do_DELETE = do_verb
        do_HEAD = do_verb

    if port == "control":
        port = 8081
    elif port == "data":
        port = 8082
    else:
        raise ValueError(f"Unknown port {port}")

    context.httpserver = http.server.HTTPServer(('127.0.0.1', port),
                                                SaveResponseHandler)

    context.srv_key, context.srv_crt = generate_ssl_cert("control_server", "localhost")
    context.cli_key, context.cli_crt = generate_ssl_cert("control_client", "localhost")

    with tempfile.NamedTemporaryFile(suffix=".key") as key_file, \
         tempfile.NamedTemporaryFile(suffix=".crt") as crt_file:
        key_file.write(context.srv_key)
        key_file.flush()
        crt_file.write(context.srv_crt)
        crt_file.flush()

        # ssl.wrap_socket() was removed in Python 3.12; an SSLContext is the
        # supported replacement. The key material is loaded here, while the
        # temporary files still exist.
        tls_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        tls_context.load_cert_chain(certfile=crt_file.name,
                                    keyfile=key_file.name)
        context.httpserver.socket = tls_context.wrap_socket(
            context.httpserver.socket,
            server_side=True)

    context.httpserver_thread = threading.Thread(
        target=context.httpserver.serve_forever,
        daemon=True)
    context.httpserver_thread.start()
def run_command_with_certs(context, srv_crt, cli_crt, cli_key):
    """Run the shell command from the step text in a background thread.

    The step text is expected to look like ``$ command ...``: everything
    after the first ``$`` is the command. It runs through the shell with
    the given certificates exported as ``KAPOW_CONTROL_SERVER_CERT``,
    ``KAPOW_CONTROL_CLIENT_CERT`` and ``KAPOW_CONTROL_CLIENT_KEY``.

    The thread is stored in ``context.command_thread``; the process handle
    is stored in ``context.command`` once the thread has started it.
    """
    # Split on the first '$' only, so the command itself may contain '$'
    # (for instance shell variable references).
    _, command = context.text.split('$', 1)
    command = command.lstrip()

    # The explicit certificates go last so they take precedence over any
    # variables of the same name inherited from our own environment.
    command_env = {
        **os.environ,
        'KAPOW_CONTROL_SERVER_CERT': srv_crt,
        'KAPOW_CONTROL_CLIENT_CERT': cli_crt,
        'KAPOW_CONTROL_CLIENT_KEY': cli_key,
    }

    def exec_in_thread():
        context.command = subprocess.Popen(
            command,
            shell=True,
            env=command_env)
        context.command.wait()

    context.command_thread = threading.Thread(target=exec_in_thread, daemon=True)
    context.command_thread.start()
@step('I run the following command (with invalid certs)')
def step_impl(context):
    """Run the step's command with a server certificate the server never used.

    A fresh certificate is generated for the command to trust, while the
    client certificate and key stored in the context are passed unchanged.
    """
    bogus_pair = generate_ssl_cert("invalid_control_server", "localhost")
    invalid_srv_crt = bogus_pair[0]
    run_command_with_certs(
        context, invalid_srv_crt, context.cli_crt, context.cli_key)
@step('I run the following command')
def step_impl(context):
    """Run the step's command with the certificates stored in the context."""
    certs = (context.srv_crt, context.cli_crt, context.cli_key)
    run_command_with_certs(context, *certs)
@when('I run the following command (setting the control certs environment variables)')
def step_impl(context):
    """Run the step's command with the certificates reported by the server."""
    environ = context.init_script_environ
    srv_crt = environ["KAPOW_CONTROL_SERVER_CERT"]
    cli_crt = environ["KAPOW_CONTROL_CLIENT_CERT"]
    cli_key = environ["KAPOW_CONTROL_CLIENT_KEY"]
    run_command_with_certs(context, srv_crt, cli_crt, cli_key)
@step('the HTTPS server receives a "{method}" request to "{path}"')
def step_impl(context, method, path):
    """Wait for a request on the test HTTPS server; check method and path."""
    context.request_ready.wait()
    received = context.request_response
    assert received.command == method, f"Method {received.command} is not {method}"
    # This message used to say "Method" although it reports the path.
    assert received.path == path, f"Path {received.path} is not {path}"
@then('the received request has the header "{name}" set to "{value}"')
def step_impl(context, name, value):
    """Check that the received request carries header ``name`` with ``value``."""
    context.request_ready.wait()
    request_headers = context.request_response.headers
    matching = request_headers[name]
    assert matching, f"Header {name} not found"
    assert matching == value, f"Value of header doesn't match. {matching} != {value}"
@when('the server responds with')
def step_impl(context):
    """Write the response described by the step table and release the handler.

    Rows are written in table order. ``status`` sends the status line,
    ``headers.<name>`` sends a header and ``body`` sends ``Content-Length``,
    ends the headers and writes the payload. The ``body`` branch ends the
    headers itself, so header rows should come before it. When there is no
    ``body`` row, an empty body is announced instead.
    """
    has_body = False
    handler = context.request_response

    for row in context.table:
        field = row['field']
        if field == 'status':
            handler.send_response(int(row['value']))
        elif field.startswith('headers.'):
            # Split on the first dot only, so header names that contain a
            # dot are handled too.
            _, header = field.split('.', 1)
            handler.send_header(header, row['value'])
        elif field == 'body':
            has_body = True
            payload = row['value'].encode('utf-8')
            handler.send_header('Content-Length', str(len(payload)))
            handler.end_headers()
            handler.wfile.write(payload)

    if not has_body:
        handler.send_header('Content-Length', '0')
        handler.end_headers()

    # Let the blocked request handler return.
    context.response_ready.set()
@then('the command exits {immediately} with "{returncode}"')
@then('the command exits with "{returncode}"')
def step_impl(context, returncode, immediately=False):
    """Wait for the background command and check its exit status.

    When ``immediately`` is given, the wait is limited to 3 seconds; a
    command still running after that is killed and the step fails.
    """
    wait_limit = 3.0 if immediately else None
    context.command_thread.join(timeout=wait_limit)

    if not context.command_thread.is_alive():
        context.command.wait()
    else:
        try:
            print("killing in the name of")
            context.command.kill()
        finally:
            assert False, "The command is still alive"

    actual = context.command.returncode
    assert actual == int(returncode), f"Command returned {actual} instead of {returncode}"
@then('the received request doesn\'t have the header "{name}" set')
def step_impl(context, name):
    """Check that header ``name`` is absent from the received request."""
    context.request_ready.wait()
    request_headers = context.request_response.headers
    assert name not in request_headers, f"Header {name} found"
@when('I try to connect to the control API without providing a certificate')
def step_impl(context):
    """GET ``/routes`` with no client certificate and record the outcome.

    Server verification is disabled. ``context.request_response`` receives
    either the response or the exception that was raised.
    """
    routes_url = f"{Env.KAPOW_CONTROL_URL}/routes"
    try:
        outcome = requests.get(routes_url, verify=False)
    except Exception as exc:
        outcome = exc
    context.request_response = outcome
@then(u'I get a connection error')
def step_impl(context):
    """Check that the recorded outcome is an exception, not a response."""
    outcome = context.request_response
    assert isinstance(outcome, Exception), outcome
@when(u'I try to connect to the control API providing an invalid certificate')
def step_impl(context):
    """GET ``/routes`` with a freshly generated client certificate.

    Server verification is disabled. ``context.request_response`` receives
    either the response or the exception that was raised.
    """
    key, cert = generate_ssl_cert("foo", "localhost")

    with tempfile.NamedTemporaryFile(suffix='.crt') as cert_file, \
         tempfile.NamedTemporaryFile(suffix='.key') as key_file:
        # The session needs file paths, so write the PEM data to disk.
        cert_file.write(cert)
        cert_file.flush()
        key_file.write(key)
        key_file.flush()

        with requests.Session() as session:
            session.cert = (cert_file.name, key_file.name)
            session.verify = False
            routes_url = f"{Env.KAPOW_CONTROL_URL}/routes"
            try:
                outcome = session.get(routes_url)
            except Exception as exc:
                outcome = exc
            context.request_response = outcome
@when('I inspect the automatically generated control server certificate')
def step_impl(context):
    """Parse the PEM server certificate reported by the server."""
    pem_text = context.init_script_environ["KAPOW_CONTROL_SERVER_CERT"]
    pem_bytes = pem_text.encode('ascii')
    context.control_server_cert = x509.load_pem_x509_certificate(pem_bytes)
@then('the extension "{extension}" contains "{value}" of type "{typename}"')
def step_impl(context, extension, value, typename):
    """Check that a certificate extension lists ``value`` under ``typename``.

    Only the ``Subject Alternative Name`` extension with entries of type
    ``DNSName`` or ``IPAddress`` is supported; anything else raises
    ``NotImplementedError``.
    """
    if extension != 'Subject Alternative Name':
        raise NotImplementedError(f'Unknown extension {extension}')
    oid = ExtensionOID.SUBJECT_ALTERNATIVE_NAME

    # For each type name: the x509 class and how to convert the step's
    # string into a comparable value.
    supported = {
        'DNSName': (x509.DNSName, lambda x: x),
        'IPAddress': (x509.IPAddress, ipaddress.ip_address),
    }
    if typename not in supported:
        raise NotImplementedError(f'Unknown type {typename}')
    type_, converter = supported[typename]

    ext = context.control_server_cert.extensions.get_extension_for_oid(oid)
    values = ext.value.get_values_for_type(type_)
    assert converter(value) in values, f"Value {value} not in {values}"