add granian prod&dev, add gunicorn dev

This commit is contained in:
KronosDev-Pro 2024-11-08 16:29:16 +00:00
parent ea06469370
commit 60ff800270
7 changed files with 556 additions and 273 deletions

View File

@ -697,15 +697,28 @@ class Config(Base):
env_file: Optional[str] = None
# Custom Backend Server
backend_server_prod: server.CustomBackendServer = server.GunicornBackendServer(
app=f"reflex.app_module_for_backend:{constants.CompileVars.APP}.{constants.CompileVars.API}",
worker_class="uvicorn.workers.UvicornH11Worker", # type: ignore
max_requests=100,
max_requests_jitter=25,
preload_app=True,
timeout=120,
# backend_server_prod: server.CustomBackendServer = server.GunicornBackendServer(
# worker_class="uvicorn.workers.UvicornH11Worker", # type: ignore
# max_requests=100,
# max_requests_jitter=25,
# timeout=120,
# )
backend_server_prod: server.CustomBackendServer = server.GranianBackendServer(
threads=2,
workers=4,
)
backend_server_dev: server.CustomBackendServer = server.UvicornBackendServer()
backend_server_dev: server.CustomBackendServer = server.GranianBackendServer(
threads=1,
workers=1,
)
# backend_server_dev: server.CustomBackendServer = server.GunicornBackendServer(
# worker_class="uvicorn.workers.UvicornH11Worker", # type: ignore
# max_requests=100,
# max_requests_jitter=25,
# timeout=120,
# threads=1,
# workers=1,
# )
def __init__(self, *args, **kwargs):
"""Initialize the config values.
@ -737,14 +750,26 @@ class Config(Base):
raise ConfigError(
"REDIS_URL is required when using the redis state manager."
)
print("[reflex.config::Config] --")
for key in ("timeout", "gunicorn_worker_class", "gunicorn_workers", "gunicorn_max_requests", "gunicorn_max_requests_jitter"):
for key in (
"timeout",
"gunicorn_worker_class",
"gunicorn_workers",
"gunicorn_max_requests",
"gunicorn_max_requests_jitter",
):
if isinstance(self.backend_server_prod, server.GunicornBackendServer):
value = self.get_value(key)
if value != self.backend_server_prod.get_fields()[key.replace("gunicorn_", "")].default and value is not None:
setattr(self.backend_server_prod, key.replace("gunicorn_", ""), value)
print("[reflex.config::Config] done")
if (
value
!= self.backend_server_prod.get_fields()[
key.replace("gunicorn_", "")
].default
and value is not None
):
setattr(
self.backend_server_prod, key.replace("gunicorn_", ""), value
)
@property
def module(self) -> str:

View File

@ -1,4 +1,4 @@
"""Import every *BackendServer."""
from .base import CustomBackendServer
from .granian import GranianBackendServer

View File

@ -1,14 +1,95 @@
from abc import abstractmethod, ABCMeta
"""The base for CustomBackendServer."""
from __future__ import annotations
import os
from abc import abstractmethod
from pathlib import Path
from reflex import constants
from reflex.base import Base
from reflex.constants.base import Env, LogLevel
class CustomBackendServer(Base):
"""BackendServer base."""
@staticmethod
def get_app_module(for_granian_target: bool = False, add_extra_api: bool = False):
"""Get the app module for the backend.
Returns:
The app module for the backend.
"""
import reflex
if for_granian_target:
app_path = str(Path(reflex.__file__).parent / "app_module_for_backend.py")
else:
app_path = "reflex.app_module_for_backend"
return f"{app_path}:{constants.CompileVars.APP}{f'.{constants.CompileVars.API}' if add_extra_api else ''}"
def get_available_cpus(self) -> int:
"""Get available cpus."""
return os.cpu_count() or 1
def get_max_workers(self) -> int:
"""Get max workers."""
# https://docs.gunicorn.org/en/latest/settings.html#workers
return (os.cpu_count() or 1) * 4 + 1
def get_recommended_workers(self) -> int:
"""Get recommended workers."""
# https://docs.gunicorn.org/en/latest/settings.html#workers
return (os.cpu_count() or 1) * 2 + 1
def get_max_threads(self, wait_time_ms: int = 50, service_time_ms: int = 5) -> int:
"""Get max threads."""
# https://engineering.zalando.com/posts/2019/04/how-to-set-an-ideal-thread-pool-size.html
# Brian Goetz formula
return int(self.get_available_cpus() * (1 + wait_time_ms / service_time_ms))
def get_recommended_threads(
self,
target_reqs: int | None = None,
wait_time_ms: int = 50,
service_time_ms: int = 5,
) -> int:
"""Get recommended threads."""
# https://engineering.zalando.com/posts/2019/04/how-to-set-an-ideal-thread-pool-size.html
max_available_threads = self.get_max_threads()
if target_reqs:
# Little's law formula
need_threads = target_reqs * (
(wait_time_ms / 1000) + (service_time_ms / 1000)
)
else:
need_threads = self.get_max_threads(wait_time_ms, service_time_ms)
return int(
max_available_threads
if need_threads > max_available_threads
else need_threads
)
@abstractmethod
def check_import(self, extra: bool = False):
"""Check package importation."""
raise NotImplementedError()
@abstractmethod
def setup(self, host: str, port: int, loglevel: LogLevel, env: Env):
"""Setup."""
raise NotImplementedError()
@abstractmethod
def run_prod(self):
"""Run in production mode."""
raise NotImplementedError()
@abstractmethod
def run_dev(self):
"""Run in development mode."""
raise NotImplementedError()

View File

@ -1,36 +1,272 @@
"""The GranianBackendServer."""
from __future__ import annotations
import sys
from dataclasses import dataclass
from dataclasses import field as dc_field
from pathlib import Path
from typing import Any, Literal, Type
from reflex.constants.base import Env, LogLevel
from reflex.server.base import CustomBackendServer
from reflex.utils import console
@dataclass
class HTTP1Settings: # https://github.com/emmett-framework/granian/blob/261ceba3fd93bca10300e91d1498bee6df9e3576/granian/http.py#L6
keep_alive: bool = True
max_buffer_size: int = 8192 + 4096 * 100
pipeline_flush: bool = False
class HTTP1Settings:
"""Granian HTTP1Settings."""
# https://github.com/emmett-framework/granian/blob/261ceba3fd93bca10300e91d1498bee6df9e3576/granian/http.py#L6
keep_alive: bool = dc_field(default=True)
max_buffer_size: int = dc_field(default=8192 + 4096 * 100)
pipeline_flush: bool = dc_field(default=False)
@dataclass
class HTTP2Settings: # https://github.com/emmett-framework/granian/blob/261ceba3fd93bca10300e91d1498bee6df9e3576/granian/http.py#L13
adaptive_window: bool = False
initial_connection_window_size: int = 1024 * 1024
initial_stream_window_size: int = 1024 * 1024
keep_alive_interval: int | None = None
keep_alive_timeout: int = 20
max_concurrent_streams: int = 200
max_frame_size: int = 1024 * 16
max_headers_size: int = 16 * 1024 * 1024
max_send_buffer_size: int = 1024 * 400
"""Granian HTTP2Settings."""
adaptive_window: bool = dc_field(default=False)
initial_connection_window_size: int = dc_field(default=1024 * 1024)
initial_stream_window_size: int = dc_field(default=1024 * 1024)
keep_alive_interval: int | None = dc_field(default=None)
keep_alive_timeout: int = dc_field(default=20)
max_concurrent_streams: int = dc_field(default=200)
max_frame_size: int = dc_field(default=1024 * 16)
max_headers_size: int = dc_field(default=16 * 1024 * 1024)
max_send_buffer_size: int = dc_field(default=1024 * 400)
try:
import watchfiles
import watchfiles # type: ignore
except ImportError:
watchfiles = None
_mapping_attr_to_cli: dict[str, str] = {
"address": "--host",
"port": "--port",
"interface": "--interface",
"http": "--http",
"websockets": "--ws", # NOTE: when `websockets` True: `--ws`; False: `--no-ws`
"workers": "--workers",
"threads": "--threads",
"blocking_threads": "--blocking-threads",
"threading_mode": "--threading-mode",
"loop": "--loop",
"loop_opt": "--opt", # NOTE: when `loop_opt` True: `--opt`; False: `--no-opt`
"backlog": "--backlog",
"backpressure": "--backpressure",
"http1_keep_alive": "--http1-keep-alive",
"http1_max_buffer_size": "--http1-max-buffer-size",
"http1_pipeline_flush": "--http1-pipeline-flush",
"http2_adaptive_window": "--http2-adaptive-window",
"http2_initial_connection_window_size": "--http2-initial-connection-window-size",
"http2_initial_stream_window_size": "--http2-initial-stream-window-size",
"http2_keep_alive_interval": "--http2-keep-alive-interval",
"http2_keep_alive_timeout": "--http2-keep-alive-timeout",
"http2_max_concurrent_streams": "--http2-max-concurrent-streams",
"http2_max_frame_size": "--http2-max-frame-size",
"http2_max_headers_size": "--http2-max-headers-size",
"http2_max_send_buffer_size": "--http2-max-send-buffer-size",
"log_enabled": "--log", # NOTE: when `log_enabled` True: `--log`; False: `--no-log`
"log_level": "--log-level",
"log_access": "--log-access", # NOTE: when `log_access` True: `--log-access`; False: `--no-log-access`
"log_access_format": "--access-log-fmt",
"ssl_cert": "--ssl-certificate",
"ssl_key": "--ssl-keyfile",
"ssl_key_password": "--ssl-keyfile-password",
"url_path_prefix": "--url-path-prefix",
"respawn_failed_workers": "--respawn-failed-workers", # NOTE: when `respawn_failed_workers` True: `--respawn-failed-workers`; False: `--no-respawn-failed-workers`
"respawn_interval": "--respawn-interval",
"workers_lifetime": "--workers-lifetime",
"factory": "--factory", # NOTE: when `factory` True: `--factory`; False: `--no-factory`
"reload": "--reload", # NOTE: when `reload` True: `--reload`; False: `--no-reload`
"reload_paths": "--reload-paths",
"reload_ignore_dirs": "--reload-ignore-dirs",
"reload_ignore_patterns": "--reload-ignore-patterns",
"reload_ignore_paths": "--reload-ignore-paths",
"process_name": "--process-name",
"pid_file": "--pid-file",
}
class GranianBackendServer(CustomBackendServer):
"""Granian backendServer."""
# https://github.com/emmett-framework/granian/blob/fc11808ed177362fcd9359a455a733065ddbc505/granian/server.py#L69
target: str | None = None
address: str = "127.0.0.1"
port: int = 8000
interface: Literal["asgi", "asginl", "rsgi", "wsgi"] = "rsgi"
workers: int = 0
threads: int = 0
blocking_threads: int | None = None
threading_mode: Literal["runtime", "workers"] = "workers"
loop: Literal["auto", "asyncio", "uvloop"] = "auto"
loop_opt: bool = False
http: Literal["auto", "1", "2"] = "auto"
websockets: bool = True
backlog: int = 1024
backpressure: int | None = None
# http1_settings: HTTP1Settings | None = None
# NOTE: child of http1_settings, needed only for cli mode
http1_keep_alive: bool = HTTP1Settings.keep_alive
http1_max_buffer_size: int = HTTP1Settings.max_buffer_size
http1_pipeline_flush: bool = HTTP1Settings.pipeline_flush
# http2_settings: HTTP2Settings | None = None
# NOTE: child of http2_settings, needed only for cli mode
http2_adaptive_window: bool = HTTP2Settings.adaptive_window
http2_initial_connection_window_size: int = (
HTTP2Settings.initial_connection_window_size
)
http2_initial_stream_window_size: int = HTTP2Settings.initial_stream_window_size
http2_keep_alive_interval: int | None = HTTP2Settings.keep_alive_interval
http2_keep_alive_timeout: int = HTTP2Settings.keep_alive_timeout
http2_max_concurrent_streams: int = HTTP2Settings.max_concurrent_streams
http2_max_frame_size: int = HTTP2Settings.max_frame_size
http2_max_headers_size: int = HTTP2Settings.max_headers_size
http2_max_send_buffer_size: int = HTTP2Settings.max_send_buffer_size
log_enabled: bool = True
log_level: Literal["critical", "error", "warning", "warn", "info", "debug"] = "info"
log_dictconfig: dict[str, Any] | None = None
log_access: bool = False
log_access_format: str | None = None
ssl_cert: Path | None = None
ssl_key: Path | None = None
ssl_key_password: str | None = None
url_path_prefix: str | None = None
respawn_failed_workers: bool = False
respawn_interval: float = 3.5
workers_lifetime: int | None = None
factory: bool = False
reload: bool = False
reload_paths: list[Path] | None = None
reload_ignore_dirs: list[str] | None = None
reload_ignore_patterns: list[str] | None = None
reload_ignore_paths: list[Path] | None = None
reload_filter: Type[getattr(watchfiles, "BaseFilter", None)] | None = None # type: ignore
process_name: str | None = None
pid_file: Path | None = None
def check_import(self, extra: bool = False):
"""Check package importation."""
from importlib.util import find_spec
errors: list[str] = []
if find_spec("granian") is None:
errors.append(
'The `granian` package is required to run `GranianBackendServer`. Run `pip install "granian>=1.6.0"`.'
)
if find_spec("watchfiles") is None and extra:
# NOTE: the `\[` is for force `rich.Console` to not consider it like a color or anything else which he not printing `[.*]`
errors.append(
r'Using --reload in `GranianBackendServer` requires the granian\[reload] extra. Run `pip install "granian\[reload]>=1.6.0"`.'
) # type: ignore
if errors:
console.error("\n".join(errors))
sys.exit()
def setup(self, host: str, port: int, loglevel: LogLevel, env: Env):
"""Setup."""
self.target = self.get_app_module(for_granian_target=True, add_extra_api=True)
self.log_level = loglevel.value # type: ignore
self.address = host
self.port = port
self.interface = "asgi" # prevent obvious error
if env == Env.PROD:
if self.workers == self.get_fields()["workers"].default:
self.workers = self.get_recommended_workers()
else:
if self.workers > (max_threads := self.get_max_workers()):
self.workers = max_threads
if self.threads == self.get_fields()["threads"].default:
self.threads = self.get_recommended_threads()
else:
if self.threads > (max_threads := self.get_max_threads()):
self.threads = max_threads
if env == Env.DEV:
from reflex.config import get_config # prevent circular import
self.reload = True
self.reload_paths = [Path(get_config().app_name)]
self.reload_ignore_dirs = [".web"]
def run_prod(self):
pass
"""Run in production mode."""
self.check_import()
command = ["granian"]
for key, field in self.get_fields().items():
if key != "target":
value = getattr(self, key)
if _mapping_attr_to_cli.get(key) and value != field.default:
if isinstance(value, list):
for v in value:
command += [_mapping_attr_to_cli[key], str(v)]
elif isinstance(value, bool):
command.append(
f"--{'no-' if value is False else ''}{_mapping_attr_to_cli[key][2:]}"
)
else:
command += [_mapping_attr_to_cli[key], str(value)]
return command + [
self.get_app_module(for_granian_target=True, add_extra_api=True)
]
def run_dev(self):
pass
"""Run in development mode."""
self.check_import(extra=self.reload)
from granian import Granian
exclude_keys = (
"http1_keep_alive",
"http1_max_buffer_size",
"http1_pipeline_flush",
"http2_adaptive_window",
"http2_initial_connection_window_size",
"http2_initial_stream_window_size",
"http2_keep_alive_interval",
"http2_keep_alive_timeout",
"http2_max_concurrent_streams",
"http2_max_frame_size",
"http2_max_headers_size",
"http2_max_send_buffer_size",
)
model = self.get_fields()
Granian(
**{
**{
key: value
for key, value in self.dict().items()
if key not in exclude_keys and value != model[key].default
},
"http1_settings": HTTP1Settings(
self.http1_keep_alive,
self.http1_max_buffer_size,
self.http1_pipeline_flush,
),
"http2_settings": HTTP2Settings(
self.http2_adaptive_window,
self.http2_initial_connection_window_size,
self.http2_initial_stream_window_size,
self.http2_keep_alive_interval,
self.http2_keep_alive_timeout,
self.http2_max_concurrent_streams,
self.http2_max_frame_size,
self.http2_max_headers_size,
self.http2_max_send_buffer_size,
),
}
).serve()

View File

@ -1,34 +1,15 @@
from typing import Any, Literal, Callable
"""The GunicornBackendServer."""
from __future__ import annotations
import os
import sys
import ssl
from pydantic import Field
import sys
from typing import Any, Callable, Literal
from gunicorn.app.base import BaseApplication
import psutil
from reflex import constants
from reflex.constants.base import Env, LogLevel
from reflex.server.base import CustomBackendServer
class StandaloneApplication(BaseApplication):
def __init__(self, app, options=None):
self.options = options or {}
self.application = app
super().__init__()
def load_config(self):
config = {key: value for key, value in self.options.items()
if key in self.cfg.settings and value is not None} # type: ignore
for key, value in config.items():
self.cfg.set(key.lower(), value) # type: ignore
def load(self):
return self.application
from reflex.utils import console
_mapping_attr_to_cli: dict[str, str] = {
"config": "--config",
@ -105,10 +86,13 @@ _mapping_attr_to_cli: dict[str, str] = {
"header_map": "--header-map",
}
class GunicornBackendServer(CustomBackendServer):
"""Gunicorn backendServer."""
# https://github.com/benoitc/gunicorn/blob/bacbf8aa5152b94e44aa5d2a94aeaf0318a85248/gunicorn/config.py
app: str
app_uri: str | None
config: str = "./gunicorn.conf.py"
"""\
@ -128,7 +112,7 @@ class GunicornBackendServer(CustomBackendServer):
A WSGI application path in pattern ``$(MODULE_NAME):$(VARIABLE_NAME)``.
"""
bind: list[str] = ['127.0.0.1:8000']
bind: list[str] = ["127.0.0.1:8000"]
"""\
The socket to bind.
@ -162,7 +146,7 @@ class GunicornBackendServer(CustomBackendServer):
Must be a positive integer. Generally set in the 64-2048 range.
"""
workers: int = 1
workers: int = 0
"""\
The number of worker processes for handling requests.
@ -175,7 +159,14 @@ class GunicornBackendServer(CustomBackendServer):
it is not defined, the default is ``1``.
"""
worker_class: Literal["sync", "eventlet", "gevent", "tornado", "gthread", "uvicorn.workers.UvicornH11Worker"] = "sync"
worker_class: Literal[
"sync",
"eventlet",
"gevent",
"tornado",
"gthread",
"uvicorn.workers.UvicornH11Worker",
] = "sync"
"""\
The type of workers to use.
@ -202,7 +193,7 @@ class GunicornBackendServer(CustomBackendServer):
``gunicorn.workers.ggevent.GeventWorker``.
"""
threads: int = 1
threads: int = 0
"""\
The number of worker threads for handling requests.
@ -493,7 +484,11 @@ class GunicornBackendServer(CustomBackendServer):
temporary directory.
"""
secure_scheme_headers: dict[str, Any] = {'X-FORWARDED-PROTOCOL': 'ssl', 'X-FORWARDED-PROTO': 'https', 'X-FORWARDED-SSL': 'on'}
secure_scheme_headers: dict[str, Any] = {
"X-FORWARDED-PROTOCOL": "ssl",
"X-FORWARDED-PROTO": "https",
"X-FORWARDED-SSL": "on",
}
"""\
A dictionary containing headers and values that the front-end proxy
uses to indicate HTTPS requests. If the source IP is permitted by
@ -588,7 +583,9 @@ class GunicornBackendServer(CustomBackendServer):
Disable redirect access logs to syslog.
"""
access_log_format: str = '%(h)s %(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s"'
access_log_format: str = (
'%(h)s %(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s"'
)
"""\
The access log format.
@ -686,7 +683,11 @@ class GunicornBackendServer(CustomBackendServer):
if sys.platform == "darwin"
else (
"unix:///var/run/log"
if sys.platform in ('freebsd', 'dragonfly', )
if sys.platform
in (
"freebsd",
"dragonfly",
)
else (
"unix:///dev/log"
if sys.platform == "openbsd"
@ -858,7 +859,9 @@ class GunicornBackendServer(CustomBackendServer):
The callable needs to accept a single instance variable for the Arbiter.
"""
pre_request: Callable = lambda worker, req: worker.log.debug("%s %s", req.method, req.path)
pre_request: Callable = lambda worker, req: worker.log.debug(
"%s %s", req.method, req.path
)
"""\
Called just before a worker processes the request.
@ -908,7 +911,9 @@ class GunicornBackendServer(CustomBackendServer):
The callable needs to accept a single instance variable for the Arbiter.
"""
ssl_context: Callable = lambda config, default_ssl_context_factory: default_ssl_context_factory()
ssl_context: Callable = (
lambda config, default_ssl_context_factory: default_ssl_context_factory()
)
"""\
Called when SSLContext is needed.
@ -975,7 +980,9 @@ class GunicornBackendServer(CustomBackendServer):
SSL certificate file
"""
ssl_version: int = ssl.PROTOCOL_TLS if hasattr(ssl, "PROTOCOL_TLS") else ssl.PROTOCOL_SSLv23
ssl_version: int = (
ssl.PROTOCOL_TLS if hasattr(ssl, "PROTOCOL_TLS") else ssl.PROTOCOL_SSLv23
)
"""\
SSL version to use (see stdlib ssl module's).
@ -1163,33 +1170,96 @@ class GunicornBackendServer(CustomBackendServer):
on a proxy in front of Gunicorn.
"""
# def __init__(self, *args, **kwargs):
# super().__init__(*args, **kwargs)
def check_import(self, extra: bool = False):
"""Check package importation."""
from importlib.util import find_spec
errors: list[str] = []
if find_spec("gunicorn") is None:
errors.append(
'The `gunicorn` package is required to run `GunicornBackendServer`. Run `pip install "gunicorn>=20.1.0"`.'
)
if errors:
console.error("\n".join(errors))
sys.exit()
def setup(self, host: str, port: int, loglevel: LogLevel, env: Env):
"""Setup."""
self.app_uri = f"{self.get_app_module()}()"
self.loglevel = loglevel.value # type: ignore
self.bind = [f"{host}:{port}"]
if env == Env.PROD:
if self.workers == self.get_fields()["workers"].default:
self.workers = self.get_recommended_workers()
else:
if self.workers > (max_threads := self.get_max_workers()):
self.workers = max_threads
if self.threads == self.get_fields()["threads"].default:
self.threads = self.get_recommended_threads()
else:
if self.threads > (max_threads := self.get_max_threads()):
self.threads = max_threads
self.preload_app = True
if env == Env.DEV:
self.reload = True
def run_prod(self) -> list[str]:
print("[reflex.server.gunicorn::GunicornBackendServer] start")
"""Run in production mode."""
self.check_import()
command = ["gunicorn"]
for key,field in self.get_fields().items():
for key, field in self.get_fields().items():
if key != "app":
value = self.__getattribute__(key)
if key == "preload":
print(_mapping_attr_to_cli.get(key, None), value, field.default)
if _mapping_attr_to_cli.get(key, None):
if value != field.default:
if isinstance(value, list):
for v in value:
command += [_mapping_attr_to_cli[key], str(v)]
elif isinstance(value, bool):
value = getattr(self, key)
if _mapping_attr_to_cli.get(key) and value != field.default:
if isinstance(value, list):
for v in value:
command += [_mapping_attr_to_cli[key], str(v)]
elif isinstance(value, bool):
if (key == "sendfile" and value is False) or (
key != "sendfile" and value
):
command.append(_mapping_attr_to_cli[key])
else:
command += [_mapping_attr_to_cli[key], str(value)]
else:
command += [_mapping_attr_to_cli[key], str(value)]
print("[reflex.server.gunicorn::GunicornBackendServer] done")
return command + [f"reflex.app_module_for_backend:{constants.CompileVars.APP}()"]
return command + [f"{self.get_app_module()}()"]
def run_dev(self):
StandaloneApplication(
app=self.app,
options=self.dict().items()
).run()
"""Run in development mode."""
self.check_import()
console.info(
"For development mode, we recommand to use `UvicornBackendServer` than `GunicornBackendServer`"
)
from gunicorn.app.base import BaseApplication
from gunicorn.util import import_app as gunicorn_import_app
options_ = self.dict()
options_.pop("app", None)
class StandaloneApplication(BaseApplication):
def __init__(self, app_uri, options=None):
self.options = options or {}
self.app_uri = app_uri
super().__init__()
def load_config(self):
config = {
key: value
for key, value in self.options.items()
if key in self.cfg.settings and value is not None
} # type: ignore
for key, value in config.items():
self.cfg.set(key.lower(), value) # type: ignore
def load(self):
return gunicorn_import_app(self.app_uri)
StandaloneApplication(app_uri=self.app_uri, options=options_).run()

View File

@ -1,10 +1,41 @@
"""The UvicornBackendServer."""
from __future__ import annotations
import sys
from reflex.constants.base import Env, LogLevel
from reflex.server.base import CustomBackendServer
from reflex.utils import console
# TODO
class UvicornBackendServer(CustomBackendServer):
"""Uvicorn backendServer."""
def check_import(self, extra: bool = False):
"""Check package importation."""
from importlib.util import find_spec
errors: list[str] = []
if find_spec("uvicorn") is None:
errors.append(
'The `uvicorn` package is required to run `UvicornBackendServer`. Run `pip install "uvicorn>=0.20.0"`.'
)
if errors:
console.error("\n".join(errors))
sys.exit()
def setup(self, host: str, port: int, loglevel: LogLevel, env: Env):
"""Setup."""
pass
def run_prod(self):
"""Run in production mode."""
pass
def run_dev(self):
"""Run in development mode."""
pass

View File

@ -2,9 +2,6 @@
from __future__ import annotations
from abc import abstractmethod, ABCMeta
from typing import IO, Any, Literal, Sequence, Type
import hashlib
import json
import os
@ -12,20 +9,14 @@ import platform
import re
import subprocess
import sys
import asyncio
import ssl
from configparser import RawConfigParser
from pathlib import Path
from urllib.parse import urljoin
from pydantic import Field
from dataclasses import dataclass
import psutil
from reflex import constants, server
from reflex.base import Base
from reflex import constants
from reflex.config import environment, get_config
from reflex.constants.base import LogLevel
from reflex.constants.base import Env, LogLevel
from reflex.utils import console, path_ops
from reflex.utils.prerequisites import get_web_dir
@ -187,44 +178,11 @@ def run_frontend_prod(root: Path, port: str, backend_present=True):
)
def should_use_granian():
"""Whether to use Granian for backend.
Returns:
True if Granian should be used.
"""
return environment.REFLEX_USE_GRANIAN.get()
def get_app_module():
"""Get the app module for the backend.
Returns:
The app module for the backend.
"""
return f"reflex.app_module_for_backend:{constants.CompileVars.APP}"
### REWORK <--
def get_granian_target():
"""Get the Granian target for the backend.
Returns:
The Granian target for the backend.
"""
import reflex
app_module_path = Path(reflex.__file__).parent / "app_module_for_backend.py"
return (
f"{app_module_path!s}:{constants.CompileVars.APP}.{constants.CompileVars.API}"
)
def run_backend(
host: str,
port: int,
loglevel: constants.LogLevel = constants.LogLevel.ERROR,
loglevel: LogLevel = LogLevel.ERROR,
frontend_present: bool = False,
):
"""Run the backend.
@ -236,6 +194,7 @@ def run_backend(
frontend_present: Whether the frontend is present.
"""
web_dir = get_web_dir()
config = get_config()
# Create a .nocompile file to skip compile for backend.
if web_dir.exists():
(web_dir / constants.NOCOMPILE_FILE).touch()
@ -244,78 +203,15 @@ def run_backend(
notify_backend()
# Run the backend in development mode.
if should_use_granian():
run_granian_backend(host, port, loglevel)
else:
run_uvicorn_backend(host, port, loglevel)
def run_uvicorn_backend(host, port, loglevel: LogLevel):
"""Run the backend in development mode using Uvicorn.
Args:
host: The app host
port: The app port
loglevel: The log level.
"""
import uvicorn
uvicorn.run(
app=f"{get_app_module()}.{constants.CompileVars.API}",
host=host,
port=port,
log_level=loglevel.value,
reload=True,
reload_dirs=[get_config().app_name],
)
def run_granian_backend(host, port, loglevel: LogLevel):
"""Run the backend in development mode using Granian.
Args:
host: The app host
port: The app port
loglevel: The log level.
"""
console.debug("Using Granian for backend")
try:
from granian import Granian # type: ignore
from granian.constants import Interfaces # type: ignore
from granian.log import LogLevels # type: ignore
Granian(
target=get_granian_target(),
address=host,
port=port,
interface=Interfaces.ASGI,
log_level=LogLevels(loglevel.value),
reload=True,
reload_paths=[Path(get_config().app_name)],
reload_ignore_dirs=[".web"],
).serve()
except ImportError:
console.error(
'InstallError: REFLEX_USE_GRANIAN is set but `granian` is not installed. (run `pip install "granian[reload]>=1.6.0"`)'
)
os._exit(1)
def _get_backend_workers():
from reflex.utils import processes
config = get_config()
return (
processes.get_num_workers()
if not config.gunicorn_workers
else config.gunicorn_workers
)
backend_server_prod = config.backend_server_prod
backend_server_prod.setup(host, port, loglevel, Env.DEV)
backend_server_prod.run_dev()
def run_backend_prod(
host: str,
port: int,
loglevel: constants.LogLevel = constants.LogLevel.ERROR,
loglevel: LogLevel = LogLevel.ERROR,
frontend_present: bool = False,
):
"""Run the backend.
@ -326,77 +222,24 @@ def run_backend_prod(
loglevel: The log level.
frontend_present: Whether the frontend is present.
"""
print("[reflex.utils.exec::run_backend_prod] start")
if not frontend_present:
notify_backend()
from reflex.utils import processes
config = get_config()
if should_use_granian():
run_granian_backend_prod(host, port, loglevel)
else:
from reflex.utils import processes
if not frontend_present:
notify_backend()
backend_server_prod = config.backend_server_prod
if isinstance(backend_server_prod, server.GunicornBackendServer):
backend_server_prod.app = f"{get_app_module()}()"
backend_server_prod.preload_app = True
backend_server_prod.loglevel = loglevel.value # type: ignore
backend_server_prod.bind = [f"{host}:{port}"]
backend_server_prod.threads = _get_backend_workers()
backend_server_prod.workers = _get_backend_workers()
print(backend_server_prod.run_prod())
processes.new_process(
backend_server_prod.run_prod(),
run=True,
show_logs=True,
env={
environment.REFLEX_SKIP_COMPILE.name: "true"
}, # skip compile for prod backend
)
print("[reflex.utils.exec::run_backend_prod] done")
def run_granian_backend_prod(host, port, loglevel):
"""Run the backend in production mode using Granian.
Args:
host: The app host
port: The app port
loglevel: The log level.
"""
from reflex.utils import processes
try:
from granian.constants import Interfaces # type: ignore
command = [
"granian",
"--workers",
str(_get_backend_workers()),
"--log-level",
"critical",
"--host",
host,
"--port",
str(port),
"--interface",
str(Interfaces.ASGI),
get_granian_target(),
]
processes.new_process(
command,
run=True,
show_logs=True,
env={
environment.REFLEX_SKIP_COMPILE.name: "true"
}, # skip compile for prod backend
)
except ImportError:
console.error(
'InstallError: REFLEX_USE_GRANIAN is set but `granian` is not installed. (run `pip install "granian[reload]>=1.6.0"`)'
)
# Run the backend in production mode.
backend_server_prod = config.backend_server_prod
backend_server_prod.setup(host, port, loglevel, Env.PROD)
processes.new_process(
backend_server_prod.run_prod(),
run=True,
show_logs=True,
env={
environment.REFLEX_SKIP_COMPILE.name: "true"
}, # skip compile for prod backend
)
### REWORK-->
@ -522,6 +365,3 @@ def should_skip_compile() -> bool:
removal_version="0.7.0",
)
return environment.REFLEX_SKIP_COMPILE.get()
### REWORK <--