feat(ingest/airflow): add teradata operator support for Airflow plugin (#15418)

Co-authored-by: Sergio Gómez Villamor <sgomezvillamor@gmail.com>
This commit is contained in:
btkcodedev 2025-12-01 21:21:45 +05:30 committed by GitHub
parent 284f8cc100
commit fed7f567b7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 965 additions and 1 deletions

View File

@ -94,6 +94,7 @@ Supported operators:
- `RedshiftSQLOperator`
- `SnowflakeOperator` and `SnowflakeOperatorAsync`
- `SqliteOperator`
- `TeradataOperator` (_Note: Teradata uses two-tier `database.table` naming without a schema level_)
- `TrinoOperator`
<!--

View File

@ -98,6 +98,7 @@ integration_test_requirements = {
"snowflake-connector-python>=2.7.10",
"virtualenv", # needed by PythonVirtualenvOperator
"apache-airflow-providers-sqlite",
"apache-airflow-providers-teradata",
}

View File

@ -67,6 +67,10 @@ class ExtractorManager(OLExtractorManager):
BigQueryInsertJobOperatorExtractor
)
self.task_to_extractor.extractors["TeradataOperator"] = (
TeradataOperatorExtractor
)
self._graph: Optional["DataHubGraph"] = None
@contextlib.contextmanager
@ -333,4 +337,29 @@ def _snowflake_default_schema(self: "SnowflakeExtractor") -> Optional[str]:
or self.conn.schema
)
# TODO: Should we try a fallback of:
# execute_query_on_hook(self.hook, "SELECT current_schema();")[0][0]
# execute_query_on_hook(self.hook, "SELECT current_schema();")
class TeradataOperatorExtractor(BaseExtractor):
"""Extractor for Teradata SQL operations.
Extracts lineage from TeradataOperator tasks by parsing the SQL queries
and understanding Teradata's two-tier database.table naming convention.
"""
def extract(self) -> Optional[TaskMetadata]:
from airflow.providers.teradata.operators.teradata import TeradataOperator
operator: "TeradataOperator" = self.operator
sql = operator.sql
if not sql:
self.log.warning("No query found in TeradataOperator")
return None
return _parse_sql_into_task_metadata(
self,
sql,
platform="teradata",
default_database=None,
default_schema=None,
)

View File

@ -0,0 +1,39 @@
from datetime import datetime
from airflow import DAG
from airflow.providers.teradata.operators.teradata import TeradataOperator
TERADATA_COST_TABLE = "costs"
TERADATA_PROCESSED_TABLE = "processed_costs"
def _fake_teradata_execute(*args, **kwargs):
pass
with DAG(
"teradata_operator",
start_date=datetime(2023, 1, 1),
schedule_interval=None,
catchup=False,
) as dag:
TeradataOperator.execute = _fake_teradata_execute
transform_cost_table = TeradataOperator(
teradata_conn_id="my_teradata",
task_id="transform_cost_table",
sql="""
CREATE OR REPLACE TABLE {{ params.out_table_name }} AS
SELECT
id,
month,
total_cost,
area,
total_cost / area as cost_per_area
FROM {{ params.in_table_name }}
""",
params={
"in_table_name": TERADATA_COST_TABLE,
"out_table_name": TERADATA_PROCESSED_TABLE,
},
)

View File

