
updates for threading

upgrade
Michael Pilosov, 6 months ago
parent commit 4d9ad9ddb8
  1. Dockerfile (4 changes)
  2. app/app.py (5 changes)
  3. app/flows.py (264 changes)
  4. init_db.py (45 changes)
  5. start.sh (6 changes)

Dockerfile (4 changes)

@@ -35,8 +35,8 @@ RUN pip install --no-cache-dir --user -r /tmp/requirements.txt
 COPY --chown=user:user profiles.default.toml /home/user/.prefect/profiles.toml
 # Copy the application files
-COPY --chown=user:user app ./app
-COPY --chown=user:user noaa_animate.py .
+COPY --chown=user:user app ./
+# COPY --chown=user:user noaa_animate.py .
 COPY --chown=user:user start.sh .
 COPY --chown=user:user init_db.py .
 RUN chmod +x start.sh

app/app.py (5 changes)

@@ -1,3 +1,5 @@
+import threading
+from flows import create_animations
 import logging
 import os
 import re

@@ -68,7 +70,7 @@ def handle_api():
 @app.route("/videos/<path:filename>")
 def custom_static(filename):
-    return send_from_directory("../out", filename)
+    return send_from_directory("./out", filename)

 @app.route("/", methods=["GET"])

@@ -150,4 +152,5 @@ def proxy(url="", proto=PROTO):
 if __name__ == "__main__":
+    threading.Thread(target=create_animations.serve, daemon=True, kwargs={"name": "noaa-animate", "limit": 8}).start()
     app.run(host="0.0.0.0", port=9021, debug=True)
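
The pattern here keeps Flask's blocking app.run in the main thread while Prefect's Flow.serve polling loop runs on a daemon thread, so the serve loop dies with the web process instead of keeping it alive. A minimal standalone sketch of the same pattern (the hello flow, deployment name, and port are illustrative, not taken from this repo):

import threading

from flask import Flask
from prefect import flow

app = Flask(__name__)


@flow
def hello():
    print("hello from the served flow")


if __name__ == "__main__":
    # serve() blocks while it polls for scheduled runs, so it gets its own
    # thread; daemon=True ties its lifetime to the Flask process.
    threading.Thread(
        target=hello.serve,
        daemon=True,
        kwargs={"name": "hello-deployment"},
    ).start()
    app.run(host="0.0.0.0", port=9021)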

app/flows.py (264 changes, new file)

@@ -0,0 +1,264 @@
import os
import re
from datetime import datetime, timedelta
from io import BytesIO
from typing import Any, Dict, Iterator, List

import httpx

# import imageio
import numpy as np
from moviepy.editor import ImageSequenceClip
from PIL import Image
from prefect import flow, task
from prefect.tasks import task_input_hash

BASE_URL = "https://services.swpc.noaa.gov/images/animations/geospace/"


@task(
    retries=3,
    retry_delay_seconds=5,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(minutes=2),
    log_prints=True,
)
def get_file_links(url: str, ext: str | None = None) -> Iterator[str]:
    response = httpx.get(url)
    response.raise_for_status()
    webpage_content = response.text
    if ext is None:
        print("Extension not supplied. Inferring (less efficient) png/jpg/jpeg")
        exts = ["png", "jpg", "jpeg"]
    else:
        exts = [ext.lower()]
    lines = webpage_content.split("\n")
    for line in lines:
        for ext in exts:
            if ext in line:  # need to parse the href link
                start_pos = line.find('href="') + len('href="')
                end_pos = line.find('"', start_pos)
                href = line[start_pos:end_pos]
                if href.endswith(f"latest.{ext}"):
                    print("Skipping latest")
                    continue
                if href.endswith(ext):
                    if not href.startswith("http"):
                        href = url + href
                    yield href
                break  # Exit the inner loop to avoid duplicate yields for multiple exts


def url_tail_hash(context, parameters):
    # return a constant
    return parameters["url"].split("/")[-1]


def out_path_hash(context, parameters):
    return parameters["output_path"] + f"_L{len(parameters['images'])}"


@task(
    retries=5,
    retry_delay_seconds=1,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(minutes=5),
    result_storage_key="{parameters[url]}",
)
def get_content(url: str, params: Dict[str, Any] | None = None):
    response = httpx.get(f"https://{url}", params=params)
    try:
        response.raise_for_status()
        return response.content
    except httpx.HTTPStatusError:
        return None


def preview_urls(urls):
    print("URLS (head):")
    print(urls[:5])
    print("URLS (tail):")
    print(urls[-5:])


@task(
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1),
)
def get_images(urls: List[str], limit: int = 0):
    if limit > 0:
        print(f"Limiting to {limit} urls")
        urls = urls[-limit:]
    urls = [url.replace("https://", "").replace("http://", "") for url in urls]
    preview_urls(urls)
    futures = get_content.map(urls)
    images = [
        (urls[i], f.result()) for i, f in enumerate(futures) if f.result() is not None
    ]
    return images


def extract_timestamp_from_url(url: str) -> str:
    # Assuming the timestamp format is in the format shown in the screenshot
    match = re.search(r"\d{8}_\d{6}", url)
    return match.group(0) if match else ""


# @task(
#     cache_key_fn=out_path_hash,
#     cache_expiration=timedelta(minutes=3),
#     result_storage_key="{parameters[output_path]}",
# )
# def create_animation(
#     images: List[bytes], output_path: str, duration: float = 0.5
# ) -> None:
#     if not images:
#         raise ValueError("No images!")
#     pil_images = [Image.open(BytesIO(img_data)).convert("RGB") for img_data in images]
#     imageio.mimsave(output_path, pil_images, duration=duration)
#     return output_path


def make_even_dimensions(image):
    width, height = image.size
    if width % 2 == 1:
        width -= 1
    if height % 2 == 1:
        height -= 1
    # Image.ANTIALIAS was removed in Pillow 10; LANCZOS is the equivalent filter
    return image.resize((width, height), Image.LANCZOS)


def crop_to_even(image):
    width, height = image.size
    # Adjust width and height to be even
    if width % 2 == 1:
        width -= 1
    if height % 2 == 1:
        height -= 1
    return image.crop((0, 0, width, height))


@task(
    cache_key_fn=out_path_hash,
    cache_expiration=timedelta(hours=4),
    result_storage_key="{parameters[output_path]}",
)
def create_mp4_animation(images: List[bytes], output_path: str, fps: int = 24) -> None:
    # Convert bytes to PIL images and then to numpy arrays
    frames = [
        np.array(crop_to_even(Image.open(BytesIO(img_data)).convert("RGB")))
        for img_data in images
    ]
    # Create a video clip from the image sequence
    clip = ImageSequenceClip(frames, fps=fps)
    # Write the video clip to a file
    clip.write_videofile(
        output_path,
        codec="libx264",
        ffmpeg_params=["-pix_fmt", "yuv420p"],
        preset="medium",
        bitrate="800k",
    )
    return output_path


def format_output_name(url: str, latest: bool = False):
    if latest:
        now = "latest"
    else:
        now = datetime.now().strftime("%Y%m%d-%H:%M:%S")
    return (
        url.replace("https://", "")
        .replace("http://", "")
        .replace("/", "-")
        .replace(".", "_")
        + now
    )


@task(
    name="animate",
    retries=0,
    retry_delay_seconds=1,
    log_prints=True,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(minutes=3),
)
def animate(
    url: str = "https://services.swpc.noaa.gov/images/animations/geospace/density/",
    ext: str = "png",
    latest: bool = True,
    limit: int = 0,
):
    # materialize the generator so it can be length-checked and sorted
    urls = list(get_file_links(url, ext))
    if len(urls) == 0:
        raise ValueError("No urls scraped")
    images = get_images(sorted(urls), limit=limit)
    if len(images) == 0:
        raise ValueError("No images retrieved.")
    print(f"Retrieved {len(images)} images.")
    sorted_images = sorted(images, key=lambda x: extract_timestamp_from_url(x[0]))
    print("Head:")
    print([u for u, i in sorted_images[:5]])
    frames = [s[1] for s in sorted_images]
    # create_animation(frames, "out.gif", duration=5)
    out_name = format_output_name(url, latest=latest)
    create_mp4_animation(frames, f"out/{out_name}.mp4")


def deploy_name():
    return datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S") + "Z"


@flow(
    name="create-animations",
    retries=0,
    retry_delay_seconds=1,
    log_prints=True,
    flow_run_name=None,
    timeout_seconds=90,
)
def create_animations(
    url: str | List[str] = BASE_URL + "velocity/",
    ext: str | None = None,
    latest: bool = False,
    limit: int = 0,
):
    if isinstance(url, str):
        url = [url]
    futures = animate.map(url, ext, latest, limit)
    return futures


if __name__ == "__main__":
    # make_animation.from_source(
    #     source=TEST_REPO,
    #     entrypoint="noaa_animate.py:make_animation",
    # ).deploy(
    #     name="noaa-animate", work_pool_name="process"
    # )
    from prefect.client.schemas.schedules import CronSchedule

    sched = CronSchedule(cron="*/15 * * * *", timezone="America/Denver")

    LINKS = [
        BASE_URL + "density/",
        BASE_URL + "velocity/",
        BASE_URL + "pressure/",
    ]
    sched_params = {
        "latest": True,
        "url": LINKS,
        "ext": "png",
        "limit": 0,
    }
    create_animations.serve(
        "noaa-animate", limit=8, schedule=None, parameters=sched_params
    )
    # make_animation(url)
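
For a one-off run outside the serve loop, the flow can also be invoked directly; calling a @flow-decorated function executes it locally. A short sketch (the limit=24 value is illustrative, not from this commit):

from flows import BASE_URL, create_animations

# Animate only the newest 24 frames from the density endpoint, naming the
# output "latest" so it overwrites the previous file.
create_animations(url=BASE_URL + "density/", ext="png", latest=True, limit=24)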

init_db.py (45 changes)

@@ -16,34 +16,40 @@ def initialize_db(db_path):
         logging.info(f"{db_path} exists")  # Log a message if the database exists
         return
     try:
-        with sqlite3.connect(
-            db_path
-        ) as conn:  # Using 'with' to ensure that the connection is closed automatically
+        # Using 'with' to ensure that the connection is closed automatically
+        with sqlite3.connect(db_path) as conn:
             configure_database(conn)
     except sqlite3.Error as e:
         logging.error(f"Database error: {e}")  # Log any SQLite errors that occur
     except Exception as e:
-        logging.error(
-            f"Exception in initialize_db: {e}"
-        )  # Log any other exceptions that occur
+        logging.error(f"Exception in initialize_db: {e}")


 def configure_database(conn):
     cursor = conn.cursor()
     # Setting the journal mode to WAL for better concurrency
     cursor.execute("PRAGMA journal_mode = WAL;")
+    # Ensuring foreign key constraints are enforced for data integrity
+    cursor.execute("PRAGMA foreign_keys = ON;")
+    # Disable legacy ALTER TABLE behavior, as it would cause problems during
+    # migrations when tables are renamed: references would otherwise be
+    # retained in some locations
+    # https://www.sqlite.org/pragma.html#pragma_legacy_alter_table
+    cursor.execute("PRAGMA legacy_alter_table = OFF;")
+    # When using the WAL, we do not need to sync changes on every write;
+    # SQLite recommends 'normal' mode, which is much faster
     # Setting synchronous to NORMAL for a balance between speed and reliability
     cursor.execute("PRAGMA synchronous = NORMAL;")
-    # Setting a busy timeout to prevent immediate failures when the database is locked
-    cursor.execute("PRAGMA busy_timeout = 5000;")
     # Increasing the cache size to reduce the number of disk I/O operations
-    cursor.execute("PRAGMA cache_size = -32000;")
+    cursor.execute("PRAGMA cache_size = 20000;")
     # Enabling memory-mapped I/O for potentially faster file operations
     cursor.execute("PRAGMA mmap_size = 536870912;")
+    # Setting a busy timeout to prevent immediate failures when the database is
+    # locked; setting the value very high allows for more 'concurrency' without
+    # running into errors, but may result in slow api calls
+    cursor.execute("PRAGMA busy_timeout = 60000;")
     # Setting locking mode to EXCLUSIVE can enhance performance for single-user scenarios
-    cursor.execute("PRAGMA locking_mode = EXCLUSIVE;")
+    # cursor.execute("PRAGMA locking_mode = EXCLUSIVE;")
-    # Ensuring foreign key constraints are enforced for data integrity
-    cursor.execute("PRAGMA foreign_keys = ON;")
     conn.commit()  # Commit all PRAGMA configurations
     logging.info("Set up database with multi-user optimizations.")
@@ -55,13 +61,11 @@ def batch_transact(db_path, operations):
             db_path
         ) as conn:  # Ensure that the connection is handled properly
             cursor = conn.cursor()
-            cursor.execute(
-                "BEGIN TRANSACTION;"
-            )  # Start a transaction for batch operations
+            # Start a transaction for batch operations
+            cursor.execute("BEGIN TRANSACTION;")
             for operation in operations:
-                cursor.execute(
-                    operation
-                )  # Execute each SQL operation provided in the operations list
+                # Execute each SQL operation provided in the operations list
+                cursor.execute(operation)
             cursor.execute("COMMIT;")  # Commit all operations at once
     except sqlite3.Error as e:
         logging.error(f"Database error during batch transaction: {e}")
@@ -73,9 +77,8 @@ def maintenance(db_path):
     try:
         with sqlite3.connect(db_path) as conn:
             cursor = conn.cursor()
-            cursor.execute(
-                "PRAGMA optimize;"
-            )  # Optimize the database to maintain performance
+            # Optimize the database to maintain performance
+            cursor.execute("PRAGMA optimize;")
             cursor.execute("VACUUM;")  # Reclaim space and defragment the database file
     except sqlite3.Error as e:
         logging.error(f"Database error during maintenance: {e}")

start.sh (6 changes)

@@ -1,14 +1,14 @@
 #!/bin/bash
+# Start the web app
+cd app && make &
 # Start Prefect in the background
 prefect server start --host 0.0.0.0 &
 sleep 10
-# Start the web app
-make &
 # Start the deployment
-python noaa_animate.py &
+python flows.py &
 # Wait for all background jobs to finish
 wait
