feat(ingest): mysql - support multiple database in single recipe (#5684)

This commit is contained in:
Mugdha Hardikar 2022-08-26 23:17:49 +05:30 committed by GitHub
parent fe52df6bab
commit e448bb8832
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 2755 additions and 14 deletions

View File

@ -179,6 +179,9 @@ class AllowDenyPattern(ConfigModel):
assert self.is_fully_specified_allow_list()
return [a for a in self.allow if self.allowed(a)]
def __eq__(self, other): # type: ignore
return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
class KeyValuePattern(ConfigModel):
"""A class to store allow deny regexes"""

View File

@ -13,11 +13,13 @@ from datahub.ingestion.api.decorators import (
support_status,
)
from datahub.ingestion.source.sql.sql_common import (
BasicSQLAlchemyConfig,
SQLAlchemySource,
make_sqlalchemy_type,
register_custom_type,
)
from datahub.ingestion.source.sql.two_tier_sql_source import (
TwoTierSQLAlchemyConfig,
TwoTierSQLAlchemySource,
)
GEOMETRY = make_sqlalchemy_type("GEOMETRY")
POINT = make_sqlalchemy_type("POINT")
@ -38,7 +40,7 @@ base.ischema_names["polygon"] = POLYGON
base.ischema_names["decimal128"] = DECIMAL128
class MySQLConfig(BasicSQLAlchemyConfig):
class MySQLConfig(TwoTierSQLAlchemyConfig):
# defaults
host_port = Field(default="localhost:3306", description="MySQL host URL.")
scheme = "mysql+pymysql"
@ -58,7 +60,7 @@ class MySQLConfig(BasicSQLAlchemyConfig):
@capability(SourceCapability.DOMAINS, "Supported via the `domain` config field")
@capability(SourceCapability.DATA_PROFILING, "Optionally enabled via configuration")
@capability(SourceCapability.DELETION_DETECTION, "Enabled via stateful ingestion")
class MySQLSource(SQLAlchemySource):
class MySQLSource(TwoTierSQLAlchemySource):
"""
This plugin extracts the following:

View File

@ -712,6 +712,16 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
self.report.report_workunit(wu)
yield wu
def get_allowed_schemas(self, inspector: Inspector, db_name: str) -> Iterable[str]:
# this function returns the schema names which are filtered by schema_pattern.
for schema in self.get_schema_names(inspector):
if not self.config.schema_pattern.allowed(schema):
self.report.report_dropped(f"{schema}.*")
continue
else:
self.add_information_for_schema(inspector, schema)
yield schema
def get_workunits(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]:
sql_config = self.config
if logger.isEnabledFor(logging.DEBUG):
@ -734,10 +744,7 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
db_name = self.get_db_name(inspector)
yield from self.gen_database_containers(db_name)
for schema in self.get_schema_names(inspector):
if not sql_config.schema_pattern.allowed(schema):
self.report.report_dropped(f"{schema}.*")
continue
for schema in self.get_allowed_schemas(inspector, db_name):
self.add_information_for_schema(inspector, schema)
yield from self.gen_schema_containers(schema, db_name)
@ -1300,12 +1307,15 @@ class SQLAlchemySource(StatefulIngestionSourceBase):
sql_config=sql_config,
)
def get_parent_container_key(self, db_name: str, schema: str) -> PlatformKey:
return self.gen_schema_key(db_name, schema)
def add_table_to_schema_container(
self, dataset_urn: str, db_name: str, schema: str
) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]:
schema_container_key = self.gen_schema_key(db_name, schema)
parent_container_key = self.get_parent_container_key(db_name, schema)
container_workunits = add_dataset_to_container(
container_key=schema_container_key,
container_key=parent_container_key,
dataset_urn=dataset_urn,
)
for wu in container_workunits:

View File

@ -0,0 +1,106 @@
import typing
from typing import Any, Dict
import pydantic
from pydantic.fields import Field
from sqlalchemy import create_engine, inspect
from sqlalchemy.engine.reflection import Inspector
from datahub.configuration.common import AllowDenyPattern
from datahub.emitter.mcp_builder import PlatformKey
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.sql.sql_common import (
BasicSQLAlchemyConfig,
SQLAlchemySource,
logger,
make_sqlalchemy_uri,
)
class TwoTierSQLAlchemyConfig(BasicSQLAlchemyConfig):
database_pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description="Regex patterns for databases to filter in ingestion.",
)
schema_pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description="Deprecated in favour of database_pattern. Regex patterns for schemas to filter in ingestion. "
"Specify regex to only match the schema name. e.g. to match all tables in schema analytics, "
"use the regex 'analytics'",
)
@pydantic.root_validator()
def ensure_profiling_pattern_is_passed_to_profiling(
cls, values: Dict[str, Any]
) -> Dict[str, Any]:
allow_all_pattern = AllowDenyPattern.allow_all()
schema_pattern = values.get("schema_pattern")
database_pattern = values.get("database_pattern")
if (
database_pattern == allow_all_pattern
and schema_pattern != allow_all_pattern
):
logger.warning(
"Updating 'database_pattern' to 'schema_pattern'. Please stop using deprecated "
"'schema_pattern'. Use 'database_pattern' instead. "
)
values["database_pattern"] = schema_pattern
return values
def get_sql_alchemy_url(
self,
uri_opts: typing.Optional[typing.Dict[str, typing.Any]] = None,
current_db: typing.Optional[str] = None,
) -> str:
return self.sqlalchemy_uri or make_sqlalchemy_uri(
self.scheme, # type: ignore
self.username,
self.password.get_secret_value() if self.password else None,
self.host_port, # type: ignore
current_db if current_db else self.database,
uri_opts=uri_opts,
)
class TwoTierSQLAlchemySource(SQLAlchemySource):
def __init__(self, config, ctx, platform):
super().__init__(config, ctx, platform)
self.current_database = None
self.config: TwoTierSQLAlchemyConfig = config
def get_parent_container_key(self, db_name: str, schema: str) -> PlatformKey:
return self.gen_database_key(db_name)
def get_allowed_schemas(
self, inspector: Inspector, db_name: str
) -> typing.Iterable[str]:
# This method returns schema names but for 2 tier databases there is no schema layer at all hence passing
# dbName itself as an allowed schema
yield db_name
def get_inspectors(self):
# This method can be overridden in the case that you want to dynamically
# run on multiple databases.
url = self.config.get_sql_alchemy_url()
logger.debug(f"sql_alchemy_url={url}")
engine = create_engine(url, **self.config.options)
with engine.connect() as conn:
inspector = inspect(conn)
if self.config.database and self.config.database != "":
databases = [self.config.database]
else:
databases = inspector.get_schema_names()
for db in databases:
if self.config.database_pattern.allowed(db):
url = self.config.get_sql_alchemy_url(current_db=db)
inspector = inspect(
create_engine(url, **self.config.options).connect()
)
self.current_database = db
yield inspector
def gen_schema_containers(
self, schema: str, db_name: str
) -> typing.Iterable[MetadataWorkUnit]:
return []

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,555 @@
[
{
"auditHeader": null,
"entityType": "container",
"entityUrn": "urn:li:container:dc2ae101b66746b9c2b6df8ee89ca88f",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "containerProperties",
"aspect": {
"value": "{\"customProperties\": {\"platform\": \"mysql\", \"instance\": \"PROD\", \"database\": \"northwind\"}, \"name\": \"northwind\"}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "mysql-test",
"registryName": null,
"registryVersion": null,
"properties": null
}
},
{
"auditHeader": null,
"entityType": "container",
"entityUrn": "urn:li:container:dc2ae101b66746b9c2b6df8ee89ca88f",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"value": "{\"platform\": \"urn:li:dataPlatform:mysql\"}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "mysql-test",
"registryName": null,
"registryVersion": null,
"properties": null
}
},
{
"auditHeader": null,
"entityType": "container",
"entityUrn": "urn:li:container:dc2ae101b66746b9c2b6df8ee89ca88f",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"value": "{\"typeNames\": [\"Database\"]}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "mysql-test",
"registryName": null,
"registryVersion": null,
"properties": null
}
},
{
"auditHeader": null,
"entityType": "container",
"entityUrn": "urn:li:container:dc2ae101b66746b9c2b6df8ee89ca88f",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "domains",
"aspect": {
"value": "{\"domains\": [\"urn:li:domain:sales\"]}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "mysql-test",
"registryName": null,
"registryVersion": null,
"properties": null
}
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mysql,northwind.customers,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
"value": "{\"container\": \"urn:li:container:dc2ae101b66746b9c2b6df8ee89ca88f\"}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "mysql-test",
"registryName": null,
"registryVersion": null,
"properties": null
}
},
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:mysql,northwind.customers,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.common.Status": {
"removed": false
}
},
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {},
"externalUrl": null,
"name": "customers",
"qualifiedName": null,
"description": null,
"uri": null,
"tags": []
}
},
{
"com.linkedin.pegasus2avro.schema.SchemaMetadata": {
"schemaName": "northwind.customers",
"platform": "urn:li:dataPlatform:mysql",
"version": 0,
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
},
"deleted": null,
"dataset": null,
"cluster": null,
"hash": "",
"platformSchema": {
"com.linkedin.pegasus2avro.schema.MySqlDDL": {
"tableSchema": ""
}
},
"fields": [
{
"fieldPath": "id",
"jsonPath": null,
"nullable": false,
"description": null,
"created": null,
"lastModified": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"nativeDataType": "INTEGER()",
"recursive": false,
"globalTags": null,
"glossaryTerms": null,
"isPartOfKey": true,
"isPartitioningKey": null,
"jsonProps": null
},
{
"fieldPath": "company",
"jsonPath": null,
"nullable": true,
"description": null,
"created": null,
"lastModified": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "VARCHAR(length=50)",
"recursive": false,
"globalTags": null,
"glossaryTerms": null,
"isPartOfKey": false,
"isPartitioningKey": null,
"jsonProps": null
},
{
"fieldPath": "last_name",
"jsonPath": null,
"nullable": true,
"description": null,
"created": null,
"lastModified": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "VARCHAR(length=50)",
"recursive": false,
"globalTags": null,
"glossaryTerms": null,
"isPartOfKey": false,
"isPartitioningKey": null,
"jsonProps": null
},
{
"fieldPath": "first_name",
"jsonPath": null,
"nullable": true,
"description": null,
"created": null,
"lastModified": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "VARCHAR(length=50)",
"recursive": false,
"globalTags": null,
"glossaryTerms": null,
"isPartOfKey": false,
"isPartitioningKey": null,
"jsonProps": null
},
{
"fieldPath": "email_address",
"jsonPath": null,
"nullable": true,
"description": null,
"created": null,
"lastModified": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "VARCHAR(length=50)",
"recursive": false,
"globalTags": null,
"glossaryTerms": null,
"isPartOfKey": false,
"isPartitioningKey": null,
"jsonProps": null
},
{
"fieldPath": "priority",
"jsonPath": null,
"nullable": true,
"description": null,
"created": null,
"lastModified": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"nativeDataType": "FLOAT()",
"recursive": false,
"globalTags": null,
"glossaryTerms": null,
"isPartOfKey": false,
"isPartitioningKey": null,
"jsonProps": null
}
],
"primaryKeys": null,
"foreignKeysSpecs": null,
"foreignKeys": null
}
}
]
}
},
"proposedDelta": null,
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "mysql-test",
"registryName": null,
"registryVersion": null,
"properties": null
}
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mysql,northwind.customers,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"value": "{\"typeNames\": [\"table\"]}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "mysql-test",
"registryName": null,
"registryVersion": null,
"properties": null
}
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mysql,northwind.customers,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "domains",
"aspect": {
"value": "{\"domains\": [\"urn:li:domain:sales\"]}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "mysql-test",
"registryName": null,
"registryVersion": null,
"properties": null
}
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mysql,northwind.orders,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
"value": "{\"container\": \"urn:li:container:dc2ae101b66746b9c2b6df8ee89ca88f\"}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "mysql-test",
"registryName": null,
"registryVersion": null,
"properties": null
}
},
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:mysql,northwind.orders,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.common.Status": {
"removed": false
}
},
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {},
"externalUrl": null,
"name": "orders",
"qualifiedName": null,
"description": null,
"uri": null,
"tags": []
}
},
{
"com.linkedin.pegasus2avro.schema.SchemaMetadata": {
"schemaName": "northwind.orders",
"platform": "urn:li:dataPlatform:mysql",
"version": 0,
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
},
"deleted": null,
"dataset": null,
"cluster": null,
"hash": "",
"platformSchema": {
"com.linkedin.pegasus2avro.schema.MySqlDDL": {
"tableSchema": ""
}
},
"fields": [
{
"fieldPath": "id",
"jsonPath": null,
"nullable": false,
"description": null,
"created": null,
"lastModified": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"nativeDataType": "INTEGER()",
"recursive": false,
"globalTags": null,
"glossaryTerms": null,
"isPartOfKey": true,
"isPartitioningKey": null,
"jsonProps": null
},
{
"fieldPath": "description",
"jsonPath": null,
"nullable": true,
"description": null,
"created": null,
"lastModified": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "VARCHAR(length=50)",
"recursive": false,
"globalTags": null,
"glossaryTerms": null,
"isPartOfKey": false,
"isPartitioningKey": null,
"jsonProps": null
},
{
"fieldPath": "customer_id",
"jsonPath": null,
"nullable": false,
"description": null,
"created": null,
"lastModified": null,
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.NumberType": {}
}
},
"nativeDataType": "INTEGER()",
"recursive": false,
"globalTags": null,
"glossaryTerms": null,
"isPartOfKey": false,
"isPartitioningKey": null,
"jsonProps": null
}
],
"primaryKeys": null,
"foreignKeysSpecs": null,
"foreignKeys": [
{
"name": "fk_order_customer",
"foreignFields": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:mysql,northwind.customers,PROD),id)"
],
"sourceFields": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:mysql,northwind.orders,PROD),customer_id)"
],
"foreignDataset": "urn:li:dataset:(urn:li:dataPlatform:mysql,northwind.customers,PROD)"
}
]
}
}
]
}
},
"proposedDelta": null,
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "mysql-test",
"registryName": null,
"registryVersion": null,
"properties": null
}
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mysql,northwind.orders,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"value": "{\"typeNames\": [\"table\"]}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "mysql-test",
"registryName": null,
"registryVersion": null,
"properties": null
}
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mysql,northwind.orders,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "domains",
"aspect": {
"value": "{\"domains\": [\"urn:li:domain:sales\"]}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "mysql-test",
"registryName": null,
"registryVersion": null,
"properties": null
}
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mysql,northwind.customers,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "datasetProfile",
"aspect": {
"value": "{\"timestampMillis\": 1586847600000, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"rowCount\": 5, \"columnCount\": 6, \"fieldProfiles\": [{\"fieldPath\": \"id\", \"uniqueCount\": 5, \"uniqueProportion\": 1.0, \"nullCount\": 0, \"nullProportion\": 0.0, \"sampleValues\": [\"1\", \"2\", \"3\", \"4\", \"5\"]}, {\"fieldPath\": \"company\", \"uniqueCount\": 5, \"uniqueProportion\": 1.0, \"nullCount\": 0, \"nullProportion\": 0.0, \"sampleValues\": [\"Company A\", \"Company B\", \"Company C\", \"Company D\", \"Company E\"]}, {\"fieldPath\": \"last_name\", \"uniqueCount\": 5, \"uniqueProportion\": 1.0, \"nullCount\": 0, \"nullProportion\": 0.0, \"sampleValues\": [\"Axen\", \"Bedecs\", \"Donnell\", \"Gratacos Solsona\", \"Lee\"]}, {\"fieldPath\": \"first_name\", \"uniqueCount\": 5, \"uniqueProportion\": 1.0, \"nullCount\": 0, \"nullProportion\": 0.0, \"sampleValues\": [\"Anna\", \"Antonio\", \"Christina\", \"Martin\", \"Thomas\"]}, {\"fieldPath\": \"email_address\", \"uniqueCount\": 0, \"nullCount\": 5, \"nullProportion\": 1.0, \"sampleValues\": []}, {\"fieldPath\": \"priority\", \"uniqueCount\": 3, \"uniqueProportion\": 0.75, \"nullCount\": 1, \"nullProportion\": 0.2, \"min\": \"3.8\", \"max\": \"4.9\", \"mean\": \"4.175000011920929\", \"median\": \"4.0\", \"distinctValueFrequencies\": [{\"value\": \"3.8\", \"frequency\": 1}, {\"value\": \"4.0\", \"frequency\": 2}, {\"value\": \"4.9\", \"frequency\": 1}], \"sampleValues\": [\"4.0\", \"4.9\", \"4.0\", \"3.8\"]}]}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "mysql-test",
"registryName": null,
"registryVersion": null,
"properties": null
}
},
{
"auditHeader": null,
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:mysql,northwind.orders,PROD)",
"entityKeyAspect": null,
"changeType": "UPSERT",
"aspectName": "datasetProfile",
"aspect": {
"value": "{\"timestampMillis\": 1586847600000, \"partitionSpec\": {\"type\": \"FULL_TABLE\", \"partition\": \"FULL_TABLE_SNAPSHOT\"}, \"rowCount\": 0, \"columnCount\": 3, \"fieldProfiles\": [{\"fieldPath\": \"id\", \"uniqueCount\": 0, \"nullCount\": 0, \"sampleValues\": []}, {\"fieldPath\": \"description\", \"uniqueCount\": 0, \"nullCount\": 0, \"sampleValues\": []}, {\"fieldPath\": \"customer_id\", \"uniqueCount\": 0, \"nullCount\": 0, \"sampleValues\": []}]}",
"contentType": "application/json"
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "mysql-test",
"registryName": null,
"registryVersion": null,
"properties": null
}
}
]

View File

@ -5,7 +5,6 @@ source:
config:
username: root
password: example
database: metagalaxy
host_port: localhost:53307
schema_pattern:
allow:

View File

@ -0,0 +1,29 @@
run_id: mysql-test
source:
type: mysql
config:
username: root
password: example
host_port: localhost:53307
database: northwind
profiling:
enabled: True
include_field_null_count: true
include_field_min_value: true
include_field_max_value: true
include_field_mean_value: true
include_field_median_value: true
include_field_stddev_value: true
include_field_quantiles: true
include_field_distinct_value_frequencies: true
include_field_histogram: true
include_field_sample_values: true
domain:
"urn:li:domain:sales":
allow:
- "^northwind"
sink:
type: file
config:
filename: "./mysql_mces.json"

View File

@ -44,11 +44,11 @@ def mysql_runner(docker_compose_runner, pytestconfig, test_resources_dir):
@freeze_time(FROZEN_TIME)
@pytest.mark.integration
def test_mysql_ingest(
def test_mysql_ingest_no_db(
mysql_runner, pytestconfig, test_resources_dir, tmp_path, mock_time
):
# Run the metadata ingestion pipeline.
config_file = (test_resources_dir / "mysql_to_file.yml").resolve()
config_file = (test_resources_dir / "mysql_to_file_no_db.yml").resolve()
run_datahub_cmd(
["ingest", "--strict-warnings", "-c", f"{config_file}"], tmp_path=tmp_path
)
@ -57,7 +57,26 @@ def test_mysql_ingest(
mce_helpers.check_golden_file(
pytestconfig,
output_path=tmp_path / "mysql_mces.json",
golden_path=test_resources_dir / "mysql_mces_golden.json",
golden_path=test_resources_dir / "mysql_mces_no_db_golden.json",
)
@freeze_time(FROZEN_TIME)
@pytest.mark.integration
def test_mysql_ingest_with_db(
mysql_runner, pytestconfig, test_resources_dir, tmp_path, mock_time
):
# Run the metadata ingestion pipeline.
config_file = (test_resources_dir / "mysql_to_file_with_db.yml").resolve()
run_datahub_cmd(
["ingest", "--strict-warnings", "-c", f"{config_file}"], tmp_path=tmp_path
)
# Verify the output.
mce_helpers.check_golden_file(
pytestconfig,
output_path=tmp_path / "mysql_mces.json",
golden_path=test_resources_dir / "mysql_mces_with_db_golden.json",
)