source: quanta magazine

Building a Spark Job Scheduler on Kubernetes

Abhijit Mondal
18 min readJul 19, 2023

--

This post is a gist of our efforts in migrating from managed Spark in Synapse and Databricks to Kubernetes for our production workloads.

Without the nitty gritty of migration efforts, lets get into the details of how did we setup our Spark cluster and how our Spark application runs on K8s.

I will not get into the details of Testing, Monitoring, Logging etc. in this post but will cover the basic setup that will allow anybody to replicate the same to run Spark jobs on K8s.

The details will be kept for another post.

Step 1: Setup AKS

For the rest of this post we will assume the following:

  1. All shell commands are run on the Bash shell on Ubuntu 22.04
  2. Azure Container Registry (ACR) has been setup.
  3. Python v3.10+
  4. Azure CLI tool is installed.
  5. An AAD app (azure active directory) setup
  6. Azure subscription and a resource group set up with appropriate permissions (AKS Cluster Admin, RBAC Admin) for the AAD app.

Login using the az cli:

az login

Run the following command to create an AKS cluster:

az aks create \
--resource-group my_resource_group \
--name my_aks_cluster \
--node-count 3 \
--location eastus \
--enable-aad \
--enable-azure-rbac \
--enable-cluster-autoscaler \
--min-count 1 \
--max-count 20 \
--node-vm-size Standard_D4s_v3 \
--ssh-key-value ${HOME}/.ssh/id_rsa.pub

We are creating a K8s cluster with auto-scaling enabled and authentication using Azure Active Directory (AAD) app. Authorization is through RBAC.

Notice that we are also passing our ssh public key while creating the cluster. If this is not already created, then create the ssh key before running the above command.

How to Set Up SSH Keys on Ubuntu 22.04 | DigitalOcean

Next we need to create a node pool in our K8s cluster. These node pool will act as our spark cluster.

In AKS, when we create the cluster, by default it will create a node pool for K8s internal services like CoreDNS and metrics. This is the agent pool and this should not be used for running our applications.

az aks nodepool add \
--resource-group my_resource_group \
--cluster-name my_aks_cluster \
--name spark_node_pool \
--node-vm-size Standard_D4s_v3 \
--node-osdisk-size 128 \
--os-sku "Ubuntu" \
--enable-cluster-autoscaler \
--max-count 20 \
--max-pods 100 \
--min-count 0 \
--mode "User" \
--node-count 1

Notice the mode argument above which we set to be “User”.

The other configurations can be set as per requirements.

Step 2: Install and configure kubectl

Install kubectl

curl -LO "https://dl.k8s.io/release/$(curl -L \
-s https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl"

sudo install -o root -g root -m 0755 kubectl /usr/local/bin/kubectl

The AKS credentials needs to be stored in a local config file in order to use the kubectl commands to access the cluster from local.

az aks get-credentials \
--resource-group my_resource_group \
--name my_aks_cluster \
--subscription my_subscription \
--admin \
--file "${HOME}/.kube/config"

chmod 600 "${HOME}/.kube/config"

In order to use the kubectl commands from local machine, apart from the credentials stored in the ~/.kube/config file, we also need to authenticate the device from where we are running the commands.

To authenticate the local machine, run the following command:

kubelogin convert-kubeconfig -l azurecli

May be need to install kubelogin if not installed.

# Install kubelogin

if [[ ! -f /usr/local/bin/kubelogin ]]; then
curl -fsSLO https://github.com/Azure/kubelogin/releases/download/v0.0.30/kubelogin-linux-amd64.zip
unzip -ojd /usr/local/bin kubelogin-linux-amd64.zip bin/linux_amd64/kubelogin
rm -f kubelogin-linux-amd64.zip
kubelogin --version
fi

In order for AKS to access the Container Registry, we need to create a secret in AKS, that will be used to pull container images form ACR:

kubectl create secret docker-registry my-secret \
--docker-server=username.azurecr.io \
--docker-username=aad_app_id \
--docker-password=aad_app_secret

For that, our AAD app needs to be assigned the role “AcrPull” in our ACR account.

# Get acr registry id
ACR_REGISTRY_ID=$(az acr show --name myacr --query id --output tsv)

# Assign AcrPull role to aad app in acr registry
az role assignment create \
--assignee-object-id aad_app_object_id \
--assignee-principal-type ServicePrincipal \
--role "AcrPull" \
--scope $ACR_REGISTRY_ID

