diff --git a/notebooks/2-hpc-cluster.ipynb b/notebooks/2-hpc-cluster.ipynb index b972f5d9..a7db2ae2 100644 --- a/notebooks/2-hpc-cluster.ipynb +++ b/notebooks/2-hpc-cluster.ipynb @@ -1 +1 @@ -{"metadata":{"kernelspec":{"name":"flux","display_name":"Flux","language":"python"},"language_info":{"name":"python","version":"3.13.13","mimetype":"text/x-python","codemirror_mode":{"name":"ipython","version":3},"pygments_lexer":"ipython3","nbconvert_exporter":"python","file_extension":".py"}},"nbformat_minor":5,"nbformat":4,"cells":[{"id":"ddf66f38-dc4a-4306-8b1c-b923fdb76922","cell_type":"markdown","source":"# HPC Cluster Executor\nIn contrast to the [Single Node Executor](https://executorlib.readthedocs.io/en/latest/1-single-node.html) and the [HPC Job Executor](https://executorlib.readthedocs.io/en/latest/3-hpc-job.html) the HPC Submission Executors do not communicate via the [zero message queue](https://zeromq.org) but instead store the python functions on the file system and uses the job scheduler to handle the dependencies of the Python functions. Consequently, the block allocation `block_allocation` and the init function `init_function` are not available in the HPC Cluster Executors. At the same time it is possible to close the Python process which created the `Executor`, wait until the execution of the submitted Python functions is completed and afterwards reload the results from the cache.\n\nInternally the HPC submission mode is using the [Python simple queuing system adatper (pysqa)](https://pysqa.readthedocs.io) to connect to HPC job schedulers and the [h5py](https://www.h5py.org) package for serializing the Python functions to store them on the file system. Both packages are optional dependency of executorlib. The installation of the [pysqa](https://pysqa.readthedocs.io) package and the [h5py](https://www.h5py.org) package are covered in the installation section. 
","metadata":{}},{"id":"d56862a6-8279-421d-a090-7ca2a3c4d416","cell_type":"markdown","source":"## SLURM\nThe [Simple Linux Utility for Resource Management (SLURM)](https://slurm.schedmd.com) job scheduler is currently the most commonly used job scheduler for HPC clusters. On shared HPC systems users cannot access compute nodes directly — SLURM acts as the resource controller, accepting job requests, managing the queue, and assigning work to nodes when resources become free.\n\nIn the HPC submission mode executorlib internally uses the [sbatch](https://slurm.schedmd.com/sbatch.html) command, this is in contrast to the [HPC Job Executor](https://executorlib.readthedocs.io/en/latest/3-hpc-job.html) which internally uses the [srun](https://slurm.schedmd.com/srun.html) command.\n\nThe connection to the job scheduler is based on the [Python simple queuing system adatper (pysqa)](https://pysqa.readthedocs.io). It provides a default configuration for most commonly used job schedulers including SLURM, in addition it is also possible to provide the submission template as part of the resource dictionary `resource_dict` or via the path to the configuration directory with the `pysqa_config_directory` parameter. All three options are covered in more detail on the [pysqa documentation](https://pysqa.readthedocs.io).","metadata":{}},{"id":"slurm-background-001","cell_type":"markdown","source":"### Background\n\nThree commands cover the day-to-day SLURM workflow:\n\n**`sbatch`** submits a batch script from the login node. 
The script carries resource directives on lines starting with `#SBATCH`:\n\n```bash\n#!/bin/bash\n#SBATCH --job-name=my_job\n#SBATCH --output=my_job.out\n#SBATCH --ntasks=4 # total MPI ranks\n#SBATCH --cpus-per-task=1 # CPU threads per rank\n#SBATCH --time=00:30:00 # wall-clock limit (HH:MM:SS)\n#SBATCH --partition=regular\n\nsrun --mpi=pmix python my_script.py\n```\n\n```bash\nsbatch job.sh # submit — returns a job ID immediately\n```\n\nKey `#SBATCH` directives:\n\n| Directive | Meaning |\n|---|---|\n| `--ntasks=N` | Total MPI ranks (processes) |\n| `--cpus-per-task=N` | CPU threads available to each rank |\n| `--mem=NG` | Memory per node (e.g. `8G`) |\n| `--time=HH:MM:SS` | Maximum wall-clock time |\n| `--partition=name` | Queue / partition to target |\n| `--dependency=afterok:JOBID` | Run only after another job succeeds |\n\n**`srun`** launches parallel tasks *inside* an existing allocation. Multiple `srun` calls can run concurrently using shell backgrounding:\n\n```bash\nsrun --mpi=pmix -n 4 python task_a.py &\nsrun --mpi=pmix -n 4 python task_b.py &\nwait # block until both finish\n```\n\n**`squeue`** and **`sacct`** let you inspect the queue and verify resource assignments:\n\n```bash\nsqueue --me # your running/pending jobs\nsacct -j 12345 --format=JobID,State,AllocCPUS,Elapsed # accounting for job 12345\n```\n\nCommon `squeue` state codes: `PD` (pending), `R` (running), `CG` (completing).","metadata":{}},{"id":"slurm-mpi-001","cell_type":"markdown","source":"### MPI-parallel Python\n\nThe [Message Passing Interface (MPI)](https://www.mpi-forum.org) is the dominant parallelisation standard on HPC systems. [`mpi4py`](https://mpi4py.readthedocs.io) provides Python bindings. 
A minimal example:\n\n```python\n# script.py\nfrom mpi4py import MPI\ncomm = MPI.COMM_WORLD\nprint(f\"rank {comm.Get_rank()} of {comm.Get_size()}\")\n```\n\n```bash\nsrun --mpi=pmix -n 4 python script.py\n```\n\nWhen multiple independent groups of ranks need to run inside one allocation there are two approaches:\n\n| Approach | How | Cross-group communication |\n|---|---|---|\n| Multiple `srun` calls | Each `srun` gets its own communicator | Not possible |\n| `MPI_Comm_split` | One `srun`, split in Python | Possible via `MPI.COMM_WORLD` |\n\n```python\n# communicator splitting — 8 ranks split into two groups of 4\ncomm = MPI.COMM_WORLD\ncolor = comm.Get_rank() // 4 # group 0 or group 1\nsub_comm = comm.Split(color)\n```","metadata":{}},{"id":"db7760e8-35a6-4a1c-8b0f-410b536c3835","cell_type":"markdown","source":"### SlurmClusterExecutor\n\n```python\nfrom executorlib import SlurmClusterExecutor\n```","metadata":{}},{"id":"b20913f3-59e4-418c-a399-866124f8e497","cell_type":"markdown","source":"In comparison to the [SingleNodeExecutor](https://executorlib.readthedocs.io/en/latest/1-single-node.html), the only parameter which is changed in the `SlurmClusterExecutor` is the requirement to specify the cache directory using the `cache_directory=\"./cache\"`. 
The rest of the syntax remains exactly the same, to simplify the up-scaling of simulation workflows.","metadata":{}},{"id":"0b8f3b77-6199-4736-9f28-3058c5230777","cell_type":"markdown","source":"```python\nwith SlurmClusterExecutor(cache_directory=\"./cache\") as exe:\n future_lst = [exe.submit(sum, [i, i]) for i in range(1, 4)]\n print([f.result() for f in future_lst])\n```","metadata":{}},{"id":"37bef7ac-ce3e-4d8a-b848-b1474c370bca","cell_type":"markdown","source":"Specific parameters for `SlurmClusterExecutor` like the maximum run time `\"run_time_max\"`, the maximum memory `\"memory_max\"` or the submission template for the job submission script `\"submission_template\"` can be specified as part of the resource dictionary. Again it is possible to specify the resource dictonary `resource_dicionary` either for each function in the `submit()` function or during the initialization of the `SlurmClusterExecutor`.","metadata":{}},{"id":"658781de-f222-4235-8c26-b0f77a0831b3","cell_type":"markdown","source":"```python\nsubmission_template = \"\"\"\\\n#!/bin/bash\n#SBATCH --output=time.out\n#SBATCH --job-name={{job_name}}\n#SBATCH --chdir={{working_directory}}\n#SBATCH --get-user-env=L\n#SBATCH --partition={{partition}}\n{%- if run_time_max %}\n#SBATCH --time={{ [1, run_time_max // 60]|max }}\n{%- endif %}\n{%- if dependency %}\n#SBATCH --dependency=afterok:{{ dependency | join(',') }}\n{%- endif %}\n{%- if memory_max %}\n#SBATCH --mem={{memory_max}}G\n{%- endif %}\n#SBATCH --ntasks={{cores}}\n\n{{command}}\n\"\"\"\n\nwith SlurmClusterExecutor(cache_directory=\"./cache\") as exe:\n future = exe.submit(\n sum, [4, 4], \n resource_dict={\n \"submission_template\": submission_template, \n \"run_time_max\": 180, # in seconds \n \"partition\": \"s.cmfe\",\n })\n print(future.result())\n```","metadata":{}},{"id":"f7ad9c97-7743-4f87-9344-4299b2b31a56","cell_type":"markdown","source":"The template uses [Jinja2](https://jinja.palletsprojects.com) syntax. 
executorlib fills `{{cores}}` into `--ntasks`, so `cores=1` requests one serial process and `cores=4` requests four MPI ranks. With `pmi_mode=\"pmix\"` the executor additionally wraps the function call in `srun --mpi=pmix -n `:\n\n```python\ndef mpi_calc(i):\n from mpi4py import MPI\n comm = MPI.COMM_WORLD\n return {\"rank\": comm.Get_rank(), \"size\": comm.Get_size(), \"input\": i}\n\nwith SlurmClusterExecutor(pmi_mode=\"pmix\", cache_directory=\"./cache\") as exe:\n future = exe.submit(\n mpi_calc, 42,\n resource_dict={\"cores\": 4, \"partition\": \"regular\", \"run_time_max\": 120})\n print(future.result())\n```\n\nWith these options executorlib in combination with the SLURM job scheduler provides a lot flexibility to configure the submission of Python functions depending on the specific configuration of the job scheduler. ","metadata":{}},{"cell_type":"markdown","id":"e1299068-990e-4fd3-b926-f6415b4e9d75","metadata":{},"source":["### Verifying the Resource Assignment\n","After the submission it is often useful to confirm that the job scheduler actually assigned the requested resources. For the `SlurmClusterExecutor` the SLURM job identifier is stored in the cache and can be retrieved with the `get_cache_data()` function. This job identifier `queue_id` can then be passed to the SLURM [sacct](https://slurm.schedmd.com/sacct.html) command to inspect the accounting record of the job:\n","\n","```python\n","from executorlib import get_cache_data\n","import subprocess\n","\n","for entry in get_cache_data(cache_directory=\"./cache\"):\n"," if \"calc\" in str(entry[\"function\"]):\n"," job_id = entry[\"queue_id\"]\n"," print(subprocess.check_output(\n"," [\"sacct\", \"-j\", str(job_id), \"--format=JobID,State,AllocCPUS,Elapsed\"],\n"," universal_newlines=True,\n"," ))\n","```\n","\n","The `AllocCPUS` column reports the number of CPU cores SLURM allocated for the function. 
In addition to the `queue_id` each cache entry also contains the full `resource_dict`, the runtime and the path of the result file, which together provide a complete audit trail of the submission - this is the same information returned by the `get_cache_data()` function demonstrated for the [Single Node Executor](https://executorlib.readthedocs.io/en/latest/1-single-node.html#cache)."]},{"id":"2a814efb-2fbc-41ba-98df-cf121d19ea66","cell_type":"markdown","source":"## Flux\n[Flux](http://flux-framework.org) is a modern HPC resource manager developed at Lawrence Livermore National Laboratory (LLNL). On many systems it runs as a **secondary scheduler inside a SLURM allocation**, enabling fine-grained hierarchical task distribution. Unlike SLURM, Flux can also be installed locally via conda — making it especially suitable for demonstrations, testing, and continuous integration:\n\n```bash\nconda install -c conda-forge flux-core\nflux start # launch a local Flux instance\n```\n\nThis simple installation is explained in the [installation section](https://executorlib.readthedocs.io/en/latest/installation.html#alternative-installations). 
The features demonstrated below using Flux apply equally to SLURM.","metadata":{}},{"id":"flux-background-001","cell_type":"markdown","source":"### Background\n\nThe key Flux commands map closely onto their SLURM equivalents:\n\n| Flux command | SLURM equivalent | Description |\n|---|---|---|\n| `flux resource list` | `sinfo` | Show available nodes, cores, and GPUs |\n| `flux jobs -a` | `squeue` | List all jobs (running and completed) |\n| `flux submit` | `sbatch` | Submit a non-blocking job; returns a job ID immediately |\n| `flux run` | `srun` | Launch a blocking job; waits for completion |\n| `flux job attach ` | — | Stream output of a previously submitted job |\n\n```bash\n# submit a non-blocking 4-rank MPI job\nflux submit -o pmi=pmix --ntasks=4 python script.py\n\n# run a blocking 4-rank MPI job (waits for output)\nflux run -o pmi=pmix -n 4 python script.py\n```\n\nThe `-o pmi=pmix` flag matches what SLURM's `--mpi=pmix` provides — the same `mpi4py` scripts run unchanged under both schedulers.","metadata":{}},{"id":"29d7aa18-357e-416e-805c-1322b59abec1","cell_type":"markdown","source":"### Dependencies\nAs already demonstrated for the [SingleNodeExecutor](https://executorlib.readthedocs.io/en/latest/1-single-node.html) the `Executor` classes from executorlib are capable of resolving the dependencies of serial functions, when [concurrent futures Future](https://docs.python.org/3/library/concurrent.futures.html#future-objects) objects are used as inputs for subsequent function calls. 
For the case of the HPC submission these dependencies are communicated to the job scheduler, which allows to stop the Python process which created the `Executor` class, wait until the execution of the submitted Python functions is completed and afterwards restart the Python process for the `Executor` class and reload the calculation results from the cache defined by the `cache_directory` parameter.","metadata":{}},{"id":"0f7fc37a-1248-492d-91ab-9db1d737eaee","cell_type":"code","source":"def add_funct(a, b):\n return a + b","metadata":{"trusted":true},"outputs":[],"execution_count":1},{"id":"ae308683-6083-4e78-afc2-bff6c6dc297b","cell_type":"code","source":"from executorlib import FluxClusterExecutor\n\nwith FluxClusterExecutor(cache_directory=\"./file\") as exe:\n future = 0\n for i in range(4, 8):\n future = exe.submit(add_funct, i, future)\n print(future.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"22\n"}],"execution_count":2},{"id":"ca75cb6c-c50f-4bee-9b09-d8d29d6c263b","cell_type":"markdown","source":"### Resource Assignment\nIn analogy to the [SingleNodeExecutor](https://executorlib.readthedocs.io/en/latest/1-single-node.html) the resource assignment for the `FluxClusterExecutor` is handled by either including the resource dictionary parameter `resource_dict` in the initialization of the `FluxClusterExecutor` class or in every call of the `submit()` function.\n\nBelow this is demonstrated once for the assignment of multiple CPU cores for the execution of a Python function which internally uses the message passing interface (MPI) via the [mpi4py](https://mpi4py.readthedocs.io) package.","metadata":{}},{"id":"eded3a0f-e54f-44f6-962f-eedde4bd2158","cell_type":"code","source":"def calc(i):\n from mpi4py import MPI\n\n size = MPI.COMM_WORLD.Get_size()\n rank = MPI.COMM_WORLD.Get_rank()\n return i, size, 
rank\n","metadata":{"trusted":true},"outputs":[],"execution_count":3},{"id":"669b05df-3cb2-4f69-9d94-8b2442745ebb","cell_type":"code","source":"with FluxClusterExecutor(cache_directory=\"./file\") as exe:\n fs = exe.submit(calc, 3, resource_dict={\"cores\": 2})\n print(fs.result())","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"(3, 1, 0)\n"}],"execution_count":4},{"id":"d91499d7-5c6c-4c10-b7b7-bfc4b87ddaa8","cell_type":"markdown","source":"Beyond CPU cores and threads which were previously also introduced for the [Single Node Executor](https://executorlib.readthedocs.io/en/latest/1-single-node.html) the HPC Cluster Executors also provide the option to select the available accelerator cards or GPUs, by specifying the `\"gpus_per_core\"` parameter in the resource dictionary `resource_dict`. For demonstration we create a Python function which reads the GPU device IDs and submit it to the `FluxClusterExecutor` class:\n```python\ndef get_available_gpus():\n import socket\n from tensorflow.python.client import device_lib\n local_device_protos = device_lib.list_local_devices()\n return [\n (x.name, x.physical_device_desc, socket.gethostname()) \n for x in local_device_protos if x.device_type == 'GPU'\n ]\n```\n\n```python\nwith FluxClusterExecutor(\n cache_directory=\"./cache\",\n resource_dict={\"gpus_per_core\": 1}\n) as exe:\n fs_1 = exe.submit(get_available_gpus)\n fs_2 = exe.submit(get_available_gpus)\n print(fs_1.result(), fs_2.result())\n```","metadata":{}},{"cell_type":"markdown","id":"c9c46c64-d097-49d3-ba79-ecb871c8f4da","metadata":{},"source":["## Disconnecting and Reconnecting\n","A key advantage of the HPC Cluster Executors over the [HPC Job Executors](https://executorlib.readthedocs.io/en/latest/3-hpc-job.html) is that the Python process which created the executor does not need to stay alive while the submitted functions are running. 
As the functions are submitted as individual scheduler jobs and the results are stored on the file system, the Python process can be closed after the submission - for example to log out of the login node overnight - and the results can be reloaded later. This is controlled with the `shutdown()` method of the executor, which provides the same `wait` and `cancel_futures` parameters as the [Executor interface](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor.shutdown) of the Python standard library.\n","\n","To submit a set of functions and disconnect without waiting, the executor is created without a `with` statement and `shutdown()` is called with `wait=False` and `cancel_futures=False`:\n","\n","```python\n","from executorlib import SlurmClusterExecutor\n","\n","exe = SlurmClusterExecutor(cache_directory=\"./cache\")\n","future_lst = [exe.submit(sum, [i, i]) for i in range(1, 4)]\n","exe.shutdown(wait=False, cancel_futures=False)\n","```\n","\n","The submitted jobs remain in the queue of the job scheduler and continue to run. At a later point - even from a new Python process - the same functions are submitted again using the same `cache_directory`. executorlib recognises the cached results and returns them immediately instead of submitting the functions a second time:\n","\n","```python\n","from executorlib import SlurmClusterExecutor\n","\n","with SlurmClusterExecutor(cache_directory=\"./cache\") as exe:\n"," future_lst = [exe.submit(sum, [i, i]) for i in range(1, 4)]\n"," print([f.result() for f in future_lst])\n","```\n","\n","The behaviour of the `shutdown()` method is summarized in the following table:\n","\n","| `shutdown()` call | Effect |\n","|---|---|\n","| `shutdown(wait=True)` | Wait until all submitted functions are finished before continuing - this is the default and also what the `with` statement does. 
|\n","| `shutdown(wait=False, cancel_futures=False)` | Return immediately and leave the submitted jobs running in the scheduler queue - used to disconnect from a running workflow. |\n","| `shutdown(wait=False, cancel_futures=True)` | Cancel the submitted jobs which have not started or finished yet. |\n","\n","This disconnect-and-reconnect capability is available for both the `SlurmClusterExecutor` and the `FluxClusterExecutor`, as both communicate via the file system rather than via sockets."]},{"id":"slurm-job-executor-001","cell_type":"markdown","source":"## Combine both\n\nWhile `SlurmClusterExecutor` submits each Python function as a separate `sbatch` job from the login node, the [HPC Job Executor](https://executorlib.readthedocs.io/en/latest/3-hpc-job.html) (`SlurmJobExecutor`) runs **inside** an already-running SLURM allocation and dispatches tasks as `srun` steps — no new jobs enter the queue.\n\n```python\nfrom executorlib import SlurmJobExecutor\n\n# This code runs inside an existing SLURM job\nwith SlurmJobExecutor(max_workers=4) as exe:\n futures = [exe.submit(sum, [i, i], resource_dict={\"cores\": 1}) for i in range(4)]\n print([f.result() for f in futures])\n```\n\nThe two executor types can be nested: a function submitted via `SlurmClusterExecutor` (running as an `sbatch` job) can itself create a `SlurmJobExecutor` to parallelise sub-tasks as `srun` steps within the same allocation:\n\n```python\nfrom executorlib import SlurmClusterExecutor, SlurmJobExecutor\n\ndef parallel_workflow(n):\n with SlurmJobExecutor(max_workers=n) as inner:\n futures = [inner.submit(sum, [i, i], resource_dict={\"cores\": 1}) for i in range(n)]\n return [f.result() for f in futures]\n\nwith SlurmClusterExecutor(cache_directory=\"./cache\") as outer:\n future = outer.submit(\n parallel_workflow, 4,\n resource_dict={\"cores\": 1, \"partition\": \"regular\", \"run_time_max\": 300})\n print(future.result())\n```\n\nThe `sacct` output for such a job will show the outer `sbatch` 
job together with numbered `srun` steps (e.g. `12345.0`, `12345.1`, …), all completing within the same allocation.\n\n| Executor | Scheduler command | Typical use |\n|---|---|---|\n| `SlurmClusterExecutor` | `sbatch` (one job per function) | Submit from login node |\n| `SlurmJobExecutor` | `srun` steps within current job | Inside an existing allocation |\n| `FluxClusterExecutor` | `flux submit` | Flux-managed allocation or local testing |","metadata":{}},{"id":"3f47fd34-04d1-42a7-bb06-6821dc99a648","cell_type":"markdown","source":"## Cleaning Cache\nFinally, as the HPC Cluster Executors leverage the file system to communicate serialized Python functions, it is important to clean up the cache directory specified by the `cache_directory` parameter once the results of the submitted Python functions are no longer needed. The serialized Python functions are stored in binary format using the [cloudpickle](https://github.com/cloudpipe/cloudpickle) library for serialization. This format is design for caching but not for long-term storage. 
The user is responsible for the long-term storage of their data.","metadata":{}},{"id":"f537b4f6-cc98-43da-8aca-94a823bcbcbd","cell_type":"code","source":"import os\nimport shutil\n\ncache_dir = \"./file\"\nif os.path.exists(cache_dir):\n print(os.listdir(cache_dir))\n try:\n shutil.rmtree(cache_dir)\n except OSError:\n pass","metadata":{"trusted":true},"outputs":[{"name":"stdout","output_type":"stream","text":"['add_functcb272924f36cbaa9ac79f8d42b6771c8', 'add_funct67b8245bf71c3c6dcb2018663939c72d', 'add_funct144ecf19b5020fccad214df3f4bdabd0', 'add_funct4a5d1f06cca57f0ffbaa3116d39ecc6a', 'add_funct4a5d1f06cca57f0ffbaa3116d39ecc6a_o.h5', 'calcd732cdcbd2c8e68145d99f0be814e667', 'add_funct67b8245bf71c3c6dcb2018663939c72d_o.h5', 'add_funct144ecf19b5020fccad214df3f4bdabd0_o.h5', 'calcd732cdcbd2c8e68145d99f0be814e667_o.h5', 'add_functcb272924f36cbaa9ac79f8d42b6771c8_o.h5']\n"}],"execution_count":5},{"id":"3efc9f5d-fbf9-4a85-8963-5711a453130d","cell_type":"code","source":"","metadata":{"trusted":true},"outputs":[],"execution_count":null}]} \ No newline at end of file +{"metadata":{"kernelspec":{"name":"flux","display_name":"Flux","language":"python"},"language_info":{"name":"python","version":"3.13.14","mimetype":"text/x-python","codemirror_mode":{"name":"ipython","version":3},"pygments_lexer":"ipython3","nbconvert_exporter":"python","file_extension":".py"}},"nbformat_minor":5,"nbformat":4,"cells":[{"id":"ddf66f38-dc4a-4306-8b1c-b923fdb76922","cell_type":"markdown","source":"# HPC Cluster Executor\nIn contrast to the [Single Node Executor](https://executorlib.readthedocs.io/en/latest/1-single-node.html) and the [HPC Job Executor](https://executorlib.readthedocs.io/en/latest/3-hpc-job.html) the HPC Submission Executors do not communicate via the [zero message queue](https://zeromq.org) but instead store the python functions on the file system and uses the job scheduler to handle the dependencies of the Python functions. 
Consequently, the block allocation `block_allocation` and the init function `init_function` are not available in the HPC Cluster Executors. At the same time it is possible to close the Python process which created the `Executor`, wait until the execution of the submitted Python functions is completed and afterwards reload the results from the cache.\n\nInternally the HPC submission mode uses the [Python simple queuing system adapter (pysqa)](https://pysqa.readthedocs.io) to connect to HPC job schedulers and the [h5py](https://www.h5py.org) package for serializing the Python functions to store them on the file system. Both packages are optional dependencies of executorlib. The installation of the [pysqa](https://pysqa.readthedocs.io) package and the [h5py](https://www.h5py.org) package is covered in the installation section. ","metadata":{}},{"id":"d56862a6-8279-421d-a090-7ca2a3c4d416","cell_type":"markdown","source":"## SLURM\nThe [Simple Linux Utility for Resource Management (SLURM)](https://slurm.schedmd.com) job scheduler is currently the most commonly used job scheduler for HPC clusters. On shared HPC systems users cannot access compute nodes directly; SLURM acts as the resource controller, accepting job requests, managing the queue, and assigning work to nodes when resources become free.\n\nIn the HPC submission mode executorlib internally uses the [sbatch](https://slurm.schedmd.com/sbatch.html) command, in contrast to the [HPC Job Executor](https://executorlib.readthedocs.io/en/latest/3-hpc-job.html), which internally uses the [srun](https://slurm.schedmd.com/srun.html) command.\n\nThe connection to the job scheduler is based on the [Python simple queuing system adapter (pysqa)](https://pysqa.readthedocs.io). 
It provides a default configuration for the most commonly used job schedulers, including SLURM. In addition, it is possible to provide the submission template as part of the resource dictionary `resource_dict` or via the path to the configuration directory with the `pysqa_config_directory` parameter. All three options are covered in more detail in the [pysqa documentation](https://pysqa.readthedocs.io).","metadata":{}},{"id":"slurm-background-001","cell_type":"markdown","source":"### Background\n\nThree commands cover the day-to-day SLURM workflow:\n\n**`sbatch`** submits a batch script from the login node. The script carries resource directives on lines starting with `#SBATCH`:\n\n```bash\n#!/bin/bash\n#SBATCH --job-name=my_job\n#SBATCH --output=my_job.out\n#SBATCH --ntasks=4 # total MPI ranks\n#SBATCH --cpus-per-task=1 # CPU threads per rank\n#SBATCH --time=00:30:00 # wall-clock limit (HH:MM:SS)\n#SBATCH --partition=regular\n\nsrun --mpi=pmix python my_script.py\n```\n\n```bash\nsbatch job.sh # submit; returns a job ID immediately\n```\n\nKey `#SBATCH` directives:\n\n| Directive | Meaning |\n|---|---|\n| `--ntasks=N` | Total MPI ranks (processes) |\n| `--cpus-per-task=N` | CPU threads available to each rank |\n| `--mem=NG` | Memory per node (e.g. `8G`) |\n| `--time=HH:MM:SS` | Maximum wall-clock time |\n| `--partition=name` | Queue / partition to target |\n| `--dependency=afterok:JOBID` | Run only after another job succeeds |\n\n**`srun`** launches parallel tasks *inside* an existing allocation. 
Multiple `srun` calls can run concurrently using shell backgrounding:\n\n```bash\nsrun --mpi=pmix -n 4 python task_a.py &\nsrun --mpi=pmix -n 4 python task_b.py &\nwait # block until both finish\n```\n\n**`squeue`** and **`sacct`** let you inspect the queue and verify resource assignments:\n\n```bash\nsqueue --me # your running/pending jobs\nsacct -j 12345 --format=JobID,State,AllocCPUS,Elapsed # accounting for job 12345\n```\n\nCommon `squeue` state codes: `PD` (pending), `R` (running), `CG` (completing).","metadata":{}},{"id":"slurm-mpi-001","cell_type":"markdown","source":"### MPI-parallel Python\n\nThe [Message Passing Interface (MPI)](https://www.mpi-forum.org) is the dominant parallelisation standard on HPC systems. [`mpi4py`](https://mpi4py.readthedocs.io) provides Python bindings. A minimal example:\n\n```python\n# script.py\nfrom mpi4py import MPI\ncomm = MPI.COMM_WORLD\nprint(f\"rank {comm.Get_rank()} of {comm.Get_size()}\")\n```\n\n```bash\nsrun --mpi=pmix -n 4 python script.py\n```\n\nWhen multiple independent groups of ranks need to run inside one allocation there are two approaches:\n\n| Approach | How | Cross-group communication |\n|---|---|---|\n| Multiple `srun` calls | Each `srun` gets its own communicator | Not possible |\n| `MPI_Comm_split` | One `srun`, split in Python | Possible via `MPI.COMM_WORLD` |\n\n```python\n# communicator splitting — 8 ranks split into two groups of 4\ncomm = MPI.COMM_WORLD\ncolor = comm.Get_rank() // 4 # group 0 or group 1\nsub_comm = comm.Split(color)\n```","metadata":{}},{"id":"db7760e8-35a6-4a1c-8b0f-410b536c3835","cell_type":"markdown","source":"### SlurmClusterExecutor\n\n```python\nfrom executorlib import SlurmClusterExecutor\n```","metadata":{}},{"id":"b20913f3-59e4-418c-a399-866124f8e497","cell_type":"markdown","source":"In comparison to the [SingleNodeExecutor](https://executorlib.readthedocs.io/en/latest/1-single-node.html), the only parameter which is changed in the `SlurmClusterExecutor` is the requirement 
to specify the cache directory using the `cache_directory` parameter, for example `cache_directory=\"./cache\"`. The rest of the syntax remains exactly the same, to simplify the up-scaling of simulation workflows.","metadata":{}},{"id":"0b8f3b77-6199-4736-9f28-3058c5230777","cell_type":"markdown","source":"```python\nwith SlurmClusterExecutor(cache_directory=\"./cache\") as exe:\n    future_lst = [exe.submit(sum, [i, i]) for i in range(1, 4)]\n    print([f.result() for f in future_lst])\n```","metadata":{}},{"id":"37bef7ac-ce3e-4d8a-b848-b1474c370bca","cell_type":"markdown","source":"Specific parameters for `SlurmClusterExecutor` like the maximum run time `\"run_time_max\"`, the maximum memory `\"memory_max\"` or the submission template for the job submission script `\"submission_template\"` can be specified as part of the resource dictionary. As before, the resource dictionary `resource_dict` can be specified either for each function in the `submit()` call or during the initialization of the `SlurmClusterExecutor`.","metadata":{}},{"id":"658781de-f222-4235-8c26-b0f77a0831b3","cell_type":"markdown","source":"```python\nsubmission_template = \"\"\"\\\n#!/bin/bash\n#SBATCH --output=time.out\n#SBATCH --job-name={{job_name}}\n#SBATCH --chdir={{working_directory}}\n#SBATCH --get-user-env=L\n#SBATCH --partition={{partition}}\n{%- if run_time_max %}\n#SBATCH --time={{ [1, run_time_max // 60]|max }}\n{%- endif %}\n{%- if dependency %}\n#SBATCH --dependency=afterok:{{ dependency | join(',') }}\n{%- endif %}\n{%- if memory_max %}\n#SBATCH --mem={{memory_max}}G\n{%- endif %}\n#SBATCH --ntasks={{cores}}\n\n{{command}}\n\"\"\"\n\nwith SlurmClusterExecutor(cache_directory=\"./cache\") as exe:\n    future = exe.submit(\n        sum, [4, 4],\n        resource_dict={\n            \"submission_template\": submission_template,\n            \"run_time_max\": 180,  # in seconds\n            \"partition\": \"s.cmfe\",\n        })\n    print(future.result())\n```","metadata":{}},{"id":"f7ad9c97-7743-4f87-9344-4299b2b31a56","cell_type":"markdown","source":"The template uses 
[Jinja2](https://jinja.palletsprojects.com) syntax. executorlib fills `{{cores}}` into `--ntasks`, so `cores=1` requests one serial process and `cores=4` requests four MPI ranks. With `pmi_mode=\"pmix\"` the executor additionally wraps the function call in `srun --mpi=pmix -n `:\n\n```python\ndef mpi_calc(i):\n    from mpi4py import MPI\n    comm = MPI.COMM_WORLD\n    return {\"rank\": comm.Get_rank(), \"size\": comm.Get_size(), \"input\": i}\n\nwith SlurmClusterExecutor(pmi_mode=\"pmix\", cache_directory=\"./cache\") as exe:\n    future = exe.submit(\n        mpi_calc, 42,\n        resource_dict={\"submission_template\": submission_template, \"cores\": 4, \"partition\": \"regular\", \"run_time_max\": 120})\n    print(future.result())\n```\n\nWith these options executorlib in combination with the SLURM job scheduler provides a lot of flexibility to configure the submission of Python functions depending on the specific configuration of the job scheduler. \n\nIf the submission fails, for example when no submission template is defined and executorlib falls back to the default submission template, a `CalledProcessError` is raised:\n\n```python\ndef echo(i):\n    return i\n\nwith SlurmClusterExecutor() as exe:\n    f1 = exe.submit(echo)\n    print(f1.result())\n```\n\nThe error message looks similar to:\n```\nCalledProcessError: Command '['sbatch', '--parsable', '/home/jovyan/executorlib_cache/echoe5873521b7330f831bed941744079e83/run_queue.sh']' returned non-zero exit status 1.\n```\n\nThis message is of limited help: it only states that the `sbatch` command failed, while the error message which `sbatch` printed on the command line is not part of the Python traceback. 
To access the error message you can access the exception of the future object:\n\n```python\nexcep = f1.exception()\nprint(excep.output)\n```\n\nThis gives:\n```\nsbatch: error: invalid partition specified: (null)\nsbatch: error: Batch job submission failed: Invalid partition name specified\n```\n\nSo the submission failed because no partition was provided, which can be corrected by adding the `--partition` flag in the submission template, as demonstrated above. ","metadata":{}},{"id":"e1299068-990e-4fd3-b926-f6415b4e9d75","cell_type":"markdown","source":"### Verifying the Resource Assignment\nAfter the submission it is often useful to confirm that the job scheduler actually assigned the requested resources. For the `SlurmClusterExecutor` the SLURM job identifier is stored in the cache and can be retrieved with the `get_cache_data()` function. This job identifier `queue_id` can then be passed to the SLURM [sacct](https://slurm.schedmd.com/sacct.html) command to inspect the accounting record of the job:\n\n```python\nfrom executorlib import get_cache_data\nimport subprocess\n\nfor entry in get_cache_data(cache_directory=\"./cache\"):\n if \"calc\" in str(entry[\"function\"]):\n job_id = entry[\"queue_id\"]\n print(subprocess.check_output(\n [\"sacct\", \"-j\", str(job_id), \"--format=JobID,State,AllocCPUS,Elapsed\"],\n universal_newlines=True,\n ))\n```\n\nThe `AllocCPUS` column reports the number of CPU cores SLURM allocated for the function. 
In addition to the `queue_id` each cache entry also contains the full `resource_dict`, the runtime and the path of the result file, which together provide a complete audit trail of the submission - this is the same information returned by the `get_cache_data()` function demonstrated for the [Single Node Executor](https://executorlib.readthedocs.io/en/latest/1-single-node.html#cache).","metadata":{}},{"id":"2a814efb-2fbc-41ba-98df-cf121d19ea66","cell_type":"markdown","source":"## Flux\n[Flux](http://flux-framework.org) is a modern HPC resource manager developed at Lawrence Livermore National Laboratory (LLNL). On many systems it runs as a **secondary scheduler inside a SLURM allocation**, enabling fine-grained hierarchical task distribution. Unlike SLURM, Flux can also be installed locally via conda — making it especially suitable for demonstrations, testing, and continuous integration:\n\n```bash\nconda install -c conda-forge flux-core\nflux start # launch a local Flux instance\n```\n\nThis simple installation is explained in the [installation section](https://executorlib.readthedocs.io/en/latest/installation.html#alternative-installations). 
The features demonstrated below using Flux apply equally to SLURM.","metadata":{}},{"id":"flux-background-001","cell_type":"markdown","source":"### Background\n\nThe key Flux commands map closely onto their SLURM equivalents:\n\n| Flux command | SLURM equivalent | Description |\n|---|---|---|\n| `flux resource list` | `sinfo` | Show available nodes, cores, and GPUs |\n| `flux jobs -a` | `squeue` | List all jobs (running and completed) |\n| `flux submit` | `sbatch` | Submit a non-blocking job; returns a job ID immediately |\n| `flux run` | `srun` | Launch a blocking job; waits for completion |\n| `flux job attach ` | — | Stream output of a previously submitted job |\n\n```bash\n# submit a non-blocking 4-rank MPI job\nflux submit -o pmi=pmix --ntasks=4 python script.py\n\n# run a blocking 4-rank MPI job (waits for output)\nflux run -o pmi=pmix -n 4 python script.py\n```\n\nThe `-o pmi=pmix` flag matches what SLURM's `--mpi=pmix` provides — the same `mpi4py` scripts run unchanged under both schedulers.","metadata":{}},{"id":"29d7aa18-357e-416e-805c-1322b59abec1","cell_type":"markdown","source":"### Dependencies\nAs already demonstrated for the [SingleNodeExecutor](https://executorlib.readthedocs.io/en/latest/1-single-node.html) the `Executor` classes from executorlib are capable of resolving the dependencies of serial functions, when [concurrent futures Future](https://docs.python.org/3/library/concurrent.futures.html#future-objects) objects are used as inputs for subsequent function calls. 
For the case of the HPC submission these dependencies are communicated to the job scheduler, which makes it possible to stop the Python process which created the `Executor` class, wait until the execution of the submitted Python functions is completed and afterwards restart the Python process for the `Executor` class and reload the calculation results from the cache defined by the `cache_directory` parameter.","metadata":{}},{"id":"0f7fc37a-1248-492d-91ab-9db1d737eaee","cell_type":"code","source":"def add_funct(a, b):\n return a + b","metadata":{"trusted":false},"outputs":[],"execution_count":1},{"id":"ae308683-6083-4e78-afc2-bff6c6dc297b","cell_type":"code","source":"from executorlib import FluxClusterExecutor\n\nwith FluxClusterExecutor(cache_directory=\"./file\") as exe:\n future = 0\n for i in range(4, 8):\n future = exe.submit(add_funct, i, future)\n print(future.result())","metadata":{"trusted":false},"outputs":[{"name":"stdout","output_type":"stream","text":"22\n"}],"execution_count":2},{"id":"ca75cb6c-c50f-4bee-9b09-d8d29d6c263b","cell_type":"markdown","source":"### Resource Assignment\nIn analogy to the [SingleNodeExecutor](https://executorlib.readthedocs.io/en/latest/1-single-node.html) the resource assignment for the `FluxClusterExecutor` is handled by including the resource dictionary parameter `resource_dict` either in the initialization of the `FluxClusterExecutor` class or in every call of the `submit()` function.\n\nBelow, this is demonstrated for the assignment of multiple CPU cores for the execution of a Python function which internally uses the message passing interface (MPI) via the [mpi4py](https://mpi4py.readthedocs.io) package.","metadata":{}},{"id":"eded3a0f-e54f-44f6-962f-eedde4bd2158","cell_type":"code","source":"def calc(i):\n from mpi4py import MPI\n\n size = MPI.COMM_WORLD.Get_size()\n rank = MPI.COMM_WORLD.Get_rank()\n return i, size, 
rank\n","metadata":{"trusted":false},"outputs":[],"execution_count":3},{"id":"669b05df-3cb2-4f69-9d94-8b2442745ebb","cell_type":"code","source":"with FluxClusterExecutor(cache_directory=\"./file\") as exe:\n fs = exe.submit(calc, 3, resource_dict={\"cores\": 2})\n print(fs.result())","metadata":{"trusted":false},"outputs":[{"name":"stdout","output_type":"stream","text":"(3, 1, 0)\n"}],"execution_count":4},{"id":"d91499d7-5c6c-4c10-b7b7-bfc4b87ddaa8","cell_type":"markdown","source":"Beyond CPU cores and threads which were previously also introduced for the [Single Node Executor](https://executorlib.readthedocs.io/en/latest/1-single-node.html) the HPC Cluster Executors also provide the option to select the available accelerator cards or GPUs, by specifying the `\"gpus_per_core\"` parameter in the resource dictionary `resource_dict`. For demonstration we create a Python function which reads the GPU device IDs and submit it to the `FluxClusterExecutor` class:\n```python\ndef get_available_gpus():\n import socket\n from tensorflow.python.client import device_lib\n local_device_protos = device_lib.list_local_devices()\n return [\n (x.name, x.physical_device_desc, socket.gethostname()) \n for x in local_device_protos if x.device_type == 'GPU'\n ]\n```\n\n```python\nwith FluxClusterExecutor(\n cache_directory=\"./cache\",\n resource_dict={\"gpus_per_core\": 1}\n) as exe:\n fs_1 = exe.submit(get_available_gpus)\n fs_2 = exe.submit(get_available_gpus)\n print(fs_1.result(), fs_2.result())\n```","metadata":{}},{"id":"c9c46c64-d097-49d3-ba79-ecb871c8f4da","cell_type":"markdown","source":"## Disconnecting and Reconnecting\nA key advantage of the HPC Cluster Executors over the [HPC Job Executors](https://executorlib.readthedocs.io/en/latest/3-hpc-job.html) is that the Python process which created the executor does not need to stay alive while the submitted functions are running. 
As the functions are submitted as individual scheduler jobs and the results are stored on the file system, the Python process can be closed after the submission - for example to log out of the login node overnight - and the results can be reloaded later. This is controlled with the `shutdown()` method of the executor, which provides the same `wait` and `cancel_futures` parameters as the [Executor interface](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Executor.shutdown) of the Python standard library.\n\nTo submit a set of functions and disconnect without waiting, the executor is created without a `with` statement and `shutdown()` is called with `wait=False` and `cancel_futures=False`:\n\n```python\nfrom executorlib import SlurmClusterExecutor\n\nexe = SlurmClusterExecutor(cache_directory=\"./cache\")\nfuture_lst = [exe.submit(sum, [i, i]) for i in range(1, 4)]\nexe.shutdown(wait=False, cancel_futures=False)\n```\n\nThe submitted jobs remain in the queue of the job scheduler and continue to run. At a later point - even from a new Python process - the same functions are submitted again using the same `cache_directory`. executorlib recognises the cached results and returns them immediately instead of submitting the functions a second time:\n\n```python\nfrom executorlib import SlurmClusterExecutor\n\nwith SlurmClusterExecutor(cache_directory=\"./cache\") as exe:\n future_lst = [exe.submit(sum, [i, i]) for i in range(1, 4)]\n print([f.result() for f in future_lst])\n```\n\nThe behaviour of the `shutdown()` method is summarized in the following table:\n\n| `shutdown()` call | Effect |\n|---|---|\n| `shutdown(wait=True)` | Wait until all submitted functions are finished before continuing - this is the default and also what the `with` statement does. |\n| `shutdown(wait=False, cancel_futures=False)` | Return immediately and leave the submitted jobs running in the scheduler queue - used to disconnect from a running workflow. 
|\n| `shutdown(wait=False, cancel_futures=True)` | Cancel the submitted jobs which have not started or finished yet. |\n\nThis disconnect-and-reconnect capability is available for both the `SlurmClusterExecutor` and the `FluxClusterExecutor`, as both communicate via the file system rather than via sockets.","metadata":{}},{"id":"slurm-job-executor-001","cell_type":"markdown","source":"## Combine both\n\nWhile `SlurmClusterExecutor` submits each Python function as a separate `sbatch` job from the login node, the [HPC Job Executor](https://executorlib.readthedocs.io/en/latest/3-hpc-job.html) (`SlurmJobExecutor`) runs **inside** an already-running SLURM allocation and dispatches tasks as `srun` steps — no new jobs enter the queue.\n\n```python\nfrom executorlib import SlurmJobExecutor\n\n# This code runs inside an existing SLURM job\nwith SlurmJobExecutor(max_workers=4) as exe:\n futures = [exe.submit(sum, [i, i], resource_dict={\"cores\": 1}) for i in range(4)]\n print([f.result() for f in futures])\n```\n\nThe two executor types can be nested: a function submitted via `SlurmClusterExecutor` (running as an `sbatch` job) can itself create a `SlurmJobExecutor` to parallelise sub-tasks as `srun` steps within the same allocation:\n\n```python\nfrom executorlib import SlurmClusterExecutor, SlurmJobExecutor\n\ndef parallel_workflow(n):\n with SlurmJobExecutor(max_workers=n) as inner:\n futures = [inner.submit(sum, [i, i], resource_dict={\"cores\": 1}) for i in range(n)]\n return [f.result() for f in futures]\n\nwith SlurmClusterExecutor(cache_directory=\"./cache\") as outer:\n future = outer.submit(\n parallel_workflow, 4,\n resource_dict={\"cores\": 1, \"partition\": \"regular\", \"run_time_max\": 300})\n print(future.result())\n```\n\nThe `sacct` output for such a job will show the outer `sbatch` job together with numbered `srun` steps (e.g. 
`12345.0`, `12345.1`, …), all completing within the same allocation.\n\n| Executor | Scheduler command | Typical use |\n|---|---|---|\n| `SlurmClusterExecutor` | `sbatch` (one job per function) | Submit from login node |\n| `SlurmJobExecutor` | `srun` steps within current job | Inside an existing allocation |\n| `FluxClusterExecutor` | `flux submit` | Flux-managed allocation or local testing |","metadata":{}},{"id":"3f47fd34-04d1-42a7-bb06-6821dc99a648","cell_type":"markdown","source":"## Cleaning Cache\nFinally, as the HPC Cluster Executors leverage the file system to communicate serialized Python functions, it is important to clean up the cache directory specified by the `cache_directory` parameter once the results of the submitted Python functions are no longer needed. The serialized Python functions are stored in binary format using the [cloudpickle](https://github.com/cloudpipe/cloudpickle) library for serialization. This format is designed for caching but not for long-term storage. The user is responsible for the long-term storage of their data.","metadata":{}},{"id":"f537b4f6-cc98-43da-8aca-94a823bcbcbd","cell_type":"code","source":"import os\nimport shutil\n\ncache_dir = \"./file\"\nif os.path.exists(cache_dir):\n print(os.listdir(cache_dir))\n try:\n shutil.rmtree(cache_dir)\n except OSError:\n pass","metadata":{"trusted":false},"outputs":[{"name":"stdout","output_type":"stream","text":"['add_functcb272924f36cbaa9ac79f8d42b6771c8', 'add_funct67b8245bf71c3c6dcb2018663939c72d', 'add_funct144ecf19b5020fccad214df3f4bdabd0', 'add_funct4a5d1f06cca57f0ffbaa3116d39ecc6a', 'add_funct4a5d1f06cca57f0ffbaa3116d39ecc6a_o.h5', 'calcd732cdcbd2c8e68145d99f0be814e667', 'add_funct67b8245bf71c3c6dcb2018663939c72d_o.h5', 'add_funct144ecf19b5020fccad214df3f4bdabd0_o.h5', 'calcd732cdcbd2c8e68145d99f0be814e667_o.h5', 
'add_functcb272924f36cbaa9ac79f8d42b6771c8_o.h5']\n"}],"execution_count":5},{"id":"3efc9f5d-fbf9-4a85-8963-5711a453130d","cell_type":"code","source":"","metadata":{"trusted":false},"outputs":[],"execution_count":null}]} \ No newline at end of file