[ENG-2157] [Refix] Allow rx.download
to resolve rx.get_upload_url
(#4470)
* [ENG-2157] [Refix] Allow `rx.download` to resolve `rx.get_upload_url` Update the special case in a way that works with the new Var system and its unstoppable determination to concatenate strings instead of using template string This has been broken since 0.6.0, so a regression test was added to avoid future inadvertant breakage. Fix #2812 (again) * use safer indirect eval
This commit is contained in:
parent
438b31f270
commit
dcdcbfd833
@ -208,11 +208,16 @@ export const applyEvent = async (event, socket) => {
|
|||||||
if (event.name == "_download") {
|
if (event.name == "_download") {
|
||||||
const a = document.createElement("a");
|
const a = document.createElement("a");
|
||||||
a.hidden = true;
|
a.hidden = true;
|
||||||
|
a.href = event.payload.url;
|
||||||
// Special case when linking to uploaded files
|
// Special case when linking to uploaded files
|
||||||
a.href = event.payload.url.replace(
|
if (a.href.includes("getBackendURL(env.UPLOAD)")) {
|
||||||
"${getBackendURL(env.UPLOAD)}",
|
a.href = eval?.(
|
||||||
getBackendURL(env.UPLOAD)
|
event.payload.url.replace(
|
||||||
);
|
"getBackendURL(env.UPLOAD)",
|
||||||
|
`"${getBackendURL(env.UPLOAD)}"`
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
a.download = event.payload.filename;
|
a.download = event.payload.filename;
|
||||||
a.click();
|
a.click();
|
||||||
a.remove();
|
a.remove();
|
||||||
|
@ -6,12 +6,16 @@ import asyncio
|
|||||||
import time
|
import time
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Generator
|
from typing import Generator
|
||||||
|
from urllib.parse import urlsplit
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
from selenium.webdriver.common.by import By
|
from selenium.webdriver.common.by import By
|
||||||
|
|
||||||
|
from reflex.constants.event import Endpoint
|
||||||
from reflex.testing import AppHarness, WebDriver
|
from reflex.testing import AppHarness, WebDriver
|
||||||
|
|
||||||
|
from .utils import poll_for_navigation
|
||||||
|
|
||||||
|
|
||||||
def UploadFile():
|
def UploadFile():
|
||||||
"""App for testing dynamic routes."""
|
"""App for testing dynamic routes."""
|
||||||
@ -23,7 +27,7 @@ def UploadFile():
|
|||||||
|
|
||||||
class UploadState(rx.State):
|
class UploadState(rx.State):
|
||||||
_file_data: Dict[str, str] = {}
|
_file_data: Dict[str, str] = {}
|
||||||
event_order: List[str] = []
|
event_order: rx.Field[List[str]] = rx.field([])
|
||||||
progress_dicts: List[dict] = []
|
progress_dicts: List[dict] = []
|
||||||
disabled: bool = False
|
disabled: bool = False
|
||||||
large_data: str = ""
|
large_data: str = ""
|
||||||
@ -50,6 +54,15 @@ def UploadFile():
|
|||||||
self.large_data = ""
|
self.large_data = ""
|
||||||
self.event_order.append("chain_event")
|
self.event_order.append("chain_event")
|
||||||
|
|
||||||
|
async def handle_upload_tertiary(self, files: List[rx.UploadFile]):
|
||||||
|
for file in files:
|
||||||
|
(rx.get_upload_dir() / (file.filename or "INVALID")).write_bytes(
|
||||||
|
await file.read()
|
||||||
|
)
|
||||||
|
|
||||||
|
def do_download(self):
|
||||||
|
return rx.download(rx.get_upload_url("test.txt"))
|
||||||
|
|
||||||
def index():
|
def index():
|
||||||
return rx.vstack(
|
return rx.vstack(
|
||||||
rx.input(
|
rx.input(
|
||||||
@ -123,6 +136,34 @@ def UploadFile():
|
|||||||
on_click=rx.cancel_upload("secondary"),
|
on_click=rx.cancel_upload("secondary"),
|
||||||
id="cancel_button_secondary",
|
id="cancel_button_secondary",
|
||||||
),
|
),
|
||||||
|
rx.heading("Tertiary Upload/Download"),
|
||||||
|
rx.upload.root(
|
||||||
|
rx.vstack(
|
||||||
|
rx.button("Select File"),
|
||||||
|
rx.text("Drag and drop files here or click to select files"),
|
||||||
|
),
|
||||||
|
id="tertiary",
|
||||||
|
),
|
||||||
|
rx.button(
|
||||||
|
"Upload",
|
||||||
|
on_click=UploadState.handle_upload_tertiary( # type: ignore
|
||||||
|
rx.upload_files(
|
||||||
|
upload_id="tertiary",
|
||||||
|
),
|
||||||
|
),
|
||||||
|
id="upload_button_tertiary",
|
||||||
|
),
|
||||||
|
rx.button(
|
||||||
|
"Download - Frontend",
|
||||||
|
on_click=rx.download(rx.get_upload_url("test.txt")),
|
||||||
|
id="download-frontend",
|
||||||
|
),
|
||||||
|
rx.button(
|
||||||
|
"Download - Backend",
|
||||||
|
on_click=UploadState.do_download,
|
||||||
|
id="download-backend",
|
||||||
|
),
|
||||||
|
rx.text(UploadState.event_order.to_string(), id="event-order"),
|
||||||
)
|
)
|
||||||
|
|
||||||
app = rx.App(state=rx.State)
|
app = rx.App(state=rx.State)
|
||||||
@ -164,6 +205,24 @@ def driver(upload_file: AppHarness):
|
|||||||
driver.quit()
|
driver.quit()
|
||||||
|
|
||||||
|
|
||||||
|
def poll_for_token(driver: WebDriver, upload_file: AppHarness) -> str:
|
||||||
|
"""Poll for the token input to be populated.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
driver: WebDriver instance.
|
||||||
|
upload_file: harness for UploadFile app.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
token value
|
||||||
|
"""
|
||||||
|
token_input = driver.find_element(By.ID, "token")
|
||||||
|
assert token_input
|
||||||
|
# wait for the backend connection to send the token
|
||||||
|
token = upload_file.poll_for_value(token_input)
|
||||||
|
assert token is not None
|
||||||
|
return token
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("secondary", [False, True])
|
@pytest.mark.parametrize("secondary", [False, True])
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_upload_file(
|
async def test_upload_file(
|
||||||
@ -178,11 +237,7 @@ async def test_upload_file(
|
|||||||
secondary: whether to use the secondary upload form
|
secondary: whether to use the secondary upload form
|
||||||
"""
|
"""
|
||||||
assert upload_file.app_instance is not None
|
assert upload_file.app_instance is not None
|
||||||
token_input = driver.find_element(By.ID, "token")
|
token = poll_for_token(driver, upload_file)
|
||||||
assert token_input
|
|
||||||
# wait for the backend connection to send the token
|
|
||||||
token = upload_file.poll_for_value(token_input)
|
|
||||||
assert token is not None
|
|
||||||
full_state_name = upload_file.get_full_state_name(["_upload_state"])
|
full_state_name = upload_file.get_full_state_name(["_upload_state"])
|
||||||
state_name = upload_file.get_state_name("_upload_state")
|
state_name = upload_file.get_state_name("_upload_state")
|
||||||
substate_token = f"{token}_{full_state_name}"
|
substate_token = f"{token}_{full_state_name}"
|
||||||
@ -204,6 +259,19 @@ async def test_upload_file(
|
|||||||
upload_box.send_keys(str(target_file))
|
upload_box.send_keys(str(target_file))
|
||||||
upload_button.click()
|
upload_button.click()
|
||||||
|
|
||||||
|
# check that the selected files are displayed
|
||||||
|
selected_files = driver.find_element(By.ID, f"selected_files{suffix}")
|
||||||
|
assert Path(selected_files.text).name == Path(exp_name).name
|
||||||
|
|
||||||
|
if secondary:
|
||||||
|
event_order_displayed = driver.find_element(By.ID, "event-order")
|
||||||
|
AppHarness._poll_for(lambda: "chain_event" in event_order_displayed.text)
|
||||||
|
|
||||||
|
state = await upload_file.get_state(substate_token)
|
||||||
|
# only the secondary form tracks progress and chain events
|
||||||
|
assert state.substates[state_name].event_order.count("upload_progress") == 1
|
||||||
|
assert state.substates[state_name].event_order.count("chain_event") == 1
|
||||||
|
|
||||||
# look up the backend state and assert on uploaded contents
|
# look up the backend state and assert on uploaded contents
|
||||||
async def get_file_data():
|
async def get_file_data():
|
||||||
return (
|
return (
|
||||||
@ -217,16 +285,6 @@ async def test_upload_file(
|
|||||||
normalized_file_data = {Path(k).name: v for k, v in file_data.items()}
|
normalized_file_data = {Path(k).name: v for k, v in file_data.items()}
|
||||||
assert normalized_file_data[Path(exp_name).name] == exp_contents
|
assert normalized_file_data[Path(exp_name).name] == exp_contents
|
||||||
|
|
||||||
# check that the selected files are displayed
|
|
||||||
selected_files = driver.find_element(By.ID, f"selected_files{suffix}")
|
|
||||||
assert Path(selected_files.text).name == Path(exp_name).name
|
|
||||||
|
|
||||||
state = await upload_file.get_state(substate_token)
|
|
||||||
if secondary:
|
|
||||||
# only the secondary form tracks progress and chain events
|
|
||||||
assert state.substates[state_name].event_order.count("upload_progress") == 1
|
|
||||||
assert state.substates[state_name].event_order.count("chain_event") == 1
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_upload_file_multiple(tmp_path, upload_file: AppHarness, driver):
|
async def test_upload_file_multiple(tmp_path, upload_file: AppHarness, driver):
|
||||||
@ -238,11 +296,7 @@ async def test_upload_file_multiple(tmp_path, upload_file: AppHarness, driver):
|
|||||||
driver: WebDriver instance.
|
driver: WebDriver instance.
|
||||||
"""
|
"""
|
||||||
assert upload_file.app_instance is not None
|
assert upload_file.app_instance is not None
|
||||||
token_input = driver.find_element(By.ID, "token")
|
token = poll_for_token(driver, upload_file)
|
||||||
assert token_input
|
|
||||||
# wait for the backend connection to send the token
|
|
||||||
token = upload_file.poll_for_value(token_input)
|
|
||||||
assert token is not None
|
|
||||||
full_state_name = upload_file.get_full_state_name(["_upload_state"])
|
full_state_name = upload_file.get_full_state_name(["_upload_state"])
|
||||||
state_name = upload_file.get_state_name("_upload_state")
|
state_name = upload_file.get_state_name("_upload_state")
|
||||||
substate_token = f"{token}_{full_state_name}"
|
substate_token = f"{token}_{full_state_name}"
|
||||||
@ -301,11 +355,7 @@ def test_clear_files(
|
|||||||
secondary: whether to use the secondary upload form.
|
secondary: whether to use the secondary upload form.
|
||||||
"""
|
"""
|
||||||
assert upload_file.app_instance is not None
|
assert upload_file.app_instance is not None
|
||||||
token_input = driver.find_element(By.ID, "token")
|
poll_for_token(driver, upload_file)
|
||||||
assert token_input
|
|
||||||
# wait for the backend connection to send the token
|
|
||||||
token = upload_file.poll_for_value(token_input)
|
|
||||||
assert token is not None
|
|
||||||
|
|
||||||
suffix = "_secondary" if secondary else ""
|
suffix = "_secondary" if secondary else ""
|
||||||
|
|
||||||
@ -357,11 +407,7 @@ async def test_cancel_upload(tmp_path, upload_file: AppHarness, driver: WebDrive
|
|||||||
driver: WebDriver instance.
|
driver: WebDriver instance.
|
||||||
"""
|
"""
|
||||||
assert upload_file.app_instance is not None
|
assert upload_file.app_instance is not None
|
||||||
token_input = driver.find_element(By.ID, "token")
|
token = poll_for_token(driver, upload_file)
|
||||||
assert token_input
|
|
||||||
# wait for the backend connection to send the token
|
|
||||||
token = upload_file.poll_for_value(token_input)
|
|
||||||
assert token is not None
|
|
||||||
state_name = upload_file.get_state_name("_upload_state")
|
state_name = upload_file.get_state_name("_upload_state")
|
||||||
state_full_name = upload_file.get_full_state_name(["_upload_state"])
|
state_full_name = upload_file.get_full_state_name(["_upload_state"])
|
||||||
substate_token = f"{token}_{state_full_name}"
|
substate_token = f"{token}_{state_full_name}"
|
||||||
@ -403,3 +449,55 @@ async def test_cancel_upload(tmp_path, upload_file: AppHarness, driver: WebDrive
|
|||||||
assert Path(exp_name).name not in normalized_file_data
|
assert Path(exp_name).name not in normalized_file_data
|
||||||
|
|
||||||
target_file.unlink()
|
target_file.unlink()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_upload_download_file(
|
||||||
|
tmp_path,
|
||||||
|
upload_file: AppHarness,
|
||||||
|
driver: WebDriver,
|
||||||
|
):
|
||||||
|
"""Submit a file upload and then fetch it with rx.download.
|
||||||
|
|
||||||
|
This checks the special case `getBackendURL` logic in the _download event
|
||||||
|
handler in state.js.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
tmp_path: pytest tmp_path fixture
|
||||||
|
upload_file: harness for UploadFile app.
|
||||||
|
driver: WebDriver instance.
|
||||||
|
"""
|
||||||
|
assert upload_file.app_instance is not None
|
||||||
|
poll_for_token(driver, upload_file)
|
||||||
|
|
||||||
|
upload_box = driver.find_elements(By.XPATH, "//input[@type='file']")[2]
|
||||||
|
assert upload_box
|
||||||
|
upload_button = driver.find_element(By.ID, "upload_button_tertiary")
|
||||||
|
assert upload_button
|
||||||
|
|
||||||
|
exp_name = "test.txt"
|
||||||
|
exp_contents = "test file contents!"
|
||||||
|
target_file = tmp_path / exp_name
|
||||||
|
target_file.write_text(exp_contents)
|
||||||
|
|
||||||
|
upload_box.send_keys(str(target_file))
|
||||||
|
upload_button.click()
|
||||||
|
|
||||||
|
# Download via event embedded in frontend code.
|
||||||
|
download_frontend = driver.find_element(By.ID, "download-frontend")
|
||||||
|
with poll_for_navigation(driver):
|
||||||
|
download_frontend.click()
|
||||||
|
assert urlsplit(driver.current_url).path == f"/{Endpoint.UPLOAD.value}/test.txt"
|
||||||
|
assert driver.find_element(by=By.TAG_NAME, value="body").text == exp_contents
|
||||||
|
|
||||||
|
# Go back and wait for the app to reload.
|
||||||
|
with poll_for_navigation(driver):
|
||||||
|
driver.back()
|
||||||
|
poll_for_token(driver, upload_file)
|
||||||
|
|
||||||
|
# Download via backend event handler.
|
||||||
|
download_backend = driver.find_element(By.ID, "download-backend")
|
||||||
|
with poll_for_navigation(driver):
|
||||||
|
download_backend.click()
|
||||||
|
assert urlsplit(driver.current_url).path == f"/{Endpoint.UPLOAD.value}/test.txt"
|
||||||
|
assert driver.find_element(by=By.TAG_NAME, value="body").text == exp_contents
|
||||||
|
Loading…
Reference in New Issue
Block a user