888 lines
27 KiB
Python
888 lines
27 KiB
Python
"""The Reflex config."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import dataclasses
|
|
import enum
|
|
import importlib
|
|
import inspect
|
|
import os
|
|
import sys
|
|
import threading
|
|
import urllib.parse
|
|
from importlib.util import find_spec
|
|
from pathlib import Path
|
|
from typing import (
|
|
TYPE_CHECKING,
|
|
Any,
|
|
Dict,
|
|
Generic,
|
|
List,
|
|
Optional,
|
|
Set,
|
|
TypeVar,
|
|
get_args,
|
|
)
|
|
|
|
from typing_extensions import Annotated, get_type_hints
|
|
|
|
from reflex.utils.exceptions import ConfigError, EnvironmentVarValueError
|
|
from reflex.utils.types import GenericType, is_union, value_inside_optional
|
|
|
|
try:
|
|
import pydantic.v1 as pydantic
|
|
except ModuleNotFoundError:
|
|
import pydantic
|
|
|
|
from reflex_cli.constants.hosting import Hosting
|
|
|
|
from reflex import constants
|
|
from reflex.base import Base
|
|
from reflex.utils import console
|
|
|
|
|
|
class DBConfig(Base):
|
|
"""Database config."""
|
|
|
|
engine: str
|
|
username: Optional[str] = ""
|
|
password: Optional[str] = ""
|
|
host: Optional[str] = ""
|
|
port: Optional[int] = None
|
|
database: str
|
|
|
|
@classmethod
|
|
def postgresql(
|
|
cls,
|
|
database: str,
|
|
username: str,
|
|
password: str | None = None,
|
|
host: str | None = None,
|
|
port: int | None = 5432,
|
|
) -> DBConfig:
|
|
"""Create an instance with postgresql engine.
|
|
|
|
Args:
|
|
database: Database name.
|
|
username: Database username.
|
|
password: Database password.
|
|
host: Database host.
|
|
port: Database port.
|
|
|
|
Returns:
|
|
DBConfig instance.
|
|
"""
|
|
return cls(
|
|
engine="postgresql",
|
|
username=username,
|
|
password=password,
|
|
host=host,
|
|
port=port,
|
|
database=database,
|
|
)
|
|
|
|
@classmethod
|
|
def postgresql_psycopg(
|
|
cls,
|
|
database: str,
|
|
username: str,
|
|
password: str | None = None,
|
|
host: str | None = None,
|
|
port: int | None = 5432,
|
|
) -> DBConfig:
|
|
"""Create an instance with postgresql+psycopg engine.
|
|
|
|
Args:
|
|
database: Database name.
|
|
username: Database username.
|
|
password: Database password.
|
|
host: Database host.
|
|
port: Database port.
|
|
|
|
Returns:
|
|
DBConfig instance.
|
|
"""
|
|
return cls(
|
|
engine="postgresql+psycopg",
|
|
username=username,
|
|
password=password,
|
|
host=host,
|
|
port=port,
|
|
database=database,
|
|
)
|
|
|
|
@classmethod
|
|
def sqlite(
|
|
cls,
|
|
database: str,
|
|
) -> DBConfig:
|
|
"""Create an instance with sqlite engine.
|
|
|
|
Args:
|
|
database: Database name.
|
|
|
|
Returns:
|
|
DBConfig instance.
|
|
"""
|
|
return cls(
|
|
engine="sqlite",
|
|
database=database,
|
|
)
|
|
|
|
def get_url(self) -> str:
|
|
"""Get database URL.
|
|
|
|
Returns:
|
|
The database URL.
|
|
"""
|
|
host = (
|
|
f"{self.host}:{self.port}" if self.host and self.port else self.host or ""
|
|
)
|
|
username = urllib.parse.quote_plus(self.username) if self.username else ""
|
|
password = urllib.parse.quote_plus(self.password) if self.password else ""
|
|
|
|
if username:
|
|
path = f"{username}:{password}@{host}" if password else f"{username}@{host}"
|
|
else:
|
|
path = f"{host}"
|
|
|
|
return f"{self.engine}://{path}/{self.database}"
|
|
|
|
|
|
def get_default_value_for_field(field: dataclasses.Field) -> Any:
|
|
"""Get the default value for a field.
|
|
|
|
Args:
|
|
field: The field.
|
|
|
|
Returns:
|
|
The default value.
|
|
|
|
Raises:
|
|
ValueError: If no default value is found.
|
|
"""
|
|
if field.default != dataclasses.MISSING:
|
|
return field.default
|
|
elif field.default_factory != dataclasses.MISSING:
|
|
return field.default_factory()
|
|
else:
|
|
raise ValueError(
|
|
f"Missing value for environment variable {field.name} and no default value found"
|
|
)
|
|
|
|
|
|
# TODO: Change all interpret_.* signatures to value: str, field: dataclasses.Field once we migrate rx.Config to dataclasses
|
|
def interpret_boolean_env(value: str, field_name: str) -> bool:
|
|
"""Interpret a boolean environment variable value.
|
|
|
|
Args:
|
|
value: The environment variable value.
|
|
field_name: The field name.
|
|
|
|
Returns:
|
|
The interpreted value.
|
|
|
|
Raises:
|
|
EnvironmentVarValueError: If the value is invalid.
|
|
"""
|
|
true_values = ["true", "1", "yes", "y"]
|
|
false_values = ["false", "0", "no", "n"]
|
|
|
|
if value.lower() in true_values:
|
|
return True
|
|
elif value.lower() in false_values:
|
|
return False
|
|
raise EnvironmentVarValueError(f"Invalid boolean value: {value} for {field_name}")
|
|
|
|
|
|
def interpret_int_env(value: str, field_name: str) -> int:
|
|
"""Interpret an integer environment variable value.
|
|
|
|
Args:
|
|
value: The environment variable value.
|
|
field_name: The field name.
|
|
|
|
Returns:
|
|
The interpreted value.
|
|
|
|
Raises:
|
|
EnvironmentVarValueError: If the value is invalid.
|
|
"""
|
|
try:
|
|
return int(value)
|
|
except ValueError as ve:
|
|
raise EnvironmentVarValueError(
|
|
f"Invalid integer value: {value} for {field_name}"
|
|
) from ve
|
|
|
|
|
|
def interpret_existing_path_env(value: str, field_name: str) -> ExistingPath:
|
|
"""Interpret a path environment variable value as an existing path.
|
|
|
|
Args:
|
|
value: The environment variable value.
|
|
field_name: The field name.
|
|
|
|
Returns:
|
|
The interpreted value.
|
|
|
|
Raises:
|
|
EnvironmentVarValueError: If the path does not exist.
|
|
"""
|
|
path = Path(value)
|
|
if not path.exists():
|
|
raise EnvironmentVarValueError(f"Path does not exist: {path} for {field_name}")
|
|
return path
|
|
|
|
|
|
def interpret_path_env(value: str, field_name: str) -> Path:
|
|
"""Interpret a path environment variable value.
|
|
|
|
Args:
|
|
value: The environment variable value.
|
|
field_name: The field name.
|
|
|
|
Returns:
|
|
The interpreted value.
|
|
"""
|
|
return Path(value)
|
|
|
|
|
|
def interpret_enum_env(value: str, field_type: GenericType, field_name: str) -> Any:
|
|
"""Interpret an enum environment variable value.
|
|
|
|
Args:
|
|
value: The environment variable value.
|
|
field_type: The field type.
|
|
field_name: The field name.
|
|
|
|
Returns:
|
|
The interpreted value.
|
|
|
|
Raises:
|
|
EnvironmentVarValueError: If the value is invalid.
|
|
"""
|
|
try:
|
|
return field_type(value)
|
|
except ValueError as ve:
|
|
raise EnvironmentVarValueError(
|
|
f"Invalid enum value: {value} for {field_name}"
|
|
) from ve
|
|
|
|
|
|
def interpret_env_var_value(
|
|
value: str, field_type: GenericType, field_name: str
|
|
) -> Any:
|
|
"""Interpret an environment variable value based on the field type.
|
|
|
|
Args:
|
|
value: The environment variable value.
|
|
field_type: The field type.
|
|
field_name: The field name.
|
|
|
|
Returns:
|
|
The interpreted value.
|
|
|
|
Raises:
|
|
ValueError: If the value is invalid.
|
|
"""
|
|
field_type = value_inside_optional(field_type)
|
|
|
|
if is_union(field_type):
|
|
raise ValueError(
|
|
f"Union types are not supported for environment variables: {field_name}."
|
|
)
|
|
|
|
if field_type is bool:
|
|
return interpret_boolean_env(value, field_name)
|
|
elif field_type is str:
|
|
return value
|
|
elif field_type is int:
|
|
return interpret_int_env(value, field_name)
|
|
elif field_type is Path:
|
|
return interpret_path_env(value, field_name)
|
|
elif field_type is ExistingPath:
|
|
return interpret_existing_path_env(value, field_name)
|
|
elif inspect.isclass(field_type) and issubclass(field_type, enum.Enum):
|
|
return interpret_enum_env(value, field_type, field_name)
|
|
|
|
else:
|
|
raise ValueError(
|
|
f"Invalid type for environment variable {field_name}: {field_type}. This is probably an issue in Reflex."
|
|
)
|
|
|
|
|
|
T = TypeVar("T")
|
|
|
|
|
|
class EnvVar(Generic[T]):
|
|
"""Environment variable."""
|
|
|
|
name: str
|
|
default: Any
|
|
type_: T
|
|
|
|
def __init__(self, name: str, default: Any, type_: T) -> None:
|
|
"""Initialize the environment variable.
|
|
|
|
Args:
|
|
name: The environment variable name.
|
|
default: The default value.
|
|
type_: The type of the value.
|
|
"""
|
|
self.name = name
|
|
self.default = default
|
|
self.type_ = type_
|
|
|
|
def interpret(self, value: str) -> T:
|
|
"""Interpret the environment variable value.
|
|
|
|
Args:
|
|
value: The environment variable value.
|
|
|
|
Returns:
|
|
The interpreted value.
|
|
"""
|
|
return interpret_env_var_value(value, self.type_, self.name)
|
|
|
|
def getenv(self) -> Optional[T]:
|
|
"""Get the interpreted environment variable value.
|
|
|
|
Returns:
|
|
The environment variable value.
|
|
"""
|
|
env_value = os.getenv(self.name, None)
|
|
if env_value is not None:
|
|
return self.interpret(env_value)
|
|
return None
|
|
|
|
def is_set(self) -> bool:
|
|
"""Check if the environment variable is set.
|
|
|
|
Returns:
|
|
True if the environment variable is set.
|
|
"""
|
|
return self.name in os.environ
|
|
|
|
def get(self) -> T:
|
|
"""Get the interpreted environment variable value or the default value if not set.
|
|
|
|
Returns:
|
|
The interpreted value.
|
|
"""
|
|
env_value = self.getenv()
|
|
if env_value is not None:
|
|
return env_value
|
|
return self.default
|
|
|
|
def set(self, value: T | None) -> None:
|
|
"""Set the environment variable. None unsets the variable.
|
|
|
|
Args:
|
|
value: The value to set.
|
|
"""
|
|
if value is None:
|
|
_ = os.environ.pop(self.name, None)
|
|
else:
|
|
if isinstance(value, enum.Enum):
|
|
value = value.value
|
|
os.environ[self.name] = str(value)
|
|
|
|
|
|
class env_var: # type: ignore
|
|
"""Descriptor for environment variables."""
|
|
|
|
name: str
|
|
default: Any
|
|
internal: bool = False
|
|
|
|
def __init__(self, default: Any, internal: bool = False) -> None:
|
|
"""Initialize the descriptor.
|
|
|
|
Args:
|
|
default: The default value.
|
|
internal: Whether the environment variable is reflex internal.
|
|
"""
|
|
self.default = default
|
|
self.internal = internal
|
|
|
|
def __set_name__(self, owner, name):
|
|
"""Set the name of the descriptor.
|
|
|
|
Args:
|
|
owner: The owner class.
|
|
name: The name of the descriptor.
|
|
"""
|
|
self.name = name
|
|
|
|
def __get__(self, instance, owner):
|
|
"""Get the EnvVar instance.
|
|
|
|
Args:
|
|
instance: The instance.
|
|
owner: The owner class.
|
|
|
|
Returns:
|
|
The EnvVar instance.
|
|
"""
|
|
type_ = get_args(get_type_hints(owner)[self.name])[0]
|
|
env_name = self.name
|
|
if self.internal:
|
|
env_name = f"__{env_name}"
|
|
return EnvVar(name=env_name, default=self.default, type_=type_)
|
|
|
|
|
|
if TYPE_CHECKING:
|
|
|
|
def env_var(default, internal=False) -> EnvVar:
|
|
"""Typing helper for the env_var descriptor.
|
|
|
|
Args:
|
|
default: The default value.
|
|
internal: Whether the environment variable is reflex internal.
|
|
|
|
Returns:
|
|
The EnvVar instance.
|
|
"""
|
|
return default
|
|
|
|
|
|
class PathExistsFlag:
|
|
"""Flag to indicate that a path must exist."""
|
|
|
|
|
|
ExistingPath = Annotated[Path, PathExistsFlag]
|
|
|
|
|
|
class PerformanceMode(enum.Enum):
|
|
"""Performance mode for the app."""
|
|
|
|
WARN = "warn"
|
|
RAISE = "raise"
|
|
OFF = "off"
|
|
|
|
|
|
class EnvironmentVariables:
|
|
"""Environment variables class to instantiate environment variables."""
|
|
|
|
# Whether to use npm over bun to install frontend packages.
|
|
REFLEX_USE_NPM: EnvVar[bool] = env_var(False)
|
|
|
|
# The npm registry to use.
|
|
NPM_CONFIG_REGISTRY: EnvVar[Optional[str]] = env_var(None)
|
|
|
|
# Whether to use Granian for the backend. Otherwise, use Uvicorn.
|
|
REFLEX_USE_GRANIAN: EnvVar[bool] = env_var(False)
|
|
|
|
# The username to use for authentication on python package repository. Username and password must both be provided.
|
|
TWINE_USERNAME: EnvVar[Optional[str]] = env_var(None)
|
|
|
|
# The password to use for authentication on python package repository. Username and password must both be provided.
|
|
TWINE_PASSWORD: EnvVar[Optional[str]] = env_var(None)
|
|
|
|
# Whether to use the system installed bun. If set to false, bun will be bundled with the app.
|
|
REFLEX_USE_SYSTEM_BUN: EnvVar[bool] = env_var(False)
|
|
|
|
# Whether to use the system installed node and npm. If set to false, node and npm will be bundled with the app.
|
|
REFLEX_USE_SYSTEM_NODE: EnvVar[bool] = env_var(False)
|
|
|
|
# The working directory for the next.js commands.
|
|
REFLEX_WEB_WORKDIR: EnvVar[Path] = env_var(Path(constants.Dirs.WEB))
|
|
|
|
# Path to the alembic config file
|
|
ALEMBIC_CONFIG: EnvVar[ExistingPath] = env_var(Path(constants.ALEMBIC_CONFIG))
|
|
|
|
# Disable SSL verification for HTTPX requests.
|
|
SSL_NO_VERIFY: EnvVar[bool] = env_var(False)
|
|
|
|
# The directory to store uploaded files.
|
|
REFLEX_UPLOADED_FILES_DIR: EnvVar[Path] = env_var(
|
|
Path(constants.Dirs.UPLOADED_FILES)
|
|
)
|
|
|
|
# Whether to use separate processes to compile the frontend and how many. If not set, defaults to thread executor.
|
|
REFLEX_COMPILE_PROCESSES: EnvVar[Optional[int]] = env_var(None)
|
|
|
|
# Whether to use separate threads to compile the frontend and how many. Defaults to `min(32, os.cpu_count() + 4)`.
|
|
REFLEX_COMPILE_THREADS: EnvVar[Optional[int]] = env_var(None)
|
|
|
|
# The directory to store reflex dependencies.
|
|
REFLEX_DIR: EnvVar[Path] = env_var(Path(constants.Reflex.DIR))
|
|
|
|
# Whether to print the SQL queries if the log level is INFO or lower.
|
|
SQLALCHEMY_ECHO: EnvVar[bool] = env_var(False)
|
|
|
|
# Whether to check db connections before using them.
|
|
SQLALCHEMY_POOL_PRE_PING: EnvVar[bool] = env_var(True)
|
|
|
|
# Whether to ignore the redis config error. Some redis servers only allow out-of-band configuration.
|
|
REFLEX_IGNORE_REDIS_CONFIG_ERROR: EnvVar[bool] = env_var(False)
|
|
|
|
# Whether to skip purging the web directory in dev mode.
|
|
REFLEX_PERSIST_WEB_DIR: EnvVar[bool] = env_var(False)
|
|
|
|
# The reflex.build frontend host.
|
|
REFLEX_BUILD_FRONTEND: EnvVar[str] = env_var(
|
|
constants.Templates.REFLEX_BUILD_FRONTEND
|
|
)
|
|
|
|
# The reflex.build backend host.
|
|
REFLEX_BUILD_BACKEND: EnvVar[str] = env_var(
|
|
constants.Templates.REFLEX_BUILD_BACKEND
|
|
)
|
|
|
|
# This env var stores the execution mode of the app
|
|
REFLEX_ENV_MODE: EnvVar[constants.Env] = env_var(constants.Env.DEV)
|
|
|
|
# Whether to run the backend only. Exclusive with REFLEX_FRONTEND_ONLY.
|
|
REFLEX_BACKEND_ONLY: EnvVar[bool] = env_var(False)
|
|
|
|
# Whether to run the frontend only. Exclusive with REFLEX_BACKEND_ONLY.
|
|
REFLEX_FRONTEND_ONLY: EnvVar[bool] = env_var(False)
|
|
|
|
# Reflex internal env to reload the config.
|
|
RELOAD_CONFIG: EnvVar[bool] = env_var(False, internal=True)
|
|
|
|
# If this env var is set to "yes", App.compile will be a no-op
|
|
REFLEX_SKIP_COMPILE: EnvVar[bool] = env_var(False, internal=True)
|
|
|
|
# Whether to run app harness tests in headless mode.
|
|
APP_HARNESS_HEADLESS: EnvVar[bool] = env_var(False)
|
|
|
|
# Which app harness driver to use.
|
|
APP_HARNESS_DRIVER: EnvVar[str] = env_var("Chrome")
|
|
|
|
# Arguments to pass to the app harness driver.
|
|
APP_HARNESS_DRIVER_ARGS: EnvVar[str] = env_var("")
|
|
|
|
# Where to save screenshots when tests fail.
|
|
SCREENSHOT_DIR: EnvVar[Optional[Path]] = env_var(None)
|
|
|
|
# Whether to check for outdated package versions.
|
|
REFLEX_CHECK_LATEST_VERSION: EnvVar[bool] = env_var(True)
|
|
|
|
# In which performance mode to run the app.
|
|
REFLEX_PERF_MODE: EnvVar[Optional[PerformanceMode]] = env_var(PerformanceMode.WARN)
|
|
|
|
# The maximum size of the reflex state in kilobytes.
|
|
REFLEX_STATE_SIZE_LIMIT: EnvVar[int] = env_var(1000)
|
|
|
|
|
|
environment = EnvironmentVariables()
|
|
|
|
|
|
# These vars are not logged because they may contain sensitive information.
|
|
_sensitive_env_vars = {"DB_URL", "ASYNC_DB_URL", "REDIS_URL"}
|
|
|
|
|
|
class Config(Base):
|
|
"""The config defines runtime settings for the app.
|
|
|
|
By default, the config is defined in an `rxconfig.py` file in the root of the app.
|
|
|
|
```python
|
|
# rxconfig.py
|
|
import reflex as rx
|
|
|
|
config = rx.Config(
|
|
app_name="myapp",
|
|
api_url="http://localhost:8000",
|
|
)
|
|
```
|
|
|
|
Every config value can be overridden by an environment variable with the same name in uppercase.
|
|
For example, `db_url` can be overridden by setting the `DB_URL` environment variable.
|
|
|
|
See the [configuration](https://reflex.dev/docs/getting-started/configuration/) docs for more info.
|
|
"""
|
|
|
|
class Config:
|
|
"""Pydantic config for the config."""
|
|
|
|
validate_assignment = True
|
|
|
|
# The name of the app (should match the name of the app directory).
|
|
app_name: str
|
|
|
|
# The log level to use.
|
|
loglevel: constants.LogLevel = constants.LogLevel.DEFAULT
|
|
|
|
# The port to run the frontend on. NOTE: When running in dev mode, the next available port will be used if this is taken.
|
|
frontend_port: int = constants.DefaultPorts.FRONTEND_PORT
|
|
|
|
# The path to run the frontend on. For example, "/app" will run the frontend on http://localhost:3000/app
|
|
frontend_path: str = ""
|
|
|
|
# The port to run the backend on. NOTE: When running in dev mode, the next available port will be used if this is taken.
|
|
backend_port: int = constants.DefaultPorts.BACKEND_PORT
|
|
|
|
# The backend url the frontend will connect to. This must be updated if the backend is hosted elsewhere, or in production.
|
|
api_url: str = f"http://localhost:{backend_port}"
|
|
|
|
# The url the frontend will be hosted on.
|
|
deploy_url: Optional[str] = f"http://localhost:{frontend_port}"
|
|
|
|
# The url the backend will be hosted on.
|
|
backend_host: str = "0.0.0.0"
|
|
|
|
# The database url used by rx.Model.
|
|
db_url: Optional[str] = "sqlite:///reflex.db"
|
|
|
|
# The async database url used by rx.Model.
|
|
async_db_url: Optional[str] = None
|
|
|
|
# The redis url
|
|
redis_url: Optional[str] = None
|
|
|
|
# Telemetry opt-in.
|
|
telemetry_enabled: bool = True
|
|
|
|
# The bun path
|
|
bun_path: ExistingPath = constants.Bun.DEFAULT_PATH
|
|
|
|
# Timeout to do a production build of a frontend page.
|
|
static_page_generation_timeout: int = 60
|
|
|
|
# List of origins that are allowed to connect to the backend API.
|
|
cors_allowed_origins: List[str] = ["*"]
|
|
|
|
# Tailwind config.
|
|
tailwind: Optional[Dict[str, Any]] = {"plugins": ["@tailwindcss/typography"]}
|
|
|
|
# Timeout when launching the gunicorn server. TODO(rename this to backend_timeout?)
|
|
timeout: int = 120
|
|
|
|
# Whether to enable or disable nextJS gzip compression.
|
|
next_compression: bool = True
|
|
|
|
# Whether to use React strict mode in nextJS
|
|
react_strict_mode: bool = True
|
|
|
|
# Additional frontend packages to install.
|
|
frontend_packages: List[str] = []
|
|
|
|
# The hosting service backend URL.
|
|
cp_backend_url: str = Hosting.HOSTING_SERVICE
|
|
# The hosting service frontend URL.
|
|
cp_web_url: str = Hosting.HOSTING_SERVICE_UI
|
|
|
|
# The worker class used in production mode
|
|
gunicorn_worker_class: str = "uvicorn.workers.UvicornH11Worker"
|
|
|
|
# Number of gunicorn workers from user
|
|
gunicorn_workers: Optional[int] = None
|
|
|
|
# Number of requests before a worker is restarted
|
|
gunicorn_max_requests: int = 100
|
|
|
|
# Variance limit for max requests; gunicorn only
|
|
gunicorn_max_requests_jitter: int = 25
|
|
|
|
# Indicate which type of state manager to use
|
|
state_manager_mode: constants.StateManagerMode = constants.StateManagerMode.DISK
|
|
|
|
# Maximum expiration lock time for redis state manager
|
|
redis_lock_expiration: int = constants.Expiration.LOCK
|
|
|
|
# Maximum lock time before warning for redis state manager.
|
|
redis_lock_warning_threshold: int = constants.Expiration.LOCK_WARNING_THRESHOLD
|
|
|
|
# Token expiration time for redis state manager
|
|
redis_token_expiration: int = constants.Expiration.TOKEN
|
|
|
|
# Attributes that were explicitly set by the user.
|
|
_non_default_attributes: Set[str] = pydantic.PrivateAttr(set())
|
|
|
|
# Path to file containing key-values pairs to override in the environment; Dotenv format.
|
|
env_file: Optional[str] = None
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
"""Initialize the config values.
|
|
|
|
Args:
|
|
*args: The args to pass to the Pydantic init method.
|
|
**kwargs: The kwargs to pass to the Pydantic init method.
|
|
|
|
Raises:
|
|
ConfigError: If some values in the config are invalid.
|
|
"""
|
|
super().__init__(*args, **kwargs)
|
|
|
|
# Update the config from environment variables.
|
|
env_kwargs = self.update_from_env()
|
|
for key, env_value in env_kwargs.items():
|
|
setattr(self, key, env_value)
|
|
|
|
# Update default URLs if ports were set
|
|
kwargs.update(env_kwargs)
|
|
self._non_default_attributes.update(kwargs)
|
|
self._replace_defaults(**kwargs)
|
|
|
|
if (
|
|
self.state_manager_mode == constants.StateManagerMode.REDIS
|
|
and not self.redis_url
|
|
):
|
|
raise ConfigError(
|
|
"REDIS_URL is required when using the redis state manager."
|
|
)
|
|
|
|
@property
|
|
def module(self) -> str:
|
|
"""Get the module name of the app.
|
|
|
|
Returns:
|
|
The module name.
|
|
"""
|
|
return ".".join([self.app_name, self.app_name])
|
|
|
|
def update_from_env(self) -> dict[str, Any]:
|
|
"""Update the config values based on set environment variables.
|
|
If there is a set env_file, it is loaded first.
|
|
|
|
Returns:
|
|
The updated config values.
|
|
"""
|
|
if self.env_file:
|
|
try:
|
|
from dotenv import load_dotenv # type: ignore
|
|
|
|
# load env file if exists
|
|
load_dotenv(self.env_file, override=True)
|
|
except ImportError:
|
|
console.error(
|
|
"""The `python-dotenv` package is required to load environment variables from a file. Run `pip install "python-dotenv>=1.0.1"`."""
|
|
)
|
|
|
|
updated_values = {}
|
|
# Iterate over the fields.
|
|
for key, field in self.__fields__.items():
|
|
# The env var name is the key in uppercase.
|
|
env_var = os.environ.get(key.upper())
|
|
|
|
# If the env var is set, override the config value.
|
|
if env_var is not None:
|
|
# Interpret the value.
|
|
value = interpret_env_var_value(env_var, field.outer_type_, field.name)
|
|
|
|
# Set the value.
|
|
updated_values[key] = value
|
|
|
|
if key.upper() in _sensitive_env_vars:
|
|
env_var = "***"
|
|
|
|
console.info(
|
|
f"Overriding config value {key} with env var {key.upper()}={env_var}",
|
|
dedupe=True,
|
|
)
|
|
|
|
return updated_values
|
|
|
|
def get_event_namespace(self) -> str:
|
|
"""Get the path that the backend Websocket server lists on.
|
|
|
|
Returns:
|
|
The namespace for websocket.
|
|
"""
|
|
event_url = constants.Endpoint.EVENT.get_url()
|
|
return urllib.parse.urlsplit(event_url).path
|
|
|
|
def _replace_defaults(self, **kwargs):
|
|
"""Replace formatted defaults when the caller provides updates.
|
|
|
|
Args:
|
|
**kwargs: The kwargs passed to the config or from the env.
|
|
"""
|
|
if "api_url" not in self._non_default_attributes and "backend_port" in kwargs:
|
|
self.api_url = f"http://localhost:{kwargs['backend_port']}"
|
|
|
|
if (
|
|
"deploy_url" not in self._non_default_attributes
|
|
and "frontend_port" in kwargs
|
|
):
|
|
self.deploy_url = f"http://localhost:{kwargs['frontend_port']}"
|
|
|
|
if "api_url" not in self._non_default_attributes:
|
|
# If running in Github Codespaces, override API_URL
|
|
codespace_name = os.getenv("CODESPACE_NAME")
|
|
GITHUB_CODESPACES_PORT_FORWARDING_DOMAIN = os.getenv(
|
|
"GITHUB_CODESPACES_PORT_FORWARDING_DOMAIN"
|
|
)
|
|
# If running on Replit.com interactively, override API_URL to ensure we maintain the backend_port
|
|
replit_dev_domain = os.getenv("REPLIT_DEV_DOMAIN")
|
|
backend_port = kwargs.get("backend_port", self.backend_port)
|
|
if codespace_name and GITHUB_CODESPACES_PORT_FORWARDING_DOMAIN:
|
|
self.api_url = (
|
|
f"https://{codespace_name}-{kwargs.get('backend_port', self.backend_port)}"
|
|
f".{GITHUB_CODESPACES_PORT_FORWARDING_DOMAIN}"
|
|
)
|
|
elif replit_dev_domain and backend_port:
|
|
self.api_url = f"https://{replit_dev_domain}:{backend_port}"
|
|
|
|
def _set_persistent(self, **kwargs):
|
|
"""Set values in this config and in the environment so they persist into subprocess.
|
|
|
|
Args:
|
|
**kwargs: The kwargs passed to the config.
|
|
"""
|
|
for key, value in kwargs.items():
|
|
if value is not None:
|
|
os.environ[key.upper()] = str(value)
|
|
setattr(self, key, value)
|
|
self._non_default_attributes.update(kwargs)
|
|
self._replace_defaults(**kwargs)
|
|
|
|
|
|
def _get_config() -> Config:
|
|
"""Get the app config.
|
|
|
|
Returns:
|
|
The app config.
|
|
"""
|
|
# only import the module if it exists. If a module spec exists then
|
|
# the module exists.
|
|
spec = find_spec(constants.Config.MODULE)
|
|
if not spec:
|
|
# we need this condition to ensure that a ModuleNotFound error is not thrown when
|
|
# running unit/integration tests or during `reflex init`.
|
|
return Config(app_name="")
|
|
rxconfig = importlib.import_module(constants.Config.MODULE)
|
|
return rxconfig.config
|
|
|
|
|
|
# Protect sys.path from concurrent modification
|
|
_config_lock = threading.RLock()
|
|
|
|
|
|
def get_config(reload: bool = False) -> Config:
|
|
"""Get the app config.
|
|
|
|
Args:
|
|
reload: Re-import the rxconfig module from disk
|
|
|
|
Returns:
|
|
The app config.
|
|
"""
|
|
cached_rxconfig = sys.modules.get(constants.Config.MODULE, None)
|
|
if cached_rxconfig is not None:
|
|
if reload:
|
|
# Remove any cached module when `reload` is requested.
|
|
del sys.modules[constants.Config.MODULE]
|
|
else:
|
|
return cached_rxconfig.config
|
|
|
|
with _config_lock:
|
|
sys_path = sys.path.copy()
|
|
sys.path.clear()
|
|
sys.path.append(str(Path.cwd()))
|
|
try:
|
|
# Try to import the module with only the current directory in the path.
|
|
return _get_config()
|
|
except Exception:
|
|
# If the module import fails, try to import with the original sys.path.
|
|
sys.path.extend(sys_path)
|
|
return _get_config()
|
|
finally:
|
|
# Restore the original sys.path.
|
|
sys.path.clear()
|
|
sys.path.extend(sys_path)
|