Source code for execute.subprocess_interface

"""
Module
------

    subprocess_interface.py

Description
-----------

    This module provides an interface to launch supported job types
    for the respective platform using the Python subprocess library.

Functions
---------

    __job_info__(job_type, app=None)

        This function defines the launcher and task attributes for the
        job type specified upon entry.

    __launch__(cmd, infile, errlog, outlog)

        This function launches the command string arguments (`cmd`)
        and writes the output and error to the `outlog` and `errlog`
        paths; if `errlog` and/or `outlog` are NoneType, they default
        to `err.log` and/or `out.log`, respectively.

    run(exe, job_type="slurm", ntasks=1, args=None, infile=None, errlog=None,
        outlog=None, multi_prog=False, multi_prog_conf=None)

        This function launches a job application in accordance with
        the parameters received upon entry.

Author(s)
---------

    Henry R. Winterbottom; 31 December 2022

History
-------

    2022-12-31: Henry Winterbottom -- Initial implementation.

"""

# ----

# pylint: disable=broad-except
# pylint: disable=consider-using-with
# pylint: disable=raise-missing-from
# pylint: disable=too-many-arguments
# pylint: disable=too-many-branches

# ----

import glob
import re
import subprocess
from typing import List, Tuple

from tools import system_interface
from utils.exceptions_interface import SubprocessInterfaceError
from utils.logger_interface import Logger

# ----

# Define all available module properties.
__all__ = ["run"]

# ----

logger = Logger(caller_name=__name__)

# ----

# Define the supported job types.
job_types_list = ["app", "python", "slurm"]

# ----


def __job_info__(job_type: str, app: str = None) -> Tuple[str, str]:
    """
    Description
    -----------

    This function defines the launcher and task attributes for the job
    type specified upon entry.

    Parameters
    ----------

    job_type: ``str``

        A Python string specifying the job type.

    Keywords
    --------

    app: ``str``

        A Python string specifying the application name; required only
        if job_type is "app" upon entry.

    Returns
    -------

    launcher: ``str``

        A Python string specifying the path to the respective
        supported job type application launcher for the respective
        platform.

    ntasks: ``str``

        A Python string specifying the number of tasks attribute for
        the respective application launcher.

    Raises
    ------

    SubprocessInterfaceError:

        - raised if the job type specified upon entry is not
          supported.

        - raised if an exception is encountered while determing the
          application launcher; this is currently only relative to
          SLURM.

        - raised if the launcher for the respective job type is
          NoneType following assignment.

    """

    # Check that the job type is supported; proceed accordingly.
    if job_type.lower() not in job_types_list:
        msg = f"The job type {job_type.lower()} is not supported. Aborting!!!"
        raise SubprocessInterfaceError(msg=msg)
    msg = f"Configuring {job_type} type jobs."
    logger.info(msg=msg)

    # Define the job attributes for application job types.
    if job_type.lower() == "app":
        (launcher, tasks) = [app, None]

    # Define the job attributes for Python job types.
    if job_type.lower() == "python":
        launcher = system_interface.get_app_path(app="python")
        tasks = None

    # Define job attributes for SLURM workload scheduler job types.
    if job_type.lower() == "slurm":
        try:
            launcher = system_interface.get_app_path(app="srun")
            tasks = "--ntasks"
        except Exception as error:
            msg = (
                "Determining the application launcher for SLURM "
                f"failed with error {error}. Aborting!!!"
            )
            raise SubprocessInterfaceError(msg=msg)

    # Check that the launcher type has been determined from the run
    # time environment; proceed accordingly.
    if job_type.lower() != "app":
        if launcher is None:
            msg = (
                f"The path for job type {job_type} launcher could not be "
                "determined for the respective platform. Aborting!!!"
            )
            raise SubprocessInterfaceError(msg=msg)

    return (launcher, tasks)


# ----


