Atomicity with Gears
This tutorial uses Lettuce, which is an unsupported Redis library. For production applications, we recommend using Jedis.
Improving atomicity and performance with RedisGears
What is RedisGears?
RedisGears is a dynamic server-side data processing engine, where the "server" part is Redis itself. RedisGears is distributed as a Redis module. You can start a Redis instance preconfigured with Gears using the official Docker image:
docker run -p 6379:6379 redislabs/redisgears:latest
Or, as I do most of the time, use the "redismod" image, which includes Gears and all the other Redis, Inc. supported modules:
docker run -p 6379:6379 redislabs/redismod
RedisGears was built to provide a data processing engine inside Redis, with more formal semantics than the simpler Lua server-side scripting. Think of it as a more flexible map-reduce engine for Redis. It supports transaction, batch, and event-driven processing of Redis data. Gears allows you to localize computation and data, and it provides a built-in coordinator to facilitate processing distributed data in a clustered environment.
In RedisGears, the main unit of processing is the RedisGears function, which can currently be written in Python (more languages are being worked on). These functions run on their own thread, separate from Redis' main thread, and can be executed in response to keyspace events or imperatively as a result of external commands. The functions are "registered" (deployed) to the Gears engine and have an associated name and a registration ID.
During registration we pick a specific reader for our function which defines how the function gets its initial data:
- KeysReader: Redis keys and values.
- KeysOnlyReader: Redis keys.
- StreamReader: Redis Stream messages.
- PythonReader: Arbitrary Python generator.
- ShardsIDReader: Shard ID.
- CommandReader: Command arguments from application client.
A Rate-Limiting RedisGears Function
Depending on the reader type, Gears functions can either be run immediately, on demand, as batch jobs, or in an event-driven manner by registering them to trigger automatically on various types of events.
The Python function rate_limit takes 3 parameters:
- key: The Redis key backing the counter for a given user.
- max_requests: The request quota for the user.
- expiry: The number of seconds in the future to set the counter TTL.
def rate_limit(key, max_requests, expiry):
    # Read the current counter; a missing key is treated as -1
    requests = execute('GET', key)
    requests = int(requests) if requests else -1
    max_requests = int(max_requests)
    expiry = int(expiry)

    if (requests == -1) or (requests < max_requests):
        # Increment the counter and set its TTL as one atomic unit
        with atomic():
            execute('INCR', key)
            execute('EXPIRE', key, expiry)
        return False
    else:
        return True

# Function registration
gb = GB('CommandReader')
gb.map(lambda x: rate_limit(x[1], x[2], x[3]))
gb.register(trigger='RateLimiter')
Place the script under src/main/resources/scripts. Now, let's break it down:
The rate_limit function
Similarly to what we did in the previous implementation, we:
- Retrieve the current number of requests for the passed key by execute-ing the GET command.
- Cast the result to an int and, if not found, default to -1.
- Cast max_requests and expiry to int.
- If the quota hasn't been exceeded, perform the INCR/EXPIRE commands in a transaction (with atomic():) and return False (no rate limiting - request is allowed).
- Otherwise, return True (deny the request).
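The steps above can be tried outside Redis with a small simulation. This is only a sketch: inside RedisGears, execute and atomic are supplied by the runtime, so the in-memory store, the execute stub, and the no-op atomic below are assumptions made for local experimentation, and TTL handling is left out.

```python
from contextlib import contextmanager

# In-memory stand-in for Redis (assumption, for local runs only).
store = {}

def execute(command, key, *args):
    """Emulate just the three commands the function uses."""
    if command == 'GET':
        return store.get(key)
    if command == 'INCR':
        store[key] = str(int(store.get(key, '0')) + 1)
        return int(store[key])
    if command == 'EXPIRE':
        return 1  # TTL handling is omitted in this sketch
    raise ValueError('unsupported command: ' + command)

@contextmanager
def atomic():
    # The real atomic() runs the block atomically in Redis; here it is a no-op.
    yield

def rate_limit(key, max_requests, expiry):
    requests = execute('GET', key)
    requests = int(requests) if requests else -1
    max_requests = int(max_requests)
    expiry = int(expiry)
    if (requests == -1) or (requests < max_requests):
        with atomic():
            execute('INCR', key)
            execute('EXPIRE', key, expiry)
        return False
    return True

# 22 calls against a quota of 20: the first 20 pass, the last 2 are limited.
results = [rate_limit('rl_localhost:47', '20', '59') for _ in range(22)]
print(results.count(False), results.count(True))  # 20 2
```

Arguments are passed as strings here because that is how they arrive from the client, which is why the function casts them to int.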
Function Registration
- At the bottom of the script, in the # Function registration section, we instantiate the GearsBuilder (GB) using the CommandReader reader. The GearsBuilder "builds" the context of the function: its parameters, transformations, triggers, etc.
- We use the map method to perform a one-to-one mapping of records to the params of the rate_limit function via a mapper function callback.
- We can now invoke the register action to register the function as an event handler. The event in our case is the trigger 'RateLimiter'.
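To make the mapper's indexing concrete, here is a minimal sketch in plain Python. It assumes the record handed to the mapper is a list holding the trigger name followed by the caller's arguments, which is what the x[1], x[2], x[3] indexing in the script implies; rate_limit_stub is a hypothetical stand-in that only echoes what it receives.

```python
def rate_limit_stub(key, max_requests, expiry):
    # Hypothetical stand-in for rate_limit: echoes its (cast) arguments.
    return (key, int(max_requests), int(expiry))

# Assumed record shape: trigger name first, then the caller's arguments.
record = ['RateLimiter', 'rl_localhost:47', '20', '59']

# Same shape as the mapper in the script: skip x[0], pass the rest along.
mapper = lambda x: rate_limit_stub(x[1], x[2], x[3])

print(mapper(record))  # ('rl_localhost:47', 20, 59)
```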
RedisGears in SpringBoot
In order to use our RedisGears function from our SpringBoot application, we need to do a few things:
- Deploy the function to the Redis server
- Execute the function to get a yay/nay answer on each request
Lettuce Mod
LettuceMod is a Java client for Redis Modules based on Lettuce, created by Julien Ruaux. It supports the following modules in standalone or cluster configurations:
- Triggers and Functions
- JSON
- Search
- Time Series
To use LettuceMod we'll add the dependency to the Maven POM as shown:
<dependency>
  <groupId>com.redis</groupId>
  <artifactId>spring-lettucemod</artifactId>
  <version>1.7.0</version>
</dependency>
Accessing Gears Commands in SpringBoot
To access any of the LettuceMod supported modules, we will inject a StatefulRedisModulesConnection in our FixedWindowRateLimiterApplication class as follows:
@Autowired
StatefulRedisModulesConnection<String, String> connection;
Add the matching import statement:
import com.redis.lettucemod.api.StatefulRedisModulesConnection;
Registering the Gears function
We'll start by writing a function to determine whether the function with the trigger RateLimiter has been registered. It takes a List of Registrations and digs deep to extract the value of the trigger argument using the Java Streams API:
private Optional<String> getGearsRegistrationIdForTrigger(List<Registration> registrations, String trigger) {
  // trigger.equals(...) avoids a NullPointerException when a registration has no "trigger" argument
  return registrations.stream()
      .filter(r -> trigger.equals(r.getData().getArgs().get("trigger")))
      .findFirst().map(Registration::getId);
}
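The lookup itself is simple enough to sketch over plain data. The version below uses Python dictionaries with a hypothetical shape (id, data, args) that loosely mirrors the getters used in the Java helper; it is not the LettuceMod API, and the registration IDs are placeholders.

```python
from typing import Optional

def find_registration_id(registrations, trigger) -> Optional[str]:
    """Return the id of the first registration whose 'trigger' argument matches."""
    for reg in registrations:
        args = reg.get('data', {}).get('args', {})
        # Registrations without a 'trigger' argument are skipped.
        if args.get('trigger') == trigger:
            return reg.get('id')
    return None

# Placeholder registrations (hypothetical ids and shape).
registrations = [
    {'id': 'reg-1', 'data': {'args': {}}},
    {'id': 'reg-2', 'data': {'args': {'trigger': 'RateLimiter'}}},
]

print(find_registration_id(registrations, 'RateLimiter'))  # reg-2
print(find_registration_id(registrations, 'Other'))        # None
```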
In the @PostConstruct annotated method loadGearsScript:
- We retrieve an instance of the RedisGearsCommands from the previously injected StatefulRedisModulesConnection
- We get the currently registered Gears functions via the dumpregistrations method
- We pass the list of registrations to our getGearsRegistrationIdForTrigger
- If we don't find the registration we proceed to register the function:
  - Load the function from the classpath into a String named py
  - Use the pyexecute method passing the py script payload
@PostConstruct
public void loadGearsScript() throws IOException {
  String py = StreamUtils.copyToString(new ClassPathResource("scripts/rateLimiter.py").getInputStream(),
      Charset.defaultCharset());
  RedisGearsCommands<String, String> gears = connection.sync();
  List<Registration> registrations = gears.dumpregistrations();

  Optional<String> maybeRegistrationId = getGearsRegistrationIdForTrigger(registrations, "RateLimiter");
  if (maybeRegistrationId.isEmpty()) {
    try {
      ExecutionResults er = gears.pyexecute(py);
      if (er.isOk()) {
        logger.info("RateLimiter.py has been registered");
      } else if (er.isError()) {
        logger.error(String.format("Could not register RateLimiter.py -> %s", Arrays.toString(er.getErrors().toArray())));
      }
    } catch (RedisCommandExecutionException rcee) {
      logger.error(String.format("Could not register RateLimiter.py -> %s", rcee.getMessage()));
    }
  } else {
    logger.info("RateLimiter.py has already been registered");
  }
}
Modifying the Filter to use the Gears function
Next, we'll modify the filter to include the StatefulRedisModulesConnection as well as the quota, the value that we need to pass to the function:
class RateLimiterHandlerFilterFunction implements HandlerFilterFunction<ServerResponse, ServerResponse> {

  private StatefulRedisModulesConnection<String, String> connection;
  private Long maxRequestPerMinute;

  public RateLimiterHandlerFilterFunction(StatefulRedisModulesConnection<String, String> connection,
      Long maxRequestPerMinute) {
    this.connection = connection;
    this.maxRequestPerMinute = maxRequestPerMinute;
  }
Now we can modify the filter method to use the function. Gears functions are invoked by triggering the correct event, RateLimiter, and passing the parameters required by the function: the key, the quota, and the TTL in seconds.
As we've done previously, if the function returns false we let the request through; otherwise we return an HTTP 429:
@Override
public Mono<ServerResponse> filter(ServerRequest request, HandlerFunction<ServerResponse> next) {
  int currentMinute = LocalTime.now().getMinute();
  String key = String.format("rl_%s:%s", requestAddress(request.remoteAddress()), currentMinute);

  RedisGearsCommands<String, String> gears = connection.sync();

  List<Object> results = gears.trigger("RateLimiter", key, Long.toString(maxRequestPerMinute), "59");
  if (!results.isEmpty() && !Boolean.parseBoolean((String) results.get(0))) {
    return next.handle(request);
  } else {
    return ServerResponse.status(TOO_MANY_REQUESTS).build();
  }
}
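The same trigger-and-decide flow can be sketched from a Python client. This is a rough illustration under stated assumptions: it relies on a client object exposing execute_command (as redis-py clients do), it assumes the reply is a list whose first element is the text "True" or "False" (possibly as bytes), and FakeClient is a hypothetical stand-in so the sketch runs without a Redis server. The names window_key and is_allowed are illustrative and not part of the tutorial's code.

```python
from datetime import datetime

def window_key(address, now=None):
    """Build the per-minute counter key in the rl_<address>:<minute> format."""
    now = now or datetime.now()
    return 'rl_%s:%s' % (address, now.minute)

def is_allowed(client, key, quota, ttl_seconds=59):
    """Trigger the RateLimiter function and interpret its first result."""
    reply = client.execute_command(
        'RG.TRIGGER', 'RateLimiter', key, str(quota), str(ttl_seconds))
    if not reply:
        return False  # empty reply: deny, mirroring the Java filter
    first = reply[0]
    if isinstance(first, bytes):
        first = first.decode()
    # The function returns True when the request must be rejected.
    return str(first).lower() != 'true'

class FakeClient:
    """Hypothetical stand-in for a Redis client, for local runs only."""
    def __init__(self):
        self.counts = {}

    def execute_command(self, *args):
        _, _, key, quota, _ = args
        used = self.counts.get(key, 0)
        if used < int(quota):
            self.counts[key] = used + 1
            return [b'False']
        return [b'True']

client = FakeClient()
key = window_key('localhost', datetime(2021, 9, 10, 4, 47))
statuses = [200 if is_allowed(client, key, 20) else 429 for _ in range(22)]
print(key, statuses.count(200), statuses.count(429))  # rl_localhost:47 20 2
```

Because the minute is part of the key, each client address gets a fresh counter every minute, and the 59-second TTL lets old counters expire on their own.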
Testing with curl
Once again, we use a curl loop to test the limiter:
for n in {1..22}; do echo $(curl -s -w " :: HTTP %{http_code}, %{size_download} bytes, %{time_total} s" -X GET http://localhost:8080/api/ping); sleep 0.5; done
You should see the 21st request being rejected:
➜ for n in {1..22}; do echo $(curl -s -w " :: HTTP %{http_code}, %{size_download} bytes, %{time_total} s" -X GET http://localhost:8080/api/ping); sleep 0.5; done
PONG :: HTTP 200, 4 bytes, 0.064786 s
PONG :: HTTP 200, 4 bytes, 0.009926 s
PONG :: HTTP 200, 4 bytes, 0.009546 s
PONG :: HTTP 200, 4 bytes, 0.010189 s
PONG :: HTTP 200, 4 bytes, 0.009399 s
PONG :: HTTP 200, 4 bytes, 0.009210 s
PONG :: HTTP 200, 4 bytes, 0.008333 s
PONG :: HTTP 200, 4 bytes, 0.008009 s
PONG :: HTTP 200, 4 bytes, 0.008919 s
PONG :: HTTP 200, 4 bytes, 0.009271 s
PONG :: HTTP 200, 4 bytes, 0.007515 s
PONG :: HTTP 200, 4 bytes, 0.007057 s
PONG :: HTTP 200, 4 bytes, 0.008373 s
PONG :: HTTP 200, 4 bytes, 0.007573 s
PONG :: HTTP 200, 4 bytes, 0.008209 s
PONG :: HTTP 200, 4 bytes, 0.009080 s
PONG :: HTTP 200, 4 bytes, 0.007595 s
PONG :: HTTP 200, 4 bytes, 0.007955 s
PONG :: HTTP 200, 4 bytes, 0.007693 s
PONG :: HTTP 200, 4 bytes, 0.008743 s
:: HTTP 429, 0 bytes, 0.007226 s
:: HTTP 429, 0 bytes, 0.007388 s
If we run Redis in monitor mode, we should see the calls to RG.TRIGGER and, under each of them, the calls to GET, INCR, and EXPIRE for allowed requests:
1631249244.006212 [0 172.17.0.1:56036] "RG.TRIGGER" "RateLimiter" "rl_localhost:47" "20" "59"
1631249244.006995 [0 ?:0] "GET" "rl_localhost:47"
1631249244.007182 [0 ?:0] "INCR" "rl_localhost:47"
1631249244.007269 [0 ?:0] "EXPIRE" "rl_localhost:47" "59"
And for rate-limited requests you should see only the call to GET:
1631249244.538478 [0 172.17.0.1:56036] "RG.TRIGGER" "RateLimiter" "rl_localhost:47" "20" "59"
1631249244.538809 [0 ?:0] "GET" "rl_localhost:47"
The complete code for this implementation is under the branch with_gears.