Plan Pipeline

PlanPipeline orchestrates multi-step workflows by running plans sequentially. Each step can be a static plan or a dynamic factory that builds a plan from the previous step’s output.

Some workflows require multiple stages where later stages depend on earlier results:

  • Extract configuration, then use it to parameterize a data extraction plan
  • Run an initial extraction, transform the results, then feed them into a second extraction
  • Chain independent plans while maintaining a unified lineage across all steps

PlanPipeline provides a simple sequential runner with cross-plan lineage connections.

import { PlanPipeline } from '@origints/core'

const result = await new PlanPipeline()
  .step('config', configPlan) // static plan
  .step('extract', input => {
    // dynamic factory
    const { threshold } = input as Config
    return buildExtractionPlan(threshold)
  })
  .step('finalize', finalizePlan) // static plan
  .run()

step(name, planOrFactory) adds a step to the pipeline. The second argument is either:

  • A Plan — executed directly (static step)
  • A factory function (input: unknown) => Plan — receives the previous step’s output and returns a plan to execute (dynamic step)

run() executes all steps sequentially. It accepts an optional RunContext (providing readFile, glob, and registry).

Returns a PipelineResult:

type PipelineResult<Out> =
  | { ok: true; value: Out; pipelineLineage: PipelineLineage }
  | {
      ok: false
      failedStep: string
      error: RunResult<unknown>
      pipelineLineage: PipelineLineage
    }

Static steps run their plan regardless of previous output. Useful for independent extractions that happen to run in sequence.

new PlanPipeline().step('users', usersPlan).step('config', configPlan).run()

Dynamic steps receive the previous step’s output and build a plan from it. This enables data-driven plan construction.

new PlanPipeline()
  .step('config', configPlan)
  .step('extract', config => {
    const { source, format } = config as AppConfig
    return new Planner()
      .in(loadFile(source))
      .mapIn(format === 'json' ? parseJson() : parseCsv())
      .emit((out, $) => out.add('data', $.get('records').strings()))
      .compile()
  })
  .run()

PipelineLineage tracks execution across all steps:

interface PipelineLineage {
  readonly steps: ReadonlyMap<string, PipelineStepResult>
  readonly connections: readonly PipelineConnection[]
}

interface PipelineStepResult {
  readonly ok: boolean
  readonly lineage: RuntimeLineage
  readonly output?: unknown
}

interface PipelineConnection {
  readonly fromStep: string
  readonly toStep: string
  readonly value: unknown
}

Connections are recorded for dynamic factory steps, capturing the data flow from one step’s output to the next step’s plan construction.
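One way to inspect that data flow is to walk the connections array. A sketch against the `PipelineConnection` interface above; the sample data here is illustrative, not produced by a real pipeline run:

```typescript
// Mirrors the PipelineConnection interface from the lineage types above.
interface PipelineConnection {
  readonly fromStep: string
  readonly toStep: string
  readonly value: unknown
}

// Render each connection as "from → to: value" for debugging output.
function describeConnections(connections: readonly PipelineConnection[]): string[] {
  return connections.map(c => `${c.fromStep} → ${c.toStep}: ${JSON.stringify(c.value)}`)
}

// Illustrative sample: the value the 'config' step handed to the
// 'extract' factory.
const connections: PipelineConnection[] = [
  { fromStep: 'config', toStep: 'extract', value: { threshold: 0.5 } },
]
console.log(describeConnections(connections))
// → [ 'config → extract: {"threshold":0.5}' ]
```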

import { formatPipelineLineage } from '@origints/core'

const formatted = formatPipelineLineage(result.pipelineLineage, {
  config: configPlan.ast,
  extract: extractPlan.ast,
})
// formatted.success: boolean
// formatted.steps: FormattedPipelineStep[]
// formatted.summary: string (e.g., "2 steps succeeded")

  • Fail-fast: The pipeline stops at the first failing step
  • Partial lineage: Even on failure, lineage is available for all attempted steps
  • failedStep: The result includes the name of the step that failed
  • error: The full RunResult from the failing step is preserved

const result = await pipeline.run()
if (!result.ok) {
  console.error(`Pipeline failed at step "${result.failedStep}"`)
  for (const failure of result.error.failures) {
    console.error(`  [${failure.kind}] ${failure.message}`)
  }
  // Lineage still available for debugging
  console.log(`Steps attempted: ${result.pipelineLineage.steps.size}`)
}

| Use case | Approach |
| --- | --- |
| Iterate over values to parameterize extraction | ForEachSpec |
| Chain independent plan executions | PlanPipeline |
| Build a plan based on previous plan’s output | PlanPipeline (dynamic step) |
| Dynamic cell/column lookup in XLSX | ForEachSpec + dynamic predicates |
| Multi-stage ETL workflow | PlanPipeline |
| Nested iteration (e.g., subjects x students) | Nested ForEachSpec |