Redis Streams are a powerful data structure that lets you use Redis as a message bus to transport messages between different application components. Streams in Redis are fast and memory efficient. This article will not go over the minutiae of every command available for Redis Streams; rather, it aims to provide a high-level tutorial on how you can use Redis Streams with .NET.
The first thing we'll want to do is start Redis. If you already have an instance of Redis, you can skip this bit and adjust the connection step below to connect to your instance. Redis is straightforward to get up and running; you can do so using Docker, for example by running docker run -p 6379:6379 redis, which publishes the default Redis port on localhost.
For simplicity's sake, we'll stick to a simple console app, from which we'll spin out a few tasks that perform the various add/read operations we'll use. Create a new console app with the dotnet new command, for example dotnet new console -n RedisStreamsBasics.
Next, we'll need to add the client library that we will use to interface with Redis. StackExchange.Redis is the canonical package, so we will use it in this example. First cd into the RedisStreamsBasics directory, and then run the dotnet add package command, that is, dotnet add package StackExchange.Redis.
StackExchange.Redis centers more or less around the ConnectionMultiplexer, which handles the routing and queuing of all commands that you send to Redis. So our first step that's code-related is to initialize the Multiplexer. Creating the Multiplexer is pretty straightforward; open up Program.cs in your IDE and add the following bit to it:
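A minimal sketch of this setup, assuming a .NET 6 or later console project (top-level statements and implicit usings) and Redis listening on localhost:6379. The stream and group names telemetry and avg are illustrative, and the variable names tokenSource, token, muxer, and db are choices made for this sketch:

```csharp
using StackExchange.Redis;

// Token source used later to stop the background tasks.
var tokenSource = new CancellationTokenSource();
var token = tokenSource.Token;

// Assumption: Redis is reachable at localhost on the default port 6379.
var muxer = ConnectionMultiplexer.Connect("localhost");
var db = muxer.GetDatabase();

// Illustrative names; any stream key and group name will do.
const string streamName = "telemetry";
const string groupName = "avg";
```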
We're also initializing a CancellationTokenSource here. We'll configure its timeout towards the end of this tutorial so that the application does not run endlessly. In addition, we're creating a couple of constants, the stream's name and the group's name, that we'll use later, and we're grabbing an IDatabase object from the Multiplexer to use for issuing commands.
A Consumer Group in a Redis Stream allows you to group a bunch of consumers to pull messages off the stream for the group. This functionality is excellent when you have high-throughput workloads and you want to scale out the workers that process your messages. To use a consumer group, you first need to create it with the StreamCreateConsumerGroupAsync method, passing in the stream's name and the groupName, as well as the starting id. We'll use the 0-0 id (the lowest id allowable in Redis Streams). Before invoking this call, it's wise to validate that the group doesn't exist yet, as creating an already existing consumer group will result in an error. So first, we'll check whether the stream exists; if it doesn't, we can create the group. Otherwise, we'll use the stream info method to see whether any existing group matches the groupName, and create the group only if none does.
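The check-then-create logic can be sketched as follows. It continues Program.cs and assumes the db, streamName, and groupName values from the setup step; those variable names are assumptions of this sketch:

```csharp
// Continues Program.cs; assumes db, streamName, and groupName from the setup step.
// Create the group only if the stream is missing or has no group with this name.
if (!await db.KeyExistsAsync(streamName) ||
    (await db.StreamGroupInfoAsync(streamName)).All(g => g.Name != groupName))
{
    // "0-0" starts the group at the beginning of the stream;
    // createStream: true creates the stream if it does not exist yet.
    await db.StreamCreateConsumerGroupAsync(streamName, groupName, "0-0", createStream: true);
}
```

The check and the create are two separate round trips, so two processes starting at the same moment could still race. For a single console app that is unlikely to matter.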
Three tasks will run in parallel for our program. The first is the producerTask. This Task will write a random number between 50 and 65 as the temp and send the current time as a second field.
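One way to write such a producer is sketched below. It continues Program.cs and assumes db, streamName, and token from the setup step. The field name time, the Unix-seconds timestamp format, and the two-second delay are assumptions made for illustration:

```csharp
// Continues Program.cs; assumes db, streamName, and token from the setup step.
var producerTask = Task.Run(async () =>
{
    var random = new Random();
    while (!token.IsCancellationRequested)
    {
        await db.StreamAddAsync(streamName, new[]
        {
            // Random.Next excludes the upper bound, so 66 allows values 50 through 65.
            new NameValueEntry("temp", random.Next(50, 66)),
            // "time" is an assumed field name; the value is Unix time in seconds.
            new NameValueEntry("time", DateTimeOffset.UtcNow.ToUnixTimeSeconds())
        });

        // Assumed pacing: one message every two seconds.
        await Task.Delay(2000);
    }
});
```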
The results retrieved from Redis will be in a reasonably readable form; all the same, it is helpful for our purposes to parse the result into a dictionary. To do this, add an inline function to handle the parsing:
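A minimal sketch of such a function, written as a local function in Program.cs. The name ParseResult is an assumption of this sketch:

```csharp
// Assumes `using StackExchange.Redis;` at the top of Program.cs.
// Converts a stream entry's field/value pairs into a string dictionary.
// ToDictionary throws an ArgumentException if a field name appears twice.
Dictionary<string, string> ParseResult(StreamEntry entry) =>
    entry.Values.ToDictionary(x => x.Name.ToString(), x => x.Value.ToString());
```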
Note: Stream messages enforce no requirement that field names be unique. We use a dictionary for clarity's sake in this example, but in your own usage you will need to ensure that you never pass multiple fields with the same name; otherwise, building the dictionary will fail.
Next, we'll need to spin up a task to read the most recent element off of the stream. To do this, we'll use the StreamRangeAsync method, passing in two special ids: -, which means the lowest id, and +, which means the highest id. Running this command repeatedly will result in some duplication, since the same newest entry is returned until a new one is added. This redundancy is necessary because the StackExchange.Redis library does not support blocking stream reads and does not support the special $ character for stream reads. Overcoming this behavior is explored in depth in the Blocking Reads tutorial. For this tutorial, you can manage these most-recent reads with the following code:
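A sketch of such a polling read, continuing Program.cs. It assumes db, streamName, and token from the setup step, plus the dictionary-parsing function described earlier, here called ParseResult (an assumed name). Requesting a single entry in descending order is what limits the range to the newest element. The one-second delay is illustrative:

```csharp
// Continues Program.cs; assumes db, streamName, token, and a ParseResult helper.
var readTask = Task.Run(async () =>
{
    while (!token.IsCancellationRequested)
    {
        // "-" and "+" span the whole stream; count 1 with descending order
        // returns only the newest entry.
        var result = await db.StreamRangeAsync(streamName, "-", "+", 1, Order.Descending);
        if (result.Any())
        {
            var dict = ParseResult(result.First());
            var fields = string.Join(", ", dict.Select(kv => $"{kv.Key}: {kv.Value}"));
            Console.WriteLine($"Read result: {fields}");
        }

        // Assumed polling interval of one second.
        await Task.Delay(1000);
    }
});
```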
The final Task we'll spin up is the read task for the consumer group. Due to the nature of consumer groups, you can spin this Task up multiple times to scale out the processing as needed. Redis is responsible for keeping track of which messages it has distributed to the consumer group, as well as which messages consumers have acknowledged. Acknowledging messages adds a layer of validation that all messages were processed. If something happens to one of your processing tasks or processes, you can more easily determine which messages you missed.
To handle all of this, we'll first check whether we have a recent message id. If we do, we will send an acknowledgment to the server that the id was processed. Then we will grab the next message to be processed from the stream, pull out the data and the id, and print out the result.
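The acknowledge-then-read loop might look like the sketch below. It continues Program.cs and assumes db, streamName, groupName, and token from the setup step, plus the dictionary-parsing function described earlier under the assumed name ParseResult. The consumer name consumer-1 and the one-second delay are hypothetical. To scale out, each additional copy of this task would need its own consumer name:

```csharp
// Continues Program.cs; assumes db, streamName, groupName, token, and ParseResult.
var consumerGroupReadTask = Task.Run(async () =>
{
    var id = string.Empty;
    while (!token.IsCancellationRequested)
    {
        // Acknowledge the message handled on the previous pass, if any.
        if (!string.IsNullOrEmpty(id))
        {
            await db.StreamAcknowledgeAsync(streamName, groupName, id);
            id = string.Empty;
        }

        // ">" asks for messages not yet delivered to any consumer in the group.
        // "consumer-1" is a hypothetical consumer name.
        var result = await db.StreamReadGroupAsync(streamName, groupName, "consumer-1", ">", 1);
        if (result.Any())
        {
            var entry = result.First();
            id = entry.Id.ToString();
            var fields = string.Join(", ", ParseResult(entry).Select(kv => $"{kv.Key}: {kv.Value}"));
            Console.WriteLine($"Group read result: id {id}, {fields}");
        }

        // Assumed polling interval of one second.
        await Task.Delay(1000);
    }
});
```

If the token is cancelled right after a read, the last message is left unacknowledged and stays in the group's pending list.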
Finally, we need to set the timeout and await the tasks at the end of our program:
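A short sketch of this last step, assuming the token source is named tokenSource and the three tasks are held in variables named producerTask, readTask, and consumerGroupReadTask (the latter two names are assumptions). The 20-second timeout is likewise an illustrative value:

```csharp
// Continues Program.cs; assumes tokenSource and the three task variables exist.
// Assumed timeout: request cancellation after 20 seconds.
tokenSource.CancelAfter(TimeSpan.FromSeconds(20));

// Wait for all three loops to observe the cancellation and finish.
await Task.WhenAll(producerTask, readTask, consumerGroupReadTask);
```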
You can now run this app with the dotnet run command.