add typing in reflex/utils

This commit is contained in:
Lendemor 2024-11-20 20:13:16 +01:00
parent 4a65a97c21
commit dd9a8d8ef7
12 changed files with 76 additions and 57 deletions

View File

@ -150,8 +150,8 @@ def _run(
env: constants.Env = constants.Env.DEV, env: constants.Env = constants.Env.DEV,
frontend: bool = True, frontend: bool = True,
backend: bool = True, backend: bool = True,
frontend_port: str = str(config.frontend_port), frontend_port: int = config.frontend_port,
backend_port: str = str(config.backend_port), backend_port: int = config.backend_port,
backend_host: str = config.backend_host, backend_host: str = config.backend_host,
loglevel: constants.LogLevel = config.loglevel, loglevel: constants.LogLevel = config.loglevel,
): ):
@ -185,18 +185,22 @@ def _run(
# Find the next available open port if applicable. # Find the next available open port if applicable.
if frontend: if frontend:
frontend_port = processes.handle_port( frontend_port = processes.handle_port(
"frontend", frontend_port, str(constants.DefaultPorts.FRONTEND_PORT) "frontend",
frontend_port,
constants.DefaultPorts.FRONTEND_PORT,
) )
if backend: if backend:
backend_port = processes.handle_port( backend_port = processes.handle_port(
"backend", backend_port, str(constants.DefaultPorts.BACKEND_PORT) "backend",
backend_port,
constants.DefaultPorts.BACKEND_PORT,
) )
# Apply the new ports to the config. # Apply the new ports to the config.
if frontend_port != str(config.frontend_port): if frontend_port != config.frontend_port:
config._set_persistent(frontend_port=frontend_port) config._set_persistent(frontend_port=frontend_port)
if backend_port != str(config.backend_port): if backend_port != config.backend_port:
config._set_persistent(backend_port=backend_port) config._set_persistent(backend_port=backend_port)
# Reload the config to make sure the env vars are persistent. # Reload the config to make sure the env vars are persistent.
@ -287,10 +291,10 @@ def run(
help="Execute only backend.", help="Execute only backend.",
envvar=environment.REFLEX_BACKEND_ONLY.name, envvar=environment.REFLEX_BACKEND_ONLY.name,
), ),
frontend_port: str = typer.Option( frontend_port: int = typer.Option(
config.frontend_port, help="Specify a different frontend port." config.frontend_port, help="Specify a different frontend port."
), ),
backend_port: str = typer.Option( backend_port: int = typer.Option(
config.backend_port, help="Specify a different backend port." config.backend_port, help="Specify a different backend port."
), ),
backend_host: str = typer.Option( backend_host: str = typer.Option(

View File

@ -23,7 +23,7 @@ def set_env_json():
) )
def generate_sitemap_config(deploy_url: str, export=False): def generate_sitemap_config(deploy_url: str, export: bool = False):
"""Generate the sitemap config file. """Generate the sitemap config file.
Args: Args:

View File

@ -2,6 +2,7 @@
import contextlib import contextlib
import sys import sys
from typing import Any
async def windows_hot_reload_lifespan_hack(): async def windows_hot_reload_lifespan_hack():
@ -74,7 +75,7 @@ with pydantic_v1_patch():
import sqlmodel as sqlmodel import sqlmodel as sqlmodel
def sqlmodel_field_has_primary_key(field) -> bool: def sqlmodel_field_has_primary_key(field: Any) -> bool:
"""Determines if a field is a priamary. """Determines if a field is a priamary.
Args: Args:

View File

