Files
kapow/spec/test/features/steps/steps.py
pancho horrillo 6194c6961a Tweak PoC to pass current test suite
Co-authored-by: Roberto Abdelkader Martínez Pérez <robertomartinezp@gmail.com>
2019-11-11 17:47:28 +01:00

309 lines
10 KiB
Python

#
# Copyright 2019 Banco Bilbao Vizcaya Argentaria, S.A.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from contextlib import suppress
from time import sleep
import json
import os
import shlex
import signal
import socket
import subprocess
import sys
import tempfile
import threading
from multiprocessing.pool import ThreadPool
import time
import requests
from environconfig import EnvironConfig, StringVar, IntVar, BooleanVar
from comparedict import is_subset
import jsonexample
import logging
# Ordinal words accepted by the "{order} route" steps, mapped to the list
# index they select in the routes listing.
WORD2POS = dict(first=0, second=1, last=-1)

# Directory that holds this steps module; used to locate helper scripts
# that live next to it.
HERE = os.path.dirname(__file__)
# Test-suite settings, declared as EnvironConfig variables with defaults.
class Env(EnvironConfig):
    #: Command line used to launch the Kapow! server under test
    KAPOW_SERVER_CMD = StringVar(default="kapow server")

    #: Base URL of the Control API
    KAPOW_CONTROLAPI_URL = StringVar(default="http://localhost:8081")

    #: Base URL of the Data API
    KAPOW_DATAAPI_URL = StringVar(default="http://localhost:8082")

    #: Base URL of the User Interface
    KAPOW_USER_URL = StringVar(default="http://localhost:8080")

    #: Upper bound on the number of reachability polls performed while
    #: waiting for the server to boot (see run_kapow_server)
    KAPOW_BOOT_TIMEOUT = IntVar(default=1000)

    #: When true, HTTP-level debug logging is switched on at import time
    KAPOW_DEBUG_TESTS = BooleanVar(default=False)
if Env.KAPOW_DEBUG_TESTS:
    # Switch on tracing at the httplib level
    # (requests -> urllib3 -> http.client).  The trace shows each REQUEST
    # with its HEADERS and DATA, and each RESPONSE with its HEADERS; the
    # response body is the one thing that is not logged.
    try:
        import http.client as http_client
    except ImportError:
        # Python 2 name of the same module
        import httplib as http_client
    http_client.HTTPConnection.debuglevel = 1

    # Logging has to be initialised, otherwise no debug output is shown.
    logging.basicConfig()
    logging.getLogger().setLevel(logging.DEBUG)
    requests_log = logging.getLogger("requests.packages.urllib3")
    requests_log.propagate = True
    requests_log.setLevel(logging.DEBUG)
def run_kapow_server(context):
    """Start the Kapow! server and wait until both of its APIs respond.

    The process is launched from ``Env.KAPOW_SERVER_CMD`` with its output
    discarded and is stored in ``context.server``.  The Control and Data
    API URLs are then polled with HEAD requests, at most
    ``Env.KAPOW_BOOT_TIMEOUT`` times.

    Raises:
        AssertionError: if the server process exits while booting, or if
            the APIs never become reachable within the allowed attempts.
    """
    command = shlex.split(Env.KAPOW_SERVER_CMD)
    context.server = subprocess.Popen(
        command,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        shell=False)

    # Poll until the process is up and both APIs answer.
    api_urls = (Env.KAPOW_CONTROLAPI_URL, Env.KAPOW_DATAAPI_URL)
    apis_reachable = False
    for _attempt in range(Env.KAPOW_BOOT_TIMEOUT):
        assert context.server.poll() is None, "Server is not running!"

        try:
            # Any status code counts: it proves the port is being served.
            apis_reachable = all(
                requests.head(url, timeout=1).status_code
                for url in api_urls)
        except requests.exceptions.ConnectionError:
            # Port not open yet; try again on the next round.
            pass

        if apis_reachable:
            break
        sleep(.01)

    assert apis_reachable, "API is unreachable after KAPOW_BOOT_TIMEOUT"
