Big Data Exploration With Dask On Kubernetes

Dask is a parallel computing library that implements much of the pandas API and is built for interactive (as opposed to batch) “big data” analytics. When a dataset is too big to fit in memory, loading it through the pandas read_csv API crashes the Python process, while Dask handles it by splitting the data into partitions and processing them lazily, chunk by chunk.
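As a minimal sketch (the file path and column names here are just placeholders), the Dask dataframe API mirrors pandas but keeps everything lazy and partitioned:

import dask.dataframe as dd

# Nothing is loaded here; Dask just splits the CSV into ~64MB partitions.
df = dd.read_csv('data/big_dataset.csv', blocksize='64MB')

# pandas-style operations stay lazy as well.
daily_mean = df.groupby('day')['value'].mean()

# Only .compute() triggers the actual work, partition by partition.
print(daily_mean.compute())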

However, running Dask on your laptop does not completely solve the problem: the amount of processing, and especially the I/O, quickly becomes the main bottleneck. To really take advantage of Dask, you have to scale it out.

Kubernetes

Kubernetes is an open source container orchestration system. It is known for the high velocity with which it lets developers continuously deploy and scale applications, and it makes a very good runtime environment for Dask.

Deployment Architecture

The full deployment consists of three parts.

The full how-to is published on GitHub: https://github.com/lihan/dask-kubernetes

The Dask scheduler is colocated with the notebook instance and is launched from Python code. It lets you define the size of each worker, the image the worker runs, and how many copies of the worker to run. Kubernetes then schedules those workers (Pods) onto available resources.

We run the exact same image on both the scheduler and the workers to keep things consistent, so they share exactly the same Python dependencies. The only difference is that the workers do not run the Jupyter notebook process.

The Kubernetes deployment file

The deployment configuration below loads my image and runs one copy of it in the Kubernetes cluster. Two things to note: the Dask workers (Pods) are created from Python code later on, and because of that you need a service account with permission to create, delete, and list Pods. You can set one up with this file: https://github.com/lihan/dask-kubernetes/blob/master/kube/20-service-account.yaml
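A minimal sketch of such a deployment is below (the Deployment name and service account name are placeholders; the actual files live in the repository linked above):

apiVersion: apps/v1
kind: Deployment
metadata:
  name: dask-notebook
spec:
  replicas: 1                    # one copy of the notebook/scheduler
  selector:
    matchLabels:
      app: dask-notebook
  template:
    metadata:
      labels:
        app: dask-notebook
    spec:
      serviceAccountName: dask   # must be allowed to create/delete/list Pods
      containers:
      - name: notebook
        image: lihan/dask-ml:v5  # same image as the workers
        ports:
        - containerPort: 8888    # Jupyter notebook
        - containerPort: 8787    # Dask dashboard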

Access the notebook instance

After the setup is completed (make sure you have followed the setup steps in the repository linked above), you should be able to see a running notebook instance in your Kubernetes cluster.

List all running Pods by executing kubectl get po

By default, Pods are not accessible from outside the cluster, and we are not interested in setting up a load balancer just to access this one application.

We’ll connect by port forwarding.

kubectl port-forward <notebook-instance-name> 8888:8888

Then hit http://localhost:8888 to access your notebook instance

Launch workers

Here we define the worker Pod config file and call the KubeCluster API to launch the workers.

from dask_kubernetes import KubeCluster

# Pod template for each Dask worker: one thread, a ~1.5GB memory limit,
# and a death timeout so orphaned workers shut themselves down.
worker_spec = '''
kind: Pod
metadata:
  labels:
    role: worker
spec:
  restartPolicy: Never
  containers:
  - image: lihan/dask-ml:v5
    imagePullPolicy: Always
    args: [dask-worker, --nthreads, '1', --no-bokeh, --memory-limit, 1.5GB, --death-timeout, '60']
    name: dask-worker
    resources:
      requests:
        cpu: "600m"
        memory: "1800Mi"
      limits:
        cpu: "600m"
        memory: "1800Mi"
'''

with open('worker.yaml', 'w') as f:
    f.write(worker_spec)

cluster = KubeCluster.from_yaml('worker.yaml')

Creating the client that uses the cluster

from dask.distributed import Client
client = Client(cluster)
client


You will notice a nice Jupyter widget show up that lets you enter the number of workers.
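If you prefer to set the number of workers from code rather than through the widget, the cluster object also exposes scale and adapt (the worker counts below are just examples):

# Ask for a fixed number of workers
cluster.scale(10)

# Or let Dask add and remove workers based on load
cluster.adapt(minimum=1, maximum=20)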

Dask dashboard

When you launched the Dask scheduler above, it also launched a dashboard that lets you visualise the current workload. To access it, run kubectl port-forward <notebook-instance-name> 8787:8787 (you will notice port 8787 in the deployment file).

Once you have some kind of Dask operation going on, you will see the live task activity across your workers in the dashboard.


Limitations

One of the biggest downsides I have found so far is the I/O. For my 100GB dataset hosted on Google Cloud Storage, the I/O throughput drops sharply after a minute of sustained reads (I can see the getitem tasks getting slow). Perhaps the data should first be cached onto a fast SSD before the analytics work starts. Another idea is to split the big files into smaller chunks: Dask understands glob patterns, which let you read all files matching a filename pattern, and that should make distributing the data onto worker nodes much faster.
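As a sketch of the smaller-chunks idea (the bucket name and file pattern are placeholders), Dask can read all matching files in a single call; reading gs:// paths requires the gcsfs package:

import dask.dataframe as dd

# Every file matching the pattern becomes at least one partition.
df = dd.read_csv('gs://my-bucket/dataset/part-*.csv')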

The second limitation that affects me quite a bit is the missing parts of the pandas API. Pandas handles split-apply-combine tasks beautifully with its groupby, apply, and transform APIs, but one of my frequently used patterns, groupby('col')['col'].transform('func'), is not supported. That leaves me to work around it, which sometimes results in less readable code.
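One possible workaround (a sketch with made-up column names, not necessarily the cleanest option) is to compute the group aggregate separately and merge it back onto the original frame:

import pandas as pd
import dask.dataframe as dd

# Toy frame standing in for a real dataset.
df = dd.from_pandas(
    pd.DataFrame({'col': ['a', 'a', 'b'], 'value': [1.0, 2.0, 3.0]}),
    npartitions=2,
)

# Equivalent of pandas df.groupby('col')['value'].transform('mean'):
# aggregate per group, then merge the result back onto every row.
group_mean = (
    df.groupby('col')['value'].mean()
      .reset_index()
      .rename(columns={'value': 'value_mean'})
)
df = df.merge(group_mean, on='col')

print(df.compute())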

Conclusion

Dask and Kubernetes work together to take data analytics a mile further. In reality, many datasets are beyond what a single laptop can handle well. Despite the lack of full pandas API support, Dask is still a prime choice for many big data exploration tasks.

Happy Dasking…