Throw warnings when Redis lock is held for more than the allowed threshold
This commit is contained in:
parent
adfda8adfd
commit
89cb17cb75
@ -684,6 +684,9 @@ class Config(Base):
|
||||
# Maximum expiration lock time for redis state manager
|
||||
redis_lock_expiration: int = constants.Expiration.LOCK
|
||||
|
||||
# Maximum lock time before warning for redis state manager.
|
||||
redis_lock_warning_threshold: int = constants.Expiration.LOCK_WARNING_THRESHOLD
|
||||
|
||||
# Token expiration time for redis state manager
|
||||
redis_token_expiration: int = constants.Expiration.TOKEN
|
||||
|
||||
|
@ -29,6 +29,8 @@ class Expiration(SimpleNamespace):
|
||||
LOCK = 10000
|
||||
# The PING timeout
|
||||
PING = 120
|
||||
# The maximum time in milliseconds to hold a lock before throwing a warning.
|
||||
LOCK_WARNING_THRESHOLD = 1000
|
||||
|
||||
|
||||
class GitIgnore(SimpleNamespace):
|
||||
|
@ -94,6 +94,7 @@ from reflex.utils.exceptions import (
|
||||
DynamicRouteArgShadowsStateVar,
|
||||
EventHandlerShadowsBuiltInStateMethod,
|
||||
ImmutableStateError,
|
||||
InvalidLockWarningThresholdError,
|
||||
InvalidStateManagerMode,
|
||||
LockExpiredError,
|
||||
ReflexRuntimeError,
|
||||
@ -3203,6 +3204,30 @@ def _default_lock_expiration() -> int:
|
||||
return get_config().redis_lock_expiration
|
||||
|
||||
|
||||
def _default_lock_warning_threshold() -> int:
|
||||
"""Get the default lock warning threshold.
|
||||
|
||||
Returns:
|
||||
The default lock warning threshold.
|
||||
"""
|
||||
lock_warning_threshold = get_config().redis_lock_warning_threshold
|
||||
_validate_lock_warning_threshold(lock_warning_threshold, _default_lock_expiration())
|
||||
return lock_warning_threshold
|
||||
|
||||
|
||||
def _validate_lock_warning_threshold(lock_warning_threshold: int, lock_expiration: int):
|
||||
"""Validate the lock warning threshold.
|
||||
|
||||
Args:
|
||||
lock_warning_threshold: The lock warning threshold.
|
||||
lock_expiration: The lock expiration time.
|
||||
"""
|
||||
if lock_warning_threshold >= lock_expiration:
|
||||
raise InvalidLockWarningThresholdError(
|
||||
f"The lock warning threshold({lock_warning_threshold}) must be less than the lock expiration time({lock_expiration})."
|
||||
)
|
||||
|
||||
|
||||
class StateManagerRedis(StateManager):
|
||||
"""A state manager that stores states in redis."""
|
||||
|
||||
@ -3215,6 +3240,11 @@ class StateManagerRedis(StateManager):
|
||||
# The maximum time to hold a lock (ms).
|
||||
lock_expiration: int = pydantic.Field(default_factory=_default_lock_expiration)
|
||||
|
||||
# The minimum time to hold a lock (ms).
|
||||
lock_warning_threshold: int = pydantic.Field(
|
||||
default_factory=_default_lock_warning_threshold
|
||||
)
|
||||
|
||||
# The keyspace subscription string when redis is waiting for lock to be released
|
||||
_redis_notify_keyspace_events: str = (
|
||||
"K" # Enable keyspace notifications (target a particular key)
|
||||
@ -3402,6 +3432,19 @@ class StateManagerRedis(StateManager):
|
||||
f"`app.state_manager.lock_expiration` (currently {self.lock_expiration}) "
|
||||
"or use `@rx.event(background=True)` decorator for long-running tasks."
|
||||
)
|
||||
elif lock_id is not None:
|
||||
time_taken = self.lock_expiration / 1000 - (
|
||||
await self.redis.ttl(self._lock_key(token))
|
||||
)
|
||||
_validate_lock_warning_threshold(
|
||||
self.lock_warning_threshold, self.lock_expiration
|
||||
)
|
||||
if time_taken > self.lock_warning_threshold / 1000:
|
||||
console.warn(
|
||||
f"Lock for token {token} was held too long {time_taken=}s, avoid blocking operations.",
|
||||
dedupe=True,
|
||||
)
|
||||
|
||||
client_token, substate_name = _split_substate_key(token)
|
||||
# If the substate name on the token doesn't match the instance name, it cannot have a parent.
|
||||
if state.parent_state is not None and state.get_full_name() != substate_name:
|
||||
|
@ -20,6 +20,24 @@ _EMITTED_DEPRECATION_WARNINGS = set()
|
||||
# Info messages which have been printed.
|
||||
_EMITTED_INFO = set()
|
||||
|
||||
# Warnings which have been printed.
|
||||
_EMIITED_WARNINGS = set()
|
||||
|
||||
# Errors which have been printed.
|
||||
_EMITTED_ERRORS = set()
|
||||
|
||||
# Success messages which have been printed.
|
||||
_EMITTED_SUCCESS = set()
|
||||
|
||||
# Debug messages which have been printed.
|
||||
_EMITTED_DEBUG = set()
|
||||
|
||||
# Logs which have been printed.
|
||||
_EMITTED_LOGS = set()
|
||||
|
||||
# Prints which have been printed.
|
||||
_EMITTED_PRINTS = set()
|
||||
|
||||
|
||||
def set_log_level(log_level: LogLevel):
|
||||
"""Set the log level.
|
||||
@ -55,25 +73,37 @@ def is_debug() -> bool:
|
||||
return _LOG_LEVEL <= LogLevel.DEBUG
|
||||
|
||||
|
||||
def print(msg: str, **kwargs):
|
||||
def print(msg: str, dedupe: bool = False, **kwargs):
|
||||
"""Print a message.
|
||||
|
||||
Args:
|
||||
msg: The message to print.
|
||||
dedupe: If True, suppress multiple console logs of print message.
|
||||
kwargs: Keyword arguments to pass to the print function.
|
||||
"""
|
||||
if dedupe:
|
||||
if msg in _EMITTED_PRINTS:
|
||||
return
|
||||
else:
|
||||
_EMITTED_PRINTS.add(msg)
|
||||
_console.print(msg, **kwargs)
|
||||
|
||||
|
||||
def debug(msg: str, **kwargs):
|
||||
def debug(msg: str, dedupe: bool = False, **kwargs):
|
||||
"""Print a debug message.
|
||||
|
||||
Args:
|
||||
msg: The debug message.
|
||||
dedupe: If True, suppress multiple console logs of debug message.
|
||||
kwargs: Keyword arguments to pass to the print function.
|
||||
"""
|
||||
if is_debug():
|
||||
msg_ = f"[purple]Debug: {msg}[/purple]"
|
||||
if dedupe:
|
||||
if msg_ in _EMITTED_DEBUG:
|
||||
return
|
||||
else:
|
||||
_EMITTED_DEBUG.add(msg_)
|
||||
if progress := kwargs.pop("progress", None):
|
||||
progress.console.print(msg_, **kwargs)
|
||||
else:
|
||||
@ -97,25 +127,37 @@ def info(msg: str, dedupe: bool = False, **kwargs):
|
||||
print(f"[cyan]Info: {msg}[/cyan]", **kwargs)
|
||||
|
||||
|
||||
def success(msg: str, **kwargs):
|
||||
def success(msg: str, dedupe: bool = False, **kwargs):
|
||||
"""Print a success message.
|
||||
|
||||
Args:
|
||||
msg: The success message.
|
||||
dedupe: If True, suppress multiple console logs of success message.
|
||||
kwargs: Keyword arguments to pass to the print function.
|
||||
"""
|
||||
if _LOG_LEVEL <= LogLevel.INFO:
|
||||
if dedupe:
|
||||
if msg in _EMITTED_SUCCESS:
|
||||
return
|
||||
else:
|
||||
_EMITTED_SUCCESS.add(msg)
|
||||
print(f"[green]Success: {msg}[/green]", **kwargs)
|
||||
|
||||
|
||||
def log(msg: str, **kwargs):
|
||||
def log(msg: str, dedupe: bool = False, **kwargs):
|
||||
"""Takes a string and logs it to the console.
|
||||
|
||||
Args:
|
||||
msg: The message to log.
|
||||
dedupe: If True, suppress multiple console logs of log message.
|
||||
kwargs: Keyword arguments to pass to the print function.
|
||||
"""
|
||||
if _LOG_LEVEL <= LogLevel.INFO:
|
||||
if dedupe:
|
||||
if msg in _EMITTED_LOGS:
|
||||
return
|
||||
else:
|
||||
_EMITTED_LOGS.add(msg)
|
||||
_console.log(msg, **kwargs)
|
||||
|
||||
|
||||
@ -129,14 +171,20 @@ def rule(title: str, **kwargs):
|
||||
_console.rule(title, **kwargs)
|
||||
|
||||
|
||||
def warn(msg: str, **kwargs):
|
||||
def warn(msg: str, dedupe: bool = False, **kwargs):
|
||||
"""Print a warning message.
|
||||
|
||||
Args:
|
||||
msg: The warning message.
|
||||
dedupe: If True, suppress multiple console logs of warning message.
|
||||
kwargs: Keyword arguments to pass to the print function.
|
||||
"""
|
||||
if _LOG_LEVEL <= LogLevel.WARNING:
|
||||
if dedupe:
|
||||
if msg in _EMIITED_WARNINGS:
|
||||
return
|
||||
else:
|
||||
_EMIITED_WARNINGS.add(msg)
|
||||
print(f"[orange1]Warning: {msg}[/orange1]", **kwargs)
|
||||
|
||||
|
||||
@ -169,14 +217,20 @@ def deprecate(
|
||||
_EMITTED_DEPRECATION_WARNINGS.add(feature_name)
|
||||
|
||||
|
||||
def error(msg: str, **kwargs):
|
||||
def error(msg: str, dedupe: bool = False, **kwargs):
|
||||
"""Print an error message.
|
||||
|
||||
Args:
|
||||
msg: The error message.
|
||||
dedupe: If True, suppress multiple console logs of error message.
|
||||
kwargs: Keyword arguments to pass to the print function.
|
||||
"""
|
||||
if _LOG_LEVEL <= LogLevel.ERROR:
|
||||
if dedupe:
|
||||
if msg in _EMITTED_ERRORS:
|
||||
return
|
||||
else:
|
||||
_EMITTED_ERRORS.add(msg)
|
||||
print(f"[red]{msg}[/red]", **kwargs)
|
||||
|
||||
|
||||
|
@ -183,3 +183,7 @@ def raise_system_package_missing_error(package: str) -> NoReturn:
|
||||
" Please install it through your system package manager."
|
||||
+ (f" You can do so by running 'brew install {package}'." if IS_MACOS else "")
|
||||
)
|
||||
|
||||
|
||||
class InvalidLockWarningThresholdError(ReflexError, ValueError):
|
||||
"""Raised when an invalid lock warning threshold is provided."""
|
||||
|
Loading…
Reference in New Issue
Block a user