class: center, middle # Using Python & Jupyter Notebooks on Savio --- # Announcements - Research IT is hiring several [graduate student Domain Consultants](https://docs.google.com/document/d/18-ea-CsRsSVSu8hhFqnPFs-i7RvcG1ObGCW_1olUdJg/edit) for flexible, 10% to 25% (4-10 hours/week) appointments. Email your cover letter and CV to: [research-it@berkeley.edu](mailto:research-it@berkeley.edu) -- - Our next [Cloud Computing Meetup](https://www.meetup.com/ucberkeley_cloudmeetup/) is on **Thursday, December 9 at 1pm** and focused on **startups using cloud technologies**. -- - We are looking for researchers working with **sensitive data**. Get in touch for more information: [research-it@berkeley.edu](mailto:research-it@berkeley.edu) -- - If you have any data or compute questions, please visit us at our weekly office hours on **Wednesdays from 1:30-3pm** and **Thursdays from 9:30-11am**. See https://research-it.berkeley.edu/consulting for Zoom link. --- # Outline - Python package management on the cluster - Overview and pros/cons of using pip and conda - Best practices when using conda environments -- - Using Jupyter notebooks via Open OnDemand on Savio - Monitoring and debugging OOD jobs - Creating your own Jupyter kernels from a conda env - Modifying the kernel to use the module system -- - Basic parallelization strategies in Python - ipyparallel - Dask - Ray - Parallel processing in Jupyter notebooks --- # Introduction We'll do this mostly as a demonstration. We encourage you to login to your account and try out the various examples yourself as we go through them. Much of this material is based on the extensive Savio documention we have prepared and continue to prepare, available at [https://docs-research-it.berkeley.edu/services/high-performance-computing/](https://docs-research-it.berkeley.edu/services/high-performance-computing/). The materials for this tutorial are available using git at the short URL ([tinyurl.com/brc-nov21](https://tinyurl.com/brc-nov21)), the GitHub URL ([https://github.com/ucb-rit/savio-training-python-jupyter-fall-2021](https://github.com/ucb-rit/savio-training-python-jupyter-fall-2021)), or simply as a [zip file](https://github.com/ucb-rit/savio-training-python-jupyter-fall-2021/archive/main.zip). --- # Python on Savio First step, load in Python: ```sh # loads latest version on system 3.7 module load python # or load specific version module load python/3.6 # ``` -- To see what versions are available on the system: ```sh [jpduncan@ln002 ~]$ module avail python -------------------- /global/software/sl-7.x86_64/modfiles/langs --------------------- python/2.7 python/3.5 python/3.6 python/3.7 ... ``` --- ## Don't forget... ...to `module load python`. -- If you forget you will still have a default version available (but no `pip` or `conda`): ```sh [jpduncan@ln002 ~]$ which python /usr/bin/python [jpduncan@ln002 ~]$ python --version Python 2.7.5 [jpduncan@ln002 ~]$ which {pip,conda} /usr/bin/which: no pip in (/usr/local/bin:/usr/bin:/usr/local/sbin:/usr/sbin:/global/home/groups/allhands/bin:/global/home/users/jpduncan/bin) /usr/bin/which: no conda in (/usr/local/bin:/usr/bin:/usr/local/sbin:/usr/sbin:/global/home/groups/allhands/bin:/global/home/users/jpduncan/bin) ``` -- .red[WARNING:] We strongly caution against using Python 2 which is no longer supported by the Python Software Foundation. --- # System default Python packages Run `conda list` to see what packages are already available. ```sh [jpduncan@ln001 ~]$ conda list | head # packages in environment at /global/software/sl-7.x86_64/modules/langs/python/3.7: # # Name Version Build Channel _anaconda_depends 2019.03 py37_0 anaconda _ipyw_jlab_nb_ext_conf 0.1.0 py37_0 _libgcc_mutex 0.1 main alabaster 0.7.12 py37_0 anaconda custom py37_1 anaconda-client 1.7.2 py37_0 anaconda-navigator 1.9.7 py37_0 ``` -- Includes many common packages and their dependencies, such as: - **Scientific computing**: numpy, scipy -- - **Visualization**: bokeh, seaborn, matplotlib -- - **Parallel computing**: dask, ipyparallel -- - **Data ingest/manipulation**: hdf5, pandas -- - **Data science**: pyspark, scikit-learn, statsmodels - many more --- # More ML packages A few machine learning packages in the module system: ```sh [jpduncan@ln002 ~]$ module avail ml --------------------- /global/software/sl-7.x86_64/modfiles/apps --------------------- ml/caffe/rc5 ml/tensorflow/1.12.0-cpu-py36 ml/cntk/2.0.beta15-py27 ml/tensorflow/1.12.0-py36 ml/cntk/2.0.beta15-py35 ml/tensorflow/1.7.0-py27 ml/mxnet/0.9.3-py27 ml/tensorflow/1.7.0-py35 ml/mxnet/0.9.3-py35 ml/tensorflow/1.7.0-py36 ml/mxnet/0.9.3-py36 ml/tensorflow/2.0-py36 ml/mxnet/0.9.3-r-3.2.5 ml/tensorflow/2.1.0-py37 ml/tensorflow/1.0.0-py27 ml/tensorflow/2.3.0-py37 ml/tensorflow/1.0.0-py35 ml/tensorflow/2.5.0-py37 ml/tensorflow/1.0.0-py36 ml/torch/torch7 ``` -- ```sh [jpduncan@ln002 ~]$ module avail pytorch ----------------- /global/home/groups/consultsw/sl-7.x86_64/modfiles ----------------- pytorch/0.3.1-py3.5-cuda9.0 pytorch/0.4.0-py36-cuda9.0 pytorch/1.0.0-py36-cuda9.0 ``` --- name: pip # Installing additional packages with pip pip stands for "package installer for Python". You can use it to install packages from: --- template: pip - PyPI -- the Python Package Index ```sh # --user to install in ~/.local # --upgrade to get newer version of package than already installed pip install --user --upgrade scikit-learn ``` --- template: pip count: false - PyPI -- the Python Package Index - Version control systems (VCS) such as `git` ```sh pip install --user git+https://github.com/scikit-learn/scikit-learn ``` --- template: pip count: false - PyPI -- the Python Package Index - Version control systems (VCS) such as `git` - Local project directories ```sh # -e is for "editable" installs (can also be used with VCS) # this is similar to `python setup.py develop` pip install --user -e path/to/your/project ``` --- template: pip count: false - PyPI -- the Python Package Index - Version control systems (VCS) such as `git` - Local project directories - Local or remote source archives ```sh pip install --user https://github.com/scikit-learn/scikit-learn/archive/refs/tags/1.0.1.tar.gz ``` --- template: pip count: false - PyPI -- the Python Package Index - Version control systems (VCS) such as `git` - Local project directories - Local or remote source archives - From a `requirements.txt` file ``` pip install --user -r requirements.txt ``` -- .red[WARNING:] `conda list` won't show the packages installed via the default version of pip that is available after `module load python`. Use `pip list --user` instead. --- # pip default user install directory By default, using `pip install --user` puts package Python code, library files, and metadata into `~/.local/lib/python3.7/site-packages` (assuming Python 3.7). ```sh [jpduncan@ln003 ~]$ pip install --user --upgrade scikit-learn [jpduncan@ln003 ~]$ ls ~/.local/lib/python3.7/site-packages/ __pycache__ sklearn scikit_learn-1.0.1.dist-info threadpoolctl-3.0.0.dist-info scikit_learn.libs threadpoolctl.py ``` -- To avoid taking up too much room in your home directory, you can move this directory to scratch and create a symlink: ```sh cp -pr ~/.local /global/scratch/users/$USER/.local rm -rf ~/.local ln -s /global/scratch/users/$USER/.local ~/.local ``` -- Alternatively, you can change this directory for your session by setting the environmental variable `PYTHONUSERBASE`, or permanently by setting it in `~/.bashrc`. --- ## Uninstalling packages previously installed with pip ```sh # uninstall a single package pip uninstall scikit-learn # uninstall all user packages pip freeze --user | xargs pip uninstall -y # uninstall all user packages with specific prefix pip freeze --user | grep scikit- | xargs pip uninstall -y ``` -- Sadly, `pip uninstall` does not remove package dependencies (but `pip-autoremove` package may help). -- More info on various pip commands in docs: https://pip.pypa.io/en/stable/ --- ## Managing your Python environment with conda The second package manager available by default when you `module load python` is conda. Unlike pip, conda is also an environment manager. -- Create a conda environment. ```sh # create an empty conda env called my-env [jpduncan@ln003 ~]$ conda create --name my-env ``` -- This creates a directory `~/.conda/envs/my-env`. Activate it: ```sh [jpduncan@ln003 ~]$ source activate my-env (my-env) [jpduncan@ln003 ~]$ conda install numpy # also installs Python 3.9 (my-env) [jpduncan@ln003 ~]$ source deactivate DeprecationWarning: 'source deactivate' is deprecated. Use 'conda deactivate'. [jpduncan@ln003 ~]$ ``` -- .red[WARNING:] You may be used to using `conda activate` to activate an environment, but this requires you to run `conda init` first which we don't recommend on Savio. If you do use `conda init`, we recommend you also run `conda config --set auto_activate_base False`. --- # Installing additional packages with conda `conda` is a bit more restrictive than `pip`. There are two main ways to install packages: -- - From various conda "channels" such as `default` (https://anaconda.org/), `conda-forge`, `bioconda`, among others. ```sh (my-env) [jpduncan@ln003 ~]$ conda install scipy # using the default channel # -c is short for --channel (my-env) [jpduncan@ln003 ~]$ conda install -c bioconda samtools # update scipy using the conda-forge channel (my-env) [jpduncan@ln003 ~]$ conda update scipy --channel conda-forge ``` -- - Advanced: You can also install from an archive (e.g., .tar.bz2), but doing so won't resolve the package's dependencies. -- .red[WARNING]: Be sure to activate a conda environment before using `conda install`, otherwise conda will try to put the packages in a read-only system location. --- # Conda config You can customize conda's configuration using `conda config`. - Add `conda-forge` with highest priority for all envs ```sh (my-env) [jpduncan@ln003 ~]$ conda config --add channels conda-forge ``` -- - Add `conda-forge` with lowest priority for all envs: ```sh (my-env) [jpduncan@ln003 ~]$ conda config --append channels conda-forge ``` -- - Add `bioconda` with highest priority for a specific env: ```sh (my-env) [jpduncan@ln003 ~]$ conda config --env --add channels bioconda ``` -- The first two will add or modify `~/.condarc`, while the third will create `~/.conda/envs/my-env/.condarc`. --- # Using pip within a conda env .red[WARNING]: This can cause issues because conda doesn't consider pip-installed packages when installing additional packages. Be careful when using pip in a conda environment. -- First be sure that pip is available already installed in the environment and follow these [best practices](https://www.anaconda.com/blog/using-pip-in-a-conda-environment): - Use pip only after conda -- - If conda changes are needed after using pip, create new environment -- - Don't use `--user` when calling `pip install` -- - Always run pip with `--upgrade-strategy only-if-needed` (the default) -- - Experimental: Use `pip_interop_enabled` setting [see here](https://docs.conda.io/projects/conda/en/latest/user-guide/configuration/pip-interoperability.html) --- ## Upgrading packages and rolling back conda environment changes Say you changed your environment (e.g., updated a package) and suddenly your code has errors. ```sh (my-env) [jpduncan@ln003 ~]$ conda update samtools # breaks environment! ``` -- You can roll those changes back as follows: ```sh # find the last working revision number (my-env) [jpduncan@ln003 ~]$ conda list --revisions # restore to revision 3 (my-env) [jpduncan@ln003 ~]$ conda install --revision=3 ``` -- .blue[NOTE]: conda doesn't show revisions for changes made by pip, so they can't be rolled back this way. --- # Changing installation directory with conda When an environment is active, packages will install in its directory. -- You can also install packages to specific environments or paths without activating first: ```sh # install to the my-env environment [jpduncan@ln003 ~]$ conda install --name my-env scipy # create env and install packages at the same time [jpduncan@ln003 ~]$ conda create --prefix $SCRATCH/conda/envs/scratch-env scipy # give full path to env to activate [jpduncan@ln003 ~]$ source activate $SCRATCH/conda/envs/scratch-env ``` -- Like `~/.local`, you can also consider moving `~/.conda` to scratch and symlinking. --- ## Removing packages, removing conda environments, and cleaning up To remove a specific package in a specific environment: ```sh [jpduncan@ln003 ~]$ conda remove scipy --name my-env ``` -- To remove an entire environment (deactivate first): ```sh [jpduncan@ln003 ~]$ conda remove --name my-env --all ``` -- `conda clean` helps remove unused packages and tarballs: ```sh [jpduncan@ln003 ~]$ conda clean --dry-run --all ``` --- ## pip pros and cons .green[\+] can use system packages -- .green[\+] new package versions come out on PyPI first -- .green[\+] larger number of packages on PyPI -- .green[\+] more flexible package installation -- .green[\+] a bit faster than conda -- .red[\-] doesn't check for simultaneous compatibility of all dependencies -- .red[\-] limited to system Python versions -- .red[\-] no automated dependency cleanup -- .red[\-] Python packages only --- ## conda pros and cons .green[\+] environments self-contained, can use any version of Python -- .green[\+] `conda clean` helps remove unused packages -- .green[\+] if you make a change that didn't go well, roll back -- .green[\+] not just Python packages -- .green[\+] ensures that all dependencies are compatible -- .red[\-] `conda install` tends to be slower than `pip install` -- .red[\-] environments can become quite large on disk and/or cause I/O issues -- .red[\-] more limited number of Python packages available --- # Best practices for reproducibility **pip** -- use a requirements.txt ```sh pip freeze > requirements.txt ``` -- **conda** -- use an environment.yml file ```sh conda env export > environment.yml ``` -- **even better** -- use a container See our [training on using containers on Savio](https://docs-research-it.berkeley.edu/services/high-performance-computing/getting-help/training-and-tutorials/#savio-introduction-to-containers-on-savio-creating-reproducible-scalable-and-portable-workflows). - .green[\+] Prevents I/O issues on the system - .green[\+] Very portable - .red[\-] more of a learning curve --- .center[![Open OnDemand Logo](images/ood-logo.png)] Open OnDemand (OOD) allows various interactive applications to access the full Savio infrastructure, including: - Jupyter Notebooks (our focus) - RStudio - Matlab - VS Code For more info on the OOD project: https://openondemand.org/ --- # Jupyter Notebooks overview Jupyter Notebooks provide an interactive environment for writing and executing Python code, visualizing results, and creating documentation. A Jupyter Notebook can serve as a reproducible record for your analysis. Let's take a quick look at Jupyter on OOD now: https://ood.brc.berkeley.edu .center[![Open On Demand on Savio](images/ood.png)] --- ## Using Jupyter on Savio partitions via Slurm Select appropriate Slurm flags: .center[![Slurm config for Jupyter server](images/jupyter-slurm.png)] --- ## Connect to Jupyter .center[![Interactive sessions](images/my-interactive-sessions.png)] Clicking on the **Session ID** will take you to the a directory with scripts and logs for the session at `~/ondemand/data/sys/dashboard/batch_connect/sys/brc_jupyter-compute/output/
`. --- ## The Jupyter file tree .center[![Jupyter file tree](images/jupyter-tree.png)] You can switch to Jupyter Lab by replacing everything from `/tree` with `/lab` in the URL. --- ## Selecting and changing the notebook kernel .left-column[![New Jupyter notebook](images/new-notebook.png)] -- .right-column[![Change notebook kernel](images/change-kernel.png)] --- ## Some OOD troubleshooting tips - If you're having trouble getting the Slurm flags right when starting an OOD session, try running an analogous [interactive job](https://docs-research-it.berkeley.edu/services/high-performance-computing/user-guide/running-your-jobs/submitting-jobs/#interactive-jobs) from the command line. -- - If running via Slurm, you can see the status of your job with `squeue -u $USER` (or `sq`) as with a usual Slurm job. ```sh [jpduncan@ln001 ~]$ squeue -u $USER JOBID PARTITION NAME USER ST TIME NODES NODELIST(REASON) 10080336 savio2 OOD_batc jpduncan R 54:45 1 n0090.savio2 ``` -- - If you see that OOD is repeatedly giving you an authentication window, you need to **close any other browser tabs using OOD**, or go back to your previous tab. -- - Compute on shared Jupyter node for debugging/exploration is limited to **1 core** and **2 GB memory**. --- # Jupyter kernels Every Jupyter Notebook uses a kernel, which provides programming language support. In particular, the default IPython kernel allows Jupyter Notebooks to run Python. -- On Savio we have a number of Jupyter kernels available by default: - Python 2.7, 3.6, 3.7 - TensorFlow kernels - pyspark -- See here for a list of kernels for other programming languages that may work on Savio: https://github.com/jupyter/jupyter/wiki/Jupyter-kernels --- # Making your own Jupyter kernel If the default kernels don't meet your needs, you can create your own. There are various methods, but a straightforward one is to use an environment. -- We could use conda to create an environment which we then convert to an ipykernel: ```sh module load python conda create -n py39 python=3.9 ipykernel source activate py39 python -m ipykernel install --user --name=py39 --display-name="Python 3.9" source deactivate ``` -- .blue[NOTE]: If you don't need a different version of Python from what the system provides, then using a `virtualenv` may take less space on disk than a conda environment. --- ## Modifying your kernels to use system modules Say we want to create an kernel that uses the `ml/tensorflow/2.5.0-py37` module. ```sh module load ml/tensorflow/2.5.0-py37 python -m ipykernel install --user --name=tf25 --display-name="Python3.7 TF2.5.0" ``` -- This creates `~/.local/jupyter/kernels/tf25/kernel.json`: ```json { "argv": [ "/global/software/sl-7.x86_64/modules/langs/python/3.7/bin/python", "-m", "ipykernel_launcher", "-f", "{connection_file}" ], "display_name": "Python3.7 TF-2.5.0", "language": "python" } ``` We'll need to edit this file so that the kernel has access to module and its dependencies. --- We'll add a dictionary called `"env"` with environmental variables to set when starting up the kernel. -- You'll want to see `env` with environmental variables as they are after you run `module load ml/tensorflow/2.5.0-py37`. To see what it does, run `module show ml/tensorflow/2.5.0-py37`. In this case, we'll copy paste these variables to the `env` dictionary: ```sh echo $PATH echo $LD_LIBRARY_PATH echo $PYTHONPATH ``` -- In my case, the final `kernel.json` looks like this: ```json { "argv": [ "/global/software/sl-7.x86_64/modules/langs/python/3.7/bin/python", "-m", "ipykernel_launcher", "-f", "{connection_file}" ], "display_name": "Python3.7 TF-2.5.0", "language": "python", "env": { "PATH": "/global/software/sl-7.x86_64/modules/apps/ml/tensorflow/2.5.0-py37/bin:/global/software/sl-7.x86_64/modules/langs/cuda/11.2/bin:/global/software/sl-7.x86_64/modules/langs/python/3.7/bin:/global/software/sl-7.x86_64/modules/langs/python/3.7/condabin:/usr/local/bin:/usr/bin:/usr/local/sbin:/usr/sbin:/global/home/groups/allhands/bin:/global/home/users/jpduncan/bin", "LD_LIBRARY_PATH": "/global/software/sl-7.x86_64/modules/cuda/11.2/cudnn/8.1.1/lib64:/global/software/sl-7.x86_64/modules/langs/cuda/11.2/lib64/stubs:/global/software/sl-7.x86_64/modules/langs/cuda/11.2/lib64", "PYTHONPATH" : "/global/software/sl-7.x86_64/modules/apps/ml/tensorflow/2.5.0-py37/lib/python3.7/site-packages" } } ``` --- Now I can use my new TensorFlow kernel on a GPU node. .center[![Jupyter with TensorFlow kernel](images/jupyter-tf.png)] --- # Basic parallelization using ipyparallel We'll start by using the IPython parallel package (`ipyparallel`), which allows one to parallelize on a single machine or across multiple machines. `ipyparallel` allows one to easily split up tasks that can be computed independently and send those tasks out to Python worker processes to run in parallel. The results are then collected back on the main Python process. Since the workers are separate Python processes, we need to start up those processes, either from within Python (possible as of version 7 of ipyparallel) or from the command line. --- # Starting workers on one node ``` ## In newer versions of ipyparallel (v. 7 and later) import ipyparallel as ipp ipp.__version__ cluster = ipp.Cluster(n=4) cluster.start_cluster_sync() ``` ``` c = cluster.connect_client_sync() ## Check that we have the number of workers expected: c.ids ``` Side note: You can also start the workers from the command line (this was required in older versions of ipyparallel, probably before version 7). ``` # in the shell ipcluster start -n 4 ``` ``` # in python import ipyparallel as ipp c = ipp.Client() c.ids ``` --- # Testing our cluster Let's just verify that things seem set up ok and we can interact with all our workers: ``` dview = c[:] dview ## Set blocking so that we wait for the result of the parallel execution dview.block = True dview.apply(lambda : "Hello, World") ``` `dview` stands for a 'direct view', which is an interface to our cluster that allows us to 'manually' send tasks to the workers. --- # Parallelized machine learning example Now let's see an example of how we can use our workers to run code in parallel. We'll carry out a statistics/machine learning prediction method (random forest regression) with leave-one-out cross-validation, parallelizing over different held out data. First let's set up packages, data and our main function on the workers: ``` dview.execute('from sklearn.ensemble import RandomForestRegressor as rfr') dview.execute('import numpy as np') def looFit(index, Ylocal, Xlocal): rf = rfr(n_estimators=100) fitted = rf.fit(np.delete(Xlocal, index, axis = 0), np.delete(Ylocal, index)) pred = rf.predict(np.array([Xlocal[index, :]])) return(pred[0]) import numpy as np np.random.seed(0) n = 500 p = 50 X = np.random.normal(0, 1, size = (n, p)) Y = X[: , 0] + pow(abs(X[:,1] * X[:,2]), 0.5) + X[:,1] - X[:,2] + np.random.normal(0, 1, n) mydict = dict(X = X, Y = Y, looFit = looFit) dview.push(mydict) ``` --- # Parallelized machine learning example (2) Now let's set up a "load-balanced view". With this type of interface, one submits the tasks and the controller decides how to divide up the tasks, ideally achieving good load balancing. A load-balanced computation is one that keeps all the workers busy throughout the computation ``` lview = c.load_balanced_view() lview.block = True # We'll leave out only 50 observations, for the sake of time nSub = 50 # need a wrapper function because map() only operates on one argument def wrapper(i): return(looFit(i, Y, X)) # Now run the fitting, predicting on each held-out observation: import time pred = lview.map(wrapper, range(nSub)) pred ``` --- # Using multiple nodes When using multiple nodes, we will generally start up a controller process on the first node that Slurm gives us and then use `srun` (within our running `srun` or `sbatch` session) to start up workers on the nodes. First we'll start our Slurm job using `sbatch` or `srun`. Then once the job starts (if using `srun`) or within our submission script (if using `sbatch`): ``` ipcontroller --ip='*' & sleep 30 ## The next line starts one worker per SLURM task ## (which should equal the number of cores) srun ipengine & sleep 45 # wait until all engines have successfully started ``` At this point you should be able to connect to the running cluster using the syntax seen for single-node usage. .blue[WARNING:] Be careful to set the sleep period long enough that the controller starts before trying to start the workers and the workers start before trying to connect to the workers from within Python. --- # Using ipyparallel in a Jupyter notebook As of our recent upgrade to version 7 of the ipyparallel package, you can start the ipyparallel workers from within Python and therefore from within your notebook. This is the same as we just saw in our basic Python session. Previously, one needed to either start the workers via the "IPython Clusters" tab of the Jupyter interface or start the workers using `ipcluster start` from a terminal session and then connecting to them from the notebook. As before, the number of workers should generally match the number of cores you requested (e.g., on `savio2_htc`) or (ideally) the number of cores on the machine (for partitions allocated on a per node basis). If you've started the workers outside of Python, you need to connect to the running cluster. As before with the ipyparallel examples, 'c' gives us a 'handle' object. ``` import ipyparallel as ipp c = ipp.Client(profile='default', cluster_id='') c.ids # this should show the number of workers equal to the number you requeste ``` # Multiple nodes This may be possible. Feel free to ask us and we can explore it further. --- # Dask overview Python's Dask package provides powerful tools for parallelizing computations on a single machine or across multiple machines. In addition to providing tools that allow you to parallelize independent computations as we did with `ipyparallel`, Dask also allows you to run computations across datasets in parallel using distributed data objects. There's lots of information about Dask online, including [this tutorial](https://github.com/berkeley-scf/tutorial-dask-future) prepared by Chris Paciorek. --- # Distributed data in Dask The idea here is to split up large datasets into chunks (also called 'partitions' or 'shards') and operate in parallel on those chunks. This generally assumes that / works best when the operations can be done independently on the chunks and limited information needs be communicated between chunks. Some advantages of this are: - increasing speed by using multiple cores to process the data. - when using multiple nodes, increasing the amount of total memory that can be used by having the data split across the nodes. # Dask's distributed data types Some of the key types of distributed data objects in Dask are: - distributed dataframes - each chunk is a Pandas dataframe - distributed arrays - each subset of the array is a Numpy array - bags - distributed lists, where each chunk contains some of the elements --- # Dask's 'schedulers' Dask can set up the parallel workers in a variety of ways, which are called schedulers. Here are the options: |Type|Description|Multi-node|Copies of objects made?| |----|-----------|----------|-----------------------| |synchronous|not in parallel|no|no| |threaded|threads within current Python session|no|no| |processes|background Python sessions|no|yes| |distributed|Python sessions across multiple nodes|yes or no|yes| .blue[NOTE:] Because of Python's Global Interpreter Lock (GIL), many computations done in pure Python code won't be parallelized using the 'threaded' scheduler; however computations on numeric data in numpy arrays, Pandas dataframes and other C/C++/Cython-based code will parallelize. Here's an example of setting the 'processes' scheduler for a computation: ``` import dask.multiprocessing # spread work across multiple cores, one worker per core dask.config.set(scheduler='processes', num_workers = 4) ``` --- # Dask bag example - context Let's read in a bunch of data from multiple files and put it into a Dask bag. Here's the format of the input files. They're actually tabular type data (i.e., we can think of rows as observations and columns (space separated here) as fields/variables. But in working with the data using a bag, we'll treat each row as arbitrary observation that is simply a string. Let's get a sense for the data first: ``` gzip -cd /global/scratch/users/paciorek/wikistats_small/dated/part-00000.gz | head -n 5 ``` The data are the number of visits to a given Wikipedia page on a given hour of a given day, in a given language. The fields are {day, hour, language, page, num_visits, page_size}. --- # Dask bag example - initial processing ``` import dask.bag as db wiki = db.read_text('/global/scratch/users/paciorek/wikistats_small/dated/part-0000*') # How many partitions (for zipped files, this will be one per file) wiki.npartitions wiki # dask.bag
# Count the number of elements wiki.count().compute() import re def find(line, regex = "Obama", language = "en"): vals = line.split(' ') if len(vals) < 6: return(False) tmp = re.search(regex, vals[3]) if tmp is None or (language != None and vals[2] != language): return(False) else: return(True) # Filter to the pages related to Barack Obama and count them wiki.filter(find).count().compute() ``` # Lazy execution and pipelines Note that Dask executes lazily. The call to `read_text` doesn't read the data in and `wiki` is just an abstract placeholder for the actual data. Only when we run `compute()` (or a few other operations that indicate the user wants output) is the "pipeline" actually run. Note that if you call `compute()` twice, the entire dataset will get read in twice. So it's good to combine multiple operations into a single pipeline. Here's an example: ``` total_cnt =wiki.count() obama = wiki.filter(find) obama_cnt = obama.count() (obama, obama_cnt, total_cnt) = db.compute(obama, obama_cnt, total_cnt) total_cnt obama[0:5] ``` --- # Extending the example Here's a pipeline where we read the data in, extract the Obama-related entries, and aggregate it to count the number of Obama-related visit for each day-hour combination. To do this, we convert the Dask bag to a Dask dataframe and then using dataframe aggregation operations (the same syntax as in Pandas). ``` def make_tuple(line): return(tuple(line.split(' '))) dtypes = {'date': 'object', 'time': 'object', 'language': 'object', 'webpage': 'object', 'hits': 'float64', 'size': 'float64'} # df is a Dask dataframe df = obama.map(make_tuple).to_dataframe(dtypes) obama_stats = df.groupby(['date','time']).hits.sum().compute() # 'obama_stats' is a Pandas object obama_stats.to_csv('obama_stats.csv') ``` --- # Overview of Ray Ray provides many tools for parallel computation, including remote functions (tasks) and remote classes (actors). ```sh conda install ray-core --channel conda-forge pip install --user ray ``` To start Ray on a single node: ```py import ray ray.init() # this use all available cores ray.init(num_cpus=4, num_gpus=2) # or tell Ray exactly what you want to use ``` Ray **Tasks** allow you to parallelize your functions via decorators: ```py @ray.remote() def my_function(x): return x * x ``` To call a remote function, use `fun_name.remote()`: ```py futures = [my_function.remote(i) for i in range(4)] print(ray.get(futures)) # [0, 1, 4, 9] ``` --- # Parallel classes Ray **Actors** let you parallelize all the methods of a Python class: ```py @ray.remote class Counter(object): def __init__(self, init_val=0): self.n = init_val def increment(self, multiplier = 1): self.n += 1 * muliplier def read(self): return self.n ``` Use `my_obj = ClassName.remote()` to initialize a new instance and `my_obj.method_name.remote()` to call ```py # initialize counters = [Counter.remote(i) for i in range(4)] # call a mutating method [c.increment.remote(2) for c in counters] # call an accessor method futures = [c.read.remote() for c in counters] # get the value print(ray.get(futures)) # [2, 3, 4, 5] ``` --- # Ray object store .center[![Ray object store](images/ray-vs-dask.png)] Compared to Dask, Ray has the advantage of a **shared-memory object store**, which allows workers on the same node to reuse memory. .blue[NOTE]: Ray Data (in beta as of Ray 1.8) allows for interop with Dask collections (such as distributed DataFrames and Bags) by providing a Ray scheduler for Dask (`dask_on_ray`). .footnote[Source: https://www.anyscale.com/blog/analyzing-memory-management-and-performance-in-dask-on-ray] --- # Multi-node Ray It's also possible to run Ray across nodes using Slurm. The Ray docs provide examples and helper utilities to create `sbatch` scripts that will set up a multinode Ray cluster via Slurm. See here: https://docs.ray.io/en/latest/cluster/slurm.html --- # Other Ray projects There are additional Ray libraries in various stages of development that could be worth checking out: - [Datasets](https://docs.ray.io/en/latest/data/dataset.html): support for many data input formats and distributed DataFrame and basic distributed data transformations (e.g., map, filter, and repartition). - [Tune](https://docs.ray.io/en/latest/tune/index.html): distributed hyperparameter tuning for ML frameworks - [RLlib](https://docs.ray.io/en/latest/rllib.html): scalable reinforcement learning and unified API for different frameworks (e.g. TensorFlow and PyTorch). - [Workflows](https://docs.ray.io/en/latest/workflows/concepts.html): Adds durability to Ray dynamic task graphs, appropriate for large-scale ML pipelines or long-running applications. --- # Getting help - **Weekly office hours**: Wednesdays from 1:30-3pm and Thursdays from 9:30-11am (see https://research-it.berkeley.edu/consulting for Zoom link) - **Past trainings**: https://tinyurl.com/savio-trainings - **FAQs**: https://tinyurl.com/savio-faqs - **Savio ticketing system**: email us at brc-hpc-help@berkeley.edu --- class: center, middle # Thanks!