Next we need to create a service account for our application and assign role to the account:

# Create k8s service account
kubectl create serviceaccount spark

# Assign edit role to service account
kubectl create clusterrolebinding spark-role \
--clusterrole=edit \
--serviceaccount=default:spark \
--namespace=default

Pods created in K8s are run under specific service accounts.

Thus, before submitting a request for a spark application deployment to the API server through the control plane, we need to create the service account and associate our deployment to this service account.

Then we assign the default cluster role “edit” to our newly created service account “spark”.

Note that all K8s resources are being created in the “default” namespace. Although we can create separate namespace for our application.

When the request for deploying a spark application is made, K8s creates a pod for the spark driver in the service account “spark” created above.

Next the driver will create the executor pods, for that the driver will require the “edit” role in the cluster scope. The executor pods are created in the same service account as the driver pod.

In order to use localhost as a proxy to our k8s master, run the following command:

nohup kubectl proxy >kproxy.txt 2>&1 &

This will start the process in background.

Step 3: Build spark from source

Assuming that Java 8+ is installed, set JAVA_HOME path:

echo -e "export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64" >> ~/.bashrc
source ~/.bashrc

I am having Java 11 installed.

Next pull spark source from github:

git clone https://github.com/apache/spark

Build the package with Kubernetes support using maven (this step will take some time):

sudo apt install maven
./build/mvn -Pkubernetes -DskipTests clean package

Step 4: Spark application architecture

Our spark application is sort of a Job Scheduler on top of Spark & k8s.

The directory structure is as follows:

aks-spark-scheduler
---- spark
folder container spark source as above

---- tasks
folder containing task specific files
---- __init__.py
---- spark_task.py
main file for creating SparkSession & SparkContext
---- helper_tasks.py
helper functions for tasks
---- some other files related to tasks, jars etc.

---- aks_executor.py
main application file invoked from spark driver

---- blob_storage.py
blob storage helper codes using python sdk

---- helpers.py
common helper methods like get credentials etc.

---- Dockerfile
base application dockerfile

---- requirements.txt
python packages required by base image

For more details, refer to the repository here: funktor/aks-spark-scheduler: Schedule Spark Jobs on Azure Kubernetes Service (AKS) (github.com)

The pyspark image is built using the Dockerfile inside the spark folder. Details of the dockerfile is provided in the next step.

The main application image is built using Dockerfile inside the root folder.

In order to run a spark job, follow the below steps:

  • Create the application files inside the tasks folder.
    Any helper functions, constants etc. should be defined inside the files or new files inside tasks folder.
  • The main task file should have a function named “run” that accepts the job_id.
    This method will be called after the container starts, that will subsequently run the job.
    Thus the run method is our entrypoint to the spark job.
  • Any inputs and outputs required by the tasks have to be handled within the application codes inside the tasks folder.
    For e.g. if an output file needs to be uploaded to user’s blob storage, then the blob storage url, credentials, uploading to container etc. has to be handled by the user.
  • Secrets can be used in 2 ways:
    1. Using azure key vault.
    2. By passing through environment variables.
  • In the aks_executor.py file, import the spark_task.py file from tasks folder and call its run method with the job id.

An example of tasks/spark_task.py file:

from pyspark.sql import SparkSession
import uuid, os
import tasks.helpers_tasks as ht
from tasks.blob_storage import Blob
import datetime
from pyspark.sql import functions as F
from pyspark.sql import DataFrame as sparkDataFrame

STORAGE_ACCOUNT='mystorage'
KEY_VAULT_URI='https://mykv.vault.azure.net/'
STORAGE_ACCOUNT_URL=f'{STORAGE_ACCOUNT}.blob.core.windows.net'

# Get from environment variables
APP_ID = os.getenv('APP_ID')
TENANT_ID = os.getenv('TENANT_ID')
APP_SECRET = os.getenv('APP_SECRET')

def get_azure_spark_connection():
# Get storage account key from key vault
accnt_key = \
ht\
.get_key_vault_secret(APP_ID,
TENANT_ID,
APP_SECRET,
KEY_VAULT_URI,
f"{STORAGE_ACCOUNT}-account-key")

# Init spark session
spark = \
SparkSession\
.builder\
.appName("NewSparkApp")\
.getOrCreate()

