cleanup of tests
This commit is contained in:
parent
7c934307c0
commit
cb9b364918
1
.gitignore
vendored
1
.gitignore
vendored
@ -6,3 +6,4 @@ __pycache__/
|
||||
dist/
|
||||
build/
|
||||
Pipfile*
|
||||
.coverage
|
||||
|
@ -45,82 +45,6 @@ async def _announce_server(**kwargs):
|
||||
await main()
|
||||
|
||||
|
||||
# def announce_server(task=None, loop=None, **outer_kwargs):
|
||||
# if task is None:
|
||||
# return lambda f: announce_server(f, loop=loop, **outer_kwargs)
|
||||
|
||||
# @wraps(task)
|
||||
# def wrapper(*args, **kwargs):
|
||||
# async def main(*args, **kwargs):
|
||||
# if loop is not None:
|
||||
# host_block_thread = loop.run_in_executor(None, task)
|
||||
# else:
|
||||
# host_block_thread = asyncio.to_thread(task) # python 3.9+
|
||||
|
||||
# # Announce the server to the host
|
||||
# await _announce_server(**outer_kwargs)
|
||||
|
||||
# # Wait for host_block to finish
|
||||
# await host_block_thread
|
||||
|
||||
# if loop is not None:
|
||||
# task = loop.create_task(main(*args, **kwargs))
|
||||
# else:
|
||||
# task = asyncio.run(main(*args, **kwargs))
|
||||
# return task
|
||||
# return wrapper
|
||||
|
||||
# def announce_server(task=None, loop=None, **outer_kwargs):
|
||||
# if task is None:
|
||||
# return lambda f: announce_server(f, loop=loop, **outer_kwargs)
|
||||
|
||||
# @wraps(task)
|
||||
# async def wrapper(*args, **kwargs):
|
||||
# if not asyncio.iscoroutinefunction(task):
|
||||
# # If the decorated function is not a coroutine, wrap it in a coroutine
|
||||
# task = asyncio.coroutine(task)
|
||||
# if loop is not None:
|
||||
# host_block_thread = loop.run_in_executor(None, task)
|
||||
# else:
|
||||
# host_block_thread = asyncio.to_thread(task)
|
||||
|
||||
# # Announce the server to the host
|
||||
# await _announce_server(**outer_kwargs)
|
||||
|
||||
# # Wait for host_block to finish
|
||||
# await host_block_thread
|
||||
|
||||
# return wrapper
|
||||
|
||||
|
||||
# def announce_server(task=None, loop=None, **outer_kwargs):
|
||||
# if task is None:
|
||||
# return lambda f: announce_server(f, loop=loop, **outer_kwargs)
|
||||
|
||||
# if loop is None:
|
||||
# loop = asyncio.get_event_loop()
|
||||
|
||||
# @wraps(task)
|
||||
# def wrapper(*args, **kwargs):
|
||||
# async def main(*args, **kwargs):
|
||||
# if asyncio.iscoroutinefunction(task):
|
||||
# # If the task is async, just await it
|
||||
# host_block_thread = task(*args, **kwargs)
|
||||
# else:
|
||||
# host_block_thread = loop.run_in_executor(None, task, *args, **kwargs)
|
||||
|
||||
# # Announce the server to the host
|
||||
# await _announce_server(**outer_kwargs)
|
||||
|
||||
# # Wait for host_block to finish
|
||||
# await host_block_thread
|
||||
|
||||
# task = loop.create_task(main(*args, **kwargs))
|
||||
# return task
|
||||
|
||||
# return wrapper
|
||||
|
||||
|
||||
def announce_server(task=None, **outer_kwargs):
|
||||
if task is None:
|
||||
return lambda f: announce_server(f, **outer_kwargs)
|
||||
|
@ -40,7 +40,9 @@ async def test_announce_server(event_loop):
|
||||
assert mock_sio.connect.call_count >= 2
|
||||
|
||||
# Check if sio.connect was called with the correct arguments
|
||||
mock_sio.connect.assert_has_calls([call("http://127.0.0.1:5123")] * mock_sio.connect.call_count)
|
||||
mock_sio.connect.assert_has_calls(
|
||||
[call("http://127.0.0.1:5123")] * mock_sio.connect.call_count
|
||||
)
|
||||
|
||||
# Since we don't have access to the event handlers directly, we can't test them in this way.
|
||||
# Instead, you could refactor the code to make the event handlers separate functions that can be tested independently.
|
||||
|
39
tests/test_decorator.py
Normal file
39
tests/test_decorator.py
Normal file
@ -0,0 +1,39 @@
|
||||
import asyncio
|
||||
import subprocess
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from announce_server.decorator import _announce_server, announce_server
|
||||
|
||||
|
||||
@patch("announce_server.decorator._announce_server")
|
||||
def test_announce_server_decorator(mock_announce_server):
|
||||
# Mock the _announce_server function to prevent actual connections
|
||||
mock_announce_server.return_value = MagicMock()
|
||||
|
||||
# Decorate the sample function with announce_server
|
||||
@announce_server(
|
||||
name="test_server",
|
||||
ip="127.0.0.1",
|
||||
port=8000,
|
||||
host_ip="127.0.0.1",
|
||||
host_port=5000,
|
||||
)
|
||||
def http_server():
|
||||
server = subprocess.Popen(["python3", "-m", "http.server", "13373"])
|
||||
yield
|
||||
server.terminate()
|
||||
server.wait()
|
||||
|
||||
# Run the decorated function
|
||||
http_server()
|
||||
|
||||
# Check if the _announce_server function was called with the correct arguments
|
||||
mock_announce_server.assert_called_once_with(
|
||||
name="test_server",
|
||||
ip="127.0.0.1",
|
||||
port=8000,
|
||||
host_ip="127.0.0.1",
|
||||
host_port=5000,
|
||||
)
|
Loading…
Reference in New Issue
Block a user