import os import re from datetime import datetime, timedelta from io import BytesIO from typing import Dict, Iterator, List import httpx # import imageio import numpy as np from moviepy.editor import ImageSequenceClip from PIL import Image from prefect import flow, task from prefect.task_runners import ConcurrentTaskRunner from prefect.tasks import task_input_hash BASE_URL = "https://services.swpc.noaa.gov/images/animations/geospace/" @task( retries=3, retry_delay_seconds=5, cache_key_fn=task_input_hash, cache_expiration=timedelta(minutes=2), log_prints=True, ) def get_file_links(url: str, ext: str | None = None) -> Iterator[str]: response = httpx.get(url) response.raise_for_status() webpage_content = response.text if ext is None: print("Extension not supplied. Inferring (less efficient) png/jpg/jpeg") exts = ["png", "jpg", "jpeg"] else: exts = [ext.lower()] lines = webpage_content.split("\n") for line in lines: for ext in exts: if ext in line: # need to parse the href link start_pos = line.find('href="') + len('href="') end_pos = line.find('"', start_pos) href = line[start_pos:end_pos] if href.endswith(f"latest.{ext}"): print("Skipping latest") continue if href.endswith(ext): if not href.startswith("http"): href = url + href yield href break # Exit the inner loop to avoid duplicate yields for multiple exts def url_tail_hash(context, parameters): # return a constant return parameters["url"].split("/")[-1] def out_path_hash(context, parameters): return parameters["output_path"] + f"_L{len(parameters['images'])}" @task( retries=5, retry_delay_seconds=1, cache_key_fn=task_input_hash, cache_expiration=timedelta(minutes=5), result_storage_key="{parameters[url]}", ) def get_content(url: str, params: Dict[str, any] | None = None): response = httpx.get(f"https://{url}", params=params) try: response.raise_for_status() return response.content except httpx.HTTPStatusError: return None def preview_urls(urls): print("URLS (head):") print(urls[:5]) print("URLS (tail):") print(urls[-5:]) @task( cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1), ) def get_images(urls: List[str] | List[str], limit: int = 0): if limit > 0: print(f"Limiting to {limit} urls") urls = urls[-limit:] urls = [url.replace("https://", "").replace("http://", "") for url in urls] preview_urls(urls) futures = get_content.map(urls) images = [ (urls[i], f.result()) for i, f in enumerate(futures) if f.result() is not None ] return images def extract_timestamp_from_url(url: str) -> str: # Assuming the timestamp format is in the format shown in the screenshot match = re.search(r"\d{8}_\d{6}", url) return match.group(0) if match else "" # @task( # cache_key_fn=out_path_hash, # cache_expiration=timedelta(minutes=3), # result_storage_key="{parameters[output_path]}", # ) # def create_animation( # images: List[bytes], output_path: str, duration: float = 0.5 # ) -> None: # if not images: # raise ValueError("No images!") # pil_images = [Image.open(BytesIO(img_data)).convert("RGB") for img_data in images] # imageio.mimsave(output_path, pil_images, duration=duration) # return output_path def make_even_dimensions(image): width, height = image.size if width % 2 == 1: width -= 1 if height % 2 == 1: height -= 1 return image.resize((width, height), Image.ANTIALIAS) def crop_to_even(image): width, height = image.size # Adjust width and height to be even if width % 2 == 1: width -= 1 if height % 2 == 1: height -= 1 return image.crop((0, 0, width, height)) @task( cache_key_fn=out_path_hash, cache_expiration=timedelta(hours=4), result_storage_key="{parameters[output_path]}", ) def create_mp4_animation(images: List[bytes], output_path: str, fps: int = 24) -> None: # Convert bytes to PIL images and then to numpy arrays frames = [ np.array(crop_to_even(Image.open(BytesIO(img_data)).convert("RGB"))) for img_data in images ] # Create a video clip from the image sequence clip = ImageSequenceClip(frames, fps=fps) # Write the video clip to a file clip.write_videofile( output_path, codec="libx264", ffmpeg_params=["-pix_fmt", "yuv420p"], preset="medium", bitrate="800k", ) return output_path def format_output_name(url: str, latest: bool = False): if latest: now = "latest" else: now = datetime.now().strftime("%Y%m%d-%H:%M:%S") return ( url.replace("https://", "") .replace("http://", "") .replace("/", "-") .replace(".", "_") + now ) @task( name="animate", retries=0, retry_delay_seconds=1, log_prints=True, cache_key_fn=task_input_hash, cache_expiration=timedelta(minutes=3), ) def animate( url: str = "https://services.swpc.noaa.gov/images/animations/geospace/density/", ext: str = "png", latest: bool = True, limit: int = 0, ): urls = get_file_links(url, ext) if len(urls) == 0: raise ValueError("No urls scraped") images = get_images(list(sorted(urls)), limit=limit) if len(images) == 0: raise ValueError("No images retrieved.") print(f"Retrieved {len(images)} images.") sorted_images = sorted(images, key=lambda x: extract_timestamp_from_url(x[0])) print("Head:") print([u for u, i in sorted_images[:5]]) frames = [s[1] for s in sorted_images] # create_animation(frames, "out.gif", duration=5) out_name = format_output_name(url, latest=latest) create_mp4_animation(frames, f"out/{out_name}.mp4") def deploy_name(): return datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S") + "Z" @flow( name="create-animations", retries=0, retry_delay_seconds=1, log_prints=True, task_runner=ConcurrentTaskRunner(), flow_run_name=None, timeout_seconds=90, ) def create_animations( url: str | List[str] = BASE_URL + "velocity/", ext: str | None = None, latest: bool = False, limit: int = 0, ): if isinstance(url, str): url = [url] futures = animate.map(url, ext, latest, limit) return futures if __name__ == "__main__": # make_animation.from_source( # source=TEST_REPO, # entrypoint="noaa_animate.py:make_animation", # ).deploy( # name="noaa-animate", work_pool_name="process" # ) from prefect.client.schemas.schedules import CronSchedule sched = CronSchedule(cron="*/15 * * * *", timezone="America/Denver") links = [ BASE_URL + "density/", BASE_URL + "velocity/", BASE_URL + "pressure/", ] sched_params = { "latest": True, "url": links, "ext": "png", "limit": 0, } create_animations.serve( "noaa-animate", limit=8, schedule=None, parameters=sched_params ) # make_animation(url)