Compare commits
1 Commits
Author | SHA1 | Date |
---|---|---|
Michael Pilosov | 4d9ad9ddb8 | 6 months ago |
5 changed files with 298 additions and 28 deletions
@ -0,0 +1,264 @@ |
|||||
|
import os
import re
from datetime import datetime, timedelta, timezone
from io import BytesIO
from typing import Any, Dict, Iterator, List

import httpx

# import imageio
import numpy as np
from moviepy.editor import ImageSequenceClip
from PIL import Image
from prefect import flow, task
from prefect.tasks import task_input_hash
|
|
||||
|
BASE_URL = "https://services.swpc.noaa.gov/images/animations/geospace/" |
||||
|
|
||||
|
|
||||
|
@task(
    retries=3,
    retry_delay_seconds=5,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(minutes=2),
    log_prints=True,
)
def get_file_links(url: str, ext: str | None = None) -> Iterator[str]:
    """Scrape an HTML directory listing and yield absolute image-file links.

    Args:
        url: Directory-listing URL to scrape (assumed to end with "/", since
            relative hrefs are joined by plain concatenation).
        ext: File extension to match (e.g. "png"). When None, each line is
            tried against png/jpg/jpeg.

    Yields:
        Absolute URLs of matching files, skipping any "latest.<ext>" entry.

    Raises:
        httpx.HTTPStatusError: If the listing page returns an error status.
    """
    response = httpx.get(url)
    response.raise_for_status()
    webpage_content = response.text

    if ext is None:
        print("Extension not supplied. Inferring (less efficient) png/jpg/jpeg")
        candidate_exts = ["png", "jpg", "jpeg"]
    else:
        candidate_exts = [ext.lower()]

    for line in webpage_content.split("\n"):
        # NOTE: a distinct loop variable avoids shadowing the `ext` parameter
        # (the original reused `ext`, clobbering it after the first line).
        for candidate in candidate_exts:
            if candidate not in line:
                continue
            # Guard against lines that mention the extension but carry no
            # link: without this, find() returns -1 and the slice is garbage.
            if 'href="' not in line:
                continue
            start_pos = line.find('href="') + len('href="')
            end_pos = line.find('"', start_pos)
            href = line[start_pos:end_pos]
            if href.endswith(f"latest.{candidate}"):
                print("Skipping latest")
                continue
            if href.endswith(candidate):
                if not href.startswith("http"):
                    # Relative href: join with the listing URL.
                    href = url + href
                yield href
                break  # Exit the inner loop to avoid duplicate yields for multiple exts
|
|
||||
|
|
||||
|
def url_tail_hash(context, parameters):
    """Prefect cache-key fn: key on the final path component of "url"."""
    *_, tail = parameters["url"].split("/")
    return tail
|
|
||||
|
|
||||
|
def out_path_hash(context, parameters):
    """Prefect cache-key fn: output path suffixed with the frame count."""
    frame_count = len(parameters["images"])
    return "{}_L{}".format(parameters["output_path"], frame_count)
|
|
||||
|
|
||||
|
@task(
    retries=5,
    retry_delay_seconds=1,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(minutes=5),
    result_storage_key="{parameters[url]}",
)
def get_content(url: str, params: Dict[str, Any] | None = None):
    """Download one scheme-less URL over HTTPS and return its raw bytes.

    Args:
        url: Host + path WITHOUT a scheme; "https://" is prepended here
            (callers strip the scheme so result_storage_key stays path-safe).
        params: Optional query parameters forwarded to httpx.get.
            (Annotation fixed: the original used the builtin ``any`` instead
            of ``typing.Any``.)

    Returns:
        Response body as bytes, or None on an HTTP error status — missing
        frames are deliberately dropped by the caller rather than fatal.
    """
    response = httpx.get(f"https://{url}", params=params)
    try:
        response.raise_for_status()
        return response.content
    except httpx.HTTPStatusError:
        # Best-effort download: signal failure with None instead of raising.
        return None
|
|
||||
|
|
||||
|
def preview_urls(urls):
    """Print the first and last five entries of *urls* as a sanity check."""
    head, tail = urls[:5], urls[-5:]
    print("URLS (head):")
    print(head)
    print("URLS (tail):")
    print(tail)