@given('I have a just started Kapow! server')
@given('I have a running Kapow! server')
def step_impl(context):
    """Launch a Kapow! server for the scenario and wait for its APIs."""
    run_kapow_server(context)
@when('I request a routes listing')
def step_impl(context):
    """GET the Control API route collection and keep the response."""
    listing = requests.get(f"{Env.KAPOW_CONTROLAPI_URL}/routes")
    context.response = listing
@given('I have a Kapow! server with the following routes')
def step_impl(context):
    """Start a server and register one route per row of the step table.

    Each table row is posted to the Control API as a JSON object whose keys
    are the table headings.

    Raises:
        RuntimeError: if the step was written without a table.
        requests.HTTPError: if the Control API rejects any of the routes.
    """
    run_kapow_server(context)

    # behave assigns context.table for every step and leaves it as None when
    # the step has no table, so an attribute-existence check cannot detect
    # the missing table; compare against None instead.
    if getattr(context, 'table', None) is None:
        raise RuntimeError("A table must be set for this step.")

    for row in context.table:
        response = requests.post(f"{Env.KAPOW_CONTROLAPI_URL}/routes",
                                 json={h: row[h] for h in row.headings})
        response.raise_for_status()
@given('I have a Kapow! server with the following testing routes')
def step_impl(context):
    """Start a server and register testing routes from the step table.

    Every route gets an ``entrypoint`` that runs ``testinghandler.py`` (found
    next to this module) with the scenario FIFO path as its argument.  Values
    from the table row are applied on top, so a row may override the
    entrypoint.

    Raises:
        RuntimeError: if the step was written without a table.
        requests.HTTPError: if the Control API rejects any of the routes.
    """
    run_kapow_server(context)

    # behave assigns context.table for every step and leaves it as None when
    # the step has no table, so an attribute-existence check cannot detect
    # the missing table; compare against None instead.
    if getattr(context, 'table', None) is None:
        raise RuntimeError("A table must be set for this step.")

    # The command is the same for every row, so build it once.  Every part is
    # quoted, including the interpreter path, which may contain spaces.
    entrypoint = " ".join(
        shlex.quote(part) for part in (
            sys.executable,
            os.path.join(HERE, "testinghandler.py"),
            context.handler_fifo_path))  # Created in before_scenario

    for row in context.table:
        route = {"entrypoint": entrypoint}
        route.update((h, row[h]) for h in row.headings)
        response = requests.post(
            f"{Env.KAPOW_CONTROLAPI_URL}/routes",
            json=route)
        response.raise_for_status()
def testing_request(context, request_fn):
# Run the request in background
context.testing_request = ThreadPool(processes=1).apply_async(request_fn)
# Block until the handler connects and give us its pid and the
# handler_id
with open(context.handler_fifo_path, 'r') as fifo:
(context.testing_handler_pid,
context.testing_handler_id) = fifo.readline().rstrip('\n').split(';')
@when('I send a request to the testing route "{path}"')
def step_impl(context, path):
    """Fire a background GET at a testing route on the user interface.

    The request runs through ``testing_request``, which returns once the
    testing handler has announced itself.  A request that fails at the HTTP
    layer yields ``None`` as its result.
    """
    def _request():
        try:
            return requests.get(f"{Env.KAPOW_USER_URL}{path}", stream=False)
        except requests.exceptions.RequestException:
            # Best effort: a failed request is reported as "no response".
            # Only requests' own errors are swallowed, so programming
            # mistakes still surface when the result is collected.
            return None

    testing_request(context, _request)
@when('I release the testing request')
def step_impl(context):
    """Send SIGTERM to the testing handler and collect the pending response."""
    handler_pid = int(context.testing_handler_pid)
    os.kill(handler_pid, signal.SIGTERM)
    # Blocks until the background request has finished.
    context.testing_response = context.testing_request.get()
@when('I append the route')
def step_impl(context):
    """POST the step's text to the Control API routes as a JSON body."""
    json_headers = {"Content-Type": "application/json"}
    context.response = requests.post(
        f"{Env.KAPOW_CONTROLAPI_URL}/routes",
        headers=json_headers,
        data=context.text)
