fix(ingest/hive): fix containers generation for hive (#7926)

This commit is contained in:
Mayuri Nehate 2023-05-02 18:37:51 +05:30 committed by GitHub
parent 4e9c398e1d
commit a711baa131
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 1568 additions and 138 deletions

View File

@ -19,11 +19,11 @@ from datahub.ingestion.api.decorators import (
support_status,
)
from datahub.ingestion.extractor import schema_util
from datahub.ingestion.source.sql.sql_common import (
SQLAlchemySource,
register_custom_type,
from datahub.ingestion.source.sql.sql_common import register_custom_type
from datahub.ingestion.source.sql.two_tier_sql_source import (
TwoTierSQLAlchemyConfig,
TwoTierSQLAlchemySource,
)
from datahub.ingestion.source.sql.sql_config import BasicSQLAlchemyConfig
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
DateTypeClass,
NullTypeClass,
@ -90,7 +90,7 @@ except Exception as e:
logger.warning(f"Failed to patch method due to {e}")
class HiveConfig(BasicSQLAlchemyConfig):
class HiveConfig(TwoTierSQLAlchemyConfig):
# defaults
scheme = Field(default="hive", hidden_from_docs=True)
@ -113,7 +113,7 @@ class HiveConfig(BasicSQLAlchemyConfig):
@support_status(SupportStatus.CERTIFIED)
@capability(SourceCapability.PLATFORM_INSTANCE, "Enabled by default")
@capability(SourceCapability.DOMAINS, "Supported via the `domain` config field")
class HiveSource(SQLAlchemySource):
class HiveSource(TwoTierSQLAlchemySource):
"""
This plugin extracts the following:

File diff suppressed because it is too large Load Diff

View File

@ -5,8 +5,14 @@
"changeType": "UPSERT",
"aspectName": "containerProperties",
"aspect": {
"value": "{\"customProperties\": {\"platform\": \"hive\", \"instance\": \"PROD\", \"database\": \"db1\"}, \"name\": \"db1\"}",
"contentType": "application/json"
"json": {
"customProperties": {
"platform": "hive",
"instance": "PROD",
"database": "db1"
},
"name": "db1"
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
@ -19,8 +25,9 @@
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"value": "{\"removed\": false}",
"contentType": "application/json"
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
@ -33,8 +40,9 @@
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"value": "{\"platform\": \"urn:li:dataPlatform:hive\"}",
"contentType": "application/json"
"json": {
"platform": "urn:li:dataPlatform:hive"
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
@ -47,78 +55,11 @@
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"value": "{\"typeNames\": [\"Database\"]}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "hive-test"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:d4ee8f0f53fee8e83d4188d0497bfe37",
"changeType": "UPSERT",
"aspectName": "containerProperties",
"aspect": {
"value": "{\"customProperties\": {\"platform\": \"hive\", \"instance\": \"PROD\", \"database\": \"db1\", \"schema\": \"db1\"}, \"name\": \"db1\"}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "hive-test"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:d4ee8f0f53fee8e83d4188d0497bfe37",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"value": "{\"removed\": false}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "hive-test"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:d4ee8f0f53fee8e83d4188d0497bfe37",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"value": "{\"platform\": \"urn:li:dataPlatform:hive\"}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "hive-test"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:d4ee8f0f53fee8e83d4188d0497bfe37",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"value": "{\"typeNames\": [\"Schema\"]}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "hive-test"
}
},
{
"entityType": "container",
"entityUrn": "urn:li:container:d4ee8f0f53fee8e83d4188d0497bfe37",
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
"value": "{\"container\": \"urn:li:container:ded36d15fcfbbb939830549697122661\"}",
"contentType": "application/json"
"json": {
"typeNames": [
"Database"
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
@ -131,8 +72,9 @@
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
"value": "{\"container\": \"urn:li:container:d4ee8f0f53fee8e83d4188d0497bfe37\"}",
"contentType": "application/json"
"json": {
"container": "urn:li:container:ded36d15fcfbbb939830549697122661"
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
@ -154,7 +96,7 @@
"customProperties": {
"Database:": "db1",
"Owner:": "root",
"CreateTime:": "Sat Oct 08 01:09:04 UTC 2022",
"CreateTime:": "Fri Apr 28 12:04:49 UTC 2023",
"LastAccessTime:": "UNKNOWN",
"Retention:": "0",
"Location:": "hdfs://namenode:8020/user/hive/warehouse/db1.db/_test_table_underscore",
@ -164,7 +106,7 @@
"Table Parameters: numRows": "0",
"Table Parameters: rawDataSize": "0",
"Table Parameters: totalSize": "0",
"Table Parameters: transient_lastDdlTime": "1665191344",
"Table Parameters: transient_lastDdlTime": "1682683489",
"SerDe Library:": "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
"InputFormat:": "org.apache.hadoop.mapred.TextInputFormat",
"OutputFormat:": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
@ -239,8 +181,11 @@
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"value": "{\"typeNames\": [\"Table\"]}",
"contentType": "application/json"
"json": {
"typeNames": [
"Table"
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
@ -253,8 +198,9 @@
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
"value": "{\"container\": \"urn:li:container:d4ee8f0f53fee8e83d4188d0497bfe37\"}",
"contentType": "application/json"
"json": {
"container": "urn:li:container:ded36d15fcfbbb939830549697122661"
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
@ -276,7 +222,7 @@
"customProperties": {
"Database:": "db1",
"Owner:": "root",
"CreateTime:": "Sat Oct 08 01:09:05 UTC 2022",
"CreateTime:": "Fri Apr 28 12:04:49 UTC 2023",
"LastAccessTime:": "UNKNOWN",
"Retention:": "0",
"Location:": "hdfs://namenode:8020/user/hive/warehouse/db1.db/array_struct_test",
@ -286,7 +232,7 @@
"Table Parameters: numRows": "1",
"Table Parameters: rawDataSize": "32",
"Table Parameters: totalSize": "33",
"Table Parameters: transient_lastDdlTime": "1665191650",
"Table Parameters: transient_lastDdlTime": "1682683491",
"SerDe Library:": "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
"InputFormat:": "org.apache.hadoop.mapred.TextInputFormat",
"OutputFormat:": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
@ -396,8 +342,11 @@
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"value": "{\"typeNames\": [\"Table\"]}",
"contentType": "application/json"
"json": {
"typeNames": [
"Table"
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
@ -410,8 +359,9 @@
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
"value": "{\"container\": \"urn:li:container:d4ee8f0f53fee8e83d4188d0497bfe37\"}",
"contentType": "application/json"
"json": {
"container": "urn:li:container:ded36d15fcfbbb939830549697122661"
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
@ -433,7 +383,7 @@
"customProperties": {
"Database:": "db1",
"Owner:": "root",
"CreateTime:": "Sat Oct 08 01:14:11 UTC 2022",
"CreateTime:": "Fri Apr 28 12:04:51 UTC 2023",
"LastAccessTime:": "UNKNOWN",
"Retention:": "0",
"Location:": "hdfs://namenode:8020/user/hive/warehouse/db1.db/map_test",
@ -443,7 +393,7 @@
"Table Parameters: numRows": "0",
"Table Parameters: rawDataSize": "0",
"Table Parameters: totalSize": "0",
"Table Parameters: transient_lastDdlTime": "1665191651",
"Table Parameters: transient_lastDdlTime": "1682683491",
"SerDe Library:": "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
"InputFormat:": "org.apache.hadoop.mapred.TextInputFormat",
"OutputFormat:": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
@ -522,8 +472,11 @@
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"value": "{\"typeNames\": [\"Table\"]}",
"contentType": "application/json"
"json": {
"typeNames": [
"Table"
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
@ -536,8 +489,9 @@
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
"value": "{\"container\": \"urn:li:container:d4ee8f0f53fee8e83d4188d0497bfe37\"}",
"contentType": "application/json"
"json": {
"container": "urn:li:container:ded36d15fcfbbb939830549697122661"
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
@ -559,7 +513,7 @@
"customProperties": {
"Database:": "db1",
"Owner:": "root",
"CreateTime:": "Sat Oct 08 01:14:11 UTC 2022",
"CreateTime:": "Fri Apr 28 12:04:51 UTC 2023",
"LastAccessTime:": "UNKNOWN",
"Retention:": "0",
"Location:": "hdfs://namenode:8020/user/hive/warehouse/db1.db/nested_struct_test",
@ -569,7 +523,7 @@
"Table Parameters: numRows": "0",
"Table Parameters: rawDataSize": "0",
"Table Parameters: totalSize": "0",
"Table Parameters: transient_lastDdlTime": "1665191651",
"Table Parameters: transient_lastDdlTime": "1682683491",
"SerDe Library:": "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
"InputFormat:": "org.apache.hadoop.mapred.TextInputFormat",
"OutputFormat:": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
@ -697,8 +651,11 @@
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"value": "{\"typeNames\": [\"Table\"]}",
"contentType": "application/json"
"json": {
"typeNames": [
"Table"
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
@ -711,8 +668,9 @@
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
"value": "{\"container\": \"urn:li:container:d4ee8f0f53fee8e83d4188d0497bfe37\"}",
"contentType": "application/json"
"json": {
"container": "urn:li:container:ded36d15fcfbbb939830549697122661"
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
@ -734,7 +692,7 @@
"customProperties": {
"Database:": "db1",
"Owner:": "root",
"CreateTime:": "Sat Oct 08 01:08:44 UTC 2022",
"CreateTime:": "Fri Apr 28 12:04:47 UTC 2023",
"LastAccessTime:": "UNKNOWN",
"Retention:": "0",
"Location:": "hdfs://namenode:8020/user/hive/warehouse/db1.db/pokes",
@ -743,7 +701,7 @@
"Table Parameters: numRows": "0",
"Table Parameters: rawDataSize": "0",
"Table Parameters: totalSize": "5812",
"Table Parameters: transient_lastDdlTime": "1665191333",
"Table Parameters: transient_lastDdlTime": "1682683488",
"SerDe Library:": "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
"InputFormat:": "org.apache.hadoop.mapred.TextInputFormat",
"OutputFormat:": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
@ -818,8 +776,11 @@
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"value": "{\"typeNames\": [\"Table\"]}",
"contentType": "application/json"
"json": {
"typeNames": [
"Table"
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
@ -832,8 +793,9 @@
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
"value": "{\"container\": \"urn:li:container:d4ee8f0f53fee8e83d4188d0497bfe37\"}",
"contentType": "application/json"
"json": {
"container": "urn:li:container:ded36d15fcfbbb939830549697122661"
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
@ -855,7 +817,7 @@
"customProperties": {
"Database:": "db1",
"Owner:": "root",
"CreateTime:": "Sat Oct 08 01:09:04 UTC 2022",
"CreateTime:": "Fri Apr 28 12:04:49 UTC 2023",
"LastAccessTime:": "UNKNOWN",
"Retention:": "0",
"Location:": "hdfs://namenode:8020/user/hive/warehouse/db1.db/struct_test",
@ -865,7 +827,7 @@
"Table Parameters: numRows": "0",
"Table Parameters: rawDataSize": "0",
"Table Parameters: totalSize": "0",
"Table Parameters: transient_lastDdlTime": "1665191344",
"Table Parameters: transient_lastDdlTime": "1682683489",
"SerDe Library:": "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
"InputFormat:": "org.apache.hadoop.mapred.TextInputFormat",
"OutputFormat:": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
@ -971,8 +933,11 @@
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"value": "{\"typeNames\": [\"Table\"]}",
"contentType": "application/json"
"json": {
"typeNames": [
"Table"
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
@ -985,8 +950,9 @@
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
"value": "{\"container\": \"urn:li:container:d4ee8f0f53fee8e83d4188d0497bfe37\"}",
"contentType": "application/json"
"json": {
"container": "urn:li:container:ded36d15fcfbbb939830549697122661"
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
@ -1008,7 +974,7 @@
"customProperties": {
"Database:": "db1",
"Owner:": "root",
"CreateTime:": "Sat Oct 08 01:14:11 UTC 2022",
"CreateTime:": "Fri Apr 28 12:04:51 UTC 2023",
"LastAccessTime:": "UNKNOWN",
"Retention:": "0",
"Location:": "hdfs://namenode:8020/user/hive/warehouse/db1.db/union_test",
@ -1018,7 +984,7 @@
"Table Parameters: numRows": "0",
"Table Parameters: rawDataSize": "0",
"Table Parameters: totalSize": "0",
"Table Parameters: transient_lastDdlTime": "1665191651",
"Table Parameters: transient_lastDdlTime": "1682683491",
"SerDe Library:": "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
"InputFormat:": "org.apache.hadoop.mapred.TextInputFormat",
"OutputFormat:": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
@ -1193,8 +1159,11 @@
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"value": "{\"typeNames\": [\"Table\"]}",
"contentType": "application/json"
"json": {
"typeNames": [
"Table"
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,

View File

@ -4,7 +4,7 @@ CREATE DATABASE IF NOT EXISTS db2;
CREATE TABLE IF NOT EXISTS db1.pokes (foo INT, bar STRING);
LOAD DATA LOCAL INPATH '/opt/hive/examples/files/kv1.txt' OVERWRITE INTO TABLE db1.pokes;
CREATE TABLE IF NOT EXISTS db2.pokes (foo INT, bar STRING, primary key(foo) DISABLE NOVALIDATE NORELY);
CREATE TABLE IF NOT EXISTS db2.pokes (foo INT, bar STRING, CONSTRAINT pk_1173723383_1683022998392_0 primary key(foo) DISABLE NOVALIDATE NORELY);
LOAD DATA LOCAL INPATH '/opt/hive/examples/files/kv1.txt' OVERWRITE INTO TABLE db2.pokes;
-- Setup a table with a special character.

View File

@ -5,8 +5,6 @@ import pytest
from freezegun import freeze_time
from datahub.ingestion.run.pipeline import Pipeline
from datahub.ingestion.sink.file import FileSinkConfig
from datahub.ingestion.source.sql.hive import HiveConfig
from tests.test_helpers import mce_helpers
from tests.test_helpers.docker_helpers import wait_for_port
@ -37,18 +35,20 @@ def loaded_hive(hive_runner):
subprocess.run(command, shell=True, check=True)
def base_pipeline_config(events_file):
def base_pipeline_config(events_file, db=None):
return {
"run_id": "hive-test",
"source": {
"type": data_platform,
"config": HiveConfig(
scheme="hive", database="db1", host_port="localhost:10000"
).dict(),
"config": {
"scheme": "hive",
"database": db,
"host_port": "localhost:10000",
},
},
"sink": {
"type": "file",
"config": FileSinkConfig(filename=str(events_file)).dict(),
"config": {"filename": str(events_file)},
},
}
@ -61,6 +61,34 @@ def test_hive_ingest(
mce_out_file = "test_hive_ingest.json"
events_file = tmp_path / mce_out_file
# Run the metadata ingestion pipeline.
pipeline = Pipeline.create(base_pipeline_config(events_file, "db1"))
pipeline.run()
pipeline.pretty_print_summary()
pipeline.raise_from_status(raise_warnings=True)
# Verify the output.
mce_helpers.check_golden_file(
pytestconfig,
output_path=events_file,
golden_path=test_resources_dir / "hive_mces_golden.json",
ignore_paths=[
r"root\[\d+\]\['proposedSnapshot'\]\['com\.linkedin\.pegasus2avro\.metadata\.snapshot\.DatasetSnapshot'\]\['aspects'\]\[\d+\]\['com\.linkedin\.pegasus2avro\.dataset\.DatasetProperties'\]\['customProperties'\]\['.*Time.*'\]",
r"root\[6\]\['proposedSnapshot'\]\['com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot'\]\['aspects'\]\[\d+\]\['com.linkedin.pegasus2avro.schema.SchemaMetadata'\]\['fields'\]\[\d+\]\['nativeDataType'\]",
],
)
# Limitation - native data types for unions do not show up as expected
@freeze_time(FROZEN_TIME)
@pytest.mark.integration_batch_1
def test_hive_ingest_all_db(
loaded_hive, pytestconfig, test_resources_dir, tmp_path, mock_time
):
mce_out_file = "test_hive_ingest.json"
events_file = tmp_path / mce_out_file
# Run the metadata ingestion pipeline.
pipeline = Pipeline.create(base_pipeline_config(events_file))
pipeline.run()
@ -71,10 +99,8 @@ def test_hive_ingest(
mce_helpers.check_golden_file(
pytestconfig,
output_path=events_file,
golden_path=test_resources_dir / "hive_mces_golden.json",
golden_path=test_resources_dir / "hive_mces_all_db_golden.json",
ignore_paths=[
# example: root[1]['proposedSnapshot']['com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot']['aspects'][0]['com.linkedin.pegasus2avro.dataset.DatasetProperties']['customProperties']['CreateTime:']
# example: root[2]['proposedSnapshot']['com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot']['aspects'][0]['com.linkedin.pegasus2avro.dataset.DatasetProperties']['customProperties']['Table Parameters: transient_lastDdlTime']
r"root\[\d+\]\['proposedSnapshot'\]\['com\.linkedin\.pegasus2avro\.metadata\.snapshot\.DatasetSnapshot'\]\['aspects'\]\[\d+\]\['com\.linkedin\.pegasus2avro\.dataset\.DatasetProperties'\]\['customProperties'\]\['.*Time.*'\]",
r"root\[6\]\['proposedSnapshot'\]\['com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot'\]\['aspects'\]\[\d+\]\['com.linkedin.pegasus2avro.schema.SchemaMetadata'\]\['fields'\]\[\d+\]\['nativeDataType'\]",
],
@ -92,7 +118,7 @@ def test_hive_instance_check(loaded_hive, test_resources_dir, tmp_path, pytestco
mce_out_file = "test_hive_instance.json"
events_file = tmp_path / mce_out_file
pipeline_config = base_pipeline_config(events_file)
pipeline_config = base_pipeline_config(events_file, "db1")
pipeline_config["source"]["config"]["platform_instance"] = instance
pipeline = Pipeline.create(pipeline_config)