@ -71,7 +71,7 @@ def notify_backend():
# run_process_and_launch_url is assumed to be used # run_process_and_launch_url is assumed to be used
# only to launch the frontend # only to launch the frontend
# If this is not the case, might have to change the logic # If this is not the case, might have to change the logic
def run_process_and_launch_url(run_command: list[str], backend_present=True): def run_process_and_launch_url(run_command: list[str], backend_present: bool = True):
"""Run the process and launch the URL. """Run the process and launch the URL.
Args: Args:
@ -134,7 +134,7 @@ def run_process_and_launch_url(run_command: list[str], backend_present=True):
break # while True break # while True
def run_frontend(root: Path, port: str, backend_present=True): def run_frontend(root: Path, port: str, backend_present: bool = True):
"""Run the frontend. """Run the frontend.
Args: Args:
@ -156,7 +156,7 @@ def run_frontend(root: Path, port: str, backend_present=True):
) )
def run_frontend_prod(root: Path, port: str, backend_present=True): def run_frontend_prod(root: Path, port: str, backend_present: bool = True):
"""Run the frontend. """Run the frontend.
Args: Args:
@ -238,7 +238,7 @@ def run_backend(
run_uvicorn_backend(host, port, loglevel) run_uvicorn_backend(host, port, loglevel)
def run_uvicorn_backend(host, port, loglevel: LogLevel): def run_uvicorn_backend(host: str, port: int, loglevel: LogLevel):
"""Run the backend in development mode using Uvicorn. """Run the backend in development mode using Uvicorn.
Args: Args:
@ -258,7 +258,7 @@ def run_uvicorn_backend(host, port, loglevel: LogLevel):
) )
def run_granian_backend(host, port, loglevel: LogLevel): def run_granian_backend(host: str, port: int, loglevel: LogLevel):
"""Run the backend in development mode using Granian. """Run the backend in development mode using Granian.
Args: Args:
@ -323,7 +323,7 @@ def run_backend_prod(
run_uvicorn_backend_prod(host, port, loglevel) run_uvicorn_backend_prod(host, port, loglevel)
def run_uvicorn_backend_prod(host, port, loglevel): def run_uvicorn_backend_prod(host: str, port: int, loglevel: LogLevel):
"""Run the backend in production mode using Uvicorn. """Run the backend in production mode using Uvicorn.
Args: Args:
@ -375,7 +375,7 @@ def run_uvicorn_backend_prod(host, port, loglevel):
) )
def run_granian_backend_prod(host, port, loglevel): def run_granian_backend_prod(host: str, port: int, loglevel: LogLevel):
"""Run the backend in production mode using Granian. """Run the backend in production mode using Granian.
Args: Args:

View File

@ -221,7 +221,7 @@ def _escape_js_string(string: str) -> str:
""" """
# TODO: we may need to re-vist this logic after new Var API is implemented. # TODO: we may need to re-vist this logic after new Var API is implemented.
def escape_outside_segments(segment): def escape_outside_segments(segment: str):
"""Escape backticks in segments outside of `${}`. """Escape backticks in segments outside of `${}`.
Args: Args:
@ -284,7 +284,7 @@ def format_var(var: Var) -> str:
return str(var) return str(var)
def format_route(route: str, format_case=True) -> str: def format_route(route: str, format_case: bool = True) -> str:
"""Format the given route. """Format the given route.
Args: Args:

View File

@ -5,7 +5,11 @@ import copy
import lazy_loader as lazy import lazy_loader as lazy
def attach(package_name, submodules=None, submod_attrs=None): def attach(
package_name: str,
submodules: set | None = None,
submod_attrs: dict | None = None,
):
"""Replaces a package's __getattr__, __dir__, and __all__ attributes using lazy.attach. """Replaces a package's __getattr__, __dir__, and __all__ attributes using lazy.attach.
The lazy loader __getattr__ doesn't support tuples as list values. We needed to add The lazy loader __getattr__ doesn't support tuples as list values. We needed to add
this functionality (tuples) in Reflex to support 'import as _' statements. This function this functionality (tuples) in Reflex to support 'import as _' statements. This function

View File

