Pipeline API
The Pipeline API provides the building blocks for a YAML pipeline.
Each stage and configuration object here represents a specific part of the YAML definition file.
Example pipeline stage with all possible keys. Which keys are actually used depends on the connection kind.
- name: str
  kind: source | sink | transform
  log_level: DEBUG
  skip_if:
  on_error: error | continue
  connection: Connection
    kind: Parquet | Deltatable | Rest | ...
    locator: str
    config: ConnectionConfiguration
    client: RestClient
    request: RestRequest
      endpoint: str
      method: RequestMethod
      errorhandler: RestErrorHandler
      auth: RestAuth
        method: header | oauth2 | ...
        values: StringDict
      query: dict | @json-magic
      body: dict | @json-magic
    response: RestResponse
      handler:
        kind: ResponseHandlerTypes
        page_param: str
        page_size: str
        param_locator: ParameterDisposition
        total_records: str
        throttle: int | float
      content_type: DataType
      locator: str
    fields: Fields
    show: int
    show_schema: bool
  query: > str
  stages: Stages
  throttle: int
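Below is a minimal sketch of how such a definition is typically consumed from Python. The import path is an assumption; Stages.load() accepting a YAML string follows the load() documentation further down this page, and the stage and connection keys follow the listing above.

from ankaflow.models import Stages  # import path is an assumption

PIPELINE_YAML = """
- name: read_data
  kind: source
  connection:
    kind: Parquet
    locator: data/input.parquet
- name: totals
  kind: transform
  query: >
    select count(*) as n from read_data
  show: 5
"""

stages = Stages.load(PIPELINE_YAML)        # parse and validate the definition
for index, stage in stages.enumerate_steps():
    print(index, stage.kind, stage.name)   # e.g. 0 source read_data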
__all__
¶
__all__ = [
"Column",
"Columns",
"Stages",
"Stage",
"ConnectionConfiguration",
"Variables",
"FlowContext",
"S3Config",
"GSConfig",
"ClickhouseConfig",
"BigQueryConfig",
"BucketConfig",
"Connection",
"PhysicalConnection",
"EphemeralConnection",
"ParquetConnection",
"CSVConnection",
"JSONConnection",
"FileConnection",
"BigQueryConnection",
"ClickhouseConnection",
"DeltatableConnection",
"Dimension",
"CustomConnection",
"RestConnection",
"SQLGenConnection",
]
BigQueryConfig
¶
Bases: BaseModel
Configuration for accessing BigQuery datasets.
credential_file
¶
credential_file: Optional[str] = None
Path to service account JSON credentials for BigQuery access.
BigQueryConnection
¶
Bases: PhysicalConnection, VersionedConnection
data_mode
¶
data_mode: str = 'error'
Data mode for the write operation. For Deltatable the valid options are:
- append: adds new data
- overwrite: replaces all data
- error: fails (table is read-only)
partition
¶
partition: Optional[List[str]] = None
If set, the delta table is partitioned by the specified fields for faster reads.
schema_mode
¶
schema_mode: Optional[str] = None
Deltatable schema behaviour. If not set, the write fails with an error when the data schema does not match the existing table schema.
Schema evolution options:
- merge: adds new columns from the data
- overwrite: adds new columns, drops missing ones
BucketConfig
¶
Bases: BaseModel
bucket
¶
bucket: Optional[str] = None
Bucket name, e.g. s3://my-bucket, or a local absolute path, e.g. /data/path.
data_prefix
¶
data_prefix: Optional[str] = None
Prefix for data files: s3://my-bucket/<prefix> or /data/path/<prefix>.
locator_wildcard
¶
locator_wildcard: Optional[Tuple] = None
Regular expression and wildcard used to modify the locator.
Useful when you sink data to data-YYYY.parquet but want to read data-*.parquet.
Provide a tuple with the pattern as the first element and the replacement as the second one. Example:
('-\d{4}-', '*')
This creates the wildcard data*base.parquet for data-YYYY-base.parquet.
| Wildcard | Description |
|---|---|
| * | matches any number of any characters (including none) |
| ** | matches any number of subdirectories (including none) |
| [abc] | matches one character given in the bracket |
| [a-z] | matches one character from the range given in the bracket |
Wildcard is automatically applied in tap and show_schema operations.
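As a minimal sketch, the substitution the tuple describes can be reproduced with plain re; the connection applies it internally, and the file names here are only illustrative.

import re

# (pattern, replacement) as documented above
locator_wildcard = (r"-\d{4}-", "*")

written_locator = "data-2024-base.parquet"   # locator used when sinking
pattern, replacement = locator_wildcard
read_glob = re.sub(pattern, replacement, written_locator)
print(read_glob)  # data*base.parquet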
CSVConnection
¶
Bases: PhysicalConnection
ClickhouseConfig
¶
ClickhouseConnection
¶
Column
¶
Connection
¶
Bases: BaseModel
config
¶
config: Optional[ConnectionConfiguration] = None
Optional configuration for the current connection. If not present then global configuration will be used.
create_statement
¶
create_statement: str | None = None
Create statement for given table. Must be in requested dialect.
fields
¶
fields: Optional[Columns] = None
If set, the schema is used to generate the source structure when the actual source provides no data, in which case generating the ephemeral view would otherwise fail.
kind
¶
kind: str
Model type, e.g. Deltatable, Bigquery, Clickhouse, Parquet, File. Custom connections can be loaded from a module.
params
¶
params: Optional[dict] = Field(default_factory=dict)
Any parameters that can be passed to connection.
show_schema
¶
show_schema: Optional[bool] = None
If true, the schema is automatically detected from the input data and logged.
ConnectionConfiguration
¶
Bases: BaseModel
Top-level container for all connection configurations in a pipeline.
Includes default configuration blocks for supported sources and sinks:
- Local file systems
- S3 and GCS buckets
- BigQuery datasets
- ClickHouse databases
These can be customized per pipeline or extended with new sources.
Example
To support MySQL, you can define a new model like this:
class MySQLConfig(DatabaseConfig):
    pass  # You can add MySQL-specific fields here if needed

class ExtendedConnectionConfiguration(ConnectionConfiguration):
    mysql: MySQLConfig = PydanticField(default_factory=MySQLConfig)
This will allow you to specify MySQL connection settings in your YAML:
connection:
  kind: MySQL
  config:
    host: localhost
    port: 3306
    database: sales
    username: admin
    password: secret
bigquery
¶
bigquery: BigQueryConfig = Field(
default_factory=BigQueryConfig
)
BigQuery connection configuration.
clickhouse
¶
clickhouse: ClickhouseConfig = Field(
default_factory=ClickhouseConfig
)
ClickHouse connection configuration.
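A minimal sketch of building a configuration in Python. The import path and the s3 field name are assumptions; bucket, data_prefix and credential_file follow the field documentation on this page.

from ankaflow.models import ConnectionConfiguration, S3Config, BigQueryConfig  # import path is an assumption

config = ConnectionConfiguration(
    s3=S3Config(                      # field name `s3` is an assumption
        bucket="s3://my-bucket",
        data_prefix="pipelines/demo",
    ),
    bigquery=BigQueryConfig(
        credential_file="/secrets/bq-service-account.json",
    ),
)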
CustomConnection
¶
Bases: PhysicalConnection
Custom connection provider. A custom connection may implement its own logic but must derive from the base Connection class and expose tap(), sink(), sql() and show_schema(), even if they are no-ops.
params
¶
params: Optional[dict] = Field(default_factory=dict)
Free-form configuration parameters passed to the loaded class
load
¶
load() -> Type[Connection]
Dynamically load the connection class from the given module and member.
Returns:
| Type | Description |
|---|---|
| Type[Connection] | A subclass of Connection. |
Raises:
| Type | Description |
|---|---|
| ImportError | If the module or member can't be imported. |
Source code in ankaflow/models/connections.py
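A minimal sketch of a custom connection class. The import path and the exact method signatures are assumptions; the documentation above only requires deriving from the base Connection class and exposing tap(), sink(), sql() and show_schema(), even as no-ops.

from ankaflow.connections import Connection  # assumed location of the runtime base class

class MyCustomConnection(Connection):
    # Hypothetical connection; the real method signatures may differ.

    def tap(self):
        # Read data from the remote system; no-op in this sketch.
        pass

    def sink(self):
        # Write the preceding stage's output; no-op in this sketch.
        pass

    def sql(self):
        # Run SQL against the backing store; no-op in this sketch.
        pass

    def show_schema(self):
        # Report the detected schema; no-op in this sketch.
        pass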
DeltatableConnection
¶
Bases: PhysicalConnection, VersionedConnection
data_mode
¶
data_mode: str = 'error'
Data mode for the write operation. For Deltatable the valid options are:
- append: adds new data
- overwrite: replaces all data
- error: fails (table is read-only)
optimize
¶
optimize: Optional[Union[str, int]] = 1
Controls whether Deltatable (and other engines) runs optimize after each sink operation. With larger tables this may be a lengthy synchronous operation.
The default is to optimize and vacuum with 7-day retention.
Deltatable¶
Accepted values are optimize, vacuum, all, or a literal int.
If a literal int is provided, parts older than that number of days are removed. Note that this overrides the default retention days.
The string options vacuum and all are equivalent to 0.
partition
¶
partition: Optional[List[str]] = None
If set, the delta table is partitioned by the specified fields for faster reads.
schema_mode
¶
schema_mode: Optional[str] = None
Deltatable schema behaviour. If not set, the write fails with an error when the data schema does not match the existing table schema.
Schema evolution options:
- merge: adds new columns from the data
- overwrite: adds new columns, drops missing ones
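A minimal sketch constructing such a sink connection in Python; the import path is an assumption, the field values follow the documentation above.

from ankaflow.models import DeltatableConnection  # import path is an assumption

sink = DeltatableConnection(
    kind="Deltatable",
    locator="delta_table",
    data_mode="append",        # append | overwrite | error
    partition=["part_field"],  # partition column(s) for faster reads
    schema_mode="merge",       # add new columns found in the incoming data
    optimize=0,                # optimize and vacuum immediately, no retention
)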
Dimension
¶
Bases: EphemeralConnection
load
¶
load() -> Type[Connection]
Dynamically load the connection class from the given module and member.
Returns:
| Type | Description |
|---|---|
| Type[Connection] | A subclass of Connection. |
Raises:
| Type | Description |
|---|---|
| ImportError | If the module or member can't be imported. |
Source code in ankaflow/models/connections.py
EphemeralConnection
¶
Bases: Connection
FileConnection
¶
Bases: PhysicalConnection
FlowContext
¶
Bases: ImmutableMap
The context dictionary can be used to supply arbitrary data to the pipeline; it can be referenced in templates much like variables, with the difference that it cannot be modified at runtime.
Context can be initiated via a dictionary:
context = FlowContext(**{'foo': 'Bar'})
Items can be referenced using both bracket and dot notation:
context.foo == context['foo'] == 'Bar'
GSConfig
¶
Bases: BucketConfig
Google Cloud Storage (GCS) configuration.
- HMAC credentials allow reading/writing Parquet, JSON, and CSV.
- Delta table writes require a full service account credential file.
credential_file
¶
credential_file: Optional[str] = None
Path to GCP service account credential file (JSON). Required for certain write operations (e.g., Delta tables).
JSONConnection
¶
Bases: PhysicalConnection
ParquetConnection
¶
Bases: PhysicalConnection
PhysicalConnection
¶
Bases: Connection
locator
¶
locator: str
Table name or file name or URI, or other identifier required by the connection.
RestConnection
¶
Bases: EphemeralConnection
Configuration for a REST-based data connection.
This model defines how to configure and execute REST API requests as part of a pipeline step. It includes the request definition, client behavior (e.g., retries, headers), and optional schema discovery.
client
¶
client: RestClientConfig
Configuration for the REST client (e.g., base URL, headers, auth).
fields
¶
fields: Optional[Columns] = None
Optional schema definition used to validate or transform response data.
It is recommended to manually specify the schema after initial discovery. This ensures downstream pipeline stages remain stable, even when the remote API returns no results (e.g., due to no updates in an incremental fetch). Explicit schema prevents silent failures or schema drift in such cases.
show_schema
¶
show_schema: Optional[bool] = None
If True, the connector will attempt to infer or display the response schema automatically.
S3Config
¶
Bases: BucketConfig
S3-specific bucket configuration including optional authentication.
SQLGenConnection
¶
Bases: EphemeralConnection
SQLGen connection is intended for SQL code generation:
The prompt should instruct the model to generate a SQL query; the resulting statement is executed, creating a VIEW in the internal database.
Example:
Stage 1: Name: ReadSomeParquetData
Stage 2: Name: CodeGen, query: Given SQL table `ReadSomeParquetData` generate a SQL query to count the number of rows in the table.
Inside stage 2 the following happens:
1. The prompt is sent to the inference endpoint
2. The endpoint is expected to respond with valid SQL
3. The connection executes the statement `CREATE OR REPLACE VIEW StageName AS <received_select_statement>`, where the statement in the example is likely `SELECT COUNT() FROM ReadSomeParquetData`.
variables
¶
variables: dict | None = None
Variables passed to the prompt. The prompt must be supplied in the query field of the Stage.
The prompt may contain Jinja2-style placeholders:
Example
Here's my name: {{name}}.
The connection renders the prompt template using variables.
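A minimal sketch of a code-generation stage; the import path, the discriminator value "SQLGen" and the stage kind used here are assumptions, while variables and the templated query follow the documentation above.

from ankaflow.models import Stage, SQLGenConnection  # import path is an assumption

stage = Stage(
    name="codegen",
    kind="source",                    # stage kind used here is illustrative
    connection=SQLGenConnection(
        kind="SQLGen",                # assumed discriminator value
        variables={"table": "ReadSomeParquetData"},
    ),
    query="Given SQL table {{ table }}, generate a SQL query to count the rows.",
)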
Stage
¶
Bases: BaseModel
connection
¶
connection: Optional[
Union[
RestConnection,
VariableConnection,
BigQueryConnection,
DeltatableConnection,
ParquetConnection,
ClickhouseConnection,
CustomConnection,
JSONConnection,
CSVConnection,
FileConnection,
SQLGenConnection,
Dimension,
]
] = Field(None, discriminator="kind")
Defines how the data is read from / written to the target.
Connection fields may contain templates and they will be rendered recursively.
A special construct is JSON>, which allows dynamically generating parameters at runtime:
- kind: source
  name: source_name
  connection:
    kind: File
    params: >
      JSON>{
        "key": "value",
        "dynkey": <<API.property>>
      }
In the example above params is constructed as a JSON string.
It is even possible to construct parameter keys dynamically:
params: >
  JSON>
  {
  <% for number in [1,2,3] %>
    "key_<< number >>": << number >><% if not loop.last %>,<% endif %>
  <% endfor %>
  }
The above example results in the following:
params: >
  {
    "key_1": 1,
    "key_2": 2,
    "key_3": 3
  }
A JSON> structure cannot contain nested JSON> structures; the entire string following the JSON> header must result in valid JSON.
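As a minimal sketch, the JSON> construct amounts to rendering the string after the header as a Jinja2 template with the custom delimiters used by this package and then parsing the result as JSON. The snippet below mirrors that behaviour with Jinja2 directly; it is not the library's own implementation.

import json
from jinja2 import Environment

env = Environment(
    variable_start_string="<<", variable_end_string=">>",
    block_start_string="<%", block_end_string="%>",
    comment_start_string="<#", comment_end_string="#>",
)

template = """
{
<% for number in [1,2,3] %>
  "key_<< number >>": << number >><% if not loop.last %>,<% endif %>
<% endfor %>
}
"""

params = json.loads(env.from_string(template).render())
print(params)  # {'key_1': 1, 'key_2': 2, 'key_3': 3}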
Inbuilt connections include:
- Deltatable (read)
If the connection is Deltatable then a query is required to narrow down the data stored in the delta table. The FROM clause must be Deltatable:
- kind: source
  name: delta_tap
  connection:
    kind: Deltatable
    locator: delta_table
  query: >
    select * from Deltatable
- Deltatable (write)
See also StorageOptions
The following example writes data from the preceding stage to a delta table, appending the data, partitioning by the part column, and optimizing and vacuuming immediately (without retention) after the write.
- kind: sink
  name: delta_sink
  connection:
    kind: Deltatable
    locator: delta_table
    optimize: 0
    data_mode: append
    partition:
    - part_field
- Parquet (read/write)
- JSON (read/write; NOTE: write operation generates newline-delimited JSON)
- CSV (read/write)
- Variable (read/write)
- File (read)
File can be read from a connected filesystem (including s3). File name and file type must be specified in the pipeline context:
- `context.FileName`: file name relative to `file_prefix`
- `context.FileType`: file type, one of CSV, XLSX, JSON, HTML
Any file reader configuration must be passed in params.
- Rest (bidirectional)
Rest connections consist of two parts: Client and Request. Client contains base URL and authentication (basic, digest, header and oauth2 are supported).
- name: TheRest
  kind: source
  connection:
    kind: Rest
    client:
      base_url: https://api.example.com
      auth: # Every request to the given endpoint shares the same authentication
        method: basic
        values:
          username: TheName
          password: ThePassword
    request:
      endpoint: /v1/some-path
      content_type: application/json
      method: post
      query: # Query parameters
        date: <<API.dt(None).isoformat()>>
      body: # JSON payload
        param: 1
    response:
      content_type: application/json
      locator: "JMESPath.to.data"
Any custom source (API) can be used as long as it is available via a module.
kind
¶
kind: str
Defines which action will be performed:
- source
- transform
- sink
- pipeline
- sql
Sink¶
Sink reads the output from the previous stage and stores it in the specified location.
NOTE: If a query is supplied with the stage then the sink uses the output of the query rather than the preceding stage directly. A subsequent sink will use the preceding stage. If supplied, the query must create either a view or a table with the same name as the current stage.
- name: my_sink
  kind: sink
  connection:
    kind: Variable
  query: >
    CREATE VIEW my_sink AS
    SELECT
      1 as foo
Pipeline¶
If the pipeline is preceded by any stage then the subpipeline will be executed as many times as there are rows in the previous stage's output. This is useful if you want to run the same pipeline with different parameters. Make sure the pipeline is preceded by a source or transform producing the required number of rows. If you need to run the subpipeline only once there are two options:
- Place it at the top
- Precede it with a transform producing a single row only
Each row is then passed to the subpipeline in a special variable.
Example pipeline iterating the subpipeline 5 times:
- kind: transform
  name: generate_5 # Generate 5 rows
  query: >
    select unnest as current_variable
    from unnest(generate_series(1,5))
  show: 5
- kind: pipeline
  name: looped_5x
  stages:
  - kind: transform
    name: inside_loop
    # In query we can reference the value passed from parent pipeline
    query: >
      select 'Currently running iteration: {API.look('loop_control.current_variable', variables)}' as value
    show: 5
locator
¶
locator: Optional[str] = None
Currently unused: Name for the connection configuration: name, or URI.
log_level
¶
log_level: Optional[LogLevel] = None
Set logging level. All stages after (including current) will log with specified level. Possible values: INFO (default), DEBUG, WARNING. Log level will be reset to INFO after each pipeline (including nested pipelines).
name
¶
name: str
Name of the stage; must be unique across all stages in the pipeline and conform to the rules: must start with a letter and may contain lowercase letters, numbers and underscores. The name is used to reference this stage by subsequent stages.
on_error
¶
on_error: str = 'fail'
If set to 'continue' then the pipeline will not fail. Subsequent stages referring to the failed one must handle missing data.
query
¶
query: Optional[str] = None
SQL Query or dictionary with custom source parameters. May contain {dynamic variables}.
show
¶
show: int = 0
If set to a positive integer then the given number of rows from this stage will be logged. If set to -1 then all rows will be logged. Set to 0 to disable logging.
skip_if
¶
skip_if: Optional[Any] = None
Any value that can be evaluated using bool(), or a template string, e.g. << True >>.
When the expression evaluates to True the stage is skipped.
throttle
¶
throttle: Optional[Union[int, float]] = None
If set to a positive value then flow execution will be paused after the stage for the given number of seconds.
Useful when dealing with rate limits or otherwise spreading the load over time.
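A minimal sketch of the stage-level controls described above, constructed directly in Python; the import path is an assumption, and skip_if and throttle follow the field documentation.

from ankaflow.models import Stage  # import path is an assumption

stage = Stage(
    name="optional_refresh",
    kind="transform",
    query="select * from some_previous_stage",  # illustrative query
    skip_if="<< True >>",   # template string; a truthy result skips the stage
    throttle=1.5,           # pause 1.5 seconds after the stage
)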
Stages
¶
Bases: RootModel[List[Stage]]
A sequence of processing stages in a data pipeline.
Each Datablock in root represents one discrete step or transformation in an end-to-end business workflow (e.g., tap, transform, sink, validate).
Attributes:
| Name | Type | Description |
|---|---|---|
| root | List[Datablock] | Ordered list of pipeline stages to execute. |
enumerate_steps
¶
enumerate_steps() -> Iterator[tuple[int, Stage]]
Yield each stage along with its 0-based position.
Use this when you need both the stage and its index for logging, metrics, or conditional branching.
Returns:
| Type | Description |
|---|---|
| Iterator[tuple[int, Stage]] | Iterator[Tuple[int, Datablock]]: Pairs of (index, stage). |
Source code in ankaflow/models/core.py
load
¶
load(source: Union[str, Path, IO[str], Loadable]) -> Stages
Load a pipeline from YAML (path, YAML-string, file-like or Loadable).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| source | str \| Path \| IO[str] \| Loadable | | required |
Returns:
| Name | Type | Description |
|---|---|---|
| Stages | Stages | A validated Stages instance. |
Source code in ankaflow/models/core.py
steps
¶
steps() -> Iterator[Stage]
Yield each stage in execution order.
Returns:
| Type | Description |
|---|---|
| Iterator[Stage] | Iterator[Datablock]: An iterator over the stages, from first to last. |
Source code in ankaflow/models/core.py
Variables
¶
Bases: dict
Variables is a dict-based collection storing arbitrary data under keys. Variables can be populated via:
- Dictionary passed to pipeline:
Flow(defs, context, variables={'foo': 'bar'})
- Sink operation:
- kind: sink
  name: my_sink
  connection:
    kind: Variable
    locator: variable_name
This step will place a list of records from the preceding step in variables as {'variable_name': t.List[dict]}.
In most cases sinking to a variable is not needed, as all preceding stages in a pipeline can be referenced via name. Sinking to a variable is useful when you need to share data with a subpipeline.
A special variable is loop_control, which is populated dynamically from the previous step when the current stage type is pipeline. In case the previous step generates multiple records the new pipeline is called for each record, and the control variable holds a dict representing the current record.
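A minimal sketch of wiring these pieces together. The import locations are assumptions; the constructor call follows the Flow(defs, context, variables=...) example above.

from pathlib import Path

from ankaflow import Flow                        # import path is an assumption
from ankaflow.models import Stages, FlowContext  # import path is an assumption

defs = Stages.load(Path("pipeline.yml"))         # path, YAML string or file-like
context = FlowContext(**{"foo": "Bar"})          # read-only data for templates
flow = Flow(defs, context, variables={"foo": "bar"})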
components
¶
configs
¶
BigQueryConfig
¶
Bases: BaseModel
Configuration for accessing BigQuery datasets.
credential_file
¶
credential_file: Optional[str] = None
Path to service account JSON credentials for BigQuery access.
BucketConfig
¶
Bases: BaseModel
bucket
¶
bucket: Optional[str] = None
Bucket name, e.g. s3://my-bucket, or a local absolute path, e.g. /data/path.
data_prefix
¶
data_prefix: Optional[str] = None
Prefix for data files: s3://my-bucket/<prefix> or /data/path/<prefix>.
locator_wildcard
¶
locator_wildcard: Optional[Tuple] = None
Regular expression and wildcard used to modify the locator.
Useful when you sink data to data-YYYY.parquet but want to read data-*.parquet.
Provide a tuple with the pattern as the first element and the replacement as the second one. Example:
('-\d{4}-', '*')
This creates the wildcard data*base.parquet for data-YYYY-base.parquet.
| Wildcard | Description |
|---|---|
| * | matches any number of any characters (including none) |
| ** | matches any number of subdirectories (including none) |
| [abc] | matches one character given in the bracket |
| [a-z] | matches one character from the range given in the bracket |
Wildcard is automatically applied in tap and show_schema operations.
ClickhouseConfig
¶
ConnectionConfiguration
¶
Bases: BaseModel
Top-level container for all connection configurations in a pipeline.
Includes default configuration blocks for supported sources and sinks:
- Local file systems
- S3 and GCS buckets
- BigQuery datasets
- ClickHouse databases
These can be customized per pipeline or extended with new sources.
Example
To support MySQL, you can define a new model like this:
class MySQLConfig(DatabaseConfig):
    pass  # You can add MySQL-specific fields here if needed

class ExtendedConnectionConfiguration(ConnectionConfiguration):
    mysql: MySQLConfig = PydanticField(default_factory=MySQLConfig)
This will allow you to specify MySQL connection settings in your YAML:
connection:
  kind: MySQL
  config:
    host: localhost
    port: 3306
    database: sales
    username: admin
    password: secret
bigquery
¶
bigquery: BigQueryConfig = Field(
default_factory=BigQueryConfig
)
BigQuery connection configuration.
clickhouse
¶
clickhouse: ClickhouseConfig = Field(
default_factory=ClickhouseConfig
)
ClickHouse connection configuration.
DatabaseConfig
¶
Bases: BaseModel
Base class for SQL database connection configurations.
cluster
¶
cluster: Optional[str] = None
Optional cluster name or identifier (used in ClickHouse and other distributed systems).
GSConfig
¶
Bases: BucketConfig
Google Cloud Storage (GCS) configuration.
- HMAC credentials allow reading/writing Parquet, JSON, and CSV.
- Delta table writes require a full service account credential file.
credential_file
¶
credential_file: Optional[str] = None
Path to GCP service account credential file (JSON). Required for certain write operations (e.g., Delta tables).
S3Config
¶
Bases: BucketConfig
S3-specific bucket configuration including optional authentication.
connections
¶
BigQueryConnection
¶
Bases: PhysicalConnection, VersionedConnection
data_mode
¶
data_mode: str = 'error'
Data mode for the write operation. For Deltatable the valid options are:
- append: adds new data
- overwrite: replaces all data
- error: fails (table is read-only)
partition
¶
partition: Optional[List[str]] = None
If set, the delta table is partitioned by the specified fields for faster reads.
schema_mode
¶
schema_mode: Optional[str] = None
Deltatable schema behaviour. If not set, the write fails with an error when the data schema does not match the existing table schema.
Schema evolution options:
- merge: adds new columns from the data
- overwrite: adds new columns, drops missing ones
CSVConnection
¶
Bases: PhysicalConnection
ClickhouseConnection
¶
Connection
¶
Bases: BaseModel
config
¶
config: Optional[ConnectionConfiguration] = None
Optional configuration for the current connection. If not present then global configuration will be used.
create_statement
¶
create_statement: str | None = None
Create statement for given table. Must be in requested dialect.
fields
¶
fields: Optional[Columns] = None
If set, the schema is used to generate the source structure when the actual source provides no data, in which case generating the ephemeral view would otherwise fail.
kind
¶
kind: str
Model type, e.g. Deltatable, Bigquery, Clickhouse, Parquet, File. Custom connections can be loaded from a module.
params
¶
params: Optional[dict] = Field(default_factory=dict)
Any parameters that can be passed to connection.
show_schema
¶
show_schema: Optional[bool] = None
If true, the schema is automatically detected from the input data and logged.
CustomConnection
¶
Bases: PhysicalConnection
Custom connection provider. A custom connection may implement its own logic but must derive from the base Connection class and expose tap(), sink(), sql() and show_schema(), even if they are no-ops.
params
¶
params: Optional[dict] = Field(default_factory=dict)
Free-form configuration parameters passed to the loaded class
load
¶
load() -> Type[Connection]
Dynamically load the connection class from the given module and member.
Returns:
| Type | Description |
|---|---|
| Type[Connection] | A subclass of Connection. |
Raises:
| Type | Description |
|---|---|
| ImportError | If the module or member can't be imported. |
Source code in ankaflow/models/connections.py
DeltatableConnection
¶
Bases: PhysicalConnection, VersionedConnection
data_mode
¶
data_mode: str = 'error'
Data mode for the write operation. For Deltatable the valid options are:
- append: adds new data
- overwrite: replaces all data
- error: fails (table is read-only)
optimize
¶
optimize: Optional[Union[str, int]] = 1
Controls whether Deltatable (and other engines) runs optimize after each sink operation. With larger tables this may be a lengthy synchronous operation.
The default is to optimize and vacuum with 7-day retention.
Deltatable¶
Accepted values are optimize, vacuum, all, or a literal int.
If a literal int is provided, parts older than that number of days are removed. Note that this overrides the default retention days.
The string options vacuum and all are equivalent to 0.
partition
¶
partition: Optional[List[str]] = None
If set, the delta table is partitioned by the specified fields for faster reads.
schema_mode
¶
schema_mode: Optional[str] = None
Deltatable schema behaviour. If not set, the write fails with an error when the data schema does not match the existing table schema.
Schema evolution options:
- merge: adds new columns from the data
- overwrite: adds new columns, drops missing ones
Dimension
¶
Bases: EphemeralConnection
load
¶
load() -> Type[Connection]
Dynamically load the connection class from the given module and member.
Returns:
| Type | Description |
|---|---|
| Type[Connection] | A subclass of Connection. |
Raises:
| Type | Description |
|---|---|
| ImportError | If the module or member can't be imported. |
Source code in ankaflow/models/connections.py
FileConnection
¶
Bases: PhysicalConnection
JSONConnection
¶
Bases: PhysicalConnection
ParquetConnection
¶
Bases: PhysicalConnection
PhysicalConnection
¶
Bases: Connection
locator
¶
locator: str
Table name or file name or URI, or other identifier required by the connection.
RestConnection
¶
Bases: EphemeralConnection
Configuration for a REST-based data connection.
This model defines how to configure and execute REST API requests as part of a pipeline step. It includes the request definition, client behavior (e.g., retries, headers), and optional schema discovery.
client
¶
client: RestClientConfig
Configuration for the REST client (e.g., base URL, headers, auth).
fields
¶
fields: Optional[Columns] = None
Optional schema definition used to validate or transform response data.
It is recommended to manually specify the schema after initial discovery. This ensures downstream pipeline stages remain stable, even when the remote API returns no results (e.g., due to no updates in an incremental fetch). Explicit schema prevents silent failures or schema drift in such cases.
show_schema
¶
show_schema: Optional[bool] = None
If True, the connector will attempt to infer or display the response schema automatically.
SQLGenConnection
¶
Bases: EphemeralConnection
SQLGen connection is intended for SQL code generation:
The prompt should instruct the model to generate a SQL query; the resulting statement is executed, creating a VIEW in the internal database.
Example:
Stage 1: Name: ReadSomeParquetData
Stage 2: Name: CodeGen, query: Given SQL table `ReadSomeParquetData` generate a SQL query to count the number of rows in the table.
Inside stage 2 the following happens:
1. The prompt is sent to the inference endpoint
2. The endpoint is expected to respond with valid SQL
3. The connection executes the statement `CREATE OR REPLACE VIEW StageName AS <received_select_statement>`, where the statement in the example is likely `SELECT COUNT() FROM ReadSomeParquetData`.
variables
¶
variables: dict | None = None
Variables passed to the prompt. The prompt must be supplied in the query field of the Stage.
The prompt may contain Jinja2-style placeholders:
Example
Here's my name: {{name}}.
The connection renders the prompt template using variables.
VariableConnection
¶
Bases: PhysicalConnection
core
¶
FlowContext
¶
Bases: ImmutableMap
The context dictionary can be used to supply arbitrary data to the pipeline; it can be referenced in templates much like variables, with the difference that it cannot be modified at runtime.
Context can be initiated via a dictionary:
context = FlowContext(**{'foo': 'Bar'})
Items can be referenced using both bracket and dot notation:
context.foo == context['foo'] == 'Bar'
Stage
¶
Bases: BaseModel
connection
¶
connection: Optional[
Union[
RestConnection,
VariableConnection,
BigQueryConnection,
DeltatableConnection,
ParquetConnection,
ClickhouseConnection,
CustomConnection,
JSONConnection,
CSVConnection,
FileConnection,
SQLGenConnection,
Dimension,
]
] = Field(None, discriminator="kind")
Defines how the data is read from / written to the target.
Connection fields may contain templates and they will be rendered recursively.
A special construct is JSON>, which allows dynamically generating parameters at runtime:
- kind: source
  name: source_name
  connection:
    kind: File
    params: >
      JSON>{
        "key": "value",
        "dynkey": <<API.property>>
      }
In the example above params is constructed as a JSON string.
It is even possible to construct parameter keys dynamically:
params: >
  JSON>
  {
  <% for number in [1,2,3] %>
    "key_<< number >>": << number >><% if not loop.last %>,<% endif %>
  <% endfor %>
  }
The above example results in the following:
params: >
  {
    "key_1": 1,
    "key_2": 2,
    "key_3": 3
  }
A JSON> structure cannot contain nested JSON> structures; the entire string following the JSON> header must result in valid JSON.
Inbuilt connections include:
- Deltatable (read)
If the connection is Deltatable then a query is required to narrow down the data stored in the delta table. The FROM clause must be Deltatable:
- kind: source
  name: delta_tap
  connection:
    kind: Deltatable
    locator: delta_table
  query: >
    select * from Deltatable
- Deltatable (write)
See also StorageOptions
The following example writes data from the preceding stage to a delta table, appending the data, partitioning by the part column, and optimizing and vacuuming immediately (without retention) after the write.
- kind: sink
  name: delta_sink
  connection:
    kind: Deltatable
    locator: delta_table
    optimize: 0
    data_mode: append
    partition:
    - part_field
- Parquet (read/write)
- JSON (read/write; NOTE: write operation generates newline-delimited JSON)
- CSV (read/write)
- Variable (read/write)
- File (read)
File can be read from a connected filesystem (including s3). File name and file type must be specified in the pipeline context:
- `context.FileName`: file name relative to `file_prefix`
- `context.FileType`: file type, one of CSV, XLSX, JSON, HTML
Any file reader configuration must be passed in params.
- Rest (bidirectional)
Rest connections consist of two parts: Client and Request. Client contains base URL and authentication (basic, digest, header and oauth2 are supported).
- name: TheRest
  kind: source
  connection:
    kind: Rest
    client:
      base_url: https://api.example.com
      auth: # Every request to the given endpoint shares the same authentication
        method: basic
        values:
          username: TheName
          password: ThePassword
    request:
      endpoint: /v1/some-path
      content_type: application/json
      method: post
      query: # Query parameters
        date: <<API.dt(None).isoformat()>>
      body: # JSON payload
        param: 1
    response:
      content_type: application/json
      locator: "JMESPath.to.data"
Any custom source (API) can be used as long as it is available via a module.
kind
¶
kind: str
Defines which action will be performed:
- source
- transform
- sink
- pipeline
- sql
Sink¶
Sink reads the output from the previous stage and stores it in the specified location.
NOTE: If a query is supplied with the stage then the sink uses the output of the query rather than the preceding stage directly. A subsequent sink will use the preceding stage. If supplied, the query must create either a view or a table with the same name as the current stage.
- name: my_sink
  kind: sink
  connection:
    kind: Variable
  query: >
    CREATE VIEW my_sink AS
    SELECT
      1 as foo
Pipeline¶
If the pipeline is preceded by any stage then the subpipeline will be executed as many times as there are rows in the previous stage's output. This is useful if you want to run the same pipeline with different parameters. Make sure the pipeline is preceded by a source or transform producing the required number of rows. If you need to run the subpipeline only once there are two options:
- Place it at the top
- Precede it with a transform producing a single row only
Each row is then passed to the subpipeline in a special variable.
Example pipeline iterating the subpipeline 5 times:
- kind: transform
  name: generate_5 # Generate 5 rows
  query: >
    select unnest as current_variable
    from unnest(generate_series(1,5))
  show: 5
- kind: pipeline
  name: looped_5x
  stages:
  - kind: transform
    name: inside_loop
    # In query we can reference the value passed from parent pipeline
    query: >
      select 'Currently running iteration: {API.look('loop_control.current_variable', variables)}' as value
    show: 5
locator
¶
locator: Optional[str] = None
Currently unused: Name for the connection configuration: name, or URI.
log_level
¶
log_level: Optional[LogLevel] = None
Set logging level. All stages after (including current) will log with specified level. Possible values: INFO (default), DEBUG, WARNING. Log level will be reset to INFO after each pipeline (including nested pipelines).
name
¶
name: str
Name of the stage; must be unique across all stages in the pipeline and conform to the rules: must start with a letter and may contain lowercase letters, numbers and underscores. The name is used to reference this stage by subsequent stages.
on_error
¶
on_error: str = 'fail'
If set to 'continue' then the pipeline will not fail. Subsequent stages referring to the failed one must handle missing data.
query
¶
query: Optional[str] = None
SQL Query or dictionary with custom source parameters. May contain {dynamic variables}.
show
¶
show: int = 0
If set to a positive integer then the given number of rows from this stage will be logged. If set to -1 then all rows will be logged. Set to 0 to disable logging.
skip_if
¶
skip_if: Optional[Any] = None
Any value that can be evaluated using bool(), or a template string, e.g. << True >>.
When the expression evaluates to True the stage is skipped.
throttle
¶
throttle: Optional[Union[int, float]] = None
If set to a positive value then flow execution will be paused after the stage for the given number of seconds.
Useful when dealing with rate limits or otherwise spreading the load over time.
Stages
¶
Bases: RootModel[List[Stage]]
A sequence of processing stages in a data pipeline.
Each Datablock in root represents one discrete step or transformation in an end-to-end business workflow (e.g., tap, transform, sink, validate).
Attributes:
| Name | Type | Description |
|---|---|---|
| root | List[Datablock] | Ordered list of pipeline stages to execute. |
enumerate_steps
¶
enumerate_steps() -> Iterator[tuple[int, Stage]]
Yield each stage along with its 0-based position.
Use this when you need both the stage and its index for logging, metrics, or conditional branching.
Returns:
| Type | Description |
|---|---|
| Iterator[tuple[int, Stage]] | Iterator[Tuple[int, Datablock]]: Pairs of (index, stage). |
Source code in ankaflow/models/core.py
load
¶
load(source: Union[str, Path, IO[str], Loadable]) -> Stages
Load a pipeline from YAML (path, YAML-string, file-like or Loadable).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| source | str \| Path \| IO[str] \| Loadable | | required |
Returns:
| Name | Type | Description |
|---|---|---|
| Stages | Stages | A validated Stages instance. |
Source code in ankaflow/models/core.py
steps
¶
steps() -> Iterator[Stage]
Yield each stage in execution order.
Returns:
| Type | Description |
|---|---|
| Iterator[Stage] | Iterator[Datablock]: An iterator over the stages, from first to last. |
Source code in ankaflow/models/core.py
Variables
¶
Bases: dict
Variables is a dict-based collection storing arbitrary data under keys. Variables can be populated via:
- Dictionary passed to pipeline:
Flow(defs, context, variables={'foo': 'bar'})
- Sink operation:
- kind: sink
  name: my_sink
  connection:
    kind: Variable
    locator: variable_name
This step will place a list of records from the preceding step in variables as {'variable_name': t.List[dict]}.
In most cases sinking to a variable is not needed, as all preceding stages in a pipeline can be referenced via name. Sinking to a variable is useful when you need to share data with a subpipeline.
A special variable is loop_control, which is populated dynamically from the previous step when the current stage type is pipeline. In case the previous step generates multiple records the new pipeline is called for each record, and the control variable holds a dict representing the current record.
enums
¶
rest
¶
BasicHandler
¶
Bases: BaseModel
A no-op response handler used when no special processing (like pagination or transformation) is required.
Typically used for single-response REST endpoints where the entire payload is returned in one request.
Attributes:
| Name | Type | Description |
|---|---|---|
| kind | Literal | Specifies the handler type as BASIC. |
Paginator
¶
Bases: BaseModel
A response handler for paginated REST APIs.
This handler generates repeated requests by incrementing a
page-related parameter until no more data is available.
The stopping condition is usually inferred from the number of records in the response being less than page_size, or from a total record count field.
increment
¶
increment: int
Page parameter increment. The original request configuration should include the initial value, e.g. page_no=1.
page_param
¶
page_param: str
Page parameter in the request (query or body). This will be incremented from request to request.
page_size
¶
page_size: int
Page size should be explicitly defined. If the response contains fewer records it is considered to be the last page.
param_locator
¶
param_locator: ParameterDisposition
Define where the parameter is located: body or query
throttle
¶
throttle: Optional[Union[int, float]] = None
If set to positive value then each page request is throttled given number of seconds.
Useful when dealing with rate limits or otherwise spreading the load over time.
total_records
¶
total_records: Optional[str] = None
JMESPath to the total record count in the response.
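A minimal sketch of a pagination handler built in Python; the import path, the kind value and the acceptance of plain strings for param_locator are assumptions, the remaining fields follow this page.

from ankaflow.models.rest import Paginator  # import path is an assumption

handler = Paginator(
    kind="paginator",            # assumed ResponseHandlerTypes value
    page_param="page_no",        # request parameter incremented between pages
    increment=1,                 # added to page_no on every request
    page_size=100,               # a shorter page marks the last page
    param_locator="query",       # parameter lives in the query string
    total_records="meta.total",  # optional JMESPath to the total count
    throttle=0.5,                # wait half a second between page requests
)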
Request
¶
Bases: BaseModel
body
¶
body: Optional[Union[str, Dict]] = None
Request body parameters.
This field accepts either:
- A Python dict representing a direct key-value mapping, or
- A Jinja-templated JSON string with the magic @json prefix, e.g.:
@json{"parameter": "value"}
The template will be rendered using the following custom delimiters:
- << ... >> for variable interpolation
- <% ... %> for logic/control flow (e.g., for-loops)
- <# ... #> for inline comments
The template will be rendered before being parsed into a valid JSON object. This allows the use of dynamic expressions, filters, and control flow such as loops.
Example with looping¶
Given:
variables = {
    "MyList": [
        {"id": 1, "value": 10},
        {"id": 2, "value": 20}
    ]
}
You can generate a dynamic body with:
body: >
  @json[
  <% for row in API.look("MyList", variables) %>
    { "id": << row.id >>, "value": << row.value >> }<% if not loop.last %>,<% endif %>
  <% endfor %>
  ]
This will render to a proper JSON list:
[
    { "id": 1, "value": 10 },
    { "id": 2, "value": 20 }
]
Notes:
- When using @json, the entire string is rendered as a Jinja template and then parsed with json.loads().
- Nested @json blocks are not supported.
- Newlines and whitespace are automatically collapsed during rendering.
endpoint
¶
endpoint: str
Request endpoint, e.g. get/data, under the base URL.
Example: https://api.example.com/v1 + get/data
errorhandler
¶
errorhandler: RestErrorHandler = Field(
default_factory=RestErrorHandler
)
Custom error handler, e.g. for searching for conditions in the response or custom status codes.
ResponseHandlerTypes
¶
RestAuth
¶
Bases: BaseModel
Authentication configuration for a Rest connection.
NOTE: Some authentication methods may not work in the browser due to limitations in the network API.
coerce_to_stringdict
¶
coerce_to_stringdict(v)
Rest header values must be strings. This convenience validator automatically converts a regular dictionary to a StringDict.
Source code in ankaflow/models/rest.py
RestClientConfig
¶
Bases: BaseModel
Rest client for a given base URL. Includes transport and authentication configuration.
RestErrorHandler
¶
Bases: BaseModel
condition
¶
condition: Optional[str] = None
JMESPath expression to look for in the response body. An error will be generated if the expression evaluates to True.
error_status_codes
¶
error_status_codes: List[int] = []
List of HTTP status codes to be treated as errors.
message
¶
message: Optional[str] = None
JMESPath expression to extract the error message from the response. If omitted the entire response will be included in the error.
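A minimal sketch of an error handler; the import path is an assumption, and the JMESPath expressions are only illustrative.

from ankaflow.models.rest import RestErrorHandler  # import path is an assumption

errorhandler = RestErrorHandler(
    error_status_codes=[400, 429, 500],  # HTTP codes treated as errors
    condition="status == 'FAILED'",      # illustrative JMESPath over the body
    message="error.detail",              # illustrative JMESPath to the message
)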
RestResponse
¶
StatePoller
¶
Bases: BaseModel
A response handler for state-based polling APIs.
This handler is designed for asynchronous workflows where the
client repeatedly polls an endpoint until a certain state is reached
(e.g., job completion, resource readiness). Once the condition is met,
the pipeline continues by reading from the final data locator.
URLPoller
¶
Bases: BaseModel
URL Poller makes request(s) to the remote API until a URL is returned.