From 608dcab2265477589cce0b6d149f1bece05dd2b0 Mon Sep 17 00:00:00 2001 From: Andreas Eismann Date: Wed, 17 Jul 2024 22:40:55 +0200 Subject: [PATCH] fix: Separate stateupdates and events. --- reflex/app.py | 41 ++++++++++++++++++++++++++++++++++------- 1 file changed, 34 insertions(+), 7 deletions(-) diff --git a/reflex/app.py b/reflex/app.py index 7e1f9da1c..001c95885 100644 --- a/reflex/app.py +++ b/reflex/app.py @@ -1259,6 +1259,9 @@ async def process( constants.RouteVar.CLIENT_IP: client_ip, } ) + print( + f"Processing event: {event.name} with payload: {event.payload} {event.substate_token}" + ) # Get the state for the session exclusively. async with app.state_manager.modify_state(event.substate_token) as state: # re-assign only when the value is different @@ -1484,14 +1487,19 @@ class EventNamespace(AsyncNamespace): print(f"Entering room `{room}`") await self.enter_room(sid, room) - # for room in self.rooms(sid): - # if room not in update.scopes and room != sid: - # print(f"Leaving room `{room}`") - # await self.leave_room(sid, room) - - # deltas = {delta._scope: {state: delta} for state, delta in update.delta.values.items()} + for room in self.rooms(sid): + if room not in update.scopes and room != sid: + print(f"Leaving room `{room}`") + await self.leave_room(sid, room) delta_by_scope = {} + events_by_scope = {} + + for event in update.events: + scope = self.token_to_sid.get(event.token, event.token) + events = events_by_scope.get(scope, []) + events.append(event) + events_by_scope[scope] = events for state, delta in update.delta.items(): key = delta.get("_scope", sid) @@ -1500,8 +1508,10 @@ class EventNamespace(AsyncNamespace): delta_by_scope[key] = d for scope, deltas in delta_by_scope.items(): + events = events_by_scope.get(scope, []) + print(f"Sending update to {scope} {events}") single_update = StateUpdate( - delta=deltas, scopes=[scope], events=update.events, final=update.final + delta=deltas, scopes=[scope], events=events, final=update.final ) await asyncio.create_task( @@ -1510,6 +1520,21 @@ class EventNamespace(AsyncNamespace): ) ) + for key in events_by_scope: + if key not in delta_by_scope: + single_update = StateUpdate( + delta={}, + scopes=[key], + events=events_by_scope.get(key, []), + final=update.final, + ) + print(f"Sending event to {key}") + await asyncio.create_task( + self.emit( + str(constants.SocketEvent.EVENT), single_update.json(), to=key + ) + ) + update.scopes = [] # Creating a task prevents the update from being blocked behind other coroutines. @@ -1544,6 +1569,8 @@ class EventNamespace(AsyncNamespace): except (KeyError, IndexError): client_ip = environ.get("REMOTE_ADDR", "0.0.0.0") + print(f"Received event {event.name} {event.token} from {client_ip}") + # Process the events. async for update in process(self.app, event, sid, headers, client_ip): # Emit the update from processing the event.