Compare commits
No commits in common. "main" and "59bde66861e7d127709291fd807635495d631674" have entirely different histories.
main
...
59bde66861
1
.gitignore
vendored
1
.gitignore
vendored
@ -6,4 +6,3 @@ __pycache__/
|
|||||||
dist/
|
dist/
|
||||||
build/
|
build/
|
||||||
Pipfile*
|
Pipfile*
|
||||||
.coverage
|
|
||||||
|
16
Makefile
16
Makefile
@ -1,16 +0,0 @@
|
|||||||
build: clean
|
|
||||||
python -m build --sdist --wheel
|
|
||||||
|
|
||||||
clean:
|
|
||||||
rm -rf dist/ build/ .eggs/ .pytest_cache/ src/announce_server.egg-info/
|
|
||||||
|
|
||||||
pub: build
|
|
||||||
twine upload dist/*
|
|
||||||
|
|
||||||
install:
|
|
||||||
pip install -e .[dev,pub]
|
|
||||||
|
|
||||||
test-api:
|
|
||||||
docker run --rm -ti -p 4999:4999 python:3.7-slim bash -c "pip install -U announce_server~=0.0.2rc0; announce_server start_registry"
|
|
||||||
|
|
||||||
.PHONY: build clean pub install test-api
|
|
61
README.md
61
README.md
@ -8,79 +8,30 @@ A Python library that announces a server to a host.
|
|||||||
pip install announce-server
|
pip install announce-server
|
||||||
```
|
```
|
||||||
|
|
||||||
## Development
|
|
||||||
|
|
||||||
To install the developer dependencies required for testing and publishing:
|
|
||||||
```bash
|
|
||||||
pip install -e .[dev,pub]
|
|
||||||
```
|
|
||||||
|
|
||||||
## Build
|
## Build
|
||||||
To build the package, run:
|
To build the package, run:
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
rm -rf dist/ build/ .eggs/ .pytest_cache/ src/announce_server.egg-info/
|
pip install -e .[dev]
|
||||||
python -m build --sdist --wheel
|
pip install -m build
|
||||||
```
|
|
||||||
|
|
||||||
To publish:
|
|
||||||
|
|
||||||
```bash
|
|
||||||
twine upload dist/*
|
|
||||||
```
|
```
|
||||||
|
|
||||||
## Test
|
## Test
|
||||||
|
|
||||||
To run the tests, call:
|
To run the tests, install the package with the `[dev]` option:
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
|
pip install -e .[dev]
|
||||||
pytest
|
pytest
|
||||||
```
|
```
|
||||||
|
|
||||||
## Usage
|
## Usage
|
||||||
|
|
||||||
```python
|
```python
|
||||||
from announce_server import register_service
|
from announce_server.announce import announce_server
|
||||||
|
|
||||||
@register_service(name="server_name", ip="server_ip", port=8000, host_ip="host_server_ip", host_port=5000, retry_interval=5)
|
@announce_server(name="server_name", ip="server_ip", port=8000, host_ip="host_server_ip", host_port=5000)
|
||||||
def your_function():
|
def your_function():
|
||||||
pass
|
pass
|
||||||
|
|
||||||
```
|
```
|
||||||
|
|
||||||
## Registry
|
|
||||||
|
|
||||||
The `announce_server` CLI provides a simple way to start a registry server. The registry server keeps track of available services and periodically sends heartbeat messages to ensure that registered services are still active.
|
|
||||||
|
|
||||||
### Command
|
|
||||||
|
|
||||||
```bash
|
|
||||||
announce_server start_registry [--address ADDRESS] [--port PORT] [--heartbeat_interval INTERVAL] [--heartbeat_timeout TIMEOUT]
|
|
||||||
```
|
|
||||||
|
|
||||||
### Arguments
|
|
||||||
|
|
||||||
- `--address ADDRESS`: The IP address of the server. Default: `0.0.0.0`.
|
|
||||||
- `--port PORT`: The port number of the server. Default: `4999`.
|
|
||||||
- `--heartbeat_interval INTERVAL`: The interval between heartbeat messages in seconds. Default: `5`.
|
|
||||||
- `--heartbeat_timeout TIMEOUT`: The timeout for waiting for a response in seconds. Default: `3`.
|
|
||||||
|
|
||||||
### Example
|
|
||||||
|
|
||||||
To start the registry server with the default configuration, run:
|
|
||||||
|
|
||||||
```bash
|
|
||||||
announce_server start_registry
|
|
||||||
```
|
|
||||||
|
|
||||||
The full syntax is equivalent to:
|
|
||||||
|
|
||||||
```bash
|
|
||||||
announce_server start_registry --address 0.0.0.0 --port 4999 --heartbeat_interval 5 --heartbeat_timeout 3
|
|
||||||
```
|
|
||||||
|
|
||||||
To test connections, run:
|
|
||||||
|
|
||||||
```bash
|
|
||||||
announce_server start_client --host-ip 0.0.0.0 --host-port 4999
|
|
||||||
```
|
|
||||||
|
27
setup.cfg
27
setup.cfg
@ -12,6 +12,7 @@ classifiers =
|
|||||||
Intended Audience :: Developers
|
Intended Audience :: Developers
|
||||||
License :: OSI Approved :: MIT License
|
License :: OSI Approved :: MIT License
|
||||||
Programming Language :: Python :: 3
|
Programming Language :: Python :: 3
|
||||||
|
Programming Language :: Python :: 3.6
|
||||||
Programming Language :: Python :: 3.7
|
Programming Language :: Python :: 3.7
|
||||||
Programming Language :: Python :: 3.8
|
Programming Language :: Python :: 3.8
|
||||||
Programming Language :: Python :: 3.9
|
Programming Language :: Python :: 3.9
|
||||||
@ -23,34 +24,20 @@ package_dir =
|
|||||||
= src
|
= src
|
||||||
packages = find:
|
packages = find:
|
||||||
install_requires =
|
install_requires =
|
||||||
python-socketio[asyncio_client]~=5.0.0
|
python-socketio[asyncio_client]
|
||||||
|
|
||||||
[options.packages.find]
|
[options.packages.find]
|
||||||
where = src
|
where = src
|
||||||
|
|
||||||
[options.extras_require]
|
[options.extras_require]
|
||||||
dev =
|
dev =
|
||||||
|
build
|
||||||
|
setuptools_scm
|
||||||
pytest
|
pytest
|
||||||
pytest-mock
|
pytest-mock
|
||||||
pytest-asyncio
|
pytest-asyncio
|
||||||
pytest-cov
|
pytest-cov
|
||||||
|
# asynctest; python_version<'3.8'
|
||||||
pub =
|
|
||||||
build
|
|
||||||
wheel
|
|
||||||
setuptools_scm
|
|
||||||
twine
|
|
||||||
|
|
||||||
api =
|
|
||||||
aiohttp
|
|
||||||
|
|
||||||
[options.entry_points]
|
|
||||||
console_scripts =
|
|
||||||
announce_server = announce_server.__main__:main
|
|
||||||
|
|
||||||
|
|
||||||
[tool:pytest]
|
|
||||||
addopts = --cov --cov-report term-missing
|
|
||||||
|
|
||||||
[coverage:run]
|
[coverage:run]
|
||||||
source = announce_server
|
source = announce_server
|
||||||
@ -58,8 +45,10 @@ branch = True
|
|||||||
|
|
||||||
[coverage:report]
|
[coverage:report]
|
||||||
# Fail the test if coverage is below a certain percentage
|
# Fail the test if coverage is below a certain percentage
|
||||||
fail_under = 85
|
fail_under = 90
|
||||||
show_missing = True
|
show_missing = True
|
||||||
exclude_lines =
|
exclude_lines =
|
||||||
if __name__ == .__main__.:
|
if __name__ == .__main__.:
|
||||||
|
|
||||||
|
[tool:pytest]
|
||||||
|
addopts = --cov --cov-report term-missing
|
2
setup.py
2
setup.py
@ -7,5 +7,5 @@ setup(
|
|||||||
"setuptools_scm",
|
"setuptools_scm",
|
||||||
],
|
],
|
||||||
use_scm_version=True,
|
use_scm_version=True,
|
||||||
python_requires=">=3.7", # because of python-socketsio[asyncio_client]
|
python_requires=">=3.6",
|
||||||
)
|
)
|
||||||
|
@ -1,8 +1,2 @@
|
|||||||
from .decorator import (
|
from .decorator import _announce_server, announce_server
|
||||||
_announce_server,
|
|
||||||
announce_server,
|
|
||||||
register_block,
|
|
||||||
register_server,
|
|
||||||
register_service,
|
|
||||||
)
|
|
||||||
from .get_ip import get_ip_address
|
from .get_ip import get_ip_address
|
||||||
|
@ -1,65 +0,0 @@
|
|||||||
import argparse
|
|
||||||
|
|
||||||
from announce_server import register_service
|
|
||||||
from announce_server.client import start_client
|
|
||||||
from announce_server.server import start_server
|
|
||||||
|
|
||||||
|
|
||||||
def main():
|
|
||||||
parser = argparse.ArgumentParser(description="Announce server CLI")
|
|
||||||
subparsers = parser.add_subparsers(dest="command", help="Available subcommands")
|
|
||||||
|
|
||||||
# Start registry subcommand
|
|
||||||
start_registry_parser = subparsers.add_parser(
|
|
||||||
"start_registry", help="Start the registry server"
|
|
||||||
)
|
|
||||||
start_registry_parser.add_argument(
|
|
||||||
"--ip", default="0.0.0.0", help="IP address of the host server"
|
|
||||||
)
|
|
||||||
start_registry_parser.add_argument(
|
|
||||||
"--port", default=4999, type=int, help="Port of the host server"
|
|
||||||
)
|
|
||||||
start_registry_parser.add_argument(
|
|
||||||
"--heartbeat-interval",
|
|
||||||
default=5,
|
|
||||||
type=float,
|
|
||||||
help="Heartbeat interval in seconds",
|
|
||||||
)
|
|
||||||
start_registry_parser.add_argument(
|
|
||||||
"--heartbeat-timeout",
|
|
||||||
default=3,
|
|
||||||
type=float,
|
|
||||||
help="Heartbeat timeout in seconds",
|
|
||||||
)
|
|
||||||
|
|
||||||
# Start client subcommand
|
|
||||||
start_client_parser = subparsers.add_parser(
|
|
||||||
"start_client", help="Start the client server"
|
|
||||||
)
|
|
||||||
start_client_parser.add_argument(
|
|
||||||
"--host-ip",
|
|
||||||
type=str,
|
|
||||||
default="0.0.0.0",
|
|
||||||
help="Host IP address (default: 0.0.0.0)",
|
|
||||||
)
|
|
||||||
start_client_parser.add_argument(
|
|
||||||
"--host-port", type=int, default=4999, help="Host port number (default: 4999)"
|
|
||||||
)
|
|
||||||
|
|
||||||
args = parser.parse_args()
|
|
||||||
|
|
||||||
if args.command == "start_registry":
|
|
||||||
start_server(
|
|
||||||
address=args.ip,
|
|
||||||
port=args.port,
|
|
||||||
heartbeat_interval=args.heartbeat_interval,
|
|
||||||
heartbeat_timeout=args.heartbeat_timeout,
|
|
||||||
)
|
|
||||||
elif args.command == "start_client":
|
|
||||||
start_client(host_ip=args.host_ip, host_port=args.host_port)
|
|
||||||
else:
|
|
||||||
parser.print_help()
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
main()
|
|
@ -1,70 +0,0 @@
|
|||||||
import argparse
|
|
||||||
import asyncio
|
|
||||||
import signal
|
|
||||||
import sys
|
|
||||||
import threading
|
|
||||||
from http.server import HTTPServer, SimpleHTTPRequestHandler
|
|
||||||
|
|
||||||
import socketio
|
|
||||||
|
|
||||||
from .decorator import register_service
|
|
||||||
from .get_ip import get_ip_address
|
|
||||||
|
|
||||||
http_server_thread = None
|
|
||||||
|
|
||||||
def start_client(host_ip="0.0.0.0", host_port=4999):
|
|
||||||
@register_service(
|
|
||||||
name="test client",
|
|
||||||
ip=get_ip_address(),
|
|
||||||
port=13373,
|
|
||||||
host_ip=host_ip,
|
|
||||||
host_port=host_port,
|
|
||||||
)
|
|
||||||
def server(port=13373):
|
|
||||||
def start_server():
|
|
||||||
global http_server_thread
|
|
||||||
print(f"Serving HTTP on 0.0.0.0 port {port} (http://0.0.0.0:{port}/) ...")
|
|
||||||
httpd = HTTPServer(("", port), SimpleHTTPRequestHandler)
|
|
||||||
httpd.serve_forever()
|
|
||||||
server_thread = threading.Thread(target=start_server)
|
|
||||||
server_thread.daemon = True
|
|
||||||
server_thread.start()
|
|
||||||
server_thread.join()
|
|
||||||
|
|
||||||
def signal_handler(signal, frame):
|
|
||||||
print("Cleaning up and shutting down...")
|
|
||||||
|
|
||||||
# If the HTTP server thread is running, shut it down
|
|
||||||
if http_server_thread is not None and http_server_thread.is_alive():
|
|
||||||
http_server_thread.shutdown()
|
|
||||||
|
|
||||||
sys.exit(0)
|
|
||||||
|
|
||||||
signal.signal(signal.SIGINT, signal_handler)
|
|
||||||
signal.signal(signal.SIGTERM, signal_handler)
|
|
||||||
try:
|
|
||||||
server()
|
|
||||||
signal.pause()
|
|
||||||
except asyncio.exceptions.CancelledError:
|
|
||||||
print("CancelledError")
|
|
||||||
# signal_handler(signal.SIGINT, None)
|
|
||||||
pass
|
|
||||||
finally:
|
|
||||||
print("Shutting down test client...")
|
|
||||||
sys.exit(0)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
parser = argparse.ArgumentParser(description="Start announce_server client.")
|
|
||||||
parser.add_argument(
|
|
||||||
"--host-ip",
|
|
||||||
type=str,
|
|
||||||
default="0.0.0.0",
|
|
||||||
help="Host IP address (default: 0.0.0.0)",
|
|
||||||
)
|
|
||||||
parser.add_argument(
|
|
||||||
"--host-port", type=int, default=4999, help="Host port number (default: 4999)"
|
|
||||||
)
|
|
||||||
args = parser.parse_args()
|
|
||||||
|
|
||||||
start_client(args.host_ip, args.host_port)
|
|
@ -1,12 +1,10 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
import warnings
|
|
||||||
from functools import wraps
|
from functools import wraps
|
||||||
|
|
||||||
import socketio
|
import socketio
|
||||||
|
|
||||||
sio = socketio.AsyncClient()
|
sio = socketio.AsyncClient()
|
||||||
|
|
||||||
import signal
|
|
||||||
|
|
||||||
async def _announce_server(**kwargs):
|
async def _announce_server(**kwargs):
|
||||||
SERVER_NAME = kwargs.get("name", "server_1")
|
SERVER_NAME = kwargs.get("name", "server_1")
|
||||||
@ -14,7 +12,6 @@ async def _announce_server(**kwargs):
|
|||||||
SERVER_PORT = kwargs.get("port", 8000)
|
SERVER_PORT = kwargs.get("port", 8000)
|
||||||
HOST_SERVER_IP = kwargs.get("host_ip", "0.0.0.0")
|
HOST_SERVER_IP = kwargs.get("host_ip", "0.0.0.0")
|
||||||
HOST_SERVER_PORT = kwargs.get("host_port", 5000)
|
HOST_SERVER_PORT = kwargs.get("host_port", 5000)
|
||||||
RETRY_INTERVAL = kwargs.get("retry_interval", 5)
|
|
||||||
|
|
||||||
@sio.event
|
@sio.event
|
||||||
async def connect():
|
async def connect():
|
||||||
@ -31,10 +28,8 @@ async def _announce_server(**kwargs):
|
|||||||
break
|
break
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(e)
|
print(e)
|
||||||
print(
|
print("Failed to connect to host, retrying in 5 seconds")
|
||||||
f"Failed to connect to host server, retrying in {RETRY_INTERVAL} seconds"
|
await asyncio.sleep(5)
|
||||||
)
|
|
||||||
await asyncio.sleep(RETRY_INTERVAL)
|
|
||||||
# await sio.connect(f'http://{HOST_SERVER_IP}:{HOST_SERVER_PORT}')
|
# await sio.connect(f'http://{HOST_SERVER_IP}:{HOST_SERVER_PORT}')
|
||||||
print("Connected to host")
|
print("Connected to host")
|
||||||
|
|
||||||
@ -49,9 +44,85 @@ async def _announce_server(**kwargs):
|
|||||||
await main()
|
await main()
|
||||||
|
|
||||||
|
|
||||||
def register_service(task=None, **outer_kwargs):
|
# def announce_server(task=None, loop=None, **outer_kwargs):
|
||||||
|
# if task is None:
|
||||||
|
# return lambda f: announce_server(f, loop=loop, **outer_kwargs)
|
||||||
|
|
||||||
|
# @wraps(task)
|
||||||
|
# def wrapper(*args, **kwargs):
|
||||||
|
# async def main(*args, **kwargs):
|
||||||
|
# if loop is not None:
|
||||||
|
# host_block_thread = loop.run_in_executor(None, task)
|
||||||
|
# else:
|
||||||
|
# host_block_thread = asyncio.to_thread(task) # python 3.9+
|
||||||
|
|
||||||
|
# # Announce the server to the host
|
||||||
|
# await _announce_server(**outer_kwargs)
|
||||||
|
|
||||||
|
# # Wait for host_block to finish
|
||||||
|
# await host_block_thread
|
||||||
|
|
||||||
|
# if loop is not None:
|
||||||
|
# task = loop.create_task(main(*args, **kwargs))
|
||||||
|
# else:
|
||||||
|
# task = asyncio.run(main(*args, **kwargs))
|
||||||
|
# return task
|
||||||
|
# return wrapper
|
||||||
|
|
||||||
|
# def announce_server(task=None, loop=None, **outer_kwargs):
|
||||||
|
# if task is None:
|
||||||
|
# return lambda f: announce_server(f, loop=loop, **outer_kwargs)
|
||||||
|
|
||||||
|
# @wraps(task)
|
||||||
|
# async def wrapper(*args, **kwargs):
|
||||||
|
# if not asyncio.iscoroutinefunction(task):
|
||||||
|
# # If the decorated function is not a coroutine, wrap it in a coroutine
|
||||||
|
# task = asyncio.coroutine(task)
|
||||||
|
# if loop is not None:
|
||||||
|
# host_block_thread = loop.run_in_executor(None, task)
|
||||||
|
# else:
|
||||||
|
# host_block_thread = asyncio.to_thread(task)
|
||||||
|
|
||||||
|
# # Announce the server to the host
|
||||||
|
# await _announce_server(**outer_kwargs)
|
||||||
|
|
||||||
|
# # Wait for host_block to finish
|
||||||
|
# await host_block_thread
|
||||||
|
|
||||||
|
# return wrapper
|
||||||
|
|
||||||
|
|
||||||
|
# def announce_server(task=None, loop=None, **outer_kwargs):
|
||||||
|
# if task is None:
|
||||||
|
# return lambda f: announce_server(f, loop=loop, **outer_kwargs)
|
||||||
|
|
||||||
|
# if loop is None:
|
||||||
|
# loop = asyncio.get_event_loop()
|
||||||
|
|
||||||
|
# @wraps(task)
|
||||||
|
# def wrapper(*args, **kwargs):
|
||||||
|
# async def main(*args, **kwargs):
|
||||||
|
# if asyncio.iscoroutinefunction(task):
|
||||||
|
# # If the task is async, just await it
|
||||||
|
# host_block_thread = task(*args, **kwargs)
|
||||||
|
# else:
|
||||||
|
# host_block_thread = loop.run_in_executor(None, task, *args, **kwargs)
|
||||||
|
|
||||||
|
# # Announce the server to the host
|
||||||
|
# await _announce_server(**outer_kwargs)
|
||||||
|
|
||||||
|
# # Wait for host_block to finish
|
||||||
|
# await host_block_thread
|
||||||
|
|
||||||
|
# task = loop.create_task(main(*args, **kwargs))
|
||||||
|
# return task
|
||||||
|
|
||||||
|
# return wrapper
|
||||||
|
|
||||||
|
|
||||||
|
def announce_server(task=None, **outer_kwargs):
|
||||||
if task is None:
|
if task is None:
|
||||||
return lambda f: register_service(f, **outer_kwargs)
|
return lambda f: announce_server(f, **outer_kwargs)
|
||||||
|
|
||||||
@wraps(task)
|
@wraps(task)
|
||||||
def wrapper(*args, **kwargs):
|
def wrapper(*args, **kwargs):
|
||||||
@ -68,55 +139,3 @@ def register_service(task=None, **outer_kwargs):
|
|||||||
return asyncio.run(main())
|
return asyncio.run(main())
|
||||||
|
|
||||||
return wrapper
|
return wrapper
|
||||||
|
|
||||||
# def register_service(task=None, **outer_kwargs):
|
|
||||||
# if task is None:
|
|
||||||
# return lambda f: register_service(f, **outer_kwargs)
|
|
||||||
|
|
||||||
# @wraps(task)
|
|
||||||
# def wrapper(*args, **kwargs):
|
|
||||||
# async def main(*args, **kwargs):
|
|
||||||
# loop = asyncio.get_event_loop()
|
|
||||||
# host_block_thread = loop.run_in_executor(None, task)
|
|
||||||
|
|
||||||
# # Announce the server to the host
|
|
||||||
# await _announce_server(**outer_kwargs)
|
|
||||||
|
|
||||||
# # Set up signal handlers to clean up properly
|
|
||||||
# def signal_handler(signum, frame):
|
|
||||||
# print(f"Received signal {signum}. Cleaning up and shutting down...")
|
|
||||||
# host_block_thread.cancel()
|
|
||||||
# import sys
|
|
||||||
# sys.exit(0)
|
|
||||||
|
|
||||||
# signal.signal(signal.SIGINT, signal_handler)
|
|
||||||
# signal.signal(signal.SIGTERM, signal_handler)
|
|
||||||
|
|
||||||
# # Wait for host_block to finish or to be cancelled
|
|
||||||
# try:
|
|
||||||
# await host_block_thread
|
|
||||||
# except asyncio.CancelledError:
|
|
||||||
# pass
|
|
||||||
|
|
||||||
# return asyncio.run(main())
|
|
||||||
|
|
||||||
# return wrapper
|
|
||||||
|
|
||||||
def announce_server(*args, **kwargs):
|
|
||||||
"""Wrapper for register_service"""
|
|
||||||
warnings.warn(
|
|
||||||
"announce_server is deprecated, use register_service instead",
|
|
||||||
DeprecationWarning,
|
|
||||||
stacklevel=2,
|
|
||||||
)
|
|
||||||
return register_service(*args, **kwargs)
|
|
||||||
|
|
||||||
|
|
||||||
def register_block(*args, **kwargs):
|
|
||||||
"""Wrapper for register_service"""
|
|
||||||
return register_service(*args, **kwargs)
|
|
||||||
|
|
||||||
|
|
||||||
def register_server(*args, **kwargs):
|
|
||||||
"""Wrapper for register_service"""
|
|
||||||
return register_service(*args, **kwargs)
|
|
||||||
|
@ -1,194 +0,0 @@
|
|||||||
import argparse
|
|
||||||
import asyncio
|
|
||||||
import signal
|
|
||||||
|
|
||||||
import socketio
|
|
||||||
from aiohttp import web
|
|
||||||
|
|
||||||
sio = socketio.AsyncServer(async_mode="aiohttp")
|
|
||||||
app = web.Application()
|
|
||||||
sio.attach(app)
|
|
||||||
|
|
||||||
servers = {}
|
|
||||||
|
|
||||||
|
|
||||||
async def available(request):
|
|
||||||
"""
|
|
||||||
Return a JSON response containing the available servers.
|
|
||||||
|
|
||||||
Returns
|
|
||||||
-------
|
|
||||||
aiohttp.web.Response
|
|
||||||
JSON response containing the available servers.
|
|
||||||
"""
|
|
||||||
return web.json_response(servers)
|
|
||||||
|
|
||||||
|
|
||||||
app.router.add_get("/available", available)
|
|
||||||
|
|
||||||
|
|
||||||
@sio.event
|
|
||||||
async def connect(sid, environ):
|
|
||||||
"""Handle a new connection to the socket."""
|
|
||||||
print("Connected:", sid)
|
|
||||||
|
|
||||||
|
|
||||||
@sio.event
|
|
||||||
async def register(sid, data):
|
|
||||||
"""
|
|
||||||
Register a new server.
|
|
||||||
|
|
||||||
Parameters
|
|
||||||
----------
|
|
||||||
sid : str
|
|
||||||
Socket ID of the connected server.
|
|
||||||
data : dict
|
|
||||||
Server information (name, IP, and port).
|
|
||||||
"""
|
|
||||||
server_info = data
|
|
||||||
name = server_info["name"]
|
|
||||||
|
|
||||||
servers[name] = {"ip": server_info["ip"], "port": server_info["port"], "sid": sid}
|
|
||||||
print(servers)
|
|
||||||
|
|
||||||
|
|
||||||
@sio.event
|
|
||||||
async def disconnect(sid):
|
|
||||||
"""
|
|
||||||
Handle a server disconnect.
|
|
||||||
|
|
||||||
Parameters
|
|
||||||
----------
|
|
||||||
sid : str
|
|
||||||
Socket ID of the disconnected server.
|
|
||||||
"""
|
|
||||||
print("Disconnected from server:", sid)
|
|
||||||
for name, server in servers.items():
|
|
||||||
if server["sid"] == sid:
|
|
||||||
del servers[name]
|
|
||||||
break
|
|
||||||
|
|
||||||
|
|
||||||
async def heartbeat(sio, interval, timeout):
|
|
||||||
"""
|
|
||||||
Periodically send heartbeat messages to connected servers.
|
|
||||||
|
|
||||||
Parameters
|
|
||||||
----------
|
|
||||||
sio : socketio.AsyncServer
|
|
||||||
The socket.io server instance.
|
|
||||||
interval : int
|
|
||||||
The interval between heartbeat messages in seconds.
|
|
||||||
timeout : int
|
|
||||||
The timeout for waiting for a response in seconds.
|
|
||||||
"""
|
|
||||||
while True:
|
|
||||||
await asyncio.sleep(interval)
|
|
||||||
server_values_copy = list(servers.values())
|
|
||||||
for server in server_values_copy:
|
|
||||||
sid = server["sid"]
|
|
||||||
try:
|
|
||||||
print(f"Sending heartbeat to {sid}...")
|
|
||||||
heartbeat_future = sio.emit("heartbeat", to=sid)
|
|
||||||
await asyncio.wait_for(heartbeat_future, timeout=timeout)
|
|
||||||
except (asyncio.TimeoutError, socketio.exceptions.TimeoutError):
|
|
||||||
print(f"Server {sid} failed to respond to heartbeat after {timeout}s.")
|
|
||||||
await sio.disconnect(sid)
|
|
||||||
|
|
||||||
|
|
||||||
def create_exit_handler(loop, heartbeat_task):
|
|
||||||
"""
|
|
||||||
Create an exit handler for gracefully shutting down the server.
|
|
||||||
|
|
||||||
Parameters
|
|
||||||
----------
|
|
||||||
loop : asyncio.AbstractEventLoop
|
|
||||||
The event loop.
|
|
||||||
heartbeat_task : asyncio.Task
|
|
||||||
The heartbeat task.
|
|
||||||
|
|
||||||
Returns
|
|
||||||
-------
|
|
||||||
Callable
|
|
||||||
An asynchronous exit handler function.
|
|
||||||
"""
|
|
||||||
|
|
||||||
async def exit_handler(sig, frame):
|
|
||||||
print("Shutting down host...")
|
|
||||||
heartbeat_task.cancel()
|
|
||||||
await loop.shutdown_asyncgens()
|
|
||||||
loop.stop()
|
|
||||||
|
|
||||||
return exit_handler
|
|
||||||
|
|
||||||
|
|
||||||
def start_server(address, port, heartbeat_interval, heartbeat_timeout):
|
|
||||||
"""
|
|
||||||
Run the main server loop.
|
|
||||||
|
|
||||||
Parameters
|
|
||||||
----------
|
|
||||||
address : str
|
|
||||||
The IP address of the server.
|
|
||||||
port : int
|
|
||||||
The port number of the server.
|
|
||||||
heartbeat_interval : int
|
|
||||||
The interval between heartbeat messages in seconds.
|
|
||||||
heartbeat_timeout : int
|
|
||||||
The timeout for waiting for a response in seconds.
|
|
||||||
"""
|
|
||||||
loop = asyncio.get_event_loop()
|
|
||||||
# Python 3.7+, coroutines only:
|
|
||||||
# heartbeat_task = loop.create_task(
|
|
||||||
# heartbeat(sio, heartbeat_interval, heartbeat_timeout)
|
|
||||||
# )
|
|
||||||
# aiohttp_app = loop.create_task(web._run_app(app, host=address, port=port))
|
|
||||||
|
|
||||||
# Python 3.6+ compatible. Supports any awaitable:
|
|
||||||
heartbeat_task = asyncio.ensure_future(
|
|
||||||
heartbeat(sio, heartbeat_interval, heartbeat_timeout)
|
|
||||||
)
|
|
||||||
|
|
||||||
aiohttp_app = asyncio.ensure_future(web._run_app(app, host=address, port=port))
|
|
||||||
|
|
||||||
exit_handler = create_exit_handler(loop, heartbeat_task)
|
|
||||||
signal.signal(signal.SIGINT, exit_handler)
|
|
||||||
signal.signal(signal.SIGTERM, exit_handler)
|
|
||||||
|
|
||||||
try:
|
|
||||||
loop.run_until_complete(asyncio.gather(heartbeat_task, aiohttp_app))
|
|
||||||
except asyncio.CancelledError:
|
|
||||||
pass
|
|
||||||
finally:
|
|
||||||
loop.run_until_complete(loop.shutdown_asyncgens())
|
|
||||||
loop.stop()
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
parser = argparse.ArgumentParser(description="Start announce_server client.")
|
|
||||||
|
|
||||||
parser.add_argument("--ip", default="0.0.0.0", help="IP address of the host server")
|
|
||||||
parser.add_argument(
|
|
||||||
"--port", default=4999, type=int, help="Port of the host server"
|
|
||||||
)
|
|
||||||
parser.add_argument(
|
|
||||||
"--heartbeat-interval",
|
|
||||||
default=5,
|
|
||||||
type=float,
|
|
||||||
help="Heartbeat interval in seconds",
|
|
||||||
)
|
|
||||||
parser.add_argument(
|
|
||||||
"--heartbeat-timeout",
|
|
||||||
default=3,
|
|
||||||
type=float,
|
|
||||||
help="Heartbeat timeout in seconds",
|
|
||||||
)
|
|
||||||
|
|
||||||
args = parser.parse_args()
|
|
||||||
|
|
||||||
start_server(
|
|
||||||
address=args.ip,
|
|
||||||
port=args.port,
|
|
||||||
heartbeat_interval=args.heartbeat_interval,
|
|
||||||
heartbeat_timeout=args.heartbeat_timeout,
|
|
||||||
)
|
|
@ -1,61 +0,0 @@
|
|||||||
from unittest.mock import MagicMock, patch
|
|
||||||
|
|
||||||
import pytest
|
|
||||||
|
|
||||||
from announce_server.decorator import (
|
|
||||||
announce_server,
|
|
||||||
register_block,
|
|
||||||
register_server,
|
|
||||||
register_service,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
|
||||||
"decorator_alias",
|
|
||||||
[
|
|
||||||
register_block,
|
|
||||||
register_server,
|
|
||||||
],
|
|
||||||
)
|
|
||||||
def test_alias_calls_register_service(decorator_alias):
|
|
||||||
test_args = (None, None)
|
|
||||||
test_kwargs = {
|
|
||||||
"name": "test_server",
|
|
||||||
"ip": "127.0.0.1",
|
|
||||||
"port": 8000,
|
|
||||||
"host_ip": "127.0.0.1",
|
|
||||||
"host_port": 5000,
|
|
||||||
}
|
|
||||||
|
|
||||||
with patch("announce_server.decorator.register_service") as mock_register_service:
|
|
||||||
mock_register_service.return_value = MagicMock()
|
|
||||||
|
|
||||||
decorator = decorator_alias(*test_args, **test_kwargs)
|
|
||||||
decorator(MagicMock())
|
|
||||||
|
|
||||||
mock_register_service.assert_called_once_with(*test_args, **test_kwargs)
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
|
||||||
"decorator_alias",
|
|
||||||
[
|
|
||||||
announce_server,
|
|
||||||
],
|
|
||||||
)
|
|
||||||
def test_deprecated_alias_calls_register_service(decorator_alias):
|
|
||||||
test_args = (None, None)
|
|
||||||
test_kwargs = {
|
|
||||||
"name": "test_server",
|
|
||||||
"ip": "127.0.0.1",
|
|
||||||
"port": 8000,
|
|
||||||
"host_ip": "127.0.0.1",
|
|
||||||
"host_port": 5000,
|
|
||||||
}
|
|
||||||
with patch("announce_server.decorator.register_service") as mock_register_service:
|
|
||||||
mock_register_service.return_value = MagicMock()
|
|
||||||
|
|
||||||
with pytest.warns(DeprecationWarning):
|
|
||||||
decorator = decorator_alias(*test_args, **test_kwargs)
|
|
||||||
|
|
||||||
decorator(MagicMock())
|
|
||||||
mock_register_service.assert_called_once_with(*test_args, **test_kwargs)
|
|
@ -1,48 +1,39 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
from unittest.mock import AsyncMock, call, patch
|
import subprocess
|
||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
import socketio
|
|
||||||
|
|
||||||
from announce_server.decorator import _announce_server
|
from announce_server.decorator import _announce_server, announce_server
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@patch("announce_server.decorator._announce_server")
|
||||||
async def test_announce_server(event_loop):
|
def test_announce_server_decorator(mock_announce_server):
|
||||||
# Mock the socketio.AsyncClient to prevent actual connections
|
# Mock the _announce_server function to prevent actual connections
|
||||||
|
mock_announce_server.return_value = MagicMock()
|
||||||
|
|
||||||
with patch("announce_server.decorator.sio") as mock_sio:
|
# Decorate the sample function with announce_server
|
||||||
# Create a fake sio.connect() function that simulates a retry loop
|
@announce_server(
|
||||||
async def fake_connect(*args, **kwargs):
|
name="test_server",
|
||||||
await asyncio.sleep(0.1)
|
ip="127.0.0.1",
|
||||||
raise RuntimeError("Failed to connect")
|
port=8000,
|
||||||
|
host_ip="127.0.0.1",
|
||||||
# Set the fake connect function to be used as a side_effect for the mock
|
host_port=5000,
|
||||||
mock_sio.connect = AsyncMock(side_effect=fake_connect)
|
|
||||||
|
|
||||||
# Define the outer_kwargs for the _announce_server function
|
|
||||||
outer_kwargs = {
|
|
||||||
"name": "test_server",
|
|
||||||
"ip": "127.0.0.1",
|
|
||||||
"port": 8000,
|
|
||||||
"host_ip": "127.0.0.1",
|
|
||||||
"host_port": 5123,
|
|
||||||
"retry_interval": 0.001,
|
|
||||||
}
|
|
||||||
|
|
||||||
# Run the _announce_server function with a timeout to avoid infinite loop
|
|
||||||
try:
|
|
||||||
await asyncio.wait_for(_announce_server(**outer_kwargs), timeout=0.105)
|
|
||||||
except asyncio.TimeoutError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
# Check if sio.connect was called multiple times due to the retry loop
|
|
||||||
assert mock_sio.connect.call_count >= 2
|
|
||||||
|
|
||||||
# Check if sio.connect was called with the correct arguments
|
|
||||||
mock_sio.connect.assert_has_calls(
|
|
||||||
[call("http://127.0.0.1:5123")] * mock_sio.connect.call_count
|
|
||||||
)
|
)
|
||||||
|
def http_server():
|
||||||
|
server = subprocess.Popen(["python3", "-m", "http.server", "13373"])
|
||||||
|
yield
|
||||||
|
server.terminate()
|
||||||
|
server.wait()
|
||||||
|
|
||||||
# Since we don't have access to the event handlers directly, we can't test them in this way.
|
# Run the decorated function
|
||||||
# Instead, you could refactor the code to make the event handlers separate functions that can be tested independently.
|
http_server()
|
||||||
|
|
||||||
|
# Check if the _announce_server function was called with the correct arguments
|
||||||
|
mock_announce_server.assert_called_once_with(
|
||||||
|
name="test_server",
|
||||||
|
ip="127.0.0.1",
|
||||||
|
port=8000,
|
||||||
|
host_ip="127.0.0.1",
|
||||||
|
host_port=5000,
|
||||||
|
)
|
||||||
|
@ -1,39 +0,0 @@
|
|||||||
import asyncio
|
|
||||||
import subprocess
|
|
||||||
from unittest.mock import MagicMock, patch
|
|
||||||
|
|
||||||
import pytest
|
|
||||||
|
|
||||||
from announce_server.decorator import _announce_server, register_service
|
|
||||||
|
|
||||||
|
|
||||||
@patch("announce_server.decorator._announce_server")
|
|
||||||
def test_announce_server_decorator(mock_announce_server):
|
|
||||||
# Mock the _announce_server function to prevent actual connections
|
|
||||||
mock_announce_server.return_value = MagicMock()
|
|
||||||
|
|
||||||
# Decorate the sample function with announce_server
|
|
||||||
@register_service(
|
|
||||||
name="test_server",
|
|
||||||
ip="127.0.0.1",
|
|
||||||
port=8000,
|
|
||||||
host_ip="127.0.0.1",
|
|
||||||
host_port=5000,
|
|
||||||
)
|
|
||||||
def http_server():
|
|
||||||
server = subprocess.Popen(["python3", "-m", "http.server", "13373"])
|
|
||||||
yield
|
|
||||||
server.terminate()
|
|
||||||
server.wait()
|
|
||||||
|
|
||||||
# Run the decorated function
|
|
||||||
http_server()
|
|
||||||
|
|
||||||
# Check if the _announce_server function was called with the correct arguments
|
|
||||||
mock_announce_server.assert_called_once_with(
|
|
||||||
name="test_server",
|
|
||||||
ip="127.0.0.1",
|
|
||||||
port=8000,
|
|
||||||
host_ip="127.0.0.1",
|
|
||||||
host_port=5000,
|
|
||||||
)
|
|
@ -1,76 +0,0 @@
|
|||||||
|
|
||||||
# def announce_server(task=None, loop=None, **outer_kwargs):
|
|
||||||
# if task is None:
|
|
||||||
# return lambda f: announce_server(f, loop=loop, **outer_kwargs)
|
|
||||||
|
|
||||||
# @wraps(task)
|
|
||||||
# def wrapper(*args, **kwargs):
|
|
||||||
# async def main(*args, **kwargs):
|
|
||||||
# if loop is not None:
|
|
||||||
# host_block_thread = loop.run_in_executor(None, task)
|
|
||||||
# else:
|
|
||||||
# host_block_thread = asyncio.to_thread(task) # python 3.9+
|
|
||||||
|
|
||||||
# # Announce the server to the host
|
|
||||||
# await _announce_server(**outer_kwargs)
|
|
||||||
|
|
||||||
# # Wait for host_block to finish
|
|
||||||
# await host_block_thread
|
|
||||||
|
|
||||||
# if loop is not None:
|
|
||||||
# task = loop.create_task(main(*args, **kwargs))
|
|
||||||
# else:
|
|
||||||
# task = asyncio.run(main(*args, **kwargs))
|
|
||||||
# return task
|
|
||||||
# return wrapper
|
|
||||||
|
|
||||||
# def announce_server(task=None, loop=None, **outer_kwargs):
|
|
||||||
# if task is None:
|
|
||||||
# return lambda f: announce_server(f, loop=loop, **outer_kwargs)
|
|
||||||
|
|
||||||
# @wraps(task)
|
|
||||||
# async def wrapper(*args, **kwargs):
|
|
||||||
# if not asyncio.iscoroutinefunction(task):
|
|
||||||
# # If the decorated function is not a coroutine, wrap it in a coroutine
|
|
||||||
# task = asyncio.coroutine(task)
|
|
||||||
# if loop is not None:
|
|
||||||
# host_block_thread = loop.run_in_executor(None, task)
|
|
||||||
# else:
|
|
||||||
# host_block_thread = asyncio.to_thread(task)
|
|
||||||
|
|
||||||
# # Announce the server to the host
|
|
||||||
# await _announce_server(**outer_kwargs)
|
|
||||||
|
|
||||||
# # Wait for host_block to finish
|
|
||||||
# await host_block_thread
|
|
||||||
|
|
||||||
# return wrapper
|
|
||||||
|
|
||||||
|
|
||||||
# def announce_server(task=None, loop=None, **outer_kwargs):
|
|
||||||
# if task is None:
|
|
||||||
# return lambda f: announce_server(f, loop=loop, **outer_kwargs)
|
|
||||||
|
|
||||||
# if loop is None:
|
|
||||||
# loop = asyncio.get_event_loop()
|
|
||||||
|
|
||||||
# @wraps(task)
|
|
||||||
# def wrapper(*args, **kwargs):
|
|
||||||
# async def main(*args, **kwargs):
|
|
||||||
# if asyncio.iscoroutinefunction(task):
|
|
||||||
# # If the task is async, just await it
|
|
||||||
# host_block_thread = task(*args, **kwargs)
|
|
||||||
# else:
|
|
||||||
# host_block_thread = loop.run_in_executor(None, task, *args, **kwargs)
|
|
||||||
|
|
||||||
# # Announce the server to the host
|
|
||||||
# await _announce_server(**outer_kwargs)
|
|
||||||
|
|
||||||
# # Wait for host_block to finish
|
|
||||||
# await host_block_thread
|
|
||||||
|
|
||||||
# task = loop.create_task(main(*args, **kwargs))
|
|
||||||
# return task
|
|
||||||
|
|
||||||
# return wrapper
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user