Source code for pycoupler.run

import os
from datetime import datetime
from subprocess import run, Popen, PIPE, CalledProcessError
from pycoupler.config import read_config

import multiprocessing as mp


def operate_lpjml(config_file, std_to_file=False):
    """Run LPJmL using a generated (class LpjmlConfig) config file.
    Similar to R function `lpjmlKit::run_lpjml`.

    Parameters
    ----------
    config_file : str
        File name including path if not current to config_file
    std_to_file : bool, optional
        If True, stdout and stderr are written to files in the output folder.
        Defaults to False.
    """

    config = read_config(config_file)

    if not os.path.isdir(config.model_path):
        raise ValueError(f"Folder of model_path '{config.model_path}' does not exist!")

    output_path = f"{config.sim_path}/output/{config.sim_name}"

    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M")
    stdout_file = os.path.join(output_path, f"stdout_{timestamp}.log")
    stderr_file = os.path.join(output_path, f"stderr_{timestamp}.log")

    if not os.path.isdir(output_path):
        os.makedirs(output_path)
        print(f"Created output_path '{output_path}'")

    cmd = [f"{config.model_path}/bin/lpjml", config_file]
    # environment settings to be used for interartive LPJmL sessions
    #   MPI settings conflict with (e.g. on login node)
    os.environ["I_MPI_DAPL_UD"] = "disable"
    os.environ["I_MPI_FABRICS"] = "shm:shm"
    os.environ["I_MPI_DAPL_FABRIC"] = "shm:sh"
    if std_to_file:
        with open(stdout_file, "w") as f_out, open(stderr_file, "w") as f_err:
            with Popen(
                cmd,
                stdout=f_out,
                stderr=f_err,
                bufsize=1,
                universal_newlines=True,
                cwd=config.model_path,
            ) as p:
                p.wait()
    else:
        with Popen(
            cmd,
            stdout=PIPE,
            stderr=PIPE,
            bufsize=1,
            universal_newlines=True,
            cwd=config.model_path,
        ) as p:
            for line in p.stdout:
                print(line, end="")
            for line in p.stderr:
                print(line, end="")

    # reset default MPI settings to be able to submit jobs in parallel again
    os.environ["I_MPI_DAPL_UD"] = "enable"
    os.environ["I_MPI_FABRICS"] = "shm:dapl"
    del os.environ["I_MPI_DAPL_FABRIC"]
    # raise error if returncode does not reflect successfull call
    if p.returncode != 0:
        raise CalledProcessError(p.returncode, p.args)


