From c55ce23d76fe4a77381fb8194318a26335d739c9 Mon Sep 17 00:00:00 2001
From: Michael Pilosov
Date: Sun, 19 Mar 2023 02:02:46 -0600
Subject: [PATCH] announcement-functional block hosting

---
 .gitignore                               |   1 +
 Dockerfile                               |  14 +-
 app/__init__.py                          |   2 +-
 app/routes.py                            |  50 ++-
 docker-compose.yml                       |   5 +
 example_block/c2.py                      |  57 +++
 example_block/client.py                  |  41 ++
 example_block/docker-compose.yml         |  47 +++
 example_block/eden-server/Dockerfile     |  42 ++
 example_block/eden-server/announce.py    |  55 +++
 example_block/eden-server/hosting.py     | 515 +++++++++++++++++++++++
 example_block/eden-server/image_utils.py |  75 ++++
 example_block/eden-server/server copy.py |  69 +++
 example_block/eden-server/server.py      | 123 ++++++
 example_block/nginx.conf                 |  24 ++
 example_block/redis/Dockerfile           |  12 +
 example_block/s2.py                      |  66 +++
 example_block/server.py                  |  66 +++
 image_utils.py                           |  75 ++++
 index.html                               |   1 +
 registry/host.py                         |  77 ++++
 registry/requirements.txt                |   2 +
 registry/s1.py                           |  39 ++
 registry/s2.py                           |  39 ++
 requirements.txt                         |   3 +-
 25 files changed, 1496 insertions(+), 4 deletions(-)
 create mode 100644 example_block/c2.py
 create mode 100644 example_block/client.py
 create mode 100644 example_block/docker-compose.yml
 create mode 100644 example_block/eden-server/Dockerfile
 create mode 100644 example_block/eden-server/announce.py
 create mode 100644 example_block/eden-server/hosting.py
 create mode 100644 example_block/eden-server/image_utils.py
 create mode 100644 example_block/eden-server/server copy.py
 create mode 100644 example_block/eden-server/server.py
 create mode 100644 example_block/nginx.conf
 create mode 100644 example_block/redis/Dockerfile
 create mode 100644 example_block/s2.py
 create mode 100644 example_block/server.py
 create mode 100644 image_utils.py
 create mode 100644 registry/host.py
 create mode 100644 registry/requirements.txt
 create mode 100644 registry/s1.py
 create mode 100644 registry/s2.py

diff --git a/.gitignore b/.gitignore
index 272f2a0..11ae271 100644
--- a/.gitignore
+++ b/.gitignore
@@ -140,3 +140,4 @@ cython_debug/
 
 # Project-specific files
 cookies.txt
+*.dump.rdb
diff --git a/Dockerfile b/Dockerfile
index 4dff50e..c8d3353 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,6 +1,16 @@
-FROM python:3.9
+FROM python:3.10
+RUN apt-get update -yqq && apt-get install -y \
+    libgl1-mesa-glx \
+    && rm -rf /var/lib/apt/lists/*
 
 WORKDIR /app
+RUN useradd -ms /bin/bash eden
+# make them own /app
+RUN chown eden:eden /app
+
+USER eden
+# add /home/eden/.local/bin to PATH
+ENV PATH="/home/eden/.local/bin:${PATH}"
 
 COPY requirements.txt .
 RUN pip install -r requirements.txt
@@ -8,5 +18,7 @@ RUN pip install -r requirements.txt
 COPY app app
 
 EXPOSE 5000
+# attempted bugfix
+COPY image_utils.py /home/eden/.local/lib/python3.10/site-packages/eden/image_utils.py
 
 CMD ["flask", "run", "--debug", "--host=0.0.0.0"]
diff --git a/app/__init__.py b/app/__init__.py
index 46b68d9..037c0af 100644
--- a/app/__init__.py
+++ b/app/__init__.py
@@ -3,7 +3,7 @@ from flask_sqlalchemy import SQLAlchemy
 from flask_cors import CORS
 
 app = Flask(__name__)
-CORS(app, supports_credentials=True)
+# CORS(app, supports_credentials=True)
 
 app.config.from_pyfile('config.py')
 db = SQLAlchemy(app)
diff --git a/app/routes.py b/app/routes.py
index 9ff2156..bc82590 100644
--- a/app/routes.py
+++ b/app/routes.py
@@ -1,6 +1,10 @@
 from flask import request, jsonify, redirect, url_for, render_template
 from app import app, db
 # from app.models import User
+import PIL
+from eden.client import Client
+from eden.datatypes import Image
+import time
 
 @app.route('/api', methods=['POST'])
 def process_request():
@@ -13,4 +17,48 @@ def process_request():
     # log to flask log the blocks list.
     app.logger.info(blocks)
     app.logger.info(text)
-    return jsonify(success=True)
+
+    results = communicate_with_eden(app, image)
+
+    return jsonify(results)
+    # return jsonify(success=True)
+
+
+# NOTE: ip_address defaults to a hardcoded container IP on eden-network;
+# example_block/c2.py shows how to look this up dynamically via the docker SDK
+def communicate_with_eden(app, image, ip_address="172.18.0.3", port="5656"):
+    url = f"http://{ip_address}:{port}"
+
+    ## set up a client
+    c = Client(url=url, username="abraham")
+
+    # get server's identity
+    generator_id = c.get_generator_identity()
+    print(generator_id)
+
+    app.logger.info("setting config")
+    ## define input args to be sent
+    config = {
+        "width": 2000,  ## width
+        "height": 1000,  ## height
+        "input_image": Image(
+            PIL.Image.open(image.stream)
+        ),  ## images require eden.datatypes.Image()
+    }
+
+    app.logger.info("set config, running now")
+    # start the task
+    run_response = c.run(config)
+    results = c.fetch(token=run_response["token"])
+
+    print("Initial response")
+    # check status of the task, returns the output too if the task is complete
+    # results = c.await_results(token=run_response["token"], interval=1, show_progress=False)
+    i = 0
+    while results["status"].get("status") != "complete":
+        results = c.fetch(token=run_response["token"])
+        print(results)
+        time.sleep(0.1)
+        i += 1
+        if i > 50:
+            break
+
+    return results
\ No newline at end of file
diff --git a/docker-compose.yml b/docker-compose.yml
index bb573ba..7f48fb7 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -23,3 +23,8 @@ services:
 
 volumes:
   db-data:
+
+networks:
+  default:
+    name: eden-network
+    external: true
\ No newline at end of file
diff --git a/example_block/c2.py b/example_block/c2.py
new file mode 100644
index 0000000..5f28724
--- /dev/null
+++ b/example_block/c2.py
@@ -0,0 +1,57 @@
+import time
+from eden.client import Client
+from eden.datatypes import Image
+
+import subprocess
+import socket
+
+# Get IP address of eden-server service
+hostname = 'eden-server'
+port = 5656
+network_name = 'eden-network'
+import docker
+client = docker.from_env()
+project_name = 'not_so_minimal'
+container_name = f'{project_name}_{hostname}_1'
+container = client.containers.get(container_name)
+ip_address = container.attrs['NetworkSettings']['Networks'][network_name]['IPAddress']
+print(ip_address)
+url = f"http://{ip_address}:{port}"
+
+## set up a client
+c = Client(url=url, username="abraham")
+
+# get server's identity
+generator_id = c.get_generator_identity()
+print(generator_id)
+
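+# NOTE: the width/height values below are just example inputs; a block accepts
+# whatever args its server declared in `my_args` (see example_block/server.py)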
+## define input args to be sent
+config = {
+    "width": 2000,  ## width
+    "height": 1000,  ## height
+    "input_image": Image(
+        "/home/mm/Downloads/FF06F0EC-1B54-458A-BF12-FF7FC2A43C10.jpeg"
+    ),  ## images require eden.datatypes.Image()
+}
+
+# start the task
+run_response = c.run(config)
+
+print("Initial response")
+# check status of the task, returns the output too if the task is complete
+results = c.fetch(token=run_response["token"])
+print(results)
+
+# one eternity later
+# time.sleep(5)
+
+print("Trying")
+while results["status"].get("status") != "complete":
+    results = c.fetch(token=run_response["token"])
+    print(results)
+    time.sleep(0.1)
+
+## check status again, hopefully the task is complete by now
+# results = c.fetch(token=run_response["token"])
+# print(results)
+# results['output']['image'].show()
diff --git a/example_block/client.py b/example_block/client.py
new file mode 100644
index 0000000..065caea
--- /dev/null
+++ b/example_block/client.py
@@ -0,0 +1,41 @@
+import time
+from eden.client import Client
+from eden.datatypes import Image
+
+## set up a client
+c = Client(url="http://0.0.0.0:5656", username="abraham")
+
+# get server's identity
+generator_id = c.get_generator_identity()
+print(generator_id)
+
+## define input args to be sent
+config = {
+    "width": 2000,  ## width
+    "height": 1000,  ## height
+    "input_image": Image(
+        "/home/mm/Downloads/FF06F0EC-1B54-458A-BF12-FF7FC2A43C10.jpeg"
+    ),  ## images require eden.datatypes.Image()
+}
+
+# start the task
+run_response = c.run(config)
+
+print("Initial response")
+# check status of the task, returns the output too if the task is complete
+results = c.fetch(token=run_response["token"])
+print(results)
+
+# one eternity later
+# time.sleep(5)
+
+print("Trying")
+# (no timeout guard here; see app/routes.py for a bounded polling loop)
+while results["status"].get("status") != "complete":
+    results = c.fetch(token=run_response["token"])
+    print(results)
+    time.sleep(0.1)
+
+## check status again, hopefully the task is complete by now
+# results = c.fetch(token=run_response["token"])
+# print(results)
+# results['output']['image'].show()
\ No newline at end of file
diff --git a/example_block/docker-compose.yml b/example_block/docker-compose.yml
new file mode 100644
index 0000000..ab71997
--- /dev/null
+++ b/example_block/docker-compose.yml
@@ -0,0 +1,47 @@
+# docker-compose for redis service defined in ./redis
+version: '3.7'
+
+services:
+  redis:
+    build: ./redis
+    image: redis
+    ports:
+      - "6379:6379"
+    volumes:
+      - ./data:/data
+    networks:
+      - default
+
+# eden server, started with python server.py, based on Dockerfile in cwd.
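+# NOTE: `runtime: nvidia` requires the NVIDIA container runtime on the host,
+# and the volume below mounts a host torch checkpoint so the container does
+# not have to re-download the resnet50 weights on startup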
+  eden-server:
+    build: ./eden-server
+    image: eden-server
+    # ports:
+    #   - "5656:5656"
+    volumes:
+      - /home/mm/.cache/torch/hub/checkpoints/resnet50-11ad3fa6.pth:/root/.cache/torch/hub/checkpoints/resnet50-11ad3fa6.pth
+    networks:
+      - default
+    depends_on:
+      - redis
+    # pass nvidia gpu
+    runtime: nvidia
+    environment:
+      - CUDA_VISIBLE_DEVICES=0
+      - NVIDIA_VISIBLE_DEVICES=0
+
+  # load-balancer:
+  #   image: nginx
+  #   ports:
+  #     - "5656:80"
+  #   volumes:
+  #     - ./nginx.conf:/etc/nginx/nginx.conf:ro
+  #   networks:
+  #     - default
+  #   depends_on:
+  #     - eden-server
+
+networks:
+  default:
+    name: eden-network
+    external: true
\ No newline at end of file
diff --git a/example_block/eden-server/Dockerfile b/example_block/eden-server/Dockerfile
new file mode 100644
index 0000000..178df9c
--- /dev/null
+++ b/example_block/eden-server/Dockerfile
@@ -0,0 +1,42 @@
+FROM pytorch/pytorch:1.13.1-cuda11.6-cudnn8-runtime
+
+RUN apt-get update && apt-get install -y \
+    libgl1-mesa-glx \
+    libglib2.0-0 \
+    && rm -rf /var/lib/apt/lists/*
+
+
+# until we hack around gitpython, we need git
+# RUN apt-get update && apt-get install -y \
+#     git \
+#     && rm -rf /var/lib/apt/lists/*
+
+WORKDIR /app
+# create a safe user
+RUN useradd -ms /bin/bash eden
+# make them own /app
+RUN chown eden:eden /app
+
+USER eden
+# add /home/eden/.local/bin to PATH
+ENV PATH="/home/eden/.local/bin:${PATH}"
+RUN pip install eden-python
+RUN pip install python-socketio[asyncio_server] aiohttp
+COPY server.py .
+# attempted bugfix
+COPY image_utils.py /home/eden/.local/lib/python3.10/site-packages/eden/image_utils.py
+# attempt git-python hackaround
+COPY hosting.py /home/eden/.local/lib/python3.10/site-packages/eden/hosting.py
+
+EXPOSE 5656
+# ENV GIT_PYTHON_REFRESH=quiet
+# hack around gitpython
+# RUN git init .
+# RUN git config --global user.email "none@site.com"
+# RUN git config --global user.name "eden-service-user"
+# # add fake remote upstream
+# RUN git remote add origin https://git.clfx.cc/mm/eden-app.git
+# RUN git add server.py
+# RUN git commit -am "initial commit"
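+# GIT_PYTHON_REFRESH=quiet keeps gitpython from raising at import time when no
+# git executable is present in the image (the commented-out lines above were an
+# earlier attempt at the same workaround)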
+ENV GIT_PYTHON_REFRESH=quiet
+CMD ["python", "server.py"]
\ No newline at end of file
diff --git a/example_block/eden-server/announce.py b/example_block/eden-server/announce.py
new file mode 100644
index 0000000..be3240c
--- /dev/null
+++ b/example_block/eden-server/announce.py
@@ -0,0 +1,55 @@
+import asyncio
+from functools import wraps
+import socketio
+import socket
+
+def get_ip_address():
+    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+    try:
+        # This IP address doesn't need to be reachable, as we're only using it to find the local IP address
+        s.connect(("10.255.255.255", 1))
+        ip = s.getsockname()[0]
+    except Exception:
+        ip = "127.0.0.1"
+    finally:
+        s.close()
+    return ip
+
+# Update these with the correct values for your host and server
+HOST_SERVER_IP = "192.168.1.113"
+HOST_SERVER_PORT = 4999
+SERVER_NAME = "server_1"
+SERVER_IP = get_ip_address()
+SERVER_PORT = 8000
+
+sio = socketio.AsyncClient()
+
+async def announce_server():
+    await sio.connect(f'http://{HOST_SERVER_IP}:{HOST_SERVER_PORT}')
+    await sio.emit('register', {'name': SERVER_NAME, 'ip': SERVER_IP, 'port': SERVER_PORT})
+
+    @sio.on("heartbeat")
+    async def on_heartbeat():
+        print("Received heartbeat from host")
+
+    @sio.event
+    async def disconnect():
+        print("Disconnected from host")
+
+def announce_server_decorator(host_block_function):
+    @wraps(host_block_function)
+    def wrapper(*args, **kwargs):
+        loop = asyncio.get_event_loop()
+
+        # Start the server announcement task
+        announce_task = loop.create_task(announce_server())
+
+        # Run the original host_block function
+        result = host_block_function(*args, **kwargs)
+
+        # Cancel the announcement task after the host_block function is done
+        announce_task.cancel()
+
+        return result
+
+    return wrapper
diff --git a/example_block/eden-server/hosting.py b/example_block/eden-server/hosting.py
new file mode 100644
index 0000000..86ef4bb
--- /dev/null
+++ b/example_block/eden-server/hosting.py
@@ -0,0 +1,515 @@
+import os
+import git
+import warnings
+import uvicorn
+import logging
+from fastapi import FastAPI
+from prometheus_client import Gauge
+from starlette_exporter import PrometheusMiddleware, handle_metrics
+from fastapi.middleware.cors import CORSMiddleware
+
+from .datatypes import Image
+from .queue import QueueData
+from .log_utils import Colors
+from .models import Credentials, WaitFor
+from .result_storage import ResultStorage
+from .config_wrapper import ConfigWrapper
+from .data_handlers import Encoder, Decoder
+from .threaded_server import ThreadedServer
+from .progress_tracker import fetch_progress_from_token
+from .log_utils import log_levels, celery_log_levels, PREFIX
+from .prometheus_utils import PrometheusMetrics
+
+from .utils import stop_everything_gracefully, generate_random_string
+
+from uvicorn.config import LOGGING_CONFIG
+
+"""
+Celery+redis is needed to be able to queue tasks
+"""
+from celery import Celery
+from .celery_utils import run_celery_app
+
+"""
+tool to allocate gpus on queued tasks
+"""
+from .gpu_allocator import GPUAllocator
+
+
+def host_block(
+    block,
+    port=8080,
+    host="0.0.0.0",
+    max_num_workers=4,
+    redis_port=6379,
+    redis_host="localhost",
+    requires_gpu=True,
+    log_level="warning",
+    logfile="logs.log",
+    exclude_gpu_ids: list = [],
+    remove_result_on_fetch=False,
+):
+    """
+    Use this to host your eden.Block on a server. Supports multiple GPUs and queues tasks automatically with celery.
+
+    Args:
+        block (eden.block.Block): The eden block you'd want to host.
+        port (int, optional): Localhost port where the block would be hosted. Defaults to 8080.
+        host (str): specifies where the endpoint would be hosted. Defaults to '0.0.0.0'.
+        max_num_workers (int, optional): Maximum number of tasks to run in parallel. Defaults to 4.
+        redis_port (int, optional): Port number for celery's redis server. Defaults to 6379.
+        redis_host (str, optional): Place to host redis for `eden.queue.QueueData`. Defaults to localhost.
+        requires_gpu (bool, optional): Set this to False if your tasks don't necessarily need GPUs.
+        log_level (str, optional): Can be 'debug', 'info', or 'warning'. Defaults to 'warning'.
+        logfile (str, optional): Name of the file where the logs would be stored. If set to None, it will show all logs on stdout. Defaults to 'logs.log'.
+        exclude_gpu_ids (list, optional): List of gpu ids to not use for hosting. Example: [2,3]
+        remove_result_on_fetch (bool, optional): If True, delete a completed result from storage once it has been fetched.
+    """
+
+    """
+    Response templates:
+
+    /run:
+    {
+        'token': some_long_token,
+    }
+
+    /fetch:
+    if task is queued:
+    {
+        'status': {
+            'status': queued,
+            'queue_position': int
+        },
+        config: current_config
+    }
+
+    elif task is running:
+    {
+        'status': {
+            'status': 'running',
+            'progress': float between 0 and 1,
+        },
+        config: current_config,
+        'output': {}  ## optionally the user should be able to write outputs here
+    }
+    elif task failed:
+    {
+        'status': {
+            'status': 'failed',
+        },
+        'config': current_config,
+        'output': {}  ## will still include the outputs if any so that it gets returned even though the task failed
+    }
+    elif task succeeded:
+    {
+        'status': {
+            'status': 'complete'
+        },
+        'output': user_output,
+        'config': config
+    }
+    """
+
+    """
+    Initiating celery app
+    """
+    celery_app = Celery(__name__, broker=f"redis://{redis_host}:{str(redis_port)}")
+    celery_app.conf.broker_url = os.environ.get(
+        "CELERY_BROKER_URL", f"redis://{redis_host}:{str(redis_port)}"
+    )
+    celery_app.conf.result_backend = os.environ.get(
+        "CELERY_RESULT_BACKEND", f"redis://{redis_host}:{str(redis_port)}"
+    )
+    celery_app.conf.task_track_started = os.environ.get(
+        "CELERY_TRACK_STARTED", default=True
+    )
+
+    celery_app.conf.worker_send_task_events = True
+    celery_app.conf.task_send_sent_event = True
+
+    """
+    each block gets its own queue
+    """
+    celery_app.conf.task_default_queue = block.name
+
+    """
+    set prefetch mult to 1 so that tasks don't get pre-fetched by workers
+    """
+    celery_app.conf.worker_prefetch_multiplier = 1
+
+    """
+    task messages will be acknowledged after the task has been executed
+    """
+    celery_app.conf.task_acks_late = True
+
+    """
+    Initiating GPUAllocator only if requires_gpu is True
+    """
+    if requires_gpu == True:
+        gpu_allocator = GPUAllocator(exclude_gpu_ids=exclude_gpu_ids)
+    else:
+        print(PREFIX + " Initiating server with no GPUs since requires_gpu = False")
+
+    if requires_gpu == True:
+        if gpu_allocator.num_gpus < max_num_workers:
+            """
+            if a task requires a gpu, and the number of workers is > the number of available gpus,
+            then max_num_workers is automatically set to the number of gpus available
+            this is because eden assumes that each task requires one gpu (all of it)
+            """
+            warnings.warn(
+                "max_num_workers is greater than the number of GPUs found, overriding max_num_workers to be: "
+                + str(gpu_allocator.num_gpus)
+            )
+            max_num_workers = gpu_allocator.num_gpus
+
+    """
+    Initiating queue data to keep track of the queue
+    """
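+    # QueueData presumably tracks per-token task state in the same redis
+    # instance that backs celery (it is handed the same host/port below)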
+    queue_data = QueueData(
+        redis_port=redis_port, redis_host=redis_host, queue_name=block.name
+    )
+
+    """
+    Initiate encoder and decoder
+    """
+
+    data_encoder = Encoder()
+    data_decoder = Decoder()
+
+    """
+    Initiate fastAPI app
+    """
+    app = FastAPI()
+    origins = ["*"]
+    app.add_middleware(
+        CORSMiddleware,
+        allow_origins=origins,
+        allow_credentials=True,
+        allow_methods=["*"],
+        allow_headers=["*"],
+    )
+    app.add_middleware(PrometheusMiddleware)
+    app.add_route("/metrics", handle_metrics)
+
+    """
+    Initiate result storage on redis
+    """
+
+    result_storage = ResultStorage(
+        redis_host=redis_host,
+        redis_port=redis_port,
+    )
+
+    ## set up result storage and data encoder for block
+    block.result_storage = result_storage
+    block.data_encoder = data_encoder
+
+    """
+    initiate a wrapper which handles 4 metrics for prometheus:
+    * number of queued jobs
+    * number of running jobs
+    * number of failed jobs
+    * number of succeeded jobs
+    """
+    prometheus_metrics = PrometheusMetrics()
+
+    """
+    define celery task
+    """
+
+    @celery_app.task(name="run")
+    def run(args, token: str):
+
+        ## job moves from queue to running
+        prometheus_metrics.queued.dec(1)
+        prometheus_metrics.running.inc(1)
+
+        args = data_decoder.decode(args)
+        """
+        allocating a GPU ID to the task based on usage
+        for now let's settle for max 1 GPU per task :(
+        """
+
+        if requires_gpu == True:
+            # returns None if there are no gpus available
+            gpu_name = gpu_allocator.get_gpu()
+        else:
+            gpu_name = None  ## default value either if there are no gpus available or requires_gpu = False
+
+        """
+        If there are no GPUs available, then it returns a sad message.
+        But if there ARE GPUs available, then it starts run()
+        """
+        if (
+            gpu_name == None and requires_gpu == True
+        ):  ## making sure there are no gpus available
+
+            status = {
+                "status": "No GPUs are available at the moment, please try again later",
+            }
+
+        else:
+
+            """
+            refer:
+            https://github.com/abraham-ai/eden/issues/14
+            """
+            args = ConfigWrapper(
+                data=args,
+                token=token,
+                result_storage=result_storage,
+                gpu=None,  ## will be provided later on in the run
+                progress=None,  ## will be provided later on in the run
+            )
+
+            if requires_gpu == True:
+                args.gpu = gpu_name
+
+            if block.progress == True:
+                """
+                if progress was set to True on @eden.Block.run() decorator, then add a progress tracker into the config
+                """
+                args.progress = block.get_progress_bar(
+                    token=token, result_storage=result_storage
+                )
+
+            try:
+                output = block.__run__(args)
+
+                # job moves from running to succeeded
+                prometheus_metrics.running.dec(1)
+                prometheus_metrics.succeeded.inc(1)
+
+            # prevent further jobs from hitting a busy gpu after a caught exception
+            except Exception as e:
+
+                # job moves from running to failed
+                prometheus_metrics.running.dec(1)
+                prometheus_metrics.failed.inc(1)
+                if requires_gpu == True:
+                    gpu_allocator.set_as_free(name=gpu_name)
+                raise Exception(str(e))
+
+            if requires_gpu == True:
+                gpu_allocator.set_as_free(name=gpu_name)
+
+            success = block.write_results(output=output, token=token)
+
+        return success  ## results go to result_storage; this is just a success flag
+
+    @app.post("/run")
+    def start_run(config: block.data_model):
+
+        ## job moves into queue
+        prometheus_metrics.queued.inc(1)
+
+        """
+        refer:
+        https://github.com/celery/celery/issues/1813#issuecomment-33142648
+        """
+        token = generate_random_string(len=10)
+
+        kwargs = dict(args=dict(config), token=token)
+
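+        # the random token doubles as the celery task_id, so the same token
+        # returned by /run can be used to look the task up in /fetch and /update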
+        res = run.apply_async(kwargs=kwargs, task_id=token, queue=block.name)
+
+        initial_dict = {"config": dict(config), "output": {}, "progress": "__none__"}
+
+        success = result_storage.add(token=token, encoded_results=initial_dict)
+
+        response = {"token": token}
+
+        return response
+
+    @app.post("/update")
+    def update(credentials: Credentials, config: block.data_model):
+
+        token = credentials.token
+        config = dict(config)
+
+        status = queue_data.get_status(token=token)
+
+        if status["status"] != "invalid token":
+
+            if (
+                status["status"] == "queued"
+                or status["status"] == "running"
+                or status["status"] == "starting"
+            ):
+
+                output_from_storage = result_storage.get(token=token)
+                output_from_storage["config"] = config
+
+                success = result_storage.add(
+                    encoded_results=output_from_storage, token=token
+                )
+
+                response = {
+                    "status": {
+                        "status": "successfully updated config",
+                    }
+                }
+
+                return response
+
+            elif status["status"] == "failed":
+
+                return {
+                    "status": {
+                        "status": "could not update config because job failed",
+                    }
+                }
+
+            elif status["status"] == "complete":
+
+                return {
+                    "status": {
+                        "status": "could not update config because job is already complete",
+                    }
+                }
+
+        else:
+            response = {"status": {"status": "invalid token"}}
+            return response
+
+    @app.post("/fetch")
+    def fetch(credentials: Credentials):
+        """
+        Returns either the status of the task or the result depending on whether it's queued, running, complete or failed.
+
+        Args:
+            credentials (Credentials): should contain a token that points to a task
+        """
+
+        token = credentials.token
+
+        status = queue_data.get_status(token=token)
+
+        if status["status"] != "invalid token":
+
+            if status["status"] == "running":
+
+                results = result_storage.get(token=token)
+
+                response = {
+                    "status": status,
+                    "config": results["config"],
+                    "output": results["output"],
+                }
+
+                if block.progress == True:
+                    progress_value = fetch_progress_from_token(
+                        result_storage=result_storage, token=token
+                    )
+                    response["status"]["progress"] = progress_value
+
+            elif status["status"] == "complete":
+
+                results = result_storage.get(token=token)
+
+                ## if results are deleted, it still returns the same schema
+                if results == None and remove_result_on_fetch == True:
+                    response = {
+                        "status": {
+                            "status": "removed"
+                        },
+                    }
+                else:
+                    response = {
+                        "status": status,
+                        "config": results["config"],
+                        "output": results["output"],
+                    }
+
+                if remove_result_on_fetch == True:
+                    result_storage.delete(token=token)
+
+            elif (
+                status["status"] == "queued"
+                or status["status"] == "starting"
+                or status["status"] == "failed"
+                or status["status"] == "revoked"
+            ):
+
+                results = result_storage.get(token=token)
+
+                response = {"status": status, "config": results["config"]}
+
+            else:
+
+                response = {"status": status}  ## invalid token
+
+        return response
+
+    @app.post("/stop")
+    async def stop(wait_for: WaitFor):
+        """
+        Stops the eden block, and exits the script
+
+        Args:
+            wait_for (WaitFor): number of seconds to wait before the server shuts down.
+ """ + logging.info(f"Stopping gracefully in {wait_for.seconds} seconds") + stop_everything_gracefully(t=wait_for.seconds) + + @app.post("/get_identity") + def get_identity(): + """ + Returns name and active commit hash of the generator + """ + try: + repo = git.Repo(search_parent_directories=True) + name = repo.remotes.origin.url.split('.git')[0].split('/')[-1] + sha = repo.head.object.hexsha + except git.exc.InvalidGitRepositoryError: + name = "repo-less-eden" + sha = "none" + + response = { + "name": name, + "commit": sha + } + + return response + + + ## overriding the boring old [INFO] thingy + LOGGING_CONFIG["formatters"]["default"]["fmt"] = ( + "[" + Colors.CYAN + "EDEN" + Colors.END + "] %(asctime)s %(message)s" + ) + LOGGING_CONFIG["formatters"]["access"]["fmt"] = ( + "[" + + Colors.CYAN + + "EDEN" + + Colors.END + + "] %(levelprefix)s %(client_addr)s - '%(request_line)s' %(status_code)s" + ) + + config = uvicorn.config.Config(app=app, host=host, port=port, log_level=log_level) + server = ThreadedServer(config=config) + + # context starts fastAPI stuff and run_celery_app starts celery + with server.run_in_thread(): + message = ( + PREFIX + + " Initializing celery worker on: " + + f"redis://localhost:{str(redis_port)}" + ) + print(message) + ## starts celery app + run_celery_app( + celery_app, + max_num_workers=max_num_workers, + loglevel=celery_log_levels[log_level], + logfile=logfile, + queue_name=block.name, + ) + + message = PREFIX + " Stopped" + + print(message) + diff --git a/example_block/eden-server/image_utils.py b/example_block/eden-server/image_utils.py new file mode 100644 index 0000000..d8f647a --- /dev/null +++ b/example_block/eden-server/image_utils.py @@ -0,0 +1,75 @@ +import PIL +import cv2 +import base64 +import numpy as np +from PIL.Image import Image as ImageFile +from PIL.JpegImagePlugin import JpegImageFile +from PIL.PngImagePlugin import PngImageFile +from PIL import Image +from io import BytesIO + + +def _encode_numpy_array_image(image): + image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) + + if image.shape[-1] == 3: + _, buffer = cv2.imencode(".jpg", image) + + elif image.shape[-1] == 4: + _, buffer = cv2.imencode(".png", image) + + image_as_text = base64.b64encode(buffer) + + return image_as_text + + +def _encode_pil_image(image): + opencv_image = np.array(image) + image_as_text = _encode_numpy_array_image(image=opencv_image) + + return image_as_text + + +def _encode_image_file(image): + pil_image = Image.open(image) + + return _encode_pil_image(pil_image) + + +def encode(image): + + if ( + type(image) == np.ndarray + or type(image) == str + or isinstance( + image, + ( + JpegImageFile, + PngImageFile, + ImageFile, + ), + ) + ): + + if type(image) == np.ndarray: + image_as_text = _encode_numpy_array_image(image) + + elif type(image) == str: + image_as_text = _encode_image_file(image) + + else: + image_as_text = _encode_pil_image(image) + + return image_as_text.decode("ascii") + + else: + raise Exception( + "expected numpy.array, PIL.Image or str, not: ", str(type(image)) + ) + + +def decode(jpg_as_text): + if jpg_as_text is None: + return None + pil_image = Image.open(BytesIO(base64.b64decode(jpg_as_text))) + return pil_image diff --git a/example_block/eden-server/server copy.py b/example_block/eden-server/server copy.py new file mode 100644 index 0000000..27413e7 --- /dev/null +++ b/example_block/eden-server/server copy.py @@ -0,0 +1,69 @@ +from eden.block import Block +from eden.datatypes import Image +from eden.hosting import host_block + +## eden <3 
+model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
+model = model.eval()  ## don't move it to the GPU just yet :)
+
+my_transforms = transforms.Compose(
+    [
+        transforms.ToTensor(),
+        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),  # this normalizes the image to the same format as the pretrained model
+    ]
+)
+
+eden_block = Block()
+
+my_args = {
+    "width": 224,  ## width
+    "height": 224,  ## height
+    "input_image": Image(),  ## images require eden.datatypes.Image()
+}
+
+import requests
+labels = requests.get(
+    "https://raw.githubusercontent.com/pytorch/hub/master/imagenet_classes.txt"
+).text.split("\n")
+
+
+@eden_block.run(args=my_args, progress=False)
+def do_something(config):
+    global model, labels
+
+    pil_image = config["input_image"]
+    pil_image = pil_image.resize((config["width"], config["height"]))
+
+    device = config.gpu
+    input_tensor = my_transforms(pil_image).to(device).unsqueeze(0)
+
+    model = model.to(device)
+
+    with torch.no_grad():
+        pred = model(input_tensor)[0].cpu()
+        index = torch.argmax(pred).item()
+        value = pred[index].item()
+    # the index is the classification label for the pretrained resnet50 model.
+    # the human-readable labels associated with this index are pulled and returned as "label"
+    # we need to get them from imagenet labels, which we need to get online.
+
+    label = labels[index]
+    # serialize the image
+    pil_image = Image(pil_image)
+    return {"value": value, "index": index, "label": label, 'image': pil_image}
+
+if __name__ == "__main__":
+    host_block(
+        block=eden_block,
+        port=5656,
+        host="0.0.0.0",
+        redis_host="redis",
+        # logfile="log.log",
+        logfile=None,
+        log_level="debug",
+        max_num_workers=1,
+        requires_gpu=True,
+    )
diff --git a/example_block/eden-server/server.py b/example_block/eden-server/server.py
new file mode 100644
index 0000000..8f87e50
--- /dev/null
+++ b/example_block/eden-server/server.py
@@ -0,0 +1,123 @@
+from eden.block import Block
+from eden.datatypes import Image
+from eden.hosting import host_block
+
+## eden <3 pytorch
+from torchvision import models, transforms
+import torch
+
+model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
+model = model.eval()  ## don't move it to the GPU just yet :)
+
+my_transforms = transforms.Compose(
+    [
+        transforms.ToTensor(),
+        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),  # this normalizes the image to the same format as the pretrained model
+    ]
+)
+
+eden_block = Block()
+
+my_args = {
+    "width": 224,  ## width
+    "height": 224,  ## height
+    "input_image": Image(),  ## images require eden.datatypes.Image()
+}
+
+import requests
+labels = requests.get(
+    "https://raw.githubusercontent.com/pytorch/hub/master/imagenet_classes.txt"
+).text.split("\n")
+
+
+@eden_block.run(args=my_args, progress=False)
+def do_something(config):
+    global model, labels
+
+    pil_image = config["input_image"]
+    pil_image = pil_image.resize((config["width"], config["height"]))
+
+    device = config.gpu
+    input_tensor = my_transforms(pil_image).to(device).unsqueeze(0)
+
+    model = model.to(device)
+
+    with torch.no_grad():
+        pred = model(input_tensor)[0].cpu()
+        index = torch.argmax(pred).item()
+        value = pred[index].item()
+    # the index is the classification label for the pretrained resnet50 model.
+    # the human-readable labels associated with this index are pulled and returned as "label"
+    # we need to get them from imagenet labels, which we need to get online.
+
+    label = labels[index]
+    # serialize the image
+    pil_image = Image(pil_image)
+    return {"value": value, "index": index, "label": label, 'image': pil_image}
+
+
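+# below, host_block runs in an executor thread while a socketio client
+# announces this server to the registry host (see registry/host.py)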
+def run_host_block():
+    host_block(
+        block=eden_block,
+        port=5656,
+        host="0.0.0.0",
+        redis_host="redis",
+        # logfile="log.log",
+        logfile=None,
+        log_level="debug",
+        max_num_workers=1,
+        requires_gpu=True,
+    )
+
+import asyncio
+import socketio
+import socket
+
+def get_ip_address():
+    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+    try:
+        # This IP address doesn't need to be reachable, as we're only using it to find the local IP address
+        s.connect(("10.255.255.255", 1))
+        ip = s.getsockname()[0]
+    except Exception:
+        ip = "127.0.0.1"
+    finally:
+        s.close()
+    return ip
+
+
+# Update these with the correct values for your host and server
+HOST_SERVER_IP = "0.0.0.0"
+HOST_SERVER_PORT = 4999
+SERVER_NAME = "server_1"
+SERVER_IP = get_ip_address()
+SERVER_PORT = 8000
+
+sio = socketio.AsyncClient()
+
+async def announce_server():
+    await sio.connect(f'http://{HOST_SERVER_IP}:{HOST_SERVER_PORT}')
+    await sio.emit('register', {'name': SERVER_NAME, 'ip': SERVER_IP, 'port': SERVER_PORT})
+
+    @sio.on("heartbeat")
+    async def on_heartbeat():
+        print("Received heartbeat from host")
+
+    @sio.event
+    async def disconnect():
+        print("Disconnected from host")
+
+async def main():
+    # Run host_block in a separate thread
+    loop = asyncio.get_event_loop()
+    host_block_thread = loop.run_in_executor(None, run_host_block)
+
+    # Announce the server to the host
+    await announce_server()
+
+    # Wait for host_block to finish
+    await host_block_thread
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
\ No newline at end of file
diff --git a/example_block/nginx.conf b/example_block/nginx.conf
new file mode 100644
index 0000000..50f777e
--- /dev/null
+++ b/example_block/nginx.conf
@@ -0,0 +1,24 @@
+worker_processes 1;
+
+events {
+    worker_connections 1024;
+}
+
+http {
+    upstream eden-servers {
+        server eden-server:5656;
+    }
+
+    server {
+        listen 80;
+        server_name _;
+
+        location / {
+            proxy_pass http://eden-servers;
+            proxy_set_header Host $host;
+            proxy_set_header X-Real-IP $remote_addr;
+            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
+        }
+    }
+}
+
diff --git a/example_block/redis/Dockerfile b/example_block/redis/Dockerfile
new file mode 100644
index 0000000..a4838ef
--- /dev/null
+++ b/example_block/redis/Dockerfile
@@ -0,0 +1,12 @@
+# Use an official Redis image as a parent image
+FROM redis:latest
+
+# Set the working directory to /data
+WORKDIR /data
+
+# Expose Redis port
+EXPOSE 6379
+
+# Run Redis server in the foreground (a daemonized redis would exit the container)
+#CMD ["redis-server", "--daemonize", "yes"]
+CMD ["redis-server", "--daemonize", "no"]
diff --git a/example_block/s2.py b/example_block/s2.py
new file mode 100644
index 0000000..1eb9f74
--- /dev/null
+++ b/example_block/s2.py
@@ -0,0 +1,66 @@
+from eden.block import Block
+from eden.datatypes import Image
+from eden.hosting import host_block
+
+## eden <3 pytorch
+from torchvision import models, transforms
+import torch
+
+model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
+model = model.eval()  ## don't move it to the GPU just yet :)
+
+my_transforms = transforms.Compose(
+    [
+        transforms.ToTensor(),
+        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),  # this normalizes the image to the same format as the pretrained model
+    ]
+)
+
+eden_block = Block()
+
+my_args = {
+    "width": 224,  ## width
+    "height": 224,  ## height
+    "input_image": Image(),  ## images require eden.datatypes.Image()
+}
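+# NOTE: this file appears to be a second copy of the example server, meant to
+# run on port 5655 with its own logfile so that two blocks can be registered
+# with the registry host at the same time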
+
+import requests
+labels = requests.get(
+    "https://raw.githubusercontent.com/pytorch/hub/master/imagenet_classes.txt"
+).text.split("\n")
+
+
+@eden_block.run(args=my_args, progress=False)
+def do_something(config):
+    global model, labels
+
+    pil_image = config["input_image"]
+    pil_image = pil_image.resize((config["width"], config["height"]))
+
+    device = config.gpu
+    input_tensor = my_transforms(pil_image).to(device).unsqueeze(0)
+
+    model = model.to(device)
+
+    with torch.no_grad():
+        pred = model(input_tensor)[0].cpu()
+        index = torch.argmax(pred).item()
+        value = pred[index].item()
+    # the index is the classification label for the pretrained resnet50 model.
+    # the human-readable labels associated with this index are pulled and returned as "label"
+    # we need to get them from imagenet labels, which we need to get online.
+
+    label = labels[index]
+    # serialize the image
+    pil_image = Image(pil_image)
+    return {"value": value, "index": index, "label": label, 'image': pil_image}
+
+if __name__ == "__main__":
+    host_block(
+        block=eden_block,
+        port=5655,
+        logfile="log2.log",
+        log_level="debug",
+        max_num_workers=1,
+        requires_gpu=True,
+    )
diff --git a/example_block/server.py b/example_block/server.py
new file mode 100644
index 0000000..22e60c0
--- /dev/null
+++ b/example_block/server.py
@@ -0,0 +1,66 @@
+from eden.block import Block
+from eden.datatypes import Image
+from eden.hosting import host_block
+
+## eden <3 pytorch
+from torchvision import models, transforms
+import torch
+
+model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
+model = model.eval()  ## don't move it to the GPU just yet :)
+
+my_transforms = transforms.Compose(
+    [
+        transforms.ToTensor(),
+        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),  # this normalizes the image to the same format as the pretrained model
+    ]
+)
+
+eden_block = Block()
+
+my_args = {
+    "width": 224,  ## width
+    "height": 224,  ## height
+    "input_image": Image(),  ## images require eden.datatypes.Image()
+}
+
+import requests
+labels = requests.get(
+    "https://raw.githubusercontent.com/pytorch/hub/master/imagenet_classes.txt"
+).text.split("\n")
+
+
+@eden_block.run(args=my_args, progress=False)
+def do_something(config):
+    global model, labels
+
+    pil_image = config["input_image"]
+    pil_image = pil_image.resize((config["width"], config["height"]))
+
+    device = config.gpu
+    input_tensor = my_transforms(pil_image).to(device).unsqueeze(0)
+
+    model = model.to(device)
+
+    with torch.no_grad():
+        pred = model(input_tensor)[0].cpu()
+        index = torch.argmax(pred).item()
+        value = pred[index].item()
+    # the index is the classification label for the pretrained resnet50 model.
+    # the human-readable labels associated with this index are pulled and returned as "label"
+    # we need to get them from imagenet labels, which we need to get online.
+
+    label = labels[index]
+    # serialize the image
+    pil_image = Image(pil_image)
+    return {"value": value, "index": index, "label": label, 'image': pil_image}
+
+if __name__ == "__main__":
+    host_block(
+        block=eden_block,
+        port=5656,
+        logfile="logs.log",
+        log_level="debug",
+        max_num_workers=1,
+        requires_gpu=True,
+    )
diff --git a/image_utils.py b/image_utils.py
new file mode 100644
index 0000000..d8f647a
--- /dev/null
+++ b/image_utils.py
@@ -0,0 +1,75 @@
+import PIL
+import cv2
+import base64
+import numpy as np
+from PIL.Image import Image as ImageFile
+from PIL.JpegImagePlugin import JpegImageFile
+from PIL.PngImagePlugin import PngImageFile
+from PIL import Image
+from io import BytesIO
+
+
+def _encode_numpy_array_image(image):
+    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
+
+    if image.shape[-1] == 3:
+        _, buffer = cv2.imencode(".jpg", image)
+
+    elif image.shape[-1] == 4:
+        _, buffer = cv2.imencode(".png", image)
+
+    image_as_text = base64.b64encode(buffer)
+
+    return image_as_text
+
+
+def _encode_pil_image(image):
+    opencv_image = np.array(image)
+    image_as_text = _encode_numpy_array_image(image=opencv_image)
+
+    return image_as_text
+
+
+def _encode_image_file(image):
+    pil_image = Image.open(image)
+
+    return _encode_pil_image(pil_image)
+
+
+def encode(image):
+
+    if (
+        type(image) == np.ndarray
+        or type(image) == str
+        or isinstance(
+            image,
+            (
+                JpegImageFile,
+                PngImageFile,
+                ImageFile,
+            ),
+        )
+    ):
+
+        if type(image) == np.ndarray:
+            image_as_text = _encode_numpy_array_image(image)
+
+        elif type(image) == str:
+            image_as_text = _encode_image_file(image)
+
+        else:
+            image_as_text = _encode_pil_image(image)
+
+        return image_as_text.decode("ascii")
+
+    else:
+        raise Exception(
+            "expected numpy.array, PIL.Image or str, not: ", str(type(image))
+        )
+
+
+def decode(jpg_as_text):
+    if jpg_as_text is None:
+        return None
+    pil_image = Image.open(BytesIO(base64.b64decode(jpg_as_text)))
+    return pil_image
diff --git a/index.html b/index.html
index 325369a..55bfa94 100644
--- a/index.html
+++ b/index.html
@@ -126,6 +126,7 @@
         method: "POST",
         body: formData,
         credentials: "include",
+        mode: "no-cors",
+        // NOTE: "no-cors" yields an opaque response, so `response.ok` below
+        // will always be false and the body cannot be read by this script
       });
 
       if (response.ok) {
diff --git a/registry/host.py b/registry/host.py
new file mode 100644
index 0000000..113f227
--- /dev/null
+++ b/registry/host.py
@@ -0,0 +1,77 @@
+import asyncio
+import signal
+import socketio
+from aiohttp import web
+
+SERVER_0_IP = "192.168.1.113"
+FLASK_SERVER_PORT = 4999
+HEARTBEAT_INTERVAL = 1
+HEARTBEAT_TIMEOUT = 3
+
+sio = socketio.AsyncServer(async_mode='aiohttp')
+app = web.Application()
+sio.attach(app)
+
+servers = {}
+
+async def available(request):
+    return web.json_response(servers)
+
+app.router.add_get("/available", available)
+
+@sio.event
+async def connect(sid, environ):
+    print("I'm connected!", sid)
+
+@sio.event
+async def register(sid, data):
+    server_info = data
+    name = server_info["name"]
+
+    servers[name] = {"ip": server_info["ip"], "port": server_info["port"], "sid": sid}
+    print(servers)
+
+@sio.event
+async def disconnect(sid):
+    print("I'm disconnected!", sid)
+    for name, server in servers.items():
+        if server["sid"] == sid:
+            del servers[name]
+            break
+
+async def heartbeat():
+    while True:
+        await asyncio.sleep(HEARTBEAT_INTERVAL)
+        server_values_copy = list(servers.values())
+        for server in server_values_copy:
+            sid = server["sid"]
+            try:
+                print(f"Sending heartbeat to {sid}...")
+                heartbeat_future = sio.emit("heartbeat", to=sid)
+                await asyncio.wait_for(heartbeat_future, timeout=HEARTBEAT_TIMEOUT)
+            except (asyncio.TimeoutError, socketio.exceptions.TimeoutError):
+                print(f"Server {sid} failed to respond to heartbeat.")
+                await sio.disconnect(sid)
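+
+# a server that fails to acknowledge a heartbeat is disconnected; the
+# disconnect handler above then prunes it from `servers`, so /available
+# only ever lists servers that are still alive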
+
+def exit_handler(sig, frame):
+    print("Shutting down host...")
+    loop = asyncio.get_event_loop()
+    heartbeat_task.cancel()
+    loop.run_until_complete(loop.shutdown_asyncgens())
+    loop.stop()
+
+if __name__ == "__main__":
+    signal.signal(signal.SIGINT, exit_handler)
+    signal.signal(signal.SIGTERM, exit_handler)
+
+    loop = asyncio.get_event_loop()
+    heartbeat_task = loop.create_task(heartbeat())
+    aiohttp_app = loop.create_task(web._run_app(app, host=SERVER_0_IP, port=FLASK_SERVER_PORT))
+
+    try:
+        loop.run_until_complete(asyncio.gather(heartbeat_task, aiohttp_app))
+    except asyncio.CancelledError:
+        pass
+    finally:
+        loop.run_until_complete(loop.shutdown_asyncgens())
+        loop.stop()
diff --git a/registry/requirements.txt b/registry/requirements.txt
new file mode 100644
index 0000000..8ce3175
--- /dev/null
+++ b/registry/requirements.txt
@@ -0,0 +1,2 @@
+python-socketio[asyncio_client]==6.1.1
+aiohttp==3.8.1
\ No newline at end of file
diff --git a/registry/s1.py b/registry/s1.py
new file mode 100644
index 0000000..81a84cd
--- /dev/null
+++ b/registry/s1.py
@@ -0,0 +1,39 @@
+import signal
+import socketio
+
+SERVER_0_IP = "localhost"
+SERVER_0_PORT = 4999
+SERVER_1_PORT = 5001
+SERVER_1_NAME = "server_1"
+
+sio = socketio.Client()
+
+@sio.event
+def connect():
+    print("I'm connected!")
+    sio.emit("register", {"name": SERVER_1_NAME, "ip": SERVER_0_IP, "port": SERVER_1_PORT})
+
+@sio.event
+def connect_error(data):
+    print("The connection failed!")
+
+@sio.event
+def disconnect():
+    print("I'm disconnected!")
+
+@sio.event
+def heartbeat():
+    print("Received heartbeat")
+
+def main():
+    sio.connect(f"http://{SERVER_0_IP}:{SERVER_0_PORT}")
+    sio.wait()
+
+def exit_handler(sig, frame):
+    sio.disconnect()
+    exit(0)
+
+if __name__ == "__main__":
+    signal.signal(signal.SIGINT, exit_handler)
+    signal.signal(signal.SIGTERM, exit_handler)
+    main()
diff --git a/registry/s2.py b/registry/s2.py
new file mode 100644
index 0000000..cdbb07c
--- /dev/null
+++ b/registry/s2.py
@@ -0,0 +1,39 @@
+import signal
+import socketio
+
+SERVER_0_IP = "localhost"
+SERVER_0_PORT = 4999
+SERVER_1_PORT = 5002
+SERVER_1_NAME = "server_2"
+
+sio = socketio.Client()
+
+@sio.event
+def connect():
+    print("I'm connected!")
+    sio.emit("register", {"name": SERVER_1_NAME, "ip": SERVER_0_IP, "port": SERVER_1_PORT})
+
+@sio.event
+def connect_error(data):
+    print("The connection failed!")
+
+@sio.event
+def disconnect():
+    print("I'm disconnected!")
+
+@sio.event
+def heartbeat():
+    print("Received heartbeat")
+
+def main():
+    sio.connect(f"http://{SERVER_0_IP}:{SERVER_0_PORT}")
+    sio.wait()
+
+def exit_handler(sig, frame):
+    sio.disconnect()
+    exit(0)
+
+if __name__ == "__main__":
+    signal.signal(signal.SIGINT, exit_handler)
+    signal.signal(signal.SIGTERM, exit_handler)
+    main()
diff --git a/requirements.txt b/requirements.txt
index 75719f9..91c94d5 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -3,4 +3,5 @@ flask_sqlalchemy
 psycopg2-binary
 flask_bcrypt
 # flask_login
-flask_cors
\ No newline at end of file
+flask_cors
+eden-python
\ No newline at end of file