CustomConnection User Guide¶
Welcome to the CustomConnection user guide. This document will walk you through:
- What
CustomConnection
is and when to use it - The available configuration fields
- How to reference a
CustomConnection
in your pipeline YAML - Example usage and best practices
1. Introduction¶
CustomConnection
allows you to plug in your own connection logic into the AnkaFlow pipeline. Your custom connection class must implement the base Connection
interface (or derive from it), and provide the following methods:
tap()
: Extract data from the sourcesink()
: Write data to the destinationsql()
: Execute or generate SQL statementsshow_schema()
: Display or return the table/schema structure
Even if your class does not need all of these for its logic, it must expose them (they can be no-ops).
2. CustomConnection Fields¶
Below is a description of each field in the CustomConnection
model.
Field | Type | Description |
---|---|---|
kind |
Literal["CustomConnection"] |
Must always be set to CustomConnection to select this provider. |
module |
str |
Python module path containing your custom connection class (e.g. myapp.connectors.database ). |
classname |
str |
The name of the class to load from the specified module. |
params |
dict (default {} ) |
Arbitrary parameters passed into your connection’s constructor. |
config |
ConnectionConfiguration \| None |
(Optional) Pre-built configuration object injected by BaseConnection super-class. |
fields |
List[Field] \| None |
(Optional) Schema fields, auto-populated or used by the base implementation if needed. |
locator |
str \| None |
(Optional) Name or identifier used by Connection.locate() for dynamic discovery. |
3. Defining a CustomConnection in Your Pipeline¶
In your pipeline YAML, under a sink
or tap
stage, you reference a custom connection like this:
- name: MySqlWriter
kind: sink
connection:
kind: CustomConnection
module: myapp.connectors.database
classname: MySQL
params:
port: 5555
Explanation:
name:
Logical name for the pipeline stage (MySqlWriter
).kind: sink
Specifies that this stage writes data (a "sink" stage).connection:
Block configures how to connect to the target system:kind
: Must beCustomConnection
.module
: Python import path where your class lives.classname
: Actual class name inside that module.params
: Any keyword arguments your class expects (e.g. table name, credentials key).
show: 1
Enables schema introspection after connecting (Debug mode).
4. Implementing Your Custom Class¶
In myapp/connectors/database.py
:
from ankaflow.connection import Connection
class MySQL(Connection):
def init(self):
# This method is provided by base class as convenience to
# avoid mucking with super().__init__()
# It is called in the end of base class __init__ hence
# self.config and other attributes are already populated.
pass
async def tap(self, query: str|None = None):
# Extract data from external storage and cache to pipeline
df = get_dataframe_from_mysql(query)
await self.c.register("tmp_{self.name}", df)
await self.c.sql(f'CREATE TABLE "{self.name}" AS SELECT * FROM data')
# You can use CREATE OR REPLACE or CREATE IF NOT EXISTS
# CREATE OR REPLACE Will overwrite existing data (does not enforce stage name uniqueness)
# CREATE IF NOT EXISTS will append data if table already exists.
await self.c.unregister("tmp_{self.name}")
async def sink(self, from_name: str):
# Write data to external storage
# from_name is always previous stage name (presumably tap or transform)
relation = await self.c.sql(f'SELECT * FROM "{from_name}"')
df = await relation.df() # of fetchall()
send_df_to_mysql(df)
async def sql(self, query: str):
# Execute SQL logic on target
execute_sql_on_mysql(query)
async def show_schema(self):
# Return table schema, e.g. list of Field objects
return []
Note: All methods must be async.
Note: Your class must derive from the base
Connection
to inherit common logic and have access toself.conn.cfg
,self.conn.locator()
, etc.
5. Best Practices & Troubleshooting¶
- Validation: Always validate your
params
against what your class expects. Mismatches will not raise errors atStages.load()
time but at runtime. - Module Path: Ensure your project’s root is on
PYTHONPATH
so thatimport module
succeeds at runtime. - Debugging: Use
show: 1
in your YAML to print the output of tap(). - Error Handling: Wrap your I/O in try/except blocks and surface meaningful messages; Ankalow will propagate exceptions upstream.
Happy piping!