Support Parallel Tasks

Xylar Asay-Davis
date: 2017/02/22

Summary

Currently, the full analysis suite includes 7 tasks: 5 for the ocean and 2 for sea ice. The number of tasks is expected to grow over time. Task parallelism in some form is needed to allow as many tasks as desired to be run simultaneously. Successful completion of this design will mean that the analysis suite produces results identical to the current develop branch, but with several analysis tasks (a number selected by the user) running simultaneously.

Requirements

Requirement: Tasks run simultaneously
Date last modified: 2017/02/22
Contributors: Xylar Asay-Davis

There must be a mechanism for running more than one analysis task simultaneously.

Requirement: Select maximum number of tasks
Date last modified: 2017/02/22
Contributors: Xylar Asay-Davis

There must be a mechanism for the user to select the maximum number of tasks to run simultaneously. This might be necessary to control the number of processors or the amount of memory used on a given machine or (in the case of running analysis on login nodes) to be nice to other users on a shared resource.

Requirement: Lock files written by multiple tasks
Date last modified: 2017/03/10
Contributors: Xylar Asay-Davis

There must be a mechanism for locking files (during either reading or writing) if they can be written by multiple tasks. This is necessary to prevent cases where multiple tasks write to the same file simultaneously or one task reads from a file at the same time another is writing.

Consideration: Task parallelism should work on either login or compute nodes
Date last modified: 2017/02/22
Contributors: Xylar Asay-Davis

On some systems, care needs to be taken that scripts run on the compute nodes rather than on the management node(s). For example, on Edison and Cori, the `aprun` command is required to ensure that scripts run on compute nodes.

Consideration: There may need to be a way to limit the memory used by a task
Date last modified: 2017/02/22
Contributors: Phillip J. Wolfram, Xylar Asay-Davis

When tasks are launched as subprocesses, `xarray`/`dask` may need to be initialized to reflect the reduced memory available to each task. For example, with 10 tasks on a node, each `xarray`/`dask` instance should be limited to roughly 1/10th of the node's memory and CPUs. `xarray`/`dask` may require special initialization both for efficiency and to avoid crashes.

Algorithmic Formulations

Design solution: Tasks run simultaneously
Date last modified: 2017/02/23
Contributors: Xylar Asay-Davis

I propose to add a config option, parallelTaskCount, giving the number of tasks to run concurrently. If this option is set to a number greater than 1, analysis tasks will run concurrently. To accomplish this, I propose to use subprocess.call or one of its variants within run_analysis.py to call itself, but with only one task at a time. Thus, if run_analysis.py is called with only a single task (whether directly from the command line or through subprocess.call), it executes that task without spawning additional subprocesses.

This approach would require a method for creating a list of the individual tasks to be performed, launching parallelTaskCount of those tasks, waiting for them to complete, and launching additional tasks as previous ones finish. The approach would also require an individual log file for each task, stored in the log directory (already a config option).
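
As a rough sketch of this self-invocation (the use of the current Python interpreter and the '--generate' flag here mirror the implementation described later, but the helper name is illustrative only):

import os
import subprocess
import sys


def launch_single_task(taskName, configFiles):
    # re-invoke this script with exactly one task; the child process then
    # executes that task without spawning further subprocesses
    args = [sys.executable, os.path.realpath(__file__),
            '--generate', taskName] + configFiles
    return subprocess.Popen(args)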

Design solution: Select maximum number of tasks
Date last modified: 2017/02/23
Contributors: Xylar Asay-Davis

This is accomplished with the parallelTaskCount config option described above. A value of parallelTaskCount = 1 would indicate serial execution, though likely still via launching a subprocess for each task.

The command subprocess.Popen allows enough flexibility that it will be possible to launch several jobs and then to farm out additional jobs as each one returns. It should be possible to use a combination of os.kill(pid, 0), which checks whether a process is running, and os.waitpid(-1, 0), which waits for any subprocess to finish, to launch several processes and wait until the first one finishes before launching the next task, or in pseudo-code:

processes = launchTasks(taskNames[0:taskCount])
remainingTasks = taskNames[taskCount:]
while len(processes) > 0:
    process = waitForTask(processes)
    processes.remove(process)
    if len(remainingTasks) > 0:
        newProcesses = launchTasks(remainingTasks[0:1])
        processes.extend(newProcesses)
        remainingTasks = remainingTasks[1:]

Output from the main run_analysis.py task will list which analysis tasks were run and which completed successfully. The full analysis will exit with an error if any task fails, but only after attempting to run all desired analysis tasks, so that the failure of one analysis task does not interrupt the execution of the others.
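
A minimal sketch of that final reporting step (the results dictionary mapping task names to return codes, and the report_results name, are hypothetical and not taken from the implementation):

import sys


def report_results(results):
    # results is assumed to map each task name to its subprocess return code
    failedTasks = [task for task, code in results.items() if code != 0]
    print('{} of {} tasks completed successfully.'.format(
        len(results) - len(failedTasks), len(results)))
    for task in failedTasks:
        print('ERROR in task {}.'.format(task))
    if failedTasks:
        # exit with an error only after every task has been attempted
        sys.exit(1)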

In a future PR, this work can be expanded to include checking if the appropriate analysis member (AM) was turned on during the run and skipping any analysis tasks that depend on that AM if not (Issue #58).

Design solution: Lock files written by multiple tasks
Date last modified: 2017/03/10
Contributors: Xylar Asay-Davis

The design solution is based on the interprocess lock in the fasteners package: http://fasteners.readthedocs.io/en/latest/examples.html#interprocess-locks

Currently, only mapping files should be written by multiple tasks, requiring locks.

The algorithm consists of two changes. First, I removed the option overwriteMappingFiles, which now behaves as if it were always False: if a mapping file exists, it is not overwritten. This was necessary because only one task now writes a given mapping file if it does not already exist, while the other tasks wait for it to be written. After that, all tasks know there is a valid mapping file that they can read without having to lock it.

The second change was to add a lock around the subprocess call to ESMF_RegridWeightGen that makes sure only one process generates the mapping file. Each process attempts to acquire the lock and, once it has acquired it, checks whether the mapping file already exists. If not, it generates the mapping file and then releases the lock. If so, it simply releases the lock and moves on. Thus, only the first process to acquire the lock generates the mapping file, and the others wait until it is finished.

Design solution: Task parallelism should work on either login or compute nodes
Date last modified: 2017/02/23
Contributors: Xylar Asay-Davis

For the time being, I propose to address only task parallelism on login nodes and to extend the parallelism to work robustly on compute nodes as a separate project. Nevertheless, I will seek to implement this design in a way that is conducive to that later extension. Likely what will be required is a robust way of adding a prefix to the command line (e.g. `aprun -np 1`) when calling subprocesses. Adding such a prefix should be relatively simple.

Design solution: There may need to be a way to limit the memory used by a task
Date last modified: 2017/02/23
Contributors: Xylar Asay-Davis

I am not very familiar with `dask` within `xarray`, and I do not intend to address this consideration directly in this project. However, from my brief investigation, it seems that the proper way to handle this may be a `chunk` config option, either for all tasks or for individual tasks, that can be used to control the size of the data held in memory. Such an approach can be investigated in parallel with this project. An intermediate solution for situations where memory is limited would be to set `parallelTaskCount` to a small number.
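
For illustration only, such a `chunk` option might translate into a chunks argument when opening datasets with `xarray`; the file pattern, variable name, dimension name and chunk size below are placeholders, and none of this is part of the current implementation:

import xarray

# open the dataset lazily with dask, keeping only one chunk of each variable
# in memory at a time; the chunk size would come from the config option
ds = xarray.open_mfdataset('timeSeries.*.nc', chunks={'Time': 12})

# computations are then performed chunk by chunk, limiting peak memory use
meanField = ds['someVariable'].mean(dim='Time').compute()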

Design and Implementation

This design has been implemented in the test branch https://github.com/xylar/MPAS-Analysis/tree/parallel_tasks

Implementation: Tasks run simultaneously
Date last modified: 2017/03/10
Contributors: Xylar Asay-Davis

Tasks can now run in parallel. This has been implemented in the following four functions within run_analysis.py:

def run_parallel_tasks(config, analyses, configFiles, taskCount):
    # {{{
    """
    Run this script once each for several parallel tasks.

    Author: Xylar Asay-Davis
    Last Modified: 03/08/2017
    """

    taskNames = [analysisModule.get_task_name(**kwargs) for
                 analysisModule, kwargs in analyses]

    taskCount = min(taskCount, len(taskNames))

    (processes, logs) = launch_tasks(taskNames[0:taskCount], config,
                                     configFiles)
    remainingTasks = taskNames[taskCount:]
    while len(processes) > 0:
        (taskName, process) = wait_for_task(processes)
        if process.returncode == 0:
            print "Task {} has finished successfully.".format(taskName)
        else:
            print "ERROR in task {}.  See log file {} for details".format(
                taskName, logs[taskName].name)
        logs[taskName].close()
        # remove the finished process from the process dictionary
        processes.pop(taskName)

        if len(remainingTasks) > 0:
            (process, log) = launch_tasks(remainingTasks[0:1], config,
                                          configFiles)
            # merge the new process and log into these dictionaries
            processes.update(process)
            logs.update(log)
            remainingTasks = remainingTasks[1:]
    # }}}


def launch_tasks(taskNames, config, configFiles):  # {{{
    """
    Launch one or more tasks

    Author: Xylar Asay-Davis
    Last Modified: 03/08/2017
    """
    thisFile = os.path.realpath(__file__)

    logsDirectory = build_config_full_path(config, 'output',
                                           'logsSubdirectory')
    make_directories(logsDirectory)

    commandPrefix = config.getWithDefault('execute', 'commandPrefix',
                                          default='')
    if commandPrefix == '':
        commandPrefix = []
    else:
        commandPrefix = commandPrefix.split(' ')

    processes = {}
    logs = {}
    for taskName in taskNames:
        args = commandPrefix + [thisFile, '--generate', taskName] + configFiles

        logFileName = '{}/{}.log'.format(logsDirectory, taskName)

        # write the command to the log file
        logFile = open(logFileName, 'w')
        logFile.write('Command: {}\n'.format(' '.join(args)))
        # make sure the command gets written before the rest of the log
        logFile.flush()
        print 'Running {}'.format(taskName)
        process = subprocess.Popen(args, stdout=logFile,
                                   stderr=subprocess.STDOUT)
        processes[taskName] = process
        logs[taskName] = logFile

    return (processes, logs)  # }}}


def wait_for_task(processes):  # {{{
    """
    Wait for the next process to finish and check its status.  Returns both the
    task name and the process that finished.

    Author: Xylar Asay-Davis
    Last Modified: 03/08/2017
    """

    # first, check if any process has already finished
    for taskName, process in processes.iteritems():  # python 2.7!
        if(not is_running(process)):
            return (taskName, process)

    # No process has already finished, so wait for the next one
    (pid, status) = os.waitpid(-1, 0)
    for taskName, process in processes.iteritems():
        if pid == process.pid:
            process.returncode = status
            # since we used waitpid, this won't happen automatically
            return (taskName, process)  # }}}


def is_running(process):  # {{{
    """
    Returns whether a given process is currently running

    Author: Xylar Asay-Davis
    Last Modified: 03/08/2017
    """

    try:
        os.kill(process.pid, 0)
    except OSError:
        return False
    else:
        return True  # }}}
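
The dispatch between executing a single task directly and farming tasks out via run_parallel_tasks happens in the main program, which is not reproduced here. The following is a simplified sketch under two assumptions: that getWithDefault also accepts an integer default, and that each analysis module exposes a run() entry point (a hypothetical name); the actual run_analysis.py may differ in both respects.

# simplified sketch of the main-program dispatch (not the actual code)
taskCount = config.getWithDefault('execute', 'parallelTaskCount', default=1)

if len(analyses) == 1:
    # a single task was requested (e.g. by a child process launched from
    # launch_tasks), so execute it directly without spawning subprocesses
    analysisModule, kwargs = analyses[0]
    analysisModule.run(config, **kwargs)  # hypothetical entry point
else:
    # several tasks were requested: farm them out, at most taskCount at a time
    run_parallel_tasks(config, analyses, configFiles, taskCount)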

Implementation: Select maximum number of tasks
Date last modified: 2017/03/10
Contributors: Xylar Asay-Davis

There is a configuration option, parallelTaskCount, which defaults to 1, meaning tasks run in serial. For example, to run up to 8 tasks simultaneously:

[execute]
## options related to executing parallel tasks

# the number of parallel tasks (1 means tasks run in serial, the default)
parallelTaskCount = 8

Implementation: Lock files written by multiple tasks
Date last modified: 2017/03/10
Contributors: Xylar Asay-Davis

Here is the code for locking the mapping file within shared.interpolation.interpolate:

import fasteners
...
# lock the weights file in case it is being written by another process
with fasteners.InterProcessLock(_get_lock_path(outWeightFileName)):
    # make sure another process didn't already create the mapping file in
    # the meantime
    if not os.path.exists(outWeightFileName):
        # generate the mapping file by running ESMF_RegridWeightGen in a
        # subprocess
        subprocess.check_call(args)

Implementation: Task parallelism should work on either login or compute nodes
Date last modified: 2017/03/10
Contributors: Xylar Asay-Davis

I have included a config option, commandPrefix, that can be used to run the analysis on compute nodes. If the command prefix is empty (the default), run_analysis.py is executed directly, as before.

[execute]
## options related to executing parallel tasks

# the number of parallel tasks (1 means tasks run in serial, the default)
parallelTaskCount = 1

# Prefix on the command line before a parallel task (e.g. 'srun -n 1 python')
# Default is no prefix (run_analysis.py is executed directly)
commandPrefix = srun -n 1

Implementation: There may need to be a way to limit the memory used by a task
Date last modified: 2017/03/10
Contributors: Xylar Asay-Davis

As mentioned above, I have not addressed this consideration in this project. Currently, the suggested approach would be to limit parallelTaskCount to a number of tasks that does not cause memory problems. More sophisticated approaches could be explored in the future.

Testing

Testing and Validation: Tasks run simultaneously
Date last modified: 2017/03/10
Contributors: Xylar Asay-Davis

So far, I have tested extensively on my laptop (parallelTaskCount = 1, 2, 4 and 8) with the expected results. Later, I will test on Edison and Wolf as well.

Testing and Validation: Select maximum number of tasks
Date last modified: 2017/03/10
Contributors: Xylar Asay-Davis

Same as above.

Testing and Validation: Lock files written by multiple tasks
Date last modified: 2017/03/10
Contributors: Xylar Asay-Davis

I ran multiple climatology map tasks at the same time and verified from the log files that only one created each mapping file. Others must have waited for that file to be written or they would have crashed almost immediately when they tried to read the mapping file during remapping operations. So I’m confident the code is working as intended.

Testing and Validation: Task parallelism should work on either login or compute nodes
Date last modified: 2017/03/10
Contributors: Xylar Asay-Davis

On Edison and Wolf, I will test running the analysis with parallel tasks both on login nodes and by submitting a job to run on the compute nodes (using the appropriate commandPrefix).

Testing and Validation: There may need to be a way to limit the memory used by a task
Date last modified: 2017/03/10
Contributors: Xylar Asay-Davis

Assuming no crashes in my testing on compute nodes with all tasks running in parallel, I will leave this consideration for investigation in the future.