diff --git a/metadata-ingestion/src/datahub/emitter/mcp_builder.py b/metadata-ingestion/src/datahub/emitter/mcp_builder.py index 3bcfcd9909..a288e807af 100644 --- a/metadata-ingestion/src/datahub/emitter/mcp_builder.py +++ b/metadata-ingestion/src/datahub/emitter/mcp_builder.py @@ -201,6 +201,7 @@ def gen_containers( name: str, sub_types: List[str], parent_container_key: Optional[PlatformKey] = None, + extra_properties: Optional[Dict[str, str]] = None, domain_urn: Optional[str] = None, description: Optional[str] = None, owner_urn: Optional[str] = None, @@ -221,7 +222,10 @@ def gen_containers( aspect=ContainerProperties( name=name, description=description, - customProperties=container_key.guid_dict(), + customProperties={ + **container_key.guid_dict(), + **(extra_properties or {}), + }, externalUrl=external_url, qualifiedName=qualified_name, created=TimeStamp(time=created) if created is not None else None, diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py b/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py index 54efda036d..b8f341cf27 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py @@ -163,7 +163,7 @@ class AthenaSource(SQLAlchemySource): return schemas def gen_database_containers( - self, database: str + self, inspector: Inspector, database: str ) -> typing.Iterable[MetadataWorkUnit]: # In Athena the schema is the database and database is not existing return [] @@ -177,7 +177,7 @@ class AthenaSource(SQLAlchemySource): ) def gen_schema_containers( - self, schema: str, db_name: str + self, inspector: Inspector, schema: str, db_name: str ) -> typing.Iterable[MetadataWorkUnit]: database_container_key = self.gen_database_key(database=schema) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py index cc4affbe7d..77d0832865 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py @@ -1196,7 +1196,9 @@ WHERE backcompat_instance_for_guid=self.config.env, ) - def gen_database_containers(self, database: str) -> Iterable[MetadataWorkUnit]: + def gen_database_containers( + self, inspector: Inspector, database: str + ) -> Iterable[MetadataWorkUnit]: domain_urn = self._gen_domain_urn(database) database_container_key = self.gen_database_key(database) @@ -1213,7 +1215,7 @@ WHERE yield wu def gen_schema_containers( - self, schema: str, db_name: str + self, inspector: Inspector, schema: str, db_name: str ) -> Iterable[MetadataWorkUnit]: schema_container_key = self.gen_schema_key(db_name, schema) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/presto_on_hive.py b/metadata-ingestion/src/datahub/ingestion/source/sql/presto_on_hive.py index d89e07ccb0..718585b492 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/presto_on_hive.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/presto_on_hive.py @@ -317,7 +317,9 @@ class PrestoOnHiveSource(SQLAlchemySource): config = PrestoOnHiveConfig.parse_obj(config_dict) return cls(config, ctx) - def gen_database_containers(self, database: str) -> Iterable[MetadataWorkUnit]: + def gen_database_containers( + self, inspector: Inspector, database: str + ) -> Iterable[MetadataWorkUnit]: domain_urn = self._gen_domain_urn(database) database_container_key = self.gen_database_key(database) @@ -333,7 +335,7 @@ class PrestoOnHiveSource(SQLAlchemySource): yield wu def gen_schema_containers( - self, schema: str, db_name: str + self, inspector: Inspector, schema: str, db_name: str ) -> Iterable[MetadataWorkUnit]: assert isinstance(self.config, PrestoOnHiveConfig) where_clause_suffix: str = "" diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py index 0214593296..84118b305b 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py @@ -545,7 +545,9 @@ class SQLAlchemySource(StatefulIngestionSourceBase): backcompat_instance_for_guid=self.config.env, ) - def gen_database_containers(self, database: str) -> Iterable[MetadataWorkUnit]: + def gen_database_containers( + self, inspector: Inspector, database: str + ) -> Iterable[MetadataWorkUnit]: domain_urn = self._gen_domain_urn(database) database_container_key = self.gen_database_key(database) @@ -554,6 +556,7 @@ class SQLAlchemySource(StatefulIngestionSourceBase): name=database, sub_types=[SqlContainerSubTypes.DATABASE], domain_urn=domain_urn, + extra_properties=self.get_database_properties(inspector, database=database), ) # Add container to the checkpoint state @@ -567,7 +570,7 @@ class SQLAlchemySource(StatefulIngestionSourceBase): yield wu def gen_schema_containers( - self, schema: str, db_name: str + self, inspector: Inspector, schema: str, db_name: str ) -> Iterable[MetadataWorkUnit]: schema_container_key = self.gen_schema_key(db_name, schema) @@ -576,11 +579,13 @@ class SQLAlchemySource(StatefulIngestionSourceBase): database_container_key = self.gen_database_key(database=db_name) container_workunits = gen_containers( - # TODO: this one is bad - schema_container_key, - schema, - [SqlContainerSubTypes.SCHEMA], - database_container_key, + container_key=schema_container_key, + name=schema, + sub_types=[SqlContainerSubTypes.SCHEMA], + parent_container_key=database_container_key, + extra_properties=self.get_schema_properties( + inspector, database=db_name, schema=schema + ), ) # Add container to the checkpoint state @@ -623,12 +628,16 @@ class SQLAlchemySource(StatefulIngestionSourceBase): profiler = self.get_profiler_instance(inspector) db_name = self.get_db_name(inspector) - yield from self.gen_database_containers(db_name) + yield from self.gen_database_containers( + inspector=inspector, database=db_name + ) for schema in self.get_allowed_schemas(inspector, db_name): self.add_information_for_schema(inspector, schema) - yield from self.gen_schema_containers(schema, db_name) + yield from self.gen_schema_containers( + inspector=inspector, schema=schema, db_name=db_name + ) if sql_config.include_tables: yield from self.loop_tables(inspector, schema, sql_config) @@ -893,6 +902,16 @@ class SQLAlchemySource(StatefulIngestionSourceBase): sql_config=sql_config, ) + def get_database_properties( + self, inspector: Inspector, database: str + ) -> Optional[Dict[str, str]]: + return None + + def get_schema_properties( + self, inspector: Inspector, database: str, schema: str + ) -> Optional[Dict[str, str]]: + return None + def get_table_properties( self, inspector: Inspector, schema: str, table: str ) -> Tuple[Optional[str], Dict[str, str], Optional[str]]: diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/two_tier_sql_source.py b/metadata-ingestion/src/datahub/ingestion/source/sql/two_tier_sql_source.py index f2914ae2c1..3691a22ff5 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/two_tier_sql_source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/two_tier_sql_source.py @@ -90,7 +90,7 @@ class TwoTierSQLAlchemySource(SQLAlchemySource): yield inspector def gen_schema_containers( - self, schema: str, db_name: str + self, inspector: Inspector, schema: str, db_name: str ) -> typing.Iterable[MetadataWorkUnit]: return []