fix: Separate stateupdates and events.
This commit is contained in:
parent
50b9f7b1da
commit
608dcab226
@ -1259,6 +1259,9 @@ async def process(
|
|||||||
constants.RouteVar.CLIENT_IP: client_ip,
|
constants.RouteVar.CLIENT_IP: client_ip,
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
print(
|
||||||
|
f"Processing event: {event.name} with payload: {event.payload} {event.substate_token}"
|
||||||
|
)
|
||||||
# Get the state for the session exclusively.
|
# Get the state for the session exclusively.
|
||||||
async with app.state_manager.modify_state(event.substate_token) as state:
|
async with app.state_manager.modify_state(event.substate_token) as state:
|
||||||
# re-assign only when the value is different
|
# re-assign only when the value is different
|
||||||
@ -1484,14 +1487,19 @@ class EventNamespace(AsyncNamespace):
|
|||||||
print(f"Entering room `{room}`")
|
print(f"Entering room `{room}`")
|
||||||
await self.enter_room(sid, room)
|
await self.enter_room(sid, room)
|
||||||
|
|
||||||
# for room in self.rooms(sid):
|
for room in self.rooms(sid):
|
||||||
# if room not in update.scopes and room != sid:
|
if room not in update.scopes and room != sid:
|
||||||
# print(f"Leaving room `{room}`")
|
print(f"Leaving room `{room}`")
|
||||||
# await self.leave_room(sid, room)
|
await self.leave_room(sid, room)
|
||||||
|
|
||||||
# deltas = {delta._scope: {state: delta} for state, delta in update.delta.values.items()}
|
|
||||||
|
|
||||||
delta_by_scope = {}
|
delta_by_scope = {}
|
||||||
|
events_by_scope = {}
|
||||||
|
|
||||||
|
for event in update.events:
|
||||||
|
scope = self.token_to_sid.get(event.token, event.token)
|
||||||
|
events = events_by_scope.get(scope, [])
|
||||||
|
events.append(event)
|
||||||
|
events_by_scope[scope] = events
|
||||||
|
|
||||||
for state, delta in update.delta.items():
|
for state, delta in update.delta.items():
|
||||||
key = delta.get("_scope", sid)
|
key = delta.get("_scope", sid)
|
||||||
@ -1500,8 +1508,10 @@ class EventNamespace(AsyncNamespace):
|
|||||||
delta_by_scope[key] = d
|
delta_by_scope[key] = d
|
||||||
|
|
||||||
for scope, deltas in delta_by_scope.items():
|
for scope, deltas in delta_by_scope.items():
|
||||||
|
events = events_by_scope.get(scope, [])
|
||||||
|
print(f"Sending update to {scope} {events}")
|
||||||
single_update = StateUpdate(
|
single_update = StateUpdate(
|
||||||
delta=deltas, scopes=[scope], events=update.events, final=update.final
|
delta=deltas, scopes=[scope], events=events, final=update.final
|
||||||
)
|
)
|
||||||
|
|
||||||
await asyncio.create_task(
|
await asyncio.create_task(
|
||||||
@ -1510,6 +1520,21 @@ class EventNamespace(AsyncNamespace):
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
for key in events_by_scope:
|
||||||
|
if key not in delta_by_scope:
|
||||||
|
single_update = StateUpdate(
|
||||||
|
delta={},
|
||||||
|
scopes=[key],
|
||||||
|
events=events_by_scope.get(key, []),
|
||||||
|
final=update.final,
|
||||||
|
)
|
||||||
|
print(f"Sending event to {key}")
|
||||||
|
await asyncio.create_task(
|
||||||
|
self.emit(
|
||||||
|
str(constants.SocketEvent.EVENT), single_update.json(), to=key
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
update.scopes = []
|
update.scopes = []
|
||||||
|
|
||||||
# Creating a task prevents the update from being blocked behind other coroutines.
|
# Creating a task prevents the update from being blocked behind other coroutines.
|
||||||
@ -1544,6 +1569,8 @@ class EventNamespace(AsyncNamespace):
|
|||||||
except (KeyError, IndexError):
|
except (KeyError, IndexError):
|
||||||
client_ip = environ.get("REMOTE_ADDR", "0.0.0.0")
|
client_ip = environ.get("REMOTE_ADDR", "0.0.0.0")
|
||||||
|
|
||||||
|
print(f"Received event {event.name} {event.token} from {client_ip}")
|
||||||
|
|
||||||
# Process the events.
|
# Process the events.
|
||||||
async for update in process(self.app, event, sid, headers, client_ip):
|
async for update in process(self.app, event, sid, headers, client_ip):
|
||||||
# Emit the update from processing the event.
|
# Emit the update from processing the event.
|
||||||
|
Loading…
Reference in New Issue
Block a user