Compare commits

..

1 Commits

Author SHA1 Message Date
Michael Pilosov
4d9ad9ddb8 updates for threading 2024-06-01 17:57:48 -06:00
10 changed files with 310 additions and 60 deletions

View File

@ -25,8 +25,8 @@ RUN chown -R user:user /home/user
USER user
# Copy the requirements file to /tmp and install Python dependencies with user flag
RUN python -m pip install --upgrade pip
COPY --chown=user:user requirements.txt /tmp/requirements.txt
RUN python -m pip install --upgrade pip
RUN pip install --no-cache-dir --user -r /tmp/requirements.txt
# APPLICATION SETUP
@ -35,8 +35,8 @@ RUN pip install --no-cache-dir --user -r /tmp/requirements.txt
COPY --chown=user:user profiles.default.toml /home/user/.prefect/profiles.toml
# Copy the application files
COPY --chown=user:user app ./app
COPY --chown=user:user noaa_animate.py .
COPY --chown=user:user app ./
# COPY --chown=user:user noaa_animate.py .
COPY --chown=user:user start.sh .
COPY --chown=user:user init_db.py .
RUN chmod +x start.sh

View File

@ -1,3 +1,5 @@
import threading
from flows import create_animations
import logging
import os
import re
@ -10,7 +12,7 @@ from prefect.deployments import run_deployment
PORT = 9021
PROTO = "https"
BARE = False
BARE = True
app = Flask(__name__)
@ -68,7 +70,7 @@ def handle_api():
@app.route("/videos/<path:filename>")
def custom_static(filename):
return send_from_directory("../out", filename)
return send_from_directory("./out", filename)
@app.route("/", methods=["GET"])
@ -150,4 +152,5 @@ def proxy(url="", proto=PROTO):
if __name__ == "__main__":
threading.Thread(target=create_animations.serve, daemon=True, kwargs={"name": "noaa-animate", "limit": 8}).start()
app.run(host="0.0.0.0", port=9021, debug=True)

264
app/flows.py Normal file
View File

