Fix #1952: Airflow Openmetadata lineage allow config to be read from … (#1953)

* Fix #1952: Airflow Openmetadata lineage allow config to be read from env variable
Sriharsha Chintalapani 2021-12-28 22:01:18 -08:00 committed by GitHub
parent 671ab8a68c
commit 30eeec498f
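
With this change, the OpenMetadata lineage backend no longer depends on airflow.cfg alone: when the [lineage] section defines no airflow_service_name, the backend reads the path of a JSON config file from the OPENMETADATA_LINEAGE_CONFIG environment variable, and falls back to built-in defaults if that is unset too. A minimal sketch of the environment-variable path (the file location and values are illustrative, not mandated by the commit):

    import json
    import os

    # Keys mirror the OpenMetadataLineageConfig model used by the backend.
    lineage_config = {
        "airflow_service_name": "airflow",
        "api_endpoint": "http://localhost:8585/api",
        "auth_provider_type": "no-auth",
    }
    with open("/tmp/openmetadata_lineage.json", "w") as f:
        json.dump(lineage_config, f)

    # Export before the scheduler/worker starts so the backend can see it.
    os.environ["OPENMETADATA_LINEAGE_CONFIG"] = "/tmp/openmetadata_lineage.json"
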
@@ -1,4 +1,5 @@
 import json
+import os
 import traceback
 from typing import TYPE_CHECKING, Dict, List, Optional
@@ -41,20 +42,34 @@ class OpenMetadataLineageConfig(ConfigModel):
 def get_lineage_config() -> OpenMetadataLineageConfig:
     """Load the lineage config from airflow_provider_openmetadata.cfg."""
-    airflow_service_name = conf.get(
-        "lineage", "airflow_service_name", fallback="airflow"
-    )
-    api_endpoint = conf.get(
-        "lineage", "openmetadata_api_endpoint", fallback="http://localhost:8585"
-    )
-    auth_provider_type = conf.get("lineage", "auth_provider_type", fallback="no-auth")
-    secret_key = conf.get("lineage", "secret_key", fallback=None)
-    return OpenMetadataLineageConfig.parse_obj(
-        {
-            "airflow_service_name": airflow_service_name,
-            "api_endpoint": api_endpoint,
-            "auth_provider_type": auth_provider_type,
-            "secret_key": secret_key,
-        }
-    )
+    airflow_service_name = conf.get("lineage", "airflow_service_name", fallback=None)
+    if airflow_service_name:
+        api_endpoint = conf.get(
+            "lineage", "openmetadata_api_endpoint", fallback="http://localhost:8585"
+        )
+        auth_provider_type = conf.get(
+            "lineage", "auth_provider_type", fallback="no-auth"
+        )
+        secret_key = conf.get("lineage", "secret_key", fallback=None)
+        return OpenMetadataLineageConfig.parse_obj(
+            {
+                "airflow_service_name": airflow_service_name,
+                "api_endpoint": api_endpoint,
+                "auth_provider_type": auth_provider_type,
+                "secret_key": secret_key,
+            }
+        )
+    else:
+        openmetadata_config_file = os.getenv("OPENMETADATA_LINEAGE_CONFIG")
+        if openmetadata_config_file:
+            config = json.load(open(openmetadata_config_file))
+            return OpenMetadataLineageConfig.parse_obj(config)
+        return OpenMetadataLineageConfig.parse_obj(
+            {
+                "airflow_service_name": "airflow",
+                "api_endpoint": "http://localhost:8585/api",
+                "auth_provider_type": "no-auth",
+            }
+        )
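
The resolution order introduced above is: an explicit airflow_service_name under [lineage] in airflow.cfg wins; otherwise the JSON file named by OPENMETADATA_LINEAGE_CONFIG is loaded; otherwise hard-coded defaults apply. A hypothetical check of the default branch (assuming get_lineage_config is imported from this module, with neither the airflow.cfg entry nor the environment variable set):

    config = get_lineage_config()
    assert config.airflow_service_name == "airflow"
    assert config.api_endpoint == "http://localhost:8585/api"
    assert config.auth_provider_type == "no-auth"
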
@@ -137,90 +152,95 @@ def parse_lineage_to_openmetadata(
     operator.log.info("Task Properties {}".format(task_properties))
     operator.log.info("DAG properties {}".format(dag_properties))
     # operator.log.info("Pipeline Context {}".format(context))
-    timestamp = int(dateutil.parser.parse(context["ts"]).timestamp() * 1000)
-    owner = dag.owner
-    tags = dag.tags
-    airflow_service_entity = None
-    operator.log.info("Get Airflow Service ID")
-    airflow_service_entity = client.get_by_name(
-        entity=PipelineService, fqdn=config.airflow_service_name
-    )
-    if airflow_service_entity is None:
-        pipeline_service = CreatePipelineServiceEntityRequest(
-            name=config.airflow_service_name,
-            serviceType=PipelineServiceType.Airflow,
-            pipelineUrl=pipeline_service_url,
-        )
-        airflow_service_entity = client.create_or_update(pipeline_service)
-        operator.log.info("airflow service entity {}", airflow_service_entity)
-    operator.log.info(task_properties)
-    operator.log.info(dag_properties)
-    downstream_tasks = []
-    dag_start_date = convert_epoch_to_iso(int(float(dag_properties["start_date"])))
-    if "_downstream_task_ids" in task_properties:
-        downstream_tasks = ast.literal_eval(task_properties["_downstream_task_ids"])
-        operator.log.info("downstream tasks {}".format(downstream_tasks))
-    task_start_date = (
-        task_properties["start_date"].isoformat()
-        if "start_time" in task_properties
-        else None
-    )
-    task_end_date = (
-        task_properties["end_date"].isoformat()
-        if "end_time" in task_properties
-        else None
-    )
-    task = Task(
-        name=task_properties["task_id"],
-        displayName=task_properties["label"],
-        taskUrl=task_url,
-        taskType=task_properties["_task_type"],
-        startDate=task_start_date,
-        endDate=task_end_date,
-        downstreamTasks=downstream_tasks,
-    )
-    create_pipeline = CreatePipelineEntityRequest(
-        name=dag.dag_id,
-        displayName=dag.dag_id,
-        description=dag.description,
-        pipelineUrl=dag_url,
-        startDate=dag_start_date,
-        tasks=[task],
-        service=EntityReference(id=airflow_service_entity.id, type="pipelineService"),
-    )
-    pipeline = client.create_or_update(create_pipeline)
-    operator.log.info("Create Pipeline {}".format(pipeline))
-    operator.log.info("Parsing Lineage")
-    for table in inlets:
-        table_entity = client.get_by_name(entity=Table, fqdn=table.fullyQualifiedName)
-        operator.log.debug("from entity {}".format(table_entity))
-        lineage = AddLineage(
-            edge=EntitiesEdge(
-                fromEntity=EntityReference(id=table_entity.id, type="table"),
-                toEntity=EntityReference(id=pipeline.id, type="pipeline"),
-            )
-        )
-        operator.log.debug("from lineage {}".format(lineage))
-        client.add_lineage(lineage)
-    for table in outlets:
-        table_entity = client.get_by_name(entity=Table, fqdn=table.fullyQualifiedName)
-        operator.log.debug("to entity {}".format(table_entity))
-        lineage = AddLineage(
-            edge=EntitiesEdge(
-                fromEntity=EntityReference(id=pipeline.id, type="pipeline"),
-                toEntity=EntityReference(id=table_entity.id, type="table"),
-            )
-        )
-        operator.log.debug("to lineage {}".format(lineage))
-        client.add_lineage(lineage)
+    try:
+        timestamp = int(dateutil.parser.parse(context["ts"]).timestamp() * 1000)
+        owner = dag.owner
+        tags = dag.tags
+        airflow_service_entity = None
+        operator.log.info("Get Airflow Service ID")
+        airflow_service_entity = client.get_by_name(
+            entity=PipelineService, fqdn=config.airflow_service_name
+        )
+        if airflow_service_entity is None:
+            pipeline_service = CreatePipelineServiceEntityRequest(
+                name=config.airflow_service_name,
+                serviceType=PipelineServiceType.Airflow,
+                pipelineUrl=pipeline_service_url,
+            )
+            airflow_service_entity = client.create_or_update(pipeline_service)
+            operator.log.info("airflow service entity {}", airflow_service_entity)
+        downstream_tasks = []
+        dag_start_date = convert_epoch_to_iso(int(float(dag_properties["start_date"])))
+        if "_downstream_task_ids" in task_properties:
+            downstream_tasks = ast.literal_eval(task_properties["_downstream_task_ids"])
+            operator.log.info("downstream tasks {}".format(downstream_tasks))
+        task_start_date = (
+            task_properties["start_date"].isoformat()
+            if "start_time" in task_properties
+            else None
+        )
+        task_end_date = (
+            task_properties["end_date"].isoformat()
+            if "end_time" in task_properties
+            else None
+        )
+        task = Task(
+            name=task_properties["task_id"],
+            displayName=task_properties["label"],
+            taskUrl=task_url,
+            taskType=task_properties["_task_type"],
+            startDate=task_start_date,
+            endDate=task_end_date,
+            downstreamTasks=downstream_tasks,
+        )
+        create_pipeline = CreatePipelineEntityRequest(
+            name=dag.dag_id,
+            displayName=dag.dag_id,
+            description=dag.description,
+            pipelineUrl=dag_url,
+            startDate=dag_start_date,
+            tasks=[task],
+            service=EntityReference(
+                id=airflow_service_entity.id, type="pipelineService"
+            ),
+        )
+        pipeline = client.create_or_update(create_pipeline)
+        operator.log.info("Parsing Lineage")
+        for table in inlets:
+            table_entity = client.get_by_name(
+                entity=Table, fqdn=table.fullyQualifiedName
+            )
+            operator.log.debug("from entity {}".format(table_entity))
+            lineage = AddLineage(
+                edge=EntitiesEdge(
+                    fromEntity=EntityReference(id=table_entity.id, type="table"),
+                    toEntity=EntityReference(id=pipeline.id, type="pipeline"),
+                )
+            )
+            operator.log.debug("from lineage {}".format(lineage))
+            client.add_lineage(lineage)
+        for table in outlets:
+            table_entity = client.get_by_name(
+                entity=Table, fqdn=table.fullyQualifiedName
+            )
+            operator.log.debug("to entity {}".format(table_entity))
+            lineage = AddLineage(
+                edge=EntitiesEdge(
+                    fromEntity=EntityReference(id=pipeline.id, type="pipeline"),
+                    toEntity=EntityReference(id=table_entity.id, type="table"),
+                )
+            )
+            operator.log.debug("to lineage {}".format(lineage))
+            client.add_lineage(lineage)
+    except Exception as e:
+        operator.log.error(
+            f"Failed to parse Airflow DAG task and publish to OpenMetadata due to {e}"
+        )
+        operator.log.error(traceback.format_exc())
 
 
 def is_airflow_version_1() -> bool:
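
The second hunk is mostly re-indentation; the substantive change is that the body of parse_lineage_to_openmetadata now runs inside try/except, making lineage publishing best-effort: any failure is logged and swallowed rather than allowed to fail the user's task. A condensed sketch of that pattern (the helper name is illustrative, not from the commit):

    import traceback

    def publish_lineage_safely(operator, publish) -> None:
        """Run a lineage-publishing callable without letting it fail the task."""
        try:
            publish()
        except Exception as exc:  # deliberately broad: lineage must not break runs
            operator.log.error(f"Failed to publish lineage: {exc}")
            operator.log.error(traceback.format_exc())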