mirror of
https://github.com/datahub-project/datahub.git
synced 2025-10-27 00:40:06 +00:00
refactor(ingest): bigquery-lineage - allow tables and datasets in uppercase (#6739)
This commit is contained in:
parent
68fd802881
commit
f0a371941e
@ -93,6 +93,11 @@ class BigQueryV2Config(BigQueryConfig, LineageConfig):
|
||||
description="Sql parse view ddl to get lineage.",
|
||||
)
|
||||
|
||||
lineage_sql_parser_use_raw_names: bool = Field(
|
||||
default=False,
|
||||
description="This parameter ignores the lowercase pattern stipulated in the SQLParser. NOTE: Ignored if lineage_use_sql_parser is False.",
|
||||
)
|
||||
|
||||
convert_urns_to_lowercase: bool = Field(
|
||||
default=False,
|
||||
description="Convert urns to lowercase.",
|
||||
|
||||
@ -432,7 +432,9 @@ timestamp < "{end_time}"
|
||||
# to ensure we only use direct objects accessed for lineage
|
||||
try:
|
||||
parser = BigQuerySQLParser(
|
||||
e.query, self.config.sql_parser_use_external_process
|
||||
e.query,
|
||||
self.config.sql_parser_use_external_process,
|
||||
use_raw_names=self.config.lineage_sql_parser_use_raw_names,
|
||||
)
|
||||
referenced_objs = set(
|
||||
map(lambda x: x.split(".")[-1], parser.get_tables())
|
||||
@ -471,7 +473,9 @@ timestamp < "{end_time}"
|
||||
if view.view_definition:
|
||||
try:
|
||||
parser = BigQuerySQLParser(
|
||||
view.view_definition, self.config.sql_parser_use_external_process
|
||||
view.view_definition,
|
||||
self.config.sql_parser_use_external_process,
|
||||
use_raw_names=self.config.lineage_sql_parser_use_raw_names,
|
||||
)
|
||||
tables = parser.get_tables()
|
||||
except Exception as ex:
|
||||
|
||||
@ -9,11 +9,18 @@ from datahub.utilities.sql_parser import SqlLineageSQLParser, SQLParser
|
||||
class BigQuerySQLParser(SQLParser):
|
||||
parser: SQLParser
|
||||
|
||||
def __init__(self, sql_query: str, use_external_process: bool = False) -> None:
|
||||
def __init__(
|
||||
self,
|
||||
sql_query: str,
|
||||
use_external_process: bool = False,
|
||||
use_raw_names: bool = False,
|
||||
) -> None:
|
||||
super().__init__(sql_query)
|
||||
|
||||
self._parsed_sql_query = self.parse_sql_query(sql_query)
|
||||
self.parser = SqlLineageSQLParser(self._parsed_sql_query, use_external_process)
|
||||
self.parser = SqlLineageSQLParser(
|
||||
self._parsed_sql_query, use_external_process, use_raw_names
|
||||
)
|
||||
|
||||
def parse_sql_query(self, sql_query: str) -> str:
|
||||
sql_query = BigQuerySQLParser._parse_bigquery_comment_sign(sql_query)
|
||||
|
||||
@ -28,9 +28,10 @@ class SqlLineageSQLParserImpl(SQLParser):
|
||||
_MYVIEW_SQL_TABLE_NAME_TOKEN = "__my_view__.__sql_table_name__"
|
||||
_MYVIEW_LOOKER_TOKEN = "my_view.SQL_TABLE_NAME"
|
||||
|
||||
def __init__(self, sql_query: str) -> None:
|
||||
def __init__(self, sql_query: str, use_raw_names: bool = False) -> None:
|
||||
super().__init__(sql_query)
|
||||
original_sql_query = sql_query
|
||||
self._use_raw_names = use_raw_names
|
||||
|
||||
# SqlLineageParser makes mistakes on lateral flatten queries, use the prefix
|
||||
if "lateral flatten" in sql_query:
|
||||
@ -110,7 +111,13 @@ class SqlLineageSQLParserImpl(SQLParser):
|
||||
logger.error("sql holder not present so cannot get tables")
|
||||
return result
|
||||
for table in self._sql_holder.source_tables:
|
||||
table_normalized = re.sub(r"^<default>.", "", str(table))
|
||||
table_normalized = re.sub(
|
||||
r"^<default>.",
|
||||
"",
|
||||
str(table)
|
||||
if not self._use_raw_names
|
||||
else f"{table.schema.raw_name}.{table.raw_name}",
|
||||
)
|
||||
result.append(str(table_normalized))
|
||||
|
||||
# We need to revert TOKEN replacements
|
||||
|
||||
@ -68,8 +68,7 @@ class MetadataSQLSQLParser(SQLParser):
|
||||
|
||||
|
||||
def sql_lineage_parser_impl_func_wrapper(
|
||||
queue: Optional[multiprocessing.Queue],
|
||||
sql_query: str,
|
||||
queue: Optional[multiprocessing.Queue], sql_query: str, use_raw_names: bool = False
|
||||
) -> Optional[Tuple[List[str], List[str], Any]]:
|
||||
"""
|
||||
The wrapper function that computes the tables and columns using the SqlLineageSQLParserImpl
|
||||
@ -78,13 +77,14 @@ def sql_lineage_parser_impl_func_wrapper(
|
||||
the sqllineage module.
|
||||
:param queue: The shared IPC queue on to which the results will be put.
|
||||
:param sql_query: The SQL query to extract the tables & columns from.
|
||||
:param use_raw_names: Parameter used to ignore sqllineage's default lowercasing.
|
||||
:return: None.
|
||||
"""
|
||||
exception_details: Optional[Tuple[Optional[Type[BaseException]], str]] = None
|
||||
tables: List[str] = []
|
||||
columns: List[str] = []
|
||||
try:
|
||||
parser = SqlLineageSQLParserImpl(sql_query)
|
||||
parser = SqlLineageSQLParserImpl(sql_query, use_raw_names)
|
||||
tables = parser.get_tables()
|
||||
columns = parser.get_columns()
|
||||
except BaseException:
|
||||
@ -101,14 +101,21 @@ def sql_lineage_parser_impl_func_wrapper(
|
||||
|
||||
|
||||
class SqlLineageSQLParser(SQLParser):
|
||||
def __init__(self, sql_query: str, use_external_process: bool = True) -> None:
|
||||
def __init__(
|
||||
self,
|
||||
sql_query: str,
|
||||
use_external_process: bool = True,
|
||||
use_raw_names: bool = False,
|
||||
) -> None:
|
||||
super().__init__(sql_query, use_external_process)
|
||||
if use_external_process:
|
||||
self.tables, self.columns = self._get_tables_columns_process_wrapped(
|
||||
sql_query
|
||||
sql_query, use_raw_names
|
||||
)
|
||||
else:
|
||||
return_tuple = sql_lineage_parser_impl_func_wrapper(None, sql_query)
|
||||
return_tuple = sql_lineage_parser_impl_func_wrapper(
|
||||
None, sql_query, use_raw_names
|
||||
)
|
||||
if return_tuple is not None:
|
||||
(
|
||||
self.tables,
|
||||
@ -118,7 +125,7 @@ class SqlLineageSQLParser(SQLParser):
|
||||
|
||||
@staticmethod
|
||||
def _get_tables_columns_process_wrapped(
|
||||
sql_query: str,
|
||||
sql_query: str, use_raw_names: bool = False
|
||||
) -> Tuple[List[str], List[str]]:
|
||||
# Invoke sql_lineage_parser_impl_func_wrapper in a separate process to avoid
|
||||
# memory leaks from sqllineage module used by SqlLineageSQLParserImpl. This will help
|
||||
@ -127,10 +134,7 @@ class SqlLineageSQLParser(SQLParser):
|
||||
queue: multiprocessing.Queue = Queue()
|
||||
process: multiprocessing.Process = Process(
|
||||
target=sql_lineage_parser_impl_func_wrapper,
|
||||
args=(
|
||||
queue,
|
||||
sql_query,
|
||||
),
|
||||
args=(queue, sql_query, use_raw_names),
|
||||
)
|
||||
process.start()
|
||||
tables, columns, exception_details = queue.get(block=True)
|
||||
|
||||
@ -20,6 +20,175 @@ FROM `project.dataset.src_tbl`
|
||||
assert parser.get_tables() == ["project.dataset.src_tbl"]
|
||||
|
||||
|
||||
def test_bigquery_sql_lineage_camel_case_table():
|
||||
"""
|
||||
This test aims to test the parameter to ignore sqllineage lowercasing.
|
||||
On the BigQuery service, it's possible to use uppercase name un datasets and tables.
|
||||
The lowercasing, by default, breaks the lineage construction in these cases.
|
||||
"""
|
||||
parser = BigQuerySQLParser(
|
||||
sql_query="""
|
||||
/*
|
||||
HERE IS A STANDARD COMMENT BLOCK
|
||||
THIS WILL NOT BREAK sqllineage
|
||||
*/
|
||||
CREATE OR REPLACE TABLE `project.dataset.trg_tbl`AS
|
||||
#This, comment will not break sqllineage
|
||||
SELECT foo, bar
|
||||
-- this comment will not break sqllineage either
|
||||
# this comment will not break sqllineage either
|
||||
FROM `project.dataset.CamelCaseTable`
|
||||
""",
|
||||
use_raw_names=True,
|
||||
)
|
||||
|
||||
assert parser.get_tables() == ["project.dataset.CamelCaseTable"]
|
||||
|
||||
|
||||
def test_bigquery_sql_lineage_camel_case_dataset():
|
||||
"""
|
||||
This test aims to test the parameter to ignore sqllineage lowercasing.
|
||||
On the BigQuery service, it's possible to use uppercase name un datasets and tables.
|
||||
The lowercasing, by default, breaks the lineage construction in these cases.
|
||||
"""
|
||||
parser = BigQuerySQLParser(
|
||||
sql_query="""
|
||||
/*
|
||||
HERE IS A STANDARD COMMENT BLOCK
|
||||
THIS WILL NOT BREAK sqllineage
|
||||
*/
|
||||
CREATE OR REPLACE TABLE `project.dataset.trg_tbl`AS
|
||||
#This, comment will not break sqllineage
|
||||
SELECT foo, bar
|
||||
-- this comment will not break sqllineage either
|
||||
# this comment will not break sqllineage either
|
||||
FROM `project.DataSet.table`
|
||||
""",
|
||||
use_raw_names=True,
|
||||
)
|
||||
|
||||
assert parser.get_tables() == ["project.DataSet.table"]
|
||||
|
||||
|
||||
def test_bigquery_sql_lineage_camel_case_table_and_dataset():
|
||||
"""
|
||||
This test aims to test the parameter to ignore sqllineage lowercasing.
|
||||
On the BigQuery service, it's possible to use uppercase name un datasets and tables.
|
||||
The lowercasing, by default, breaks the lineage construction in these cases.
|
||||
"""
|
||||
parser = BigQuerySQLParser(
|
||||
sql_query="""
|
||||
/*
|
||||
HERE IS A STANDARD COMMENT BLOCK
|
||||
THIS WILL NOT BREAK sqllineage
|
||||
*/
|
||||
CREATE OR REPLACE TABLE `project.dataset.trg_tbl`AS
|
||||
#This, comment will not break sqllineage
|
||||
SELECT foo, bar
|
||||
-- this comment will not break sqllineage either
|
||||
# this comment will not break sqllineage either
|
||||
FROM `project.DataSet.CamelTable`
|
||||
""",
|
||||
use_raw_names=True,
|
||||
)
|
||||
|
||||
assert parser.get_tables() == ["project.DataSet.CamelTable"]
|
||||
|
||||
|
||||
def test_bigquery_sql_lineage_camel_case_table_and_dataset_subquery():
|
||||
"""
|
||||
This test aims to test the parameter to ignore sqllineage lowercasing.
|
||||
On the BigQuery service, it's possible to use uppercase name un datasets and tables.
|
||||
The lowercasing, by default, breaks the lineage construction in these cases.
|
||||
"""
|
||||
parser = BigQuerySQLParser(
|
||||
sql_query="""
|
||||
/*
|
||||
HERE IS A STANDARD COMMENT BLOCK
|
||||
THIS WILL NOT BREAK sqllineage
|
||||
*/
|
||||
CREATE OR REPLACE TABLE `project.dataset.trg_tbl`AS
|
||||
#This, comment will not break sqllineage
|
||||
SELECT foo, bar
|
||||
-- this comment will not break sqllineage either
|
||||
# this comment will not break sqllineage either
|
||||
FROM (
|
||||
# this comment will not break sqllineage either
|
||||
SELECT * FROM `project.DataSet.CamelTable`
|
||||
)
|
||||
""",
|
||||
use_raw_names=True,
|
||||
)
|
||||
|
||||
assert parser.get_tables() == ["project.DataSet.CamelTable"]
|
||||
|
||||
|
||||
def test_bigquery_sql_lineage_camel_case_table_and_dataset_joins():
|
||||
"""
|
||||
This test aims to test the parameter to ignore sqllineage lowercasing.
|
||||
On the BigQuery service, it's possible to use uppercase name un datasets and tables.
|
||||
The lowercasing, by default, breaks the lineage construction in these cases.
|
||||
"""
|
||||
parser = BigQuerySQLParser(
|
||||
sql_query="""
|
||||
/*
|
||||
HERE IS A STANDARD COMMENT BLOCK
|
||||
THIS WILL NOT BREAK sqllineage
|
||||
*/
|
||||
CREATE OR REPLACE TABLE `project.dataset.trg_tbl`AS
|
||||
#This, comment will not break sqllineage
|
||||
SELECT foo, bar
|
||||
-- this comment will not break sqllineage either
|
||||
# this comment will not break sqllineage either
|
||||
FROM `project.DataSet1.CamelTable`
|
||||
INNER JOIN `project.DataSet2.CamelTable2`
|
||||
ON b.id = a.id
|
||||
LEFT JOIN `project.DataSet3.CamelTable3`
|
||||
on c.id = b.id
|
||||
""",
|
||||
use_raw_names=True,
|
||||
)
|
||||
|
||||
assert parser.get_tables() == [
|
||||
"project.DataSet1.CamelTable",
|
||||
"project.DataSet2.CamelTable2",
|
||||
"project.DataSet3.CamelTable3",
|
||||
]
|
||||
|
||||
|
||||
def test_bigquery_sql_lineage_camel_case_table_and_dataset_joins_and_subquery():
|
||||
"""
|
||||
This test aims to test the parameter to ignore sqllineage lowercasing.
|
||||
On the BigQuery service, it's possible to use uppercase name un datasets and tables.
|
||||
The lowercasing, by default, breaks the lineage construction in these cases.
|
||||
"""
|
||||
parser = BigQuerySQLParser(
|
||||
sql_query="""
|
||||
/*
|
||||
HERE IS A STANDARD COMMENT BLOCK
|
||||
THIS WILL NOT BREAK sqllineage
|
||||
*/
|
||||
CREATE OR REPLACE TABLE `project.dataset.trg_tbl`AS
|
||||
#This, comment will not break sqllineage
|
||||
SELECT foo, bar
|
||||
-- this comment will not break sqllineage either
|
||||
# this comment will not break sqllineage either
|
||||
FROM `project.DataSet1.CamelTable` a
|
||||
INNER JOIN `project.DataSet2.CamelTable2` b
|
||||
ON b.id = a.id
|
||||
LEFT JOIN (SELECT * FROM `project.DataSet3.CamelTable3`) c
|
||||
ON c.id = b.id
|
||||
""",
|
||||
use_raw_names=True,
|
||||
)
|
||||
|
||||
assert parser.get_tables() == [
|
||||
"project.DataSet1.CamelTable",
|
||||
"project.DataSet2.CamelTable2",
|
||||
"project.DataSet3.CamelTable3",
|
||||
]
|
||||
|
||||
|
||||
def test_bigquery_sql_lineage_keyword_data_is_accepted():
|
||||
parser = BigQuerySQLParser(
|
||||
sql_query="""
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user