@ -0,0 +1,264 @@
import os
import re
from datetime import datetime, timedelta
from io import BytesIO
from typing import Dict, Iterator, List
import httpx
# import imageio
import numpy as np
from moviepy.editor import ImageSequenceClip
from PIL import Image
from prefect import flow, task
from prefect.tasks import task_input_hash
BASE_URL = "https://services.swpc.noaa.gov/images/animations/geospace/"
@task(
retries=3,
retry_delay_seconds=5,
cache_key_fn=task_input_hash,
cache_expiration=timedelta(minutes=2),
log_prints=True,
)
def get_file_links(url: str, ext: str | None = None) -> Iterator[str]:
response = httpx.get(url)
response.raise_for_status()
webpage_content = response.text
if ext is None:
print("Extension not supplied. Inferring (less efficient) png/jpg/jpeg")
exts = ["png", "jpg", "jpeg"]
else:
exts = [ext.lower()]
lines = webpage_content.split("\n")
for line in lines:
for ext in exts:
if ext in line: # need to parse the href link
start_pos = line.find('href="') + len('href="')
end_pos = line.find('"', start_pos)
href = line[start_pos:end_pos]
if href.endswith(f"latest.{ext}"):
print("Skipping latest")
continue
if href.endswith(ext):
if not href.startswith("http"):
href = url + href
yield href
break # Exit the inner loop to avoid duplicate yields for multiple exts
def url_tail_hash(context, parameters):
# return a constant
return parameters["url"].split("/")[-1]
def out_path_hash(context, parameters):
return parameters["output_path"] + f"_L{len(parameters['images'])}"
@task(
retries=5,
retry_delay_seconds=1,
cache_key_fn=task_input_hash,
cache_expiration=timedelta(minutes=5),
result_storage_key="{parameters[url]}",
)
def get_content(url: str, params: Dict[str, any] | None = None):
response = httpx.get(f"https://{url}", params=params)
try:
response.raise_for_status()
return response.content
except httpx.HTTPStatusError:
return None
def preview_urls(urls):
print("URLS (head):")
print(urls[:5])
print("URLS (tail):")
print(urls[-5:])
@task(
cache_key_fn=task_input_hash,
cache_expiration=timedelta(hours=1),
)
def get_images(urls: List[str] | List[str], limit: int = 0):
if limit > 0:
print(f"Limiting to {limit} urls")
urls = urls[-limit:]
urls = [url.replace("https://", "").replace("http://", "") for url in urls]
preview_urls(urls)
futures = get_content.map(urls)
images = [
(urls[i], f.result()) for i, f in enumerate(futures) if f.result() is not None
]
return images
def extract_timestamp_from_url(url: str) -> str:
# Assuming the timestamp format is in the format shown in the screenshot
match = re.search(r"\d{8}_\d{6}", url)
return match.group(0) if match else ""
# @task(
# cache_key_fn=out_path_hash,
# cache_expiration=timedelta(minutes=3),
# result_storage_key="{parameters[output_path]}",
# )
# def create_animation(
# images: List[bytes], output_path: str, duration: float = 0.5
# ) -> None:
# if not images:
# raise ValueError("No images!")
# pil_images = [Image.open(BytesIO(img_data)).convert("RGB") for img_data in images]
# imageio.mimsave(output_path, pil_images, duration=duration)
# return output_path
def make_even_dimensions(image):
width, height = image.size
if width % 2 == 1:
width -= 1
if height % 2 == 1:
height -= 1
return image.resize((width, height), Image.ANTIALIAS)
def crop_to_even(image):
width, height = image.size
# Adjust width and height to be even
if width % 2 == 1:
width -= 1
if height % 2 == 1:
height -= 1
return image.crop((0, 0, width, height))
@task(
cache_key_fn=out_path_hash,
cache_expiration=timedelta(hours=4),
result_storage_key="{parameters[output_path]}",
)
def create_mp4_animation(images: List[bytes], output_path: str, fps: int = 24) -> None:
# Convert bytes to PIL images and then to numpy arrays
frames = [
np.array(crop_to_even(Image.open(BytesIO(img_data)).convert("RGB")))
for img_data in images
]
# Create a video clip from the image sequence
clip = ImageSequenceClip(frames, fps=fps)
# Write the video clip to a file
clip.write_videofile(
output_path,
codec="libx264",
ffmpeg_params=["-pix_fmt", "yuv420p"],
preset="medium",
bitrate="800k",
)
return output_path
def format_output_name(url: str, latest: bool = False):
if latest:
now = "latest"
else:
now = datetime.now().strftime("%Y%m%d-%H:%M:%S")
return (
url.replace("https://", "")
.replace("http://", "")
.replace("/", "-")
.replace(".", "_")
+ now
)
@task(
name="animate",
retries=0,
retry_delay_seconds=1,
log_prints=True,
cache_key_fn=task_input_hash,
cache_expiration=timedelta(minutes=3),
)
def animate(
url: str = "https://services.swpc.noaa.gov/images/animations/geospace/density/",
ext: str = "png",
latest: bool = True,
limit: int = 0,
):
urls = get_file_links(url, ext)
if len(urls) == 0:
raise ValueError("No urls scraped")
images = get_images(list(sorted(urls)), limit=limit)
if len(images) == 0:
raise ValueError("No images retrieved.")
print(f"Retrieved {len(images)} images.")
sorted_images = sorted(images, key=lambda x: extract_timestamp_from_url(x[0]))
print("Head:")
print([u for u, i in sorted_images[:5]])
frames = [s[1] for s in sorted_images]
# create_animation(frames, "out.gif", duration=5)
out_name = format_output_name(url, latest=latest)
create_mp4_animation(frames, f"out/{out_name}.mp4")
def deploy_name():
return datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S") + "Z"
@flow(
name="create-animations",
retries=0,
retry_delay_seconds=1,
log_prints=True,
flow_run_name=None,
timeout_seconds=90,
)
def create_animations(
url: str | List[str] = BASE_URL + "velocity/",
ext: str | None = None,
latest: bool = False,
limit: int = 0,
):
if isinstance(url, str):
url = [url]
futures = animate.map(url, ext, latest, limit)
return futures
if __name__ == "__main__":
# make_animation.from_source(
# source=TEST_REPO,
# entrypoint="noaa_animate.py:make_animation",
# ).deploy(
# name="noaa-animate", work_pool_name="process"
# )
from prefect.client.schemas.schedules import CronSchedule
sched = CronSchedule(cron="*/15 * * * *", timezone="America/Denver")
LINKS = [
BASE_URL + "density/",
BASE_URL + "velocity/",
BASE_URL + "pressure/",
]
sched_params = {
"latest": True,
"url": LINKS,
"ext": "png",
"limit": 0,
}
create_animations.serve(
"noaa-animate", limit=8, schedule=None, parameters=sched_params
)
# make_animation(url)

