From 5765cf8a5954002d9ef33e231d7ffcad1e2251c6 Mon Sep 17 00:00:00 2001 From: Michael Pilosov Date: Sun, 19 Mar 2023 02:38:22 -0600 Subject: [PATCH] working decorator? --- example_block/eden-server/Dockerfile | 2 + example_block/eden-server/announce.py | 49 +++++++++++++++++++------ example_block/eden-server/server.py | 53 ++------------------------- registry/host.py | 2 +- 4 files changed, 44 insertions(+), 62 deletions(-) diff --git a/example_block/eden-server/Dockerfile b/example_block/eden-server/Dockerfile index 178df9c..de413a1 100644 --- a/example_block/eden-server/Dockerfile +++ b/example_block/eden-server/Dockerfile @@ -23,6 +23,8 @@ ENV PATH="/home/eden/.local/bin:${PATH}" RUN pip install eden-python RUN pip install python-socketio[asyncio_server] aiohttp COPY server.py . +COPY announce.py . +RUN touch __init__.py # attempted bugfix COPY image_utils.py /home/eden/.local/lib/python3.10/site-packages/eden/image_utils.py # attempt git-python hackaround diff --git a/example_block/eden-server/announce.py b/example_block/eden-server/announce.py index be3240c..2a34c05 100644 --- a/example_block/eden-server/announce.py +++ b/example_block/eden-server/announce.py @@ -25,8 +25,12 @@ SERVER_PORT = 8000 sio = socketio.AsyncClient() async def announce_server(): - await sio.connect(f'http://{HOST_SERVER_IP}:{HOST_SERVER_PORT}') - await sio.emit('register', {'name': SERVER_NAME, 'ip': SERVER_IP, 'port': SERVER_PORT}) + @sio.event + async def connect(): + await sio.emit('register', {'name': SERVER_NAME, 'ip': SERVER_IP, 'port': SERVER_PORT}) + + async def main(): + await sio.connect(f'http://{HOST_SERVER_IP}:{HOST_SERVER_PORT}') @sio.on("heartbeat") async def on_heartbeat(): @@ -35,21 +39,44 @@ async def announce_server(): @sio.event async def disconnect(): print("Disconnected from host") + + await main() + +# def announce_server_decorator(host_block_function): +# @wraps(host_block_function) +# def wrapper(*args, **kwargs): +# loop = asyncio.get_event_loop() + +# # Start the server announcement task +# announce_task = loop.create_task(announce_server()) + +# # Run the original host_block function +# result = host_block_function(*args, **kwargs) + +# # Cancel the announcement task after the host_block function is done +# announce_task.cancel() + +# return result + +# return wrapper def announce_server_decorator(host_block_function): @wraps(host_block_function) def wrapper(*args, **kwargs): - loop = asyncio.get_event_loop() - - # Start the server announcement task - announce_task = loop.create_task(announce_server()) + async def main(*args, **kwargs): + loop = asyncio.get_event_loop() + host_block_thread = loop.run_in_executor(None, host_block_function) - # Run the original host_block function - result = host_block_function(*args, **kwargs) + # Announce the server to the host + await announce_server() + # announce_task = loop.create_task(announce_server()) - # Cancel the announcement task after the host_block function is done - announce_task.cancel() + # run announcement task infinitely in background but allow host_block to run + # await asyncio.gather(announce_task, host_block_thread) + # Wait for host_block to finish + await host_block_thread - return result + # announce_task.cancel() + return asyncio.run(main()) return wrapper diff --git a/example_block/eden-server/server.py b/example_block/eden-server/server.py index 6c28f34..039f309 100644 --- a/example_block/eden-server/server.py +++ b/example_block/eden-server/server.py @@ -55,7 +55,9 @@ def do_something(config): pil_image = Image(pil_image) return {"value": value, "index": index, "label": label, 'image': pil_image} +from announce import announce_server_decorator +@announce_server_decorator def run_host_block(): host_block( block=eden_block, @@ -69,55 +71,6 @@ def run_host_block(): requires_gpu=True, ) -import asyncio -import socketio -import socket - -def get_ip_address(): - s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - try: - # This IP address doesn't need to be reachable, as we're only using it to find the local IP address - s.connect(("10.255.255.255", 1)) - ip = s.getsockname()[0] - except Exception: - ip = "127.0.0.1" - finally: - s.close() - return ip - - -# Update these with the correct values for your host and server -HOST_SERVER_IP = "192.168.1.113" -HOST_SERVER_PORT = 4999 -SERVER_NAME = "server_1" -SERVER_IP = get_ip_address() -SERVER_PORT = 8000 - -sio = socketio.AsyncClient() - -async def announce_server(): - await sio.connect(f'http://{HOST_SERVER_IP}:{HOST_SERVER_PORT}') - await sio.emit('register', {'name': SERVER_NAME, 'ip': SERVER_IP, 'port': SERVER_PORT}) - - @sio.on("heartbeat") - async def on_heartbeat(): - print("Received heartbeat from host") - - @sio.event - async def disconnect(): - print("Disconnected from host") - -async def main(): - # Run host_block in a separate thread - loop = asyncio.get_event_loop() - host_block_thread = loop.run_in_executor(None, run_host_block) - - # Announce the server to the host - await announce_server() - - # Wait for host_block to finish - await host_block_thread - if __name__ == "__main__": - asyncio.run(main()) \ No newline at end of file + run_host_block() \ No newline at end of file diff --git a/registry/host.py b/registry/host.py index 113f227..efc9cee 100644 --- a/registry/host.py +++ b/registry/host.py @@ -5,7 +5,7 @@ from aiohttp import web SERVER_0_IP = "192.168.1.113" FLASK_SERVER_PORT = 4999 -HEARTBEAT_INTERVAL = 1 +HEARTBEAT_INTERVAL = 5 HEARTBEAT_TIMEOUT = 3 sio = socketio.AsyncServer(async_mode='aiohttp')