@ -666,7 +666,9 @@ def init_reflex_json(project_hash: int | None):
path_ops.update_json_file(get_web_dir() / constants.Reflex.JSON, reflex_json) path_ops.update_json_file(get_web_dir() / constants.Reflex.JSON, reflex_json)
def update_next_config(export=False, transpile_packages: Optional[List[str]] = None): def update_next_config(
export: bool = False, transpile_packages: Optional[List[str]] = None
):
"""Update Next.js config from Reflex config. """Update Next.js config from Reflex config.
Args: Args:
@ -908,7 +910,7 @@ def cached_procedure(cache_file: str, payload_fn: Callable[..., str]):
The decorated function. The decorated function.
""" """
def _inner_decorator(func): def _inner_decorator(func: Callable):
def _inner(*args, **kwargs): def _inner(*args, **kwargs):
payload = _read_cached_procedure_file(cache_file) payload = _read_cached_procedure_file(cache_file)
new_payload = payload_fn(*args, **kwargs) new_payload = payload_fn(*args, **kwargs)
@ -1090,7 +1092,7 @@ def validate_bun():
raise typer.Exit(1) raise typer.Exit(1)
def validate_frontend_dependencies(init=True): def validate_frontend_dependencies(init: bool = True):
"""Validate frontend dependencies to ensure they meet requirements. """Validate frontend dependencies to ensure they meet requirements.
Args: Args:
@ -1477,7 +1479,7 @@ def initialize_main_module_index_from_generation(app_name: str, generation_hash:
) )
render_func_name = defined_funcs[-1] render_func_name = defined_funcs[-1]
def replace_content(_match): def replace_content(_match: re.Match) -> str:
return "\n".join( return "\n".join(
[ [
resp.text, resp.text,
@ -1507,7 +1509,7 @@ def initialize_main_module_index_from_generation(app_name: str, generation_hash:
main_module_path.write_text(main_module_code) main_module_path.write_text(main_module_code)
def format_address_width(address_width) -> int | None: def format_address_width(address_width: str | None) -> int | None:
"""Cast address width to an int. """Cast address width to an int.
Args: Args:

View File

@ -15,12 +15,13 @@ from typing import Callable, Generator, List, Optional, Tuple, Union
import psutil import psutil
import typer import typer
from redis.exceptions import RedisError from redis.exceptions import RedisError
from rich.progress import Progress
from reflex import constants from reflex import constants
from reflex.utils import console, path_ops, prerequisites from reflex.utils import console, path_ops, prerequisites
def kill(pid): def kill(pid: int):
"""Kill a process. """Kill a process.
Args: Args:
@ -48,7 +49,7 @@ def get_num_workers() -> int:
return (os.cpu_count() or 1) * 2 + 1 return (os.cpu_count() or 1) * 2 + 1
def get_process_on_port(port) -> Optional[psutil.Process]: def get_process_on_port(port: int) -> Optional[psutil.Process]:
"""Get the process on the given port. """Get the process on the given port.
Args: Args:
@ -71,7 +72,7 @@ def get_process_on_port(port) -> Optional[psutil.Process]:
return None return None
def is_process_on_port(port) -> bool: def is_process_on_port(port: int) -> bool:
"""Check if a process is running on the given port. """Check if a process is running on the given port.
Args: Args:
@ -83,7 +84,7 @@ def is_process_on_port(port) -> bool:
return get_process_on_port(port) is not None return get_process_on_port(port) is not None
def kill_process_on_port(port): def kill_process_on_port(port: int):
"""Kill the process on the given port. """Kill the process on the given port.
Args: Args:
@ -94,7 +95,7 @@ def kill_process_on_port(port):
get_process_on_port(port).kill() # type: ignore get_process_on_port(port).kill() # type: ignore
def change_port(port: str, _type: str) -> str: def change_port(port: int, _type: str) -> int:
"""Change the port. """Change the port.
Args: Args:
@ -105,7 +106,7 @@ def change_port(port: str, _type: str) -> str:
The new port. The new port.
""" """
new_port = str(int(port) + 1) new_port = port + 1
if is_process_on_port(new_port): if is_process_on_port(new_port):
return change_port(new_port, _type) return change_port(new_port, _type)
console.info( console.info(
@ -114,7 +115,7 @@ def change_port(port: str, _type: str) -> str:
return new_port return new_port
def handle_port(service_name: str, port: str, default_port: str) -> str: def handle_port(service_name: str, port: int, default_port: int) -> int:
"""Change port if the specified port is in use and is not explicitly specified as a CLI arg or config arg. """Change port if the specified port is in use and is not explicitly specified as a CLI arg or config arg.
otherwise tell the user the port is in use and exit the app. otherwise tell the user the port is in use and exit the app.
@ -133,7 +134,7 @@ def handle_port(service_name: str, port: str, default_port: str) -> str:
Exit:when the port is in use. Exit:when the port is in use.
""" """
if is_process_on_port(port): if is_process_on_port(port):
if int(port) == int(default_port): if port == int(default_port):
return change_port(port, service_name) return change_port(port, service_name)
else: else:
console.error(f"{service_name.capitalize()} port: {port} is already in use") console.error(f"{service_name.capitalize()} port: {port} is already in use")
@ -141,7 +142,12 @@ def handle_port(service_name: str, port: str, default_port: str) -> str:
return port return port
def new_process(args, run: bool = False, show_logs: bool = False, **kwargs): def new_process(
args: str | list[str | Path | None] | list[str],
run: bool = False,
show_logs: bool = False,
**kwargs,
):
"""Wrapper over subprocess.Popen to unify the launch of child processes. """Wrapper over subprocess.Popen to unify the launch of child processes.
Args: Args:
@ -163,7 +169,7 @@ def new_process(args, run: bool = False, show_logs: bool = False, **kwargs):
"installed and added to your system's PATH environment variable or try running " "installed and added to your system's PATH environment variable or try running "
"`reflex init` again." "`reflex init` again."
) )
if None in args: if isinstance(args, list) and None in args:
console.error(f"Invalid command: {args}") console.error(f"Invalid command: {args}")
raise typer.Exit(1) raise typer.Exit(1)
# Add the node bin path to the PATH environment variable. # Add the node bin path to the PATH environment variable.
@ -185,7 +191,7 @@ def new_process(args, run: bool = False, show_logs: bool = False, **kwargs):
} }
console.debug(f"Running command: {args}") console.debug(f"Running command: {args}")
fn = subprocess.run if run else subprocess.Popen fn = subprocess.run if run else subprocess.Popen
return fn(args, **kwargs) return fn(args, **kwargs) # type: ignore
@contextlib.contextmanager @contextlib.contextmanager
@ -241,7 +247,7 @@ def run_concurrently(*fns: Union[Callable, Tuple]) -> None:
def stream_logs( def stream_logs(
message: str, message: str,
process: subprocess.Popen, process: subprocess.Popen,
progress=None, progress: Progress | None = None,
suppress_errors: bool = False, suppress_errors: bool = False,
analytics_enabled: bool = False, analytics_enabled: bool = False,
): ):
@ -369,10 +375,10 @@ def get_command_with_loglevel(command: list[str]) -> list[str]:
def run_process_with_fallback( def run_process_with_fallback(
args, args: list[str],
*, *,
show_status_message, show_status_message: str,
fallback=None, fallback: str | list | None = None,
analytics_enabled: bool = False, analytics_enabled: bool = False,
**kwargs, **kwargs,
): ):
@ -411,7 +417,7 @@ def run_process_with_fallback(
) )
def execute_command_and_return_output(command) -> str | None: def execute_command_and_return_output(command: str) -> str | None:
"""Execute a command and return the output. """Execute a command and return the output.
Args: Args:

View File

@ -83,7 +83,7 @@ DEFAULT_IMPORTS = {
} }
def _walk_files(path): def _walk_files(path: str | Path):
"""Walk all files in a path. """Walk all files in a path.
This can be replaced with Path.walk() in python3.12. This can be replaced with Path.walk() in python3.12.
@ -114,7 +114,9 @@ def _relative_to_pwd(path: Path) -> Path:
return path return path
def _get_type_hint(value, type_hint_globals, is_optional=True) -> str: def _get_type_hint(
value: Any, type_hint_globals: dict, is_optional: bool = True
) -> str:
"""Resolve the type hint for value. """Resolve the type hint for value.
Args: Args:
@ -382,7 +384,7 @@ def _extract_class_props_as_ast_nodes(
return kwargs return kwargs
def type_to_ast(typ, cls: type) -> ast.AST: def type_to_ast(typ: Any, cls: type) -> ast.AST:
"""Converts any type annotation into its AST representation. """Converts any type annotation into its AST representation.
Handles nested generic types, unions, etc. Handles nested generic types, unions, etc.
@ -440,7 +442,7 @@ def type_to_ast(typ, cls: type) -> ast.AST:
) )
def _get_parent_imports(func): def _get_parent_imports(func: Callable):
_imports = {"reflex.vars": ["Var"]} _imports = {"reflex.vars": ["Var"]}
for type_hint in inspect.get_annotations(func).values(): for type_hint in inspect.get_annotations(func).values():
try: try:
@ -1049,7 +1051,7 @@ class PyiGenerator:
pyi_path.write_text(pyi_content) pyi_path.write_text(pyi_content)
logger.info(f"Wrote {relpath}") logger.info(f"Wrote {relpath}")
def _get_init_lazy_imports(self, mod, new_tree): def _get_init_lazy_imports(self, mod: tuple | ModuleType, new_tree: ast.AST):
# retrieve the _SUBMODULES and _SUBMOD_ATTRS from an init file if present. # retrieve the _SUBMODULES and _SUBMOD_ATTRS from an init file if present.
sub_mods = getattr(mod, "_SUBMODULES", None) sub_mods = getattr(mod, "_SUBMODULES", None)
sub_mod_attrs = getattr(mod, "_SUBMOD_ATTRS", None) sub_mod_attrs = getattr(mod, "_SUBMOD_ATTRS", None)
@ -1135,7 +1137,7 @@ class PyiGenerator:
if pyi_path: if pyi_path:
self.written_files.append(pyi_path) self.written_files.append(pyi_path)
def scan_all(self, targets, changed_files: list[Path] | None = None): def scan_all(self, targets: list, changed_files: list[Path] | None = None):
"""Scan all targets for class inheriting Component and generate the .pyi files. """Scan all targets for class inheriting Component and generate the .pyi files.
Args: Args:

View File

@ -22,15 +22,15 @@ def latency(registry: str) -> int:
return 10_000_000 return 10_000_000
def average_latency(registry, attempts: int = 3) -> int: def average_latency(registry: str, attempts: int = 3) -> int:
"""Get the average latency of a registry. """Get the average latency of a registry.
Args: Args:
registry (str): The URL of the registry. registry: The URL of the registry.
attempts (int): The number of attempts to make. Defaults to 10. attempts: The number of attempts to make. Defaults to 10.
Returns: Returns:
int: The average latency of the registry in microseconds. The average latency of the registry in microseconds.
""" """
return sum(latency(registry) for _ in range(attempts)) // attempts return sum(latency(registry) for _ in range(attempts)) // attempts

View File

@ -160,7 +160,7 @@ def _send_event(event_data: dict) -> bool:
return False return False
def _send(event, telemetry_enabled, **kwargs): def _send(event: str, telemetry_enabled: bool | None, **kwargs):
from reflex.config import get_config from reflex.config import get_config
# Get the telemetry_enabled from the config if it is not specified. # Get the telemetry_enabled from the config if it is not specified.
@ -186,7 +186,7 @@ def send(event: str, telemetry_enabled: bool | None = None, **kwargs):
kwargs: Additional data to send with the event. kwargs: Additional data to send with the event.
""" """
async def async_send(event, telemetry_enabled, **kwargs): async def async_send(event: str, telemetry_enabled: bool | None, **kwargs):
return _send(event, telemetry_enabled, **kwargs) return _send(event, telemetry_enabled, **kwargs)
try: try:

