give option to only use main thread (#4809)

* give option to only use main thread

* change default to main thread

* fix comment

* default to None, as 0 would raise a ValueError

Co-authored-by: Masen Furer <m_github@0x26.net>

* add warning about passing 0

* move executor to config

---------

Co-authored-by: Masen Furer <m_github@0x26.net>
This commit is contained in:
Khaleel Al-Adhami 2025-02-13 18:36:30 -08:00 committed by GitHub
parent 8e579efe47
commit 7c4257a222
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 127 additions and 32 deletions

View File

@ -11,12 +11,11 @@ import functools
import inspect import inspect
import io import io
import json import json
import multiprocessing
import platform
import sys import sys
import traceback import traceback
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
from timeit import default_timer as timer
from types import SimpleNamespace from types import SimpleNamespace
from typing import ( from typing import (
TYPE_CHECKING, TYPE_CHECKING,
@ -76,7 +75,7 @@ from reflex.components.core.client_side_routing import (
from reflex.components.core.sticky import sticky from reflex.components.core.sticky import sticky
from reflex.components.core.upload import Upload, get_upload_dir from reflex.components.core.upload import Upload, get_upload_dir
from reflex.components.radix import themes from reflex.components.radix import themes
from reflex.config import environment, get_config from reflex.config import ExecutorType, environment, get_config
from reflex.event import ( from reflex.event import (
_EVENT_FIELDS, _EVENT_FIELDS,
Event, Event,
@ -1114,10 +1113,23 @@ class App(MiddlewareMixin, LifespanMixin):
app_wrappers[(1, "ToasterProvider")] = toast_provider app_wrappers[(1, "ToasterProvider")] = toast_provider
with console.timing("Evaluate Pages (Frontend)"): with console.timing("Evaluate Pages (Frontend)"):
performance_metrics: list[tuple[str, float]] = []
for route in self._unevaluated_pages: for route in self._unevaluated_pages:
console.debug(f"Evaluating page: {route}") console.debug(f"Evaluating page: {route}")
start = timer()
self._compile_page(route, save_page=should_compile) self._compile_page(route, save_page=should_compile)
end = timer()
performance_metrics.append((route, end - start))
progress.advance(task) progress.advance(task)
console.debug(
"Slowest pages:\n"
+ "\n".join(
f"{route}: {time * 1000:.1f}ms"
for route, time in sorted(
performance_metrics, key=lambda x: x[1], reverse=True
)[:10]
)
)
# Add the optional endpoints (_upload) # Add the optional endpoints (_upload)
self._add_optional_endpoints() self._add_optional_endpoints()
@ -1130,7 +1142,7 @@ class App(MiddlewareMixin, LifespanMixin):
progress.advance(task) progress.advance(task)
# Store the compile results. # Store the compile results.
compile_results = [] compile_results: list[tuple[str, str]] = []
progress.advance(task) progress.advance(task)
@ -1209,33 +1221,19 @@ class App(MiddlewareMixin, LifespanMixin):
), ),
) )
# Use a forking process pool, if possible. Much faster, especially for large sites. executor = ExecutorType.get_executor_from_environment()
# Fallback to ThreadPoolExecutor as something that will always work.
executor = None
if (
platform.system() in ("Linux", "Darwin")
and (number_of_processes := environment.REFLEX_COMPILE_PROCESSES.get())
is not None
):
executor = concurrent.futures.ProcessPoolExecutor(
max_workers=number_of_processes or None,
mp_context=multiprocessing.get_context("fork"),
)
else:
executor = concurrent.futures.ThreadPoolExecutor(
max_workers=environment.REFLEX_COMPILE_THREADS.get() or None
)
for route, component in zip(self._pages, page_components, strict=True): for route, component in zip(self._pages, page_components, strict=True):
ExecutorSafeFunctions.COMPONENTS[route] = component ExecutorSafeFunctions.COMPONENTS[route] = component
ExecutorSafeFunctions.STATE = self._state ExecutorSafeFunctions.STATE = self._state
with executor: with console.timing("Compile to Javascript"), executor as executor:
result_futures = [] result_futures: list[concurrent.futures.Future[tuple[str, str]]] = []
def _submit_work(fn: Callable, *args, **kwargs): def _submit_work(fn: Callable[..., tuple[str, str]], *args, **kwargs):
f = executor.submit(fn, *args, **kwargs) f = executor.submit(fn, *args, **kwargs)
f.add_done_callback(lambda _: progress.advance(task))
result_futures.append(f) result_futures.append(f)
# Compile the pre-compiled pages. # Compile the pre-compiled pages.
@ -1261,10 +1259,10 @@ class App(MiddlewareMixin, LifespanMixin):
_submit_work(compiler.remove_tailwind_from_postcss) _submit_work(compiler.remove_tailwind_from_postcss)
# Wait for all compilation tasks to complete. # Wait for all compilation tasks to complete.
with console.timing("Compile to Javascript"): compile_results.extend(
for future in concurrent.futures.as_completed(result_futures): future.result()
compile_results.append(future.result()) for future in concurrent.futures.as_completed(result_futures)
progress.advance(task) )
app_root = self._app_root(app_wrappers=app_wrappers) app_root = self._app_root(app_wrappers=app_wrappers)
@ -1289,10 +1287,12 @@ class App(MiddlewareMixin, LifespanMixin):
progress.advance(task) progress.advance(task)
# Compile custom components. # Compile custom components.
*custom_components_result, custom_components_imports = ( (
compiler.compile_components(custom_components) custom_components_output,
) custom_components_result,
compile_results.append(custom_components_result) custom_components_imports,
) = compiler.compile_components(custom_components)
compile_results.append((custom_components_output, custom_components_result))
all_imports.update(custom_components_imports) all_imports.update(custom_components_imports)
progress.advance(task) progress.advance(task)

View File

@ -508,7 +508,7 @@ def compile_tailwind(
The compiled Tailwind config. The compiled Tailwind config.
""" """
# Get the path for the output file. # Get the path for the output file.
output_path = get_web_dir() / constants.Tailwind.CONFIG output_path = str((get_web_dir() / constants.Tailwind.CONFIG).absolute())
# Compile the config. # Compile the config.
code = _compile_tailwind(config) code = _compile_tailwind(config)

View File

@ -2,11 +2,14 @@
from __future__ import annotations from __future__ import annotations
import concurrent.futures
import dataclasses import dataclasses
import enum import enum
import importlib import importlib
import inspect import inspect
import multiprocessing
import os import os
import platform
import sys import sys
import threading import threading
import urllib.parse import urllib.parse
@ -17,6 +20,7 @@ from types import ModuleType
from typing import ( from typing import (
TYPE_CHECKING, TYPE_CHECKING,
Any, Any,
Callable,
Dict, Dict,
Generic, Generic,
List, List,
@ -497,6 +501,95 @@ class PerformanceMode(enum.Enum):
OFF = "off" OFF = "off"
class ExecutorType(enum.Enum):
"""Executor for compiling the frontend."""
THREAD = "thread"
PROCESS = "process"
MAIN_THREAD = "main_thread"
@classmethod
def get_executor_from_environment(cls):
"""Get the executor based on the environment variables.
Returns:
The executor.
"""
executor_type = environment.REFLEX_COMPILE_EXECUTOR.get()
reflex_compile_processes = environment.REFLEX_COMPILE_PROCESSES.get()
reflex_compile_threads = environment.REFLEX_COMPILE_THREADS.get()
# By default, use the main thread. Unless the user has specified a different executor.
# Using a process pool is much faster, but not supported on all platforms. It's gated behind a flag.
if executor_type is None:
if (
platform.system() not in ("Linux", "Darwin")
and reflex_compile_processes is not None
):
console.warn("Multiprocessing is only supported on Linux and MacOS.")
if (
platform.system() in ("Linux", "Darwin")
and reflex_compile_processes is not None
):
if reflex_compile_processes == 0:
console.warn(
"Number of processes must be greater than 0. If you want to use the default number of processes, set REFLEX_COMPILE_EXECUTOR to 'process'. Defaulting to None."
)
reflex_compile_processes = None
elif reflex_compile_processes < 0:
console.warn(
"Number of processes must be greater than 0. Defaulting to None."
)
reflex_compile_processes = None
executor_type = ExecutorType.PROCESS
elif reflex_compile_threads is not None:
if reflex_compile_threads == 0:
console.warn(
"Number of threads must be greater than 0. If you want to use the default number of threads, set REFLEX_COMPILE_EXECUTOR to 'thread'. Defaulting to None."
)
reflex_compile_threads = None
elif reflex_compile_threads < 0:
console.warn(
"Number of threads must be greater than 0. Defaulting to None."
)
reflex_compile_threads = None
executor_type = ExecutorType.THREAD
else:
executor_type = ExecutorType.MAIN_THREAD
match executor_type:
case ExecutorType.PROCESS:
executor = concurrent.futures.ProcessPoolExecutor(
max_workers=reflex_compile_processes,
mp_context=multiprocessing.get_context("fork"),
)
case ExecutorType.THREAD:
executor = concurrent.futures.ThreadPoolExecutor(
max_workers=reflex_compile_threads
)
case ExecutorType.MAIN_THREAD:
FUTURE_RESULT_TYPE = TypeVar("FUTURE_RESULT_TYPE")
class MainThreadExecutor:
def __enter__(self):
return self
def __exit__(self, *args):
pass
def submit(
self, fn: Callable[..., FUTURE_RESULT_TYPE], *args, **kwargs
) -> concurrent.futures.Future[FUTURE_RESULT_TYPE]:
future_job = concurrent.futures.Future()
future_job.set_result(fn(*args, **kwargs))
return future_job
executor = MainThreadExecutor()
return executor
class EnvironmentVariables: class EnvironmentVariables:
"""Environment variables class to instantiate environment variables.""" """Environment variables class to instantiate environment variables."""
@ -538,6 +631,8 @@ class EnvironmentVariables:
Path(constants.Dirs.UPLOADED_FILES) Path(constants.Dirs.UPLOADED_FILES)
) )
REFLEX_COMPILE_EXECUTOR: EnvVar[Optional[ExecutorType]] = env_var(None)
# Whether to use separate processes to compile the frontend and how many. If not set, defaults to thread executor. # Whether to use separate processes to compile the frontend and how many. If not set, defaults to thread executor.
REFLEX_COMPILE_PROCESSES: EnvVar[Optional[int]] = env_var(None) REFLEX_COMPILE_PROCESSES: EnvVar[Optional[int]] = env_var(None)