diff --git a/ingestion/examples/airflow_lineage/openmetadata_airflow_lineage_example.py b/ingestion/examples/airflow_lineage/openmetadata_airflow_lineage_example.py
new file mode 100644
index 00000000000..3012b1b8809
--- /dev/null
+++ b/ingestion/examples/airflow_lineage/openmetadata_airflow_lineage_example.py
@@ -0,0 +1,49 @@
+"""
+OpenMetadata Airflow Lineage Backend example. Airflow provides a pluggable lineage backend that can
+read a DAG's configured inlets and outlets to compose lineage. OpenMetadata ships an Airflow lineage backend
+that captures all of the workflows in Airflow as well as any lineage the user has configured.
+Please refer to https://docs.open-metadata.org/lineage/configure-airflow-lineage on how to configure the lineage backend
+with the Airflow Scheduler.
+This example demonstrates how to configure an Airflow DAG's inlets and outlets.
+"""
+
+
+from datetime import timedelta
+
+from airflow.decorators import dag, task
+from airflow.utils.dates import days_ago
+
+from metadata.ingestion.models.table_metadata import Table
+
+default_args = {
+    "owner": "openmetadata_airflow_example",
+    "depends_on_past": False,
+    "email": ["user@company.com"],
+    "execution_timeout": timedelta(minutes=5),
+}
+
+
+@dag(
+    default_args=default_args,
+    description="OpenMetadata Airflow Lineage example DAG",
+    schedule_interval=timedelta(days=1),
+    start_date=days_ago(1),
+    catchup=False,
+)
+def openmetadata_airflow_lineage_example():
+    @task(
+        inlets={
+            "tables": [
+                Table(fullyQualifiedName="bigquery.shopify.raw_order"),
+                Table(fullyQualifiedName="bigquery.shopify.raw_customer")
+            ],
+        },
+        outlets={"tables": [Table(fullyQualifiedName="bigquery.shopify.fact_order")]},
+    )
+    def generate_data():
+        """Write the query or transformation that generates the ETL output here."""
+        pass
+
+    generate_data()
+
+openmetadata_airflow_lineage_example_dag = openmetadata_airflow_lineage_example()
diff --git a/ingestion/src/airflow_provider_openmetadata/lineage/openmetadata.py b/ingestion/src/airflow_provider_openmetadata/lineage/openmetadata.py
index ec9f1fa79de..afcd7f23e90
--- a/ingestion/src/airflow_provider_openmetadata/lineage/openmetadata.py
+++ b/ingestion/src/airflow_provider_openmetadata/lineage/openmetadata.py
@@ -8,8 +8,10 @@ from airflow.lineage.backend import LineageBackend
 from metadata.generated.schema.api.data.createPipeline import CreatePipelineEntityRequest
 from metadata.generated.schema.api.data.createTask import CreateTaskEntityRequest
+from metadata.generated.schema.api.lineage.addLineage import AddLineage
 from metadata.generated.schema.api.services.createPipelineService import CreatePipelineServiceEntityRequest
 from metadata.generated.schema.entity.services.pipelineService import PipelineServiceType
+from metadata.generated.schema.type.entityLineage import EntitiesEdge
 from metadata.generated.schema.type.entityReference import EntityReference
 from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig, OpenMetadataAPIClient
 from metadata.utils.helpers import convert_epoch_to_iso
@@ -168,6 +170,37 @@ def parse_lineage_to_openmetadata(config: OpenMetadataLineageConfig,
     pipeline = client.create_or_update_pipeline(create_pipeline)
     operator.log.info("Create Pipeline {}".format(pipeline))
+    operator.log.info("Parsing Lineage")
+    for table in inlets:
+        table_entity = client.get_table_by_name(table.fullyQualifiedName)
+        operator.log.debug("from entity {}".format(table_entity))
+        lineage = AddLineage(
+            edge=EntitiesEdge(fromEntity=EntityReference(id=table_entity.id, type='table'),
+                              toEntity=EntityReference(id=pipeline.id, type='pipeline'))
+        )
+        operator.log.debug("from lineage {}".format(lineage))
+        client.create_or_update_lineage(lineage)
+
+    for table in outlets:
+        table_entity = client.get_table_by_name(table.fullyQualifiedName)
+        operator.log.debug("to entity {}".format(table_entity))
+        lineage = AddLineage(
+            edge=EntitiesEdge(
+                fromEntity=EntityReference(id=pipeline.id, type='pipeline'),
+                toEntity=EntityReference(id=table_entity.id, type='table'))
+        )
+        operator.log.debug("to lineage {}".format(lineage))
+        client.create_or_update_lineage(lineage)
+
+
+def is_airflow_version_1() -> bool:
+    try:
+        from airflow.hooks.base import BaseHook
+        return False
+    except ModuleNotFoundError:
+        from airflow.hooks.base_hook import BaseHook
+        return True
+
+
 class OpenMetadataLineageBackend(LineageBackend):
     """
     Sends lineage data from tasks to OpenMetadata.
@@ -191,20 +224,37 @@ class OpenMetadataLineageBackend(LineageBackend):
         outlets: Optional[List] = None,
         context: Dict = None,
     ) -> None:
-        config = get_lineage_config()
-        metadata_config = MetadataServerConfig.parse_obj(
-            {
-                'api_endpoint': config.api_endpoint,
-                'auth_provider_type': config.auth_provider_type,
-                'secret_key': config.secret_key
-            }
-        )
-        client = OpenMetadataAPIClient(metadata_config)
+        try:
+            config = get_lineage_config()
+            metadata_config = MetadataServerConfig.parse_obj(
+                {
+                    'api_endpoint': config.api_endpoint,
+                    'auth_provider_type': config.auth_provider_type,
+                    'secret_key': config.secret_key
+                }
+            )
+            client = OpenMetadataAPIClient(metadata_config)
+            op_inlets = []
+            op_outlets = []
+            if (
+                isinstance(operator._inlets, list) and len(operator._inlets) == 1
+                and isinstance(operator._inlets[0], dict) and not is_airflow_version_1()
+            ):
+                op_inlets = operator._inlets[0].get("tables", [])
+
+            if (
+                isinstance(operator._outlets, list) and len(operator._outlets) == 1
+                and isinstance(operator._outlets[0], dict) and not is_airflow_version_1()
+            ):
+                op_outlets = operator._outlets[0].get("tables", [])
+            if len(op_inlets) == 0 and len(operator.inlets) != 0:
+                op_inlets = operator.inlets
+            if len(op_outlets) == 0 and len(operator.outlets) != 0:
+                op_outlets = operator.outlets
+
             parse_lineage_to_openmetadata(
-                config, context, operator, operator.inlets, operator.outlets, client
+                config, context, operator, op_inlets, op_outlets, client
             )
         except Exception as e:
-            operator.log.error(traceback.format_exc())
             operator.log.error(e)
-            operator.log.error(traceback.print_exc())
diff --git a/ingestion/src/metadata/ingestion/models/table_metadata.py b/ingestion/src/metadata/ingestion/models/table_metadata.py
index d768c15c307..ab18777bf71
--- a/ingestion/src/metadata/ingestion/models/table_metadata.py
+++ b/ingestion/src/metadata/ingestion/models/table_metadata.py
@@ -13,156 +13,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import copy
-import json
-
-from typing import Any, Dict, Iterable, List, Optional, Union
+from typing import Any, Dict, List, Optional
 
 from pydantic import BaseModel
 
 from metadata.generated.schema.type.entityReference import EntityReference
-from metadata.ingestion.models.json_serializable import JsonSerializable
-
-DESCRIPTION_NODE_LABEL_VAL = 'Description'
-DESCRIPTION_NODE_LABEL = DESCRIPTION_NODE_LABEL_VAL
-
-
-class ColumnMetadata:
-
-    def __init__(self,
-                 name: str,
-                 documentation: Union[str, None],
-                 column_data_type: str,
-                 ordinal_position: int,
-                 pii_tags: List[str] = None
-                 ) -> None:
-        """
-        TODO: Add stats
-        :param name:
-        :param documentation:
-        :param column_data_type:
-        :param ordinal_position:
-        :param pii_tags:
-        """
-        self.name = name
-        self.documentation = documentation
-        self.column_data_type = column_data_type
-        self.ordinal_position = ordinal_position
-        self.pii_tags = pii_tags if pii_tags else []
-
-    def __repr__(self) -> str:
-        return 'ColumnMetadata({!r}, {!r}, {!r}, {!r}, {!r})'.format(self.name,
-                                                                     self.documentation,
-                                                                     self.column_data_type,
-                                                                     self.ordinal_position,
-                                                                     self.pii_tags)
-
-
-class TableMetadata:
-    """
-    Table metadata that contains columns.
-    This class can be used for both table and view metadata. If it is a View, is_view=True should be passed in.
-    """
-
-    def __init__(self,
-                 name: str,
-                 documentation: Union[str, None],
-                 columns: Iterable[ColumnMetadata] = None
-                 ) -> None:
-        """
-        :param name:
-        :param documentation:
-        :param columns:
-        """
-        self.name = name
-        self.documentation = documentation
-        self.columns = columns if columns else []
-
-    def add_column(self, column: ColumnMetadata):
-        self.columns.append(column)
-
-    def to_json(self):
-        return json.dumps(self, default=lambda o: o.__dict__)
-
-    def __repr__(self) -> str:
-        return 'TableMetadata({!r}, {!r}, {!r} '.format(self.name,
-                                                        self.documentation,
-                                                        self.columns)
-
-
-class DatabaseMetadata(JsonSerializable):
-    """
-    Database metadata that contains Tables.
-    This class can be used to generate catalog batch REST api representation
-    All connectors should use this to build JSON representation of catalog REST API.
-    """
-    DATABASE_NODE_LABEL = 'database'
-    CLUSTER_NODE_LABEL = 'Cluster'
-
-    @classmethod
-    def create(cls, record: Dict[str, Any], service: Dict[str, Any]):
-        name = record.name
-        documentation = record.documentation
-        service = service
-        tables = []
-        for table in record.tables:
-            table_metadata: TableMetadata = TableMetadata(table.name, table.documentation)
-            for column in table.columns:
-                column: ColumnMetadata = ColumnMetadata(column.name, column.documentation,
-                                                        column.column_data_type, column.ordinal_position,
-                                                        column.pii_tags)
-                table_metadata.add_column(column)
-            tables.append(table_metadata)
-        return cls(name, documentation, tables, service)
-
-    def __init__(self,
-                 name: str,
-                 documentation: Union[str, None],
-                 tables: Iterable[TableMetadata] = None,
-                 service: Dict[str, Any] = None,
-                 **kwargs: Any
-                 ) -> None:
-        """
-        :param name:
-        :param service:
-        :param documentation:
-        :param tables:
-        :param kwargs: Put additional attributes to the table model if there is any.
-        """
-        self.name = name
-        self.documentation = documentation
-        self.tables = tables if tables else []
-        self.service = service
-        if kwargs:
-            self.attrs = copy.deepcopy(kwargs)
-
-    def __repr__(self) -> str:
-        return 'DatabaseMetadata({!r}, {!r}, {!r})'.format(self.name,
-                                                           self.documentation,
-                                                           self.tables)
-
-    def add_table(self, table: TableMetadata) -> None:
-        self.tables.append(table)
-
-
-class JDBCMetadata(JsonSerializable):
-    def __init__(self,
-                 connection_url: str,
-                 driver_class: str):
-        self.connection_url = connection_url
-        self.driver_class = driver_class
-
-
-class ServiceMetadata(JsonSerializable):
-    """
-    Service Metadata contains the configuration to connect to a database
-    """
-
-    def __init__(self,
-                 name: str,
-                 jdbc: JDBCMetadata):
-        self.name = name
-        self.jdbc = jdbc
+class Table(BaseModel):
+    """Table Fully Qualified Name"""
+    fullyQualifiedName: str
 
 
 class TableESDocument(BaseModel):
diff --git a/ingestion/src/metadata/ingestion/source/sample_data.py b/ingestion/src/metadata/ingestion/source/sample_data.py
index c2c641b97ac..d5a44a1d81d
--- a/ingestion/src/metadata/ingestion/source/sample_data.py
+++ b/ingestion/src/metadata/ingestion/source/sample_data.py
@@ -416,15 +416,12 @@ class SampleDataSource(Source):
 
     def ingest_lineage(self) -> Iterable[AddLineage]:
         for edge in self.lineage:
-            print(edge)
             from_entity_ref = get_lineage_entity_ref(edge['from'], self.metadata_config)
-            print("hello from {}".format(from_entity_ref))
             to_entity_ref = get_lineage_entity_ref(edge['to'], self.metadata_config)
             lineage = AddLineage(
                 edge=EntitiesEdge(fromEntity=from_entity_ref, toEntity=to_entity_ref)
             )
-            print(lineage)
             yield lineage
 
     def close(self):
diff --git a/ingestion/src/metadata/ingestion/source/sample_users.py b/ingestion/src/metadata/ingestion/source/sample_users.py
index 557ab9c5f3a..f0b1dcf204d
--- a/ingestion/src/metadata/ingestion/source/sample_users.py
+++ b/ingestion/src/metadata/ingestion/source/sample_users.py
@@ -22,7 +22,6 @@ from dataclasses import dataclass, field
 from metadata.config.common import ConfigModel
 from metadata.ingestion.api.source import Source, SourceStatus
 from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig, OpenMetadataAPIClient
-from metadata.ingestion.models.table_metadata import DatabaseMetadata
 from metadata.ingestion.models.user import User
 
 
@@ -100,7 +99,7 @@ class SampleUsersSource(Source):
     def prepare(self):
        pass
 
-    def next_record(self) -> Iterable[DatabaseMetadata]:
+    def next_record(self) -> Iterable[User]:
         for user in self.sample_columns:
             user_metadata = User(user['email'],
                                  user['first_name'],
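
Note: the example DAG above only produces lineage when the OpenMetadata lineage backend from this change is enabled in the Airflow Scheduler's configuration. Below is a minimal sketch of an airflow.cfg fragment: the [lineage] backend key is standard Airflow, and the backend class path follows from airflow_provider_openmetadata/lineage/openmetadata.py in this diff; the remaining key names and values are assumptions mirroring the fields read via get_lineage_config() (api_endpoint, auth_provider_type, secret_key). See https://docs.open-metadata.org/lineage/configure-airflow-lineage for the authoritative settings.

[lineage]
backend = airflow_provider_openmetadata.lineage.openmetadata.OpenMetadataLineageBackend
# The keys below are assumed names based on the config fields used in send_lineage();
# check the documentation link above for the exact spelling and required values.
api_endpoint = http://localhost:8585/api
auth_provider_type = no-auth
secret_key =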