Compare commits

..

No commits in common. "f6b715cd650fc6a4c65e32e10fa4e4d88a34c15b" and "59bde66861e7d127709291fd807635495d631674" have entirely different histories.

6 changed files with 108 additions and 158 deletions

1
.gitignore vendored
View File

@ -6,4 +6,3 @@ __pycache__/
dist/ dist/
build/ build/
Pipfile* Pipfile*
.coverage

View File

@ -45,7 +45,7 @@ branch = True
[coverage:report] [coverage:report]
# Fail the test if coverage is below a certain percentage # Fail the test if coverage is below a certain percentage
fail_under = 85 fail_under = 90
show_missing = True show_missing = True
exclude_lines = exclude_lines =
if __name__ == .__main__.: if __name__ == .__main__.:

View File

@ -12,7 +12,6 @@ async def _announce_server(**kwargs):
SERVER_PORT = kwargs.get("port", 8000) SERVER_PORT = kwargs.get("port", 8000)
HOST_SERVER_IP = kwargs.get("host_ip", "0.0.0.0") HOST_SERVER_IP = kwargs.get("host_ip", "0.0.0.0")
HOST_SERVER_PORT = kwargs.get("host_port", 5000) HOST_SERVER_PORT = kwargs.get("host_port", 5000)
RETRY_INTERVAL = kwargs.get("retry_interval", 5)
@sio.event @sio.event
async def connect(): async def connect():
@ -30,7 +29,7 @@ async def _announce_server(**kwargs):
except Exception as e: except Exception as e:
print(e) print(e)
print("Failed to connect to host, retrying in 5 seconds") print("Failed to connect to host, retrying in 5 seconds")
await asyncio.sleep(RETRY_INTERVAL) await asyncio.sleep(5)
# await sio.connect(f'http://{HOST_SERVER_IP}:{HOST_SERVER_PORT}') # await sio.connect(f'http://{HOST_SERVER_IP}:{HOST_SERVER_PORT}')
print("Connected to host") print("Connected to host")
@ -45,6 +44,82 @@ async def _announce_server(**kwargs):
await main() await main()
# def announce_server(task=None, loop=None, **outer_kwargs):
# if task is None:
# return lambda f: announce_server(f, loop=loop, **outer_kwargs)
# @wraps(task)
# def wrapper(*args, **kwargs):
# async def main(*args, **kwargs):
# if loop is not None:
# host_block_thread = loop.run_in_executor(None, task)
# else:
# host_block_thread = asyncio.to_thread(task) # python 3.9+
# # Announce the server to the host
# await _announce_server(**outer_kwargs)
# # Wait for host_block to finish
# await host_block_thread
# if loop is not None:
# task = loop.create_task(main(*args, **kwargs))
# else:
# task = asyncio.run(main(*args, **kwargs))
# return task
# return wrapper
# def announce_server(task=None, loop=None, **outer_kwargs):
# if task is None:
# return lambda f: announce_server(f, loop=loop, **outer_kwargs)
# @wraps(task)
# async def wrapper(*args, **kwargs):
# if not asyncio.iscoroutinefunction(task):
# # If the decorated function is not a coroutine, wrap it in a coroutine
# task = asyncio.coroutine(task)
# if loop is not None:
# host_block_thread = loop.run_in_executor(None, task)
# else:
# host_block_thread = asyncio.to_thread(task)
# # Announce the server to the host
# await _announce_server(**outer_kwargs)
# # Wait for host_block to finish
# await host_block_thread
# return wrapper
# def announce_server(task=None, loop=None, **outer_kwargs):
# if task is None:
# return lambda f: announce_server(f, loop=loop, **outer_kwargs)
# if loop is None:
# loop = asyncio.get_event_loop()
# @wraps(task)
# def wrapper(*args, **kwargs):
# async def main(*args, **kwargs):
# if asyncio.iscoroutinefunction(task):
# # If the task is async, just await it
# host_block_thread = task(*args, **kwargs)
# else:
# host_block_thread = loop.run_in_executor(None, task, *args, **kwargs)
# # Announce the server to the host
# await _announce_server(**outer_kwargs)
# # Wait for host_block to finish
# await host_block_thread
# task = loop.create_task(main(*args, **kwargs))
# return task
# return wrapper
def announce_server(task=None, **outer_kwargs): def announce_server(task=None, **outer_kwargs):
if task is None: if task is None:
return lambda f: announce_server(f, **outer_kwargs) return lambda f: announce_server(f, **outer_kwargs)

View File

