Run Haystack Pipelines in production with Ray Serve
A guide to scale Haystack pipelines for production with Ray Serve
June 14, 2023Ray Serve is a library built on top of the Ray framework for building online inference APIs. Serve is designed to be framework-agnostic, and while its simple design lets you quickly integrate pretty much any Python logic you need to deploy, building up complex inference services is still possible and straightforward.
Haystack pipelines can be quite complex, but even the simpler ones consist of multiple components, which in turn might rely on different models and technologies - this aspect make them a good benchmark to test out Ray Serveās capabilities.
Serve a Retriever - Reader QA pipeline
Prepare a set of documents
Letās begin by serving locally a Haystack pipeline thatās simple but non-trivial: a question-answering system working on an existing set of documents. For simplicity, weāll use a Docker image running Elasticsearch and populated with a set of documents about countries and capitals that can be queried right ahead. So before we start, letās run the image:
docker run -p 9200:9200 -p 9300:9300 -d deepset/elasticsearch-countries-and-capitals:latest
The Elasticsearch instance will accept connections on localhost
through the port 9200
.
Run the Haystack pipeline
Haystack pipelines can be defined through Python code or a
yaml definition that is eventually and transparently converted into Python code. Weāll use the latter format, so open an editor and save the following definition in a file called pipeline.yml
version: ignore
# define all the building-blocks for a Pipeline
components:
- name: DocumentStore
type: ElasticsearchDocumentStore
params:
host: localhost
- name: Retriever
type: DensePassageRetriever
params:
document_store: DocumentStore
top_k: 5
- name: Reader
type: FARMReader
params:
model_name_or_path: deepset/roberta-base-squad2
context_window_size: 1000
return_no_answer: true
pipelines:
- name: query
nodes:
- name: Retriever
inputs: [Query]
- name: Reader
inputs: [Retriever]
To perform a query, we need a bit of Python code to read the yaml configuration above and tell Haystack to run the resulting pipeline:
from pathlib import Path
from haystack import Pipeline
config = Path("pipeline.yml")
pipeline = Pipeline.load_from_yaml(config, "query")
answer = pipeline.run("What is the capital of France?")
print(f"Answer: {answer}")
As you can see, each time we want to make a question we need to rebuild the pipeline and call the run
method - letās make an inference service out of this and query it over HTTP.
Run the Haystack pipeline through Ray Serve
A Ray Serve deployment looks exactly like a regular HTTP server from the caller perspective, and to provide an āingressā endpoint the bare minimum requirement is writing a Python class similar to the following:
@serve.deployment
class MinimalIngress:
async def __call__(self, request: Request) -> str:
name = await request.json()["name"]
return f"Hello {name}"
The presence of an async def __call__
method is the only contract we need to fulfill in order to spawn a Ray Serve deployment. To create a deployment that will run a Haystack pipeline, letās create a file called capitals.py
and add the following code:
from pathlib import Path
from ray import serve
from starlette.requests import Request
from haystack import Pipeline
@serve.deployment
class HaystackQA:
def __init__(self):
# Load the pipeline from file and store it in self._pipeline
# so we can reuse it every time __call__ is invoked.
config = Path("pipeline.yml")
self._pipeline = Pipeline.load_from_yaml(config, "query")
async def __call__(self, request: Request) -> str:
query: str = str(await request.body())
res = self._pipeline.run(query=query)
answers = res.get("answers", [])
if answers:
# Return the first answer
return answers[0].answer
return ""
haystack_deployment = HaystackQA.bind()
A couple of things to note about the code above:
- We added a
__init__
method where we initialize the pipeline once and for the whole lifetime of the deployment. - Every class decorated with
@serve.deployment
gets abind
method that tells Ray Serve to attach ourHaystackQA
class to a deployment object that we namehaystack_deployment
. In a moment, when we will start the server process, Ray Serve will pick up the value of this variable.
The content of the Python file capitals.py
is now all we need to āserveā our deployment. Ray Serve can be run from Python directly, but for this example weāll show how to start the process from the command line, which is closer to what we would do in a production use case. From a shell where the Python environment containing ray
is active, you can just run:
serve run capitals:haystack_deployment
You should see several log lines scrolling down in the shell, and if everything went well, a final message Deployed Serve app successfully.
will tell us that the deployment is ready to accept connections - we can make a query now. We will use curl
but really any HTTP client will work the same. From the command line:
curl -X POST -d "What is the capital of Italy?" http://localhost:8000
It might take a while to get a response the very first time, while Haystack downloads the model locally, but in the end you should see the response Rome
.
Scale the deployment
What we did so far doesnāt look much different from putting a pipeline.run()
call behind a REST API, but Ray Serve shines when itās time to put our pipeline in production. For example, letās say we want to scale our HaystackQA
horizontally to serve multiple requests at once. All we need to do is to change the Python code like this:
@serve.deployment(num_replicas=3) # this is the only line to change!
class HaystackQA:
def __init__(self):
...
We can now stop the server and restart it with the same serve run
command: thatās it, we know have 3 instances of our HaystackQA
deployment as you can see from the Ray control panel running at
http://127.0.0.1:8265/#/actors
In case we want to adapt the number of deployments to the current traffic so we can optimize costs, Ray Serve offers autoscaling out of the box. All we need to change is again the @serve.deployment
decorator like this:
@serve.deployment(
autoscaling_config={
"min_replicas": 1,
"initial_replicas": 2,
"max_replicas": 5,
"downscale_delay_s": 30,
}
)
class HaystackQA:
def __init__(self):
...
The autoscaling_config
parameters are self-explanatory and the
docs are very helpful in detailing the logic behind that. In this case we tell Ray Serve that we want to scale up our deployment to up to 5 deployment replicas and scale down to at least 1. We also want to start with 2 replicas and downscale if thereās no activity for 30 seconds. Once again you can observe how this works behind the scenes by starting the server and looking at the dashboard on
http://127.0.0.1:8265/#/actors
.
Whatās next
The deployment strategy we introduced in this article is ok for many use cases ranging from trying out Haystack to demo applications and small POCs, but close readers will note the limit of scaling up and down a whole Haystack pipeline: certain nodes of a pipeline might benefit more than others of horizontal scaling and redundancy, can we do that with Ray Serve? The answer is yes, and weāll see in an upcoming article how we can split a pipeline into multiple deployments that can be then managed by Ray Serve with different strategies.