Plan Pipeline
PlanPipeline orchestrates multi-step workflows by running plans sequentially. Each step can be a static plan or a dynamic factory that builds a plan from the previous step’s output.
Problem
Some workflows require multiple stages where later stages depend on earlier results:
- Extract configuration, then use it to parameterize a data extraction plan
- Run an initial extraction, transform the results, then feed them into a second extraction
- Chain independent plans while maintaining a unified lineage across all steps
PlanPipeline provides a simple sequential runner with cross-plan lineage connections.
```typescript
import { PlanPipeline } from '@origints/core'

// configPlan, finalizePlan, Config, and buildExtractionPlan are assumed to be defined elsewhere
const result = await new PlanPipeline()
  .step('config', configPlan) // static plan
  .step('extract', input => { // dynamic factory
    const { threshold } = input as Config
    return buildExtractionPlan(threshold)
  })
  .step('finalize', finalizePlan) // static plan
  .run()
```

.step(name, planOrFactory)
Add a step to the pipeline. The second argument is either:
- A `Plan`: executed directly (static step)
- A factory function `(input: unknown) => Plan`: receives the previous step’s output and returns a plan to execute (dynamic step)
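The difference between the two forms can be pictured as a runtime check on the second argument. The sketch below assumes that a plan is a plain object and a factory is a function. `SketchPlan`, `PlanOrFactory`, and `resolvePlan` are hypothetical names that are not part of `@origints/core`, and a real `Plan` carries much more than an `id`.

```typescript
// Hypothetical stand-in for a compiled plan; the real Plan type is richer.
interface SketchPlan {
  readonly id: string
}

// The second argument to .step() is either a plan or a factory that builds one.
type PlanOrFactory = SketchPlan | ((input: unknown) => SketchPlan)

// Static steps use the plan as-is; dynamic steps call the factory
// with the previous step's output.
function resolvePlan(step: PlanOrFactory, previousOutput: unknown): SketchPlan {
  return typeof step === 'function' ? step(previousOutput) : step
}

const staticStep: PlanOrFactory = { id: 'config' }
const dynamicStep: PlanOrFactory = input => {
  const { threshold } = input as { threshold: number }
  return { id: `extract-above-${threshold}` }
}

console.log(resolvePlan(staticStep, undefined).id) // config
console.log(resolvePlan(dynamicStep, { threshold: 10 }).id) // extract-above-10
```

A `typeof` check like this only works if plans are never callable themselves. If that assumption does not hold, an explicitly tagged step shape would be needed instead.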
.run(ctx?)
Execute all steps sequentially. Accepts an optional RunContext (providing readFile, glob, and registry).
Returns a PipelineResult:
```typescript
type PipelineResult<Out> =
  | { ok: true; value: Out; pipelineLineage: PipelineLineage }
  | {
      ok: false
      failedStep: string
      error: RunResult<unknown>
      pipelineLineage: PipelineLineage
    }
```

Static vs Dynamic Steps
Static steps
Static steps run their plan regardless of the previous step’s output. This is useful for independent extractions that happen to run in sequence.
```typescript
new PlanPipeline()
  .step('users', usersPlan)
  .step('config', configPlan)
  .run()
```

Dynamic steps
Dynamic steps receive the previous step’s output and build a plan from it. This enables data-driven plan construction.
```typescript
new PlanPipeline()
  .step('config', configPlan)
  .step('extract', config => {
    // Build the extraction plan from the config step's output
    const { source, format } = config as AppConfig
    return new Planner()
      .in(loadFile(source))
      .mapIn(format === 'json' ? parseJson() : parseCsv())
      .emit((out, $) => out.add('data', $.get('records').strings()))
      .compile()
  })
  .run()
```

Cross-Plan Lineage
PipelineLineage tracks execution across all steps:
```typescript
interface PipelineLineage {
  readonly steps: ReadonlyMap<string, PipelineStepResult>
  readonly connections: readonly PipelineConnection[]
}

interface PipelineStepResult {
  readonly ok: boolean
  readonly lineage: RuntimeLineage
  readonly output?: unknown
}

interface PipelineConnection {
  readonly fromStep: string
  readonly toStep: string
  readonly value: unknown
}
```

Connections are recorded for dynamic factory steps, capturing the data flow from one step’s output to the next step’s plan construction.
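To make the connection rule concrete, here is a minimal synchronous sketch of a sequential runner. It records a connection only when a dynamic step's factory consumes the previous output. Everything in it is hypothetical: `SketchStep`, `runWithConnections`, and the `kind` tag used to tell static from dynamic steps are not library API. The real pipeline runs plans asynchronously and also records per-step `ok` flags and `RuntimeLineage`; this sketch omits both, along with failure handling. It also assumes that no connection is recorded when the first step is dynamic.

```typescript
// Hypothetical stand-in: a plan is a synchronous function producing an output.
type SketchPlan = () => unknown

type SketchStep =
  | { name: string; kind: 'static'; plan: SketchPlan }
  | { name: string; kind: 'dynamic'; factory: (input: unknown) => SketchPlan }

interface SketchConnection {
  readonly fromStep: string
  readonly toStep: string
  readonly value: unknown
}

function runWithConnections(steps: SketchStep[]) {
  const outputs = new Map<string, unknown>()
  const connections: SketchConnection[] = []
  let previousName: string | undefined
  let previousOutput: unknown

  for (const step of steps) {
    let plan: SketchPlan
    if (step.kind === 'dynamic') {
      // A factory consumes the previous output, so record that data flow.
      if (previousName !== undefined) {
        connections.push({ fromStep: previousName, toStep: step.name, value: previousOutput })
      }
      plan = step.factory(previousOutput)
    } else {
      // Static plans ignore the previous output: no connection is recorded.
      plan = step.plan
    }
    previousOutput = plan()
    previousName = step.name
    outputs.set(step.name, previousOutput)
  }
  return { outputs, connections }
}

const { outputs, connections } = runWithConnections([
  { name: 'config', kind: 'static', plan: () => ({ threshold: 5 }) },
  {
    name: 'extract',
    kind: 'dynamic',
    factory: input => () => (input as { threshold: number }).threshold * 2,
  },
  { name: 'finalize', kind: 'static', plan: () => 'done' },
])

console.log(connections.map(c => `${c.fromStep} -> ${c.toStep}`)) // [ 'config -> extract' ]
console.log(outputs.get('extract')) // 10
```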
Formatting
formatPipelineLineage takes the pipeline lineage and an object mapping step names to each step’s plan AST:

```typescript
import { formatPipelineLineage } from '@origints/core'

const formatted = formatPipelineLineage(result.pipelineLineage, {
  config: configPlan.ast,
  extract: extractPlan.ast,
})

// formatted.success: boolean
// formatted.steps: FormattedPipelineStep[]
// formatted.summary: string (e.g., "2 steps succeeded")
```

Error Handling
- Fail-fast: The pipeline stops at the first failing step
- Partial lineage: Even on failure, lineage is available for all attempted steps
- `failedStep`: The result includes the name of the step that failed
- `error`: The full `RunResult` from the failing step is preserved
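The fail-fast and partial-lineage behavior can be modeled with a short loop. This is a hypothetical synchronous sketch, not the library's implementation. `runFailFast`, `SketchRunResult`, and the `steps` map are stand-ins for the pipeline runner, `RunResult`, and `pipelineLineage.steps`.

```typescript
// Hypothetical stand-in for RunResult, reduced to the fields used here.
interface SketchRunResult {
  readonly ok: boolean
  readonly value?: unknown
  readonly failures: readonly string[]
}

type SketchOutcome =
  | { ok: true; value: unknown; steps: Map<string, SketchRunResult> }
  | { ok: false; failedStep: string; error: SketchRunResult; steps: Map<string, SketchRunResult> }

// Runs steps in order and stops at the first failure.
function runFailFast(steps: [string, () => SketchRunResult][]): SketchOutcome {
  const attempted = new Map<string, SketchRunResult>()
  let value: unknown
  for (const [name, run] of steps) {
    const result = run()
    // Record every attempted step, including the one that fails.
    attempted.set(name, result)
    if (!result.ok) {
      // Fail fast, keeping the failing step's name and its full result.
      return { ok: false, failedStep: name, error: result, steps: attempted }
    }
    value = result.value
  }
  return { ok: true, value, steps: attempted }
}

const outcome = runFailFast([
  ['config', () => ({ ok: true, value: { threshold: 5 }, failures: [] })],
  ['extract', () => ({ ok: false, failures: ['source file not found'] })],
  ['finalize', () => ({ ok: true, value: 'done', failures: [] })],
])

if (!outcome.ok) {
  // prints: Failed at "extract"; steps attempted: 2
  console.log(`Failed at "${outcome.failedStep}"; steps attempted: ${outcome.steps.size}`)
}
```

With the real pipeline, the same information is read from `result.failedStep`, `result.error`, and `result.pipelineLineage`: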
```typescript
const result = await pipeline.run()

if (!result.ok) {
  console.error(`Pipeline failed at step "${result.failedStep}"`)
  for (const failure of result.error.failures) {
    console.error(`  [${failure.kind}] ${failure.message}`)
  }
  // Lineage is still available for debugging
  console.log(`Steps attempted: ${result.pipelineLineage.steps.size}`)
}
```

When to Use Pipeline vs ForEach

| Use case | Approach |
|---|---|
| Iterate over values to parameterize extraction | ForEachSpec |
| Chain independent plan executions | PlanPipeline |
| Build a plan based on previous plan’s output | PlanPipeline (dynamic step) |
| Dynamic cell/column lookup in XLSX | ForEachSpec + dynamic predicates |
| Multi-stage ETL workflow | PlanPipeline |
| Nested iteration (e.g., subjects x students) | Nested ForEachSpec |