fix(ingest): run sqllineage in process by default (#11650)
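SqlLineageSQLParser now defaults to `use_external_process=False`, so sqllineage parsing runs in the calling process. The pluggable `sql_parser` option is removed from the LookML source config (along with the test that exercised a bad parser path), and the Redash source swaps `multiprocessing.pool.ThreadPool` for `ThreadedIteratorExecutor` when fanning out over API pages.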

Harshal Sheth 2024-10-16 20:47:48 -07:00 committed by GitHub
parent b8144699fd
commit 8b42ac8cde
5 changed files with 13 additions and 2883 deletions

View File

@@ -124,9 +124,6 @@ class LookMLSourceConfig(
         description="List of regex patterns for LookML views to include in the extraction.",
     )
     parse_table_names_from_sql: bool = Field(True, description="See note below.")
-    sql_parser: str = Field(
-        "datahub.utilities.sql_parser.DefaultSQLParser", description="See note below."
-    )
     api: Optional[LookerAPIConfig]
     project_name: Optional[str] = Field(
         None,
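
For context, a LookML recipe that previously pinned a parser via `sql_parser` now simply omits the key. This sketch is adapted from the test config removed later in this commit; the paths and names are illustrative, not from the source:

```python
from datahub.ingestion.run.pipeline import Pipeline

# Illustrative recipe: same shape as the removed test's config,
# minus the now-removed "sql_parser" key.
pipeline = Pipeline.create(
    {
        "run_id": "lookml-example",
        "source": {
            "type": "lookml",
            "config": {
                "base_folder": "/path/to/lkml_samples",
                "connection_to_platform_map": {
                    "my_connection": {
                        "platform": "snowflake",
                        "default_db": "default_db",
                        "default_schema": "default_schema",
                    }
                },
                "parse_table_names_from_sql": True,
                "project_name": "lkml_samples",
            },
        },
        "sink": {"type": "file", "config": {"filename": "/tmp/lookml_mces.json"}},
    }
)
pipeline.run()
```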

View File

@@ -2,7 +2,6 @@ import logging
 import math
 import sys
 from dataclasses import dataclass, field
-from multiprocessing.pool import ThreadPool
 from typing import Dict, Iterable, List, Optional, Set, Type
 
 import dateutil.parser as dp
@@ -43,6 +42,7 @@ from datahub.metadata.schema_classes import (
 from datahub.utilities.lossy_collections import LossyDict, LossyList
 from datahub.utilities.perf_timer import PerfTimer
 from datahub.utilities.sql_parser import SQLParser
+from datahub.utilities.threaded_iterator_executor import ThreadedIteratorExecutor
 
 logger = logging.getLogger(__name__)
 logger.setLevel(logging.INFO)
@@ -646,11 +646,11 @@ class RedashSource(Source):
         self.report.total_dashboards = total_dashboards
         self.report.max_page_dashboards = max_page
 
-        dash_exec_pool = ThreadPool(self.config.parallelism)
-        for response in dash_exec_pool.imap_unordered(
-            self._process_dashboard_response, range(1, max_page + 1)
-        ):
-            yield from response
+        yield from ThreadedIteratorExecutor.process(
+            self._process_dashboard_response,
+            [(page,) for page in range(1, max_page + 1)],
+            max_workers=self.config.parallelism,
+        )
 
     def _get_chart_type_from_viz_data(self, viz_data: Dict) -> str:
         """
def _get_chart_type_from_viz_data(self, viz_data: Dict) -> str: def _get_chart_type_from_viz_data(self, viz_data: Dict) -> str:
""" """
@@ -769,11 +769,12 @@
         logger.info(f"/api/queries total count {total_queries} and max page {max_page}")
         self.report.total_queries = total_queries
         self.report.max_page_queries = max_page
-        chart_exec_pool = ThreadPool(self.config.parallelism)
-        for response in chart_exec_pool.imap_unordered(
-            self._process_query_response, range(1, max_page + 1)
-        ):
-            yield from response
+
+        yield from ThreadedIteratorExecutor.process(
+            self._process_query_response,
+            [(page,) for page in range(1, max_page + 1)],
+            max_workers=self.config.parallelism,
+        )
 
     def add_config_to_report(self) -> None:
         self.report.api_page_limit = self.config.api_page_limit
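
The diff only shows the call site of `ThreadedIteratorExecutor` (the real class lives in `datahub.utilities.threaded_iterator_executor`). As a minimal sketch of the pattern, not the actual implementation, an executor like this runs each generator-producing worker on a thread and streams results through a shared queue, so the caller yields items as they arrive instead of waiting for `imap_unordered` to hand back each page's full batch. The `process_streamed` name and internals below are illustrative assumptions:

```python
import queue
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Iterable, Iterator, Tuple, TypeVar

T = TypeVar("T")


def process_streamed(
    worker_func: Callable[..., Iterable[T]],
    args_list: Iterable[Tuple[Any, ...]],
    max_workers: int,
) -> Iterator[T]:
    """Run worker_func over each args tuple on worker threads, yielding
    results as they are produced rather than as per-worker batches."""
    out_q: "queue.Queue[T]" = queue.Queue()

    def drain_into_queue(*args: Any) -> None:
        # Each worker exhausts its generator into the shared queue.
        for item in worker_func(*args):
            out_q.put(item)

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        pending = [executor.submit(drain_into_queue, *args) for args in args_list]
        while pending:
            try:
                yield out_q.get(timeout=0.2)
            except queue.Empty:
                pass
            pending = [f for f in pending if not f.done()]
        # All workers finished; flush anything still buffered.
        while not out_q.empty():
            yield out_q.get()
```

Note the argument shape at the call site: each page number is wrapped as a one-element tuple (`[(page,) for page in ...]`) so the executor can splat it into the worker function.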

View File

@@ -46,7 +46,7 @@ class SqlLineageSQLParser(SQLParser):
     def __init__(
         self,
         sql_query: str,
-        use_external_process: bool = True,
+        use_external_process: bool = False,
         use_raw_names: bool = False,
     ) -> None:
         super().__init__(sql_query, use_external_process)
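
With the default flipped, any caller constructing `SqlLineageSQLParser` without extra arguments now parses in process. A minimal usage sketch; the `get_tables()` accessor is assumed from the `SQLParser` base class imported in the Redash changes above:

```python
from datahub.utilities.sql_parser import SqlLineageSQLParser

# New default: sqllineage runs in the calling process.
parser = SqlLineageSQLParser(sql_query="SELECT id, name FROM my_db.my_table")
tables = parser.get_tables()  # assumed interface; e.g. ["my_db.my_table"]

# The old behavior (parse in a spawned subprocess) remains opt-in:
isolated = SqlLineageSQLParser(
    sql_query="SELECT id, name FROM my_db.my_table",
    use_external_process=True,
)
```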

View File

@@ -10,7 +10,6 @@ from deepdiff import DeepDiff
 from freezegun import freeze_time
 from looker_sdk.sdk.api40.models import DBConnection
 
-from datahub.configuration.common import PipelineExecutionError
 from datahub.ingestion.run.pipeline import Pipeline
 from datahub.ingestion.source.file import read_metadata_file
 from datahub.ingestion.source.looker.looker_template_language import (
@@ -518,53 +517,6 @@ def ingestion_test(
     )
 
 
-@freeze_time(FROZEN_TIME)
-def test_lookml_bad_sql_parser(pytestconfig, tmp_path, mock_time):
-    """Incorrect specification of sql parser should not fail ingestion"""
-    test_resources_dir = pytestconfig.rootpath / "tests/integration/lookml"
-    mce_out = "lookml_mces_badsql_parser.json"
-    pipeline = Pipeline.create(
-        {
-            "run_id": "lookml-test",
-            "source": {
-                "type": "lookml",
-                "config": {
-                    "base_folder": str(test_resources_dir / "lkml_samples"),
-                    "connection_to_platform_map": {
-                        "my_connection": {
-                            "platform": "snowflake",
-                            "default_db": "default_db",
-                            "default_schema": "default_schema",
-                        }
-                    },
-                    "parse_table_names_from_sql": True,
-                    "project_name": "lkml_samples",
-                    "sql_parser": "bad.sql.Parser",
-                    "emit_reachable_views_only": False,
-                    "process_refinements": False,
-                },
-            },
-            "sink": {
-                "type": "file",
-                "config": {
-                    "filename": f"{tmp_path}/{mce_out}",
-                },
-            },
-        }
-    )
-    pipeline.run()
-    pipeline.pretty_print_summary()
-    pipeline.raise_from_status(raise_warnings=False)
-
-    with pytest.raises(PipelineExecutionError):  # we expect the source to have warnings
-        pipeline.raise_from_status(raise_warnings=True)
-
-    mce_helpers.check_golden_file(
-        pytestconfig,
-        output_path=tmp_path / mce_out,
-        golden_path=test_resources_dir / mce_out,
-    )
-
-
 @freeze_time(FROZEN_TIME)
 def test_lookml_git_info(pytestconfig, tmp_path, mock_time):
     """Add github info to config"""