@then('I get {code} as response code')
def step_impl(context, code):
    """Check the status code of the last stored response."""
    received = context.response.status_code
    wanted = int(code)
    assert received == wanted, f"Got {context.response.status_code} instead"
@then('I get {code} as response code in the testing request')
def step_impl(context, code):
    """Check the status code of the released testing request's response."""
    received = context.testing_response.status_code
    wanted = int(code)
    assert received == wanted, f"Got {context.testing_response.status_code} instead"
@then('I get "{reason}" as response reason phrase')
def step_impl(context, reason):
    """Check the HTTP reason phrase of the last stored response."""
    received = context.response.reason
    assert received == reason, f"Got {context.response.reason} instead"
@then('I get the following response body')
def step_impl(context):
    """Check the JSON response body against the example in the step text.

    The step text is parsed with ``jsonexample`` and must be a subset of the
    decoded response body.
    """
    expected = jsonexample.loads(context.text)
    received = context.response.json()
    assert is_subset(expected, received)
@then('I get the following response raw body')
def step_impl(context):
    """Check that the response text equals the step text exactly."""
    expected = context.text
    received = context.response.text
    assert expected == received, f"{context.text!r} != {context.response.text!r}"
@when('I delete the route with id "{id}"')
def step_impl(context, id):
    """DELETE the route with the given id and keep the response."""
    # The parameter name is bound by the "{id}" placeholder in the pattern.
    route_url = f"{Env.KAPOW_CONTROLAPI_URL}/routes/{id}"
    context.response = requests.delete(route_url)
@when('I insert the route')
def step_impl(context):
    """PUT the step's text to the Control API routes as a JSON body."""
    routes_url = f"{Env.KAPOW_CONTROLAPI_URL}/routes"
    json_headers = {"Content-Type": "application/json"}
    context.response = requests.put(routes_url,
                                    data=context.text,
                                    headers=json_headers)
@when('I try to append with this malformed JSON document')
def step_impl(context):
    """POST the step's text unchanged, declared as JSON, to the routes."""
    routes_url = f"{Env.KAPOW_CONTROLAPI_URL}/routes"
    json_headers = {"Content-Type": "application/json"}
    # The text is sent as raw data, so it reaches the server unparsed.
    context.response = requests.post(routes_url,
                                     data=context.text,
                                     headers=json_headers)
@when('I delete the {order} route')
def step_impl(context, order):
    """Delete the route found at an ordinal position of the listing.

    Args:
        order: one of the words defined in ``WORD2POS``.

    Raises:
        ValueError: if ``order`` is not a known ordinal word.
    """
    # Fail early with a readable message; otherwise an unknown word would
    # turn into a None list index and an obscure TypeError.
    if order not in WORD2POS:
        raise ValueError(
            f"Unknown route position {order!r}; "
            f"expected one of {sorted(WORD2POS)}")

    routes = requests.get(f"{Env.KAPOW_CONTROLAPI_URL}/routes")
    route_id = routes.json()[WORD2POS[order]]["id"]
    context.response = requests.delete(
        f"{Env.KAPOW_CONTROLAPI_URL}/routes/{route_id}")
@when('I try to insert with this JSON document')
def step_impl(context):
    """PUT the step's text unchanged, declared as JSON, to the routes."""
    routes_url = f"{Env.KAPOW_CONTROLAPI_URL}/routes"
    json_headers = {"Content-Type": "application/json"}
    context.response = requests.put(routes_url,
                                    data=context.text,
                                    headers=json_headers)
@when('I get the route with id "{id}"')
def step_impl(context, id):
    """GET the route with the given id and keep the response."""
    # The parameter name is bound by the "{id}" placeholder in the pattern.
    route_url = f"{Env.KAPOW_CONTROLAPI_URL}/routes/{id}"
    context.response = requests.get(route_url)
