commit 188481f0cf4325002035564e48bd2e363b7811ce
Author: Michael Pilosov
Date:   Fri May 31 11:25:40 2024 -0600

    new repo

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..e53710c
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,5 @@
+out/
+*.gif
+*.mp4
+__pycache__/
+*.db
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..f984865
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,51 @@
+# Use the Python 3.10.11 slim image as the base image
+FROM python:3.10.11-slim
+
+# Avoid writing .pyc files and disable buffering of stdout and stderr
+ENV PYTHONDONTWRITEBYTECODE=1
+ENV PYTHONUNBUFFERED=1
+
+# Create a new user 'user' with UID and GID of 1000
+RUN groupadd -g 1000 user && \
+    useradd -m -s /bin/bash -u 1000 -g user user
+
+# Put user-level pip installs on the PATH
+ENV PATH=/home/user/.local/bin:$PATH
+
+# Install system dependencies as root
+RUN apt-get update && \
+    apt-get install -y --no-install-recommends make ffmpeg dumb-init && \
+    rm -rf /var/lib/apt/lists/*
+
+# Set the home directory
+WORKDIR /home/user/
+RUN chown -R user:user /home/user
+
+# Switch to the non-root user before copying files and installing Python packages
+USER user
+
+# Copy the requirements file to /tmp and install Python dependencies with the user flag
+COPY --chown=user:user requirements.txt /tmp/requirements.txt
+RUN python -m pip install --upgrade pip
+RUN pip install --no-cache-dir --user -r /tmp/requirements.txt
+
+# APPLICATION SETUP
+
+# Copy the default profiles file and set the appropriate permissions
+COPY --chown=user:user profiles.default.toml /home/user/.prefect/profiles.toml
+
+# Copy the application files
+COPY --chown=user:user app ./app
+COPY --chown=user:user noaa_animate.py .
+COPY --chown=user:user start.sh .
+COPY --chown=user:user init_db.py .
+RUN chmod +x start.sh
+RUN mkdir -p out
+RUN python init_db.py /home/user/.prefect/prefect.db
+
+# Ownership is already correct: every COPY above used --chown=user:user
+
+# Define the entrypoint and the command to execute
+ENTRYPOINT ["dumb-init", "--"]
+CMD ["./start.sh"]
diff --git a/app/app.py b/app/app.py
new file mode 100644
index 0000000..0dce227
--- /dev/null
+++ b/app/app.py
@@ -0,0 +1,147 @@
+import logging
+import os
+import re
+import time
+from datetime import datetime
+
+import requests
+from flask import Flask, Response, render_template, request, send_from_directory
+from prefect.deployments import run_deployment
+
+PORT = 9021
+app = Flask(__name__)
+
+logging.basicConfig(level=logging.DEBUG)
+
+
+def deploy_name():
+    return datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S") + "Z"
+
+
+def get_host():
+    host = os.environ.get("LIGHTNING_CLOUDSPACE_HOST")
+    if host is None:
+        default_host = os.environ.get("HOST_NAME", "0.0.0.0")
+        return f"{default_host}:{PORT}"
+    else:
+        return f"{PORT}-{host}"
+
+
+@app.route("/iframe")
+@app.route("/iframe/")
+@app.route("/iframe/<path:subpath>")
+def home(subpath="images/animations/"):
+    host = get_host()
+    initial_url = f"http://{host}/{subpath}"
+    api_url = f"http://{host}/api"
+    return render_template(
+        "index.html", initial_url=initial_url, host=f"http://{host}", api_url=api_url
+    )
+
+
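+# Example (hypothetical) request to the endpoint below, once the app is
+# running locally on the default port:
+#   curl -X POST http://localhost:9021/api \
+#        -H "Content-Type: application/json" \
+#        -d '{"url": "https://services.swpc.noaa.gov/images/animations/geospace/density/"}'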
+@app.route("/api", methods=["POST"])
+def handle_api():
+    data = request.json  # Assumes the client sends JSON data.
+    url = data.get("url")
+    logging.debug(f"Received URL: {url}")
+    params = {"url": url, "limit": 24 * 60, "ext": None}
+    response = run_deployment(
+        name="create-animations/noaa-animate",
+        parameters=params,
+        flow_run_name=f"{deploy_name()}.webapp.{url}",
+    )
+    # response is a FlowRun - extract what we need from it before returning.
+
+    return {
+        "status": "success",
+        "message": f"{url} processed successfully",
+        # "response": response,
+    }, 200
+
+
+@app.route("/videos/<path:filename>")
+def custom_static(filename):
+    return send_from_directory("../out", filename)
+
+
+@app.route("/", methods=["GET"])
+@app.route("/<path:url>", methods=["GET"])
+def proxy(url=""):
+    original_base_url = "https://services.swpc.noaa.gov"
+    host = get_host()
+    proxy_base_url = f"http://{host}/"
+
+    target_url = f"{original_base_url}/{url}"
+    logging.debug(f"Fetching URL: {target_url}")
+
+    try:
+        headers = {
+            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36"
+        }
+        response = requests.get(target_url, headers=headers, stream=True)
+        excluded_headers = [
+            "content-encoding",
+            "content-length",
+            "transfer-encoding",
+            "connection",
+        ]
+        headers = [
+            (name, value)
+            for (name, value) in response.raw.headers.items()
+            if name.lower() not in excluded_headers
+        ]
+
+        if "text/html" in response.headers.get("Content-Type", ""):
+            content = response.content.decode("utf-8")
+            content = re.sub(r"'http://", "'https://", content)
+            content = re.sub(
+                r"https?://services.swpc.noaa.gov", proxy_base_url, content
+            )
+            content = content.replace(
+                "</head>",
+                f"""<script>
+                /* assumed reconstruction: the original injected snippet was
+                   not preserved; report the proxied page's location (served
+                   via {proxy_base_url}) to the parent frame */
+                window.parent.postMessage({{"source": window.location.href}}, "*");
+                </script></head>""",
+            )
+            content = content.encode("utf-8")
+            return Response(content, status=response.status_code, headers=headers)
+        else:
+            return Response(
+                response.content, status=response.status_code, headers=headers
+            )
+
+    except Exception as e:
+        logging.error(f"Error fetching URL: {e}")
+        return Response(f"Error fetching URL: {e}", status=500)
+
+
+if __name__ == "__main__":
+    app.run(host="0.0.0.0", port=PORT, debug=True)
diff --git a/app/makefile b/app/makefile
new file mode 100644
index 0000000..8ea0bd4
--- /dev/null
+++ b/app/makefile
@@ -0,0 +1,5 @@
+start:
+	gunicorn --worker-class gevent --bind 0.0.0.0:9021 app:app
+
+dev:
+	python app.py
diff --git a/app/templates/index.html b/app/templates/index.html
new file mode 100644
index 0000000..bd1bbbe
--- /dev/null
+++ b/app/templates/index.html
@@ -0,0 +1,271 @@
+<!DOCTYPE html>
+<html lang="en">
+<head>
+    <meta charset="UTF-8">
+    <title>Animator</title>
+</head>
+<!-- Markup condensed to its essentials; the original styling and script were
+     not preserved, so the wiring below is an assumed reconstruction around
+     the template variables (initial_url, host, api_url) passed by app.py. -->
+<body>
+    <h1>Animate a Folder of Images</h1>
+    <p>Navigate to a folder of 60+ images.</p>
+    <div>
+        <input type="text" id="urlBar" value="{{ initial_url }}" size="80" />
+        <button id="goButton">Go</button>
+        <button id="animateButton">Animate</button>
+    </div>
+    <iframe id="contentFrame" src="{{ initial_url }}" width="100%" height="600"></iframe>
+    <div>Source: <span id="source">Loading...</span></div>
+    <script>
+        const frame = document.getElementById("contentFrame");
+        const urlBar = document.getElementById("urlBar");
+
+        // Navigate the iframe to whatever URL is in the bar.
+        document.getElementById("goButton").addEventListener("click", () => {
+            frame.src = urlBar.value;
+        });
+
+        // Pages served through the proxy report their location via postMessage.
+        window.addEventListener("message", (event) => {
+            if (event.data && event.data.source) {
+                document.getElementById("source").textContent = event.data.source;
+                urlBar.value = event.data.source;
+            }
+        });
+
+        // Ask the backend to build an animation for the current folder.
+        document.getElementById("animateButton").addEventListener("click", async () => {
+            const response = await fetch("{{ api_url }}", {
+                method: "POST",
+                headers: { "Content-Type": "application/json" },
+                body: JSON.stringify({ url: urlBar.value }),
+            });
+            const result = await response.json();
+            console.log(result.message);
+        });
+    </script>
+</body>
+</html>
diff --git a/init_db.py b/init_db.py
new file mode 100644
index 0000000..17a121c
--- /dev/null
+++ b/init_db.py
@@ -0,0 +1,97 @@
+import argparse
+import logging
+import os
+import sqlite3
+
+# Basic logging configuration
+logging.basicConfig(
+    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
+)
+
+
+def initialize_db(db_path):
+    # Skip initialization if the database file already exists
+    db_exists = os.path.exists(db_path)
+    if db_exists:
+        logging.info(f"{db_path} exists")
+        return
+    try:
+        # 'with' ensures the connection is closed automatically
+        with sqlite3.connect(db_path) as conn:
+            configure_database(conn)
+    except sqlite3.Error as e:
+        logging.error(f"Database error: {e}")
+    except Exception as e:
+        logging.error(f"Exception in initialize_db: {e}")
+
+
+def configure_database(conn):
+    cursor = conn.cursor()
+    # WAL journal mode lets readers proceed concurrently with a writer
+    cursor.execute("PRAGMA journal_mode = WAL;")
+    # NORMAL synchronous balances speed and reliability
+    cursor.execute("PRAGMA synchronous = NORMAL;")
+    # Busy timeout prevents immediate failures when the database is locked
+    cursor.execute("PRAGMA busy_timeout = 5000;")
+    # Larger cache reduces disk I/O (negative value = size in KiB)
+    cursor.execute("PRAGMA cache_size = -32000;")
+    # Memory-mapped I/O for potentially faster file operations
+    cursor.execute("PRAGMA mmap_size = 536870912;")
+    # EXCLUSIVE locking avoids lock negotiation for a single-writer setup
+    cursor.execute("PRAGMA locking_mode = EXCLUSIVE;")
+    # Enforce foreign key constraints for data integrity
+    cursor.execute("PRAGMA foreign_keys = ON;")
+    conn.commit()  # Commit all PRAGMA configurations
+
+    logging.info("Set up database with performance optimizations.")
+
+
+def batch_transact(db_path, operations):
+    try:
+        with sqlite3.connect(db_path) as conn:
+            cursor = conn.cursor()
+            # Run the provided SQL statements in a single transaction
+            cursor.execute("BEGIN TRANSACTION;")
+            for operation in operations:
+                cursor.execute(operation)
+            cursor.execute("COMMIT;")
+    except sqlite3.Error as e:
+        logging.error(f"Database error during batch transaction: {e}")
+    except Exception as e:
+        logging.error(f"Exception in batch_transact: {e}")
+
+
+def maintenance(db_path):
+    try:
+        with sqlite3.connect(db_path) as conn:
+            cursor = conn.cursor()
+            cursor.execute("PRAGMA optimize;")  # Maintain query-planner statistics
+            cursor.execute("VACUUM;")  # Reclaim space and defragment the file
+    except sqlite3.Error as e:
+        logging.error(f"Database error during maintenance: {e}")
+    except Exception as e:
+        logging.error(f"Exception in maintenance: {e}")
+
+
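+# Example (hypothetical) use of the helpers above from another script;
+# batch_transact takes a list of raw SQL statements:
+#
+#   initialize_db("prefect.db")
+#   batch_transact("prefect.db", ["CREATE TABLE IF NOT EXISTS runs (id INTEGER);"])
+#   maintenance("prefect.db")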
+def parse_args():
+    parser = argparse.ArgumentParser(
+        description="Initialize and manage an SQLite database."
+    )
+    parser.add_argument("db_path", type=str, help="Path to the SQLite database file.")
+    args = parser.parse_args()
+    return args
+
+
+if __name__ == "__main__":
+    args = parse_args()  # Parse the command-line arguments for the database path
+    initialize_db(args.db_path)  # Use the parsed path to initialize the database
diff --git a/makefile b/makefile
new file mode 100644
index 0000000..8e42780
--- /dev/null
+++ b/makefile
@@ -0,0 +1,10 @@
+run: build
+	docker run --rm -ti --name noaa -e HOST_NAME=localhost -p 9021:9021 -p 4200:4200 noaa
+
+build:
+	docker build -t noaa .
+
+lint:
+	isort --profile=black .
+	black .
diff --git a/noaa_animate.py b/noaa_animate.py
new file mode 100644
index 0000000..5ca1e2e
--- /dev/null
+++ b/noaa_animate.py
@@ -0,0 +1,266 @@
+import os
+import re
+from datetime import datetime, timedelta
+from io import BytesIO
+from typing import Any, Dict, Iterator, List
+
+import httpx
+# import imageio
+import numpy as np
+from moviepy.editor import ImageSequenceClip
+from PIL import Image
+from prefect import flow, task
+from prefect.task_runners import ConcurrentTaskRunner
+from prefect.tasks import task_input_hash
+
+BASE_URL = "https://services.swpc.noaa.gov/images/animations/geospace/"
+
+
+@task(
+    retries=3,
+    retry_delay_seconds=5,
+    cache_key_fn=task_input_hash,
+    cache_expiration=timedelta(minutes=2),
+    log_prints=True,
+)
+def get_file_links(url: str, ext: str | None = None) -> Iterator[str]:
+    response = httpx.get(url)
+    response.raise_for_status()
+    webpage_content = response.text
+    if ext is None:
+        print("Extension not supplied. Inferring (less efficient) png/jpg/jpeg")
+        exts = ["png", "jpg", "jpeg"]
+    else:
+        exts = [ext.lower()]
+    lines = webpage_content.split("\n")
+    for line in lines:
+        for ext in exts:
+            if ext in line:  # need to parse the href link
+                start_pos = line.find('href="') + len('href="')
+                end_pos = line.find('"', start_pos)
+                href = line[start_pos:end_pos]
+                if href.endswith(f"latest.{ext}"):
+                    print("Skipping latest")
+                    continue
+                if href.endswith(ext):
+                    if not href.startswith("http"):
+                        href = url + href
+                    yield href
+                break  # Exit the inner loop to avoid duplicate yields for multiple exts
+
+
+def url_tail_hash(context, parameters):
+    # Cache key: the final path component of the URL
+    return parameters["url"].split("/")[-1]
+
+
+def out_path_hash(context, parameters):
+    return parameters["output_path"] + f"_L{len(parameters['images'])}"
+
+
+@task(
+    retries=5,
+    retry_delay_seconds=1,
+    cache_key_fn=task_input_hash,
+    cache_expiration=timedelta(minutes=5),
+    result_storage_key="{parameters[url]}",
+)
+def get_content(url: str, params: Dict[str, Any] | None = None):
+    response = httpx.get(f"https://{url}", params=params)
+    try:
+        response.raise_for_status()
+        return response.content
+    except httpx.HTTPStatusError:
+        return None
+
+
+def preview_urls(urls):
+    print("URLS (head):")
+    print(urls[:5])
+    print("URLS (tail):")
+    print(urls[-5:])
+
+
+@task(
+    cache_key_fn=task_input_hash,
+    cache_expiration=timedelta(hours=1),
+)
+def get_images(urls: List[str], limit: int = 0):
+    if limit > 0:
+        print(f"Limiting to {limit} urls")
+        urls = urls[-limit:]
+
+    urls = [url.replace("https://", "").replace("http://", "") for url in urls]
+    preview_urls(urls)
+
+    futures = get_content.map(urls)
+    images = [
+        (urls[i], f.result()) for i, f in enumerate(futures) if f.result() is not None
+    ]
+    return images
+
+
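+# Note: get_content.map(...) above fans each URL out as its own task run, so
+# the retry and caching policies on get_content apply per image rather than
+# to the whole batch.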
re.search(r"\d{8}_\d{6}", url) + return match.group(0) if match else "" + + +# @task( +# cache_key_fn=out_path_hash, +# cache_expiration=timedelta(minutes=3), +# result_storage_key="{parameters[output_path]}", +# ) +# def create_animation( +# images: List[bytes], output_path: str, duration: float = 0.5 +# ) -> None: +# if not images: +# raise ValueError("No images!") +# pil_images = [Image.open(BytesIO(img_data)).convert("RGB") for img_data in images] +# imageio.mimsave(output_path, pil_images, duration=duration) +# return output_path + + +def make_even_dimensions(image): + width, height = image.size + if width % 2 == 1: + width -= 1 + if height % 2 == 1: + height -= 1 + return image.resize((width, height), Image.ANTIALIAS) + + +def crop_to_even(image): + width, height = image.size + # Adjust width and height to be even + if width % 2 == 1: + width -= 1 + if height % 2 == 1: + height -= 1 + return image.crop((0, 0, width, height)) + + +@task( + cache_key_fn=out_path_hash, + cache_expiration=timedelta(hours=4), + result_storage_key="{parameters[output_path]}", +) +def create_mp4_animation(images: List[bytes], output_path: str, fps: int = 24) -> None: + # Convert bytes to PIL images and then to numpy arrays + frames = [ + np.array(crop_to_even(Image.open(BytesIO(img_data)).convert("RGB"))) + for img_data in images + ] + + # Create a video clip from the image sequence + clip = ImageSequenceClip(frames, fps=fps) + + # Write the video clip to a file + clip.write_videofile( + output_path, + codec="libx264", + ffmpeg_params=["-pix_fmt", "yuv420p"], + preset="medium", + bitrate="800k", + ) + + return output_path + + +def format_output_name(url: str, latest: bool = False): + if latest: + now = "latest" + else: + now = datetime.now().strftime("%Y%m%d-%H:%M:%S") + return ( + url.replace("https://", "") + .replace("http://", "") + .replace("/", "-") + .replace(".", "_") + + now + ) + + +@task( + name="animate", + retries=0, + retry_delay_seconds=1, + log_prints=True, + cache_key_fn=task_input_hash, + cache_expiration=timedelta(minutes=3), +) +def animate( + url: str = "https://services.swpc.noaa.gov/images/animations/geospace/density/", + ext: str = "png", + latest: bool = True, + limit: int = 0, +): + urls = get_file_links(url, ext) + if len(urls) == 0: + raise ValueError("No urls scraped") + images = get_images(list(sorted(urls)), limit=limit) + if len(images) == 0: + raise ValueError("No images retrieved.") + print(f"Retrieved {len(images)} images.") + sorted_images = sorted(images, key=lambda x: extract_timestamp_from_url(x[0])) + print("Head:") + print([u for u, i in sorted_images[:5]]) + frames = [s[1] for s in sorted_images] + # create_animation(frames, "out.gif", duration=5) + out_name = format_output_name(url, latest=latest) + create_mp4_animation(frames, f"out/{out_name}.mp4") + + +def deploy_name(): + return datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S") + "Z" + + +@flow( + name="create-animations", + retries=0, + retry_delay_seconds=1, + log_prints=True, + task_runner=ConcurrentTaskRunner(), + flow_run_name=None, + timeout_seconds=90, +) +def create_animations( + url: str | List[str] = BASE_URL + "velocity/", + ext: str | None = None, + latest: bool = False, + limit: int = 0, +): + if isinstance(url, str): + url = [url] + + futures = animate.map(url, ext, latest, limit) + return futures + + +if __name__ == "__main__": + # make_animation.from_source( + # source=TEST_REPO, + # entrypoint="noaa_animate.py:make_animation", + # ).deploy( + # name="noaa-animate", work_pool_name="process" + # ) 
+
+    from prefect.client.schemas.schedules import CronSchedule
+
+    sched = CronSchedule(cron="*/15 * * * *", timezone="America/Denver")
+
+    links = [
+        BASE_URL + "density/",
+        BASE_URL + "velocity/",
+        BASE_URL + "pressure/",
+    ]
+    sched_params = {
+        "latest": True,
+        "url": links,
+        "ext": "png",
+        "limit": 0,
+    }
+    # Schedule is defined above but left disabled (schedule=None) for now.
+    create_animations.serve(
+        "noaa-animate", limit=8, schedule=None, parameters=sched_params
+    )
+    # make_animation(url)
diff --git a/profiles.default.toml b/profiles.default.toml
new file mode 100644
index 0000000..3e054f9
--- /dev/null
+++ b/profiles.default.toml
@@ -0,0 +1,14 @@
+active = "default"
+
+[profiles.default]
+PREFECT_API_URL = "http://0.0.0.0:4200/api"
+PREFECT_TASK_SCHEDULING_MAX_SCHEDULED_QUEUE_SIZE = 4
+
+PREFECT_API_SERVICES_SCHEDULER_DEPLOYMENT_BATCH_SIZE = 100
+PREFECT_API_SERVICES_SCHEDULER_ENABLED = true
+PREFECT_API_SERVICES_SCHEDULER_INSERT_BATCH_SIZE = 500
+PREFECT_API_SERVICES_SCHEDULER_LOOP_SECONDS = 60
+PREFECT_API_SERVICES_SCHEDULER_MIN_RUNS = 3
+PREFECT_API_SERVICES_SCHEDULER_MAX_RUNS = 100
+PREFECT_API_SERVICES_SCHEDULER_MIN_SCHEDULED_TIME = '0:30:00'
+PREFECT_API_SERVICES_SCHEDULER_MAX_SCHEDULED_TIME = '0 days, 8:00:00'
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..1fb8e3c
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,9 @@
+prefect==2.17.1
+Flask==3.0.3
+gunicorn==22.0.0
+gevent==24.2.1
+moviepy==1.0.3
+pillow==10.3.0
+requests==2.32.3
+httpx==0.27.0
+# imageio==2.34.1
diff --git a/start.sh b/start.sh
new file mode 100755
index 0000000..a379cfc
--- /dev/null
+++ b/start.sh
@@ -0,0 +1,15 @@
+#!/bin/bash
+# Start the web app
+cd app && make &
+
+# Start Prefect in the background
+prefect server start --host 0.0.0.0 &
+
+# Give the Prefect server time to come up before serving the deployment
+sleep 10
+
+# Start the deployment
+python noaa_animate.py &
+
+# Wait for all background jobs to finish
+wait
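With the pieces above in place, a local end-to-end run (assuming Docker is
available) amounts to:

    make run    # builds the image and starts the container
    # then browse to http://localhost:9021/iframe  (web app)
    #            and http://localhost:4200         (Prefect UI)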