Doc and refactor.
This commit is contained in:
@@ -40,7 +40,7 @@ CONNECTIONS = {}
|
|||||||
|
|
||||||
class Connection:
|
class Connection:
|
||||||
"""
|
"""
|
||||||
Manages the lifecycle of a kapow connection.
|
Manages the lifecycle of a Kapow! connection.
|
||||||
|
|
||||||
Behaves like a memory for the "fields" available in HTTP
|
Behaves like a memory for the "fields" available in HTTP
|
||||||
connections.
|
connections.
|
||||||
@@ -150,15 +150,16 @@ class Connection:
|
|||||||
|
|
||||||
|
|
||||||
async def get_field(request):
|
async def get_field(request):
|
||||||
|
"""Get the value of some HTTP field in the given connection."""
|
||||||
id = request.match_info["id"]
|
id = request.match_info["id"]
|
||||||
resource = request.match_info["resource"]
|
field = request.match_info["field"]
|
||||||
|
|
||||||
try:
|
try:
|
||||||
connection = CONNECTIONS[id]
|
connection = CONNECTIONS[id]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
response = web.HTTPNotFound()
|
response = web.HTTPNotFound()
|
||||||
else:
|
else:
|
||||||
content = await connection.get(resource)
|
content = await connection.get(field)
|
||||||
|
|
||||||
if isinstance(content, StreamReader):
|
if isinstance(content, StreamReader):
|
||||||
response = web.StreamResponse(status=200, reason="OK")
|
response = web.StreamResponse(status=200, reason="OK")
|
||||||
@@ -177,8 +178,9 @@ async def get_field(request):
|
|||||||
|
|
||||||
|
|
||||||
async def set_field(request):
|
async def set_field(request):
|
||||||
|
"""Set the value of some HTTP field in the given connection."""
|
||||||
id = request.match_info["id"]
|
id = request.match_info["id"]
|
||||||
resource = request.match_info["resource"]
|
field = request.match_info["field"]
|
||||||
|
|
||||||
try:
|
try:
|
||||||
connection = CONNECTIONS[id]
|
connection = CONNECTIONS[id]
|
||||||
@@ -186,7 +188,7 @@ async def set_field(request):
|
|||||||
response = web.HTTPNotFound()
|
response = web.HTTPNotFound()
|
||||||
else:
|
else:
|
||||||
try:
|
try:
|
||||||
await connection.set(resource, request.content)
|
await connection.set(field, request.content)
|
||||||
except ConnectionResetError:
|
except ConnectionResetError:
|
||||||
# Raised when trying to write to an already-closed stream.
|
# Raised when trying to write to an already-closed stream.
|
||||||
request.transport.close()
|
request.transport.close()
|
||||||
@@ -206,28 +208,40 @@ async def append_field(request):
|
|||||||
|
|
||||||
|
|
||||||
def handle_route(entrypoint, command):
|
def handle_route(entrypoint, command):
|
||||||
|
"""
|
||||||
|
Return an aiohttp route handler that will execute entrypoint and
|
||||||
|
command in order to manage a Kapow! route.
|
||||||
|
|
||||||
|
"""
|
||||||
async def _handle(request):
|
async def _handle(request):
|
||||||
|
# Register a new connection to Kapow!
|
||||||
id = "CONN_" + str(uuid4()).replace('-', '_')
|
id = "CONN_" + str(uuid4()).replace('-', '_')
|
||||||
connection = CONNECTIONS[id] = Connection(request)
|
connection = CONNECTIONS[id] = Connection(request)
|
||||||
|
|
||||||
|
# Run entrypoint + command passing the connection id
|
||||||
executable, *params = shlex.split(entrypoint)
|
executable, *params = shlex.split(entrypoint)
|
||||||
args = ' '.join([executable] + [shlex.quote(token) for token in params] + [shlex.quote(command)])
|
args = ' '.join([executable]
|
||||||
|
+ [shlex.quote(token) for token in params]
|
||||||
|
+ [shlex.quote(command)])
|
||||||
|
try:
|
||||||
|
shell_task = await asyncio.create_subprocess_shell(
|
||||||
|
args,
|
||||||
|
env={**os.environ,
|
||||||
|
"KAPOW_URL": "http://localhost:8080/kapow",
|
||||||
|
"KAPOW_CONNECTION": id,
|
||||||
|
"PATH": ":".join([os.path.join(os.path.dirname(os.path.realpath(__file__)), "bin"),
|
||||||
|
os.environ["PATH"]]),
|
||||||
|
},
|
||||||
|
stdin=asyncio.subprocess.DEVNULL)
|
||||||
|
|
||||||
# Run the source
|
await shell_task.wait()
|
||||||
shell_task = await asyncio.create_subprocess_shell(
|
except:
|
||||||
args,
|
raise
|
||||||
env={**os.environ,
|
else:
|
||||||
"KAPOW_URL": "http://localhost:8080/kapow",
|
# Respond when the command finish
|
||||||
"KAPOW_CONNECTION": id,
|
return await connection.build_response()
|
||||||
"PATH": ":".join([os.path.join(os.path.dirname(os.path.realpath(__file__)), "bin"),
|
finally:
|
||||||
os.environ["PATH"]]),
|
del CONNECTIONS[id]
|
||||||
},
|
|
||||||
stdin=asyncio.subprocess.DEVNULL)
|
|
||||||
await shell_task.wait()
|
|
||||||
|
|
||||||
del CONNECTIONS[id]
|
|
||||||
|
|
||||||
return await connection.build_response()
|
|
||||||
|
|
||||||
return _handle
|
return _handle
|
||||||
|
|
||||||
@@ -237,10 +251,12 @@ def handle_route(entrypoint, command):
|
|||||||
########################################################################
|
########################################################################
|
||||||
|
|
||||||
async def get_routes(request):
|
async def get_routes(request):
|
||||||
|
"""Return the list of registered routes."""
|
||||||
return web.json_response(list(request.app.router))
|
return web.json_response(list(request.app.router))
|
||||||
|
|
||||||
|
|
||||||
async def create_route(request):
|
async def create_route(request):
|
||||||
|
"""Create a new Kapow! route."""
|
||||||
request.app.router._frozen = False
|
request.app.router._frozen = False
|
||||||
content = await request.json()
|
content = await request.json()
|
||||||
name = "ROUTE_" + str(uuid4()).replace('-', '_')
|
name = "ROUTE_" + str(uuid4()).replace('-', '_')
|
||||||
@@ -254,6 +270,7 @@ async def create_route(request):
|
|||||||
|
|
||||||
|
|
||||||
async def delete_route(request):
|
async def delete_route(request):
|
||||||
|
"""Delete the given Kapow! route."""
|
||||||
id = request.match_info["id"]
|
id = request.match_info["id"]
|
||||||
route = request.app.router._named_resources.pop(id)
|
route = request.app.router._named_resources.pop(id)
|
||||||
request.app.router._resources.remove(route)
|
request.app.router._resources.remove(route)
|
||||||
@@ -267,13 +284,18 @@ async def delete_route(request):
|
|||||||
|
|
||||||
|
|
||||||
async def run_init_script(app):
|
async def run_init_script(app):
|
||||||
|
"""
|
||||||
|
Run the init script if given, then wait for the shell to finish.
|
||||||
|
|
||||||
|
"""
|
||||||
if len(sys.argv) == 1:
|
if len(sys.argv) == 1:
|
||||||
# No script given
|
# No script given
|
||||||
cmd = "/bin/bash"
|
cmd = "/bin/bash"
|
||||||
elif len(sys.argv) == 2:
|
elif len(sys.argv) == 2:
|
||||||
cmd = f"/bin/bash --init-file {sys.argv[1]}"
|
cmd = f"/bin/bash --init-file {sys.argv[1]}"
|
||||||
else:
|
else:
|
||||||
raise RuntimeError("Invalid parameter count.")
|
print(f"Usage: sys.argv[0] <init-script>")
|
||||||
|
os._exit(1)
|
||||||
|
|
||||||
binpath = os.path.join(os.path.dirname(os.path.realpath(__file__)), "bin")
|
binpath = os.path.join(os.path.dirname(os.path.realpath(__file__)), "bin")
|
||||||
shell_task = await asyncio.create_subprocess_shell(
|
shell_task = await asyncio.create_subprocess_shell(
|
||||||
@@ -293,19 +315,20 @@ async def start_background_tasks(app):
|
|||||||
app["debug_tasks"] = loop.create_task(run_init_script(app))
|
app["debug_tasks"] = loop.create_task(run_init_script(app))
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def kapow():
|
||||||
|
"""Start aiohttp app."""
|
||||||
app = web.Application(client_max_size=1024**3)
|
app = web.Application(client_max_size=1024**3)
|
||||||
app.add_routes([
|
app.add_routes([
|
||||||
web.get('/kapow/routes', get_routes),
|
web.get('/kapow/routes', get_routes),
|
||||||
web.post('/kapow/routes', create_route),
|
web.post('/kapow/routes', create_route),
|
||||||
web.delete('/kapow/routes/{id}', delete_route),
|
web.delete('/kapow/routes/{id}', delete_route),
|
||||||
web.get('/kapow/connections/{id}/{resource:.*}', get_field),
|
web.get('/kapow/connections/{id}/{field:.*}', get_field),
|
||||||
# web.post('/kapow/connections/{id}/{resource:.*}', append_resource),
|
# web.post('/kapow/connections/{id}/{field:.*}', append_field),
|
||||||
web.put('/kapow/connections/{id}/{resource:.*}', set_field),
|
web.put('/kapow/connections/{id}/{field:.*}', set_field),
|
||||||
])
|
])
|
||||||
app.on_startup.append(start_background_tasks)
|
app.on_startup.append(start_background_tasks)
|
||||||
web.run_app(app)
|
web.run_app(app)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
main()
|
kapow()
|
||||||
|
|||||||
Reference in New Issue
Block a user