Compare commits

...

19 Commits

Author SHA1 Message Date
Michael Pilosov
e8d04cff7c working with two ctrl-c, previous commit does not 2023-03-20 00:33:59 -06:00
Michael Pilosov
38cd99fa73 RADICAL SIMPLIFICATION. working client. 2023-03-19 23:32:09 -06:00
Michael Pilosov
945f485243 REFACTOR/CLEANUP start_client 2023-03-19 23:14:48 -06:00
Michael Pilosov
fe15bbbf1e linting 2023-03-19 23:09:49 -06:00
Michael Pilosov
b87f5a329c working client 2023-03-19 23:09:40 -06:00
Michael Pilosov
e717f60e06 adding client 2023-03-19 21:45:47 -06:00
Michael Pilosov
25a7f003d8 makefile 2023-03-19 20:08:12 -06:00
Michael Pilosov
91b40234cb python 3.7+ 2023-03-19 20:03:22 -06:00
Michael Pilosov
91621847e5 readme update 2023-03-19 19:46:21 -06:00
Michael Pilosov
8e8d5cd36a support python 3.6+ by changing to ensure_future. 2023-03-19 19:33:52 -06:00
Michael Pilosov
0933f943a5 REGISTRY IS BORN 2023-03-19 19:21:42 -06:00
Michael Pilosov
32135e802b linting 2023-03-19 18:17:43 -06:00
Michael Pilosov
9996df0934 fix print statement 2023-03-19 18:06:06 -06:00
Michael Pilosov
c71bb5babf adding makefile 2023-03-19 18:02:11 -06:00
Michael Pilosov
8e14fc19e6 improve tests 2023-03-19 18:02:11 -06:00
Michael Pilosov
8c414fd87d backwards-compatible renaming 2023-03-19 18:02:11 -06:00
Michael Pilosov
0aa76cd98c add documentation 2023-03-19 18:02:11 -06:00
Michael Pilosov
6e15861ba6 updated instructions for build, new pkgs 2023-03-19 18:02:11 -06:00
Michael Pilosov
1d502d2107 updated instructions for build, new pkgs 2023-03-19 17:10:52 -06:00
11 changed files with 549 additions and 21 deletions

16
Makefile Normal file
View File

