fix(ingest): types for dbt (#2716)

Kevin Hu authored on 2021-06-22 10:37:08 -07:00, committed by GitHub
parent 2c20bce79d
commit 554e1637c5
GPG Key ID: 4AEE18F83AFDEB23 (no known key found for this signature in database)
18 changed files with 4737 additions and 4493 deletions


@@ -77,6 +77,10 @@ type Props = {
export default function TypeIcon({ type, nativeDataType }: Props) {
const { icon: Icon, size, text } = DATA_TYPE_ICON_MAP[type];
+// if unable to match the type to a DataHub type, display the native type info by default
+const nativeFallback = type === SchemaFieldDataType.Null;
// eslint-disable-next-line react/prop-types
const NativeDataTypeTooltip = ({ children }) =>
nativeDataType ? (
@@ -92,7 +96,7 @@ export default function TypeIcon({ type, nativeDataType }: Props) {
<TypeIconContainer data-testid={`icon-${type}`}>
{Icon && <Icon style={{ fontSize: size }} />}
<TypeSubtitle type="secondary" hasicon={Icon ? 'yes' : undefined}>
-{text}
+{nativeFallback ? nativeDataType : text}
</TypeSubtitle>
</TypeIconContainer>
</NativeDataTypeTooltip>


@@ -2,11 +2,11 @@
source:
type: "dbt"
config:
manifest_path: "./your/dbt/manifest.json"
catalog_path: "./your/dbt/catalog.json"
target_platform: "[target data platform]"
load_schemas: True # or false
manifest_path: "./tests/integration/dbt/dbt_manifest.json"
catalog_path: "./tests/integration/dbt/dbt_catalog.json"
target_platform: "dbt"
load_schemas: True # or False
sink:
type: "datahub-rest"
config:
-server: 'http://localhost:8080'
+server: "http://localhost:8080"


@@ -8,11 +8,12 @@ pytest --basetemp=tmp || true
# Update the golden files.
cp tmp/test_serde_to_json_tests_unit_0/output.json tests/unit/serde/test_serde_large.json
cp tmp/test_serde_to_json_tests_unit_1/output.json tests/unit/serde/test_serde_chart_snapshot.json
-cp tmp/test_ldap_ingest0/ldap_mces.json tests/integration/ldap/ldap_mce_golden.json
-cp tmp/test_mysql_ingest0/mysql_mces.json tests/integration/mysql/mysql_mce_golden.json
-cp tmp/test_mssql_ingest0/mssql_mces.json tests/integration/sql_server/mssql_mce_golden.json
-cp tmp/test_mongodb_ingest0/mongodb_mces.json tests/integration/mongodb/mongodb_mce_golden.json
-cp tmp/test_feast_ingest0/feast_mces.json tests/integration/feast/feast_mce_golden.json
+cp tmp/test_ldap_ingest0/ldap_mces.json tests/integration/ldap/ldap_mces_golden.json
+cp tmp/test_mysql_ingest0/mysql_mces.json tests/integration/mysql/mysql_mces_golden.json
+cp tmp/test_mssql_ingest0/mssql_mces.json tests/integration/sql_server/mssql_mces_golden.json
+cp tmp/test_mongodb_ingest0/mongodb_mces.json tests/integration/mongodb/mongodb_mces_golden.json
+cp tmp/test_feast_ingest0/feast_mces.json tests/integration/feast/feast_mces_golden.json
+cp tmp/test_dbt_ingest0/dbt_mces.json tests/integration/dbt/dbt_mces_golden.json
cp tmp/test_lookml_ingest0/lookml_mces.json tests/integration/lookml/expected_output.json
cp tmp/test_looker_ingest0/looker_mces.json tests/integration/looker/expected_output.json
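
These golden files are consumed by the integration tests further down; a condensed sketch of the shared comparison pattern follows, assuming the mce_helpers module lives at tests/test_helpers as it does in this repository's test suite (check_against_golden is a hypothetical wrapper name, not a real helper).

# Condensed sketch of the golden-file check used by the integration tests.
from tests.test_helpers import mce_helpers

def check_against_golden(tmp_path, test_resources_dir):
    # Load the freshly generated MCEs and the checked-in golden copy...
    output = mce_helpers.load_json_file(str(tmp_path / "dbt_mces.json"))
    golden = mce_helpers.load_json_file(
        str(test_resources_dir / "dbt_mces_golden.json")
    )
    # ...and fail the test on any semantic difference.
    mce_helpers.assert_mces_equal(output, golden)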


@@ -1,6 +1,5 @@
import json
import logging
-import re
import time
from typing import Any, Dict, Iterable, List
@@ -8,6 +7,11 @@ from datahub.configuration import ConfigModel
from datahub.configuration.common import AllowDenyPattern
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import Source, SourceReport
+from datahub.ingestion.source.dbt_types import (
+    POSTGRES_TYPES_MAP,
+    SNOWFLAKE_TYPES_MAP,
+    resolve_postgres_modified_type,
+)
from datahub.ingestion.source.metadata_common import MetadataWorkUnit
from datahub.metadata.com.linkedin.pegasus2avro.common import AuditStamp
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
@@ -27,6 +31,7 @@ from datahub.metadata.com.linkedin.pegasus2avro.schema import (
SchemaFieldDataType,
SchemaMetadata,
StringTypeClass,
+    TimeTypeClass,
)
from datahub.metadata.schema_classes import DatasetPropertiesClass
@@ -96,8 +101,7 @@ def extract_dbt_entities(
node_type_pattern: AllowDenyPattern,
) -> List[DBTNode]:
dbt_entities = []
-    for key in nodes:
-        node = nodes[key]
+    for key, node in nodes.items():
dbtNode = DBTNode()
    # check whether the node is allowed by the node_type_pattern from the config file
@@ -174,7 +178,7 @@ def loadManifestAndCatalog(
all_catalog_entities = {**catalog_nodes, **catalog_sources}
-    nodes = extract_dbt_entities(
+    return extract_dbt_entities(
all_manifest_entities,
all_catalog_entities,
load_catalog,
@@ -183,8 +187,6 @@ def loadManifestAndCatalog(
node_type_pattern,
)
-    return nodes
def get_urn_from_dbtNode(
database: str, schema: str, name: str, target_platform: str, env: str
@@ -195,11 +197,11 @@ def get_urn_from_dbtNode(
def get_custom_properties(node: DBTNode) -> Dict[str, str]:
-    properties = {}
-    properties["dbt_node_type"] = node.node_type
-    properties["materialization"] = node.materialization
-    properties["dbt_file_path"] = node.dbt_file_path
-    return properties
+    return {
+        "dbt_node_type": node.node_type,
+        "materialization": node.materialization,
+        "dbt_file_path": node.dbt_file_path,
+    }
def get_upstreams(
@@ -216,7 +218,7 @@ def get_upstreams(
dbtNode_upstream.database = all_nodes[upstream]["database"]
dbtNode_upstream.schema = all_nodes[upstream]["schema"]
if "identifier" in all_nodes[upstream] and load_catalog is False:
if "identifier" in all_nodes[upstream] and not load_catalog:
dbtNode_upstream.name = all_nodes[upstream]["identifier"]
else:
dbtNode_upstream.name = all_nodes[upstream]["name"]
@@ -247,20 +249,22 @@ def get_upstream_lineage(upstream_urns: List[str]) -> UpstreamLineage:
)
ucl.append(uc)
-    ulc = UpstreamLineage(upstreams=ucl)
-    return ulc
+    return UpstreamLineage(upstreams=ucl)
# This mapping comes from a fairly narrow, Postgres-specific data source; we expect it to expand over
# time or be replaced with a more thorough mechanism
# See https://github.com/fishtown-analytics/dbt/blob/master/core/dbt/adapters/sql/impl.py
_field_type_mapping = {
"boolean": BooleanTypeClass,
"date": DateTypeClass,
"time": TimeTypeClass,
"numeric": NumberTypeClass,
"text": StringTypeClass,
"timestamp with time zone": DateTypeClass,
"timestamp without time zone": DateTypeClass,
"integer": NumberTypeClass,
"float8": NumberTypeClass,
+    **POSTGRES_TYPES_MAP,
+    **SNOWFLAKE_TYPES_MAP,
}
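
Because later entries in a dict literal win, the two ** spreads both extend the base mapping and override its overlapping keys; this is why the golden file below shows "timestamp with time zone" resolving to TimeType rather than the old DateType. A toy illustration of that precedence, using plain strings instead of the DataHub classes:

# Toy illustration: later ** spreads override earlier duplicate keys.
base = {"numeric": "NumberTypeClass", "timestamp with time zone": "DateTypeClass"}
postgres = {"timestamp with time zone": "TimeType", "uuid": "StringType"}

merged = {**base, **postgres}
assert merged["timestamp with time zone"] == "TimeType"  # overridden by the spread
assert merged["uuid"] == "StringType"                     # extended with a new key
assert merged["numeric"] == "NumberTypeClass"             # untouched base entry
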
@@ -270,20 +274,16 @@ def get_column_type(
"""
Maps known dbt types to DataHub types
"""
-    column_type_stripped = ""
-    pattern = re.compile(r"[\w ]+")  # drop all non alphanumerics
-    match = pattern.match(column_type)
-    if match is not None:
-        column_type_stripped = match.group()
-    TypeClass: Any = None
-    for key in _field_type_mapping.keys():
-        if key == column_type_stripped:
-            TypeClass = _field_type_mapping[column_type_stripped]
-            break
+    TypeClass: Any = _field_type_mapping.get(column_type)
+    if TypeClass is None:
+        # attempt Postgres modified type
+        TypeClass = resolve_postgres_modified_type(column_type)
+    # if still not found, report the warning
if TypeClass is None:
report.report_warning(
dataset_name, f"unable to map type {column_type} to metadata schema"
)
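
The new lookup is a two-step fallback: an exact match against the merged mapping first, then an attempt to peel off a Postgres type modifier; only if both fail is the warning reported. A minimal restatement of that control flow (lookup_type_class is a hypothetical name for illustration, not from dbt.py):

# Minimal sketch of the fallback chain in get_column_type.
def lookup_type_class(column_type: str):
    type_class = _field_type_mapping.get(column_type)  # 1) exact match
    if type_class is None:
        # 2) strip a modifier, e.g. "numeric(10,2)" -> "numeric"
        type_class = resolve_postgres_modified_type(column_type)
    return type_class  # None means unmapped: the caller reports a warning
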
@@ -309,7 +309,7 @@
canonical_schema.append(field)
actor, sys_time = "urn:li:corpuser:dbt_executor", int(time.time()) * 1000
-    schema_metadata = SchemaMetadata(
+    return SchemaMetadata(
schemaName=node.dbt_name,
platform=f"urn:li:dataPlatform:{platform}",
version=0,
@@ -319,7 +319,6 @@
lastModified=AuditStamp(time=sys_time, actor=actor),
fields=canonical_schema,
)
-    return schema_metadata
class DBTSource(Source):


@@ -0,0 +1,264 @@
import re
from typing import Any, Dict
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
ArrayType,
BooleanType,
BytesType,
DateType,
EnumType,
NullType,
NumberType,
RecordType,
StringType,
TimeType,
UnionType,
)
# these can be obtained by running `select format_type(oid, null),* from pg_type;`
# we've omitted the types without a meaningful DataHub type (e.g. postgres-specific types, index vectors, etc.)
# (run `\copy (select format_type(oid, null),* from pg_type) to 'pg_type.csv' csv header;` to get a CSV)
# we map from format_type since this is what dbt uses
# see https://github.com/fishtown-analytics/dbt/blob/master/plugins/postgres/dbt/include/postgres/macros/catalog.sql#L22
# see https://www.npgsql.org/dev/types.html for helpful type annotations
POSTGRES_TYPES_MAP: Dict[str, Any] = {
"boolean": BooleanType,
"bytea": BytesType,
'"char"': StringType,
"name": None, # for system identifiers
"bigint": NumberType,
"smallint": NumberType,
"int2vector": NumberType, # for indexing
"integer": NumberType,
"regproc": None, # object identifier
"text": StringType,
"oid": None, # object identifier
"tid": None, # object identifier
"xid": None, # object identifier
"cid": None, # object identifier
"oidvector": None, # object identifier
"json": RecordType,
"xml": RecordType,
"xid8": None, # object identifier
"point": None, # 2D point
"lseg": None, # line segment
"path": None, # path of points
"box": None, # a pair of corner points
"polygon": None, # closed set of points
"line": None, # infinite line
"real": NumberType,
"double precision": NumberType,
"unknown": None,
"circle": None, # circle with center and radius
"money": NumberType,
"macaddr": None, # MAC address
"inet": None, # IPv4 or IPv6 host address
"cidr": None, # IPv4 or IPv6 network specification
"macaddr8": None, # MAC address
"aclitem": None, # system info
"character": StringType,
"character varying": StringType,
"date": DateType,
"time without time zone": TimeType,
"timestamp without time zone": TimeType,
"timestamp with time zone": TimeType,
"interval": None,
"time with time zone": TimeType,
"bit": BytesType,
"bit varying": BytesType,
"numeric": NumberType,
"refcursor": None,
"regprocedure": None,
"regoper": None,
"regoperator": None,
"regclass": None,
"regcollation": None,
"regtype": None,
"regrole": None,
"regnamespace": None,
"uuid": StringType,
"pg_lsn": None,
"tsvector": None, # text search vector
"gtsvector": None, # GiST for tsvector. Probably internal type.
"tsquery": None, # text search query tree
"regconfig": None,
"regdictionary": None,
"jsonb": BytesType,
"jsonpath": None, # path to property in a JSON doc
"txid_snapshot": None,
"pg_snapshot": None,
"int4range": None, # don't have support for ranges yet
"numrange": None,
"tsrange": None,
"tstzrange": None,
"daterange": None,
"int8range": None,
"record": RecordType,
"record[]": ArrayType,
"cstring": None,
'"any"': UnionType,
"anyarray": ArrayType,
"void": NullType,
"trigger": None,
"event_trigger": None,
"language_handler": None,
"internal": None,
"anyelement": None,
"anynonarray": None,
"anyenum": EnumType,
"fdw_handler": None,
"index_am_handler": None,
"tsm_handler": None,
"table_am_handler": None,
"anyrange": None,
"anycompatible": None,
"anycompatiblearray": None,
"anycompatiblenonarray": None,
"anycompatiblerange": None,
"boolean[]": ArrayType,
"bytea[]": ArrayType,
'"char"[]': ArrayType,
"name[]": ArrayType,
"bigint[]": ArrayType,
"smallint[]": ArrayType,
"int2vector[]": ArrayType,
"integer[]": ArrayType,
"regproc[]": ArrayType,
"text[]": ArrayType,
"oid[]": ArrayType,
"tid[]": ArrayType,
"xid[]": ArrayType,
"cid[]": ArrayType,
"oidvector[]": ArrayType,
"json[]": ArrayType,
"xml[]": ArrayType,
"xid8[]": ValuesView,
"point[]": ArrayType,
"lseg[]": ArrayType,
"path[]": ArrayType,
"box[]": ArrayType,
"polygon[]": ArrayType,
"line[]": ArrayType,
"real[]": ArrayType,
"double precision[]": ArrayType,
"circle[]": ArrayType,
"money[]": ArrayType,
"macaddr[]": ArrayType,
"inet[]": ArrayType,
"cidr[]": ArrayType,
"macaddr8[]": ArrayType,
"aclitem[]": ArrayType,
"character[]": ArrayType,
"character varying[]": ArrayType,
"date[]": ArrayType,
"time without time zone[]": ArrayType,
"timestamp without time zone[]": ArrayType,
"timestamp with time zone[]": ArrayType,
"interval[]": ArrayType,
"time with time zone[]": ArrayType,
"bit[]": ArrayType,
"bit varying[]": ArrayType,
"numeric[]": ArrayType,
"refcursor[]": ArrayType,
"regprocedure[]": ArrayType,
"regoper[]": ArrayType,
"regoperator[]": ArrayType,
"regclass[]": ArrayType,
"regcollation[]": ArrayType,
"regtype[]": ArrayType,
"regrole[]": ArrayType,
"regnamespace[]": ArrayType,
"uuid[]": ArrayType,
"pg_lsn[]": ArrayType,
"tsvector[]": ArrayType,
"gtsvector[]": ArrayType,
"tsquery[]": ArrayType,
"regconfig[]": ArrayType,
"regdictionary[]": ArrayType,
"jsonb[]": ArrayType,
"jsonpath[]": ArrayType,
"txid_snapshot[]": ArrayType,
"pg_snapshot[]": ArrayType,
"int4range[]": ArrayType,
"numrange[]": ArrayType,
"tsrange[]": ArrayType,
"tstzrange[]": ArrayType,
"daterange[]": ArrayType,
"int8range[]": ArrayType,
"cstring[]": ArrayType,
}
# Postgres types with modifiers (identifiable by non-empty typmodin and typmodout columns)
POSTGRES_MODIFIED_TYPES = {
"character varying",
"character varying[]",
"bit varying",
"bit varying[]",
"time with time zone",
"time with time zone[]",
"time without time zone",
"time without time zone[]",
"timestamp with time zone",
"timestamp with time zone[]",
"timestamp without time zone",
"timestamp without time zone[]",
"numeric",
"numeric[]",
"interval",
"interval[]",
"character",
"character[]",
"bit",
"bit[]",
}
def resolve_postgres_modified_type(type_string: str) -> Any:
if type_string.endswith("[]"):
return ArrayType
for modified_type_base in POSTGRES_MODIFIED_TYPES:
if re.match(rf"{re.escape(modified_type_base)}\([0-9,]+\)", type_string):
return POSTGRES_TYPES_MAP[modified_type_base]
return None
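
A few example resolutions under the definitions above; the character varying case matches the new field in the golden file further down:

# Illustrative calls (returns are the pegasus2avro schema classes):
resolve_postgres_modified_type("character varying(129)")  # -> StringType
resolve_postgres_modified_type("numeric(10,2)")           # -> NumberType
resolve_postgres_modified_type("text[]")                  # -> ArrayType (the [] suffix short-circuits)
resolve_postgres_modified_type("money")                   # -> None (no modifier to strip)
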
# see https://docs.snowflake.com/en/sql-reference/intro-summary-data-types.html
SNOWFLAKE_TYPES_MAP: Dict[str, Any] = {
"NUMBER": NumberType,
"DECIMAL": NumberType,
"NUMERIC": NumberType,
"INT": NumberType,
"INTEGER": NumberType,
"BIGINT": NumberType,
"SMALLINT": NumberType,
"FLOAT": NumberType,
"FLOAT4": NumberType,
"FLOAT8": NumberType,
"DOUBLE": NumberType,
"DOUBLE PRECISION": NumberType,
"REAL": NumberType,
"VARCHAR": StringType,
"CHAR": StringType,
"CHARACTER": StringType,
"STRING": StringType,
"TEXT": StringType,
"BINARY": BytesType,
"VARBINARY": BytesType,
"BOOLEAN": BooleanType,
"DATE": DateType,
"DATETIME": DateType,
"TIME": TimeType,
"TIMESTAMP": TimeType,
"TIMESTAMP_LTZ": TimeType,
"TIMESTAMP_NTZ": TimeType,
"TIMESTAMP_TZ": TimeType,
"VARIANT": RecordType,
"OBJECT": RecordType,
"ARRAY": ArrayType,
"GEOGRAPHY": None,
}
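
Note that entries mapped to None (GEOGRAPHY here, and the object-identifier types in the Postgres map) deliberately fall through to get_column_type's warning path rather than being coerced into a lossy DataHub type. For instance:

# Lookup behavior against the map above:
assert SNOWFLAKE_TYPES_MAP["TIMESTAMP_NTZ"] is TimeType
assert SNOWFLAKE_TYPES_MAP["VARIANT"] is RecordType
assert SNOWFLAKE_TYPES_MAP["GEOGRAPHY"] is None  # unmapped: triggers the warning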

File diff suppressed because it is too large

File diff suppressed because it is too large


@@ -185,7 +185,7 @@
"description": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.DateType": {}
"com.linkedin.pegasus2avro.schema.TimeType": {}
}
},
"nativeDataType": "timestamp with time zone",
@@ -269,7 +269,7 @@
"description": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.DateType": {}
"com.linkedin.pegasus2avro.schema.TimeType": {}
}
},
"nativeDataType": "timestamp with time zone",
@@ -306,6 +306,21 @@
"recursive": false,
"globalTags": null,
"glossaryTerms": null
},
{
"fieldPath": "name",
"jsonPath": null,
"nullable": false,
"description": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "character varying(129)",
"recursive": false,
"globalTags": null,
"glossaryTerms": null
}
],
"primaryKeys": null,
@@ -478,7 +493,7 @@
"description": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.DateType": {}
"com.linkedin.pegasus2avro.schema.TimeType": {}
}
},
"nativeDataType": "timestamp with time zone",
@@ -530,6 +545,53 @@
"recursive": false,
"globalTags": null,
"glossaryTerms": null
},
{
"fieldPath": "emails",
"jsonPath": null,
"nullable": false,
"description": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.ArrayType": {
"nestedType": null
}
}
},
"nativeDataType": "text[]",
"recursive": false,
"globalTags": null,
"glossaryTerms": null
},
{
"fieldPath": "cost",
"jsonPath": null,
"nullable": false,
"description": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"nativeDataType": "money",
"recursive": false,
"globalTags": null,
"glossaryTerms": null
},
{
"fieldPath": "native",
"jsonPath": null,
"nullable": false,
"description": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NullType": {}
}
},
"nativeDataType": "some native type",
"recursive": false,
"globalTags": null,
"glossaryTerms": null
}
],
"primaryKeys": null,
@@ -672,7 +734,7 @@
"description": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.DateType": {}
"com.linkedin.pegasus2avro.schema.TimeType": {}
}
},
"nativeDataType": "timestamp with time zone",
@@ -821,7 +883,7 @@
"description": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.DateType": {}
"com.linkedin.pegasus2avro.schema.TimeType": {}
}
},
"nativeDataType": "timestamp with time zone",
@@ -970,7 +1032,7 @@
"description": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.DateType": {}
"com.linkedin.pegasus2avro.schema.TimeType": {}
}
},
"nativeDataType": "timestamp with time zone",
@@ -1119,7 +1181,7 @@
"description": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.DateType": {}
"com.linkedin.pegasus2avro.schema.TimeType": {}
}
},
"nativeDataType": "timestamp with time zone",
@@ -1268,7 +1330,7 @@
"description": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.DateType": {}
"com.linkedin.pegasus2avro.schema.TimeType": {}
}
},
"nativeDataType": "timestamp with time zone",
@@ -1417,7 +1479,7 @@
"description": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.DateType": {}
"com.linkedin.pegasus2avro.schema.TimeType": {}
}
},
"nativeDataType": "timestamp with time zone",
@@ -1611,7 +1673,7 @@
"description": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.DateType": {}
"com.linkedin.pegasus2avro.schema.TimeType": {}
}
},
"nativeDataType": "timestamp with time zone",
@@ -1805,7 +1867,7 @@
"description": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.DateType": {}
"com.linkedin.pegasus2avro.schema.TimeType": {}
}
},
"nativeDataType": "timestamp with time zone",
@@ -1924,7 +1986,7 @@
"description": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.DateType": {}
"com.linkedin.pegasus2avro.schema.TimeType": {}
}
},
"nativeDataType": "timestamp with time zone",


@@ -46,6 +46,6 @@ def test_feast_ingest(docker_compose_runner, pytestconfig, tmp_path):
# Verify the output.
output = mce_helpers.load_json_file(str(tmp_path / "feast_mces.json"))
golden = mce_helpers.load_json_file(
str(test_resources_dir / "feast_mce_golden.json")
str(test_resources_dir / "feast_mces_golden.json")
)
mce_helpers.assert_mces_equal(output, golden)


@@ -44,6 +44,6 @@ def test_ldap_ingest(docker_compose_runner, pytestconfig, tmp_path, mock_time):
output = mce_helpers.load_json_file(str(tmp_path / "ldap_mces.json"))
golden = mce_helpers.load_json_file(
str(test_resources_dir / "ldap_mce_golden.json")
str(test_resources_dir / "ldap_mces_golden.json")
)
mce_helpers.assert_mces_equal(output, golden)


@@ -40,6 +40,6 @@ def test_mongodb_ingest(docker_compose_runner, pytestconfig, tmp_path, mock_time
# Verify the output.
output = mce_helpers.load_json_file(str(tmp_path / "mongodb_mces.json"))
golden = mce_helpers.load_json_file(
str(test_resources_dir / "mongodb_mce_golden.json")
str(test_resources_dir / "mongodb_mces_golden.json")
)
mce_helpers.assert_mces_equal(output, golden)


@@ -26,6 +26,6 @@ def test_mysql_ingest(docker_compose_runner, pytestconfig, tmp_path, mock_time):
# Verify the output.
golden = mce_helpers.load_json_file(
str(test_resources_dir / "mysql_mce_golden.json")
str(test_resources_dir / "mysql_mces_golden.json")
)
mce_helpers.assert_mces_equal(output, golden)


@@ -41,6 +41,6 @@ def test_mssql_ingest(docker_compose_runner, pytestconfig, tmp_path, mock_time):
# Verify the output.
golden = mce_helpers.load_json_file(
str(test_resources_dir / "mssql_mce_golden.json")
str(test_resources_dir / "mssql_mces_golden.json")
)
mce_helpers.assert_mces_equal(output, golden)