Adds interactivity to services.swpc.noaa.gov/images/animations. A Prefect backend scrapes the images and generates movies with ffmpeg; a Flask frontend serves them at /iframe, with a proxy server on the bare domain and API access via /api.
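The Flask frontend, proxy, and API are not part of the file shown below. As a rough, hypothetical sketch of how the web side could fit together (route names, the out/ directory wiring, and the embed markup are assumptions, not code from this repo), the frontend mostly needs to serve the MP4s that the Prefect flow writes:

# Hypothetical sketch of the Flask side; not part of the Prefect flow file below.
from flask import Flask, send_from_directory

app = Flask(__name__)


@app.route("/iframe")
def iframe():
    # Minimal page that embeds the most recent animation rendered by the flow.
    return '<video autoplay loop muted src="/api/latest.mp4"></video>'


@app.route("/api/<path:filename>")
def api(filename):
    # Serve rendered animations (e.g. out/*latest.mp4) as static files.
    return send_from_directory("out", filename)


if __name__ == "__main__":
    app.run(port=8000)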

noaa_animate.py (265 lines, 7.1 KiB)

import os
import re
from datetime import datetime, timedelta
from io import BytesIO
from typing import Any, Dict, Iterator, List

import httpx
# import imageio
import numpy as np
from moviepy.editor import ImageSequenceClip
from PIL import Image
from prefect import flow, task
from prefect.tasks import task_input_hash

BASE_URL = "https://services.swpc.noaa.gov/images/animations/geospace/"

@task(
    retries=3,
    retry_delay_seconds=5,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(minutes=2),
    log_prints=True,
)
def get_file_links(url: str, ext: str | None = None) -> Iterator[str]:
    response = httpx.get(url)
    response.raise_for_status()
    webpage_content = response.text
    if ext is None:
        print("Extension not supplied. Inferring (less efficient) png/jpg/jpeg")
        exts = ["png", "jpg", "jpeg"]
    else:
        exts = [ext.lower()]
    lines = webpage_content.split("\n")
    for line in lines:
        for ext in exts:
            if ext in line:  # need to parse the href link
                start_pos = line.find('href="') + len('href="')
                end_pos = line.find('"', start_pos)
                href = line[start_pos:end_pos]
                if href.endswith(f"latest.{ext}"):
                    print("Skipping latest")
                    continue
                if href.endswith(ext):
                    if not href.startswith("http"):
                        href = url + href
                    yield href
                break  # Exit the inner loop to avoid duplicate yields for multiple exts

def url_tail_hash(context, parameters):
    # Cache key: the tail of the URL (i.e. the file name).
    return parameters["url"].split("/")[-1]


def out_path_hash(context, parameters):
    # Cache key: the output path plus the number of frames.
    return parameters["output_path"] + f"_L{len(parameters['images'])}"

@task(
    retries=5,
    retry_delay_seconds=1,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(minutes=5),
    result_storage_key="{parameters[url]}",
)
def get_content(url: str, params: Dict[str, Any] | None = None):
    response = httpx.get(f"https://{url}", params=params)
    try:
        response.raise_for_status()
        return response.content
    except httpx.HTTPStatusError:
        return None

def preview_urls(urls):
    print("URLS (head):")
    print(urls[:5])
    print("URLS (tail):")
    print(urls[-5:])

@task(
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1),
)
def get_images(urls: List[str], limit: int = 0):
    if limit > 0:
        print(f"Limiting to {limit} urls")
        urls = urls[-limit:]
    urls = [url.replace("https://", "").replace("http://", "") for url in urls]
    preview_urls(urls)
    futures = get_content.map(urls)
    images = [
        (urls[i], f.result()) for i, f in enumerate(futures) if f.result() is not None
    ]
    return images

def extract_timestamp_from_url(url: str) -> str:
    # Filenames embed a timestamp of the form YYYYMMDD_HHMMSS.
    match = re.search(r"\d{8}_\d{6}", url)
    return match.group(0) if match else ""

# @task(
#     cache_key_fn=out_path_hash,
#     cache_expiration=timedelta(minutes=3),
#     result_storage_key="{parameters[output_path]}",
# )
# def create_animation(
#     images: List[bytes], output_path: str, duration: float = 0.5
# ) -> None:
#     if not images:
#         raise ValueError("No images!")
#     pil_images = [Image.open(BytesIO(img_data)).convert("RGB") for img_data in images]
#     imageio.mimsave(output_path, pil_images, duration=duration)
#     return output_path

def make_even_dimensions(image):
    width, height = image.size
    if width % 2 == 1:
        width -= 1
    if height % 2 == 1:
        height -= 1
    # Image.ANTIALIAS was removed in Pillow 10; LANCZOS is the same filter.
    return image.resize((width, height), Image.LANCZOS)

def crop_to_even(image):
    width, height = image.size
    # Adjust width and height to be even
    if width % 2 == 1:
        width -= 1
    if height % 2 == 1:
        height -= 1
    return image.crop((0, 0, width, height))

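# Note: libx264 with "-pix_fmt yuv420p" (used below) requires even frame
# dimensions, which is why frames are cropped to an even size before encoding.
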
@task(
    cache_key_fn=out_path_hash,
    cache_expiration=timedelta(hours=4),
    result_storage_key="{parameters[output_path]}",
)
def create_mp4_animation(images: List[bytes], output_path: str, fps: int = 24) -> str:
    # Convert bytes to PIL images and then to numpy arrays
    frames = [
        np.array(crop_to_even(Image.open(BytesIO(img_data)).convert("RGB")))
        for img_data in images
    ]
    # Create a video clip from the image sequence
    clip = ImageSequenceClip(frames, fps=fps)
    # Write the video clip to a file
    clip.write_videofile(
        output_path,
        codec="libx264",
        ffmpeg_params=["-pix_fmt", "yuv420p"],
        preset="medium",
        bitrate="800k",
    )
    return output_path

def format_output_name(url: str, latest: bool = False):
    if latest:
        now = "latest"
    else:
        now = datetime.now().strftime("%Y%m%d-%H:%M:%S")
    return (
        url.replace("https://", "")
        .replace("http://", "")
        .replace("/", "-")
        .replace(".", "_")
        + now
    )

@task(
    name="animate",
    retries=0,
    retry_delay_seconds=1,
    log_prints=True,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(minutes=3),
)
def animate(
    url: str = "https://services.swpc.noaa.gov/images/animations/geospace/density/",
    ext: str = "png",
    latest: bool = True,
    limit: int = 0,
):
    # get_file_links is a generator; materialize it so it can be counted and sorted.
    urls = list(get_file_links(url, ext))
    if len(urls) == 0:
        raise ValueError("No urls scraped")
    images = get_images(list(sorted(urls)), limit=limit)
    if len(images) == 0:
        raise ValueError("No images retrieved.")
    print(f"Retrieved {len(images)} images.")
    sorted_images = sorted(images, key=lambda x: extract_timestamp_from_url(x[0]))
    print("Head:")
    print([u for u, i in sorted_images[:5]])
    frames = [s[1] for s in sorted_images]
    # create_animation(frames, "out.gif", duration=5)
    out_name = format_output_name(url, latest=latest)
    create_mp4_animation(frames, f"out/{out_name}.mp4")

def deploy_name():
    return datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S") + "Z"

@flow(
    name="create-animations",
    retries=0,
    retry_delay_seconds=1,
    log_prints=True,
    flow_run_name=None,
    timeout_seconds=90,
)
def create_animations(
    url: str | List[str] = BASE_URL + "velocity/",
    ext: str | None = None,
    latest: bool = False,
    limit: int = 0,
):
    if isinstance(url, str):
        url = [url]
    futures = animate.map(url, ext, latest, limit)
    return futures

if __name__ == "__main__":
    # make_animation.from_source(
    #     source=TEST_REPO,
    #     entrypoint="noaa_animate.py:make_animation",
    # ).deploy(
    #     name="noaa-animate", work_pool_name="process"
    # )
    from prefect.client.schemas.schedules import CronSchedule

    sched = CronSchedule(cron="*/15 * * * *", timezone="America/Denver")

    LINKS = [
        BASE_URL + "density/",
        BASE_URL + "velocity/",
        BASE_URL + "pressure/",
    ]
    sched_params = {
        "latest": True,
        "url": LINKS,
        "ext": "png",
        "limit": 0,
    }
    create_animations.serve(
        "noaa-animate", limit=8, schedule=None, parameters=sched_params
    )
    # make_animation(url)
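
Calling a Prefect flow directly runs it locally, so a one-off test does not need `.serve()` at all. A minimal sketch, assuming the module is saved as noaa_animate.py (the name used in the commented-out deployment above) and that a small, arbitrary limit is acceptable for a quick run:

# Hypothetical one-off local run; module name and limit value are assumptions.
from noaa_animate import BASE_URL, create_animations

create_animations(url=BASE_URL + "density/", ext="png", latest=True, limit=10)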