
Using Redis with FastAPI

Andrew Brookins, Former Curriculum Software Engineer at Redis

Introduction

In this tutorial, you'll build IsBitcoinLit, a FastAPI service that tracks Bitcoin sentiment and price with Redis, rolls the raw data up into averages, and caches the latest summary.

Learning Objectives

  1. Learn how to install aioredis-py and connect to Redis
  2. Learn how to integrate aioredis-py with FastAPI
  3. Learn how to use Redis to store and query timeseries data
  4. Learn how to use Redis as a cache with aioredis-py

Set Up the IsBitcoinLit Project

About Redis for time series data

An Asyncio Primer
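
aioredis-py is an asyncio client: every Redis command is a coroutine that you run with await inside an async function, which frees the event loop to serve other requests while Redis works. A minimal, self-contained sketch of the pattern (the function names are illustrative, not from the tutorial):

import asyncio


async def add(a, b):
    # Stand-in for a non-blocking call, like a Redis command.
    await asyncio.sleep(0.1)
    return a + b


async def main():
    # await suspends main() until add() finishes; meanwhile the
    # event loop can run other tasks.
    print(await add(1, 2))  # 3


asyncio.run(main())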

Installing the Redis Client

This tutorial manages dependencies with Poetry and pins aioredis-py to version 2.0.0, which the code in this guide targets:

    $ poetry init
    $ poetry add aioredis@2.0.0
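
With the client installed, you can connect to Redis and verify the connection. A minimal sketch (the URL is an assumption; the tutorial's app configures its own client):

import asyncio

import aioredis


async def main():
    # decode_responses=True returns strings instead of bytes.
    redis = aioredis.from_url('redis://localhost:6379', decode_responses=True)
    print(await redis.ping())  # True


asyncio.run(main())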

Integrate aioredis-py with FastAPI

The IsBitcoinLit API uses Redis in three ways:

  1. Storing 30-second averages of sentiment and price for the last 24 hours with Redis Time Series
  2. Rolling up these averages into a three-hour snapshot with Redis Time Series
  3. Caching the three-hour snapshot

Creating the time series

A startup event handler initializes the time series when the API boots:
@app.on_event('startup')
async def startup_event():
    # Create the time series the app needs when the server starts.
    keys = Keys()
    await initialize_redis(keys)
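
The Keys class centralizes the Redis key names the app reads and writes; it isn't shown in this excerpt. A hypothetical sketch of its shape (the real prefix and names may differ):

class Keys:
    """Generates the Redis key names the app reads and writes."""

    def timeseries_sentiment_key(self) -> str:
        return 'is-bitcoin-lit:sentiment:mean:30s'

    def timeseries_price_key(self) -> str:
        return 'is-bitcoin-lit:price:mean:30s'

    def cache_key(self) -> str:
        return 'is-bitcoin-lit:cache'
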
async def make_timeseries(key):
    """
    Create a timeseries with the Redis key `key`.

    We'll use the duplicate policy known as "first," which ignores
    any new sample whose timestamp duplicates one the series already
    contains.

    Because of this, we don't have to handle duplicates ourselves --
    but note that writes pay a performance cost under this policy.
    """
    try:
        await redis.execute_command(
            'TS.CREATE', key,
            'DUPLICATE_POLICY', 'first',
        )
    except ResponseError as e:
        # Time series probably already exists
        log.info('Could not create time series %s, error: %s', key, e)
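
To see the "first" policy in action, write the same timestamp twice; the second value is ignored rather than raising an error. A sketch using the same redis client, with a hypothetical key name and timestamp:

# Key name and timestamp are made up for illustration.
await redis.execute_command('TS.CREATE', 'example:ts', 'DUPLICATE_POLICY', 'first')
await redis.execute_command('TS.ADD', 'example:ts', 1625592626000, 1.0)
await redis.execute_command('TS.ADD', 'example:ts', 1625592626000, 2.0)  # ignored
# TS.RANGE example:ts - + now returns [[1625592626000, '1']].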

Storing Sentiment and Price Data in Redis

The /refresh endpoint pulls the latest data from the SentiCrypt API and stores it in Redis:

@app.post('/refresh')
async def refresh(background_tasks: BackgroundTasks, keys: Keys = Depends(make_keys)):
    # Fetch the latest sentiment data from SentiCrypt.
    async with httpx.AsyncClient() as client:
        data = await client.get(SENTIMENT_API_URL)
    # Persist the samples to the Redis time series.
    await persist(keys, data.json())
    # Recompute the three-hour summary and cache it in the background.
    data = await calculate_three_hours_of_data(keys)
    background_tasks.add_task(set_cache, data, keys)

SentiCrypt returns an array of sentiment entries like this:

[
  {
    "count": 7259,
    "timestamp": 1625592626.3452034,
    "rate": 0.0,
    "last": 0.33,
    "sum": 1425.82,
    "mean": 0.2,
    "median": 0.23,
    "btc_price": "33885.23"
  }
  //... Many more entries
]

The tutorial's persist() function passes this data into add_many_to_timeseries():

    await add_many_to_timeseries(
        (
            (ts_price_key, 'btc_price'),
            (ts_sentiment_key, 'mean'),
        ), data,
    )
async def add_many_to_timeseries(
    key_pairs: Iterable[Tuple[str, str]],
    data: BitcoinSentiments
):
    """
    Add many samples to a single timeseries key.

    `key_pairs` is an iteratble of tuples containing in the 0th position the
    timestamp key into which to insert entries and the 1th position the name
    of the key within th `data` dict to find the sample.
    """
    # Build a single TS.MADD call by accumulating "key timestamp value"
    # triples with functools.partial.
    partial = functools.partial(redis.execute_command, 'TS.MADD')
    for datapoint in data:
        for timeseries_key, sample_key in key_pairs:
            partial = functools.partial(
                partial, timeseries_key, int(
                    float(datapoint['timestamp']) * 1000,
                ),
                datapoint[sample_key],
            )
    return await partial()
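
Since each loop iteration wraps the previous partial, the final call flattens into one TS.MADD with every "key timestamp value" triple inline. A self-contained illustration with print() standing in for redis.execute_command (sample values invented):

import functools

cmd = functools.partial(print, 'TS.MADD')
cmd = functools.partial(cmd, 'ts:price', 1625592626345, '33885.23')
cmd = functools.partial(cmd, 'ts:sentiment', 1625592626345, '0.2')
cmd()
# TS.MADD ts:price 1625592626345 33885.23 ts:sentiment 1625592626345 0.2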

Calculating Three-Hour Averages with Redis

calculate_three_hours_of_data() rolls the raw samples up into hourly averages for the past three hours and reports which direction each metric is moving:

async def calculate_three_hours_of_data(keys: Keys) -> Dict[str, str]:
    sentiment_key = keys.timeseries_sentiment_key()
    price_key = keys.timeseries_price_key()
    three_hours_ago_ms = int((now() - timedelta(hours=3)).timestamp() * 1000)

    sentiment = await get_hourly_average(sentiment_key, three_hours_ago_ms)
    price = await get_hourly_average(price_key, three_hours_ago_ms)

    last_three_hours = [{
        'price': data[0][1], 'sentiment': data[1][1],
        'time': datetime.fromtimestamp(data[0][0] / 1000, tz=timezone.utc),
    }
        for data in zip(price, sentiment)]

    return {
        'hourly_average_of_averages': last_three_hours,
        'sentiment_direction': get_direction(last_three_hours, 'sentiment'),
        'price_direction': get_direction(last_three_hours, 'price'),
    }
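
get_direction() isn't reproduced in this excerpt; the idea is to compare the oldest and newest entries of the summary. A hypothetical sketch:

def get_direction(last_three_hours, key: str) -> str:
    # Hypothetical: label the metric rising, falling, or flat by
    # comparing the first and last hourly averages.
    first, last = last_three_hours[0][key], last_three_hours[-1][key]
    if first < last:
        return 'rising'
    if first > last:
        return 'falling'
    return 'flat'
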
async def get_hourly_average(ts_key: str, top_of_the_hour: int):
    response = await redis.execute_command(
        'TS.RANGE', ts_key, top_of_the_hour, '+',
        'AGGREGATION', 'avg', HOURLY_BUCKET,
    )
    # The response is a list of [timestamp, average] pairs.
    return response
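
With hourly buckets, the raw reply looks something like this (values invented for illustration; strings assume decode_responses=True):

[
    [1625592600000, '0.23'],  # [bucket start in ms, average]
    [1625596200000, '0.21'],
    [1625599800000, '0.25'],
]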

Caching Data with Redis

To recap, the /refresh endpoint:

  1. Gets the latest sentiment and price data from SentiCrypt.
  2. Saves the data into two time series in Redis.
  3. Calculates the average of the averages for the last three hours.

The final step, caching that summary, runs as a background task, as the next section shows.

Writing Cache Data to Redis

Recall the last line of the /refresh endpoint, which schedules the cache write as a background task:

    background_tasks.add_task(set_cache, data, keys)
async def set_cache(data, keys: Keys):
    def serialize_dates(v):
        return v.isoformat() if isinstance(v, datetime) else v

    await redis.set(
        keys.cache_key(),
        json.dumps(data, default=serialize_dates),
        ex=TWO_MINUTES,  # Expire the cached summary after two minutes.
    )
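
json.dumps() can't serialize datetime objects on its own, so serialize_dates() converts them to ISO-8601 strings. A self-contained illustration (the sample document is invented):

import json
from datetime import datetime, timezone

def serialize_dates(v):
    return v.isoformat() if isinstance(v, datetime) else v

doc = {'time': datetime(2021, 7, 6, 17, 30, tzinfo=timezone.utc), 'price': 33885.23}
print(json.dumps(doc, default=serialize_dates))
# {"time": "2021-07-06T17:30:00+00:00", "price": 33885.23}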

Reading Cache Data from Redis

@app.get('/is-bitcoin-lit')
async def bitcoin(background_tasks: BackgroundTasks, keys: Keys = Depends(make_keys)):
    data = await get_cache(keys)

    if not data:
        # Cache miss: recompute the summary and refill the cache
        # in a background task.
        data = await calculate_three_hours_of_data(keys)
        background_tasks.add_task(set_cache, data, keys)

    return data
def datetime_parser(dct):
    for k, v in dct.items():
        # Cached datetimes were serialized as ISO-8601 strings in UTC.
        if isinstance(v, str) and v.endswith('+00:00'):
            try:
                dct[k] = datetime.fromisoformat(v)
            except ValueError:
                pass
    return dct


async def get_cache(keys: Keys):
    current_hour_cache_key = keys.cache_key()
    current_hour_stats = await redis.get(current_hour_cache_key)

    if current_hour_stats:
        return json.loads(current_hour_stats, object_hook=datetime_parser)
    # A cache miss implicitly returns None.
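
A quick check of the parser using the helper above (the cache string is invented):

raw = '{"time": "2021-07-06T17:30:00+00:00", "price": 33885.23}'
parsed = json.loads(raw, object_hook=datetime_parser)
assert isinstance(parsed['time'], datetime)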

Summary

  1. We manually controlled caching in this tutorial, but you can also use a library like aiocache to cache data in Redis.
  2. We ran Redis commands like TS.MADD using the execute_command() method in aioredis-py. If you are instead using redis-py in a synchronous project, the same commands work, as sketched below.
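
A minimal synchronous sketch with redis-py (the connection details and key name are assumptions):

import redis

r = redis.Redis(host='localhost', port=6379, decode_responses=True)
r.execute_command('TS.CREATE', 'example:ts')  # errors if the key already exists
r.execute_command('TS.MADD', 'example:ts', 1625592626345, 0.2)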