Fix #1994: Add support for marking dataset entities as deleted (#2141)

* Fix #1994: Add support for marking dataset entities as deleted

* Fix #1994: Add support for marking dataset entities as deleted

* Fix #1994: Add support for marking dataset entities as deleted
This commit is contained in:
Sriharsha Chintalapani 2022-01-10 22:12:53 -08:00 committed by GitHub
parent bc05e4945d
commit cf6f438531
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 79 additions and 17 deletions

2
.gitignore vendored
View File

@ -81,7 +81,7 @@ ingestion-core/src/metadata/generated/**
ingestion/src/metadata/generated/**
ingestion/requirements.txt
ingestion/.python-version
ingestion-core/src/**
# MLFlow
mlruns/
/ingestion/tests/integration/source/mlflow/tests/db/

View File

@ -24,7 +24,7 @@ from datetime import timedelta
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from metadata.ingestion.models.table_metadata import Table
from metadata.ingestion.models.table_metadata import TableFQDN
default_args = {
"owner": "openmetadata_airflow_example",
@ -45,12 +45,12 @@ def openmetadata_airflow_lineage_example():
@task(
inlets={
"tables": [
Table(fullyQualifiedName="bigquery_gcp.shopify.raw_order"),
Table(fullyQualifiedName="bigquery_gcp.shopify.raw_customer"),
TableFQDN(fullyQualifiedName="bigquery_gcp.shopify.raw_order"),
TableFQDN(fullyQualifiedName="bigquery_gcp.shopify.raw_customer"),
],
},
outlets={
"tables": [Table(fullyQualifiedName="bigquery_gcp.shopify.fact_order")]
"tables": [TableFQDN(fullyQualifiedName="bigquery_gcp.shopify.fact_order")]
},
)
def generate_data():

View File

@ -7,7 +7,7 @@
"database": "openmetadata_db",
"service_name": "local_mysql",
"schema_filter_pattern": {
"excludes": ["mysql.*", "information_schema.*", "performance_schema.*", "sys.*"]
"includes": ["test_delete.*"]
}
}
},

View File

@ -14,10 +14,23 @@ from typing import Any, Dict, List, Optional
from pydantic import BaseModel
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.type.entityReference import EntityReference
class Table(BaseModel):
class DatabaseAndTableState(BaseModel):
    """Presence record for a single table in a database.

    NOTE(review): not referenced by the visible code — presumably used to
    track whether a table seen in OpenMetadata still exists in the source
    database; confirm against callers.
    """

    # database name the table belongs to
    database: str
    # table name within that database
    table: str
    # True when the table is present in the source — TODO confirm semantics
    exists: bool
class DeleteTable(BaseModel):
    """Entity Reference of a table to be deleted"""

    # full Table entity to be marked as deleted by the sink
    table: Table
class TableFQDN(BaseModel):
    """Table Fully Qualified Name"""

    # e.g. "service.database.schema.table" — used for lineage inlets/outlets
    fullyQualifiedName: str

View File

@ -16,7 +16,8 @@ working with OpenMetadata entities.
"""
import logging
from typing import Generic, List, Optional, Type, TypeVar, Union, get_args
import urllib
from typing import Dict, Generic, List, Optional, Type, TypeVar, Union, get_args
from pydantic import BaseModel
@ -405,6 +406,7 @@ class OpenMetadata(
fields: Optional[List[str]] = None,
after: str = None,
limit: int = 1000,
params: Dict = {},
) -> EntityList[T]:
"""
Helps us paginate over the collection
@ -414,8 +416,10 @@ class OpenMetadata(
url_limit = f"?limit={limit}"
url_after = f"&after={after}" if after else ""
url_fields = f"&fields={','.join(fields)}" if fields else ""
resp = self.client.get(f"{suffix}{url_limit}{url_after}{url_fields}")
url_params = f"&{urllib.parse.urlencode(params)}"
resp = self.client.get(
f"{suffix}{url_limit}{url_after}{url_fields}{url_params}"
)
if self._use_raw_data:
return resp

View File

@ -42,6 +42,7 @@ from metadata.generated.schema.entity.data.chart import ChartType
from metadata.generated.schema.entity.data.location import Location
from metadata.generated.schema.entity.data.mlmodel import MlModel
from metadata.generated.schema.entity.data.pipeline import Pipeline
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.policies.policy import Policy
from metadata.generated.schema.entity.teams.user import User
from metadata.generated.schema.type.entityReference import EntityReference
@ -49,7 +50,7 @@ from metadata.ingestion.api.common import Entity, WorkflowContext
from metadata.ingestion.api.sink import Sink, SinkStatus
from metadata.ingestion.models.ometa_policy import OMetaPolicy
from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
from metadata.ingestion.models.table_metadata import Chart, Dashboard
from metadata.ingestion.models.table_metadata import Chart, Dashboard, DeleteTable
from metadata.ingestion.ometa.client import APIError
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
@ -128,6 +129,8 @@ class MetadataRestSink(Sink[Entity]):
self.write_users(record)
elif isinstance(record, CreateMlModelEntityRequest):
self.write_ml_model(record)
elif isinstance(record, DeleteTable):
self.delete_table(record)
else:
logging.info(
f"Ignoring the record due to unknown Record type {type(record)}"
@ -416,8 +419,19 @@ class MetadataRestSink(Sink[Entity]):
self.status.records_written(record.displayName)
logger.info("Sink: {}".format(record.displayName))
except Exception as err:
logger.error(traceback.format_exc())
logger.error(traceback.print_exc())
logger.debug(traceback.format_exc())
logger.debug(traceback.print_exc())
logger.error(err)
def delete_table(self, record: DeleteTable) -> None:
    """Mark the table referenced by ``record`` as deleted in OpenMetadata.

    Failures are logged and swallowed so one failed delete does not abort
    the rest of the sink's records.

    :param record: DeleteTable carrying the Table entity to remove
    """
    try:
        self.metadata.delete(entity=Table, entity_id=record.table.id)
        logger.info(
            f"{record.table.name} doesn't exist in source state, marking it as deleted"
        )
    except Exception as err:
        # traceback.print_exc() returns None (it writes straight to stderr),
        # so logging its result emitted a useless "None" record; format_exc()
        # already captures the full traceback for the debug log.
        logger.debug(traceback.format_exc())
        logger.error(err)
def get_status(self):

View File

@ -43,11 +43,14 @@ from metadata.generated.schema.entity.services.databaseService import (
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.common import (
ConfigModel,
Entity,
IncludeFilterPattern,
WorkflowContext,
)
from metadata.ingestion.api.source import Source, SourceStatus
from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
from metadata.ingestion.models.table_metadata import DeleteTable
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.utils.column_helpers import check_column_complex_type, get_column_type
from metadata.utils.helpers import get_database_service_or_create
@ -135,6 +138,7 @@ class SQLConnectionConfig(ConfigModel):
schema_filter_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all()
dbt_manifest_file: Optional[str] = None
dbt_catalog_file: Optional[str] = None
mark_deleted_tables_as_deleted: Optional[bool] = True
@abstractmethod
def get_connection_url(self):
@ -183,6 +187,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
self.config = config
self.metadata_config = metadata_config
self.service = get_database_service_or_create(config, metadata_config)
self.metadata = OpenMetadata(metadata_config)
self.status = SQLSourceStatus()
self.sql_config = self.config
self.connection_string = self.sql_config.get_connection_url()
@ -194,6 +199,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
self.connection = self.engine.connect()
self.data_profiler = None
self.data_models = {}
self.database_source_state = set()
if self.config.dbt_catalog_file is not None:
with open(self.config.dbt_catalog_file, "r", encoding="utf-8") as catalog:
self.dbt_catalog = json.load(catalog)
@ -263,17 +269,21 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
logger.error(f"Failed to generate sample data for {table} - {err}")
return None
def next_record(self) -> Iterable[OMetaDatabaseAndTable]:
def next_record(self) -> Iterable[Entity]:
inspector = inspect(self.engine)
for schema in inspector.get_schema_names():
# clear any previous source database state
self.database_source_state.clear()
if not self.sql_config.schema_filter_pattern.included(schema):
self.status.filter(schema, "Schema pattern not allowed")
continue
logger.debug(f"Total tables {inspector.get_table_names(schema)}")
if self.config.include_tables:
yield from self.fetch_tables(inspector, schema)
if self.config.include_views:
yield from self.fetch_views(inspector, schema)
if self.config.mark_deleted_tables_as_deleted:
schema_fqdn = f"{self.config.service_name}.{schema}"
yield from self.delete_tables(schema_fqdn)
def fetch_tables(
self, inspector: Inspector, schema: str
@ -296,7 +306,8 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
self.status.scanned(f"{self.config.get_service_name()}.{table_name}")
description = _get_table_description(schema, table_name, inspector)
fqn = f"{self.config.service_name}.{self.config.database}.{schema}.{table_name}"
fqn = f"{self.config.service_name}.{schema}.{table_name}"
self.database_source_state.add(fqn)
table_columns = self._get_columns(schema, table_name, inspector)
table_entity = Table(
id=uuid.uuid4(),
@ -366,7 +377,8 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
)
except NotImplementedError:
view_definition = ""
fqn = f"{self.config.service_name}.{schema}.{view_name}"
self.database_source_state.add(fqn)
table = Table(
id=uuid.uuid4(),
name=view_name.replace(".", "_DOT_"),
@ -392,6 +404,12 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
self.status.warnings.append(f"{self.config.service_name}.{view_name}")
continue
def delete_tables(self, schema_fqdn: str) -> Iterable[DeleteTable]:
    """Yield a ``DeleteTable`` for every table OpenMetadata knows under
    ``schema_fqdn`` that was not seen during the current source run.

    Relies on ``self.database_source_state`` having been populated by
    ``fetch_tables`` / ``fetch_views`` for this schema.

    :param schema_fqdn: fully qualified schema name ("service.schema")
    """
    # Tables currently registered in OpenMetadata for this schema.
    database_state = self._build_database_state(schema_fqdn)
    for table in database_state:
        # Anything not observed in this run is considered deleted upstream.
        if table.fullyQualifiedName not in self.database_source_state:
            yield DeleteTable(table=table)
def _parse_data_model(self):
"""
Get all the DBT information and feed it to the Table Entity
@ -643,6 +661,19 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
logger.debug(f"Finished profiling {dataset_name}")
return profile
def _build_database_state(self, schema_fqdn: str) -> "List[Table]":
    """Fetch every Table entity registered in OpenMetadata under a schema.

    Pages through the list API (10 entities per request) until the server
    stops returning an ``after`` cursor.

    :param schema_fqdn: fully qualified schema name used as the
        ``database`` filter parameter
    :return: list of Table entities (annotation quoted so it is not
        evaluated at import time; the original ``[EntityReference]`` was a
        list literal, not a type, and inaccurate — the API returns Tables)
    """
    after = None
    tables = []
    while True:
        table_entities = self.metadata.list_entities(
            entity=Table, after=after, limit=10, params={"database": schema_fqdn}
        )
        tables.extend(table_entities.entities)
        # A missing cursor means the last page has been consumed.
        if table_entities.after is None:
            break
        after = table_entities.after
    return tables
def close(self):
if self.connection is not None:
self.connection.close()