Compare commits
	
		
			3 Commits
		
	
	
		
			59bde66861
			...
			f6b715cd65
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
|  | f6b715cd65 | ||
|  | cb9b364918 | ||
|  | 7c934307c0 | 
							
								
								
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										1
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							| @ -6,3 +6,4 @@ __pycache__/ | ||||
| dist/ | ||||
| build/ | ||||
| Pipfile* | ||||
| .coverage | ||||
|  | ||||
| @ -45,7 +45,7 @@ branch = True | ||||
| 
 | ||||
| [coverage:report] | ||||
| # Fail the test if coverage is below a certain percentage | ||||
| fail_under = 90 | ||||
| fail_under = 85 | ||||
| show_missing = True | ||||
| exclude_lines = | ||||
|     if __name__ == .__main__.: | ||||
|  | ||||
| @ -12,6 +12,7 @@ async def _announce_server(**kwargs): | ||||
|     SERVER_PORT = kwargs.get("port", 8000) | ||||
|     HOST_SERVER_IP = kwargs.get("host_ip", "0.0.0.0") | ||||
|     HOST_SERVER_PORT = kwargs.get("host_port", 5000) | ||||
|     RETRY_INTERVAL = kwargs.get("retry_interval", 5) | ||||
| 
 | ||||
|     @sio.event | ||||
|     async def connect(): | ||||
| @ -29,7 +30,7 @@ async def _announce_server(**kwargs): | ||||
|             except Exception as e: | ||||
|                 print(e) | ||||
|                 print("Failed to connect to host, retrying in 5 seconds") | ||||
|                 await asyncio.sleep(5) | ||||
|                 await asyncio.sleep(RETRY_INTERVAL) | ||||
|         # await sio.connect(f'http://{HOST_SERVER_IP}:{HOST_SERVER_PORT}') | ||||
|         print("Connected to host") | ||||
| 
 | ||||
| @ -44,82 +45,6 @@ async def _announce_server(**kwargs): | ||||
|     await main() | ||||
| 
 | ||||
| 
 | ||||
| # def announce_server(task=None, loop=None, **outer_kwargs): | ||||
| #     if task is None: | ||||
| #         return lambda f: announce_server(f, loop=loop, **outer_kwargs) | ||||
| 
 | ||||
| #     @wraps(task) | ||||
| #     def wrapper(*args, **kwargs): | ||||
| #         async def main(*args, **kwargs): | ||||
| #             if loop is not None: | ||||
| #                 host_block_thread = loop.run_in_executor(None, task) | ||||
| #             else: | ||||
| #                 host_block_thread = asyncio.to_thread(task) # python 3.9+ | ||||
| 
 | ||||
| #             # Announce the server to the host | ||||
| #             await _announce_server(**outer_kwargs) | ||||
| 
 | ||||
| #             # Wait for host_block to finish | ||||
| #             await host_block_thread | ||||
| 
 | ||||
| #         if loop is not None: | ||||
| #             task = loop.create_task(main(*args, **kwargs)) | ||||
| #         else: | ||||
| #             task = asyncio.run(main(*args, **kwargs)) | ||||
| #         return task | ||||
| #     return wrapper | ||||
| 
 | ||||
| # def announce_server(task=None, loop=None, **outer_kwargs): | ||||
| #     if task is None: | ||||
| #         return lambda f: announce_server(f, loop=loop, **outer_kwargs) | ||||
| 
 | ||||
| #     @wraps(task) | ||||
| #     async def wrapper(*args, **kwargs): | ||||
| #         if not asyncio.iscoroutinefunction(task): | ||||
| #             # If the decorated function is not a coroutine, wrap it in a coroutine | ||||
| #             task = asyncio.coroutine(task) | ||||
| #         if loop is not None: | ||||
| #             host_block_thread = loop.run_in_executor(None, task) | ||||
| #         else: | ||||
| #             host_block_thread = asyncio.to_thread(task) | ||||
| 
 | ||||
| #         # Announce the server to the host | ||||
| #         await _announce_server(**outer_kwargs) | ||||
| 
 | ||||
| #         # Wait for host_block to finish | ||||
| #         await host_block_thread | ||||
| 
 | ||||
| #     return wrapper | ||||
| 
 | ||||
| 
 | ||||
| # def announce_server(task=None, loop=None, **outer_kwargs): | ||||
| #     if task is None: | ||||
| #         return lambda f: announce_server(f, loop=loop, **outer_kwargs) | ||||
| 
 | ||||
| #     if loop is None: | ||||
| #         loop = asyncio.get_event_loop() | ||||
| 
 | ||||
| #     @wraps(task) | ||||
| #     def wrapper(*args, **kwargs): | ||||
| #         async def main(*args, **kwargs): | ||||
| #             if asyncio.iscoroutinefunction(task): | ||||
| #                 # If the task is async, just await it | ||||
| #                 host_block_thread = task(*args, **kwargs) | ||||
| #             else: | ||||
| #                 host_block_thread = loop.run_in_executor(None, task, *args, **kwargs) | ||||
| 
 | ||||
