Big Data Exploration With Dask On Kubernetes
Dask is a parallel analytical computing library that implements much of the pandas API, built to support interactive (as opposed to batch) “big data” analytics. When a dataset is too big to fit in memory, a Python process that loads it through the pandas read_csv API crashes, while Dask handles it by processing the data in chunks.
However, running Dask on your laptop does not completely solve the problem: the amount of processing and the I/O quickly become the main bottlenecks. To really take advantage of Dask, one has to scale it out.
Kubernetes is an open source container orchestration system. It is known among developers for making it fast to continuously deploy and scale applications, which makes it a good runtime environment for Dask.
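To make the chunking idea concrete, here is a minimal standard-library sketch. It is not how Dask is implemented; it only illustrates computing a result while holding one small chunk of rows in memory at a time. The helper names (iter_chunks, chunked_mean) and the sample data are hypothetical.

```python
import csv
import io
from itertools import islice


def iter_chunks(rows, chunk_size):
    """Yield lists of at most chunk_size rows from any row iterator."""
    rows = iter(rows)
    while True:
        chunk = list(islice(rows, chunk_size))
        if not chunk:
            return
        yield chunk


def chunked_mean(csv_file, column, chunk_size=2):
    """Compute the mean of one column, one chunk of rows at a time."""
    reader = csv.DictReader(csv_file)
    total, count = 0.0, 0
    for chunk in iter_chunks(reader, chunk_size):
        # Only the running total and the current chunk are kept in memory.
        total += sum(float(row[column]) for row in chunk)
        count += len(chunk)
    return total / count if count else float("nan")


# Tiny in-memory stand-in for a file that would not fit in RAM.
sample = io.StringIO("id,value\n1,10\n2,20\n3,30\n4,40\n5,50\n")
mean_value = chunked_mean(sample, "value")
print(mean_value)
```

Dask applies the same principle at a larger scale: it splits a dataset into partitions and combines partial results, so no single process needs the whole dataset at once.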
The full deployment consists of three parts.
- A notebook instance
- A Dask scheduler
- Many Dask workers
The full how-to is published on GitHub: https://github.com/lihan/dask-kubernetes
The Dask scheduler is colocated with the notebook instance and is launched from Python code. It lets you define the size of each worker, the image the worker runs, and how many copies of the worker to run. Kubernetes then schedules those workers (Pods) onto available resources.
We will run the exact same image on both the scheduler and the workers to keep things consistent. The only difference is that the worker nodes do not run the Jupyter notebook process. They share the exact same Python dependencies.
The Kubernetes deployment file
The deployment configuration below loads my image and runs one copy in the Kubernetes cluster. One thing to note: since the Dask workers (Pods) are created later from Python, you need a service account that has permission to create, delete, and list Pods. You can set that up with this file: https://github.com/lihan/dask-kubernetes/blob/master/kube/20-service-account.yaml
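For orientation, the permission described above can be expressed as a Kubernetes RBAC Role. The sketch below builds such a manifest as a Python dict and prints it as JSON, which kubectl also accepts. It is an assumption about what a minimal Role could look like, not a copy of the linked file: the function name, the Role name "dask-pod-manager", and the "default" namespace are all hypothetical.

```python
import json


def make_pod_manager_role(name="dask-pod-manager", namespace="default"):
    """Build a Role manifest that allows creating, deleting, and listing Pods.

    The name and namespace defaults are placeholders, not values from the article.
    """
    return {
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "kind": "Role",
        "metadata": {"name": name, "namespace": namespace},
        "rules": [
            {
                "apiGroups": [""],  # "" is the core API group, where Pods live
                "resources": ["pods"],
                "verbs": ["create", "delete", "list"],
            }
        ],
    }


role = make_pod_manager_role()
role_json = json.dumps(role, indent=2)
print(role_json)
```

A Role alone grants nothing until a RoleBinding attaches it to the service account the notebook Pod runs as, and a real setup may need additional verbs. Treat the linked file as the reference.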
Access notebook instance
After the setup is complete (make sure to follow this link), you should be able to see a running notebook instance in your Kubernetes cluster.
List all running Pods by executing
kubectl get po
By default, Pods are not accessible from outside the cluster, and we are not interested in setting up a load balancer just to access this one application, so we’ll connect by port forwarding.
kubectl port-forward <notebook-instance-name> 8888:8888
Then open http://localhost:8888 to access your notebook instance.
Next, we need to define the Pod config file and call the KubeCluster API to launch workers.
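If the page does not load, it can help to confirm that something is actually listening on the forwarded port before suspecting the notebook itself. The following is a small standard-library sketch; the helper name is_port_open is hypothetical and not part of kubectl or Dask.

```python
import socket


def is_port_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and unreachable hosts.
        return False


# True only while `kubectl port-forward ... 8888:8888` is running.
print(is_port_open("127.0.0.1", 8888))
```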
from dask_kubernetes import KubeCluster

yaml = '''
kind: Pod
metadata:
  labels:
    role: worker
spec:
  restartPolicy: Never
  containers:
  - image: lihan/dask-ml:v5
    imagePullPolicy: Always
    args: [dask-worker, --nthreads, '1', --no-bokeh, --memory-limit, 1.5GB, --death-timeout, '60']
    name: dask-worker
    resources:
      requests:
        cpu: "600m"
        memory: "1800Mi"
      limits:
        cpu: "600m"
        memory: "1800Mi"
'''

# Write the worker Pod spec to disk so KubeCluster can load it
with open('worker.yaml', 'w') as f:
    f.write(yaml)

cluster = KubeCluster.from_yaml('worker.yaml')
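When experimenting with different worker sizes, it can be convenient to generate the Pod spec instead of editing a YAML string by hand. The sketch below builds the same structure as a Python dict and checks that the Dask worker's memory limit fits inside the container's memory. The helper make_worker_spec and its parameters are hypothetical. The classic dask-kubernetes API also offered KubeCluster.from_dict for dict specs, but check that your installed version still provides it.

```python
def make_worker_spec(image, cpu="600m", memory_mib=1800,
                     worker_memory_gb=1.5, nthreads=1):
    """Build a worker Pod spec dict with matching requests and limits."""
    # The worker's own limit (decimal GB) should not exceed what the
    # container is allowed to use (binary MiB), or the Pod may be killed.
    if worker_memory_gb * 1e9 > memory_mib * 2**20:
        raise ValueError("worker memory limit exceeds container memory")
    resources = {"cpu": cpu, "memory": f"{memory_mib}Mi"}
    return {
        "kind": "Pod",
        "metadata": {"labels": {"role": "worker"}},
        "spec": {
            "restartPolicy": "Never",
            "containers": [
                {
                    "image": image,
                    "imagePullPolicy": "Always",
                    "args": [
                        "dask-worker",
                        "--nthreads", str(nthreads),
                        "--no-bokeh",
                        "--memory-limit", f"{worker_memory_gb}GB",
                        "--death-timeout", "60",
                    ],
                    "name": "dask-worker",
                    # Equal requests and limits, as in the YAML version.
                    "resources": {
                        "requests": dict(resources),
                        "limits": dict(resources),
                    },
                }
            ],
        },
    }


spec = make_worker_spec("lihan/dask-ml:v5")
print(spec["spec"]["containers"][0]["resources"])
```

Keeping requests equal to limits means each worker gets a predictable share of its node, which makes Dask's per-worker memory accounting easier to reason about.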
Create the client that uses the cluster
from dask.distributed import Client
client = Client(cluster)
client
You will notice a nice Jupyter widget show up, which lets you enter the number of workers.
When you launched the Dask scheduler above, it also launched a dashboard that lets you visualise the current workload. To access it, run
kubectl port-forward <notebook-instance-name> 8787:8787
(You will notice port 8787 in the deployment file.)
Once you have some kind of Dask operation going on, you will see something like below:
One of the biggest downsides I have found so far is the I/O. For my 100 GB dataset hosted on Google Cloud Storage, the I/O throughput drops sharply after a minute of sustained reads (I can see the getItem task get slow). Perhaps the data should first be copied onto a fast SSD as a cache before doing any analytics work. Another idea is to split the big files into smaller chunks. Dask understands file masks, which let you read all files matching a filename pattern. That should make distributing the data onto worker nodes much faster.
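As a rough sketch of the splitting idea, the standard-library code below breaks one CSV into numbered part files, each with its own header, and then matches them with a filename pattern. The helper split_csv, the "part-" prefix, and the sample data are hypothetical. In Dask, the same kind of pattern can be passed to dask.dataframe.read_csv, which accepts glob strings.

```python
import csv
import glob
import os
import tempfile


def split_csv(src_path, out_dir, rows_per_file, prefix="part"):
    """Split one CSV into numbered files, repeating the header in each."""
    paths = []
    with open(src_path, newline="") as src:
        reader = csv.reader(src)
        header = next(reader)
        out, writer, written = None, None, 0
        for row in reader:
            # Start a new part file at the beginning and whenever one is full.
            if writer is None or written == rows_per_file:
                if out is not None:
                    out.close()
                path = os.path.join(out_dir, f"{prefix}-{len(paths):04d}.csv")
                out = open(path, "w", newline="")
                writer = csv.writer(out)
                writer.writerow(header)
                paths.append(path)
                written = 0
            writer.writerow(row)
            written += 1
        if out is not None:
            out.close()
    return paths


# Build a small stand-in for the large file.
work_dir = tempfile.mkdtemp()
big_file = os.path.join(work_dir, "big.csv")
with open(big_file, "w", newline="") as f:
    w = csv.writer(f)
    w.writerow(["id", "value"])
    w.writerows([i, i * 10] for i in range(5))

parts = split_csv(big_file, work_dir, rows_per_file=2)

# The same pattern could be handed to dask.dataframe.read_csv.
matched = sorted(glob.glob(os.path.join(work_dir, "part-*.csv")))
print(matched)
```

With many smaller files, each worker can read its own subset in parallel instead of all workers waiting on one long sequential read.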
The second limitation, and one that affects me quite a bit, is that Dask lacks support for some of the pandas API. Pandas handles split and combine tasks beautifully with its transform API. However, one of my frequently used features is not supported: groupby('col')['col'].transform('func'). That leaves me to work around it, which sometimes produces less readable code.
Dask and Kubernetes work together to take data analytics a mile further. In reality, many datasets are beyond what a single laptop can handle well. Despite the lack of full pandas API support, Dask is still a prime choice for many big data exploration tasks.
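One common workaround is to aggregate per group and merge the result back onto the original rows. The sketch below shows the pattern in plain pandas so that it can be checked against transform directly. The column names and data are hypothetical, and this is one possible workaround, not necessarily the one used in this project. Groupby aggregation and merge are both available on Dask DataFrames, so the same shape of code should carry over. It is also worth checking whether your Dask version supports transform directly.

```python
import pandas as pd

df = pd.DataFrame({
    "key": ["a", "a", "b", "b", "b"],
    "value": [1.0, 3.0, 10.0, 20.0, 30.0],
})

# The pandas one-liner the workaround is meant to replace.
expected = df.groupby("key")["value"].transform("mean")

# Step 1: aggregate once per group.
group_means = (
    df.groupby("key")["value"].mean().rename("value_mean").reset_index()
)

# Step 2: broadcast the aggregate back to every row with a left merge.
result = df.merge(group_means, on="key", how="left")
print(result)
```

The merge version is more verbose, which matches the readability cost mentioned above, but it only relies on operations that partitioned dataframes handle well.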