Processing Time Series data with Redis and .NET


Profile picture for Steve Lorello
Author:
Steve Lorello, .NET Developer Advocate at Redis

Time Series data can be used to measure anything from remote sensor readings to stock market feeds. Working with time series data in .NET is a snap with Redis and NRedisTimeSeries. In this tutorial, we'll explore how to use them together.

Create your Project#

Start out by creating a project with the command:

dotnet new console -n TimeSeriesDemoApp

Next, inside the TimeSeriesDemoApp directory, run the command:

dotnet add package NRedisTimeSeries

Get a RedisTimeSeries Database#

The next step is to get a RedisTimeSeries database up and running. The easiest way to do that for development purposes is to use Docker:

docker run -p 6379:63379 redislabs/redistimeseries

If you are well past getting started and want to get something into your production, your best bet is to run it in Redis Enterprise.

Connecting to Redis#

Open the Program.cs file, in here, create a new ConnectionMultiplexer using a connection string (which will vary based on what deployment you're using). Then, for our basic Docker setup, you'll just run:

var muxer = ConnectionMultiplexer.Connect("localhost");
var db = muxer.GetDatabase();

Create a Time Series#

Now that you've gotten a handle to Redis, your next step is to initialize a time series. This will be a bit of a toy example. We are going to start off by just creating a time series called sensor, we will set its retention period to 1 minute, and we just give it an id label of sensor-1:

await db.TimeSeriesCreateAsync("sensor", 60000, new List<TimeSeriesLabel>{new TimeSeriesLabel("id", "sensor-1")});

Producer Task#

Next, we'll create a task that will run a consumer in the background. Every second it will send a random integer between 1 and 50 into our time series.

var producerTask = Task.Run(async()=>{
while(true)
{
await db.TimeSeriesAddAsync("sensor", "*", Random.Shared.Next(50));
await Task.Delay(1000);
}
});

Consumer Task#

With the Producer created, we'll create a consumer loop that will do the opposite. Every second it will pull the most recent item in the time series off and print it out.

var consumerTask = Task.Run(async()=>{
while(true)
{
await Task.Delay(1000);
var result = await db.TimeSeriesGetAsync("sensor");
Console.WriteLine($"{result.Time.Value}: {result.Val}");
}
});
await Task.WhenAll(producerTask, consumerTask);

Run the App#

Now that we produce and consume data run the app with dotnet run. This will run a continuous loop in the time series as it continually produces and consumes data points.

Run Aggregations in the Time Series#

Now what we've done so far is produce a time series of random integer data for our .NET app to consume. What if we wanted to do something a bit more interesting with it, though? Let's say we wanted to calculate a moving average every 5 seconds. We can do that with ease using Redis TimeSeries.

Create Rules to Store Aggregations#

Let's run min, max, and average every 5 seconds on our Time Series. Redis will do this passively in the background after we set up some keys to store them in and set up the rules.

var aggregations = new TsAggregation[]{TsAggregation.Avg, TsAggregation.Min, TsAggregation.Max};
foreach(var agg in aggregations)
{
await db.TimeSeriesCreateAsync($"sensor:{agg}", 60000, new List<TimeSeriesLabel>{new ("type", agg.ToString()), new("aggregation-for", "sensor-1")});
await(db.TimeSeriesCreateRuleAsync("sensor", new TimeSeriesRule($"sensor:{agg}", 5000, agg)));
}

Process Results from Aggregations#

With the rules established, we can consume the relevant time series to get the results. When we were creating the time series for our aggregations, we added a label to all of them: new TimeSeriesLabel("aggregation-for", "sensor-1"). We essentially told Redis that this time series would be an aggregation for sensor-1. We can then use that label to find just the time series aggregations of sensor-1. With this in mind, we can grab all the sensor aggregations in one command to Redis using MGET.

var aggregationConsumerTask = Task.Run(async()=>
{
while(true)
{
await Task.Delay(5000);
var results = await db.TimeSeriesMGetAsync(new List<string>(){"aggregation-for=sensor-1"}, true);
foreach(var result in results)
{
Console.WriteLine($"{result.labels.First(x=>x.Key == "type").Value}: {result.value.Val}");
}
}
});

With all these sets, you can now just update the Task.WhenAll call at the end to include the new consumer task:

await Task.WhenAll(producerTask, consumerTask, aggregationConsumerTask);

When we run the application with dotnet run, you will see that the application will also print out the average, min, and max for the last 5 seconds of the time series, in addition to the regular ticks of the time series.

Resources#

  • The Source Code for this demo is located in GitHub
  • The source code for NRedisTimeSeries is also located in GitHub
  • More information about Redis Time Series can be found at redistimeseries.io