Compare commits

1 commit: 4d9ad9ddb8
Dockerfile

```diff
@@ -35,8 +35,8 @@ RUN pip install --no-cache-dir --user -r /tmp/requirements.txt
 COPY --chown=user:user profiles.default.toml /home/user/.prefect/profiles.toml
 
 # Copy the application files
-COPY --chown=user:user app ./app
-COPY --chown=user:user noaa_animate.py .
+COPY --chown=user:user app ./
+# COPY --chown=user:user noaa_animate.py .
 COPY --chown=user:user start.sh .
 COPY --chown=user:user init_db.py .
 RUN chmod +x start.sh
```
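The key change: `COPY --chown=user:user app ./` now copies the *contents* of `app/` straight into the image's working directory instead of into an `app/` subdirectory, and the separate `noaa_animate.py` copy is commented out. That places the new `app/flows.py` at the path the updated `start.sh` expects when it runs `python flows.py`.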
```diff
@@ -1,3 +1,5 @@
+import threading
+from flows import create_animations
 import logging
 import os
 import re
@@ -68,7 +70,7 @@ def handle_api():
 
 @app.route("/videos/<path:filename>")
 def custom_static(filename):
-    return send_from_directory("../out", filename)
+    return send_from_directory("./out", filename)
 
 
 @app.route("/", methods=["GET"])
@@ -150,4 +152,5 @@ def proxy(url="", proto=PROTO):
 
 
 if __name__ == "__main__":
+    threading.Thread(target=create_animations.serve, daemon=True, kwargs={"name": "noaa-animate", "limit": 8}).start()
     app.run(host="0.0.0.0", port=9021, debug=True)
```
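These hunks wire Prefect into the Flask process: `create_animations.serve(...)` blocks, so it is started on a daemon thread before `app.run(...)` takes over the main thread. A minimal sketch of the same pattern, assuming only that `flows.py` exposes the `create_animations` flow shown below (the real file already defines `app` and its routes):

```python
# Sketch of the serve-in-a-thread pattern used above; skeleton only.
import threading

from flask import Flask
from flows import create_animations  # the flow added in app/flows.py

app = Flask(__name__)

if __name__ == "__main__":
    # Flow.serve() blocks while polling for scheduled runs, so it gets
    # its own thread; daemon=True lets the process exit with Flask.
    threading.Thread(
        target=create_animations.serve,
        kwargs={"name": "noaa-animate", "limit": 8},
        daemon=True,
    ).start()
    app.run(host="0.0.0.0", port=9021, debug=True)
```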
app/flows.py — new file (+264 lines):

```python
import os
import re
from datetime import datetime, timedelta
from io import BytesIO
from typing import Any, Dict, Iterator, List

import httpx

# import imageio
import numpy as np
from moviepy.editor import ImageSequenceClip
from PIL import Image
from prefect import flow, task
from prefect.tasks import task_input_hash

BASE_URL = "https://services.swpc.noaa.gov/images/animations/geospace/"


@task(
    retries=3,
    retry_delay_seconds=5,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(minutes=2),
    log_prints=True,
)
def get_file_links(url: str, ext: str | None = None) -> Iterator[str]:
    response = httpx.get(url)
    response.raise_for_status()
    webpage_content = response.text
    if ext is None:
        print("Extension not supplied. Inferring (less efficient) png/jpg/jpeg")
        exts = ["png", "jpg", "jpeg"]
    else:
        exts = [ext.lower()]
    lines = webpage_content.split("\n")
    for line in lines:
        for ext in exts:
            if ext in line:  # need to parse the href link
                start_pos = line.find('href="') + len('href="')
                end_pos = line.find('"', start_pos)
                href = line[start_pos:end_pos]
                if href.endswith(f"latest.{ext}"):
                    print("Skipping latest")
                    continue
                if href.endswith(ext):
                    if not href.startswith("http"):
                        href = url + href
                    yield href
                break  # Exit the inner loop to avoid duplicate yields for multiple exts


def url_tail_hash(context, parameters):
    # cache key: the file-name portion of the url
    return parameters["url"].split("/")[-1]


def out_path_hash(context, parameters):
    return parameters["output_path"] + f"_L{len(parameters['images'])}"


@task(
    retries=5,
    retry_delay_seconds=1,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(minutes=5),
    result_storage_key="{parameters[url]}",
)
def get_content(url: str, params: Dict[str, Any] | None = None):
    response = httpx.get(f"https://{url}", params=params)
    try:
        response.raise_for_status()
        return response.content
    except httpx.HTTPStatusError:
        return None


def preview_urls(urls):
    print("URLS (head):")
    print(urls[:5])
    print("URLS (tail):")
    print(urls[-5:])


@task(
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1),
)
def get_images(urls: List[str], limit: int = 0):
    if limit > 0:
        print(f"Limiting to {limit} urls")
        urls = urls[-limit:]

    urls = [url.replace("https://", "").replace("http://", "") for url in urls]
    preview_urls(urls)

    futures = get_content.map(urls)
    images = [
        (urls[i], f.result()) for i, f in enumerate(futures) if f.result() is not None
    ]
    return images


def extract_timestamp_from_url(url: str) -> str:
    # NOAA file names carry timestamps of the form 20240101_123456
    match = re.search(r"\d{8}_\d{6}", url)
    return match.group(0) if match else ""


# @task(
#     cache_key_fn=out_path_hash,
#     cache_expiration=timedelta(minutes=3),
#     result_storage_key="{parameters[output_path]}",
# )
# def create_animation(
#     images: List[bytes], output_path: str, duration: float = 0.5
# ) -> None:
#     if not images:
#         raise ValueError("No images!")
#     pil_images = [Image.open(BytesIO(img_data)).convert("RGB") for img_data in images]
#     imageio.mimsave(output_path, pil_images, duration=duration)
#     return output_path


def make_even_dimensions(image):
    width, height = image.size
    if width % 2 == 1:
        width -= 1
    if height % 2 == 1:
        height -= 1
    # ANTIALIAS was removed in Pillow 10; LANCZOS is the same filter
    return image.resize((width, height), Image.LANCZOS)


def crop_to_even(image):
    width, height = image.size
    # Adjust width and height to be even
    if width % 2 == 1:
        width -= 1
    if height % 2 == 1:
        height -= 1
    return image.crop((0, 0, width, height))


@task(
    cache_key_fn=out_path_hash,
    cache_expiration=timedelta(hours=4),
    result_storage_key="{parameters[output_path]}",
)
def create_mp4_animation(images: List[bytes], output_path: str, fps: int = 24) -> str:
    # Convert bytes to PIL images and then to numpy arrays
    frames = [
        np.array(crop_to_even(Image.open(BytesIO(img_data)).convert("RGB")))
        for img_data in images
    ]

    # Create a video clip from the image sequence
    clip = ImageSequenceClip(frames, fps=fps)

    # Write the video clip to a file
    clip.write_videofile(
        output_path,
        codec="libx264",
        ffmpeg_params=["-pix_fmt", "yuv420p"],
        preset="medium",
        bitrate="800k",
    )

    return output_path


def format_output_name(url: str, latest: bool = False):
    if latest:
        now = "latest"
    else:
        now = datetime.now().strftime("%Y%m%d-%H:%M:%S")
    return (
        url.replace("https://", "")
        .replace("http://", "")
        .replace("/", "-")
        .replace(".", "_")
        + now
    )


@task(
    name="animate",
    retries=0,
    retry_delay_seconds=1,
    log_prints=True,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(minutes=3),
)
def animate(
    url: str = "https://services.swpc.noaa.gov/images/animations/geospace/density/",
    ext: str = "png",
    latest: bool = True,
    limit: int = 0,
):
    # get_file_links yields links lazily; materialize them so the
    # emptiness check and sorting below work
    urls = list(get_file_links(url, ext))
    if len(urls) == 0:
        raise ValueError("No urls scraped")
    images = get_images(sorted(urls), limit=limit)
    if len(images) == 0:
        raise ValueError("No images retrieved.")
    print(f"Retrieved {len(images)} images.")
    sorted_images = sorted(images, key=lambda x: extract_timestamp_from_url(x[0]))
    print("Head:")
    print([u for u, i in sorted_images[:5]])
    frames = [s[1] for s in sorted_images]
    # create_animation(frames, "out.gif", duration=5)
    out_name = format_output_name(url, latest=latest)
    create_mp4_animation(frames, f"out/{out_name}.mp4")


def deploy_name():
    return datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S") + "Z"


@flow(
    name="create-animations",
    retries=0,
    retry_delay_seconds=1,
    log_prints=True,
    flow_run_name=None,
    timeout_seconds=90,
)
def create_animations(
    url: str | List[str] = BASE_URL + "velocity/",
    ext: str | None = None,
    latest: bool = False,
    limit: int = 0,
):
    if isinstance(url, str):
        url = [url]

    futures = animate.map(url, ext, latest, limit)
    return futures


if __name__ == "__main__":
    # make_animation.from_source(
    #     source=TEST_REPO,
    #     entrypoint="noaa_animate.py:make_animation",
    # ).deploy(
    #     name="noaa-animate", work_pool_name="process"
    # )

    from prefect.client.schemas.schedules import CronSchedule

    sched = CronSchedule(cron="*/15 * * * *", timezone="America/Denver")

    LINKS = [
        BASE_URL + "density/",
        BASE_URL + "velocity/",
        BASE_URL + "pressure/",
    ]
    sched_params = {
        "latest": True,
        "url": LINKS,
        "ext": "png",
        "limit": 0,
    }
    create_animations.serve(
        "noaa-animate", limit=8, schedule=None, parameters=sched_params
    )
    # make_animation(url)
```
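`create_animations` fans `animate` out with `.map`, one mapped task run per URL. Because it is an ordinary flow function, it can also be called directly for a one-off local run instead of through `.serve()`; a hypothetical smoke test:

```python
# Hypothetical one-off run of the flow (no deployment, no schedule);
# parameters mirror sched_params above, with a small limit for speed.
from flows import BASE_URL, create_animations

if __name__ == "__main__":
    create_animations(
        url=[BASE_URL + "density/", BASE_URL + "velocity/"],
        ext="png",
        latest=True,
        limit=10,  # only fetch the 10 most recent frames per animation
    )
```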
init_db.py

```diff
@@ -16,34 +16,40 @@ def initialize_db(db_path):
         logging.info(f"{db_path} exists")  # Log a message if the database exists
         return
     try:
-        with sqlite3.connect(
-            db_path
-        ) as conn:  # Using 'with' to ensure that the connection is closed automatically
-            configure_database(conn)
+        # Using 'with' to ensure that the connection is closed automatically
+        with sqlite3.connect(db_path) as conn:
+            configure_database(conn)
     except sqlite3.Error as e:
         logging.error(f"Database error: {e}")  # Log any SQLite errors that occur
     except Exception as e:
-        logging.error(
-            f"Exception in initialize_db: {e}"
-        )  # Log any other exceptions that occur
+        logging.error(f"Exception in initialize_db: {e}")
 
 
 def configure_database(conn):
     cursor = conn.cursor()
     # Setting the journal mode to WAL for better concurrency
     cursor.execute("PRAGMA journal_mode = WAL;")
-    # Setting synchronous to NORMAL for a balance between speed and reliability
-    cursor.execute("PRAGMA synchronous = NORMAL;")
-    # Setting a busy timeout to prevent immediate failures when the database is locked
-    cursor.execute("PRAGMA busy_timeout = 5000;")
-    # Increasing the cache size to reduce the number of disk I/O operations
-    cursor.execute("PRAGMA cache_size = -32000;")
-    # Enabling memory-mapped I/O for potentially faster file operations
-    cursor.execute("PRAGMA mmap_size = 536870912;")
-    # Setting locking mode to EXCLUSIVE can enhance performance for single-user scenarios
-    cursor.execute("PRAGMA locking_mode = EXCLUSIVE;")
     # Ensuring foreign key constraints are enforced for data integrity
     cursor.execute("PRAGMA foreign_keys = ON;")
+    # disable legacy alter table behavior as it will cause problems during
+    # migrations when tables are renamed as references would otherwise be retained
+    # in some locations
+    # https://www.sqlite.org/pragma.html#pragma_legacy_alter_table
+    cursor.execute("PRAGMA legacy_alter_table=OFF")
+    # when using the WAL, we do not need to sync changes on every write. sqlite
+    # recommends using 'normal' mode which is much faster
+    # Setting synchronous to NORMAL for a balance between speed and reliability
+    cursor.execute("PRAGMA synchronous = NORMAL;")
+    # Increasing the cache size to reduce the number of disk I/O operations
+    cursor.execute("PRAGMA cache_size = 20000;")
+    # Enabling memory-mapped I/O for potentially faster file operations
+    cursor.execute("PRAGMA mmap_size = 536870912;")
+    # Setting a busy timeout to prevent immediate failures when the database is locked
+    # setting the value very high allows for more 'concurrency'
+    # without running into errors, but may result in slow api calls
+    cursor.execute("PRAGMA busy_timeout = 60000;")
+    # Setting locking mode to EXCLUSIVE can enhance performance for single-user scenarios
+    # cursor.execute("PRAGMA locking_mode = EXCLUSIVE;")
     conn.commit()  # Commit all PRAGMA configurations
 
     logging.info("Set up database with multi-user optimizations.")
@@ -55,13 +61,11 @@ def batch_transact(db_path, operations):
             db_path
         ) as conn:  # Ensure that the connection is handled properly
             cursor = conn.cursor()
-            cursor.execute(
-                "BEGIN TRANSACTION;"
-            )  # Start a transaction for batch operations
+            # Start a transaction for batch operations
+            cursor.execute("BEGIN TRANSACTION;")
             for operation in operations:
-                cursor.execute(
-                    operation
-                )  # Execute each SQL operation provided in the operations list
+                # Execute each SQL operation provided in the operations list
+                cursor.execute(operation)
             cursor.execute("COMMIT;")  # Commit all operations at once
     except sqlite3.Error as e:
         logging.error(f"Database error during batch transaction: {e}")
@@ -73,9 +77,8 @@ def maintenance(db_path):
     try:
         with sqlite3.connect(db_path) as conn:
             cursor = conn.cursor()
-            cursor.execute(
-                "PRAGMA optimize;"
-            )  # Optimize the database to maintain performance
+            # Optimize the database to maintain performance
+            cursor.execute("PRAGMA optimize;")
             cursor.execute("VACUUM;")  # Reclaim space and defragment the database file
     except sqlite3.Error as e:
         logging.error(f"Database error during maintenance: {e}")
```
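The reworked `configure_database` keeps WAL mode but relaxes `synchronous` to NORMAL, raises `busy_timeout` to 60 s, and drops the `EXCLUSIVE` locking mode that would have blocked concurrent readers. Note that `cache_size = 20000` counts pages, while the old `-32000` counted KiB. A small sketch (not part of the commit) for checking what actually took effect; `journal_mode=WAL` persists in the database file, but `synchronous`, `cache_size`, and `busy_timeout` are per-connection and must be reapplied on every connect:

```python
# Inspect the effective PRAGMA values on a configured connection.
# Assumes init_db.py is importable; "data.db" is a placeholder path.
import sqlite3

from init_db import configure_database

conn = sqlite3.connect("data.db")
configure_database(conn)
cur = conn.cursor()
for pragma in ("journal_mode", "synchronous", "cache_size", "busy_timeout"):
    print(pragma, "=", cur.execute(f"PRAGMA {pragma};").fetchone()[0])
conn.close()
```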
start.sh

```diff
@@ -1,14 +1,14 @@
 #!/bin/bash
-# Start the web app
-cd app && make &
 
 # Start Prefect in the background
 prefect server start --host 0.0.0.0 &
 
 sleep 10
+# Start the web app
+make &
 
 # Start the deployment
-python noaa_animate.py &
+python flows.py &
 
 # Wait for all background jobs to finish
 wait
```
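Note the ordering: `prefect server start` is backgrounded first, and `sleep 10` gives the API time to come up before `flows.py` calls `create_animations.serve(...)`. The web app (`make`, now run from the working directory rather than `cd app`) and the deployment script are then backgrounded as well, and `wait` keeps the container alive until every background job exits.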