Fix #748: Airflow lineage: add inlet/outlet parsing to construct lineage (#749)

This commit is contained in:
Sriharsha Chintalapani 2021-10-12 21:03:15 -07:00 committed by GitHub
parent d93c2f1e43
commit 5644addc9a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 116 additions and 161 deletions

View File

@ -0,0 +1,49 @@
"""
OpenMetadata Airflow Lineage Backend example. Airflow provides a pluggable lineage backend that can
read a DAG's configured inlets and outlets to compose a lineage. OpenMetadata ships an Airflow lineage backend
that captures all of the workflows in Airflow along with any lineage the user has configured.
Please refer to https://docs.open-metadata.org/lineage/configure-airflow-lineage for how to configure the lineage backend
with the Airflow Scheduler.
This example demonstrates how to configure an Airflow DAG's inlets and outlets.
"""
from datetime import timedelta
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from metadata.ingestion.models.table_metadata import Table
# Task-level defaults applied to every task created inside the example DAG.
default_args = dict(
    owner="openmetadata_airflow_example",
    depends_on_past=False,
    email=["user@company.com"],
    execution_timeout=timedelta(minutes=5),
)
@dag(
    default_args=default_args,
    description="OpenMetadata Airflow Lineage example DAG",
    schedule_interval=timedelta(days=1),
    start_date=days_ago(1),
    catchup=False,
)
def openmetadata_airflow_lineage_example():
    # Declare the lineage edges once, then attach them to the task below.
    # Keys must be "tables" — that is what the OpenMetadata lineage backend
    # reads from the task's inlets/outlets.
    lineage_inlets = {
        "tables": [
            Table(fullyQualifiedName="bigquery.shopify.raw_order"),
            Table(fullyQualifiedName="bigquery.shopify.raw_customer"),
        ],
    }
    lineage_outlets = {
        "tables": [Table(fullyQualifiedName="bigquery.shopify.fact_order")],
    }

    @task(inlets=lineage_inlets, outlets=lineage_outlets)
    def generate_data():
        """Placeholder task body: put the ETL query here."""
        pass

    generate_data()


openmetadata_airflow_lineage_example_dag = openmetadata_airflow_lineage_example()

View File

