. We are now leveraging nix for portable dependency handling. . There are now three types of tests: client, server and end-to-end. . server tests exercise the actual kapow server being tested, while the requests are performed using the test steps. . client tests exercise the actual kapow client being tested, while the requests are served using the test steps. . e2e test exercise the actual kapow program in its dual role of client and server (¡como tiene que ser!). Co-authored-by: Roberto Abdelkader Martínez Pérez <robertomartinezp@gmail.com>
659 lines
23 KiB
Python
659 lines
23 KiB
Python
#
|
|
# Copyright 2019 Banco Bilbao Vizcaya Argentaria, S.A.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
#
|
|
from contextlib import suppress, contextmanager
|
|
from multiprocessing.pool import ThreadPool
|
|
from time import sleep
|
|
import datetime
|
|
import http.server
|
|
import ipaddress
|
|
import json
|
|
import logging
|
|
import os
|
|
import shlex
|
|
import signal
|
|
import socket
|
|
import ssl
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
import threading
|
|
import time
|
|
|
|
from comparedict import is_subset
|
|
from cryptography.hazmat.backends import default_backend
|
|
from cryptography.hazmat.primitives.asymmetric import rsa
|
|
from cryptography.hazmat.primitives import hashes
|
|
from cryptography.hazmat.primitives import serialization
|
|
from cryptography import x509
|
|
from cryptography.x509.oid import NameOID, ExtensionOID
|
|
from environconfig import EnvironConfig, StringVar, IntVar, BooleanVar
|
|
from requests import exceptions as requests_exceptions
|
|
import jsonexample
|
|
import requests
|
|
|
|
|
|
WORD2POS = {"first": 0, "second": 1, "last": -1}
|
|
HERE = os.path.dirname(__file__)
|
|
|
|
|
|
class Env(EnvironConfig):
|
|
#: How to run Kapow! server
|
|
KAPOW_SERVER_CMD = StringVar(default="kapow server")
|
|
|
|
#: Where the Control API is
|
|
KAPOW_CONTROL_URL = StringVar(default="https://localhost:8081")
|
|
KAPOW_CONTROL_PORT = IntVar(default=8081)
|
|
|
|
#: Where the Data API is
|
|
KAPOW_DATA_URL = StringVar(default="http://localhost:8082")
|
|
|
|
#: Where the User Interface is
|
|
KAPOW_USER_URL = StringVar(default="http://localhost:8080")
|
|
|
|
KAPOW_CONTROL_TOKEN = StringVar(default="TEST-SPEC-CONTROL-TOKEN")
|
|
|
|
KAPOW_BOOT_TIMEOUT = IntVar(default=3000)
|
|
|
|
KAPOW_DEBUG_TESTS = BooleanVar(default=False)
|
|
|
|
|
|
if Env.KAPOW_DEBUG_TESTS:
|
|
# These two lines enable debugging at httplib level
|
|
# (requests->urllib3->http.client) You will see the REQUEST,
|
|
# including HEADERS and DATA, and RESPONSE with HEADERS but without
|
|
# DATA. The only thing missing will be the response.body which is
|
|
# not logged.
|
|
try:
|
|
import http.client as http_client
|
|
except ImportError:
|
|
# Python 2
|
|
import httplib as http_client
|
|
http_client.HTTPConnection.debuglevel = 1
|
|
|
|
# You must initialize logging, otherwise you'll not see debug output.
|
|
logging.basicConfig()
|
|
logging.getLogger().setLevel(logging.DEBUG)
|
|
requests_log = logging.getLogger("requests.packages.urllib3")
|
|
requests_log.setLevel(logging.DEBUG)
|
|
requests_log.propagate = True
|
|
|
|
|
|
def generate_ssl_cert(subject_name, alternate_name):
|
|
# Generate our key
|
|
key = rsa.generate_private_key(
|
|
public_exponent=65537,
|
|
key_size=4096,
|
|
)
|
|
# Various details about who we are. For a self-signed certificate the
|
|
# subject and issuer are always the same.
|
|
subject = issuer = x509.Name([
|
|
x509.NameAttribute(NameOID.COMMON_NAME, subject_name),
|
|
])
|
|
cert = x509.CertificateBuilder().subject_name(
|
|
subject
|
|
).issuer_name(
|
|
issuer
|
|
).public_key(
|
|
key.public_key()
|
|
).serial_number(
|
|
x509.random_serial_number()
|
|
).not_valid_before(
|
|
datetime.datetime.utcnow()
|
|
).not_valid_after(
|
|
# Our certificate will be valid for 10 days
|
|
datetime.datetime.utcnow() + datetime.timedelta(days=10)
|
|
).add_extension(
|
|
x509.SubjectAlternativeName([x509.DNSName(alternate_name)]),
|
|
critical=True,
|
|
).add_extension(
|
|
x509.ExtendedKeyUsage(
|
|
[x509.oid.ExtendedKeyUsageOID.SERVER_AUTH
|
|
if subject_name.endswith('_server')
|
|
else x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH]),
|
|
critical=True,
|
|
# Sign our certificate with our private key
|
|
).sign(key, hashes.SHA256())
|
|
|
|
key_bytes = key.private_bytes(
|
|
encoding=serialization.Encoding.PEM,
|
|
format=serialization.PrivateFormat.TraditionalOpenSSL,
|
|
encryption_algorithm=serialization.NoEncryption()
|
|
)
|
|
crt_bytes = cert.public_bytes(serialization.Encoding.PEM)
|
|
|
|
return (key_bytes, crt_bytes)
|
|
|
|
|
|
@contextmanager
|
|
def mtls_client(context):
|
|
with tempfile.NamedTemporaryFile(suffix='.crt', encoding='utf-8', mode='w') as srv_cert, \
|
|
tempfile.NamedTemporaryFile(suffix='.crt', encoding='utf-8', mode='w') as cli_cert, \
|
|
tempfile.NamedTemporaryFile(suffix='.key', encoding='utf-8', mode='w') as cli_key:
|
|
srv_cert.write(context.init_script_environ["KAPOW_CONTROL_SERVER_CERT"])
|
|
srv_cert.file.flush()
|
|
cli_cert.write(context.init_script_environ["KAPOW_CONTROL_CLIENT_CERT"])
|
|
cli_cert.file.flush()
|
|
cli_key.write(context.init_script_environ["KAPOW_CONTROL_CLIENT_KEY"])
|
|
cli_key.file.flush()
|
|
session=requests.Session()
|
|
session.verify=srv_cert.name
|
|
session.cert=(cli_cert.name, cli_key.name)
|
|
yield session
|
|
|
|
|
|
def is_port_open(port):
|
|
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
|
|
return sock.connect_ex(('127.0.0.1', port)) == 0
|
|
|
|
|
|
def run_kapow_server(context, extra_args=""):
|
|
assert (not is_port_open(Env.KAPOW_CONTROL_PORT)), "Another process is already bound"
|
|
|
|
context.server = subprocess.Popen(
|
|
shlex.split(Env.KAPOW_SERVER_CMD) + shlex.split(extra_args) + [os.path.join(HERE, "get_environment.py")],
|
|
stdout=subprocess.DEVNULL,
|
|
stderr=subprocess.DEVNULL,
|
|
env={'SPECTEST_FIFO': context.init_script_fifo_path, **os.environ},
|
|
shell=False)
|
|
|
|
# Check process is running with reachable APIs
|
|
open_ports = False
|
|
for _ in range(Env.KAPOW_BOOT_TIMEOUT):
|
|
with suppress(requests_exceptions.ConnectionError):
|
|
if is_port_open(Env.KAPOW_CONTROL_PORT):
|
|
open_ports = True
|
|
break
|
|
sleep(.01)
|
|
|
|
assert open_ports, "API is unreachable after KAPOW_BOOT_TIMEOUT"
|
|
|
|
# Get init_script enviroment via fifo
|
|
with open(context.init_script_fifo_path, 'r') as fifo:
|
|
context.init_script_environ = json.load(fifo)
|
|
|
|
|
|
@given('I have a just started Kapow! server')
|
|
@given('I have a running Kapow! server')
|
|
def step_impl(context):
|
|
run_kapow_server(context)
|
|
|
|
|
|
@given(u'I launch the server with the following extra arguments')
|
|
def step_impl(context):
|
|
run_kapow_server(context, context.text)
|
|
|
|
|
|
@when('I request a route listing without providing a Control Access Token')
|
|
def step_impl(context):
|
|
with mtls_client(context) as requests:
|
|
context.response = requests.get(f"{Env.KAPOW_CONTROL_URL}/routes")
|
|
|
|
|
|
@when('I request a route listing without providing an empty Control Access Token')
|
|
def step_impl(context):
|
|
with mtls_client(context) as requests:
|
|
context.response = requests.get(f"{Env.KAPOW_CONTROL_URL}/routes")
|
|
|
|
|
|
@when(u'I request a route listing providing a bad Control Access Token')
|
|
def step_impl(context):
|
|
with mtls_client(context) as requests:
|
|
context.response = requests.get(f"{Env.KAPOW_CONTROL_URL}/routes")
|
|
|
|
|
|
@when('I request a routes listing')
|
|
def step_impl(context):
|
|
with mtls_client(context) as requests:
|
|
context.response = requests.get(f"{Env.KAPOW_CONTROL_URL}/routes")
|
|
|
|
|
|
@given('I have a Kapow! server with the following routes')
|
|
def step_impl(context):
|
|
run_kapow_server(context)
|
|
|
|
if not hasattr(context, 'table'):
|
|
raise RuntimeError("A table must be set for this step.")
|
|
|
|
with mtls_client(context) as requests:
|
|
for row in context.table:
|
|
response = requests.post(
|
|
f"{Env.KAPOW_CONTROL_URL}/routes",
|
|
json={h: row[h] for h in row.headings})
|
|
response.raise_for_status()
|
|
|
|
|
|
@given('I have a Kapow! server with the following testing routes')
|
|
def step_impl(context):
|
|
run_kapow_server(context)
|
|
|
|
if not hasattr(context, 'table'):
|
|
raise RuntimeError("A table must be set for this step.")
|
|
|
|
with mtls_client(context) as requests:
|
|
for row in context.table:
|
|
response = requests.post(
|
|
f"{Env.KAPOW_CONTROL_URL}/routes",
|
|
json={"entrypoint": " ".join(
|
|
[sys.executable,
|
|
shlex.quote(os.path.join(HERE, "testinghandler.py")),
|
|
shlex.quote(context.handler_fifo_path)]), # Created in before_scenario
|
|
**{h: row[h] for h in row.headings}})
|
|
response.raise_for_status()
|
|
|
|
def testing_request(context, request_fn):
|
|
# Run the request in background
|
|
context.testing_request = ThreadPool(processes=1).apply_async(request_fn)
|
|
|
|
# Block until the handler connects and give us its pid and the
|
|
# handler_id
|
|
with open(context.handler_fifo_path, 'r') as fifo:
|
|
(context.testing_handler_pid,
|
|
context.testing_handler_id) = fifo.readline().rstrip('\n').split(';')
|
|
|
|
|
|
@when('I send a request to the testing route "{path}"')
|
|
def step_impl(context, path):
|
|
def _request():
|
|
try:
|
|
return requests.get(f"{Env.KAPOW_USER_URL}{path}", stream=False)
|
|
except:
|
|
return None
|
|
|
|
testing_request(context, _request)
|
|
|
|
|
|
@when('I release the testing request')
|
|
def step_impl(context):
|
|
os.kill(int(context.testing_handler_pid), signal.SIGTERM)
|
|
context.testing_handler_pid = None
|
|
context.testing_response = context.testing_request.get()
|
|
|
|
|
|
@when('I append the route')
|
|
def step_impl(context):
|
|
with mtls_client(context) as requests:
|
|
context.response = requests.post(
|
|
f"{Env.KAPOW_CONTROL_URL}/routes",
|
|
data=context.text,
|
|
headers={"Content-Type": "application/json"})
|
|
|
|
@then('I get {code} as response code')
|
|
def step_impl(context, code):
|
|
assert context.response.status_code == int(code), f"Got {context.response.status_code} instead"
|
|
|
|
|
|
@then('I get {code} as response code in the testing request')
|
|
def step_impl(context, code):
|
|
assert context.testing_response.status_code == int(code), f"Got {context.testing_response.status_code} instead"
|
|
|
|
|
|
@then('the response header "{header_name}" contains "{value}"')
|
|
def step_impl(context, header_name, value):
|
|
assert context.response.headers.get(header_name, "").split(';')[0] == value, f"Got {context.response.headers.get(header_name)} instead"
|
|
|
|
|
|
@then('the testing response header {header_name} contains {value}')
|
|
def step_impl(context, header_name, value):
|
|
assert context.testing_response.headers.get(header_name) == value, f"Got {context.testing_response.headers.get(header_name)} instead"
|
|
|
|
|
|
@then('I get "{reason}" as response reason phrase')
|
|
def step_impl(context, reason):
|
|
assert context.response.reason == reason, f"Got {context.response.reason} instead"
|
|
|
|
|
|
@then('I get the following response body')
|
|
def step_impl(context):
|
|
assert is_subset(jsonexample.loads(context.text), context.response.json())
|
|
|
|
|
|
@then('I get the following response raw body')
|
|
def step_impl(context):
|
|
assert context.text == context.response.text, f"{context.text!r} != {context.response.text!r}"
|
|
|
|
|
|
@when('I delete the route with id "{id}"')
|
|
def step_impl(context, id):
|
|
with mtls_client(context) as requests:
|
|
context.response = requests.delete(
|
|
f"{Env.KAPOW_CONTROL_URL}/routes/{id}")
|
|
|
|
|
|
@when('I insert the route')
|
|
def step_impl(context):
|
|
with mtls_client(context) as requests:
|
|
context.response = requests.put(
|
|
f"{Env.KAPOW_CONTROL_URL}/routes",
|
|
headers={"Content-Type": "application/json"},
|
|
data=context.text)
|
|
|
|
|
|
@when('I try to append with this malformed JSON document')
|
|
def step_impl(context):
|
|
with mtls_client(context) as requests:
|
|
context.response = requests.post(
|
|
f"{Env.KAPOW_CONTROL_URL}/routes",
|
|
headers={"Content-Type": "application/json"},
|
|
data=context.text)
|
|
|
|
|
|
@when('I delete the {order} route')
|
|
def step_impl(context, order):
|
|
with mtls_client(context) as requests:
|
|
idx = WORD2POS.get(order)
|
|
routes = requests.get(f"{Env.KAPOW_CONTROL_URL}/routes")
|
|
id = routes.json()[idx]["id"]
|
|
context.response = requests.delete(
|
|
f"{Env.KAPOW_CONTROL_URL}/routes/{id}")
|
|
|
|
|
|
@when('I try to insert with this JSON document')
|
|
def step_impl(context):
|
|
with mtls_client(context) as requests:
|
|
context.response = requests.put(
|
|
f"{Env.KAPOW_CONTROL_URL}/routes",
|
|
headers={"Content-Type": "application/json"},
|
|
data=context.text)
|
|
|
|
@when('I get the route with id "{id}"')
|
|
def step_impl(context, id):
|
|
with mtls_client(context) as requests:
|
|
context.response = requests.get(
|
|
f"{Env.KAPOW_CONTROL_URL}/routes/{id}")
|
|
|
|
|
|
@when('I get the {order} route')
|
|
def step_impl(context, order):
|
|
with mtls_client(context) as requests:
|
|
idx = WORD2POS.get(order)
|
|
routes = requests.get(f"{Env.KAPOW_CONTROL_URL}/routes")
|
|
id = routes.json()[idx]["id"]
|
|
context.response = requests.get(
|
|
f"{Env.KAPOW_CONTROL_URL}/routes/{id}")
|
|
|
|
|
|
@when('I get the resource "{resource}"')
|
|
@when('I get the resource "{resource}" for the handler with id "{handler_id}"')
|
|
def step_impl(context, resource, handler_id=None):
|
|
if handler_id is None:
|
|
handler_id = context.testing_handler_id
|
|
|
|
context.response = requests.get(
|
|
f"{Env.KAPOW_DATA_URL}/handlers/{handler_id}{resource}")
|
|
|
|
|
|
@when('I set the resource "{resource}" with value "{value}"')
|
|
def step_impl(context, resource, value):
|
|
context.response = requests.put(
|
|
f"{Env.KAPOW_DATA_URL}/handlers/{context.testing_handler_id}{resource}",
|
|
data=value.encode("utf-8"))
|
|
|
|
|
|
@when('I send a request to the testing route "{path}" adding')
|
|
def step_impl(context, path):
|
|
if not hasattr(context, 'table'):
|
|
raise RuntimeError("A table must be set for this step.")
|
|
|
|
params = {
|
|
"headers": dict(),
|
|
"cookies": dict(),
|
|
"params": dict()}
|
|
setters = {
|
|
"header": params["headers"].setdefault,
|
|
"cookie": params["cookies"].setdefault,
|
|
"body": lambda _, v: params.setdefault("data", v.encode("utf-8")),
|
|
"parameter": params["params"].setdefault,}
|
|
|
|
for row in context.table:
|
|
setters[row["fieldType"]](row["name"], row["value"])
|
|
|
|
def _request():
|
|
try:
|
|
return requests.get(f"{Env.KAPOW_USER_URL}{path}",
|
|
stream=False,
|
|
**params)
|
|
except:
|
|
return None
|
|
|
|
testing_request(context, _request)
|
|
|
|
|
|
@then('I get the value "{value}" for the response "{fieldType}" named "{elementName}" in the testing request')
|
|
def step_impl(context, value, fieldType, elementName):
|
|
if fieldType == "header":
|
|
actual = context.testing_response.headers.get(elementName)
|
|
elif fieldType == "cookie":
|
|
actual = context.testing_response.cookies.get(elementName)
|
|
elif fieldType == "body":
|
|
actual = context.testing_response.text
|
|
else:
|
|
raise ValueError("Unknown fieldtype {fieldType!r}")
|
|
|
|
assert actual == value, f"Expecting {fieldType} {elementName!r} to be {value!r}, got {actual!r} insted"
|
|
|
|
|
|
@given('a test HTTPS server on the {port} port')
|
|
def step_impl(context, port):
|
|
context.request_ready = threading.Event()
|
|
context.request_ready.clear()
|
|
context.response_ready = threading.Event()
|
|
context.response_ready.clear()
|
|
|
|
class SaveResponseHandler(http.server.BaseHTTPRequestHandler):
|
|
def do_verb(self):
|
|
context.request_response = self
|
|
context.request_ready.set()
|
|
context.response_ready.wait()
|
|
do_GET=do_verb
|
|
do_POST=do_verb
|
|
do_PUT=do_verb
|
|
do_DELETE=do_verb
|
|
do_HEAD=do_verb
|
|
|
|
if port == "control":
|
|
port = 8081
|
|
elif port == "data":
|
|
port = 8082
|
|
else:
|
|
raise ValueError(f"Unknown port {port}")
|
|
|
|
context.httpserver = http.server.HTTPServer(('127.0.0.1', port),
|
|
SaveResponseHandler)
|
|
|
|
context.srv_key, context.srv_crt = generate_ssl_cert("control_server", "localhost")
|
|
context.cli_key, context.cli_crt = generate_ssl_cert("control_client", "localhost")
|
|
with tempfile.NamedTemporaryFile(suffix=".key") as key_file, \
|
|
tempfile.NamedTemporaryFile(suffix=".crt") as crt_file:
|
|
key_file.write(context.srv_key)
|
|
key_file.flush()
|
|
crt_file.write(context.srv_crt)
|
|
crt_file.flush()
|
|
context.httpserver.socket = ssl.wrap_socket(
|
|
context.httpserver.socket,
|
|
keyfile=key_file.name,
|
|
certfile=crt_file.name,
|
|
server_side=True)
|
|
context.httpserver_thread = threading.Thread(
|
|
target=context.httpserver.serve_forever,
|
|
daemon=True)
|
|
context.httpserver_thread.start()
|
|
|
|
|
|
def run_command_with_certs(context, srv_crt, cli_crt, cli_key):
|
|
_, command = context.text.split('$')
|
|
command = command.lstrip()
|
|
|
|
def exec_in_thread():
|
|
context.command = subprocess.Popen(
|
|
command,
|
|
shell=True,
|
|
env={'KAPOW_CONTROL_SERVER_CERT': srv_crt,
|
|
'KAPOW_CONTROL_CLIENT_CERT': cli_crt,
|
|
'KAPOW_CONTROL_CLIENT_KEY': cli_key,
|
|
**os.environ})
|
|
context.command.wait()
|
|
|
|
context.command_thread = threading.Thread(target=exec_in_thread, daemon=True)
|
|
context.command_thread.start()
|
|
|
|
@step('I run the following command (with invalid certs)')
|
|
def step_impl(context):
|
|
invalid_srv_crt, _ = generate_ssl_cert("invalid_control_server",
|
|
"localhost")
|
|
run_command_with_certs(context,
|
|
invalid_srv_crt,
|
|
context.cli_crt,
|
|
context.cli_key)
|
|
|
|
|
|
@step('I run the following command')
|
|
def step_impl(context):
|
|
run_command_with_certs(context,
|
|
context.srv_crt,
|
|
context.cli_crt,
|
|
context.cli_key)
|
|
|
|
|
|
@when('I run the following command (setting the control certs environment variables)')
|
|
def step_impl(context):
|
|
run_command_with_certs(
|
|
context,
|
|
context.init_script_environ["KAPOW_CONTROL_SERVER_CERT"],
|
|
context.init_script_environ["KAPOW_CONTROL_CLIENT_CERT"],
|
|
context.init_script_environ["KAPOW_CONTROL_CLIENT_KEY"])
|
|
|
|
|
|
@step('the HTTPS server receives a "{method}" request to "{path}"')
|
|
def step_impl(context, method, path):
|
|
context.request_ready.wait()
|
|
assert context.request_response.command == method, f"Method {context.request_response.command} is not {method}"
|
|
assert context.request_response.path == path, f"Method {context.request_response.path} is not {path}"
|
|
|
|
|
|
|
|
@then('the received request has the header "{name}" set to "{value}"')
|
|
def step_impl(context, name, value):
|
|
context.request_ready.wait()
|
|
matching = context.request_response.headers[name]
|
|
assert matching, f"Header {name} not found"
|
|
assert matching == value, f"Value of header doesn't match. {matching} != {value}"
|
|
|
|
|
|
@when('the server responds with')
|
|
def step_impl(context):
|
|
# TODO: set the fields given in the table
|
|
has_body = False
|
|
for row in context.table:
|
|
if row['field'] == 'status':
|
|
context.request_response.send_response(int(row['value']))
|
|
elif row['field'].startswith('headers.'):
|
|
_, header = row['field'].split('.')
|
|
context.request_response.send_header(header, row['value'])
|
|
elif row['field'] == 'body':
|
|
has_body = True
|
|
payload = row['value'].encode('utf-8')
|
|
context.request_response.send_header('Content-Length', str(len(payload)))
|
|
context.request_response.end_headers()
|
|
context.request_response.wfile.write(payload)
|
|
|
|
if not has_body:
|
|
context.request_response.send_header('Content-Length', '0')
|
|
context.request_response.end_headers()
|
|
|
|
context.response_ready.set()
|
|
|
|
@then('the command exits {immediately} with "{returncode}"')
|
|
@then('the command exits with "{returncode}"')
|
|
def step_impl(context, returncode, immediately=False):
|
|
context.command_thread.join(timeout=3.0 if immediately else None)
|
|
if context.command_thread.is_alive():
|
|
try:
|
|
print("killing in the name of")
|
|
context.command.kill()
|
|
finally:
|
|
assert False, "The command is still alive"
|
|
|
|
else:
|
|
context.command.wait()
|
|
assert context.command.returncode == int(returncode), f"Command returned {context.command.returncode} instead of {returncode}"
|
|
|
|
|
|
@then('the received request doesn\'t have the header "{name}" set')
|
|
def step_impl(context, name):
|
|
context.request_ready.wait()
|
|
assert name not in context.request_response.headers, f"Header {name} found"
|
|
|
|
|
|
@when('I try to connect to the control API without providing a certificate')
|
|
def step_impl(context):
|
|
try:
|
|
context.request_response = requests.get(f"{Env.KAPOW_CONTROL_URL}/routes", verify=False)
|
|
except Exception as exc:
|
|
context.request_response = exc
|
|
|
|
|
|
@then(u'I get a connection error')
|
|
def step_impl(context):
|
|
assert issubclass(type(context.request_response), Exception), context.request_response
|
|
|
|
|
|
@when(u'I try to connect to the control API providing an invalid certificate')
|
|
def step_impl(context):
|
|
key, cert = generate_ssl_cert("foo", "localhost")
|
|
with tempfile.NamedTemporaryFile(suffix='.crt') as cert_file, \
|
|
tempfile.NamedTemporaryFile(suffix='.key') as key_file:
|
|
cert_file.write(cert)
|
|
cert_file.flush()
|
|
key_file.write(key)
|
|
key_file.flush()
|
|
with requests.Session() as session:
|
|
session.cert = (cert_file.name, key_file.name)
|
|
session.verify = False
|
|
try:
|
|
context.request_response = session.get(
|
|
f"{Env.KAPOW_CONTROL_URL}/routes")
|
|
except Exception as exc:
|
|
context.request_response = exc
|
|
|
|
|
|
|
|
@when('I inspect the automatically generated control server certificate')
|
|
def step_impl(context):
|
|
context.control_server_cert = x509.load_pem_x509_certificate(
|
|
context.init_script_environ["KAPOW_CONTROL_SERVER_CERT"].encode('ascii'))
|
|
|
|
|
|
@then('the extension "{extension}" contains "{value}" of type "{typename}"')
|
|
def step_impl(context, extension, value, typename):
|
|
if extension == 'Subject Alternative Name':
|
|
oid = ExtensionOID.SUBJECT_ALTERNATIVE_NAME
|
|
else:
|
|
raise NotImplementedError(f'Unknown extension {extension}')
|
|
|
|
if typename == 'DNSName':
|
|
type_ = x509.DNSName
|
|
converter = lambda x: x
|
|
elif typename == 'IPAddress':
|
|
type_ = x509.IPAddress
|
|
converter = ipaddress.ip_address
|
|
else:
|
|
raise NotImplementedError(f'Unknown type {typename}')
|
|
|
|
ext = context.control_server_cert.extensions.get_extension_for_oid(oid)
|
|
values = ext.value.get_values_for_type(type_)
|
|
|
|
assert converter(value) in values, f"Value {value} not in {values}"
|