diff --git a/Dockerfile b/Dockerfile index f984865..6ea6a7a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -35,8 +35,8 @@ RUN pip install --no-cache-dir --user -r /tmp/requirements.txt COPY --chown=user:user profiles.default.toml /home/user/.prefect/profiles.toml # Copy the application files -COPY --chown=user:user app ./app -COPY --chown=user:user noaa_animate.py . +COPY --chown=user:user app ./ +# COPY --chown=user:user noaa_animate.py . COPY --chown=user:user start.sh . COPY --chown=user:user init_db.py . RUN chmod +x start.sh diff --git a/app/app.py b/app/app.py index 415737a..7e195ee 100644 --- a/app/app.py +++ b/app/app.py @@ -1,3 +1,5 @@ +import threading +from flows import create_animations import logging import os import re @@ -68,7 +70,7 @@ def handle_api(): @app.route("/videos/") def custom_static(filename): - return send_from_directory("../out", filename) + return send_from_directory("./out", filename) @app.route("/", methods=["GET"]) @@ -150,4 +152,5 @@ def proxy(url="", proto=PROTO): if __name__ == "__main__": + threading.Thread(target=create_animations.serve, daemon=True, kwargs={"name": "noaa-animate", "limit": 8}).start() app.run(host="0.0.0.0", port=9021, debug=True) diff --git a/app/flows.py b/app/flows.py new file mode 100644 index 0000000..c42d470 --- /dev/null +++ b/app/flows.py @@ -0,0 +1,264 @@ +import os +import re +from datetime import datetime, timedelta +from io import BytesIO +from typing import Dict, Iterator, List + +import httpx +# import imageio +import numpy as np +from moviepy.editor import ImageSequenceClip +from PIL import Image +from prefect import flow, task +from prefect.tasks import task_input_hash + +BASE_URL = "https://services.swpc.noaa.gov/images/animations/geospace/" + + +@task( + retries=3, + retry_delay_seconds=5, + cache_key_fn=task_input_hash, + cache_expiration=timedelta(minutes=2), + log_prints=True, +) +def get_file_links(url: str, ext: str | None = None) -> Iterator[str]: + response = httpx.get(url) + response.raise_for_status() + webpage_content = response.text + if ext is None: + print("Extension not supplied. Inferring (less efficient) png/jpg/jpeg") + exts = ["png", "jpg", "jpeg"] + else: + exts = [ext.lower()] + lines = webpage_content.split("\n") + for line in lines: + for ext in exts: + if ext in line: # need to parse the href link + start_pos = line.find('href="') + len('href="') + end_pos = line.find('"', start_pos) + href = line[start_pos:end_pos] + if href.endswith(f"latest.{ext}"): + print("Skipping latest") + continue + if href.endswith(ext): + if not href.startswith("http"): + href = url + href + yield href + break # Exit the inner loop to avoid duplicate yields for multiple exts + + +def url_tail_hash(context, parameters): + # return a constant + return parameters["url"].split("/")[-1] + + +def out_path_hash(context, parameters): + return parameters["output_path"] + f"_L{len(parameters['images'])}" + + +@task( + retries=5, + retry_delay_seconds=1, + cache_key_fn=task_input_hash, + cache_expiration=timedelta(minutes=5), + result_storage_key="{parameters[url]}", +) +def get_content(url: str, params: Dict[str, any] | None = None): + response = httpx.get(f"https://{url}", params=params) + try: + response.raise_for_status() + return response.content + except httpx.HTTPStatusError: + return None + + +def preview_urls(urls): + print("URLS (head):") + print(urls[:5]) + print("URLS (tail):") + print(urls[-5:]) + + +@task( + cache_key_fn=task_input_hash, + cache_expiration=timedelta(hours=1), +) +def get_images(urls: List[str] | List[str], limit: int = 0): + if limit > 0: + print(f"Limiting to {limit} urls") + urls = urls[-limit:] + + urls = [url.replace("https://", "").replace("http://", "") for url in urls] + preview_urls(urls) + + futures = get_content.map(urls) + images = [ + (urls[i], f.result()) for i, f in enumerate(futures) if f.result() is not None + ] + return images + + +def extract_timestamp_from_url(url: str) -> str: + # Assuming the timestamp format is in the format shown in the screenshot + match = re.search(r"\d{8}_\d{6}", url) + return match.group(0) if match else "" + + +# @task( +# cache_key_fn=out_path_hash, +# cache_expiration=timedelta(minutes=3), +# result_storage_key="{parameters[output_path]}", +# ) +# def create_animation( +# images: List[bytes], output_path: str, duration: float = 0.5 +# ) -> None: +# if not images: +# raise ValueError("No images!") +# pil_images = [Image.open(BytesIO(img_data)).convert("RGB") for img_data in images] +# imageio.mimsave(output_path, pil_images, duration=duration) +# return output_path + + +def make_even_dimensions(image): + width, height = image.size + if width % 2 == 1: + width -= 1 + if height % 2 == 1: + height -= 1 + return image.resize((width, height), Image.ANTIALIAS) + + +def crop_to_even(image): + width, height = image.size + # Adjust width and height to be even + if width % 2 == 1: + width -= 1 + if height % 2 == 1: + height -= 1 + return image.crop((0, 0, width, height)) + + +@task( + cache_key_fn=out_path_hash, + cache_expiration=timedelta(hours=4), + result_storage_key="{parameters[output_path]}", +) +def create_mp4_animation(images: List[bytes], output_path: str, fps: int = 24) -> None: + # Convert bytes to PIL images and then to numpy arrays + frames = [ + np.array(crop_to_even(Image.open(BytesIO(img_data)).convert("RGB"))) + for img_data in images + ] + + # Create a video clip from the image sequence + clip = ImageSequenceClip(frames, fps=fps) + + # Write the video clip to a file + clip.write_videofile( + output_path, + codec="libx264", + ffmpeg_params=["-pix_fmt", "yuv420p"], + preset="medium", + bitrate="800k", + ) + + return output_path + + +def format_output_name(url: str, latest: bool = False): + if latest: + now = "latest" + else: + now = datetime.now().strftime("%Y%m%d-%H:%M:%S") + return ( + url.replace("https://", "") + .replace("http://", "") + .replace("/", "-") + .replace(".", "_") + + now + ) + + +@task( + name="animate", + retries=0, + retry_delay_seconds=1, + log_prints=True, + cache_key_fn=task_input_hash, + cache_expiration=timedelta(minutes=3), +) +def animate( + url: str = "https://services.swpc.noaa.gov/images/animations/geospace/density/", + ext: str = "png", + latest: bool = True, + limit: int = 0, +): + urls = get_file_links(url, ext) + if len(urls) == 0: + raise ValueError("No urls scraped") + images = get_images(list(sorted(urls)), limit=limit) + if len(images) == 0: + raise ValueError("No images retrieved.") + print(f"Retrieved {len(images)} images.") + sorted_images = sorted(images, key=lambda x: extract_timestamp_from_url(x[0])) + print("Head:") + print([u for u, i in sorted_images[:5]]) + frames = [s[1] for s in sorted_images] + # create_animation(frames, "out.gif", duration=5) + out_name = format_output_name(url, latest=latest) + create_mp4_animation(frames, f"out/{out_name}.mp4") + + +def deploy_name(): + return datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S") + "Z" + + +@flow( + name="create-animations", + retries=0, + retry_delay_seconds=1, + log_prints=True, + flow_run_name=None, + timeout_seconds=90, +) +def create_animations( + url: str | List[str] = BASE_URL + "velocity/", + ext: str | None = None, + latest: bool = False, + limit: int = 0, +): + if isinstance(url, str): + url = [url] + + futures = animate.map(url, ext, latest, limit) + return futures + + +if __name__ == "__main__": + # make_animation.from_source( + # source=TEST_REPO, + # entrypoint="noaa_animate.py:make_animation", + # ).deploy( + # name="noaa-animate", work_pool_name="process" + # ) + + from prefect.client.schemas.schedules import CronSchedule + + sched = CronSchedule(cron="*/15 * * * *", timezone="America/Denver") + + LINKS = [ + BASE_URL + "density/", + BASE_URL + "velocity/", + BASE_URL + "pressure/", + ] + sched_params = { + "latest": True, + "url": LINKS, + "ext": "png", + "limit": 0, + } + create_animations.serve( + "noaa-animate", limit=8, schedule=None, parameters=sched_params + ) + # make_animation(url) diff --git a/init_db.py b/init_db.py index 17a121c..cafe4c3 100644 --- a/init_db.py +++ b/init_db.py @@ -16,34 +16,40 @@ def initialize_db(db_path): logging.info(f"{db_path} exists") # Log a message if the database exists return try: - with sqlite3.connect( - db_path - ) as conn: # Using 'with' to ensure that the connection is closed automatically - configure_database(conn) + # Using 'with' to ensure that the connection is closed automatically + with sqlite3.connect(db_path) as conn: + configure_database(conn) except sqlite3.Error as e: logging.error(f"Database error: {e}") # Log any SQLite errors that occur except Exception as e: - logging.error( - f"Exception in initialize_db: {e}" - ) # Log any other exceptions that occur + logging.error(f"Exception in initialize_db: {e}") def configure_database(conn): cursor = conn.cursor() # Setting the journal mode to WAL for better concurrency cursor.execute("PRAGMA journal_mode = WAL;") + # Ensuring foreign key constraints are enforced for data integrity + cursor.execute("PRAGMA foreign_keys = ON;") + # disable legacy alter table behavior as it will cause problems during + # migrations when tables are renamed as references would otherwise be retained + # in some locations + # https://www.sqlite.org/pragma.html#pragma_legacy_alter_table + cursor.execute("PRAGMA legacy_alter_table=OFF") + # when using the WAL, we do need to sync changes on every write. sqlite + # recommends using 'normal' mode which is much faster # Setting synchronous to NORMAL for a balance between speed and reliability cursor.execute("PRAGMA synchronous = NORMAL;") - # Setting a busy timeout to prevent immediate failures when the database is locked - cursor.execute("PRAGMA busy_timeout = 5000;") # Increasing the cache size to reduce the number of disk I/O operations - cursor.execute("PRAGMA cache_size = -32000;") + cursor.execute("PRAGMA cache_size = 20000;") # Enabling memory-mapped I/O for potentially faster file operations cursor.execute("PRAGMA mmap_size = 536870912;") + # Setting a busy timeout to prevent immediate failures when the database is locked + # setting the value very high allows for more 'concurrency' + # without running into errors, but may result in slow api calls + cursor.execute("PRAGMA busy_timeout = 60000;") # Setting locking mode to EXCLUSIVE can enhance performance for single-user scenarios - cursor.execute("PRAGMA locking_mode = EXCLUSIVE;") - # Ensuring foreign key constraints are enforced for data integrity - cursor.execute("PRAGMA foreign_keys = ON;") + # cursor.execute("PRAGMA locking_mode = EXCLUSIVE;") conn.commit() # Commit all PRAGMA configurations logging.info("Set up database with multi-user optimizations.") @@ -55,13 +61,11 @@ def batch_transact(db_path, operations): db_path ) as conn: # Ensure that the connection is handled properly cursor = conn.cursor() - cursor.execute( - "BEGIN TRANSACTION;" - ) # Start a transaction for batch operations + # Start a transaction for batch operations + cursor.execute("BEGIN TRANSACTION;") for operation in operations: - cursor.execute( - operation - ) # Execute each SQL operation provided in the operations list + # Execute each SQL operation provided in the operations list + cursor.execute(operation) cursor.execute("COMMIT;") # Commit all operations at once except sqlite3.Error as e: logging.error(f"Database error during batch transaction: {e}") @@ -73,9 +77,8 @@ def maintenance(db_path): try: with sqlite3.connect(db_path) as conn: cursor = conn.cursor() - cursor.execute( - "PRAGMA optimize;" - ) # Optimize the database to maintain performance + # Optimize the database to maintain performance + cursor.execute("PRAGMA optimize;") cursor.execute("VACUUM;") # Reclaim space and defragment the database file except sqlite3.Error as e: logging.error(f"Database error during maintenance: {e}") diff --git a/start.sh b/start.sh index a379cfc..d2217e0 100755 --- a/start.sh +++ b/start.sh @@ -1,14 +1,14 @@ #!/bin/bash -# Start the web app -cd app && make & # Start Prefect in the background prefect server start --host 0.0.0.0 & sleep 10 +# Start the web app +make & # Start the deployment -python noaa_animate.py & +python flows.py & # Wait for all background jobs to finish wait