From 973e1141de71e12187f507286ee481d574a7c480 Mon Sep 17 00:00:00 2001 From: Masen Furer Date: Fri, 20 Dec 2024 10:20:06 -0800 Subject: [PATCH] Disconnect old websockets and avoid duplicating ws during hot reload --- reflex/.templates/web/utils/state.js | 10 +++++++++- reflex/config.py | 5 +++++ reflex/proxy.py | 24 ++++++++++++++++++------ reflex/utils/console.py | 7 ++++++- 4 files changed, 38 insertions(+), 8 deletions(-) diff --git a/reflex/.templates/web/utils/state.js b/reflex/.templates/web/utils/state.js index 608df084a..918bf4b1b 100644 --- a/reflex/.templates/web/utils/state.js +++ b/reflex/.templates/web/utils/state.js @@ -398,6 +398,11 @@ export const connect = async ( // Get backend URL object from the endpoint. const endpoint = getBackendURL(EVENTURL); + // Disconnect old socket + if (socket.current && socket.current.connected) { + socket.current.disconnect(); + } + // Create the socket. socket.current = io(endpoint.href, { path: endpoint["pathname"], @@ -429,6 +434,7 @@ export const connect = async ( socket.current.on("connect", () => { setConnectErrors([]); window.addEventListener("pagehide", pagehideHandler); + document.addEventListener("visibilitychange", checkVisibility); }); socket.current.on("connect_error", (error) => { @@ -438,7 +444,10 @@ export const connect = async ( // When the socket disconnects reset the event_processing flag socket.current.on("disconnect", () => { event_processing = false; + socket.current.io.skipReconnect = true; + socket.current = null; window.removeEventListener("pagehide", pagehideHandler); + document.removeEventListener("visibilitychange", checkVisibility); }); // On each received message, queue the updates and events. @@ -457,7 +466,6 @@ export const connect = async ( queueEvents([...initialEvents(), event], socket); }); - document.addEventListener("visibilitychange", checkVisibility); }; /** diff --git a/reflex/config.py b/reflex/config.py index 0579b019f..87d9ce665 100644 --- a/reflex/config.py +++ b/reflex/config.py @@ -26,6 +26,7 @@ from typing import ( from typing_extensions import Annotated, get_type_hints +from reflex.utils.console import set_log_level from reflex.utils.exceptions import ConfigError, EnvironmentVarValueError from reflex.utils.types import GenericType, is_union, value_inside_optional @@ -599,6 +600,7 @@ class Config(Base): class Config: """Pydantic config for the config.""" + use_enum_values = False validate_assignment = True # The name of the app (should match the name of the app directory). @@ -718,6 +720,9 @@ class Config(Base): self._non_default_attributes.update(kwargs) self._replace_defaults(**kwargs) + # Set the log level for this process + set_log_level(self.loglevel) + if ( self.state_manager_mode == constants.StateManagerMode.REDIS and not self.redis_url diff --git a/reflex/proxy.py b/reflex/proxy.py index e0124a0fb..a0da1905b 100644 --- a/reflex/proxy.py +++ b/reflex/proxy.py @@ -4,7 +4,7 @@ from __future__ import annotations import asyncio from contextlib import asynccontextmanager -from typing import AsyncGenerator +from typing import Any, AsyncGenerator from urllib.parse import urlparse import aiohttp @@ -34,6 +34,7 @@ except ImportError: """ yield else: + MAX_PROXY_RETRY = 25 async def proxy_http_with_retry( *, @@ -41,25 +42,36 @@ else: scope: Scope, receive: Receive, send: Send, - ) -> None: + ) -> Any: """Proxy an HTTP request with retries. Args: context: The proxy context. - scope: The ASGI scope. + scope: The request scope. receive: The receive channel. send: The send channel. + + Returns: + The response from `proxy_http`. """ - for _attempt in range(100): + for _attempt in range(MAX_PROXY_RETRY): try: return await proxy_http( - context=context, scope=scope, receive=receive, send=send + context=context, + scope=scope, + receive=receive, + send=send, ) - except aiohttp.client_exceptions.ClientError as err: # noqa: PERF203 + except aiohttp.ClientError as err: # noqa: PERF203 console.debug( f"Retrying request {scope['path']} due to client error {err!r}." ) await asyncio.sleep(0.3) + except Exception as ex: + console.debug( + f"Retrying request {scope['path']} due to unhandled exception {ex!r}." + ) + await asyncio.sleep(0.3) def _get_proxy_app_with_context(frontend_host: str) -> tuple[ProxyContext, ASGIApp]: """Get the proxy app with the given frontend host. diff --git a/reflex/utils/console.py b/reflex/utils/console.py index be545140a..1c08a04b6 100644 --- a/reflex/utils/console.py +++ b/reflex/utils/console.py @@ -2,6 +2,8 @@ from __future__ import annotations +import os + from rich.console import Console from rich.progress import MofNCompleteColumn, Progress, TimeElapsedColumn from rich.prompt import Prompt @@ -12,7 +14,7 @@ from reflex.constants import LogLevel _console = Console() # The current log level. -_LOG_LEVEL = LogLevel.INFO +_LOG_LEVEL = LogLevel.DEFAULT # Deprecated features who's warning has been printed. _EMITTED_DEPRECATION_WARNINGS = set() @@ -61,6 +63,9 @@ def set_log_level(log_level: LogLevel): raise ValueError(f"Invalid log level: {log_level}") from ae global _LOG_LEVEL + if log_level != _LOG_LEVEL: + # Set the loglevel persistently for subprocesses + os.environ["LOGLEVEL"] = log_level.value _LOG_LEVEL = log_level