From 73204a1a16b182d06a35ba6331af237f76a0eada Mon Sep 17 00:00:00 2001 From: Masen Furer Date: Fri, 20 Dec 2024 14:24:24 -0800 Subject: [PATCH] try to localize the event queue and processing flag to the socket --- reflex/.templates/web/utils/state.js | 121 ++++++++++++++------------- 1 file changed, 65 insertions(+), 56 deletions(-) diff --git a/reflex/.templates/web/utils/state.js b/reflex/.templates/web/utils/state.js index 918bf4b1b..7f284613f 100644 --- a/reflex/.templates/web/utils/state.js +++ b/reflex/.templates/web/utils/state.js @@ -35,11 +35,6 @@ const cookies = new Cookies(); // Dictionary holding component references. export const refs = {}; -// Flag ensures that only one event is processing on the backend concurrently. -let event_processing = false; -// Array holding pending events to be processed. -const event_queue = []; - /** * Generate a UUID (Used for session tokens). * Taken from: https://stackoverflow.com/questions/105034/how-do-i-create-a-guid-uuid @@ -108,13 +103,16 @@ export const getBackendURL = (url_str) => { /** * Determine if any event in the event queue is stateful. * + * @param socket The socket object to check for stateful events. * @returns True if there's any event that requires state and False if none of them do. */ -export const isStateful = () => { - if (event_queue.length === 0) { +export const isStateful = (socket) => { + if (socket.event_queue.length === 0) { return false; } - return event_queue.some((event) => event.name.startsWith("reflex___state")); + return socket.event_queue.some((event) => + event.name.startsWith("reflex___state") + ); }; /** @@ -295,10 +293,7 @@ export const applyEvent = async (event, socket) => { // Send the event to the server. if (socket) { - socket.emit( - "event", - event, - ); + socket.emit("event", event); return true; } @@ -339,7 +334,7 @@ export const applyRestEvent = async (event, socket) => { * @param socket The socket object to send the event on. */ export const queueEvents = async (events, socket) => { - event_queue.push(...events); + socket.current.event_queue.push(...events); await processEvent(socket.current); }; @@ -349,20 +344,20 @@ export const queueEvents = async (events, socket) => { */ export const processEvent = async (socket) => { // Only proceed if the socket is up and no event in the queue uses state, otherwise we throw the event into the void - if (!socket && isStateful()) { + if (!socket && isStateful(socket)) { return; } // Only proceed if we're not already processing an event. - if (event_queue.length === 0 || event_processing) { + if (socket.event_queue.length === 0 || socket.event_processing) { return; } // Set processing to true to block other events from being processed. - event_processing = true; + socket.event_processing = true; // Apply the next event in the queue. - const event = event_queue.shift(); + const event = socket.event_queue.shift(); let eventSent = false; // Process events with handlers via REST and all others via websockets. @@ -373,7 +368,7 @@ export const processEvent = async (socket) => { } // If no event was sent, set processing to false. if (!eventSent) { - event_processing = false; + socket.event_processing = false; // recursively call processEvent to drain the queue, since there is // no state update to trigger the useEffect event loop. await processEvent(socket); @@ -409,8 +404,12 @@ export const connect = async ( transports: transports, autoUnref: false, }); + // Array holding pending events to be processed. + socket.current.event_queue = []; + // Flag ensures that only one event is processing on the backend concurrently. + socket.current.event_processing = false; // Ensure undefined fields in events are sent as null instead of removed - socket.current.io.encoder.replacer = (k, v) => (v === undefined ? null : v) + socket.current.io.encoder.replacer = (k, v) => (v === undefined ? null : v); function checkVisibility() { if (document.visibilityState === "visible") { @@ -443,9 +442,7 @@ export const connect = async ( // When the socket disconnects reset the event_processing flag socket.current.on("disconnect", () => { - event_processing = false; - socket.current.io.skipReconnect = true; - socket.current = null; + socket.current.event_processing = false; window.removeEventListener("pagehide", pagehideHandler); document.removeEventListener("visibilitychange", checkVisibility); }); @@ -456,16 +453,15 @@ export const connect = async ( dispatch[substate](update.delta[substate]); } applyClientStorageDelta(client_storage, update.delta); - event_processing = !update.final; + socket.current.event_processing = !update.final; if (update.events) { queueEvents(update.events, socket); } }); socket.current.on("reload", async (event) => { - event_processing = false; + socket.current.event_processing = false; queueEvents([...initialEvents(), event], socket); }); - }; /** @@ -491,7 +487,7 @@ export const uploadFiles = async ( return false; } - const upload_ref_name = `__upload_controllers_${upload_id}` + const upload_ref_name = `__upload_controllers_${upload_id}`; if (refs[upload_ref_name]) { console.log("Upload already in progress for ", upload_id); @@ -755,9 +751,32 @@ export const useEventLoop = ( } }; - const sentHydrate = useRef(false); // Avoid double-hydrate due to React strict-mode useEffect(() => { - if (router.isReady && !sentHydrate.current) { + // Initialize the websocket connection. + if (!socket.current) { + console.log("Creating websocket connection"); + connect( + socket, + dispatch, + ["websocket"], + setConnectErrors, + client_storage + ); + return () => { + if (socket.current) { + console.log("Discarded socket disconnected"); + socket.current.io.skipReconnect = true; + socket.current.disconnect(); + socket.current = null; + } + }; + } + }, []); + + //const sentHydrate = useRef(false); // Avoid double-hydrate due to React strict-mode + useEffect(() => { + console.log("Maybe initial hydrate", router.isReady, !!socket.current); + if (router.isReady && socket.current) { const events = initial_events(); addEvents( events.map((e) => ({ @@ -769,9 +788,9 @@ export const useEventLoop = ( }))(router), })) ); - sentHydrate.current = true; + //sentHydrate.current = true; } - }, [router.isReady]); + }, [router.isReady, socket.current]); // Handle frontend errors and send them to the backend via websocket. useEffect(() => { @@ -803,34 +822,23 @@ export const useEventLoop = ( }, []); // Main event loop. - useEffect(() => { - // Skip if the router is not ready. - if (!router.isReady) { - return; - } - // only use websockets if state is present - if (Object.keys(initialState).length > 1) { - // Initialize the websocket connection. - if (!socket.current) { - connect( - socket, - dispatch, - ["websocket"], - setConnectErrors, - client_storage - ); - } - (async () => { - // Process all outstanding events. - while (event_queue.length > 0 && !event_processing) { - await processEvent(socket.current); - } - })(); - } - }); + // useEffect(() => { + // // Skip if the router is not ready. + // if (!router.isReady) { + // return; + // } + // (async () => { + // // Process all outstanding events. + // console.log("Process outstanding events"); + // while (socket.current.event_queue.length > 0 && !socket.current.event_processing) { + // await processEvent(socket.current); + // } + // })(); + // }); // localStorage event handling useEffect(() => { + console.log("Setting up localStorage event listener"); const storage_to_state_map = {}; if (client_storage.local_storage && typeof window !== "undefined") { @@ -858,10 +866,11 @@ export const useEventLoop = ( window.addEventListener("storage", handleStorage); return () => window.removeEventListener("storage", handleStorage); - }); + }, []); // Route after the initial page hydration. useEffect(() => { + console.log("Setting up route change listeners"); const change_start = () => { const main_state_dispatch = dispatch["reflex___state____state"]; if (main_state_dispatch !== undefined) {