How to handle blocking stream reads with ServiceStack.Redis
ServiceStack.Redis is part of the ServiceStack suite, it has some restrictions when used for commercial purposes - see their license
#
Start RedisIf you're developing locally (which is what we will assume for the balance of this tutorial), you can start Redis fairly quickly with docker:
#
Create the appWe will build a simple console application for streaming telemetry using the library. To do so, use the dotnet new
command:
#
Add the package to your appYou can add this package to your app with:
#
Initialize the client managerTo initialize a client with ServiceStack, you'll need to create a RedisClientManager
. Then, add the following to Program.cs
.
#
Add items to streamsRedis streams are not yet fully supported by ServiceStack.Redis, however, you can run raw commands easily with the CustomAsync
method. So let's create a new class called Producer.cs
and add the following to it.
This code will send new telemetry every 10 seconds to the telemetry
stream, with a temp
record and a time
record.
#
Reading messagesAs mentioned earlier, ServiceStack does not have native support for the Streams API, so we need to do a bit of work after retrieving a record from a stream. However, this isn't a complex operation since the resulting structure is a predictable set of nested arrays going from an array of the streams requested to an array of messages retrieved from each stream to the message itself split between its id and its attributes. Finally, the field value pairs within a message; this looks something like this:
This data structure is pretty predictable to parse, so we'll add a little parsing method. First, Create Consumer.cs
and add the following to it:
ParseStreamResult
will yield the first message from the first stream of an XREAD
or XREADGROUP
, this isn't a fully generalized solution but will serve our purposes here.
#
Reading a stream outside a group with XREADTo read the next message in a stream, which is necessarily a blocking operation, you will use the XREAD
command with the BLOCK
option and the special $
id. Then, in the Consumer
class, add the following, which will read off the stream in a continuous loop, blocking for 20 seconds at each request.
#
Reading with consumer groupsReading messages in a consumer group can be helpful in cases where you have a common task that you want to distribute across many consumers in a high-throughput environment. It's a two-step process:
- Read the stream
- Acknowledge receipt of the message
This task can be done by running an XREADGROUP
and a XACK
back to back. The XREADGROUP
will take, in addition to the parameters we spoke about for the XREAD
, the GROUP
name, the consumer's name, and instead of taking the special $
id, it will take the special >
id, which will have it take the next unassigned id for the group. We'll then extract the information from it, update our average, and then acknowledge the receipt of the message.
#
Create the group and start the tasksThe final bit we need is to create the group and start up all the tasks. We'll use the XGROUP
command with the MKSTREAM
option to create the group. We'll then start up all the tasks we need for our producer and consumers, and we'll await everything. Add the following to your Program.cs
file:
#
Run the appAll that's left to do is to run the app, and you'll see a continuous stream of messages coming in every 10 seconds. You can run the app by running:
#
Resources:- The source for this tutorial is in GitHub
- Redis University has an extensive course on Redis Streams where you can learn everything you need to know about them.
- You can learn more about Redis Streams in the Streams Info article on redis.io