Getting Started
pypekit is a Python library for designing and running data-processing pipelines.
Describe each task once, and pypekit automatically constructs every valid pipeline and executes them with caching.
Installation
```bash
pip install pypekit
```
Defining Tasks
Create a task by subclassing Task, overriding the input_types and output_types properties, and implementing the run() method.
| Property | Purpose |
|---|---|
| input_types | A list of strings representing the input types that the task can accept. |
| output_types | A list of strings representing the output types that the task produces. |
| run() | The method that implements the task's logic: it processes the input and produces the output. |
Pipeline Construction Rules
- Pipelines start with tasks that have "source" in input_types and end with tasks that have "sink" in output_types.
- All other types are intermediate and dictate how tasks can be chained.
- Two tasks connect when at least one output type of the upstream task matches an input type of the downstream task.
```python
from pypekit import Task


class Source(Task):
    input_types = ["source"]
    output_types = ["a"]

    # A source ignores its input and produces the initial value.
    def run(self, _):
        print("Running Source")
        return "source"


class Transform1(Task):
    input_types = ["a"]
    output_types = ["b"]

    def run(self, x):
        print("Running Transform1")
        return x + "_transformed-1"


class Transform2(Task):
    input_types = ["a", "b"]
    output_types = ["b"]

    def run(self, x):
        print("Running Transform2")
        return x + "_transformed-2"


class Sink1(Task):
    input_types = ["b"]
    output_types = ["sink"]

    def run(self, x):
        print("Running Sink1")
        return x + "_sink-1"


class Sink2(Task):
    input_types = ["b"]
    output_types = ["sink"]

    def run(self, x):
        print("Running Sink2")
        return x + "_sink-2"
```
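Applying the connection rule to the five example tasks by hand makes the chaining concrete. The sketch below is plain Python, not pypekit's implementation: TYPES copies the declarations above, and TYPES and connects are hypothetical names. It skips a task connecting to itself (Transform2 would otherwise match its own output "b"), on the assumption, suggested by the tree printed under Inspect Tree, that a task appears at most once per path.

```python
# Type declarations copied from the example tasks: name -> (input_types, output_types).
TYPES = {
    "Source": (["source"], ["a"]),
    "Transform1": (["a"], ["b"]),
    "Transform2": (["a", "b"], ["b"]),
    "Sink1": (["b"], ["sink"]),
    "Sink2": (["b"], ["sink"]),
}


def connects(upstream: str, downstream: str) -> bool:
    """True if at least one output type of upstream is an input type of downstream."""
    outputs = set(TYPES[upstream][1])
    inputs = set(TYPES[downstream][0])
    return bool(outputs & inputs)


# Every ordered pair of distinct tasks that satisfies the rule.
# Assumption: self-loops such as Transform2 -> Transform2 are not allowed.
edges = [
    (up, down)
    for up in TYPES
    for down in TYPES
    if up != down and connects(up, down)
]

for up, down in edges:
    print(f"{up} -> {down}")
```

Note that the rule is directional: Transform1 feeds Transform2 through "b", but Transform2 cannot feed Transform1 because Transform1 only accepts "a".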
Building a Repository
Collect your task classes in a Repository, then call build_tree() to let pypekit construct a tree of tasks based on the defined input_types and output_types.
The resulting tree represents every source-to-sink pathway implied by your task definitions.
```python
from pypekit import Repository

repository = Repository([
    Source,
    Transform1,
    Transform2,
    Sink1,
    Sink2
])
root = repository.build_tree()
```
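Conceptually, building the tree is a depth-first expansion from the source tasks. The sketch below is an assumption about how this could work, not pypekit's code: it starts from tasks with "source" in input_types, appends every task that satisfies the connection rule, never repeats a task on the same path, and prunes branches that never reach a task with "sink" in output_types. TYPES, connects, expand and build_tree are hypothetical names, and the tree is a plain nested dict.

```python
# Hypothetical stand-ins for the example tasks: name -> (input_types, output_types).
TYPES = {
    "Source": (["source"], ["a"]),
    "Transform1": (["a"], ["b"]),
    "Transform2": (["a", "b"], ["b"]),
    "Sink1": (["b"], ["sink"]),
    "Sink2": (["b"], ["sink"]),
}


def connects(up, down):
    """True if an output type of `up` is an input type of `down`."""
    return bool(set(TYPES[up][1]) & set(TYPES[down][0]))


def expand(path):
    """Nested dict of the tasks that can follow path[-1]; dead-end branches are pruned."""
    children = {}
    for name in TYPES:
        # Assumption: a task appears at most once per path.
        if name in path or not connects(path[-1], name):
            continue
        if "sink" in TYPES[name][1]:
            children[name] = {}  # a sink ends the branch
        else:
            subtree = expand(path + [name])
            if subtree:  # keep only branches that reach a sink
                children[name] = subtree
    return children


def build_tree():
    """One subtree per source task (a task with "source" in its input_types)."""
    tree = {}
    for name, (inputs, _) in TYPES.items():
        if "source" in inputs:
            subtree = expand([name])
            if subtree:
                tree[name] = subtree
    return tree


tree = build_tree()
print(tree)
```

Under these assumptions the result has the same shape as the tree pypekit prints under Inspect Tree, minus the synthetic Root node.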
Inspect Tree
To inspect the tree structure, call build_tree_string(), which will return a string representation of the tree.
```python
tree_representation = repository.build_tree_string()
print(tree_representation)
```

```text
└── Root()
    └── Source()
        ├── Transform1()
        │   ├── Transform2()
        │   │   ├── Sink1()
        │   │   └── Sink2()
        │   ├── Sink1()
        │   └── Sink2()
        └── Transform2()
            ├── Sink1()
            └── Sink2()
```
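The printout uses the common box-drawing layout. As an illustration of that format only (pypekit may build the string differently), the hypothetical render function below walks a nested dict and emits ├── for a child that has later siblings and └── for the last child, continuing a │ column under a node only when more siblings follow it.

```python
def render(tree: dict, prefix: str = "") -> list:
    """Return one line per node, drawing ├── / └── connectors like the output above."""
    lines = []
    items = list(tree.items())
    for i, (name, children) in enumerate(items):
        last = i == len(items) - 1
        lines.append(prefix + ("└── " if last else "├── ") + name)
        # Children of a non-last node sit under a vertical bar; children of the last node do not.
        lines.extend(render(children, prefix + ("    " if last else "│   ")))
    return lines


# The example tree as a nested dict: node label -> children.
tree = {
    "Root()": {
        "Source()": {
            "Transform1()": {
                "Transform2()": {"Sink1()": {}, "Sink2()": {}},
                "Sink1()": {},
                "Sink2()": {},
            },
            "Transform2()": {"Sink1()": {}, "Sink2()": {}},
        }
    }
}

print("\n".join(render(tree)))
```

Each leaf of this tree is a sink, so counting the leaves (six here) gives the number of pipelines.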
Build Pipelines
To get all viable pipelines from the tree, call build_pipelines().
```python
pipelines = repository.build_pipelines()
for p in pipelines:
    print(p)
```

```text
Pipeline(tasks=[Source(), Transform1(), Transform2(), Sink1()])
Pipeline(tasks=[Source(), Transform1(), Transform2(), Sink2()])
Pipeline(tasks=[Source(), Transform1(), Sink1()])
Pipeline(tasks=[Source(), Transform1(), Sink2()])
Pipeline(tasks=[Source(), Transform2(), Sink1()])
Pipeline(tasks=[Source(), Transform2(), Sink2()])
```
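Each pipeline corresponds to one root-to-leaf path of the tree. The sketch below flattens a nested dict depth-first, which reproduces the order of the listing above. It illustrates the idea only; paths is a hypothetical helper, not part of pypekit, and the synthetic Root node is left out of the dict.

```python
def paths(tree: dict, prefix: tuple = ()) -> list:
    """Depth-first list of root-to-leaf paths; each path is one pipeline."""
    result = []
    for name, children in tree.items():
        path = prefix + (name,)
        if children:
            result.extend(paths(children, path))
        else:
            result.append(list(path))  # a leaf (sink) closes a pipeline
    return result


# The tree from the Inspect Tree section, without the Root node.
tree = {
    "Source": {
        "Transform1": {
            "Transform2": {"Sink1": {}, "Sink2": {}},
            "Sink1": {},
            "Sink2": {},
        },
        "Transform2": {"Sink1": {}, "Sink2": {}},
    }
}

for p in paths(tree):
    print(p)
```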
Execute Pipelines with Caching
Running many similar pipelines can be wasteful if they share sub-chains.
To cache intermediate results, pass a list of pipelines to the CachedExecutor and call the run() method.
The executor only runs tasks that have not been executed with the same input before, and reuses cached results for the rest.
```python
from pypekit import CachedExecutor

executor = CachedExecutor(pipelines, verbose=True)
results = executor.run()
```

```text
Running Source
Running Transform1
Running Transform2
Running Sink1
Pipeline 1/6 completed. Runtime: 0.00s.
Running Sink2
Pipeline 2/6 completed. Runtime: 0.00s.
Running Sink1
Pipeline 3/6 completed. Runtime: 0.00s.
Running Sink2
Pipeline 4/6 completed. Runtime: 0.00s.
Running Transform2
Running Sink1
Pipeline 5/6 completed. Runtime: 0.00s.
Running Sink2
Pipeline 6/6 completed. Runtime: 0.00s.
```
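The log shows ten task executions for the twenty task slots across the six pipelines. A minimal sketch of the caching idea, assuming results are keyed by (task name, input) as the description above suggests; pypekit's actual cache key and storage may differ. Tasks here are plain functions, and TASKS, PIPELINES and run_all are hypothetical names.

```python
# Hypothetical stand-ins for the example tasks: plain functions that append a suffix.
TASKS = {
    "Source": lambda _: "source",
    "Transform1": lambda x: x + "_transformed-1",
    "Transform2": lambda x: x + "_transformed-2",
    "Sink1": lambda x: x + "_sink-1",
    "Sink2": lambda x: x + "_sink-2",
}

PIPELINES = [
    ["Source", "Transform1", "Transform2", "Sink1"],
    ["Source", "Transform1", "Transform2", "Sink2"],
    ["Source", "Transform1", "Sink1"],
    ["Source", "Transform1", "Sink2"],
    ["Source", "Transform2", "Sink1"],
    ["Source", "Transform2", "Sink2"],
]


def run_all(pipelines, cache):
    """Run each pipeline, skipping any (task, input) pair already in cache."""
    executed = []  # names of tasks that actually ran, in order
    outputs = []
    for pipeline in pipelines:
        data = None  # the source task ignores its input
        for name in pipeline:
            key = (name, data)
            if key not in cache:
                cache[key] = TASKS[name](data)
                executed.append(name)
            data = cache[key]
        outputs.append(data)
    return outputs, executed


cache = {}
outputs, executed = run_all(PIPELINES, cache)
print(executed)
print(len(executed), "of", sum(len(p) for p in PIPELINES), "task slots executed")
```

The executed list matches the order of the "Running ..." lines in the log above, which is consistent with (though not proof of) this keying scheme.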
Inspect Results
run() returns the results, which are also stored in executor.results: a nested dict whose keys are pipeline IDs.
Each entry records:
- output: the output of the pipeline,
- runtime: the cumulative seconds spent (for cached tasks, the runtime is also taken from the cache),
- tasks: the list of tasks that formed the pipeline.
```python
import json

for r in results.values():
    print(json.dumps(r, indent=2))
```

```text
{
  "output": "source_transformed-1_transformed-2_sink-1",
  "runtime": 7.836699933250202e-05,
  "tasks": [
    "Source()",
    "Transform1()",
    "Transform2()",
    "Sink1()"
  ]
}
{
  "output": "source_transformed-1_transformed-2_sink-2",
  "runtime": 8.094199893093901e-05,
  "tasks": [
    "Source()",
    "Transform1()",
    "Transform2()",
    "Sink2()"
  ]
}
{
  "output": "source_transformed-1_sink-1",
  "runtime": 7.130399899324402e-05,
  "tasks": [
    "Source()",
    "Transform1()",
    "Sink1()"
  ]
}
{
  "output": "source_transformed-1_sink-2",
  "runtime": 7.185499907791382e-05,
  "tasks": [
    "Source()",
    "Transform1()",
    "Sink2()"
  ]
}
{
  "output": "source_transformed-2_sink-1",
  "runtime": 6.648499856964918e-05,
  "tasks": [
    "Source()",
    "Transform2()",
    "Sink1()"
  ]
}
{
  "output": "source_transformed-2_sink-2",
  "runtime": 6.663499880232848e-05,
  "tasks": [
    "Source()",
    "Transform2()",
    "Sink2()"
  ]
}
```
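Because the results are plain dicts, ordinary Python is enough to post-process them. The sketch below ranks pipelines by cumulative runtime. It uses a hand-copied subset of the output above; the string keys "p1", "p3" and "p5" are hypothetical stand-ins for the pipeline IDs, whose actual type is not shown here.

```python
# Subset of the results printed above; the keys are hypothetical stand-ins for pipeline IDs.
results = {
    "p1": {
        "output": "source_transformed-1_transformed-2_sink-1",
        "runtime": 7.836699933250202e-05,
        "tasks": ["Source()", "Transform1()", "Transform2()", "Sink1()"],
    },
    "p3": {
        "output": "source_transformed-1_sink-1",
        "runtime": 7.130399899324402e-05,
        "tasks": ["Source()", "Transform1()", "Sink1()"],
    },
    "p5": {
        "output": "source_transformed-2_sink-1",
        "runtime": 6.648499856964918e-05,
        "tasks": ["Source()", "Transform2()", "Sink1()"],
    },
}

# Rank pipelines from fastest to slowest by cumulative runtime.
ranked = sorted(results.items(), key=lambda item: item[1]["runtime"])
for pid, entry in ranked:
    print(f"{pid}: {entry['runtime']:.2e}s  {' -> '.join(entry['tasks'])}")
```

Since cached tasks contribute their cached runtime, this ranking reflects the full cost of each pipeline rather than the wall-clock time of the current run. The same sorted() call applies directly to the dict returned by executor.run().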
Reusing Cache
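If you already have a cache from a previous run, you can reuse it by passing it as the cache argument of CachedExecutor. In the run below every result is already cached, so no task executes and only the completion messages are printed.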
```python
new_executor = CachedExecutor(pipelines, cache=executor.cache, verbose=True)
new_executor.run()
```

```text
Pipeline 1/6 completed. Runtime: 0.00s.
Pipeline 2/6 completed. Runtime: 0.00s.
Pipeline 3/6 completed. Runtime: 0.00s.
Pipeline 4/6 completed. Runtime: 0.00s.
Pipeline 5/6 completed. Runtime: 0.00s.
Pipeline 6/6 completed. Runtime: 0.00s.
```
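Reusing a cache also pays off when the second run uses different pipelines: any task whose input was already computed is skipped. The sketch below assumes a dict cache keyed by (task name, input), following the description in the previous section; pypekit's actual cache structure is not documented here, and run and the task functions are hypothetical.

```python
def source(_):
    return "source"


def transform1(x):
    return x + "_transformed-1"


def sink1(x):
    return x + "_sink-1"


def sink2(x):
    return x + "_sink-2"


def run(pipelines, cache=None):
    """Run pipelines of (name, function) steps; cache maps (name, input) -> output."""
    cache = {} if cache is None else cache
    executed = []  # names of tasks that actually ran
    for steps in pipelines:
        data = None
        for name, fn in steps:
            key = (name, data)
            if key not in cache:
                cache[key] = fn(data)
                executed.append(name)
            data = cache[key]
    return cache, executed


first_cache, first_run = run([[("Source", source), ("Transform1", transform1), ("Sink1", sink1)]])

# A later run with a different pipeline that shares the Source -> Transform1 prefix.
# The same dict is passed in, so it is updated in place with the new result.
_, second_run = run(
    [[("Source", source), ("Transform1", transform1), ("Sink2", sink2)]],
    cache=first_cache,
)
print(first_run)   # ['Source', 'Transform1', 'Sink1']
print(second_run)  # ['Sink2']
```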
Instances, Parameters and Pipelines
A repository can mix and match several flavours of “tasks”:
| What you pass | How the repository treats it |
|---|---|
| A class | Instantiated at every node of the tree. |
| An instance (Task()) | The same instance is reused everywhere it is needed. |
| A tuple (Task, kwargs) | The class is instantiated with kwargs at every node of the tree. |
| An existing Pipeline | Used as an instance of a task. |
This flexibility lets you:
- reuse heavyweight objects (e.g. a loaded ML model),
- scan hyperparameters by specifying multiple (class, kwargs) tuples, and
- embed prefabricated sub-pipelines inside larger graphs.
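To scan hyperparameters, the (class, kwargs) tuples can be generated from a parameter grid instead of being written out by hand. The sketch uses itertools.product from the standard library; grid_entries and the Scale class are hypothetical and stand in for a Task subclass with parameters.

```python
from itertools import product


def grid_entries(task_cls, grid: dict) -> list:
    """Return one (task_cls, kwargs) tuple per combination of values in grid."""
    names = list(grid)
    return [
        (task_cls, dict(zip(names, values)))
        for values in product(*(grid[name] for name in names))
    ]


class Scale:  # hypothetical stand-in for a Task subclass with parameters
    def __init__(self, factor=1, offset=0):
        self.factor = factor
        self.offset = offset


entries = grid_entries(Scale, {"factor": [1, 2], "offset": [0, 10]})
for cls, kwargs in entries:
    print(cls.__name__, kwargs)
```

With pypekit, such a list could be spliced into the repository entries, for example Repository([Source, *grid_entries(Transform3, {"test": [False, True]}), Sink1]) for the Transform3 task from this section; each tuple then becomes its own branch of the tree.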
The example below combines all four kinds of entries in one repository.

```python
from pypekit import Pipeline

# Continues the example above: Task, Repository, Source, Transform1, Sink1 and Sink2 are already defined.


# New task that takes keyword arguments
class Transform3(Task):
    input_types = ["a"]
    output_types = ["b"]

    def __init__(self, **kwargs):
        self.test = kwargs.get("test", False)

    def run(self, x):
        print("Running Transform3 with test =", self.test)
        return x + "_transformed-3" + ("-test" if self.test else "")


pipeline = Pipeline([
    Transform1(),
    Sink2()
])

repository = Repository([
    Source,
    Transform1,
    (Transform3, {"test": True}),  # Transform3 is instantiated with test=True every time it is used
    Sink1(),  # every node with the task Sink1 shares this single instance
    pipeline  # a Pipeline instance used as a task
])
repository.build_tree()
print(repository.build_tree_string())
```
```text
└── Root()
    └── Source()
        ├── Transform1()
        │   └── Sink1()
        ├── Transform3(test=True)
        │   └── Sink1()
        └── Pipeline(tasks=[Transform1(), Sink2()])
```
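The four rows of the table can be read as a small dispatch on the type of each entry. The sketch below is an assumption about that behavior, not pypekit's source: make_node is a hypothetical helper, and Transform3 and Sink1 here are minimal stand-ins that do not depend on pypekit.

```python
def make_node(entry):
    """Return the task object to place at one tree node for a repository entry."""
    if isinstance(entry, type):
        return entry()  # a class: fresh instance for every node
    if isinstance(entry, tuple):
        cls, kwargs = entry
        return cls(**kwargs)  # (class, kwargs): fresh instance with those kwargs
    return entry  # an instance or a Pipeline: the same object everywhere


class Transform3:  # hypothetical stand-in mirroring the example above
    def __init__(self, **kwargs):
        self.test = kwargs.get("test", False)


class Sink1:  # hypothetical stand-in
    pass


shared_sink = Sink1()
a, b = make_node(Transform3), make_node(Transform3)
c = make_node((Transform3, {"test": True}))
print(a is b, c.test)                          # False True
print(make_node(shared_sink) is shared_sink)   # True
```

Reusing one object everywhere is what makes the instance form suitable for heavyweight objects, while the class and tuple forms give each node its own independent task.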