View File

@ -154,7 +154,7 @@ class Unset:
@lru_cache() @lru_cache()
def get_origin(tp): def get_origin(tp: Any):
"""Get the origin of a class. """Get the origin of a class.
Args: Args:
@ -237,7 +237,7 @@ def is_literal(cls: GenericType) -> bool:
return get_origin(cls) is Literal return get_origin(cls) is Literal
def has_args(cls) -> bool: def has_args(cls: Type) -> bool:
"""Check if the class has generic parameters. """Check if the class has generic parameters.
Args: Args:
@ -740,7 +740,7 @@ def check_prop_in_allowed_types(prop: Any, allowed_types: Iterable) -> bool:
return type_ in allowed_types return type_ in allowed_types
def is_encoded_fstring(value) -> bool: def is_encoded_fstring(value: Any) -> bool:
"""Check if a value is an encoded Var f-string. """Check if a value is an encoded Var f-string.
Args: Args:
@ -783,7 +783,7 @@ def validate_literal(key: str, value: Any, expected_type: Type, comp_name: str):
) )
def validate_parameter_literals(func): def validate_parameter_literals(func: Callable):
"""Decorator to check that the arguments passed to a function """Decorator to check that the arguments passed to a function
correspond to the correct function parameter if it (the parameter) correspond to the correct function parameter if it (the parameter)
is a literal type. is a literal type.