enable PTH rule

This commit is contained in:
Lendemor 2024-12-04 00:38:33 +01:00
parent c721227a06
commit 0a1e00fe27
19 changed files with 78 additions and 80 deletions

View File

@ -5,6 +5,7 @@ from __future__ import annotations
import argparse
import json
import os
from pathlib import Path
from utils import send_data_to_posthog
@ -18,7 +19,7 @@ def extract_stats_from_json(json_file: str) -> list[dict]:
Returns:
list[dict]: The stats for each test.
"""
with open(json_file, "r") as file:
with Path(json_file).open() as file:
json_data = json.load(file)
# Load the JSON data if it is a string, otherwise assume it's already a dictionary

View File

@ -5,6 +5,7 @@ from __future__ import annotations
import argparse
import json
import os
from pathlib import Path
from utils import send_data_to_posthog
@ -18,7 +19,7 @@ def extract_stats_from_json(json_file: str) -> dict:
Returns:
dict: The stats for each test.
"""
with open(json_file, "r") as file:
with Path(json_file).open() as file:
json_data = json.load(file)
# Load the JSON data if it is a string, otherwise assume it's already a dictionary

View File

@ -93,7 +93,7 @@ build-backend = "poetry.core.masonry.api"
[tool.ruff]
target-version = "py39"
lint.isort.split-on-trailing-comma = false
lint.select = ["B", "D", "E", "F", "I", "SIM", "W"]
lint.select = ["B", "D", "E", "F", "I", "SIM", "W", "PTH"]
lint.ignore = ["B008", "D205", "E501", "F403", "SIM115"]
lint.pydocstyle.convention = "google"

View File

@ -858,7 +858,7 @@ def get_config(reload: bool = False) -> Config:
with _config_lock:
sys_path = sys.path.copy()
sys.path.clear()
sys.path.append(os.getcwd())
sys.path.append(str(Path.cwd()))
try:
# Try to import the module with only the current directory in the path.
return _get_config()

View File

@ -10,7 +10,7 @@ class CustomComponents(SimpleNamespace):
"""Constants for the custom components."""
# The name of the custom components source directory.
SRC_DIR = "custom_components"
SRC_DIR = Path("custom_components")
# The name of the custom components pyproject.toml file.
PYPROJECT_TOML = Path("pyproject.toml")
# The name of the custom components package README file.

View File

