diff --git a/ingestion/src/airflow_provider_openmetadata/lineage/openmetadata.py b/ingestion/src/airflow_provider_openmetadata/lineage/openmetadata.py
index d4d00c1f35b..b5d5dff1402 100644
--- a/ingestion/src/airflow_provider_openmetadata/lineage/openmetadata.py
+++ b/ingestion/src/airflow_provider_openmetadata/lineage/openmetadata.py
@@ -1,4 +1,5 @@
 import json
+import os
 import traceback
 from typing import TYPE_CHECKING, Dict, List, Optional
 
@@ -41,20 +42,34 @@ class OpenMetadataLineageConfig(ConfigModel):
 
 
 def get_lineage_config() -> OpenMetadataLineageConfig:
     """Load the lineage config from airflow_provider_openmetadata.cfg."""
-    airflow_service_name = conf.get(
-        "lineage", "airflow_service_name", fallback="airflow"
-    )
-    api_endpoint = conf.get(
-        "lineage", "openmetadata_api_endpoint", fallback="http://localhost:8585"
-    )
-    auth_provider_type = conf.get("lineage", "auth_provider_type", fallback="no-auth")
-    secret_key = conf.get("lineage", "secret_key", fallback=None)
+    airflow_service_name = conf.get("lineage", "airflow_service_name", fallback=None)
+    if airflow_service_name:
+        api_endpoint = conf.get(
+            "lineage", "openmetadata_api_endpoint", fallback="http://localhost:8585"
+        )
+        auth_provider_type = conf.get(
+            "lineage", "auth_provider_type", fallback="no-auth"
+        )
+        secret_key = conf.get("lineage", "secret_key", fallback=None)
+        return OpenMetadataLineageConfig.parse_obj(
+            {
+                "airflow_service_name": airflow_service_name,
+                "api_endpoint": api_endpoint,
+                "auth_provider_type": auth_provider_type,
+                "secret_key": secret_key,
+            }
+        )
+    else:
+        openmetadata_config_file = os.getenv("OPENMETADATA_LINEAGE_CONFIG")
+        if openmetadata_config_file:
+            config = json.load(open(openmetadata_config_file))
+            return OpenMetadataLineageConfig.parse_obj(config)
+
     return OpenMetadataLineageConfig.parse_obj(
         {
-            "airflow_service_name": airflow_service_name,
-            "api_endpoint": api_endpoint,
-            "auth_provider_type": auth_provider_type,
-            "secret_key": secret_key,
+            "airflow_service_name": "airflow",
+            "api_endpoint": "http://localhost:8585/api",
+            "auth_provider_type": "no-auth",
         }
     )
