Hi,
I need some clarification. In my use case, the first step requires an LLM to identify and split the different segments of the input text/document. Then, for each segment, I have a linear flow to follow (extract info, call agents, ...). Finally, I have to collect all the outputs.
I am unsure how to achieve the "for loop" (if it's possible). Instead of a single add_edge, I'd need something like an add_edges:
workflow.add_node("split", split)
workflow.add_node("extract", extract)
workflow.add_node("collect", collect)
workflow.set_entry_point("split") # after split I get an array of chunks
workflow.add_edges("split", "extract") # for each chunk do some custom logic
workflow.collect_edges("extract", "collect") # collect everything

---

I've been building a production agentic system, and the trickiest part was getting the checkpoint/interrupt pattern right. Here's what actually works.
The key is interrupt_before=["integrator"] when compiling the graph. This pauses execution before any real-world action fires — state is persisted to SQLite, and the workflow resumes exactly where it left off when you call approve.
return workflow.compile(
checkpointer=checkpointer,
interrupt_before=["integrator"]
)
What trips people up: you need an AsyncSqliteSaver checkpointer; otherwise, state doesn't persist across API calls. Without it, resuming the graph just restarts from scratch.
The approval endpoint then just resumes the existing graph run with the stored thread config — no re-execution of previous nodes.
Anyone else using this pattern in production? Curious how others are handling the state schema as workflows get more complex.
3-minute demo video and full source code in the links below.