From 1379bc25103d4c2da667c547f7914bf9a13a4758 Mon Sep 17 00:00:00 2001 From: Masen Furer Date: Thu, 12 Dec 2024 01:42:33 -0800 Subject: [PATCH] Set top-level hash expiration if HEXPIRE is not supported Continue to support redis < 7.2 Instead of updating individual substate expiration, in older versions of redis any update to any substate will cause the entire state expiration to be refreshed, which is better than not supporting older redis versions. --- reflex/state.py | 31 ++++++++++++++++++++++++++----- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/reflex/state.py b/reflex/state.py index 9564d8037..14b85d744 100644 --- a/reflex/state.py +++ b/reflex/state.py @@ -3228,6 +3228,9 @@ class StateManagerRedis(StateManager): # The maximum time to hold a lock (ms). lock_expiration: int = pydantic.Field(default_factory=_default_lock_expiration) + # If HEXPIRE is not supported, use EXPIRE instead. + _hexpire_not_supported: bool | None = pydantic.PrivateAttr(None) + # The keyspace subscription string when redis is waiting for lock to be released _redis_notify_keyspace_events: str = ( "K" # Enable keyspace notifications (target a particular key) @@ -3375,6 +3378,7 @@ class StateManagerRedis(StateManager): Raises: LockExpiredError: If lock_id is provided and the lock for the token is not held by that ID. RuntimeError: If the state instance doesn't match the state name in the token. + ResponseError: If the redis command fails. """ # Check that we're holding the lock. if ( @@ -3406,16 +3410,33 @@ class StateManagerRedis(StateManager): if not redis_hashset: return + try: + await self._hset_pipeline(client_token, redis_hashset) + except ResponseError as re: + if "unknown command 'HEXPIRE'" not in str(re): + raise + # HEXPIRE not supported, try again with fallback expire. + self._hexpire_not_supported = True + await self._hset_pipeline(client_token, redis_hashset) + + async def _hset_pipeline(self, client_token: str, redis_hashset: dict[str, bytes]): + """Set multiple fields in a hash with expiration. + + Args: + client_token: The name of the hash. + redis_hashset: The keys and values to set. + """ pipe = self.redis.pipeline() - await ( - pipe.hset(name=client_token, mapping=redis_hashset) - .hexpire( # type: ignore + pipe.hset(name=client_token, mapping=redis_hashset) + if self._hexpire_not_supported: + pipe.expire(client_token, self.token_expiration) + else: + pipe.hexpire( client_token, self.token_expiration, *redis_hashset.keys(), ) - .execute() - ) + await pipe.execute() @override @contextlib.asynccontextmanager