mirror of
				https://github.com/datahub-project/datahub.git
				synced 2025-11-03 20:27:50 +00:00 
			
		
		
		
	fix(ingest/bigquery): Lowering significantly the memory usage of the BigQuery connector (#7315)
This commit is contained in:
		
							parent
							
								
									f20aea9be3
								
							
						
					
					
						commit
						793f303a79
					
				@ -576,6 +576,7 @@ entry_points = {
 | 
			
		||||
    "datahub.ingestion.sink.plugins": [
 | 
			
		||||
        "file = datahub.ingestion.sink.file:FileSink",
 | 
			
		||||
        "console = datahub.ingestion.sink.console:ConsoleSink",
 | 
			
		||||
        "blackhole = datahub.ingestion.sink.blackhole:BlackHoleSink",
 | 
			
		||||
        "datahub-kafka = datahub.ingestion.sink.datahub_kafka:DatahubKafkaSink",
 | 
			
		||||
        "datahub-rest = datahub.ingestion.sink.datahub_rest:DatahubRestSink",
 | 
			
		||||
        "datahub-lite = datahub.ingestion.sink.datahub_lite:DataHubLiteSink",
 | 
			
		||||
 | 
			
		||||
@ -123,11 +123,15 @@ class CliReport(Report):
 | 
			
		||||
    py_version: str = sys.version
 | 
			
		||||
    py_exec_path: str = sys.executable
 | 
			
		||||
    os_details: str = platform.platform()
 | 
			
		||||
    _peek_memory_usage: int = 0
 | 
			
		||||
 | 
			
		||||
    def compute_stats(self) -> None:
 | 
			
		||||
        self.mem_info = humanfriendly.format_size(
 | 
			
		||||
            psutil.Process(os.getpid()).memory_info().rss
 | 
			
		||||
        )
 | 
			
		||||
        mem_usage = psutil.Process(os.getpid()).memory_info().rss
 | 
			
		||||
        if self._peek_memory_usage < mem_usage:
 | 
			
		||||
            self._peek_memory_usage = mem_usage
 | 
			
		||||
            self.peek_memory_usage = humanfriendly.format_size(self._peek_memory_usage)
 | 
			
		||||
 | 
			
		||||
        self.mem_info = humanfriendly.format_size(self._peek_memory_usage)
 | 
			
		||||
        return super().compute_stats()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										16
									
								
								metadata-ingestion/src/datahub/ingestion/sink/blackhole.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										16
									
								
								metadata-ingestion/src/datahub/ingestion/sink/blackhole.py
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,16 @@
 | 
			
		||||
import logging
 | 
			
		||||
 | 
			
		||||
from datahub.configuration.common import ConfigModel
 | 
			
		||||
from datahub.ingestion.api.common import RecordEnvelope
 | 
			
		||||
from datahub.ingestion.api.sink import Sink, SinkReport, WriteCallback
 | 
			
		||||
 | 
			
		||||
logger = logging.getLogger(__name__)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class BlackHoleSink(Sink[ConfigModel, SinkReport]):
 | 
			
		||||
    def write_record_async(
 | 
			
		||||
        self, record_envelope: RecordEnvelope, write_callback: WriteCallback
 | 
			
		||||
    ) -> None:
 | 
			
		||||
        if write_callback:
 | 
			
		||||
            self.report.report_record_written(record_envelope)
 | 
			
		||||
            write_callback.on_success(record_envelope, {})
 | 
			
		||||
@ -13,3 +13,8 @@ def _check_sink_classes(cls: Type[Sink]) -> None:
 | 
			
		||||
 | 
			
		||||
sink_registry = PluginRegistry[Sink](extra_cls_check=_check_sink_classes)
 | 
			
		||||
sink_registry.register_from_entrypoint("datahub.ingestion.sink.plugins")
 | 
			
		||||
 | 
			
		||||
# These sinks are always enabled
 | 
			
		||||
assert sink_registry.get("console")
 | 
			
		||||
assert sink_registry.get("file")
 | 
			
		||||
assert sink_registry.get("blackhole")
 | 
			
		||||
 | 
			
		||||
@ -199,7 +199,6 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
 | 
			
		||||
        self.config: BigQueryV2Config = config
 | 
			
		||||
        self.report: BigQueryV2Report = BigQueryV2Report()
 | 
			
		||||
        self.platform: str = "bigquery"
 | 
			
		||||
 | 
			
		||||
        BigqueryTableIdentifier._BIGQUERY_DEFAULT_SHARDED_TABLE_REGEX = (
 | 
			
		||||
            self.config.sharded_table_pattern
 | 
			
		||||
        )
 | 
			
		||||
@ -214,8 +213,8 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
 | 
			
		||||
 | 
			
		||||
        # Currently caching using instance variables
 | 
			
		||||
        # TODO - rewrite cache for readability or use out of the box solution
 | 
			
		||||
        self.db_tables: Dict[str, Dict[str, List[BigqueryTable]]] = {}
 | 
			
		||||
        self.db_views: Dict[str, Dict[str, List[BigqueryView]]] = {}
 | 
			
		||||
        self.db_tables: Dict[str, List[BigqueryTable]] = {}
 | 
			
		||||
        self.db_views: Dict[str, List[BigqueryView]] = {}
 | 
			
		||||
 | 
			
		||||
        self.schema_columns: Dict[
 | 
			
		||||
            Tuple[str, str], Optional[Dict[str, List[BigqueryColumn]]]
 | 
			
		||||
@ -523,11 +522,6 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
 | 
			
		||||
            self.report.set_project_state(project_id.id, "Metadata Extraction")
 | 
			
		||||
            yield from self._process_project(conn, project_id)
 | 
			
		||||
 | 
			
		||||
        if self.config.profiling.enabled:
 | 
			
		||||
            logger.info("Starting profiling...")
 | 
			
		||||
            self.report.set_project_state(project_id.id, "Profiling")
 | 
			
		||||
            yield from self.profiler.get_workunits(self.db_tables)
 | 
			
		||||
 | 
			
		||||
    def get_workunits(self) -> Iterable[MetadataWorkUnit]:
 | 
			
		||||
        return auto_stale_entity_removal(
 | 
			
		||||
            self.stale_entity_removal_handler,
 | 
			
		||||
@ -542,9 +536,6 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
 | 
			
		||||
    ) -> Iterable[MetadataWorkUnit]:
 | 
			
		||||
        project_id = bigquery_project.id
 | 
			
		||||
 | 
			
		||||
        self.db_tables[project_id] = {}
 | 
			
		||||
        self.db_views[project_id] = {}
 | 
			
		||||
 | 
			
		||||
        yield from self.gen_project_id_containers(project_id)
 | 
			
		||||
 | 
			
		||||
        try:
 | 
			
		||||
@ -592,7 +583,7 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
 | 
			
		||||
                logger.error(error_message)
 | 
			
		||||
                self.report.report_failure(
 | 
			
		||||
                    "metadata-extraction",
 | 
			
		||||
                    f"{project_id}.{bigquery_dataset.name} - {error_message}",
 | 
			
		||||
                    f"{project_id}.{bigquery_dataset.name} - {error_message} - {trace}",
 | 
			
		||||
                )
 | 
			
		||||
                continue
 | 
			
		||||
 | 
			
		||||
@ -642,10 +633,18 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
 | 
			
		||||
            self.report.set_project_state(project_id, "Usage Extraction")
 | 
			
		||||
            yield from self.generate_usage_statistics(project_id)
 | 
			
		||||
 | 
			
		||||
        if self.config.profiling.enabled:
 | 
			
		||||
            logger.info(f"Starting profiling project {project_id}")
 | 
			
		||||
            self.report.set_project_state(project_id, "Profiling")
 | 
			
		||||
            yield from self.profiler.get_workunits(
 | 
			
		||||
                project_id=project_id,
 | 
			
		||||
                tables=self.db_tables,
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
    def generate_lineage(self, project_id: str) -> Iterable[MetadataWorkUnit]:
 | 
			
		||||
        logger.info(f"Generate lineage for {project_id}")
 | 
			
		||||
        for dataset in self.db_tables[project_id]:
 | 
			
		||||
            for table in self.db_tables[project_id][dataset]:
 | 
			
		||||
        for dataset in self.db_tables:
 | 
			
		||||
            for table in self.db_tables[dataset]:
 | 
			
		||||
                dataset_urn = self.gen_dataset_urn(dataset, project_id, table.name)
 | 
			
		||||
                lineage_info = self.lineage_extractor.get_upstream_lineage_info(
 | 
			
		||||
                    project_id=project_id,
 | 
			
		||||
@ -655,8 +654,8 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
 | 
			
		||||
                )
 | 
			
		||||
                if lineage_info:
 | 
			
		||||
                    yield from self.gen_lineage(dataset_urn, lineage_info)
 | 
			
		||||
        for dataset in self.db_views[project_id]:
 | 
			
		||||
            for view in self.db_views[project_id][dataset]:
 | 
			
		||||
        for dataset in self.db_views:
 | 
			
		||||
            for view in self.db_views[dataset]:
 | 
			
		||||
                dataset_urn = self.gen_dataset_urn(dataset, project_id, view.name)
 | 
			
		||||
                lineage_info = self.lineage_extractor.get_upstream_lineage_info(
 | 
			
		||||
                    project_id=project_id,
 | 
			
		||||
@ -669,16 +668,21 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
 | 
			
		||||
    def generate_usage_statistics(self, project_id: str) -> Iterable[MetadataWorkUnit]:
 | 
			
		||||
        logger.info(f"Generate usage for {project_id}")
 | 
			
		||||
        tables: Dict[str, List[str]] = defaultdict()
 | 
			
		||||
        for dataset in self.db_tables[project_id]:
 | 
			
		||||
        for dataset in self.db_tables.keys():
 | 
			
		||||
            tables[dataset] = [
 | 
			
		||||
                BigqueryTableIdentifier(
 | 
			
		||||
                    project_id, dataset, table.name
 | 
			
		||||
                ).get_table_name()
 | 
			
		||||
                for table in self.db_tables[project_id][dataset]
 | 
			
		||||
                for table in self.db_tables[dataset]
 | 
			
		||||
            ]
 | 
			
		||||
        for dataset in self.db_views[project_id]:
 | 
			
		||||
        for dataset in self.db_views.keys():
 | 
			
		||||
            tables[dataset].extend(
 | 
			
		||||
                [table.name for table in self.db_views[project_id][dataset]]
 | 
			
		||||
                [
 | 
			
		||||
                    BigqueryTableIdentifier(
 | 
			
		||||
                        project_id, dataset, view.name
 | 
			
		||||
                    ).get_table_name()
 | 
			
		||||
                    for view in self.db_views[dataset]
 | 
			
		||||
                ]
 | 
			
		||||
            )
 | 
			
		||||
        yield from self.usage_extractor.generate_usage_for_project(project_id, tables)
 | 
			
		||||
 | 
			
		||||
@ -692,12 +696,21 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
 | 
			
		||||
            project_id,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        if self.config.include_tables:
 | 
			
		||||
            bigquery_dataset.tables = self.get_tables_for_dataset(
 | 
			
		||||
                conn, project_id, dataset_name
 | 
			
		||||
        columns = BigQueryDataDictionary.get_columns_for_dataset(
 | 
			
		||||
            conn,
 | 
			
		||||
            project_id=project_id,
 | 
			
		||||
            dataset_name=dataset_name,
 | 
			
		||||
            column_limit=self.config.column_limit,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        if self.config.include_tables:
 | 
			
		||||
            tables = self.get_tables_for_dataset(conn, project_id, dataset_name)
 | 
			
		||||
            for table in tables:
 | 
			
		||||
                table_columns = columns.get(table.name, []) if columns else []
 | 
			
		||||
 | 
			
		||||
                yield from self._process_table(
 | 
			
		||||
                    conn, table, table_columns, project_id, dataset_name
 | 
			
		||||
                )
 | 
			
		||||
            for table in bigquery_dataset.tables:
 | 
			
		||||
                yield from self._process_table(conn, table, project_id, dataset_name)
 | 
			
		||||
 | 
			
		||||
        if self.config.include_views:
 | 
			
		||||
            bigquery_dataset.views = self.get_views_for_dataset(
 | 
			
		||||
@ -705,12 +718,28 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            for view in bigquery_dataset.views:
 | 
			
		||||
                yield from self._process_view(conn, view, project_id, dataset_name)
 | 
			
		||||
                view_columns = columns.get(view.name, []) if columns else []
 | 
			
		||||
                yield from self._process_view(
 | 
			
		||||
                    view, view_columns, project_id, dataset_name
 | 
			
		||||
                )
 | 
			
		||||
 | 
			
		||||
    # This method is used to generate the ignore list for datatypes the profiler doesn't support we have to do it here
 | 
			
		||||
    # because the profiler doesn't have access to columns
 | 
			
		||||
    def generate_profile_ignore_list(self, columns: List[BigqueryColumn]) -> List[str]:
 | 
			
		||||
        ignore_list: List[str] = []
 | 
			
		||||
        for column in columns:
 | 
			
		||||
            if not column.data_type or any(
 | 
			
		||||
                word in column.data_type.lower()
 | 
			
		||||
                for word in ["array", "struct", "geography", "json"]
 | 
			
		||||
            ):
 | 
			
		||||
                ignore_list.append(column.field_path)
 | 
			
		||||
        return ignore_list
 | 
			
		||||
 | 
			
		||||
    def _process_table(
 | 
			
		||||
        self,
 | 
			
		||||
        conn: bigquery.Client,
 | 
			
		||||
        table: BigqueryTable,
 | 
			
		||||
        columns: List[BigqueryColumn],
 | 
			
		||||
        project_id: str,
 | 
			
		||||
        schema_name: str,
 | 
			
		||||
    ) -> Iterable[MetadataWorkUnit]:
 | 
			
		||||
@ -722,20 +751,40 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
 | 
			
		||||
            self.report.report_dropped(table_identifier.raw_table_name())
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        table.columns = self.get_columns_for_table(
 | 
			
		||||
            conn, table_identifier, self.config.column_limit
 | 
			
		||||
        table.column_count = len(columns)
 | 
			
		||||
 | 
			
		||||
        # We only collect profile ignore list if profiling is enabled and profile_table_level_only is false
 | 
			
		||||
        if (
 | 
			
		||||
            self.config.profiling.enabled
 | 
			
		||||
            and not self.config.profiling.profile_table_level_only
 | 
			
		||||
        ):
 | 
			
		||||
            table.columns_ignore_from_profiling = self.generate_profile_ignore_list(
 | 
			
		||||
                columns
 | 
			
		||||
            )
 | 
			
		||||
        if not table.columns:
 | 
			
		||||
 | 
			
		||||
        if not table.column_count:
 | 
			
		||||
            logger.warning(
 | 
			
		||||
                f"Table doesn't have any column or unable to get columns for table: {table_identifier}"
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
        yield from self.gen_table_dataset_workunits(table, project_id, schema_name)
 | 
			
		||||
        # If table has time partitioning, set the data type of the partitioning field
 | 
			
		||||
        if table.time_partitioning:
 | 
			
		||||
            table.time_partitioning.column = next(
 | 
			
		||||
                (
 | 
			
		||||
                    column
 | 
			
		||||
                    for column in columns
 | 
			
		||||
                    if column.name == table.time_partitioning.field
 | 
			
		||||
                ),
 | 
			
		||||
                None,
 | 
			
		||||
            )
 | 
			
		||||
        yield from self.gen_table_dataset_workunits(
 | 
			
		||||
            table, columns, project_id, schema_name
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    def _process_view(
 | 
			
		||||
        self,
 | 
			
		||||
        conn: bigquery.Client,
 | 
			
		||||
        view: BigqueryView,
 | 
			
		||||
        columns: List[BigqueryColumn],
 | 
			
		||||
        project_id: str,
 | 
			
		||||
        dataset_name: str,
 | 
			
		||||
    ) -> Iterable[MetadataWorkUnit]:
 | 
			
		||||
@ -747,20 +796,23 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
 | 
			
		||||
            self.report.report_dropped(table_identifier.raw_table_name())
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        view.columns = self.get_columns_for_table(
 | 
			
		||||
            conn, table_identifier, column_limit=self.config.column_limit
 | 
			
		||||
        view.column_count = len(columns)
 | 
			
		||||
        if not view.column_count:
 | 
			
		||||
            logger.warning(
 | 
			
		||||
                f"View doesn't have any column or unable to get columns for table: {table_identifier}"
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
        if dataset_name not in self.db_views[project_id]:
 | 
			
		||||
            self.db_views[project_id][dataset_name] = []
 | 
			
		||||
 | 
			
		||||
        self.db_views[project_id][dataset_name].append(view)
 | 
			
		||||
 | 
			
		||||
        yield from self.gen_view_dataset_workunits(view, project_id, dataset_name)
 | 
			
		||||
        yield from self.gen_view_dataset_workunits(
 | 
			
		||||
            table=view,
 | 
			
		||||
            columns=columns,
 | 
			
		||||
            project_id=project_id,
 | 
			
		||||
            dataset_name=dataset_name,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    def gen_table_dataset_workunits(
 | 
			
		||||
        self,
 | 
			
		||||
        table: BigqueryTable,
 | 
			
		||||
        columns: List[BigqueryColumn],
 | 
			
		||||
        project_id: str,
 | 
			
		||||
        dataset_name: str,
 | 
			
		||||
    ) -> Iterable[MetadataWorkUnit]:
 | 
			
		||||
@ -804,6 +856,7 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
 | 
			
		||||
 | 
			
		||||
        yield from self.gen_dataset_workunits(
 | 
			
		||||
            table=table,
 | 
			
		||||
            columns=columns,
 | 
			
		||||
            project_id=project_id,
 | 
			
		||||
            dataset_name=dataset_name,
 | 
			
		||||
            sub_types=sub_types,
 | 
			
		||||
@ -814,11 +867,13 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
 | 
			
		||||
    def gen_view_dataset_workunits(
 | 
			
		||||
        self,
 | 
			
		||||
        table: BigqueryView,
 | 
			
		||||
        columns: List[BigqueryColumn],
 | 
			
		||||
        project_id: str,
 | 
			
		||||
        dataset_name: str,
 | 
			
		||||
    ) -> Iterable[MetadataWorkUnit]:
 | 
			
		||||
        yield from self.gen_dataset_workunits(
 | 
			
		||||
            table=table,
 | 
			
		||||
            columns=columns,
 | 
			
		||||
            project_id=project_id,
 | 
			
		||||
            dataset_name=dataset_name,
 | 
			
		||||
            sub_types=["view"],
 | 
			
		||||
@ -837,6 +892,7 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
 | 
			
		||||
    def gen_dataset_workunits(
 | 
			
		||||
        self,
 | 
			
		||||
        table: Union[BigqueryTable, BigqueryView],
 | 
			
		||||
        columns: List[BigqueryColumn],
 | 
			
		||||
        project_id: str,
 | 
			
		||||
        dataset_name: str,
 | 
			
		||||
        sub_types: List[str],
 | 
			
		||||
@ -854,7 +910,9 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
 | 
			
		||||
            project_id, dataset_name, table.name
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        yield self.gen_schema_metadata(dataset_urn, table, str(datahub_dataset_name))
 | 
			
		||||
        yield self.gen_schema_metadata(
 | 
			
		||||
            dataset_urn, table, columns, str(datahub_dataset_name)
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        dataset_properties = DatasetProperties(
 | 
			
		||||
            name=datahub_dataset_name.get_table_display_name(),
 | 
			
		||||
@ -1013,6 +1071,7 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
 | 
			
		||||
        self,
 | 
			
		||||
        dataset_urn: str,
 | 
			
		||||
        table: Union[BigqueryTable, BigqueryView],
 | 
			
		||||
        columns: List[BigqueryColumn],
 | 
			
		||||
        dataset_name: str,
 | 
			
		||||
    ) -> MetadataWorkUnit:
 | 
			
		||||
        schema_metadata = SchemaMetadata(
 | 
			
		||||
@ -1021,7 +1080,8 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
 | 
			
		||||
            version=0,
 | 
			
		||||
            hash="",
 | 
			
		||||
            platformSchema=MySqlDDL(tableSchema=""),
 | 
			
		||||
            fields=self.gen_schema_fields(table.columns),
 | 
			
		||||
            # fields=[],
 | 
			
		||||
            fields=self.gen_schema_fields(columns),
 | 
			
		||||
        )
 | 
			
		||||
        return MetadataChangeProposalWrapper(
 | 
			
		||||
            entityUrn=dataset_urn, aspect=schema_metadata
 | 
			
		||||
@ -1036,10 +1096,8 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
 | 
			
		||||
        project_id: str,
 | 
			
		||||
        dataset_name: str,
 | 
			
		||||
    ) -> List[BigqueryTable]:
 | 
			
		||||
        bigquery_tables: Optional[List[BigqueryTable]] = (
 | 
			
		||||
            self.db_tables[project_id].get(dataset_name)
 | 
			
		||||
            if project_id in self.db_tables
 | 
			
		||||
            else []
 | 
			
		||||
        bigquery_tables: Optional[List[BigqueryTable]] = self.db_tables.get(
 | 
			
		||||
            dataset_name, []
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        # In bigquery there is no way to query all tables in a Project id
 | 
			
		||||
@ -1148,7 +1206,7 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
 | 
			
		||||
                        )
 | 
			
		||||
                    )
 | 
			
		||||
 | 
			
		||||
                self.db_tables[project_id][dataset_name] = bigquery_tables
 | 
			
		||||
                self.db_tables[dataset_name] = bigquery_tables
 | 
			
		||||
 | 
			
		||||
                self.report.metadata_extraction_sec[
 | 
			
		||||
                    f"{project_id}.{dataset_name}"
 | 
			
		||||
@ -1157,11 +1215,7 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
 | 
			
		||||
                return bigquery_tables
 | 
			
		||||
 | 
			
		||||
        # Some schema may not have any table
 | 
			
		||||
        return (
 | 
			
		||||
            self.db_tables[project_id].get(dataset_name, [])
 | 
			
		||||
            if project_id in self.db_tables
 | 
			
		||||
            else []
 | 
			
		||||
        )
 | 
			
		||||
        return self.db_tables.get(dataset_name, [])
 | 
			
		||||
 | 
			
		||||
    def get_views_for_dataset(
 | 
			
		||||
        self,
 | 
			
		||||
@ -1169,51 +1223,15 @@ class BigqueryV2Source(StatefulIngestionSourceBase, TestableSource):
 | 
			
		||||
        project_id: str,
 | 
			
		||||
        dataset_name: str,
 | 
			
		||||
    ) -> List[BigqueryView]:
 | 
			
		||||
        views = self.db_views.get(project_id, {}).get(dataset_name, [])
 | 
			
		||||
        views = self.db_views.get(dataset_name, [])
 | 
			
		||||
 | 
			
		||||
        if not views:
 | 
			
		||||
            return BigQueryDataDictionary.get_views_for_dataset(
 | 
			
		||||
            views = BigQueryDataDictionary.get_views_for_dataset(
 | 
			
		||||
                conn, project_id, dataset_name, self.config.profiling.enabled
 | 
			
		||||
            )
 | 
			
		||||
            self.db_views[dataset_name] = views
 | 
			
		||||
        return views
 | 
			
		||||
 | 
			
		||||
    def get_columns_for_table(
 | 
			
		||||
        self,
 | 
			
		||||
        conn: bigquery.Client,
 | 
			
		||||
        table_identifier: BigqueryTableIdentifier,
 | 
			
		||||
        column_limit: Optional[int] = None,
 | 
			
		||||
    ) -> List[BigqueryColumn]:
 | 
			
		||||
        if (
 | 
			
		||||
            table_identifier.project_id,
 | 
			
		||||
            table_identifier.dataset,
 | 
			
		||||
        ) not in self.schema_columns.keys():
 | 
			
		||||
            columns = BigQueryDataDictionary.get_columns_for_dataset(
 | 
			
		||||
                conn,
 | 
			
		||||
                project_id=table_identifier.project_id,
 | 
			
		||||
                dataset_name=table_identifier.dataset,
 | 
			
		||||
                column_limit=column_limit,
 | 
			
		||||
            )
 | 
			
		||||
            self.schema_columns[
 | 
			
		||||
                (table_identifier.project_id, table_identifier.dataset)
 | 
			
		||||
            ] = columns
 | 
			
		||||
        else:
 | 
			
		||||
            columns = self.schema_columns[
 | 
			
		||||
                (table_identifier.project_id, table_identifier.dataset)
 | 
			
		||||
            ]
 | 
			
		||||
 | 
			
		||||
        # get all columns for schema failed,
 | 
			
		||||
        # falling back to get columns for table
 | 
			
		||||
        if not columns:
 | 
			
		||||
            logger.warning(
 | 
			
		||||
                f"Couldn't get columns on the dataset level for {table_identifier}. Trying to get on table level..."
 | 
			
		||||
            )
 | 
			
		||||
            return BigQueryDataDictionary.get_columns_for_table(
 | 
			
		||||
                conn, table_identifier, self.config.column_limit
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
        # Access to table but none of its columns - is this possible ?
 | 
			
		||||
        return columns.get(table_identifier.table, [])
 | 
			
		||||
 | 
			
		||||
    def add_config_to_report(self):
 | 
			
		||||
        self.report.include_table_lineage = self.config.include_table_lineage
 | 
			
		||||
        self.report.use_date_sharded_audit_log_tables = (
 | 
			
		||||
 | 
			
		||||
@ -164,6 +164,8 @@ class BigQueryV2Config(
 | 
			
		||||
    )
 | 
			
		||||
    _credentials_path: Optional[str] = PrivateAttr(None)
 | 
			
		||||
 | 
			
		||||
    _cache_path: Optional[str] = PrivateAttr(None)
 | 
			
		||||
 | 
			
		||||
    upstream_lineage_in_report: bool = Field(
 | 
			
		||||
        default=False,
 | 
			
		||||
        description="Useful for debugging lineage information. Set to True to see the raw lineage created internally.",
 | 
			
		||||
 | 
			
		||||
@ -5,7 +5,12 @@ from datetime import datetime, timezone
 | 
			
		||||
from typing import Dict, List, Optional, cast
 | 
			
		||||
 | 
			
		||||
from google.cloud import bigquery
 | 
			
		||||
from google.cloud.bigquery.table import RowIterator, TableListItem, TimePartitioning
 | 
			
		||||
from google.cloud.bigquery.table import (
 | 
			
		||||
    RowIterator,
 | 
			
		||||
    TableListItem,
 | 
			
		||||
    TimePartitioning,
 | 
			
		||||
    TimePartitioningType,
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
from datahub.ingestion.source.bigquery_v2.bigquery_audit import BigqueryTableIdentifier
 | 
			
		||||
from datahub.ingestion.source.sql.sql_generic import BaseColumn, BaseTable, BaseView
 | 
			
		||||
@ -19,6 +24,28 @@ class BigqueryColumn(BaseColumn):
 | 
			
		||||
    is_partition_column: bool
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@dataclass
 | 
			
		||||
class PartitionInfo:
 | 
			
		||||
    field: str
 | 
			
		||||
    # Data type is optional as we not have it when we set it from TimePartitioning
 | 
			
		||||
    column: Optional[BigqueryColumn] = None
 | 
			
		||||
    type: str = TimePartitioningType.DAY
 | 
			
		||||
    expiration_ms: Optional[int] = None
 | 
			
		||||
    require_partition_filter: bool = False
 | 
			
		||||
 | 
			
		||||
    # TimePartitioning field doesn't provide data_type so we have to add it afterwards
 | 
			
		||||
    @classmethod
 | 
			
		||||
    def from_time_partitioning(
 | 
			
		||||
        cls, time_partitioning: TimePartitioning
 | 
			
		||||
    ) -> "PartitionInfo":
 | 
			
		||||
        return cls(
 | 
			
		||||
            field=time_partitioning.field,
 | 
			
		||||
            type=time_partitioning.type_,
 | 
			
		||||
            expiration_ms=time_partitioning.expiration_ms,
 | 
			
		||||
            require_partition_filter=time_partitioning.require_partition_filter,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@dataclass
 | 
			
		||||
class BigqueryTable(BaseTable):
 | 
			
		||||
    expires: Optional[datetime] = None
 | 
			
		||||
@ -29,8 +56,8 @@ class BigqueryTable(BaseTable):
 | 
			
		||||
    max_shard_id: Optional[str] = None
 | 
			
		||||
    active_billable_bytes: Optional[int] = None
 | 
			
		||||
    long_term_billable_bytes: Optional[int] = None
 | 
			
		||||
    time_partitioning: Optional[TimePartitioning] = None
 | 
			
		||||
    columns: List[BigqueryColumn] = field(default_factory=list)
 | 
			
		||||
    time_partitioning: Optional[PartitionInfo] = None
 | 
			
		||||
    columns_ignore_from_profiling: List[str] = field(default_factory=list)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@dataclass
 | 
			
		||||
@ -47,6 +74,7 @@ class BigqueryDataset:
 | 
			
		||||
    comment: Optional[str] = None
 | 
			
		||||
    tables: List[BigqueryTable] = field(default_factory=list)
 | 
			
		||||
    views: List[BigqueryView] = field(default_factory=list)
 | 
			
		||||
    columns: List[BigqueryColumn] = field(default_factory=list)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@dataclass
 | 
			
		||||
@ -320,7 +348,6 @@ class BigQueryDataDictionary:
 | 
			
		||||
                    table_filter=f" and t.table_name in ({filter})" if filter else "",
 | 
			
		||||
                ),
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
        # Some property we want to capture only available from the TableListItem we get from an earlier query of
 | 
			
		||||
        # the list of tables.
 | 
			
		||||
        return [
 | 
			
		||||
@ -338,8 +365,10 @@ class BigQueryDataDictionary:
 | 
			
		||||
                ddl=table.ddl,
 | 
			
		||||
                expires=tables[table.table_name].expires if tables else None,
 | 
			
		||||
                labels=tables[table.table_name].labels if tables else None,
 | 
			
		||||
                time_partitioning=tables[table.table_name].time_partitioning
 | 
			
		||||
                if tables
 | 
			
		||||
                time_partitioning=PartitionInfo.from_time_partitioning(
 | 
			
		||||
                    tables[table.table_name].time_partitioning
 | 
			
		||||
                )
 | 
			
		||||
                if tables and tables[table.table_name].time_partitioning
 | 
			
		||||
                else None,
 | 
			
		||||
                clustering_fields=tables[table.table_name].clustering_fields
 | 
			
		||||
                if tables
 | 
			
		||||
 | 
			
		||||
@ -11,10 +11,7 @@ from datahub.ingestion.api.workunit import MetadataWorkUnit
 | 
			
		||||
from datahub.ingestion.source.bigquery_v2.bigquery_audit import BigqueryTableIdentifier
 | 
			
		||||
from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config
 | 
			
		||||
from datahub.ingestion.source.bigquery_v2.bigquery_report import BigQueryV2Report
 | 
			
		||||
from datahub.ingestion.source.bigquery_v2.bigquery_schema import (
 | 
			
		||||
    BigqueryColumn,
 | 
			
		||||
    BigqueryTable,
 | 
			
		||||
)
 | 
			
		||||
from datahub.ingestion.source.bigquery_v2.bigquery_schema import BigqueryTable
 | 
			
		||||
from datahub.ingestion.source.ge_data_profiler import GEProfilerRequest
 | 
			
		||||
from datahub.ingestion.source.sql.sql_generic_profiler import (
 | 
			
		||||
    GenericProfiler,
 | 
			
		||||
@ -94,13 +91,10 @@ class BigqueryProfiler(GenericProfiler):
 | 
			
		||||
            partition_where_clause: str
 | 
			
		||||
 | 
			
		||||
            if not table.time_partitioning:
 | 
			
		||||
                partition_column: Optional[BigqueryColumn] = None
 | 
			
		||||
                for column in table.columns:
 | 
			
		||||
                    if column.is_partition_column:
 | 
			
		||||
                        partition_column = column
 | 
			
		||||
                        break
 | 
			
		||||
                if partition_column:
 | 
			
		||||
                    partition_where_clause = f"{partition_column.name} >= {partition}"
 | 
			
		||||
                if table.time_partitioning and table.time_partitioning.column:
 | 
			
		||||
                    partition_where_clause = (
 | 
			
		||||
                        f"{table.time_partitioning.column.name} >= {partition}"
 | 
			
		||||
                    )
 | 
			
		||||
                else:
 | 
			
		||||
                    logger.warning(
 | 
			
		||||
                        f"Partitioned table {table.name} without partiton column"
 | 
			
		||||
@ -127,16 +121,17 @@ class BigqueryProfiler(GenericProfiler):
 | 
			
		||||
                    return None, None
 | 
			
		||||
 | 
			
		||||
                # ingestion time partitoned tables partition column is not in the schema, so we default to TIMESTAMP type
 | 
			
		||||
                partition_column_type: str = "TIMESTAMP"
 | 
			
		||||
                for c in table.columns:
 | 
			
		||||
                    if c.is_partition_column:
 | 
			
		||||
                        partition_column_type = c.data_type
 | 
			
		||||
                if not table.time_partitioning.column:
 | 
			
		||||
                    logger.warning(
 | 
			
		||||
                        f"Partitioned table {table.name} without partition column, it seems like a bug in our extraction"
 | 
			
		||||
                    )
 | 
			
		||||
                    return None, None
 | 
			
		||||
 | 
			
		||||
                if table.time_partitioning.type_ in ("HOUR", "DAY", "MONTH", "YEAR"):
 | 
			
		||||
                    partition_where_clause = f"{partition_column_type}(`{table.time_partitioning.field}`) BETWEEN {partition_column_type}('{partition_datetime}') AND {partition_column_type}('{upper_bound_partition_datetime}')"
 | 
			
		||||
                if table.time_partitioning.type in ("HOUR", "DAY", "MONTH", "YEAR"):
 | 
			
		||||
                    partition_where_clause = f"{table.time_partitioning.column.data_type}(`{table.time_partitioning.field}`) BETWEEN {table.time_partitioning.column.data_type}('{partition_datetime}') AND {table.time_partitioning.column.data_type}('{upper_bound_partition_datetime}')"
 | 
			
		||||
                else:
 | 
			
		||||
                    logger.warning(
 | 
			
		||||
                        f"Not supported partition type {table.time_partitioning.type_}"
 | 
			
		||||
                        f"Not supported partition type {table.time_partitioning.type}"
 | 
			
		||||
                    )
 | 
			
		||||
                    return None, None
 | 
			
		||||
            custom_sql = """
 | 
			
		||||
@ -161,42 +156,37 @@ WHERE
 | 
			
		||||
        return None, None
 | 
			
		||||
 | 
			
		||||
    def get_workunits(
 | 
			
		||||
        self, tables: Dict[str, Dict[str, List[BigqueryTable]]]
 | 
			
		||||
        self, project_id: str, tables: Dict[str, List[BigqueryTable]]
 | 
			
		||||
    ) -> Iterable[MetadataWorkUnit]:
 | 
			
		||||
        # Otherwise, if column level profiling is enabled, use  GE profiler.
 | 
			
		||||
        for project in tables.keys():
 | 
			
		||||
            if not self.config.project_id_pattern.allowed(project):
 | 
			
		||||
                continue
 | 
			
		||||
        if not self.config.project_id_pattern.allowed(project_id):
 | 
			
		||||
            return
 | 
			
		||||
        profile_requests = []
 | 
			
		||||
 | 
			
		||||
            for dataset in tables[project]:
 | 
			
		||||
        for dataset in tables:
 | 
			
		||||
            if not self.config.schema_pattern.allowed(dataset):
 | 
			
		||||
                continue
 | 
			
		||||
 | 
			
		||||
                for table in tables[project][dataset]:
 | 
			
		||||
            for table in tables[dataset]:
 | 
			
		||||
                normalized_table_name = BigqueryTableIdentifier(
 | 
			
		||||
                        project_id=project, dataset=dataset, table=table.name
 | 
			
		||||
                    project_id=project_id, dataset=dataset, table=table.name
 | 
			
		||||
                ).get_table_name()
 | 
			
		||||
                    for column in table.columns:
 | 
			
		||||
                for column in table.columns_ignore_from_profiling:
 | 
			
		||||
                    # Profiler has issues with complex types (array, struct, geography, json), so we deny those types from profiling
 | 
			
		||||
                    # We also filter columns without data type as it means that column is part of a complex type.
 | 
			
		||||
                        if not column.data_type or any(
 | 
			
		||||
                            word in column.data_type.lower()
 | 
			
		||||
                            for word in ["array", "struct", "geography", "json"]
 | 
			
		||||
                        ):
 | 
			
		||||
                    self.config.profile_pattern.deny.append(
 | 
			
		||||
                                f"^{normalized_table_name}.{column.field_path}$"
 | 
			
		||||
                        f"^{normalized_table_name}.{column}$"
 | 
			
		||||
                    )
 | 
			
		||||
 | 
			
		||||
                # Emit the profile work unit
 | 
			
		||||
                profile_request = self.get_bigquery_profile_request(
 | 
			
		||||
                        project=project, dataset=dataset, table=table
 | 
			
		||||
                    project=project_id, dataset=dataset, table=table
 | 
			
		||||
                )
 | 
			
		||||
                if profile_request is not None:
 | 
			
		||||
                    profile_requests.append(profile_request)
 | 
			
		||||
 | 
			
		||||
        if len(profile_requests) == 0:
 | 
			
		||||
                continue
 | 
			
		||||
            return
 | 
			
		||||
        yield from self.generate_wu_from_profile_requests(profile_requests)
 | 
			
		||||
 | 
			
		||||
    def generate_wu_from_profile_requests(
 | 
			
		||||
@ -257,7 +247,7 @@ WHERE
 | 
			
		||||
                + 1
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
        if not table.columns:
 | 
			
		||||
        if not table.column_count:
 | 
			
		||||
            skip_profiling = True
 | 
			
		||||
 | 
			
		||||
        if skip_profiling:
 | 
			
		||||
 | 
			
		||||
@ -887,6 +887,7 @@ class SnowflakeV2Source(
 | 
			
		||||
    def fetch_columns_for_table(self, table, schema_name, db_name, table_identifier):
 | 
			
		||||
        try:
 | 
			
		||||
            table.columns = self.get_columns_for_table(table.name, schema_name, db_name)
 | 
			
		||||
            table.column_count = len(table.columns)
 | 
			
		||||
            if self.config.extract_tags != TagOption.skip:
 | 
			
		||||
                table.column_tags = self.tag_extractor.get_column_tags_for_table(
 | 
			
		||||
                    table.name, schema_name, db_name
 | 
			
		||||
 | 
			
		||||
@ -1,6 +1,6 @@
 | 
			
		||||
from dataclasses import dataclass, field
 | 
			
		||||
from dataclasses import dataclass
 | 
			
		||||
from datetime import datetime
 | 
			
		||||
from typing import Generic, List, Optional, TypeVar
 | 
			
		||||
from typing import Generic, Optional, TypeVar
 | 
			
		||||
 | 
			
		||||
from pydantic.fields import Field
 | 
			
		||||
 | 
			
		||||
@ -37,7 +37,7 @@ class BaseTable(Generic[SqlTableColumn]):
 | 
			
		||||
    last_altered: Optional[datetime]
 | 
			
		||||
    size_in_bytes: Optional[int]
 | 
			
		||||
    rows_count: Optional[int]
 | 
			
		||||
    columns: List[SqlTableColumn] = field(default_factory=list)
 | 
			
		||||
    column_count: Optional[int] = None
 | 
			
		||||
    ddl: Optional[str] = None
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -50,7 +50,7 @@ class BaseView(Generic[SqlTableColumn]):
 | 
			
		||||
    view_definition: str
 | 
			
		||||
    size_in_bytes: Optional[int] = None
 | 
			
		||||
    rows_count: Optional[int] = None
 | 
			
		||||
    columns: List[SqlTableColumn] = field(default_factory=list)
 | 
			
		||||
    column_count: Optional[int] = None
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class SQLAlchemyGenericConfig(SQLAlchemyConfig):
 | 
			
		||||
 | 
			
		||||
@ -74,7 +74,7 @@ class GenericProfiler:
 | 
			
		||||
        for request in table_level_profile_requests:
 | 
			
		||||
            profile = DatasetProfile(
 | 
			
		||||
                timestampMillis=int(datetime.now().timestamp() * 1000),
 | 
			
		||||
                columnCount=len(request.table.columns),
 | 
			
		||||
                columnCount=request.table.column_count,
 | 
			
		||||
                rowCount=request.table.rows_count,
 | 
			
		||||
                sizeInBytes=request.table.size_in_bytes,
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user