diff --git a/reflex/config.py b/reflex/config.py index 7614417d5..7fcfd4c3e 100644 --- a/reflex/config.py +++ b/reflex/config.py @@ -36,7 +36,7 @@ except ModuleNotFoundError: from reflex_cli.constants.hosting import Hosting -from reflex import constants +from reflex import constants, server from reflex.base import Base from reflex.utils import console @@ -652,7 +652,7 @@ class Config(Base): # Tailwind config. tailwind: Optional[Dict[str, Any]] = {"plugins": ["@tailwindcss/typography"]} - # Timeout when launching the gunicorn server. TODO(rename this to backend_timeout?) + # Timeout when launching the gunicorn server. TODO(rename this to backend_timeout?); deprecated timeout: int = 120 # Whether to enable or disable nextJS gzip compression. @@ -669,16 +669,16 @@ class Config(Base): # The hosting service frontend URL. cp_web_url: str = Hosting.HOSTING_SERVICE_UI - # The worker class used in production mode + # The worker class used in production mode; deprecated gunicorn_worker_class: str = "uvicorn.workers.UvicornH11Worker" - # Number of gunicorn workers from user + # Number of gunicorn workers from user; deprecated gunicorn_workers: Optional[int] = None - # Number of requests before a worker is restarted + # Number of requests before a worker is restarted; deprecated gunicorn_max_requests: int = 100 - # Variance limit for max requests; gunicorn only + # Variance limit for max requests; gunicorn only; deprecated gunicorn_max_requests_jitter: int = 25 # Indicate which type of state manager to use @@ -699,6 +699,14 @@ class Config(Base): # Path to file containing key-values pairs to override in the environment; Dotenv format. env_file: Optional[str] = None + # Custom Backend Server + backend_server_prod: server.CustomBackendServer = server.GunicornBackendServer( + threads=2, workers=4, max_requests=100, max_requests_jitter=25, timeout=120 + ) + backend_server_dev: server.CustomBackendServer = server.UvicornBackendServer( + workers=1, + ) + def __init__(self, *args, **kwargs): """Initialize the config values. @@ -729,6 +737,24 @@ class Config(Base): "REDIS_URL is required when using the redis state manager." ) + if any( + getattr(self.get_fields().get(key, None), "default", None) + == self.get_value(key) + for key in ( + "timeout", + "gunicorn_worker_class", + "gunicorn_workers", + "gunicorn_max_requests", + "gunicorn_max_requests_jitter", + ) + ): + console.deprecate( + 'The following reflex configuration fields are obsolete: "timeout", "gunicorn_worker_class", "gunicorn_workers", "gunicorn_max_requests", "gunicorn_max_requests_jitter"\nplease update your configuration.', + reason="Use `config.backend_server_dev` or `config.backend_server_prod` instead in your `rxconfig.py`.", + deprecation_version="0.7.x", + removal_version="x.x.x", + ) + @property def module(self) -> str: """Get the module name of the app. diff --git a/reflex/server/__init__.py b/reflex/server/__init__.py new file mode 100644 index 000000000..b39dc8049 --- /dev/null +++ b/reflex/server/__init__.py @@ -0,0 +1,13 @@ +"""Import every *BackendServer.""" + +from .base import CustomBackendServer +from .granian import GranianBackendServer +from .gunicorn import GunicornBackendServer +from .uvicorn import UvicornBackendServer + +__all__ = [ + "CustomBackendServer", + "GranianBackendServer", + "GunicornBackendServer", + "UvicornBackendServer", +] diff --git a/reflex/server/base.py b/reflex/server/base.py new file mode 100644 index 000000000..fdd231393 --- /dev/null +++ b/reflex/server/base.py @@ -0,0 +1,390 @@ +"""The base for CustomBackendServer.""" +# ruff: noqa: RUF009 + +from __future__ import annotations + +import os +from abc import abstractmethod +from dataclasses import Field, dataclass +from dataclasses import field as dc_field +from pathlib import Path +from typing import Any, Callable, ClassVar, Sequence + +from reflex import constants +from reflex.constants.base import Env, LogLevel + +ReturnCliTypeFn = Callable[[Any], str] + + +class CliType: + """Cli type transformer.""" + + @staticmethod + def default(fmt: str) -> ReturnCliTypeFn: + """Default cli transformer. + + Example: + fmt: `'--env-file {value}'` + value: `'/config.conf'` + result => `'--env-file /config.conf'` + + Args: + fmt (str): format + + Returns: + ReturnCliTypeFn: function wrapper + """ + + def wrapper(value: bool) -> str: + return fmt.format(value=value) + + return wrapper + + @staticmethod + def boolean(fmt: str, bool_value: bool = True) -> ReturnCliTypeFn: + """When cli mode args only show when we want to activate it. + + Example: + fmt: `'--reload'` + value: `False` + result => `''` + + Example: + fmt: `'--reload'` + value: `True` + result => `'--reload'` + + Args: + fmt (str): format + bool_value (bool): boolean value used for toggle condition + + Returns: + ReturnCliTypeFn: function wrapper + """ + + def wrapper(value: bool) -> str: + return fmt if value is bool_value else "" + + return wrapper + + @staticmethod + def boolean_toggle( + fmt: str, + toggle_kw: str = "no", + toggle_sep: str = "-", + toggle_value: bool = False, + **kwargs, + ) -> ReturnCliTypeFn: + """When the cli mode is a boolean toggle `--access-log`/`--no-access-log`. + + Example: + fmt: `'--{toggle_kw}{toggle_sep}access-log'` + value: `False` + toggle_value: `False` (default) + result => `'--no-access-log'` + + Example: + fmt: `'--{toggle_kw}{toggle_sep}access-log'` + value: `True` + toggle_value: `False` (default) + result => `'--access-log'` + + Example: + fmt: `'--{toggle_kw}{toggle_sep}access-log'` + value: `True` + toggle_value: `True` + result => `'--no-access-log'` + + Args: + fmt (str): format + toggle_kw (str): keyword used when toggled. Defaults to "no". + toggle_sep (str): separator used when toggled. Defaults to "-". + toggle_value (bool): boolean value used for toggle condition. Defaults to False. + **kwargs: Keyword arguments to pass to the format string function. + + Returns: + ReturnCliTypeFn: function wrapper + """ + + def wrapper(value: bool) -> str: + return fmt.format( + **kwargs, + toggle_kw=(toggle_kw if value is toggle_value else ""), + toggle_sep=(toggle_sep if value is toggle_value else ""), + ) + + return wrapper + + @staticmethod + def multiple( + fmt: str, + join_sep: str | None = None, + value_transformer: Callable[[Any], str] = lambda value: str(value), + ) -> ReturnCliTypeFn: + r"""When the cli mode need multiple args or single args from an sequence. + + Example (Multiple args mode): + fmt: `'--header {value}'`. + data_list: `['X-Forwarded-Proto=https', 'X-Forwarded-For=0.0.0.0']` + join_sep: `None` + result => `'--header \"X-Forwarded-Proto=https\" --header \"X-Forwarded-For=0.0.0.0\"'` + + Example (Single args mode): + fmt: `--headers {values}` + data_list: `['X-Forwarded-Proto=https', 'X-Forwarded-For=0.0.0.0']` + join_sep (required): `';'` + result => `--headers \"X-Forwarded-Proto=https;X-Forwarded-For=0.0.0.0\"` + + Example (Single args mode): + fmt: `--headers {values}` + data_list: `[('X-Forwarded-Proto', 'https'), ('X-Forwarded-For', '0.0.0.0')]` + join_sep (required): `';'` + value_transformer: `lambda value: f'{value[0]}:{value[1]}'` + result => `--headers \"X-Forwarded-Proto:https;X-Forwarded-For:0.0.0.0\"` + + Args: + fmt (str): format + join_sep (str): separator used + value_transformer (Callable[[Any], str]): function used for transformer the element + + Returns: + ReturnCliTypeFn: function wrapper + """ + + def wrapper(values: Sequence[str]) -> str: + return ( + fmt.format( + values=join_sep.join(value_transformer(value) for value in values) + ) + if join_sep + else " ".join( + [fmt.format(value=value_transformer(value)) for value in values] + ) + ) + + return wrapper + + +def field_( + *, + default: Any = None, + metadata_cli: ReturnCliTypeFn | None = None, + exclude: bool = False, + **kwargs, +): + """Custom dataclass field builder. + + Args: + default (Any): default value. Defaults to None. + metadata_cli (ReturnCliTypeFn | None): cli wrapper function. Defaults to None. + exclude (bool): used for excluding the field to the server configuration (system field). Defaults to False. + **kwargs: Keyword arguments to pass to the field dataclasses function. + + Returns: + Field: return the field dataclasses + """ + params_ = { + "default": default, + "metadata": {"cli": metadata_cli, "exclude": exclude}, + **kwargs, + } + + if kwargs.get("default_factory", False): + params_.pop("default", None) + + return dc_field(**params_) + + +@dataclass +class CustomBackendServer: + """BackendServer base.""" + + _env: ClassVar[Env] = field_( + default=Env.DEV, metadata_cli=None, exclude=True, repr=False, init=False + ) + _app: ClassVar[Any] = field_( + default=None, metadata_cli=None, exclude=True, repr=False, init=False + ) + _app_uri: ClassVar[str] = field_( + default="", metadata_cli=None, exclude=True, repr=False, init=False + ) + + @staticmethod + def get_app_module( + for_granian_target: bool = False, add_extra_api: bool = False + ) -> str: + """Get the app module for the backend. + + Args: + for_granian_target (bool): make the return compatible with Granian. Defaults to False. + add_extra_api (bool): add the keyword "api" at the end (needed for Uvicorn & Granian). Defaults to False. + + Returns: + str: The app module for the backend. + """ + import reflex + + if for_granian_target: + app_path = str(Path(reflex.__file__).parent / "app_module_for_backend.py") + else: + app_path = "reflex.app_module_for_backend" + + return f"{app_path}:{constants.CompileVars.APP}{f'.{constants.CompileVars.API}' if add_extra_api else ''}" + + def get_available_cpus(self) -> int: + """Get available cpus. + + Returns: + int: number of available cpu cores + """ + return os.cpu_count() or 1 + + def get_max_workers(self) -> int: + """Get maximum workers. + + Returns: + int: get the maximum number of workers + """ + # https://docs.gunicorn.org/en/latest/settings.html#workers + return (os.cpu_count() or 1) * 4 + 1 + + def get_recommended_workers(self) -> int: + """Get recommended workers. + + Returns: + int: get the recommended number of workers + """ + # https://docs.gunicorn.org/en/latest/settings.html#workers + return (os.cpu_count() or 1) * 2 + 1 + + def get_max_threads(self, wait_time_ms: int = 50, service_time_ms: int = 5) -> int: + """Get maximum threads. + + Args: + wait_time_ms (int): the mean waiting duration targeted. Defaults to 50. + service_time_ms (int): the mean working duration. Defaults to 5. + + Returns: + int: get the maximum number of threads + """ + # https://engineering.zalando.com/posts/2019/04/how-to-set-an-ideal-thread-pool-size.html + # Brian Goetz formula + return int(self.get_available_cpus() * (1 + wait_time_ms / service_time_ms)) + + def get_recommended_threads( + self, + target_reqs: int | None = None, + wait_time_ms: int = 50, + service_time_ms: int = 5, + ) -> int: + """Get recommended threads. + + Args: + target_reqs (int | None): number of requests targeted. Defaults to None. + wait_time_ms (int): the mean waiting duration targeted. Defaults to 50. + service_time_ms (int): the mean working duration. Defaults to 5. + + Returns: + int: get the recommended number of threads + """ + # https://engineering.zalando.com/posts/2019/04/how-to-set-an-ideal-thread-pool-size.html + max_available_threads = self.get_max_threads() + + if target_reqs: + # Little's law formula + need_threads = target_reqs * ( + (wait_time_ms / 1000) + (service_time_ms / 1000) + ) + else: + need_threads = self.get_max_threads(wait_time_ms, service_time_ms) + + return int(min(need_threads, max_available_threads)) + + def get_fields(self) -> dict[str, Field]: + """Return all the fields. + + Returns: + dict[str, Field]: return the fields dictionary + """ + return self.__dataclass_fields__ + + def get_values(self) -> dict[str, Any]: + """Return all values. + + Returns: + dict[str, Any]: returns the value of the fields + """ + return { + key: getattr(self, key) + for key, field in self.__dataclass_fields__.items() + if field.metadata["exclude"] is False + } + + def is_default_value(self, key: str, value: Any | None = None) -> bool: + """Check if the `value` is the same value from default context. + + Args: + key (str): the name of the field + value (Any | None, optional): the value to check if is equal to the default value. Defaults to None. + + Returns: + bool: result of the condition of value are equal to the default value + """ + from dataclasses import MISSING + + field = self.get_fields()[key] + if value is None: + value = getattr(self, key, None) + + if field.default != MISSING: + return value == field.default + else: + if field.default_factory != MISSING: + return value == field.default_factory() + + return False + + @abstractmethod + def get_backend_bind(self) -> tuple[str, int]: + """Return the backend host and port. + + Returns: + tuple[str, int]: The host address and port. + """ + raise NotImplementedError() + + @abstractmethod + def check_import(self): + """Check package importation.""" + raise NotImplementedError() + + @abstractmethod + def setup(self, host: str, port: int, loglevel: LogLevel, env: Env): + """Setup. + + Args: + host (str): host address + port (int): port address + loglevel (LogLevel): log level + env (Env): prod/dev environment + """ + raise NotImplementedError() + + @abstractmethod + def run_prod(self) -> list[str]: + """Run in production mode. + + Returns: + list[str]: Command ready to be executed + """ + raise NotImplementedError() + + @abstractmethod + def run_dev(self): + """Run in development mode.""" + raise NotImplementedError() + + @abstractmethod + async def shutdown(self): + """Shutdown the backend server.""" + raise NotImplementedError() diff --git a/reflex/server/granian.py b/reflex/server/granian.py new file mode 100644 index 000000000..4fb61f4f2 --- /dev/null +++ b/reflex/server/granian.py @@ -0,0 +1,328 @@ +"""The GranianBackendServer.""" +# ruff: noqa: RUF009 + +from __future__ import annotations + +from dataclasses import dataclass +from dataclasses import field as dc_field +from pathlib import Path +from typing import Any, Literal + +from reflex.constants.base import Env, LogLevel +from reflex.server.base import CliType, CustomBackendServer, field_ +from reflex.utils import console + + +@dataclass +class HTTP1Settings: + """Granian HTTP1Settings.""" + + # https://github.com/emmett-framework/granian/blob/261ceba3fd93bca10300e91d1498bee6df9e3576/granian/http.py#L6 + keep_alive: bool = dc_field(default=True) + max_buffer_size: int = dc_field(default=8192 + 4096 * 100) + pipeline_flush: bool = dc_field(default=False) + + +@dataclass +class HTTP2Settings: # https://github.com/emmett-framework/granian/blob/261ceba3fd93bca10300e91d1498bee6df9e3576/granian/http.py#L13 + """Granian HTTP2Settings.""" + + adaptive_window: bool = dc_field(default=False) + initial_connection_window_size: int = dc_field(default=1024 * 1024) + initial_stream_window_size: int = dc_field(default=1024 * 1024) + keep_alive_interval: int | None = dc_field(default=None) + keep_alive_timeout: int = dc_field(default=20) + max_concurrent_streams: int = dc_field(default=200) + max_frame_size: int = dc_field(default=1024 * 16) + max_headers_size: int = dc_field(default=16 * 1024 * 1024) + max_send_buffer_size: int = dc_field(default=1024 * 400) + + +@dataclass +class GranianBackendServer(CustomBackendServer): + """Granian backendServer. + + https://github.com/emmett-framework/granian/blob/fc11808ed177362fcd9359a455a733065ddbc505/granian/cli.py#L52 (until Granian has the proper documentation) + + """ + + address: str = field_( + default="127.0.0.1", metadata_cli=CliType.default("--host {value}") + ) + port: int = field_(default=8000, metadata_cli=CliType.default("--port {value}")) + interface: Literal["asgi", "asginl", "rsgi", "wsgi"] = field_( + default="rsgi", metadata_cli=CliType.default("--interface {value}") + ) + workers: int = field_(default=0, metadata_cli=CliType.default("--workers {value}")) + threads: int = field_(default=0, metadata_cli=CliType.default("--threads {value}")) + blocking_threads: int | None = field_( + default=None, metadata_cli=CliType.default("--blocking-threads {value}") + ) + threading_mode: Literal["runtime", "workers"] = field_( + default="workers", metadata_cli=CliType.default("--threading-mode {value}") + ) + loop: Literal["auto", "asyncio", "uvloop"] = field_( + default="auto", metadata_cli=CliType.default("--loop {value}") + ) + loop_opt: bool = field_( + default=False, + metadata_cli=CliType.boolean_toggle("--{toggle_kw}{toggle_sep}opt"), + ) + http: Literal["auto", "1", "2"] = field_( + default="auto", metadata_cli=CliType.default("--http {value}") + ) + websockets: bool = field_( + default=True, metadata_cli=CliType.boolean_toggle("--{toggle_kw}{toggle_sep}ws") + ) + backlog: int = field_( + default=1024, metadata_cli=CliType.default("--backlog {value}") + ) + backpressure: int | None = field_( + default=None, metadata_cli=CliType.default("--backpressure {value}") + ) + http1_keep_alive: bool = field_( + default=True, metadata_cli=CliType.default("--http1-keep-alive {value}") + ) + http1_max_buffer_size: int = field_( + default=417792, metadata_cli=CliType.default("--http1-max-buffer-size {value}") + ) + http1_pipeline_flush: bool = field_( + default=False, metadata_cli=CliType.default("--http1-pipeline-flush {value}") + ) + http2_adaptive_window: bool = field_( + default=False, metadata_cli=CliType.default("--http2-adaptive-window {value}") + ) + http2_initial_connection_window_size: int = field_( + default=1048576, + metadata_cli=CliType.default("--http2-initial-connection-window-size {value}"), + ) + http2_initial_stream_window_size: int = field_( + default=1048576, + metadata_cli=CliType.default("--http2-initial-stream-window-size {value}"), + ) + http2_keep_alive_interval: int | None = field_( + default=None, + metadata_cli=CliType.default("--http2-keep-alive-interval {value}"), + ) + http2_keep_alive_timeout: int = field_( + default=20, metadata_cli=CliType.default("--http2-keep-alive-timeout {value}") + ) + http2_max_concurrent_streams: int = field_( + default=200, + metadata_cli=CliType.default("--http2-max-concurrent-streams {value}"), + ) + http2_max_frame_size: int = field_( + default=16384, metadata_cli=CliType.default("--http2-max-frame-size {value}") + ) + http2_max_headers_size: int = field_( + default=16777216, + metadata_cli=CliType.default("--http2-max-headers-size {value}"), + ) + http2_max_send_buffer_size: int = field_( + default=409600, + metadata_cli=CliType.default("--http2-max-send-buffer-size {value}"), + ) + log_enabled: bool = field_( + default=True, + metadata_cli=CliType.boolean_toggle("--{toggle_kw}{toggle_sep}log"), + ) + log_level: Literal["critical", "error", "warning", "warn", "info", "debug"] = ( + field_(default="info", metadata_cli=CliType.default("--log-level {value}")) + ) + log_dictconfig: dict[str, Any] | None = field_(default=None, metadata_cli=None) + log_access: bool = field_( + default=False, + metadata_cli=CliType.boolean_toggle("--{toggle_kw}{toggle_sep}log-access"), + ) + log_access_format: str | None = field_( + default=None, metadata_cli=CliType.default("--access-log-fmt {value}") + ) + ssl_cert: Path | None = field_( + default=None, metadata_cli=CliType.default("--ssl-certificate {value}") + ) + ssl_key: Path | None = field_( + default=None, metadata_cli=CliType.default("--ssl-keyfile {value}") + ) + ssl_key_password: str | None = field_( + default=None, metadata_cli=CliType.default("--ssl-keyfile-password {value}") + ) + url_path_prefix: str | None = field_( + default=None, metadata_cli=CliType.default("--url-path-prefix {value}") + ) + respawn_failed_workers: bool = field_( + default=False, + metadata_cli=CliType.boolean_toggle( + "--{toggle_kw}{toggle_sep}respawn-failed-workers" + ), + ) + respawn_interval: float = field_( + default=3.5, metadata_cli=CliType.default("--respawn-interval {value}") + ) + workers_lifetime: int | None = field_( + default=None, metadata_cli=CliType.default("--workers-lifetime {value}") + ) + factory: bool = field_( + default=False, + metadata_cli=CliType.boolean_toggle("--{toggle_kw}{toggle_sep}factory"), + ) + reload: bool = field_( + default=False, + metadata_cli=CliType.boolean_toggle("--{toggle_kw}{toggle_sep}reload"), + ) + reload_paths: list[Path] | None = field_( + default=None, metadata_cli=CliType.multiple("--reload-paths {value}") + ) + reload_ignore_dirs: list[str] | None = field_( + default=None, metadata_cli=CliType.multiple("--reload-ignore-dirs {value}") + ) + reload_ignore_patterns: list[str] | None = field_( + default=None, metadata_cli=CliType.multiple("--reload-ignore-patterns {value}") + ) + reload_ignore_paths: list[Path] | None = field_( + default=None, metadata_cli=CliType.multiple("--reload-ignore-paths {value}") + ) + reload_filter: object | None = field_( # type: ignore + default=None, metadata_cli=None + ) + process_name: str | None = field_( + default=None, metadata_cli=CliType.default("--process-name {value}") + ) + pid_file: Path | None = field_( + default=None, metadata_cli=CliType.default("--pid-file {value}") + ) + + def get_backend_bind(self) -> tuple[str, int]: + """Return the backend host and port. + + Returns: + tuple[str, int]: The host address and port. + """ + return self.address, self.port + + def check_import(self): + """Check package importation. + + Raises: + ImportError: raise when some required packaging missing. + """ + from importlib.util import find_spec + + errors: list[str] = [] + + if find_spec("granian") is None: + errors.append( + 'The `granian` package is required to run `GranianBackendServer`. Run `pip install "granian>=1.6.0"`.' + ) + + if find_spec("watchfiles") is None and self.reload: + errors.append( + 'Using `--reload` in `GranianBackendServer` requires the `watchfiles` extra. Run `pip install "watchfiles~=0.21"`.' + ) + + if errors: + console.error("\n".join(errors)) + raise ImportError() + + def setup(self, host: str, port: int, loglevel: LogLevel, env: Env): + """Setup. + + Args: + host (str): host address + port (int): port address + loglevel (LogLevel): log level + env (Env): prod/dev environment + """ + self.check_import() + self._app_uri = self.get_app_module(for_granian_target=True, add_extra_api=True) # type: ignore + self.log_level = loglevel.value # type: ignore + self.address = host + self.port = port + self.interface = "asgi" # NOTE: prevent obvious error + self._env = env # type: ignore + + if self.workers == self.get_fields()["workers"].default: + self.workers = self.get_recommended_workers() + else: + if self.workers > (max_workers := self.get_max_workers()): + self.workers = max_workers + + if self.threads == self.get_fields()["threads"].default: + self.threads = self.get_recommended_threads() + else: + if self.threads > (max_threads := self.get_max_threads()): + self.threads = max_threads + + def run_prod(self): + """Run in production mode. + + Returns: + list[str]: Command ready to be executed + """ + self.check_import() + command = ["granian"] + + for key, field in self.get_fields().items(): + if ( + field.metadata["exclude"] is False + and field.metadata["cli"] + and not self.is_default_value(key, (value := getattr(self, key))) + ): + command += field.metadata["cli"](value).split(" ") + + return [*command, self._app_uri] + + def run_dev(self): + """Run in development mode.""" + self.check_import() + from granian import Granian # type: ignore + + exclude_keys = ( + "http1_keep_alive", + "http1_max_buffer_size", + "http1_pipeline_flush", + "http2_adaptive_window", + "http2_initial_connection_window_size", + "http2_initial_stream_window_size", + "http2_keep_alive_interval", + "http2_keep_alive_timeout", + "http2_max_concurrent_streams", + "http2_max_frame_size", + "http2_max_headers_size", + "http2_max_send_buffer_size", + ) + + self._app = Granian( # type: ignore + **{ + **{ + key: value + for key, value in self.get_values().items() + if ( + key not in exclude_keys + and not self.is_default_value(key, value) + ) + }, + "target": self._app_uri, + "http1_settings": HTTP1Settings( + self.http1_keep_alive, + self.http1_max_buffer_size, + self.http1_pipeline_flush, + ), + "http2_settings": HTTP2Settings( + self.http2_adaptive_window, + self.http2_initial_connection_window_size, + self.http2_initial_stream_window_size, + self.http2_keep_alive_interval, + self.http2_keep_alive_timeout, + self.http2_max_concurrent_streams, + self.http2_max_frame_size, + self.http2_max_headers_size, + self.http2_max_send_buffer_size, + ), + } + ) + self._app.serve() + + async def shutdown(self): + """Shutdown the backend server.""" + if self._app and self._env == Env.DEV: + self._app.shutdown() diff --git a/reflex/server/gunicorn.py b/reflex/server/gunicorn.py new file mode 100644 index 000000000..63a6ba9ce --- /dev/null +++ b/reflex/server/gunicorn.py @@ -0,0 +1,406 @@ +"""The GunicornBackendServer.""" +# ruff: noqa: RUF009 + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any, Callable, Literal + +from reflex.constants.base import IS_WINDOWS, Env, LogLevel +from reflex.server.base import CliType, CustomBackendServer, field_ +from reflex.utils import console + + +@dataclass +class GunicornBackendServer(CustomBackendServer): + """Gunicorn backendServer. + + https://docs.gunicorn.org/en/latest/settings.html + """ + + config: str = field_( + default="./gunicorn.conf.py", metadata_cli=CliType.default("--config {value}") + ) + bind: list[str] = field_( + default=None, + default_factory=lambda: ["127.0.0.1:8000"], + metadata_cli=CliType.multiple("--bind {value}"), + ) + backlog: int = field_( + default=2048, metadata_cli=CliType.default("--backlog {value}") + ) + workers: int = field_(default=0, metadata_cli=CliType.default("--workers {value}")) + worker_class: Literal[ + "sync", + "eventlet", + "gevent", + "tornado", + "gthread", + "uvicorn.workers.UvicornH11Worker", + ] = field_(default="sync", metadata_cli=CliType.default("--worker-class {value}")) + threads: int = field_(default=0, metadata_cli=CliType.default("--threads {value}")) + worker_connections: int = field_( + default=1000, metadata_cli=CliType.default("--worker-connections {value}") + ) + max_requests: int = field_( + default=0, metadata_cli=CliType.default("--max-requests {value}") + ) + max_requests_jitter: int = field_( + default=0, metadata_cli=CliType.default("--max-requests-jitter {value}") + ) + timeout: int = field_(default=30, metadata_cli=CliType.default("--timeout {value}")) + graceful_timeout: int = field_( + default=30, metadata_cli=CliType.default("--graceful-timeout {value}") + ) + keepalive: int = field_( + default=2, metadata_cli=CliType.default("--keep-alive {value}") + ) + limit_request_line: int = field_( + default=4094, metadata_cli=CliType.default("--limit-request-line {value}") + ) + limit_request_fields: int = field_( + default=100, metadata_cli=CliType.default("--limit-request-fields {value}") + ) + limit_request_field_size: int = field_( + default=8190, metadata_cli=CliType.default("--limit-request-field_size {value}") + ) + reload: bool = field_(default=False, metadata_cli=CliType.boolean("--reload")) + reload_engine: Literal["auto", "poll", "inotify"] = field_( + default="auto", metadata_cli=CliType.default("--reload-engine {value}") + ) + reload_extra_files: list[str] = field_( + default=None, + default_factory=lambda: [], + metadata_cli=CliType.default("--reload-extra-file {value}"), + ) + spew: bool = field_(default=False, metadata_cli=CliType.boolean("--spew")) + check_config: bool = field_( + default=False, metadata_cli=CliType.boolean("--check-config") + ) + print_config: bool = field_( + default=False, metadata_cli=CliType.boolean("--print-config") + ) + preload_app: bool = field_(default=False, metadata_cli=CliType.boolean("--preload")) + sendfile: bool | None = field_( + default=None, metadata_cli=CliType.boolean("--no-sendfile", bool_value=False) + ) + reuse_port: bool = field_( + default=False, metadata_cli=CliType.boolean("--reuse-port") + ) + chdir: str = field_(default=".", metadata_cli=CliType.default("--chdir {value}")) + daemon: bool = field_(default=False, metadata_cli=CliType.boolean("--daemon")) + raw_env: list[str] = field_( + default=None, + default_factory=lambda: [], + metadata_cli=CliType.multiple("--env {value}"), + ) + pidfile: str | None = field_( + default=None, metadata_cli=CliType.default("--pid {value}") + ) + worker_tmp_dir: str | None = field_( + default=None, metadata_cli=CliType.default("--worker-tmp-dir {value}") + ) + user: int = field_(default=1000, metadata_cli=CliType.default("--user {value}")) + group: int = field_(default=1000, metadata_cli=CliType.default("--group {value}")) + umask: int = field_(default=0, metadata_cli=CliType.default("--umask {value}")) + initgroups: bool = field_( + default=False, metadata_cli=CliType.boolean("--initgroups") + ) + tmp_upload_dir: str | None = field_(default=None, metadata_cli=None) + secure_scheme_headers: dict[str, Any] = field_( + default=None, + default_factory=lambda: { + "X-FORWARDED-PROTOCOL": "ssl", + "X-FORWARDED-PROTO": "https", + "X-FORWARDED-SSL": "on", + }, + metadata_cli=None, + ) + forwarded_allow_ips: str = field_( + default="127.0.0.1,::1", + metadata_cli=CliType.default("--forwarded-allow-ips {value}"), + ) + accesslog: str | None = field_( + default=None, metadata_cli=CliType.default("--access-logfile {value}") + ) + disable_redirect_access_to_syslog: bool = field_( + default=False, + metadata_cli=CliType.boolean("--disable-redirect-access-to-syslog"), + ) + access_log_format: str = field_( + default='%(h)s %(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s"', + metadata_cli=CliType.default("--access-logformat {value}"), + ) + errorlog: str = field_( + default="-", metadata_cli=CliType.default("--error-logfile {value}") + ) + loglevel: Literal["debug", "info", "warning", "error", "critical"] = field_( + default="info", metadata_cli=CliType.default("--log-level {value}") + ) + capture_output: bool = field_( + default=False, metadata_cli=CliType.boolean("--capture-output") + ) + logger_class: str = field_( + default="gunicorn.glogging.Logger", + metadata_cli=CliType.default("--logger-class {value}"), + ) + logconfig: str | None = field_( + default=None, metadata_cli=CliType.default("--log-config {value}") + ) + logconfig_dict: dict = field_( + default=None, default_factory=lambda: {}, metadata_cli=None + ) + logconfig_json: str | None = field_( + default=None, metadata_cli=CliType.default("--log-config-json {value}") + ) + syslog_addr: str = field_( + default="udp://localhost:514", + metadata_cli=CliType.default("--log-syslog-to {value}"), + ) + syslog: bool = field_(default=False, metadata_cli=CliType.boolean("--log-syslog")) + syslog_prefix: str | None = field_( + default=None, metadata_cli=CliType.default("--log-syslog-prefix {value}") + ) + syslog_facility: str = field_( + default="user", metadata_cli=CliType.default("--log-syslog-facility {value}") + ) + enable_stdio_inheritance: bool = field_( + default=False, metadata_cli=CliType.boolean("--enable-stdio-inheritance") + ) + statsd_host: str | None = field_( + default=None, metadata_cli=CliType.default("--statsd-host {value}") + ) + dogstatsd_tags: str = field_( + default="", metadata_cli=CliType.default("--dogstatsd-tags {value}") + ) + statsd_prefix: str = field_( + default="", metadata_cli=CliType.default("--statsd-prefix {value}") + ) + proc_name: str | None = field_( + default=None, metadata_cli=CliType.default("--name {value}") + ) + default_proc_name: str = field_(default="gunicorn", metadata_cli=None) + pythonpath: str | None = field_( + default=None, metadata_cli=CliType.default("--pythonpath {value}") + ) + paste: str | None = field_( + default=None, metadata_cli=CliType.default("--paster {value}") + ) + on_starting: Callable = field_(default=lambda server: None, metadata_cli=None) + on_reload: Callable = field_(default=lambda server: None, metadata_cli=None) + when_ready: Callable = field_(default=lambda server: None, metadata_cli=None) + pre_fork: Callable = field_(default=lambda server, worker: None, metadata_cli=None) + post_fork: Callable = field_(default=lambda server, worker: None, metadata_cli=None) + post_worker_init: Callable = field_(default=lambda worker: None, metadata_cli=None) + worker_int: Callable = field_(default=lambda worker: None, metadata_cli=None) + worker_abort: Callable = field_(default=lambda worker: None, metadata_cli=None) + pre_exec: Callable = field_(default=lambda server: None, metadata_cli=None) + pre_request: Callable = field_( + default=lambda worker, req: worker.log.debug("%s %s", req.method, req.path), + metadata_cli=None, + ) + post_request: Callable = field_( + default=lambda worker, req, environ, resp: None, metadata_cli=None + ) + child_exit: Callable = field_( + default=lambda server, worker: None, metadata_cli=None + ) + worker_exit: Callable = field_( + default=lambda server, worker: None, metadata_cli=None + ) + nworkers_changed: Callable = field_( + default=lambda server, new_value, old_value: None, metadata_cli=None + ) + on_exit: Callable = field_(default=lambda server: None, metadata_cli=None) + ssl_context: Callable[[Any, Any], Any] = field_( + default=lambda config, + default_ssl_context_factory: default_ssl_context_factory(), + metadata_cli=None, + ) + proxy_protocol: bool = field_( + default=False, metadata_cli=CliType.boolean("--proxy-protocol") + ) + proxy_allow_ips: str = field_( + default="127.0.0.1,::1", + metadata_cli=CliType.default("--proxy-allow-from {value}"), + ) + keyfile: str | None = field_( + default=None, metadata_cli=CliType.default("--keyfile {value}") + ) + certfile: str | None = field_( + default=None, metadata_cli=CliType.default("--certfile {value}") + ) + ssl_version: int = field_( + default=2, metadata_cli=CliType.default("--ssl-version {value}") + ) + cert_reqs: int = field_( + default=0, metadata_cli=CliType.default("--cert-reqs {value}") + ) + ca_certs: str | None = field_( + default=None, metadata_cli=CliType.default("--ca-certs {value}") + ) + suppress_ragged_eofs: bool = field_( + default=True, metadata_cli=CliType.boolean("--suppress-ragged-eofs") + ) + do_handshake_on_connect: bool = field_( + default=False, metadata_cli=CliType.boolean("--do-handshake-on-connect") + ) + ciphers: str | None = field_( + default=None, metadata_cli=CliType.default("--ciphers {value}") + ) + raw_paste_global_conf: list[str] = field_( + default=None, + default_factory=lambda: [], + metadata_cli=CliType.multiple("--paste-global {value}"), + ) + permit_obsolete_folding: bool = field_( + default=False, metadata_cli=CliType.boolean("--permit-obsolete-folding") + ) + strip_header_spaces: bool = field_( + default=False, metadata_cli=CliType.boolean("--strip-header-spaces") + ) + permit_unconventional_http_method: bool = field_( + default=False, + metadata_cli=CliType.boolean("--permit-unconventional-http-method"), + ) + permit_unconventional_http_version: bool = field_( + default=False, + metadata_cli=CliType.boolean("--permit-unconventional-http-version"), + ) + casefold_http_method: bool = field_( + default=False, metadata_cli=CliType.boolean("--casefold-http-method") + ) + forwarder_headers: str = field_( + default="SCRIPT_NAME,PATH_INFO", + metadata_cli=CliType.default("--forwarder-headers {value}"), + ) + header_map: Literal["drop", "refuse", "dangerous"] = field_( + default="drop", metadata_cli=CliType.default("--header-map {value}") + ) + + def get_backend_bind(self) -> tuple[str, int]: + """Return the backend host and port. + + Returns: + tuple[str, int]: The host address and port. + """ + host, port = self.bind[0].split(":") + return host, int(port) + + def check_import(self): + """Check package importation. + + Raises: + ImportError: raise when some required packaging missing. + """ + from importlib.util import find_spec + + errors: list[str] = [] + + if IS_WINDOWS: + errors.append( + "The `GunicornBackendServer` only works on UNIX machines. We recommend using the `UvicornBackendServer` for Windows machines." + ) + + if find_spec("gunicorn") is None: + errors.append( + 'The `gunicorn` package is required to run `GunicornBackendServer`. Run `pip install "gunicorn>=20.1.0"`.' + ) + + if errors: + console.error("\n".join(errors)) + raise ImportError() + + def setup(self, host: str, port: int, loglevel: LogLevel, env: Env): + """Setup. + + Args: + host (str): host address + port (int): port address + loglevel (LogLevel): log level + env (Env): prod/dev environment + """ + self.check_import() + self._app_uri = f"{self.get_app_module()}()" # type: ignore + self.loglevel = loglevel.value # type: ignore + self.bind = [f"{host}:{port}"] + self._env = env # type: ignore + + if self.workers == self.get_fields()["workers"].default: + self.workers = self.get_recommended_workers() + else: + if self.workers > (max_threads := self.get_max_workers()): + self.workers = max_threads + + if self.threads == self.get_fields()["threads"].default: + self.threads = self.get_recommended_threads() + else: + if self.threads > (max_threads := self.get_max_threads()): + self.threads = max_threads + + def run_prod(self) -> list[str]: + """Run in production mode. + + Returns: + list[str]: Command ready to be executed + """ + self.check_import() + command = ["gunicorn"] + + for key, field in self.get_fields().items(): + if ( + field.metadata["exclude"] is False + and field.metadata["cli"] + and not self.is_default_value(key, (value := getattr(self, key))) + ): + command += field.metadata["cli"](value).split(" ") + + return [*command, self._app_uri] + + def run_dev(self): + """Run in development mode.""" + self.check_import() + console.info( + "For development mode, we recommand to use `UvicornBackendServer` than `GunicornBackendServer`" + ) + + from gunicorn.app.base import BaseApplication + from gunicorn.util import import_app as gunicorn_import_app + + model = self.get_fields() + options_ = { + key: value + for key, value in self.get_values().items() + if value != model[key].default + } + + class StandaloneApplication(BaseApplication): + def __init__(self, app_uri, options=None): + self.options = options or {} + self._app_uri = app_uri + super().__init__() + + def load_config(self): + config = { + key: value + for key, value in self.options.items() + if key in self.cfg.settings and value is not None # type: ignore + } + for key, value in config.items(): + self.cfg.set(key.lower(), value) # type: ignore + + def load(self): + return gunicorn_import_app(self._app_uri) + + def stop(self): + from gunicorn.arbiter import Arbiter + + Arbiter(self).stop() + + self._app = StandaloneApplication(app_uri=self._app_uri, options=options_) # type: ignore + self._app.run() + + async def shutdown(self): + """Shutdown the backend server.""" + if self._app and self._env == Env.DEV: + self._app.stop() # type: ignore diff --git a/reflex/server/uvicorn.py b/reflex/server/uvicorn.py new file mode 100644 index 000000000..7caf91e0b --- /dev/null +++ b/reflex/server/uvicorn.py @@ -0,0 +1,278 @@ +"""The UvicornBackendServer.""" +# ruff: noqa: RUF009 + +from __future__ import annotations + +# `UvicornBackendServer` defer from other `*BackendServer`, because `uvicorn` he is natively integrated inside the reflex project via Fastapi (same for asyncio) +import asyncio +import os +import ssl +from configparser import RawConfigParser +from dataclasses import dataclass +from typing import IO, Any, Awaitable, Callable + +from uvicorn import Config, Server +from uvicorn.config import ( + LOGGING_CONFIG, + SSL_PROTOCOL_VERSION, + HTTPProtocolType, + InterfaceType, + LifespanType, + LoopSetupType, + WSProtocolType, +) + +from reflex.constants.base import Env, LogLevel +from reflex.server.base import CliType, CustomBackendServer, field_ +from reflex.utils import console + + +@dataclass +class UvicornBackendServer(CustomBackendServer): + """Uvicorn backendServer. + + https://www.uvicorn.org/settings/ + """ + + host: str = field_( + default="127.0.0.1", metadata_cli=CliType.default("--host {value}") + ) + port: int = field_(default=8000, metadata_cli=CliType.default("--port {value}")) + uds: str | None = field_( + default=None, metadata_cli=CliType.default("--uds {value}") + ) + fd: int | None = field_(default=None, metadata_cli=CliType.default("--fd {value}")) + loop: LoopSetupType = field_( + default="auto", metadata_cli=CliType.default("--loop {value}") + ) + http: type[asyncio.Protocol] | HTTPProtocolType = field_( + default="auto", metadata_cli=CliType.default("--http {value}") + ) + ws: type[asyncio.Protocol] | WSProtocolType = field_( + default="auto", metadata_cli=CliType.default("--ws {value}") + ) + ws_max_size: int = field_( + default=16777216, metadata_cli=CliType.default("--ws-max-size {value}") + ) + ws_max_queue: int = field_( + default=32, metadata_cli=CliType.default("--ws-max-queue {value}") + ) + ws_ping_interval: float | None = field_( + default=20.0, metadata_cli=CliType.default("--ws-ping-interval {value}") + ) + ws_ping_timeout: float | None = field_( + default=20.0, metadata_cli=CliType.default("--ws-ping-timeout {value}") + ) + ws_per_message_deflate: bool = field_( + default=True, metadata_cli=CliType.default("--ws-per-message-deflate {value}") + ) + lifespan: LifespanType = field_( + default="auto", metadata_cli=CliType.default("--lifespan {value}") + ) + env_file: str | os.PathLike[str] | None = field_( + default=None, metadata_cli=CliType.default("--env-file {value}") + ) + log_config: dict[str, Any] | str | RawConfigParser | IO[Any] | None = field_( + default=None, + default_factory=lambda: LOGGING_CONFIG, + metadata_cli=CliType.default("--log-config {value}"), + ) + log_level: str | int | None = field_( + default=None, metadata_cli=CliType.default("--log-level {value}") + ) + access_log: bool = field_( + default=True, + metadata_cli=CliType.boolean_toggle("--{toggle_kw}{toggle_sep}access-log"), + ) + use_colors: bool | None = field_( + default=None, + metadata_cli=CliType.boolean_toggle("--{toggle_kw}{toggle_sep}use-colors"), + ) + interface: InterfaceType = field_( + default="auto", metadata_cli=CliType.default("--interface {value}") + ) + reload: bool = field_( + default=False, metadata_cli=CliType.default("--reload {value}") + ) + reload_dirs: list[str] | str | None = field_( + default=None, metadata_cli=CliType.multiple("--reload_dir {value}") + ) + reload_delay: float = field_( + default=0.25, metadata_cli=CliType.default("--reload-delay {value}") + ) + reload_includes: list[str] | str | None = field_( + default=None, metadata_cli=CliType.multiple("----reload-include {value}") + ) + reload_excludes: list[str] | str | None = field_( + default=None, metadata_cli=CliType.multiple("--reload-exclude {value}") + ) + workers: int = field_(default=0, metadata_cli=CliType.default("--workers {value}")) + proxy_headers: bool = field_( + default=True, + metadata_cli=CliType.boolean_toggle("--{toggle_kw}{toggle_sep}proxy-headers"), + ) + server_header: bool = field_( + default=True, + metadata_cli=CliType.boolean_toggle("--{toggle_kw}{toggle_sep}server-header"), + ) + date_header: bool = field_( + default=True, + metadata_cli=CliType.boolean_toggle("--{toggle_kw}{toggle_sep}date-header"), + ) + forwarded_allow_ips: list[str] | str | None = field_( + default=None, + metadata_cli=CliType.multiple("--forwarded-allow-ips {value}", join_sep=","), + ) + root_path: str = field_( + default="", metadata_cli=CliType.default("--root-path {value}") + ) + limit_concurrency: int | None = field_( + default=None, metadata_cli=CliType.default("--limit-concurrency {value}") + ) + limit_max_requests: int | None = field_( + default=None, metadata_cli=CliType.default("--limit-max-requests {value}") + ) + backlog: int = field_( + default=2048, metadata_cli=CliType.default("--backlog {value}") + ) + timeout_keep_alive: int = field_( + default=5, metadata_cli=CliType.default("--timeout-keep-alive {value}") + ) + timeout_notify: int = field_(default=30, metadata_cli=None) + timeout_graceful_shutdown: int | None = field_( + default=None, + metadata_cli=CliType.default("--timeout-graceful-shutdown {value}"), + ) + callback_notify: Callable[..., Awaitable[None]] | None = field_( + default=None, metadata_cli=None + ) + ssl_keyfile: str | os.PathLike[str] | None = field_( + default=None, metadata_cli=CliType.default("--ssl-keyfile {value}") + ) + ssl_certfile: str | os.PathLike[str] | None = field_( + default=None, metadata_cli=CliType.default("--ssl-certfile {value}") + ) + ssl_keyfile_password: str | None = field_( + default=None, metadata_cli=CliType.default("--ssl-keyfile-password {value}") + ) + ssl_version: int = field_( + default=SSL_PROTOCOL_VERSION, + metadata_cli=CliType.default("--ssl-version {value}"), + ) + ssl_cert_reqs: int = field_( + default=ssl.CERT_NONE, metadata_cli=CliType.default("--ssl-cert-reqs {value}") + ) + ssl_ca_certs: str | None = field_( + default=None, metadata_cli=CliType.default("--ssl-ca-certs {value}") + ) + ssl_ciphers: str = field_( + default="TLSv1", metadata_cli=CliType.default("--ssl-ciphers {value}") + ) + headers: list[tuple[str, str]] | None = field_( + default=None, + metadata_cli=CliType.multiple( + "--header {value}", value_transformer=lambda value: f"{value[0]}:{value[1]}" + ), + ) + factory: bool = field_( + default=False, metadata_cli=CliType.default("--factory {value}") + ) + h11_max_incomplete_event_size: int | None = field_( + default=None, + metadata_cli=CliType.default("--h11-max-incomplete-event-size {value}"), + ) + + def get_backend_bind(self) -> tuple[str, int]: + """Return the backend host and port. + + Returns: + tuple[str, int]: The host address and port. + """ + return self.host, self.port + + def check_import(self): + """Check package importation. + + Raises: + ImportError: raise when some required packaging missing. + """ + from importlib.util import find_spec + + errors: list[str] = [] + + if find_spec("uvicorn") is None: + errors.append( + 'The `uvicorn` package is required to run `UvicornBackendServer`. Run `pip install "uvicorn>=0.20.0"`.' + ) + + if find_spec("watchfiles") is None and ( + self.reload_includes and self.reload_excludes + ): + errors.append( + 'Using `--reload-include` and `--reload-exclude` in `UvicornBackendServer` requires the `watchfiles` extra. Run `pip install "watchfiles>=0.13"`.' + ) + + if errors: + console.error("\n".join(errors)) + raise ImportError() + + def setup(self, host: str, port: int, loglevel: LogLevel, env: Env): + """Setup. + + Args: + host (str): host address + port (int): port address + loglevel (LogLevel): log level + env (Env): prod/dev environment + """ + self.check_import() + self._app_uri = self.get_app_module(add_extra_api=True) # type: ignore + self.log_level = loglevel.value + self.host = host + self.port = port + self._env = env # type: ignore + + if self.workers == self.get_fields()["workers"].default: + self.workers = self.get_recommended_workers() + else: + if self.workers > (max_workers := self.get_max_workers()): + self.workers = max_workers + + def run_prod(self) -> list[str]: + """Run in production mode. + + Returns: + list[str]: Command ready to be executed + """ + self.check_import() + command = ["uvicorn"] + + for key, field in self.get_fields().items(): + if ( + field.metadata["exclude"] is False + and field.metadata["cli"] + and not self.is_default_value(key, (value := getattr(self, key))) + ): + command += field.metadata["cli"](value).split(" ") + + return [*command, self._app_uri] + + def run_dev(self): + """Run in development mode.""" + self.check_import() + + options_ = { + key: value + for key, value in self.get_values().items() + if not self.is_default_value(key, value) + } + + self._app = Server( # type: ignore + config=Config(**options_, app=self._app_uri), + ) + self._app.run() + + async def shutdown(self): + """Shutdown the backend server.""" + if self._app and self._env == Env.DEV: + self._app.shutdown() # type: ignore diff --git a/reflex/utils/exec.py b/reflex/utils/exec.py index 621c4a608..b30915575 100644 --- a/reflex/utils/exec.py +++ b/reflex/utils/exec.py @@ -10,18 +10,21 @@ import re import subprocess import sys from pathlib import Path +from threading import Barrier, Event from urllib.parse import urljoin import psutil from reflex import constants from reflex.config import environment, get_config -from reflex.constants.base import LogLevel +from reflex.constants.base import Env, LogLevel from reflex.utils import console, path_ops from reflex.utils.prerequisites import get_web_dir # For uvicorn windows bug fix (#2335) frontend_process = None +barrier = Barrier(2) +failed_start_signal = Event() def detect_package_change(json_file_path: Path) -> str: @@ -61,8 +64,14 @@ def kill(proc_pid: int): process.kill() -def notify_backend(): - """Output a string notifying where the backend is running.""" +def notify_backend(only_backend: bool = False): + """Output a string notifying where the backend is running. + + Args: + only_backend: Whether the frontend is present. + """ + if not only_backend: + barrier.wait() console.print( f"Backend running at: [bold green]http://0.0.0.0:{get_config().backend_port}[/bold green]" ) @@ -110,8 +119,14 @@ def run_process_and_launch_url(run_command: list[str], backend_present=True): console.print( f"App running at: [bold green]{url}[/bold green]{' (Frontend-only mode)' if not backend_present else ''}" ) + if backend_present: - notify_backend() + barrier.wait() + if failed_start_signal.is_set(): + kill(process.pid) + process = None + break + first_run = False else: console.print("New packages detected: Updating app...") @@ -130,7 +145,7 @@ def run_process_and_launch_url(run_command: list[str], backend_present=True): kill(process.pid) process = None break # for line in process.stdout - if process is not None: + if (process is not None) or (failed_start_signal.is_set() and process is None): break # while True @@ -178,43 +193,10 @@ def run_frontend_prod(root: Path, port: str, backend_present=True): ) -def should_use_granian(): - """Whether to use Granian for backend. - - Returns: - True if Granian should be used. - """ - return environment.REFLEX_USE_GRANIAN.get() - - -def get_app_module(): - """Get the app module for the backend. - - Returns: - The app module for the backend. - """ - return f"reflex.app_module_for_backend:{constants.CompileVars.APP}" - - -def get_granian_target(): - """Get the Granian target for the backend. - - Returns: - The Granian target for the backend. - """ - import reflex - - app_module_path = Path(reflex.__file__).parent / "app_module_for_backend.py" - - return ( - f"{app_module_path!s}:{constants.CompileVars.APP}.{constants.CompileVars.API}" - ) - - def run_backend( host: str, port: int, - loglevel: constants.LogLevel = constants.LogLevel.ERROR, + loglevel: LogLevel = LogLevel.ERROR, frontend_present: bool = False, ): """Run the backend. @@ -226,86 +208,29 @@ def run_backend( frontend_present: Whether the frontend is present. """ web_dir = get_web_dir() + config = get_config() # Create a .nocompile file to skip compile for backend. if web_dir.exists(): (web_dir / constants.NOCOMPILE_FILE).touch() - if not frontend_present: - notify_backend() - # Run the backend in development mode. - if should_use_granian(): - run_granian_backend(host, port, loglevel) - else: - run_uvicorn_backend(host, port, loglevel) - - -def run_uvicorn_backend(host, port, loglevel: LogLevel): - """Run the backend in development mode using Uvicorn. - - Args: - host: The app host - port: The app port - loglevel: The log level. - """ - import uvicorn - - uvicorn.run( - app=f"{get_app_module()}.{constants.CompileVars.API}", - host=host, - port=port, - log_level=loglevel.value, - reload=True, - reload_dirs=[get_config().app_name], - ) - - -def run_granian_backend(host, port, loglevel: LogLevel): - """Run the backend in development mode using Granian. - - Args: - host: The app host - port: The app port - loglevel: The log level. - """ - console.debug("Using Granian for backend") + backend_server_dev = config.backend_server_dev try: - from granian import Granian # type: ignore - from granian.constants import Interfaces # type: ignore - from granian.log import LogLevels # type: ignore - - Granian( - target=get_granian_target(), - address=host, - port=port, - interface=Interfaces.ASGI, - log_level=LogLevels(loglevel.value), - reload=True, - reload_paths=[Path(get_config().app_name)], - reload_ignore_dirs=[".web"], - ).serve() + backend_server_dev.setup(host, port, loglevel, Env.DEV) except ImportError: - console.error( - 'InstallError: REFLEX_USE_GRANIAN is set but `granian` is not installed. (run `pip install "granian[reload]>=1.6.0"`)' - ) - os._exit(1) + if frontend_present: + failed_start_signal.set() + barrier.wait() # for unlock frontend server + return - -def _get_backend_workers(): - from reflex.utils import processes - - config = get_config() - return ( - processes.get_num_workers() - if not config.gunicorn_workers - else config.gunicorn_workers - ) + notify_backend(not frontend_present) + backend_server_dev.run_dev() def run_backend_prod( host: str, port: int, - loglevel: constants.LogLevel = constants.LogLevel.ERROR, + loglevel: LogLevel = LogLevel.ERROR, frontend_present: bool = False, ): """Run the backend. @@ -316,108 +241,31 @@ def run_backend_prod( loglevel: The log level. frontend_present: Whether the frontend is present. """ - if not frontend_present: - notify_backend() - - if should_use_granian(): - run_granian_backend_prod(host, port, loglevel) - else: - run_uvicorn_backend_prod(host, port, loglevel) - - -def run_uvicorn_backend_prod(host, port, loglevel): - """Run the backend in production mode using Uvicorn. - - Args: - host: The app host - port: The app port - loglevel: The log level. - """ from reflex.utils import processes config = get_config() - app_module = get_app_module() + # Run the backend in production mode. + backend_server_prod = config.backend_server_prod + try: + backend_server_prod.setup(host, port, loglevel, Env.PROD) + except ImportError: + if frontend_present: + failed_start_signal.set() + barrier.wait() # for unlock frontend server + return - RUN_BACKEND_PROD = f"gunicorn --worker-class {config.gunicorn_worker_class} --max-requests {config.gunicorn_max_requests} --max-requests-jitter {config.gunicorn_max_requests_jitter} --preload --timeout {config.timeout} --log-level critical".split() - RUN_BACKEND_PROD_WINDOWS = f"uvicorn --limit-max-requests {config.gunicorn_max_requests} --timeout-keep-alive {config.timeout}".split() - command = ( - [ - *RUN_BACKEND_PROD_WINDOWS, - "--host", - host, - "--port", - str(port), - app_module, - ] - if constants.IS_WINDOWS - else [ - *RUN_BACKEND_PROD, - "--bind", - f"{host}:{port}", - "--threads", - str(_get_backend_workers()), - f"{app_module}()", - ] - ) - - command += [ - "--log-level", - loglevel.value, - "--workers", - str(_get_backend_workers()), - ] + notify_backend(not frontend_present) processes.new_process( - command, + backend_server_prod.run_prod(), run=True, show_logs=True, env={ - environment.REFLEX_SKIP_COMPILE.name: "true" - }, # skip compile for prod backend + environment.REFLEX_SKIP_COMPILE.name: "true" # skip compile for prod backend + }, ) -def run_granian_backend_prod(host, port, loglevel): - """Run the backend in production mode using Granian. - - Args: - host: The app host - port: The app port - loglevel: The log level. - """ - from reflex.utils import processes - - try: - from granian.constants import Interfaces # type: ignore - - command = [ - "granian", - "--workers", - str(_get_backend_workers()), - "--log-level", - "critical", - "--host", - host, - "--port", - str(port), - "--interface", - str(Interfaces.ASGI), - get_granian_target(), - ] - processes.new_process( - command, - run=True, - show_logs=True, - env={ - environment.REFLEX_SKIP_COMPILE.name: "true" - }, # skip compile for prod backend - ) - except ImportError: - console.error( - 'InstallError: REFLEX_USE_GRANIAN is set but `granian` is not installed. (run `pip install "granian[reload]>=1.6.0"`)' - ) - - def output_system_info(): """Show system information if the loglevel is in DEBUG.""" if console._LOG_LEVEL > constants.LogLevel.DEBUG: