test(spec): Control Server API secured via cross-pinning mTLS
. We are now leveraging nix for portable dependency handling. . There are now three types of tests: client, server and end-to-end. . server tests exercise the actual kapow server being tested, while the requests are performed using the test steps. . client tests exercise the actual kapow client being tested, while the requests are served using the test steps. . e2e test exercise the actual kapow program in its dual role of client and server (¡como tiene que ser!). Co-authored-by: Roberto Abdelkader Martínez Pérez <robertomartinezp@gmail.com>
This commit is contained in:
@@ -13,26 +13,36 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
from contextlib import suppress
|
||||
from contextlib import suppress, contextmanager
|
||||
from multiprocessing.pool import ThreadPool
|
||||
from time import sleep
|
||||
import datetime
|
||||
import http.server
|
||||
import ipaddress
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import shlex
|
||||
import signal
|
||||
import socket
|
||||
import ssl
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
import threading
|
||||
from multiprocessing.pool import ThreadPool
|
||||
import time
|
||||
|
||||
import requests
|
||||
from environconfig import EnvironConfig, StringVar, IntVar, BooleanVar
|
||||
from comparedict import is_subset
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives.asymmetric import rsa
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
from cryptography import x509
|
||||
from cryptography.x509.oid import NameOID, ExtensionOID
|
||||
from environconfig import EnvironConfig, StringVar, IntVar, BooleanVar
|
||||
from requests import exceptions as requests_exceptions
|
||||
import jsonexample
|
||||
|
||||
import logging
|
||||
import requests
|
||||
|
||||
|
||||
WORD2POS = {"first": 0, "second": 1, "last": -1}
|
||||
@@ -44,7 +54,8 @@ class Env(EnvironConfig):
|
||||
KAPOW_SERVER_CMD = StringVar(default="kapow server")
|
||||
|
||||
#: Where the Control API is
|
||||
KAPOW_CONTROL_URL = StringVar(default="http://localhost:8081")
|
||||
KAPOW_CONTROL_URL = StringVar(default="https://localhost:8081")
|
||||
KAPOW_CONTROL_PORT = IntVar(default=8081)
|
||||
|
||||
#: Where the Data API is
|
||||
KAPOW_DATA_URL = StringVar(default="http://localhost:8082")
|
||||
@@ -52,7 +63,9 @@ class Env(EnvironConfig):
|
||||
#: Where the User Interface is
|
||||
KAPOW_USER_URL = StringVar(default="http://localhost:8080")
|
||||
|
||||
KAPOW_BOOT_TIMEOUT = IntVar(default=1000)
|
||||
KAPOW_CONTROL_TOKEN = StringVar(default="TEST-SPEC-CONTROL-TOKEN")
|
||||
|
||||
KAPOW_BOOT_TIMEOUT = IntVar(default=3000)
|
||||
|
||||
KAPOW_DEBUG_TESTS = BooleanVar(default=False)
|
||||
|
||||
@@ -77,37 +90,134 @@ if Env.KAPOW_DEBUG_TESTS:
|
||||
requests_log.setLevel(logging.DEBUG)
|
||||
requests_log.propagate = True
|
||||
|
||||
def run_kapow_server(context):
|
||||
|
||||
def generate_ssl_cert(subject_name, alternate_name):
|
||||
# Generate our key
|
||||
key = rsa.generate_private_key(
|
||||
public_exponent=65537,
|
||||
key_size=4096,
|
||||
)
|
||||
# Various details about who we are. For a self-signed certificate the
|
||||
# subject and issuer are always the same.
|
||||
subject = issuer = x509.Name([
|
||||
x509.NameAttribute(NameOID.COMMON_NAME, subject_name),
|
||||
])
|
||||
cert = x509.CertificateBuilder().subject_name(
|
||||
subject
|
||||
).issuer_name(
|
||||
issuer
|
||||
).public_key(
|
||||
key.public_key()
|
||||
).serial_number(
|
||||
x509.random_serial_number()
|
||||
).not_valid_before(
|
||||
datetime.datetime.utcnow()
|
||||
).not_valid_after(
|
||||
# Our certificate will be valid for 10 days
|
||||
datetime.datetime.utcnow() + datetime.timedelta(days=10)
|
||||
).add_extension(
|
||||
x509.SubjectAlternativeName([x509.DNSName(alternate_name)]),
|
||||
critical=True,
|
||||
).add_extension(
|
||||
x509.ExtendedKeyUsage(
|
||||
[x509.oid.ExtendedKeyUsageOID.SERVER_AUTH
|
||||
if subject_name.endswith('_server')
|
||||
else x509.oid.ExtendedKeyUsageOID.CLIENT_AUTH]),
|
||||
critical=True,
|
||||
# Sign our certificate with our private key
|
||||
).sign(key, hashes.SHA256())
|
||||
|
||||
key_bytes = key.private_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PrivateFormat.TraditionalOpenSSL,
|
||||
encryption_algorithm=serialization.NoEncryption()
|
||||
)
|
||||
crt_bytes = cert.public_bytes(serialization.Encoding.PEM)
|
||||
|
||||
return (key_bytes, crt_bytes)
|
||||
|
||||
|
||||
@contextmanager
|
||||
def mtls_client(context):
|
||||
with tempfile.NamedTemporaryFile(suffix='.crt', encoding='utf-8', mode='w') as srv_cert, \
|
||||
tempfile.NamedTemporaryFile(suffix='.crt', encoding='utf-8', mode='w') as cli_cert, \
|
||||
tempfile.NamedTemporaryFile(suffix='.key', encoding='utf-8', mode='w') as cli_key:
|
||||
srv_cert.write(context.init_script_environ["KAPOW_CONTROL_SERVER_CERT"])
|
||||
srv_cert.file.flush()
|
||||
cli_cert.write(context.init_script_environ["KAPOW_CONTROL_CLIENT_CERT"])
|
||||
cli_cert.file.flush()
|
||||
cli_key.write(context.init_script_environ["KAPOW_CONTROL_CLIENT_KEY"])
|
||||
cli_key.file.flush()
|
||||
session=requests.Session()
|
||||
session.verify=srv_cert.name
|
||||
session.cert=(cli_cert.name, cli_key.name)
|
||||
yield session
|
||||
|
||||
|
||||
def is_port_open(port):
|
||||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
|
||||
return sock.connect_ex(('127.0.0.1', port)) == 0
|
||||
|
||||
|
||||
def run_kapow_server(context, extra_args=""):
|
||||
assert (not is_port_open(Env.KAPOW_CONTROL_PORT)), "Another process is already bound"
|
||||
|
||||
context.server = subprocess.Popen(
|
||||
shlex.split(Env.KAPOW_SERVER_CMD),
|
||||
shlex.split(Env.KAPOW_SERVER_CMD) + shlex.split(extra_args) + [os.path.join(HERE, "get_environment.py")],
|
||||
stdout=subprocess.DEVNULL,
|
||||
stderr=subprocess.DEVNULL,
|
||||
env={'SPECTEST_FIFO': context.init_script_fifo_path, **os.environ},
|
||||
shell=False)
|
||||
|
||||
# Check process is running with reachable APIs
|
||||
open_ports = False
|
||||
for _ in range(Env.KAPOW_BOOT_TIMEOUT):
|
||||
is_running = context.server.poll() is None
|
||||
assert is_running, "Server is not running!"
|
||||
with suppress(requests.exceptions.ConnectionError):
|
||||
open_ports = (
|
||||
requests.head(Env.KAPOW_CONTROL_URL, timeout=1).status_code
|
||||
and requests.head(Env.KAPOW_DATA_URL, timeout=1).status_code)
|
||||
if open_ports:
|
||||
with suppress(requests_exceptions.ConnectionError):
|
||||
if is_port_open(Env.KAPOW_CONTROL_PORT):
|
||||
open_ports = True
|
||||
break
|
||||
sleep(.01)
|
||||
|
||||
assert open_ports, "API is unreachable after KAPOW_BOOT_TIMEOUT"
|
||||
|
||||
# Get init_script enviroment via fifo
|
||||
with open(context.init_script_fifo_path, 'r') as fifo:
|
||||
context.init_script_environ = json.load(fifo)
|
||||
|
||||
|
||||
@given('I have a just started Kapow! server')
|
||||
@given('I have a running Kapow! server')
|
||||
def step_impl(context):
|
||||
run_kapow_server(context)
|
||||
|
||||
|
||||
@given(u'I launch the server with the following extra arguments')
|
||||
def step_impl(context):
|
||||
run_kapow_server(context, context.text)
|
||||
|
||||
|
||||
@when('I request a route listing without providing a Control Access Token')
|
||||
def step_impl(context):
|
||||
with mtls_client(context) as requests:
|
||||
context.response = requests.get(f"{Env.KAPOW_CONTROL_URL}/routes")
|
||||
|
||||
|
||||
@when('I request a route listing without providing an empty Control Access Token')
|
||||
def step_impl(context):
|
||||
with mtls_client(context) as requests:
|
||||
context.response = requests.get(f"{Env.KAPOW_CONTROL_URL}/routes")
|
||||
|
||||
|
||||
@when(u'I request a route listing providing a bad Control Access Token')
|
||||
def step_impl(context):
|
||||
with mtls_client(context) as requests:
|
||||
context.response = requests.get(f"{Env.KAPOW_CONTROL_URL}/routes")
|
||||
|
||||
|
||||
@when('I request a routes listing')
|
||||
def step_impl(context):
|
||||
context.response = requests.get(f"{Env.KAPOW_CONTROL_URL}/routes")
|
||||
with mtls_client(context) as requests:
|
||||
context.response = requests.get(f"{Env.KAPOW_CONTROL_URL}/routes")
|
||||
|
||||
|
||||
@given('I have a Kapow! server with the following routes')
|
||||
@@ -117,10 +227,12 @@ def step_impl(context):
|
||||
if not hasattr(context, 'table'):
|
||||
raise RuntimeError("A table must be set for this step.")
|
||||
|
||||
for row in context.table:
|
||||
response = requests.post(f"{Env.KAPOW_CONTROL_URL}/routes",
|
||||
json={h: row[h] for h in row.headings})
|
||||
response.raise_for_status()
|
||||
with mtls_client(context) as requests:
|
||||
for row in context.table:
|
||||
response = requests.post(
|
||||
f"{Env.KAPOW_CONTROL_URL}/routes",
|
||||
json={h: row[h] for h in row.headings})
|
||||
response.raise_for_status()
|
||||
|
||||
|
||||
@given('I have a Kapow! server with the following testing routes')
|
||||
@@ -130,15 +242,16 @@ def step_impl(context):
|
||||
if not hasattr(context, 'table'):
|
||||
raise RuntimeError("A table must be set for this step.")
|
||||
|
||||
for row in context.table:
|
||||
response = requests.post(
|
||||
f"{Env.KAPOW_CONTROL_URL}/routes",
|
||||
json={"entrypoint": " ".join(
|
||||
[sys.executable,
|
||||
shlex.quote(os.path.join(HERE, "testinghandler.py")),
|
||||
shlex.quote(context.handler_fifo_path)]), # Created in before_scenario
|
||||
**{h: row[h] for h in row.headings}})
|
||||
response.raise_for_status()
|
||||
with mtls_client(context) as requests:
|
||||
for row in context.table:
|
||||
response = requests.post(
|
||||
f"{Env.KAPOW_CONTROL_URL}/routes",
|
||||
json={"entrypoint": " ".join(
|
||||
[sys.executable,
|
||||
shlex.quote(os.path.join(HERE, "testinghandler.py")),
|
||||
shlex.quote(context.handler_fifo_path)]), # Created in before_scenario
|
||||
**{h: row[h] for h in row.headings}})
|
||||
response.raise_for_status()
|
||||
|
||||
def testing_request(context, request_fn):
|
||||
# Run the request in background
|
||||
@@ -165,15 +278,17 @@ def step_impl(context, path):
|
||||
@when('I release the testing request')
|
||||
def step_impl(context):
|
||||
os.kill(int(context.testing_handler_pid), signal.SIGTERM)
|
||||
context.testing_handler_pid = None
|
||||
context.testing_response = context.testing_request.get()
|
||||
|
||||
|
||||
@when('I append the route')
|
||||
def step_impl(context):
|
||||
context.response = requests.post(f"{Env.KAPOW_CONTROL_URL}/routes",
|
||||
data=context.text,
|
||||
headers={"Content-Type": "application/json"})
|
||||
|
||||
with mtls_client(context) as requests:
|
||||
context.response = requests.post(
|
||||
f"{Env.KAPOW_CONTROL_URL}/routes",
|
||||
data=context.text,
|
||||
headers={"Content-Type": "application/json"})
|
||||
|
||||
@then('I get {code} as response code')
|
||||
def step_impl(context, code):
|
||||
@@ -212,50 +327,62 @@ def step_impl(context):
|
||||
|
||||
@when('I delete the route with id "{id}"')
|
||||
def step_impl(context, id):
|
||||
context.response = requests.delete(f"{Env.KAPOW_CONTROL_URL}/routes/{id}")
|
||||
with mtls_client(context) as requests:
|
||||
context.response = requests.delete(
|
||||
f"{Env.KAPOW_CONTROL_URL}/routes/{id}")
|
||||
|
||||
|
||||
@when('I insert the route')
|
||||
def step_impl(context):
|
||||
context.response = requests.put(f"{Env.KAPOW_CONTROL_URL}/routes",
|
||||
headers={"Content-Type": "application/json"},
|
||||
data=context.text)
|
||||
with mtls_client(context) as requests:
|
||||
context.response = requests.put(
|
||||
f"{Env.KAPOW_CONTROL_URL}/routes",
|
||||
headers={"Content-Type": "application/json"},
|
||||
data=context.text)
|
||||
|
||||
|
||||
@when('I try to append with this malformed JSON document')
|
||||
def step_impl(context):
|
||||
context.response = requests.post(
|
||||
f"{Env.KAPOW_CONTROL_URL}/routes",
|
||||
headers={"Content-Type": "application/json"},
|
||||
data=context.text)
|
||||
with mtls_client(context) as requests:
|
||||
context.response = requests.post(
|
||||
f"{Env.KAPOW_CONTROL_URL}/routes",
|
||||
headers={"Content-Type": "application/json"},
|
||||
data=context.text)
|
||||
|
||||
|
||||
@when('I delete the {order} route')
|
||||
def step_impl(context, order):
|
||||
idx = WORD2POS.get(order)
|
||||
routes = requests.get(f"{Env.KAPOW_CONTROL_URL}/routes")
|
||||
id = routes.json()[idx]["id"]
|
||||
context.response = requests.delete(f"{Env.KAPOW_CONTROL_URL}/routes/{id}")
|
||||
with mtls_client(context) as requests:
|
||||
idx = WORD2POS.get(order)
|
||||
routes = requests.get(f"{Env.KAPOW_CONTROL_URL}/routes")
|
||||
id = routes.json()[idx]["id"]
|
||||
context.response = requests.delete(
|
||||
f"{Env.KAPOW_CONTROL_URL}/routes/{id}")
|
||||
|
||||
|
||||
@when('I try to insert with this JSON document')
|
||||
def step_impl(context):
|
||||
context.response = requests.put(
|
||||
f"{Env.KAPOW_CONTROL_URL}/routes",
|
||||
headers={"Content-Type": "application/json"},
|
||||
data=context.text)
|
||||
with mtls_client(context) as requests:
|
||||
context.response = requests.put(
|
||||
f"{Env.KAPOW_CONTROL_URL}/routes",
|
||||
headers={"Content-Type": "application/json"},
|
||||
data=context.text)
|
||||
|
||||
@when('I get the route with id "{id}"')
|
||||
def step_impl(context, id):
|
||||
context.response = requests.get(f"{Env.KAPOW_CONTROL_URL}/routes/{id}")
|
||||
with mtls_client(context) as requests:
|
||||
context.response = requests.get(
|
||||
f"{Env.KAPOW_CONTROL_URL}/routes/{id}")
|
||||
|
||||
|
||||
@when('I get the {order} route')
|
||||
def step_impl(context, order):
|
||||
idx = WORD2POS.get(order)
|
||||
routes = requests.get(f"{Env.KAPOW_CONTROL_URL}/routes")
|
||||
id = routes.json()[idx]["id"]
|
||||
context.response = requests.get(f"{Env.KAPOW_CONTROL_URL}/routes/{id}")
|
||||
with mtls_client(context) as requests:
|
||||
idx = WORD2POS.get(order)
|
||||
routes = requests.get(f"{Env.KAPOW_CONTROL_URL}/routes")
|
||||
id = routes.json()[idx]["id"]
|
||||
context.response = requests.get(
|
||||
f"{Env.KAPOW_CONTROL_URL}/routes/{id}")
|
||||
|
||||
|
||||
@when('I get the resource "{resource}"')
|
||||
@@ -316,3 +443,216 @@ def step_impl(context, value, fieldType, elementName):
|
||||
raise ValueError("Unknown fieldtype {fieldType!r}")
|
||||
|
||||
assert actual == value, f"Expecting {fieldType} {elementName!r} to be {value!r}, got {actual!r} insted"
|
||||
|
||||
|
||||
@given('a test HTTPS server on the {port} port')
|
||||
def step_impl(context, port):
|
||||
context.request_ready = threading.Event()
|
||||
context.request_ready.clear()
|
||||
context.response_ready = threading.Event()
|
||||
context.response_ready.clear()
|
||||
|
||||
class SaveResponseHandler(http.server.BaseHTTPRequestHandler):
|
||||
def do_verb(self):
|
||||
context.request_response = self
|
||||
context.request_ready.set()
|
||||
context.response_ready.wait()
|
||||
do_GET=do_verb
|
||||
do_POST=do_verb
|
||||
do_PUT=do_verb
|
||||
do_DELETE=do_verb
|
||||
do_HEAD=do_verb
|
||||
|
||||
if port == "control":
|
||||
port = 8081
|
||||
elif port == "data":
|
||||
port = 8082
|
||||
else:
|
||||
raise ValueError(f"Unknown port {port}")
|
||||
|
||||
context.httpserver = http.server.HTTPServer(('127.0.0.1', port),
|
||||
SaveResponseHandler)
|
||||
|
||||
context.srv_key, context.srv_crt = generate_ssl_cert("control_server", "localhost")
|
||||
context.cli_key, context.cli_crt = generate_ssl_cert("control_client", "localhost")
|
||||
with tempfile.NamedTemporaryFile(suffix=".key") as key_file, \
|
||||
tempfile.NamedTemporaryFile(suffix=".crt") as crt_file:
|
||||
key_file.write(context.srv_key)
|
||||
key_file.flush()
|
||||
crt_file.write(context.srv_crt)
|
||||
crt_file.flush()
|
||||
context.httpserver.socket = ssl.wrap_socket(
|
||||
context.httpserver.socket,
|
||||
keyfile=key_file.name,
|
||||
certfile=crt_file.name,
|
||||
server_side=True)
|
||||
context.httpserver_thread = threading.Thread(
|
||||
target=context.httpserver.serve_forever,
|
||||
daemon=True)
|
||||
context.httpserver_thread.start()
|
||||
|
||||
|
||||
def run_command_with_certs(context, srv_crt, cli_crt, cli_key):
|
||||
_, command = context.text.split('$')
|
||||
command = command.lstrip()
|
||||
|
||||
def exec_in_thread():
|
||||
context.command = subprocess.Popen(
|
||||
command,
|
||||
shell=True,
|
||||
env={'KAPOW_CONTROL_SERVER_CERT': srv_crt,
|
||||
'KAPOW_CONTROL_CLIENT_CERT': cli_crt,
|
||||
'KAPOW_CONTROL_CLIENT_KEY': cli_key,
|
||||
**os.environ})
|
||||
context.command.wait()
|
||||
|
||||
context.command_thread = threading.Thread(target=exec_in_thread, daemon=True)
|
||||
context.command_thread.start()
|
||||
|
||||
@step('I run the following command (with invalid certs)')
|
||||
def step_impl(context):
|
||||
invalid_srv_crt, _ = generate_ssl_cert("invalid_control_server",
|
||||
"localhost")
|
||||
run_command_with_certs(context,
|
||||
invalid_srv_crt,
|
||||
context.cli_crt,
|
||||
context.cli_key)
|
||||
|
||||
|
||||
@step('I run the following command')
|
||||
def step_impl(context):
|
||||
run_command_with_certs(context,
|
||||
context.srv_crt,
|
||||
context.cli_crt,
|
||||
context.cli_key)
|
||||
|
||||
|
||||
@when('I run the following command (setting the control certs environment variables)')
|
||||
def step_impl(context):
|
||||
run_command_with_certs(
|
||||
context,
|
||||
context.init_script_environ["KAPOW_CONTROL_SERVER_CERT"],
|
||||
context.init_script_environ["KAPOW_CONTROL_CLIENT_CERT"],
|
||||
context.init_script_environ["KAPOW_CONTROL_CLIENT_KEY"])
|
||||
|
||||
|
||||
@step('the HTTPS server receives a "{method}" request to "{path}"')
|
||||
def step_impl(context, method, path):
|
||||
context.request_ready.wait()
|
||||
assert context.request_response.command == method, f"Method {context.request_response.command} is not {method}"
|
||||
assert context.request_response.path == path, f"Method {context.request_response.path} is not {path}"
|
||||
|
||||
|
||||
|
||||
@then('the received request has the header "{name}" set to "{value}"')
|
||||
def step_impl(context, name, value):
|
||||
context.request_ready.wait()
|
||||
matching = context.request_response.headers[name]
|
||||
assert matching, f"Header {name} not found"
|
||||
assert matching == value, f"Value of header doesn't match. {matching} != {value}"
|
||||
|
||||
|
||||
@when('the server responds with')
|
||||
def step_impl(context):
|
||||
# TODO: set the fields given in the table
|
||||
has_body = False
|
||||
for row in context.table:
|
||||
if row['field'] == 'status':
|
||||
context.request_response.send_response(int(row['value']))
|
||||
elif row['field'].startswith('headers.'):
|
||||
_, header = row['field'].split('.')
|
||||
context.request_response.send_header(header, row['value'])
|
||||
elif row['field'] == 'body':
|
||||
has_body = True
|
||||
payload = row['value'].encode('utf-8')
|
||||
context.request_response.send_header('Content-Length', str(len(payload)))
|
||||
context.request_response.end_headers()
|
||||
context.request_response.wfile.write(payload)
|
||||
|
||||
if not has_body:
|
||||
context.request_response.send_header('Content-Length', '0')
|
||||
context.request_response.end_headers()
|
||||
|
||||
context.response_ready.set()
|
||||
|
||||
@then('the command exits {immediately} with "{returncode}"')
|
||||
@then('the command exits with "{returncode}"')
|
||||
def step_impl(context, returncode, immediately=False):
|
||||
context.command_thread.join(timeout=3.0 if immediately else None)
|
||||
if context.command_thread.is_alive():
|
||||
try:
|
||||
print("killing in the name of")
|
||||
context.command.kill()
|
||||
finally:
|
||||
assert False, "The command is still alive"
|
||||
|
||||
else:
|
||||
context.command.wait()
|
||||
assert context.command.returncode == int(returncode), f"Command returned {context.command.returncode} instead of {returncode}"
|
||||
|
||||
|
||||
@then('the received request doesn\'t have the header "{name}" set')
|
||||
def step_impl(context, name):
|
||||
context.request_ready.wait()
|
||||
assert name not in context.request_response.headers, f"Header {name} found"
|
||||
|
||||
|
||||
@when('I try to connect to the control API without providing a certificate')
|
||||
def step_impl(context):
|
||||
try:
|
||||
context.request_response = requests.get(f"{Env.KAPOW_CONTROL_URL}/routes", verify=False)
|
||||
except Exception as exc:
|
||||
context.request_response = exc
|
||||
|
||||
|
||||
@then(u'I get a connection error')
|
||||
def step_impl(context):
|
||||
assert issubclass(type(context.request_response), Exception), context.request_response
|
||||
|
||||
|
||||
@when(u'I try to connect to the control API providing an invalid certificate')
|
||||
def step_impl(context):
|
||||
key, cert = generate_ssl_cert("foo", "localhost")
|
||||
with tempfile.NamedTemporaryFile(suffix='.crt') as cert_file, \
|
||||
tempfile.NamedTemporaryFile(suffix='.key') as key_file:
|
||||
cert_file.write(cert)
|
||||
cert_file.flush()
|
||||
key_file.write(key)
|
||||
key_file.flush()
|
||||
with requests.Session() as session:
|
||||
session.cert = (cert_file.name, key_file.name)
|
||||
session.verify = False
|
||||
try:
|
||||
context.request_response = session.get(
|
||||
f"{Env.KAPOW_CONTROL_URL}/routes")
|
||||
except Exception as exc:
|
||||
context.request_response = exc
|
||||
|
||||
|
||||
|
||||
@when('I inspect the automatically generated control server certificate')
|
||||
def step_impl(context):
|
||||
context.control_server_cert = x509.load_pem_x509_certificate(
|
||||
context.init_script_environ["KAPOW_CONTROL_SERVER_CERT"].encode('ascii'))
|
||||
|
||||
|
||||
@then('the extension "{extension}" contains "{value}" of type "{typename}"')
|
||||
def step_impl(context, extension, value, typename):
|
||||
if extension == 'Subject Alternative Name':
|
||||
oid = ExtensionOID.SUBJECT_ALTERNATIVE_NAME
|
||||
else:
|
||||
raise NotImplementedError(f'Unknown extension {extension}')
|
||||
|
||||
if typename == 'DNSName':
|
||||
type_ = x509.DNSName
|
||||
converter = lambda x: x
|
||||
elif typename == 'IPAddress':
|
||||
type_ = x509.IPAddress
|
||||
converter = ipaddress.ip_address
|
||||
else:
|
||||
raise NotImplementedError(f'Unknown type {typename}')
|
||||
|
||||
ext = context.control_server_cert.extensions.get_extension_for_oid(oid)
|
||||
values = ext.value.get_values_for_type(type_)
|
||||
|
||||
assert converter(value) in values, f"Value {value} not in {values}"
|
||||
|
||||
Reference in New Issue
Block a user