| #             # Announce the server to the host | ||||
| #             await _announce_server(**outer_kwargs) | ||||
| 
 | ||||
| #             # Wait for host_block to finish | ||||
| #             await host_block_thread | ||||
| 
 | ||||
| #         task = loop.create_task(main(*args, **kwargs)) | ||||
| #         return task | ||||
| 
 | ||||
| # return wrapper | ||||
| 
 | ||||
| 
 | ||||
| def announce_server(task=None, **outer_kwargs): | ||||
|     if task is None: | ||||
|         return lambda f: announce_server(f, **outer_kwargs) | ||||
|  | ||||
| @ -1,39 +1,48 @@ | ||||
| import asyncio | ||||
| import subprocess | ||||
| from unittest.mock import MagicMock, patch | ||||
| from unittest.mock import AsyncMock, call, patch | ||||
| 
 | ||||
| import pytest | ||||
| import socketio | ||||
| 
 | ||||
| from announce_server.decorator import _announce_server, announce_server | ||||
| from announce_server.decorator import _announce_server | ||||
| 
 | ||||
| 
 | ||||
| @patch("announce_server.decorator._announce_server") | ||||
| def test_announce_server_decorator(mock_announce_server): | ||||
|     # Mock the _announce_server function to prevent actual connections | ||||
|     mock_announce_server.return_value = MagicMock() | ||||
| @pytest.mark.asyncio | ||||
| async def test_announce_server(event_loop): | ||||
|     # Mock the socketio.AsyncClient to prevent actual connections | ||||
| 
 | ||||
