
@origints/core

The core package provides the planning system, execution runtime, lineage tracking, schema derivation, and all format-independent extraction primitives.

```sh
npm install @origints/core
```

Features:
  • Two-phase architecture: plan then execute
  • Immutable execution plans that can be inspected and reused
  • First-class provenance tracking with full lineage graphs
  • Structured failure types (missing, type, format, constraint, runtime, panic, validation)
  • Fail-fast execution with no silent coercions
  • Schema validation via Standard Schema (Zod, Valibot, etc.)
  • Transform registry for decoupled execution logic
  • JSON Schema derivation from plans or specs
  • Output transforms (groupBy, aggregate, sort, filter, drop, lookup, joinBy, etc.)
  • Opt-in benchmarking with per-node and per-extraction timing
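
The following standalone sketch illustrates what "structured failure types" and "no silent coercions" mean in practice. It does not use `@origints/core`. The names `FailureKind`, `Failure`, `Result`, and `getNumber` are hypothetical. They only show how a typed extraction can report *why* it failed instead of quietly converting a value.

```typescript
// Failure categories mirroring the list above (hypothetical shape, not the package's types)
type FailureKind =
  | 'missing'
  | 'type'
  | 'format'
  | 'constraint'
  | 'runtime'
  | 'panic'
  | 'validation'

interface Failure {
  kind: FailureKind
  path: string
  message: string
}

type Result<T> = { ok: true; value: T } | { ok: false; failure: Failure }

// Read a numeric field without coercion: the string '30' is a type failure, not 30.
function getNumber(input: Record<string, unknown>, key: string): Result<number> {
  if (!Object.prototype.hasOwnProperty.call(input, key)) {
    return { ok: false, failure: { kind: 'missing', path: key, message: `no field '${key}'` } }
  }
  const value = input[key]
  if (typeof value !== 'number') {
    return {
      ok: false,
      failure: { kind: 'type', path: key, message: `expected number, got ${typeof value}` },
    }
  }
  return { ok: true, value }
}

console.log(getNumber({ age: 30 }, 'age')) // { ok: true, value: 30 }

const bad = getNumber({ age: '30' }, 'age')
if (!bad.ok) console.log(bad.failure.kind) // type

const missing = getNumber({}, 'age')
if (!missing.ok) console.log(missing.failure.kind) // missing
```

The caller always gets either a value of the declared type or a categorized failure. It never gets a "best effort" conversion.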

Quick start: declare the input, describe the output, compile, then run:

```ts
import { Planner, load, run } from '@origints/core'

const plan = new Planner()
  .in(load({ name: 'Alice', age: 30, role: 'admin' }))
  .emit((out, $) =>
    out
      .add('name', $.get('name').string())
      .add('age', $.get('age').number())
      .add('role', $.get('role').string())
  )
  .compile()

const result = await run(plan)
// result.value: { name: 'Alice', age: 30, role: 'admin' }
```

`addAll` declares several fields at once and produces the same output:

```ts
const plan = new Planner()
  .in(load({ name: 'Alice', age: 30, role: 'admin' }))
  .emit((out, $) =>
    out.addAll({
      name: $.get('name').string(),
      age: $.get('age').number(),
      role: $.get('role').string(),
    })
  )
  .compile()
```

`runOk` returns the value directly instead of a result object:

```ts
import { runOk } from '@origints/core'

const value = await runOk(plan) // throws on failure, returns value on success
```
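
Load and parse a JSON file. The caller supplies file access through the `readFile` option:

```ts
import * as fs from 'node:fs'
import { Planner, loadFile, run, parseJson } from '@origints/core'

const plan = new Planner()
  .in(loadFile('data.json'))
  .mapIn(parseJson())
  .emit((out, $) =>
    out.add('id', $.get('id').number()).add('name', $.get('name').string())
  )
  .compile()

// Supply the file reader used by loadFile()
const result = await run(plan, {
  readFile: path => fs.promises.readFile(path),
})
```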
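
Because file access is injected through `readFile`, tests can substitute an in-memory reader for the disk. The sketch below assumes only that `readFile` accepts a path and resolves to the file's contents, the same shape as `fs.promises.readFile(path)` in the example above. `memoryReadFile` is a hypothetical helper, not part of `@origints/core`.

```typescript
// Build a readFile-compatible function backed by an in-memory map of files.
// Unknown paths reject with an ENOENT-style error, similar to fs.promises.readFile.
function memoryReadFile(files: Record<string, string>) {
  return async (path: string): Promise<Buffer> => {
    if (!Object.prototype.hasOwnProperty.call(files, path)) {
      throw Object.assign(new Error(`ENOENT: no such file, open '${path}'`), {
        code: 'ENOENT',
      })
    }
    return Buffer.from(files[path], 'utf8')
  }
}

// Hypothetical usage with the plan above:
// const result = await run(plan, {
//   readFile: memoryReadFile({ 'data.json': '{"id": 1, "name": "Alice"}' }),
// })
```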

`optional` turns a missing field into `undefined`, or into a default when one is given:

```ts
import { Planner, load, run, optional } from '@origints/core'

const plan = new Planner()
  .in(load({ name: 'Alice' }))
  .emit((out, $) =>
    out
      .add('name', $.get('name').string())
      .add('nickname', optional($.get('nickname').string()))
      .add('score', optional($.get('score').number(), 0))
  )
  .compile()

const result = await run(plan)
// result.value: { name: 'Alice', nickname: undefined, score: 0 }
```
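
`tryExtract` tries each alternative in order and keeps the first one that succeeds. Here the number extraction fails on the string `'42.50'`, so the parsed string is used instead:

```ts
import {
  Planner,
  load,
  run,
  tryExtract,
  mapSpec,
  literal,
} from '@origints/core'

const plan = new Planner()
  .in(load({ price: '42.50' }))
  .emit((out, $) =>
    out.add(
      'price',
      tryExtract(
        $.get('price').number(), // fails: the value is a string
        mapSpec(
          $.get('price').string(),
          v => parseFloat(v as string),
          'parseFloat'
        ),
        literal(null) // last resort
      )
    )
  )
  .compile()

const result = await run(plan)
// result.value: { price: 42.5 }
```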
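
The standalone sketch below models that fallback behavior with plain functions. `Extractor`, `firstOk`, and the `Result` shape are hypothetical stand-ins, not the package's types. The sketch also assumes that when every alternative fails, the last failure is the one reported.

```typescript
type Result<T> = { ok: true; value: T } | { ok: false; error: string }
type Extractor<T> = (input: Record<string, unknown>) => Result<T>

// Try each extractor in order; return the first success, or the last failure.
function firstOk<T>(...alternatives: Extractor<T>[]): Extractor<T> {
  return input => {
    let last: Result<T> = { ok: false, error: 'no alternatives given' }
    for (const extract of alternatives) {
      last = extract(input)
      if (last.ok) return last
    }
    return last
  }
}

// Alternatives mirroring the example: a strict number, a parsed string, a literal.
const asNumber: Extractor<number | null> = input => {
  const v = input['price']
  return typeof v === 'number'
    ? { ok: true, value: v }
    : { ok: false, error: 'price is not a number' }
}

const parsed: Extractor<number | null> = input => {
  const v = input['price']
  if (typeof v !== 'string') return { ok: false, error: 'price is not a string' }
  const n = parseFloat(v)
  return Number.isNaN(n)
    ? { ok: false, error: 'price is not numeric' }
    : { ok: true, value: n }
}

const fallback: Extractor<number | null> = () => ({ ok: true, value: null })

const price = firstOk(asNumber, parsed, fallback)
console.log(price({ price: '42.50' })) // { ok: true, value: 42.5 }
```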

`guard` fails a value that does not satisfy a predicate, and `tryExtract` can then recover with a fallback:

```ts
import { Planner, load, run, guard, tryExtract, literal } from '@origints/core'

const plan = new Planner()
  .in(load({ age: -5 }))
  .emit((out, $) =>
    out.add(
      'age',
      tryExtract(
        guard(
          $.get('age').number(),
          v => (v as number) >= 0,
          'Age must be non-negative'
        ),
        literal(0)
      )
    )
  )
  .compile()

const result = await run(plan)
// result.value: { age: 0 }
```

`fluent` replaces nested combinators with a readable chain:

```ts
import { Planner, load, fluent, literal } from '@origints/core'

const plan = new Planner()
  .in(load({ price: '-5' }))
  .emit((out, $) =>
    out.add(
      'price',
      fluent($.get('price').string())
        .map(v => parseFloat(v as string), 'parseFloat')
        .guard(v => (v as number) > 0, 'Must be positive')
        .or(literal(0))
    )
  )
  .compile()
```
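
A chain like this can be read as a small wrapper that threads a success or failure through each step. The class below is a standalone sketch of that pattern, not the package's `fluent` implementation. `Fluent`, its `Result` type, and the rule that `or` recovers from any failure, including a failed guard, are assumptions for illustration. Here `or` takes a plain value rather than `literal(...)`.

```typescript
type Result<T> = { ok: true; value: T } | { ok: false; error: string }

class Fluent<T> {
  constructor(private readonly result: Result<T>) {}

  static of<V>(value: V): Fluent<V> {
    return new Fluent<V>({ ok: true, value })
  }

  // Transform a successful value; a failure passes through unchanged.
  map<U>(fn: (value: T) => U): Fluent<U> {
    const r = this.result
    return r.ok ? Fluent.of(fn(r.value)) : new Fluent<U>(r)
  }

  // Turn a success into a failure when the predicate rejects the value.
  guard(predicate: (value: T) => boolean, message: string): Fluent<T> {
    const r = this.result
    return r.ok && !predicate(r.value) ? new Fluent<T>({ ok: false, error: message }) : this
  }

  // End the chain: the value on success, otherwise the fallback.
  or(fallback: T): T {
    const r = this.result
    return r.ok ? r.value : fallback
  }
}

const price = Fluent.of('-5')
  .map(v => parseFloat(v))
  .guard(v => v > 0, 'Must be positive')
  .or(0)
console.log(price) // 0
```

Each step only runs on success, so the order of `map`, `guard`, and `or` reads top to bottom in the same order it executes.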
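
Combine existing plans with `Planner.merge` or `Planner.mergeAs`. Here `configPlan` and `usersPlan` stand for any two previously built plans:

```ts
// Flat merge: combines the outputs of both plans
const combined = Planner.merge(configPlan, usersPlan)

// Named merge: nests each plan's output under its key
const nested = Planner.mergeAs({ config: configPlan, users: usersPlan })
```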
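
The difference between the two merge styles is easiest to see on plain output objects. The functions below are a standalone sketch that operates on already-computed values, not on plans. `mergeFlat` and `mergeNamed` are hypothetical names. Treating a duplicate key as an error in the flat case is an assumption, since collision behavior is not described here.

```typescript
type Output = Record<string, unknown>

// Flat merge: copy every output's keys into one object.
// Assumption: a key produced by two outputs is an error rather than an overwrite.
function mergeFlat(...outputs: Output[]): Output {
  const merged: Output = {}
  for (const output of outputs) {
    for (const [key, value] of Object.entries(output)) {
      if (Object.prototype.hasOwnProperty.call(merged, key)) {
        throw new Error(`Duplicate output key '${key}'`)
      }
      merged[key] = value
    }
  }
  return merged
}

// Named merge: each output is nested under its own key.
function mergeNamed(outputs: Record<string, Output>): Output {
  return { ...outputs }
}

const config = { env: 'prod' }
const users = { count: 2 }
console.log(mergeFlat(config, users)) // { env: 'prod', count: 2 }
console.log(mergeNamed({ config, users })) // { config: { env: 'prod' }, users: { count: 2 } }
```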
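
Format the lineage recorded during a run to see where each output value came from:

```ts
import { run, formatLineage, formatLineageAsString } from '@origints/core'

const result = await run(plan)

// Human-readable trace
console.log(formatLineageAsString(result.lineage, plan.ast))

// Structured trace, e.g. for serialization
const trace = formatLineage(result.lineage, plan.ast)
console.log(JSON.stringify(trace, null, 2))
```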
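
Conceptually, a lineage graph records, for each produced value, the node it was derived from. The sketch below walks such a graph back from an output to its source and prints one step per node. The `LineageNode` shape and `traceLineage` function are hypothetical and much simpler than the structures `formatLineage` works with.

```typescript
interface LineageNode {
  id: string
  operation: string // e.g. 'load', "get('name')", 'string()'
  parent?: string // id of the node this value was derived from
}

// Follow parent links from `id` back to the root; return the chain source-first.
function traceLineage(nodes: Map<string, LineageNode>, id: string): LineageNode[] {
  const chain: LineageNode[] = []
  const seen = new Set<string>()
  let current = nodes.get(id)
  while (current) {
    if (seen.has(current.id)) throw new Error(`Cycle at '${current.id}'`)
    seen.add(current.id)
    chain.push(current)
    current = current.parent === undefined ? undefined : nodes.get(current.parent)
  }
  return chain.reverse()
}

const nodes = new Map<string, LineageNode>([
  ['n1', { id: 'n1', operation: 'load' }],
  ['n2', { id: 'n2', operation: "get('name')", parent: 'n1' }],
  ['n3', { id: 'n3', operation: 'string()', parent: 'n2' }],
])
console.log(traceLineage(nodes, 'n3').map(n => n.operation).join(' -> '))
// load -> get('name') -> string()
```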

Enable per-node execution timing to identify bottlenecks:

```ts
import { run, formatBenchmark } from '@origints/core'

const result = await run(plan, { benchmark: true })
if (result.benchmark) {
  // Structured data: result.benchmark.nodes, .phases, .totalMs
  console.log(formatBenchmark(result.benchmark))
}
```

The benchmark includes per-node wall-clock timing, sub-timings for each extraction within `emit` nodes, sub-timings for each output transform within `mapOut` nodes, and phase-level aggregation. When benchmarking is disabled, it adds no overhead.
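
One common way to get zero overhead when timing is disabled is to decide once, up front, whether to wrap each step in a timer. The disabled path then calls the step directly. The sketch below illustrates this for synchronous steps. `makeRunner`, `RunStep`, and `Timing` are hypothetical, and this is not necessarily how the package implements benchmarking.

```typescript
interface Timing {
  node: string
  ms: number
}

type RunStep = <T>(node: string, step: () => T) => T

// Choose the step runner once: timed when enabled, a direct call when disabled.
function makeRunner(benchmark: boolean): { runStep: RunStep; timings: Timing[] } {
  const timings: Timing[] = []
  const runStep: RunStep = benchmark
    ? (node, step) => {
        const start = performance.now()
        try {
          return step()
        } finally {
          timings.push({ node, ms: performance.now() - start })
        }
      }
    : (_node, step) => step() // disabled: no clock reads, nothing recorded
  return { runStep, timings }
}

const { runStep, timings } = makeRunner(true)
const total = runStep('sum', () => [1, 2, 3].reduce((a, b) => a + b, 0))
console.log(total, timings.map(t => t.node)) // 6 [ 'sum' ]
```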

See Lineage, Source Maps & Benchmarking for the full reference.

`.mapOut()` applies structural transformations after extraction. All path-based methods take callback selectors such as `o => o.items`, so paths are type-checked and autocompleted in the IDE.

```ts
import { Planner, load, sum } from '@origints/core' // sum is used in the next example

const plan = new Planner()
  .in(
    load({
      items: [
        { cat: 'A', amount: 100 },
        { cat: 'B', amount: 50 },
      ],
    })
  )
  .emit((out, $) =>
    out.add(
      'items',
      $.get('items').array(i => ({
        cat: i.get('cat').string(),
        amount: i.get('amount').number(),
      }))
    )
  )
  // Group the extracted items by their 'cat' field
  .mapOut($ => $.groupBy(o => o.items, 'cat'))
  .compile()
```
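
To picture what `.groupBy(o => o.items, 'cat')` produces, here is a standalone sketch of grouping an array by a field. `groupByKey` is a hypothetical helper. Its output shape, an object keyed by the field value with each group keeping its original elements, is an assumption. See Output Transforms for the package's exact result.

```typescript
// Group array elements into an object keyed by the string value of `key`.
function groupByKey<T, K extends keyof T>(items: T[], key: K): Record<string, T[]> {
  const groups = new Map<string, T[]>()
  for (const item of items) {
    const group = String(item[key])
    const bucket = groups.get(group)
    if (bucket) bucket.push(item)
    else groups.set(group, [item])
  }
  return Object.fromEntries(groups)
}

const items = [
  { cat: 'A', amount: 100 },
  { cat: 'B', amount: 50 },
  { cat: 'A', amount: 25 },
]
console.log(groupByKey(items, 'cat')) // group A holds 2 items, group B holds 1
```

Using a `Map` internally avoids collisions with inherited object keys such as `constructor`.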

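Scoped transforms operate within a sub-path selected with `.at()`. Chain `.each()` to apply a transform to every element under that path. In the example below, that means each group produced by `groupBy`:

```ts
// Replaces the .mapOut() call in the plan above
.mapOut($ =>
  $.groupBy(o => o.items, 'cat')
    // Sort each group's elements by amount, largest first
    .at(o => o.items)
    .each()
    .sort('amount', 'desc')
    // Sum each group's amounts into a 'total' field
    .at(o => o.items)
    .each()
    .aggregate({
      operations: [sum('amount', 'total')],
      into: 'inline',
    })
)
```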
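
Applied to plain data, the scoped chain amounts to two steps for each group: sort its elements by `amount` in descending order, then compute a `total`. The sketch below does this on an already-grouped object. `sortAndTotal` is hypothetical. Representing each group's result as `{ items, total }` is an assumption, since the exact shape produced by `into: 'inline'` is not described here.

```typescript
interface Item {
  cat: string
  amount: number
}

// For each group: sort by amount (descending) and attach the summed total.
function sortAndTotal(groups: Record<string, Item[]>) {
  const result: Record<string, { items: Item[]; total: number }> = {}
  for (const [cat, items] of Object.entries(groups)) {
    const sorted = [...items].sort((a, b) => b.amount - a.amount)
    const total = sorted.reduce((sum, item) => sum + item.amount, 0)
    result[cat] = { items: sorted, total }
  }
  return result
}

const grouped: Record<string, Item[]> = {
  A: [
    { cat: 'A', amount: 25 },
    { cat: 'A', amount: 100 },
  ],
  B: [{ cat: 'B', amount: 50 }],
}
const summary = sortAndTotal(grouped)
console.log(summary['A']?.total, summary['A']?.items.map(i => i.amount)) // 125 [ 100, 25 ]
```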

Available transforms: groupBy, indexBy, aggregate, nest, unnest, sort, filter, rename, pick, omit, drop, lookup, joinBy, derive, pivot, apply.

See Output Transforms for the full reference.

Derive a JSON Schema for a plan's output:

```ts
import { JsonSchema } from '@origints/core'

const schema = JsonSchema.output(plan, {
  draft: '2020-12',
  title: 'User',
  deduplicate: true,
})
```
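
To illustrate what deriving a schema from an output description involves, the sketch below maps a tiny field description to a draft 2020-12 JSON Schema object. Each field is a primitive type with an optional flag. `FieldSpec`, `ObjectSchema`, and `toJsonSchema` are assumptions for illustration. `JsonSchema.output` works from the compiled plan and supports further options such as `deduplicate`.

```typescript
type FieldSpec = { type: 'string' | 'number' | 'boolean'; optional?: boolean }

interface ObjectSchema {
  $schema: string
  title?: string
  type: 'object'
  properties: Record<string, { type: string }>
  required: string[]
}

// Every field becomes a property; fields not marked optional are required.
function toJsonSchema(fields: Record<string, FieldSpec>, title?: string): ObjectSchema {
  const properties: Record<string, { type: string }> = {}
  const required: string[] = []
  for (const [name, spec] of Object.entries(fields)) {
    properties[name] = { type: spec.type }
    if (!spec.optional) required.push(name)
  }
  const schema: ObjectSchema = {
    $schema: 'https://json-schema.org/draft/2020-12/schema',
    type: 'object',
    properties,
    required,
  }
  if (title !== undefined) schema.title = title
  return schema
}

const schema = toJsonSchema(
  {
    name: { type: 'string' },
    age: { type: 'number' },
    nickname: { type: 'string', optional: true },
  },
  'User'
)
console.log(JSON.stringify(schema, null, 2))
```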

Import focused subsystems:

```ts
import { extract, object } from '@origints/core/extract'
import { Plan, Planner } from '@origints/core/plan'
import { run, runOk } from '@origints/core/run'
import { OutputTransformBuilder } from '@origints/core/transforms'
import { PlanPipeline } from '@origints/core/pipeline'
```

Non-goals:
  • Not a general-purpose ETL framework
  • Not optimized for streaming large datasets
  • Not a schema definition language

License: MIT