@ -0,0 +1,16 @@
build: clean
python -m build --sdist --wheel
clean:
rm -rf dist/ build/ .eggs/ .pytest_cache/ src/announce_server.egg-info/
pub: build
twine upload dist/*
install:
pip install -e .[dev,pub]
test-api:
docker run --rm -ti -p 4999:4999 python:3.7-slim bash -c "pip install -U announce_server~=0.0.2rc0; announce_server start_registry"
.PHONY: build clean pub install test-api

View File

@ -8,30 +8,79 @@ A Python library that announces a server to a host.
pip install announce-server
```
## Development
To install the developer dependencies required for testing and publishing:
```bash
pip install -e .[dev,pub]
```
## Build
To build the package, run:
```bash
pip install -e .[dev]
pip install -m build
rm -rf dist/ build/ .eggs/ .pytest_cache/ src/announce_server.egg-info/
python -m build --sdist --wheel
```
To publish:
```bash
twine upload dist/*
```
## Test
To run the tests, install the package with the `[dev]` option:
To run the tests, call:
```bash
pip install -e .[dev]
pytest
```
## Usage
```python
from announce_server.announce import announce_server
from announce_server import register_service
@announce_server(name="server_name", ip="server_ip", port=8000, host_ip="host_server_ip", host_port=5000)
@register_service(name="server_name", ip="server_ip", port=8000, host_ip="host_server_ip", host_port=5000, retry_interval=5)
def your_function():
pass
```
```
## Registry
The `announce_server` CLI provides a simple way to start a registry server. The registry server keeps track of available services and periodically sends heartbeat messages to ensure that registered services are still active.
### Command
```bash
announce_server start_registry [--address ADDRESS] [--port PORT] [--heartbeat_interval INTERVAL] [--heartbeat_timeout TIMEOUT]
```
### Arguments
- `--address ADDRESS`: The IP address of the server. Default: `0.0.0.0`.
- `--port PORT`: The port number of the server. Default: `4999`.
- `--heartbeat_interval INTERVAL`: The interval between heartbeat messages in seconds. Default: `5`.
- `--heartbeat_timeout TIMEOUT`: The timeout for waiting for a response in seconds. Default: `3`.
### Example
To start the registry server with the default configuration, run:
```bash
announce_server start_registry
```
The full syntax is equivalent to:
```bash
announce_server start_registry --address 0.0.0.0 --port 4999 --heartbeat_interval 5 --heartbeat_timeout 3
```
To test connections, run:
```bash
announce_server start_client --host-ip 0.0.0.0 --host-port 4999
```

View File

@ -12,7 +12,6 @@ classifiers =
Intended Audience :: Developers
License :: OSI Approved :: MIT License
Programming Language :: Python :: 3
Programming Language :: Python :: 3.6
Programming Language :: Python :: 3.7
Programming Language :: Python :: 3.8
Programming Language :: Python :: 3.9
@ -24,20 +23,34 @@ package_dir =
= src
packages = find:
install_requires =
python-socketio[asyncio_client]
python-socketio[asyncio_client]~=5.0.0
[options.packages.find]
where = src
[options.extras_require]
dev =
build
setuptools_scm
pytest
pytest-mock
pytest-asyncio
pytest-cov
# asynctest; python_version<'3.8'
pub =
build
wheel
setuptools_scm
twine
api =
aiohttp
[options.entry_points]
console_scripts =
announce_server = announce_server.__main__:main
[tool:pytest]
addopts = --cov --cov-report term-missing
[coverage:run]
source = announce_server
@ -50,5 +63,3 @@ show_missing = True
exclude_lines =
if __name__ == .__main__.:
[tool:pytest]
addopts = --cov --cov-report term-missing

View File

@ -7,5 +7,5 @@ setup(
"setuptools_scm",
],
use_scm_version=True,
python_requires=">=3.6",
python_requires=">=3.7", # because of python-socketsio[asyncio_client]
)

View File

@ -1,2 +1,8 @@
from .decorator import _announce_server, announce_server
from .decorator import (
_announce_server,
announce_server,
register_block,
register_server,
register_service,
)
from .get_ip import get_ip_address

View File

@ -0,0 +1,65 @@
import argparse
from announce_server import register_service
from announce_server.client import start_client
from announce_server.server import start_server
def main():
parser = argparse.ArgumentParser(description="Announce server CLI")
subparsers = parser.add_subparsers(dest="command", help="Available subcommands")
# Start registry subcommand
start_registry_parser = subparsers.add_parser(
"start_registry", help="Start the registry server"
)
start_registry_parser.add_argument(
"--ip", default="0.0.0.0", help="IP address of the host server"
)
start_registry_parser.add_argument(
"--port", default=4999, type=int, help="Port of the host server"
)
start_registry_parser.add_argument(
"--heartbeat-interval",
default=5,
type=float,
help="Heartbeat interval in seconds",
)
start_registry_parser.add_argument(
"--heartbeat-timeout",
default=3,
type=float,
help="Heartbeat timeout in seconds",
)
# Start client subcommand
start_client_parser = subparsers.add_parser(
"start_client", help="Start the client server"
)
start_client_parser.add_argument(
"--host-ip",
type=str,
default="0.0.0.0",
help="Host IP address (default: 0.0.0.0)",
)
start_client_parser.add_argument(
"--host-port", type=int, default=4999, help="Host port number (default: 4999)"
)
args = parser.parse_args()
if args.command == "start_registry":
start_server(
address=args.ip,
port=args.port,
heartbeat_interval=args.heartbeat_interval,
heartbeat_timeout=args.heartbeat_timeout,
)
elif args.command == "start_client":
start_client(host_ip=args.host_ip, host_port=args.host_port)
else:
parser.print_help()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,70 @@
import argparse
import asyncio
import signal
import sys
import threading
from http.server import HTTPServer, SimpleHTTPRequestHandler
import socketio
from .decorator import register_service
from .get_ip import get_ip_address
http_server_thread = None
def start_client(host_ip="0.0.0.0", host_port=4999):
@register_service(
name="test client",
ip=get_ip_address(),
port=13373,
host_ip=host_ip,
host_port=host_port,
)
def server(port=13373):
def start_server():
global http_server_thread
print(f"Serving HTTP on 0.0.0.0 port {port} (http://0.0.0.0:{port}/) ...")
httpd = HTTPServer(("", port), SimpleHTTPRequestHandler)
httpd.serve_forever()
server_thread = threading.Thread(target=start_server)
server_thread.daemon = True
server_thread.start()
server_thread.join()
def signal_handler(signal, frame):
print("Cleaning up and shutting down...")
# If the HTTP server thread is running, shut it down
if http_server_thread is not None and http_server_thread.is_alive():
http_server_thread.shutdown()
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
try:
server()
signal.pause()
except asyncio.exceptions.CancelledError:
print("CancelledError")
# signal_handler(signal.SIGINT, None)
pass
finally:
print("Shutting down test client...")
sys.exit(0)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Start announce_server client.")
parser.add_argument(
"--host-ip",
type=str,
default="0.0.0.0",
help="Host IP address (default: 0.0.0.0)",
)
parser.add_argument(
"--host-port", type=int, default=4999, help="Host port number (default: 4999)"
)
args = parser.parse_args()
start_client(args.host_ip, args.host_port)

View File

@ -1,10 +1,12 @@
import asyncio
import warnings
from functools import wraps
import socketio
sio = socketio.AsyncClient()
import signal
async def _announce_server(**kwargs):
SERVER_NAME = kwargs.get("name", "server_1")
@ -29,7 +31,9 @@ async def _announce_server(**kwargs):
break
except Exception as e:
print(e)
print("Failed to connect to host, retrying in 5 seconds")
print(
f"Failed to connect to host server, retrying in {RETRY_INTERVAL} seconds"
)
await asyncio.sleep(RETRY_INTERVAL)
# await sio.connect(f'http://{HOST_SERVER_IP}:{HOST_SERVER_PORT}')
print("Connected to host")
@ -45,9 +49,9 @@ async def _announce_server(**kwargs):
await main()
def announce_server(task=None, **outer_kwargs):
def register_service(task=None, **outer_kwargs):
if task is None:
return lambda f: announce_server(f, **outer_kwargs)
return lambda f: register_service(f, **outer_kwargs)
@wraps(task)
def wrapper(*args, **kwargs):
@ -64,3 +68,55 @@ def announce_server(task=None, **outer_kwargs):
return asyncio.run(main())
return wrapper
# def register_service(task=None, **outer_kwargs):
# if task is None:
# return lambda f: register_service(f, **outer_kwargs)
# @wraps(task)
# def wrapper(*args, **kwargs):
# async def main(*args, **kwargs):
# loop = asyncio.get_event_loop()
# host_block_thread = loop.run_in_executor(None, task)
# # Announce the server to the host
# await _announce_server(**outer_kwargs)
# # Set up signal handlers to clean up properly
# def signal_handler(signum, frame):
# print(f"Received signal {signum}. Cleaning up and shutting down...")
# host_block_thread.cancel()
# import sys
# sys.exit(0)
# signal.signal(signal.SIGINT, signal_handler)
# signal.signal(signal.SIGTERM, signal_handler)
# # Wait for host_block to finish or to be cancelled
# try:
# await host_block_thread
# except asyncio.CancelledError:
# pass
# return asyncio.run(main())
# return wrapper
def announce_server(*args, **kwargs):
"""Wrapper for register_service"""
warnings.warn(
"announce_server is deprecated, use register_service instead",
DeprecationWarning,
stacklevel=2,
)
return register_service(*args, **kwargs)
def register_block(*args, **kwargs):
"""Wrapper for register_service"""
return register_service(*args, **kwargs)
def register_server(*args, **kwargs):
"""Wrapper for register_service"""
return register_service(*args, **kwargs)

View File

@ -0,0 +1,194 @@
import argparse
import asyncio
import signal
import socketio
from aiohttp import web
sio = socketio.AsyncServer(async_mode="aiohttp")
app = web.Application()
sio.attach(app)
servers = {}
async def available(request):
"""
Return a JSON response containing the available servers.
Returns
-------
aiohttp.web.Response
JSON response containing the available servers.
"""
return web.json_response(servers)
app.router.add_get("/available", available)
@sio.event
async def connect(sid, environ):
"""Handle a new connection to the socket."""
print("Connected:", sid)
@sio.event
async def register(sid, data):
"""
Register a new server.
Parameters
----------
sid : str
Socket ID of the connected server.
data : dict
Server information (name, IP, and port).
"""
server_info = data
name = server_info["name"]
servers[name] = {"ip": server_info["ip"], "port": server_info["port"], "sid": sid}
print(servers)
@sio.event
async def disconnect(sid):
"""
Handle a server disconnect.
Parameters
----------
sid : str
Socket ID of the disconnected server.
"""
print("Disconnected from server:", sid)
for name, server in servers.items():
if server["sid"] == sid:
del servers[name]
break
async def heartbeat(sio, interval, timeout):
"""
Periodically send heartbeat messages to connected servers.
Parameters
----------
sio : socketio.AsyncServer
The socket.io server instance.
interval : int
The interval between heartbeat messages in seconds.
timeout : int
The timeout for waiting for a response in seconds.
"""
while True:
await asyncio.sleep(interval)
server_values_copy = list(servers.values())
for server in server_values_copy:
sid = server["sid"]
try:
print(f"Sending heartbeat to {sid}...")
heartbeat_future = sio.emit("heartbeat", to=sid)
await asyncio.wait_for(heartbeat_future, timeout=timeout)
except (asyncio.TimeoutError, socketio.exceptions.TimeoutError):
print(f"Server {sid} failed to respond to heartbeat after {timeout}s.")
await sio.disconnect(sid)
def create_exit_handler(loop, heartbeat_task):
"""
Create an exit handler for gracefully shutting down the server.
Parameters
----------
loop : asyncio.AbstractEventLoop
The event loop.
heartbeat_task : asyncio.Task
The heartbeat task.
Returns
-------
Callable
An asynchronous exit handler function.
"""
async def exit_handler(sig, frame):
print("Shutting down host...")
heartbeat_task.cancel()
await loop.shutdown_asyncgens()
loop.stop()
return exit_handler
def start_server(address, port, heartbeat_interval, heartbeat_timeout):
"""
Run the main server loop.
Parameters
----------
address : str
The IP address of the server.
port : int
The port number of the server.
heartbeat_interval : int
The interval between heartbeat messages in seconds.
heartbeat_timeout : int
The timeout for waiting for a response in seconds.
"""
loop = asyncio.get_event_loop()
# Python 3.7+, coroutines only:
# heartbeat_task = loop.create_task(
# heartbeat(sio, heartbeat_interval, heartbeat_timeout)
# )
# aiohttp_app = loop.create_task(web._run_app(app, host=address, port=port))
# Python 3.6+ compatible. Supports any awaitable:
heartbeat_task = asyncio.ensure_future(
heartbeat(sio, heartbeat_interval, heartbeat_timeout)
)
aiohttp_app = asyncio.ensure_future(web._run_app(app, host=address, port=port))
exit_handler = create_exit_handler(loop, heartbeat_task)
signal.signal(signal.SIGINT, exit_handler)
signal.signal(signal.SIGTERM, exit_handler)
try:
loop.run_until_complete(asyncio.gather(heartbeat_task, aiohttp_app))
except asyncio.CancelledError:
pass
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.stop()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Start announce_server client.")
parser.add_argument("--ip", default="0.0.0.0", help="IP address of the host server")
parser.add_argument(
"--port", default=4999, type=int, help="Port of the host server"
)
parser.add_argument(
"--heartbeat-interval",
default=5,
type=float,
help="Heartbeat interval in seconds",
)
parser.add_argument(
"--heartbeat-timeout",
default=3,
type=float,
help="Heartbeat timeout in seconds",
)
args = parser.parse_args()
start_server(
address=args.ip,
port=args.port,
heartbeat_interval=args.heartbeat_interval,
heartbeat_timeout=args.heartbeat_timeout,
)

61
tests/test_aliases.py Normal file
View File

@ -0,0 +1,61 @@
from unittest.mock import MagicMock, patch
import pytest
from announce_server.decorator import (
announce_server,
register_block,
register_server,
register_service,
)
@pytest.mark.parametrize(
"decorator_alias",
[
register_block,
register_server,
],
)
def test_alias_calls_register_service(decorator_alias):
test_args = (None, None)
test_kwargs = {
"name": "test_server",
"ip": "127.0.0.1",
"port": 8000,
"host_ip": "127.0.0.1",
"host_port": 5000,
}
with patch("announce_server.decorator.register_service") as mock_register_service:
mock_register_service.return_value = MagicMock()
decorator = decorator_alias(*test_args, **test_kwargs)
decorator(MagicMock())
mock_register_service.assert_called_once_with(*test_args, **test_kwargs)
@pytest.mark.parametrize(
"decorator_alias",
[
announce_server,
],
)
def test_deprecated_alias_calls_register_service(decorator_alias):
test_args = (None, None)
test_kwargs = {
"name": "test_server",
"ip": "127.0.0.1",
"port": 8000,
"host_ip": "127.0.0.1",
"host_port": 5000,
}
with patch("announce_server.decorator.register_service") as mock_register_service:
mock_register_service.return_value = MagicMock()
with pytest.warns(DeprecationWarning):
decorator = decorator_alias(*test_args, **test_kwargs)
decorator(MagicMock())
mock_register_service.assert_called_once_with(*test_args, **test_kwargs)

View File

@ -4,7 +4,7 @@ from unittest.mock import MagicMock, patch
import pytest
from announce_server.decorator import _announce_server, announce_server
from announce_server.decorator import _announce_server, register_service
@patch("announce_server.decorator._announce_server")
@ -13,7 +13,7 @@ def test_announce_server_decorator(mock_announce_server):
mock_announce_server.return_value = MagicMock()
# Decorate the sample function with announce_server
@announce_server(
@register_service(
name="test_server",
ip="127.0.0.1",
port=8000,