diff --git a/metaseq/launcher/slurm.py b/metaseq/launcher/slurm.py index fd633b61965d17a0a7155a8e7bd5c201e0f2d2ab..ea5fdd593aab8aaf90fd3a61e02fb82fd745c57c 100644 --- a/metaseq/launcher/slurm.py +++ b/metaseq/launcher/slurm.py @@ -8,6 +8,7 @@ import fnmatch import hashlib import itertools import os +import logging import random import shlex import shutil @@ -18,8 +19,8 @@ from pathlib import Path import metaseq from metaseq.utils import get_random_port -from metaseq.launcher.tombyard import tombstones from metaseq.launcher.sweep import get_env_from_args +from metaseq.launcher.tombyard import tombstones try: import metaseq_internal @@ -29,6 +30,8 @@ try: except ImportError: has_internal = False +logger: logging.Logger = logging.getLogger() + def main(get_grid, postprocess_hyperparams, args): def dry_run(msg): @@ -251,6 +254,7 @@ def gen_srun_command_and_str(args, save_dir_key, train_log, train_stderr, train_ if args.salloc: excluded_hosts = os.environ.get("EXCLUDED_HOSTS", None) included_hosts = os.environ.get("INCLUDED_HOSTS", None) + included_hosts_file = os.environ.get("INCLUDED_HOSTS_FILE", None) base_srun_cmd += [ "--nodes", str(args.num_nodes), @@ -263,7 +267,9 @@ def gen_srun_command_and_str(args, save_dir_key, train_log, train_stderr, train_ ] base_srun_cmd += ["-x", excluded_hosts] if excluded_hosts is not None else [] base_srun_cmd += ["-w", included_hosts] if included_hosts is not None else [] - + base_srun_cmd += ( + ["-F", included_hosts_file] if included_hosts_file is not None else [] + ) srun_cmd = base_srun_cmd + train_cmd srun_cmd_str = " ".join(map(shlex.quote, srun_cmd)) if getattr(args, "requeue_on_fail", False): @@ -284,6 +290,7 @@ def gen_sbatch_command_and_str( ): excluded_hosts = os.environ.get("EXCLUDED_HOSTS", None) included_hosts = os.environ.get("INCLUDED_HOSTS", None) + included_hosts_file = os.environ.get("INCLUDED_HOSTS_FILE", None) sbatch_cmd = [ "sbatch", "--job-name", @@ -331,6 +338,7 @@ def gen_sbatch_command_and_str( sbatch_cmd += ["--qos", "high"] sbatch_cmd += ["-x", excluded_hosts] if excluded_hosts is not None else [] sbatch_cmd += ["-w", included_hosts] if included_hosts is not None else [] + sbatch_cmd += ["-F", included_hosts_file] if included_hosts_file is not None else [] wrapped_cmd = requeue_support() if args.azure: @@ -445,11 +453,14 @@ def launch_train(args, grid, grid_product, dry_run, postprocess_hyperparams): subprocess.check_output( f"ln -fs {abs_oss} {save_dir}/snapshot_public", shell=True ) + # clone base env and update for this job, e.g., we set WANDB_RUN_ID # based on the save_dir, which is based on the current hyperparam values env = base_env.copy() env["METASEQ_SAVE_DIR"] = save_dir - + env["METASEQ_OSS_DESTINATION"] = os.path.abspath( + os.path.abspath(oss_destination) + ) # generate train command train_cmd = gen_train_command( args, @@ -505,7 +516,7 @@ def launch_train(args, grid, grid_product, dry_run, postprocess_hyperparams): print("Launched {}".format(job_id)) if hasattr(args, "tombstonable"): if args.tombstonable: - tombstones(job_id=job_id, base_dir=args.base_directory) + tombstones(job_id=job_id, base_dir=args.tombstoning_superdir) def has_finished(save_dir): diff --git a/metaseq/launcher/sweep.py b/metaseq/launcher/sweep.py index b6afd443d229caf46c02c220a0aaf9f97850f430..555cbe273a94f1bf3a62948226f814cd6a446e18 100644 --- a/metaseq/launcher/sweep.py +++ b/metaseq/launcher/sweep.py @@ -209,6 +209,14 @@ def _get_args(add_extra_options_func=None, input_args: Optional[List[str]] = Non "this can be a file with the steps, or a string. some placeholders such as " "{job_dir} will be replaced", ) + parser.add_argument( + "--tombstonable", + type=bool, + default=False, + help="make the job killable by writing a " + "tombstone 'tombstone_<job_id>' file in any subdir of the checkpoint default" + "(/shared/home/$USER for azure)", + ) # Env flags parser.add_argument("--azure", action="store_true", help="running on azure") @@ -251,17 +259,6 @@ def _get_args(add_extra_options_func=None, input_args: Optional[List[str]] = Non default=None, # None will default to save_dir/tb help="save tensorboard logs in <tensorboard-logdir>/<prefix>.<save_dir_key>", ) - parser.add_argument( - "-ts", - "--tombstonable", - type=bool, - default=False, - help=( - "make the job killable by writing a " - "tombstone 'tombstone_<job_id>' file to user's home directory " - "(/shared/home/$USER)" - ), - ) if add_extra_options_func is not None: # mutates parser add_extra_options_func(parser) @@ -367,8 +364,8 @@ def _modify_arg_defaults_based_on_env(env, args): if args.local_checkpoints_dir is None: args.local_checkpoints_dir = default_local_checkpoints_dir - # assign base directory - args.base_directory = default_prefix + # assign tombstoning dir + args.tombstoning_superdir = default_prefix def main( diff --git a/metaseq/launcher/tombyard.py b/metaseq/launcher/tombyard.py index a8e73dbc645537cad3a23ed4f4abdfe678f805f4..39bdd2c25dbef96f85242f6586a19fc008559b6f 100644 --- a/metaseq/launcher/tombyard.py +++ b/metaseq/launcher/tombyard.py @@ -72,7 +72,7 @@ def tombstones_procedure( f"remove the file {tombstone_name} within the next {period_before_tombstone_detected} " f"for it not to trigger the same command again " ) - _ = os.popen(f"scontrol requeuehold {job_id}").read() + _ = os.popen(f"scontrol requeuehold {job_id} ").read() for tombstone_name in dirstones["requeuehold"]: if os.path.exists(tombstone_name): print( @@ -87,7 +87,12 @@ def tombstones_procedure( time.sleep(period.total_seconds()) -def tombstones(job_id, base_dir, period=datetime.timedelta(seconds=60), dirstones=None): +def tombstones( + job_id, + base_dir, + period=datetime.timedelta(seconds=60), + dirstones=None, +): if dirstones is None: dirstones = {"scancel": [], "requeuehold": [], "release": []} for userdir in os.listdir(base_dir):