Lineage-1: Move view lineage processing to lineage workflow (#18220)

Co-authored-by: Sachin Chaurasiya <sachinchaurasiyachotey87@gmail.com>
Mayur Singal 2024-10-28 18:18:22 +05:30 committed by GitHub
parent 24de281026
commit 9d91325af8
71 changed files with 1079 additions and 288 deletions

View File

@ -24,9 +24,11 @@ from typing_extensions import Annotated
from metadata.generated.schema.entity.data.container import Container
from metadata.generated.schema.entity.data.query import Query
from metadata.generated.schema.entity.data.table import Table, TableType
from metadata.ingestion.models.custom_pydantic import BaseModel
from metadata.ingestion.ometa.client import REST, APIError
from metadata.ingestion.ometa.utils import quote
from metadata.ingestion.source.models import TableView
from metadata.utils.elasticsearch import ES_INDEX_MAP
from metadata.utils.logger import ometa_logger
@ -90,7 +92,7 @@ class ESMixin(Generic[T]):
# sort_field needs to be unique for the pagination to work, so we can use the FQN
paginate_query = (
"/search/query?q=&size={size}&deleted=false{filter}&index={index}"
"/search/query?q=&size={size}&deleted=false{filter}&index={index}{include_fields}"
"&sort_field=fullyQualifiedName{after}"
)
@ -301,13 +303,19 @@ class ESMixin(Generic[T]):
logger.warning(f"Unknown error extracting results from ES query [{err}]")
return None
def paginate_es(
def _get_include_fields_query(self, fields: Optional[List[str]]) -> str:
"""Get the include fields query"""
if fields:
return "&include_source_fields=" + "&include_source_fields=".join(fields)
return ""
def _paginate_es_internal(
self,
entity: Type[T],
query_filter: Optional[str] = None,
size: int = 100,
fields: Optional[List[str]] = None,
) -> Iterator[T]:
include_fields: Optional[List[str]] = None,
) -> Iterator[ESResponse]:
"""Paginate through the ES results, ignoring individual errors"""
after: Optional[str] = None
error_pages = 0
@ -316,6 +324,7 @@ class ESMixin(Generic[T]):
index=ES_INDEX_MAP[entity.__name__],
filter="&query_filter=" + quote_plus(query_filter) if query_filter else "",
size=size,
include_fields=self._get_include_fields_query(include_fields),
)
while True:
query_string = query(
@ -330,10 +339,7 @@ class ESMixin(Generic[T]):
continue
else:
break
yield from self._yield_hits_from_api(
response=response, entity=entity, fields=fields
)
yield response
# Get next page
last_hit = response.hits.hits[-1] if response.hits.hits else None
@ -343,6 +349,18 @@ class ESMixin(Generic[T]):
after = ",".join(last_hit.sort)
def paginate_es(
self,
entity: Type[T],
query_filter: Optional[str] = None,
size: int = 100,
fields: Optional[List[str]] = None,
) -> Iterator[T]:
for response in self._paginate_es_internal(entity, query_filter, size):
yield from self._yield_hits_from_api(
response=response, entity=entity, fields=fields
)
def _get_es_response(self, query_string: str) -> Optional[ESResponse]:
"""Get the Elasticsearch response"""
try:
@ -369,3 +387,43 @@ class ESMixin(Generic[T]):
logger.warning(
f"Error while getting {hit.source['fullyQualifiedName']} - {exc}"
)
def yield_es_view_def(
self,
service_name: str,
) -> Iterable[TableView]:
"""
Yield the view definitions of a service from ES
"""
from metadata.utils import fqn
query = {
"query": {
"bool": {
"must": [
{"term": {"service.name.keyword": service_name}},
{"term": {"tableType": TableType.View.value}},
{"term": {"deleted": False}},
{"exists": {"field": "schemaDefinition"}},
]
}
}
}
query = json.dumps(query)
for response in self._paginate_es_internal(
entity=Table,
query_filter=query,
include_fields=["schemaDefinition", "fullyQualifiedName"],
):
for hit in response.hits.hits:
_, database_name, schema_name, table_name = fqn.split(
hit.source["fullyQualifiedName"]
)
yield TableView(
view_definition=hit.source["schemaDefinition"],
service_name=service_name,
db_name=database_name,
schema_name=schema_name,
table_name=table_name,
)
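To make the new `include_fields` plumbing concrete, here is a standalone re-implementation of the query-string helper and the fragment it contributes to the paginated search URL. This is illustrative only; the real method lives on `ESMixin`, and the index name below is a stand-in.

```python
from typing import List, Optional

def get_include_fields_query(fields: Optional[List[str]]) -> str:
    """Standalone copy of ESMixin._get_include_fields_query"""
    if fields:
        return "&include_source_fields=" + "&include_source_fields=".join(fields)
    return ""

# The view-definition producer asks ES to return only these two fields
fragment = get_include_fields_query(["schemaDefinition", "fullyQualifiedName"])
assert fragment == (
    "&include_source_fields=schemaDefinition"
    "&include_source_fields=fullyQualifiedName"
)

# The fragment is interpolated into the pagination template as {include_fields}
paginate_query = (
    "/search/query?q=&size={size}&deleted=false{filter}&index={index}{include_fields}"
    "&sort_field=fullyQualifiedName{after}"
)
print(paginate_query.format(
    size=100,
    filter="",
    index="table_search_index",  # illustrative index name
    include_fields=fragment,
    after="",
))
```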

View File

@ -11,14 +11,26 @@
"""
Handle big query lineage extraction
"""
from metadata.ingestion.source.database.bigquery.queries import BIGQUERY_STATEMENT
from typing import Dict, List
from metadata.ingestion.source.database.bigquery.queries import (
BIGQUERY_GET_STORED_PROCEDURE_QUERIES,
BIGQUERY_STATEMENT,
)
from metadata.ingestion.source.database.bigquery.query_parser import (
BigqueryQueryParserSource,
)
from metadata.ingestion.source.database.lineage_source import LineageSource
from metadata.ingestion.source.database.stored_procedures_mixin import (
QueryByProcedure,
StoredProcedureLineageMixin,
)
from metadata.utils.helpers import get_start_and_end
class BigqueryLineageSource(BigqueryQueryParserSource, LineageSource):
class BigqueryLineageSource(
BigqueryQueryParserSource, StoredProcedureLineageMixin, LineageSource
):
"""
Implements the necessary methods to extract
Database lineage from Bigquery Source
@ -32,3 +44,18 @@ class BigqueryLineageSource(BigqueryQueryParserSource, LineageSource):
OR (statement_type = "INSERT" and UPPER(query) like '%%INSERT%%INTO%%SELECT%%')
)
"""
def get_stored_procedure_queries_dict(self) -> Dict[str, List[QueryByProcedure]]:
"""
Pick the stored procedure name from the context
and return the list of associated queries
"""
start, _ = get_start_and_end(self.source_config.queryLogDuration)
query = BIGQUERY_GET_STORED_PROCEDURE_QUERIES.format(
start_date=start,
region=self.service_connection.usageLocation,
)
queries_dict = self.procedure_queries_dict(
query=query,
)
return queries_dict
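The stored-procedure lookup above only needs a start timestamp and the usage location. A minimal sketch of that flow follows, with a toy stand-in for `get_start_and_end` (assumed to return a window of `queryLogDuration` days ending now) and a simplified SQL template in place of `BIGQUERY_GET_STORED_PROCEDURE_QUERIES`.

```python
from datetime import datetime, timedelta

def get_start_and_end(duration_days: int):
    # Toy stand-in for metadata.utils.helpers.get_start_and_end: assumed to
    # return a (start, end) window covering the last `duration_days` days
    end = datetime.now()
    return end - timedelta(days=duration_days), end

# Simplified stand-in for the real template, which targets the
# region-qualified INFORMATION_SCHEMA.JOBS view
TOY_TEMPLATE = (
    "SELECT query FROM `region-{region}`.INFORMATION_SCHEMA.JOBS "
    "WHERE creation_time >= '{start_date}'"
)

start, _ = get_start_and_end(duration_days=1)
print(TOY_TEMPLATE.format(start_date=start, region="us"))
```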

View File

@ -77,7 +77,6 @@ from metadata.ingestion.source.database.bigquery.models import (
BigQueryStoredProcedure,
)
from metadata.ingestion.source.database.bigquery.queries import (
BIGQUERY_GET_STORED_PROCEDURE_QUERIES,
BIGQUERY_GET_STORED_PROCEDURES,
BIGQUERY_LIFE_CYCLE_QUERY,
BIGQUERY_SCHEMA_DESCRIPTION,
@ -95,14 +94,9 @@ from metadata.ingestion.source.database.life_cycle_query_mixin import (
LifeCycleQueryMixin,
)
from metadata.ingestion.source.database.multi_db_source import MultiDBSource
from metadata.ingestion.source.database.stored_procedures_mixin import (
QueryByProcedure,
StoredProcedureMixin,
)
from metadata.utils import fqn
from metadata.utils.credentials import GOOGLE_CREDENTIALS
from metadata.utils.filters import filter_by_database, filter_by_schema
from metadata.utils.helpers import get_start_and_end
from metadata.utils.logger import ingestion_logger
from metadata.utils.sqlalchemy_utils import (
get_all_table_ddls,
@ -223,9 +217,7 @@ Inspector.get_all_table_ddls = get_all_table_ddls
Inspector.get_table_ddl = get_table_ddl
class BigquerySource(
LifeCycleQueryMixin, StoredProcedureMixin, CommonDbSourceService, MultiDBSource
):
class BigquerySource(LifeCycleQueryMixin, CommonDbSourceService, MultiDBSource):
"""
Implements the necessary methods to extract
Database metadata from Bigquery Source
@ -850,22 +842,6 @@ class BigquerySource(
)
)
def get_stored_procedure_queries_dict(self) -> Dict[str, List[QueryByProcedure]]:
"""
Pick the stored procedure name from the context
and return the list of associated queries
"""
start, _ = get_start_and_end(self.source_config.queryLogDuration)
query = BIGQUERY_GET_STORED_PROCEDURE_QUERIES.format(
start_date=start,
region=self.service_connection.usageLocation,
)
queries_dict = self.procedure_queries_dict(
query=query,
)
return queries_dict
def mark_tables_as_deleted(self):
"""
Use the current inspector to mark tables as deleted

View File

@ -116,7 +116,7 @@ WITH SP_HISTORY AS (
AND job_type = "QUERY"
AND state = "DONE"
AND error_result is NULL
AND query LIKE 'CALL%%'
AND UPPER(query) LIKE 'CALL%%'
),
Q_HISTORY AS (
SELECT

View File

@ -117,7 +117,6 @@ class DatabaseServiceTopology(ServiceTopology):
# until we have finished ingesting all the metadata from the source.
post_process=[
"yield_view_lineage",
"yield_procedure_lineage_and_queries",
"yield_external_table_lineage",
],
)

View File

@ -0,0 +1,46 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Db2 lineage module
"""
from typing import Optional
from metadata.generated.schema.entity.services.connections.database.db2Connection import (
Db2Connection,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.lineage_source import LineageSource
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
class Db2LineageSource(LineageSource):
"""
Db2 lineage source implements view lineage
"""
@classmethod
def create(
cls, config_dict, metadata: OpenMetadata, pipeline_name: Optional[str] = None
):
"""Create class instance"""
config: WorkflowSource = WorkflowSource.model_validate(config_dict)
connection: Db2Connection = config.serviceConnection.root.config
if not isinstance(connection, Db2Connection):
raise InvalidSourceException(
f"Expected Db2Connection, but got {connection}"
)
return cls(config, metadata)
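Every new lineage module below (Druid, Greenplum, Hive, Impala, MariaDB, MySQL, PinotDB, SingleStore, SQLite, Teradata) repeats this same `create()` guard. A self-contained toy of the pattern, with stand-in classes instead of the generated connection models:

```python
class InvalidSourceException(Exception):
    """Stand-in for metadata.ingestion.api.steps.InvalidSourceException"""

class Db2Connection:  # toy stand-in for the generated pydantic model
    pass

class OtherConnection:
    pass

class ToyLineageSource:
    def __init__(self, connection):
        self.connection = connection

    @classmethod
    def create(cls, connection):
        # Reject configs whose serviceConnection resolved to a different
        # connection type than this source handles
        if not isinstance(connection, Db2Connection):
            raise InvalidSourceException(
                f"Expected Db2Connection, but got {connection}"
            )
        return cls(connection)

ToyLineageSource.create(Db2Connection())        # ok
try:
    ToyLineageSource.create(OtherConnection())  # raises
except InvalidSourceException as exc:
    print(exc)
```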

View File

@ -0,0 +1,46 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Druid lineage module
"""
from typing import Optional
from metadata.generated.schema.entity.services.connections.database.druidConnection import (
DruidConnection,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.lineage_source import LineageSource
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
class DruidLineageSource(LineageSource):
"""
Druid lineage source implements view lineage
"""
@classmethod
def create(
cls, config_dict, metadata: OpenMetadata, pipeline_name: Optional[str] = None
):
"""Create class instance"""
config: WorkflowSource = WorkflowSource.model_validate(config_dict)
connection: DruidConnection = config.serviceConnection.root.config
if not isinstance(connection, DruidConnection):
raise InvalidSourceException(
f"Expected DruidConnection, but got {connection}"
)
return cls(config, metadata)

View File

@ -0,0 +1,46 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Greenplum lineage module
"""
from typing import Optional
from metadata.generated.schema.entity.services.connections.database.greenplumConnection import (
GreenplumConnection,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.lineage_source import LineageSource
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
class GreenplumLineageSource(LineageSource):
"""
Greenplum lineage source implements view lineage
"""
@classmethod
def create(
cls, config_dict, metadata: OpenMetadata, pipeline_name: Optional[str] = None
):
"""Create class instance"""
config: WorkflowSource = WorkflowSource.model_validate(config_dict)
connection: GreenplumConnection = config.serviceConnection.root.config
if not isinstance(connection, GreenplumConnection):
raise InvalidSourceException(
f"Expected GreenplumConnection, but got {connection}"
)
return cls(config, metadata)

View File

@ -0,0 +1,46 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Hive lineage module
"""
from typing import Optional
from metadata.generated.schema.entity.services.connections.database.hiveConnection import (
HiveConnection,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.lineage_source import LineageSource
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
class HiveLineageSource(LineageSource):
"""
Hive lineage source implements view lineage
"""
@classmethod
def create(
cls, config_dict, metadata: OpenMetadata, pipeline_name: Optional[str] = None
):
"""Create class instance"""
config: WorkflowSource = WorkflowSource.model_validate(config_dict)
connection: HiveConnection = config.serviceConnection.root.config
if not isinstance(connection, HiveConnection):
raise InvalidSourceException(
f"Expected HiveConnection, but got {connection}"
)
return cls(config, metadata)

View File

@ -0,0 +1,46 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Impala lineage module
"""
from typing import Optional
from metadata.generated.schema.entity.services.connections.database.impalaConnection import (
ImpalaConnection,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.lineage_source import LineageSource
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
class ImpalaLineageSource(LineageSource):
"""
Impala lineage source implements view lineage
"""
@classmethod
def create(
cls, config_dict, metadata: OpenMetadata, pipeline_name: Optional[str] = None
):
"""Create class instance"""
config: WorkflowSource = WorkflowSource.model_validate(config_dict)
connection: ImpalaConnection = config.serviceConnection.root.config
if not isinstance(connection, ImpalaConnection):
raise InvalidSourceException(
f"Expected ImpalaConnection, but got {connection}"
)
return cls(config, metadata)

View File

@ -15,17 +15,22 @@ import csv
import os
import traceback
from abc import ABC
from typing import Iterable, Iterator, Union
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import partial
from typing import Callable, Iterable, Iterator, Union
from metadata.generated.schema.api.data.createQuery import CreateQueryRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.type.basic import FullyQualifiedEntityName, SqlQuery
from metadata.generated.schema.type.tableQuery import TableQuery
from metadata.ingestion.api.models import Either
from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper
from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper, Dialect
from metadata.ingestion.lineage.sql_lineage import get_lineage_by_query
from metadata.ingestion.models.ometa_lineage import OMetaLineageRequest
from metadata.ingestion.source.database.query_parser_source import QueryParserSource
from metadata.ingestion.source.models import TableView
from metadata.utils import fqn
from metadata.utils.db_utils import get_view_lineage
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
@ -43,12 +48,14 @@ class LineageSource(QueryParserSource, ABC):
- schema
"""
dialect: Dialect
def yield_table_queries_from_logs(self) -> Iterator[TableQuery]:
"""
Method to handle the usage from query logs
"""
try:
query_log_path = self.config.sourceConfig.config.queryLogFilePath
query_log_path = self.source_config.queryLogFilePath
if os.path.isfile(query_log_path):
file_paths = [query_log_path]
elif os.path.isdir(query_log_path):
@ -89,6 +96,28 @@ class LineageSource(QueryParserSource, ABC):
)
yield from self.yield_table_query()
def generate_lineage_in_thread(self, producer_fn: Callable, processor_fn: Callable):
"""Fan processor_fn out over everything producer_fn yields using a thread pool"""
with ThreadPoolExecutor(max_workers=self.source_config.threads) as executor:
# Keep a future -> input mapping so failures can be attributed
# to the input that produced them
futures = {
executor.submit(processor_fn, produced_input): produced_input
for produced_input in producer_fn()
}
for future in as_completed(
futures, timeout=self.source_config.parsingTimeoutLimit
):
try:
results = future.result(
timeout=self.source_config.parsingTimeoutLimit
)
yield from results
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(
f"Error processing result for {futures[future]}: {exc}"
)
def yield_table_query(self) -> Iterator[TableQuery]:
"""
Given an engine, iterate over the query results to
@ -127,6 +156,88 @@ class LineageSource(QueryParserSource, ABC):
)
return fqn.get_query_checksum(table_query.query) in (checksums or {})
def query_lineage_generator(
self, table_query: TableQuery
) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]:
if not self._query_already_processed(table_query):
lineages: Iterable[Either[AddLineageRequest]] = get_lineage_by_query(
self.metadata,
query=table_query.query,
service_name=table_query.serviceName,
database_name=table_query.databaseName,
schema_name=table_query.databaseSchema,
dialect=self.dialect,
timeout_seconds=self.source_config.parsingTimeoutLimit,
)
for lineage_request in lineages or []:
yield lineage_request
# If we identified lineage properly, ingest the original query
if lineage_request.right:
yield Either(
right=CreateQueryRequest(
query=SqlQuery(table_query.query),
query_type=table_query.query_type,
duration=table_query.duration,
processedLineage=True,
service=FullyQualifiedEntityName(self.config.serviceName),
)
)
def yield_query_lineage(
self,
) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]:
"""
Based on the query logs, prepare the lineage
and send it to the sink
"""
connection_type = str(self.service_connection.type.value)
self.dialect = ConnectionTypeDialectMapper.dialect_of(connection_type)
producer_fn = self.get_table_query
processor_fn = self.query_lineage_generator
yield from self.generate_lineage_in_thread(producer_fn, processor_fn)
def view_lineage_generator(
self, view: TableView
) -> Iterable[Either[AddLineageRequest]]:
try:
for lineage in get_view_lineage(
view=view,
metadata=self.metadata,
service_name=self.config.serviceName,
connection_type=self.service_connection.type.value,
timeout_seconds=self.source_config.parsingTimeoutLimit,
):
if lineage.right is not None:
yield Either(
right=OMetaLineageRequest(
lineage_request=lineage.right,
override_lineage=self.source_config.overrideViewLineage,
)
)
else:
yield lineage
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Error processing view {view}: {exc}")
def yield_view_lineage(self) -> Iterable[Either[AddLineageRequest]]:
logger.info("Processing View Lineage")
producer_fn = partial(self.metadata.yield_es_view_def, self.config.serviceName)
processor_fn = self.view_lineage_generator
yield from self.generate_lineage_in_thread(producer_fn, processor_fn)
def yield_procedure_lineage(
self,
) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]:
"""
By default stored procedure lineage is not supported.
"""
logger.info(
f"Processing Procedure Lineage not supported for {str(self.service_connection.type.value)}"
)
def _iter(
self, *_, **__
) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]:
@ -134,33 +245,14 @@ class LineageSource(QueryParserSource, ABC):
Based on the query logs, prepare the lineage
and send it to the sink
"""
connection_type = str(self.service_connection.type.value)
dialect = ConnectionTypeDialectMapper.dialect_of(connection_type)
for table_query in self.get_table_query():
if not self._query_already_processed(table_query):
lineages: Iterable[Either[AddLineageRequest]] = get_lineage_by_query(
self.metadata,
query=table_query.query,
service_name=table_query.serviceName,
database_name=table_query.databaseName,
schema_name=table_query.databaseSchema,
dialect=dialect,
timeout_seconds=self.source_config.parsingTimeoutLimit,
if self.source_config.processViewLineage:
yield from self.yield_view_lineage() or []
if self.source_config.processStoredProcedureLineage:
yield from self.yield_procedure_lineage() or []
if self.source_config.processQueryLineage:
if hasattr(self.service_connection, "supportsLineageExtraction"):
yield from self.yield_query_lineage() or []
else:
logger.warning(
f"Lineage extraction is not supported for {str(self.service_connection.type.value)} connection"
)
for lineage_request in lineages or []:
yield lineage_request
# If we identified lineage properly, ingest the original query
if lineage_request.right:
yield Either(
right=CreateQueryRequest(
query=SqlQuery(table_query.query),
query_type=table_query.query_type,
duration=table_query.duration,
processedLineage=True,
service=FullyQualifiedEntityName(
self.config.serviceName
),
)
)
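The new `generate_lineage_in_thread` helper is a generic producer/processor fan-out. Here is a self-contained sketch of the same pattern using only the standard library, with a toy producer and processor and a fixed worker count standing in for `source_config.threads`:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Iterable, Iterator, List

def generate_in_thread(
    producer_fn: Callable[[], Iterable], processor_fn: Callable, workers: int = 4
) -> Iterator:
    with ThreadPoolExecutor(max_workers=workers) as executor:
        # Map each future back to its input so failures can be attributed
        futures = {
            executor.submit(processor_fn, item): item for item in producer_fn()
        }
        for future in as_completed(futures):
            try:
                yield from future.result()
            except Exception as exc:
                print(f"Error processing {futures[future]}: {exc}")

def produce() -> Iterable[int]:
    yield from range(5)

def process(n: int) -> List[str]:
    # Returns a list (not a generator) so the work actually runs in the
    # worker thread and errors surface through future.result()
    if n == 3:
        raise ValueError("boom")
    return [f"lineage for item {n}"]

for result in generate_in_thread(produce, process):
    print(result)
```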

View File

@ -0,0 +1,46 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
MariaDB lineage module
"""
from typing import Optional
from metadata.generated.schema.entity.services.connections.database.mariaDBConnection import (
MariadbConnection,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.lineage_source import LineageSource
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
class MariadbLineageSource(LineageSource):
"""
MariaDB lineage source implements view lineage
"""
@classmethod
def create(
cls, config_dict, metadata: OpenMetadata, pipeline_name: Optional[str] = None
):
"""Create class instance"""
config: WorkflowSource = WorkflowSource.model_validate(config_dict)
connection: MariadbConnection = config.serviceConnection.root.config
if not isinstance(connection, MariadbConnection):
raise InvalidSourceException(
f"Expected MariadbConnection, but got {connection}"
)
return cls(config, metadata)

View File

@ -11,21 +11,39 @@
"""
MSSQL lineage module
"""
import traceback
from datetime import datetime
from typing import Dict, List
from metadata.generated.schema.entity.services.ingestionPipelines.status import (
StackTraceError,
)
from metadata.ingestion.source.database.lineage_source import LineageSource
from metadata.ingestion.source.database.mssql.constants import (
DEFAULT_DATETIME_FORMAT,
MSSQL_DATEFORMAT_DATETIME_MAP,
)
from metadata.ingestion.source.database.mssql.queries import MSSQL_SQL_STATEMENT
from metadata.ingestion.source.database.mssql.queries import (
MSSQL_GET_STORED_PROCEDURE_QUERIES,
MSSQL_SQL_STATEMENT,
)
from metadata.ingestion.source.database.mssql.query_parser import MssqlQueryParserSource
from metadata.ingestion.source.database.mssql.utils import (
get_sqlalchemy_engine_dateformat,
)
from metadata.ingestion.source.database.stored_procedures_mixin import (
QueryByProcedure,
StoredProcedureLineageMixin,
)
from metadata.utils.helpers import get_start_and_end
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
class MssqlLineageSource(MssqlQueryParserSource, LineageSource):
class MssqlLineageSource(
MssqlQueryParserSource, StoredProcedureLineageMixin, LineageSource
):
sql_stmt = MSSQL_SQL_STATEMENT
@ -55,3 +73,34 @@ class MssqlLineageSource(MssqlQueryParserSource, LineageSource):
filters=self.get_filters(),
result_limit=self.source_config.resultLimit,
)
def get_stored_procedure_queries_dict(self) -> Dict[str, List[QueryByProcedure]]:
"""
Return the dictionary associating stored procedures to the
queries they triggered
"""
start, _ = get_start_and_end(self.source_config.queryLogDuration)
server_date_format = get_sqlalchemy_engine_dateformat(self.engine)
current_datetime_format = MSSQL_DATEFORMAT_DATETIME_MAP.get(
server_date_format, DEFAULT_DATETIME_FORMAT
)
start = start.strftime(current_datetime_format)
query = MSSQL_GET_STORED_PROCEDURE_QUERIES.format(
start_date=start,
)
try:
queries_dict = self.procedure_queries_dict(
query=query,
)
except Exception as ex: # pylint: disable=broad-except
logger.debug(f"Error runnning query:\n{query}")
self.status.failed(
StackTraceError(
name="Stored Procedure",
error=f"Error trying to get stored procedure queries: {ex}",
stackTrace=traceback.format_exc(),
)
)
return {}
return queries_dict
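MSSQL formats the window's start according to the server's `DATEFORMAT` setting. A toy of the lookup-with-fallback follows, using hypothetical map contents; the real values live in `metadata.ingestion.source.database.mssql.constants`.

```python
from datetime import datetime

# Hypothetical contents; the real constants may differ
DEFAULT_DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"
MSSQL_DATEFORMAT_DATETIME_MAP = {"dmy": "%d-%m-%Y %H:%M:%S"}

def format_start(start: datetime, server_date_format: str) -> str:
    # Unknown server settings fall back to the default format
    fmt = MSSQL_DATEFORMAT_DATETIME_MAP.get(
        server_date_format, DEFAULT_DATETIME_FORMAT
    )
    return start.strftime(fmt)

print(format_start(datetime(2024, 10, 1), "dmy"))  # 01-10-2024 00:00:00
print(format_start(datetime(2024, 10, 1), "mdy"))  # falls back to default
```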

View File

@ -10,7 +10,7 @@
# limitations under the License.
"""MSSQL source module"""
import traceback
from typing import Dict, Iterable, List, Optional
from typing import Iterable, Optional
from sqlalchemy.dialects.mssql.base import MSDialect, ischema_names
from sqlalchemy.engine.reflection import Inspector
@ -35,24 +35,18 @@ from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.common_db_source import CommonDbSourceService
from metadata.ingestion.source.database.mssql.constants import (
DEFAULT_DATETIME_FORMAT,
MSSQL_DATEFORMAT_DATETIME_MAP,
)
from metadata.ingestion.source.database.mssql.models import (
STORED_PROC_LANGUAGE_MAP,
MssqlStoredProcedure,
)
from metadata.ingestion.source.database.mssql.queries import (
MSSQL_GET_DATABASE,
MSSQL_GET_STORED_PROCEDURE_QUERIES,
MSSQL_GET_STORED_PROCEDURES,
)
from metadata.ingestion.source.database.mssql.utils import (
get_columns,
get_foreign_keys,
get_pk_constraint,
get_sqlalchemy_engine_dateformat,
get_table_comment,
get_table_names,
get_unique_constraints,
@ -60,13 +54,8 @@ from metadata.ingestion.source.database.mssql.utils import (
get_view_names,
)
from metadata.ingestion.source.database.multi_db_source import MultiDBSource
from metadata.ingestion.source.database.stored_procedures_mixin import (
QueryByProcedure,
StoredProcedureMixin,
)
from metadata.utils import fqn
from metadata.utils.filters import filter_by_database
from metadata.utils.helpers import get_start_and_end
from metadata.utils.logger import ingestion_logger
from metadata.utils.sqa_utils import update_mssql_ischema_names
from metadata.utils.sqlalchemy_utils import (
@ -99,7 +88,7 @@ Inspector.get_all_table_ddls = get_all_table_ddls
Inspector.get_table_ddl = get_table_ddl
class MssqlSource(StoredProcedureMixin, CommonDbSourceService, MultiDBSource):
class MssqlSource(CommonDbSourceService, MultiDBSource):
"""
Implements the necessary methods to extract
Database metadata from MSSQL Source
@ -213,34 +202,3 @@ class MssqlSource(StoredProcedureMixin, CommonDbSourceService, MultiDBSource):
stackTrace=traceback.format_exc(),
)
)
def get_stored_procedure_queries_dict(self) -> Dict[str, List[QueryByProcedure]]:
"""
Return the dictionary associating stored procedures to the
queries they triggered
"""
start, _ = get_start_and_end(self.source_config.queryLogDuration)
server_date_format = get_sqlalchemy_engine_dateformat(self.engine)
current_datetime_format = MSSQL_DATEFORMAT_DATETIME_MAP.get(
server_date_format, DEFAULT_DATETIME_FORMAT
)
start = start.strftime(current_datetime_format)
query = MSSQL_GET_STORED_PROCEDURE_QUERIES.format(
start_date=start,
)
try:
queries_dict = self.procedure_queries_dict(
query=query,
)
except Exception as ex: # pylint: disable=broad-except
logger.debug(f"Error runnning query:\n{query}")
self.status.failed(
StackTraceError(
name="Stored Procedure",
error=f"Error trying to get stored procedure queries: {ex}",
stackTrace=traceback.format_exc(),
)
)
return {}
return queries_dict

View File

@ -0,0 +1,46 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
MySQL lineage module
"""
from typing import Optional
from metadata.generated.schema.entity.services.connections.database.mysqlConnection import (
MysqlConnection,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.lineage_source import LineageSource
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
class MysqlLineageSource(LineageSource):
"""
MySQL lineage source implements view lineage
"""
@classmethod
def create(
cls, config_dict, metadata: OpenMetadata, pipeline_name: Optional[str] = None
):
"""Create class instance"""
config: WorkflowSource = WorkflowSource.model_validate(config_dict)
connection: MysqlConnection = config.serviceConnection.root.config
if not isinstance(connection, MysqlConnection):
raise InvalidSourceException(
f"Expected MysqlConnection, but got {connection}"
)
return cls(config, metadata)

View File

@ -30,16 +30,26 @@ workflowConfig:
jwtToken: "token"
"""
from typing import Dict, List
from metadata.ingestion.source.database.lineage_source import LineageSource
from metadata.ingestion.source.database.oracle.queries import (
ORACLE_GET_STORED_PROCEDURE_QUERIES,
ORACLE_QUERY_HISTORY_STATEMENT,
)
from metadata.ingestion.source.database.oracle.query_parser import (
OracleQueryParserSource,
)
from metadata.ingestion.source.database.stored_procedures_mixin import (
QueryByProcedure,
StoredProcedureLineageMixin,
)
from metadata.utils.helpers import get_start_and_end
class OracleLineageSource(OracleQueryParserSource, LineageSource):
class OracleLineageSource(
OracleQueryParserSource, StoredProcedureLineageMixin, LineageSource
):
filters = """
AND COMMAND_TYPE IN (1, 2) AND (
lower(SQL_FULLTEXT) LIKE '%%create%%table%%as%%select%%'
@ -50,3 +60,20 @@ class OracleLineageSource(OracleQueryParserSource, LineageSource):
"""
sql_stmt = ORACLE_QUERY_HISTORY_STATEMENT
stored_procedure_query = ORACLE_GET_STORED_PROCEDURE_QUERIES
def get_stored_procedure_queries_dict(self) -> Dict[str, List[QueryByProcedure]]:
"""
Return the dictionary associating stored procedures to the
queries they triggered
"""
start, _ = get_start_and_end(self.source_config.queryLogDuration)
query = self.stored_procedure_query.format(
start_date=start,
)
queries_dict = self.procedure_queries_dict(
query=query,
)
return queries_dict
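Oracle (and Snowflake further down) parameterize the shared stored-procedure lookup through a `stored_procedure_query` class attribute. A toy of that override pattern, with a simplified template standing in for `ORACLE_GET_STORED_PROCEDURE_QUERIES`:

```python
class BaseLineage:
    stored_procedure_query: str  # SQL template, set by each subclass

    def build_query(self, start_date: str) -> str:
        # The shared mixin method only fills in the window start
        return self.stored_procedure_query.format(start_date=start_date)

class ToyOracleLineage(BaseLineage):
    # Simplified stand-in for the real Oracle query-history template
    stored_procedure_query = (
        "SELECT sql_fulltext FROM v$sql WHERE first_load_time >= '{start_date}'"
    )

print(ToyOracleLineage().build_query("2024-10-01"))
```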

View File

@ -12,7 +12,7 @@
# pylint: disable=protected-access
"""Oracle source module"""
import traceback
from typing import Dict, Iterable, List, Optional
from typing import Iterable, Optional
from sqlalchemy.dialects.oracle.base import INTERVAL, OracleDialect, ischema_names
from sqlalchemy.engine import Inspector
@ -49,7 +49,6 @@ from metadata.ingestion.source.database.oracle.models import (
OracleStoredProcedure,
)
from metadata.ingestion.source.database.oracle.queries import (
ORACLE_GET_STORED_PROCEDURE_QUERIES,
ORACLE_GET_STORED_PROCEDURES,
)
from metadata.ingestion.source.database.oracle.utils import (
@ -65,12 +64,7 @@ from metadata.ingestion.source.database.oracle.utils import (
get_view_names,
get_view_names_dialect,
)
from metadata.ingestion.source.database.stored_procedures_mixin import (
QueryByProcedure,
StoredProcedureMixin,
)
from metadata.utils import fqn
from metadata.utils.helpers import get_start_and_end
from metadata.utils.logger import ingestion_logger
from metadata.utils.sqlalchemy_utils import (
get_all_table_comments,
@ -109,7 +103,7 @@ Inspector.get_table_ddl = get_table_ddl
OracleDialect._get_constraint_data = _get_constraint_data
class OracleSource(StoredProcedureMixin, CommonDbSourceService):
class OracleSource(CommonDbSourceService):
"""
Implements the necessary methods to extract
Database metadata from Oracle Source
@ -250,18 +244,3 @@ class OracleSource(StoredProcedureMixin, CommonDbSourceService):
stackTrace=traceback.format_exc(),
)
)
def get_stored_procedure_queries_dict(self) -> Dict[str, List[QueryByProcedure]]:
"""
Return the dictionary associating stored procedures to the
queries they triggered
"""
start, _ = get_start_and_end(self.source_config.queryLogDuration)
query = ORACLE_GET_STORED_PROCEDURE_QUERIES.format(
start_date=start,
)
queries_dict = self.procedure_queries_dict(
query=query,
)
return queries_dict

View File

@ -0,0 +1,46 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
PinotDB lineage module
"""
from typing import Optional
from metadata.generated.schema.entity.services.connections.database.pinotDBConnection import (
PinotDBConnection,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.lineage_source import LineageSource
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
class PinotdbLineageSource(LineageSource):
"""
PinotDB lineage source implements view lineage
"""
@classmethod
def create(
cls, config_dict, metadata: OpenMetadata, pipeline_name: Optional[str] = None
):
"""Create class instance"""
config: WorkflowSource = WorkflowSource.model_validate(config_dict)
connection: PinotDBConnection = config.serviceConnection.root.config
if not isinstance(connection, PinotDBConnection):
raise InvalidSourceException(
f"Expected PinotDBConnection, but got {connection}"
)
return cls(config, metadata)

View File

@ -13,7 +13,7 @@ Postgres source module
"""
import traceback
from collections import namedtuple
from typing import Dict, Iterable, List, Optional, Tuple
from typing import Iterable, Optional, Tuple
from sqlalchemy import String as SqlAlchemyString
from sqlalchemy import sql
@ -72,10 +72,6 @@ from metadata.ingestion.source.database.postgres.utils import (
get_table_owner,
get_view_definition,
)
from metadata.ingestion.source.database.stored_procedures_mixin import (
QueryByProcedure,
StoredProcedureMixin,
)
from metadata.utils import fqn
from metadata.utils.filters import filter_by_database
from metadata.utils.importer import import_side_effects
@ -156,7 +152,7 @@ Inspector.get_json_fields_and_type = get_json_fields_and_type
PGDialect.get_foreign_keys = get_foreign_keys
class PostgresSource(CommonDbSourceService, MultiDBSource, StoredProcedureMixin):
class PostgresSource(CommonDbSourceService, MultiDBSource):
"""
Implements the necessary methods to extract
Database metadata from Postgres Source
@ -364,10 +360,3 @@ class PostgresSource(CommonDbSourceService, MultiDBSource, StoredProcedureMixin)
stackTrace=traceback.format_exc(),
)
)
def get_stored_procedure_queries_dict(self) -> Dict[str, List[QueryByProcedure]]:
"""
Return the dictionary associating stored procedures to the
queries they triggered
"""
return {}

View File

@ -56,6 +56,7 @@ class QueryParserSource(Source, ABC):
super().__init__()
self.config = config
self.metadata = metadata
self.service_name = self.config.serviceName
self.service_connection = self.config.serviceConnection.root.config
self.source_config = self.config.sourceConfig.config
self.start, self.end = get_start_and_end(self.source_config.queryLogDuration)

View File

@ -31,20 +31,30 @@ workflowConfig:
"""
import traceback
from typing import Iterator
from typing import Dict, Iterator, List
from metadata.generated.schema.type.tableQuery import TableQuery
from metadata.ingestion.source.database.lineage_source import LineageSource
from metadata.ingestion.source.database.redshift.queries import REDSHIFT_SQL_STATEMENT
from metadata.ingestion.source.database.redshift.queries import (
REDSHIFT_GET_STORED_PROCEDURE_QUERIES,
REDSHIFT_SQL_STATEMENT,
)
from metadata.ingestion.source.database.redshift.query_parser import (
RedshiftQueryParserSource,
)
from metadata.ingestion.source.database.stored_procedures_mixin import (
QueryByProcedure,
StoredProcedureLineageMixin,
)
from metadata.utils.helpers import get_start_and_end
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
class RedshiftLineageSource(RedshiftQueryParserSource, LineageSource):
class RedshiftLineageSource(
RedshiftQueryParserSource, StoredProcedureLineageMixin, LineageSource
):
filters = """
AND (
querytxt ILIKE '%%create%%table%%as%%select%%'
@ -85,3 +95,17 @@ class RedshiftLineageSource(RedshiftQueryParserSource, LineageSource):
logger.warning(
f"Error processing query_dict {query_dict}: {exc}"
)
def get_stored_procedure_queries_dict(self) -> Dict[str, List[QueryByProcedure]]:
"""
Return the dictionary associating stored procedures to the
queries they triggered
"""
start, _ = get_start_and_end(self.source_config.queryLogDuration)
query = REDSHIFT_GET_STORED_PROCEDURE_QUERIES.format(start_date=start)
queries_dict = self.procedure_queries_dict(
query=query,
)
return queries_dict

View File

@ -14,7 +14,7 @@ Redshift source ingestion
import re
import traceback
from typing import Dict, Iterable, List, Optional, Tuple
from typing import Iterable, List, Optional, Tuple
from sqlalchemy import sql
from sqlalchemy.dialects.postgresql.base import PGDialect
@ -71,7 +71,6 @@ from metadata.ingestion.source.database.redshift.models import RedshiftStoredPro
from metadata.ingestion.source.database.redshift.queries import (
REDSHIFT_GET_ALL_RELATION_INFO,
REDSHIFT_GET_DATABASE_NAMES,
REDSHIFT_GET_STORED_PROCEDURE_QUERIES,
REDSHIFT_GET_STORED_PROCEDURES,
REDSHIFT_LIFE_CYCLE_QUERY,
REDSHIFT_PARTITION_DETAILS,
@ -85,17 +84,12 @@ from metadata.ingestion.source.database.redshift.utils import (
get_table_comment,
get_view_definition,
)
from metadata.ingestion.source.database.stored_procedures_mixin import (
QueryByProcedure,
StoredProcedureMixin,
)
from metadata.utils import fqn
from metadata.utils.execution_time_tracker import (
calculate_execution_time,
calculate_execution_time_generator,
)
from metadata.utils.filters import filter_by_database
from metadata.utils.helpers import get_start_and_end
from metadata.utils.logger import ingestion_logger
from metadata.utils.sqlalchemy_utils import (
get_all_table_comments,
@ -132,9 +126,7 @@ Inspector.get_all_table_ddls = get_all_table_ddls
Inspector.get_table_ddl = get_table_ddl
class RedshiftSource(
LifeCycleQueryMixin, StoredProcedureMixin, CommonDbSourceService, MultiDBSource
):
class RedshiftSource(LifeCycleQueryMixin, CommonDbSourceService, MultiDBSource):
"""
Implements the necessary methods to extract
Database metadata from Redshift Source
@ -424,23 +416,6 @@ class RedshiftSource(
)
)
def get_stored_procedure_queries_dict(self) -> Dict[str, List[QueryByProcedure]]:
"""
Return the dictionary associating stored procedures to the
queries they triggered
"""
start, _ = get_start_and_end(self.source_config.queryLogDuration)
query = REDSHIFT_GET_STORED_PROCEDURE_QUERIES.format(
start_date=start,
database_name=self.context.get().database,
)
queries_dict = self.procedure_queries_dict(
query=query,
)
return queries_dict
def mark_tables_as_deleted(self):
"""
Use the current inspector to mark tables as deleted

View File

@ -328,7 +328,7 @@ Q_HISTORY as (
pid as query_session_id,
starttime as query_start_time,
endtime as query_end_time,
userid as query_user_name
b.usename as query_user_name
from STL_QUERY q
join pg_catalog.pg_user b
on b.usesysid = q.userid
@ -339,16 +339,16 @@ Q_HISTORY as (
and userid <> 1
)
select
sp.procedure_text,
trim(sp.procedure_text) procedure_text,
sp.procedure_start_time,
sp.procedure_end_time,
q.query_text,
trim(q.query_text) query_text,
q.query_type,
q.query_database_name,
trim(q.query_database_name) query_database_name,
null as query_schema_name,
q.query_start_time,
q.query_end_time,
q.query_user_name
trim(q.query_user_name) query_user_name
from SP_HISTORY sp
join Q_HISTORY q
on sp.procedure_session_id = q.query_session_id

View File

@ -0,0 +1,46 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
SingleStore lineage module
"""
from typing import Optional
from metadata.generated.schema.entity.services.connections.database.singleStoreConnection import (
SingleStoreConnection,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.lineage_source import LineageSource
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
class SinglestoreLineageSource(LineageSource):
"""
SingleStore lineage source implements view lineage
"""
@classmethod
def create(
cls, config_dict, metadata: OpenMetadata, pipeline_name: Optional[str] = None
):
"""Create class instance"""
config: WorkflowSource = WorkflowSource.model_validate(config_dict)
connection: SingleStoreConnection = config.serviceConnection.root.config
if not isinstance(connection, SingleStoreConnection):
raise InvalidSourceException(
f"Expected SingleStoreConnection, but got {connection}"
)
return cls(config, metadata)

View File

@ -12,14 +12,26 @@
Snowflake lineage module
"""
from typing import Dict, List
from metadata.ingestion.source.database.lineage_source import LineageSource
from metadata.ingestion.source.database.snowflake.queries import SNOWFLAKE_SQL_STATEMENT
from metadata.ingestion.source.database.snowflake.queries import (
SNOWFLAKE_GET_STORED_PROCEDURE_QUERIES,
SNOWFLAKE_SQL_STATEMENT,
)
from metadata.ingestion.source.database.snowflake.query_parser import (
SnowflakeQueryParserSource,
)
from metadata.ingestion.source.database.stored_procedures_mixin import (
QueryByProcedure,
StoredProcedureLineageMixin,
)
from metadata.utils.helpers import get_start_and_end
class SnowflakeLineageSource(SnowflakeQueryParserSource, LineageSource):
class SnowflakeLineageSource(
SnowflakeQueryParserSource, StoredProcedureLineageMixin, LineageSource
):
"""
Snowflake class for Lineage
"""
@ -32,3 +44,20 @@ class SnowflakeLineageSource(SnowflakeQueryParserSource, LineageSource):
OR (QUERY_TYPE = 'INSERT' and query_text ILIKE '%%insert%%into%%select%%')
)
"""
stored_procedure_query = SNOWFLAKE_GET_STORED_PROCEDURE_QUERIES
def get_stored_procedure_queries_dict(self) -> Dict[str, List[QueryByProcedure]]:
"""
Return the dictionary associating stored procedures to the
queries they triggered
"""
start, _ = get_start_and_end(self.source_config.queryLogDuration)
query = self.stored_procedure_query.format(
start_date=start,
)
queries_dict = self.procedure_queries_dict(
query=query,
)
return queries_dict

View File

@ -14,7 +14,7 @@ Snowflake source module
import json
import traceback
from datetime import datetime
from typing import Dict, Iterable, List, Optional, Tuple
from typing import Iterable, List, Optional, Tuple
import sqlparse
from snowflake.sqlalchemy.custom_types import VARIANT
@ -80,7 +80,6 @@ from metadata.ingestion.source.database.snowflake.queries import (
SNOWFLAKE_GET_EXTERNAL_LOCATIONS,
SNOWFLAKE_GET_ORGANIZATION_NAME,
SNOWFLAKE_GET_SCHEMA_COMMENTS,
SNOWFLAKE_GET_STORED_PROCEDURE_QUERIES,
SNOWFLAKE_GET_STORED_PROCEDURES,
SNOWFLAKE_LIFE_CYCLE_QUERY,
SNOWFLAKE_SESSION_TAG_QUERY,
@ -102,13 +101,8 @@ from metadata.ingestion.source.database.snowflake.utils import (
get_view_names_reflection,
normalize_names,
)
from metadata.ingestion.source.database.stored_procedures_mixin import (
QueryByProcedure,
StoredProcedureMixin,
)
from metadata.utils import fqn
from metadata.utils.filters import filter_by_database
from metadata.utils.helpers import get_start_and_end
from metadata.utils.logger import ingestion_logger
from metadata.utils.sqlalchemy_utils import get_all_table_comments, get_all_table_ddls
from metadata.utils.tag_utils import get_ometa_tag_and_classification
@ -145,7 +139,6 @@ SnowflakeDialect._get_schema_foreign_keys = get_schema_foreign_keys
class SnowflakeSource(
StoredProcedureMixin,
ExternalTableLineageMixin,
CommonDbSourceService,
MultiDBSource,
@ -712,22 +705,6 @@ class SnowflakeSource(
)
)
def get_stored_procedure_queries_dict(self) -> Dict[str, List[QueryByProcedure]]:
"""
Return the dictionary associating stored procedures to the
queries they triggered
"""
start, _ = get_start_and_end(self.source_config.queryLogDuration)
query = SNOWFLAKE_GET_STORED_PROCEDURE_QUERIES.format(
start_date=start,
)
queries_dict = self.procedure_queries_dict(
query=query,
)
return queries_dict
def mark_tables_as_deleted(self):
"""
Use the current inspector to mark tables as deleted

View File

@ -0,0 +1,46 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
SQLite lineage module
"""
from typing import Optional
from metadata.generated.schema.entity.services.connections.database.sqliteConnection import (
SQLiteConnection,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.lineage_source import LineageSource
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
class SqliteLineageSource(LineageSource):
"""
SQLite lineage source implements view lineage
"""
@classmethod
def create(
cls, config_dict, metadata: OpenMetadata, pipeline_name: Optional[str] = None
):
"""Create class instance"""
config: WorkflowSource = WorkflowSource.model_validate(config_dict)
connection: SQLiteConnection = config.serviceConnection.root.config
if not isinstance(connection, SQLiteConnection):
raise InvalidSourceException(
f"Expected SQLiteConnection, but got {connection}"
)
return cls(config, metadata)

View File

@ -11,6 +11,7 @@
"""
Mixin class with common Stored Procedures logic aimed at lineage.
"""
import json
import re
import traceback
from abc import ABC, abstractmethod
@ -27,8 +28,8 @@ from metadata.generated.schema.entity.data.storedProcedure import StoredProcedur
from metadata.generated.schema.entity.services.ingestionPipelines.status import (
StackTraceError,
)
from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import (
DatabaseServiceMetadataPipeline,
from metadata.generated.schema.metadataIngestion.databaseServiceQueryLineagePipeline import (
DatabaseServiceQueryLineagePipeline,
)
from metadata.generated.schema.type.basic import SqlQuery, Timestamp
from metadata.generated.schema.type.entityLineage import Source as LineageSource
@ -37,7 +38,6 @@ from metadata.ingestion.api.models import Either
from metadata.ingestion.api.status import Status
from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper
from metadata.ingestion.lineage.sql_lineage import get_lineage_by_query
from metadata.ingestion.models.topology import TopologyContextManager
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.logger import ingestion_logger
from metadata.utils.stored_procedures import get_procedure_name_from_call
@ -66,7 +66,18 @@ class QueryByProcedure(BaseModel):
model_config = ConfigDict(populate_by_name=True)
class StoredProcedureMixin(ABC):
class ProcedureAndQuery(BaseModel):
"""
Model to hold the procedure and its queries
"""
procedure: StoredProcedure
query_by_procedure: QueryByProcedure
model_config = ConfigDict(populate_by_name=True)
class StoredProcedureLineageMixin(ABC):
"""
The full flow is:
1. List Stored Procedures
@ -80,10 +91,10 @@ class StoredProcedureMixin(ABC):
It should be inherited in those Sources that implement Stored Procedure ingestion.
"""
context: TopologyContextManager
status: Status
source_config: DatabaseServiceMetadataPipeline
source_config: DatabaseServiceQueryLineagePipeline
engine: Engine
stored_procedure_query_lineage: bool
metadata: OpenMetadata
@abstractmethod
@ -106,6 +117,8 @@ class StoredProcedureMixin(ABC):
for row in results:
try:
print("*** " * 100)
print(dict(row))
query_by_procedure = QueryByProcedure.model_validate(dict(row))
procedure_name = (
query_by_procedure.procedure_name
@ -143,29 +156,34 @@ class StoredProcedureMixin(ABC):
return False
def yield_procedure_lineage(
def _yield_procedure_lineage(
self, query_by_procedure: QueryByProcedure, procedure: StoredProcedure
) -> Iterable[Either[AddLineageRequest]]:
"""Add procedure lineage from its query"""
self.context.get().stored_procedure_query_lineage = False
self.stored_procedure_query_lineage = False
if self.is_lineage_query(
query_type=query_by_procedure.query_type,
query_text=query_by_procedure.query_text,
):
self.context.get().stored_procedure_query_lineage = True
self.stored_procedure_query_lineage = True
for either_lineage in get_lineage_by_query(
self.metadata,
query=query_by_procedure.query_text,
service_name=self.context.get().database_service,
service_name=self.service_name,
database_name=query_by_procedure.query_database_name,
schema_name=query_by_procedure.query_schema_name,
dialect=ConnectionTypeDialectMapper.dialect_of(
self.service_connection.type.value
),
timeout_seconds=self.source_config.queryParsingTimeoutLimit,
timeout_seconds=self.source_config.parsingTimeoutLimit,
lineage_source=LineageSource.QueryLineage,
):
if either_lineage.right.edge.lineageDetails:
print("&& " * 100)
print(either_lineage)
if (
either_lineage.left is None
and either_lineage.right.edge.lineageDetails
):
either_lineage.right.edge.lineageDetails.pipeline = EntityReference(
id=procedure.id,
type="storedProcedure",
@ -192,34 +210,59 @@ class StoredProcedureMixin(ABC):
id=procedure.id,
type="storedProcedure",
),
processedLineage=bool(
self.context.get().stored_procedure_query_lineage
),
service=self.context.get().database_service,
processedLineage=bool(self.stored_procedure_query_lineage),
service=self.service_name,
)
)
def yield_procedure_lineage_and_queries(
def procedure_lineage_processor(
self, procedure_and_query: ProcedureAndQuery
) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]:
yield from self._yield_procedure_lineage(
query_by_procedure=procedure_and_query.query_by_procedure,
procedure=procedure_and_query.procedure,
)
yield from self.yield_procedure_query(
query_by_procedure=procedure_and_query.query_by_procedure,
procedure=procedure_and_query.procedure,
)
def procedure_lineage_generator(self) -> Iterable[ProcedureAndQuery]:
query = {
"query": {
"bool": {
"must": [
{"term": {"service.name.keyword": self.service_name}},
{"term": {"deleted": False}},
]
}
}
}
query_filter = json.dumps(query)
logger.info("Processing Lineage for Stored Procedures")
# First, get all the query history
queries_dict = self.get_stored_procedure_queries_dict()
# Then for each procedure, iterate over all its queries
for procedure in (
self.metadata.paginate_es(entity=StoredProcedure, query_filter=query_filter)
or []
):
if procedure:
logger.debug(f"Processing Lineage for [{procedure.name}]")
for query_by_procedure in (
queries_dict.get(procedure.name.root.lower()) or []
):
yield ProcedureAndQuery(
procedure=procedure, query_by_procedure=query_by_procedure
)
def yield_procedure_lineage(
self,
) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]:
"""Get all the queries and procedures list and yield them"""
if self.context.get().stored_procedures:
logger.info("Processing Lineage for Stored Procedures")
# First, get all the query history
queries_dict = self.get_stored_procedure_queries_dict()
# Then for each procedure, iterate over all its queries
for procedure_fqn in self.context.get().stored_procedures:
procedure: StoredProcedure = self.metadata.get_by_name(
entity=StoredProcedure, fqn=procedure_fqn
)
if procedure:
logger.debug(f"Processing Lineage for [{procedure.name}]")
for query_by_procedure in (
queries_dict.get(procedure.name.root.lower()) or []
):
yield from self.yield_procedure_lineage(
query_by_procedure=query_by_procedure, procedure=procedure
)
yield from self.yield_procedure_query(
query_by_procedure=query_by_procedure, procedure=procedure
)
logger.info("Processing Lineage for Stored Procedures")
producer_fn = self.procedure_lineage_generator
processor_fn = self.procedure_lineage_processor
yield from self.generate_lineage_in_thread(producer_fn, processor_fn)
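The new producer no longer walks the topology context: it pulls every non-deleted `StoredProcedure` of the service from ES and pairs it with the query history. A self-contained sketch of the filter and the pairing step, with plain strings and dicts standing in for the entity and `QueryByProcedure` models:

```python
import json
from typing import Dict, Iterable, List, Tuple

def es_filter(service_name: str) -> str:
    # Same bool/term filter the producer sends to paginate_es
    return json.dumps({
        "query": {
            "bool": {
                "must": [
                    {"term": {"service.name.keyword": service_name}},
                    {"term": {"deleted": False}},
                ]
            }
        }
    })

def pair_procedures_with_queries(
    procedures: Iterable[str], queries_dict: Dict[str, List[str]]
) -> Iterable[Tuple[str, str]]:
    # Procedure names are matched case-insensitively against the history keys
    for procedure in procedures:
        for query in queries_dict.get(procedure.lower()) or []:
            yield procedure, query

print(es_filter("my_service"))
history = {"load_orders": ["INSERT INTO orders SELECT * FROM staging"]}
for proc, query in pair_procedures_with_queries(
    ["LOAD_ORDERS", "NIGHTLY_SYNC"], history
):
    print(proc, "->", query)
```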

View File

@ -0,0 +1,46 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Teradata lineage module
"""
from typing import Optional
from metadata.generated.schema.entity.services.connections.database.teradataConnection import (
TeradataConnection,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.lineage_source import LineageSource
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
class TeradataLineageSource(LineageSource):
"""
Teradata lineage source that implements view lineage extraction
"""
@classmethod
def create(
cls, config_dict, metadata: OpenMetadata, pipeline_name: Optional[str] = None
):
"""Create class instance"""
config: WorkflowSource = WorkflowSource.model_validate(config_dict)
connection: TeradataConnection = config.serviceConnection.root.config
if not isinstance(connection, TeradataConnection):
raise InvalidSourceException(
f"Expected TeradataConnection, but got {connection}"
)
return cls(config, metadata)
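Since the connector only overrides `create`, exercising it is a matter of passing a valid `WorkflowSource` payload. A hypothetical instantiation; the `teradata-lineage` type string is assumed by analogy with the `<service>-lineage` naming other connectors use, and every field value is a placeholder:

```python
# Hypothetical wiring; all values are placeholders and the payload shape
# follows the WorkflowSource schema validated inside create().
from metadata.ingestion.ometa.ometa_api import OpenMetadata


def build_teradata_lineage_source(metadata: OpenMetadata) -> TeradataLineageSource:
    config_dict = {
        "type": "teradata-lineage",  # assumed type string
        "serviceName": "my_teradata",
        "serviceConnection": {
            "config": {
                "type": "Teradata",
                "username": "dbc",
                "password": "dbc",
                "hostPort": "teradata.example.com:1025",
            }
        },
        "sourceConfig": {"config": {"type": "DatabaseLineage"}},
    }
    # model_validate and the TeradataConnection isinstance check run inside create()
    return TeradataLineageSource.create(config_dict, metadata)
```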

View File

@ -84,6 +84,7 @@ EXPECTED_SERVICE = [
connectionOptions=None,
connectionArguments=None,
supportsMetadataExtraction=True,
supportsViewLineageExtraction=True,
supportsProfiler=True,
supportsDBTExtraction=True,
)

View File

@ -7,7 +7,7 @@ slug: /connectors/database/druid
name="Druid"
stage="PROD"
platform="OpenMetadata"
availableFeatures=["Metadata", "Data Profiler", "Data Quality", "Lineage", "Column-level Lineage", "dbt"]
availableFeatures=["Metadata", "Data Profiler", "Data Quality", "View Lineage", "View Column-level Lineage", "dbt"]
unavailableFeatures=["Query Usage", "Owners", "Tags", "Stored Procedures"]
/ %}

View File

@ -7,7 +7,7 @@ slug: /connectors/database/druid/yaml
name="Druid"
stage="PROD"
platform="OpenMetadata"
availableFeatures=["Metadata", "Data Profiler", "Data Quality", "Lineage", "Column-level Lineage", "dbt"]
availableFeatures=["Metadata", "Data Profiler", "Data Quality", "View Lineage", "View Column-level Lineage", "dbt"]
unavailableFeatures=["Query Usage", "Owners", "Tags", "Stored Procedures"]
/ %}

View File

@ -7,8 +7,8 @@ slug: /connectors/database/impala
name="Impala"
stage="PROD"
platform="OpenMetadata"
availableFeatures=["Metadata", "Data Profiler", "Data Quality", "dbt"]
unavailableFeatures=["Query Usage", "Owners", "Lineage", "Column-level Lineage", "Tags", "Stored Procedures"]
availableFeatures=["Metadata", "Data Profiler", "Data Quality", "dbt", "View Lineage", "View Column-level Lineage"]
unavailableFeatures=["Query Usage", "Owners", "Tags", "Stored Procedures"]
/ %}
In this section, we provide guides and references to use the Impala connector.

View File

@ -7,8 +7,8 @@ slug: /connectors/database/impala/yaml
name="Impala"
stage="PROD"
platform="OpenMetadata"
availableFeatures=["Metadata", "Data Profiler", "Data Quality", "dbt"]
unavailableFeatures=["Query Usage", "Owners", "Lineage", "Column-level Lineage", "Tags", "Stored Procedures"]
availableFeatures=["Metadata", "Data Profiler", "Data Quality", "dbt", "View Lineage", "View Column-level Lineage"]
unavailableFeatures=["Query Usage", "Owners", "Tags", "Stored Procedures"]
/ %}
In this section, we provide guides and references to use the Impala connector.

View File

@ -7,8 +7,8 @@ slug: /connectors/database/mariadb
name="MariaDB"
stage="PROD"
platform="OpenMetadata"
availableFeatures=["Metadata", "Data Profiler", "Data Quality", "dbt"]
unavailableFeatures=["Query Usage", "Owners", "Lineage", "Column-level Lineage", "Tags", "Stored Procedures"]
availableFeatures=["Metadata", "Data Profiler", "Data Quality", "dbt", "View Lineage", "View Column-level Lineage"]
unavailableFeatures=["Query Usage", "Owners", "Tags", "Stored Procedures"]
/ %}
In this section, we provide guides and references to use the MariaDB connector.

View File

@ -7,8 +7,8 @@ slug: /connectors/database/mariadb/yaml
name="MariaDB"
stage="PROD"
platform="OpenMetadata"
availableFeatures=["Metadata", "Data Profiler", "Data Quality", "dbt"]
unavailableFeatures=["Query Usage", "Owners", "Lineage", "Column-level Lineage", "Tags", "Stored Procedures"]
availableFeatures=["Metadata", "Data Profiler", "Data Quality", "dbt", "View Lineage", "View Column-level Lineage"]
unavailableFeatures=["Query Usage", "Owners", "Tags", "Stored Procedures"]
/ %}
In this section, we provide guides and references to use the MariaDB connector.

View File

@ -7,8 +7,8 @@ slug: /connectors/database/presto
name="Presto"
stage="PROD"
platform="OpenMetadata"
availableFeatures=["Metadata", "Data Profiler", "Data Quality", "dbt", "Lineage", "Column-level Lineage"]
unavailableFeatures=["Query Usage", "Owners", "Tags", "Stored Procedures"]
availableFeatures=["Metadata", "Data Profiler", "Data Quality", "dbt"]
unavailableFeatures=["Query Usage", "Owners", "Tags", "Stored Procedures", "Lineage", "Column-level Lineage"]
/ %}
In this section, we provide guides and references to use the Presto connector.

View File

@ -7,8 +7,8 @@ slug: /connectors/database/presto/yaml
name="Presto"
stage="PROD"
platform="OpenMetadata"
availableFeatures=["Metadata", "Data Profiler", "Data Quality", "dbt", "Lineage", "Column-level Lineage"]
unavailableFeatures=["Query Usage", "Owners", "Tags", "Stored Procedures"]
availableFeatures=["Metadata", "Data Profiler", "Data Quality", "dbt"]
unavailableFeatures=["Query Usage", "Owners", "Tags", "Stored Procedures", "Lineage", "Column-level Lineage"]
/ %}
In this section, we provide guides and references to use the Presto connector.

View File

@ -7,7 +7,7 @@ slug: /connectors/database/sap-hana
name="SAP Hana"
stage="PROD"
platform="OpenMetadata"
availableFeatures=["Metadata", "Data Profiler", "Data Quality", "View Lineage", "View Column-level Lineage", "dbt"]
availableFeatures=["Metadata", "Data Profiler", "Data Quality", "Lineage", "Column-level Lineage", "dbt"]
unavailableFeatures=["Query Usage", "Stored Procedures", "Owners", "Tags"]
/ %}

View File

@ -7,7 +7,7 @@ slug: /connectors/database/sap-hana/yaml
name="SAP Hana"
stage="PROD"
platform="OpenMetadata"
availableFeatures=["Metadata", "Data Profiler", "Data Quality", "View Lineage", "View Column-level Lineage", "dbt"]
availableFeatures=["Metadata", "Data Profiler", "Data Quality", "Lineage", "Column-level Lineage", "dbt"]
unavailableFeatures=["Query Usage", "Stored Procedures", "Owners", "Tags"]
/ %}

View File

@ -7,8 +7,8 @@ slug: /connectors/database/teradata
name="Teradata"
stage="BETA"
platform="OpenMetadata"
availableFeatures=["Metadata", "Data Profiler"]
unavailableFeatures=["Query Usage", "Data Quality", "Owners", "Tags", "Stored Procedures", "Lineage", "Column-level Lineage", "dbt"]
availableFeatures=["Metadata", "Data Profiler", "View Lineage", "View Column-level Lineage"]
unavailableFeatures=["Query Usage", "Data Quality", "Owners", "Tags", "Stored Procedures", "dbt"]
/ %}
In this section, we provide guides and references to use the Teradata connector.

View File

@ -7,8 +7,8 @@ slug: /connectors/database/teradata/yaml
name="Teradata"
stage="BETA"
platform="OpenMetadata"
availableFeatures=["Metadata", "Data Profiler", "Data Quality"]
unavailableFeatures=["Query Usage", "Owners", "Tags", "Stored Procedures", "Lineage", "Column-level Lineage", "dbt"]
availableFeatures=["Metadata", "Data Profiler", "Data Quality", "View Lineage", "View Column-level Lineage"]
unavailableFeatures=["Query Usage", "Owners", "Tags", "Stored Procedures", "dbt"]
/ %}
In this section, we provide guides and references to use the Teradata connector.

View File

@ -7,7 +7,7 @@ slug: /connectors/database/druid
name="Druid"
stage="PROD"
platform="OpenMetadata"
availableFeatures=["Metadata", "Data Profiler", "Data Quality", "Lineage", "Column-level Lineage", "dbt"]
availableFeatures=["Metadata", "Data Profiler", "Data Quality", "View Lineage", "View Column-level Lineage", "dbt"]
unavailableFeatures=["Query Usage", "Owners", "Tags", "Stored Procedures"]
/ %}

View File

@ -7,7 +7,7 @@ slug: /connectors/database/druid/yaml
name="Druid"
stage="PROD"
platform="OpenMetadata"
availableFeatures=["Metadata", "Data Profiler", "Data Quality", "Lineage", "Column-level Lineage", "dbt"]
availableFeatures=["Metadata", "Data Profiler", "Data Quality", "View Lineage", "View Column-level Lineage", "dbt"]
unavailableFeatures=["Query Usage", "Owners", "Tags", "Stored Procedures"]
/ %}

View File

@ -7,8 +7,8 @@ slug: /connectors/database/impala
name="Impala"
stage="PROD"
platform="OpenMetadata"
availableFeatures=["Metadata", "Data Profiler", "Data Quality", "dbt"]
unavailableFeatures=["Query Usage", "Owners", "Lineage", "Column-level Lineage", "Tags", "Stored Procedures"]
availableFeatures=["Metadata", "Data Profiler", "Data Quality", "dbt", "View Lineage", "View Column-level Lineage"]
unavailableFeatures=["Query Usage", "Owners", "Tags", "Stored Procedures"]
/ %}
In this section, we provide guides and references to use the Impala connector.

View File

@ -7,8 +7,8 @@ slug: /connectors/database/impala/yaml
name="Impala"
stage="PROD"
platform="OpenMetadata"
availableFeatures=["Metadata", "Data Profiler", "Data Quality", "dbt"]
unavailableFeatures=["Query Usage", "Owners", "Lineage", "Column-level Lineage", "Tags", "Stored Procedures"]
availableFeatures=["Metadata", "Data Profiler", "Data Quality", "dbt", "View Lineage", "View Column-level Lineage"]
unavailableFeatures=["Query Usage", "Owners", "Tags", "Stored Procedures"]
/ %}
In this section, we provide guides and references to use the Impala connector.

View File

@ -7,8 +7,8 @@ slug: /connectors/database/mariadb
name="MariaDB"
stage="PROD"
platform="OpenMetadata"
availableFeatures=["Metadata", "Data Profiler", "Data Quality", "dbt"]
unavailableFeatures=["Query Usage", "Owners", "Lineage", "Column-level Lineage", "Tags", "Stored Procedures"]
availableFeatures=["Metadata", "Data Profiler", "Data Quality", "dbt", "View Lineage", "View Column-level Lineage"]
unavailableFeatures=["Query Usage", "Owners", "Tags", "Stored Procedures"]
/ %}
In this section, we provide guides and references to use the MariaDB connector.

View File

@ -7,8 +7,8 @@ slug: /connectors/database/mariadb/yaml
name="MariaDB"
stage="PROD"
platform="OpenMetadata"
availableFeatures=["Metadata", "Data Profiler", "Data Quality", "dbt"]
unavailableFeatures=["Query Usage", "Owners", "Lineage", "Column-level Lineage", "Tags", "Stored Procedures"]
availableFeatures=["Metadata", "Data Profiler", "Data Quality", "dbt", "View Lineage", "View Column-level Lineage"]
unavailableFeatures=["Query Usage", "Owners", "Tags", "Stored Procedures"]
/ %}
In this section, we provide guides and references to use the MariaDB connector.

View File

@ -7,7 +7,7 @@ slug: /connectors/database/sap-hana
name="SAP Hana"
stage="PROD"
platform="OpenMetadata"
availableFeatures=["Metadata", "Data Profiler", "Data Quality", "View Lineage", "View Column-level Lineage", "dbt"]
availableFeatures=["Metadata", "Data Profiler", "Data Quality", "Lineage", "Column-level Lineage", "dbt"]
unavailableFeatures=["Query Usage", "Stored Procedures", "Owners", "Tags"]
/ %}

View File

@ -7,7 +7,7 @@ slug: /connectors/database/sap-hana/yaml
name="SAP Hana"
stage="PROD"
platform="OpenMetadata"
availableFeatures=["Metadata", "Data Profiler", "Data Quality", "View Lineage", "View Column-level Lineage", "dbt"]
availableFeatures=["Metadata", "Data Profiler", "Data Quality", "Lineage", "Column-level Lineage", "dbt"]
unavailableFeatures=["Query Usage", "Stored Procedures", "Owners", "Tags"]
/ %}

View File

@ -7,8 +7,8 @@ slug: /connectors/database/teradata
name="Teradata"
stage="BETA"
platform="OpenMetadata"
availableFeatures=["Metadata", "Data Profiler"]
unavailableFeatures=["Query Usage", "Data Quality", "Owners", "Tags", "Stored Procedures", "Lineage", "Column-level Lineage", "dbt"]
availableFeatures=["Metadata", "Data Profiler", "View Lineage", "View Column-level Lineage"]
unavailableFeatures=["Query Usage", "Data Quality", "Owners", "Tags", "Stored Procedures", "dbt"]
/ %}
In this section, we provide guides and references to use the Teradata connector.

View File

@ -7,8 +7,8 @@ slug: /connectors/database/teradata/yaml
name="Teradata"
stage="BETA"
platform="OpenMetadata"
availableFeatures=["Metadata", "Data Profiler", "Data Quality"]
unavailableFeatures=["Query Usage", "Owners", "Tags", "Stored Procedures", "Lineage", "Column-level Lineage", "dbt"]
availableFeatures=["Metadata", "Data Profiler", "Data Quality", "View Lineage", "View Column-level Lineage"]
unavailableFeatures=["Query Usage", "Owners", "Tags", "Stored Procedures", "dbt"]
/ %}
In this section, we provide guides and references to use the Teradata connector.

View File

@ -99,6 +99,7 @@ public record TableIndex(Table table) implements ColumnIndex {
doc.put("database_suggest", databaseSuggest);
doc.put("serviceType", table.getServiceType());
doc.put("locationPath", table.getLocationPath());
doc.put("schemaDefinition", table.getSchemaDefinition());
doc.put("service", getEntityWithDisplayName(table.getService()));
doc.put("database", getEntityWithDisplayName(table.getDatabase()));
doc.put("lineage", SearchIndex.getLineageData(table.getEntityReference()));

View File

@ -101,6 +101,9 @@
"href": {
"type": "text"
},
"schemaDefinition": {
"type": "text"
},
"sourceUrl": {
"type": "keyword"
},

View File

@ -103,6 +103,9 @@
"href": {
"type": "text"
},
"schemaDefinition": {
"type": "text"
},
"sourceUrl": {
"type": "keyword"
},

View File

@ -74,6 +74,9 @@
"href": {
"type": "text"
},
"schemaDefinition": {
"type": "text"
},
"sourceUrl": {
"type": "keyword"
},

View File

@ -35,6 +35,11 @@
"type": "boolean",
"default": true
},
"supportsViewLineageExtraction": {
"description": "Supports View Lineage Extraction.",
"type": "boolean",
"default": true
},
"supportsProfiler": {
"description": "Supports Profiler",
"type": "boolean",

View File

@ -83,6 +83,9 @@
"sampleDataStorageConfig": {
"title": "Storage Config for Sample Data",
"$ref": "../connectionBasicType.json#/definitions/sampleDataStorageConfig"
},
"supportsViewLineageExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsViewLineageExtraction"
}
},
"additionalProperties": false,

View File

@ -79,6 +79,9 @@
"sampleDataStorageConfig": {
"title": "Storage Config for Sample Data",
"$ref": "../connectionBasicType.json#/definitions/sampleDataStorageConfig"
},
"supportsViewLineageExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsViewLineageExtraction"
}
},
"additionalProperties": false,

View File

@ -105,6 +105,9 @@
"sampleDataStorageConfig": {
"title": "Storage Config for Sample Data",
"$ref": "../connectionBasicType.json#/definitions/sampleDataStorageConfig"
},
"supportsViewLineageExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsViewLineageExtraction"
}
},
"additionalProperties": false,

View File

@ -114,6 +114,9 @@
"sampleDataStorageConfig": {
"title": "Storage Config for Sample Data",
"$ref": "../connectionBasicType.json#/definitions/sampleDataStorageConfig"
},
"supportsViewLineageExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsViewLineageExtraction"
}
},
"additionalProperties": false,

View File

@ -102,6 +102,9 @@
"sampleDataStorageConfig": {
"title": "Storage Config for Sample Data",
"$ref": "../connectionBasicType.json#/definitions/sampleDataStorageConfig"
},
"supportsViewLineageExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsViewLineageExtraction"
}
},
"additionalProperties": false,

View File

@ -84,6 +84,9 @@
"sampleDataStorageConfig": {
"title": "Storage Config for Sample Data",
"$ref": "../connectionBasicType.json#/definitions/sampleDataStorageConfig"
},
"supportsViewLineageExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsViewLineageExtraction"
}
},
"additionalProperties": false,

View File

@ -106,6 +106,9 @@
"supportsDataDiff": {
"title": "Supports Data Diff Extraction.",
"$ref": "../connectionBasicType.json#/definitions/supportsDataDiff"
},
"supportsViewLineageExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsViewLineageExtraction"
}
},
"additionalProperties": false,

View File

@ -84,6 +84,9 @@
"sampleDataStorageConfig": {
"title": "Storage Config for Sample Data",
"$ref": "../connectionBasicType.json#/definitions/sampleDataStorageConfig"
},
"supportsViewLineageExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsViewLineageExtraction"
}
},
"additionalProperties": false,

View File

@ -84,6 +84,9 @@
"sampleDataStorageConfig": {
"title": "Storage Config for Sample Data",
"$ref": "../connectionBasicType.json#/definitions/sampleDataStorageConfig"
},
"supportsViewLineageExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsViewLineageExtraction"
}
},
"additionalProperties": false,

View File

@ -85,6 +85,9 @@
"sampleDataStorageConfig": {
"title": "Storage Config for Sample Data",
"$ref": "../connectionBasicType.json#/definitions/sampleDataStorageConfig"
},
"supportsViewLineageExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsViewLineageExtraction"
}
},
"additionalProperties": false

View File

@ -99,6 +99,9 @@
"sampleDataStorageConfig": {
"title": "Storage Config for Sample Data",
"$ref": "../connectionBasicType.json#/definitions/sampleDataStorageConfig"
},
"supportsViewLineageExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsViewLineageExtraction"
}
},
"additionalProperties": false,

View File

@ -60,6 +60,37 @@
"description": "Regex to only fetch databases that matches the pattern.",
"$ref": "../type/filterPattern.json#/definitions/filterPattern",
"title": "Database Filter Pattern"
},
"overrideViewLineage":{
"title": "Override View Lineage",
"description": "Set the 'Override View Lineage' toggle to control whether to override the existing view lineage.",
"type": "boolean",
"default": false
},
"processViewLineage": {
"title": "Process View Lineage",
"description": "Set the 'Process View Lineage' toggle to control whether to process view lineage.",
"type": "boolean",
"default": true
},
"processQueryLineage": {
"title": "Process Query Lineage",
"description": "Set the 'Process Query Lineage' toggle to control whether to process query lineage.",
"type": "boolean",
"default": true
},
"processStoredProcedureLineage": {
"title": "Process Stored Procedure Lineage",
"description": "Set the 'Process Stored ProcedureLog Lineage' toggle to control whether to process stored procedure lineage.",
"type": "boolean",
"default": true
},
"threads": {
"description": "Number of Threads to use in order to parallelize lineage ingestion.",
"type": "integer",
"default": 1,
"title": "Number of Threads",
"minimum": 1
}
},
"additionalProperties": false

View File

@ -286,12 +286,13 @@ export const DEF_UI_SCHEMA = {
supportsMetadataExtraction: { 'ui:widget': 'hidden', 'ui:hideError': true },
supportsUsageExtraction: { 'ui:widget': 'hidden', 'ui:hideError': true },
supportsLineageExtraction: { 'ui:widget': 'hidden', 'ui:hideError': true },
supportsViewLineageExtraction: { 'ui:widget': 'hidden', 'ui:hideError': true },
supportsProfiler: { 'ui:widget': 'hidden', 'ui:hideError': true },
supportsDatabase: { 'ui:widget': 'hidden', 'ui:hideError': true },
supportsQueryComment: { 'ui:widget': 'hidden', 'ui:hideError': true },
supportsDBTExtraction: { 'ui:widget': 'hidden', 'ui:hideError': true },
type: { 'ui:widget': 'hidden' },
};
}
export const INGESTION_ELASTIC_SEARCH_WORKFLOW_UI_SCHEMA = {
useSSL: { 'ui:widget': 'hidden', 'ui:hideError': true },

View File

@ -147,7 +147,8 @@ export const getSupportedPipelineTypes = (serviceDetails: ServicesType) => {
config?.supportsMetadataExtraction &&
pipelineType.push(PipelineType.Metadata);
config?.supportsUsageExtraction && pipelineType.push(PipelineType.Usage);
config?.supportsLineageExtraction &&
(config?.supportsLineageExtraction ||
config?.supportsViewLineageExtraction) &&
pipelineType.push(PipelineType.Lineage);
config?.supportsProfiler && pipelineType.push(PipelineType.Profiler);
config?.supportsDBTExtraction && pipelineType.push(PipelineType.Dbt);