fix(ingest/redshift): Enabling autocommit for Redshift connection (#7983)

This commit is contained in:
Tamas Nemeth 2023-05-08 10:24:40 +02:00 committed by GitHub
parent 1f88dab6f9
commit 0e69e5a810
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 136 additions and 25 deletions

View File

@ -107,7 +107,17 @@ class RedshiftLineageExtractor:
parser = LineageRunner(query)
for table in parser.source_tables:
source_schema, source_table = str(table).split(".")
split = str(table).split(".")
if len(split) == 3:
db_name, source_schema, source_table = split
elif len(split) == 2:
source_schema, source_table = split
else:
raise ValueError(
f"Invalid table name {table} in query {query}. "
f"Expected format: [db_name].[schema].[table] or [schema].[table] or [table]."
)
if source_schema == "<default>":
source_schema = str(self.config.default_schema)
@ -149,22 +159,19 @@ class RedshiftLineageExtractor:
try:
sources = self._get_sources_from_query(db_name=db_name, query=ddl)
except Exception as e:
self.warn(
logger,
"parsing-query",
f"Error parsing query {ddl} for getting lineage ."
f"\nError was {e}.",
logger.warning(
f"Error parsing query {ddl} for getting lineage. Error was {e}."
)
self.report.num_lineage_dropped_query_parser += 1
else:
if lineage_type == lineage_type.COPY and filename is not None:
platform = LineageDatasetPlatform.S3
path = filename.strip()
if urlparse(path).scheme != "s3":
self.warn(
logger,
"non-s3-lineage",
f"Only s3 source supported with copy. The source was: {path}.",
logger.warning(
"Only s3 source supported with copy. The source was: {path}."
)
self.report.num_lineage_dropped_not_support_copy_path += 1
return sources
path = strip_s3_prefix(self._get_s3_path(path))
elif source_schema is not None and source_table is not None:
@ -316,11 +323,10 @@ class RedshiftLineageExtractor:
or schema not in all_tables[db]
or not any(table == t.name for t in all_tables[db][schema])
):
self.warn(
logger,
"missing-table",
f"{source.path} missing table",
logger.debug(
f"{source.path} missing table, dropping from lineage.",
)
self.report.num_lineage_tables_dropped += 1
continue
targe_source.append(source)

View File

@ -4,7 +4,7 @@ from datetime import datetime
from typing import Dict, Iterable, List, Optional, Union, cast
from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance
from datahub.emitter.mcp_builder import wrap_aspect_as_workunit
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.ge_data_profiler import GEProfilerRequest
from datahub.ingestion.source.redshift.config import RedshiftConfig
@ -49,7 +49,6 @@ class RedshiftProfiler(GenericProfiler):
Dict[str, Dict[str, List[RedshiftView]]],
],
) -> Iterable[MetadataWorkUnit]:
# Extra default SQLAlchemy option for better connection pooling and threading.
# https://docs.sqlalchemy.org/en/14/core/pooling.html#sqlalchemy.pool.QueuePool.params.max_overflow
if self.config.profiling.enabled:
@ -63,7 +62,6 @@ class RedshiftProfiler(GenericProfiler):
if not self.config.schema_pattern.allowed(schema):
continue
for table in tables[db].get(schema, {}):
# Emit the profile work unit
profile_request = self.get_redshift_profile_request(
table, schema, db
@ -100,12 +98,9 @@ class RedshiftProfiler(GenericProfiler):
dataset_urn, int(datetime.now().timestamp() * 1000)
)
yield wrap_aspect_as_workunit(
"dataset",
dataset_urn,
"datasetProfile",
profile,
)
yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn, aspect=profile
).as_workunit()
def get_redshift_profile_request(
self,

View File

@ -339,7 +339,7 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource):
) -> redshift_connector.Connection:
client_options = config.extra_client_options
host, port = config.host_port.split(":")
return redshift_connector.connect(
conn = redshift_connector.connect(
host=host,
port=int(port),
user=config.username,
@ -348,6 +348,10 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource):
**client_options,
)
conn.autocommit = True
return conn
def get_workunits(self) -> Iterable[MetadataWorkUnit]:
return auto_stale_entity_removal(
self.stale_entity_removal_handler,
@ -494,7 +498,6 @@ class RedshiftSource(StatefulIngestionSourceBase, TestableSource):
logger.info("process views")
if schema.name in self.db_views[schema.database]:
for view in self.db_views[schema.database][schema.name]:
logger.info(f"View: {view}")
view.columns = schema_columns[schema.name].get(view.name, [])
yield from self._process_view(
table=view, database=database, schema=schema

View File

@ -24,6 +24,9 @@ class RedshiftReport(ProfilingSqlReport):
views_in_mem_size: Dict[str, str] = field(default_factory=TopKDict)
num_operational_stats_skipped: int = 0
num_usage_stat_skipped: int = 0
num_lineage_tables_dropped: int = 0
num_lineage_dropped_query_parser: int = 0
num_lineage_dropped_not_support_copy_path: int = 0
def report_dropped(self, key: str) -> None:
self.filtered.append(key)

View File

@ -0,0 +1,104 @@
from datahub.ingestion.source.redshift.config import RedshiftConfig
from datahub.ingestion.source.redshift.lineage import RedshiftLineageExtractor
from datahub.ingestion.source.redshift.report import RedshiftReport
def test_get_sources_from_query():
    """A schema-qualified table should resolve against the configured database."""
    cfg = RedshiftConfig(host_port="localhost:5439", database="test")
    extractor = RedshiftLineageExtractor(cfg, RedshiftReport())
    query = """
select * from my_schema.my_table
"""
    sources = extractor._get_sources_from_query(db_name="test", query=query)
    # Exactly one upstream, fully qualified as db.schema.table.
    assert len(sources) == 1
    assert sources[0].path == "test.my_schema.my_table"
def test_get_sources_from_query_with_only_table_name():
    """A bare table name should fall back to the default schema (public)."""
    cfg = RedshiftConfig(host_port="localhost:5439", database="test")
    extractor = RedshiftLineageExtractor(cfg, RedshiftReport())
    query = """
select * from my_table
"""
    sources = extractor._get_sources_from_query(db_name="test", query=query)
    # One upstream; schema defaults to "public" when the query omits it.
    assert len(sources) == 1
    assert sources[0].path == "test.public.my_table"
def test_get_sources_from_query_with_database():
    """A fully qualified db.schema.table reference should be kept as-is."""
    cfg = RedshiftConfig(host_port="localhost:5439", database="test")
    extractor = RedshiftLineageExtractor(cfg, RedshiftReport())
    query = """
select * from test.my_schema.my_table
"""
    sources = extractor._get_sources_from_query(db_name="test", query=query)
    # The explicit database in the query matches db_name and is preserved.
    assert len(sources) == 1
    assert sources[0].path == "test.my_schema.my_table"
def test_get_sources_from_query_with_non_default_database():
    """A cross-database reference should retain its own database prefix."""
    cfg = RedshiftConfig(host_port="localhost:5439", database="test")
    extractor = RedshiftLineageExtractor(cfg, RedshiftReport())
    query = """
select * from test2.my_schema.my_table
"""
    sources = extractor._get_sources_from_query(db_name="test", query=query)
    # The upstream keeps "test2" even though ingestion runs against "test".
    assert len(sources) == 1
    assert sources[0].path == "test2.my_schema.my_table"
def test_get_sources_from_query_with_only_table():
    """An unqualified table should be placed in the default schema.

    NOTE(review): duplicates test_get_sources_from_query_with_only_table_name;
    kept for behavior parity — consider consolidating.
    """
    cfg = RedshiftConfig(host_port="localhost:5439", database="test")
    extractor = RedshiftLineageExtractor(cfg, RedshiftReport())
    query = """
select * from my_table
"""
    sources = extractor._get_sources_from_query(db_name="test", query=query)
    assert len(sources) == 1
    assert sources[0].path == "test.public.my_table"
def test_get_sources_from_query_with_four_part_table_should_throw_exception():
    """A four-part table identifier must be rejected with ValueError.

    Bug fix: the original ended with ``assert f"...message..."`` — asserting a
    non-empty string, which is always truthy, so the test could never fail even
    when no exception was raised. Restructured with try/except/else so a missing
    ValueError now fails the test explicitly.
    """
    config = RedshiftConfig(host_port="localhost:5439", database="test")
    report = RedshiftReport()
    test_query = """
select * from database.schema.my_table.test
"""
    lineage_extractor = RedshiftLineageExtractor(config, report)
    try:
        lineage_extractor._get_sources_from_query(db_name="test", query=test_query)
    except ValueError:
        # Expected: the extractor rejects identifiers with more than 3 parts.
        pass
    else:
        raise AssertionError(
            f"{test_query} should have thrown a ValueError exception but it didn't"
        )