[docs] def run_lpjml(config_file, std_to_file=False): """Run LPJmL using a generated (class LpjmlConfig) config file. Similar to R function `lpjmlKit::run_lpjml`. Parameters ---------- config_file : str File name including path if not current to config_file std_to_file : bool, optional If True, stdout and stderr are written to files in the output folder. Defaults to False. """ run = mp.Process(target=operate_lpjml, args=(config_file, std_to_file)) run.start() return run
[docs] def submit_lpjml( config_file, group="copan", sclass="short", ntasks=256, wtime=None, dependency=None, blocking=None, option=None, couple_to=None, venv_path=None, ): """Submit LPJmL run to Slurm using `lpjsubmit` and a generated (class LpjmlConfig) config file. Provide arguments for Slurm sbatch depending on the run. Similar to R function `lpjmlKit::submit_lpjml`. Parameters ---------- config_file : str File name including path if not current to config_file group : str, optional PIK group name to be used for Slurm. Defaults to "copan". sclass : str, optional Define the job classification, options are "short", "medium", "long", "priority", "standby", "io". For more information have a look at <https://www.pik-potsdam.de/en>. Defaults to `"short"`. ntasks : int/str, optional Define the number of tasks/threads. More information at <https://www.pik-potsdam.de/en> and <https://slurm.schedmd.com/sbatch.html>. Defaults to 256. wtime : str, optional Define the time limit. Setting a lower time limit than the maximum runtime for `sclass` can reduce the wait time in the SLURM job queue. More information at <https://www.pik-potsdam.de/en> and <https://slurm.schedmd.com/sbatch.html>. dependency : int/str, optional If there is a job that should be processed first (e.g. spinup) then pass its job id here. blocking : int, optional Cores to be blocked. More information at <https://www.pik-potsdam.de/en> and <https://slurm.schedmd.com/sbatch.html>. option : str/list, optional Additional options to be passed to lpjsubmit. Can be a string or a list of strings. couple_to : str, optional Path to program/model/script LPJmL should be coupled to venv_path : str, optional Path to a venv to run the coupled script in. This should be the path to the top folder of the venv. If not set, `python3` in PATH is used. Returns ------- str The submitted jobs id if submitted successfully. """ config = read_config(config_file) if not os.path.isdir(config.model_path): raise ValueError(f"Folder of model_path '{config.model_path}' does not exist!") output_path = f"{config.sim_path}/output/{config.sim_name}" timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M") stdout_file = os.path.join(output_path, f"stdout_{timestamp}.log") stderr_file = os.path.join(output_path, f"stderr_{timestamp}.log") if not os.path.isdir(output_path): os.makedirs(output_path) print(f"Created output_path '{output_path}'") lpjroot = os.environ.get("LPJROOT") # prepare lpjsubmit command to be called via subprocess cmd = [f"{config.model_path}/bin/lpjsubmit"] # specify sbatch arguments required by lpjsubmit internally cmd.extend( [ "-group", group, "-class", sclass, "-o", stdout_file, "-e", stderr_file, ] # noqa: E501 ) # if dependency (jobid) defined, submit is queued by slurm with nocheck if dependency: cmd.extend(["-nocheck", "-dependency", str(dependency)]) # processing time to get a better position in slurm queue if wtime: cmd.extend(["-wtime", str(wtime)]) # if cores to be blocked if blocking: cmd.extend(["-blocking", str(blocking)]) if option: if isinstance(option, str): cmd.extend(["-option", option]) elif isinstance(option, list): for opt in option: cmd.extend(["-option", opt]) # run in coupled mode and pass coupling program/model if couple_to: python_path = "python3" if venv_path: python_path = os.path.join(venv_path, "bin/python") if not os.path.isfile(python_path): raise FileNotFoundError( f"venv path contains no python binary at '{python_path}'." ) bash_script = f"""#!/bin/bash # Define the path to the config file config_file="{config_file}" # Call the Python script with the config file as an argument {python_path} {couple_to} $config_file """ couple_file = f"{output_path}/copan_lpjml.sh" with open(couple_file, "w") as file: file.write(bash_script) # Change the permissions of the file to make it executable run(["chmod", "+x", couple_file]) cmd.extend(["-couple", couple_file]) cmd.extend([str(ntasks), config_file]) # Intialize submit_status in higher scope submit_status = None # set LPJROOT to model_path to be able to call lpjsubmit try: os.environ["LPJROOT"] = config.model_path # call lpjsubmit via subprocess and return status if successfull submit_status = run(cmd, capture_output=True) except Exception as e: print("Error occurred:", e) finally: if lpjroot: os.environ["LPJROOT"] = lpjroot else: del os.environ["LPJROOT"] # print stdout and stderr if not successful if submit_status is None: raise Exception("Process was not submitted.") elif submit_status.returncode == 0: print(submit_status.stdout.decode("utf-8")) else: print(submit_status.stdout.decode("utf-8")) print(submit_status.stderr.decode("utf-8")) raise CalledProcessError(submit_status.returncode, submit_status.args) # return job id return ( submit_status.stdout.decode("utf-8") .split("Submitted batch job ")[1] .split("\n")[0] )
[docs] def check_lpjml(config_file): """Check if config file is set correctly. Parameters ---------- config_file : str File name (including path) to generated config json file. model_path : str Path to `LPJmL_internal` (lpjml repository) """ config = read_config(config_file) if not os.path.isdir(config.model_path): raise ValueError(f"Folder of model_path '{config.model_path}' does not exist!") if os.path.isfile(f"{config.model_path}/bin/lpjcheck"): proc_status = run( ["./bin/lpjcheck", config_file], capture_output=True, # "-param", cwd=config.model_path, ) if proc_status.returncode == 0: print(proc_status.stdout.decode("utf-8")) else: print(proc_status.stdout.decode("utf-8")) print(proc_status.stderr.decode("utf-8"))