@ -0,0 +1,674 @@
[
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,teradata_operator,prod)",
"changeType": "UPSERT",
"aspectName": "dataFlowInfo",
"aspect": {
"json": {
"customProperties": {
"_access_control": "None",
"catchup": "False",
"description": "None",
"doc_md": "None",
"fileloc": "<fileloc>",
"is_paused_upon_creation": "None",
"start_date": "DateTime(2023, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC'))",
"tags": "[]",
"timezone": "Timezone('UTC')"
},
"externalUrl": "http://airflow.example.com/tree?dag_id=teradata_operator",
"name": "teradata_operator",
"env": "PROD"
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,teradata_operator,prod)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,teradata_operator,prod)",
"changeType": "UPSERT",
"aspectName": "ownership",
"aspect": {
"json": {
"owners": [
{
"owner": "urn:li:corpuser:airflow",
"type": "DEVELOPER",
"source": {
"type": "SERVICE"
}
}
],
"ownerTypes": {},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:airflow"
}
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,teradata_operator,prod)",
"changeType": "UPSERT",
"aspectName": "globalTags",
"aspect": {
"json": {
"tags": []
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,teradata_operator,prod)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,teradata_operator,prod),transform_cost_table)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,teradata_operator,prod)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "teradata_operator"
}
]
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,teradata_operator,prod),transform_cost_table)",
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"json": {
"customProperties": {
"depends_on_past": "False",
"email": "None",
"label": "'transform_cost_table'",
"execution_timeout": "None",
"sla": "None",
"sql": "'\\n CREATE OR REPLACE TABLE processed_costs AS\\n SELECT\\n id,\\n month,\\n total_cost,\\n area,\\n total_cost / area as cost_per_area\\n FROM costs\\n '",
"task_id": "'transform_cost_table'",
"trigger_rule": "<TriggerRule.ALL_SUCCESS: 'all_success'>",
"wait_for_downstream": "False",
"downstream_task_ids": "[]",
"inlets": "[]",
"outlets": "[]",
"openlineage_job_facet_sql": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.30.1/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SqlJobFacet\", \"query\": \"\\n CREATE OR REPLACE TABLE processed_costs AS\\n SELECT\\n id,\\n month,\\n total_cost,\\n area,\\n total_cost / area as cost_per_area\\n FROM costs\\n \"}"
},
"externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=teradata_operator&_flt_3_task_id=transform_cost_table",
"name": "transform_cost_table",
"type": {
"string": "COMMAND"
},
"env": "PROD"
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,teradata_operator,prod),transform_cost_table)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,teradata_operator,prod),transform_cost_table)",
"changeType": "UPSERT",
"aspectName": "dataJobInputOutput",
"aspect": {
"json": {
"inputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:teradata,costs,PROD)"
],
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:teradata,processed_costs,PROD)"
],
"inputDatajobs": [],
"fineGrainedLineages": [
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:teradata,costs,PROD),id)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:teradata,processed_costs,PROD),id)"
],
"confidenceScore": 1.0
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:teradata,costs,PROD),month)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:teradata,processed_costs,PROD),month)"
],
"confidenceScore": 1.0
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:teradata,costs,PROD),total_cost)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:teradata,processed_costs,PROD),total_cost)"
],
"confidenceScore": 1.0
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:teradata,costs,PROD),area)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:teradata,processed_costs,PROD),area)"
],
"confidenceScore": 1.0
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:teradata,costs,PROD),area)",
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:teradata,costs,PROD),total_cost)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:teradata,processed_costs,PROD),cost_per_area)"
],
"confidenceScore": 1.0
}
]
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:teradata,costs,PROD)",
"changeType": "UPSERT",
"aspectName": "datasetKey",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:teradata",
"name": "costs",
"origin": "PROD"
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:teradata,processed_costs,PROD)",
"changeType": "UPSERT",
"aspectName": "datasetKey",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:teradata",
"name": "processed_costs",
"origin": "PROD"
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,teradata_operator,prod),transform_cost_table)",
"changeType": "UPSERT",
"aspectName": "ownership",
"aspect": {
"json": {
"owners": [
{
"owner": "urn:li:corpuser:airflow",
"type": "DEVELOPER",
"source": {
"type": "SERVICE"
}
}
],
"ownerTypes": {},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:airflow"
}
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,teradata_operator,prod),transform_cost_table)",
"changeType": "UPSERT",
"aspectName": "globalTags",
"aspect": {
"json": {
"tags": []
}
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:119395991908a63ca4316b2b06ccfb1c",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceProperties",
"aspect": {
"json": {
"customProperties": {
"run_id": "manual_run_test",
"duration": "<duration>",
"start_date": "<start_date>",
"end_date": "<end_date>",
"execution_date": "2023-09-27 21:34:38+00:00",
"try_number": "0",
"max_tries": "0",
"external_executor_id": "None",
"state": "running",
"operator": "TeradataOperator",
"priority_weight": "1",
"log_url": "http://airflow.example.com/dags/teradata_operator/grid?dag_run_id=manual_run_test&task_id=transform_cost_table&base_date=2023-09-27T21%3A34%3A38.000000%2B0000&tab=logs",
"orchestrator": "airflow",
"dag_id": "teradata_operator",
"task_id": "transform_cost_table"
},
"externalUrl": "http://airflow.example.com/dags/teradata_operator/grid?dag_run_id=manual_run_test&task_id=transform_cost_table&base_date=2023-09-27T21%3A34%3A38.000000%2B0000&tab=logs",
"name": "teradata_operator_transform_cost_table_manual_run_test",
"type": "BATCH_AD_HOC",
"created": {
"time": 1764297417922,
"actor": "urn:li:corpuser:datahub"
}
}
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:119395991908a63ca4316b2b06ccfb1c",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceRelationships",
"aspect": {
"json": {
"parentTemplate": "urn:li:dataJob:(urn:li:dataFlow:(airflow,teradata_operator,prod),transform_cost_table)",
"upstreamInstances": []
}
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:119395991908a63ca4316b2b06ccfb1c",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceInput",
"aspect": {
"json": {
"inputs": [
"urn:li:dataset:(urn:li:dataPlatform:teradata,costs,PROD)"
]
}
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:119395991908a63ca4316b2b06ccfb1c",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceOutput",
"aspect": {
"json": {
"outputs": [
"urn:li:dataset:(urn:li:dataPlatform:teradata,processed_costs,PROD)"
]
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:teradata,costs,PROD)",
"changeType": "UPSERT",
"aspectName": "datasetKey",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:teradata",
"name": "costs",
"origin": "PROD"
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:teradata,processed_costs,PROD)",
"changeType": "UPSERT",
"aspectName": "datasetKey",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:teradata",
"name": "processed_costs",
"origin": "PROD"
}
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:119395991908a63ca4316b2b06ccfb1c",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1764297417922,
"partitionSpec": {
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "STARTED",
"attempt": 1
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:teradata,processed_costs,PROD)",
"changeType": "UPSERT",
"aspectName": "operation",
"aspect": {
"json": {
"timestampMillis": 1764297418061,
"partitionSpec": {
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"actor": "urn:li:corpuser:airflow",
"operationType": "CREATE",
"lastUpdatedTimestamp": 1764297418061
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,teradata_operator,prod),transform_cost_table)",
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"json": {
"customProperties": {
"depends_on_past": "False",
"email": "None",
"label": "'transform_cost_table'",
"execution_timeout": "None",
"sla": "None",
"sql": "'\\n CREATE OR REPLACE TABLE processed_costs AS\\n SELECT\\n id,\\n month,\\n total_cost,\\n area,\\n total_cost / area as cost_per_area\\n FROM costs\\n '",
"task_id": "'transform_cost_table'",
"trigger_rule": "<TriggerRule.ALL_SUCCESS: 'all_success'>",
"wait_for_downstream": "False",
"downstream_task_ids": "[]",
"inlets": "[]",
"outlets": "[]",
"openlineage_job_facet_sql": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.30.1/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SqlJobFacet\", \"query\": \"\\n CREATE OR REPLACE TABLE processed_costs AS\\n SELECT\\n id,\\n month,\\n total_cost,\\n area,\\n total_cost / area as cost_per_area\\n FROM costs\\n \"}"
},
"externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=teradata_operator&_flt_3_task_id=transform_cost_table",
"name": "transform_cost_table",
"type": {
"string": "COMMAND"
},
"env": "PROD"
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,teradata_operator,prod),transform_cost_table)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,teradata_operator,prod),transform_cost_table)",
"changeType": "UPSERT",
"aspectName": "dataJobInputOutput",
"aspect": {
"json": {
"inputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:teradata,costs,PROD)"
],
"outputDatasets": [
"urn:li:dataset:(urn:li:dataPlatform:teradata,processed_costs,PROD)"
],
"inputDatajobs": [],
"fineGrainedLineages": [
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:teradata,costs,PROD),id)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:teradata,processed_costs,PROD),id)"
],
"confidenceScore": 1.0
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:teradata,costs,PROD),month)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:teradata,processed_costs,PROD),month)"
],
"confidenceScore": 1.0
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:teradata,costs,PROD),total_cost)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:teradata,processed_costs,PROD),total_cost)"
],
"confidenceScore": 1.0
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:teradata,costs,PROD),area)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:teradata,processed_costs,PROD),area)"
],
"confidenceScore": 1.0
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:teradata,costs,PROD),area)",
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:teradata,costs,PROD),total_cost)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:teradata,processed_costs,PROD),cost_per_area)"
],
"confidenceScore": 1.0
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:teradata,costs,PROD),id)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:teradata,processed_costs,PROD),id)"
],
"confidenceScore": 1.0
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:teradata,costs,PROD),month)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:teradata,processed_costs,PROD),month)"
],
"confidenceScore": 1.0
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:teradata,costs,PROD),total_cost)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:teradata,processed_costs,PROD),total_cost)"
],
"confidenceScore": 1.0
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:teradata,costs,PROD),area)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:teradata,processed_costs,PROD),area)"
],
"confidenceScore": 1.0
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:teradata,costs,PROD),area)",
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:teradata,costs,PROD),total_cost)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:teradata,processed_costs,PROD),cost_per_area)"
],
"confidenceScore": 1.0
}
]
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:teradata,costs,PROD)",
"changeType": "UPSERT",
"aspectName": "datasetKey",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:teradata",
"name": "costs",
"origin": "PROD"
}
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:teradata,processed_costs,PROD)",
"changeType": "UPSERT",
"aspectName": "datasetKey",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:teradata",
"name": "processed_costs",
"origin": "PROD"
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,teradata_operator,prod),transform_cost_table)",
"changeType": "UPSERT",
"aspectName": "ownership",
"aspect": {
"json": {
"owners": [
{
"owner": "urn:li:corpuser:airflow",
"type": "DEVELOPER",
"source": {
"type": "SERVICE"
}
}
],
"ownerTypes": {},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:airflow"
}
}
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,teradata_operator,prod),transform_cost_table)",
"changeType": "UPSERT",
"aspectName": "globalTags",
"aspect": {
"json": {
"tags": []
}
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:119395991908a63ca4316b2b06ccfb1c",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceRunEvent",
"aspect": {
"json": {
"timestampMillis": 1764297418067,
"partitionSpec": {
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"status": "COMPLETE",
"result": {
"type": "SUCCESS",
"nativeResultType": "airflow"
}
}
}
}
]

