Pipeline API

The Pipeline API provides the building blocks for YAML pipelines.

Each stage and configuration object here represents a specific part of the YAML definition file.

Example pipeline stage with all possible keys. The actual set of keys used depends on the model kind.

- name: str
  kind: source | sink | transform
  log_level: DEBUG
  skip_if: 
  on_error: error | continue
  connection: Connection
    kind: Parquet | Deltatable | Rest | ...
    locator: str
    config: ConnectionConfiguration
    client: RestClient
    request: RestRequest
      endpoint: str
      method: RequestMethod
      errorhandler: RestErrorHandler
      auth: RestAuth
        method: header | oauth2 | ...
        values: StringDict
      query: dict | @json-magic
      body: dict | @json-magic
      response: RestResponse
        handler: 
            kind: ResponseHandlerTypes
            page_param: str
            page_size: str
            param_locator: ParameterDisposition
            total_records: str
            throttle: int | float 
        content_type: DataType
        locator: str
    fields: Fields
  show: int
  show_schema: bool
  query: > str
  stages: Stages
  throttle: int

__all__

__all__ = [
    "Column",
    "Columns",
    "Stages",
    "Stage",
    "ConnectionConfiguration",
    "Variables",
    "FlowContext",
    "S3Config",
    "GSConfig",
    "ClickhouseConfig",
    "BigQueryConfig",
    "BucketConfig",
    "Connection",
    "PhysicalConnection",
    "EphemeralConnection",
    "ParquetConnection",
    "CSVConnection",
    "JSONConnection",
    "FileConnection",
    "BigQueryConnection",
    "ClickhouseConnection",
    "DeltatableConnection",
    "Dimension",
    "CustomConnection",
    "RestConnection",
    "SQLGenConnection",
]

BigQueryConfig

Bases: BaseModel

Configuration for accessing BigQuery datasets.

credential_file

credential_file: Optional[str] = None

Path to service account JSON credentials for BigQuery access.

dataset

dataset: Optional[str] = None

BigQuery dataset name.

project

project: Optional[str] = None

GCP project ID containing the BigQuery dataset.

region

region: Optional[str] = None

BigQuery region (e.g., "us-central1").

BigQueryConnection

Bases: PhysicalConnection, VersionedConnection

data_mode

data_mode: str = 'error'

Data mode for write operation. For Deltatable valid options are:

  • append adds new data
  • overwrite replaces all data
  • error fails (table is read-only)

key

key: Optional[List[str]] = None

List of versioned fields

kind

kind: Literal['BigQuery']

partition

partition: Optional[List[str]] = None

If set, the delta table is partitioned using the specified fields for faster reads.

schema_mode

schema_mode: Optional[str] = None

Deltatable schema behaviour. If not set, the write fails with an error if the data schema does not match the existing table schema.

Schema evolution options:

  • merge adds new columns from data
  • overwrite adds new columns, drops missing

version

version: Optional[str] = None

Field for record version timestamp

BucketConfig

Bases: BaseModel

bucket

bucket: Optional[str] = None

Bucket name e.g. s3://my-bucket or local absolute path e.g. /data/path

data_prefix

data_prefix: Optional[str] = None

Prefix for data files: s3://my-bucket/<prefix> or /data/path/<prefix>

locator_wildcard

locator_wildcard: Optional[Tuple] = None

Regular expression and wildcard to modify the locator. Useful when you sink data to data-YYYY.parquet but want to read data-*.parquet. Provide a tuple with the pattern as the first element and the replacement as the second. Example:

('-\d{4}-', '*')

This will create a wildcard for data-YYYY-base.parquet as data*base.parquet.

Wildcard Description
* matches any number of any characters (including none)
** matches any number of subdirectories (including none)
[abc] matches one character given in the bracket
[a-z] matches one character from the range given in the bracket

Wildcard is automatically applied in tap and show_schema operations.
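
A minimal sketch of how this might look in a connection configuration (the bucket name and prefix are assumptions for illustration only):

s3:
  bucket: s3://my-bucket
  data_prefix: exports
  locator_wildcard: ['-\d{4}-', '*']

With this configuration a stage that sinks to data-2024-base.parquet could later be read back through the wildcard data*base.parquet.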

region

region: str | None = None

CSVConnection

ClickhouseConfig

Bases: DatabaseConfig

Configuration for ClickHouse database, extending generic SQL config.

blocksize

blocksize: int = 50000

Number of rows to process per block for batch operations.

ClickhouseConnection

Bases: PhysicalConnection, VersionedConnection

key

key: Optional[List[str]] = None

List of versioned fields

kind

kind: Literal['Clickhouse']

version

version: Optional[str] = None

Field for record version timestamp

Column

Bases: BaseModel

Data column (equivalent to database column)

name

name: str

Column name must follow the rules for SQL engine column names

type

type: str

Any data type supported by the SQL engine

Columns

Bases: RootModel[List[Column]]

Iterable list-like collection of Fields.
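
For example, a fields block in a connection definition could declare columns like this (a hedged sketch; the column names and types are illustrative only):

fields:
  - name: id
    type: BIGINT
  - name: created_at
    type: TIMESTAMP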

Connection

Bases: BaseModel

config

config: Optional[ConnectionConfiguration] = None

Optional configuration for the current connection. If not present then global configuration will be used.

create_statement

create_statement: str | None = None

Create statement for given table. Must be in requested dialect.

fields

fields: Optional[Columns] = None

If set, the schema is used to generate the source structure when the actual source provides no data, in which case generation of the ephemeral view would otherwise fail.

kind

kind: str

Model type, e.g. Deltatable, Bigquery, Clickhouse, Parquet, File. Custom connections can be loaded from a module.

params

params: Optional[dict] = Field(default_factory=dict)

Any parameters that can be passed to connection.

show_schema

show_schema: Optional[bool] = None

If true then the schema is automatically detected from the input data and logged.

ConnectionConfiguration

Bases: BaseModel

Top-level container for all connection configurations in a pipeline.

Includes default configuration blocks for supported sources and sinks like:

  • Local file systems
  • S3 and GCS buckets
  • BigQuery datasets
  • ClickHouse databases

These can be customized per pipeline or extended with new sources.

Example

To support MySQL, you can define a new model like this:

class MySQLConfig(DatabaseConfig):
    pass  # You can add MySQL-specific fields here if needed

class ExtendedConnectionConfiguration(ConnectionConfiguration):
    mysql: MySQLConfig = PydanticField(default_factory=MySQLConfig)

This will allow you to specify MySQL connection settings in your YAML:

connection:
  kind: MySQL
  config:
    host: localhost
    port: 3306
    database: sales
    username: admin
    password: secret

bigquery

bigquery: BigQueryConfig = Field(
    default_factory=BigQueryConfig
)

BigQuery connection configuration.

clickhouse

clickhouse: ClickhouseConfig = Field(
    default_factory=ClickhouseConfig
)

ClickHouse connection configuration.

gs

gs: GSConfig = Field(default_factory=GSConfig)

Google Cloud Storage configuration.

llm

llm: LLMConfig = Field(default_factory=LLMConfig)

Language model configuration.

local

local: BucketConfig = Field(default_factory=BucketConfig)

Local file system configuration.

s3

s3: S3Config = Field(default_factory=S3Config)

S3 cloud storage configuration.

CustomConnection

Bases: PhysicalConnection

Custom connection provider. A custom connection may implement its own logic but must derive from the base Connection class, and expose tap(), sink(), sql() and show_schema() even if they are no-ops.

classname

classname: str

Name of the class to load from the module

module

module: str

Python module where the connection class is defined

params

params: Optional[dict] = Field(default_factory=dict)

Free-form configuration parameters passed to the loaded class

load

load() -> Type[Connection]

Dynamically load the connection class from the given module and member.

Returns:

Type Description
Type[Connection]

A subclass of Connection.

Raises:

Type Description
ImportError

If the module or member can't be imported.

Source code in ankaflow/models/connections.py
def load(self) -> t.Type[Connection]:
    """
    Dynamically load the connection class from the given module and member.

    Returns:
        A subclass of Connection.

    Raises:
        ImportError: If the module or member can't be imported.
    """
    mod = importlib.import_module(self.module)
    try:
        cls = getattr(mod, self.classname)
    except AttributeError as e:
        raise ImportError(
            f"Could not load '{self.classname}' from module '{self.module}'"
        ) from e
    # TODO: Refactor imports to facilitate this check early
    # if not issubclass(cls, Connection):
    #     raise TypeError(
    #         f"{cls.__name__} is not a subclass of Connection"
    #     )

    return cls

DeltatableConnection

Bases: PhysicalConnection, VersionedConnection

data_mode

data_mode: str = 'error'

Data mode for write operation. For Deltatable valid options are:

  • append adds new data
  • overwrite replaces all data
  • error fails (table is read-only)

kind

kind: Literal['Deltatable']

optimize

optimize: Optional[Union[str, int]] = 1

Used with Deltatable and other engines to control whether to optimize after each sink operation. With larger tables this may be a lengthy synchronous operation.

The default is to optimize and vacuum with 7-day retention.

Deltatable

Accepted values are optimize, vacuum, all, or a literal int. If a literal int is provided then parts older than that number of days will be removed. Note that this overrides the default retention days.

The string options vacuum and all are equivalent to 0.

partition

partition: Optional[List[str]] = None

If set, the delta table is partitioned using the specified fields for faster reads.

schema_mode

schema_mode: Optional[str] = None

Deltatable schema behaviour. If not set, the write fails with an error if the data schema does not match the existing table schema.

Schema evolution options:

  • merge adds new columns from data
  • overwrite adds new columns, drops missing
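
A hedged sketch of a Deltatable sink combining these options (the locator and partition field are assumptions for illustration):

- kind: sink
  name: delta_sink
  connection:
    kind: Deltatable
    locator: my_delta_table
    data_mode: append
    schema_mode: merge
    partition:
      - event_date
    optimize: 7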

Dimension

Bases: EphemeralConnection

load

load() -> Type[Connection]

Dynamically load the connection class from the given module and member.

Returns:

Type Description
Type[Connection]

A subclass of Connection.

Raises:

Type Description
ImportError

If the module or member can't be imported.

Source code in ankaflow/models/connections.py
def load(self) -> t.Type[Connection]:
    """
    Dynamically load the connection class from the given module and member.

    Returns:
        A subclass of Connection.

    Raises:
        ImportError: If the module or member can't be imported.
    """
    mod = importlib.import_module(self.module)
    try:
        cls = getattr(mod, self.kind)
    except AttributeError as e:
        raise ImportError(
            f"Could not load '{self.kind}' from module '{self.module}'"
        ) from e

    return cls

EphemeralConnection

Bases: Connection

FileConnection

FlowContext

Bases: ImmutableMap

The context dictionary can be used to supply arbitrary data to the pipeline that can be referenced in templates, much like variables, with the difference that it cannot be modified at runtime.

Context can be initialized via a dictionary:

context = FlowContext(**{'foo': 'Bar'})

Items can be referenced using both bracket and dot notation:

context.foo == context['foo'] == 'Bar'

GSConfig

Bases: BucketConfig

Google Cloud Storage (GCS) configuration.

  • HMAC credentials allow reading/writing Parquet, JSON, and CSV.
  • Delta table writes require a full service account credential file.

credential_file

credential_file: Optional[str] = None

Path to GCP service account credential file (JSON). Required for certain write operations (e.g., Delta tables).

hmac_key

hmac_key: Optional[str] = None

GCS HMAC key ID for authenticated access.

hmac_secret

hmac_secret: Optional[str] = None

GCS HMAC secret for authenticated access.

JSONConnection

ParquetConnection

PhysicalConnection

Bases: Connection

locator

locator: str

Table name or file name or URI, or other identifier required by the connection.

RestConnection

Bases: EphemeralConnection

Configuration for a REST-based data connection.

This model defines how to configure and execute REST API requests as part of a pipeline step. It includes the request definition, client behavior (e.g., retries, headers), and optional schema discovery.

client

Configuration for the REST client (e.g., base URL, headers, auth).

fields

fields: Optional[Columns] = None

Optional schema definition used to validate or transform response data.

It is recommended to manually specify the schema after initial discovery. This ensures downstream pipeline stages remain stable, even when the remote API returns no results (e.g., due to no updates in an incremental fetch). Explicit schema prevents silent failures or schema drift in such cases.

kind

kind: Literal['Rest']

Specifies the connection type as "Rest".

request

request: Request

Request template specifying method, path, body, etc.

show_schema

show_schema: Optional[bool] = None

If True, the connector will attempt to infer or display the response schema automatically.

S3Config

Bases: BucketConfig

S3-specific bucket configuration including optional authentication.

access_key_id

access_key_id: Optional[str] = None

AWS access key ID for private S3 access.

secret_access_key

secret_access_key: Optional[str] = None

AWS secret access key for private S3 access.

SQLGenConnection

Bases: EphemeralConnection

The SQLGen connection is intended for SQL code generation:

The prompt should instruct the model to generate a SQL query; the resulting statement is executed as a VIEW in the internal database.

Example:

Stage 1: Name: ReadSomeParquetData

Stage 2: Name: CodeGen, query: Given SQL table `ReadSomeParquetData` generate SQL query to
    count number of rows in the table.

Inside stage 2 the following happens:

1. The prompt is sent to the inference endpoint

2. The endpoint is expected to respond with valid SQL

3. The connection will execute a statement `CREATE OR REPLACE VIEW StageName AS <received_select_statement>`
    where the statement in the example is likely `SELECT COUNT() FROM ReadSomeParquetData`

kind

kind: Literal['SQLGen']

Specifies the connection type as "SQLGen".

variables

variables: dict | None = None

Variables passed to Prompt. Prompt must be supplied in the query field of the Stage.

Prompt may contain Jinja2-style placeholders:

Example

Here's my name: {{name}}.

The connection will render the prompt template using variables.
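
A hedged sketch of a stage using an SQLGen connection (the stage kind, names and prompt wording are assumptions for illustration):

- kind: source
  name: row_counter
  connection:
    kind: SQLGen
    variables:
      table: ReadSomeParquetData
  query: >
    Given SQL table `{{table}}` generate a SQL query to
    count the number of rows in the table.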

Stage

Bases: BaseModel

connection

Defines how the data is read from / written to the target.

Connection fields may contain templates and they will be rendered recursively.

A special construct is JSON>, which allows dynamically generating parameters at runtime:

- kind: source
  name: source_name
  connection:
    kind: File
    params: >
      JSON>{
        "key": "value",
        "dynkey": <<API.property>>,
      }

In the above, params are constructed as a JSON string. It is even possible to construct parameter keys dynamically:

params: >
  JSON>
  {
    <% for number in [1,2,3] %>
    "key_<< number >>":<< number >><% if not loop.last %>,<% endif %>
    <% endfor %>
  }

The above example results in the following:

params: >
  {
    "key_1": 1,
    "key_2": 2,
    "key_3": 3
  }

A JSON> structure cannot contain nested JSON> structures; the entire string following the JSON> header must result in valid JSON.

Inbuilt connections include:

  • Deltatable (read)

If connection is Deltatable then query is required to narrow down data stored in the delta table. FROM clause must be Deltatable:

- kind: source
  name: delta_tap
  connection:
    kind: Deltatable
    locator: delta_table
  query: >
    select * from Deltatable

  • Deltatable (write)

See also StorageOptions

The following example writes data from the preceding stage to a delta table, appending the data, partitioning by the part_field column, and optimizing and vacuuming immediately without retention after the write:

- kind: sink
  name: delta_sink
  connection:
    kind: Deltatable
    locator: delta_table
    optimize: 0
    data_mode: append
    partition:
      - part_field

  • Parquet (read/write)
  • JSON (read/write; NOTE: write operation generates newline-delimited JSON)
  • CSV (read/write)
  • Variable (read/write)

  • File (read)

File can be read from a connected filesystem (including s3). File name and file type must be specified in the pipeline context:

- `context.FileName`: file name relative to `file_prefix`
- `context.FileType`: file type, one of CSV, XLSX, JSON, HTML

Any file reader configuration must be passed in params.

  • Rest (bidirectional)

Rest connections consist of two parts: Client and Request. Client contains base URL and authentication (basic, digest, header and oauth2 are supported).

- name: TheRest
  kind: source
  connection:
    kind: Rest
    client:
      base_url: https://api.example.com
      auth:
        # Every request to given endpoint share the same authentication
        method: basic
        values:
          username: TheName
          password: ThePassword
    request:
      endpoint: /v1/some-path
      content_type: application/json
      method: post
      query:
        # Query parameters
        date: <<API.dt(None).isoformat()>>
      body:
        # JSON payload
        param: 1
      response:
        content_type: application/json
        locator: "JMESPath.to.data"

Any custom source (API) can be used as long as it is available via a module.

context

context: Optional[FlowContext] = None

@private Global context passed to given stage

explain

explain: Optional[bool] = None

If set to true then SQL query explanation will be logged.

fields

fields: Optional[List[Column]] = None

Explicitly defined output fields.

kind

kind: str

Defines which action will be performed:

  • source
  • transform
  • sink
  • pipeline
  • sql
Sink

Sink reads output from previous stage and stores in specified location.

NOTE: If a query is supplied with the stage then the sink uses the output of the query rather than the preceding stage directly. A subsequent sink will use the preceding stage. If supplied, the query must create either a view or a table with the same name as the current stage.

- name: my_sink
  kind: sink
  connection:
    kind: Variable
  query: >
    CREATE VIEW my_sink AS
    SELECT
    1 as foo
Pipeline

If a pipeline stage is preceded by any other stage then the subpipeline will be executed as many times as there are rows in the previous stage output. This is useful if you want to run the same pipeline with different parameters. Make sure the pipeline is preceded by a source or transform producing the required number of rows. If you need to run the subpipeline only once there are two options:

  1. Place it at the top
  2. Precede it with a transform producing a single row only

Each row is then passed to the subpipeline in a special variable.

Example pipeline iterating subpipeline 5 times:

- kind: transform
  name: generate_5
  # Generate 5 rows
  query: >
    select unnest as current_variable from unnest(generate_series(1,5))
  show: 5
- kind: pipeline
  name: looped_5x
  stages:
    - kind: transform
      name: inside_loop
      # In query we can reference the value passed from parent pipeline
      query: >
        select 'Currently running iteration: {API.look('loop_control.current_variable', variables)}' as value
      show: 5

locator

locator: Optional[str] = None

Currently unused: Name for the connection configuration: name, or URI.

log_level

log_level: Optional[LogLevel] = None

Set logging level. All stages after (including current) will log with specified level. Possible values: INFO (default), DEBUG, WARNING. Log level will be reset to INFO after each pipeline (including nested pipelines).

name

name: str

Name of the stage; it must be unique across all stages in the pipeline and conform to the rules: must start with a letter and may contain lowercase letters, numbers and underscores. The name is used to reference this stage from subsequent stages.

on_error

on_error: str = 'fail'

If set to 'continue' then pipeline will not fail. Subsequent stages referring to failed one must handle missing data.

query

query: Optional[str] = None

SQL Query or dictionary with custom source parameters. May contain {dynamic variables}.

show

show: int = 0

If set to a positive integer then the given number of rows from this stage will be logged. If set to -1 then all rows will be logged. Set to 0 to disable logging.

show_schema

show_schema: Optional[bool] = None

If True then schema is logged

skip_if

skip_if: Optional[Any] = None

Any value that can be evaluated using bool(), or a template string, e.g. << True >>. When the expression evaluates to True then the stage is skipped.
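
A hedged example that skips a sink based on a templated expression (the context key and its use here are assumptions for illustration):

- kind: sink
  name: optional_sink
  skip_if: << context.DryRun >>
  connection:
    kind: Variable
    locator: results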

stages

stages: Optional[Stages] = None

Used when kind is Flow

throttle

throttle: Optional[Union[int, float]] = None

If set to a positive value then flow execution will be paused after the stage for the given number of seconds.

Useful when dealing with rate limits or otherwise spreading the load over time.

Stages

Bases: RootModel[List[Stage]]

A sequence of processing stages in a data pipeline.

Each Datablock in root represents one discrete step or transformation in an end-to-end business workflow (e.g., tap, transform, sink, validate).

Attributes:

Name Type Description
root List[Datablock]

Ordered list of pipeline stages to execute.

enumerate_steps

enumerate_steps() -> Iterator[tuple[int, Stage]]

Yield each stage along with its 0-based position.

Use this when you need both the stage and its index for logging, metrics, or conditional branching.

Returns:

Type Description
Iterator[tuple[int, Stage]]

Iterator[Tuple[int, Datablock]]: Pairs of (index, stage).

Source code in ankaflow/models/core.py
def enumerate_steps(self) -> t.Iterator[tuple[int, Stage]]:
    """Yield each stage along with its 0-based position.

    Use this when you need both the stage and its index for logging,
    metrics, or conditional branching.

    Returns:
        Iterator[Tuple[int, Datablock]]: Pairs of (index, stage).
    """
    return enumerate(self.root)

load

load(source: Union[str, Path, IO[str], Loadable]) -> Stages

Load a pipeline from YAML (path, YAML-string, file-like or Loadable).

Parameters:

Name Type Description Default
source str | Path | IO[str] | Loadable
  • Path to a .yaml file
  • Raw YAML content
  • File-like object returning YAML
  • Any object with a .load() method returning Python data
required

Returns:

Name Type Description
Stages Stages

a validated Stages instance.

Source code in ankaflow/models/core.py
@classmethod
def load(
    cls,
    source: t.Union[str, Path, t.IO[str], Loadable],
) -> "Stages":
    """Load a pipeline from YAML (path, YAML-string, file-like or Loadable).

    Args:
        source (str | Path | IO[str] | Loadable):
            - Path to a .yaml file
            - Raw YAML content
            - File-like object returning YAML
            - Any object with a `.load()` method returning Python data

    Returns:
        Stages: a validated `Stages` instance.
    """
    # 1) If it’s a loader-object, pull Python data directly
    if isinstance(source, Loadable):
        data = source.load()

    else:
        # 2) Read text for YAML parsing:
        if hasattr(source, "read"):
            text = t.cast(t.IO[str], source).read()
        else:
            text = str(source)

        # 3) First, try parsing as raw YAML
        try:
            data = yaml.safe_load(text)
        except yaml.YAMLError:
            data = None

        # 4) Only if that parse returned a `str` we treat it as a filename
        if isinstance(data, str):
            try:
                text = Path(data).read_text()
                data = yaml.safe_load(text)
            except (OSError, yaml.YAMLError) as e:
                raise ValueError(
                    f"Could not interpret {data!r} as YAML or file path"
                ) from e

    # 5) Validate final shape
    if not isinstance(data, list):
        raise ValueError(
            f"Expected a list of pipeline stages, got {type(data).__name__}"
        )

    # 5) Finally, validate into our model
    return cls.model_validate(data)

steps

steps() -> Iterator[Stage]

Yield each stage in execution order.

Returns:

Type Description
Iterator[Stage]

An iterator over the stages, from first to last.

Source code in ankaflow/models/core.py
def steps(self) -> t.Iterator[Stage]:
    """Yield each stage in execution order.

    Returns:
        Iterator[Datablock]: An iterator over the stages,
        from first to last.
    """
    return iter(self.root)

Variables

Bases: dict

Variables is a dict-based collection storing arbitrary data under keys. Variables can be populated via:

  • Dictionary passed to pipeline: Flow(defs, context, variables={'foo': 'bar'})
  • Sink operation:

    - kind: sink
      name: my_sink
      connection:
        kind: Variable
        locator: variable_name

This step will place a list of records from the preceding step in variables as {'variable_name': t.List[dict]}

In most cases sinking to a variable is not needed as all preceding stages in a pipeline can be referenced via name. Sinking to a variable is useful when you need to share data with a subpipeline.

A special variable is loop_control, which is populated dynamically from the previous step when the current stage type is pipeline. In case the previous step generates multiple records then the new pipeline is called for each record, and the control variable holds a dict representing the current record.

components

Column

Bases: BaseModel

Data column (equivalent to database column)

name

name: str

Column name must follow the rules for SQL engine column names

type

type: str

Any data type supported by the SQL engine

Columns

Bases: RootModel[List[Column]]

Iterable list-like collection of Fields.

configs

BigQueryConfig

Bases: BaseModel

Configuration for accessing BigQuery datasets.

credential_file

credential_file: Optional[str] = None

Path to service account JSON credentials for BigQuery access.

dataset

dataset: Optional[str] = None

BigQuery dataset name.

project

project: Optional[str] = None

GCP project ID containing the BigQuery dataset.

region

region: Optional[str] = None

BigQuery region (e.g., "us-central1").

BucketConfig

Bases: BaseModel

bucket

bucket: Optional[str] = None

Bucket name e.g. s3://my-bucket or local absolute path e.g. /data/path

data_prefix

data_prefix: Optional[str] = None

Prefix for data files: s3://my-bucket/<prefix> or /data/path/<prefix>

locator_wildcard

locator_wildcard: Optional[Tuple] = None

Regular expression and wildcard to modify the locator. Useful when you sink data to data-YYYY.parquet but want to read data-*.parquet. Provide a tuple with the pattern as the first element and the replacement as the second. Example:

('-\d{4}-', '*')

This will create a wildcard for data-YYYY-base.parquet as data*base.parquet.

Wildcard Description
* matches any number of any characters (including none)
** matches any number of subdirectories (including none)
[abc] matches one character given in the bracket
[a-z] matches one character from the range given in the bracket

Wildcard is automatically applied in tap and show_schema operations.

region

region: str | None = None

ClickhouseConfig

Bases: DatabaseConfig

Configuration for ClickHouse database, extending generic SQL config.

blocksize

blocksize: int = 50000

Number of rows to process per block for batch operations.

ConnectionConfiguration

Bases: BaseModel

Top-level container for all connection configurations in a pipeline.

Includes default configuration blocks for supported sources and sinks like:

  • Local file systems
  • S3 and GCS buckets
  • BigQuery datasets
  • ClickHouse databases

These can be customized per pipeline or extended with new sources.

Example

To support MySQL, you can define a new model like this:

class MySQLConfig(DatabaseConfig):
    pass  # You can add MySQL-specific fields here if needed

class ExtendedConnectionConfiguration(ConnectionConfiguration):
    mysql: MySQLConfig = PydanticField(default_factory=MySQLConfig)

This will allow you to specify MySQL connection settings in your YAML:

connection:
  kind: MySQL
  config:
    host: localhost
    port: 3306
    database: sales
    username: admin
    password: secret

bigquery

bigquery: BigQueryConfig = Field(
    default_factory=BigQueryConfig
)

BigQuery connection configuration.

clickhouse

clickhouse: ClickhouseConfig = Field(
    default_factory=ClickhouseConfig
)

ClickHouse connection configuration.

gs

gs: GSConfig = Field(default_factory=GSConfig)

Google Cloud Storage configuration.

llm

llm: LLMConfig = Field(default_factory=LLMConfig)

Language model configuration.

local

local: BucketConfig = Field(default_factory=BucketConfig)

Local file system configuration.

s3

s3: S3Config = Field(default_factory=S3Config)

S3 cloud storage configuration.

DatabaseConfig

Bases: BaseModel

Base class for SQL database connection configurations.

cluster

cluster: Optional[str] = None

Optional cluster name or identifier (used in ClickHouse and other distributed systems).

database

database: Optional[str] = None

Database name.

host

host: Optional[str] = None

Hostname or IP address of the database server.

password

password: Optional[str] = None

Password for authentication.

port

port: Optional[int | str] = None

Database port.

username

username: Optional[str] = None

Username for authentication.

GSConfig

Bases: BucketConfig

Google Cloud Storage (GCS) configuration.

  • HMAC credentials allow reading/writing Parquet, JSON, and CSV.
  • Delta table writes require a full service account credential file.

credential_file

credential_file: Optional[str] = None

Path to GCP service account credential file (JSON). Required for certain write operations (e.g., Delta tables).

hmac_key

hmac_key: Optional[str] = None

GCS HMAC key ID for authenticated access.

hmac_secret

hmac_secret: Optional[str] = None

GCS HMAC secret for authenticated access.

S3Config

Bases: BucketConfig

S3-specific bucket configuration including optional authentication.

access_key_id

access_key_id: Optional[str] = None

AWS access key ID for private S3 access.

secret_access_key

secret_access_key: Optional[str] = None

AWS secret access key for private S3 access.

connections

BigQueryConnection

Bases: PhysicalConnection, VersionedConnection

data_mode

data_mode: str = 'error'

Data mode for write operation. For Deltatable valid options are:

  • append adds new data
  • overwrite replaces all data
  • error fails (table is read-only)

key

key: Optional[List[str]] = None

List of versioned fields

kind

kind: Literal['BigQuery']

partition

partition: Optional[List[str]] = None

If set, the delta table is partitioned using the specified fields for faster reads.

schema_mode

schema_mode: Optional[str] = None

Deltatable schema behaviour. If not set, the write fails with an error if the data schema does not match the existing table schema.

Schema evolution options:

  • merge adds new columns from data
  • overwrite adds new columns, drops missing

version

version: Optional[str] = None

Field for record version timestamp

CSVConnection

ClickhouseConnection

Bases: PhysicalConnection, VersionedConnection

key

key: Optional[List[str]] = None

List of versioned fields

kind

kind: Literal['Clickhouse']

version

version: Optional[str] = None

Field for record version timestamp

Connection

Bases: BaseModel

config

config: Optional[ConnectionConfiguration] = None

Optional configuration for the current connection. If not present then global configuration will be used.

create_statement

create_statement: str | None = None

Create statement for given table. Must be in requested dialect.

fields

fields: Optional[Columns] = None

If set, the schema is used to generate the source structure when the actual source provides no data, in which case generation of the ephemeral view would otherwise fail.

kind

kind: str

Model type, e.g. Deltatable, Bigquery, Clickhouse, Parquet, File. Custom connections can be loaded from a module.

params

params: Optional[dict] = Field(default_factory=dict)

Any parameters that can be passed to connection.

show_schema

show_schema: Optional[bool] = None

If true then the schema is automatically detected from the input data and logged.

CustomConnection

Bases: PhysicalConnection

Custom connection provider. A custom connection may implement its own logic but must derive from the base Connection class, and expose tap(), sink(), sql() and show_schema() even if they are no-ops.

classname

classname: str

Name of the class to load from the module

module

module: str

Python module where the connection class is defined

params

params: Optional[dict] = Field(default_factory=dict)

Free-form configuration parameters passed to the loaded class

load

load() -> Type[Connection]

Dynamically load the connection class from the given module and member.

Returns:

Type Description
Type[Connection]

A subclass of Connection.

Raises:

Type Description
ImportError

If the module or member can't be imported.

Source code in ankaflow/models/connections.py
def load(self) -> t.Type[Connection]:
    """
    Dynamically load the connection class from the given module and member.

    Returns:
        A subclass of Connection.

    Raises:
        ImportError: If the module or member can't be imported.
    """
    mod = importlib.import_module(self.module)
    try:
        cls = getattr(mod, self.classname)
    except AttributeError as e:
        raise ImportError(
            f"Could not load '{self.classname}' from module '{self.module}'"
        ) from e
    # TODO: Refactor imports to facilitate this check early
    # if not issubclass(cls, Connection):
    #     raise TypeError(
    #         f"{cls.__name__} is not a subclass of Connection"
    #     )

    return cls

DeltatableConnection

Bases: PhysicalConnection, VersionedConnection

data_mode

data_mode: str = 'error'

Data mode for write operation. For Deltatable valid options are:

  • append adds new data
  • overwrite replaces all data
  • error fails (table is read-only)

kind

kind: Literal['Deltatable']

optimize

optimize: Optional[Union[str, int]] = 1

Used with Deltatable and other engines to control whether to optimize after each sink operation. With larger tables this may be a lengthy synchronous operation.

The default is to optimize and vacuum with 7-day retention.

Deltatable

Accepted values are optimize, vacuum, all, or a literal int. If a literal int is provided then parts older than that number of days will be removed. Note that this overrides the default retention days.

The string options vacuum and all are equivalent to 0.

partition

partition: Optional[List[str]] = None

If set, the delta table is partitioned using the specified fields for faster reads.

schema_mode

schema_mode: Optional[str] = None

Deltatable schema behaviour. If not set, the write fails with an error if the data schema does not match the existing table schema.

Schema evolution options:

  • merge adds new columns from data
  • overwrite adds new columns, drops missing

Dimension

Bases: EphemeralConnection

load

load() -> Type[Connection]

Dynamically load the connection class from the given module and member.

Returns:

Type Description
Type[Connection]

A subclass of Connection.

Raises:

Type Description
ImportError

If the module or member can't be imported.

Source code in ankaflow/models/connections.py
def load(self) -> t.Type[Connection]:
    """
    Dynamically load the connection class from the given module and member.

    Returns:
        A subclass of Connection.

    Raises:
        ImportError: If the module or member can't be imported.
    """
    mod = importlib.import_module(self.module)
    try:
        cls = getattr(mod, self.kind)
    except AttributeError as e:
        raise ImportError(
            f"Could not load '{self.kind}' from module '{self.module}'"
        ) from e

    return cls

FileConnection

JSONConnection

ParquetConnection

PhysicalConnection

Bases: Connection

locator

locator: str

Table name or file name or URI, or other identifier required by the connection.

RestConnection

Bases: EphemeralConnection

Configuration for a REST-based data connection.

This model defines how to configure and execute REST API requests as part of a pipeline step. It includes the request definition, client behavior (e.g., retries, headers), and optional schema discovery.

client

Configuration for the REST client (e.g., base URL, headers, auth).

fields

fields: Optional[Columns] = None

Optional schema definition used to validate or transform response data.

It is recommended to manually specify the schema after initial discovery. This ensures downstream pipeline stages remain stable, even when the remote API returns no results (e.g., due to no updates in an incremental fetch). Explicit schema prevents silent failures or schema drift in such cases.

kind

kind: Literal['Rest']

Specifies the connection type as "Rest".

request

request: Request

Request template specifying method, path, body, etc.

show_schema

show_schema: Optional[bool] = None

If True, the connector will attempt to infer or display the response schema automatically.

SQLGenConnection

Bases: EphemeralConnection

The SQLGen connection is intended for SQL code generation:

The prompt should instruct the model to generate a SQL query; the resulting statement is executed as a VIEW in the internal database.

Example:

Stage 1: Name: ReadSomeParquetData

Stage 2: Name: CodeGen, query: Given SQL table `ReadSomeParquetData` generate SQL query to
    count number of rows in the table.

Inside stage 2 the following happens:

1. The prompt is sent to the inference endpoint

2. The endpoint is expected to respond with valid SQL

3. The connection will execute a statement `CREATE OR REPLACE VIEW StageName AS <received_select_statement>`
    where the statement in the example is likely `SELECT COUNT() FROM ReadSomeParquetData`

kind

kind: Literal['SQLGen']

Specifies the connection type as "SQLGen".

variables

variables: dict | None = None

Variables passed to Prompt. Prompt must be supplied in the query field of the Stage.

Prompt may contain Jinja2-style placeholders:

Example

Here's my name: {{name}}.

The connection will render the prompt template using variables.

VariableConnection

VersionedConnection

Bases: BaseModel

key

key: Optional[List[str]] = None

List of versioned fields

version

version: Optional[str] = None

Field for record version timestamp

core

FlowContext

Bases: ImmutableMap

The context dictionary can be used to supply arbitrary data to the pipeline that can be referenced in templates, much like variables, with the difference that it cannot be modified at runtime.

Context can be initialized via a dictionary:

context = FlowContext(**{'foo': 'Bar'})

Items can be referenced using both bracket and dot notation:

context.foo == context['foo'] == 'Bar'

Stage

Bases: BaseModel

connection

Defines how the data is read from / written to the target.

Connection fields may contain templates and they will be rendered recursively.

A special construct is JSON>, which allows dynamically generating parameters at runtime:

- kind: source
  name: source_name
  connection:
    kind: File
    params: >
      JSON>{
        "key": "value",
        "dynkey": <<API.property>>,
      }

In the above, params are constructed as a JSON string. It is even possible to construct parameter keys dynamically:

params: >
  JSON>
  {
    <% for number in [1,2,3] %>
    "key_<< number >>":<< number >><% if not loop.last %>,<% endif %>
    <% endfor %>
  }

The above example results in the following:

params: >
  {
    "key_1": 1,
    "key_2": 2,
    "key_3": 3
  }

A JSON> structure cannot contain nested JSON> structures; the entire string following the JSON> header must result in valid JSON.

Inbuilt connections include:

  • Deltatable (read)

If connection is Deltatable then query is required to narrow down data stored in the delta table. FROM clause must be Deltatable:

- kind: source
  name: delta_tap
  connection:
    kind: Deltatable
    locator: delta_table
  query: >
    select * from Deltatable

  • Deltatable (write)

See also StorageOptions

The following example writes data from the preceding stage to a delta table, appending the data, partitioning by the part_field column, and optimizing and vacuuming immediately without retention after the write:

- kind: sink
  name: delta_sink
  connection:
    kind: Deltatable
    locator: delta_table
    optimize: 0
    data_mode: append
    partition:
      - part_field

  • Parquet (read/write)
  • JSON (read/write; NOTE: write operation generates newline-delimited JSON)
  • CSV (read/write)
  • Variable (read/write)

  • File (read)

File can be read from a connected filesystem (including s3). File name and file type must be specified in the pipeline context:

- `context.FileName`: file name relative to `file_prefix`
- `context.FileType`: file type, one of CSV, XLSX, JSON, HTML

Any file reader configuration must be passed in params.

  • Rest (bidirectional)

Rest connections consist of two parts: Client and Request. Client contains base URL and authentication (basic, digest, header and oauth2 are supported).

- name: TheRest
  kind: source
  connection:
    kind: Rest
    client:
      base_url: https://api.example.com
      auth:
        # Every request to given endpoint share the same authentication
        method: basic
        values:
          username: TheName
          password: ThePassword
    request:
      endpoint: /v1/some-path
      content_type: application/json
      method: post
      query:
        # Query parameters
        date: <<API.dt(None).isoformat()>>
      body:
        # JSON payload
        param: 1
      response:
        content_type: application/json
        locator: "JMESPath.to.data"

Any custom source (API) can be used as long as it is available via a module.

context

context: Optional[FlowContext] = None

@private Global context passed to given stage

explain

explain: Optional[bool] = None

If set to true then SQL query explanation will be logged.

fields

fields: Optional[List[Column]] = None

Explicitly defined output fields.

kind

kind: str

Defines which action will be performed:

  • source
  • transform
  • sink
  • pipeline
  • sql
Sink

Sink reads output from previous stage and stores in specified location.

NOTE: If a query is supplied with the stage then the sink uses the output of the query rather than the preceding stage directly. A subsequent sink will use the preceding stage. If supplied, the query must create either a view or a table with the same name as the current stage.

- name: my_sink
  kind: sink
  connection:
    kind: Variable
  query: >
    CREATE VIEW my_sink AS
    SELECT
    1 as foo
Pipeline

If a pipeline stage is preceded by any other stage then the subpipeline will be executed as many times as there are rows in the previous stage output. This is useful if you want to run the same pipeline with different parameters. Make sure the pipeline is preceded by a source or transform producing the required number of rows. If you need to run the subpipeline only once there are two options:

  1. Place it at the top
  2. Precede it with a transform producing a single row only

Each row is then passed to the subpipeline in a special variable.

Example pipeline iterating subpipeline 5 times:

- kind: transform
  name: generate_5
  # Generate 5 rows
  query: >
    select unnest as current_variable from unnest(generate_series(1,5))
  show: 5
- kind: pipeline
  name: looped_5x
  stages:
    - kind: transform
      name: inside_loop
      # In query we can reference the value passed from parent pipeline
      query: >
        select 'Currently running iteration: {API.look('loop_control.current_variable', variables)}' as value
      show: 5

locator

locator: Optional[str] = None

Currently unused: Name for the connection configuration: name, or URI.

log_level

log_level: Optional[LogLevel] = None

Set logging level. All stages after (including current) will log with specified level. Possible values: INFO (default), DEBUG, WARNING. Log level will be reset to INFO after each pipeline (including nested pipelines).

name

name: str

Name of the stage; it must be unique across all stages in the pipeline and conform to the rules: must start with a letter and may contain lowercase letters, numbers and underscores. The name is used to reference this stage from subsequent stages.

on_error

on_error: str = 'fail'

If set to 'continue' then pipeline will not fail. Subsequent stages referring to failed one must handle missing data.

query

query: Optional[str] = None

SQL Query or dictionary with custom source parameters. May contain {dynamic variables}.

show

show: int = 0

If set to a positive integer then the given number of rows from this stage will be logged. If set to -1 then all rows will be logged. Set to 0 to disable logging.

show_schema

show_schema: Optional[bool] = None

If True then schema is logged

skip_if

skip_if: Optional[Any] = None

Any value that can be evaluated using bool(), or a template string, e.g. << True >>. When the expression evaluates to True then the stage is skipped.

stages

stages: Optional[Stages] = None

Used when kind is Flow

throttle

throttle: Optional[Union[int, float]] = None

If set to a positive value then flow execution will be paused after the stage for the given number of seconds.

Useful when dealing with rate limits or otherwise spreading the load over time.

Stages

Bases: RootModel[List[Stage]]

A sequence of processing stages in a data pipeline.

Each Datablock in root represents one discrete step or transformation in an end-to-end business workflow (e.g., tap, transform, sink, validate).

Attributes:

Name Type Description
root List[Datablock]

Ordered list of pipeline stages to execute.

enumerate_steps

enumerate_steps() -> Iterator[tuple[int, Stage]]

Yield each stage along with its 0-based position.

Use this when you need both the stage and its index for logging, metrics, or conditional branching.

Returns:

Type Description
Iterator[tuple[int, Stage]]

Iterator[Tuple[int, Datablock]]: Pairs of (index, stage).

Source code in ankaflow/models/core.py
def enumerate_steps(self) -> t.Iterator[tuple[int, Stage]]:
    """Yield each stage along with its 0-based position.

    Use this when you need both the stage and its index for logging,
    metrics, or conditional branching.

    Returns:
        Iterator[Tuple[int, Datablock]]: Pairs of (index, stage).
    """
    return enumerate(self.root)

load

load(source: Union[str, Path, IO[str], Loadable]) -> Stages

Load a pipeline from YAML (path, YAML-string, file-like or Loadable).

Parameters:

Name Type Description Default
source str | Path | IO[str] | Loadable
  • Path to a .yaml file
  • Raw YAML content
  • File-like object returning YAML
  • Any object with a .load() method returning Python data
required

Returns:

Name Type Description
Stages Stages

a validated Stages instance.

Source code in ankaflow/models/core.py
@classmethod
def load(
    cls,
    source: t.Union[str, Path, t.IO[str], Loadable],
) -> "Stages":
    """Load a pipeline from YAML (path, YAML-string, file-like or Loadable).

    Args:
        source (str | Path | IO[str] | Loadable):
            - Path to a .yaml file
            - Raw YAML content
            - File-like object returning YAML
            - Any object with a `.load()` method returning Python data

    Returns:
        Stages: a validated `Stages` instance.
    """
    # 1) If it’s a loader-object, pull Python data directly
    if isinstance(source, Loadable):
        data = source.load()

    else:
        # 2) Read text for YAML parsing:
        if hasattr(source, "read"):
            text = t.cast(t.IO[str], source).read()
        else:
            text = str(source)

        # 3) First, try parsing as raw YAML
        try:
            data = yaml.safe_load(text)
        except yaml.YAMLError:
            data = None

        # 4) Only if that parse returned a `str` we treat it as a filename
        if isinstance(data, str):
            try:
                text = Path(data).read_text()
                data = yaml.safe_load(text)
            except (OSError, yaml.YAMLError) as e:
                raise ValueError(
                    f"Could not interpret {data!r} as YAML or file path"
                ) from e

    # 5) Validate final shape
    if not isinstance(data, list):
        raise ValueError(
            f"Expected a list of pipeline stages, got {type(data).__name__}"
        )

    # 5) Finally, validate into our model
    return cls.model_validate(data)

steps

steps() -> Iterator[Stage]

Yield each stage in execution order.

Returns:

Type Description
Iterator[Stage]

An iterator over the stages, from first to last.

Source code in ankaflow/models/core.py
def steps(self) -> t.Iterator[Stage]:
    """Yield each stage in execution order.

    Returns:
        Iterator[Datablock]: An iterator over the stages,
        from first to last.
    """
    return iter(self.root)

Variables

Bases: dict

Variables is a dict-based collection storing arbitrary data under keys. Variables can be populated via:

  • Dictionary passed to pipeline: Flow(defs, context, variables={'foo': 'bar'})
  • Sink operation:

    - kind: sink
      name: my_sink
      connection:
        kind: Variable
        locator: variable_name

This step will place a list of records from the preceding step in variables as {'variable_name': t.List[dict]}

In most cases sinking to a variable is not needed as all preceding stages in a pipeline can be referenced via name. Sinking to a variable is useful when you need to share data with a subpipeline.

A special variable is loop_control, which is populated dynamically from the previous step when the current stage type is pipeline. In case the previous step generates multiple records then the new pipeline is called for each record, and the control variable holds a dict representing the current record.

enums

AuthType

Bases: Enum

ContentType

Bases: Enum

DataType

Bases: Enum

LogLevel

Bases: Enum

ModelType

Bases: Enum

ParameterDisposition

Bases: Enum

RequestMethod

Bases: Enum

llm

LLMConfig

Bases: BaseModel

Configuration for the Language Model

kind

kind: LLMKind = MOCK

Language model provider: mock(default)|openai

model

model: str | None = None

Language model, if not set then uses default

proxy

proxy: LLMProxy | None = None

Reverse proxy used to connect to provider (optional)
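
A hedged sketch of the llm block inside a connection configuration (the model name is an assumption; omit model to use the provider default):

llm:
  kind: openai
  model: gpt-4o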

LLMKind

Bases: str, Enum

Supported LLM protocol backends.

MOCK

MOCK = 'mock'

OPENAI

OPENAI = 'openai'

LLMProxy

Bases: BaseModel

Configuration for proxy-based LLM usage.

client

request

request: Request

rest

BasicHandler

Bases: BaseModel

A no-op response handler used when no special processing (like pagination or transformation) is required.

Typically used for single-response REST endpoints where the entire payload is returned in one request.

Attributes:

Name Type Description
kind Literal

Specifies the handler type as BASIC.

kind

kind: Literal[BASIC]

Specifies the handler type as Basic.

Paginator

Bases: BaseModel

A response handler for paginated REST APIs.

This handler generates repeated requests by incrementing a page-related parameter until no more data is available. The stopping condition is usually inferred from the number of records in the response being less than page_size, or from a total record count field.

increment

increment: int

Page parameter increment. Original request configuration should include initial value e.g. page_no=1

kind

kind: Literal[PAGINATOR]

Specifies the handler type as Paginator.

page_param

page_param: str

Page parameter in the request (query or body). This will be incremented from request to request.

page_size

page_size: int

Page size should be explicitly defined. If the response contains fewer records it is considered to be the last page.

param_locator

param_locator: ParameterDisposition

Define where the parameter is located: body or query

throttle

throttle: Optional[Union[int, float]] = None

If set to a positive value then each page request is throttled by the given number of seconds.

Useful when dealing with rate limits or otherwise spreading the load over time.

total_records

total_records: Optional[str] = None

JMESPath to total records count in the response.
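
A hedged sketch of a paginated request configuration (the endpoint, parameter names, page size and the exact handler kind string are assumptions for illustration):

request:
  endpoint: /v1/items
  method: get
  query:
    # Initial value for the page parameter
    page_no: 1
  response:
    content_type: application/json
    locator: "data.items"
    handler:
      kind: Paginator
      page_param: page_no
      increment: 1
      page_size: 100
      param_locator: query
      total_records: "data.total"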

Request

Bases: BaseModel

body

body: Optional[Union[str, Dict]] = None

Request body parameters.

This field accepts either:

  • A Python dict representing a direct key-value mapping, or
  • A Jinja-templated JSON string with the magic @json prefix, e.g.: @json{"parameter": "value"}

The template will be rendered using the following custom delimiters:

  • << ... >> for variable interpolation
  • <% ... %> for logic/control flow (e.g., for-loops)
  • <# ... #> for inline comments

The template will be rendered before being parsed into a valid JSON object. This allows the use of dynamic expressions, filters, and control flow such as loops.

Example with looping

Given:

variables = {
    "MyList": [
        {"id": 1, "value": 10},
        {"id": 2, "value": 20}
    ]
}

You can generate a dynamic body with:

body: >
@json[
    <% for row in API.look("MyTable", variables) %>
        { "id": << row.id >>, "value": << row.value >> }<% if not loop.last %>,<% endif %>
    <% endfor %>
]

This will render to a proper JSON list:

[
    { "id": 1, "value": 10 },
    { "id": 2, "value": 20 }
]

Notes:

  • When using @json, the entire string is rendered as a Jinja template and then parsed with json.loads().
  • Nested @json blocks are not supported.
  • Newlines and whitespace are automatically collapsed during rendering.

content_type

content_type: ContentType = JSON

Request content type

endpoint

endpoint: str

Request endpoint e.g. get/data under base url:

Example https://api.example.com/v1 + get/data

errorhandler

errorhandler: RestErrorHandler = Field(
    default_factory=RestErrorHandler
)

Custom error handler e.g. for searching conditions in response or custom status codes

method

method: RequestMethod

Request method e.g. post,get,put

query

query: Dict = {}

Query parameters. Parameters may contain template variables.

response

response: RestResponse

Response handling configuration

ResponseHandlerTypes

RestAuth

Bases: BaseModel

Authentication configuration for the Rest connection.

NOTE: Some authentication methods may not work in the browser due to limitations in the network API.

method

method: AuthType

Specifies the authentication type.

values

values: StringDict

Mapping of parameter names and values.

{ 'X-Auth-Token': '' }
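
A hedged example of header-based authentication on a Rest client (the header name and token are placeholders):

client:
  base_url: https://api.example.com/v1
  auth:
    method: header
    values:
      X-Auth-Token: "my-secret-token"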

coerce_to_stringdict

coerce_to_stringdict(v)

Rest header values must be strings. This convenience validator automatically converts a regular dictionary to StringDict.

Source code in ankaflow/models/rest.py
@field_validator("values", mode="before")
@classmethod
def coerce_to_stringdict(cls, v):
    """Rest header values must be strings.
    This convenience validator  automatically
    converts regiular dictionary to StringDict.
    """
    if isinstance(v, StringDict):
        return v
    if isinstance(v, dict):
        return StringDict(v)
    raise TypeError("Expected a StringDict or a dict for `values`")

RestClientConfig

Bases: BaseModel

Rest client for given base URL. Includes transport and authentication configuration

base_url

base_url: str

Base URL, typically server or API root. All endpoints with the same base URL share the same authentication.

Example: https://api.example.com/v1

timeout

timeout: Optional[float] = None

Request timeout in seconds. Default is 5. Set 0 to disable the timeout.

RestErrorHandler

Bases: BaseModel

condition

condition: Optional[str] = None

JMESPath expression to look for in the response body. An error will be generated if the expression evaluates to True.

error_status_codes

error_status_codes: List[int] = []

List of HTTP status codes to be treated as errors.

message

message: Optional[str] = None

JMESPath expression to extract the error message from the response. If omitted the entire response will be included in the error.
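
A hedged sketch of an error handler attached to a request (the status codes and JMESPath expressions are assumptions for illustration):

errorhandler:
  error_status_codes: [429, 500]
  condition: "error.code"
  message: "error.message"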

RestResponse

Bases: BaseModel

Response configuration. The response can be paged, a polled URL, or in the body.

content_type

content_type: DataType

Returned data type

locator

locator: Optional[str] = None

JMESPath to read data from JSON body. If not set then entire body is treated as data.

StatePoller

Bases: BaseModel

A response handler for state-based polling APIs.

This handler is designed for asynchronous workflows where the client repeatedly polls an endpoint until a certain state is reached (e.g., job completion, resource readiness). Once the condition is met, the pipeline continues by reading from the final data locator.

kind

kind: Literal[STATEPOLLING]

Specifies the handler type as StatePolling.

ready_status

ready_status: str

JMESPath to read status from response. If the value at path evaluates to True then no more requests are made, and the API tries to read data from URL specified by locator.
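
A hedged sketch of a state-polling response configuration (the JMESPath expressions and the exact handler kind string are assumptions for illustration):

response:
  content_type: application/json
  # Once ready_status evaluates to True, data is read from the value at locator
  locator: "result.download_url"
  handler:
    kind: StatePolling
    ready_status: "result.finished"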

URLPoller

Bases: BaseModel

URL Poller makes request(s) to the remote API until a URL is returned.

kind

kind: Literal[URLPOLLING]

Specifies the handler type as URLPolling.

ready_status

ready_status: Optional[str] = None

JMESPath to read status from response. If the value at path evaluates to True then no more requests are made, and the API tries to read data from URL specified by locator.