# Required for reading and writing files from/to blob storage
spark.sparkContext\
._jsc\
.hadoopConfiguration()\
.set(f"fs.azure.account.key.{STORAGE_ACCOUNT}.dfs.core.windows.net",
accnt_key)

return spark

def run(job_id:str):
# Get blob object
blob = Blob(STORAGE_ACCOUNT_URL, APP_ID, TENANT_ID, APP_SECRET)

start = \
datetime.datetime.today()\
-datetime.timedelta(days=60)

end = datetime.datetime.today()

output_file = f'output-{job_id}.tsv'
spark = get_azure_spark_connection()

# Get all files in blob storage partitioned by date from start to end date range
# Files are partitioned by <year>/<month>/data_<year>_<month>_<day>.csv

curr = start
paths = []

while curr < end:
h = datetime.datetime.strftime(curr, "%Y/%m/data_%Y_%m_%d.csv")
blob_path = f"inputs/{h}"

file_url = \
f"abfss://mycontainer@{STORAGE_ACCOUNT}.dfs.core.windows.net/{blob_path}"

# Check if the blob exists or not
exists = blob.blob_exists("mycontainer", blob_path)
if exists:
paths += [file_url]

curr = curr + datetime.timedelta(days=1)

# Read all the files with the same schema
df:sparkDataFrame = \
spark\
.read\
.format("csv")\
.option("inferSchema", "true")\
.option("header", "true")\
.option("sep", ",")\
.csv(paths)

# Calculate sum of total quantity grouped by 1st 4 columns of dataframe
df:sparkDataFrame = \
df\
.groupBy("Col_A",
"Col_B",
"Col_C",
"Col_D")\
.agg(F.sum("TotalQuantity").alias("TotalQuantity"))

# Save dataframe in single partition in blob storage
df\
.coalesce(1)\
.write\
.option("header", "true")\
.option("delimiter", ",")\
.csv(f"abfss://container@{STORAGE_ACCOUNT}.dfs.core.windows.net/outputs/{job_id}")

with open(output_file, "w") as f:
f.write('SUCCESS !!!')

return output_file

The above task requires to read multiple CSV files starting from 60 days ago till today from a folder in user’s blob storage that is partitioned by date.

Then calculate the sum of TotalQuantity grouped by 4 columns and upload the result to user’s blob storage again.

Note that when we use df.coalesce(1).write.csv() for saving a dataframe, we cannot use local storage because the file might be saved in an executor and when the job is done, executor pods are stopped by the driver, hence we cannot access the output file.

Hence we need to use some cloud storage like Blob or S3 etc. to save the dataframe.

Another solution is to use df.toPandas().to_csv() to first convert the spark dataframe to Pandas dataframe and save in CSV, it will save the file in the driver pod and hence can access the output file from driver pod.

But with large dataframes both the above methods can cause OOM errors.

To create your own tasks, edit these files or add new files inside tasks folder.

This is how the aks_executor.py file looks like (this file need not be changed):

from tasks import spark_task as mytask
import os, uuid
from blob_storage import Blob

if __name__ == '__main__':
job_id = str(uuid.uuid4())

blob_storage_url = os.getenv('BLOB_STORAGE_URL')
app_id = os.getenv('APP_ID')
tenant_id = os.getenv('TENANT_ID')
app_secret = os.getenv('APP_SECRET')
blob_storage_out_container = os.getenv('BLOB_STORAGE_OUT_CONTAINER')

# Initializing blob
blob = Blob(blob_storage_url,
app_id,
tenant_id,
app_secret)

print("Running tasks...")
output_file = mytask.run(job_id)

print("Uploading task logs...")
blob.upload_file_to_container(\
local_file_path=\
output_file,
output_container=blob_storage_out_container,
output_file_path=\
os.path.join(job_id, output_file)
)

print("Done")

Step 5: Build pyspark docker image in ACR

Inside the root folder (aks-spark-scheduler), create the Dockerfile with the following contents:

ARG base_img

FROM $base_img
WORKDIR /

# Reset to root to run installation tasks
USER 0