|
|
||||
|
|
||||
|
@task(
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1),
)
def get_images(urls: List[str], limit: int = 0):
    """Fetch image bytes for each URL in parallel via get_content.map.

    Args:
        urls: Image URLs (any scheme is stripped before fetching, because
            get_content prepends "https://" itself).
        limit: When > 0, only the last ``limit`` URLs are fetched.

    Returns:
        List of (url, content_bytes) pairs; failed downloads are skipped.
    """
    if limit > 0:
        print(f"Limiting to {limit} urls")
        urls = urls[-limit:]

    urls = [url.replace("https://", "").replace("http://", "") for url in urls]
    preview_urls(urls)

    futures = get_content.map(urls)
    images = []
    for fetched_url, future in zip(urls, futures):
        # Resolve each future exactly once (the original called .result()
        # twice per future: once in the filter and once in the tuple).
        content = future.result()
        if content is not None:
            images.append((fetched_url, content))
    return images
|
|
||||
|
|
||||
|
def extract_timestamp_from_url(url: str) -> str:
    """Return the YYYYMMDD_HHMMSS timestamp embedded in *url*, or ""."""
    found = re.search(r"\d{8}_\d{6}", url)
    if found is None:
        return ""
    return found.group(0)
|
|
||||
|
|
||||
|
# @task( |
||||
|
# cache_key_fn=out_path_hash, |
||||
|
# cache_expiration=timedelta(minutes=3), |
||||
|
# result_storage_key="{parameters[output_path]}", |
||||
|
# ) |
||||
|
# def create_animation( |
||||
|
# images: List[bytes], output_path: str, duration: float = 0.5 |
||||
|
# ) -> None: |
||||
|
# if not images: |
||||
|
# raise ValueError("No images!") |
||||
|
# pil_images = [Image.open(BytesIO(img_data)).convert("RGB") for img_data in images] |
||||
|
# imageio.mimsave(output_path, pil_images, duration=duration) |
||||
|
# return output_path |
||||
|
|
||||
|
|
||||
|
def make_even_dimensions(image):
    """Resize *image* so both width and height are even.

    H.264 with yuv420p requires even frame dimensions; odd sides are
    shrunk by one pixel.

    Args:
        image: PIL.Image.Image instance.

    Returns:
        A resized PIL image with even width and height.
    """
    width, height = image.size
    if width % 2 == 1:
        width -= 1
    if height % 2 == 1:
        height -= 1
    # Image.ANTIALIAS was removed in Pillow 10; Image.LANCZOS is the
    # same filter under its current name.
    return image.resize((width, height), Image.LANCZOS)
|
|
||||
|
|
||||
|
def crop_to_even(image):
    """Crop *image* from the top-left so width and height are both even.

    Even dimensions are required by the yuv420p pixel format used when
    encoding the MP4.
    """
    width, height = image.size
    # Subtracting (dim % 2) drops the last row/column only when odd.
    even_width = width - (width % 2)
    even_height = height - (height % 2)
    return image.crop((0, 0, even_width, even_height))
|
|
||||
|
|
||||
|
@task(
    cache_key_fn=out_path_hash,
    cache_expiration=timedelta(hours=4),
    result_storage_key="{parameters[output_path]}",
)
def create_mp4_animation(images: List[bytes], output_path: str, fps: int = 24) -> None:
    """Encode a sequence of raw image bytes into an H.264 MP4.

    Args:
        images: Ordered frames as encoded image bytes (e.g. PNG data).
        output_path: Destination .mp4 path.
        fps: Frames per second of the output clip.

    Returns:
        The output path (handy for chaining / result storage).
    """
    # Decode each frame, force RGB, crop to even dimensions (required by
    # yuv420p), and hand moviepy numpy arrays.
    frames = []
    for img_data in images:
        pil_frame = Image.open(BytesIO(img_data)).convert("RGB")
        frames.append(np.array(crop_to_even(pil_frame)))

    clip = ImageSequenceClip(frames, fps=fps)

    clip.write_videofile(
        output_path,
        codec="libx264",
        ffmpeg_params=["-pix_fmt", "yuv420p"],
        preset="medium",
        bitrate="800k",
    )

    return output_path
