|
@@ -31,14 +31,14 @@ host, host_data_path, local_working_path, local_metadata_path, local_data_path =
|
|
|
|
|
|
r = subprocess.run(["ssh", host, "bash", "-c", f"IFS=$'\n'; ls {host_data_path}"], stdout=subprocess.PIPE, check=True)
|
|
r = subprocess.run(["ssh", host, "bash", "-c", f"IFS=$'\n'; ls {host_data_path}"], stdout=subprocess.PIPE, check=True)
|
|
|
|
|
|
-available = {f for f in r.stdout.decode().split('\n') if f}
|
|
|
|
|
|
+available_files = {f for f in r.stdout.decode().split('\n') if f}
|
|
|
|
|
|
# There's better ways to list a dir locally, but using bash & ls again reduces possible formatting discrepencies.
|
|
# There's better ways to list a dir locally, but using bash & ls again reduces possible formatting discrepencies.
|
|
r = subprocess.run(["bash", "-c", f"IFS=$'\n'; ls {local_metadata_path}"], stdout=subprocess.PIPE, check=True)
|
|
r = subprocess.run(["bash", "-c", f"IFS=$'\n'; ls {local_metadata_path}"], stdout=subprocess.PIPE, check=True)
|
|
|
|
|
|
-processed = {f for f in r.stdout.decode().split('\n') if f}
|
|
|
|
|
|
+processed_files = {f for f in r.stdout.decode().split('\n') if f}
|
|
|
|
|
|
-new = available - processed
|
|
|
|
|
|
+new_files = available_files - processed_files
|
|
|
|
|
|
def process_file(new_file: str) -> None:
|
|
def process_file(new_file: str) -> None:
|
|
# Be super cautious about empty file names, wouldn't want to `rm -rf` a folder by accident
|
|
# Be super cautious about empty file names, wouldn't want to `rm -rf` a folder by accident
|
|
@@ -53,17 +53,17 @@ def process_file(new_file: str) -> None:
|
|
try:
|
|
try:
|
|
subprocess.run(["mv", f'{local_working_path}/{new_file}', f'{local_data_path}'], check=True)
|
|
subprocess.run(["mv", f'{local_working_path}/{new_file}', f'{local_data_path}'], check=True)
|
|
except:
|
|
except:
|
|
- subprocess.run(["rm", f'{local_metadata_path}/{new_file}'], check=True)
|
|
|
|
|
|
+ subprocess.run(["rm", f'{local_metadata_path}/{new_file}'], check=False)
|
|
raise
|
|
raise
|
|
|
|
|
|
subprocess.run(["rm", "-rf", f'{local_working_path}/{new_file}'], check=True)
|
|
subprocess.run(["rm", "-rf", f'{local_working_path}/{new_file}'], check=True)
|
|
|
|
|
|
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
|
|
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
|
|
- future_to_new_files = {executor.submit(process_file, new_file): new_file for new_file in new}
|
|
|
|
|
|
+ future_to_new_files = {executor.submit(process_file, new_file): new_file for new_file in new_files}
|
|
for future in concurrent.futures.as_completed(future_to_new_files):
|
|
for future in concurrent.futures.as_completed(future_to_new_files):
|
|
new_file = future_to_new_files[future]
|
|
new_file = future_to_new_files[future]
|
|
try:
|
|
try:
|
|
data = future.result()
|
|
data = future.result()
|
|
print(f"Processed {new_file}")
|
|
print(f"Processed {new_file}")
|
|
except Exception as exc:
|
|
except Exception as exc:
|
|
- print('%r generated an exception: %s' % (new_file, exc))
|
|
|
|
|
|
+ print(f"{new_file} generated an exception: {exc}")
|