@ -1,48 +1,39 @@
import asyncio import asyncio
from unittest.mock import AsyncMock, call, patch import subprocess
from unittest.mock import MagicMock, patch
import pytest import pytest
import socketio
from announce_server.decorator import _announce_server from announce_server.decorator import _announce_server, announce_server
@pytest.mark.asyncio @patch("announce_server.decorator._announce_server")
async def test_announce_server(event_loop): def test_announce_server_decorator(mock_announce_server):
# Mock the socketio.AsyncClient to prevent actual connections # Mock the _announce_server function to prevent actual connections
mock_announce_server.return_value = MagicMock()
with patch("announce_server.decorator.sio") as mock_sio: # Decorate the sample function with announce_server
# Create a fake sio.connect() function that simulates a retry loop @announce_server(
async def fake_connect(*args, **kwargs): name="test_server",
await asyncio.sleep(0.1) ip="127.0.0.1",
raise RuntimeError("Failed to connect") port=8000,
host_ip="127.0.0.1",
host_port=5000,
)
def http_server():
server = subprocess.Popen(["python3", "-m", "http.server", "13373"])
yield
server.terminate()
server.wait()
# Set the fake connect function to be used as a side_effect for the mock # Run the decorated function
mock_sio.connect = AsyncMock(side_effect=fake_connect) http_server()
# Define the outer_kwargs for the _announce_server function # Check if the _announce_server function was called with the correct arguments
outer_kwargs = { mock_announce_server.assert_called_once_with(
"name": "test_server", name="test_server",
"ip": "127.0.0.1", ip="127.0.0.1",
"port": 8000, port=8000,
"host_ip": "127.0.0.1", host_ip="127.0.0.1",
"host_port": 5123, host_port=5000,
"retry_interval": 0.001, )
}
# Run the _announce_server function with a timeout to avoid infinite loop
try:
await asyncio.wait_for(_announce_server(**outer_kwargs), timeout=0.105)
except asyncio.TimeoutError:
pass
# Check if sio.connect was called multiple times due to the retry loop
assert mock_sio.connect.call_count >= 2
# Check if sio.connect was called with the correct arguments
mock_sio.connect.assert_has_calls(
[call("http://127.0.0.1:5123")] * mock_sio.connect.call_count
)
# Since we don't have access to the event handlers directly, we can't test them in this way.
# Instead, you could refactor the code to make the event handlers separate functions that can be tested independently.

View File

@ -1,39 +0,0 @@
import asyncio
import subprocess
from unittest.mock import MagicMock, patch
import pytest
from announce_server.decorator import _announce_server, announce_server
@patch("announce_server.decorator._announce_server")
def test_announce_server_decorator(mock_announce_server):
# Mock the _announce_server function to prevent actual connections
mock_announce_server.return_value = MagicMock()
# Decorate the sample function with announce_server
@announce_server(
name="test_server",
ip="127.0.0.1",
port=8000,
host_ip="127.0.0.1",
host_port=5000,
)
def http_server():
server = subprocess.Popen(["python3", "-m", "http.server", "13373"])
yield
server.terminate()
server.wait()
# Run the decorated function
http_server()
# Check if the _announce_server function was called with the correct arguments
mock_announce_server.assert_called_once_with(
name="test_server",
ip="127.0.0.1",
port=8000,
host_ip="127.0.0.1",
host_port=5000,
)

View File

@ -1,76 +0,0 @@
# def announce_server(task=None, loop=None, **outer_kwargs):
# if task is None:
# return lambda f: announce_server(f, loop=loop, **outer_kwargs)
# @wraps(task)
# def wrapper(*args, **kwargs):
# async def main(*args, **kwargs):
# if loop is not None:
# host_block_thread = loop.run_in_executor(None, task)
# else:
# host_block_thread = asyncio.to_thread(task) # python 3.9+
# # Announce the server to the host
# await _announce_server(**outer_kwargs)
# # Wait for host_block to finish
# await host_block_thread
# if loop is not None:
# task = loop.create_task(main(*args, **kwargs))
# else:
# task = asyncio.run(main(*args, **kwargs))
# return task
# return wrapper
# def announce_server(task=None, loop=None, **outer_kwargs):
# if task is None:
# return lambda f: announce_server(f, loop=loop, **outer_kwargs)
# @wraps(task)
# async def wrapper(*args, **kwargs):
# if not asyncio.iscoroutinefunction(task):
# # If the decorated function is not a coroutine, wrap it in a coroutine
# task = asyncio.coroutine(task)
# if loop is not None:
# host_block_thread = loop.run_in_executor(None, task)
# else:
# host_block_thread = asyncio.to_thread(task)
# # Announce the server to the host
# await _announce_server(**outer_kwargs)
# # Wait for host_block to finish
# await host_block_thread
# return wrapper
# def announce_server(task=None, loop=None, **outer_kwargs):
# if task is None:
# return lambda f: announce_server(f, loop=loop, **outer_kwargs)
# if loop is None:
# loop = asyncio.get_event_loop()
# @wraps(task)
# def wrapper(*args, **kwargs):
# async def main(*args, **kwargs):
# if asyncio.iscoroutinefunction(task):
# # If the task is async, just await it
# host_block_thread = task(*args, **kwargs)
# else:
# host_block_thread = loop.run_in_executor(None, task, *args, **kwargs)
# # Announce the server to the host
# await _announce_server(**outer_kwargs)
# # Wait for host_block to finish
# await host_block_thread
# task = loop.create_task(main(*args, **kwargs))
# return task
# return wrapper