Blocking Stream reads with CSRedis
CSRedis is an MIT-licensed open source project that provides a straightforward interface for executing Redis commands. It works well for blocking stream reads, with one major downside: it does not offer an async API for them.

Start Redis

Before we begin, we'll start up Redis. If you are developing locally, which we'll assume for the duration of this tutorial, you can start Redis with a single Docker command, for example `docker run -p 6379:6379 redis`.

Create the app

We will build a simple console application for streaming telemetry using the library. To create it, run `dotnet new console -n StreamsWithCSRedis`.

Add the package to your app

Run `cd StreamsWithCSRedis` to change into the application's directory, then add the CSRedis package. Assuming the CSRedisCore distribution on NuGet, that would be `dotnet add package CSRedisCore`.

Create group

When the app starts, the first thing we'll do is create our `avg` group. To make this group, open `Program.cs` and add code that creates a cancellation token for the threads we'll spin up to write to and read from the stream, creates a client, checks whether our `avg` group already exists, and creates the `avg` group if it doesn't.
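
The steps above might look like the following sketch. It assumes the CSRedisCore package (client type `CSRedisClient`) and a Redis server at `localhost:6379`. The `XInfoStream(...).groups` count and the `MkStream` argument are written from memory of that library, so check them against the version you install. The check only asks whether the stream has any group at all, which is enough here because `avg` is the only group this app creates.

```csharp
using System.Threading;
using CSRedis;

// One token shared by every worker thread so they can be stopped together.
var tokenSource = new CancellationTokenSource();
var token = tokenSource.Token;

// Assumption: Redis is reachable at localhost:6379.
var client = new CSRedisClient("localhost:6379");

// Create the "avg" group only if the stream is missing or has no groups yet.
// "$" starts the group at new messages; MkStream (assumed parameter name)
// creates the stream when it does not exist.
if (!client.Exists("stream") || client.XInfoStream("stream").groups == 0)
{
    client.XGroupCreate("stream", "avg", "$", MkStream: true);
}
```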

Write to the stream

Next, we'll write to the stream. We'll call the stream `stream` and send a `temp` and a `time` field with each message, once every 2 seconds. This operation isn't 'blocking' in the Redis sense, so it would be fine to run it as a task, but because the other two operations are blocking, we'll give it its own thread as well. Add this writer thread to your `Program.cs` file.
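
A writer thread along these lines is one way to do it. This is a sketch under assumptions: the `XAdd` overload that takes an array of field/value tuples is recalled from CSRedisCore, the 50 to 65 temperature range is an arbitrary illustrative choice, and the token declaration is repeated here only so the snippet stands alone (in the tutorial's `Program.cs` you would reuse the token created during setup).

```csharp
using System;
using System.Threading;
using CSRedis;

// Repeated for self-containment; reuse the setup token in the real app.
var tokenSource = new CancellationTokenSource();
var token = tokenSource.Token;

var writeThread = new Thread(() =>
{
    // Assumption: Redis at localhost:6379, one client per thread.
    var writeClient = new CSRedisClient("localhost:6379");
    var random = new Random();
    while (!token.IsCancellationRequested)
    {
        // Each entry carries a temperature reading and a Unix timestamp.
        writeClient.XAdd("stream", new (string, string)[]
        {
            ("temp", random.Next(50, 65).ToString()),
            ("time", DateTimeOffset.Now.ToUnixTimeSeconds().ToString())
        });
        Thread.Sleep(2000); // one message every 2 seconds
    }
});
```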

Parsing read results

The next thing to take care of is parsing the results of the `XREAD` and `XREADGROUP` commands. CSRedis generally returns replies as tuples, so we need a way to turn a result into something more usable. In this case, we'll parse the results into a dictionary. For the sake of brevity, everything in this project lives in `Program.cs` as top-level statements, so we'll declare a `Func` to handle the parsing. This function pulls the first message from the first stream and arranges its field/value pairs into a dictionary. If you wanted to expand on this, you could return a dictionary of dictionaries to cover multiple messages from multiple streams. That complexity is intentionally left out.
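
As a rough illustration, the parser can be written without touching Redis at all. The tuple shape `(string key, (string id, string[] items)[] data)` is an assumption about how CSRedisCore represents one stream's reply, and the sample reply below is hand-built rather than real output.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Takes one stream's reply (its key plus its messages) and flattens the first
// message's alternating field/value array into a dictionary.
Func<(string key, (string id, string[] items)[] data), Dictionary<string, string>> parse =
    streamResult =>
    {
        var message = streamResult.data.First().items;
        var result = new Dictionary<string, string>();
        for (var i = 0; i + 1 < message.Length; i += 2)
        {
            result[message[i]] = message[i + 1];
        }
        return result;
    };

// Hand-built sample in the assumed reply shape.
var sample = (key: "stream",
              data: new[] { (id: "1-0", items: new[] { "temp", "57", "time", "1700000000" }) });
var fields = parse(sample);
Console.WriteLine($"{fields["temp"]} at {fields["time"]}"); // prints "57 at 1700000000"
```

Stepping through the array two items at a time mirrors how Redis returns stream entries: a flat list of field names, each followed by its value.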

Blocking XREAD

There are two primary 'read' commands, `XREAD` and `XREADGROUP`. These are in addition to the various range commands, which are their own category and behave differently from the read operations. `XREAD` lets you read from a given stream, and with the special `$` ID it returns only the next item that hits the stream after the call is issued. For our purposes here, we are going to block for up to two seconds, or until we get a response back from Redis, whichever comes first.
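
A blocking read loop could be sketched as follows. The `XRead(count, block, streams)` parameter order, the tuple element names on the result, and the null return on timeout are assumptions based on CSRedisCore, so verify them before relying on this. The token is again declared locally only to keep the snippet self-contained.

```csharp
using System;
using System.Threading;
using CSRedis;

// Repeated for self-containment; reuse the setup token in the real app.
var tokenSource = new CancellationTokenSource();
var token = tokenSource.Token;

var readThread = new Thread(() =>
{
    // Assumption: Redis at localhost:6379.
    var readClient = new CSRedisClient("localhost:6379");
    while (!token.IsCancellationRequested)
    {
        // count = 1, block = 2000 ms, "$" = only entries added after this call.
        var result = readClient.XRead(1, 2000,
            new (string key, string id)[] { ("stream", "$") });

        // Assumption: a read that times out comes back null (or empty).
        if (result == null || result.Length == 0) continue;

        var first = result[0].data[0];
        Console.WriteLine($"XREAD {first.id}: {string.Join(", ", first.items)}");
    }
});
```

Because each call uses `$` afresh, entries that arrive between two calls are skipped. That matches the "next item that hits the stream" behavior described above, but it is not suitable when every message must be seen.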

Blocking XREADGROUP

Blocking `XREADGROUP` commands operate very similarly to `XREAD`. In this case, however, the ID given when the group was created determines where the group starts, and passing `>` asks for the next message that has not yet been delivered to any consumer in the group. Because we are reading through a group, we'll also want to `XACK` each message that we pull down. And since this is our average group, we'll maintain a running average of the stream's temperatures.
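
One possible shape for the group reader is sketched below. The `XReadGroup` and `XAck` signatures are assumptions recalled from CSRedisCore, and the consumer name `avg-1` is a hypothetical choice for illustration. The token is declared locally only so the snippet stands alone.

```csharp
using System;
using System.Threading;
using CSRedis;

// Repeated for self-containment; reuse the setup token in the real app.
var tokenSource = new CancellationTokenSource();
var token = tokenSource.Token;

var groupReadThread = new Thread(() =>
{
    // Assumption: Redis at localhost:6379.
    var groupClient = new CSRedisClient("localhost:6379");
    long total = 0;
    long count = 0;
    while (!token.IsCancellationRequested)
    {
        // ">" asks for entries never delivered to any consumer in the group.
        var result = groupClient.XReadGroup("avg", "avg-1", 1, 2000,
            new (string key, string id)[] { ("stream", ">") });
        if (result == null || result.Length == 0) continue;

        var message = result[0].data[0];

        // items alternates field, value; look up the value after "temp".
        var index = Array.IndexOf(message.items, "temp");
        if (index >= 0 && index + 1 < message.items.Length
            && long.TryParse(message.items[index + 1], out var temp))
        {
            total += temp;
            count++;
            Console.WriteLine($"Average temp: {(double)total / count:F2}");
        }

        // Acknowledge so the entry leaves the group's pending list.
        groupClient.XAck("stream", "avg", message.id);
    }
});
```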

Spin up threads

The last thing we'll need to do is start up all the threads, set a cancellation timeout (so the app doesn't run forever), and join all the threads back together.
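
The start, timeout, and join pattern can be sketched with plain .NET threading. The workers here are hypothetical stand-ins for the writer and the two readers, and the one-second timeout is an arbitrary value chosen to keep the example short.

```csharp
using System;
using System.Threading;

var tokenSource = new CancellationTokenSource();
var token = tokenSource.Token;

// Hypothetical stand-in for the writer and reader loops.
Thread MakeWorker(string name) => new Thread(() =>
{
    while (!token.IsCancellationRequested)
    {
        Thread.Sleep(100);
    }
    Console.WriteLine($"{name} stopped");
});

var threads = new[] { MakeWorker("write"), MakeWorker("read"), MakeWorker("group-read") };

foreach (var thread in threads) thread.Start();

// Request cancellation after a fixed time so the app doesn't run forever.
tokenSource.CancelAfter(TimeSpan.FromSeconds(1));

// Block until each worker has observed the token and exited.
foreach (var thread in threads) thread.Join();
Console.WriteLine("All threads joined");
```

Cancellation here is cooperative: each loop only notices the token between iterations, so a blocking read with a two-second timeout can delay shutdown by up to that long.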

Run the app

Now that the app is written, all that's left to do is run it. You can do so by running `dotnet run` in your terminal.

Resources

- The source for this tutorial is on GitHub.
- Redis University has an extensive course on Redis Streams where you can learn everything you need to know about them.
- You can learn more about Redis Streams in the Streams Info article on redis.io.