From 95b5227d8b7ccf449bddbe4b40b45adaa474a6cb Mon Sep 17 00:00:00 2001 From: Khaleel Al-Adhami Date: Thu, 13 Feb 2025 17:24:34 -0800 Subject: [PATCH] move executor to config --- reflex/app.py | 75 +----------------------------------------- reflex/config.py | 85 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 86 insertions(+), 74 deletions(-) diff --git a/reflex/app.py b/reflex/app.py index 5b7dc54e6..16d20eb4a 100644 --- a/reflex/app.py +++ b/reflex/app.py @@ -11,8 +11,6 @@ import functools import inspect import io import json -import multiprocessing -import platform import sys import traceback from datetime import datetime @@ -31,7 +29,6 @@ from typing import ( Optional, Set, Type, - TypeVar, Union, get_args, get_type_hints, @@ -1171,77 +1168,7 @@ class App(MiddlewareMixin, LifespanMixin): ), ) - executor_type = environment.REFLEX_COMPILE_EXECUTOR.get() - - reflex_compile_processes = environment.REFLEX_COMPILE_PROCESSES.get() - reflex_compile_threads = environment.REFLEX_COMPILE_THREADS.get() - # By default, use the main thread. Unless the user has specified a different executor. - # Using a process pool is much faster, but not supported on all platforms. It's gated behind a flag. - if executor_type is None: - if ( - platform.system() not in ("Linux", "Darwin") - and reflex_compile_processes is not None - ): - console.warn("Multiprocessing is only supported on Linux and MacOS.") - - if ( - platform.system() in ("Linux", "Darwin") - and reflex_compile_processes is not None - ): - if reflex_compile_processes == 0: - console.warn( - "Number of processes must be greater than 0. If you want to use the default number of processes, set REFLEX_COMPILE_EXECUTOR to 'process'. Defaulting to None." - ) - reflex_compile_processes = None - elif reflex_compile_processes < 0: - console.warn( - "Number of processes must be greater than 0. Defaulting to None." - ) - reflex_compile_processes = None - executor_type = ExecutorType.PROCESS - elif reflex_compile_threads is not None: - if reflex_compile_threads == 0: - console.warn( - "Number of threads must be greater than 0. If you want to use the default number of threads, set REFLEX_COMPILE_EXECUTOR to 'thread'. Defaulting to None." - ) - reflex_compile_threads = None - elif reflex_compile_threads < 0: - console.warn( - "Number of threads must be greater than 0. Defaulting to None." - ) - reflex_compile_threads = None - executor_type = ExecutorType.THREAD - else: - executor_type = ExecutorType.MAIN_THREAD - - match executor_type: - case ExecutorType.PROCESS: - executor = concurrent.futures.ProcessPoolExecutor( - max_workers=reflex_compile_processes, - mp_context=multiprocessing.get_context("fork"), - ) - case ExecutorType.THREAD: - executor = concurrent.futures.ThreadPoolExecutor( - max_workers=reflex_compile_threads - ) - case ExecutorType.MAIN_THREAD: - T = TypeVar("T") - - class MainThreadExecutor: - def __enter__(self): - return self - - def __exit__(self, *args): - pass - - def submit( - self, fn: Callable[..., T], *args, **kwargs - ) -> concurrent.futures.Future[T]: - future_job = concurrent.futures.Future() - future_job.set_result(fn(*args, **kwargs)) - return future_job - - executor = MainThreadExecutor() + executor = ExecutorType.get_executor_from_environment() for route, component in zip(self._pages, page_components, strict=True): ExecutorSafeFunctions.COMPONENTS[route] = component diff --git a/reflex/config.py b/reflex/config.py index 2ae19cd72..abe46bdd5 100644 --- a/reflex/config.py +++ b/reflex/config.py @@ -2,11 +2,14 @@ from __future__ import annotations +import concurrent.futures import dataclasses import enum import importlib import inspect +import multiprocessing import os +import platform import sys import threading import urllib.parse @@ -16,6 +19,7 @@ from types import ModuleType from typing import ( TYPE_CHECKING, Any, + Callable, Dict, Generic, List, @@ -488,6 +492,87 @@ class ExecutorType(enum.Enum): PROCESS = "process" MAIN_THREAD = "main_thread" + @classmethod + def get_executor_from_environment(cls): + """Get the executor based on the environment variables. + + Returns: + The executor. + """ + executor_type = environment.REFLEX_COMPILE_EXECUTOR.get() + + reflex_compile_processes = environment.REFLEX_COMPILE_PROCESSES.get() + reflex_compile_threads = environment.REFLEX_COMPILE_THREADS.get() + # By default, use the main thread. Unless the user has specified a different executor. + # Using a process pool is much faster, but not supported on all platforms. It's gated behind a flag. + if executor_type is None: + if ( + platform.system() not in ("Linux", "Darwin") + and reflex_compile_processes is not None + ): + console.warn("Multiprocessing is only supported on Linux and MacOS.") + + if ( + platform.system() in ("Linux", "Darwin") + and reflex_compile_processes is not None + ): + if reflex_compile_processes == 0: + console.warn( + "Number of processes must be greater than 0. If you want to use the default number of processes, set REFLEX_COMPILE_EXECUTOR to 'process'. Defaulting to None." + ) + reflex_compile_processes = None + elif reflex_compile_processes < 0: + console.warn( + "Number of processes must be greater than 0. Defaulting to None." + ) + reflex_compile_processes = None + executor_type = ExecutorType.PROCESS + elif reflex_compile_threads is not None: + if reflex_compile_threads == 0: + console.warn( + "Number of threads must be greater than 0. If you want to use the default number of threads, set REFLEX_COMPILE_EXECUTOR to 'thread'. Defaulting to None." + ) + reflex_compile_threads = None + elif reflex_compile_threads < 0: + console.warn( + "Number of threads must be greater than 0. Defaulting to None." + ) + reflex_compile_threads = None + executor_type = ExecutorType.THREAD + else: + executor_type = ExecutorType.MAIN_THREAD + + match executor_type: + case ExecutorType.PROCESS: + executor = concurrent.futures.ProcessPoolExecutor( + max_workers=reflex_compile_processes, + mp_context=multiprocessing.get_context("fork"), + ) + case ExecutorType.THREAD: + executor = concurrent.futures.ThreadPoolExecutor( + max_workers=reflex_compile_threads + ) + case ExecutorType.MAIN_THREAD: + FUTURE_RESULT_TYPE = TypeVar("FUTURE_RESULT_TYPE") + + class MainThreadExecutor: + def __enter__(self): + return self + + def __exit__(self, *args): + pass + + def submit( + self, fn: Callable[..., FUTURE_RESULT_TYPE], *args, **kwargs + ) -> concurrent.futures.Future[FUTURE_RESULT_TYPE]: + future_job = concurrent.futures.Future() + future_job.set_result(fn(*args, **kwargs)) + return future_job + + executor = MainThreadExecutor() + + return executor + class EnvironmentVariables: """Environment variables class to instantiate environment variables."""