Browse Source

working decorator?

main
Michael Pilosov 2 years ago
parent
commit
5765cf8a59
  1. 2
      example_block/eden-server/Dockerfile
  2. 45
      example_block/eden-server/announce.py
  3. 53
      example_block/eden-server/server.py
  4. 2
      registry/host.py

2
example_block/eden-server/Dockerfile

@ -23,6 +23,8 @@ ENV PATH="/home/eden/.local/bin:${PATH}"
RUN pip install eden-python
RUN pip install python-socketio[asyncio_server] aiohttp
COPY server.py .
COPY announce.py .
RUN touch __init__.py
# attempted bugfix
COPY image_utils.py /home/eden/.local/lib/python3.10/site-packages/eden/image_utils.py
# attempt git-python hackaround

45
example_block/eden-server/announce.py

@ -25,9 +25,13 @@ SERVER_PORT = 8000
sio = socketio.AsyncClient()
async def announce_server():
await sio.connect(f'http://{HOST_SERVER_IP}:{HOST_SERVER_PORT}')
@sio.event
async def connect():
await sio.emit('register', {'name': SERVER_NAME, 'ip': SERVER_IP, 'port': SERVER_PORT})
async def main():
await sio.connect(f'http://{HOST_SERVER_IP}:{HOST_SERVER_PORT}')
@sio.on("heartbeat")
async def on_heartbeat():
print("Received heartbeat from host")
@ -36,20 +40,43 @@ async def announce_server():
async def disconnect():
print("Disconnected from host")
await main()
# def announce_server_decorator(host_block_function):
# @wraps(host_block_function)
# def wrapper(*args, **kwargs):
# loop = asyncio.get_event_loop()
# # Start the server announcement task
# announce_task = loop.create_task(announce_server())
# # Run the original host_block function
# result = host_block_function(*args, **kwargs)
# # Cancel the announcement task after the host_block function is done
# announce_task.cancel()
# return result
# return wrapper
def announce_server_decorator(host_block_function):
@wraps(host_block_function)
def wrapper(*args, **kwargs):
async def main(*args, **kwargs):
loop = asyncio.get_event_loop()
host_block_thread = loop.run_in_executor(None, host_block_function)
# Start the server announcement task
announce_task = loop.create_task(announce_server())
# Run the original host_block function
result = host_block_function(*args, **kwargs)
# Announce the server to the host
await announce_server()
# announce_task = loop.create_task(announce_server())
# Cancel the announcement task after the host_block function is done
announce_task.cancel()
# run announcement task infinitely in background but allow host_block to run
# await asyncio.gather(announce_task, host_block_thread)
# Wait for host_block to finish
await host_block_thread
return result
# announce_task.cancel()
return asyncio.run(main())
return wrapper

53
example_block/eden-server/server.py

@ -55,7 +55,9 @@ def do_something(config):
pil_image = Image(pil_image)
return {"value": value, "index": index, "label": label, 'image': pil_image}
from announce import announce_server_decorator
@announce_server_decorator
def run_host_block():
host_block(
block=eden_block,
@ -69,55 +71,6 @@ def run_host_block():
requires_gpu=True,
)
import asyncio
import socketio
import socket
def get_ip_address():
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
# This IP address doesn't need to be reachable, as we're only using it to find the local IP address
s.connect(("10.255.255.255", 1))
ip = s.getsockname()[0]
except Exception:
ip = "127.0.0.1"
finally:
s.close()
return ip
# Update these with the correct values for your host and server
HOST_SERVER_IP = "192.168.1.113"
HOST_SERVER_PORT = 4999
SERVER_NAME = "server_1"
SERVER_IP = get_ip_address()
SERVER_PORT = 8000
sio = socketio.AsyncClient()
async def announce_server():
await sio.connect(f'http://{HOST_SERVER_IP}:{HOST_SERVER_PORT}')
await sio.emit('register', {'name': SERVER_NAME, 'ip': SERVER_IP, 'port': SERVER_PORT})
@sio.on("heartbeat")
async def on_heartbeat():
print("Received heartbeat from host")
@sio.event
async def disconnect():
print("Disconnected from host")
async def main():
# Run host_block in a separate thread
loop = asyncio.get_event_loop()
host_block_thread = loop.run_in_executor(None, run_host_block)
# Announce the server to the host
await announce_server()
# Wait for host_block to finish
await host_block_thread
if __name__ == "__main__":
asyncio.run(main())
run_host_block()

2
registry/host.py

@ -5,7 +5,7 @@ from aiohttp import web
SERVER_0_IP = "192.168.1.113"
FLASK_SERVER_PORT = 4999
HEARTBEAT_INTERVAL = 1
HEARTBEAT_INTERVAL = 5
HEARTBEAT_TIMEOUT = 3
sio = socketio.AsyncServer(async_mode='aiohttp')

Loading…
Cancel
Save