@ -150,27 +150,27 @@ def _populate_demo_app(name_variants: NameVariants):
from reflex.compiler import templates
from reflex.reflex import _init
demo_app_dir = name_variants.demo_app_dir
demo_app_dir = Path(name_variants.demo_app_dir)
demo_app_name = name_variants.demo_app_name
console.info(f"Creating app for testing: {demo_app_dir}")
console.info(f"Creating app for testing: {demo_app_dir!s}")
os.makedirs(demo_app_dir)
demo_app_dir.mkdir(exist_ok=True)
with set_directory(demo_app_dir):
# We start with the blank template as basis.
_init(name=demo_app_name, template=constants.Templates.DEFAULT)
# Then overwrite the app source file with the one we want for testing custom components.
# This source file is rendered using jinja template file.
with open(f"{demo_app_name}/{demo_app_name}.py", "w") as f:
f.write(
templates.CUSTOM_COMPONENTS_DEMO_APP.render(
custom_component_module_dir=name_variants.custom_component_module_dir,
module_name=name_variants.module_name,
)
demo_file = Path(f"{demo_app_name}/{demo_app_name}.py")
demo_file.write_text(
templates.CUSTOM_COMPONENTS_DEMO_APP.render(
custom_component_module_dir=name_variants.custom_component_module_dir,
module_name=name_variants.module_name,
)
)
# Append the custom component package to the requirements.txt file.
with open(f"{constants.RequirementsTxt.FILE}", "a") as f:
with Path(f"{constants.RequirementsTxt.FILE}").open(mode="a") as f:
f.write(f"{name_variants.package_name}\n")
@ -296,13 +296,14 @@ def _populate_custom_component_project(name_variants: NameVariants):
)
console.info(
f"Initializing the component directory: {CustomComponents.SRC_DIR}/{name_variants.custom_component_module_dir}"
f"Initializing the component directory: {CustomComponents.SRC_DIR / name_variants.custom_component_module_dir}"
)
os.makedirs(CustomComponents.SRC_DIR)
CustomComponents.SRC_DIR.mkdir(exist_ok=True)
with set_directory(CustomComponents.SRC_DIR):
os.makedirs(name_variants.custom_component_module_dir)
module_dir = Path(name_variants.custom_component_module_dir)
module_dir.mkdir(exist_ok=True, parents=True)
_write_source_and_init_py(
custom_component_src_dir=name_variants.custom_component_module_dir,
custom_component_src_dir=module_dir,
component_class_name=name_variants.component_class_name,
module_name=name_variants.module_name,
)
@ -814,7 +815,7 @@ def _validate_project_info():
)
pyproject_toml["project"] = project
try:
with open(CustomComponents.PYPROJECT_TOML, "w") as f:
with CustomComponents.PYPROJECT_TOML.open("w") as f:
tomlkit.dump(pyproject_toml, f)
except (OSError, TOMLKitError) as ex:
console.error(f"Unable to write to pyproject.toml due to {ex}")
@ -922,16 +923,15 @@ def _validate_url_with_protocol_prefix(url: str | None) -> bool:
def _get_file_from_prompt_in_loop() -> Tuple[bytes, str] | None:
image_file = file_extension = None
while image_file is None:
image_filepath = console.ask(
"Upload a preview image of your demo app (enter to skip)"
image_filepath = Path(
console.ask("Upload a preview image of your demo app (enter to skip)")
)
if not image_filepath:
break
file_extension = image_filepath.split(".")[-1]
file_extension = image_filepath.suffix
try:
with open(image_filepath, "rb") as f:
image_file = f.read()
return image_file, file_extension
image_file = image_filepath.read_bytes()
return image_file, file_extension
except OSError as ose:
console.error(f"Unable to read the {file_extension} file due to {ose}")
raise typer.Exit(code=1) from ose

View File

@ -3,7 +3,6 @@
from __future__ import annotations
import atexit
import os
from pathlib import Path
from typing import List, Optional
@ -300,7 +299,7 @@ def export(
True, "--frontend-only", help="Export only frontend.", show_default=False
),
zip_dest_dir: str = typer.Option(
os.getcwd(),
str(Path.cwd()),
help="The directory to export the zip files to.",
show_default=False,
),

View File

@ -8,7 +8,6 @@ import dataclasses
import functools
import inspect
import os
import pathlib
import platform
import re
import signal
@ -20,6 +19,7 @@ import threading
import time
import types
from http.server import SimpleHTTPRequestHandler
from pathlib import Path
from typing import (
TYPE_CHECKING,
Any,
@ -100,7 +100,7 @@ class chdir(contextlib.AbstractContextManager):
def __enter__(self):
"""Save current directory and perform chdir."""
self._old_cwd.append(os.getcwd())
self._old_cwd.append(Path.cwd())
os.chdir(self.path)
def __exit__(self, *excinfo):
@ -120,8 +120,8 @@ class AppHarness:
app_source: Optional[
Callable[[], None] | types.ModuleType | str | functools.partial[Any]
]
app_path: pathlib.Path
app_module_path: pathlib.Path
app_path: Path
app_module_path: Path
app_module: Optional[types.ModuleType] = None
app_instance: Optional[reflex.App] = None
frontend_process: Optional[subprocess.Popen] = None
@ -136,7 +136,7 @@ class AppHarness:
@classmethod
def create(
cls,
root: pathlib.Path,
root: Path,
app_source: Optional[
Callable[[], None] | types.ModuleType | str | functools.partial[Any]
] = None,
@ -815,7 +815,7 @@ class AppHarness:
class SimpleHTTPRequestHandlerCustomErrors(SimpleHTTPRequestHandler):
"""SimpleHTTPRequestHandler with custom error page handling."""
def __init__(self, *args, error_page_map: dict[int, pathlib.Path], **kwargs):
def __init__(self, *args, error_page_map: dict[int, Path], **kwargs):
"""Initialize the handler.
Args:
@ -858,8 +858,8 @@ class Subdir404TCPServer(socketserver.TCPServer):
def __init__(
self,
*args,
root: pathlib.Path,
error_page_map: dict[int, pathlib.Path] | None,
root: Path,
error_page_map: dict[int, Path] | None,
**kwargs,
):
"""Initialize the server.

View File

@ -150,7 +150,7 @@ def zip_app(
_zip(
component_name=constants.ComponentName.BACKEND,
target=zip_dest_dir / constants.ComponentName.BACKEND.zip(),
root_dir=Path("."),
root_dir=Path(),
dirs_to_exclude={"__pycache__"},
files_to_exclude=files_to_exclude,
top_level_dirs_to_exclude={"assets"},

View File

@ -24,7 +24,7 @@ from reflex.utils.prerequisites import get_web_dir
frontend_process = None
def detect_package_change(json_file_path: str) -> str:
def detect_package_change(json_file_path: Path) -> str:
"""Calculates the SHA-256 hash of a JSON file and returns it as a hexadecimal string.
Args:
@ -37,7 +37,7 @@ def detect_package_change(json_file_path: str) -> str:
>>> detect_package_change("package.json")
'a1b2c3d4e5f6g7h8i9j0k1l2m3n4o5p6q7r8s9t0u1v2w3x4y5z6a7b8c9d0e1f2'
"""
with open(json_file_path, "r") as file:
with json_file_path.open("r") as file:
json_data = json.load(file)
# Calculate the hash
@ -81,7 +81,7 @@ def run_process_and_launch_url(run_command: list[str], backend_present=True):
from reflex.utils import processes
json_file_path = get_web_dir() / constants.PackageJson.PATH
last_hash = detect_package_change(str(json_file_path))
last_hash = detect_package_change(json_file_path)
process = None
first_run = True
@ -124,7 +124,7 @@ def run_process_and_launch_url(run_command: list[str], backend_present=True):
"`REFLEX_USE_NPM=1 reflex init`\n"
"`REFLEX_USE_NPM=1 reflex run`"
)
new_hash = detect_package_change(str(json_file_path))
new_hash = detect_package_change(json_file_path)
if new_hash != last_hash:
last_hash = new_hash
kill(process.pid)

View File

@ -1,6 +1,5 @@
"""Export utilities."""
import os
from pathlib import Path
from typing import Optional
@ -15,7 +14,7 @@ def export(
zipping: bool = True,
frontend: bool = True,
backend: bool = True,
zip_dest_dir: str = os.getcwd(),
zip_dest_dir: str = str(Path.cwd()),
upload_db_file: bool = False,
api_url: Optional[str] = None,
deploy_url: Optional[str] = None,

View File

@ -205,14 +205,14 @@ def update_json_file(file_path: str | Path, update_dict: dict[str, int | str]):
# Read the existing json object from the file.
json_object = {}
if fp.stat().st_size:
with open(fp) as f:
with fp.open() as f:
json_object = json.load(f)
# Update the json object with the new data.
json_object.update(update_dict)
# Write the updated json object to the file
with open(fp, "w") as f:
with fp.open("w") as f:
json.dump(json_object, f, ensure_ascii=False)

View File

@ -291,7 +291,7 @@ def get_app(reload: bool = False) -> ModuleType:
"If this error occurs in a reflex test case, ensure that `get_app` is mocked."
)
module = config.module
sys.path.insert(0, os.getcwd())
sys.path.insert(0, str(Path.cwd()))
app = __import__(module, fromlist=(constants.CompileVars.APP,))
if reload:
@ -439,9 +439,11 @@ def create_config(app_name: str):
from reflex.compiler import templates
config_name = f"{re.sub(r'[^a-zA-Z]', '', app_name).capitalize()}Config"
with open(constants.Config.FILE, "w") as f:
console.debug(f"Creating {constants.Config.FILE}")
f.write(templates.RXCONFIG.render(app_name=app_name, config_name=config_name))
console.debug(f"Creating {constants.Config.FILE}")
constants.Config.FILE.write_text(
templates.RXCONFIG.render(app_name=app_name, config_name=config_name)
)
def initialize_gitignore(
@ -495,14 +497,14 @@ def initialize_requirements_txt():
console.debug(f"Detected encoding for {fp} as {encoding}.")
try:
other_requirements_exist = False
with open(fp, "r", encoding=encoding) as f:
with fp.open("r", encoding=encoding) as f:
for req in f.readlines():
# Check if we have a package name that is reflex
if re.match(r"^reflex[^a-zA-Z0-9]", req):
console.debug(f"{fp} already has reflex as dependency.")
return
other_requirements_exist = True
with open(fp, "a", encoding=encoding) as f:
with fp.open("a", encoding=encoding) as f:
preceding_newline = "\n" if other_requirements_exist else ""
f.write(
f"{preceding_newline}{constants.RequirementsTxt.DEFAULTS_STUB}{constants.Reflex.VERSION}\n"
@ -733,13 +735,13 @@ def download_and_run(url: str, *args, show_status: bool = False, **env):
response.raise_for_status()
# Save the script to a temporary file.
script = tempfile.NamedTemporaryFile()
with open(script.name, "w") as f:
f.write(response.text)
script = Path(tempfile.NamedTemporaryFile().name)
script.write_text(response.text)
# Run the script.
env = {**os.environ, **env}
process = processes.new_process(["bash", f.name, *args], env=env)
process = processes.new_process(["bash", str(script), *args], env=env)
show = processes.show_status if show_status else processes.show_logs
show(f"Installing {url}", process)
@ -753,14 +755,14 @@ def download_and_extract_fnm_zip():
# Download the zip file
url = constants.Fnm.INSTALL_URL
console.debug(f"Downloading {url}")
fnm_zip_file = constants.Fnm.DIR / f"{constants.Fnm.FILENAME}.zip"
fnm_zip_file: Path = constants.Fnm.DIR / f"{constants.Fnm.FILENAME}.zip"
# Function to download and extract the FNM zip release.
try:
# Download the FNM zip release.
# TODO: show progress to improve UX
response = net.get(url, follow_redirects=True)
response.raise_for_status()
with open(fnm_zip_file, "wb") as output_file:
with fnm_zip_file.open("wb") as output_file:
for chunk in response.iter_bytes():
output_file.write(chunk)
@ -808,7 +810,7 @@ def install_node():
)
else: # All other platforms (Linux, MacOS).
# Add execute permissions to fnm executable.
os.chmod(constants.Fnm.EXE, stat.S_IXUSR)
constants.Fnm.EXE.chmod(stat.S_IXUSR)
# Install node.
# Specify arm64 arch explicitly for M1s and M2s.
architecture_arg = (
@ -1324,7 +1326,7 @@ def create_config_init_app_from_remote_template(app_name: str, template_url: str
raise typer.Exit(1) from ose
# Use httpx GET with redirects to download the zip file.
zip_file_path = Path(temp_dir) / "template.zip"
zip_file_path: Path = Path(temp_dir) / "template.zip"
try:
# Note: following redirects can be risky. We only allow this for reflex built templates at the moment.
response = net.get(template_url, follow_redirects=True)
@ -1334,9 +1336,8 @@ def create_config_init_app_from_remote_template(app_name: str, template_url: str
console.error(f"Failed to download the template: {he}")
raise typer.Exit(1) from he
try:
with open(zip_file_path, "wb") as f:
f.write(response.content)
console.debug(f"Downloaded the zip to {zip_file_path}")
zip_file_path.write_bytes(response.content)
console.debug(f"Downloaded the zip to {zip_file_path}")
except OSError as ose:
console.error(f"Unable to write the downloaded zip to disk {ose}")
raise typer.Exit(1) from ose

View File

@ -24,7 +24,7 @@ from reflex.vars.base import Var
logger = logging.getLogger("pyi_generator")
PWD = Path(".").resolve()
PWD = Path().resolve()
EXCLUDED_FILES = [
"app.py",

View File

@ -2,6 +2,7 @@
from __future__ import annotations
from pathlib import Path
from typing import Generator
import pytest
@ -186,8 +187,7 @@ def CallScript():
self.reset()
app = rx.App(state=rx.State)
with open("assets/external.js", "w") as f:
f.write(external_scripts)
Path("assets/external.js").write_text(external_scripts)
@app.add_page
def index():

View File

@ -206,7 +206,7 @@ class chdir(contextlib.AbstractContextManager):
def __enter__(self):
"""Save current directory and perform chdir."""
self._old_cwd.append(Path(".").resolve())
self._old_cwd.append(Path().resolve())
os.chdir(self.path)
def __exit__(self, *excinfo):

View File

@ -61,14 +61,13 @@ class FileUploadState(State):
"""
for file in files:
upload_data = await file.read()
outfile = f"{self._tmp_path}/{file.filename}"
assert file.filename is not None
outfile = self._tmp_path / file.filename
# Save the file.
with open(outfile, "wb") as file_object:
file_object.write(upload_data)
outfile.write_bytes(upload_data)
# Update the img var.
assert file.filename is not None
self.img_list.append(file.filename)
@rx.event(background=True)
@ -109,14 +108,13 @@ class ChildFileUploadState(FileStateBase1):
"""
for file in files:
upload_data = await file.read()
outfile = f"{self._tmp_path}/{file.filename}"
assert file.filename is not None
outfile = self._tmp_path / file.filename
# Save the file.
with open(outfile, "wb") as file_object:
file_object.write(upload_data)
outfile.write_bytes(upload_data)
# Update the img var.
assert file.filename is not None
self.img_list.append(file.filename)
@rx.event(background=True)
@ -157,14 +155,13 @@ class GrandChildFileUploadState(FileStateBase2):
"""
for file in files:
upload_data = await file.read()
outfile = f"{self._tmp_path}/{file.filename}"
assert file.filename is not None
outfile = self._tmp_path / file.filename
# Save the file.
with open(outfile, "wb") as file_object:
file_object.write(upload_data)
outfile.write_bytes(upload_data)
# Update the img var.
assert file.filename is not None
self.img_list.append(file.filename)
@rx.event(background=True)

View File

@ -224,7 +224,7 @@ def test_serialize(value: Any, expected: str):
),
(Color(color="slate", shade=1), '"var(--slate-1)"', True),
(BaseSubclass, '"BaseSubclass"', True),
(Path("."), '"."', True),
(Path(), '"."', True),
],
)
def test_serialize_var_to_str(value: Any, expected: str, exp_var_is_string: bool):

View File

@ -298,7 +298,7 @@ def tmp_working_dir(tmp_path):
Yields:
subdirectory of tmp_path which is now the current working directory.
"""
old_pwd = Path(".").resolve()
old_pwd = Path().resolve()
working_dir = tmp_path / "working_dir"
working_dir.mkdir()
os.chdir(working_dir)