Dean Wampler2020-02-13 | 14 min read

Dean Wampler provides a distilled overview of Ray, an open source system for scaling Python systems from single machines to large clusters. If you are interested in additional insights, register for the upcoming Ray Summit.


This post is for people making technology decisions, by which I mean data science team leads, architects, dev team leads, even managers who are involved in strategic decisions about the technology used in their organizations. If your team has started using ​Ray​ and you’re wondering what it is, this post is for you. If you’re wondering if Ray should be part of your technical strategy for Python-based applications, especially ML and AI, this post is for you. If you want a more in-depth technical introduction to Ray, see ​this post on the Ray project blog​.

What is Ray?

Ray is an open-source system for scaling Python applications from single machines to large clusters. Its design is driven by the unique challenges of next-generation ML and AI systems, but its features make Ray an excellent choice for all Python-based applications that need to scale across a cluster, especially if they have distributed state. Ray also provides a minimally-invasive and intuitive API, so you get these benefits without a lot of effort and expertise in distributed systems programming.

Developers indicate in their code which parts should be distributed across a cluster and run asynchronously, then Ray handles the distribution for you. If run locally, the application can use all the cores in the machine (you can also specify a limit). When one machine isn’t enough, it’s straightforward to run Ray on a cluster of machines and have the application leverage the cluster. The only code change required at this point is the options you pass when initializing Ray in the application.

ML libraries that use Ray, such as RLlib for reinforcement learning (RL), Tune for hyper parameter tuning, and Serve for model serving (experimental), are implemented with Ray internally for its scalable, distributed computing and state management benefits, while providing a domain-specific API for the purposes they serve.

Motivations for Ray: Training a Reinforcement Learning (RL) Model

To understand the motivations for Ray, consider the example of training a reinforcement learning (RL) model. RL is the type of machine learning that was used recently to ​beat the world’s best Go players​ and ​achieve expert game play for Atari and similar games​.

Scalable RL requires many capabilities that Ray was designed to provide:

  • Highly parallelized and efficient execution of ​tasks (millions or more) - When training models, we repeat the same calculations over and over again to find the best model approach (“hyper parameters”) and, once the best structure is chosen, to find the model parameters​ that work best. We also require proper sequencing of tasks when they have dependencies on the results of other tasks.
  • ​Automatic Fault Tolerance - With all these tasks, a percentage of them may fail for a variety of reasons so we need a system that supports monitoring of tasks and recovery from failures.
  • Diverse computing patterns​ - Model training involves a lot of computational mathematics. Most RL model training, in particular, also requires efficient execution of a simulator—for example, a game engine we want to beat or a model representing real-world activity like autonomous driving. The computing patterns used (algorithms, memory access patterns, etc.) are more typical of general computing systems, which can be very different from the computing patterns common in data systems where high-throughput transformations and aggregations of records are the norm. Another difference is the dynamic nature of these computations. Think of how a game player (or simulator) adapts to the evolving state of a game, improving strategy, trying new tactics, etc. These diverse requirements are seen in a variety of newer ML-based systems like robotics, autonomous vehicles, computer vision systems, automatic dialog systems, etc.
  • Distributed state management​- With RL, the current model parameters and the simulator state need to be tracked between training iterations. This state becomes distributed because the tasks are distributed. Proper state management also requires proper sequencing of stateful operations..

Of course, other ML/AI systems require some or all of these capabilities. So do general Python applications operating at scale.

The Gist of Ray

Ray libraries like ​RLlib​, ​Tune​, and ​Serve​, use Ray but mostly hide it from users. However, using the Ray API itself is straightforward. Suppose you have an “expensive” function to run repeatedly over data records. If it’s ​stateless​, meaning it doesn’t maintain any state between invocations,​ a​nd you want to invoke it in parallel, all you need to do is turn the function into a Ray ​task​ by adding the ​@ray.remote​ annotation as follows:

def slow(record):
    new_record = expensive_process(record)
    return new_record

Then initialize Ray and call it over your data set as follows:

ray.init() # Arguments can specify the cluster location, etc.
futures = [slow.remote(r) for r in records]

Notice how we invoke the function slow using ​slow.remote instead.​ Each call returns immediately with a future​. We have a collection of them. If we’re running in a cluster, Ray manages the resources available and places this task on a node with the resources necessary to run the function.

We can now ask Ray to return each result as it finishes using ​ray.wait. ​Here’s one idiomatic way to do this:

while len(futures) > 0:
     finished, rest = ray.wait(futures)
     # Do something with “finished”, which has 1 value:
     value = ray.get(finished[0]) # Get the value from the future
     futures = rest

As written, we’ll wait until one of the invocations of ​slow ​completes, at which point ​ray.wait will return two lists. The first will have a single entry, the id of the future for the completed ​slow invocation. The rest of the list of futures that we passed in will be in the second list—​rest.​ We call ​ray.get ​to retrieve the value of the finished future. (Note: that’s a blocking call, but it returns immediately because we already know it’s done.) We finish the loop by resetting our list to be what’s remaining, then repeat until all remote invocations have completed and the results have been processed.

