ingest(mysql): add storage bytes information (#8294)

Co-authored-by: Andrew Sikowitz <andrew.sikowitz@acryl.io>
This commit is contained in:
Aseem Bansal 2023-07-21 03:01:06 +05:30 committed by GitHub
parent b5e039ff4e
commit f4c0ed3aab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 48 additions and 0 deletions

View File

@ -5,6 +5,7 @@ from pydantic.fields import Field
from sqlalchemy import util
from sqlalchemy.dialects.mysql import base
from sqlalchemy.dialects.mysql.enumerated import SET
from sqlalchemy.engine.reflection import Inspector
from datahub.ingestion.api.decorators import (
SourceCapability,
@ -83,3 +84,14 @@ class MySQLSource(TwoTierSQLAlchemySource):
def create(cls, config_dict, ctx):
config = MySQLConfig.parse_obj(config_dict)
return cls(config, ctx)
def add_profile_metadata(self, inspector: Inspector) -> None:
    """
    Collect per-table storage size from information_schema for profile enrichment.

    Does nothing unless profiling is enabled in the source config. Otherwise
    it fills self.profile_metadata_info.dataset_name_to_storage_bytes with
    one entry per table, keyed by "<table_schema>.<table_name>" and holding
    the table's data_length.

    :param inspector: inspector whose engine is used to run the query.
    """
    if not self.config.profiling.enabled:
        return

    storage_bytes = self.profile_metadata_info.dataset_name_to_storage_bytes
    with inspector.engine.connect() as conn:
        # Unpack each row by position rather than by attribute name: the
        # column labels returned for information_schema can come back
        # upper-cased (e.g. TABLE_SCHEMA) depending on the server, which
        # breaks lookups by the lower-case name.
        for table_schema, table_name, data_length in conn.execute(
            "SELECT table_schema, table_name, data_length from information_schema.tables"
        ):
            if data_length is None:
                # No size reported for this entry (presumably views and
                # similar objects); leave it out instead of recording None
                # in a mapping that is typed as str -> int.
                continue
            storage_bytes[f"{table_schema}.{table_name}"] = data_length

View File

@ -309,6 +309,15 @@ config_options_to_report = [
]
@dataclass
class ProfileMetadata:
    """
    Container for extra per-table information used to enrich profiles.
    """

    # Maps a dataset name to its storage size in bytes; every instance
    # starts with its own empty dict.
    dataset_name_to_storage_bytes: Dict[str, int] = field(default_factory=dict)
class SQLAlchemySource(StatefulIngestionSourceBase):
"""A Base class for all SQL Sources that use SQLAlchemy to extend"""
@ -317,6 +326,7 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
self.config = config
self.platform = platform
self.report: SQLSourceReport = SQLSourceReport()
self.profile_metadata_info: ProfileMetadata = ProfileMetadata()
config_report = {
config_option: config.dict().get(config_option)
@ -484,6 +494,16 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
profile_requests: List["GEProfilerRequest"] = []
if sql_config.profiling.enabled:
profiler = self.get_profiler_instance(inspector)
try:
self.add_profile_metadata(inspector)
except Exception as e:
logger.warning(
"Failed to get enrichment data for profiler", exc_info=True
)
self.report.report_warning(
"profile_metadata",
f"Failed to get enrichment data for profile {e}",
)
db_name = self.get_db_name(inspector)
yield from self.gen_database_containers(
@ -1098,6 +1118,13 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
),
)
def add_profile_metadata(self, inspector: Inspector) -> None:
    """
    Extension hook for gathering extra information used to enrich profiles.

    The base implementation intentionally does nothing. A sub-class that
    overrides it is expected to update self.profile_metadata_info.
    """
    return None
def loop_profiler(
self,
profile_requests: List["GEProfilerRequest"],
@ -1113,6 +1140,15 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
if profile is None:
continue
dataset_name = request.pretty_name
if (
dataset_name in self.profile_metadata_info.dataset_name_to_storage_bytes
and profile.sizeInBytes is None
):
profile.sizeInBytes = (
self.profile_metadata_info.dataset_name_to_storage_bytes[
dataset_name
]
)
dataset_urn = make_dataset_urn_with_platform_instance(
self.platform,
dataset_name,