feat(ingest): add db/schema properties hook to SQL common (#6847)

This commit is contained in:
Harshal Sheth 2022-12-22 16:38:59 -05:00 committed by GitHub
parent 2ef2ad05d0
commit 1d0c7852a7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 44 additions and 17 deletions

View File

@ -201,6 +201,7 @@ def gen_containers(
name: str, name: str,
sub_types: List[str], sub_types: List[str],
parent_container_key: Optional[PlatformKey] = None, parent_container_key: Optional[PlatformKey] = None,
extra_properties: Optional[Dict[str, str]] = None,
domain_urn: Optional[str] = None, domain_urn: Optional[str] = None,
description: Optional[str] = None, description: Optional[str] = None,
owner_urn: Optional[str] = None, owner_urn: Optional[str] = None,
@ -221,7 +222,10 @@ def gen_containers(
aspect=ContainerProperties( aspect=ContainerProperties(
name=name, name=name,
description=description, description=description,
customProperties=container_key.guid_dict(), customProperties={
**container_key.guid_dict(),
**(extra_properties or {}),
},
externalUrl=external_url, externalUrl=external_url,
qualifiedName=qualified_name, qualifiedName=qualified_name,
created=TimeStamp(time=created) if created is not None else None, created=TimeStamp(time=created) if created is not None else None,

View File

@ -163,7 +163,7 @@ class AthenaSource(SQLAlchemySource):
return schemas return schemas
def gen_database_containers( def gen_database_containers(
self, database: str self, inspector: Inspector, database: str
) -> typing.Iterable[MetadataWorkUnit]: ) -> typing.Iterable[MetadataWorkUnit]:
# In Athena the schema is the database and database is not existing # In Athena the schema is the database and database is not existing
return [] return []
@ -177,7 +177,7 @@ class AthenaSource(SQLAlchemySource):
) )
def gen_schema_containers( def gen_schema_containers(
self, schema: str, db_name: str self, inspector: Inspector, schema: str, db_name: str
) -> typing.Iterable[MetadataWorkUnit]: ) -> typing.Iterable[MetadataWorkUnit]:
database_container_key = self.gen_database_key(database=schema) database_container_key = self.gen_database_key(database=schema)

View File

@ -1196,7 +1196,9 @@ WHERE
backcompat_instance_for_guid=self.config.env, backcompat_instance_for_guid=self.config.env,
) )
def gen_database_containers(self, database: str) -> Iterable[MetadataWorkUnit]: def gen_database_containers(
self, inspector: Inspector, database: str
) -> Iterable[MetadataWorkUnit]:
domain_urn = self._gen_domain_urn(database) domain_urn = self._gen_domain_urn(database)
database_container_key = self.gen_database_key(database) database_container_key = self.gen_database_key(database)
@ -1213,7 +1215,7 @@ WHERE
yield wu yield wu
def gen_schema_containers( def gen_schema_containers(
self, schema: str, db_name: str self, inspector: Inspector, schema: str, db_name: str
) -> Iterable[MetadataWorkUnit]: ) -> Iterable[MetadataWorkUnit]:
schema_container_key = self.gen_schema_key(db_name, schema) schema_container_key = self.gen_schema_key(db_name, schema)

View File

@ -317,7 +317,9 @@ class PrestoOnHiveSource(SQLAlchemySource):
config = PrestoOnHiveConfig.parse_obj(config_dict) config = PrestoOnHiveConfig.parse_obj(config_dict)
return cls(config, ctx) return cls(config, ctx)
def gen_database_containers(self, database: str) -> Iterable[MetadataWorkUnit]: def gen_database_containers(
self, inspector: Inspector, database: str
) -> Iterable[MetadataWorkUnit]:
domain_urn = self._gen_domain_urn(database) domain_urn = self._gen_domain_urn(database)
database_container_key = self.gen_database_key(database) database_container_key = self.gen_database_key(database)
@ -333,7 +335,7 @@ class PrestoOnHiveSource(SQLAlchemySource):
yield wu yield wu
def gen_schema_containers( def gen_schema_containers(
self, schema: str, db_name: str self, inspector: Inspector, schema: str, db_name: str
) -> Iterable[MetadataWorkUnit]: ) -> Iterable[MetadataWorkUnit]:
assert isinstance(self.config, PrestoOnHiveConfig) assert isinstance(self.config, PrestoOnHiveConfig)
where_clause_suffix: str = "" where_clause_suffix: str = ""

View File

@ -545,7 +545,9 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
backcompat_instance_for_guid=self.config.env, backcompat_instance_for_guid=self.config.env,
) )
def gen_database_containers(self, database: str) -> Iterable[MetadataWorkUnit]: def gen_database_containers(
self, inspector: Inspector, database: str
) -> Iterable[MetadataWorkUnit]:
domain_urn = self._gen_domain_urn(database) domain_urn = self._gen_domain_urn(database)
database_container_key = self.gen_database_key(database) database_container_key = self.gen_database_key(database)
@ -554,6 +556,7 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
name=database, name=database,
sub_types=[SqlContainerSubTypes.DATABASE], sub_types=[SqlContainerSubTypes.DATABASE],
domain_urn=domain_urn, domain_urn=domain_urn,
extra_properties=self.get_database_properties(inspector, database=database),
) )
# Add container to the checkpoint state # Add container to the checkpoint state
@ -567,7 +570,7 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
yield wu yield wu
def gen_schema_containers( def gen_schema_containers(
self, schema: str, db_name: str self, inspector: Inspector, schema: str, db_name: str
) -> Iterable[MetadataWorkUnit]: ) -> Iterable[MetadataWorkUnit]:
schema_container_key = self.gen_schema_key(db_name, schema) schema_container_key = self.gen_schema_key(db_name, schema)
@ -576,11 +579,13 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
database_container_key = self.gen_database_key(database=db_name) database_container_key = self.gen_database_key(database=db_name)
container_workunits = gen_containers( container_workunits = gen_containers(
# TODO: this one is bad container_key=schema_container_key,
schema_container_key, name=schema,
schema, sub_types=[SqlContainerSubTypes.SCHEMA],
[SqlContainerSubTypes.SCHEMA], parent_container_key=database_container_key,
database_container_key, extra_properties=self.get_schema_properties(
inspector, database=db_name, schema=schema
),
) )
# Add container to the checkpoint state # Add container to the checkpoint state
@ -623,12 +628,16 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
profiler = self.get_profiler_instance(inspector) profiler = self.get_profiler_instance(inspector)
db_name = self.get_db_name(inspector) db_name = self.get_db_name(inspector)
yield from self.gen_database_containers(db_name) yield from self.gen_database_containers(
inspector=inspector, database=db_name
)
for schema in self.get_allowed_schemas(inspector, db_name): for schema in self.get_allowed_schemas(inspector, db_name):
self.add_information_for_schema(inspector, schema) self.add_information_for_schema(inspector, schema)
yield from self.gen_schema_containers(schema, db_name) yield from self.gen_schema_containers(
inspector=inspector, schema=schema, db_name=db_name
)
if sql_config.include_tables: if sql_config.include_tables:
yield from self.loop_tables(inspector, schema, sql_config) yield from self.loop_tables(inspector, schema, sql_config)
@ -893,6 +902,16 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
sql_config=sql_config, sql_config=sql_config,
) )
def get_database_properties(
self, inspector: Inspector, database: str
) -> Optional[Dict[str, str]]:
return None
def get_schema_properties(
self, inspector: Inspector, database: str, schema: str
) -> Optional[Dict[str, str]]:
return None
def get_table_properties( def get_table_properties(
self, inspector: Inspector, schema: str, table: str self, inspector: Inspector, schema: str, table: str
) -> Tuple[Optional[str], Dict[str, str], Optional[str]]: ) -> Tuple[Optional[str], Dict[str, str], Optional[str]]:

View File

@ -90,7 +90,7 @@ class TwoTierSQLAlchemySource(SQLAlchemySource):
yield inspector yield inspector
def gen_schema_containers( def gen_schema_containers(
self, schema: str, db_name: str self, inspector: Inspector, schema: str, db_name: str
) -> typing.Iterable[MetadataWorkUnit]: ) -> typing.Iterable[MetadataWorkUnit]:
return [] return []