import os
import re
from datetime import datetime, timedelta
from io import BytesIO
from typing import Any, Dict, Iterator, List

import httpx

# import imageio
import numpy as np
from moviepy.editor import ImageSequenceClip
from PIL import Image
from prefect import flow, task
from prefect.tasks import task_input_hash

BASE_URL = "https://services.swpc.noaa.gov/images/animations/geospace/"

@task(
    retries=3,
    retry_delay_seconds=5,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(minutes=2),
    log_prints=True,
)
def get_file_links(url: str, ext: str | None = None) -> Iterator[str]:
    """Scrape a NOAA directory listing and yield absolute links to image files."""
    response = httpx.get(url)
    response.raise_for_status()
    webpage_content = response.text
    if ext is None:
        print("Extension not supplied. Trying png/jpg/jpeg (less efficient)")
        exts = ["png", "jpg", "jpeg"]
    else:
        exts = [ext.lower()]
    lines = webpage_content.split("\n")
    for line in lines:
        for candidate_ext in exts:
            if candidate_ext in line:  # need to parse the href link
                start_pos = line.find('href="') + len('href="')
                end_pos = line.find('"', start_pos)
                href = line[start_pos:end_pos]
                if href.endswith(f"latest.{candidate_ext}"):
                    print("Skipping latest")
                    continue
                if href.endswith(candidate_ext):
                    if not href.startswith("http"):
                        href = url + href
                    yield href
                break  # Exit the inner loop to avoid duplicate yields for multiple exts
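
# Illustrative sketch only (the exact markup of the NOAA directory listing and the
# filename below are assumptions): a listing line such as
#     <a href="geospace_density_20240319_121000.png">...</a>
# scraped from BASE_URL + "density/" would be yielded as
#     https://services.swpc.noaa.gov/images/animations/geospace/density/geospace_density_20240319_121000.png
#
#     links = list(get_file_links(BASE_URL + "density/", "png"))
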
def url_tail_hash(context, parameters):
    # Cache on the final path segment of the URL
    return parameters["url"].split("/")[-1]


def out_path_hash(context, parameters):
    # Cache on the output path plus the number of images being written
    return parameters["output_path"] + f"_L{len(parameters['images'])}"
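
# Hypothetical cache-key values: url_tail_hash({}, {"url": "a/b/c.png"}) returns
# "c.png", and out_path_hash({}, {"output_path": "out/foo.mp4", "images": [b""] * 120})
# returns "out/foo.mp4_L120". url_tail_hash is an alternative key function that is
# not currently wired to any task below (get_content uses task_input_hash).
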
@task(
    retries=5,
    retry_delay_seconds=1,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(minutes=5),
    result_storage_key="{parameters[url]}",
)
def get_content(url: str, params: Dict[str, Any] | None = None) -> bytes | None:
    response = httpx.get(f"https://{url}", params=params)
    try:
        response.raise_for_status()
        return response.content
    except httpx.HTTPStatusError:
        return None
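
# get_content expects a scheme-less URL because get_images strips "https://" /
# "http://" before mapping, and the task re-adds "https://" itself. A call would
# look roughly like the following (the filename is illustrative):
#
#     png_bytes = get_content(
#         "services.swpc.noaa.gov/images/animations/geospace/density/geospace_density_20240319_121000.png"
#     )
#
# Failed downloads return None and are filtered out in get_images.
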
def preview_urls(urls):
    print("URLS (head):")
    print(urls[:5])
    print("URLS (tail):")
    print(urls[-5:])
@task(
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1),
)
def get_images(urls: List[str], limit: int = 0):
    if limit > 0:
        print(f"Limiting to {limit} urls")
        urls = urls[-limit:]
    urls = [url.replace("https://", "").replace("http://", "") for url in urls]
    preview_urls(urls)
    futures = get_content.map(urls)
    images = [
        (urls[i], f.result()) for i, f in enumerate(futures) if f.result() is not None
    ]
    return images
def extract_timestamp_from_url(url: str) -> str:
    # Image filenames embed a timestamp of the form YYYYMMDD_HHMMSS
    match = re.search(r"\d{8}_\d{6}", url)
    return match.group(0) if match else ""
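
# For example (illustrative filename):
#     extract_timestamp_from_url(".../geospace_density_20240319_121000.png")
# returns "20240319_121000"; URLs without a timestamp sort first as "".
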
# @task(
# cache_key_fn=out_path_hash,
# cache_expiration=timedelta(minutes=3),
# result_storage_key="{parameters[output_path]}",
# )
# def create_animation(
# images: List[bytes], output_path: str, duration: float = 0.5
# ) -> None:
# if not images:
# raise ValueError("No images!")
# pil_images = [Image.open(BytesIO(img_data)).convert("RGB") for img_data in images]
# imageio.mimsave(output_path, pil_images, duration=duration)
# return output_path
def make_even_dimensions(image):
    width, height = image.size
    if width % 2 == 1:
        width -= 1
    if height % 2 == 1:
        height -= 1
    # Image.ANTIALIAS was removed in Pillow 10; LANCZOS is the equivalent filter
    return image.resize((width, height), Image.LANCZOS)
def crop_to_even(image):
    width, height = image.size
    # Adjust width and height to be even
    if width % 2 == 1:
        width -= 1
    if height % 2 == 1:
        height -= 1
    return image.crop((0, 0, width, height))
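
# Even dimensions matter because yuv420p (used below) subsamples chroma in 2x2
# blocks, so libx264 rejects odd widths/heights. A quick sketch:
#
#     crop_to_even(Image.new("RGB", (641, 481))).size  # -> (640, 480)
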
@task(
    cache_key_fn=out_path_hash,
    cache_expiration=timedelta(hours=4),
    result_storage_key="{parameters[output_path]}",
)
def create_mp4_animation(images: List[bytes], output_path: str, fps: int = 24) -> str:
    # Convert bytes to PIL images and then to numpy arrays
    frames = [
        np.array(crop_to_even(Image.open(BytesIO(img_data)).convert("RGB")))
        for img_data in images
    ]
    # Create a video clip from the image sequence
    clip = ImageSequenceClip(frames, fps=fps)
    # Write the video clip to a file
    clip.write_videofile(
        output_path,
        codec="libx264",
        ffmpeg_params=["-pix_fmt", "yuv420p"],
        preset="medium",
        bitrate="800k",
    )
    return output_path
def format_output_name(url: str, latest: bool = False):
    if latest:
        now = "latest"
    else:
        now = datetime.now().strftime("%Y%m%d-%H:%M:%S")
    return (
        url.replace("https://", "")
        .replace("http://", "")
        .replace("/", "-")
        .replace(".", "_")
        + now
    )
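
# For example, format_output_name(BASE_URL + "density/", latest=True) returns
# "services_swpc_noaa_gov-images-animations-geospace-density-latest", which
# becomes the .mp4 filename under out/.
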
@task(
    name="animate",
    retries=0,
    retry_delay_seconds=1,
    log_prints=True,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(minutes=3),
)
def animate(
    url: str = "https://services.swpc.noaa.gov/images/animations/geospace/density/",
    ext: str = "png",
    latest: bool = True,
    limit: int = 0,
):
    urls = get_file_links(url, ext)
    urls = list(urls)
    if len(urls) == 0:
        raise ValueError("No urls scraped")
    images = get_images(sorted(urls), limit=limit)
    if len(images) == 0:
        raise ValueError("No images retrieved.")
    print(f"Retrieved {len(images)} images.")
    sorted_images = sorted(images, key=lambda x: extract_timestamp_from_url(x[0]))
    print("Head:")
    print([u for u, _ in sorted_images[:5]])
    frames = [s[1] for s in sorted_images]
    # create_animation(frames, "out.gif", duration=5)
    out_name = format_output_name(url, latest=latest)
    os.makedirs("out", exist_ok=True)  # make sure the output directory exists
    create_mp4_animation(frames, f"out/{out_name}.mp4")
def deploy_name():
    return datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S") + "Z"
@flow(
    name="create-animations",
    retries=0,
    retry_delay_seconds=1,
    log_prints=True,
    flow_run_name=None,
    timeout_seconds=90,
)
def create_animations(
    url: str | List[str] = BASE_URL + "velocity/",
    ext: str | None = None,
    latest: bool = False,
    limit: int = 0,
):
    if isinstance(url, str):
        url = [url]
    futures = animate.map(url, ext, latest, limit)
    return futures
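
# Local run sketch (parameter values are illustrative): animate.map fans out one
# mapped task run per URL in the list, so a call like
#
#     create_animations(
#         url=[BASE_URL + "density/", BASE_URL + "velocity/"],
#         ext="png",
#         latest=True,
#         limit=20,
#     )
#
# should produce one .mp4 per NOAA product under out/.
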
if __name__ == "__main__":
    # make_animation.from_source(
    #     source=TEST_REPO,
    #     entrypoint="noaa_animate.py:make_animation",
    # ).deploy(
    #     name="noaa-animate", work_pool_name="process"
    # )
    from prefect.client.schemas.schedules import CronSchedule

    sched = CronSchedule(cron="*/15 * * * *", timezone="America/Denver")

    links = [
        BASE_URL + "density/",
        BASE_URL + "velocity/",
        BASE_URL + "pressure/",
    ]

    sched_params = {
        "latest": True,
        "url": links,
        "ext": "png",
        "limit": 0,
    }
    # Note: `sched` is defined above but not passed to serve(), so no schedule is
    # attached here; runs must be triggered manually unless one is added.
    create_animations.serve(
        "noaa-animate", limit=8, interval=None, parameters=sched_params
    )
    # make_animation(url)