Automated Lineage by Query for Python SDK & CLI (#12672)

This commit is contained in:
Mayur Singal 2023-07-31 19:25:41 +05:30 committed by GitHub
parent 361629fffd
commit 8f6e5eed31
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 187 additions and 0 deletions

View File

@@ -0,0 +1,11 @@
serviceName: local_mysql
query: insert into target_table(json) select json from source_table
# filePath: test.sql
# parseTimeout: 360 # timeout in seconds
workflowConfig:
  loggerLevel: DEBUG
  openMetadataServerConfig:
    hostPort: http://localhost:8585/api
    authProvider: openmetadata
    securityConfig:
      jwtToken: "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"

View File

@@ -0,0 +1,74 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Lineage utility for the metadata CLI
"""
import pathlib
import sys
import traceback
from typing import Optional

from pydantic import BaseModel

from metadata.config.common import load_config_file
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.metadataIngestion.workflow import WorkflowConfig
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.constants import UTF_8
from metadata.utils.logger import cli_logger
from metadata.utils.workflow_output_handler import WorkflowType, print_init_error

logger = cli_logger()


class LineageWorkflow(BaseModel):
    filePath: Optional[str]
    query: Optional[str]
    serviceName: str
    workflowConfig: WorkflowConfig
    parseTimeout: Optional[int] = 5 * 60  # default parsing timeout to be 5 mins

def run_lineage(config_path: str) -> None:
    """
    Run the lineage workflow from a config path
    to a JSON or YAML file
    :param config_path: Path to the JSON or YAML config file
    """
    config_file = pathlib.Path(config_path)
    config_dict = None
    try:
        config_dict = load_config_file(config_file)
        workflow = LineageWorkflow.parse_obj(config_dict)
    except Exception as exc:
        logger.debug(traceback.format_exc())
        print_init_error(exc, config_dict, WorkflowType.INGEST)
        sys.exit(1)

    if workflow.filePath:
        with open(workflow.filePath, encoding=UTF_8) as sql_file:
            sql = sql_file.read()
    else:
        sql = workflow.query

    metadata = OpenMetadata(config=workflow.workflowConfig.openMetadataServerConfig)
    service: DatabaseService = metadata.get_by_name(
        entity=DatabaseService, fqn=workflow.serviceName
    )
    if service:
        metadata.add_lineage_by_query(
            database_service=service, timeout=workflow.parseTimeout, sql=sql
        )
    else:
        logger.error(f"Service not found with name {workflow.serviceName}")

View File

@@ -23,6 +23,7 @@ from metadata.cli.dataquality import run_test
from metadata.cli.docker import BACKEND_DATABASES, DockerActions, run_docker
from metadata.cli.ingest import run_ingest
from metadata.cli.insight import run_insight
from metadata.cli.lineage import run_lineage
from metadata.cli.openmetadata_dag_config_migration import (
    run_openmetadata_dag_config_migration,
)
@@ -46,6 +47,7 @@ class MetadataCommands(Enum):
    RESTORE = "restore"
    WEBHOOK = "webhook"
    INSIGHT = "insight"
    LINEAGE = "lineage"
    OPENMETADATA_IMPORTS_MIGRATION = "openmetadata_imports_migration"
    OPENMETADATA_DAG_CONFIG_MIGRATION = "openmetadata_dag_config_migration"
@@ -336,6 +338,9 @@ def get_parser(args=None):
    create_common_config_parser_args(
        sub_parser.add_parser(MetadataCommands.INGEST.value, help="Ingestion Workflow")
    )
    create_common_config_parser_args(
        sub_parser.add_parser(MetadataCommands.LINEAGE.value, help="Lineage Workflow")
    )
    create_common_config_parser_args(
        sub_parser.add_parser(
            MetadataCommands.PROFILE.value,
@@ -407,6 +412,8 @@ def metadata(args=None): # pylint: disable=too-many-branches
    if metadata_workflow == MetadataCommands.INGEST.value:
        run_ingest(config_path=config_file)
    if metadata_workflow == MetadataCommands.LINEAGE.value:
        run_lineage(config_path=config_file)
    if metadata_workflow == MetadataCommands.INSIGHT.value:
        run_insight(config_path=config_file)
    if metadata_workflow == MetadataCommands.PROFILE.value:

View File

@@ -19,7 +19,10 @@ from typing import Any, Dict, Generic, Optional, Type, TypeVar, Union
from pydantic import BaseModel
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper
from metadata.ingestion.lineage.parser import LINEAGE_PARSING_TIMEOUT
from metadata.ingestion.ometa.client import REST, APIError
from metadata.ingestion.ometa.utils import get_entity_type
from metadata.utils.logger import ometa_logger
@@ -144,3 +147,39 @@ class OMetaLineageMixin(Generic[T]):
        except APIError as err:
            logger.debug(traceback.format_exc())
            logger.error(f"Error {err.status_code} trying to DELETE lineage for {edge}")
    def add_lineage_by_query(
        self,
        database_service: DatabaseService,
        sql: str,
        database_name: Optional[str] = None,
        schema_name: Optional[str] = None,
        timeout: int = LINEAGE_PARSING_TIMEOUT,
    ) -> None:
        """
        Method parses the query and generates the lineage
        between source and target tables
        """
        # pylint: disable=import-outside-toplevel,cyclic-import
        # importing inside the method to avoid circular import
        from metadata.ingestion.lineage.sql_lineage import get_lineage_by_query

        if database_service:
            connection_type = database_service.serviceType.value
            add_lineage_request = get_lineage_by_query(
                metadata=self,
                service_name=database_service.name.__root__,
                dialect=ConnectionTypeDialectMapper.dialect_of(connection_type),
                query=sql,
                database_name=database_name,
                schema_name=schema_name,
                timeout_seconds=timeout,
            )
            for lineage_request in add_lineage_request or []:
                resp = self.add_lineage(lineage_request)
                entity_name = resp.get("entity", {}).get("name")
                for node in resp.get("nodes", []):
                    logger.info(
                        f"added lineage between table {node.get('name')} and {entity_name}"
                    )

View File

@@ -394,3 +394,59 @@ The UI currently supports showing the column lineage information. Data about the
will be surfaced soon. Thanks!
{% /note %}
# Automated SQL lineage
If you want OpenMetadata to identify lineage based on a SQL query, you can use the `add_lineage_by_query` method of the Python SDK to parse the SQL and generate the lineage in OpenMetadata.
Follow the code snippet below for an example:
```python
from metadata.generated.schema.entity.services.databaseService import DatabaseService

database_service: DatabaseService = metadata.get_by_name(
    entity=DatabaseService, fqn="my_service"
)

metadata.add_lineage_by_query(
    database_service=database_service,
    timeout=200,  # timeout in seconds
    sql="insert into target_table(id) select id from source_table",  # your SQL query
)
```
The above example creates lineage between `target_table` and `source_table` within the `my_service` database service.
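The snippet above assumes an already-initialized `metadata` client. A minimal sketch of creating one, assuming a local OpenMetadata server and JWT authentication (the host, port, and token below are placeholders to replace with your own values):
```python
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
    OpenMetadataConnection,
)
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
    OpenMetadataJWTClientConfig,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata

# Placeholder host and token -- point these at your own installation
server_config = OpenMetadataConnection(
    hostPort="http://localhost:8585/api",
    authProvider="openmetadata",
    securityConfig=OpenMetadataJWTClientConfig(jwtToken="<your-jwt-token>"),
)
metadata = OpenMetadata(server_config)
assert metadata.health_check()  # verify the connection before adding lineage
```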
## Automated SQL lineage via CLI
To create automated SQL lineage via the CLI, make sure you have installed the `openmetadata-ingestion` package in your local environment with `pip install openmetadata-ingestion`.
Once that is done, prepare a YAML file as follows.
```yaml
serviceName: local_mysql
query: insert into target_table(id) select id from source_table
# filePath: test.sql
# parseTimeout: 360 # timeout in seconds
workflowConfig:
  # loggerLevel: DEBUG # DEBUG, INFO, WARN or ERROR
  openMetadataServerConfig:
    hostPort: <OpenMetadata host and port>
    authProvider: <OpenMetadata auth provider>
```
- **serviceName**: Name of the database service which contains the tables involved in the query.
- **query**: The raw SQL query can be specified directly within the YAML file.
- **filePath**: If the query is too big, you can instead save it to a file and pass the path to that file in this field.
- **parseTimeout**: Timeout in seconds for the lineage parsing process.
- **workflowConfig**: The main property here is `openMetadataServerConfig`, where you define the host and security provider of your OpenMetadata installation (see the sketch below).
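For example, a filled-in `workflowConfig` using JWT authentication against a local server could look like the following sketch (the host and token are placeholders):
```yaml
workflowConfig:
  loggerLevel: INFO  # DEBUG, INFO, WARN or ERROR
  openMetadataServerConfig:
    hostPort: http://localhost:8585/api
    authProvider: openmetadata
    securityConfig:
      jwtToken: "<your-jwt-token>"
```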
Once the YAML file is prepared, you can run the command:
```
metadata lineage -c path/to/your_config_yaml.yaml
```
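If the query is too long to embed in the YAML, the workflow can read it from a file by setting `filePath` instead of `query`. A sketch, assuming the statement is saved locally as `lineage_query.sql` (a hypothetical file name):
```yaml
serviceName: local_mysql
filePath: lineage_query.sql  # file containing the SQL statement; used instead of `query`
parseTimeout: 360  # timeout in seconds
workflowConfig:
  openMetadataServerConfig:
    hostPort: <OpenMetadata host and port>
    authProvider: <OpenMetadata auth provider>
```
Run it with the same `metadata lineage -c path/to/your_config_yaml.yaml` command.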