View File

@ -1,13 +1,5 @@
start: hypercorn
gevent:
start:
gunicorn --worker-class gevent --bind 0.0.0.0:9021 app:app
gthread:
gunicorn --worker-class gthread --threads 4 --bind 0.0.0.0:9021 app:app
hypercorn:
hypercorn --bind 0.0.0.0:9021 app:app
dev:
python app.py

View File

@ -16,34 +16,40 @@ def initialize_db(db_path):
logging.info(f"{db_path} exists") # Log a message if the database exists
return
try:
with sqlite3.connect(
db_path
) as conn: # Using 'with' to ensure that the connection is closed automatically
# Using 'with' to ensure that the connection is closed automatically
with sqlite3.connect(db_path) as conn:
configure_database(conn)
except sqlite3.Error as e:
logging.error(f"Database error: {e}") # Log any SQLite errors that occur
except Exception as e:
logging.error(
f"Exception in initialize_db: {e}"
) # Log any other exceptions that occur
logging.error(f"Exception in initialize_db: {e}")
def configure_database(conn):
cursor = conn.cursor()
# Setting the journal mode to WAL for better concurrency
cursor.execute("PRAGMA journal_mode = WAL;")
# Setting synchronous to NORMAL for a balance between speed and reliability
cursor.execute("PRAGMA synchronous = NORMAL;")
# Setting a busy timeout to prevent immediate failures when the database is locked
cursor.execute("PRAGMA busy_timeout = 5000;")
# Increasing the cache size to reduce the number of disk I/O operations
cursor.execute("PRAGMA cache_size = -32000;")
# Enabling memory-mapped I/O for potentially faster file operations
cursor.execute("PRAGMA mmap_size = 536870912;")
# Setting locking mode to EXCLUSIVE can enhance performance for single-user scenarios
cursor.execute("PRAGMA locking_mode = EXCLUSIVE;")
# Ensuring foreign key constraints are enforced for data integrity
cursor.execute("PRAGMA foreign_keys = ON;")
# disable legacy alter table behavior as it will cause problems during
# migrations when tables are renamed as references would otherwise be retained
# in some locations
# https://www.sqlite.org/pragma.html#pragma_legacy_alter_table
cursor.execute("PRAGMA legacy_alter_table=OFF")
# when using the WAL, we do need to sync changes on every write. sqlite
# recommends using 'normal' mode which is much faster
# Setting synchronous to NORMAL for a balance between speed and reliability
cursor.execute("PRAGMA synchronous = NORMAL;")
# Increasing the cache size to reduce the number of disk I/O operations
cursor.execute("PRAGMA cache_size = 20000;")
# Enabling memory-mapped I/O for potentially faster file operations
cursor.execute("PRAGMA mmap_size = 536870912;")
# Setting a busy timeout to prevent immediate failures when the database is locked
# setting the value very high allows for more 'concurrency'
# without running into errors, but may result in slow api calls
cursor.execute("PRAGMA busy_timeout = 60000;")
# Setting locking mode to EXCLUSIVE can enhance performance for single-user scenarios
# cursor.execute("PRAGMA locking_mode = EXCLUSIVE;")
conn.commit() # Commit all PRAGMA configurations
logging.info("Set up database with multi-user optimizations.")
@ -55,13 +61,11 @@ def batch_transact(db_path, operations):
db_path
) as conn: # Ensure that the connection is handled properly
cursor = conn.cursor()
cursor.execute(
"BEGIN TRANSACTION;"
) # Start a transaction for batch operations
# Start a transaction for batch operations
cursor.execute("BEGIN TRANSACTION;")
for operation in operations:
cursor.execute(
operation
) # Execute each SQL operation provided in the operations list
# Execute each SQL operation provided in the operations list
cursor.execute(operation)
cursor.execute("COMMIT;") # Commit all operations at once
except sqlite3.Error as e:
logging.error(f"Database error during batch transaction: {e}")
@ -73,9 +77,8 @@ def maintenance(db_path):
try:
with sqlite3.connect(db_path) as conn:
cursor = conn.cursor()
cursor.execute(
"PRAGMA optimize;"
) # Optimize the database to maintain performance
# Optimize the database to maintain performance
cursor.execute("PRAGMA optimize;")
cursor.execute("VACUUM;") # Reclaim space and defragment the database file
except sqlite3.Error as e:
logging.error(f"Database error during maintenance: {e}")

