Revert "use process pool to compile faster (#2377)" (#2434)

This commit is contained in:
Nikhil Rao 2024-01-23 03:56:07 +07:00 committed by GitHub
parent b305f895a8
commit f513f4c089
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 90 additions and 222 deletions

View File

@ -6,9 +6,7 @@ import concurrent.futures
import contextlib
import copy
import functools
import multiprocessing
import os
import platform
from typing import (
Any,
AsyncIterator,
@ -37,7 +35,6 @@ from reflex.admin import AdminDash
from reflex.base import Base
from reflex.compiler import compiler
from reflex.compiler import utils as compiler_utils
from reflex.compiler.compiler import ExecutorSafeFunctions
from reflex.components import connection_modal
from reflex.components.base.app_wrap import AppWrap
from reflex.components.base.fragment import Fragment
@ -663,24 +660,15 @@ class App(Base):
TimeElapsedColumn(),
)
# try to be somewhat accurate - but still not 100%
adhoc_steps_without_executor = 6
fixed_pages_within_executor = 7
progress.start()
task = progress.add_task(
"Compiling:",
total=len(self.pages)
+ fixed_pages_within_executor
+ adhoc_steps_without_executor,
)
# Get the env mode.
config = get_config()
# Store the compile results.
compile_results = []
# Compile the pages in parallel.
custom_components = set()
# TODO Anecdotally, processes=2 works 10% faster (cpu_count=12)
all_imports = {}
app_wrappers: Dict[tuple[int, str], Component] = {
# Default app wrap component renders {children}
@ -690,137 +678,127 @@ class App(Base):
# If a theme component was provided, wrap the app with it
app_wrappers[(20, "Theme")] = self.theme
progress.advance(task)
with progress, concurrent.futures.ThreadPoolExecutor() as thread_pool:
fixed_pages = 7
task = progress.add_task("Compiling:", total=len(self.pages) + fixed_pages)
for _route, component in self.pages.items():
# Merge the component style with the app style.
component.add_style(self.style)
component.apply_theme(self.theme)
# Add component.get_imports() to all_imports.
all_imports.update(component.get_imports())
# Add the app wrappers from this component.
app_wrappers.update(component.get_app_wrap_components())
# Add the custom components from the page to the set.
custom_components |= component.get_custom_components()
progress.advance(task)
# Perform auto-memoization of stateful components.
(
stateful_components_path,
stateful_components_code,
page_components,
) = compiler.compile_stateful_components(self.pages.values())
progress.advance(task)
# Catch "static" apps (that do not define a rx.State subclass) which are trying to access rx.State.
if code_uses_state_contexts(stateful_components_code) and self.state is None:
raise RuntimeError(
"To access rx.State in frontend components, at least one "
"subclass of rx.State must be defined in the app."
)
compile_results.append((stateful_components_path, stateful_components_code))
app_root = self._app_root(app_wrappers=app_wrappers)
progress.advance(task)
# Prepopulate the global ExecutorSafeFunctions class with input data required by the compile functions.
# This is required for multiprocessing to work, in presence of non-picklable inputs.
for route, component in zip(self.pages, page_components):
ExecutorSafeFunctions.COMPILE_PAGE_ARGS_BY_ROUTE[route] = (
route,
component,
self.state,
)
ExecutorSafeFunctions.COMPILE_APP_APP_ROOT = app_root
ExecutorSafeFunctions.CUSTOM_COMPONENTS = custom_components
ExecutorSafeFunctions.HEAD_COMPONENTS = self.head_components
ExecutorSafeFunctions.STYLE = self.style
ExecutorSafeFunctions.STATE = self.state
# Use a forking process pool, if possible. Much faster, especially for large sites.
# Fallback to ThreadPoolExecutor as something that will always work.
executor = None
if platform.system() in ("Linux", "Darwin"):
executor = concurrent.futures.ProcessPoolExecutor(
mp_context=multiprocessing.get_context("fork")
)
else:
executor = concurrent.futures.ThreadPoolExecutor()
with executor:
result_futures = []
def _mark_complete(_=None):
def mark_complete(_=None):
progress.advance(task)
def _submit_work(fn, *args, **kwargs):
f = executor.submit(fn, *args, **kwargs)
f.add_done_callback(_mark_complete)
for _route, component in self.pages.items():
# Merge the component style with the app style.
component.add_style(self.style)
component.apply_theme(self.theme)
# Add component.get_imports() to all_imports.
all_imports.update(component.get_imports())
# Add the app wrappers from this component.
app_wrappers.update(component.get_app_wrap_components())
# Add the custom components from the page to the set.
custom_components |= component.get_custom_components()
# Perform auto-memoization of stateful components.
(
stateful_components_path,
stateful_components_code,
page_components,
) = compiler.compile_stateful_components(self.pages.values())
# Catch "static" apps (that do not define a rx.State subclass) which are trying to access rx.State.
if (
code_uses_state_contexts(stateful_components_code)
and self.state is None
):
raise RuntimeError(
"To access rx.State in frontend components, at least one "
"subclass of rx.State must be defined in the app."
)
compile_results.append((stateful_components_path, stateful_components_code))
result_futures = []
def submit_work(fn, *args, **kwargs):
"""Submit work to the thread pool and add a callback to mark the task as complete.
The Future will be added to the `result_futures` list.
Args:
fn: The function to submit.
*args: The args to submit.
**kwargs: The kwargs to submit.
"""
f = thread_pool.submit(fn, *args, **kwargs)
f.add_done_callback(mark_complete)
result_futures.append(f)
# Compile all page components.
for route in self.pages:
_submit_work(ExecutorSafeFunctions.compile_page, route)
for route, component in zip(self.pages, page_components):
submit_work(
compiler.compile_page,
route,
component,
self.state,
)
# Compile the app wrapper.
_submit_work(ExecutorSafeFunctions.compile_app)
app_root = self._app_root(app_wrappers=app_wrappers)
submit_work(compiler.compile_app, app_root)
# Compile the custom components.
_submit_work(ExecutorSafeFunctions.compile_custom_components)
submit_work(compiler.compile_components, custom_components)
# Compile the root stylesheet with base styles.
_submit_work(compiler.compile_root_stylesheet, self.stylesheets)
submit_work(compiler.compile_root_stylesheet, self.stylesheets)
# Compile the root document.
_submit_work(ExecutorSafeFunctions.compile_document_root)
submit_work(compiler.compile_document_root, self.head_components)
# Compile the theme.
_submit_work(ExecutorSafeFunctions.compile_theme)
submit_work(compiler.compile_theme, style=self.style)
# Compile the contexts.
_submit_work(ExecutorSafeFunctions.compile_contexts)
submit_work(compiler.compile_contexts, self.state)
# Compile the Tailwind config.
if config.tailwind is not None:
config.tailwind["content"] = config.tailwind.get(
"content", constants.Tailwind.CONTENT
)
_submit_work(compiler.compile_tailwind, config.tailwind)
submit_work(compiler.compile_tailwind, config.tailwind)
else:
_submit_work(compiler.remove_tailwind_from_postcss)
submit_work(compiler.remove_tailwind_from_postcss)
# Get imports from AppWrap components.
all_imports.update(app_root.get_imports())
# Iterate through all the custom components and add their imports to the all_imports.
for component in custom_components:
all_imports.update(component.get_imports())
# Wait for all compilation tasks to complete.
for future in concurrent.futures.as_completed(result_futures):
compile_results.append(future.result())
# Get imports from AppWrap components.
all_imports.update(app_root.get_imports())
# Empty the .web pages directory.
compiler.purge_web_pages_dir()
# Iterate through all the custom components and add their imports to the all_imports.
for component in custom_components:
all_imports.update(component.get_imports())
# Avoid flickering when installing frontend packages
progress.stop()
progress.advance(task)
# Install frontend packages.
self.get_frontend_packages(all_imports)
# Empty the .web pages directory.
compiler.purge_web_pages_dir()
progress.advance(task)
progress.stop()
# Install frontend packages.
self.get_frontend_packages(all_imports)
for output_path, code in compile_results:
compiler_utils.write_page(output_path, code)
# Write the pages at the end to trigger the NextJS hot reload only once.
write_page_futures = []
for output_path, code in compile_results:
write_page_futures.append(
thread_pool.submit(compiler_utils.write_page, output_path, code)
)
for future in concurrent.futures.as_completed(write_page_futures):
future.result()
@contextlib.asynccontextmanager
async def modify_state(self, token: str) -> AsyncIterator[BaseState]:

View File

@ -454,113 +454,3 @@ def remove_tailwind_from_postcss() -> tuple[str, str]:
def purge_web_pages_dir():
    """Empty the web pages directory, keeping only ``_app.js``.

    The directory emptied is ``constants.Dirs.WEB_PAGES``.
    """
    # Files that must survive the purge.
    preserved = ["_app.js"]
    utils.empty_dir(constants.Dirs.WEB_PAGES, keep_files=preserved)
class ExecutorSafeFunctions:
    """Compile entry points that can be called from a forked worker process.

    This class and its class attributes live at global scope. In a
    multiprocessing context (such as a ProcessPoolExecutor), the content of
    this global class is logically replicated to any FORKED process.

    How it works:
        * Before the child process is forked, the parent stashes on this class
          every piece of input data that a later function call in the child
          will need.
        * After the fork, the child owns a copy of the class, stashed input
          data included.
        * A task submitted to the child then only has to communicate WHICH
          stashed input the requested call needs, not the input itself.

    Why this is needed: passing input data directly to a child process is
    often not possible because the input data is not picklable. Stashing it
    before the fork removes the need to pickle the input data at all.

    Limitations:
        * Returning unpicklable OUTPUT data can never be supported.
        * Object mutations done by the child process do not propagate back to
          the parent process (fork goes one way!).
    """

    # Argument tuples for the module-level ``compile_page``, keyed by route.
    COMPILE_PAGE_ARGS_BY_ROUTE = {}
    # Input for the module-level ``compile_app``.
    COMPILE_APP_APP_ROOT: Component | None = None
    # Input for the module-level ``compile_components``.
    CUSTOM_COMPONENTS: set[CustomComponent] | None = None
    # Input for the module-level ``compile_document_root``.
    HEAD_COMPONENTS: list[Component] | None = None
    # Input for the module-level ``compile_theme``.
    STYLE: ComponentStyle | None = None
    # Input for the module-level ``compile_contexts``; forwarded even when None.
    STATE: type[BaseState] | None = None

    @classmethod
    def compile_page(cls, route: str):
        """Compile the page stashed under the given route.

        Args:
            route: The route of the page to compile.

        Returns:
            The path and code of the compiled page.

        Raises:
            KeyError: If no arguments were stashed for the route.
        """
        stashed_args = cls.COMPILE_PAGE_ARGS_BY_ROUTE[route]
        # Inside a method body the bare name resolves to the module-level
        # function, not to this classmethod of the same name.
        return compile_page(*stashed_args)

    @classmethod
    def compile_app(cls):
        """Compile the app from the stashed app root.

        Returns:
            The path and code of the compiled app.

        Raises:
            ValueError: If the app root is not set.
        """
        app_root = cls.COMPILE_APP_APP_ROOT
        if app_root is None:
            raise ValueError("COMPILE_APP_APP_ROOT should be set")
        return compile_app(app_root)

    @classmethod
    def compile_custom_components(cls):
        """Compile the stashed custom components.

        Returns:
            The path and code of the compiled custom components.

        Raises:
            ValueError: If the custom components are not set.
        """
        components = cls.CUSTOM_COMPONENTS
        if components is None:
            raise ValueError("CUSTOM_COMPONENTS should be set")
        return compile_components(components)

    @classmethod
    def compile_document_root(cls):
        """Compile the document root from the stashed head components.

        Returns:
            The path and code of the compiled document root.

        Raises:
            ValueError: If the head components are not set.
        """
        head_components = cls.HEAD_COMPONENTS
        if head_components is None:
            raise ValueError("HEAD_COMPONENTS should be set")
        return compile_document_root(head_components)

    @classmethod
    def compile_theme(cls):
        """Compile the theme from the stashed style.

        Returns:
            The path and code of the compiled theme.

        Raises:
            ValueError: If the style is not set.
        """
        style = cls.STYLE
        if style is None:
            raise ValueError("STYLE should be set")
        return compile_theme(style)

    @classmethod
    def compile_contexts(cls):
        """Compile the contexts from the stashed state.

        Returns:
            The path and code of the compiled contexts.
        """
        # No None guard here: STATE is passed through as-is.
        state = cls.STATE
        return compile_contexts(state)