def __launch__(cmd: List, infile: str, errlog: str, outlog: str) -> int:
    """
    Description
    -----------

    This function launches the command string arguments (`cmd`) and
    writes the output and error to the `outlog` and `errlog paths`; if
    `errlog` and/or `outlog` are NoneType, they default to `err.log` and/or
    `out.log`, respectively.

    Parameters
    ----------

    cmd: ``List``

        A Python list containing the SLURM commands for launching the
        respective executable task.

    infile: ``str``

        A Python string specifying the path to a file to be opened and
        used as input to the respective executable; if NoneType upon
        entry, the stdin argument to the subprocess Popen object is
        ignored; this parameter may also contain wildcard (e.g., *)
        values; if wildcards are present, the Python glob library is
        used to collect the relevant files and the collected files are
        then appended to the command string; for wildcard instances,
        the parameter shell is passed to the subprocess object and set
        to `True`.

    errlog: ``str``

        A Python string specifying the path to the error-output (e.g.,
        `stderr`) information; if NoneType upon entry, the `stderr` is
        written to `err.log`.

    outlog: ``str``

        A Python string specifying the path to the standard-output
        (e.g., `stdout`) information; if NoneType upon entry, the `stdout`
        is written to `out.log`.

    Returns
    -------

    returncode: ``int``

        A Python integer specifying the return code provided by the
        subprocess command.

    Raises
    ------

    SubprocessInterfaceError:

        - raised if an exception is encountered during the launch of
          the respective application.

    """

    # Define and open the stdout (standard out) and stderr (standard
    # error) file paths.
    if outlog is None:
        outlog = "out.log"
    stdout = open(outlog, "w", encoding="utf-8")
    if errlog is None:
        errlog = "err.log"
    stderr = open(errlog, "w", encoding="utf-8")

    # Check whether the executable input file contains any
    # wildcard values.
    if infile is not None:
        has_wildcards = re.search(r"[^.]\*", infile)
        if has_wildcards:
            stdin = glob.glob(infile)
        if not has_wildcards:
            stdin = open(infile, "r", encoding="utf-8")

    # Launch the respective executable and proceed accordingly.
    try:
        if infile is None:
            proc = subprocess.Popen(cmd, stdout=stdout, stderr=stderr)
        if infile is not None:
            # Build the command line arguments assuming that
            # wildcard values are included.
            if has_wildcards:
                # Build the command line arguments for the
                # respective application.
                cmd_string = str()
                for item in cmd:
                    cmd_string = cmd_string + f"{item} "
                for item in stdin:
                    cmd_string = cmd_string + f"{item} "

                # Execute the application accordingly.
                proc = subprocess.Popen(
                    cmd_string, stdout=stdout, stderr=stderr, shell=True
                )

            # Build the command line arguments assuming that no
            # wildcard values are included.
            if not has_wildcards:
                proc = subprocess.Popen(cmd, stdin=stdin, stdout=stdout, stderr=stderr)

        # Launch the executable and proceed accordingly.
        proc.wait()
        proc.communicate()
        returncode = proc.returncode
    except Exception as msg:
        raise SubprocessInterfaceError(msg=msg)

    # Close the stdout and stderr files and proceed accordingly.
    stderr.close()
    stdout.close()
    if (infile is not None) and (not has_wildcards):
        stdin.close()
    if returncode != 0:
        msg = (
            f"Executable failed! Please refer to {errlog} for more "
            "information. Aborting!!!"
        )
        raise SubprocessInterfaceError(msg=msg)

    return returncode


# ----


[docs]def run( exe: str, job_type: str = "slurm", ntasks: int = 1, args: List = None, infile: str = None, errlog: str = None, outlog: str = None, multi_prog: bool = False, multi_prog_conf: str = None, ) -> int: """ Description ----------- This function launches a job application in accordance with the parameters received upon entry. Parameters ---------- exe: ``str`` A Python string specifying the path to the application to be launched. Keywords -------- job_type: ``str``, optional A Python string specifying the job type; this applies to the workload scheduler (e.g., SLURM, PBS, SGE, etc.,). ntasks: ``int``, optional A Python integer specifying the number of compute tasks; if NoneType upon entry, a value of 1 is assumed. args: ``List``, optional A Python list of arguments to be passed to the executable via the command line parser; if NoneType upon entry, no command line arguments are assumed. infile: ``str``, optional A Python string specifying the path to a file to be open and used as input to the respective executable; if NoneType upon entry, the stdin argument to the subprocess `Popen` object is ignored. outlog: ``str``, optional A Python string specifying the path to the standard-output (e.g., `stdout`) information; if NoneType upon entry, the `stdout` is written to `out.log`. errlog: ``str``, optional A Python string specifying the path to the error-output (e.g., `stderr`) information; if NoneType upon entry, the stderr is written to `err.log`. multi_prog: ``bool``, optional A Python boolean valued variable specifying whether to implement the SLURM `multi_prog` capabilities for the respective task; if `True`, `multi_prog_conf` must be specified; note that is not (yet) supported for MVAPICH at run-time configurations/executables. multi_prog_conf: ``str``, optional A Python string specifying the path to the file containing the SLURM `multi_prog` directives; if `multi_prog` (above) is `True`, this value is required; note that is not (yet) supported for MVAPICH run-time configurations/executables. Returns ------- returncode: ``int`` A Python integer specifying the return code provided by the subprocess command. Raises ------ SubprocessInterfaceError: - raised if a specified configuration is not supported. """ # Check that the parameter and keyword values are valid; proceed # accordingly. if job_type.lower() != "slurm": # Reset the parameter and keywork values accordingly. if multi_prog: msg = ( "Multiple program support is not available for " "workload managers other than SLURM; resetting value " "for multi_prog to False; this may cause some unexpected " "results." ) logger.warn(msg=msg) multi_prog_conf = None # Define the command line arguments for the respective # launcher application; proceed accordingly. cmd = [] (launcher, tasks) = __job_info__(job_type=job_type, app=exe) # Define the launcher for the respective job type; proceed # accordingly. if launcher is None: msg = "The launcher application cannot be NoneType upon entry. Aborting!!!" raise SubprocessInterfaceError(msg=msg) if launcher is not None: if tasks is None: cmd.append(f"{launcher}") if tasks is not None: for item in [f"{launcher}", f"{tasks}", f"{ntasks}"]: cmd.append(item) if job_type.lower() != "app": cmd.append(exe) # Check that the multi-prog capabilities are supported; proceed # accordingly; currently this is only supported for SLURM job # types. if multi_prog: if multi_prog_conf is None: msg = ( "For multiple program support (e.g., multi_prog implementation) " ", a configuration file containing the multi-task paritioning " f"required; got {multi_prog_conf} for parameter multi_prog_conf " "upon entry. Aborting!!!" ) raise SubprocessInterfaceError(msg=msg) for item in ["--multi-prog", f"{multi_prog_conf}", f"{exe}"]: cmd.append(item) if not multi_prog: if args is not None: for item in args: cmd.append(f"{item}") # Remove any NoneType instances from the command line string. cmd = list(item for item in cmd if item is not None) # Launch the respective application. returncode = __launch__(cmd=cmd, infile=infile, errlog=errlog, outlog=outlog) return returncode