fix(ingest): various avro codegen fixes (#2232)

Harshal Sheth 2021-03-15 15:27:30 -07:00 committed by GitHub
parent d895202d52
commit aa6bc15cd7
19 changed files with 4133 additions and 40436 deletions


@@ -4,6 +4,7 @@ output
 src/datahub/metadata/
 pvenv36/
 bq_credentials.json
+/tmp
 
 # Byte-compiled / optimized / DLL files
 __pycache__/

File diff suppressed because it is too large.


@@ -107,7 +107,7 @@ def create_ownership_aspect_mce(directive: Directive) -> MetadataChangeEventClass:
         owners=[
             OwnerClass(
                 owner=owner_name_to_urn(clean_owner_name(owner)),
-                type=OwnershipTypeClass.DATAOWNER,  # type: ignore
+                type=OwnershipTypeClass.DATAOWNER,
             )
             for owner in directive.owners
         ],
@@ -130,7 +130,7 @@ def create_lineage_aspect_mce(directive: Directive) -> MetadataChangeEventClass:
         upstreams=[
             UpstreamClass(
                 dataset=dataset_name_to_urn(upstream),
-                type=DatasetLineageTypeClass.TRANSFORMED,  # type: ignore
+                type=DatasetLineageTypeClass.TRANSFORMED,
                 auditStamp=AuditStampClass(
                     time=int(time.time() * 1000),
                     actor="urn:li:corpuser:datahub",


@@ -0,0 +1,17 @@
#!/bin/bash
set -euxo pipefail
# We allow for failures in this step. Usually you'll be invoking this
# script to fix a build failure.
pytest --basetemp=tmp || true
# Update the golden files.
cp tmp/test_serde_large0/output.json tests/unit/serde/test_serde_large.json
cp tmp/test_ldap_ingest0/ldap_mces.json tests/integration/ldap/ldap_mce_golden.json
cp tmp/test_mysql_ingest0/mysql_mces.json tests/integration/mysql/mysql_mce_golden.json
cp tmp/test_mssql_ingest0/mssql_mces.json tests/integration/sql_server/mssql_mce_golden.json
# Print success message.
set +x
echo ''
echo 'Make sure to check `git diff` to verify the changes!'


@@ -37,7 +37,7 @@ framework_common = {
     "click>=7.1.1",
     "pyyaml>=5.4.1",
     "toml>=0.10.0",
-    "avro-gen3==0.3.2",
+    "avro-gen3==0.3.3",
     "avro-python3>=1.8.2",
 }


@@ -10,6 +10,7 @@ from datahub.configuration.toml import TomlConfigurationMechanism
 from datahub.configuration.yaml import YamlConfigurationMechanism
 from datahub.ingestion.run.pipeline import Pipeline
 from datahub.ingestion.sink.sink_registry import sink_registry
+from datahub.ingestion.source.mce_file import check_mce_file
 from datahub.ingestion.source.source_registry import source_registry
 
 logger = logging.getLogger(__name__)
@@ -37,7 +38,7 @@ def datahub():
 @click.option(
     "-c",
     "--config",
-    type=click.Path(exists=True),
+    type=click.Path(exists=True, dir_okay=False),
     help="Config file in .toml or .yaml format",
     required=True,
 )
@@ -77,6 +78,8 @@ def ingest(config: str):
 
 @datahub.command(context_settings=DEFAULT_CONTEXT_SETTINGS)
 def ingest_list_plugins():
+    """List enabled ingestion plugins"""
+
     click.secho("Sources:", bold=True)
     click.echo(str(source_registry))
     click.echo()
@@ -84,3 +87,17 @@ def ingest_list_plugins():
     click.echo(str(sink_registry))
     click.echo()
     click.echo('If a plugin is disabled, try running: pip install ".[<plugin>]"')
+
+
+@datahub.group()
+def check():
+    pass
+
+
+@check.command()
+@click.argument("json-file", type=click.Path(exists=True, dir_okay=False))
+def mce_file(json_file: str):
+    """Check the schema of a MCE JSON file"""
+
+    report = check_mce_file(json_file)
+    click.echo(report)
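Taken together, these hunks expose a new `datahub check mce-file <path>` subcommand. A minimal sketch of driving it programmatically with click's test runner, mirroring how the tests added later in this commit invoke it (the file path below is a placeholder, not something from this diff):

from click.testing import CliRunner

from datahub.entrypoints import datahub

runner = CliRunner()
# "path/to/mces.json" is a placeholder; point it at any MCE JSON file.
result = runner.invoke(datahub, ["check", "mce-file", "path/to/mces.json"])
print(result.exit_code, result.output)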


@@ -14,7 +14,9 @@ class WorkUnitMCEExtractor(Extractor):
     def get_records(self, workunit) -> Iterable[RecordEnvelope[MetadataChangeEvent]]:
         if len(workunit.mce.proposedSnapshot.aspects) == 0:
-            raise AttributeError('every mce must have at least one aspect')
+            raise AttributeError("every mce must have at least one aspect")
+        if not workunit.mce.validate():
+            raise ValueError(f"source produced an invalid MCE: {workunit.mce}")
         yield RecordEnvelope(workunit.mce, {})
 
     def close(self):


@@ -41,7 +41,7 @@ _field_type_mapping = {
 def _get_column_type(field_type) -> SchemaFieldDataType:
     tp = field_type
-    if hasattr(tp, 'type'):
+    if hasattr(tp, "type"):
         tp = tp.type
     tp = str(tp)
     TypeClass: Any = _field_type_mapping.get(tp)
@@ -56,7 +56,7 @@ def avro_schema_to_mce_fields(avro_schema_string: str) -> List[SchemaField]:
     """Converts an avro schema into a schema compatible with MCE"""
     # Handle some library compatability issues.
-    if hasattr(avro.schema, 'parse'):
+    if hasattr(avro.schema, "parse"):
         schema_parse_fn = avro.schema.parse
     else:
         schema_parse_fn = avro.schema.Parse
@@ -69,7 +69,9 @@ def avro_schema_to_mce_fields(avro_schema_string: str) -> List[SchemaField]:
             fieldPath=parsed_field.name,
             nativeDataType=str(parsed_field.type),
             type=_get_column_type(parsed_field.type),
-            description=parsed_field.props.get('doc', None),
+            description=parsed_field.props.get("doc", None),
+            recursive=False,
+            nullable=(parsed_field.type == "null"),
         )
         fields.append(field)
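The two new keyword arguments make each generated SchemaField explicit about recursion and nullability instead of relying on defaults. A rough usage sketch; the module path for the helper is not shown in this hunk, so the import below is an assumption to adjust to wherever avro_schema_to_mce_fields actually lives:

import json

# Assumed import path -- not confirmed by this diff.
from datahub.ingestion.extractor.schema_util import avro_schema_to_mce_fields

avro_schema = json.dumps(
    {
        "type": "record",
        "name": "Example",
        "fields": [{"name": "id", "type": "string", "doc": "primary identifier"}],
    }
)

for field in avro_schema_to_mce_fields(avro_schema):
    # Each field now carries explicit recursive/nullable values.
    print(field.fieldPath, field.nativeDataType, field.recursive, field.nullable)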


@@ -92,6 +92,7 @@ class KafkaSource(Source):
             metadata_record = MetadataChangeEvent()
             dataset_snapshot = DatasetSnapshot(
                 urn=f"urn:li:dataset:(urn:li:dataPlatform:{platform},{dataset_name},{env})",
+                aspects=[],  # we append to this list later on
             )
             dataset_snapshot.aspects.append(Status(removed=False))
             metadata_record.proposedSnapshot = dataset_snapshot


@@ -30,6 +30,8 @@ class MetadataFileSource(Source):
         for i, obj in enumerate(mce_obj_list):
             mce: MetadataChangeEvent = MetadataChangeEvent.from_obj(obj)
+            if not mce.validate():
+                raise ValueError(f"failed to parse into valid MCE: {obj}")
             wu = MetadataWorkUnit(f"file://{self.config.filename}:{i}", mce)
             self.report.report_workunit(wu)
             yield wu
@@ -39,3 +41,10 @@ class MetadataFileSource(Source):
     def close(self):
         pass
+
+
+def check_mce_file(filepath: str) -> str:
+    mce_source = MetadataFileSource.create({"filename": filepath}, None)
+    for _ in mce_source.get_workunits():
+        pass
+    return f"{mce_source.get_report().workunits_produced} MCEs found - all valid"
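The new check_mce_file helper is what backs the `datahub check mce-file` command: it streams every MCE through the source (validation happens inside get_workunits) and reports a count. A minimal sketch of calling it directly, using one of the golden files referenced elsewhere in this commit:

from datahub.ingestion.source.mce_file import check_mce_file

# Raises if any MCE in the file fails validation; otherwise returns a summary
# string like "<N> MCEs found - all valid".
print(check_mce_file("tests/unit/serde/test_serde_large.json"))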


@@ -162,6 +162,7 @@ def get_schema_metadata(
             type=get_column_type(sql_report, dataset_name, column["type"]),
             description=column.get("comment", None),
             nullable=column["nullable"],
+            recursive=False,
         )
         canonical_schema.append(field)
@@ -229,6 +230,8 @@ class SQLAlchemySource(Source):
             if description is not None:
                 dataset_properties = DatasetPropertiesClass(
                     description=description,
+                    tags=[],
+                    customProperties={},
                     # uri=dataset_name,
                 )
                 dataset_snapshot.aspects.append(dataset_properties)


@@ -8,25 +8,15 @@
         {
           "com.linkedin.pegasus2avro.identity.CorpUserInfo": {
             "active": true,
-            "displayName": {
-              "string": "Bart Simpson"
-            },
+            "displayName": "Bart Simpson",
             "email": "",
-            "title": {
-              "string": "Mr. Boss"
-            },
+            "title": "Mr. Boss",
             "managerUrn": null,
             "departmentId": null,
             "departmentName": null,
-            "firstName": {
-              "string": "Bart"
-            },
-            "lastName": {
-              "string": "Simpson"
-            },
-            "fullName": {
-              "string": "Bart Simpson"
-            },
+            "firstName": "Bart",
+            "lastName": "Simpson",
+            "fullName": "Bart Simpson",
             "countryCode": null
           }
         }
@@ -44,29 +34,15 @@
         {
           "com.linkedin.pegasus2avro.identity.CorpUserInfo": {
             "active": true,
-            "displayName": {
-              "string": "Homer Simpson"
-            },
+            "displayName": "Homer Simpson",
             "email": "hsimpson",
-            "title": {
-              "string": "Mr. Everything"
-            },
-            "managerUrn": {
-              "string": "urn:li:corpuser:bsimpson"
-            },
+            "title": "Mr. Everything",
+            "managerUrn": "urn:li:corpuser:bsimpson",
             "departmentId": null,
-            "departmentName": {
-              "string": "1001"
-            },
-            "firstName": {
-              "string": "Homer"
-            },
-            "lastName": {
-              "string": "Simpson"
-            },
-            "fullName": {
-              "string": "Homer Simpson"
-            },
+            "departmentName": "1001",
+            "firstName": "Homer",
+            "lastName": "Simpson",
+            "fullName": "Homer Simpson",
             "countryCode": null
           }
         }
@@ -84,23 +60,15 @@
         {
          "com.linkedin.pegasus2avro.identity.CorpUserInfo": {
             "active": true,
-            "displayName": {
-              "string": "Lisa Simpson"
-            },
+            "displayName": "Lisa Simpson",
             "email": "",
             "title": null,
             "managerUrn": null,
             "departmentId": null,
             "departmentName": null,
-            "firstName": {
-              "string": "Lisa"
-            },
-            "lastName": {
-              "string": "Simpson"
-            },
-            "fullName": {
-              "string": "Lisa Simpson"
-            },
+            "firstName": "Lisa",
+            "lastName": "Simpson",
+            "fullName": "Lisa Simpson",
             "countryCode": null
           }
         }
@@ -118,23 +86,15 @@
         {
           "com.linkedin.pegasus2avro.identity.CorpUserInfo": {
             "active": true,
-            "displayName": {
-              "string": "Maggie Simpson"
-            },
+            "displayName": "Maggie Simpson",
             "email": "",
             "title": null,
             "managerUrn": null,
             "departmentId": null,
             "departmentName": null,
-            "firstName": {
-              "string": "Maggie"
-            },
-            "lastName": {
-              "string": "Simpson"
-            },
-            "fullName": {
-              "string": "Maggie Simpson"
-            },
+            "firstName": "Maggie",
+            "lastName": "Simpson",
+            "fullName": "Maggie Simpson",
             "countryCode": null
           }
         }
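The churn in this golden file appears to come from how optional (union-typed) fields are serialized after the avro-gen3 bump: the old output wrapped values in the Avro JSON union encoding, while the new output writes the plain value. A minimal illustration of the two shapes for a single field (the reader still accepts the verbose form, which test_reader_allows_verbose_unions below exercises):

# Old golden output: union-typed field wrapped in a type tag.
verbose = {"title": {"string": "Mr. Boss"}}

# New golden output: the plain value is written directly.
plain = {"title": "Mr. Boss"}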

File diff suppressed because it is too large.


@@ -7,8 +7,11 @@ source:
     password: example
     database: metagalaxy
     host_port: localhost:53306
+    schema_pattern:
+      allow:
+        - "^metagalaxy"
 
 sink:
   type: file
   config:
-    filename: './mysql_mces.json'
+    filename: "./mysql_mces.json"


@@ -29,6 +29,6 @@ def test_mssql_ingest(sql_server, pytestconfig, tmp_path, mock_time):
 
     # Verify the output.
     golden = mce_helpers.load_json_file(
-        str(test_resources_dir / "mssql_mces_golden.json")
+        str(test_resources_dir / "mssql_mce_golden.json")
     )
     mce_helpers.assert_mces_equal(output, golden)


@@ -1,5 +1,7 @@
 import mce_helpers
+from click.testing import CliRunner
+from datahub.entrypoints import datahub
 from datahub.ingestion.run.pipeline import Pipeline
@@ -24,3 +26,23 @@ def test_serde_large(pytestconfig, tmp_path):
     output = mce_helpers.load_json_file(tmp_path / output_filename)
     golden = mce_helpers.load_json_file(golden_file)
     mce_helpers.assert_mces_equal(output, golden)
+
+
+def test_check_mce_schema(pytestconfig):
+    json_filename = "test_serde_large.json"
+    test_resources_dir = pytestconfig.rootpath / "tests/unit/serde"
+    json_file_path = test_resources_dir / json_filename
+
+    runner = CliRunner()
+    result = runner.invoke(datahub, ["check", "mce-file", f"{json_file_path}"])
+    assert result.exit_code == 0
+
+
+def test_reader_allows_verbose_unions(pytestconfig):
+    json_filename = "test_serde_backwards_compat.json"
+    test_resources_dir = pytestconfig.rootpath / "tests/unit/serde"
+    json_file_path = test_resources_dir / json_filename
+
+    runner = CliRunner()
+    result = runner.invoke(datahub, ["check", "mce-file", f"{json_file_path}"])
+    assert result.exit_code == 0


@@ -0,0 +1,229 @@
[
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,bigquery-public-data.covid19_geotab_mobility_impact.airport_traffic,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"description": {
"string": "This dataset shows traffic to and from the Airport as a Percentage of the Traffic volume during the baseline period. The baseline period used for computing this metric is from 1st Feb to 15th March 2020. The dataset gets updated daily."
},
"uri": null,
"tags": [],
"customProperties": {}
}
},
{
"com.linkedin.pegasus2avro.schema.SchemaMetadata": {
"schemaName": "bigquery-public-data.covid19_geotab_mobility_impact.airport_traffic",
"platform": "urn:li:dataPlatform:bigquery",
"version": 0,
"created": {
"time": 1615444202056,
"actor": "urn:li:corpuser:etl",
"impersonator": null
},
"lastModified": {
"time": 1615444202056,
"actor": "urn:li:corpuser:etl",
"impersonator": null
},
"deleted": null,
"dataset": null,
"cluster": null,
"hash": "",
"platformSchema": {
"com.linkedin.pegasus2avro.schema.MySqlDDL": {
"tableSchema": ""
}
},
"fields": [
{
"fieldPath": "aggregation_method",
"jsonPath": null,
"nullable": true,
"description": {
"string": "Aggregation period used to compute this metric"
},
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "String()",
"recursive": false,
"globalTags": null
},
{
"fieldPath": "date",
"jsonPath": null,
"nullable": true,
"description": {
"string": "Date of the data"
},
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.DateType": {}
}
},
"nativeDataType": "DATE()",
"recursive": false,
"globalTags": null
},
{
"fieldPath": "version",
"jsonPath": null,
"nullable": true,
"description": {
"string": "Version of the table"
},
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "String()",
"recursive": false,
"globalTags": null
},
{
"fieldPath": "airport_name",
"jsonPath": null,
"nullable": true,
"description": {
"string": "Aggregation period used to compute this metric"
},
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "String()",
"recursive": false,
"globalTags": null
},
{
"fieldPath": "percent_of_baseline",
"jsonPath": null,
"nullable": true,
"description": {
"string": "Proportion of trips on this date as compared to Avg number of trips on the same day of week in baseline period i.e 1st February 2020 - 15th March 2020"
},
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"nativeDataType": "Float()",
"recursive": false,
"globalTags": null
},
{
"fieldPath": "center_point_geom",
"jsonPath": null,
"nullable": true,
"description": {
"string": "Geographic representation of the centroid of the Airport polygon"
},
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NullType": {}
}
},
"nativeDataType": "NullType()",
"recursive": false,
"globalTags": null
},
{
"fieldPath": "city",
"jsonPath": null,
"nullable": true,
"description": {
"string": "City within which the Airport is located"
},
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "String()",
"recursive": false,
"globalTags": null
},
{
"fieldPath": "state_region",
"jsonPath": null,
"nullable": true,
"description": {
"string": "State within which the Airport is located"
},
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "String()",
"recursive": false,
"globalTags": null
},
{
"fieldPath": "country_iso_code_2",
"jsonPath": null,
"nullable": true,
"description": {
"string": "ISO 3166-2 code representing the county and subdivision within which the Airport is located"
},
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "String()",
"recursive": false,
"globalTags": null
},
{
"fieldPath": "country_name",
"jsonPath": null,
"nullable": true,
"description": {
"string": "Full text name of the country within which the Airport is located"
},
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "String()",
"recursive": false,
"globalTags": null
},
{
"fieldPath": "airport_geom",
"jsonPath": null,
"nullable": true,
"description": {
"string": "Geographic representation of the Airport polygon"
},
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NullType": {}
}
},
"nativeDataType": "NullType()",
"recursive": false,
"globalTags": null
}
],
"primaryKeys": null,
"foreignKeysSpecs": null
}
}
]
}
},
"proposedDelta": null
}
]

File diff suppressed because it is too large.