diff --git a/.gitignore b/.gitignore index 92389931405..90ff9b53385 100644 --- a/.gitignore +++ b/.gitignore @@ -81,7 +81,7 @@ ingestion-core/src/metadata/generated/** ingestion/src/metadata/generated/** ingestion/requirements.txt ingestion/.python-version - +ingestion-core/src/** # MLFlow mlruns/ /ingestion/tests/integration/source/mlflow/tests/db/ diff --git a/ingestion/examples/airflow_lineage/openmetadata_airflow_lineage_example.py b/ingestion/examples/airflow_lineage/openmetadata_airflow_lineage_example.py index 571cd47a6e9..1699131dcde 100644 --- a/ingestion/examples/airflow_lineage/openmetadata_airflow_lineage_example.py +++ b/ingestion/examples/airflow_lineage/openmetadata_airflow_lineage_example.py @@ -24,7 +24,7 @@ from datetime import timedelta from airflow.decorators import dag, task from airflow.utils.dates import days_ago -from metadata.ingestion.models.table_metadata import Table +from metadata.ingestion.models.table_metadata import TableFQDN default_args = { "owner": "openmetadata_airflow_example", @@ -45,12 +45,12 @@ def openmetadata_airflow_lineage_example(): @task( inlets={ "tables": [ - Table(fullyQualifiedName="bigquery_gcp.shopify.raw_order"), - Table(fullyQualifiedName="bigquery_gcp.shopify.raw_customer"), + TableFQDN(fullyQualifiedName="bigquery_gcp.shopify.raw_order"), + TableFQDN(fullyQualifiedName="bigquery_gcp.shopify.raw_customer"), ], }, outlets={ - "tables": [Table(fullyQualifiedName="bigquery_gcp.shopify.fact_order")] + "tables": [TableFQDN(fullyQualifiedName="bigquery_gcp.shopify.fact_order")] }, ) def generate_data(): diff --git a/ingestion/examples/workflows/mysql.json b/ingestion/examples/workflows/mysql.json index d71b91684d0..f35a1b9dac3 100644 --- a/ingestion/examples/workflows/mysql.json +++ b/ingestion/examples/workflows/mysql.json @@ -7,7 +7,7 @@ "database": "openmetadata_db", "service_name": "local_mysql", "schema_filter_pattern": { - "excludes": ["mysql.*", "information_schema.*", "performance_schema.*", "sys.*"] + "includes": ["test_delete.*"] } } }, diff --git a/ingestion/src/metadata/ingestion/models/table_metadata.py b/ingestion/src/metadata/ingestion/models/table_metadata.py index 6faab668a39..c2ca650704a 100644 --- a/ingestion/src/metadata/ingestion/models/table_metadata.py +++ b/ingestion/src/metadata/ingestion/models/table_metadata.py @@ -14,10 +14,23 @@ from typing import Any, Dict, List, Optional from pydantic import BaseModel +from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.type.entityReference import EntityReference -class Table(BaseModel): +class DatabaseAndTableState(BaseModel): + database: str + table: str + exists: bool + + +class DeleteTable(BaseModel): + """Entity Reference of a table to be deleted""" + + table: Table + + +class TableFQDN(BaseModel): """Table Fully Qualified Name""" fullyQualifiedName: str diff --git a/ingestion/src/metadata/ingestion/ometa/ometa_api.py b/ingestion/src/metadata/ingestion/ometa/ometa_api.py index 8c65d35bc56..e56729cf753 100644 --- a/ingestion/src/metadata/ingestion/ometa/ometa_api.py +++ b/ingestion/src/metadata/ingestion/ometa/ometa_api.py @@ -16,7 +16,8 @@ working with OpenMetadata entities. """ import logging -from typing import Generic, List, Optional, Type, TypeVar, Union, get_args +import urllib +from typing import Dict, Generic, List, Optional, Type, TypeVar, Union, get_args from pydantic import BaseModel @@ -405,6 +406,7 @@ class OpenMetadata( fields: Optional[List[str]] = None, after: str = None, limit: int = 1000, + params: Dict = {}, ) -> EntityList[T]: """ Helps us paginate over the collection @@ -414,8 +416,10 @@ class OpenMetadata( url_limit = f"?limit={limit}" url_after = f"&after={after}" if after else "" url_fields = f"&fields={','.join(fields)}" if fields else "" - - resp = self.client.get(f"{suffix}{url_limit}{url_after}{url_fields}") + url_params = f"&{urllib.parse.urlencode(params)}" + resp = self.client.get( + f"{suffix}{url_limit}{url_after}{url_fields}{url_params}" + ) if self._use_raw_data: return resp diff --git a/ingestion/src/metadata/ingestion/sink/metadata_rest.py b/ingestion/src/metadata/ingestion/sink/metadata_rest.py index 3d558f4542a..18238d4a70c 100644 --- a/ingestion/src/metadata/ingestion/sink/metadata_rest.py +++ b/ingestion/src/metadata/ingestion/sink/metadata_rest.py @@ -42,6 +42,7 @@ from metadata.generated.schema.entity.data.chart import ChartType from metadata.generated.schema.entity.data.location import Location from metadata.generated.schema.entity.data.mlmodel import MlModel from metadata.generated.schema.entity.data.pipeline import Pipeline +from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.policies.policy import Policy from metadata.generated.schema.entity.teams.user import User from metadata.generated.schema.type.entityReference import EntityReference @@ -49,7 +50,7 @@ from metadata.ingestion.api.common import Entity, WorkflowContext from metadata.ingestion.api.sink import Sink, SinkStatus from metadata.ingestion.models.ometa_policy import OMetaPolicy from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable -from metadata.ingestion.models.table_metadata import Chart, Dashboard +from metadata.ingestion.models.table_metadata import Chart, Dashboard, DeleteTable from metadata.ingestion.ometa.client import APIError from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig @@ -128,6 +129,8 @@ class MetadataRestSink(Sink[Entity]): self.write_users(record) elif isinstance(record, CreateMlModelEntityRequest): self.write_ml_model(record) + elif isinstance(record, DeleteTable): + self.delete_table(record) else: logging.info( f"Ignoring the record due to unknown Record type {type(record)}" @@ -416,8 +419,19 @@ class MetadataRestSink(Sink[Entity]): self.status.records_written(record.displayName) logger.info("Sink: {}".format(record.displayName)) except Exception as err: - logger.error(traceback.format_exc()) - logger.error(traceback.print_exc()) + logger.debug(traceback.format_exc()) + logger.debug(traceback.print_exc()) + logger.error(err) + + def delete_table(self, record: DeleteTable): + try: + self.metadata.delete(entity=Table, entity_id=record.table.id) + logger.info( + f"{record.table.name} doesn't exist in source state, marking it as deleted" + ) + except Exception as err: + logger.debug(traceback.format_exc()) + logger.debug(traceback.print_exc()) logger.error(err) def get_status(self): diff --git a/ingestion/src/metadata/ingestion/source/sql_source.py b/ingestion/src/metadata/ingestion/source/sql_source.py index 184004b44e1..e9d423f5466 100644 --- a/ingestion/src/metadata/ingestion/source/sql_source.py +++ b/ingestion/src/metadata/ingestion/source/sql_source.py @@ -43,11 +43,14 @@ from metadata.generated.schema.entity.services.databaseService import ( from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.api.common import ( ConfigModel, + Entity, IncludeFilterPattern, WorkflowContext, ) from metadata.ingestion.api.source import Source, SourceStatus from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable +from metadata.ingestion.models.table_metadata import DeleteTable +from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig from metadata.utils.column_helpers import check_column_complex_type, get_column_type from metadata.utils.helpers import get_database_service_or_create @@ -135,6 +138,7 @@ class SQLConnectionConfig(ConfigModel): schema_filter_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all() dbt_manifest_file: Optional[str] = None dbt_catalog_file: Optional[str] = None + mark_deleted_tables_as_deleted: Optional[bool] = True @abstractmethod def get_connection_url(self): @@ -183,6 +187,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]): self.config = config self.metadata_config = metadata_config self.service = get_database_service_or_create(config, metadata_config) + self.metadata = OpenMetadata(metadata_config) self.status = SQLSourceStatus() self.sql_config = self.config self.connection_string = self.sql_config.get_connection_url() @@ -194,6 +199,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]): self.connection = self.engine.connect() self.data_profiler = None self.data_models = {} + self.database_source_state = set() if self.config.dbt_catalog_file is not None: with open(self.config.dbt_catalog_file, "r", encoding="utf-8") as catalog: self.dbt_catalog = json.load(catalog) @@ -263,17 +269,21 @@ class SQLSource(Source[OMetaDatabaseAndTable]): logger.error(f"Failed to generate sample data for {table} - {err}") return None - def next_record(self) -> Iterable[OMetaDatabaseAndTable]: + def next_record(self) -> Iterable[Entity]: inspector = inspect(self.engine) for schema in inspector.get_schema_names(): + # clear any previous source database state + self.database_source_state.clear() if not self.sql_config.schema_filter_pattern.included(schema): self.status.filter(schema, "Schema pattern not allowed") continue - logger.debug(f"Total tables {inspector.get_table_names(schema)}") if self.config.include_tables: yield from self.fetch_tables(inspector, schema) if self.config.include_views: yield from self.fetch_views(inspector, schema) + if self.config.mark_deleted_tables_as_deleted: + schema_fqdn = f"{self.config.service_name}.{schema}" + yield from self.delete_tables(schema_fqdn) def fetch_tables( self, inspector: Inspector, schema: str @@ -296,7 +306,8 @@ class SQLSource(Source[OMetaDatabaseAndTable]): self.status.scanned(f"{self.config.get_service_name()}.{table_name}") description = _get_table_description(schema, table_name, inspector) - fqn = f"{self.config.service_name}.{self.config.database}.{schema}.{table_name}" + fqn = f"{self.config.service_name}.{schema}.{table_name}" + self.database_source_state.add(fqn) table_columns = self._get_columns(schema, table_name, inspector) table_entity = Table( id=uuid.uuid4(), @@ -366,7 +377,8 @@ class SQLSource(Source[OMetaDatabaseAndTable]): ) except NotImplementedError: view_definition = "" - + fqn = f"{self.config.service_name}.{schema}.{view_name}" + self.database_source_state.add(fqn) table = Table( id=uuid.uuid4(), name=view_name.replace(".", "_DOT_"), @@ -392,6 +404,12 @@ class SQLSource(Source[OMetaDatabaseAndTable]): self.status.warnings.append(f"{self.config.service_name}.{view_name}") continue + def delete_tables(self, schema_fqdn: str) -> DeleteTable: + database_state = self._build_database_state(schema_fqdn) + for table in database_state: + if table.fullyQualifiedName not in self.database_source_state: + yield DeleteTable(table=table) + def _parse_data_model(self): """ Get all the DBT information and feed it to the Table Entity @@ -643,6 +661,19 @@ class SQLSource(Source[OMetaDatabaseAndTable]): logger.debug(f"Finished profiling {dataset_name}") return profile + def _build_database_state(self, schema_fqdn: str) -> [EntityReference]: + after = None + tables = [] + while True: + table_entities = self.metadata.list_entities( + entity=Table, after=after, limit=10, params={"database": schema_fqdn} + ) + tables.extend(table_entities.entities) + if table_entities.after is None: + break + after = table_entities.after + return tables + def close(self): if self.connection is not None: self.connection.close()