|
|
||||
|
|
||||
|
def format_output_name(url: str, latest: bool = False):
    """Build a filesystem-friendly output name from *url*.

    The scheme is stripped, "/" becomes "-", "." becomes "_", then either
    "latest" or the current local timestamp is appended.
    """
    suffix = "latest" if latest else datetime.now().strftime("%Y%m%d-%H:%M:%S")
    cleaned = url.replace("https://", "").replace("http://", "")
    cleaned = cleaned.replace("/", "-").replace(".", "_")
    return cleaned + suffix
|
|
||||
|
|
||||
|
@task(
    name="animate",
    retries=0,
    retry_delay_seconds=1,
    log_prints=True,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(minutes=3),
)
def animate(
    url: str = "https://services.swpc.noaa.gov/images/animations/geospace/density/",
    ext: str = "png",
    latest: bool = True,
    limit: int = 0,
):
    """Scrape one NOAA image directory, download frames, and encode an MP4.

    Args:
        url: Directory-listing URL to scrape.
        ext: Image file extension to look for.
        latest: When True the output is named "...latest" (stable name);
            otherwise a timestamp is appended.
        limit: Passed to get_images; when > 0 only the newest ``limit``
            frames are fetched.

    Raises:
        ValueError: If no URLs are scraped or no images are retrieved.
    """
    # get_file_links yields lazily; materialize it so len() works
    # (len() on a generator raises TypeError).
    urls = list(get_file_links(url, ext))
    if len(urls) == 0:
        raise ValueError("No urls scraped")
    images = get_images(sorted(urls), limit=limit)
    if len(images) == 0:
        raise ValueError("No images retrieved.")
    print(f"Retrieved {len(images)} images.")
    # Order frames chronologically by the timestamp embedded in each URL.
    sorted_images = sorted(images, key=lambda x: extract_timestamp_from_url(x[0]))
    print("Head:")
    print([u for u, i in sorted_images[:5]])
    frames = [s[1] for s in sorted_images]
    # create_animation(frames, "out.gif", duration=5)
    out_name = format_output_name(url, latest=latest)
    create_mp4_animation(frames, f"out/{out_name}.mp4")
|
|
||||
|
|
||||
|
def deploy_name():
    """Return the current UTC time as "YYYY-MM-DDTHH:MM:SSZ"."""
    # datetime.utcnow() is deprecated (Python 3.12+); an aware UTC datetime
    # formats to the identical string.
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S") + "Z"
|
|
||||
|
|
||||
|
@flow(
    name="create-animations",
    retries=0,
    retry_delay_seconds=1,
    log_prints=True,
    flow_run_name=None,
    timeout_seconds=90,
)
def create_animations(
    url: str | List[str] = BASE_URL + "velocity/",
    ext: str | None = None,
    latest: bool = False,
    limit: int = 0,
):
    """Fan out one `animate` task per URL and return the resulting futures.

    Args:
        url: A single directory URL or a list of them.
        ext: Image extension forwarded to each animate run.
        latest: Forwarded to animate (stable "latest" output naming).
        limit: Forwarded to animate (cap on frames fetched).
    """
    # Normalize a lone URL into a one-element list so .map always applies.
    urls = [url] if isinstance(url, str) else url

    return animate.map(urls, ext, latest, limit)
||||
|
|
||||
|
|
||||
|
if __name__ == "__main__":
    # Earlier deployment approach, kept for reference:
    # make_animation.from_source(
    #     source=TEST_REPO,
    #     entrypoint="noaa_animate.py:make_animation",
    # ).deploy(
    #     name="noaa-animate", work_pool_name="process"
    # )

    from prefect.client.schemas.schedules import CronSchedule

    # Every 15 minutes, Denver time. NOTE(review): currently unused —
    # serve() below is called with schedule=None; pass `sched` to enable it.
    sched = CronSchedule(cron="*/15 * * * *", timezone="America/Denver")

    # The three geospace products to animate on each run.
    LINKS = [
        BASE_URL + "density/",
        BASE_URL + "velocity/",
        BASE_URL + "pressure/",
    ]
    # Default parameters for each served flow run.
    sched_params = {
        "latest": True,
        "url": LINKS,
        "ext": "png",
        "limit": 0,
    }
    # Serve the flow (blocks); `limit=8` here is serve's concurrency cap,
    # not the flow's `limit` parameter.
    create_animations.serve(
        "noaa-animate", limit=8, schedule=None, parameters=sched_params
    )
    # make_animation(url)
Loading…
Reference in new issue