From 89cb17cb7515ed2ead5b5c2b1b0d45476725a4f5 Mon Sep 17 00:00:00 2001 From: Elijah Date: Wed, 11 Dec 2024 17:55:01 +0000 Subject: [PATCH] Throw warnings when Redis lock is held for more than the allowed threshold --- reflex/config.py | 3 ++ reflex/constants/config.py | 2 ++ reflex/state.py | 43 +++++++++++++++++++++++++ reflex/utils/console.py | 66 ++++++++++++++++++++++++++++++++++---- reflex/utils/exceptions.py | 4 +++ 5 files changed, 112 insertions(+), 6 deletions(-) diff --git a/reflex/config.py b/reflex/config.py index ae2c0ea0e..bbea6a5d0 100644 --- a/reflex/config.py +++ b/reflex/config.py @@ -684,6 +684,9 @@ class Config(Base): # Maximum expiration lock time for redis state manager redis_lock_expiration: int = constants.Expiration.LOCK + # Maximum lock time before warning for redis state manager. + redis_lock_warning_threshold: int = constants.Expiration.LOCK_WARNING_THRESHOLD + # Token expiration time for redis state manager redis_token_expiration: int = constants.Expiration.TOKEN diff --git a/reflex/constants/config.py b/reflex/constants/config.py index 970e67844..7425fd864 100644 --- a/reflex/constants/config.py +++ b/reflex/constants/config.py @@ -29,6 +29,8 @@ class Expiration(SimpleNamespace): LOCK = 10000 # The PING timeout PING = 120 + # The maximum time in milliseconds to hold a lock before throwing a warning. + LOCK_WARNING_THRESHOLD = 1000 class GitIgnore(SimpleNamespace): diff --git a/reflex/state.py b/reflex/state.py index 3e606bf57..a5b9f14e7 100644 --- a/reflex/state.py +++ b/reflex/state.py @@ -94,6 +94,7 @@ from reflex.utils.exceptions import ( DynamicRouteArgShadowsStateVar, EventHandlerShadowsBuiltInStateMethod, ImmutableStateError, + InvalidLockWarningThresholdError, InvalidStateManagerMode, LockExpiredError, ReflexRuntimeError, @@ -3203,6 +3204,30 @@ def _default_lock_expiration() -> int: return get_config().redis_lock_expiration +def _default_lock_warning_threshold() -> int: + """Get the default lock warning threshold. + + Returns: + The default lock warning threshold. + """ + lock_warning_threshold = get_config().redis_lock_warning_threshold + _validate_lock_warning_threshold(lock_warning_threshold, _default_lock_expiration()) + return lock_warning_threshold + + +def _validate_lock_warning_threshold(lock_warning_threshold: int, lock_expiration: int): + """Validate the lock warning threshold. + + Args: + lock_warning_threshold: The lock warning threshold. + lock_expiration: The lock expiration time. + """ + if lock_warning_threshold >= lock_expiration: + raise InvalidLockWarningThresholdError( + f"The lock warning threshold({lock_warning_threshold}) must be less than the lock expiration time({lock_expiration})." + ) + + class StateManagerRedis(StateManager): """A state manager that stores states in redis.""" @@ -3215,6 +3240,11 @@ class StateManagerRedis(StateManager): # The maximum time to hold a lock (ms). lock_expiration: int = pydantic.Field(default_factory=_default_lock_expiration) + # The minimum time to hold a lock (ms). + lock_warning_threshold: int = pydantic.Field( + default_factory=_default_lock_warning_threshold + ) + # The keyspace subscription string when redis is waiting for lock to be released _redis_notify_keyspace_events: str = ( "K" # Enable keyspace notifications (target a particular key) @@ -3402,6 +3432,19 @@ class StateManagerRedis(StateManager): f"`app.state_manager.lock_expiration` (currently {self.lock_expiration}) " "or use `@rx.event(background=True)` decorator for long-running tasks." ) + elif lock_id is not None: + time_taken = self.lock_expiration / 1000 - ( + await self.redis.ttl(self._lock_key(token)) + ) + _validate_lock_warning_threshold( + self.lock_warning_threshold, self.lock_expiration + ) + if time_taken > self.lock_warning_threshold / 1000: + console.warn( + f"Lock for token {token} was held too long {time_taken=}s, avoid blocking operations.", + dedupe=True, + ) + client_token, substate_name = _split_substate_key(token) # If the substate name on the token doesn't match the instance name, it cannot have a parent. if state.parent_state is not None and state.get_full_name() != substate_name: diff --git a/reflex/utils/console.py b/reflex/utils/console.py index b3ba7163d..be545140a 100644 --- a/reflex/utils/console.py +++ b/reflex/utils/console.py @@ -20,6 +20,24 @@ _EMITTED_DEPRECATION_WARNINGS = set() # Info messages which have been printed. _EMITTED_INFO = set() +# Warnings which have been printed. +_EMIITED_WARNINGS = set() + +# Errors which have been printed. +_EMITTED_ERRORS = set() + +# Success messages which have been printed. +_EMITTED_SUCCESS = set() + +# Debug messages which have been printed. +_EMITTED_DEBUG = set() + +# Logs which have been printed. +_EMITTED_LOGS = set() + +# Prints which have been printed. +_EMITTED_PRINTS = set() + def set_log_level(log_level: LogLevel): """Set the log level. @@ -55,25 +73,37 @@ def is_debug() -> bool: return _LOG_LEVEL <= LogLevel.DEBUG -def print(msg: str, **kwargs): +def print(msg: str, dedupe: bool = False, **kwargs): """Print a message. Args: msg: The message to print. + dedupe: If True, suppress multiple console logs of print message. kwargs: Keyword arguments to pass to the print function. """ + if dedupe: + if msg in _EMITTED_PRINTS: + return + else: + _EMITTED_PRINTS.add(msg) _console.print(msg, **kwargs) -def debug(msg: str, **kwargs): +def debug(msg: str, dedupe: bool = False, **kwargs): """Print a debug message. Args: msg: The debug message. + dedupe: If True, suppress multiple console logs of debug message. kwargs: Keyword arguments to pass to the print function. """ if is_debug(): msg_ = f"[purple]Debug: {msg}[/purple]" + if dedupe: + if msg_ in _EMITTED_DEBUG: + return + else: + _EMITTED_DEBUG.add(msg_) if progress := kwargs.pop("progress", None): progress.console.print(msg_, **kwargs) else: @@ -97,25 +127,37 @@ def info(msg: str, dedupe: bool = False, **kwargs): print(f"[cyan]Info: {msg}[/cyan]", **kwargs) -def success(msg: str, **kwargs): +def success(msg: str, dedupe: bool = False, **kwargs): """Print a success message. Args: msg: The success message. + dedupe: If True, suppress multiple console logs of success message. kwargs: Keyword arguments to pass to the print function. """ if _LOG_LEVEL <= LogLevel.INFO: + if dedupe: + if msg in _EMITTED_SUCCESS: + return + else: + _EMITTED_SUCCESS.add(msg) print(f"[green]Success: {msg}[/green]", **kwargs) -def log(msg: str, **kwargs): +def log(msg: str, dedupe: bool = False, **kwargs): """Takes a string and logs it to the console. Args: msg: The message to log. + dedupe: If True, suppress multiple console logs of log message. kwargs: Keyword arguments to pass to the print function. """ if _LOG_LEVEL <= LogLevel.INFO: + if dedupe: + if msg in _EMITTED_LOGS: + return + else: + _EMITTED_LOGS.add(msg) _console.log(msg, **kwargs) @@ -129,14 +171,20 @@ def rule(title: str, **kwargs): _console.rule(title, **kwargs) -def warn(msg: str, **kwargs): +def warn(msg: str, dedupe: bool = False, **kwargs): """Print a warning message. Args: msg: The warning message. + dedupe: If True, suppress multiple console logs of warning message. kwargs: Keyword arguments to pass to the print function. """ if _LOG_LEVEL <= LogLevel.WARNING: + if dedupe: + if msg in _EMIITED_WARNINGS: + return + else: + _EMIITED_WARNINGS.add(msg) print(f"[orange1]Warning: {msg}[/orange1]", **kwargs) @@ -169,14 +217,20 @@ def deprecate( _EMITTED_DEPRECATION_WARNINGS.add(feature_name) -def error(msg: str, **kwargs): +def error(msg: str, dedupe: bool = False, **kwargs): """Print an error message. Args: msg: The error message. + dedupe: If True, suppress multiple console logs of error message. kwargs: Keyword arguments to pass to the print function. """ if _LOG_LEVEL <= LogLevel.ERROR: + if dedupe: + if msg in _EMITTED_ERRORS: + return + else: + _EMITTED_ERRORS.add(msg) print(f"[red]{msg}[/red]", **kwargs) diff --git a/reflex/utils/exceptions.py b/reflex/utils/exceptions.py index 6c378e159..8ec2eeb73 100644 --- a/reflex/utils/exceptions.py +++ b/reflex/utils/exceptions.py @@ -183,3 +183,7 @@ def raise_system_package_missing_error(package: str) -> NoReturn: " Please install it through your system package manager." + (f" You can do so by running 'brew install {package}'." if IS_MACOS else "") ) + + +class InvalidLockWarningThresholdError(ReflexError, ValueError): + """Raised when an invalid lock warning threshold is provided."""