Dask Arrays with Xarray
The scientific Python package Dask provides Dask Arrays: parallel, larger-than-memory, n-dimensional arrays that use blocked algorithms. They are analogous to NumPy arrays, but are split into chunks and can be distributed. These terms are defined below:
Parallel code uses many or all of the cores on the computer running the code.
Larger-than-memory refers to algorithms that break up data arrays into small pieces, operate on these pieces in an optimized fashion, and stream data from a storage device. This allows a user or programmer to work with datasets of a size larger than the available memory.
A blocked algorithm speeds up large computations by converting them into a series of smaller computations.
In this tutorial, we cover the use of Xarray to wrap Dask arrays. By using Dask arrays instead of NumPy arrays in Xarray data objects, it becomes possible to execute analysis code in parallel with much less code and effort.
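The following is a minimal sketch of what this wrapping looks like. It assumes small, made-up data and the hypothetical dimension names `time` and `x`. A chunked Dask Array is wrapped in an Xarray DataArray, a mean is taken lazily, and the result is then computed:

```python
import dask.array as da
import numpy as np
import xarray as xr

# Small, deterministic example data split into 4 chunks along the first axis
data = da.from_array(np.arange(32, dtype=float).reshape(8, 4), chunks=(2, 4))

# Wrap the Dask Array in a DataArray (dimension names are illustrative only)
xda = xr.DataArray(data, dims=("time", "x"))
print(xda.chunks)  # ((2, 2, 2, 2), (4,))

# Xarray operations on Dask-backed data stay lazy and only build a task graph
mean_over_time = xda.mean(dim="time")
print(isinstance(mean_over_time.data, da.Array))  # True: nothing computed yet

# .compute() runs the graph and returns a NumPy-backed DataArray
result = mean_over_time.compute()
print(isinstance(result.data, np.ndarray))  # True
print(result.values)  # [14. 15. 16. 17.]
```

The Xarray call (`xda.mean(dim="time")`) is the same one you would write for NumPy-backed data. Only the backing array type changes.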
Learning Objectives
Learn the distinction between eager and lazy execution, and how to perform both types of execution with Xarray
Understand key features of Dask Arrays
Learn to perform operations on Dask Arrays in the same ways as on NumPy arrays
Understand the use of Xarray DataArrays and Datasets as “Dask collections”, and the use of top-level Dask functions such as dask.visualize() on such collections
Understand the ability to use Dask transparently in all built-in Xarray operations
Prerequisites
Concepts | Importance | Notes |
---|---|---|
Familiarity with Data Arrays | Necessary | |
Familiarity with Xarray Data Structures | Necessary | |
Time to learn: 30-40 minutes
Imports
Because this tutorial works with Dask, several Dask modules must be imported. Since it is also an Xarray tutorial, Xarray and NumPy are imported as well. Finally, the Pythia datasets package is imported, providing access to the Project Pythia example data library.
import dask
import dask.array as da
import numpy as np
import xarray as xr
from dask.diagnostics import ProgressBar
from dask.utils import format_bytes
from pythia_datasets import DATASETS
Blocked algorithms
As described above, a blocked algorithm replaces a large operation with many small operations. For datasets, this means that the algorithm separates a dataset into chunks and performs the operation on each chunk.
As an example of how blocked algorithms work, consider a dataset containing a billion numbers, and assume that the sum of the numbers is needed. A non-blocked algorithm adds all of the numbers in a single operation, which typically means loading the entire dataset into memory and using only one core. A blocked algorithm instead breaks the dataset into chunks. (For the purposes of this example, assume that 1,000 chunks are created, with 1,000,000 numbers each.) The numbers in each chunk are summed, most likely in parallel, and then those partial sums are added together to obtain the final result.
By using a blocked algorithm, we obtain the result (here, one sum of one billion numbers) from the results of many smaller operations (here, one thousand sums of one million numbers each). Note that the one thousand partial sums must then be added together in one final sum, making a total of 1,001 sum operations. Because the smaller operations are independent of each other, this allows a much greater degree of parallelism, potentially speeding up code execution dramatically.
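The sum example can be sketched in plain NumPy at a smaller scale. This version assumes 1,000,000 random numbers in 1,000 chunks of 1,000, rather than a billion numbers, so that it runs quickly. The chunking, per-chunk sums, and final sum mirror the steps described above. The last lines compare the result with Dask's own chunked sum:

```python
import dask.array as da
import numpy as np

# Scaled-down version of the example: 1,000,000 numbers in 1,000 chunks of 1,000
rng = np.random.default_rng(seed=0)
numbers = rng.random(1_000_000)
chunk_size = 1_000

# Each row of this (1000, 1000) view is one chunk
chunks = numbers.reshape(-1, chunk_size)

# Step 1: one sum per chunk; these sums are independent and could run in parallel
partial_sums = np.array([chunk.sum() for chunk in chunks])

# Step 2: one final sum of the partial sums
blocked_total = partial_sums.sum()

n_sum_operations = len(partial_sums) + 1
print(n_sum_operations)                           # 1001
print(np.isclose(blocked_total, numbers.sum()))   # True

# Dask applies the same idea automatically; by default it combines the partial
# sums in a tree of smaller steps rather than in a single final step
dask_total = da.from_array(numbers, chunks=chunk_size).sum().compute()
print(np.isclose(dask_total, blocked_total))      # True
```

`np.isclose` is used instead of `==` because adding floating-point numbers in a different order can change the last few bits of the result.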
dask.array contains these algorithms
The main Dask object type used in this tutorial is dask.array, which implements a subset of the ndarray (NumPy array) interface. However, unlike ndarray, dask.array uses blocked algorithms, which break up the array into smaller arrays (chunks), as described above. Only some chunks need to be in memory at any one time, so computations can run on arrays larger than memory. The work on independent chunks can also be divided among multiple cores. Dask manages and coordinates the blocked algorithms for a given computation by using Dask graphs (task graphs), which lay out in detail the steps Dask takes to solve a problem. In addition, dask.array objects, known as Dask Arrays, are lazy. In other words, any computation performed on them is delayed until a specific method, such as .compute(), is called.
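Lazy evaluation can be seen directly with a small sketch, assuming an arbitrary 1000 x 1000 array of ones split into four chunks. Creating the array and applying operations to it only builds a task graph. The actual numbers are produced by `.compute()`:

```python
import dask.array as da

# Creating the array only records how to build its 4 chunks; no data is allocated
x = da.ones((1000, 1000), chunks=(500, 500))
print(x.numblocks)  # (2, 2)

# Arithmetic and reductions only extend the task graph
y = (x + 1).sum()
print(isinstance(y, da.Array))  # True: y is still a lazy Dask Array

# .compute() executes the graph and returns a concrete result
total = y.compute()
print(total)  # 2000000.0
```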
Create a dask.array object
As stated earlier, Dask Arrays are loosely based on NumPy arrays. In the next set of examples, we illustrate the main differences between Dask Arrays and NumPy arrays. In order to illustrate the differences, we must have both a Dask Array object and a NumPy array object. Therefore, this first example creates a 3-D NumPy array of random data:
shape = (600, 200, 200)
arr = np.random.random(shape)
arr
array([[[0.59882044, 0.89038597, 0.75264685, ..., 0.28575361,
0.77133205, 0.49917285],
[0.90349066, 0.11729641, 0.4646083 , ..., 0.31830512,
0.4682105 , 0.09964511],
[0.59601851, 0.1393428 , 0.1451719 , ..., 0.68692779,
0.19844283, 0.23358143],
...,
[0.58004553, 0.10514537, 0.94488101, ..., 0.02312172,
0.61362743, 0.97977446],
[0.52031164, 0.01479604, 0.64741797, ..., 0.86144959,
0.3383711 , 0.05842095],
[0.0522633 , 0.84912887, 0.14741265, ..., 0.840991 ,
0.2148789 , 0.46491597]],
[[0.8670142 , 0.4173522 , 0.29488315, ..., 0.62832042,
0.31158513, 0.98188514],
[0.54180686, 0.58585186, 0.6396745 , ..., 0.29762375,
0.35190295, 0.27685184],
[0.05604971, 0.55171283, 0.69862599, ..., 0.8649599 ,
0.31128111, 0.78148618],
...,
[0.98084697, 0.63897734, 0.23643916, ..., 0.85527493,
0.27517906, 0.10073066],
[0.13097734, 0.05187221, 0.62567868, ..., 0.34100082,
0.24904174, 0.24744537],
[0.83587677, 0.93942928, 0.87699588, ..., 0.26106564,
0.48093972, 0.14538829]],
[[0.81780786, 0.83584004, 0.95076377, ..., 0.93660647,
0.96552286, 0.89119864],
[0.14571403, 0.82362962, 0.79980835, ..., 0.19511307,
0.46299543, 0.91160124],
[0.94889911, 0.57237988, 0.93493412, ..., 0.62429476,
0.86452692, 0.5443831 ],
...,
[0.87222497, 0.65459307, 0.82116169, ..., 0.70790006,
0.70011525, 0.10014658],
[0.13599426, 0.61554204, 0.13890904, ..., 0.97463388,
0.64439201, 0.89941897],
[0.33958974, 0.94622266, 0.41707807, ..., 0.3110441 ,
0.22638421, 0.63377419]],
...,
[[0.02999301, 0.0263648 , 0.27584748, ..., 0.42909397,
0.14590959, 0.24784125],
[0.11041574, 0.80616766, 0.09298374, ..., 0.3421373 ,
0.42563484, 0.64630186],
[0.52209463, 0.41091542, 0.10486451, ..., 0.25035955,
0.93942855, 0.66156348],
...,
[0.64340089, 0.39739298, 0.8019468 , ..., 0.93968375,
0.06318134, 0.36837465],
[0.75963579, 0.49378495, 0.30624372, ..., 0.61438427,
0.30653086, 0.14667532],
[0.43440798, 0.63575079, 0.69847263, ..., 0.60040108,
0.30731612, 0.81611883]],
[[0.1283456 , 0.87703892, 0.23752744, ..., 0.07754181,
0.93005294, 0.53346445],
[0.43698266, 0.61582401, 0.02379739, ..., 0.65570026,
0.97293592, 0.66378911],
[0.73203301, 0.70717493, 0.36481202, ..., 0.86921607,
0.43639917, 0.47504516],
...,
[0.12499336, 0.66581081, 0.30877271, ..., 0.98772465,
0.16656457, 0.77863969],
[0.87453784, 0.4903757 , 0.68990367, ..., 0.29029562,
0.94981556, 0.23970464],
[0.17938946, 0.78932973, 0.51488989, ..., 0.63125446,
0.70059609, 0.71615045]],
[[0.09134313, 0.56968382, 0.25261009, ..., 0.47449169,
0.5028163 , 0.18931504],
[0.59257282, 0.79706897, 0.61743135, ..., 0.43450106,
0.85023722, 0.37430368],
[0.65896213, 0.00747606, 0.72997344, ..., 0.53380521,
0.61987468, 0.81783034],
...,
[0.20905158, 0.5007961 , 0.99068535, ..., 0.19769782,
0.06665382, 0.84617634],
[0.04103984, 0.70854959, 0.42010955, ..., 0.65786467,
0.44327821, 0.4428599 ],
[0.33840587, 0.79426061, 0.79334088, ..., 0.47697519,
0.41063649, 0.38140662]]])
format_bytes(arr.nbytes)
'183.11 MiB'
As shown above, this NumPy array contains about 183 MiB (roughly 192 MB) of data.
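The size follows directly from the shape and the data type. This short check assumes the float64 dtype that np.random.random returns, which uses 8 bytes per value:

```python
import numpy as np
from dask.utils import format_bytes

shape = (600, 200, 200)
itemsize = np.dtype("float64").itemsize   # 8 bytes per value
nbytes = int(np.prod(shape)) * itemsize    # 24,000,000 values * 8 bytes

print(nbytes)                # 192000000
print(format_bytes(nbytes))  # 183.11 MiB (format_bytes uses powers of 1024)
print(nbytes / 1e6)          # 192.0, the same size in decimal megabytes
```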
As stated above, we must also create a Dask Array. This next example creates a Dask Array with the same dimension sizes as the existing NumPy array:
darr = da.random.random(shape, chunks=(300, 100, 200))
By passing a value to the chunks keyword argument, we specify the shape of the pieces (chunks) that Dask’s blocked algorithms break the array into. In this case, each chunk has shape (300, 100, 200), so the array is split into 2 × 2 × 1 = 4 chunks.
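The resulting layout can be inspected without computing anything. This sketch rebuilds the same Dask Array and uses the `chunks`, `numblocks`, and `nbytes` attributes of Dask Arrays. The per-chunk size is worked out by hand from the chunk shape:

```python
import math

import dask.array as da
from dask.utils import format_bytes

shape = (600, 200, 200)
darr = da.random.random(shape, chunks=(300, 100, 200))

# Chunk sizes along each axis
print(darr.chunks)     # ((300, 300), (100, 100), (200,))
# Number of chunks along each axis: 600/300, 200/100, 200/200
print(darr.numblocks)  # (2, 2, 1)
n_chunks = math.prod(darr.numblocks)
print(n_chunks)        # 4

# Each chunk holds 300 * 100 * 200 float64 values
chunk_bytes = 300 * 100 * 200 * darr.dtype.itemsize
print(format_bytes(chunk_bytes))  # 45.78 MiB
# Same total size as the NumPy array above, but no values have been generated yet
print(format_bytes(darr.nbytes))  # 183.11 MiB
```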
Specifying Chunks
In this tutorial, we specify Dask Array chunks as a block shape. However, there are many other ways to specify chunks; see the Dask documentation on array chunks for more details.
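A few of the other chunk specifications Dask accepts are sketched below, applied to the same shape. The particular chunk sizes are arbitrary choices for illustration. The layout produced by "auto" depends on Dask's configured target chunk size (the `array.chunk-size` setting), so its output may differ between installations:

```python
import dask.array as da

shape = (600, 200, 200)

# Block shape, as used in this tutorial
a = da.zeros(shape, chunks=(300, 100, 200))

# A single integer applies the same block size along every axis
b = da.zeros(shape, chunks=100)

# -1 means "one chunk spanning the whole axis"
c = da.zeros(shape, chunks=(300, -1, -1))

# Explicit (possibly uneven) chunk sizes for each axis
d = da.zeros(shape, chunks=((100, 500), (200,), (50, 150)))

# "auto" lets Dask pick chunk sizes based on its configured target chunk size
e = da.zeros(shape, chunks="auto")

for name, x in [("a", a), ("b", b), ("c", c), ("d", d), ("e", e)]:
    print(name, x.chunks)
```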