View File

@ -1,13 +1,5 @@
# -e PREFECT_API_URL=https://4200-01j6qzz2bqsqmdhxc6gpqhvhg2.cloudspaces.litng.ai/api
run: build
docker run --rm -ti --name noaa -e HOST_NAME=localhost -p 9021:9021 -p 4200:4200 noaa
serve: build
docker run --rm -ti --name noaa \
-e LIGHTNING_CLOUDSPACE_HOST=$$LIGHTNING_CLOUDSPACE_HOST \
-p 9021:9021 -p 4200:4200 \
noaa
docker run --rm -ti --name noaa -e LIGHTNING_CLOUDSPACE_HOST=noaa.clfx.cc -p 9021:9021 -p 4200:4200 noaa
build:
docker build -t noaa .

View File

@ -195,7 +195,6 @@ def animate(
limit: int = 0,
):
urls = get_file_links(url, ext)
urls = list(urls)
if len(urls) == 0:
raise ValueError("No urls scraped")
images = get_images(list(sorted(urls)), limit=limit)
@ -260,6 +259,6 @@ if __name__ == "__main__":
"limit": 0,
}
create_animations.serve(
"noaa-animate", limit=8, interval=None, parameters=sched_params
"noaa-animate", limit=8, schedule=None, parameters=sched_params
)
# make_animation(url)

View File

@ -1,7 +1,7 @@
active = "default"
PREFECT_API_URL = "http://0.0.0.0:4200/api"
[profiles.default]
PREFECT_API_URL = "http://0.0.0.0:4200/api"
PREFECT_TASK_SCHEDULING_MAX_SCHEDULED_QUEUE_SIZE = 4
PREFECT_API_SERVICES_SCHEDULER_DEPLOYMENT_BATCH_SIZE = 100

View File

@ -1,5 +1,5 @@
# prefect==2.20.4
prefect==3.0.0rc20
prefect==2.17.1
#prefect==3.0.0rc1
Flask==3.0.3
gunicorn==22.0.0
gevent==24.2.1
@ -7,7 +7,4 @@ moviepy==1.0.3
pillow==10.3.0
requests==2.32.3
httpx==0.27.0
# uvicorn==0.28.1
hypercorn==0.17.3
# imageio==2.34.1
# griffe>=0.20.0,<1.0.0

View File

@ -1,14 +1,14 @@
#!/bin/bash
# Start the web app
cd app && make start &
# Start Prefect in the background
prefect server start --host 0.0.0.0 &
sleep 10
# Start the web app
make &
# Start the deployment
python noaa_animate.py &
python flows.py &
# Wait for all background jobs to finish
wait