use pathlib as much as possible (#3967)

* use pathlib as much as possible

* fixstuff

* break locally to unbreak in CI 🤷

* add type on env

* debug attempt 1

* debugged

* oops, there is the actual fix

* fix 3.9 compat
This commit is contained in:
Thomas Brandého 2024-10-03 08:50:39 -07:00 committed by GitHub
parent f3be9a3305
commit 3f51943162
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 202 additions and 260 deletions

View File

@ -3,8 +3,8 @@
from __future__ import annotations
import json
import os
import sys
from pathlib import Path
from utils import send_data_to_posthog
@ -28,7 +28,7 @@ def insert_benchmarking_data(
send_data_to_posthog("lighthouse_benchmark", properties)
def get_lighthouse_scores(directory_path: str) -> dict:
def get_lighthouse_scores(directory_path: str | Path) -> dict:
"""Extracts the Lighthouse scores from the JSON files in the specified directory.
Args:
@ -38,24 +38,21 @@ def get_lighthouse_scores(directory_path: str) -> dict:
dict: The Lighthouse scores.
"""
scores = {}
directory_path = Path(directory_path)
try:
for filename in os.listdir(directory_path):
if filename.endswith(".json") and filename != "manifest.json":
file_path = os.path.join(directory_path, filename)
with open(file_path, "r") as file:
data = json.load(file)
# Extract scores and add them to the dictionary with the filename as key
scores[data["finalUrl"].replace("http://localhost:3000/", "/")] = {
"performance_score": data["categories"]["performance"]["score"],
"accessibility_score": data["categories"]["accessibility"][
"score"
],
"best_practices_score": data["categories"]["best-practices"][
"score"
],
"seo_score": data["categories"]["seo"]["score"],
}
for filename in directory_path.iterdir():
if filename.suffix == ".json" and filename.stem != "manifest":
file_path = directory_path / filename
data = json.loads(file_path.read_text())
# Extract scores and add them to the dictionary with the filename as key
scores[data["finalUrl"].replace("http://localhost:3000/", "/")] = {
"performance_score": data["categories"]["performance"]["score"],
"accessibility_score": data["categories"]["accessibility"]["score"],
"best_practices_score": data["categories"]["best-practices"][
"score"
],
"seo_score": data["categories"]["seo"]["score"],
}
except Exception as e:
return {"error": e}

View File

@ -2,11 +2,12 @@
import argparse
import os
from pathlib import Path
from utils import get_directory_size, get_python_version, send_data_to_posthog
def get_package_size(venv_path, os_name):
def get_package_size(venv_path: Path, os_name):
"""Get the size of a specified package.
Args:
@ -26,14 +27,12 @@ def get_package_size(venv_path, os_name):
is_windows = "windows" in os_name
full_path = (
["lib", f"python{python_version}", "site-packages"]
package_dir: Path = (
venv_path / "lib" / f"python{python_version}" / "site-packages"
if not is_windows
else ["Lib", "site-packages"]
else venv_path / "Lib" / "site-packages"
)
package_dir = os.path.join(venv_path, *full_path)
if not os.path.exists(package_dir):
if not package_dir.exists():
raise ValueError(
"Error: Virtual environment does not exist or is not activated."
)
@ -63,9 +62,9 @@ def insert_benchmarking_data(
path: The path to the dir or file to check size.
"""
if "./dist" in path:
size = get_directory_size(path)
size = get_directory_size(Path(path))
else:
size = get_package_size(path, os_type_version)
size = get_package_size(Path(path), os_type_version)
# Prepare the event data
properties = {

View File

@ -2,6 +2,7 @@
import argparse
import os
from pathlib import Path
from utils import get_directory_size, send_data_to_posthog
@ -28,7 +29,7 @@ def insert_benchmarking_data(
pr_id: The id of the PR.
path: The path to the dir or file to check size.
"""
size = get_directory_size(path)
size = get_directory_size(Path(path))
# Prepare the event data
properties = {

View File

@ -2,12 +2,13 @@
import os
import subprocess
from pathlib import Path
import httpx
from httpx import HTTPError
def get_python_version(venv_path, os_name):
def get_python_version(venv_path: Path, os_name):
"""Get the python version of python in a virtual env.
Args:
@ -18,13 +19,13 @@ def get_python_version(venv_path, os_name):
The python version.
"""
python_executable = (
os.path.join(venv_path, "bin", "python")
venv_path / "bin" / "python"
if "windows" not in os_name
else os.path.join(venv_path, "Scripts", "python.exe")
else venv_path / "Scripts" / "python.exe"
)
try:
output = subprocess.check_output(
[python_executable, "--version"], stderr=subprocess.STDOUT
[str(python_executable), "--version"], stderr=subprocess.STDOUT
)
python_version = output.decode("utf-8").strip().split()[1]
return ".".join(python_version.split(".")[:-1])
@ -32,7 +33,7 @@ def get_python_version(venv_path, os_name):
return None
def get_directory_size(directory):
def get_directory_size(directory: Path):
"""Get the size of a directory in bytes.
Args:
@ -44,8 +45,8 @@ def get_directory_size(directory):
total_size = 0
for dirpath, _, filenames in os.walk(directory):
for f in filenames:
fp = os.path.join(dirpath, f)
total_size += os.path.getsize(fp)
fp = Path(dirpath) / f
total_size += fp.stat().st_size
return total_size

View File

@ -171,7 +171,7 @@ def _compile_root_stylesheet(stylesheets: list[str]) -> str:
stylesheet_full_path = (
Path.cwd() / constants.Dirs.APP_ASSETS / stylesheet.strip("/")
)
if not os.path.exists(stylesheet_full_path):
if not stylesheet_full_path.exists():
raise FileNotFoundError(
f"The stylesheet file {stylesheet_full_path} does not exist."
)

View File

@ -2,7 +2,6 @@
from __future__ import annotations
import os
from pathlib import Path
from typing import Any, Callable, Dict, Optional, Type, Union
from urllib.parse import urlparse
@ -457,16 +456,16 @@ def add_meta(
return page
def write_page(path: str, code: str):
def write_page(path: str | Path, code: str):
"""Write the given code to the given path.
Args:
path: The path to write the code to.
code: The code to write.
"""
path_ops.mkdir(os.path.dirname(path))
with open(path, "w", encoding="utf-8") as f:
f.write(code)
path = Path(path)
path_ops.mkdir(path.parent)
path.write_text(code, encoding="utf-8")
def empty_dir(path: str | Path, keep_files: list[str] | None = None):

View File

@ -6,7 +6,8 @@ import importlib
import os
import sys
import urllib.parse
from typing import Any, Dict, List, Optional, Set
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Union
try:
import pydantic.v1 as pydantic
@ -188,7 +189,7 @@ class Config(Base):
telemetry_enabled: bool = True
# The bun path
bun_path: str = constants.Bun.DEFAULT_PATH
bun_path: Union[str, Path] = constants.Bun.DEFAULT_PATH
# List of origins that are allowed to connect to the backend API.
cors_allowed_origins: List[str] = ["*"]

View File

@ -6,6 +6,7 @@ import os
import platform
from enum import Enum
from importlib import metadata
from pathlib import Path
from types import SimpleNamespace
from platformdirs import PlatformDirs
@ -66,18 +67,19 @@ class Reflex(SimpleNamespace):
# Get directory value from enviroment variables if it exists.
_dir = os.environ.get("REFLEX_DIR", "")
DIR = _dir or (
# on windows, we use C:/Users/<username>/AppData/Local/reflex.
# on macOS, we use ~/Library/Application Support/reflex.
# on linux, we use ~/.local/share/reflex.
# If user sets REFLEX_DIR envroment variable use that instead.
PlatformDirs(MODULE_NAME, False).user_data_dir
DIR = Path(
_dir
or (
# on windows, we use C:/Users/<username>/AppData/Local/reflex.
# on macOS, we use ~/Library/Application Support/reflex.
# on linux, we use ~/.local/share/reflex.
# If user sets REFLEX_DIR envroment variable use that instead.
PlatformDirs(MODULE_NAME, False).user_data_dir
)
)
# The root directory of the reflex library.
ROOT_DIR = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
)
ROOT_DIR = Path(__file__).parents[2]
RELEASES_URL = f"https://api.github.com/repos/reflex-dev/templates/releases"
@ -125,11 +127,11 @@ class Templates(SimpleNamespace):
"""Folders used by the template system of Reflex."""
# The template directory used during reflex init.
BASE = os.path.join(Reflex.ROOT_DIR, Reflex.MODULE_NAME, ".templates")
BASE = Reflex.ROOT_DIR / Reflex.MODULE_NAME / ".templates"
# The web subdirectory of the template directory.
WEB_TEMPLATE = os.path.join(BASE, "web")
WEB_TEMPLATE = BASE / "web"
# The jinja template directory.
JINJA_TEMPLATE = os.path.join(BASE, "jinja")
JINJA_TEMPLATE = BASE / "jinja"
# Where the code for the templates is stored.
CODE = "code"

View File

@ -1,6 +1,7 @@
"""Config constants."""
import os
from pathlib import Path
from types import SimpleNamespace
from reflex.constants.base import Dirs, Reflex
@ -17,9 +18,7 @@ class Config(SimpleNamespace):
# The name of the reflex config module.
MODULE = "rxconfig"
# The python config file.
FILE = f"{MODULE}{Ext.PY}"
# The previous config file.
PREVIOUS_FILE = f"pcconfig{Ext.PY}"
FILE = Path(f"{MODULE}{Ext.PY}")
class Expiration(SimpleNamespace):
@ -37,7 +36,7 @@ class GitIgnore(SimpleNamespace):
"""Gitignore constants."""
# The gitignore file.
FILE = ".gitignore"
FILE = Path(".gitignore")
# Files to gitignore.
DEFAULTS = {Dirs.WEB, "*.db", "__pycache__/", "*.py[cod]", "assets/external/"}

View File

@ -2,6 +2,7 @@
from __future__ import annotations
from pathlib import Path
from types import SimpleNamespace
@ -11,9 +12,9 @@ class CustomComponents(SimpleNamespace):
# The name of the custom components source directory.
SRC_DIR = "custom_components"
# The name of the custom components pyproject.toml file.
PYPROJECT_TOML = "pyproject.toml"
PYPROJECT_TOML = Path("pyproject.toml")
# The name of the custom components package README file.
PACKAGE_README = "README.md"
PACKAGE_README = Path("README.md")
# The name of the custom components package .gitignore file.
PACKAGE_GITIGNORE = ".gitignore"
# The name of the distribution directory as result of a build.
@ -29,6 +30,6 @@ class CustomComponents(SimpleNamespace):
"testpypi": "https://test.pypi.org/legacy/",
}
# The .gitignore file for the custom component project.
FILE = ".gitignore"
FILE = Path(".gitignore")
# Files to gitignore.
DEFAULTS = {"__pycache__/", "*.py[cod]", "*.egg-info/", "dist/"}

View File

@ -2,7 +2,6 @@
from __future__ import annotations
import os
import platform
from types import SimpleNamespace
@ -40,11 +39,10 @@ class Bun(SimpleNamespace):
# Min Bun Version
MIN_VERSION = "0.7.0"
# The directory to store the bun.
ROOT_PATH = os.path.join(Reflex.DIR, "bun")
ROOT_PATH = Reflex.DIR / "bun"
# Default bun path.
DEFAULT_PATH = os.path.join(
ROOT_PATH, "bin", "bun" if not IS_WINDOWS else "bun.exe"
)
DEFAULT_PATH = ROOT_PATH / "bin" / ("bun" if not IS_WINDOWS else "bun.exe")
# URL to bun install script.
INSTALL_URL = "https://bun.sh/install"
# URL to windows install script.
@ -65,10 +63,10 @@ class Fnm(SimpleNamespace):
# The FNM version.
VERSION = "1.35.1"
# The directory to store fnm.
DIR = os.path.join(Reflex.DIR, "fnm")
DIR = Reflex.DIR / "fnm"
FILENAME = get_fnm_name()
# The fnm executable binary.
EXE = os.path.join(DIR, "fnm.exe" if IS_WINDOWS else "fnm")
EXE = DIR / ("fnm.exe" if IS_WINDOWS else "fnm")
# The URL to the fnm release binary
INSTALL_URL = (
@ -86,18 +84,19 @@ class Node(SimpleNamespace):
MIN_VERSION = "18.17.0"
# The node bin path.
BIN_PATH = os.path.join(
Fnm.DIR,
"node-versions",
f"v{VERSION}",
"installation",
"bin" if not IS_WINDOWS else "",
BIN_PATH = (
Fnm.DIR
/ "node-versions"
/ f"v{VERSION}"
/ "installation"
/ ("bin" if not IS_WINDOWS else "")
)
# The default path where node is installed.
PATH = os.path.join(BIN_PATH, "node.exe" if IS_WINDOWS else "node")
PATH = BIN_PATH / ("node.exe" if IS_WINDOWS else "node")
# The default path where npm is installed.
NPM_PATH = os.path.join(BIN_PATH, "npm")
NPM_PATH = BIN_PATH / "npm"
# The environment variable to use the system installed node.
USE_SYSTEM_VAR = "REFLEX_USE_SYSTEM_NODE"

View File

@ -36,7 +36,7 @@ POST_CUSTOM_COMPONENTS_GALLERY_TIMEOUT = 15
@contextmanager
def set_directory(working_directory: str):
def set_directory(working_directory: str | Path):
"""Context manager that sets the working directory.
Args:
@ -45,7 +45,8 @@ def set_directory(working_directory: str):
Yields:
Yield to the caller to perform operations in the working directory.
"""
current_directory = os.getcwd()
current_directory = Path.cwd()
working_directory = Path(working_directory)
try:
os.chdir(working_directory)
yield
@ -62,14 +63,14 @@ def _create_package_config(module_name: str, package_name: str):
"""
from reflex.compiler import templates
with open(CustomComponents.PYPROJECT_TOML, "w") as f:
f.write(
templates.CUSTOM_COMPONENTS_PYPROJECT_TOML.render(
module_name=module_name,
package_name=package_name,
reflex_version=constants.Reflex.VERSION,
)
pyproject = Path(CustomComponents.PYPROJECT_TOML)
pyproject.write_text(
templates.CUSTOM_COMPONENTS_PYPROJECT_TOML.render(
module_name=module_name,
package_name=package_name,
reflex_version=constants.Reflex.VERSION,
)
)
def _get_package_config(exit_on_fail: bool = True) -> dict:
@ -84,11 +85,11 @@ def _get_package_config(exit_on_fail: bool = True) -> dict:
Raises:
Exit: If the pyproject.toml file is not found.
"""
pyproject = Path(CustomComponents.PYPROJECT_TOML)
try:
with open(CustomComponents.PYPROJECT_TOML, "rb") as f:
return dict(tomlkit.load(f))
return dict(tomlkit.loads(pyproject.read_bytes()))
except (OSError, TOMLKitError) as ex:
console.error(f"Unable to read from pyproject.toml due to {ex}")
console.error(f"Unable to read from {pyproject} due to {ex}")
if exit_on_fail:
raise typer.Exit(code=1) from ex
raise
@ -103,17 +104,17 @@ def _create_readme(module_name: str, package_name: str):
"""
from reflex.compiler import templates
with open(CustomComponents.PACKAGE_README, "w") as f:
f.write(
templates.CUSTOM_COMPONENTS_README.render(
module_name=module_name,
package_name=package_name,
)
readme = Path(CustomComponents.PACKAGE_README)
readme.write_text(
templates.CUSTOM_COMPONENTS_README.render(
module_name=module_name,
package_name=package_name,
)
)
def _write_source_and_init_py(
custom_component_src_dir: str,
custom_component_src_dir: Path,
component_class_name: str,
module_name: str,
):
@ -126,27 +127,17 @@ def _write_source_and_init_py(
"""
from reflex.compiler import templates
with open(
os.path.join(
custom_component_src_dir,
f"{module_name}.py",
),
"w",
) as f:
f.write(
templates.CUSTOM_COMPONENTS_SOURCE.render(
component_class_name=component_class_name, module_name=module_name
)
module_path = custom_component_src_dir / f"{module_name}.py"
module_path.write_text(
templates.CUSTOM_COMPONENTS_SOURCE.render(
component_class_name=component_class_name, module_name=module_name
)
)
with open(
os.path.join(
custom_component_src_dir,
CustomComponents.INIT_FILE,
),
"w",
) as f:
f.write(templates.CUSTOM_COMPONENTS_INIT_FILE.render(module_name=module_name))
init_path = custom_component_src_dir / CustomComponents.INIT_FILE
init_path.write_text(
templates.CUSTOM_COMPONENTS_INIT_FILE.render(module_name=module_name)
)
def _populate_demo_app(name_variants: NameVariants):
@ -192,7 +183,7 @@ def _get_default_library_name_parts() -> list[str]:
Returns:
The parts of default library name.
"""
current_dir_name = os.getcwd().split(os.path.sep)[-1]
current_dir_name = Path.cwd().name
cleaned_dir_name = re.sub("[^0-9a-zA-Z-_]+", "", current_dir_name).lower()
parts = [part for part in re.split("-|_", cleaned_dir_name) if part]
@ -345,7 +336,7 @@ def init(
console.set_log_level(loglevel)
if os.path.exists(CustomComponents.PYPROJECT_TOML):
if CustomComponents.PYPROJECT_TOML.exists():
console.error(f"A {CustomComponents.PYPROJECT_TOML} already exists. Aborting.")
typer.Exit(code=1)

View File

@ -114,9 +114,6 @@ def _init(
app_name, generation_hash=generation_hash
)
# Migrate Pynecone projects to Reflex.
prerequisites.migrate_to_reflex()
# Initialize the .gitignore.
prerequisites.initialize_gitignore()

View File

@ -61,8 +61,8 @@ def generate_sitemap_config(deploy_url: str, export=False):
def _zip(
component_name: constants.ComponentName,
target: str,
root_dir: str,
target: str | Path,
root_dir: str | Path,
exclude_venv_dirs: bool,
upload_db_file: bool = False,
dirs_to_exclude: set[str] | None = None,
@ -82,22 +82,22 @@ def _zip(
top_level_dirs_to_exclude: The top level directory names immediately under root_dir to exclude. Do not exclude folders by these names further in the sub-directories.
"""
target = Path(target)
root_dir = Path(root_dir)
dirs_to_exclude = dirs_to_exclude or set()
files_to_exclude = files_to_exclude or set()
files_to_zip: list[str] = []
# Traverse the root directory in a top-down manner. In this traversal order,
# we can modify the dirs list in-place to remove directories we don't want to include.
for root, dirs, files in os.walk(root_dir, topdown=True):
root = Path(root)
# Modify the dirs in-place so excluded and hidden directories are skipped in next traversal.
dirs[:] = [
d
for d in dirs
if (basename := os.path.basename(os.path.normpath(d)))
not in dirs_to_exclude
if (basename := Path(d).resolve().name) not in dirs_to_exclude
and not basename.startswith(".")
and (
not exclude_venv_dirs or not _looks_like_venv_dir(os.path.join(root, d))
)
and (not exclude_venv_dirs or not _looks_like_venv_dir(root / d))
]
# If we are at the top level with root_dir, exclude the top level dirs.
if top_level_dirs_to_exclude and root == root_dir:
@ -109,7 +109,7 @@ def _zip(
if not f.startswith(".") and (upload_db_file or not f.endswith(".db"))
]
files_to_zip += [
os.path.join(root, file) for file in files if file not in files_to_exclude
str(root / file) for file in files if file not in files_to_exclude
]
# Create a progress bar for zipping the component.
@ -126,13 +126,13 @@ def _zip(
for file in files_to_zip:
console.debug(f"{target}: {file}", progress=progress)
progress.advance(task)
zipf.write(file, os.path.relpath(file, root_dir))
zipf.write(file, Path(file).relative_to(root_dir))
def zip_app(
frontend: bool = True,
backend: bool = True,
zip_dest_dir: str = os.getcwd(),
zip_dest_dir: str | Path = Path.cwd(),
upload_db_file: bool = False,
):
"""Zip up the app.
@ -143,6 +143,7 @@ def zip_app(
zip_dest_dir: The directory to export the zip file to.
upload_db_file: Whether to upload the database file.
"""
zip_dest_dir = Path(zip_dest_dir)
files_to_exclude = {
constants.ComponentName.FRONTEND.zip(),
constants.ComponentName.BACKEND.zip(),
@ -151,8 +152,8 @@ def zip_app(
if frontend:
_zip(
component_name=constants.ComponentName.FRONTEND,
target=os.path.join(zip_dest_dir, constants.ComponentName.FRONTEND.zip()),
root_dir=str(prerequisites.get_web_dir() / constants.Dirs.STATIC),
target=zip_dest_dir / constants.ComponentName.FRONTEND.zip(),
root_dir=prerequisites.get_web_dir() / constants.Dirs.STATIC,
files_to_exclude=files_to_exclude,
exclude_venv_dirs=False,
)
@ -160,8 +161,8 @@ def zip_app(
if backend:
_zip(
component_name=constants.ComponentName.BACKEND,
target=os.path.join(zip_dest_dir, constants.ComponentName.BACKEND.zip()),
root_dir=".",
target=zip_dest_dir / constants.ComponentName.BACKEND.zip(),
root_dir=Path("."),
dirs_to_exclude={"__pycache__"},
files_to_exclude=files_to_exclude,
top_level_dirs_to_exclude={"assets"},
@ -266,5 +267,6 @@ def setup_frontend_prod(
build(deploy_url=get_config().deploy_url)
def _looks_like_venv_dir(dir_to_check: str) -> bool:
return os.path.exists(os.path.join(dir_to_check, "pyvenv.cfg"))
def _looks_like_venv_dir(dir_to_check: str | Path) -> bool:
dir_to_check = Path(dir_to_check)
return (dir_to_check / "pyvenv.cfg").exists()

View File

@ -164,7 +164,7 @@ def use_system_bun() -> bool:
return use_system_install(constants.Bun.USE_SYSTEM_VAR)
def get_node_bin_path() -> str | None:
def get_node_bin_path() -> Path | None:
"""Get the node binary dir path.
Returns:
@ -173,8 +173,8 @@ def get_node_bin_path() -> str | None:
bin_path = Path(constants.Node.BIN_PATH)
if not bin_path.exists():
str_path = which("node")
return str(Path(str_path).parent.resolve()) if str_path else str_path
return str(bin_path.resolve())
return Path(str_path).parent.resolve() if str_path else None
return bin_path.resolve()
def get_node_path() -> str | None:

View File

@ -2,9 +2,9 @@
from __future__ import annotations
import contextlib
import dataclasses
import functools
import glob
import importlib
import importlib.metadata
import json
@ -19,7 +19,6 @@ import tempfile
import time
import zipfile
from datetime import datetime
from fileinput import FileInput
from pathlib import Path
from types import ModuleType
from typing import Callable, List, Optional
@ -192,7 +191,7 @@ def get_bun_version() -> version.Version | None:
"""
try:
# Run the bun -v command and capture the output
result = processes.new_process([get_config().bun_path, "-v"], run=True)
result = processes.new_process([str(get_config().bun_path), "-v"], run=True)
return version.parse(result.stdout) # type: ignore
except FileNotFoundError:
return None
@ -217,7 +216,7 @@ def get_install_package_manager() -> str | None:
or windows_npm_escape_hatch()
):
return get_package_manager()
return get_config().bun_path
return str(get_config().bun_path)
def get_package_manager() -> str | None:
@ -394,9 +393,7 @@ def validate_app_name(app_name: str | None = None) -> str:
Raises:
Exit: if the app directory name is reflex or if the name is not standard for a python package name.
"""
app_name = (
app_name if app_name else os.getcwd().split(os.path.sep)[-1].replace("-", "_")
)
app_name = app_name if app_name else Path.cwd().name.replace("-", "_")
# Make sure the app is not named "reflex".
if app_name.lower() == constants.Reflex.MODULE_NAME:
console.error(
@ -430,7 +427,7 @@ def create_config(app_name: str):
def initialize_gitignore(
gitignore_file: str = constants.GitIgnore.FILE,
gitignore_file: Path = constants.GitIgnore.FILE,
files_to_ignore: set[str] = constants.GitIgnore.DEFAULTS,
):
"""Initialize the template .gitignore file.
@ -441,9 +438,10 @@ def initialize_gitignore(
"""
# Combine with the current ignored files.
current_ignore: set[str] = set()
if os.path.exists(gitignore_file):
with open(gitignore_file, "r") as f:
current_ignore |= set([line.strip() for line in f.readlines()])
if gitignore_file.exists():
current_ignore |= set(
line.strip() for line in gitignore_file.read_text().splitlines()
)
if files_to_ignore == current_ignore:
console.debug(f"{gitignore_file} already up to date.")
@ -451,9 +449,11 @@ def initialize_gitignore(
files_to_ignore |= current_ignore
# Write files to the .gitignore file.
with open(gitignore_file, "w", newline="\n") as f:
console.debug(f"Creating {gitignore_file}")
f.write(f"{(path_ops.join(sorted(files_to_ignore))).lstrip()}\n")
gitignore_file.touch(exist_ok=True)
console.debug(f"Creating {gitignore_file}")
gitignore_file.write_text(
"\n".join(sorted(files_to_ignore)) + "\n",
)
def initialize_requirements_txt():
@ -546,8 +546,8 @@ def initialize_app_directory(
# Rename the template app to the app name.
path_ops.mv(template_code_dir_name, app_name)
path_ops.mv(
os.path.join(app_name, template_name + constants.Ext.PY),
os.path.join(app_name, app_name + constants.Ext.PY),
Path(app_name) / (template_name + constants.Ext.PY),
Path(app_name) / (app_name + constants.Ext.PY),
)
# Fix up the imports.
@ -691,7 +691,7 @@ def _update_next_config(
def remove_existing_bun_installation():
"""Remove existing bun installation."""
console.debug("Removing existing bun installation.")
if os.path.exists(get_config().bun_path):
if Path(get_config().bun_path).exists():
path_ops.rm(constants.Bun.ROOT_PATH)
@ -731,7 +731,7 @@ def download_and_extract_fnm_zip():
# Download the zip file
url = constants.Fnm.INSTALL_URL
console.debug(f"Downloading {url}")
fnm_zip_file = os.path.join(constants.Fnm.DIR, f"{constants.Fnm.FILENAME}.zip")
fnm_zip_file = constants.Fnm.DIR / f"{constants.Fnm.FILENAME}.zip"
# Function to download and extract the FNM zip release.
try:
# Download the FNM zip release.
@ -770,7 +770,7 @@ def install_node():
return
path_ops.mkdir(constants.Fnm.DIR)
if not os.path.exists(constants.Fnm.EXE):
if not constants.Fnm.EXE.exists():
download_and_extract_fnm_zip()
if constants.IS_WINDOWS:
@ -827,7 +827,7 @@ def install_bun():
)
# Skip if bun is already installed.
if os.path.exists(get_config().bun_path) and get_bun_version() == version.parse(
if Path(get_config().bun_path).exists() and get_bun_version() == version.parse(
constants.Bun.VERSION
):
console.debug("Skipping bun installation as it is already installed.")
@ -842,7 +842,7 @@ def install_bun():
f"irm {constants.Bun.WINDOWS_INSTALL_URL}|iex",
],
env={
"BUN_INSTALL": constants.Bun.ROOT_PATH,
"BUN_INSTALL": str(constants.Bun.ROOT_PATH),
"BUN_VERSION": constants.Bun.VERSION,
},
shell=True,
@ -858,25 +858,26 @@ def install_bun():
download_and_run(
constants.Bun.INSTALL_URL,
f"bun-v{constants.Bun.VERSION}",
BUN_INSTALL=constants.Bun.ROOT_PATH,
BUN_INSTALL=str(constants.Bun.ROOT_PATH),
)
def _write_cached_procedure_file(payload: str, cache_file: str):
with open(cache_file, "w") as f:
f.write(payload)
def _write_cached_procedure_file(payload: str, cache_file: str | Path):
cache_file = Path(cache_file)
cache_file.write_text(payload)
def _read_cached_procedure_file(cache_file: str) -> str | None:
if os.path.exists(cache_file):
with open(cache_file, "r") as f:
return f.read()
def _read_cached_procedure_file(cache_file: str | Path) -> str | None:
cache_file = Path(cache_file)
if cache_file.exists():
return cache_file.read_text()
return None
def _clear_cached_procedure_file(cache_file: str):
if os.path.exists(cache_file):
os.remove(cache_file)
def _clear_cached_procedure_file(cache_file: str | Path):
cache_file = Path(cache_file)
if cache_file.exists():
cache_file.unlink()
def cached_procedure(cache_file: str, payload_fn: Callable[..., str]):
@ -977,7 +978,7 @@ def needs_reinit(frontend: bool = True) -> bool:
Raises:
Exit: If the app is not initialized.
"""
if not os.path.exists(constants.Config.FILE):
if not constants.Config.FILE.exists():
console.error(
f"[cyan]{constants.Config.FILE}[/cyan] not found. Move to the root folder of your project, or run [bold]{constants.Reflex.MODULE_NAME} init[/bold] to start a new project."
)
@ -988,7 +989,7 @@ def needs_reinit(frontend: bool = True) -> bool:
return False
# Make sure the .reflex directory exists.
if not os.path.exists(constants.Reflex.DIR):
if not constants.Reflex.DIR.exists():
return True
# Make sure the .web directory exists in frontend mode.
@ -1093,25 +1094,21 @@ def ensure_reflex_installation_id() -> Optional[int]:
"""
try:
initialize_reflex_user_directory()
installation_id_file = os.path.join(constants.Reflex.DIR, "installation_id")
installation_id_file = constants.Reflex.DIR / "installation_id"
installation_id = None
if os.path.exists(installation_id_file):
try:
with open(installation_id_file, "r") as f:
installation_id = int(f.read())
except Exception:
if installation_id_file.exists():
with contextlib.suppress(Exception):
installation_id = int(installation_id_file.read_text())
# If anything goes wrong at all... just regenerate.
# Like what? Examples:
# - file not exists
# - file not readable
# - content not parseable as an int
pass
if installation_id is None:
installation_id = random.getrandbits(128)
with open(installation_id_file, "w") as f:
f.write(str(installation_id))
installation_id_file.write_text(str(installation_id))
# If we get here, installation_id is definitely set
return installation_id
except Exception as e:
@ -1205,50 +1202,6 @@ def prompt_for_template(templates: list[Template]) -> str:
return templates[int(template)].name
def migrate_to_reflex():
"""Migration from Pynecone to Reflex."""
# Check if the old config file exists.
if not os.path.exists(constants.Config.PREVIOUS_FILE):
return
# Ask the user if they want to migrate.
action = console.ask(
"Pynecone project detected. Automatically upgrade to Reflex?",
choices=["y", "n"],
)
if action == "n":
return
# Rename pcconfig to rxconfig.
console.log(
f"[bold]Renaming {constants.Config.PREVIOUS_FILE} to {constants.Config.FILE}"
)
os.rename(constants.Config.PREVIOUS_FILE, constants.Config.FILE)
# Find all python files in the app directory.
file_pattern = os.path.join(get_config().app_name, "**/*.py")
file_list = glob.glob(file_pattern, recursive=True)
# Add the config file to the list of files to be migrated.
file_list.append(constants.Config.FILE)
# Migrate all files.
updates = {
"Pynecone": "Reflex",
"pynecone as pc": "reflex as rx",
"pynecone.io": "reflex.dev",
"pynecone": "reflex",
"pc.": "rx.",
"pcconfig": "rxconfig",
}
for file_path in file_list:
with FileInput(file_path, inplace=True) as file:
for line in file:
for old, new in updates.items():
line = line.replace(old, new)
print(line, end="")
def fetch_app_templates(version: str) -> dict[str, Template]:
"""Fetch a dict of templates from the templates repo using github API.
@ -1401,7 +1354,7 @@ def initialize_app(app_name: str, template: str | None = None):
from reflex.utils import telemetry
# Check if the app is already initialized.
if os.path.exists(constants.Config.FILE):
if constants.Config.FILE.exists():
telemetry.send("reinit")
return

View File

@ -156,7 +156,7 @@ def new_process(args, run: bool = False, show_logs: bool = False, **kwargs):
Raises:
Exit: When attempting to run a command with a None value.
"""
node_bin_path = path_ops.get_node_bin_path()
node_bin_path = str(path_ops.get_node_bin_path())
if not node_bin_path and not prerequisites.CURRENTLY_INSTALLING_NODE:
console.warn(
"The path to the Node binary could not be found. Please ensure that Node is properly "
@ -167,7 +167,7 @@ def new_process(args, run: bool = False, show_logs: bool = False, **kwargs):
console.error(f"Invalid command: {args}")
raise typer.Exit(1)
# Add the node bin path to the PATH environment variable.
env = {
env: dict[str, str] = {
**os.environ,
"PATH": os.pathsep.join(
[node_bin_path if node_bin_path else "", os.environ["PATH"]]

View File

@ -8,7 +8,7 @@ import pytest
import requests
def check_urls(repo_dir):
def check_urls(repo_dir: Path):
"""Check that all URLs in the repo are valid and secure.
Args:
@ -21,33 +21,33 @@ def check_urls(repo_dir):
errors = []
for root, _dirs, files in os.walk(repo_dir):
if "__pycache__" in root:
root = Path(root)
if root.stem == "__pycache__":
continue
for file_name in files:
if not file_name.endswith(".py") and not file_name.endswith(".md"):
continue
file_path = os.path.join(root, file_name)
file_path = root / file_name
try:
with open(file_path, "r", encoding="utf-8", errors="ignore") as file:
for line in file:
urls = url_pattern.findall(line)
for url in set(urls):
if url.startswith("http://"):
errors.append(
f"Found insecure HTTP URL: {url} in {file_path}"
)
url = url.strip('"\n')
try:
response = requests.head(
url, allow_redirects=True, timeout=5
)
response.raise_for_status()
except requests.RequestException as e:
errors.append(
f"Error accessing URL: {url} in {file_path} | Error: {e}, , Check your path ends with a /"
)
for line in file_path.read_text().splitlines():
urls = url_pattern.findall(line)
for url in set(urls):
if url.startswith("http://"):
errors.append(
f"Found insecure HTTP URL: {url} in {file_path}"
)
url = url.strip('"\n')
try:
response = requests.head(
url, allow_redirects=True, timeout=5
)
response.raise_for_status()
except requests.RequestException as e:
errors.append(
f"Error accessing URL: {url} in {file_path} | Error: {e}, , Check your path ends with a /"
)
except Exception as e:
errors.append(f"Error reading file: {file_path} | Error: {e}")
@ -58,7 +58,7 @@ def check_urls(repo_dir):
"repo_dir",
[Path(__file__).resolve().parent.parent / "reflex"],
)
def test_find_and_check_urls(repo_dir):
def test_find_and_check_urls(repo_dir: Path):
"""Test that all URLs in the repo are valid and secure.
Args:

View File

@ -1,4 +1,4 @@
import os
from pathlib import Path
from typing import List
import pytest
@ -130,7 +130,7 @@ def test_compile_stylesheets(tmp_path, mocker):
]
assert compiler.compile_root_stylesheet(stylesheets) == (
os.path.join(".web", "styles", "styles.css"),
str(Path(".web") / "styles" / "styles.css"),
f"@import url('./tailwind.css'); \n"
f"@import url('https://fonts.googleapis.com/css?family=Sofia&effect=neon|outline|emboss|shadow-multiple'); \n"
f"@import url('https://cdn.jsdelivr.net/npm/bootstrap@3.3.7/dist/css/bootstrap.min.css'); \n"
@ -164,7 +164,7 @@ def test_compile_stylesheets_exclude_tailwind(tmp_path, mocker):
]
assert compiler.compile_root_stylesheet(stylesheets) == (
os.path.join(".web", "styles", "styles.css"),
str(Path(".web") / "styles" / "styles.css"),
"@import url('../public/styles.css'); \n",
)

View File

@ -192,4 +192,4 @@ def test_reflex_dir_env_var(monkeypatch, tmp_path):
mp_ctx = multiprocessing.get_context(method="spawn")
with mp_ctx.Pool(processes=1) as pool:
assert pool.apply(reflex_dir_constant) == str(tmp_path)
assert pool.apply(reflex_dir_constant) == tmp_path

View File

@ -52,4 +52,4 @@ def test_send(mocker, event):
telemetry._send(event, telemetry_enabled=True)
httpx_post_mock.assert_called_once()
pathlib_path_read_text_mock.assert_called_once()
assert pathlib_path_read_text_mock.call_count == 2

View File

@ -117,7 +117,7 @@ def test_remove_existing_bun_installation(mocker):
Args:
mocker: Pytest mocker.
"""
mocker.patch("reflex.utils.prerequisites.os.path.exists", return_value=True)
mocker.patch("reflex.utils.prerequisites.Path.exists", return_value=True)
rm = mocker.patch("reflex.utils.prerequisites.path_ops.rm", mocker.Mock())
prerequisites.remove_existing_bun_installation()
@ -458,7 +458,7 @@ def test_bun_install_without_unzip(mocker):
mocker: Pytest mocker object.
"""
mocker.patch("reflex.utils.path_ops.which", return_value=None)
mocker.patch("os.path.exists", return_value=False)
mocker.patch("pathlib.Path.exists", return_value=False)
mocker.patch("reflex.utils.prerequisites.constants.IS_WINDOWS", False)
with pytest.raises(FileNotFoundError):
@ -476,7 +476,7 @@ def test_bun_install_version(mocker, bun_version):
"""
mocker.patch("reflex.utils.prerequisites.constants.IS_WINDOWS", False)
mocker.patch("os.path.exists", return_value=True)
mocker.patch("pathlib.Path.exists", return_value=True)
mocker.patch(
"reflex.utils.prerequisites.get_bun_version",
return_value=version.parse(bun_version),