@@ -137,90 +152,95 @@ def parse_lineage_to_openmetadata(
 
     operator.log.info("Task Properties {}".format(task_properties))
     operator.log.info("DAG properties {}".format(dag_properties))
-    # operator.log.info("Pipeline Context {}".format(context))
-
-    timestamp = int(dateutil.parser.parse(context["ts"]).timestamp() * 1000)
-    owner = dag.owner
-    tags = dag.tags
-    airflow_service_entity = None
-    operator.log.info("Get Airflow Service ID")
-    airflow_service_entity = client.get_by_name(
-        entity=PipelineService, fqdn=config.airflow_service_name
-    )
-    if airflow_service_entity is None:
-        pipeline_service = CreatePipelineServiceEntityRequest(
-            name=config.airflow_service_name,
-            serviceType=PipelineServiceType.Airflow,
-            pipelineUrl=pipeline_service_url,
+    try:
+        timestamp = int(dateutil.parser.parse(context["ts"]).timestamp() * 1000)
+        owner = dag.owner
+        tags = dag.tags
+        airflow_service_entity = None
+        operator.log.info("Get Airflow Service ID")
+        airflow_service_entity = client.get_by_name(
+            entity=PipelineService, fqdn=config.airflow_service_name
         )
-        airflow_service_entity = client.create_or_update(pipeline_service)
-
-    operator.log.info("airflow service entity {}", airflow_service_entity)
-    operator.log.info(task_properties)
-    operator.log.info(dag_properties)
-    downstream_tasks = []
-    dag_start_date = convert_epoch_to_iso(int(float(dag_properties["start_date"])))
-
-    if "_downstream_task_ids" in task_properties:
-        downstream_tasks = ast.literal_eval(task_properties["_downstream_task_ids"])
-
-    operator.log.info("downstream tasks {}".format(downstream_tasks))
-    task_start_date = (
-        task_properties["start_date"].isoformat()
-        if "start_time" in task_properties
-        else None
-    )
-    task_end_date = (
-        task_properties["end_date"].isoformat()
-        if "end_time" in task_properties
-        else None
-    )
-
-    task = Task(
-        name=task_properties["task_id"],
-        displayName=task_properties["label"],
-        taskUrl=task_url,
-        taskType=task_properties["_task_type"],
-        startDate=task_start_date,
-        endDate=task_end_date,
-        downstreamTasks=downstream_tasks,
-    )
-    create_pipeline = CreatePipelineEntityRequest(
-        name=dag.dag_id,
-        displayName=dag.dag_id,
-        description=dag.description,
-        pipelineUrl=dag_url,
-        startDate=dag_start_date,
-        tasks=[task],
-        service=EntityReference(id=airflow_service_entity.id, type="pipelineService"),
-    )
-    pipeline = client.create_or_update(create_pipeline)
-    operator.log.info("Create Pipeline {}".format(pipeline))
-
-    operator.log.info("Parsing Lineage")
-    for table in inlets:
-        table_entity = client.get_by_name(entity=Table, fqdn=table.fullyQualifiedName)
-        operator.log.debug("from entity {}".format(table_entity))
-        lineage = AddLineage(
-            edge=EntitiesEdge(
-                fromEntity=EntityReference(id=table_entity.id, type="table"),
-                toEntity=EntityReference(id=pipeline.id, type="pipeline"),
+        if airflow_service_entity is None:
+            pipeline_service = CreatePipelineServiceEntityRequest(
+                name=config.airflow_service_name,
+                serviceType=PipelineServiceType.Airflow,
+                pipelineUrl=pipeline_service_url,
             )
-        )
-        operator.log.debug("from lineage {}".format(lineage))
-        client.add_lineage(lineage)
+            airflow_service_entity = client.create_or_update(pipeline_service)
+        operator.log.info("airflow service entity {}", airflow_service_entity)
+        downstream_tasks = []
+        dag_start_date = convert_epoch_to_iso(int(float(dag_properties["start_date"])))
 
-    for table in outlets:
-        table_entity = client.get_by_name(entity=Table, fqdn=table.fullyQualifiedName)
-        operator.log.debug("to entity {}".format(table_entity))
-        lineage = AddLineage(
-            edge=EntitiesEdge(
-                fromEntity=EntityReference(id=pipeline.id, type="pipeline"),
-                toEntity=EntityReference(id=table_entity.id, type="table"),
-            )
+        if "_downstream_task_ids" in task_properties:
+            downstream_tasks = ast.literal_eval(task_properties["_downstream_task_ids"])
+
+        operator.log.info("downstream tasks {}".format(downstream_tasks))
+        task_start_date = (
+            task_properties["start_date"].isoformat()
+            if "start_time" in task_properties
+            else None
        )
-        operator.log.debug("to lineage {}".format(lineage))
-        client.add_lineage(lineage)
+        task_end_date = (
+            task_properties["end_date"].isoformat()
+            if "end_time" in task_properties
+            else None
+        )
+
+        task = Task(
+            name=task_properties["task_id"],
+            displayName=task_properties["label"],
+            taskUrl=task_url,
+            taskType=task_properties["_task_type"],
+            startDate=task_start_date,
+            endDate=task_end_date,
+            downstreamTasks=downstream_tasks,
+        )
+        create_pipeline = CreatePipelineEntityRequest(
+            name=dag.dag_id,
+            displayName=dag.dag_id,
+            description=dag.description,
+            pipelineUrl=dag_url,
+            startDate=dag_start_date,
+            tasks=[task],
+            service=EntityReference(
+                id=airflow_service_entity.id, type="pipelineService"
+            ),
+        )
+        pipeline = client.create_or_update(create_pipeline)
+        operator.log.info("Parsing Lineage")
+        for table in inlets:
+            table_entity = client.get_by_name(
+                entity=Table, fqdn=table.fullyQualifiedName
+            )
+            operator.log.debug("from entity {}".format(table_entity))
+            lineage = AddLineage(
+                edge=EntitiesEdge(
+                    fromEntity=EntityReference(id=table_entity.id, type="table"),
+                    toEntity=EntityReference(id=pipeline.id, type="pipeline"),
+                )
+            )
+            operator.log.debug("from lineage {}".format(lineage))
+            client.add_lineage(lineage)
+
+        for table in outlets:
+            table_entity = client.get_by_name(
+                entity=Table, fqdn=table.fullyQualifiedName
+            )
+            operator.log.debug("to entity {}".format(table_entity))
+            lineage = AddLineage(
+                edge=EntitiesEdge(
+                    fromEntity=EntityReference(id=pipeline.id, type="pipeline"),
+                    toEntity=EntityReference(id=table_entity.id, type="table"),
+                )
+            )
+            operator.log.debug("to lineage {}".format(lineage))
+            client.add_lineage(lineage)
+    except Exception as e:
+        operator.log.error(
+            f"Failed to parse Airflow DAG task and publish to OpenMetadata due to {e}"
+        )
+        operator.log.error(traceback.format_exc())
 
 
 def is_airflow_version_1() -> bool: