Fix processing flag for generator events ()

This commit is contained in:
Nikhil Rao 2023-06-09 10:50:03 -07:00 committed by GitHub
parent a846953926
commit fedecfdf44
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 89 additions and 59 deletions
pynecone
tests

View File

@ -4,7 +4,6 @@
{% for custom_code in custom_codes %}
{{custom_code}}
{% endfor %}
{% endblock %}
{% block export %}
@ -18,46 +17,67 @@ export default function Component() {
const { {{const.color_mode}}, {{const.toggle_color_mode}} } = {{const.use_color_mode}}()
const focusRef = useRef();
// Function to add new events to the event queue.
const Event = (events, _e) => {
preventDefault(_e);
{{state_name|react_setter}}({
...{{state_name}},
events: [...{{state_name}}.events, ...events],
})
{{state_name|react_setter}}(state => ({
...state,
events: [...state.events, ...events],
}))
}
const File = files => {{state_name|react_setter}}({
...{{state_name}},
// Function to add new files to be uploaded.
const File = files => {{state_name|react_setter}}(state => ({
...state,
files,
})
}))
useEffect(()=>{
if(!isReady) {
// Main event loop.
useEffect(()=> {
// Skip if the router is not ready.
if (!isReady) {
return;
}
// Initialize the websocket connection.
if (!{{const.socket}}.current) {
connect({{const.socket}}, {{state_name}}, {{state_name|react_setter}}, {{const.result}}, {{const.result|react_setter}}, {{const.router}}, {{transports}}, setNotConnected)
}
const update = async () => {
if ({{const.result}}.{{const.state}} != null){
{{state_name|react_setter}}({
...{{const.result}}.{{const.state}},
events: [...{{state_name}}.{{const.events}}, ...{{const.result}}.{{const.events}}],
})
{{const.result|react_setter}}({
{{const.state}}: null,
{{const.events}}: [],
{{const.processing}}: {{const.result}}.{{const.processing}},
})
}
await updateState({{state_name}}, {{state_name|react_setter}}, {{const.result}}, {{const.result|react_setter}}, {{const.router}}, {{const.socket}}.current)
// If we are not processing an event, process the next event.
if (!{{const.result}}.{{const.processing}}) {
processEvent({{state_name}}, {{state_name|react_setter}}, {{const.result}}, {{const.result|react_setter}}, {{const.router}}, {{const.socket}}.current)
}
// If there is a new result, update the state.
if ({{const.result}}.{{const.state}} != null) {
// Apply the new result to the state and the new events to the queue.
{{state_name|react_setter}}(state => ({
...{{const.result}}.{{const.state}},
events: [...state.{{const.events}}, ...{{const.result}}.{{const.events}}],
}))
// Reset the result.
{{const.result|react_setter}}(result => ({
{{const.state}}: null,
{{const.events}}: [],
{{const.final}}: true,
{{const.processing}}: !{{const.result}}.{{const.final}},
}))
// Process the next event.
processEvent({{state_name}}, {{state_name|react_setter}}, {{const.result}}, {{const.result|react_setter}}, {{const.router}}, {{const.socket}}.current)
}
if (focusRef.current)
focusRef.current.focus();
update()
})
// Set focus to the specified element.
useEffect(() => {
if (focusRef.current) {
focusRef.current.focus();
}
})
// Route after the initial page hydration.
useEffect(() => {
const change_complete = () => Event([E('{{state_name}}.{{const.hydrate}}', {})])
{{const.router}}.events.on('routeChangeComplete', change_complete)

View File

@ -129,16 +129,15 @@ export const applyEvent = async (event, router, socket) => {
* @param event The current event
* @param state The state with the event queue.
* @param setResult The function to set the result.
*
* @returns Whether the event was sent.
*/
export const applyRestEvent = async (event, state, setResult) => {
let eventSent = false;
if (event.handler == "uploadFiles") {
eventSent = await uploadFiles(state, setResult, event.name);
}
if (!eventSent) {
// If no event was sent, set processing to false and return.
setResult({ ...state, processing: false });
}
return eventSent;
};
/**
@ -150,7 +149,7 @@ export const applyRestEvent = async (event, state, setResult) => {
* @param router The router object.
* @param socket The socket object to send the event on.
*/
export const updateState = async (
export const processEvent = async (
state,
setState,
result,
@ -166,20 +165,23 @@ export const updateState = async (
// Set processing to true to block other events from being processed.
setResult({ ...result, processing: true });
// Pop the next event off the queue and apply it.
const event = state.events.shift();
// Apply the next event in the queue.
const event = state.events[0];
// Set new events to avoid reprocessing the same event.
setState({ ...state, events: state.events });
setState(state => ({ ...state, events: state.events.slice(1) }));
// Process events with handlers via REST and all others via websockets.
let eventSent = false;
if (event.handler) {
await applyRestEvent(event, state, setResult);
eventSent = await applyRestEvent(event, state, setResult);
} else {
const eventSent = await applyEvent(event, router, socket);
if (!eventSent) {
// If no event was sent, set processing to false and return.
setResult({ ...state, processing: false });
}
eventSent = await applyEvent(event, router, socket);
}
// If no event was sent, set processing to false.
if (!eventSent) {
setResult({ ...state, final: true, processing: false });
}
};
@ -214,7 +216,7 @@ export const connect = async (
// Once the socket is open, hydrate the page.
socket.current.on("connect", () => {
updateState(state, setState, result, setResult, router, socket.current);
processEvent(state, setState, result, setResult, router, socket.current);
setNotConnected(false)
});
@ -223,13 +225,14 @@ export const connect = async (
});
// On each received message, apply the delta and set the result.
socket.current.on("event", function (update) {
socket.current.on("event", update => {
update = JSON5.parse(update);
applyDelta(state, update.delta);
setResult({
processing: update.processing,
state: state,
events: update.events,
final: update.final,
processing: true,
});
});
};
@ -241,6 +244,8 @@ export const connect = async (
* @param setResult The function to set the result.
* @param handler The handler to use.
* @param endpoint The endpoint to upload to.
*
* @returns Whether the files were uploaded.
*/
export const uploadFiles = async (state, setResult, handler) => {
const files = state.files;
@ -272,9 +277,10 @@ export const uploadFiles = async (state, setResult, handler) => {
// Set processing to false and return.
setResult({
processing: false,
state: state,
events: update.events,
final: true,
processing: false,
});
});

View File

@ -23,7 +23,7 @@ DEFAULT_IMPORTS: imports.ImportDict = {
"next/router": {ImportVar(tag="useRouter")},
f"/{constants.STATE_PATH}": {
ImportVar(tag="connect"),
ImportVar(tag="updateState"),
ImportVar(tag="processEvent"),
ImportVar(tag="uploadFiles"),
ImportVar(tag="E"),
ImportVar(tag="isTrue"),

View File

@ -28,10 +28,12 @@ class PyneconeJinjaEnvironment(Environment):
"event_endpoint": constants.Endpoint.EVENT.name,
"events": constants.EVENTS,
"state": constants.STATE,
"final": constants.FINAL,
"processing": constants.PROCESSING,
"initial_result": {
constants.STATE: None,
constants.EVENTS: [],
constants.FINAL: True,
constants.PROCESSING: False,
},
"color_mode": constants.COLOR_MODE,

View File

@ -129,6 +129,8 @@ ROUTER = "router"
SOCKET = "socket"
# The name of the variable to hold API results.
RESULT = "result"
# The name of the final variable.
FINAL = "final"
# The name of the process variable.
PROCESSING = "processing"
# The name of the state variable.

View File

@ -655,7 +655,7 @@ class State(Base, ABC, extra=pydantic.Extra.allow):
self.clean()
# Run the event generator and return state updates.
async for events, processing in event_iter:
async for events, final in event_iter:
# Fix the returned events.
events = fix_events(events, event.token) # type: ignore
@ -663,7 +663,7 @@ class State(Base, ABC, extra=pydantic.Extra.allow):
delta = self.get_delta()
# Yield the state update.
yield StateUpdate(delta=delta, events=events, processing=processing)
yield StateUpdate(delta=delta, events=events, final=final)
# Clean the state to prepare for the next event.
self.clean()
@ -681,7 +681,7 @@ class State(Base, ABC, extra=pydantic.Extra.allow):
Yields:
Tuple containing:
0: The state update after processing the event.
1: Whether the event is being processed.
1: Whether the event is the final event.
"""
# Get the function to process the event.
fn = functools.partial(handler.fn, state)
@ -699,24 +699,24 @@ class State(Base, ABC, extra=pydantic.Extra.allow):
# Handle async generators.
if inspect.isasyncgen(events):
async for event in events:
yield event, True
yield None, False
yield event, False
yield None, True
# Handle regular generators.
elif inspect.isgenerator(events):
for event in events:
yield event, True
yield None, False
yield event, False
yield None, True
# Handle regular event chains.
else:
yield events, False
yield events, True
# If an error occurs, throw a window alert.
except Exception:
error = traceback.format_exc()
print(error)
yield [window_alert("An error occurred. See logs for details.")], False
yield [window_alert("An error occurred. See logs for details.")], True
def _always_dirty_computed_vars(self) -> Set[str]:
"""The set of ComputedVars that always need to be recalculated.
@ -881,8 +881,8 @@ class StateUpdate(Base):
# Events to be added to the event queue.
events: List[Event] = []
# Whether the event is still processing.
processing: bool = False
# Whether this is the final state update for the event.
final: bool = True
class StateManager(Base):

View File

@ -647,14 +647,14 @@ async def test_process_event_generator(gen_state):
count += 1
if count == 6:
assert update.delta == {}
assert not update.processing
assert update.final
else:
assert gen_state.value == count
assert update.delta == {
"gen_state": {"value": count},
}
assert update.processing
assert not update.final
assert count == 6