
* fix and test bug in config env loading * streamline env var interpretation with @adhami3310 * improve error messages, fix invalid value for TELEMETRY_ENABLED * just a small hint * ruffing * fix typo from review * refactor - ruff broke the imports.. * cleanup imports * more * add internal and enum env var support * ruff cleanup * more global imports * revert telemetry, it lives in rx.Config * minor fixes/cleanup * i missed some refs * fix darglint * reload config is internal * fix EnvVar name * add test for EnvVar + minor typing improvement * bool tests * was this broken? * retain old behavior * migrate APP_HARNESS_HEADLESS to new env var system * migrate more APP_HARNESS env vars to new config system * migrate SCREENSHOT_DIR to new env var system * refactor EnvVar.get to be a method * readd deleted functions and deprecate them * improve EnvVar api, cleanup RELOAD_CONFIG question * move is_prod_mode back to where it was
59 lines
1.4 KiB
Python
59 lines
1.4 KiB
Python
"""Utilities for working with registries."""
|
|
|
|
import httpx
|
|
|
|
from reflex.config import environment
|
|
from reflex.utils import console, net
|
|
|
|
|
|
def latency(registry: str) -> int:
    """Get the latency of a registry.

    Args:
        registry (str): The URL of the registry.

    Returns:
        int: The latency of the registry in microseconds, or 10_000_000
            (ten seconds) when the request fails with an ``httpx.HTTPError``.
    """
    try:
        elapsed = net.get(registry).elapsed
    except httpx.HTTPError:
        console.info(f"Failed to connect to {registry}.")
        # Large sentinel so a registry that cannot be reached compares as slow.
        return 10_000_000
    # ``timedelta.microseconds`` holds only the sub-second component
    # (0-999999). Fold in days and seconds so a response that takes longer
    # than one second is not reported as faster than it really was.
    whole_seconds = elapsed.days * 86_400 + elapsed.seconds
    return whole_seconds * 1_000_000 + elapsed.microseconds
|
|
|
|
|
|
def average_latency(registry: str, attempts: int = 3) -> int:
    """Get the average latency of a registry.

    Args:
        registry (str): The URL of the registry.
        attempts (int): The number of attempts to make. Defaults to 3.

    Returns:
        int: The average latency of the registry in microseconds.

    Raises:
        ValueError: If attempts is less than 1.
    """
    if attempts < 1:
        # Zero attempts would divide by zero, and a negative count would
        # average an empty sum to 0, making the registry look like the fastest.
        raise ValueError(f"attempts must be at least 1, got {attempts}.")
    return sum(latency(registry) for _ in range(attempts)) // attempts
|
|
|
|
|
|
def get_best_registry() -> str:
    """Pick the candidate npm registry with the lowest average latency.

    Returns:
        str: The URL of the registry whose average latency is smallest. On a
            tie, the candidate listed first wins.
    """
    candidates = [
        "https://registry.npmjs.org",
        "https://r.cnpmjs.org",
    ]
    # Each candidate is probed via average_latency; the smallest value wins.
    fastest = min(candidates, key=average_latency)
    return fastest
|
|
|
|
|
|
def _get_npm_registry() -> str:
    """Get the npm registry, preferring the NPM_CONFIG_REGISTRY setting.

    Returns:
        str: The value of ``environment.NPM_CONFIG_REGISTRY`` when it is set
            to a non-empty value, otherwise the result of
            ``get_best_registry()``.
    """
    configured = environment.NPM_CONFIG_REGISTRY.get()
    if configured:
        return configured
    # Nothing configured: fall back to probing the known registries.
    return get_best_registry()
|