Compare commits
13 Commits
v0.0.1-rc4
...
main
Author | SHA1 | Date | |
---|---|---|---|
|
e8d04cff7c | ||
|
38cd99fa73 | ||
|
945f485243 | ||
|
fe15bbbf1e | ||
|
b87f5a329c | ||
|
e717f60e06 | ||
|
25a7f003d8 | ||
|
91b40234cb | ||
|
91621847e5 | ||
|
8e8d5cd36a | ||
|
0933f943a5 | ||
|
32135e802b | ||
|
9996df0934 |
5
Makefile
5
Makefile
@ -10,4 +10,7 @@ pub: build
|
||||
install:
|
||||
pip install -e .[dev,pub]
|
||||
|
||||
.PHONY: build clean pub install
|
||||
test-api:
|
||||
docker run --rm -ti -p 4999:4999 python:3.7-slim bash -c "pip install -U announce_server~=0.0.2rc0; announce_server start_registry"
|
||||
|
||||
.PHONY: build clean pub install test-api
|
39
README.md
39
README.md
@ -46,4 +46,41 @@ from announce_server import register_service
|
||||
def your_function():
|
||||
pass
|
||||
|
||||
```
|
||||
```
|
||||
|
||||
## Registry
|
||||
|
||||
The `announce_server` CLI provides a simple way to start a registry server. The registry server keeps track of available services and periodically sends heartbeat messages to ensure that registered services are still active.
|
||||
|
||||
### Command
|
||||
|
||||
```bash
|
||||
announce_server start_registry [--address ADDRESS] [--port PORT] [--heartbeat_interval INTERVAL] [--heartbeat_timeout TIMEOUT]
|
||||
```
|
||||
|
||||
### Arguments
|
||||
|
||||
- `--address ADDRESS`: The IP address of the server. Default: `0.0.0.0`.
|
||||
- `--port PORT`: The port number of the server. Default: `4999`.
|
||||
- `--heartbeat_interval INTERVAL`: The interval between heartbeat messages in seconds. Default: `5`.
|
||||
- `--heartbeat_timeout TIMEOUT`: The timeout for waiting for a response in seconds. Default: `3`.
|
||||
|
||||
### Example
|
||||
|
||||
To start the registry server with the default configuration, run:
|
||||
|
||||
```bash
|
||||
announce_server start_registry
|
||||
```
|
||||
|
||||
The full syntax is equivalent to:
|
||||
|
||||
```bash
|
||||
announce_server start_registry --address 0.0.0.0 --port 4999 --heartbeat_interval 5 --heartbeat_timeout 3
|
||||
```
|
||||
|
||||
To test connections, run:
|
||||
|
||||
```bash
|
||||
announce_server start_client --host-ip 0.0.0.0 --host-port 4999
|
||||
```
|
||||
|
16
setup.cfg
16
setup.cfg
@ -12,7 +12,6 @@ classifiers =
|
||||
Intended Audience :: Developers
|
||||
License :: OSI Approved :: MIT License
|
||||
Programming Language :: Python :: 3
|
||||
Programming Language :: Python :: 3.6
|
||||
Programming Language :: Python :: 3.7
|
||||
Programming Language :: Python :: 3.8
|
||||
Programming Language :: Python :: 3.9
|
||||
@ -24,7 +23,7 @@ package_dir =
|
||||
= src
|
||||
packages = find:
|
||||
install_requires =
|
||||
python-socketio[asyncio_client]
|
||||
python-socketio[asyncio_client]~=5.0.0
|
||||
|
||||
[options.packages.find]
|
||||
where = src
|
||||
@ -42,6 +41,17 @@ pub =
|
||||
setuptools_scm
|
||||
twine
|
||||
|
||||
api =
|
||||
aiohttp
|
||||
|
||||
[options.entry_points]
|
||||
console_scripts =
|
||||
announce_server = announce_server.__main__:main
|
||||
|
||||
|
||||
[tool:pytest]
|
||||
addopts = --cov --cov-report term-missing
|
||||
|
||||
[coverage:run]
|
||||
source = announce_server
|
||||
branch = True
|
||||
@ -53,5 +63,3 @@ show_missing = True
|
||||
exclude_lines =
|
||||
if __name__ == .__main__.:
|
||||
|
||||
[tool:pytest]
|
||||
addopts = --cov --cov-report term-missing
|
2
setup.py
2
setup.py
@ -7,5 +7,5 @@ setup(
|
||||
"setuptools_scm",
|
||||
],
|
||||
use_scm_version=True,
|
||||
python_requires=">=3.6",
|
||||
python_requires=">=3.7", # because of python-socketsio[asyncio_client]
|
||||
)
|
||||
|
65
src/announce_server/__main__.py
Normal file
65
src/announce_server/__main__.py
Normal file
@ -0,0 +1,65 @@
|
||||
import argparse
|
||||
|
||||
from announce_server import register_service
|
||||
from announce_server.client import start_client
|
||||
from announce_server.server import start_server
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Announce server CLI")
|
||||
subparsers = parser.add_subparsers(dest="command", help="Available subcommands")
|
||||
|
||||
# Start registry subcommand
|
||||
start_registry_parser = subparsers.add_parser(
|
||||
"start_registry", help="Start the registry server"
|
||||
)
|
||||
start_registry_parser.add_argument(
|
||||
"--ip", default="0.0.0.0", help="IP address of the host server"
|
||||
)
|
||||
start_registry_parser.add_argument(
|
||||
"--port", default=4999, type=int, help="Port of the host server"
|
||||
)
|
||||
start_registry_parser.add_argument(
|
||||
"--heartbeat-interval",
|
||||
default=5,
|
||||
type=float,
|
||||
help="Heartbeat interval in seconds",
|
||||
)
|
||||
start_registry_parser.add_argument(
|
||||
"--heartbeat-timeout",
|
||||
default=3,
|
||||
type=float,
|
||||
help="Heartbeat timeout in seconds",
|
||||
)
|
||||
|
||||
# Start client subcommand
|
||||
start_client_parser = subparsers.add_parser(
|
||||
"start_client", help="Start the client server"
|
||||
)
|
||||
start_client_parser.add_argument(
|
||||
"--host-ip",
|
||||
type=str,
|
||||
default="0.0.0.0",
|
||||
help="Host IP address (default: 0.0.0.0)",
|
||||
)
|
||||
start_client_parser.add_argument(
|
||||
"--host-port", type=int, default=4999, help="Host port number (default: 4999)"
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.command == "start_registry":
|
||||
start_server(
|
||||
address=args.ip,
|
||||
port=args.port,
|
||||
heartbeat_interval=args.heartbeat_interval,
|
||||
heartbeat_timeout=args.heartbeat_timeout,
|
||||
)
|
||||
elif args.command == "start_client":
|
||||
start_client(host_ip=args.host_ip, host_port=args.host_port)
|
||||
else:
|
||||
parser.print_help()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
70
src/announce_server/client.py
Normal file
70
src/announce_server/client.py
Normal file
@ -0,0 +1,70 @@
|
||||
import argparse
|
||||
import asyncio
|
||||
import signal
|
||||
import sys
|
||||
import threading
|
||||
from http.server import HTTPServer, SimpleHTTPRequestHandler
|
||||
|
||||
import socketio
|
||||
|
||||
from .decorator import register_service
|
||||
from .get_ip import get_ip_address
|
||||
|
||||
http_server_thread = None
|
||||
|
||||
def start_client(host_ip="0.0.0.0", host_port=4999):
|
||||
@register_service(
|
||||
name="test client",
|
||||
ip=get_ip_address(),
|
||||
port=13373,
|
||||
host_ip=host_ip,
|
||||
host_port=host_port,
|
||||
)
|
||||
def server(port=13373):
|
||||
def start_server():
|
||||
global http_server_thread
|
||||
print(f"Serving HTTP on 0.0.0.0 port {port} (http://0.0.0.0:{port}/) ...")
|
||||
httpd = HTTPServer(("", port), SimpleHTTPRequestHandler)
|
||||
httpd.serve_forever()
|
||||
server_thread = threading.Thread(target=start_server)
|
||||
server_thread.daemon = True
|
||||
server_thread.start()
|
||||
server_thread.join()
|
||||
|
||||
def signal_handler(signal, frame):
|
||||
print("Cleaning up and shutting down...")
|
||||
|
||||
# If the HTTP server thread is running, shut it down
|
||||
if http_server_thread is not None and http_server_thread.is_alive():
|
||||
http_server_thread.shutdown()
|
||||
|
||||
sys.exit(0)
|
||||
|
||||
signal.signal(signal.SIGINT, signal_handler)
|
||||
signal.signal(signal.SIGTERM, signal_handler)
|
||||
try:
|
||||
server()
|
||||
signal.pause()
|
||||
except asyncio.exceptions.CancelledError:
|
||||
print("CancelledError")
|
||||
# signal_handler(signal.SIGINT, None)
|
||||
pass
|
||||
finally:
|
||||
print("Shutting down test client...")
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description="Start announce_server client.")
|
||||
parser.add_argument(
|
||||
"--host-ip",
|
||||
type=str,
|
||||
default="0.0.0.0",
|
||||
help="Host IP address (default: 0.0.0.0)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--host-port", type=int, default=4999, help="Host port number (default: 4999)"
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
start_client(args.host_ip, args.host_port)
|
@ -6,6 +6,7 @@ import socketio
|
||||
|
||||
sio = socketio.AsyncClient()
|
||||
|
||||
import signal
|
||||
|
||||
async def _announce_server(**kwargs):
|
||||
SERVER_NAME = kwargs.get("name", "server_1")
|
||||
@ -30,7 +31,9 @@ async def _announce_server(**kwargs):
|
||||
break
|
||||
except Exception as e:
|
||||
print(e)
|
||||
print("Failed to connect to host, retrying in 5 seconds")
|
||||
print(
|
||||
f"Failed to connect to host server, retrying in {RETRY_INTERVAL} seconds"
|
||||
)
|
||||
await asyncio.sleep(RETRY_INTERVAL)
|
||||
# await sio.connect(f'http://{HOST_SERVER_IP}:{HOST_SERVER_PORT}')
|
||||
print("Connected to host")
|
||||
@ -66,6 +69,38 @@ def register_service(task=None, **outer_kwargs):
|
||||
|
||||
return wrapper
|
||||
|
||||
# def register_service(task=None, **outer_kwargs):
|
||||
# if task is None:
|
||||
# return lambda f: register_service(f, **outer_kwargs)
|
||||
|
||||
# @wraps(task)
|
||||
# def wrapper(*args, **kwargs):
|
||||
# async def main(*args, **kwargs):
|
||||
# loop = asyncio.get_event_loop()
|
||||
# host_block_thread = loop.run_in_executor(None, task)
|
||||
|
||||
# # Announce the server to the host
|
||||
# await _announce_server(**outer_kwargs)
|
||||
|
||||
# # Set up signal handlers to clean up properly
|
||||
# def signal_handler(signum, frame):
|
||||
# print(f"Received signal {signum}. Cleaning up and shutting down...")
|
||||
# host_block_thread.cancel()
|
||||
# import sys
|
||||
# sys.exit(0)
|
||||
|
||||
# signal.signal(signal.SIGINT, signal_handler)
|
||||
# signal.signal(signal.SIGTERM, signal_handler)
|
||||
|
||||
# # Wait for host_block to finish or to be cancelled
|
||||
# try:
|
||||
# await host_block_thread
|
||||
# except asyncio.CancelledError:
|
||||
# pass
|
||||
|
||||
# return asyncio.run(main())
|
||||
|
||||
# return wrapper
|
||||
|
||||
def announce_server(*args, **kwargs):
|
||||
"""Wrapper for register_service"""
|
||||
|
194
src/announce_server/server.py
Normal file
194
src/announce_server/server.py
Normal file
@ -0,0 +1,194 @@
|
||||
import argparse
|
||||
import asyncio
|
||||
import signal
|
||||
|
||||
import socketio
|
||||
from aiohttp import web
|
||||
|
||||
sio = socketio.AsyncServer(async_mode="aiohttp")
|
||||
app = web.Application()
|
||||
sio.attach(app)
|
||||
|
||||
servers = {}
|
||||
|
||||
|
||||
async def available(request):
|
||||
"""
|
||||
Return a JSON response containing the available servers.
|
||||
|
||||
Returns
|
||||
-------
|
||||
aiohttp.web.Response
|
||||
JSON response containing the available servers.
|
||||
"""
|
||||
return web.json_response(servers)
|
||||
|
||||
|
||||
app.router.add_get("/available", available)
|
||||
|
||||
|
||||
@sio.event
|
||||
async def connect(sid, environ):
|
||||
"""Handle a new connection to the socket."""
|
||||
print("Connected:", sid)
|
||||
|
||||
|
||||
@sio.event
|
||||
async def register(sid, data):
|
||||
"""
|
||||
Register a new server.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
sid : str
|
||||
Socket ID of the connected server.
|
||||
data : dict
|
||||
Server information (name, IP, and port).
|
||||
"""
|
||||
server_info = data
|
||||
name = server_info["name"]
|
||||
|
||||
servers[name] = {"ip": server_info["ip"], "port": server_info["port"], "sid": sid}
|
||||
print(servers)
|
||||
|
||||
|
||||
@sio.event
|
||||
async def disconnect(sid):
|
||||
"""
|
||||
Handle a server disconnect.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
sid : str
|
||||
Socket ID of the disconnected server.
|
||||
"""
|
||||
print("Disconnected from server:", sid)
|
||||
for name, server in servers.items():
|
||||
if server["sid"] == sid:
|
||||
del servers[name]
|
||||
break
|
||||
|
||||
|
||||
async def heartbeat(sio, interval, timeout):
|
||||
"""
|
||||
Periodically send heartbeat messages to connected servers.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
sio : socketio.AsyncServer
|
||||
The socket.io server instance.
|
||||
interval : int
|
||||
The interval between heartbeat messages in seconds.
|
||||
timeout : int
|
||||
The timeout for waiting for a response in seconds.
|
||||
"""
|
||||
while True:
|
||||
await asyncio.sleep(interval)
|
||||
server_values_copy = list(servers.values())
|
||||
for server in server_values_copy:
|
||||
sid = server["sid"]
|
||||
try:
|
||||
print(f"Sending heartbeat to {sid}...")
|
||||
heartbeat_future = sio.emit("heartbeat", to=sid)
|
||||
await asyncio.wait_for(heartbeat_future, timeout=timeout)
|
||||
except (asyncio.TimeoutError, socketio.exceptions.TimeoutError):
|
||||
print(f"Server {sid} failed to respond to heartbeat after {timeout}s.")
|
||||
await sio.disconnect(sid)
|
||||
|
||||
|
||||
def create_exit_handler(loop, heartbeat_task):
|
||||
"""
|
||||
Create an exit handler for gracefully shutting down the server.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
loop : asyncio.AbstractEventLoop
|
||||
The event loop.
|
||||
heartbeat_task : asyncio.Task
|
||||
The heartbeat task.
|
||||
|
||||
Returns
|
||||
-------
|
||||
Callable
|
||||
An asynchronous exit handler function.
|
||||
"""
|
||||
|
||||
async def exit_handler(sig, frame):
|
||||
print("Shutting down host...")
|
||||
heartbeat_task.cancel()
|
||||
await loop.shutdown_asyncgens()
|
||||
loop.stop()
|
||||
|
||||
return exit_handler
|
||||
|
||||
|
||||
def start_server(address, port, heartbeat_interval, heartbeat_timeout):
|
||||
"""
|
||||
Run the main server loop.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
address : str
|
||||
The IP address of the server.
|
||||
port : int
|
||||
The port number of the server.
|
||||
heartbeat_interval : int
|
||||
The interval between heartbeat messages in seconds.
|
||||
heartbeat_timeout : int
|
||||
The timeout for waiting for a response in seconds.
|
||||
"""
|
||||
loop = asyncio.get_event_loop()
|
||||
# Python 3.7+, coroutines only:
|
||||
# heartbeat_task = loop.create_task(
|
||||
# heartbeat(sio, heartbeat_interval, heartbeat_timeout)
|
||||
# )
|
||||
# aiohttp_app = loop.create_task(web._run_app(app, host=address, port=port))
|
||||
|
||||
# Python 3.6+ compatible. Supports any awaitable:
|
||||
heartbeat_task = asyncio.ensure_future(
|
||||
heartbeat(sio, heartbeat_interval, heartbeat_timeout)
|
||||
)
|
||||
|
||||
aiohttp_app = asyncio.ensure_future(web._run_app(app, host=address, port=port))
|
||||
|
||||
exit_handler = create_exit_handler(loop, heartbeat_task)
|
||||
signal.signal(signal.SIGINT, exit_handler)
|
||||
signal.signal(signal.SIGTERM, exit_handler)
|
||||
|
||||
try:
|
||||
loop.run_until_complete(asyncio.gather(heartbeat_task, aiohttp_app))
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
finally:
|
||||
loop.run_until_complete(loop.shutdown_asyncgens())
|
||||
loop.stop()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description="Start announce_server client.")
|
||||
|
||||
parser.add_argument("--ip", default="0.0.0.0", help="IP address of the host server")
|
||||
parser.add_argument(
|
||||
"--port", default=4999, type=int, help="Port of the host server"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--heartbeat-interval",
|
||||
default=5,
|
||||
type=float,
|
||||
help="Heartbeat interval in seconds",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--heartbeat-timeout",
|
||||
default=3,
|
||||
type=float,
|
||||
help="Heartbeat timeout in seconds",
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
start_server(
|
||||
address=args.ip,
|
||||
port=args.port,
|
||||
heartbeat_interval=args.heartbeat_interval,
|
||||
heartbeat_timeout=args.heartbeat_timeout,
|
||||
)
|
Loading…
Reference in New Issue
Block a user