chunk data gen

This commit is contained in:
mm 2023-05-05 05:41:52 +00:00
parent 1e38ce04c9
commit 9083e9d6e1

View File

@ -105,34 +105,37 @@ def calculate_distance(pair):
distance = get_distance(city1["name"], city2["name"]) distance = get_distance(city1["name"], city2["name"])
return city1["name"], city2["name"], distance return city1["name"], city2["name"], distance
def main(): def main():
cities = list(us_cities.values()) cities = list(us_cities.values())
print(f"Num cities: {len(cities)}") print(f"Num cities: {len(cities)}")
city_combinations = list(itertools.combinations(cities, 2)) city_combinations = list(itertools.combinations(cities, 2))
chunk_size = 800 # adjust this as needed
num_chunks = len(city_combinations) // chunk_size + 1
output_file = args.output_file
with open(args.output_file, "w", newline="") as csvfile: with open(output_file, "w", newline="") as csvfile:
fieldnames = ["city_from", "city_to", "distance"] fieldnames = ["city_from", "city_to", "distance"]
writer = csv.DictWriter(csvfile, fieldnames=fieldnames) writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader() writer.writeheader()
try: try:
executor = concurrent.futures.ProcessPoolExecutor(max_workers=args.workers) executor = concurrent.futures.ProcessPoolExecutor(max_workers=args.workers)
# results = executor.map(calculate_distance, city_combinations) for i in range(num_chunks):
futures = { chunk = city_combinations[i * chunk_size : (i + 1) * chunk_size]
executor.submit(calculate_distance, pair): pair futures = {
for pair in city_combinations executor.submit(calculate_distance, pair): pair for pair in chunk
} }
for future in as_completed(futures): for future in as_completed(futures):
city_from, city_to, distance = future.result() city_from, city_to, distance = future.result()
if distance is not None: if distance is not None:
writer.writerow( writer.writerow(
{ {
"city_from": city_from, "city_from": city_from,
"city_to": city_to, "city_to": city_to,
"distance": distance, "distance": distance,
} }
) )
csvfile.flush() # write to disk immediately
except KeyboardInterrupt: except KeyboardInterrupt:
print("Interrupted. Terminating processes...") print("Interrupted. Terminating processes...")
executor.shutdown(wait=False) executor.shutdown(wait=False)