Pipeline API
The | operator composes transformation stages into a lazy pipeline.
How it works
Applying | to a CrystalXMLSource and a stage returns a Pipeline object. Chaining further stages with | extends the composition. Nothing executes until you iterate over the result or pass it to a sink.
from crxml import CrystalXMLSource, RenameFields, CastTypes
pipe = (
    CrystalXMLSource("report.xml")
    | RenameFields({"f1": "name", "f2": "total"})
    | CastTypes({"total": float})
)
# No iteration has happened yet
for row in pipe:  # execution starts here
    print(row)
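The following is a minimal, self-contained sketch of how a |-based lazy pipeline can be built in plain Python with operator overloading (__or__) and generators. It does not use crxml. Pipeline, rename, cast, and fake_report are hypothetical stand-ins, and crxml's real implementation may differ. The reads list shows that composing stages reads nothing and that the source is touched only when iteration starts.

```python
from typing import Callable, Dict, Iterable, Iterator

Row = Dict[str, object]
Stage = Callable[[Iterator[Row]], Iterator[Row]]


class Pipeline:
    """Toy lazy pipeline (hypothetical): stores a source and stages, runs nothing until iterated."""

    def __init__(self, source: Callable[[], Iterable[Row]], *stages: Stage):
        self.source = source
        self.stages = stages

    def __or__(self, stage: Stage) -> "Pipeline":
        # Composition only records the stage; no rows are read here.
        return Pipeline(self.source, *self.stages, stage)

    def __iter__(self) -> Iterator[Row]:
        rows: Iterator[Row] = iter(self.source())  # the source is read only now
        for stage in self.stages:
            rows = stage(rows)  # each stage wraps the iterator of the previous one
        return rows


def rename(mapping: Dict[str, str]) -> Stage:
    def stage(rows: Iterator[Row]) -> Iterator[Row]:
        for row in rows:
            yield {mapping.get(k, k): v for k, v in row.items()}
    return stage


def cast(types: Dict[str, Callable]) -> Stage:
    def stage(rows: Iterator[Row]) -> Iterator[Row]:
        for row in rows:
            yield {k: types[k](v) if k in types else v for k, v in row.items()}
    return stage


reads = []


def fake_report() -> Iterable[Row]:
    # Hypothetical stand-in for parsing report.xml; records each call.
    reads.append("report.xml")
    return [{"f1": "Acme", "f2": "12.5"}, {"f1": "Globex", "f2": "40"}]


pipe = Pipeline(fake_report) | rename({"f1": "name", "f2": "total"}) | cast({"total": float})
print(reads)  # prints [] because composing did not read the source
for row in pipe:  # execution starts here
    print(row)
print(reads)  # prints ['report.xml']
```

Because each stage only wraps the previous iterator, the cost of building a pipeline stays constant no matter how large the source is.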
Immutable composition
Pipelines are immutable. Every | produces a new Pipeline object without
modifying the previous one:
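from crxml import CrystalXMLSource, RenameFields, CastTypes, DropFields
mapping = {"f1": "amount", "f2": "tax_rate"}  # example field mapping
base = CrystalXMLSource("report.xml") | RenameFields(mapping)
# base is left unchanged; pipe_a and pipe_b are independent pipelines,
# and iterating either one re-reads the source
pipe_a = base | CastTypes({"amount": float})
pipe_b = base | DropFields("tax_rate")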
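To illustrate the immutability and re-read behavior, here is a toy model that assumes a pipeline keeps its stages in a tuple and re-reads its source on each iteration. Pipeline, report, MAPPING, and rename_fields are hypothetical names, and this is not crxml's code. Stages here are per-row functions, and the two lambdas stand in for CastTypes and DropFields.

```python
class Pipeline:
    """Toy immutable pipeline (hypothetical): the stage tuple is never modified."""

    def __init__(self, source, stages=()):
        self.source = source          # zero-argument callable returning rows
        self.stages = tuple(stages)   # per-row functions: dict -> dict

    def __or__(self, stage):
        # Return a new Pipeline with one extra stage; self is left untouched.
        return Pipeline(self.source, self.stages + (stage,))

    def __iter__(self):
        rows = iter(self.source())    # every iteration re-reads the source
        for stage in self.stages:
            rows = map(stage, rows)
        return rows


reads = []


def report():
    # Hypothetical stand-in for CrystalXMLSource("report.xml").
    reads.append(1)
    return [{"amt": "9.99", "tax_rate": "0.2"}]


MAPPING = {"amt": "amount"}  # hypothetical field mapping


def rename_fields(row):
    return {MAPPING.get(k, k): v for k, v in row.items()}


base = Pipeline(report) | rename_fields
pipe_a = base | (lambda r: {**r, "amount": float(r["amount"])})              # cast
pipe_b = base | (lambda r: {k: v for k, v in r.items() if k != "tax_rate"})  # drop

print(len(base.stages), len(pipe_a.stages), len(pipe_b.stages))  # prints 1 2 2
a_rows = list(pipe_a)
b_rows = list(pipe_b)
print(len(reads))  # prints 2: each pipeline read the source independently
```

Sharing base between branches is safe in this model because no branch can append to the other's stage tuple.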
Pipeline object
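A Pipeline is usually created implicitly by |, but the Pipeline class can also be imported and constructed directly from a source and a stage:
from crxml import Pipeline, CrystalXMLSource, RenameFields
mapping = {"f1": "name", "f2": "total"}  # example field mapping
pipe = Pipeline(CrystalXMLSource("report.xml"), RenameFields(mapping))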
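A sketch of how implicit and explicit construction can yield equivalent objects, assuming the source type defines __or__ to wrap itself in a Pipeline. Source, Pipeline, and rename_fields here are hypothetical toy classes and functions, not crxml's.

```python
class Pipeline:
    """Toy pipeline (hypothetical): a source plus an ordered tuple of per-row stages."""

    def __init__(self, source, *stages):
        self.source = source
        self.stages = stages

    def __or__(self, stage):
        # pipeline | stage appends a stage and returns a new Pipeline
        return Pipeline(self.source, *self.stages, stage)

    def __iter__(self):
        # Generator: nothing runs until a caller starts iterating.
        for row in self.source:
            for stage in self.stages:
                row = stage(row)
            yield row


class Source:
    """Hypothetical stand-in for CrystalXMLSource, backed by an in-memory list."""

    def __init__(self, rows):
        self.rows = rows

    def __iter__(self):
        return iter(self.rows)

    def __or__(self, stage):
        # source | stage is where the first Pipeline is created implicitly
        return Pipeline(self, stage)


def rename_fields(row):
    # Hypothetical stage: rename f1 -> name.
    return {("name" if k == "f1" else k): v for k, v in row.items()}


src = Source([{"f1": "Acme"}])
implicit = src | rename_fields            # created via |
explicit = Pipeline(src, rename_fields)   # created with the constructor
print(type(implicit).__name__)  # prints Pipeline
print(list(implicit) == list(explicit))  # prints True
```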
Lazy evaluation
Pipelines stay fully lazy until they are consumed by one of:
- a for loop (for row in pipeline): per-row iteration
- list(pipeline): collects all rows into a list
- to_dataframe(pipeline): DataFrame sink
- to_csv(pipeline, path): CSV sink
- collect(pipeline): list sink
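All of these consumers work by driving iteration. The toy sketch below assumes rows are plain dicts and shows hypothetical collect and to_csv functions built on the standard csv module. Unlike the to_csv(pipeline, path) form, this version writes to a text stream so it runs without touching the file system. These are not crxml's sinks.

```python
import csv
import io


def collect(pipeline):
    """Toy list sink (hypothetical): drains every row into a list."""
    return list(pipeline)


def to_csv(pipeline, out):
    """Toy CSV sink (hypothetical): writes rows to a text stream, header from the first row."""
    writer = None
    count = 0
    for row in pipeline:  # pulling rows here is what triggers upstream work
        if writer is None:
            writer = csv.DictWriter(out, fieldnames=list(row))
            writer.writeheader()
        writer.writerow(row)
        count += 1
    return count


events = []


def make_rows():
    """Hypothetical lazy source: the body runs only as rows are pulled."""
    for i in (1, 2):
        events.append(i)
        yield {"id": i, "total": i * 10.0}


pending = make_rows()
print(events)  # prints [] because creating the generator did no work
buf = io.StringIO()
written = to_csv(pending, buf)
print(events)  # prints [1, 2]
print(buf.getvalue())
```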
Example: 3-stage pipeline
from crxml import CrystalXMLSource, RenameFields, CastTypes, FilterRows
pipe = (
    CrystalXMLSource("report.xml")
    | RenameFields({"vendor": "supplier", "price": "cost"})
    | CastTypes({"cost": float})
    | FilterRows(lambda r: r["cost"] > 100)
)
Each row passes through the stages in the order they were composed: RenameFields, then CastTypes, then FilterRows. A row dropped by FilterRows never reaches any stage appended after it.
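Assuming rows stream through the stages one at a time (as chained Python generators do), the flow of the example above can be traced with plain generator functions. source, rename, cast, filter_rows, and record are hypothetical stand-ins for CrystalXMLSource, RenameFields, CastTypes, FilterRows, and a later stage. The trace shows that the first row reaches the end before the second row is read, and that the row with cost 40 never reaches record.

```python
trace = []


def source():
    # Hypothetical stand-in for CrystalXMLSource("report.xml").
    for row in ({"vendor": "Acme", "price": "250"}, {"vendor": "Globex", "price": "40"}):
        trace.append(("read", row["vendor"]))
        yield row


def rename(rows, mapping):
    for row in rows:
        yield {mapping.get(k, k): v for k, v in row.items()}


def cast(rows, types):
    for row in rows:
        yield {k: types[k](v) if k in types else v for k, v in row.items()}


def filter_rows(rows, predicate):
    for row in rows:
        if predicate(row):  # rejected rows are simply not yielded
            yield row


def record(rows, label):
    # Hypothetical stage placed after the filter, to show which rows reach it.
    for row in rows:
        trace.append((label, row["supplier"]))
        yield row


rows = source()
rows = rename(rows, {"vendor": "supplier", "price": "cost"})
rows = cast(rows, {"cost": float})
rows = filter_rows(rows, lambda r: r["cost"] > 100)
rows = record(rows, "after filter")

result = list(rows)
print(result)  # prints [{'supplier': 'Acme', 'cost': 250.0}]
print(trace)
```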