From c902e6dd45b6991505b7dac9abf2979c2b95fe0e Mon Sep 17 00:00:00 2001 From: Masen Furer Date: Mon, 6 Jan 2025 15:35:58 -0800 Subject: [PATCH] Enable automatic retry on redis errors ExponentialBackoff 3x retry for BusyLoadingError, ConnectionError, and TimeoutError --- reflex/utils/prerequisites.py | 35 +++++++++++++++++++++++------------ 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/reflex/utils/prerequisites.py b/reflex/utils/prerequisites.py index 797d28701..1f95a5649 100644 --- a/reflex/utils/prerequisites.py +++ b/reflex/utils/prerequisites.py @@ -21,15 +21,17 @@ import zipfile from datetime import datetime from pathlib import Path from types import ModuleType -from typing import Callable, List, Optional +from typing import Any, Callable, List, Optional import httpx import typer from alembic.util.exc import CommandError from packaging import version from redis import Redis as RedisSync -from redis import exceptions from redis.asyncio import Redis +from redis.backoff import ExponentialBackoff +from redis.exceptions import BusyLoadingError, ConnectionError, RedisError, TimeoutError +from redis.retry import Retry from reflex import constants, model from reflex.compiler import templates @@ -327,16 +329,24 @@ def get_compiled_app(reload: bool = False, export: bool = False) -> ModuleType: return app_module +def _get_common_redis_kwargs() -> dict[str, Any]: + return { + "retry": Retry(ExponentialBackoff(), 3), + "retry_on_error": [BusyLoadingError, ConnectionError, TimeoutError], + } + + def get_redis() -> Redis | None: """Get the asynchronous redis client. Returns: The asynchronous redis client. """ - if isinstance((redis_url_or_options := parse_redis_url()), str): - return Redis.from_url(redis_url_or_options) - elif isinstance(redis_url_or_options, dict): - return Redis(**redis_url_or_options) + if (redis_url := parse_redis_url()) is not None: + return Redis.from_url( + redis_url, + **_get_common_redis_kwargs(), + ) return None @@ -346,14 +356,15 @@ def get_redis_sync() -> RedisSync | None: Returns: The synchronous redis client. """ - if isinstance((redis_url_or_options := parse_redis_url()), str): - return RedisSync.from_url(redis_url_or_options) - elif isinstance(redis_url_or_options, dict): - return RedisSync(**redis_url_or_options) + if (redis_url := parse_redis_url()) is not None: + return RedisSync.from_url( + redis_url, + **_get_common_redis_kwargs(), + ) return None -def parse_redis_url() -> str | dict | None: +def parse_redis_url() -> str | None: """Parse the REDIS_URL in config if applicable. Returns: @@ -387,7 +398,7 @@ async def get_redis_status() -> dict[str, bool | None]: redis_client.ping() else: status = None - except exceptions.RedisError: + except RedisError: status = False return {"redis": status}