import os
import re
from datetime import datetime, timedelta
from io import BytesIO
from typing import Any, Dict, Iterator, List

import httpx

# import imageio
import numpy as np
from moviepy.editor import ImageSequenceClip
from PIL import Image
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
from prefect.tasks import task_input_hash

BASE_URL = "https://services.swpc.noaa.gov/images/animations/geospace/"


@task(
    retries=3,
    retry_delay_seconds=5,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(minutes=2),
    log_prints=True,
)
def get_file_links(url: str, ext: str | None = None) -> Iterator[str]:
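    """Scrape a NOAA directory listing and yield links to image files.

    If ``ext`` is not supplied, png/jpg/jpeg are all tried. Links ending in
    ``latest.<ext>`` are skipped so only timestamped frames are collected.
    """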
    response = httpx.get(url)
    response.raise_for_status()
    webpage_content = response.text

    if ext is None:
        print("Extension not supplied; inferring png/jpg/jpeg (less efficient)")
        exts = ["png", "jpg", "jpeg"]
    else:
        exts = [ext.lower()]

    lines = webpage_content.split("\n")
    for line in lines:
        for ext in exts:
            if ext in line:  # need to parse the href link
                start_pos = line.find('href="') + len('href="')
                end_pos = line.find('"', start_pos)
                href = line[start_pos:end_pos]
                if href.endswith(f"latest.{ext}"):
                    print("Skipping latest")
                    continue
                if href.endswith(ext):
                    if not href.startswith("http"):
                        href = url + href
                    yield href
                break  # exit the inner loop to avoid duplicate yields for multiple exts


def url_tail_hash(context, parameters):
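    """Cache key based on the file name at the tail of the URL."""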
    return parameters["url"].split("/")[-1]


def out_path_hash(context, parameters):
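    """Cache key combining the output path with the number of input images."""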
    return parameters["output_path"] + f"_L{len(parameters['images'])}"


@task(
    retries=5,
    retry_delay_seconds=1,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(minutes=5),
    result_storage_key="{parameters[url]}",
)
def get_content(url: str, params: Dict[str, Any] | None = None):
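    """Fetch a single URL (scheme already stripped by the caller) and return its bytes, or None on an HTTP error."""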
    response = httpx.get(f"https://{url}", params=params)
    try:
        response.raise_for_status()
        return response.content
    except httpx.HTTPStatusError:
        return None


def preview_urls(urls):
    print("URLS (head):")
    print(urls[:5])
    print("URLS (tail):")
    print(urls[-5:])


@task(
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1),
)
def get_images(urls: List[str], limit: int = 0):
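    """Download images concurrently by mapping ``get_content`` over the URLs.

    Returns a list of ``(url, content)`` pairs, dropping failed downloads.
    """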
    if limit > 0:
        print(f"Limiting to {limit} urls")
        urls = urls[-limit:]

    urls = [url.replace("https://", "").replace("http://", "") for url in urls]
    preview_urls(urls)

    futures = get_content.map(urls)
    results = [future.result() for future in futures]
    images = [
        (url, content) for url, content in zip(urls, results) if content is not None
    ]
    return images


def extract_timestamp_from_url(url: str) -> str:
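    """Return the YYYYMMDD_HHMMSS timestamp from the file name (empty string if absent)."""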
    # NOAA file names embed timestamps as YYYYMMDD_HHMMSS
    match = re.search(r"\d{8}_\d{6}", url)
    return match.group(0) if match else ""


# @task(
#     cache_key_fn=out_path_hash,
#     cache_expiration=timedelta(minutes=3),
#     result_storage_key="{parameters[output_path]}",
# )
# def create_animation(
#     images: List[bytes], output_path: str, duration: float = 0.5
# ) -> None:
#     if not images:
#         raise ValueError("No images!")
#     pil_images = [Image.open(BytesIO(img_data)).convert("RGB") for img_data in images]
#     imageio.mimsave(output_path, pil_images, duration=duration)
#     return output_path


def make_even_dimensions(image):
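    """Resize down by at most one pixel per axis so both dimensions are even."""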
    width, height = image.size
    if width % 2 == 1:
        width -= 1
    if height % 2 == 1:
        height -= 1
    # Image.ANTIALIAS was removed in Pillow 10; LANCZOS is its replacement
    return image.resize((width, height), Image.LANCZOS)


def crop_to_even(image):
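    """Crop at most one pixel from the right/bottom so both dimensions are even, as required by yuv420p."""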
    width, height = image.size
    # Adjust width and height to be even
    if width % 2 == 1:
        width -= 1
    if height % 2 == 1:
        height -= 1
    return image.crop((0, 0, width, height))


@task(
    cache_key_fn=out_path_hash,
    cache_expiration=timedelta(hours=4),
    result_storage_key="{parameters[output_path]}",
)
def create_mp4_animation(images: List[bytes], output_path: str, fps: int = 24) -> str:
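    """Encode the downloaded image bytes into an H.264 MP4 at the given frame rate."""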
    # Convert bytes to PIL images and then to numpy arrays
    frames = [
        np.array(crop_to_even(Image.open(BytesIO(img_data)).convert("RGB")))
        for img_data in images
    ]

    # Create a video clip from the image sequence
    clip = ImageSequenceClip(frames, fps=fps)

    # Write the video clip to a file
    clip.write_videofile(
        output_path,
        codec="libx264",
        ffmpeg_params=["-pix_fmt", "yuv420p"],
        preset="medium",
        bitrate="800k",
    )

    return output_path


def format_output_name(url: str, latest: bool = False):
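    """Build a filesystem-safe output name from the URL, suffixed with "latest" or a timestamp."""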
    if latest:
        now = "latest"
    else:
        now = datetime.now().strftime("%Y%m%d-%H:%M:%S")
    return (
        url.replace("https://", "")
        .replace("http://", "")
        .replace("/", "-")
        .replace(".", "_")
        + now
    )


@task(
    name="animate",
    retries=0,
    retry_delay_seconds=1,
    log_prints=True,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(minutes=3),
)
def animate(
    url: str = "https://services.swpc.noaa.gov/images/animations/geospace/density/",
    ext: str = "png",
    latest: bool = True,
    limit: int = 0,
):
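    """Scrape image links for one product, download the frames, and render an MP4."""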
    urls = list(get_file_links(url, ext))  # materialize the generator so len() works
    if len(urls) == 0:
        raise ValueError("No urls scraped")
    images = get_images(sorted(urls), limit=limit)
    if len(images) == 0:
        raise ValueError("No images retrieved.")
    print(f"Retrieved {len(images)} images.")
    sorted_images = sorted(images, key=lambda x: extract_timestamp_from_url(x[0]))
    print("Head:")
    print([u for u, i in sorted_images[:5]])
    frames = [s[1] for s in sorted_images]
    # create_animation(frames, "out.gif", duration=5)
    out_name = format_output_name(url, latest=latest)
    os.makedirs("out", exist_ok=True)  # write_videofile fails if the directory is missing
    create_mp4_animation(frames, f"out/{out_name}.mp4")


def deploy_name():
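    """UTC timestamp string, e.g. for naming ad-hoc deployments or runs."""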
    return datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S") + "Z"


@flow(
    name="create-animations",
    retries=0,
    retry_delay_seconds=1,
    log_prints=True,
    task_runner=ConcurrentTaskRunner(),
    flow_run_name=None,
    timeout_seconds=90,
)
def create_animations(
    url: str | List[str] = BASE_URL + "velocity/",
    ext: str | None = None,
    latest: bool = False,
    limit: int = 0,
):
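    """Fan out one ``animate`` task per URL; scalar arguments are broadcast across the map."""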
    if isinstance(url, str):
        url = [url]

    futures = animate.map(url, ext, latest, limit)
    return futures


if __name__ == "__main__":
    # make_animation.from_source(
    #     source=TEST_REPO,
    #     entrypoint="noaa_animate.py:make_animation",
    # ).deploy(
    #     name="noaa-animate", work_pool_name="process"
    # )

    from prefect.client.schemas.schedules import CronSchedule

    sched = CronSchedule(cron="*/15 * * * *", timezone="America/Denver")
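    # NOTE: `sched` is not passed to serve() below (schedule=None), so the flow is served unscheduled.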
    links = [
        BASE_URL + "density/",
        BASE_URL + "velocity/",
        BASE_URL + "pressure/",
    ]
    sched_params = {
        "latest": True,
        "url": links,
        "ext": "png",
        "limit": 0,
    }
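    # serve() blocks and runs the flow in this process; `limit` caps concurrent flow runs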
    create_animations.serve(
        "noaa-animate", limit=8, schedule=None, parameters=sched_params
    )
    # make_animation(url)