Set top-level hash expiration if HEXPIRE is not supported

Continue to support redis < 7.2

Instead of updating individual substate expiration, in older versions of redis
any update to any substate will cause the entire state expiration to be
refreshed, which is better than not supporting older redis versions.
This commit is contained in:
Masen Furer 2024-12-12 01:42:33 -08:00
parent 8d99b7662a
commit 1379bc2510
No known key found for this signature in database
GPG Key ID: B0008AD22B3B3A95

View File

@ -3228,6 +3228,9 @@ class StateManagerRedis(StateManager):
# The maximum time to hold a lock (ms). # The maximum time to hold a lock (ms).
lock_expiration: int = pydantic.Field(default_factory=_default_lock_expiration) lock_expiration: int = pydantic.Field(default_factory=_default_lock_expiration)
# If HEXPIRE is not supported, use EXPIRE instead.
_hexpire_not_supported: bool | None = pydantic.PrivateAttr(None)
# The keyspace subscription string when redis is waiting for lock to be released # The keyspace subscription string when redis is waiting for lock to be released
_redis_notify_keyspace_events: str = ( _redis_notify_keyspace_events: str = (
"K" # Enable keyspace notifications (target a particular key) "K" # Enable keyspace notifications (target a particular key)
@ -3375,6 +3378,7 @@ class StateManagerRedis(StateManager):
Raises: Raises:
LockExpiredError: If lock_id is provided and the lock for the token is not held by that ID. LockExpiredError: If lock_id is provided and the lock for the token is not held by that ID.
RuntimeError: If the state instance doesn't match the state name in the token. RuntimeError: If the state instance doesn't match the state name in the token.
ResponseError: If the redis command fails.
""" """
# Check that we're holding the lock. # Check that we're holding the lock.
if ( if (
@ -3406,16 +3410,33 @@ class StateManagerRedis(StateManager):
if not redis_hashset: if not redis_hashset:
return return
try:
await self._hset_pipeline(client_token, redis_hashset)
except ResponseError as re:
if "unknown command 'HEXPIRE'" not in str(re):
raise
# HEXPIRE not supported, try again with fallback expire.
self._hexpire_not_supported = True
await self._hset_pipeline(client_token, redis_hashset)
async def _hset_pipeline(self, client_token: str, redis_hashset: dict[str, bytes]):
"""Set multiple fields in a hash with expiration.
Args:
client_token: The name of the hash.
redis_hashset: The keys and values to set.
"""
pipe = self.redis.pipeline() pipe = self.redis.pipeline()
await (
pipe.hset(name=client_token, mapping=redis_hashset) pipe.hset(name=client_token, mapping=redis_hashset)
.hexpire( # type: ignore if self._hexpire_not_supported:
pipe.expire(client_token, self.token_expiration)
else:
pipe.hexpire(
client_token, client_token,
self.token_expiration, self.token_expiration,
*redis_hashset.keys(), *redis_hashset.keys(),
) )
.execute() await pipe.execute()
)
@override @override
@contextlib.asynccontextmanager @contextlib.asynccontextmanager