You can also pass arguments to ​ray.wait ​to return more than one at a time and to set a timeout. If you aren’t waiting on a collection of concurrent tasks, you can also wait on a specific future by calling ​ray.get(future_id)​.

Without arguments, ​ray.init ​assumes local execution and uses all available CPU cores. You can provide arguments to specify a cluster to run on, the number of CPU or GPU cores to use, etc.

Suppose one remote function has passed the future from another remote function invocation. Ray will automatically sequence such dependencies so they are evaluated in the required order. You don’t have to do anything yourself to handle this situation.

Suppose you have a ​stateful​ computation to do. When we used ​ray.get​ above, we were actually retrieving the values from a distributed object store. You can explicitly put objects there yourself if you want with ​ray.put​ which returns an id you can pass later to ​ray.get ​to retrieve it again.

Handling Stateful Computation with an Actor Model

Ray supports a more intuitive and flexible way to manage setting and retrieving state with an actor model.​ It uses regular Python classes that are converted into remote ​actors ​with the same @ray.remote​ annotation. For simplicity, suppose you need to count the number of times that slow​ is called. Here is a class to do just that:

class CountedSlows:
    def __init__(self, initial_count = 0):
        self.count = initial_count
    def slow(self, record):
        self.count += 1
        new_record = expensive_process(record)
        return new_record
    def get_count(self):
        return self.count

Except for the annotation, this looks like a normal Python class declaration, although normally you wouldn’t define the ​get_count​ method just to retrieve the ​count. ​I’ll come back to this shortly.

Now use it in a similar way. Note how an instance of the class is constructed and how methods on the instance are invoked, using remote​ as before:

cs = CountedSlows.remote() # Note how actor construction works
futures = [cs.slow.remote(r) for r in records]
while len(futures) > 0:
    finished, rest = ray.wait(futures)
    value = ray.get(finished[0])
futures = rest
count_future_id = cs.get_count.remote()

The last line should print the number that equals the size of the original collection. Note that I called the method get_count to retrieve the value of the ​count ​attribute. At this time, Ray doesn’t support retrieving instance ​attributes​ like ​count​ directly, so adding the method to retrieve it is the one required difference when compared to a regular Python class.

Ray Unifies Tasks and Actors

In both of the above cases, Ray keeps track of where the tasks and actors are located in the cluster, eliminating the need to explicitly know and manage such locations in user code. Mutation of state inside actors is handled in a thread-safe way, without the need for explicit concurrency primitives. Hence, Ray provides intuitive, distributed state management for applications, which means that Ray is an excellent platform for implementing ​statefulserverless applications in general. Furthermore, when communicating between tasks and actors on the same machine, the state is transparently managed through shared memory, with zero-copy serialization between the actors and tasks, for optimal performance.

Note: ​Let me emphasize an important benefit Ray is providing here. Without Ray, when you need to scale out an application over a cluster, you have to decide how many instances to create, where to place them in the cluster (or use a system like Kubernetes), how to manage their life cycles, how they will communicate information and coordinate between themselves, etc., etc. Ray does all this for you with minimal effort on your part. You mostly just write normal Python code. It’s a powerful tool for simplifying the design and management of your microservice architecture.

Adopting Ray

What if you’re already using other concurrency APIs like ​multiprocessing, ​asyncio​, or joblib? While they work well for scaling on a single machine, they don’t provide scaling to a cluster. Ray recently introduced experimental implementations of these APIs that allow your applications to scale to a cluster. The only change required in your code is the import statement. For example, if you are using ​multiprocessing.Pool ​this is the usual import statement:

from multiprocessing.pool import Pool

To use the Ray implementation, use this statement instead:

from ray.experimental.multiprocessing.pool import Pool

That’s all it takes.

What about Dask, which appears to provide many of the same capabilities as Ray? Dask is a good choice if you want distributed collections, like numpy arrays and Pandas DataFrames. (A research project called Modin that uses Ray will eventually meet this need.) Ray is designed for more general scenarios where distributed state management is required and where heterogeneous task execution must be very efficient at massive scale, like we need for reinforcement learning.


We've seen how Ray's abstractions and features make it a straightforward tool to use, while providing powerful distributed computing and state-management capabilities. Although the design of Ray was motivated by the specific needs of high-performance, highly demanding ML/AI applications, it is broadly applicable, even offering a new way to approach microservice-based architectures.
I hope you found this brief explanation of Ray intriguing. Please ​give it a try​ and let me know what you think! Send to: dean@anyscale.io

Dean Wampler is an expert in data engineering for scalable, streaming data systems and applications of machine learning and artificial intelligence (ML/AI).