View File

@ -257,6 +257,16 @@ def _run_airflow(
conn_type="sqlite",
host=str(tmp_path / "my_sqlite.db"),
).get_uri(),
"AIRFLOW_CONN_MY_TERADATA": Connection(
conn_id="my_teradata",
conn_type="teradata",
host="fake_teradata_host",
login="fake_username",
password="fake_password",
extra={
"tmode": "ANSI",
},
).get_uri(),
# Ensure that the plugin waits for metadata to be written.
# Note that we could also disable the RUN_IN_THREAD entirely,
# but I want to minimize the difference between CI and prod.
@ -407,6 +417,7 @@ test_cases = [
DagTestCase("datahub_emitter_operator_jinja_template_dag"),
DagTestCase("athena_operator"),
DagTestCase("bigquery_insert_job_operator"),
DagTestCase("teradata_operator"),
]

View File

@ -0,0 +1,209 @@
"""Unit tests for TeradataOperatorExtractor."""
import sys
from unittest import mock
import pytest
from datahub_airflow_plugin._extractors import (
ExtractorManager,
TeradataOperatorExtractor,
)
mock_teradata_module = mock.MagicMock()
sys.modules["airflow.providers.teradata"] = mock_teradata_module
sys.modules["airflow.providers.teradata.operators"] = mock_teradata_module
sys.modules["airflow.providers.teradata.operators.teradata"] = mock_teradata_module
class TestTeradataOperatorExtractor:
"""Test suite for TeradataOperatorExtractor."""
def test_extractor_reads_sql_field(self):
"""Test that extractor correctly reads operator.sql field."""
operator = mock.Mock()
operator.dag_id = "test_dag"
operator.task_id = "test_task"
operator.sql = "SELECT * FROM database.table"
extractor = TeradataOperatorExtractor(operator)
with mock.patch(
"datahub_airflow_plugin._extractors._parse_sql_into_task_metadata"
) as mock_parse:
extractor.extract()
mock_parse.assert_called_once()
call_args = mock_parse.call_args
assert call_args[0][1] == "SELECT * FROM database.table"
def test_extractor_handles_missing_sql(self):
"""Test that extractor handles None/empty SQL gracefully."""
operator = mock.Mock()
operator.dag_id = "test_dag"
operator.task_id = "test_task"
operator.sql = None
extractor = TeradataOperatorExtractor(operator)
result = extractor.extract()
assert result is None
def test_extractor_uses_teradata_platform(self):
"""Test that extractor uses 'teradata' as platform."""
operator = mock.Mock()
operator.dag_id = "test_dag"
operator.task_id = "test_task"
operator.sql = "SELECT * FROM test_table"
extractor = TeradataOperatorExtractor(operator)
with mock.patch(
"datahub_airflow_plugin._extractors._parse_sql_into_task_metadata"
) as mock_parse:
extractor.extract()
call_kwargs = mock_parse.call_args[1]
assert call_kwargs["platform"] == "teradata"
def test_extractor_sets_none_defaults_for_two_tier_architecture(self):
"""Test that default_database and default_schema are None.
Teradata uses 2-tier naming (database.table) not 3-tier.
Setting defaults to None prevents incorrect URN generation.
"""
operator = mock.Mock()
operator.dag_id = "test_dag"
operator.task_id = "test_task"
operator.sql = "SELECT * FROM yellow_taxi.rides"
extractor = TeradataOperatorExtractor(operator)
with mock.patch(
"datahub_airflow_plugin._extractors._parse_sql_into_task_metadata"
) as mock_parse:
extractor.extract()
call_kwargs = mock_parse.call_args[1]
assert call_kwargs["default_database"] is None
assert call_kwargs["default_schema"] is None
def test_extractor_passes_self_as_first_arg(self):
"""Test that extractor passes itself to _parse_sql_into_task_metadata."""
operator = mock.Mock()
operator.dag_id = "test_dag"
operator.task_id = "test_task"
operator.sql = "INSERT INTO dest SELECT * FROM src"
extractor = TeradataOperatorExtractor(operator)
with mock.patch(
"datahub_airflow_plugin._extractors._parse_sql_into_task_metadata"
) as mock_parse:
extractor.extract()
call_args = mock_parse.call_args[0]
assert call_args[0] is extractor
def test_extractor_handles_multiline_sql(self):
"""Test that extractor handles multiline SQL statements."""
multiline_sql = """
CREATE TABLE yellow_taxi.staging AS
SELECT
pickup_datetime,
passenger_count,
trip_distance
FROM yellow_taxi.raw_rides
WHERE trip_distance > 0
"""
operator = mock.Mock()
operator.dag_id = "etl_dag"
operator.task_id = "transform_data"
operator.sql = multiline_sql
extractor = TeradataOperatorExtractor(operator)
with mock.patch(
"datahub_airflow_plugin._extractors._parse_sql_into_task_metadata"
) as mock_parse:
extractor.extract()
call_args = mock_parse.call_args
assert call_args[0][1] == multiline_sql
assert call_args[1]["platform"] == "teradata"
def test_extractor_handles_empty_string_sql(self):
"""Test that extractor treats empty string SQL like None."""
operator = mock.Mock()
operator.dag_id = "test_dag"
operator.task_id = "test_task"
operator.sql = ""
extractor = TeradataOperatorExtractor(operator)
result = extractor.extract()
# Empty string is falsy, should return None
assert result is None
@pytest.mark.parametrize(
"sql",
[
"SELECT * FROM db1.table1",
"INSERT INTO db2.table2 SELECT * FROM db1.table1",
"CREATE TABLE db.new_table AS SELECT * FROM db.old_table",
"UPDATE db.table SET col = 'value' WHERE id = 1",
"DELETE FROM db.table WHERE status = 'archived'",
],
)
def test_extractor_handles_various_sql_statements(self, sql):
"""Test that extractor handles various SQL statement types."""
operator = mock.Mock()
operator.dag_id = "test_dag"
operator.task_id = "test_task"
operator.sql = sql
extractor = TeradataOperatorExtractor(operator)
with mock.patch(
"datahub_airflow_plugin._extractors._parse_sql_into_task_metadata"
) as mock_parse:
mock_parse.return_value = mock.Mock() # Non-None result
result = extractor.extract()
assert result is not None
assert mock_parse.call_args[0][1] == sql
def test_extractor_registered_in_manager(self):
"""Test that TeradataOperator is registered with TeradataOperatorExtractor."""
manager = ExtractorManager()
assert "TeradataOperator" in manager.task_to_extractor.extractors
assert (
manager.task_to_extractor.extractors["TeradataOperator"]
== TeradataOperatorExtractor
)
def test_extractor_follows_athena_pattern(self):
"""Test that TeradataOperatorExtractor follows same pattern as AthenaOperatorExtractor.
Both handle databases with non-standard naming conventions:
- Athena: catalog.database.table (3-tier but different from schema-based DBs)
- Teradata: database.table (2-tier, no schema concept)
"""
from datahub_airflow_plugin._extractors import AthenaOperatorExtractor
assert hasattr(TeradataOperatorExtractor, "extract")
assert hasattr(AthenaOperatorExtractor, "extract")
teradata_op = mock.Mock()
teradata_op.dag_id = "dag"
teradata_op.task_id = "task"
teradata_op.sql = "SELECT 1"
teradata_extractor = TeradataOperatorExtractor(teradata_op)
with mock.patch(
"datahub_airflow_plugin._extractors._parse_sql_into_task_metadata"
) as mock_parse:
teradata_extractor.extract()
assert mock_parse.called