Start using an ML platform, whether or not you have a cluster

This post walks through an example of migrating machine learning research code to production grade and deploying an efficient, reproducible, and persistent environment for training hundreds of models.

My GitHub repository “kubeflow_k8s” contains all the Python and YAML files, along with the installation instructions, needed to reproduce the results shown here.

The model training example “train_amazon.py” prepares the text training data, downloads and further trains a TensorFlow text sentiment model, and runs inference on the validation text data, all in a single run:

python3 train_amazon.py

The goal is to attain a production-grade environment:

Left: pipeline and components. Center: completed pipelines. Right: Kubernetes load monitoring dashboard

Here are the main steps.

Cluster
The GitHub repository “Kubeflow_k8s” details a step-by-step process for installing and deploying a Kubeflow/Kubernetes cluster on a desktop machine. It was tested on my Fedora machine (32 cores, 130 GiB) and on my MacBook Pro mini (2 cores, 8 GiB).

Refactor into persistent functional components
train_amazon_refactured.py illustrates splitting “train_amazon.py” into stateless functional blocks that persist their results. For example:

import tensorflow as tf
import tensorflow_hub as hub


def get_model(output_untrained_model: str):
    # Pretrained NNLM sentence embedding from TF Hub, kept frozen (trainable = False)
    hub_layer = hub.KerasLayer(
        "https://tfhub.dev/google/tf2-preview/nnlm-en-dim128/1", output_shape = [128],
        input_shape = [], dtype = tf.string, name = 'input', trainable = False
    )

    # Two dense layers on top of the embedding; the last one is the three-class classifier
    model = tf.keras.Sequential()
    model.add(hub_layer)
    model.add(tf.keras.layers.Dense(64, activation = 'relu'))
    model.add(tf.keras.layers.Dense(3, activation = 'softmax', name = 'output'))
    model.compile(
        loss = 'categorical_crossentropy',
        optimizer = 'Adam', metrics = ['accuracy']
    )
    model.summary()
    print("\n\nsave untrained_model.pickle\n\n")
    # Persist the untrained model so that the next block can load it from disk
    model.save(output_untrained_model)

The “get_model” function downloads the pretrained “nnlm-en-dim128” model, adds two fully connected layers on top of it (the second one is the three-class softmax classifier), and saves the untrained model. Verify that the outputs do not change after refactoring:

python3 train_amazon_refactured.py
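One way to make that check repeatable is to save the predictions of both scripts and compare them numerically. The following is only a minimal sketch: the helper name `outputs_match` and the tolerance are hypothetical and not part of the repository, and it assumes random seeds are fixed in both scripts, since two unseeded training runs generally differ.

```python
import math


def outputs_match(baseline, refactored, abs_tol=1e-6):
    """Return True when two lists of prediction rows agree within abs_tol."""
    if len(baseline) != len(refactored):
        return False
    for row_a, row_b in zip(baseline, refactored):
        if len(row_a) != len(row_b):
            return False
        if not all(math.isclose(a, b, abs_tol=abs_tol) for a, b in zip(row_a, row_b)):
            return False
    return True


if __name__ == "__main__":
    # Hypothetical class probabilities for two sentences and three classes.
    before = [[0.1, 0.2, 0.7], [0.8, 0.1, 0.1]]
    after = [[0.1, 0.2, 0.7], [0.8, 0.1, 0.1]]
    print(outputs_match(before, after))
```

In practice the two lists would be loaded from whatever files the original and the refactored scripts write their validation predictions to.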

Containerize into Kubeflow components and pipeline
kfp_train_amazon.py defines four components and a pipeline. Compare the new “get_model” container recipe, which under the hood uses the Kubeflow decorator to prescribe a local Docker image and build the component, with the previous functional component:

# Import path assumed for KFP SDK 1.8.x
from kfp.v2.dsl import component, Output, Artifact


@component(
    packages_to_install = ["tensorflow", "tensorflow_hub"],
    output_component_file = "component_nnlm_model.yaml"
)
def nnlm_model_download(untrained_model: Output[Artifact]):
    # Imports live inside the function because the body runs in its own container
    import tensorflow as tf
    import tensorflow_hub as hub

    hub_layer = hub.KerasLayer(
        "https://tfhub.dev/google/tf2-preview/nnlm-en-dim128/1", output_shape = [128], input_shape = [],
        dtype = tf.string, name = 'input', trainable = False
    )

    model = tf.keras.Sequential()
    model.add(hub_layer)
    model.add(tf.keras.layers.Dense(64, activation = 'relu'))
    model.add(tf.keras.layers.Dense(3, activation = 'softmax', name = 'output'))
    model.compile(
        loss = 'categorical_crossentropy', optimizer = 'Adam', metrics = ['accuracy']
    )
    model.summary()
    print("\n\nsave untrained_model.pickle\n\n")
    # Kubeflow supplies the artifact path; downstream components read the model from it
    model.save(untrained_model.path)

The Kubeflow pipeline is defined in “kfp_train_amazon.py” and compiled into “pipeline_amazon.yaml” as follows:

# Imports assumed for KFP SDK 1.8.x
import kfp
from kfp import dsl


@dsl.pipeline(name = "train_amazon_pipeline")
def my_pipeline(epochs: int = 10,
                batch_size: int = 12,
                num_samples: int = 10000,
                url_train: str = "https://www.dropbox.com/s/tdsek2g4jwfoy8q/train.csv?dl=1",
                url_test: str = "https://www.dropbox.com/s/tdsek2g4jwfoy8q/test.csv?dl=1"):
    download_train_data_task = load_dataset(url = url_train, num_samples = num_samples)

    nnlm_model = nnlm_model_download()

    # Training consumes the train data artifacts and the untrained model
    train_model_task = train(
        epochs = epochs, batch_size = batch_size,
        input_labels_artifacts = download_train_data_task.outputs["output_labels_artifacts"],
        input_text_artifacts = download_train_data_task.outputs["output_text_artifacts"],
        input_untrained_model = nnlm_model.outputs["untrained_model"]
    )

    download_test_data_task = load_dataset(url = url_test, num_samples = num_samples)

    # Evaluation consumes the trained model and the test data artifacts
    eval_model_task = eval_model(
        input_model = train_model_task.outputs["output_model"],
        input_labels_artifacts = download_test_data_task.outputs["output_labels_artifacts"],
        input_text_artifacts = download_test_data_task.outputs["output_text_artifacts"]
    )


# Compile the pipeline function into a YAML package
kfp.compiler.Compiler(mode = kfp.dsl.PipelineExecutionMode.V2_COMPATIBLE).compile(
    pipeline_func = my_pipeline, package_path = 'pipeline_amazon.yaml'
)

Generate the pipeline YAML file “pipeline_amazon.yaml”:

python3 kfp_train_amazon.py
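The data dependencies in “my_pipeline” form a small DAG, and it can help to see which tasks may run in parallel. The sketch below reproduces that graph with Python's standard `graphlib` module (Python 3.9+). The task labels are my own shorthand for the pipeline steps, and this only illustrates the dependency structure; it is not how Kubeflow itself schedules containers.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks whose outputs it consumes,
# read off the my_pipeline definition. Labels are illustrative.
DEPENDENCIES = {
    "load_dataset_train": set(),
    "load_dataset_test": set(),
    "nnlm_model_download": set(),
    "train": {"load_dataset_train", "nnlm_model_download"},
    "eval_model": {"train", "load_dataset_test"},
}


def parallel_stages(graph):
    """Group tasks into stages; tasks within one stage have no mutual dependencies."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    stages = []
    while sorter.is_active():
        ready = sorter.get_ready()
        stages.append(set(ready))
        sorter.done(*ready)
    return stages


if __name__ == "__main__":
    for number, stage in enumerate(parallel_stages(DEPENDENCIES), start=1):
        print(number, sorted(stage))
```

The two data downloads and the model download have no inputs from other tasks, so they land in the first stage, followed by training and then evaluation.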

This short model training example defines simple components and one pipeline in a single file “kfp_train_amazon.py”. Actual large-scale production projects re-use both components and pipeline fragments across multiple pipelines, and also include unit tests both for components and for pipeline segments.
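Because a component body is plain Python that writes to an artifact path, its logic can be unit-tested without a cluster by passing in a stand-in object. The sketch below assumes the function body is importable separately from the Kubeflow decorator. `FakeArtifact` and `write_label_counts` are hypothetical names used for illustration only and do not exist in the repository.

```python
import json
import os
import tempfile
from collections import Counter
from dataclasses import dataclass


@dataclass
class FakeArtifact:
    """Stand-in for a Kubeflow artifact; only the .path attribute is mimicked."""
    path: str


def write_label_counts(labels, output_counts):
    """Hypothetical component body: count labels and persist the result as JSON."""
    counts = Counter(labels)
    with open(output_counts.path, "w") as handle:
        json.dump(counts, handle)


def test_write_label_counts():
    with tempfile.TemporaryDirectory() as tmp:
        artifact = FakeArtifact(path=os.path.join(tmp, "counts.json"))
        write_label_counts(["pos", "neg", "pos"], artifact)
        with open(artifact.path) as handle:
            assert json.load(handle) == {"pos": 2, "neg": 1}


if __name__ == "__main__":
    test_write_label_counts()
    print("ok")
```

The same pattern extends to pipeline segments: chain two such functions through temporary files and assert on the final output.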

Kubeflow pipelines can be deployed across multiple production environments (local, managed, and on-premises Kubeflow/Kubernetes clusters), which in turn schedule and persist pipeline executions:

Left: upload new pipeline. Center: pipeline DAG. Right: pipeline running and completed containers
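Besides uploading through the UI, a compiled package can also be submitted from Python with the KFP SDK client. This is a sketch under assumptions: the host URL (a locally port-forwarded endpoint) and the run name are placeholders, and a multi-user Kubeflow installation may additionally require authentication that is not shown here.

```python
def build_run_arguments(epochs=10, batch_size=12, num_samples=10000):
    """Arguments keyed by the parameter names of my_pipeline."""
    return {"epochs": epochs, "batch_size": batch_size, "num_samples": num_samples}


def submit_run(host, package_path, arguments, run_name):
    """Submit a compiled pipeline package to a Kubeflow Pipelines endpoint."""
    # Imported lazily so build_run_arguments works without kfp installed.
    import kfp

    client = kfp.Client(host=host)
    return client.create_run_from_pipeline_package(
        pipeline_file=package_path, arguments=arguments, run_name=run_name
    )


# Example call (placeholder host and run name, needs a reachable cluster):
# submit_run(
#     host="http://localhost:8080",
#     package_path="pipeline_amazon.yaml",
#     arguments=build_run_arguments(epochs=5),
#     run_name="train-amazon-demo",
# )
```

Keeping the argument construction separate from the submission makes it easy to generate many parameter combinations before sending any of them to the cluster.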

Results
Kubeflow/Kubernetes: “pipeline_amazon.yaml” is generated automatically and defines the Kubernetes ‘workflow’ controller and the component containers:

Accuracy: the “pipeline_amazon.yaml” pipeline overfits on the 10,000-sentence training subset, with training and validation accuracy of 0.95 and 0.75 respectively. Training on the full data set resolves the overfitting and reaches a validation accuracy of 97%:

python3 train_amazon_refactured.py

# run time approx. 20mins 5 epochs. Validation:

8883/8883 [=========] - 2020s 227ms/step - loss: 0.1355 - accuracy: 0.9723
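The overfitting diagnosis for the 10,000-sentence run comes down to the gap between training and validation accuracy. A minimal sketch of that check follows; the 0.1 threshold is an arbitrary illustrative choice, not a value used in the repository.

```python
def generalization_gap(train_accuracy, validation_accuracy):
    """Difference between training and validation accuracy."""
    return train_accuracy - validation_accuracy


def looks_overfit(train_accuracy, validation_accuracy, max_gap=0.1):
    """Flag a run whose gap exceeds max_gap (illustrative threshold)."""
    return generalization_gap(train_accuracy, validation_accuracy) > max_gap


if __name__ == "__main__":
    # Accuracies reported above for the 10,000-sentence subset.
    print(round(generalization_gap(0.95, 0.75), 2))
    print(looks_overfit(0.95, 0.75))
```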

MLOps and stress test: my 32-core, 130 GiB Fedora 36 machine manages to schedule tens of simultaneously running pipelines:

python3 kfp_stress.py

with Docker resources set to 20 cores and 64 GiB of memory. Obviously, since this simple example loads the training and test data fully into memory, memory is the main bottleneck for pipelines running in parallel. I take advantage of the Kubeflow component cache, which skips container execution for identical components and parameters.
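The idea behind component caching can be illustrated in a few lines of plain Python: derive a key from the component name and its parameters, and skip execution when that key has been seen before. This is a conceptual sketch only and is not Kubeflow's actual implementation; `cache_key` and `ComponentCache` are hypothetical names.

```python
import hashlib
import json


def cache_key(component_name, parameters):
    """Stable hash of a component name and its parameters."""
    payload = json.dumps(
        {"component": component_name, "parameters": parameters}, sort_keys=True
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


class ComponentCache:
    """Run a function once per distinct (component, parameters) pair."""

    def __init__(self):
        self._results = {}
        self.executions = 0

    def run(self, component_name, parameters, func):
        key = cache_key(component_name, parameters)
        if key not in self._results:
            # Cache miss: execute and remember the result.
            self.executions += 1
            self._results[key] = func(**parameters)
        return self._results[key]


if __name__ == "__main__":
    cache = ComponentCache()
    load = lambda num_samples: list(range(num_samples))
    cache.run("load_dataset", {"num_samples": 3}, load)
    cache.run("load_dataset", {"num_samples": 3}, load)  # served from cache
    print(cache.executions)
```

In a stress test that launches many pipelines with the same download parameters, only the first one would pay for the download under this scheme.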

My 2-core, 8 GiB MacBook mini manages to run just a few jobs, but the Kubeflow endpoint often fails. In its “idle” state, the locally installed Kubeflow/Kubernetes cluster runs 14 deployments.
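A back-of-the-envelope estimate shows why memory, rather than CPU, tends to cap the number of parallel pipelines. The sketch below uses the 20-core, 64 GiB Docker allocation mentioned above; the per-pipeline figures (1 core, 4 GiB) are hypothetical and would need to be measured for a real workload.

```python
def max_parallel_pipelines(total_cores, total_mem_gib, cores_per_pipeline, mem_gib_per_pipeline):
    """Return (pipeline count, limiting resource) for a simple capacity model."""
    by_cpu = total_cores // cores_per_pipeline
    by_mem = total_mem_gib // mem_gib_per_pipeline
    limit = "memory" if by_mem < by_cpu else "cpu"
    return min(by_cpu, by_mem), limit


if __name__ == "__main__":
    # 20 cores and 64 GiB come from the Docker settings above;
    # 1 core and 4 GiB per pipeline are assumed values.
    print(max_parallel_pipelines(20, 64, 1, 4))
```

With these assumed numbers the CPU could host 20 pipelines but the memory only 16, so memory is the binding constraint.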

Future work

Containerize: refactor the code into a Docker image and create four new components and pipelines using containers

Memory: find a solution for the memory bottleneck

Inference: create a serving service using the eval-model Docker component

GPU: add GPU TensorFlow training and inference and benchmark the speedup. Scheduling new GPU resources to a local Kubeflow cluster: https://jacobtomlinson.dev/posts/2022/quick-hack-adding-gpu-support-to-kind/

Metadata: persist the default Kubeflow database used for storing metadata about the completed containers. This data is lost when the local Kubeflow cluster crashes, so it is desirable to store metadata about completed tasks in separate persistent storage.

Network bottlenecks: low-compute network components that download data become a bottleneck in stress deployments, such as hundreds of simultaneously running pipelines, while high computational loads are handled efficiently by the cluster.

Advanced MLOps: create a production-grade environment with another ML platform (MLflow, Airflow, Metaflow, or other) and compare it to Kubeflow/Kubernetes.

Advanced MLOps: deploy new virtual machines (one master and two slaves), install a new Kubeflow cluster, define a new scheduler, and schedule GPU on the slave nodes.

Advanced MLOps: install a cloud platform (OpenStack or similar) onto the new virtual machines and configure resources, such as allocating GPU nodes and auto-scaling.
