Using Redis with FastAPI

This tutorial helps you get started with Redis and FastAPI.

Introduction

FastAPI is a Python web framework based on the Starlette microframework. With deep support for asyncio, FastAPI is indeed very fast. FastAPI also distinguishes itself with features like automatic OpenAPI (OAS) documentation for your API, easy-to-use data validation tools, and more.

One effective way to make your FastAPI service even faster is to use Redis. Unlike most databases, Redis keeps its data in memory, so it excels at low-latency access.

In this tutorial, we'll walk through the steps necessary to use Redis with FastAPI. We're going to build IsBitcoinLit, an API that stores Bitcoin sentiment and price averages in RedisTimeSeries, then rolls these averages up for the last three hours.

Next, let's look at the learning objectives of this tutorial.

Learning Objectives

The learning objectives of this tutorial are:

  1. Learn how to install aioredis-py and connect to Redis
  2. Learn how to integrate aioredis-py with FastAPI
  3. Learn how to use RedisTimeSeries to store and query timeseries data
  4. Learn how to use Redis as a cache with aioredis-py

Let's get started!

Pre-Tutorial Quiz

Want to check gaps in your knowledge of Redis and FastAPI before you continue? Take our short pre-tutorial quiz!

You can also visit the quiz directly.

Set Up the IsBitcoinLit Project

You can achieve the learning objectives of this tutorial by reading through the text and code examples that follow.

However, we recommend that you set up the example project yourself, so that you can try out some of the code as you learn. The project has a permissive license that allows you to use it freely.

To get started, fork the example project on GitHub.

Follow the README to get the project running.

About RedisTimeSeries

RedisTimeSeries is a source-available Redis module that adds a timeseries data type to Redis. A timeseries is a great way to model any data that you want to query over time, such as, in this case, the ever-changing price of Bitcoin.

You can get started by following the setup instructions in the RedisTimeSeries documentation.

However, note that this tutorial's example project configures RedisTimeSeries automatically for you with the redismod Docker image.

An Asyncio Primer

The IsBitcoinLit project is completely async. That means we use an asyncio-compatible Redis client called aioredis-py and FastAPI's async features.

If you aren't familiar with asyncio, take a few minutes to watch a primer on asyncio before continuing.
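As a quick refresher, here is a minimal sketch of the core asyncio idea the project relies on: coroutines that wait on I/O can run concurrently. The fetch() function and its delays are made up for illustration, and asyncio.sleep() stands in for a real network call such as a Redis command.

```python
import asyncio


async def fetch(name: str, delay: float) -> str:
    # asyncio.sleep() stands in for a network call, such as a Redis command.
    # While this coroutine waits, the event loop is free to run other work.
    await asyncio.sleep(delay)
    return f'{name} done'


async def main() -> list:
    # gather() runs both coroutines concurrently and returns their results
    # in the order the coroutines were passed in, not the order they finish.
    return await asyncio.gather(
        fetch('price', 0.02),
        fetch('sentiment', 0.01),
    )


results = asyncio.run(main())
print(results)
```

Both calls overlap, so the total wait is roughly the longest delay rather than the sum of the two.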

Installing the Redis Client

We're going to start this tutorial assuming that you have a FastAPI project to work with. We'll use the IsBitcoinLit project for our examples.

Poetry is a popular tool for managing Python dependencies, so we'll use it in this tutorial.

IsBitcoinLit includes a pyproject.toml file that Poetry uses to manage the project's dependencies, but if you had not already created one, you could do so like this:

$ poetry init

Once you have a pyproject.toml file, and assuming you already added FastAPI and any other necessary dependencies, you could add aioredis-py to your project like this:

$ poetry add aioredis@2.0.0b1

NOTE: This tutorial uses a beta version of aioredis-py 2.0. The project is named aioredis-py, but the package is published on PyPI as aioredis. The 2.0 version features an API that matches the most popular synchronous Redis client for Python, redis-py.

The aioredis-py client is now installed. Time to write some code!

Integrate aioredis-py with FastAPI

We're going to use Redis for a few things in this FastAPI app:

  1. Storing 30-second averages of sentiment and price for the last 24 hours with RedisTimeSeries
  2. Rolling up these averages into a three-hour snapshot with RedisTimeSeries
  3. Caching the three-hour snapshot

Let's look at each of these integration points in more detail.

Creating the Timeseries

The data for our app consists of 30-second averages of Bitcoin prices and sentiment ratings for the last 24 hours. We pull these from the SentiCrypt API.

NOTE: We have no affiliation with SentiCrypt or any idea how accurate these numbers are. This example is just for fun!

We're going to store price and sentiment averages in a timeseries with RedisTimeSeries, so we want to make sure that when the app starts up, the timeseries exists.

We can use a startup event to accomplish this. Doing so looks like the following:

@app.on_event('startup')
async def startup_event():
    keys = Keys()
    await initialize_redis(keys)

We'll use the TS.CREATE RedisTimeSeries command to create the timeseries within our initialize_redis() function:

async def make_timeseries(key):
    """
    Create a timeseries with the Redis key `key`.

    We'll use the duplicate policy known as "first," which ignores
    duplicate pairs of timestamp and values if we add them.

    Because of this, we don't worry about handling this logic
    ourselves -- but note that there is a performance cost to writes
    using this policy.
    """
    try:
        await redis.execute_command(
            'TS.CREATE', key,
            'DUPLICATE_POLICY', 'first',
        )
    except ResponseError as e:
        # Time series probably already exists
        log.info('Could not create timeseries %s, error: %s', key, e)

TIP: An interesting point to note from this code is that when we create a timeseries, we can use the DUPLICATE_POLICY option to specify how to handle duplicate pairs of timestamp and values.
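To make the idea of a duplicate policy concrete, here is a toy, in-memory model of what happens when a sample arrives for a timestamp that already exists. It is only a sketch: add_sample() is a hypothetical helper, it covers just the "block," "first," and "last" policies, and it does not talk to Redis at all.

```python
def add_sample(series: dict, timestamp: int, value: float,
               policy: str = 'first') -> None:
    """Toy model of adding a sample under a duplicate policy.

    `series` maps a millisecond timestamp to a value. This is an
    illustration only, not how RedisTimeSeries is implemented.
    """
    if timestamp not in series:
        series[timestamp] = value
    elif policy == 'first':
        pass  # Keep the value that was stored first; ignore the new one.
    elif policy == 'last':
        series[timestamp] = value  # Overwrite with the newest value.
    elif policy == 'block':
        raise ValueError(f'duplicate sample at {timestamp}')
    else:
        raise ValueError(f'unsupported policy in this sketch: {policy}')


series = {}
add_sample(series, 1000, 1.0)
add_sample(series, 1000, 2.0)  # Duplicate timestamp, ignored under "first".
print(series)
```

With "first," re-sending the same batch of samples leaves the stored values untouched, which is why the app can refresh repeatedly without special handling.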

Storing Sentiment and Price Data in RedisTimeSeries

A /refresh endpoint exists in the app to allow a client to trigger a refresh of the 30-second averages. This is the entire function:

@app.post('/refresh')
async def refresh(background_tasks: BackgroundTasks, keys: Keys = Depends(make_keys)):
    async with httpx.AsyncClient() as client:
        data = await client.get(SENTIMENT_API_URL)
    await persist(keys, data.json())
    data = await calculate_three_hours_of_data(keys)
    background_tasks.add_task(set_cache, data, keys)

As is often the case with Python, a lot happens in a few lines, so let's walk through them.

The first thing we do is get the latest sentiment and price data from SentiCrypt. The response data looks like this:

[
  {
    "count": 7259,
    "timestamp": 1625592626.3452034,
    "rate": 0.0,
    "last": 0.33,
    "sum": 1425.82,
    "mean": 0.2,
    "median": 0.23,
    "btc_price": "33885.23"
  },
  //... Many more entries
]

Then we save the data into two timeseries in Redis with the persist() function. That ends up calling another helper, add_many_to_timeseries(), like this:

await add_many_to_timeseries(
    (
        (ts_price_key, 'btc_price'),
        (ts_sentiment_key, 'mean'),
    ), data,
)

The add_many_to_timeseries() function takes a list of (timeseries key, sample key) pairs and a list of samples from SentiCrypt. For each sample, it reads the value of the sample key in the SentiCrypt sample, like "btc_price," and adds that value to the given timeseries key.

Here's the function:

async def add_many_to_timeseries(
    key_pairs: Iterable[Tuple[str, str]],
    data: BitcoinSentiments
):
    """
    Add many samples to one or more timeseries keys.

    `key_pairs` is an iterable of tuples containing in the 0th position the
    timeseries key into which to insert entries and in the 1st position the
    name of the key within each `data` entry in which to find the sample.
    """
    # Start with the command name, then append the arguments for each sample.
    partial = functools.partial(redis.execute_command, 'TS.MADD')
    for datapoint in data:
        for timeseries_key, sample_key in key_pairs:
            partial = functools.partial(
                partial, timeseries_key, int(
                    float(datapoint['timestamp']) * 1000,
                ),
                datapoint[sample_key],
            )
    return await partial()

This code is dense, so let's break it down.

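We're using the TS.MADD RedisTimeSeries command to add many samples to a timeseries. We use TS.MADD because doing so is faster than TS.ADD for adding batches of samples to a timeseries.

We build the command incrementally with functools.partial. Each pass through the inner loop appends three arguments to the pending call: a timeseries key, the sample's timestamp converted from seconds to milliseconds, and the sample's value. Nothing is sent to Redis until the final await partial().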

This results in a single large TS.MADD call that adds price data to the price timeseries and sentiment data to the sentiment timeseries. Conveniently, TS.MADD can add samples to multiple timeseries in a single call.
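To see exactly which arguments end up in that one call, here is a small sketch that swaps the Redis client for a recorder. The fake_execute_command() function, the 'price' and 'sentiment' key names, and the sample values are all stand-ins for illustration; only the functools.partial accumulation mirrors the function above.

```python
import asyncio
import functools

calls = []


async def fake_execute_command(*args):
    # Hypothetical stand-in for redis.execute_command(): it records the
    # arguments it receives instead of sending them to Redis.
    calls.append(args)
    return 'OK'


async def madd(key_pairs, data):
    partial = functools.partial(fake_execute_command, 'TS.MADD')
    for datapoint in data:
        for timeseries_key, sample_key in key_pairs:
            # Each pass appends (key, timestamp in ms, value) to the call.
            partial = functools.partial(
                partial,
                timeseries_key,
                int(float(datapoint['timestamp']) * 1000),
                datapoint[sample_key],
            )
    return await partial()


samples = [
    {'timestamp': 1625592626.5, 'mean': 0.2, 'btc_price': '33885.23'},
    {'timestamp': 1625592656.5, 'mean': 0.25, 'btc_price': '33890.00'},
]
pairs = (('price', 'btc_price'), ('sentiment', 'mean'))

asyncio.run(madd(pairs, samples))
print(len(calls), calls[0])
```

Two samples and two key pairs produce one recorded call with the command name followed by four (key, timestamp, value) triples.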

Calculating Three-Hour Averages with RedisTimeSeries

Clients use IsBitcoinLit to get the average price and sentiment for each of the last three hours. But so far, we've only stored 30-second averages in Redis. How do we calculate the average of these averages for the last three hours?

When we run /refresh, we call calculate_three_hours_of_data() to do so. The function looks like this:

async def calculate_three_hours_of_data(keys: Keys) -> Dict[str, str]:
    sentiment_key = keys.timeseries_sentiment_key()
    price_key = keys.timeseries_price_key()
    three_hours_ago_ms = int((now() - timedelta(hours=3)).timestamp() * 1000)
    sentiment = await get_hourly_average(sentiment_key, three_hours_ago_ms)
    price = await get_hourly_average(price_key, three_hours_ago_ms)
    last_three_hours = [
        {
            'price': data[0][1], 'sentiment': data[1][1],
            'time': datetime.fromtimestamp(data[0][0] / 1000, tz=timezone.utc),
        }
        for data in zip(price, sentiment)
    ]
    return {
        'hourly_average_of_averages': last_three_hours,
        'sentiment_direction': get_direction(last_three_hours, 'sentiment'),
        'price_direction': get_direction(last_three_hours, 'price'),
    }

There is more going on here than we need to know for this tutorial. As a summary, most of this code exists to support calls to get_hourly_average().
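The get_direction() helper is called in the code above, but its body isn't shown in this tutorial. As an assumption about what such a helper might do, here is one plausible sketch that compares the oldest and newest hourly averages. The return labels ('rising', 'falling', 'flat') are invented for this example and may not match the project's.

```python
def get_direction(hours: list, key: str) -> str:
    """Hypothetical helper: compare the oldest and newest hourly values.

    `hours` is a list of dicts ordered oldest to newest. Values may be
    strings, so we convert them to floats before comparing.
    """
    if not hours:
        return 'flat'
    first = float(hours[0][key])
    last = float(hours[-1][key])
    if last > first:
        return 'rising'
    if last < first:
        return 'falling'
    return 'flat'


last_three_hours = [
    {'price': '33800.00', 'sentiment': '0.20'},
    {'price': '33850.00', 'sentiment': '0.18'},
    {'price': '33900.00', 'sentiment': '0.15'},
]
print(get_direction(last_three_hours, 'price'))
print(get_direction(last_three_hours, 'sentiment'))
```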

That function is where the core logic exists to calculate averages for the last three hours, so let's see what it contains:

async def get_hourly_average(ts_key: str, top_of_the_hour: int):
    response = await redis.execute_command(
        'TS.RANGE', ts_key, top_of_the_hour, '+',
        'AGGREGATION', 'avg', HOURLY_BUCKET,
    )
    # The response is a list of the structure [timestamp, average].
    return response

Here, we use the TS.RANGE command to get the samples in the timeseries from the "top" of the hour three hours ago, until the latest sample in the series. With the AGGREGATION parameter, we get back the averages of the samples in hourly buckets.

So where does this leave us? With averages of the averages, one for each of the last three hours.
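If bucketed aggregation is new to you, the following pure-Python sketch imitates what an hourly "avg" aggregation computes. It is a simplified model, not the RedisTimeSeries implementation: hourly_averages() is a hypothetical helper, and it assumes buckets are aligned to multiples of the bucket size and labeled with their start time.

```python
from collections import defaultdict

HOURLY_BUCKET_MS = 3_600_000  # One hour in milliseconds.


def hourly_averages(samples, from_ms, bucket_ms=HOURLY_BUCKET_MS):
    """Average (timestamp_ms, value) samples into fixed-size time buckets."""
    buckets = defaultdict(list)
    for ts, value in samples:
        if ts >= from_ms:
            # Round the timestamp down to the start of its bucket.
            buckets[ts - ts % bucket_ms].append(value)
    return [
        [start, sum(values) / len(values)]
        for start, values in sorted(buckets.items())
    ]


samples = [
    (0, 1.0),
    (1_800_000, 3.0),   # Same hour as the first sample.
    (3_600_000, 10.0),
    (5_400_000, 20.0),  # Same hour as the third sample.
]
print(hourly_averages(samples, from_ms=0))
```

In this model, a start time that doesn't fall on an hour boundary produces a partial first bucket, because only the samples at or after the start time are averaged.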

Caching Data with Redis

Let's review. We have code that achieves the following:

  1. Gets the latest sentiment and price data from SentiCrypt.
  2. Saves the data into two timeseries in Redis.
  3. Calculates the average of the averages for the last three hours.

The snapshot of averages for the last three hours is the data we want to serve clients when they hit the /is-bitcoin-lit endpoint. We could run this calculation every time a client requests data, but that would be inefficient. Let's cache it in Redis!

First, we'll look at writing to the cache. Then we'll see how FastAPI reads from the cache.

Writing Cache Data to Redis

Take a closer look at the last line of the refresh() function:

background_tasks.add_task(set_cache, data, keys)

In FastAPI, you can run code outside of a web request after returning a response. This feature is called background tasks.

Background tasks are not as robust as a dedicated task queue like Celery. They are, however, a simple way to run code outside of a web request, which makes them a great fit for things like updating a cache.

When you call add_task(), you pass in a function followed by the arguments to call it with. Here, we pass in set_cache(). This function saves the three-hour averages summary to Redis. Let's look at how it works:

async def set_cache(data, keys: Keys):
    def serialize_dates(v):
        return v.isoformat() if isinstance(v, datetime) else v
    await redis.set(
        keys.cache_key(),
        json.dumps(data, default=serialize_dates),
        ex=TWO_MINUTES,
    )

We serialize the three-hour summary data to JSON and save it to Redis, using the ex parameter to set the expiration time for the data to two minutes.

TIP: You need to provide a default serializer for the json.dumps() function so that dumps() knows how to serialize datetime objects.

This means that after every refresh, we've primed the cache. The cache isn't primed for long -- only two minutes -- but it's something!
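To see why the default serializer matters, here is a short standalone sketch. The summary dict is made-up sample data; serialize_dates() is the same helper used in set_cache().

```python
import json
from datetime import datetime, timezone


def serialize_dates(v):
    return v.isoformat() if isinstance(v, datetime) else v


# Made-up sample data shaped like one entry of the summary.
summary = {
    'price': '33885.23',
    'time': datetime(2021, 7, 6, 17, 0, tzinfo=timezone.utc),
}

error = None
try:
    # Without a default serializer, json.dumps() rejects datetime objects.
    json.dumps(summary)
except TypeError as e:
    error = str(e)

encoded = json.dumps(summary, default=serialize_dates)
print(error)
print(encoded)
```

The timezone-aware datetime is written as an ISO 8601 string ending in "+00:00", which is the suffix the read side looks for when parsing dates back out.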

Reading Cache Data from Redis

We haven't even seen the API endpoint that clients will use yet! Here it is:

@app.get('/is-bitcoin-lit')
async def bitcoin(background_tasks: BackgroundTasks, keys: Keys = Depends(make_keys)):
    data = await get_cache(keys)
    if not data:
        data = await calculate_three_hours_of_data(keys)
        background_tasks.add_task(set_cache, data, keys)
    return data

To use this endpoint, clients make a GET request to /is-bitcoin-lit. Then we try to get the cached three-hour summary from Redis. If we can't, we calculate the three-hour summary, return it, and then save it outside of the web request.

We've already seen how calculating the summary data works, and we just explored saving the summary data to Redis. So, let's look at the get_cache() function, where we read the cached data:

def datetime_parser(dct):
    for k, v in dct.items():
        if isinstance(v, str) and v.endswith('+00:00'):
            try:
                # `datetime` is the class imported with
                # `from datetime import datetime`, as in set_cache().
                dct[k] = datetime.fromisoformat(v)
            except ValueError:
                # Not an ISO 8601 date after all; leave the string as-is.
                pass
    return dct

async def get_cache(keys: Keys):
    current_hour_cache_key = keys.cache_key()
    current_hour_stats = await redis.get(current_hour_cache_key)
    if current_hour_stats:
        return json.loads(current_hour_stats, object_hook=datetime_parser)

Remember that when we serialized the summary data to JSON, we needed to provide a default serializer for json.dumps() that understood datetime objects. Now that we're deserializing that data, we need to give json.loads() an "object hook" that understands datetime strings. That's what datetime_parser() does.
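Here is a small standalone sketch of the object hook in action. The cached JSON string is made-up sample data, and the 'note' field is included only to show that a string that merely ends in "+00:00" is left alone when it isn't a valid date.

```python
import json
from datetime import datetime, timezone


def datetime_parser(dct):
    for k, v in dct.items():
        if isinstance(v, str) and v.endswith('+00:00'):
            try:
                dct[k] = datetime.fromisoformat(v)
            except ValueError:
                pass  # Not a date; keep the original string.
    return dct


# Made-up sample of what the cached JSON might look like.
cached = (
    '{"note": "offset +00:00", "hourly_average_of_averages": '
    '[{"price": "33885.23", "time": "2021-07-06T17:00:00+00:00"}]}'
)

# json.loads() calls the hook for every JSON object, including nested ones.
decoded = json.loads(cached, object_hook=datetime_parser)
entry = decoded['hourly_average_of_averages'][0]
print(type(entry['time']).__name__, decoded['note'])
```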

Other than parsing dates, this code is relatively straightforward. We get the current hour's cache key, and then we try to get the cached data from Redis. If the key is missing or has expired, the function falls through and implicitly returns None.
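The read path in this section follows a pattern often called cache-aside: check the cache, compute on a miss, then store the result with an expiration. The sketch below shows that flow without Redis or FastAPI. FakeCache, compute_summary(), and the 'demo:cache' key are hypothetical stand-ins, and unlike the app, this sketch writes to the cache inline rather than in a background task.

```python
import asyncio
import time


class FakeCache:
    """Hypothetical in-memory stand-in for Redis GET and SET with expiry."""

    def __init__(self):
        self._data = {}

    async def get(self, key):
        value, expires_at = self._data.get(key, (None, 0.0))
        if time.monotonic() >= expires_at:
            self._data.pop(key, None)  # Missing or expired.
            return None
        return value

    async def set(self, key, value, ex):
        self._data[key] = (value, time.monotonic() + ex)


compute_calls = 0


async def compute_summary():
    # Stand-in for the expensive three-hour calculation.
    global compute_calls
    compute_calls += 1
    return 'summary'


async def get_summary(cache, key='demo:cache'):
    data = await cache.get(key)
    if not data:
        data = await compute_summary()
        await cache.set(key, data, ex=120)  # Two minutes, as in the app.
    return data


async def main():
    cache = FakeCache()
    return await get_summary(cache), await get_summary(cache)


first, second = asyncio.run(main())
print(first, second, compute_calls)
```

The second request is served from the cache, so the expensive calculation runs only once until the entry expires.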

Summary

Putting all the pieces together, we now have a FastAPI app that can retrieve Bitcoin price and sentiment averages, store the averages in Redis, cache three-hour summary data in Redis, and serve the data to clients. Not too shabby!

Here are a few notes to consider:

  1. We manually controlled caching in this tutorial, but you can also use a library like aiocache to cache data in Redis.
  2. We ran RedisTimeSeries commands like TS.MADD using the execute_command() method in aioredis-py. If you are instead using redis-py in a synchronous project, you can use the redistimeseries-py library to run RedisTimeSeries commands.