|     # Decorate the sample function with announce_server | ||||
|     @announce_server( | ||||
|         name="test_server", | ||||
|         ip="127.0.0.1", | ||||
|         port=8000, | ||||
|         host_ip="127.0.0.1", | ||||
|         host_port=5000, | ||||
|     with patch("announce_server.decorator.sio") as mock_sio: | ||||
|         # Create a fake sio.connect() function that simulates a retry loop | ||||
|         async def fake_connect(*args, **kwargs): | ||||
|             await asyncio.sleep(0.1) | ||||
|             raise RuntimeError("Failed to connect") | ||||
| 
 | ||||
|         # Set the fake connect function to be used as a side_effect for the mock | ||||
|         mock_sio.connect = AsyncMock(side_effect=fake_connect) | ||||
| 
 | ||||
|         # Define the outer_kwargs for the _announce_server function | ||||
|         outer_kwargs = { | ||||
|             "name": "test_server", | ||||
|             "ip": "127.0.0.1", | ||||
|             "port": 8000, | ||||
|             "host_ip": "127.0.0.1", | ||||
|             "host_port": 5123, | ||||
|             "retry_interval": 0.001, | ||||
|         } | ||||
| 
 | ||||
|         # Run the _announce_server function with a timeout to avoid infinite loop | ||||
|         try: | ||||
|             await asyncio.wait_for(_announce_server(**outer_kwargs), timeout=0.105) | ||||
|         except asyncio.TimeoutError: | ||||
|             pass | ||||
| 
 | ||||
|         # Check if sio.connect was called multiple times due to the retry loop | ||||
|         assert mock_sio.connect.call_count >= 2 | ||||
| 
 | ||||
|         # Check if sio.connect was called with the correct arguments | ||||
|         mock_sio.connect.assert_has_calls( | ||||
|             [call("http://127.0.0.1:5123")] * mock_sio.connect.call_count | ||||
|         ) | ||||
|     def http_server(): | ||||
|         server = subprocess.Popen(["python3", "-m", "http.server", "13373"]) | ||||
|         yield | ||||
|         server.terminate() | ||||
|         server.wait() | ||||
| 
 | ||||
|     # Run the decorated function | ||||
|     http_server() | ||||
| 
 | ||||
|     # Check if the _announce_server function was called with the correct arguments | ||||
|     mock_announce_server.assert_called_once_with( | ||||
|         name="test_server", | ||||
|         ip="127.0.0.1", | ||||
|         port=8000, | ||||
|         host_ip="127.0.0.1", | ||||
|         host_port=5000, | ||||
|     ) | ||||
|         # Since we don't have access to the event handlers directly, we can't test them in this way. | ||||
|         # Instead, you could refactor the code to make the event handlers separate functions that can be tested independently. | ||||
|  | ||||
							
								
								
									
										39
									
								
								tests/test_decorator.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										39
									
								
								tests/test_decorator.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,39 @@ | ||||
| import asyncio | ||||
| import subprocess | ||||
| from unittest.mock import MagicMock, patch | ||||
| 
 | ||||
| import pytest | ||||
| 
 | ||||
| from announce_server.decorator import _announce_server, announce_server | ||||
| 
 | ||||
| 
 | ||||
| @patch("announce_server.decorator._announce_server") | ||||
| def test_announce_server_decorator(mock_announce_server): | ||||
|     # Mock the _announce_server function to prevent actual connections | ||||
|     mock_announce_server.return_value = MagicMock() | ||||
| 
 | ||||
|     # Decorate the sample function with announce_server | ||||
|     @announce_server( | ||||
|         name="test_server", | ||||
|         ip="127.0.0.1", | ||||
|         port=8000, | ||||
|         host_ip="127.0.0.1", | ||||
|         host_port=5000, | ||||
|     ) | ||||
|     def http_server(): | ||||
|         server = subprocess.Popen(["python3", "-m", "http.server", "13373"]) | ||||
|         yield | ||||
|         server.terminate() | ||||
|         server.wait() | ||||
| 
 | ||||
|     # Run the decorated function | ||||
|     http_server() | ||||
| 
 | ||||
|     # Check if the _announce_server function was called with the correct arguments | ||||
|     mock_announce_server.assert_called_once_with( | ||||
|         name="test_server", | ||||
|         ip="127.0.0.1", | ||||
|         port=8000, | ||||
|         host_ip="127.0.0.1", | ||||
|         host_port=5000, | ||||
|     ) | ||||
							
								
								
									
										76
									
								
								what_didnt_work.txt
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										76
									
								
								what_didnt_work.txt
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,76 @@ | ||||
| 
 | ||||
| # def announce_server(task=None, loop=None, **outer_kwargs): | ||||
| #     if task is None: | ||||
| #         return lambda f: announce_server(f, loop=loop, **outer_kwargs) | ||||
| 
 | ||||
| #     @wraps(task) | ||||
| #     def wrapper(*args, **kwargs): | ||||
| #         async def main(*args, **kwargs): | ||||
| #             if loop is not None: | ||||
| #                 host_block_thread = loop.run_in_executor(None, task) | ||||
| #             else: | ||||
| #                 host_block_thread = asyncio.to_thread(task) # python 3.9+ | ||||
| 
 | ||||
| #             # Announce the server to the host | ||||
| #             await _announce_server(**outer_kwargs) | ||||
| 
 | ||||
| #             # Wait for host_block to finish | ||||
| #             await host_block_thread | ||||
| 
 | ||||
| #         if loop is not None: | ||||
| #             task = loop.create_task(main(*args, **kwargs)) | ||||
| #         else: | ||||
| #             task = asyncio.run(main(*args, **kwargs)) | ||||
| #         return task | ||||
| #     return wrapper | ||||
| 
 | ||||
| # def announce_server(task=None, loop=None, **outer_kwargs): | ||||
| #     if task is None: | ||||
| #         return lambda f: announce_server(f, loop=loop, **outer_kwargs) | ||||
| 
 | ||||
| #     @wraps(task) | ||||
| #     async def wrapper(*args, **kwargs): | ||||
| #         if not asyncio.iscoroutinefunction(task): | ||||
| #             # If the decorated function is not a coroutine, wrap it in a coroutine | ||||
| #             task = asyncio.coroutine(task) | ||||
| #         if loop is not None: | ||||
| #             host_block_thread = loop.run_in_executor(None, task) | ||||
| #         else: | ||||
| #             host_block_thread = asyncio.to_thread(task) | ||||
| 
 | ||||
| #         # Announce the server to the host | ||||
| #         await _announce_server(**outer_kwargs) | ||||
| 
 | ||||
| #         # Wait for host_block to finish | ||||
| #         await host_block_thread | ||||
| 
 | ||||
| #     return wrapper | ||||
| 
 | ||||
| 
 | ||||
| # def announce_server(task=None, loop=None, **outer_kwargs): | ||||
| #     if task is None: | ||||
| #         return lambda f: announce_server(f, loop=loop, **outer_kwargs) | ||||
| 
 | ||||
| #     if loop is None: | ||||
| #         loop = asyncio.get_event_loop() | ||||
| 
 | ||||
| #     @wraps(task) | ||||
| #     def wrapper(*args, **kwargs): | ||||
| #         async def main(*args, **kwargs): | ||||
| #             if asyncio.iscoroutinefunction(task): | ||||
| #                 # If the task is async, just await it | ||||
| #                 host_block_thread = task(*args, **kwargs) | ||||
| #             else: | ||||
| #                 host_block_thread = loop.run_in_executor(None, task, *args, **kwargs) | ||||
| 
 | ||||
| #             # Announce the server to the host | ||||
| #             await _announce_server(**outer_kwargs) | ||||
| 
 | ||||
| #             # Wait for host_block to finish | ||||
| #             await host_block_thread | ||||
| 
 | ||||
| #         task = loop.create_task(main(*args, **kwargs)) | ||||
| #         return task | ||||
| 
 | ||||
| # return wrapper | ||||
| 
 | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user