CSRedis is an MIT-licensed open source project that provides a straightforward interface for executing Redis commands. It works well for blocking stream reads, with one major downside: it does not offer an async API for them.
Before we begin, we'll start up Redis. If you are developing locally, which we'll assume for the duration of this tutorial, you can start Redis with a single Docker command, for example `docker run -p 6379:6379 redis`.
We will build a simple console application for streaming telemetry using the library. To do so, use the `dotnet new` command, for example `dotnet new console -n StreamsWithCSRedis`.
Run the `cd StreamsWithCSRedis` command to change into the application's directory, then run `dotnet add package CSRedisCore` to add the CSRedis package.
When we start up our app, the first thing we'll do is create our `avg` group. To make this group, open up `Program.cs` and add the following:
This code will create a cancellation token for the threads we'll spin up to write to and read from the stream, create a client, check if our `avg` group already exists, and finally create the `avg` group if it doesn't.
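A minimal sketch of this setup might look like the following. It assumes Redis is reachable on `localhost`, and that the installed CSRedisCore version exposes `Exists`, `XInfoStream` (returning a tuple with a `groups` count), and `XGroupCreate` with the shapes used here; check these against the package version you install. Note that the check only tests whether the stream has any group at all, which is enough for this single-group example.

```csharp
using System.Threading;
using CSRedis;

// Token source used later to tell the worker threads to stop.
var cancellationTokenSource = new CancellationTokenSource();
var token = cancellationTokenSource.Token;

// Assumption: Redis is listening locally on the default port.
var client = new CSRedisClient("localhost");

// Create the "avg" group only if the stream is missing or has no groups yet.
// Assumption: XInfoStream returns a tuple that includes a `groups` count.
if (!client.Exists("stream") || client.XInfoStream("stream").groups == 0)
{
    // "$" starts the group at new messages; MkStream creates the stream if absent.
    client.XGroupCreate("stream", "avg", "$", MkStream: true);
}
```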
Next, we'll write out to the stream. We'll call the stream `stream`, and send a `temp` and `time` field along with the stream. We'll do this every 2 seconds. Because this operation isn't actually 'blocking' in the Redis sense, it would be fine to run it as a task, but since the other two operations here are blocking, we'll put it on its own thread as well. Add the following to your `Program.cs`:
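As a rough sketch, the writer thread could look like this. The `temp` range (50 to 64) is an arbitrary illustration, and the snippet assumes `XAdd` accepts the stream key followed by an array of field/value tuples. The setup lines are repeated only so the snippet stands alone; skip them if `client` and `token` already exist in your `Program.cs`.

```csharp
using System;
using System.Threading;
using CSRedis;

// Setup repeated so the snippet stands alone.
var cancellationTokenSource = new CancellationTokenSource();
var token = cancellationTokenSource.Token;
var client = new CSRedisClient("localhost");
var random = new Random();

var writeThread = new Thread(() =>
{
    while (!token.IsCancellationRequested)
    {
        // Assumption: XAdd takes the key plus field/value tuples and lets Redis assign the id.
        client.XAdd("stream", new (string, string)[]
        {
            ("temp", random.Next(50, 65).ToString()),
            ("time", DateTimeOffset.Now.ToUnixTimeSeconds().ToString())
        });

        // Wait two seconds between writes.
        Thread.Sleep(2000);
    }
});
// The thread is only defined here; it is started in the final step.
```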
The next issue we'll need to deal with is parsing the read results from the `XREAD` and `XREADGROUP` commands. CSRedis generally returns replies as tuples, so we'll need a way to parse the result into something more usable. In this case, we'll parse the results into a dictionary. For the sake of brevity, we will keep everything in this project in `Program.cs` as top-level statements, so we'll declare a `Func` to handle the parsing. This function will pull the first message from the first stream and arrange the values returned into a dictionary. If you wanted to expand this further, you could return a dictionary of dictionaries when pulling back multiple messages from multiple streams. This complexity is intentionally left out.
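One way to write such a `Func` is sketched below. The tuple shape `(string key, (string id, string[] items)[] data)` is an assumption about how CSRedis represents one stream in a read reply, with `items` holding alternating field names and values. The sample reply at the bottom is hand-built for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Assumed reply shape: a stream key plus its entries, each entry being (id, flat field/value array).
Func<(string key, (string id, string[] items)[] data), Dictionary<string, string>> parse =
    streamResult =>
    {
        // Only the first message of the stream is considered.
        var items = streamResult.data.First().items;
        var result = new Dictionary<string, string>();

        // Items alternate field, value, field, value, so step two at a time.
        for (var i = 0; i + 1 < items.Length; i += 2)
        {
            result[items[i]] = items[i + 1];
        }

        return result;
    };

// Usage with an illustrative, hand-built reply.
(string key, (string id, string[] items)[] data) sample =
    ("stream", new (string id, string[] items)[]
    {
        ("1-0", new[] { "temp", "57", "time", "1700000000" })
    });

var parsed = parse(sample);
Console.WriteLine(parsed["temp"]); // prints 57
```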
There are two primary types of 'read' methods, `XREAD` and `XREADGROUP`. These are in addition to the various range methods, which are their own category and operate semantically differently from the read operations.
`XREAD` lets you read off a given stream and read the next item that hits the stream. You can do this with the special `$` id. For our purposes here, we are going to block for two seconds, or until we get a response back from Redis, whichever comes first:
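A sketch of the blocking read, assuming CSRedisCore's `XRead(count, block, streams)` takes the block time in milliseconds and returns `null` when the timeout passes with no new entry. To keep the snippet standalone it prints the raw field/value items rather than a parsed dictionary, and it repeats the client and token setup.

```csharp
using System;
using System.Threading;
using CSRedis;

// Setup repeated so the snippet stands alone.
var cancellationTokenSource = new CancellationTokenSource();
var token = cancellationTokenSource.Token;
var client = new CSRedisClient("localhost");

var readThread = new Thread(() =>
{
    while (!token.IsCancellationRequested)
    {
        // Read at most one entry, blocking up to 2000 ms; "$" means "only entries added from now on".
        var result = client.XRead(1, 2000, new (string key, string id)[] { ("stream", "$") });

        // Assumption: a timeout yields null rather than an empty array.
        if (result != null)
        {
            var fields = string.Join(", ", result[0].data[0].items);
            Console.WriteLine("Most recent entry: " + fields);
        }
    }
});
// The thread is only defined here; it is started in the final step.
```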
`XREADGROUP` commands operate very similarly to `XREAD`. In this case, however, the creation of the group told us what id to start at, and by passing in the `>` we necessarily start off at the next message in the queue. Because we are reading out of a group, we'll also want to `XACK` any messages that we pull down. Also, since this is our average group, we'll maintain an average for our stream's temperatures.
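The group read could be sketched as follows. The consumer name `avg-1` is a hypothetical choice, the two-second block mirrors the `XREAD` step, and the `XReadGroup(group, consumer, count, block, streams)` and `XAck(key, group, id)` shapes are assumptions to verify against your CSRedisCore version. The running average is kept as a total and a count.

```csharp
using System;
using System.Threading;
using CSRedis;

// Setup repeated so the snippet stands alone.
var cancellationTokenSource = new CancellationTokenSource();
var token = cancellationTokenSource.Token;
var client = new CSRedisClient("localhost");

var total = 0;
var count = 0;

var groupReadThread = new Thread(() =>
{
    while (!token.IsCancellationRequested)
    {
        // ">" asks for entries never delivered to any consumer in the "avg" group.
        var result = client.XReadGroup("avg", "avg-1", 1, 2000,
            new (string key, string id)[] { ("stream", ">") });
        if (result == null)
        {
            continue; // Timed out with nothing new.
        }

        var entry = result[0].data[0];

        // Items alternate field, value; look for the "temp" field.
        for (var i = 0; i + 1 < entry.items.Length; i += 2)
        {
            if (entry.items[i] == "temp" && int.TryParse(entry.items[i + 1], out var temp))
            {
                count++;
                total += temp;
                var average = (double)total / count;
                Console.WriteLine("Average: " + average);
            }
        }

        // Acknowledge the entry so it leaves the group's pending list.
        client.XAck("stream", "avg", entry.id);
    }
});
// The thread is only defined here; it is started in the final step.
```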
The last thing we'll need to do is start up all the threads, set a cancellation timeout (so the app doesn't run forever), and join all the threads back together:
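The start, timeout, and join pattern is sketched below with placeholder workers standing in for the write, read, and group-read threads, so it runs without Redis. The `MakeWorker` helper and the ten-second timeout are illustrative choices, not part of the original code.

```csharp
using System;
using System.Threading;

var cancellationTokenSource = new CancellationTokenSource();
var token = cancellationTokenSource.Token;

// Hypothetical helper: builds a placeholder worker that loops until cancellation.
Thread MakeWorker(string name) => new Thread(() =>
{
    while (!token.IsCancellationRequested)
    {
        Thread.Sleep(100);
    }
    Console.WriteLine(name + " stopped");
});

var threads = new[] { MakeWorker("write"), MakeWorker("read"), MakeWorker("group-read") };

// Start every worker.
foreach (var thread in threads)
{
    thread.Start();
}

// Request cancellation after a fixed timeout so the app doesn't run forever.
cancellationTokenSource.CancelAfter(TimeSpan.FromSeconds(10));

// Wait for every worker to observe the cancellation and exit.
foreach (var thread in threads)
{
    thread.Join();
}
```

In the real application, each worker notices the cancellation only after its current blocking read returns, so shutdown can lag by up to the block timeout.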
Now that the app is written, all that's left to do is run it. You can do so by running `dotnet run` in your terminal.