@ -8,8 +8,10 @@ from airflow.lineage.backend import LineageBackend
from metadata.generated.schema.api.data.createPipeline import CreatePipelineEntityRequest
from metadata.generated.schema.api.data.createTask import CreateTaskEntityRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineage
from metadata.generated.schema.api.services.createPipelineService import CreatePipelineServiceEntityRequest
from metadata.generated.schema.entity.services.pipelineService import PipelineServiceType
from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig, OpenMetadataAPIClient
from metadata.utils.helpers import convert_epoch_to_iso
@ -168,6 +170,37 @@ def parse_lineage_to_openmetadata(config: OpenMetadataLineageConfig,
pipeline = client.create_or_update_pipeline(create_pipeline)
operator.log.info("Create Pipeline {}".format(pipeline))
operator.log.info("Parsing Lineage")
for table in inlets:
table_entity = client.get_table_by_name(table.fullyQualifiedName)
operator.log.debug("from entity {}".format(table_entity))
lineage = AddLineage(
edge=EntitiesEdge(fromEntity=EntityReference(id=table_entity.id, type='table'),
toEntity=EntityReference(id=pipeline.id, type='pipeline'))
)
operator.log.debug("from lineage {}".format(lineage))
client.create_or_update_lineage(lineage)
for table in outlets:
table_entity = client.get_table_by_name(table.fullyQualifiedName)
operator.log.debug("to entity {}".format(table_entity))
lineage = AddLineage(
edge=EntitiesEdge(
fromEntity=EntityReference(id=pipeline.id, type='pipeline'),
toEntity=EntityReference(id=table_entity.id, type='table'))
)
operator.log.debug("to lineage {}".format(lineage))
client.create_or_update_lineage(lineage)
def is_airflow_version_1() -> bool:
    """Return True when running under Airflow 1.x.

    Airflow 2.x moved ``BaseHook`` from ``airflow.hooks.base_hook`` to
    ``airflow.hooks.base``; probing for the new module location is how the
    two major versions are told apart.
    """
    try:
        from airflow.hooks.base import BaseHook  # noqa: F401 (Airflow 2.x)
    except ModuleNotFoundError:
        from airflow.hooks.base_hook import BaseHook  # noqa: F401 (Airflow 1.x)
        return True
    return False
class OpenMetadataLineageBackend(LineageBackend):
"""
Sends lineage data from tasks to OpenMetadata.
@ -191,20 +224,37 @@ class OpenMetadataLineageBackend(LineageBackend):
outlets: Optional[List] = None,
context: Dict = None,
) -> None:
config = get_lineage_config()
metadata_config = MetadataServerConfig.parse_obj(
{
'api_endpoint': config.api_endpoint,
'auth_provider_type': config.auth_provider_type,
'secret_key': config.secret_key
}
)
client = OpenMetadataAPIClient(metadata_config)
try:
config = get_lineage_config()
metadata_config = MetadataServerConfig.parse_obj(
{
'api_endpoint': config.api_endpoint,
'auth_provider_type': config.auth_provider_type,
'secret_key': config.secret_key
}
)
client = OpenMetadataAPIClient(metadata_config)
op_inlets = []
op_outlets = []
if (
isinstance(operator._inlets, list) and len(operator._inlets) == 1
and isinstance(operator._inlets[0], dict) and not is_airflow_version_1()
):
op_inlets = operator._inlets[0].get("tables", [])
if (
isinstance(operator._outlets, list) and len(operator._outlets) == 1
and isinstance(operator._outlets[0], dict) and not is_airflow_version_1()
):
op_outlets = operator._outlets[0].get("tables", [])
if len(op_inlets) == 0 and len(operator.inlets) != 0:
op_inlets = operator.inlets
if len(op_outlets) == 0 and len(operator.outlets) != 0:
op_outlets = operator.outlets
parse_lineage_to_openmetadata(
config, context, operator, operator.inlets, operator.outlets, client
config, context, operator, op_inlets, op_outlets, client
)
except Exception as e:
operator.log.error(traceback.format_exc())
operator.log.error(e)
operator.log.error(traceback.print_exc())

View File

@ -13,156 +13,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import json
from typing import Any, Dict, Iterable, List, Optional, Union
from typing import Any, Dict, List, Optional
from pydantic import BaseModel
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.models.json_serializable import JsonSerializable
# Label value used for description nodes in the metadata model.
DESCRIPTION_NODE_LABEL_VAL = 'Description'
# NOTE(review): presumably kept as an alias for existing call sites that
# reference DESCRIPTION_NODE_LABEL — confirm before collapsing the two names.
DESCRIPTION_NODE_LABEL = DESCRIPTION_NODE_LABEL_VAL
class ColumnMetadata:
    """Plain value object describing a single table column.

    TODO: Add stats
    """

    def __init__(self,
                 name: str,
                 documentation: Union[str, None],
                 column_data_type: str,
                 ordinal_position: int,
                 pii_tags: List[str] = None
                 ) -> None:
        """
        :param name: column name
        :param documentation: free-form description, may be None
        :param column_data_type: column type name
        :param ordinal_position: 1-based position within the table
        :param pii_tags: optional PII tag list; defaults to a fresh empty list
        """
        self.name = name
        self.documentation = documentation
        self.column_data_type = column_data_type
        self.ordinal_position = ordinal_position
        # Fall back to a new list so instances never share a mutable default.
        self.pii_tags = pii_tags if pii_tags else []

    def __repr__(self) -> str:
        return (
            f'ColumnMetadata({self.name!r}, {self.documentation!r}, '
            f'{self.column_data_type!r}, {self.ordinal_position!r}, '
            f'{self.pii_tags!r})'
        )
class TableMetadata:
    """
    Table metadata that contains columns.
    This class can be used for both table and view metadata. If it is a View, is_view=True should be passed in.
    """

    def __init__(self,
                 name: str,
                 documentation: Optional[str],
                 columns: "Iterable[ColumnMetadata]" = None
                 ) -> None:
        """
        :param name: table (or view) name
        :param documentation: free-form description, may be None
        :param columns: initial columns; defaults to a fresh empty list
        """
        self.name = name
        self.documentation = documentation
        # NOTE(review): the annotation says Iterable, but add_column() calls
        # .append() — a tuple/generator would break later. Callers appear to
        # pass lists; confirm before tightening or copying here.
        self.columns = columns if columns else []

    def add_column(self, column: "ColumnMetadata"):
        """Append a single column to this table."""
        self.columns.append(column)

    def to_json(self):
        """Serialize the table (including nested column objects) to JSON."""
        return json.dumps(self, default=lambda o: o.__dict__)

    def __repr__(self) -> str:
        # Bug fix: the format string used to be 'TableMetadata({!r}, {!r}, {!r} '
        # — it never closed the parenthesis, producing a malformed repr.
        return 'TableMetadata({!r}, {!r}, {!r})'.format(self.name,
                                                        self.documentation,
                                                        self.columns)
class DatabaseMetadata(JsonSerializable):
    """
    Database metadata that contains Tables.
    This class can be used to generate catalog batch REST api representation
    All connectors should use this to build JSON representation of catalog REST API.
    """
    DATABASE_NODE_LABEL = 'database'
    CLUSTER_NODE_LABEL = 'Cluster'

    @classmethod
    def create(cls, record: Dict[str, Any], service: Dict[str, Any]):
        """Alternate constructor: build a DatabaseMetadata from a record plus
        its owning service configuration.

        NOTE(review): despite the ``Dict`` annotations, ``record`` and its
        tables/columns are accessed via attributes (record.name, table.columns,
        ...) — confirm the caller actually passes objects, not plain dicts.

        :param record: source object exposing .name, .documentation, .tables
        :param service: service configuration attached verbatim
        """
        tables = []
        for table in record.tables:
            table_metadata = TableMetadata(table.name, table.documentation)
            for column in table.columns:
                # Distinct name for the built object so the source ``column``
                # loop variable is not shadowed mid-loop.
                column_metadata = ColumnMetadata(column.name,
                                                 column.documentation,
                                                 column.column_data_type,
                                                 column.ordinal_position,
                                                 column.pii_tags)
                table_metadata.add_column(column_metadata)
            tables.append(table_metadata)
        # The previous no-op ``service = service`` self-assignment was removed.
        return cls(record.name, record.documentation, tables, service)

    def __init__(self,
                 name: str,
                 documentation: Union[str, None],
                 tables: Iterable[TableMetadata] = None,
                 service: Dict[str, Any] = None,
                 **kwargs: Any
                 ) -> None:
        """
        :param name: database name
        :param documentation: free-form description, may be None
        :param tables: initial tables; defaults to an empty list
        :param service: owning service configuration
        :param kwargs: additional attributes stored (deep-copied) under ``attrs``
        """
        self.name = name
        self.documentation = documentation
        self.tables = tables if tables else []
        self.service = service
        if kwargs:
            # Deep copy so later caller-side mutation cannot leak into this object.
            self.attrs = copy.deepcopy(kwargs)

    def __repr__(self) -> str:
        return 'DatabaseMetadata({!r}, {!r}, {!r})'.format(self.name,
                                                           self.documentation,
                                                           self.tables)

    def add_table(self, table: TableMetadata) -> None:
        """Append a single table to this database."""
        self.tables.append(table)
class JDBCMetadata(JsonSerializable):
    """JDBC connection details: connection URL plus driver class name."""

    def __init__(self, connection_url: str, driver_class: str):
        self.connection_url = connection_url
        self.driver_class = driver_class
class ServiceMetadata(JsonSerializable):
    """
    Service Metadata contains the configuration to connect to a database
    """

    def __init__(self, name: str, jdbc: JDBCMetadata):
        self.name = name
        self.jdbc = jdbc
class Table(BaseModel):
    """Table Fully Qualified Name """
    # Dot-separated fully-qualified table name, e.g.
    # "bigquery.shopify.raw_order" as used in the Airflow lineage example;
    # pydantic validates it is a string on construction.
    fullyQualifiedName: str
class TableESDocument(BaseModel):

View File

@ -416,15 +416,12 @@ class SampleDataSource(Source):
def ingest_lineage(self) -> Iterable[AddLineage]:
    """Yield an AddLineage request for every configured sample lineage edge.

    Each edge dict carries 'from' and 'to' keys that are resolved to entity
    references against the metadata server config, then wrapped in an
    AddLineage payload for the sink to emit.
    """
    for edge in self.lineage:
        # Stray print() debug statements removed; use a logger for tracing.
        from_entity_ref = get_lineage_entity_ref(edge['from'], self.metadata_config)
        to_entity_ref = get_lineage_entity_ref(edge['to'], self.metadata_config)
        yield AddLineage(
            edge=EntitiesEdge(fromEntity=from_entity_ref,
                              toEntity=to_entity_ref)
        )
def close(self):

View File

@ -22,7 +22,6 @@ from dataclasses import dataclass, field
from metadata.config.common import ConfigModel
from metadata.ingestion.api.source import Source, SourceStatus
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig, OpenMetadataAPIClient
from metadata.ingestion.models.table_metadata import DatabaseMetadata
from metadata.ingestion.models.user import User
@ -100,7 +99,7 @@ class SampleUsersSource(Source):
def prepare(self):
    """No-op: the sample users source needs no preparation step."""
def next_record(self) -> Iterable[DatabaseMetadata]:
def next_record(self) -> Iterable[User]:
for user in self.sample_columns:
user_metadata = User(user['email'],
user['first_name'],