Workload Parallelization in Kubernetes with Python, DIY-style

Because sometimes you just need to do it yourself, with Python

TL;DR: This post describes a basic design that will allow you to distribute [large] tasks to multiple workers running in Kubernetes to be run in parallel. It uses containers that run a listener service (that you write). You deploy these containers using a Kubernetes Job workload controller. You then send your various tasks to all of the containers with some client code. Your listener app deserializes the work, runs it, and returns the output. This is the architecture in a nutshell.

The tips and details I provide will be using Python, but the general design could work in any language, using a serialization method that allows transmission of data via TCP. This is definitely an architecture guide and not an implementation guide or a tutorial. There won’t be a ton of cut-and-paste-able nuggets or information on how to set up your Kubernetes cluster. But it might still be worth your time.

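Because this piece is long, here’s a quick outline: The Main and the Workers, The Client and the Listeners, The Kubernetes Part, and Tie it Together with a Context Manager.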

The Main and the Workers

Put Kubernetes on the back burner for a moment. The basic design used here for distributing tasks from one Main to many Workers could be run in a context of multiple processes on a single machine, multiple physical machines on a network, VMs, or containers, as with Kubernetes. It’s worthwhile to wrap our minds around this design before moving on to a Kubernetes implementation, so these first two sections just deal with this basic distribution design.

Two Environments

The design involves setting up two runtimes, Main and Worker. For now, we’ll just consider these as distinct environments. Both environments contain your library which defines the classes and functions that can be instantiated and called to do some work, respectively. The Worker environment contains a listener service which is running. The Main environment contains client code that is able to create an appropriate request to be sent to the Worker listeners.

Serialization is the Secret Sauce

The key to this design is the ability to serialize the in-memory objects which provide configuration and inputs for the various worker processes. In Python, serialization via the standard library’s pickle module is usually sufficient. Serialization allows you to transmit information between two very similar environments, trusting that the environments define and use the information in the same way.

Here’s a trivial example that demonstrates distributed compute using pickle serialization. You’ve created a class that takes a single parameter on construction, y. It has a single method called power which takes a single parameter, x, and returns x^y (x to the power of y). In Python:

class PowerOp:
    def __init__(self, y):
        self.y = y

    def power(self, x):
        return x**self.y

You’d like to find the cube of every number from 1 to 10. If you wanted to distribute this work, you could package your PowerOp class and install the package in the Main and Worker environments. In the Main runtime, you could instantiate the PowerOp class with a value of 3, then pickle a tuple of (<PowerOp obj>, x) where x is a number from 1 to 10. Each pickled tuple could be sent to a different worker, which, in its runtime, unpickles the tuple and passes x to the power() method of the PowerOp object. Here’s an example for a single number:

# Main runtime code:
import pickle

# PowerOp comes from the package installed in both environments
power_operator = PowerOp(3)
task4 = (power_operator, 4)
task4_pkl = pickle.dumps(task4)
# Send the pickled task to the Worker

and then on the worker side…

# Worker4 runtime code:
import pickle

# Receive the task
task4 = pickle.loads(task4_pkl)
power_operator, x = task4
result = power_operator.power(x)
result_pkl = pickle.dumps(result)
# Send the result_pkl back

This doesn’t require a listener service. One (very inefficient) way to distribute your parallel tasks would even be to email all your pickled tasks to your co-workers who have the same libraries installed. They could run your tasks and email you the pickled results back. Ta-da! Distributed compute!

But there is a better way.

The Client and the Listeners

The architecture that we’re using in this guide distributes tasks to our workers by having each worker run a listener service that is configured to receive a task and execute it. We’ll look first at client code that keeps our listener loosely coupled to our library, and then at what a listener service implementation might look like.

Client code for Loosely Coupling our Listener and Library

“Loose coupling” means that two parts of a system which have separate responsibilities and are likely to evolve separately should interact with each other via an interface. Using our trivial example from above, it means that we shouldn’t hard-code our client-listener operations into the package we made for our PowerOp class. Likewise, our client-listener code shouldn’t have to know the inner workings of the classes that it is handling. Rather, these two will interact with a very basic interface. Let’s walk through an example. As a start, we’ll separate our code into two libraries. It could have the following directory structure:

src/
    power_op_pkg/
        setup.py             # classic packaging script
        p_op/
            power_op.py      # contains the PowerOp class
    listener_pkg/
        setup.py             # classic packaging script
        my_listener/
            client.py        # used by the Main runtime
            send_recv.py     # some internal logic
            listener.py      # runs in the Worker runtime

These represent two different packages that should both be installed in each environment.

The listener package provides some client code that we can include in our script. The interface should be very simple, allowing us to pass in a list of callable objects and any parameters needed. For now, we won’t worry about the way the Client knows how to find the worker listeners since there are a number of ways to answer that question.

Once we’ve built out our listener package, we want the client code to run like this:

# Main runtime
from p_op.power_op import PowerOp
from my_listener.client import Client

cuber = PowerOp(3)
tasks = []
for i in range(1, 11):  # 1 through 10 inclusive
    tasks.append((cuber.power, (i,), {}))
results = Client.distribute(tasks)

Note a few things. First, note that we’re passing in cuber.power rather than the cuber object itself. That’s an interface choice. We don’t want our listener package to know a specific method name to call. We want it to be able to call the object that is passed in with any arguments provided. (If you’re looking for alternatives, consider the Command pattern.) Then note that we passed it a 3-item tuple of a callable, a tuple of arguments, and an empty dictionary. This generalizes the form of passing parameters to the callable (see example below). Also note that we didn’t do any serialization. That should be done under the hood of the client code.
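
To make the "under the hood" part concrete, here is a minimal single-process sketch of the round trip a task tuple would take. It assumes Python 3, where a bound method such as cuber.power can be pickled as long as its class is importable under the same module path on both sides. PowerOp is redefined inline only to keep the sketch self-contained.

```python
import pickle


class PowerOp:
    def __init__(self, y):
        self.y = y

    def power(self, x):
        return x ** self.y


cuber = PowerOp(3)
task = (cuber.power, (4,), {})

# What the client code would do before sending the task
payload = pickle.dumps(task)

# What a worker would do after receiving the bytes
func, args, kwargs = pickle.loads(payload)
result = func(*args, **kwargs)
```

The worker never names PowerOp or power. It only unpacks a callable and its arguments, which is the whole point of the interface.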

So what’s the Client actually doing? How about this:

from my_listener.send_recv import send_recv_many


class Client:
    @staticmethod
    def get_listener_addrs():
        # logic to return listener addresses
        raise NotImplementedError

    @staticmethod
    def distribute(task_tuples, addrs=None):
        if not addrs:
            addrs = Client.get_listener_addrs()
        return send_recv_many(task_tuples, addrs)

We’ve left it all generalized, but the purpose is clear: The client grabs the addresses of the listeners (logic tbd) and calls a send_recv_many function that maps tasks to the various listeners.

What exactly is this send_recv_many function doing? Unlike our client code, it is tightly coupled to whatever listener implementation we choose. Only two things are absolute musts: 1) for the workers to actually work in parallel, the logic that sends our tasks to listeners and collects their individual responses must be asynchronous, and 2) we must wait for (block on, gather, whatever term you like) all of those asynchronous responses before returning them.
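
Here is one way send_recv_many could satisfy both musts, sketched with asyncio. The wire format is my own assumption and not something prescribed above: each message is a 4-byte big-endian length followed by the pickled bytes. The helper names _send_recv_one and _send_recv_all are hypothetical, and tasks are simply assigned to addresses round-robin.

```python
import asyncio
import pickle
import struct


async def _send_recv_one(task_tuple, addr):
    """Send one pickled task to one listener and wait for its pickled result."""
    host, port = addr
    reader, writer = await asyncio.open_connection(host, port)
    try:
        payload = pickle.dumps(task_tuple)
        # Assumed framing: 4-byte length prefix, then the pickle bytes
        writer.write(struct.pack("!I", len(payload)) + payload)
        await writer.drain()
        (size,) = struct.unpack("!I", await reader.readexactly(4))
        return pickle.loads(await reader.readexactly(size))
    finally:
        writer.close()
        await writer.wait_closed()


async def _send_recv_all(task_tuples, addrs):
    # Pair task i with address i modulo the number of addresses
    requests = [
        _send_recv_one(task, addrs[i % len(addrs)])
        for i, task in enumerate(task_tuples)
    ]
    # gather waits for every request and keeps results in task order
    return await asyncio.gather(*requests)


def send_recv_many(task_tuples, addrs):
    """Blocking entry point for the Client: returns one result per task."""
    return asyncio.run(_send_recv_all(task_tuples, addrs))
```

The requests go out concurrently (must number 1), and asyncio.gather does the waiting before anything is returned (must number 2).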

With those caveats in mind, let’s discuss our options for the listener service.

Listener Service Implementation Options

Ignoring the entire question of service architecture, any request our listener service handles should probably include something like the following:

# ... somewhere in the bowels of our request handler ...
func, args, kwargs = one_task_tuple
result = func(*args, **kwargs)
# ... and we proceed to send back the result ...

In our running example, func corresponds to cuber.power, args is a tuple containing the number to be cubed, and kwargs is an empty dict. To reiterate a previous point, our listener doesn’t need to know anything about the PowerOp class. Provided that the environment is able to deserialize the callable object that was sent, it can call the method with the correct parameters in a completely generalized way.

But what sort of listener are we talking about here? The two options that I recommend are a TCP listener or a lightweight HTTP app, run with a framework like Flask. Of these two, I am a big fan of a TCP listener, and I’ve even written another Medium post on how to implement a socket-based service in Python with socketserver. It will keep your environment clean, and for a task like this, it’s all you need.

If you choose to make a Flask app and you want to use the pickle module for serialization, keep in mind that pickled objects are raw bytes. If you place them in a text-based payload such as JSON or a form field, you’ll also need to base64 encode them to make them transmittable. Python’s base64 library has everything you need for that.
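
A minimal sketch of that encoding step, using only the standard library. The function names encode_task and decode_task are hypothetical, and the builtin pow stands in for cuber.power so the snippet runs without the PowerOp package.

```python
import base64
import pickle


def encode_task(task_tuple):
    """Pickle a task and wrap it as ASCII text that is safe inside JSON."""
    return base64.b64encode(pickle.dumps(task_tuple)).decode("ascii")


def decode_task(text):
    """Reverse encode_task on the worker side."""
    return pickle.loads(base64.b64decode(text))


encoded = encode_task((pow, (4, 3), {}))
func, args, kwargs = decode_task(encoded)
```

The result would travel back to the client the same way, pickled and then base64 encoded.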

One important note: There are advantages to having your listener service terminate after handling a single request, particularly when we start deploying it on Kubernetes. This is very simple with a TCP listener, and I can think of simple ways I would try with a Flask app, though I haven’t tried them.
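
For the TCP route, here is a rough sketch of a listener that handles exactly one request and then returns, built on the standard library’s socketserver. The 4-byte length-prefix framing is an assumption of this sketch, as are the names TaskHandler and serve_one_task.

```python
import pickle
import socketserver
import struct


class TaskHandler(socketserver.StreamRequestHandler):
    def handle(self):
        # Assumed framing: 4-byte big-endian length, then the pickled task
        (size,) = struct.unpack("!I", self.rfile.read(4))
        func, args, kwargs = pickle.loads(self.rfile.read(size))
        result = pickle.dumps(func(*args, **kwargs))
        self.wfile.write(struct.pack("!I", len(result)) + result)


def serve_one_task(server):
    """Handle a single request, then close the server and return."""
    with server:
        server.handle_request()
```

The container’s final command could then be a small script that calls serve_one_task(socketserver.TCPServer(("0.0.0.0", 9000), TaskHandler)), where port 9000 is just a placeholder. When the call returns, the process exits and the container terminates with it.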

Also, don’t forget that the functions in send_recv.py should be async. Or else why are we even doing this?

The Kubernetes Part

And finally, we’re at the part that probably got you to click on this article in the first place. For this part, let’s talk Containers, Jobs, Services, and Namespaces.

Containers

You need to containerize your Worker environment. Your container’s environment should therefore have any library that you’re hoping to parallelize work for, and your listener service code. The final command of your container’s definition should be to run the listener service. When your listener service terminates, then so does the container. This is a meaningful feature when using a Kubernetes Job workload controller.

You want to host your container image in a registry from which the nodes in your Kubernetes cluster can pull it.

Jobs

A Job is a type of Kubernetes resource called a workload controller. (See the Kubernetes docs on Jobs.)

A workload controller is a service that runs in the Kubernetes control plane (the Kubernetes brain). When it starts up, it knows that some configuration should be present in Kubernetes, and its job is to continually try to make (or keep) that configuration happen(ing).

When you create a new Job workload controller, you tell Kubernetes that there should be k-many pods running some container image, and that Kubernetes should try to keep k-many pods running until n-many pods have exited successfully.

In our beloved example, let’s imagine that we want to find the cube of 10 numbers, but we want to use only 3 workers to distribute the work. Let’s also say that our listeners are configured to handle a single request and then exit, terminating the container. We can tell Kubernetes to create a Job that keeps 3 pods running with our worker image on them at all times until 10 pods have successfully terminated. (Side note, if you’re doing a little retro-implementation now on your listener service to ensure that all this will happen, bravo/-a.) Then, as the Client running in our Main runtime distributes the tasks, it should see 3 workers. The Client should immediately distribute 3 tasks, one to each worker. (The remaining tasks might be pending in some sort of timeout loop, waiting for a listener that is listening, or you might be submitting in batches.) After a worker completes a task and returns its result, it will terminate. Our Kubernetes Job controller will see that the termination was clean, will count this as 1 completion (only 9 more to go), and will create another Worker pod. Once that new Worker pod is listening, our Client can find it and send another task.
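
The "timeout loop" mentioned above could be as simple as retrying the connection until a fresh Worker pod is listening. A minimal sketch, where the function name connect_with_retry and the default attempt count and delay are my own placeholders:

```python
import socket
import time


def connect_with_retry(addr, attempts=30, delay=1.0):
    """Try to open a TCP connection, retrying while no listener is ready."""
    last_error = None
    for _ in range(attempts):
        try:
            return socket.create_connection(addr, timeout=5)
        except OSError as err:  # e.g. connection refused while a pod starts
            last_error = err
            time.sleep(delay)
    raise TimeoutError(f"no listener reachable at {addr}") from last_error
```

An asynchronous client would want the same idea with asyncio.sleep rather than time.sleep, so that one waiting task does not hold up the others.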

When creating a Job, the spec attribute parallelism sets how many pods may run at the same time, while completions sets how many pods must terminate successfully before Kubernetes considers the Job complete.
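
To show where those two attributes live, here is a sketch of a Job manifest for the 3-workers, 10-cubes example, written as a Python dict and dumped to JSON (Kubernetes accepts JSON manifests as well as YAML). The Job name, label, image path, and port are all placeholders I made up for illustration.

```python
import json

job_manifest = {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "metadata": {"name": "cube-workers"},  # hypothetical name
    "spec": {
        "parallelism": 3,    # pods running at once
        "completions": 10,   # successful pod exits needed to finish the Job
        "template": {
            "metadata": {"labels": {"app": "cube-worker"}},  # hypothetical label
            "spec": {
                # A Job's pod template must use Never or OnFailure
                "restartPolicy": "Never",
                "containers": [
                    {
                        "name": "listener",
                        "image": "registry.example.com/cube-worker:latest",  # placeholder
                        "ports": [{"containerPort": 9000}],  # placeholder port
                    }
                ],
            },
        },
    },
}

job_json = json.dumps(job_manifest, indent=2)
```

The label on the pod template matters later: it is what a Service can use to find the listener pods.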

If you’re not familiar with creating Kubernetes resources, there are a plethora of guides out there to walk you through it.

Services

Creating a service that provides a stable, abstract endpoint between your client code and your listeners is a good way to do things, particularly when your parallelism < completions. For our use case, Kubernetes has a service type called NodePort that is perfectly sufficient. A NodePort opens up some port number on all the Nodes in the cluster, then forwards requests from those node ports to a certain port (which you specify) on pods that have a certain label (which you specify). Thus, if we can discover the IP of a single node in our cluster, then given the port number we specify when creating our NodePort service, we can send requests to a single location that is known at the time of Job creation.
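
As a sketch, a NodePort Service manifest for our listeners might look like the dict below. The Service name, the app label, and the port numbers are placeholders; the nodePort value is chosen from Kubernetes’ default NodePort range of 30000 to 32767. The helper listener_addr is hypothetical and only shows how the client-facing address falls out of the manifest.

```python
service_manifest = {
    "apiVersion": "v1",
    "kind": "Service",
    "metadata": {"name": "cube-listeners"},  # hypothetical name
    "spec": {
        "type": "NodePort",
        # Must match a label on the Worker pods
        "selector": {"app": "cube-worker"},
        "ports": [
            {
                "protocol": "TCP",
                "port": 9000,        # Service port inside the cluster
                "targetPort": 9000,  # port the listener binds in the pod
                "nodePort": 30900,   # port opened on every node
            }
        ],
    },
}


def listener_addr(node_host, manifest):
    """Build the (host, port) pair a client outside the cluster would use."""
    return (node_host, manifest["spec"]["ports"][0]["nodePort"])


addr = listener_addr("10.0.0.12", service_manifest)  # placeholder node IP
```

Because the node port is fixed in the manifest, the only thing left to discover at runtime is the address of one node.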

Namespaces

One final note: when creating more than one resource for a single purpose with a limited lifespan, I like to create a new Kubernetes namespace resource to hold them. Deleting the namespace afterwards removes everything that was created inside it.

Tie it Together with a Context Manager

We’ve described multiple Kubernetes resources (Job, NodePort, Namespace) that need to be created prior to running our tasks and cleaned up immediately afterwards. The mouse running on that wheel in your Python developer brain should now be waving a giant flag that says “Context Manager”.

We should write code that can create and terminate these Kubernetes resources, and then we should put that code into the __enter__ and __exit__ methods of a class that we create called JobContext.

As before, we should keep this code un-coupled from our other packages — the listener code need not be bound to a Kubernetes implementation, and our PowerOp code has nothing to do with either, really. Here’s what our src directory might look like with our new Kubernetes-managing code in place:

src/
    power_op_pkg/
        setup.py               # classic packaging script
        p_op/
            power_op.py        # contains the PowerOp class
    listener_pkg/
        setup.py               # classic packaging script
        my_listener/
            client.py          # used by the Main runtime
            send_recv.py       # some internal logic
            listener.py        # runs in the Worker runtime
    kube_pkg/
        setup.py               # classic packaging script
        kube_helper/
            configs/
                job.yaml         # configs a job
                namespace.yaml   # configs a namespace
                node_port.yaml   # configs a node port
            create.py          # create resources from yamls
            cleanup.py         # clean up resources
            job_context.py     # context manager

Using a context manager keeps our code tidy, and it also allows us to address the concern of grabbing the listener address in a way external to the Client, which helps decouple it further from the implementation:

# Main runtime
from p_op.power_op import PowerOp
from my_listener.client import Client
from kube_helper.job_context import JobContext

cuber = PowerOp(3)
tasks = []
for i in range(1, 11):  # 1 through 10 inclusive
    tasks.append((cuber.power, (i,), {}))

params = {}  # define Kubernetes resource params here
with JobContext(params) as job:
    results = Client.distribute(tasks, job.addr)

You can see that the Client no longer has to discover the listener address itself; the JobContext supplies it. Here’s a sketch for what a class like that might look like:

from kube_helper.create import create_resource
from kube_helper.cleanup import cleanup_resource


class JobContext:
    resources = ['namespace', 'job', 'node_port']

    def __init__(self, params):
        self.params = params
        self.addr = None

    def __enter__(self):
        # Create every resource, then record where the listeners can be reached
        for resource in self.resources:
            create_resource(resource, self.params)
        self.addr = self._get_addr()
        return self

    def __exit__(self, etype, exc, tb):
        # Runs even if the body of the with-block raised
        for resource in self.resources:
            cleanup_resource(resource, self.params)

    def _get_addr(self):
        # grab the hostname or IP of a node
        # get the node port number from params
        # return a tuple of (host, port)
        raise NotImplementedError
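
The create_resource and cleanup_resource helpers are left undefined in this sketch. One possible shape for them, assuming the kubectl CLI is installed and already pointed at your cluster, is to shell out to kubectl apply and kubectl delete against the YAML files in the configs directory. The CONFIG_DIR constant, the kubectl_command helper, and the injectable run parameter are my own additions, and how params get templated into the YAML is deliberately left out.

```python
import subprocess
from pathlib import Path

CONFIG_DIR = Path("configs")  # hypothetical location of the yaml files


def kubectl_command(action, resource):
    """Build a kubectl command for one resource's yaml file."""
    return ["kubectl", action, "-f", str(CONFIG_DIR / f"{resource}.yaml")]


def create_resource(resource, params=None, run=subprocess.run):
    # params is accepted for interface compatibility but unused in this sketch
    return run(kubectl_command("apply", resource), check=True)


def cleanup_resource(resource, params=None, run=subprocess.run):
    # --ignore-not-found keeps cleanup from failing if the resource is already gone
    command = kubectl_command("delete", resource) + ["--ignore-not-found"]
    return run(command, check=True)
```

Passing run as a parameter makes it easy to swap in a fake for testing without touching a real cluster. The official Kubernetes Python client would be another reasonable way to implement the same two functions.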

The End

John is a software engineer who primarily works in data science platform design.
