move executor to config

This commit is contained in:
Khaleel Al-Adhami 2025-02-13 17:24:34 -08:00
parent d1756e2888
commit 95b5227d8b
2 changed files with 86 additions and 74 deletions

View File

@ -11,8 +11,6 @@ import functools
import inspect import inspect
import io import io
import json import json
import multiprocessing
import platform
import sys import sys
import traceback import traceback
from datetime import datetime from datetime import datetime
@ -31,7 +29,6 @@ from typing import (
Optional, Optional,
Set, Set,
Type, Type,
TypeVar,
Union, Union,
get_args, get_args,
get_type_hints, get_type_hints,
@ -1171,77 +1168,7 @@ class App(MiddlewareMixin, LifespanMixin):
), ),
) )
executor_type = environment.REFLEX_COMPILE_EXECUTOR.get() executor = ExecutorType.get_executor_from_environment()
reflex_compile_processes = environment.REFLEX_COMPILE_PROCESSES.get()
reflex_compile_threads = environment.REFLEX_COMPILE_THREADS.get()
# By default, use the main thread. Unless the user has specified a different executor.
# Using a process pool is much faster, but not supported on all platforms. It's gated behind a flag.
if executor_type is None:
if (
platform.system() not in ("Linux", "Darwin")
and reflex_compile_processes is not None
):
console.warn("Multiprocessing is only supported on Linux and MacOS.")
if (
platform.system() in ("Linux", "Darwin")
and reflex_compile_processes is not None
):
if reflex_compile_processes == 0:
console.warn(
"Number of processes must be greater than 0. If you want to use the default number of processes, set REFLEX_COMPILE_EXECUTOR to 'process'. Defaulting to None."
)
reflex_compile_processes = None
elif reflex_compile_processes < 0:
console.warn(
"Number of processes must be greater than 0. Defaulting to None."
)
reflex_compile_processes = None
executor_type = ExecutorType.PROCESS
elif reflex_compile_threads is not None:
if reflex_compile_threads == 0:
console.warn(
"Number of threads must be greater than 0. If you want to use the default number of threads, set REFLEX_COMPILE_EXECUTOR to 'thread'. Defaulting to None."
)
reflex_compile_threads = None
elif reflex_compile_threads < 0:
console.warn(
"Number of threads must be greater than 0. Defaulting to None."
)
reflex_compile_threads = None
executor_type = ExecutorType.THREAD
else:
executor_type = ExecutorType.MAIN_THREAD
match executor_type:
case ExecutorType.PROCESS:
executor = concurrent.futures.ProcessPoolExecutor(
max_workers=reflex_compile_processes,
mp_context=multiprocessing.get_context("fork"),
)
case ExecutorType.THREAD:
executor = concurrent.futures.ThreadPoolExecutor(
max_workers=reflex_compile_threads
)
case ExecutorType.MAIN_THREAD:
T = TypeVar("T")
class MainThreadExecutor:
def __enter__(self):
return self
def __exit__(self, *args):
pass
def submit(
self, fn: Callable[..., T], *args, **kwargs
) -> concurrent.futures.Future[T]:
future_job = concurrent.futures.Future()
future_job.set_result(fn(*args, **kwargs))
return future_job
executor = MainThreadExecutor()
for route, component in zip(self._pages, page_components, strict=True): for route, component in zip(self._pages, page_components, strict=True):
ExecutorSafeFunctions.COMPONENTS[route] = component ExecutorSafeFunctions.COMPONENTS[route] = component

View File

@ -2,11 +2,14 @@
from __future__ import annotations from __future__ import annotations
import concurrent.futures
import dataclasses import dataclasses
import enum import enum
import importlib import importlib
import inspect import inspect
import multiprocessing
import os import os
import platform
import sys import sys
import threading import threading
import urllib.parse import urllib.parse
@ -16,6 +19,7 @@ from types import ModuleType
from typing import ( from typing import (
TYPE_CHECKING, TYPE_CHECKING,
Any, Any,
Callable,
Dict, Dict,
Generic, Generic,
List, List,
@ -488,6 +492,87 @@ class ExecutorType(enum.Enum):
PROCESS = "process" PROCESS = "process"
MAIN_THREAD = "main_thread" MAIN_THREAD = "main_thread"
@classmethod
def get_executor_from_environment(cls):
"""Get the executor based on the environment variables.
Returns:
The executor.
"""
executor_type = environment.REFLEX_COMPILE_EXECUTOR.get()
reflex_compile_processes = environment.REFLEX_COMPILE_PROCESSES.get()
reflex_compile_threads = environment.REFLEX_COMPILE_THREADS.get()
# By default, use the main thread. Unless the user has specified a different executor.
# Using a process pool is much faster, but not supported on all platforms. It's gated behind a flag.
if executor_type is None:
if (
platform.system() not in ("Linux", "Darwin")
and reflex_compile_processes is not None
):
console.warn("Multiprocessing is only supported on Linux and MacOS.")
if (
platform.system() in ("Linux", "Darwin")
and reflex_compile_processes is not None
):
if reflex_compile_processes == 0:
console.warn(
"Number of processes must be greater than 0. If you want to use the default number of processes, set REFLEX_COMPILE_EXECUTOR to 'process'. Defaulting to None."
)
reflex_compile_processes = None
elif reflex_compile_processes < 0:
console.warn(
"Number of processes must be greater than 0. Defaulting to None."
)
reflex_compile_processes = None
executor_type = ExecutorType.PROCESS
elif reflex_compile_threads is not None:
if reflex_compile_threads == 0:
console.warn(
"Number of threads must be greater than 0. If you want to use the default number of threads, set REFLEX_COMPILE_EXECUTOR to 'thread'. Defaulting to None."
)
reflex_compile_threads = None
elif reflex_compile_threads < 0:
console.warn(
"Number of threads must be greater than 0. Defaulting to None."
)
reflex_compile_threads = None
executor_type = ExecutorType.THREAD
else:
executor_type = ExecutorType.MAIN_THREAD
match executor_type:
case ExecutorType.PROCESS:
executor = concurrent.futures.ProcessPoolExecutor(
max_workers=reflex_compile_processes,
mp_context=multiprocessing.get_context("fork"),
)
case ExecutorType.THREAD:
executor = concurrent.futures.ThreadPoolExecutor(
max_workers=reflex_compile_threads
)
case ExecutorType.MAIN_THREAD:
FUTURE_RESULT_TYPE = TypeVar("FUTURE_RESULT_TYPE")
class MainThreadExecutor:
def __enter__(self):
return self
def __exit__(self, *args):
pass
def submit(
self, fn: Callable[..., FUTURE_RESULT_TYPE], *args, **kwargs
) -> concurrent.futures.Future[FUTURE_RESULT_TYPE]:
future_job = concurrent.futures.Future()
future_job.set_result(fn(*args, **kwargs))
return future_job
executor = MainThreadExecutor()
return executor
class EnvironmentVariables: class EnvironmentVariables:
"""Environment variables class to instantiate environment variables.""" """Environment variables class to instantiate environment variables."""