@when('I get the {order} route')
def step_impl(context, order):
    """Fetch the route found at an ordinal position of the listing.

    Args:
        order: one of the words defined in ``WORD2POS``.

    Raises:
        ValueError: if ``order`` is not a known ordinal word.
    """
    # Fail early with a readable message; otherwise an unknown word would
    # turn into a None list index and an obscure TypeError.
    if order not in WORD2POS:
        raise ValueError(
            f"Unknown route position {order!r}; "
            f"expected one of {sorted(WORD2POS)}")

    routes = requests.get(f"{Env.KAPOW_CONTROLAPI_URL}/routes")
    route_id = routes.json()[WORD2POS[order]]["id"]
    context.response = requests.get(
        f"{Env.KAPOW_CONTROLAPI_URL}/routes/{route_id}")
@when('I get the resource "{resource}"')
@when('I get the resource "{resource}" for the handler with id "{handler_id}"')
def step_impl(context, resource, handler_id=None):
    """GET a handler resource from the Data API and keep the response.

    When the step names no handler id, the id of the current testing
    handler (``context.testing_handler_id``) is used.
    """
    handler_id = context.testing_handler_id if handler_id is None else handler_id
    resource_url = f"{Env.KAPOW_DATAAPI_URL}/handlers/{handler_id}{resource}"
    context.response = requests.get(resource_url)
@when('I set the resource "{resource}" with value "{value}"')
def step_impl(context, resource, value):
    """PUT a UTF-8 encoded value into a resource of the testing handler."""
    resource_url = f"{Env.KAPOW_DATAAPI_URL}/handlers/{context.testing_handler_id}{resource}"
    body = value.encode("utf-8")
    context.response = requests.put(resource_url, data=body)
@when('I send a request to the testing route "{path}" adding')
def step_impl(context, path):
    """Fire a background GET at a testing route with extra request fields.

    Each table row supplies ``fieldType`` (``header``, ``cookie``, ``body``
    or ``parameter``), ``name`` and ``value``.  For repeated names, and for
    repeated ``body`` rows, the first value wins.  A request that fails at
    the HTTP layer yields ``None`` as its result.

    Raises:
        RuntimeError: if the step was written without a table.
        KeyError: if a row uses an unknown ``fieldType``.
    """
    # behave assigns context.table for every step and leaves it as None when
    # the step has no table, so an attribute-existence check cannot detect
    # the missing table; compare against None instead.
    if getattr(context, 'table', None) is None:
        raise RuntimeError("A table must be set for this step.")

    # Keyword arguments handed to requests.get.
    params = {
        "headers": dict(),
        "cookies": dict(),
        "params": dict()}

    # One setter per field type; each takes (name, value).  The body has no
    # name, so its setter ignores the first argument.
    setters = {
        "header": params["headers"].setdefault,
        "cookie": params["cookies"].setdefault,
        "body": lambda _, v: params.setdefault("data", v.encode("utf-8")),
        "parameter": params["params"].setdefault,
    }

    for row in context.table:
        setters[row["fieldType"]](row["name"], row["value"])

    def _request():
        try:
            return requests.get(f"{Env.KAPOW_USER_URL}{path}",
                                stream=False,
                                **params)
        except requests.exceptions.RequestException:
            # Best effort: a failed request is reported as "no response".
            # Only requests' own errors are swallowed, so programming
            # mistakes still surface when the result is collected.
            return None

    testing_request(context, _request)
@then('I get the value "{value}" for the response "{fieldType}" named "{elementName}" in the testing request')
def step_impl(context, value, fieldType, elementName):
    """Check one field of the testing request's response.

    Args:
        value: expected value of the field.
        fieldType: ``header``, ``cookie`` or ``body``.
        elementName: header or cookie name; not used for ``body``.

    Raises:
        ValueError: if ``fieldType`` is not one of the supported kinds.
    """
    if fieldType == "header":
        actual = context.testing_response.headers.get(elementName)
    elif fieldType == "cookie":
        actual = context.testing_response.cookies.get(elementName)
    elif fieldType == "body":
        actual = context.testing_response.text
    else:
        # The f prefix is required for the offending value to be shown.
        raise ValueError(f"Unknown fieldtype {fieldType!r}")

    assert actual == value, f"Expecting {fieldType} {elementName!r} to be {value!r}, got {actual!r} instead"