mirror of
https://github.com/datahub-project/datahub.git
synced 2025-12-26 09:26:22 +00:00
fix(ingest):lookml - better column-level lineage, hive urn generation… (#6254)
This commit is contained in:
parent
67570763dd
commit
26b4a9eec4
@ -3,6 +3,8 @@
|
||||
This file documents any backwards-incompatible changes in DataHub and assists people when migrating to a new version.
|
||||
|
||||
## Next
|
||||
- LookML source will only emit views that are reachable from explores while scanning your git repo. Previous behavior can be achieved by setting `emit_reachable_views_only` to False.
|
||||
- LookML source will always lowercase urns for lineage edges from views to upstream tables. There is no fallback provided to previous behavior because it was inconsistent in application of lower-casing earlier.
|
||||
|
||||
### Breaking Changes
|
||||
- Java version 11 or greater is required.
|
||||
|
||||
@ -15,6 +15,7 @@ class GitClone:
|
||||
def __init__(self, tmp_dir: str, skip_known_host_verification: bool = True):
|
||||
self.tmp_dir = tmp_dir
|
||||
self.skip_known_host_verification = skip_known_host_verification
|
||||
self.last_repo_cloned: Optional[git.Repo] = None
|
||||
|
||||
def clone(self, ssh_key: Optional[SecretStr], repo_url: str) -> Path:
|
||||
unique_dir = str(uuid4())
|
||||
@ -51,10 +52,13 @@ class GitClone:
|
||||
)
|
||||
logger.debug("ssh_command=%s", git_ssh_cmd)
|
||||
logger.info(f"⏳ Cloning repo '{repo_url}', this can take some time...")
|
||||
git.Repo.clone_from(
|
||||
self.last_repo_cloned = git.Repo.clone_from(
|
||||
repo_url,
|
||||
checkout_dir,
|
||||
env=dict(GIT_SSH_COMMAND=git_ssh_cmd),
|
||||
)
|
||||
logger.info("✅ Cloning complete!")
|
||||
return pathlib.Path(checkout_dir)
|
||||
|
||||
def get_last_repo_cloned(self) -> Optional[git.Repo]:
|
||||
return self.last_repo_cloned
|
||||
|
||||
@ -248,7 +248,7 @@ class ViewField:
|
||||
description: str
|
||||
field_type: ViewFieldType
|
||||
is_primary_key: bool = False
|
||||
upstream_field: Optional[str] = None
|
||||
upstream_fields: List[str] = dataclasses_field(default_factory=list)
|
||||
|
||||
|
||||
class LookerUtil:
|
||||
@ -673,7 +673,7 @@ class LookerExplore:
|
||||
is_primary_key=dim_field.primary_key
|
||||
if dim_field.primary_key
|
||||
else False,
|
||||
upstream_field=dim_field.name,
|
||||
upstream_fields=[dim_field.name],
|
||||
)
|
||||
)
|
||||
if explore.fields.measures is not None:
|
||||
@ -695,7 +695,7 @@ class LookerExplore:
|
||||
is_primary_key=measure_field.primary_key
|
||||
if measure_field.primary_key
|
||||
else False,
|
||||
upstream_field=measure_field.name,
|
||||
upstream_fields=[measure_field.name],
|
||||
)
|
||||
)
|
||||
|
||||
@ -818,27 +818,25 @@ class LookerExplore:
|
||||
fine_grained_lineages = []
|
||||
if config.extract_column_level_lineage:
|
||||
for field in self.fields or []:
|
||||
if (
|
||||
field.upstream_field
|
||||
and len(field.upstream_field.split(".")) >= 2
|
||||
):
|
||||
(view_name, field_path) = field.upstream_field.split(".")[
|
||||
0
|
||||
], ".".join(field.upstream_field.split(".")[1:])
|
||||
assert view_name
|
||||
view_urn = view_name_to_urn_map.get(view_name, "")
|
||||
if view_urn:
|
||||
fine_grained_lineages.append(
|
||||
FineGrainedLineageClass(
|
||||
upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
|
||||
downstreamType=FineGrainedLineageDownstreamType.FIELD,
|
||||
upstreams=[
|
||||
builder.make_schema_field_urn(
|
||||
view_urn, field_path
|
||||
)
|
||||
],
|
||||
for upstream_field in field.upstream_fields:
|
||||
if len(upstream_field.split(".")) >= 2:
|
||||
(view_name, field_path) = upstream_field.split(".")[
|
||||
0
|
||||
], ".".join(upstream_field.split(".")[1:])
|
||||
assert view_name
|
||||
view_urn = view_name_to_urn_map.get(view_name, "")
|
||||
if view_urn:
|
||||
fine_grained_lineages.append(
|
||||
FineGrainedLineageClass(
|
||||
upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
|
||||
downstreamType=FineGrainedLineageDownstreamType.FIELD,
|
||||
upstreams=[
|
||||
builder.make_schema_field_urn(
|
||||
view_urn, field_path
|
||||
)
|
||||
],
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
upstream_lineage = UpstreamLineage(
|
||||
upstreams=upstreams, fineGrainedLineages=fine_grained_lineages or None
|
||||
|
||||
@ -193,9 +193,13 @@ class LookMLSourceConfig(LookerCommonConfig):
|
||||
description="When extracting the view definition from a lookml file, the maximum number of characters to extract.",
|
||||
)
|
||||
emit_reachable_views_only: bool = Field(
|
||||
False,
|
||||
True,
|
||||
description="When enabled, only views that are reachable from explores defined in the model files are emitted",
|
||||
)
|
||||
populate_sql_logic_for_missing_descriptions: bool = Field(
|
||||
False,
|
||||
description="When enabled, field descriptions will include the sql logic for computed fields if descriptions are missing",
|
||||
)
|
||||
|
||||
@validator("platform_instance")
|
||||
def platform_instance_not_supported(cls, v: str) -> str:
|
||||
@ -663,27 +667,33 @@ class LookerView:
|
||||
field_list: List[Dict],
|
||||
type_cls: ViewFieldType,
|
||||
extract_column_level_lineage: bool,
|
||||
populate_sql_logic_in_descriptions: bool,
|
||||
) -> List[ViewField]:
|
||||
fields = []
|
||||
for field_dict in field_list:
|
||||
is_primary_key = field_dict.get("primary_key", "no") == "yes"
|
||||
name = field_dict["name"]
|
||||
native_type = field_dict.get("type", "string")
|
||||
description = field_dict.get("description", "")
|
||||
default_description = (
|
||||
f"sql:{field_dict['sql']}"
|
||||
if "sql" in field_dict and populate_sql_logic_in_descriptions
|
||||
else ""
|
||||
)
|
||||
description = field_dict.get("description", default_description)
|
||||
label = field_dict.get("label", "")
|
||||
upstream_field = None
|
||||
upstream_fields = []
|
||||
if type_cls == ViewFieldType.DIMENSION and extract_column_level_lineage:
|
||||
if field_dict.get("sql") is not None:
|
||||
upstream_field_match = re.match(
|
||||
r"^.*\${TABLE}\.(\w+)", field_dict["sql"]
|
||||
)
|
||||
if upstream_field_match:
|
||||
for upstream_field_match in re.finditer(
|
||||
r"\${TABLE}\.[\"]*([\.\w]+)", field_dict["sql"]
|
||||
):
|
||||
matched_field = upstream_field_match.group(1)
|
||||
# Remove quotes from field names
|
||||
matched_field = (
|
||||
matched_field.replace('"', "").replace("`", "").lower()
|
||||
)
|
||||
upstream_field = matched_field
|
||||
upstream_fields.append(matched_field)
|
||||
upstream_fields = sorted(list(set(upstream_fields)))
|
||||
|
||||
field = ViewField(
|
||||
name=name,
|
||||
@ -692,7 +702,7 @@ class LookerView:
|
||||
description=description,
|
||||
is_primary_key=is_primary_key,
|
||||
field_type=type_cls,
|
||||
upstream_field=upstream_field,
|
||||
upstream_fields=upstream_fields,
|
||||
)
|
||||
fields.append(field)
|
||||
return fields
|
||||
@ -711,6 +721,7 @@ class LookerView:
|
||||
parse_table_names_from_sql: bool = False,
|
||||
sql_parser_path: str = "datahub.utilities.sql_parser.DefaultSQLParser",
|
||||
extract_col_level_lineage: bool = False,
|
||||
populate_sql_logic_in_descriptions: bool = False,
|
||||
) -> Optional["LookerView"]:
|
||||
view_name = looker_view["name"]
|
||||
logger.debug(f"Handling view {view_name} in model {model_name}")
|
||||
@ -746,16 +757,19 @@ class LookerView:
|
||||
looker_view.get("dimensions", []),
|
||||
ViewFieldType.DIMENSION,
|
||||
extract_col_level_lineage,
|
||||
populate_sql_logic_in_descriptions=populate_sql_logic_in_descriptions,
|
||||
)
|
||||
dimension_groups = cls._get_fields(
|
||||
looker_view.get("dimension_groups", []),
|
||||
ViewFieldType.DIMENSION_GROUP,
|
||||
extract_col_level_lineage,
|
||||
populate_sql_logic_in_descriptions=populate_sql_logic_in_descriptions,
|
||||
)
|
||||
measures = cls._get_fields(
|
||||
looker_view.get("measures", []),
|
||||
ViewFieldType.MEASURE,
|
||||
extract_col_level_lineage,
|
||||
populate_sql_logic_in_descriptions=populate_sql_logic_in_descriptions,
|
||||
)
|
||||
fields: List[ViewField] = dimensions + dimension_groups + measures
|
||||
|
||||
@ -990,6 +1004,7 @@ class LookMLSource(Source):
|
||||
|
||||
# This is populated during the git clone step.
|
||||
base_projects_folder: Dict[str, pathlib.Path] = {}
|
||||
remote_projects_github_info: Dict[str, GitHubInfo] = {}
|
||||
|
||||
def __init__(self, config: LookMLSourceConfig, ctx: PipelineContext):
|
||||
super().__init__(ctx)
|
||||
@ -1037,7 +1052,9 @@ class LookMLSource(Source):
|
||||
parts = len(sql_table_name.split("."))
|
||||
|
||||
if parts == 3:
|
||||
# fully qualified
|
||||
# fully qualified, but if platform is of 2-part, we drop the first level
|
||||
if self._platform_names_have_2_parts(connection_def.platform):
|
||||
sql_table_name = ".".join(sql_table_name.split(".")[1:])
|
||||
return sql_table_name.lower()
|
||||
|
||||
if parts == 1:
|
||||
@ -1046,15 +1063,15 @@ class LookMLSource(Source):
|
||||
dataset_name = f"{connection_def.default_db}.{sql_table_name}"
|
||||
else:
|
||||
dataset_name = f"{connection_def.default_db}.{connection_def.default_schema}.{sql_table_name}"
|
||||
return dataset_name
|
||||
return dataset_name.lower()
|
||||
|
||||
if parts == 2:
|
||||
# if this is a 2 part platform, we are fine
|
||||
if self._platform_names_have_2_parts(connection_def.platform):
|
||||
return sql_table_name
|
||||
return sql_table_name.lower()
|
||||
# otherwise we attach the default top-level container
|
||||
dataset_name = f"{connection_def.default_db}.{sql_table_name}"
|
||||
return dataset_name
|
||||
return dataset_name.lower()
|
||||
|
||||
self.reporter.report_warning(
|
||||
key=sql_table_name, reason=f"{sql_table_name} has more than 3 parts."
|
||||
@ -1141,13 +1158,14 @@ class LookMLSource(Source):
|
||||
fine_grained_lineages: List[FineGrainedLineageClass] = []
|
||||
if self.source_config.extract_column_level_lineage:
|
||||
for field in looker_view.fields:
|
||||
if field.upstream_field is not None:
|
||||
if field.upstream_fields:
|
||||
fine_grained_lineage = FineGrainedLineageClass(
|
||||
upstreamType=FineGrainedLineageUpstreamTypeClass.FIELD_SET,
|
||||
upstreams=[
|
||||
make_schema_field_urn(
|
||||
upstream_dataset_urn, field.upstream_field
|
||||
upstream_dataset_urn, upstream_field
|
||||
)
|
||||
for upstream_field in field.upstream_fields
|
||||
],
|
||||
downstreamType=FineGrainedLineageDownstreamType.FIELD,
|
||||
downstreams=[
|
||||
@ -1174,25 +1192,42 @@ class LookMLSource(Source):
|
||||
|
||||
def _get_custom_properties(self, looker_view: LookerView) -> DatasetPropertiesClass:
|
||||
assert self.source_config.base_folder # this is always filled out
|
||||
|
||||
file_path = str(
|
||||
pathlib.Path(looker_view.absolute_file_path).relative_to(
|
||||
self.source_config.base_folder.resolve()
|
||||
if looker_view.id.project_name == _BASE_PROJECT_NAME:
|
||||
base_folder = self.source_config.base_folder
|
||||
else:
|
||||
base_folder = self.base_projects_folder.get(
|
||||
looker_view.id.project_name, self.source_config.base_folder
|
||||
)
|
||||
try:
|
||||
file_path = str(
|
||||
pathlib.Path(looker_view.absolute_file_path).relative_to(
|
||||
base_folder.resolve()
|
||||
)
|
||||
)
|
||||
except Exception:
|
||||
file_path = None
|
||||
logger.warning(
|
||||
f"Failed to resolve relative path for file {looker_view.absolute_file_path} w.r.t. folder {self.source_config.base_folder}"
|
||||
)
|
||||
)
|
||||
|
||||
custom_properties = {
|
||||
"looker.file.path": file_path,
|
||||
"looker.file.path": file_path or looker_view.absolute_file_path,
|
||||
}
|
||||
dataset_props = DatasetPropertiesClass(
|
||||
name=looker_view.id.view_name, customProperties=custom_properties
|
||||
)
|
||||
|
||||
if self.source_config.github_info is not None:
|
||||
maybe_github_info = self.source_config.project_dependencies.get(
|
||||
looker_view.id.project_name,
|
||||
self.remote_projects_github_info.get(looker_view.id.project_name),
|
||||
)
|
||||
if isinstance(maybe_github_info, GitHubInfo):
|
||||
github_info: Optional[GitHubInfo] = maybe_github_info
|
||||
else:
|
||||
github_info = self.source_config.github_info
|
||||
if github_info is not None and file_path:
|
||||
# It should be that looker_view.id.project_name is the base project.
|
||||
github_file_url = self.source_config.github_info.get_url_for_file_path(
|
||||
file_path
|
||||
)
|
||||
github_file_url = github_info.get_url_for_file_path(file_path)
|
||||
dataset_props.externalUrl = github_file_url
|
||||
|
||||
return dataset_props
|
||||
@ -1390,6 +1425,20 @@ class LookMLSource(Source):
|
||||
self.base_projects_folder[
|
||||
remote_project.name
|
||||
] = p_checkout_dir.resolve()
|
||||
repo = p_cloner.get_last_repo_cloned()
|
||||
assert repo
|
||||
remote_github_info = GitHubInfo(
|
||||
base_url=remote_project.url,
|
||||
repo="dummy/dummy", # set to dummy values to bypass validation
|
||||
branch=repo.active_branch.name,
|
||||
)
|
||||
remote_github_info.repo = (
|
||||
"" # set to empty because url already contains the full path
|
||||
)
|
||||
self.remote_projects_github_info[
|
||||
remote_project.name
|
||||
] = remote_github_info
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"Failed to clone remote project {project_name}. This can lead to failures in parsing lookml files later on",
|
||||
@ -1508,7 +1557,9 @@ class LookMLSource(Source):
|
||||
self.reporter.report_views_scanned()
|
||||
try:
|
||||
maybe_looker_view = LookerView.from_looker_dict(
|
||||
project_name,
|
||||
include.project
|
||||
if include.project != _BASE_PROJECT_NAME
|
||||
else project_name,
|
||||
model_name,
|
||||
raw_view,
|
||||
connectionDefinition,
|
||||
@ -1519,6 +1570,7 @@ class LookMLSource(Source):
|
||||
self.source_config.parse_table_names_from_sql,
|
||||
self.source_config.sql_parser,
|
||||
self.source_config.extract_column_level_lineage,
|
||||
self.source_config.populate_sql_logic_for_missing_descriptions,
|
||||
)
|
||||
except Exception as e:
|
||||
self.reporter.report_warning(
|
||||
|
||||
@ -0,0 +1,40 @@
|
||||
view: my_derived_view {
|
||||
derived_table: {
|
||||
sql:
|
||||
SELECT
|
||||
country,
|
||||
city,
|
||||
timestamp,
|
||||
measurement
|
||||
FROM
|
||||
${my_view.SQL_TABLE_NAME} AS my_view ;;
|
||||
}
|
||||
|
||||
dimension: country {
|
||||
type: string
|
||||
description: "The country"
|
||||
sql: ${TABLE}.country ;;
|
||||
}
|
||||
|
||||
dimension: city {
|
||||
type: string
|
||||
description: "City"
|
||||
sql: ${TABLE}.city ;;
|
||||
}
|
||||
|
||||
dimension_group: timestamp {
|
||||
group_label: "Timestamp"
|
||||
type: time
|
||||
description: "Timestamp of measurement"
|
||||
sql: ${TABLE}.timestamp ;;
|
||||
timeframes: [hour, date, week, day_of_week]
|
||||
}
|
||||
|
||||
measure: average_measurement {
|
||||
group_label: "Measurement"
|
||||
type: average
|
||||
description: "My measurement"
|
||||
sql: ${TABLE}.measurement ;;
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,33 @@
|
||||
connection: "my_connection"
|
||||
|
||||
include: "foo.view.lkml"
|
||||
include: "bar.view.lkml"
|
||||
include: "nested/*"
|
||||
include: "liquid.view.lkml"
|
||||
|
||||
explore: aliased_explore {
|
||||
from: my_view
|
||||
}
|
||||
|
||||
explore: dataset_owners{
|
||||
join: all_entities {
|
||||
relationship: many_to_one
|
||||
sql_on: ${all_entities.urn} = ${dataset_owners.urn};;
|
||||
}
|
||||
}
|
||||
|
||||
explore: data_model {
|
||||
label: "Data model!"
|
||||
description: "Lorem ipsum"
|
||||
|
||||
always_filter: {
|
||||
filters: {
|
||||
field: is_latest_forecast
|
||||
value: "TRUE"
|
||||
}
|
||||
filters: {
|
||||
field: granularity
|
||||
value: "day"
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,10 @@
|
||||
connection: "my_other_connection"
|
||||
include: "**/*.view.lkml"
|
||||
|
||||
explore: aliased_explore2 {
|
||||
from: my_view2
|
||||
}
|
||||
|
||||
explore: duplicate_explore {
|
||||
from: my_view
|
||||
}
|
||||
@ -0,0 +1,47 @@
|
||||
view: my_view {
|
||||
derived_table: {
|
||||
sql:
|
||||
SELECT
|
||||
is_latest,
|
||||
country,
|
||||
city,
|
||||
timestamp,
|
||||
measurement
|
||||
FROM
|
||||
hive.my_database.my_table ;;
|
||||
}
|
||||
|
||||
dimension: country {
|
||||
type: string
|
||||
description: "The country"
|
||||
sql: ${TABLE}.country ;;
|
||||
}
|
||||
|
||||
dimension: city {
|
||||
type: string
|
||||
description: "City"
|
||||
sql: ${TABLE}.city ;;
|
||||
}
|
||||
|
||||
dimension: is_latest {
|
||||
type: yesno
|
||||
description: "Is latest data"
|
||||
sql: ${TABLE}.is_latest ;;
|
||||
}
|
||||
|
||||
dimension_group: timestamp {
|
||||
group_label: "Timestamp"
|
||||
type: time
|
||||
description: "Timestamp of measurement"
|
||||
sql: ${TABLE}.timestamp ;;
|
||||
timeframes: [hour, date, week, day_of_week]
|
||||
}
|
||||
|
||||
measure: average_measurement {
|
||||
group_label: "Measurement"
|
||||
type: average
|
||||
description: "My measurement"
|
||||
sql: ${TABLE}.measurement ;;
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,47 @@
|
||||
view: my_view2 {
|
||||
derived_table: {
|
||||
sql:
|
||||
SELECT
|
||||
is_latest,
|
||||
country,
|
||||
city,
|
||||
timestamp,
|
||||
measurement
|
||||
FROM
|
||||
my_table ;;
|
||||
}
|
||||
|
||||
dimension: country {
|
||||
type: string
|
||||
description: "The country"
|
||||
sql: ${TABLE}.country ;;
|
||||
}
|
||||
|
||||
dimension: city {
|
||||
type: string
|
||||
description: "City"
|
||||
sql: ${TABLE}.city ;;
|
||||
}
|
||||
|
||||
dimension: is_latest {
|
||||
type: yesno
|
||||
description: "Is latest data"
|
||||
sql: ${TABLE}.is_latest ;;
|
||||
}
|
||||
|
||||
dimension_group: timestamp {
|
||||
group_label: "Timestamp"
|
||||
type: time
|
||||
description: "Timestamp of measurement"
|
||||
sql: ${TABLE}.timestamp ;;
|
||||
timeframes: [hour, date, week, day_of_week]
|
||||
}
|
||||
|
||||
measure: average_measurement {
|
||||
group_label: "Measurement"
|
||||
type: average
|
||||
description: "My measurement"
|
||||
sql: ${TABLE}.measurement ;;
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,3 @@
|
||||
view: include_able_view {
|
||||
sql_table_name: looker_schema.include_able ;;
|
||||
}
|
||||
@ -0,0 +1,14 @@
|
||||
view: customer_facts {
|
||||
derived_table: {
|
||||
sql:
|
||||
SELECT
|
||||
customer_id,
|
||||
SUM(sale_price) AS lifetime_spend
|
||||
FROM
|
||||
order
|
||||
WHERE
|
||||
{% condition order_region %} order.region {% endcondition %}
|
||||
GROUP BY 1
|
||||
;;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,9 @@
|
||||
view: fragment_derived_view
|
||||
{ derived_table:
|
||||
{
|
||||
sql: date DATE encode ZSTD,
|
||||
platform VARCHAR(20) encode ZSTD AS aliased_platform,
|
||||
country VARCHAR(20) encode ZSTD
|
||||
;;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,10 @@
|
||||
connection: "my_connection"
|
||||
|
||||
explore: included_sql_preamble {
|
||||
sql_preamble:
|
||||
CREATE TEMP FUNCTION CONCAT_VERBOSE(a STRING, b STRING)
|
||||
RETURNS STRING AS (
|
||||
CONCAT(a, b)
|
||||
);
|
||||
;;
|
||||
}
|
||||
@ -0,0 +1 @@
|
||||
poison file
|
||||
@ -0,0 +1,20 @@
|
||||
connection: "my_connection"
|
||||
|
||||
include: "/view_declarations.view"
|
||||
|
||||
explore: second_model {
|
||||
label: "Second model!"
|
||||
description: "Lorem ipsum"
|
||||
|
||||
## This section doesn't work yet because of a bug in the lkml parser.
|
||||
## See https://github.com/joshtemple/lkml/issues/59.
|
||||
## measure: bookings_measure {
|
||||
## label: "Number of new bookings"
|
||||
## group_label: "New bookings"
|
||||
## description: "A distinct count of all new bookings"
|
||||
## sql: ${booking_id} ;;
|
||||
## type: count_distinct
|
||||
## filters: [ state: "CLOSED" ,
|
||||
## name: "New Bookings"]
|
||||
## }
|
||||
}
|
||||
@ -0,0 +1,19 @@
|
||||
include: "/included_view_file.view"
|
||||
|
||||
view: looker_events {
|
||||
sql_table_name: looker_schema.events ;;
|
||||
}
|
||||
|
||||
view: extending_looker_events {
|
||||
extends: [looker_events]
|
||||
|
||||
measure: additional_measure {
|
||||
type: count
|
||||
}
|
||||
}
|
||||
|
||||
view: autodetect_sql_name_based_on_view_name {}
|
||||
|
||||
view: test_include_external_view {
|
||||
extends: [include_able_view]
|
||||
}
|
||||
@ -8,6 +8,11 @@ from looker_sdk.sdk.api31.models import DBConnection
|
||||
|
||||
from datahub.configuration.common import PipelineExecutionError
|
||||
from datahub.ingestion.run.pipeline import Pipeline
|
||||
from datahub.metadata.schema_classes import (
|
||||
DatasetSnapshotClass,
|
||||
MetadataChangeEventClass,
|
||||
UpstreamLineageClass,
|
||||
)
|
||||
from tests.test_helpers import mce_helpers
|
||||
|
||||
logging.getLogger("lkml").setLevel(logging.INFO)
|
||||
@ -37,6 +42,7 @@ def test_lookml_ingest(pytestconfig, tmp_path, mock_time):
|
||||
"tag_measures_and_dimensions": False,
|
||||
"project_name": "lkml_samples",
|
||||
"model_pattern": {"deny": ["data2"]},
|
||||
"emit_reachable_views_only": False,
|
||||
},
|
||||
},
|
||||
"sink": {
|
||||
@ -80,6 +86,7 @@ def test_lookml_ingest_offline(pytestconfig, tmp_path, mock_time):
|
||||
"parse_table_names_from_sql": True,
|
||||
"project_name": "lkml_samples",
|
||||
"model_pattern": {"deny": ["data2"]},
|
||||
"emit_reachable_views_only": False,
|
||||
},
|
||||
},
|
||||
"sink": {
|
||||
@ -123,6 +130,7 @@ def test_lookml_ingest_offline_with_model_deny(pytestconfig, tmp_path, mock_time
|
||||
"parse_table_names_from_sql": True,
|
||||
"project_name": "lkml_samples",
|
||||
"model_pattern": {"deny": ["data"]},
|
||||
"emit_reachable_views_only": False,
|
||||
},
|
||||
},
|
||||
"sink": {
|
||||
@ -168,6 +176,7 @@ def test_lookml_ingest_offline_platform_instance(pytestconfig, tmp_path, mock_ti
|
||||
"parse_table_names_from_sql": True,
|
||||
"project_name": "lkml_samples",
|
||||
"model_pattern": {"deny": ["data2"]},
|
||||
"emit_reachable_views_only": False,
|
||||
},
|
||||
},
|
||||
"sink": {
|
||||
@ -246,6 +255,7 @@ def ingestion_test(
|
||||
},
|
||||
"parse_table_names_from_sql": True,
|
||||
"model_pattern": {"deny": ["data2"]},
|
||||
"emit_reachable_views_only": False,
|
||||
},
|
||||
},
|
||||
"sink": {
|
||||
@ -289,6 +299,7 @@ def test_lookml_bad_sql_parser(pytestconfig, tmp_path, mock_time):
|
||||
"parse_table_names_from_sql": True,
|
||||
"project_name": "lkml_samples",
|
||||
"sql_parser": "bad.sql.Parser",
|
||||
"emit_reachable_views_only": False,
|
||||
},
|
||||
},
|
||||
"sink": {
|
||||
@ -338,6 +349,7 @@ def test_lookml_github_info(pytestconfig, tmp_path, mock_time):
|
||||
"project_name": "lkml_samples",
|
||||
"model_pattern": {"deny": ["data2"]},
|
||||
"github_info": {"repo": "datahub/looker-demo", "branch": "master"},
|
||||
"emit_reachable_views_only": False,
|
||||
},
|
||||
},
|
||||
"sink": {
|
||||
@ -421,3 +433,57 @@ def test_reachable_views(pytestconfig, tmp_path, mock_time):
|
||||
"urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.my_view2,PROD)"
|
||||
in entity_urns
|
||||
)
|
||||
|
||||
|
||||
@freeze_time(FROZEN_TIME)
|
||||
def test_hive_platform_drops_ids(pytestconfig, tmp_path, mock_time):
|
||||
"""Test omit db name from hive ids"""
|
||||
test_resources_dir = pytestconfig.rootpath / "tests/integration/lookml"
|
||||
mce_out = "lookml_mces_with_db_name_omitted.json"
|
||||
pipeline = Pipeline.create(
|
||||
{
|
||||
"run_id": "lookml-test",
|
||||
"source": {
|
||||
"type": "lookml",
|
||||
"config": {
|
||||
"base_folder": str(test_resources_dir / "lkml_samples_hive"),
|
||||
"connection_to_platform_map": {
|
||||
"my_connection": {
|
||||
"platform": "hive",
|
||||
"default_db": "default_database",
|
||||
"default_schema": "default_schema",
|
||||
}
|
||||
},
|
||||
"parse_table_names_from_sql": True,
|
||||
"project_name": "lkml_samples",
|
||||
"model_pattern": {"deny": ["data2"]},
|
||||
"github_info": {"repo": "datahub/looker-demo", "branch": "master"},
|
||||
"emit_reachable_views_only": False,
|
||||
},
|
||||
},
|
||||
"sink": {
|
||||
"type": "file",
|
||||
"config": {
|
||||
"filename": f"{tmp_path}/{mce_out}",
|
||||
},
|
||||
},
|
||||
}
|
||||
)
|
||||
pipeline.run()
|
||||
pipeline.pretty_print_summary()
|
||||
pipeline.raise_from_status(raise_warnings=True)
|
||||
|
||||
maybe_events = mce_helpers.load_json_file(tmp_path / mce_out)
|
||||
assert isinstance(maybe_events, list)
|
||||
for mce in maybe_events:
|
||||
if "proposedSnapshot" in mce:
|
||||
mce_concrete = MetadataChangeEventClass.from_obj(mce)
|
||||
if isinstance(mce_concrete.proposedSnapshot, DatasetSnapshotClass):
|
||||
lineage_aspects = [
|
||||
a
|
||||
for a in mce_concrete.proposedSnapshot.aspects
|
||||
if isinstance(a, UpstreamLineageClass)
|
||||
]
|
||||
for a in lineage_aspects:
|
||||
for upstream in a.upstreams:
|
||||
assert "hive." not in upstream.dataset
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user