Enable automatic retry on redis errors

ExponentialBackoff 3x retry for BusyLoadingError, ConnectionError, and TimeoutError
This commit is contained in:
Masen Furer 2025-01-06 15:35:58 -08:00
parent eae15e3a83
commit c902e6dd45
No known key found for this signature in database
GPG Key ID: B0008AD22B3B3A95

View File

@ -21,15 +21,17 @@ import zipfile
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
from types import ModuleType from types import ModuleType
from typing import Callable, List, Optional from typing import Any, Callable, List, Optional
import httpx import httpx
import typer import typer
from alembic.util.exc import CommandError from alembic.util.exc import CommandError
from packaging import version from packaging import version
from redis import Redis as RedisSync from redis import Redis as RedisSync
from redis import exceptions
from redis.asyncio import Redis from redis.asyncio import Redis
from redis.backoff import ExponentialBackoff
from redis.exceptions import BusyLoadingError, ConnectionError, RedisError, TimeoutError
from redis.retry import Retry
from reflex import constants, model from reflex import constants, model
from reflex.compiler import templates from reflex.compiler import templates
@ -327,16 +329,24 @@ def get_compiled_app(reload: bool = False, export: bool = False) -> ModuleType:
return app_module return app_module
def _get_common_redis_kwargs() -> dict[str, Any]:
return {
"retry": Retry(ExponentialBackoff(), 3),
"retry_on_error": [BusyLoadingError, ConnectionError, TimeoutError],
}
def get_redis() -> Redis | None: def get_redis() -> Redis | None:
"""Get the asynchronous redis client. """Get the asynchronous redis client.
Returns: Returns:
The asynchronous redis client. The asynchronous redis client.
""" """
if isinstance((redis_url_or_options := parse_redis_url()), str): if (redis_url := parse_redis_url()) is not None:
return Redis.from_url(redis_url_or_options) return Redis.from_url(
elif isinstance(redis_url_or_options, dict): redis_url,
return Redis(**redis_url_or_options) **_get_common_redis_kwargs(),
)
return None return None
@ -346,14 +356,15 @@ def get_redis_sync() -> RedisSync | None:
Returns: Returns:
The synchronous redis client. The synchronous redis client.
""" """
if isinstance((redis_url_or_options := parse_redis_url()), str): if (redis_url := parse_redis_url()) is not None:
return RedisSync.from_url(redis_url_or_options) return RedisSync.from_url(
elif isinstance(redis_url_or_options, dict): redis_url,
return RedisSync(**redis_url_or_options) **_get_common_redis_kwargs(),
)
return None return None
def parse_redis_url() -> str | dict | None: def parse_redis_url() -> str | None:
"""Parse the REDIS_URL in config if applicable. """Parse the REDIS_URL in config if applicable.
Returns: Returns:
@ -387,7 +398,7 @@ async def get_redis_status() -> dict[str, bool | None]:
redis_client.ping() redis_client.ping()
else: else:
status = None status = None
except exceptions.RedisError: except RedisError:
status = False status = False
return {"redis": status} return {"redis": status}