RUN mkdir ${SPARK_HOME}/python
RUN apt-get update && \
apt install -y wget python3 python3-pip && \
pip3 install --upgrade pip setuptools && \
rm -r /root/.cache && rm -rf /var/cache/apt/*

COPY spark/python/pyspark ${SPARK_HOME}/python/pyspark
COPY spark/python/lib ${SPARK_HOME}/python/lib

WORKDIR /opt/spark/work-dir

ENTRYPOINT [ "/opt/entrypoint.sh" ]

# Specify the User that the actual main process will run as
ARG spark_uid=185
USER ${spark_uid}

This dockerfile is for building the pyspark image from scratch. The file can also be found inside the folder:

spark/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/bindings/python/Dockerfile.

The above docker image is built using a custom script found inside spark/bin directory: docker-image-tool.sh

Build and push the base pyspark image:

sudo service docker start

./spark/bin/docker-image-tool.sh -r acr_registry -t latest -p ./Dockerfile build
./spark/bin/docker-image-tool.sh -r acr_registry -t latest push

The above steps creates and pushes the image in the following repository: myacr.azurecr.io/spark-py:latest

The idea behind having 2 separate images is that the pyspark image will not have to be edited often as these is generic. The application image (below) is the one that an user of the spark job is concerned with and thus it is expected that the codes will be updated often.

The base image can be re-used over and over again.

Step 5: Build and push task specific images

Build the main application image using the below dockerfile. (aks-spark-scheduler/Dockerfile):

ARG base_img

FROM $base_img

USER root

COPY requirements.txt /
RUN pip install -r /requirements.txt

# Download hadoop-azure, azure-storage, and dependencies (See above)
RUN wget --quiet https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-azure/3.3.5/hadoop-azure-3.3.5.jar -O /opt/spark/jars/hadoop-azure-3.3.5.jar
RUN wget --quiet https://repo1.maven.org/maven2/com/microsoft/azure/azure-storage/7.0.1/azure-storage-7.0.1.jar -O /opt/spark/jars/azure-storage-7.0.1.jar
RUN wget --quiet https://repo1.maven.org/maven2/com/microsoft/azure/azure-keyvault-core/1.0.0/azure-keyvault-core-1.0.0.jar -O /opt/spark/jars/azure-keyvault-core-1.0.0.jar
RUN wget --quiet https://repo1.maven.org/maven2/org/apache/httpcomponents/httpclient/4.5.13/httpclient-4.5.13.jar -O /opt/spark/jars/httpclient-4.5.13.jar
RUN wget --quiet https://repo1.maven.org/maven2/org/eclipse/jetty/jetty-util-ajax/9.4.48.v20220622/jetty-util-ajax-9.4.48.v20220622.jar -O /opt/spark/jars/jetty-util-ajax-9.4.48.v20220622.jar
RUN wget --quiet https://repo1.maven.org/maven2/org/wildfly/openssl/wildfly-openssl/1.1.3.Final/wildfly-openssl-1.1.3.Final.jar -O /opt/spark/jars/wildfly-openssl-1.1.3.Final.jar
RUN wget --quiet https://repo1.maven.org/maven2/org/apache/httpcomponents/httpcore/4.4.13/httpcore-4.4.13.jar -O /opt/spark/jars/httpcore-4.4.13.jar
RUN wget --quiet https://repo1.maven.org/maven2/commons-logging/commons-logging/1.1.3/commons-logging-1.1.3.jar -O /opt/spark/jars/commons-logging-1.1.3.jar
RUN wget --quiet https://repo1.maven.org/maven2/commons-codec/commons-codec/1.15/commons-codec-1.15.jar -O /opt/spark/jars/commons-codec-1.15.jar
RUN wget --quiet https://repo1.maven.org/maven2/org/eclipse/jetty/jetty-util/9.4.48.v20220622/jetty-util-9.4.48.v20220622.jar -O /opt/spark/jars/jetty-util-9.4.48.v20220622.jar
RUN wget --quiet https://repo1.maven.org/maven2/org/apache/hadoop/thirdparty/hadoop-shaded-guava/1.1.1/hadoop-shaded-guava-1.1.1.jar -O /opt/spark/jars/hadoop-shaded-guava-1.1.1.jar

ENV PATH=$PATH:/app
ENV PYTHONPATH /app

COPY *.jar /opt/spark/jars/
COPY aks_executor.py blob_storage.py helpers.py __init__.py /app/
ADD tasks /app/tasks

requirements.txt file contains list of python packages specific to tasks, also we should not add pyspark in requirements.txt because we are already building it from from source.

Also, any additional jars that is required by the task specific SparkSession is copied to /opt/spark/jars/ folder in the container. The SparkSession will pick up the jars from this location because we have specified the SPARK_HOME folder to be /opt/spark/

Build the derived image using our pyspark image as the base image:

sudo service docker start

docker build --build-arg "base_img=myacr.azurecr.io/spark-py:latest" -t aks-scheduler .
docker tag aks-scheduler myacr.azurecr.io/aks-scheduler
docker push myacr.azurecr.io/aks-scheduler

The main application file that will be executed when the spark job starts running is aks_executor.py.

In order to test whether the image is built correctly before using them in k8s, we can run a container and start a bash shell:

docker run -it myacr.azurecr.io/aks-scheduler:latest /bin/bash

Then we can run:

cd /opt/spark/bin
./spark-submit /app/aks_executor.py

Note that before running the above spark-submit command, we need to set the environment variables required by aks_executor.py file else the run would fail.

If there are no trivial errors like missing files or environment variables or python packages, then we are good to run on k8s.

Although expect OOM errors if the data size is large since we are running them on our local machine.

Step 6: spark-submit jobs

There are 2 ways to submit jobs to a Spark cluster on K8s:

  1. Using spark-submit command
  2. Using the spark-operator: sparkoperator.k8s.io
spark-submit to k8s cluster

In the 1st approach, we run spark-submit command as shown below, that will create the spark job in our k8s cluster:

./spark/bin/spark-submit \
--master k8s://http://127.0.0.1:8001 \
--deploy-mode cluster \
--name my-spark-job \
--class org.apache.spark.examples.SparkPi \
--conf spark.kubernetes.file.upload.path='local:///opt/spark/jars' \
--jars structuredstreamforspark_2.12-3.0.0-2.0.0.jar \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.dynamicAllocation.minExecutors=1 \
--conf spark.dynamicAllocation.maxExecutors=20 \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.kubernetes.container.image=myacr.azurecr.io/aks-scheduler:latest \
--conf spark.kubernetes.container.image.pullSecrets=my-secret \
--conf spark.kubernetes.container.image.pullPolicy=Always \
--packages org.apache.hadoop:hadoop-azure:3.3.5 \
--conf spark.driver.extraJavaOptions="-Divy.cache.dir=/tmp -Divy.home=/tmp" \
local:///app/aks_executor.py

The arguments are as follows:

--master k8s://http://127.0.0.1:8001
Kubernetes master node.
Note that for master we are referring to a local url. This is possible
because we have ran kubectl proxy earlier that proxies requests to the
API server from our localhost.

--deploy-mode cluster
We are running a spark cluster and not a standalone instance

--name my-spark-job
Name of spark job

--class org.apache.spark.examples.SparkPi
Default class

--conf spark.kubernetes.file.upload.path='local:///opt/spark/jars'
Path where any additional files such as jars, python packages
and zip files will be uploaded

--jars structuredstreamforspark_2.12-3.0.0-2.0.0.jar
Additional jars required by our tasks

--conf spark.dynamicAllocation.enabled=true
Autoscaling enabled

--conf spark.dynamicAllocation.minExecutors=1
Minimum number of executors set to 1

--conf spark.dynamicAllocation.maxExecutors=20
Maximum number of executors set to 20. These value should be at-most the
maximum number of nodes allowed in our pool we created earlier.

--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark
Service account for spark driver and executor pods

--conf spark.kubernetes.container.image=myacr.azurecr.io/aks-scheduler:latest
The docker image that will be pulled by k8s and run

--conf spark.kubernetes.container.image.pullSecrets=my-secret
AKS secret name to pull images from ACR

--conf spark.kubernetes.container.image.pullPolicy=Always
Everytime spark-submit is run, pull the image from ACR

--packages org.apache.hadoop:hadoop-azure:3.3.5
Additional packages that will be installed at runtime.

local:///app/aks_executor.py
The main application file that will be run on container start. The path
local: refers to path inside the container. Since our docker image
was built with a folder /app containing all our task related files.

With the spark-submit approach, there is no direct way to schedule the spark jobs if we want to run the spark_task every day.

One way is to create a cron that will run the spark-submit command, but running the cron on our local machine is unreliable and we must use additional services in Azure to do this.

One important thing to point out is that the aks_executor.py file accepts environment variables at runtime, but we have not included the variables in our spark-submit command.

One way by which we can include environment variables for spark executors is setting spark.executorEnv.key = value

But there is no simple way to do that for the driver node.

Another possibility is to use command line arguments but that might expose the secrets in the log files.

Since we have moved away from using spark-submit to using spark operator (below) which allows us to use kubernetes secrets, hence the aks_executor.py file reflects the latest changes only.

Another solution is to use kubernetes itself to run the spark-submit periodically. This can be accomplished using the spark-operator.

This is similar to deploying an application in kubernetes

In order to use the spark-operator, we need to first install spark-operator in our k8s cluster using Helm:

# Install Helm for Kubernetes
wget https://get.helm.sh/helm-v3.12.1-linux-amd64.tar.gz
tar xvf helm-v3.12.1-linux-amd64.tar.gz
sudo mv linux-amd64/helm /usr/local/bin

# Add spark-operator
helm repo add spark-operator https://googlecloudplatform.github.io/spark-on-k8s-operator

# Install spark-operator in default namespace
helm install my-release spark-operator/spark-operator --set sparkJobNamespace=default

Then we create our deployment file as follows:

aks-manifest.yml

apiVersion: sparkoperator.k8s.io/v1beta2
kind: ScheduledSparkApplication
metadata:
name: spark-task
namespace: default
spec:
# Run the job every day at 3 AM
schedule: "0 3 * * *"
concurrencyPolicy: Allow
template:
type: Python
pythonVersion: "3"
mode: cluster
image: myacr.azurecr.io/aks-scheduler:latest
imagePullPolicy: Always
mainClass: org.apache.spark.examples.SparkPi
mainApplicationFile: 'local:///app/aks_executor.py'
imagePullSecrets:
- my-secret
deps:
packages:
- org.apache.hadoop:hadoop-azure:3.3.5
restartPolicy:
type: OnFailure
onFailureRetries: 3
onFailureRetryInterval: 10
onSubmissionFailureRetries: 5
onSubmissionFailureRetryInterval: 20
sparkVersion: "3.4.2"
dynamicAllocation:
enabled: true
initialExecutors: 1
minExecutors: 1
maxExecutors: 20
volumes:
- name: "spark-local-dir-1"
hostPath:
path: "/tmp/mnt-1"
- name: "spark-local-dir-2"
hostPath:
path: "/tmp/mnt-2"
- name: "spark-local-dir-3"
hostPath:
path: "/tmp/mnt-3"
driver:
serviceAccount: spark
javaOptions: "-Divy.cache.dir=/tmp -Divy.home=/tmp"
envSecretKeyRefs:
APP_ID:
name: app
key: app_id
TENANT_ID:
name: app
key: tenant_id
APP_SECRET:
name: app
key: app_secret
BLOB_STORAGE_URL:
name: app
key: blob_storage_url
BLOB_STORAGE_INP_CONTAINER:
name: app
key: blob_storage_inp
BLOB_STORAGE_OUT_CONTAINER:
name: app
key: blob_storage_out
executor:
volumeMounts:
- name: "spark-local-dir-1"
mountPath: "/tmp/mnt-1"
- name: "spark-local-dir-2"
mountPath: "/tmp/mnt-2"
- name: "spark-local-dir-3"
mountPath: "/tmp/mnt-3"
envSecretKeyRefs:
APP_ID:
name: app
key: app_id
TENANT_ID:
name: app
key: tenant_id
APP_SECRET:
name: app
key: app_secret
BLOB_STORAGE_URL:
name: app
key: blob_storage_url
BLOB_STORAGE_INP_CONTAINER:
name: app
key: blob_storage_inp
BLOB_STORAGE_OUT_CONTAINER:
name: app
key: blob_storage_out

Note that we are creating a ScheduledSparkApplication because we are running the application using a cron everyday at 3 AM. If scheduling was not required, then we would just use SparkApplication.

The attributes in the YAML file are similar to what we saw for spark-submit command.

In-fact k8s runs spark-submit commands in the backend using these attributes.

In concurrencyPolicy, we set it to Allow i.e. multiple runs of the cron can take place at the same time. If the interval b/w 2 runs is shorter than the completion time of tasks, then multiple runs happens at the same time.

During Spark shuffle operations, executors send and receive data from one another and thus need to store their intermediate data structures.

One possible way, they can do it is using local directory within the docker container. But docker containers suffers from Copy On Write performance bottleneck i.e. when we have to write some large file in a folder, if that file is not already present, then it is searched through multiple layers and then copied at write time from the base image which incurs disk I/O.

To overcome this, k8s can mount additional volumes to the executor pods which is then used as a temporary location to write the intermediate data structures. Having SSD based disks are preferred.

In both the driver and executors, users can pass secrets as environment variables, that will be read by aks_executor.py file at runtime. Alternatively, the secrets can be read from key-vault from within the application.

The secrets can be created from local by running the following kubectl command:

kubectl create secret generic app \
--from-literal="app_id=$APP_ID" \
--from-literal="tenant_id=$TENANT_ID" \
--from-literal="app_secret=$APP_SECRET" \
--from-literal="blob_storage_url=$BLOB_STORAGE_URL" \
--from-literal="blob_storage_inp=$BLOB_STORAGE_INP_CONTAINER" \
--from-literal="blob_storage_out=$BLOB_STORAGE_OUT_CONTAINER"

To deploy the manifest file, run the following command:

kubectl apply -f aks-manifest.yml

Once the job starts, we can view the driver and executor pods by using the command:

kubectl get pods
driver and executor pods for spark application

To view the logs for any specific pod:

kubectl logs <pod name>

Note that the number of executor pods will vary depending on the load, since we have enabled autoscaling.

One advantage with running a spark cluster with k8s is that we can select the appropriate VM size for our tasks.

With synapse or databricks, the minimum configuration nodes itself can be quite an overkill for some tasks and thus although we might require 2GB RAM, we are bound to use 32GB machines which will cost 16 times more.

What is the role of Kubernetes?

source: google cloud
  1. When we send the spark-submit command to the k8s API server, the cluster manager will schedule the spark driver pod.
  2. The driver pod will pull the container image from ACR and run it.
  3. The driver pod would run the aks_executor.py code.
  4. When the spark driver encounters a spark read query, it will “lazily” read the data.
  5. Then for subsequent operations on the dataframes e.g. group by, filter, join, distinct etc. the driver would generate a logical plan for running the operations on each partition.
    The plan is composed of stages and tasks.
  6. On encountering show(), write, save() etc. operations, the driver generates a physical plan that will actually execute the operations on the partitions.
  7. The driver pod would then send request to the k8s scheduler to create executor pods.
  8. The k8s scheduler will create the executor pods, notify the driver about their endpoints.
  9. The driver will distribute the tasks to the executor pods.
  10. Depending on the operations, network I/O takes place among the driver and executor pods as well as among executor pods. The networking is handled by k8s.
  11. Kubernetes scheduler also handles autoscaling of the executor pods.
  12. Kubernetes also handles the health check for the driver and the executor pods. If any executor pod is down, it will notify the driver.

After the application is completed running, we can see that the driver pod is still present because k8s does garbage collection on the node.

How does it ensure high availability ?

  1. Whenever any executor pod crashes, the driver pod will request k8s scheduler to create a new one.
  2. When the driver pod crashes, the application is killed. In order to restart the application, we specify the retryPolicy in the aks-manifest.yml file above
  3. Whenever any executor pod crashes, any RDD or Dataframe, it calculates will be lost.
    But since spark uses an in-memory DAG to compute the transformations, it can re-run the DAG from start to recompute the lost RDD or Dataframe on a new executor pod.
  4. The spark operator itself runs in a pod which controls the spark application.
    If the spark operator pod gets killed, the entire application will be unavailable. To enable replication of the spark operator pod, install spark operator with additional replicaCount value:
helm install my-release spark-operator/spark-operator \
--set sparkJobNamespace=default \
--set replicaCount=3

How to ensure optimum resource utilization?

To view how many CPU cores and memory is being consumed by the driver and the executors, we can run the following command:

kubectl top pod --namespace default

We can see that each executor or average uses 0.5 to 1 CPU core and around 600–700MB of memory.

CPU=548m implies 548 millicores i.e. 0.548 cores

Memory=660Mi implies 660 MB

Since our node pool can allocate maximum of 20 instances (see above where we create the node pool) and each instance has 4 cores and 16GB of memory, thus we can create a maximum of 160 executors (assuming each executor takes up 0.5 core minimum) to make full utilization of our node pool.

dynamicAllocation:
enabled: true
initialExecutors: 1
minExecutors: 1
maxExecutors: 160

Using 20 executors, our task took 45 minutes to complete, whereas with 160 executors it took only 12 minutes.

But one problem that can be seen here is that with 160 executors, although the driver requests these many executors given the data size and computations, but k8s takes a lot of time to start all these pods. Often by the time the pods starts running, the task is almost completed.

--

--

Responses (1)