diff --git a/ingestion/setup.py b/ingestion/setup.py
index 2924bf116a6..918225e30c8 100644
--- a/ingestion/setup.py
+++ b/ingestion/setup.py
@@ -70,7 +70,7 @@ plugins: Dict[str, Set[str]] = {
         "pyarrow~=6.0.1",
         "google-cloud-datacatalog==3.6.2",
     },
-    "bigquery-usage": {"google-cloud-logging", "cachetools"},
+    "bigquery-usage": {"google-cloud-logging", "cachetools", "sqllineage==1.3.3"},
     # "docker": {"docker==5.0.3"},
     "docker": {"python_on_whales==0.34.0"},
     "backup": {"boto3~=1.19.12"},
diff --git a/ingestion/src/metadata/ingestion/source/bigquery_usage.py b/ingestion/src/metadata/ingestion/source/bigquery_usage.py
index eb5cbe68e9b..c14893875dc 100644
--- a/ingestion/src/metadata/ingestion/source/bigquery_usage.py
+++ b/ingestion/src/metadata/ingestion/source/bigquery_usage.py
@@ -14,6 +14,7 @@ import collections
 # This import verifies that the dependencies are available.
 import logging as log
 import os
+import traceback
 from datetime import datetime
 from typing import Iterable
 
@@ -24,10 +25,11 @@ from metadata.generated.schema.entity.services.databaseService import (
 )
 from metadata.ingestion.api.source import Source, SourceStatus
 from metadata.ingestion.models.table_queries import TableQuery
+from metadata.ingestion.ometa.ometa_api import OpenMetadata
 from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
 from metadata.ingestion.source.bigquery import BigQueryConfig, BigquerySource
 from metadata.ingestion.source.sql_alchemy_helper import SQLSourceStatus
-from metadata.utils.helpers import get_start_and_end
+from metadata.utils.helpers import get_start_and_end, ingest_lineage
 
 logger = log.getLogger(__name__)
 
@@ -39,7 +41,7 @@ class BigqueryUsageSource(Source[TableQuery]):
     def __init__(self, config, metadata_config, ctx):
         super().__init__(ctx)
         self.temp_credentials = None
-
+        self.metadata_config = metadata_config
         self.config = config
         self.project_id = self.config.project_id
         self.logger_name = "cloudaudit.googleapis.com%2Fdata_access"
@@ -64,7 +66,6 @@ class BigqueryUsageSource(Source[TableQuery]):
 
     def get_connection_url(self):
         if self.project_id:
-            print(f"{self.scheme}://{self.project_id}")
             return f"{self.scheme}://{self.project_id}"
         return f"{self.scheme}://"
 
@@ -130,12 +131,19 @@ class BigqueryUsageSource(Source[TableQuery]):
                                 service_name=self.config.service_name,
                             )
                             yield tq
+
+                            query_info = {
+                                "sql": tq.sql,
+                                "from_type": "table",
+                                "to_type": "table",
+                                "service_name": self.config.service_name,
+                            }
+
+                            ingest_lineage(query_info, self.metadata_config)
+
         except Exception as err:
             logger.error(repr(err))
 
-    def close(self):
-        pass
-
     def get_status(self) -> SourceStatus:
         return self.status
 
diff --git a/ingestion/src/metadata/utils/helpers.py b/ingestion/src/metadata/utils/helpers.py
index ec4791f273a..e240d6c4c8c 100644
--- a/ingestion/src/metadata/utils/helpers.py
+++ b/ingestion/src/metadata/utils/helpers.py
@@ -8,10 +8,13 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
 import logging
+import traceback
 from datetime import datetime, timedelta
 from typing import Any, Dict, Iterable, List
 
+from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
 from metadata.generated.schema.api.services.createDashboardService import (
     CreateDashboardServiceRequest,
 )
@@ -27,11 +30,14 @@ from metadata.generated.schema.api.services.createPipelineService import (
 from metadata.generated.schema.api.services.createStorageService import (
     CreateStorageServiceRequest,
 )
+from metadata.generated.schema.entity.data.table import Table
 from metadata.generated.schema.entity.services.dashboardService import DashboardService
 from metadata.generated.schema.entity.services.databaseService import DatabaseService
 from metadata.generated.schema.entity.services.messagingService import MessagingService
 from metadata.generated.schema.entity.services.pipelineService import PipelineService
 from metadata.generated.schema.entity.services.storageService import StorageService
+from metadata.generated.schema.type.entityLineage import EntitiesEdge
+from metadata.generated.schema.type.entityReference import EntityReference
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
 
 logger = logging.getLogger(__name__)
@@ -183,6 +189,64 @@ def datetime_to_ts(date: datetime) -> int:
     return int(date.timestamp())
 
 
+def create_lineage(from_table, to_table, query_info, metadata):
+    """
+    Add a lineage edge from from_table to to_table in OpenMetadata.
+
+    Both tables are looked up as "<service_name>.<table>"; if either lookup
+    returns nothing, no edge is added. Any Exception is logged, not raised.
+    """
+    try:
+        from_fqdn = f"{query_info.get('service_name')}.{from_table}"
+        from_entity: Table = metadata.get_by_name(entity=Table, fqdn=from_fqdn)
+        to_fqdn = f"{query_info.get('service_name')}.{to_table}"
+        to_entity: Table = metadata.get_by_name(entity=Table, fqdn=to_fqdn)
+        if not from_entity or not to_entity:
+            return None
+
+        lineage = AddLineageRequest(
+            edge=EntitiesEdge(
+                fromEntity=EntityReference(
+                    id=from_entity.id.__root__,
+                    type=query_info["from_type"],
+                ),
+                toEntity=EntityReference(
+                    id=to_entity.id.__root__,
+                    type=query_info["to_type"],
+                ),
+            )
+        )
+
+        created_lineage = metadata.add_lineage(lineage)
+        logger.info(f"Successfully added Lineage {created_lineage}")
+
+    except Exception as err:
+        logger.debug(traceback.format_exc())
+        logger.error(err)
+
+
+def ingest_lineage(query_info, metadata_config):
+    """
+    Parse query_info["sql"] with sqllineage and add lineage for its tables.
+
+    With intermediate tables the edges are source -> intermediate -> target;
+    without them every source table is linked to every target table.
+    """
+    from sqllineage.runner import LineageRunner
+
+    result = LineageRunner(query_info["sql"])
+    metadata = OpenMetadata(metadata_config)
+    for intermediate_table in result.intermediate_tables:
+        for source_table in result.source_tables:
+            create_lineage(source_table, intermediate_table, query_info, metadata)
+        for target_table in result.target_tables:
+            create_lineage(intermediate_table, target_table, query_info, metadata)
+    if not result.intermediate_tables:
+        for target_table in result.target_tables:
+            for source_table in result.source_tables:
+                create_lineage(source_table, target_table, query_info, metadata)
+
+
 def get_raw_extract_iter(alchemy_helper) -> Iterable[Dict[str, Any]]:
     """
     Provides iterator of result row from SQLAlchemy helper