$ poetry init
$ poetry add aioredis@2.0.0
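The snippets below assume a module-level `redis` client shared by the whole app. A minimal sketch of creating one with aioredis 2.0, where the connection URL and the `decode_responses` flag are assumptions rather than part of the original app:

import aioredis

# Sketch only: the connection URL is an assumption.
redis = aioredis.from_url('redis://localhost:6379', decode_responses=True)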
@app.on_event('startup')
async def startup_event():
    keys = Keys()
    await initialize_redis(keys)
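Neither `Keys` nor `initialize_redis()` appears in this excerpt. Here is a plausible minimal sketch of both, assuming `Keys` does nothing more than namespace the key names used later; every name and prefix below is an assumption:

class Keys:
    """Hypothetical sketch: one place to build the app's Redis key names."""
    def __init__(self, prefix: str = 'is-bitcoin-lit:'):
        self.prefix = prefix

    def timeseries_sentiment_key(self) -> str:
        return f'{self.prefix}sentiment:mean:30s'

    def timeseries_price_key(self) -> str:
        return f'{self.prefix}price:mean:30s'

    def cache_key(self) -> str:
        return f'{self.prefix}cache'


def make_keys():
    # Sketch of the FastAPI dependency used with Depends(make_keys).
    return Keys()


async def initialize_redis(keys: Keys):
    # Sketch: ensure the two timeseries this app writes to exist.
    await make_timeseries(keys.timeseries_sentiment_key())
    await make_timeseries(keys.timeseries_price_key())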
async def make_timeseries(key):
    """
    Create a timeseries with the Redis key `key`.

    We'll use the duplicate policy known as "first," which ignores any
    new value reported for a timestamp that already exists in the
    series. Because of this, we don't need to handle duplicates
    ourselves -- but note that there is a performance cost to writes
    using this policy.
    """
    try:
        await redis.execute_command(
            'TS.CREATE', key,
            'DUPLICATE_POLICY', 'first',
        )
    except ResponseError as e:
        # The time series probably already exists.
        log.info('Could not create time series %s, error: %s', key, e)
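To see the "first" policy in action, write two samples with the same timestamp: the second write is silently ignored instead of raising an error. A quick sketch, where the key name and timestamp are made up for illustration:

async def demo_duplicate_policy():
    await make_timeseries('example:ts')  # hypothetical key
    await redis.execute_command('TS.ADD', 'example:ts', 1625592626000, 0.5)
    await redis.execute_command('TS.ADD', 'example:ts', 1625592626000, 0.9)
    # Returns [[1625592626000, '0.5']] -- the second write was ignored.
    return await redis.execute_command('TS.RANGE', 'example:ts', '-', '+')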
@app.post('/refresh')
async def refresh(background_tasks: BackgroundTasks, keys: Keys = Depends(make_keys)):
    async with httpx.AsyncClient() as client:
        data = await client.get(SENTIMENT_API_URL)
    await persist(keys, data.json())
    data = await calculate_three_hours_of_data(keys)
    background_tasks.add_task(set_cache, data, keys)
[
  {
    "count": 7259,
    "timestamp": 1625592626.3452034,
    "rate": 0.0,
    "last": 0.33,
    "sum": 1425.82,
    "mean": 0.2,
    "median": 0.23,
    "btc_price": "33885.23"
  }
  // ... many more entries
]
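The `BitcoinSentiments` type that annotates `data` below models these entries. It is never defined in this excerpt; a simple alias like the following fits (assumed definition):

from typing import Dict, List, Union

# Assumed alias describing the sentiment API entries shown above.
BitcoinSentiments = List[Dict[str, Union[str, float]]]

The call below (presumably from inside the `persist()` helper used by the `/refresh` endpoint) feeds the `btc_price` and `mean` fields of each entry into the two timeseries.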
await add_many_to_timeseries(
    (
        (ts_price_key, 'btc_price'),
        (ts_sentiment_key, 'mean'),
    ), data,
)
async def add_many_to_timeseries(
    key_pairs: Iterable[Tuple[str, str]],
    data: BitcoinSentiments
):
    """
    Add many samples to timeseries keys with a single TS.MADD call.

    `key_pairs` is an iterable of tuples containing, in the 0th position,
    the timeseries key into which to insert entries and, in the 1st
    position, the name of the key within the `data` dict at which to find
    the sample.
    """
    partial = functools.partial(redis.execute_command, 'TS.MADD')
    for datapoint in data:
        for timeseries_key, sample_key in key_pairs:
            partial = functools.partial(
                partial, timeseries_key, int(
                    float(datapoint['timestamp']) * 1000,
                ),
                datapoint[sample_key],
            )
    return await partial()
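The chained `functools.partial` calls curry one flat TS.MADD command out of (key, timestamp, value) triples. If the currying reads as opaque, this equivalent sketch that accumulates the arguments in a plain list makes the final command's shape explicit:

async def add_many_to_timeseries_flat(
    key_pairs: Iterable[Tuple[str, str]],
    data: BitcoinSentiments
):
    # Equivalent sketch: build the TS.MADD argument list directly.
    args = ['TS.MADD']
    for datapoint in data:
        for timeseries_key, sample_key in key_pairs:
            args += [
                timeseries_key,
                int(float(datapoint['timestamp']) * 1000),
                datapoint[sample_key],
            ]
    return await redis.execute_command(*args)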
async def calculate_three_hours_of_data(keys: Keys) -> Dict[str, Any]:
    sentiment_key = keys.timeseries_sentiment_key()
    price_key = keys.timeseries_price_key()
    three_hours_ago_ms = int((now() - timedelta(hours=3)).timestamp() * 1000)
    sentiment = await get_hourly_average(sentiment_key, three_hours_ago_ms)
    price = await get_hourly_average(price_key, three_hours_ago_ms)

    last_three_hours = [{
        'price': data[0][1], 'sentiment': data[1][1],
        'time': datetime.fromtimestamp(data[0][0] / 1000, tz=timezone.utc),
    } for data in zip(price, sentiment)]

    return {
        'hourly_average_of_averages': last_three_hours,
        'sentiment_direction': get_direction(last_three_hours, 'sentiment'),
        'price_direction': get_direction(last_three_hours, 'price'),
    }
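Two helpers here are not shown in this excerpt: `now()` and `get_direction()`. Minimal sketches, assuming `now()` is just a mockable wrapper around the current UTC time and `get_direction()` compares the oldest and newest values:

def now() -> datetime:
    # Sketch: a thin wrapper, easy to mock in tests.
    return datetime.now(timezone.utc)


def get_direction(last_three_hours, key: str) -> str:
    # Hypothetical sketch: label the trend between oldest and newest values.
    first = float(last_three_hours[0][key])
    last = float(last_three_hours[-1][key])
    if first < last:
        return 'rising'
    if first > last:
        return 'falling'
    return 'flat'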
async def get_hourly_average(ts_key: str, top_of_the_hour: int):
    response = await redis.execute_command(
        'TS.RANGE', ts_key, top_of_the_hour, '+',
        'AGGREGATION', 'avg', HOURLY_BUCKET,
    )
    # The response is a list of [timestamp, average] pairs, one per bucket.
    return response
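`HOURLY_BUCKET` is the aggregation bucket width. TS.RANGE works in milliseconds here, so it is presumably a constant like this (assumed definition):

HOURLY_BUCKET = 3600 * 1000  # One hour in milliseconds; assumed definition.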
background_tasks.add_task(set_cache, data, keys)
async def set_cache(data, keys: Keys):
    def serialize_dates(v):
        return v.isoformat() if isinstance(v, datetime) else v

    await redis.set(
        keys.cache_key(),
        json.dumps(data, default=serialize_dates),
        ex=TWO_MINUTES,
    )
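`TWO_MINUTES` is presumably just the cache expiry in seconds (assumed definition). Passing it as `ex=` means stale data ages out on its own even if no request refreshes it:

TWO_MINUTES = 60 * 2  # Cache TTL in seconds; assumed definition.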
@app.get('/is-bitcoin-lit')
async def bitcoin(background_tasks: BackgroundTasks, keys: Keys = Depends(make_keys)):
    data = await get_cache(keys)

    if not data:
        data = await calculate_three_hours_of_data(keys)
        background_tasks.add_task(set_cache, data, keys)

    return data
def datetime_parser(dct):
    for k, v in dct.items():
        if isinstance(v, str) and v.endswith('+00:00'):
            try:
                # We import the `datetime` class directly elsewhere, so call
                # `fromisoformat()` on the class, not on the module.
                dct[k] = datetime.fromisoformat(v)
            except ValueError:
                pass
    return dct
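Together with `serialize_dates()` from `set_cache()`, this gives a round trip for datetimes through JSON. A quick self-contained sketch of the symmetry:

import json
from datetime import datetime, timezone

payload = {'time': datetime(2021, 7, 6, 17, 0, tzinfo=timezone.utc)}
encoded = json.dumps(
    payload, default=lambda v: v.isoformat() if isinstance(v, datetime) else v,
)
decoded = json.loads(encoded, object_hook=datetime_parser)
assert decoded['time'] == payload['time']  # Dates survive the round trip.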
async def get_cache(keys: Keys):
    current_hour_cache_key = keys.cache_key()
    current_hour_stats = await redis.get(current_hour_cache_key)

    if current_hour_stats:
        return json.loads(current_hour_stats, object_hook=datetime_parser)
    # On a cache miss, this implicitly returns None, which callers treat
    # as "no cached data."