feat(ingest): various minor fixes (#2246)

Harshal Sheth 2021-03-18 02:05:05 -04:00 committed by GitHub
parent c819289b53
commit b8462028c3
22 changed files with 201 additions and 92 deletions

View File

@ -18,7 +18,7 @@ Before running any metadata ingestion job, you should make sure that DataHub bac
<!-- You can run this ingestion framework by building from source or by running docker images. -->
### Install
### Install from Source
#### Requirements
@ -76,7 +76,7 @@ pip install avro-python3
</details>
### Installing Plugins
#### Installing Plugins
We use a plugin architecture so that you can install only the dependencies you actually need.
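For example, a hypothetical install from a source checkout might combine several extras in one step (the `mysql` extra name here is an assumption; the `datahub-rest` extra appears in the usage example below):

```sh
# Illustrative only: install the dependencies for one source and one sink together.
pip install -e '.[mysql,datahub-rest]'
```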
@ -111,27 +111,28 @@ datahub ingest-list-plugins
[extra requirements]: https://www.python-ldap.org/en/python-ldap-3.3.0/installing.html#build-prerequisites
### Basic Usage
#### Basic Usage
```sh
pip install -e '.[datahub-rest]' # install the required plugin
datahub ingest -c ./examples/recipes/example_to_datahub_rest.yml
```
<!--
## Running Ingestion using Docker:
### Install using Docker
[![Docker Hub](https://img.shields.io/docker/pulls/linkedin/datahub-ingestion?style=plastic)](https://hub.docker.com/r/linkedin/datahub-ingestion)
[![datahub-ingestion docker](https://github.com/linkedin/datahub/actions/workflows/docker-ingestion.yml/badge.svg)](https://github.com/linkedin/datahub/actions/workflows/docker-ingestion.yml)
If you don't want to install locally, you can alternatively run metadata ingestion within a Docker container.
We have prebuilt images available on [Docker Hub](https://hub.docker.com/r/linkedin/datahub-ingestion). All plugins will be installed and enabled automatically.
*Limitation: the datahub_docker.sh convenience script assumes that the recipe and any input/output files are accessible in the current working directory or its subdirectories. Files outside the current working directory will not be found, and you'll need to invoke the Docker image directly.*
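Invoking the Docker image directly amounts to reproducing the mounts the script sets up; a minimal sketch, mirroring the `docker run` flags added later in this commit, with the recipe path as a placeholder:

```sh
# Sketch of a direct invocation: mount the current directory so the recipe is visible in-container.
docker pull linkedin/datahub-ingestion:latest
docker run --rm \
    --network host \
    --workdir=/dir \
    --mount type=bind,source="$(pwd)",target=/dir \
    linkedin/datahub-ingestion:latest \
    ingest -c ./examples/recipes/example_to_datahub_rest.yml
```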
### Build the image
```sh
source docker/docker_build.sh
./scripts/datahub_docker.sh ingest -c ./examples/recipes/example_to_datahub_rest.yml
```
### Usage - with Docker
We provide a simple script that supports mounting a local directory for input recipes and an output directory for output data:
```sh
source docker/docker_run.sh examples/recipes/file_to_file.yml
```
-->
### Usage within Airflow
We have also included a couple of [sample DAGs](./examples/airflow) that can be used with [Airflow](https://airflow.apache.org/).
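For reference, a minimal sketch of what such a DAG could look like (assuming Airflow 2.x's `BashOperator`; the DAG id and recipe path are placeholders, not taken from the bundled examples):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

# Hypothetical DAG that shells out to the datahub CLI on a daily schedule.
with DAG(
    dag_id="datahub_ingest_example",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    ingest = BashOperator(
        task_id="datahub_ingest",
        bash_command="datahub ingest -c /path/to/recipe.yml",
    )
```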
@ -492,7 +493,7 @@ pytest tests/integration
```sh
# Assumes: pip install -e '.[dev]'
black --exclude 'datahub/metadata' -S -t py36 src tests
black src tests
isort src tests
flake8 src tests
mypy -p datahub

View File

@ -4,6 +4,19 @@ import click
from avrogen import write_schema_files
def suppress_checks_in_file(filepath: str) -> None:
"""Adds a couple lines to the top of a file to suppress flake8 and black"""
with open(filepath, "r+") as f:
contents = f.read()
f.seek(0, 0)
f.write("# flake8: noqa\n")
f.write("# fmt: off\n")
f.write(contents)
f.write("# fmt: on\n")
@click.command()
@click.argument("schema_file", type=click.Path(exists=True))
@click.argument("outdir", type=click.Path())
@ -20,6 +33,8 @@ def generate(schema_file: str, outdir: str):
redo_spaces = json.dumps(json.loads(schema_json), indent=2)
write_schema_files(redo_spaces, outdir)
suppress_checks_in_file(f"{outdir}/schema_classes.py")
suppress_checks_in_file(f"{outdir}/__init__.py")
if __name__ == "__main__":

View File

@ -13,7 +13,7 @@ pip install -e ".[dev]"
./scripts/codegen.sh
black --check --exclude 'datahub/metadata' -S -t py36 src tests
black --check src tests
isort --check-only src tests
flake8 --count --statistics src tests
mypy -p datahub

View File

@ -0,0 +1,17 @@
#!/bin/bash
# A convenience command to run the built docker container with a local config file and local output directory.
# Note: this works by mounting the current working directory as a Docker volume.
set -euo pipefail
DOCKER_IMAGE=linkedin/datahub-ingestion:latest
docker pull --quiet $DOCKER_IMAGE
docker run --rm \
--network host \
--workdir=/dir \
--mount type=bind,source="$(pwd)",target=/dir \
$DOCKER_IMAGE \
"$@"
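Assuming this new script is the `datahub_docker.sh` convenience wrapper referenced in the README, any CLI arguments are simply forwarded into the container, e.g.:

```sh
# Run an ingestion recipe; the recipe must live under the current working directory.
./scripts/datahub_docker.sh ingest -c ./examples/recipes/example_to_datahub_rest.yml
```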

View File

@ -23,6 +23,8 @@ plugins =
namespace_packages = true
strict_optional = yes
disallow_untyped_defs = no
disallow_incomplete_defs = no
check_untyped_defs = yes
[mypy-confluent_kafka.*]
ignore_missing_imports = yes

View File

@ -37,7 +37,7 @@ framework_common = {
"click>=7.1.1",
"pyyaml>=5.4.1",
"toml>=0.10.0",
"avro-gen3==0.3.3",
"avro-gen3==0.3.5",
"avro-python3>=1.8.2",
}

View File

@ -1,8 +1,8 @@
from collections import OrderedDict
from typing import Dict, Type
from typing import Any, Dict, Type
import requests
from requests.exceptions import HTTPError
from requests.exceptions import HTTPError, RequestException
from datahub.configuration.common import OperationalError
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
@ -29,7 +29,7 @@ resource_locator: Dict[Type[object], str] = {
}
def _rest_li_ify(obj):
def _rest_li_ify(obj: Any) -> Any:
if isinstance(obj, (dict, OrderedDict)):
if len(obj.keys()) == 1:
key = list(obj.keys())[0]
@ -37,10 +37,8 @@ def _rest_li_ify(obj):
if key.find("com.linkedin.pegasus2avro.") >= 0:
new_key = key.replace("com.linkedin.pegasus2avro.", "com.linkedin.")
return {new_key: _rest_li_ify(value)}
elif key == "string" or key == "array":
return value
new_obj = {}
new_obj: Any = {}
for key, value in obj.items():
if value is not None:
new_obj[key] = _rest_li_ify(value)
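As a rough sketch of the rewrite `_rest_li_ify` performs (assuming the helper above is in scope; the aspect payload is invented for illustration):

```python
# Hypothetical before/after: the avro-generated pegasus2avro namespace is
# rewritten to the plain com.linkedin namespace expected by the rest.li API.
raw = {"com.linkedin.pegasus2avro.common.Ownership": {"owners": []}}
assert _rest_li_ify(raw) == {"com.linkedin.common.Ownership": {"owners": []}}
```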
@ -75,11 +73,20 @@ class DatahubRestEmitter:
mce_obj = _rest_li_ify(raw_mce_obj)
snapshot = {"snapshot": mce_obj}
response = requests.post(url, headers=headers, json=snapshot)
try:
response = requests.post(url, headers=headers, json=snapshot)
# import curlify
# print(curlify.to_curl(response.request))
# breakpoint()
response.raise_for_status()
except HTTPError as e:
info = response.json()
raise OperationalError(
"Unable to emit metadata to DataHub GMS", info
) from e
except RequestException as e:
raise OperationalError(
"Unable to emit metadata to DataHub GMS", {"message": str(e)}
) from e

View File

@ -1,4 +1,5 @@
import logging
import os
import pathlib
import sys
@ -15,26 +16,33 @@ from datahub.ingestion.source.source_registry import source_registry
logger = logging.getLogger(__name__)
# Set to debug on the root logger.
logging.getLogger(None).setLevel(logging.DEBUG)
# Configure some loggers.
logging.getLogger("urllib3").setLevel(logging.WARN)
logging.getLogger("botocore").setLevel(logging.INFO)
# logging.getLogger("botocore").setLevel(logging.INFO)
# logging.getLogger("google").setLevel(logging.INFO)
# Configure logger.
BASE_LOGGING_FORMAT = (
"[%(asctime)s] %(levelname)-8s {%(name)s:%(lineno)d} - %(message)s"
)
logging.basicConfig(level=logging.DEBUG, format=BASE_LOGGING_FORMAT)
DEFAULT_CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"])
logging.basicConfig(format=BASE_LOGGING_FORMAT)
@click.group()
def datahub():
pass
@click.option("--debug/--no-debug", default=False)
def datahub(debug: bool) -> None:
if debug or os.getenv("DATAHUB_DEBUG", False):
logging.getLogger().setLevel(logging.INFO)
logging.getLogger("datahub").setLevel(logging.DEBUG)
else:
logging.getLogger().setLevel(logging.WARNING)
logging.getLogger("datahub").setLevel(logging.INFO)
# loggers = [logging.getLogger(name) for name in logging.root.manager.loggerDict]
# print(loggers)
# breakpoint()
@datahub.command(context_settings=DEFAULT_CONTEXT_SETTINGS)
@datahub.command()
@click.option(
"-c",
"--config",
@ -42,7 +50,7 @@ def datahub():
help="Config file in .toml or .yaml format",
required=True,
)
def ingest(config: str):
def ingest(config: str) -> None:
"""Main command for ingesting metadata into DataHub"""
config_file = pathlib.Path(config)
@ -65,7 +73,7 @@ def ingest(config: str):
pipeline_config = config_mech.load_config(fp)
try:
logger.debug(f"Using config: {pipeline_config}")
logger.info(f"Using config: {pipeline_config}")
pipeline = Pipeline.create(pipeline_config)
except ValidationError as e:
click.echo(e, err=True)
@ -76,8 +84,8 @@ def ingest(config: str):
sys.exit(ret)
@datahub.command(context_settings=DEFAULT_CONTEXT_SETTINGS)
def ingest_list_plugins():
@datahub.command()
def ingest_list_plugins() -> None:
"""List enabled ingestion plugins"""
click.secho("Sources:", bold=True)
@ -90,13 +98,13 @@ def ingest_list_plugins():
@datahub.group()
def check():
def check() -> None:
pass
@check.command()
@click.argument("json-file", type=click.Path(exists=True, dir_okay=False))
def mce_file(json_file: str):
def mce_file(json_file: str) -> None:
"""Check the schema of a MCE JSON file"""
report = check_mce_file(json_file)
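With the new `--debug` group flag and the `DATAHUB_DEBUG` environment variable, more verbose CLI logging can be enabled along these lines (paths reuse examples that appear elsewhere in this repo):

```sh
# Pass --debug to the top-level group, or set the environment variable.
datahub --debug ingest -c ./examples/recipes/example_to_datahub_rest.yml
DATAHUB_DEBUG=1 datahub check mce-file ./examples/mce_files/bootstrap_mce.json
```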

View File

@ -27,5 +27,4 @@ class WorkUnit(_WorkUnitId, metaclass=ABCMeta):
@dataclass
class PipelineContext:
# TODO: autogenerate run_ids if not specified.
run_id: str

View File

@ -47,7 +47,7 @@ class Registry(Generic[T]):
tp = self._mapping[key]
if isinstance(tp, Exception):
raise ConfigurationError(
f"{key} is disabled; try running: pip install \".[{key}]\""
f'{key} is disabled; try running: pip install ".[{key}]"'
) from tp
else:
# If it's not an exception, then it's a registered type.
@ -55,7 +55,7 @@ class Registry(Generic[T]):
def __str__(self):
col_width = 15
return '\n'.join(
return "\n".join(
f"{key}{'' if self.is_enabled(key) else (' ' * (col_width - len(key))) + '(disabled)'}"
for key in sorted(self._mapping.keys())
)

View File

@ -71,7 +71,7 @@ class Sink(Closeable, metaclass=ABCMeta):
@abstractmethod
def write_record_async(
self, record_envelope: RecordEnvelope, callback: WriteCallback
):
) -> None:
# must call callback when done.
pass

View File

@ -1,6 +1,6 @@
from abc import ABCMeta, abstractmethod
from dataclasses import dataclass, field
from typing import Dict, Iterable, List
from typing import Dict, Generic, Iterable, List, TypeVar
from .closeable import Closeable
from .common import PipelineContext, RecordEnvelope, WorkUnit
@ -15,7 +15,7 @@ class SourceReport(Report):
warnings: Dict[str, List[str]] = field(default_factory=dict)
failures: Dict[str, List[str]] = field(default_factory=dict)
def report_workunit(self, wu: WorkUnit):
def report_workunit(self, wu: WorkUnit) -> None:
self.workunits_produced += 1
self.workunit_ids.append(wu.id)
@ -30,13 +30,16 @@ class SourceReport(Report):
self.failures[key].append(reason)
class Extractor(Closeable, metaclass=ABCMeta):
WorkUnitType = TypeVar("WorkUnitType", bound=WorkUnit)
class Extractor(Generic[WorkUnitType], Closeable, metaclass=ABCMeta):
@abstractmethod
def configure(self, config_dict: dict, ctx: PipelineContext):
pass
@abstractmethod
def get_records(self, workunit: WorkUnit) -> Iterable[RecordEnvelope]:
def get_records(self, workunit: WorkUnitType) -> Iterable[RecordEnvelope]:
pass
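For context, a minimal sketch of a concrete extractor bound to `MetadataWorkUnit` (the class name is invented and this is not part of the diff; the real `WorkUnitMCEExtractor` appears in the next file):

```python
from typing import Iterable

from datahub.ingestion.api import RecordEnvelope
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import Extractor
from datahub.ingestion.source.metadata_common import MetadataWorkUnit


class PassthroughExtractor(Extractor[MetadataWorkUnit]):
    """Illustrative extractor: wraps each work unit's MCE in a RecordEnvelope."""

    def configure(self, config_dict: dict, ctx: PipelineContext) -> None:
        pass

    def get_records(self, workunit: MetadataWorkUnit) -> Iterable[RecordEnvelope]:
        # Tag the envelope with the originating work unit id, as the diff above does.
        yield RecordEnvelope(workunit.mce, {"workunit_id": workunit.id})

    def close(self) -> None:
        pass
```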

View File

@ -1,8 +1,9 @@
from typing import Iterable
from typing import Iterable, cast
from datahub.ingestion.api import RecordEnvelope
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import Extractor
from datahub.ingestion.source.metadata_common import MetadataWorkUnit
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
@ -13,11 +14,17 @@ class WorkUnitMCEExtractor(Extractor):
pass
def get_records(self, workunit) -> Iterable[RecordEnvelope[MetadataChangeEvent]]:
workunit = cast(MetadataWorkUnit, workunit)
if len(workunit.mce.proposedSnapshot.aspects) == 0:
raise AttributeError("every mce must have at least one aspect")
if not workunit.mce.validate():
raise ValueError(f"source produced an invalid MCE: {workunit.mce}")
yield RecordEnvelope(workunit.mce, {})
yield RecordEnvelope(
workunit.mce,
{
"workunit_id": workunit.id,
},
)
def close(self):
pass

View File

@ -1,5 +1,5 @@
import logging
import time
import uuid
import click
from pydantic import Field
@ -9,7 +9,7 @@ from datahub.configuration.common import (
DynamicTypedConfig,
PipelineExecutionError,
)
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.common import PipelineContext, RecordEnvelope
from datahub.ingestion.api.sink import Sink, WriteCallback
from datahub.ingestion.api.source import Extractor, Source
from datahub.ingestion.extractor.extractor_registry import extractor_registry
@ -28,20 +28,19 @@ class PipelineConfig(ConfigModel):
# simplify this configuration and validation.
# See https://github.com/samuelcolvin/pydantic/pull/2336.
run_id: str = Field(default_factory=lambda: str(int(time.time() * 1000)))
run_id: str = Field(default_factory=lambda: str(uuid.uuid1()))
source: SourceConfig
sink: DynamicTypedConfig
class LoggingCallback(WriteCallback):
def on_success(self, record_envelope, success_meta):
logger.debug("sink called success callback")
def on_success(self, record_envelope: RecordEnvelope, success_meta):
logger.info(f"sink wrote workunit {record_envelope.metadata['workunit_id']}")
def on_failure(self, record_envelope, exception, failure_meta):
# breakpoint()
logger.exception(
f"failed to write {record_envelope.record}"
" with {exception} and info {failure_meta}"
def on_failure(self, record_envelope: RecordEnvelope, exception, failure_meta):
logger.error(
f"failed to write record with workunit {record_envelope.metadata['workunit_id']}"
f" with {exception} and info {failure_meta}"
)
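As a side note on the `run_id` change in this hunk, the default moves from a millisecond timestamp string to a UUID1; roughly (example values are invented):

```python
import time
import uuid

# Previous default: a stringified millisecond timestamp, e.g. "1616049905123".
old_style_run_id = str(int(time.time() * 1000))

# New default: a UUID1, e.g. "1b4f0e9e-87b4-11eb-8dcd-0242ac130003".
new_style_run_id = str(uuid.uuid1())
```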

View File

@ -24,7 +24,7 @@ class ConsoleSink(Sink):
def write_record_async(
self, record_envelope: RecordEnvelope, write_callback: WriteCallback
):
print(f'{self.ctx.run_id}:{record_envelope}')
print(f"{record_envelope}")
if write_callback:
self.report.report_record_written(record_envelope)
write_callback.on_success(record_envelope, {})

View File

@ -24,13 +24,12 @@ class FileSink(Sink):
self.report = SinkReport()
fpath = pathlib.Path(self.config.filename)
logger.info(f"Will write to {fpath}")
self.file = fpath.open("w")
self.file.write("[\n")
self.wrote_something = False
@classmethod
def create(cls, config_dict, ctx: PipelineContext):
def create(cls, config_dict: dict, ctx: PipelineContext):
config = FileSinkConfig.parse_obj(config_dict)
return cls(ctx, config)
@ -44,7 +43,7 @@ class FileSink(Sink):
self,
record_envelope: RecordEnvelope[MetadataChangeEvent],
write_callback: WriteCallback,
):
) -> None:
mce = record_envelope.record
obj = mce.to_obj()

View File

@ -9,4 +9,4 @@ class MetadataWorkUnit(WorkUnit):
mce: MetadataChangeEvent
def get_metadata(self):
return {'mce': self.mce}
return {"mce": self.mce}

View File

@ -10,7 +10,8 @@ from sqlalchemy.sql import sqltypes as types
from datahub.configuration.common import AllowDenyPattern, ConfigModel
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import Source, SourceReport, WorkUnit
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.source.metadata_common import MetadataWorkUnit
from datahub.metadata.com.linkedin.pegasus2avro.common import AuditStamp
from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import DatasetSnapshot
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
@ -91,11 +92,8 @@ class BasicSQLAlchemyConfig(SQLAlchemyConfig):
@dataclass
class SqlWorkUnit(WorkUnit):
mce: MetadataChangeEvent
def get_metadata(self):
return {"mce": self.mce}
class SqlWorkUnit(MetadataWorkUnit):
pass
_field_type_mapping = {

View File

@ -1,3 +1,4 @@
import logging
import os
import sys
import time
@ -9,6 +10,10 @@ sys.path.append(os.path.join(os.path.dirname(__file__), "test_helpers"))
pytest_plugins = ["tests.integration.fixtures.sql_fixtures"]
# Enable debug logging.
logging.getLogger().setLevel(logging.DEBUG)
os.putenv("DATAHUB_DEBUG", "1")
@pytest.fixture
def mock_time(monkeypatch):

View File

@ -1,17 +1,24 @@
import mce_helpers
import pytest
from click.testing import CliRunner
from datahub.entrypoints import datahub
from datahub.ingestion.run.pipeline import Pipeline
def test_serde_large(pytestconfig, tmp_path):
json_filename = "test_serde_large.json"
@pytest.mark.parametrize(
"json_filename",
[
# Normal test.
"tests/unit/serde/test_serde_large.json",
# Ensure correct representation of chart info's input list.
"tests/unit/serde/test_serde_chart_snapshot.json",
],
)
def test_serde(pytestconfig, tmp_path, json_filename):
golden_file = pytestconfig.rootpath / json_filename
output_filename = "output.json"
test_resources_dir = pytestconfig.rootpath / "tests/unit/serde"
golden_file = test_resources_dir / json_filename
output_file = tmp_path / output_filename
pipeline = Pipeline.create(
@ -25,23 +32,24 @@ def test_serde_large(pytestconfig, tmp_path):
output = mce_helpers.load_json_file(tmp_path / output_filename)
golden = mce_helpers.load_json_file(golden_file)
mce_helpers.assert_mces_equal(output, golden)
assert golden == output
def test_check_mce_schema(pytestconfig):
json_filename = "test_serde_large.json"
test_resources_dir = pytestconfig.rootpath / "tests/unit/serde"
json_file_path = test_resources_dir / json_filename
runner = CliRunner()
result = runner.invoke(datahub, ["check", "mce-file", f"{json_file_path}"])
assert result.exit_code == 0
def test_reader_allows_verbose_unions(pytestconfig):
json_filename = "test_serde_backwards_compat.json"
test_resources_dir = pytestconfig.rootpath / "tests/unit/serde"
json_file_path = test_resources_dir / json_filename
@pytest.mark.parametrize(
"json_filename",
[
# Normal test.
"tests/unit/serde/test_serde_large.json",
# Check for backwards compatibility with specifying all union types.
"tests/unit/serde/test_serde_backwards_compat.json",
# Ensure sample MCE files are valid.
"examples/mce_files/single_mce.json",
"examples/mce_files/mce_list.json",
"examples/mce_files/bootstrap_mce.json",
],
)
def test_check_mce_schema(pytestconfig, json_filename):
json_file_path = pytestconfig.rootpath / json_filename
runner = CliRunner()
result = runner.invoke(datahub, ["check", "mce-file", f"{json_file_path}"])

View File

@ -0,0 +1,41 @@
[
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.ChartSnapshot": {
"urn": "urn:li:chart:(looker,baz2)",
"aspects": [
{
"com.linkedin.pegasus2avro.chart.ChartInfo": {
"title": "Baz Chart 2",
"description": "Baz Chart 2",
"lastModified": {
"created": {
"time": 0,
"actor": "urn:li:corpuser:jdoe",
"impersonator": null
},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:datahub",
"impersonator": null
},
"deleted": null
},
"chartUrl": null,
"inputs": [
{
"string": "urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleHdfsDataset,PROD)"
}
],
"type": null,
"access": null,
"lastRefreshed": null
}
}
]
}
},
"proposedDelta": null
}
]

View File

@ -3,7 +3,7 @@ namespace com.linkedin.schema
import com.linkedin.dataset.SchemaFieldPath
/**
* If SchemaMetadata fields make any external references and references are of type com.linkeidn.common.Urn or any children, this models can be used to mark it.
* If SchemaMetadata fields make any external references and the references are of type com.linkedin.common.Urn or any children, this model can be used to mark it.
*/
record UrnForeignKey {
@ -11,4 +11,4 @@ record UrnForeignKey {
* Field in hosting(current) SchemaMetadata.
*/
currentFieldPath: SchemaFieldPath
}
}