try to localize the event queue and processing flag to the socket

This commit is contained in:
Masen Furer 2024-12-20 14:24:24 -08:00
parent 973e1141de
commit 73204a1a16
No known key found for this signature in database
GPG Key ID: 2AE2BD5531FF94F4

View File

@ -35,11 +35,6 @@ const cookies = new Cookies();
// Dictionary holding component references. // Dictionary holding component references.
export const refs = {}; export const refs = {};
// Flag ensures that only one event is processing on the backend concurrently.
let event_processing = false;
// Array holding pending events to be processed.
const event_queue = [];
/** /**
* Generate a UUID (Used for session tokens). * Generate a UUID (Used for session tokens).
* Taken from: https://stackoverflow.com/questions/105034/how-do-i-create-a-guid-uuid * Taken from: https://stackoverflow.com/questions/105034/how-do-i-create-a-guid-uuid
@ -108,13 +103,16 @@ export const getBackendURL = (url_str) => {
/** /**
* Determine if any event in the event queue is stateful. * Determine if any event in the event queue is stateful.
* *
* @param socket The socket object to check for stateful events.
* @returns True if there's any event that requires state and False if none of them do. * @returns True if there's any event that requires state and False if none of them do.
*/ */
export const isStateful = () => { export const isStateful = (socket) => {
if (event_queue.length === 0) { if (socket.event_queue.length === 0) {
return false; return false;
} }
return event_queue.some((event) => event.name.startsWith("reflex___state")); return socket.event_queue.some((event) =>
event.name.startsWith("reflex___state")
);
}; };
/** /**
@ -295,10 +293,7 @@ export const applyEvent = async (event, socket) => {
// Send the event to the server. // Send the event to the server.
if (socket) { if (socket) {
socket.emit( socket.emit("event", event);
"event",
event,
);
return true; return true;
} }
@ -339,7 +334,7 @@ export const applyRestEvent = async (event, socket) => {
* @param socket The socket object to send the event on. * @param socket The socket object to send the event on.
*/ */
export const queueEvents = async (events, socket) => { export const queueEvents = async (events, socket) => {
event_queue.push(...events); socket.current.event_queue.push(...events);
await processEvent(socket.current); await processEvent(socket.current);
}; };
@ -349,20 +344,20 @@ export const queueEvents = async (events, socket) => {
*/ */
export const processEvent = async (socket) => { export const processEvent = async (socket) => {
// Only proceed if the socket is up and no event in the queue uses state, otherwise we throw the event into the void // Only proceed if the socket is up and no event in the queue uses state, otherwise we throw the event into the void
if (!socket && isStateful()) { if (!socket && isStateful(socket)) {
return; return;
} }
// Only proceed if we're not already processing an event. // Only proceed if we're not already processing an event.
if (event_queue.length === 0 || event_processing) { if (socket.event_queue.length === 0 || socket.event_processing) {
return; return;
} }
// Set processing to true to block other events from being processed. // Set processing to true to block other events from being processed.
event_processing = true; socket.event_processing = true;
// Apply the next event in the queue. // Apply the next event in the queue.
const event = event_queue.shift(); const event = socket.event_queue.shift();
let eventSent = false; let eventSent = false;
// Process events with handlers via REST and all others via websockets. // Process events with handlers via REST and all others via websockets.
@ -373,7 +368,7 @@ export const processEvent = async (socket) => {
} }
// If no event was sent, set processing to false. // If no event was sent, set processing to false.
if (!eventSent) { if (!eventSent) {
event_processing = false; socket.event_processing = false;
// recursively call processEvent to drain the queue, since there is // recursively call processEvent to drain the queue, since there is
// no state update to trigger the useEffect event loop. // no state update to trigger the useEffect event loop.
await processEvent(socket); await processEvent(socket);
@ -409,8 +404,12 @@ export const connect = async (
transports: transports, transports: transports,
autoUnref: false, autoUnref: false,
}); });
// Array holding pending events to be processed.
socket.current.event_queue = [];
// Flag ensures that only one event is processing on the backend concurrently.
socket.current.event_processing = false;
// Ensure undefined fields in events are sent as null instead of removed // Ensure undefined fields in events are sent as null instead of removed
socket.current.io.encoder.replacer = (k, v) => (v === undefined ? null : v) socket.current.io.encoder.replacer = (k, v) => (v === undefined ? null : v);
function checkVisibility() { function checkVisibility() {
if (document.visibilityState === "visible") { if (document.visibilityState === "visible") {
@ -443,9 +442,7 @@ export const connect = async (
// When the socket disconnects reset the event_processing flag // When the socket disconnects reset the event_processing flag
socket.current.on("disconnect", () => { socket.current.on("disconnect", () => {
event_processing = false; socket.current.event_processing = false;
socket.current.io.skipReconnect = true;
socket.current = null;
window.removeEventListener("pagehide", pagehideHandler); window.removeEventListener("pagehide", pagehideHandler);
document.removeEventListener("visibilitychange", checkVisibility); document.removeEventListener("visibilitychange", checkVisibility);
}); });
@ -456,16 +453,15 @@ export const connect = async (
dispatch[substate](update.delta[substate]); dispatch[substate](update.delta[substate]);
} }
applyClientStorageDelta(client_storage, update.delta); applyClientStorageDelta(client_storage, update.delta);
event_processing = !update.final; socket.current.event_processing = !update.final;
if (update.events) { if (update.events) {
queueEvents(update.events, socket); queueEvents(update.events, socket);
} }
}); });
socket.current.on("reload", async (event) => { socket.current.on("reload", async (event) => {
event_processing = false; socket.current.event_processing = false;
queueEvents([...initialEvents(), event], socket); queueEvents([...initialEvents(), event], socket);
}); });
}; };
/** /**
@ -491,7 +487,7 @@ export const uploadFiles = async (
return false; return false;
} }
const upload_ref_name = `__upload_controllers_${upload_id}` const upload_ref_name = `__upload_controllers_${upload_id}`;
if (refs[upload_ref_name]) { if (refs[upload_ref_name]) {
console.log("Upload already in progress for ", upload_id); console.log("Upload already in progress for ", upload_id);
@ -755,9 +751,32 @@ export const useEventLoop = (
} }
}; };
const sentHydrate = useRef(false); // Avoid double-hydrate due to React strict-mode
useEffect(() => { useEffect(() => {
if (router.isReady && !sentHydrate.current) { // Initialize the websocket connection.
if (!socket.current) {
console.log("Creating websocket connection");
connect(
socket,
dispatch,
["websocket"],
setConnectErrors,
client_storage
);
return () => {
if (socket.current) {
console.log("Discarded socket disconnected");
socket.current.io.skipReconnect = true;
socket.current.disconnect();
socket.current = null;
}
};
}
}, []);
//const sentHydrate = useRef(false); // Avoid double-hydrate due to React strict-mode
useEffect(() => {
console.log("Maybe initial hydrate", router.isReady, !!socket.current);
if (router.isReady && socket.current) {
const events = initial_events(); const events = initial_events();
addEvents( addEvents(
events.map((e) => ({ events.map((e) => ({
@ -769,9 +788,9 @@ export const useEventLoop = (
}))(router), }))(router),
})) }))
); );
sentHydrate.current = true; //sentHydrate.current = true;
} }
}, [router.isReady]); }, [router.isReady, socket.current]);
// Handle frontend errors and send them to the backend via websocket. // Handle frontend errors and send them to the backend via websocket.
useEffect(() => { useEffect(() => {
@ -803,34 +822,23 @@ export const useEventLoop = (
}, []); }, []);
// Main event loop. // Main event loop.
useEffect(() => { // useEffect(() => {
// Skip if the router is not ready. // // Skip if the router is not ready.
if (!router.isReady) { // if (!router.isReady) {
return; // return;
} // }
// only use websockets if state is present // (async () => {
if (Object.keys(initialState).length > 1) { // // Process all outstanding events.
// Initialize the websocket connection. // console.log("Process outstanding events");
if (!socket.current) { // while (socket.current.event_queue.length > 0 && !socket.current.event_processing) {
connect( // await processEvent(socket.current);
socket, // }
dispatch, // })();
["websocket"], // });
setConnectErrors,
client_storage
);
}
(async () => {
// Process all outstanding events.
while (event_queue.length > 0 && !event_processing) {
await processEvent(socket.current);
}
})();
}
});
// localStorage event handling // localStorage event handling
useEffect(() => { useEffect(() => {
console.log("Setting up localStorage event listener");
const storage_to_state_map = {}; const storage_to_state_map = {};
if (client_storage.local_storage && typeof window !== "undefined") { if (client_storage.local_storage && typeof window !== "undefined") {
@ -858,10 +866,11 @@ export const useEventLoop = (
window.addEventListener("storage", handleStorage); window.addEventListener("storage", handleStorage);
return () => window.removeEventListener("storage", handleStorage); return () => window.removeEventListener("storage", handleStorage);
}); }, []);
// Route after the initial page hydration. // Route after the initial page hydration.
useEffect(() => { useEffect(() => {
console.log("Setting up route change listeners");
const change_start = () => { const change_start = () => {
const main_state_dispatch = dispatch["reflex___state____state"]; const main_state_dispatch = dispatch["reflex___state____state"];
if (main_state_dispatch !== undefined) { if (main_state_dispatch !== undefined) {