diff --git a/docs/cli.md b/docs/cli.md index 4175f30cdf..8f08c8c7c1 100644 --- a/docs/cli.md +++ b/docs/cli.md @@ -71,7 +71,8 @@ We use a plugin architecture so that you can install only the dependencies you a | [datahub-business-glossary](../metadata-ingestion/source_docs/business_glossary.md) | _no additional dependencies_ | Business Glossary File source | | [dbt](../metadata-ingestion/source_docs/dbt.md) | _no additional dependencies_ | dbt source | | [druid](../metadata-ingestion/source_docs/druid.md) | `pip install 'acryl-datahub[druid]'` | Druid Source | -| [feast](../metadata-ingestion/source_docs/feast.md) | `pip install 'acryl-datahub[feast]'` | Feast source | +| [feast-legacy](../metadata-ingestion/source_docs/feast_legacy.md) | `pip install 'acryl-datahub[feast-legacy]'` | Feast source (legacy) | +| [feast](../metadata-ingestion/source_docs/feast.md) | `pip install 'acryl-datahub[feast]'` | Feast source (0.18.0) | | [glue](../metadata-ingestion/source_docs/glue.md) | `pip install 'acryl-datahub[glue]'` | AWS Glue source | | [hive](../metadata-ingestion/source_docs/hive.md) | `pip install 'acryl-datahub[hive]'` | Hive source | | [kafka](../metadata-ingestion/source_docs/kafka.md) | `pip install 'acryl-datahub[kafka]'` | Kafka source | diff --git a/metadata-ingestion/examples/recipes/feast_repository_to_datahub.yml b/metadata-ingestion/examples/recipes/feast_repository_to_datahub.yml new file mode 100644 index 0000000000..feba660402 --- /dev/null +++ b/metadata-ingestion/examples/recipes/feast_repository_to_datahub.yml @@ -0,0 +1,9 @@ +source: + type: "feast-repository" + config: + path: "/path/to/repository/" + environment: "PROD" +sink: + type: "datahub-rest" + config: + server: "http://localhost:8080" diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 6739db9fd4..e308234e9a 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -172,7 +172,8 @@ plugins: Dict[str, Set[str]] = { # https://www.elastic.co/guide/en/elasticsearch/client/python-api/current/release-notes.html#rn-7-14-0 # https://github.com/elastic/elasticsearch-py/issues/1639#issuecomment-883587433 "elasticsearch": {"elasticsearch==7.13.4"}, - "feast": {"docker"}, + "feast-legacy": {"docker"}, + "feast": {"feast==0.18.0", "flask-openid>=1.3.0"}, "glue": aws_common, "hive": sql_common | { @@ -322,10 +323,30 @@ base_dev_requirements = { ), } +base_dev_requirements_airflow_1 = base_dev_requirements.copy() + if is_py37_or_newer: - # The lookml plugin only works on Python 3.7 or newer. + # These plugins only work on Python 3.7 or newer. base_dev_requirements = base_dev_requirements.union( - {dependency for plugin in ["lookml"] for dependency in plugins[plugin]} + { + dependency + for plugin in [ + "feast", + "lookml", + ] + for dependency in plugins[plugin] + } + ) + + # These plugins are compatible with Airflow 1. + base_dev_requirements_airflow_1 = base_dev_requirements_airflow_1.union( + { + dependency + for plugin in [ + "lookml", + ] + for dependency in plugins[plugin] + } ) dev_requirements = { @@ -340,7 +361,7 @@ dev_requirements_airflow_1_base = { "WTForms==2.3.3", # make constraint consistent with extras } dev_requirements_airflow_1 = { - *base_dev_requirements, + *base_dev_requirements_airflow_1, *dev_requirements_airflow_1_base, } @@ -348,11 +369,9 @@ full_test_dev_requirements = { *list( dependency for plugin in [ - # Only include Athena for Python 3.7 or newer. - *(["athena"] if is_py37_or_newer else []), "clickhouse", "druid", - "feast", + "feast-legacy", "hive", "ldap", "mongodb", @@ -367,6 +386,19 @@ full_test_dev_requirements = { ), } +if is_py37_or_newer: + # These plugins only work on Python 3.7 or newer. + full_test_dev_requirements = full_test_dev_requirements.union( + { + dependency + for plugin in [ + "athena", + "feast", + ] + for dependency in plugins[plugin] + } + ) + entry_points = { "console_scripts": ["datahub = datahub.entrypoints:main"], "datahub.ingestion.source.plugins": [ @@ -383,7 +415,8 @@ entry_points = { "dbt = datahub.ingestion.source.dbt:DBTSource", "druid = datahub.ingestion.source.sql.druid:DruidSource", "elasticsearch = datahub.ingestion.source.elastic_search:ElasticsearchSource", - "feast = datahub.ingestion.source.feast:FeastSource", + "feast-legacy = datahub.ingestion.source.feast_legacy:FeastSource", + "feast = datahub.ingestion.source.feast:FeastRepositorySource", "glue = datahub.ingestion.source.aws.glue:GlueSource", "sagemaker = datahub.ingestion.source.aws.sagemaker:SagemakerSource", "hive = datahub.ingestion.source.sql.hive:HiveSource", diff --git a/metadata-ingestion/source_docs/feast.md b/metadata-ingestion/source_docs/feast.md index 26f6653558..e303de15d0 100644 --- a/metadata-ingestion/source_docs/feast.md +++ b/metadata-ingestion/source_docs/feast.md @@ -2,23 +2,30 @@ For context on getting started with ingestion, check out our [metadata ingestion guide](../README.md). -## Setup +This source is designed for Feast 10+ repositories. -**Note: Feast ingestion requires Docker to be installed.** +As of version 0.10+, Feast has changed the architecture from a stack of services to SDK/CLI centric application. Please refer to [Feast 0.9 vs Feast 0.10+](https://docs.feast.dev/project/feast-0.9-vs-feast-0.10+) for further details. + +For compatibility with pre-0.10 Feast, see [Feast Legacy](feast_legacy.md) source. + +:::note + +This source is only compatible with Feast 0.18.0 +::: + +## Setup To install this plugin, run `pip install 'acryl-datahub[feast]'`. ## Capabilities -This plugin extracts the following: +This plugin extracts: -- List of feature tables (modeled as [`MLFeatureTable`](https://github.com/datahub-project/datahub/blob/master/metadata-models/src/main/pegasus/com/linkedin/ml/metadata/MLFeatureTableProperties.pdl)s), - features ([`MLFeature`](https://github.com/datahub-project/datahub/blob/master/metadata-models/src/main/pegasus/com/linkedin/ml/metadata/MLFeatureProperties.pdl)s), - and entities ([`MLPrimaryKey`](https://github.com/datahub-project/datahub/blob/master/metadata-models/src/main/pegasus/com/linkedin/ml/metadata/MLPrimaryKeyProperties.pdl)s) -- Column types associated with each feature and entity - -Note: this uses a separate Docker container to extract Feast's metadata into a JSON file, which is then -parsed to DataHub's native objects. This separation was performed because of a dependency conflict in the `feast` module. +- Entities as [`MLPrimaryKey`](https://datahubproject.io/docs/graphql/objects#mlprimarykey) +- Features as [`MLFeature`](https://datahubproject.io/docs/graphql/objects#mlfeature) +- Feature views and on-demand feature views as [`MLFeatureTable`](https://datahubproject.io/docs/graphql/objects#mlfeaturetable) +- Batch and stream source details as [`Dataset`](https://datahubproject.io/docs/graphql/objects#dataset) +- Column types associated with each entity and feature ## Quickstart recipe @@ -26,12 +33,14 @@ Check out the following recipe to get started with ingestion! See [below](#confi For general pointers on writing and running a recipe, see our [main recipe guide](../README.md#recipes). -```yml +```yaml source: - type: feast + type: "feast" config: # Coordinates - core_url: "localhost:6565" + path: "/path/to/repository/" + # Options + environment: "PROD" sink: # sink configs @@ -41,15 +50,14 @@ sink: Note that a `.` is used to denote nested fields in the YAML recipe. -| Field | Required | Default | Description | -| ----------------- | -------- | ------------------ | ------------------------------------------------------- | -| `core_url` | | `"localhost:6565"` | URL of Feast Core instance. | -| `env` | | `"PROD"` | Environment to use in namespace when constructing URNs. | -| `use_local_build` | | `False` | Whether to build Feast ingestion Docker image locally. | +| Field | Required | Default | Description | +| ------------- | -------- | ------- | ------------------------------------------ | +| `path` | ✅ | | Path to Feast repository. | +| `environment` | | `PROD` | Environment to use when constructing URNs. | ## Compatibility -Coming soon! +This source is compatible with [Feast (==0.18.0)](https://github.com/feast-dev/feast/releases/tag/v0.18.0). ## Questions diff --git a/metadata-ingestion/source_docs/feast_legacy.md b/metadata-ingestion/source_docs/feast_legacy.md new file mode 100644 index 0000000000..1a55a472ba --- /dev/null +++ b/metadata-ingestion/source_docs/feast_legacy.md @@ -0,0 +1,63 @@ +# Feast (Legacy) + +For context on getting started with ingestion, check out our [metadata ingestion guide](../README.md). + +This source is designed for Feast 0.9 core services. + +As of version 0.10+, Feast has changed the architecture from a stack of services to SDK/CLI centric application. Please refer to [Feast 0.9 vs Feast 0.10+](https://docs.feast.dev/project/feast-0.9-vs-feast-0.10+) for further details. + +See [Feast](feast.md) source for something compatible with the latest Feast versions. + +## Setup + +**Note: Feast ingestion requires Docker to be installed.** + +To install this plugin, run `pip install 'acryl-datahub[feast-legacy]'`. + +## Capabilities + +This plugin extracts the following: + +- Entities as [`MLPrimaryKey`](https://datahubproject.io/docs/graphql/objects#mlprimarykey) +- Features as [`MLFeature`](https://datahubproject.io/docs/graphql/objects#mlfeature) +- Feature tables as [`MLFeatureTable`](https://datahubproject.io/docs/graphql/objects#mlfeaturetable) +- Batch and stream source details as [`Dataset`](https://datahubproject.io/docs/graphql/objects#dataset) +- Column types associated with each entity and feature + +Note: this uses a separate Docker container to extract Feast's metadata into a JSON file, which is then +parsed to DataHub's native objects. This separation was performed because of a dependency conflict in the `feast` module. + +## Quickstart recipe + +Check out the following recipe to get started with ingestion! See [below](#config-details) for full configuration options. + +For general pointers on writing and running a recipe, see our [main recipe guide](../README.md#recipes). + +```yml +source: + type: feast-legacy + config: + # Coordinates + core_url: "localhost:6565" + +sink: + # sink configs +``` + +## Config details + +Note that a `.` is used to denote nested fields in the YAML recipe. + +| Field | Required | Default | Description | +| ----------------- | -------- | ------------------ | ------------------------------------------------------- | +| `core_url` | | `"localhost:6565"` | URL of Feast Core instance. | +| `env` | | `"PROD"` | Environment to use in namespace when constructing URNs. | +| `use_local_build` | | `False` | Whether to build Feast ingestion Docker image locally. | + +## Compatibility + +This source is compatible with [Feast (0.10.5)](https://github.com/feast-dev/feast/releases/tag/v0.10.5) and earlier versions. + +## Questions + +If you've got any questions on configuring this source, feel free to ping us on [our Slack](https://slack.datahubproject.io/)! diff --git a/metadata-ingestion/src/datahub/ingestion/source/feast.py b/metadata-ingestion/src/datahub/ingestion/source/feast.py index 92cce6b305..9241165b1a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/feast.py +++ b/metadata-ingestion/src/datahub/ingestion/source/feast.py @@ -1,11 +1,23 @@ -import json -import os -import tempfile -from dataclasses import dataclass, field -from shlex import quote -from typing import Dict, Iterable, List +import sys +from dataclasses import dataclass +from typing import Dict, Iterable, List, Tuple, Union -import docker +if sys.version_info >= (3, 7): + from feast import ( + BigQuerySource, + Entity, + Feature, + FeatureStore, + FeatureView, + FileSource, + KafkaSource, + KinesisSource, + OnDemandFeatureView, + ValueType, + ) + from feast.data_source import DataSource, RequestDataSource +else: + raise ModuleNotFoundError("The feast plugin requires Python 3.7 or newer.") import datahub.emitter.mce_builder as builder from datahub.configuration.common import ConfigModel @@ -25,283 +37,334 @@ from datahub.metadata.schema_classes import ( MLFeaturePropertiesClass, MLFeatureTablePropertiesClass, MLPrimaryKeyPropertiesClass, + StatusClass, ) -# map Feast types to DataHub classes -_field_type_mapping: Dict[str, str] = { - "BYTES": MLFeatureDataType.BYTE, - "STRING": MLFeatureDataType.TEXT, - "INT32": MLFeatureDataType.ORDINAL, - "INT64": MLFeatureDataType.ORDINAL, - "DOUBLE": MLFeatureDataType.CONTINUOUS, - "FLOAT": MLFeatureDataType.CONTINUOUS, - "BOOL": MLFeatureDataType.BINARY, - "UNIX_TIMESTAMP": MLFeatureDataType.TIME, - "BYTES_LIST": MLFeatureDataType.SEQUENCE, - "STRING_LIST": MLFeatureDataType.SEQUENCE, - "INT32_LIST": MLFeatureDataType.SEQUENCE, - "INT64_LIST": MLFeatureDataType.SEQUENCE, - "DOUBLE_LIST": MLFeatureDataType.SEQUENCE, - "FLOAT_LIST": MLFeatureDataType.SEQUENCE, - "BOOL_LIST": MLFeatureDataType.SEQUENCE, - "UNIX_TIMESTAMP_LIST": MLFeatureDataType.SEQUENCE, +assert sys.version_info >= (3, 7) # needed for mypy + +_field_type_mapping: Dict[ValueType, str] = { + ValueType.UNKNOWN: MLFeatureDataType.UNKNOWN, + ValueType.BYTES: MLFeatureDataType.BYTE, + ValueType.STRING: MLFeatureDataType.TEXT, + ValueType.INT32: MLFeatureDataType.ORDINAL, + ValueType.INT64: MLFeatureDataType.ORDINAL, + ValueType.DOUBLE: MLFeatureDataType.CONTINUOUS, + ValueType.FLOAT: MLFeatureDataType.CONTINUOUS, + ValueType.BOOL: MLFeatureDataType.BINARY, + ValueType.UNIX_TIMESTAMP: MLFeatureDataType.TIME, + ValueType.BYTES_LIST: MLFeatureDataType.SEQUENCE, + ValueType.STRING_LIST: MLFeatureDataType.SEQUENCE, + ValueType.INT32_LIST: MLFeatureDataType.SEQUENCE, + ValueType.INT64_LIST: MLFeatureDataType.SEQUENCE, + ValueType.DOUBLE_LIST: MLFeatureDataType.SEQUENCE, + ValueType.FLOAT_LIST: MLFeatureDataType.SEQUENCE, + ValueType.BOOL_LIST: MLFeatureDataType.SEQUENCE, + ValueType.UNIX_TIMESTAMP_LIST: MLFeatureDataType.SEQUENCE, + ValueType.NULL: MLFeatureDataType.UNKNOWN, } -# image to use for initial feast extraction -HOSTED_FEAST_IMAGE = "acryldata/datahub-ingestion-feast-wrapper" - -class FeastConfig(ConfigModel): - core_url: str = "localhost:6565" - env: str = DEFAULT_ENV - use_local_build: bool = False +class FeastRepositorySourceConfig(ConfigModel): + path: str + environment: str = DEFAULT_ENV @dataclass -class FeastSourceReport(SourceReport): - filtered: List[str] = field(default_factory=list) +class FeastRepositorySource(Source): + source_config: FeastRepositorySourceConfig + report: SourceReport + feature_store: FeatureStore - def report_dropped(self, name: str) -> None: - self.filtered.append(name) - - -@dataclass -class FeastSource(Source): - config: FeastConfig - report: FeastSourceReport - - def __init__(self, ctx: PipelineContext, config: FeastConfig): + def __init__(self, config: FeastRepositorySourceConfig, ctx: PipelineContext): super().__init__(ctx) - self.config = config - self.report = FeastSourceReport() - @classmethod - def create(cls, config_dict: dict, ctx: PipelineContext) -> "FeastSource": - config = FeastConfig.parse_obj(config_dict) - return cls(ctx, config) + self.source_config = config + self.report = SourceReport() + self.feature_store = FeatureStore(self.source_config.path) - def get_field_type(self, field_type: str, parent_name: str) -> str: + def _get_field_type(self, field_type: ValueType, parent_name: str) -> str: """ Maps types encountered in Feast to corresponding schema types. - - Parameters - ---------- - field_type: - type of a Feast object - parent_name: - name of table (for logging) """ - enum_type = _field_type_mapping.get(field_type) - if enum_type is None: + ml_feature_data_type = _field_type_mapping.get(field_type) + + if ml_feature_data_type is None: self.report.report_warning( parent_name, f"unable to map type {field_type} to metadata schema" ) - enum_type = MLFeatureDataType.UNKNOWN - return enum_type + ml_feature_data_type = MLFeatureDataType.UNKNOWN - def get_entity_wu(self, ingest_table, ingest_entity): + return ml_feature_data_type + + def _get_data_source_details(self, source: DataSource) -> Tuple[str, str]: """ - Generate an MLPrimaryKey workunit for a Feast entity. - - Parameters - ---------- - ingest_table: - ingested Feast table - ingest_entity: - ingested Feast entity + Get Feast batch/stream source platform and name. """ - # create snapshot instance for the entity + platform = "unknown" + name = "unknown" + + if isinstance(source, FileSource): + platform = "file" + + name = source.path.replace("://", ".").replace("/", ".") + + if isinstance(source, BigQuerySource): + platform = "bigquery" + name = source.table + + if isinstance(source, KafkaSource): + platform = "kafka" + name = source.kafka_options.topic + + if isinstance(source, KinesisSource): + platform = "kinesis" + name = ( + f"{source.kinesis_options.region}:{source.kinesis_options.stream_name}" + ) + + if isinstance(source, RequestDataSource): + platform = "request" + name = source.name + + return platform, name + + def _get_data_sources(self, feature_view: FeatureView) -> List[str]: + """ + Get data source URN list. + """ + + sources = [] + + if feature_view.batch_source is not None: + batch_source_platform, batch_source_name = self._get_data_source_details( + feature_view.batch_source + ) + sources.append( + builder.make_dataset_urn( + batch_source_platform, + batch_source_name, + self.source_config.environment, + ) + ) + + if feature_view.stream_source is not None: + stream_source_platform, stream_source_name = self._get_data_source_details( + feature_view.stream_source + ) + sources.append( + builder.make_dataset_urn( + stream_source_platform, + stream_source_name, + self.source_config.environment, + ) + ) + + return sources + + def _get_entity_workunit( + self, feature_view: FeatureView, entity: Entity + ) -> MetadataWorkUnit: + """ + Generate an MLPrimaryKey work unit for a Feast entity. + """ + + feature_view_name = f"{self.feature_store.project}.{feature_view.name}" + entity_snapshot = MLPrimaryKeySnapshot( - urn=builder.make_ml_primary_key_urn( - ingest_table["name"], ingest_entity["name"] - ), - aspects=[], + urn=builder.make_ml_primary_key_urn(feature_view_name, entity.name), + aspects=[StatusClass(removed=False)], ) - entity_sources = [] - - if ingest_entity["batch_source"] is not None: - entity_sources.append( - builder.make_dataset_urn( - ingest_entity["batch_source_platform"], - ingest_entity["batch_source_name"], - self.config.env, - ) - ) - - if ingest_entity["stream_source"] is not None: - entity_sources.append( - builder.make_dataset_urn( - ingest_entity["stream_source_platform"], - ingest_entity["stream_source_name"], - self.config.env, - ) - ) - - # append entity name and type entity_snapshot.aspects.append( MLPrimaryKeyPropertiesClass( - description=ingest_entity["description"], - dataType=self.get_field_type( - ingest_entity["type"], ingest_entity["name"] - ), - sources=entity_sources, + description=entity.description, + dataType=self._get_field_type(entity.value_type, entity.name), + sources=self._get_data_sources(feature_view), ) ) - # make the MCE and workunit mce = MetadataChangeEvent(proposedSnapshot=entity_snapshot) - return MetadataWorkUnit(id=ingest_entity["name"], mce=mce) - def get_feature_wu(self, ingest_table, ingest_feature): + return MetadataWorkUnit(id=entity.name, mce=mce) + + def _get_feature_workunit( + self, + feature_view: Union[FeatureView, OnDemandFeatureView], + feature: Feature, + ) -> MetadataWorkUnit: """ - Generate an MLFeature workunit for a Feast feature. - - Parameters - ---------- - ingest_table: - ingested Feast table - ingest_feature: - ingested Feast feature + Generate an MLFeature work unit for a Feast feature. """ + feature_view_name = f"{self.feature_store.project}.{feature_view.name}" - # create snapshot instance for the feature feature_snapshot = MLFeatureSnapshot( - urn=builder.make_ml_feature_urn( - ingest_table["name"], ingest_feature["name"] - ), - aspects=[], + urn=builder.make_ml_feature_urn(feature_view_name, feature.name), + aspects=[StatusClass(removed=False)], ) feature_sources = [] - if ingest_feature["batch_source"] is not None: - feature_sources.append( - builder.make_dataset_urn( - ingest_feature["batch_source_platform"], - ingest_feature["batch_source_name"], - self.config.env, - ) - ) + if isinstance(feature_view, FeatureView): + feature_sources = self._get_data_sources(feature_view) + elif isinstance(feature_view, OnDemandFeatureView): + if feature_view.input_request_data_sources is not None: + for request_source in feature_view.input_request_data_sources.values(): + source_platform, source_name = self._get_data_source_details( + request_source + ) - if ingest_feature["stream_source"] is not None: - feature_sources.append( - builder.make_dataset_urn( - ingest_feature["stream_source_platform"], - ingest_feature["stream_source_name"], - self.config.env, - ) - ) + feature_sources.append( + builder.make_dataset_urn( + source_platform, + source_name, + self.source_config.environment, + ) + ) + + if feature_view.input_feature_view_projections is not None: + for ( + feature_view_projection + ) in feature_view.input_feature_view_projections.values(): + feature_view_source = self.feature_store.get_feature_view( + feature_view_projection.name + ) + + feature_sources.extend(self._get_data_sources(feature_view_source)) - # append feature name and type feature_snapshot.aspects.append( MLFeaturePropertiesClass( - dataType=self.get_field_type( - ingest_feature["type"], ingest_feature["name"] - ), + description=feature.labels.get("description"), + dataType=self._get_field_type(feature.dtype, feature.name), sources=feature_sources, ) ) - # make the MCE and workunit mce = MetadataChangeEvent(proposedSnapshot=feature_snapshot) - return MetadataWorkUnit(id=ingest_feature["name"], mce=mce) - def get_feature_table_wu(self, ingest_table): + return MetadataWorkUnit(id=feature.name, mce=mce) + + def _get_feature_view_workunit(self, feature_view: FeatureView) -> MetadataWorkUnit: """ - Generate an MLFeatureTable workunit for a Feast feature table. - - Parameters - ---------- - ingest_table: - ingested Feast table + Generate an MLFeatureTable work unit for a Feast feature view. """ - featuretable_snapshot = MLFeatureTableSnapshot( - urn=builder.make_ml_feature_table_urn("feast", ingest_table["name"]), + feature_view_name = f"{self.feature_store.project}.{feature_view.name}" + + feature_view_snapshot = MLFeatureTableSnapshot( + urn=builder.make_ml_feature_table_urn("feast", feature_view_name), aspects=[ - BrowsePathsClass(paths=[f"feast/{ingest_table['name']}"]), + BrowsePathsClass( + paths=[f"/feast/{self.feature_store.project}/{feature_view_name}"] + ), + StatusClass(removed=False), ], ) - featuretable_snapshot.aspects.append( + feature_view_snapshot.aspects.append( MLFeatureTablePropertiesClass( mlFeatures=[ builder.make_ml_feature_urn( - ingest_table["name"], - feature["name"], + feature_view_name, + feature.name, ) - for feature in ingest_table["features"] + for feature in feature_view.features ], - # a feature table can have multiple primary keys, which then act as a composite key mlPrimaryKeys=[ - builder.make_ml_primary_key_urn( - ingest_table["name"], entity["name"] - ) - for entity in ingest_table["entities"] + builder.make_ml_primary_key_urn(feature_view_name, entity_name) + for entity_name in feature_view.entities ], ) ) - # make the MCE and workunit - mce = MetadataChangeEvent(proposedSnapshot=featuretable_snapshot) - return MetadataWorkUnit(id=ingest_table["name"], mce=mce) + mce = MetadataChangeEvent(proposedSnapshot=feature_view_snapshot) + + return MetadataWorkUnit(id=feature_view_name, mce=mce) + + def _get_on_demand_feature_view_workunit( + self, on_demand_feature_view: OnDemandFeatureView + ) -> MetadataWorkUnit: + """ + Generate an MLFeatureTable work unit for a Feast on-demand feature view. + """ + + on_demand_feature_view_name = ( + f"{self.feature_store.project}.{on_demand_feature_view.name}" + ) + + on_demand_feature_view_snapshot = MLFeatureTableSnapshot( + urn=builder.make_ml_feature_table_urn("feast", on_demand_feature_view_name), + aspects=[ + BrowsePathsClass( + paths=[ + f"/feast/{self.feature_store.project}/{on_demand_feature_view_name}" + ] + ), + StatusClass(removed=False), + ], + ) + + on_demand_feature_view_snapshot.aspects.append( + MLFeatureTablePropertiesClass( + mlFeatures=[ + builder.make_ml_feature_urn( + on_demand_feature_view_name, + feature.name, + ) + for feature in on_demand_feature_view.features + ], + mlPrimaryKeys=[], + ) + ) + + mce = MetadataChangeEvent(proposedSnapshot=on_demand_feature_view_snapshot) + + return MetadataWorkUnit(id=on_demand_feature_view_name, mce=mce) + + @classmethod + def create(cls, config_dict, ctx): + config = FeastRepositorySourceConfig.parse_obj(config_dict) + return cls(config, ctx) def get_workunits(self) -> Iterable[MetadataWorkUnit]: - with tempfile.NamedTemporaryFile(suffix=".json") as tf: + for feature_view in self.feature_store.list_feature_views(): + for entity_name in feature_view.entities: + entity = self.feature_store.get_entity(entity_name) - docker_client = docker.from_env() + work_unit = self._get_entity_workunit(feature_view, entity) + self.report.report_workunit(work_unit) - feast_image = HOSTED_FEAST_IMAGE + yield work_unit - # build the image locally if specified - if self.config.use_local_build: - dirname = os.path.dirname(__file__) - image_directory = os.path.join(dirname, "feast_image/") + for feature in feature_view.features: + work_unit = self._get_feature_workunit(feature_view, feature) + self.report.report_workunit(work_unit) - image, _ = docker_client.images.build(path=image_directory) + yield work_unit - feast_image = image.id + work_unit = self._get_feature_view_workunit(feature_view) + self.report.report_workunit(work_unit) - docker_client.containers.run( - feast_image, - f"python3 ingest.py --core_url={quote(self.config.core_url)} --output_path=/out.json", - # allow the image to access the core URL if on host - network_mode="host", - # mount the tempfile so the Docker image has access - volumes={ - tf.name: {"bind": "/out.json", "mode": "rw"}, - }, + yield work_unit + + for on_demand_feature_view in self.feature_store.list_on_demand_feature_views(): + for feature in on_demand_feature_view.features: + work_unit = self._get_feature_workunit(on_demand_feature_view, feature) + self.report.report_workunit(work_unit) + + yield work_unit + + work_unit = self._get_on_demand_feature_view_workunit( + on_demand_feature_view ) + self.report.report_workunit(work_unit) - ingest = json.load(tf) + yield work_unit - # ingest tables - for ingest_table in ingest: - - # ingest entities in table - for ingest_entity in ingest_table["entities"]: - - wu = self.get_entity_wu(ingest_table, ingest_entity) - self.report.report_workunit(wu) - yield wu - - # ingest features in table - for ingest_feature in ingest_table["features"]: - - wu = self.get_feature_wu(ingest_table, ingest_feature) - self.report.report_workunit(wu) - yield wu - - wu = self.get_feature_table_wu(ingest_table) - self.report.report_workunit(wu) - yield wu - - def get_report(self) -> FeastSourceReport: + def get_report(self) -> SourceReport: return self.report - def close(self): + def close(self) -> None: return diff --git a/metadata-ingestion/src/datahub/ingestion/source/feast_image/ingest.py b/metadata-ingestion/src/datahub/ingestion/source/feast_image/ingest.py index 7c6371f61d..e02f92db89 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/feast_image/ingest.py +++ b/metadata-ingestion/src/datahub/ingestion/source/feast_image/ingest.py @@ -1,8 +1,16 @@ import json import click -from feast import Client -from feast.data_source import BigQuerySource, FileSource, KafkaSource, KinesisSource +import feast + +if feast.__version__ <= "0.18.0": + from feast import Client # type: ignore + from feast.data_source import ( # type: ignore + BigQuerySource, + FileSource, + KafkaSource, + KinesisSource, + ) @click.command( diff --git a/metadata-ingestion/src/datahub/ingestion/source/feast_legacy.py b/metadata-ingestion/src/datahub/ingestion/source/feast_legacy.py new file mode 100644 index 0000000000..92cce6b305 --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/feast_legacy.py @@ -0,0 +1,307 @@ +import json +import os +import tempfile +from dataclasses import dataclass, field +from shlex import quote +from typing import Dict, Iterable, List + +import docker + +import datahub.emitter.mce_builder as builder +from datahub.configuration.common import ConfigModel +from datahub.emitter.mce_builder import DEFAULT_ENV +from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.api.source import Source, SourceReport +from datahub.ingestion.api.workunit import MetadataWorkUnit +from datahub.metadata.com.linkedin.pegasus2avro.common import MLFeatureDataType +from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import ( + MLFeatureSnapshot, + MLFeatureTableSnapshot, + MLPrimaryKeySnapshot, +) +from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent +from datahub.metadata.schema_classes import ( + BrowsePathsClass, + MLFeaturePropertiesClass, + MLFeatureTablePropertiesClass, + MLPrimaryKeyPropertiesClass, +) + +# map Feast types to DataHub classes +_field_type_mapping: Dict[str, str] = { + "BYTES": MLFeatureDataType.BYTE, + "STRING": MLFeatureDataType.TEXT, + "INT32": MLFeatureDataType.ORDINAL, + "INT64": MLFeatureDataType.ORDINAL, + "DOUBLE": MLFeatureDataType.CONTINUOUS, + "FLOAT": MLFeatureDataType.CONTINUOUS, + "BOOL": MLFeatureDataType.BINARY, + "UNIX_TIMESTAMP": MLFeatureDataType.TIME, + "BYTES_LIST": MLFeatureDataType.SEQUENCE, + "STRING_LIST": MLFeatureDataType.SEQUENCE, + "INT32_LIST": MLFeatureDataType.SEQUENCE, + "INT64_LIST": MLFeatureDataType.SEQUENCE, + "DOUBLE_LIST": MLFeatureDataType.SEQUENCE, + "FLOAT_LIST": MLFeatureDataType.SEQUENCE, + "BOOL_LIST": MLFeatureDataType.SEQUENCE, + "UNIX_TIMESTAMP_LIST": MLFeatureDataType.SEQUENCE, +} + +# image to use for initial feast extraction +HOSTED_FEAST_IMAGE = "acryldata/datahub-ingestion-feast-wrapper" + + +class FeastConfig(ConfigModel): + core_url: str = "localhost:6565" + env: str = DEFAULT_ENV + use_local_build: bool = False + + +@dataclass +class FeastSourceReport(SourceReport): + filtered: List[str] = field(default_factory=list) + + def report_dropped(self, name: str) -> None: + self.filtered.append(name) + + +@dataclass +class FeastSource(Source): + config: FeastConfig + report: FeastSourceReport + + def __init__(self, ctx: PipelineContext, config: FeastConfig): + super().__init__(ctx) + self.config = config + self.report = FeastSourceReport() + + @classmethod + def create(cls, config_dict: dict, ctx: PipelineContext) -> "FeastSource": + config = FeastConfig.parse_obj(config_dict) + return cls(ctx, config) + + def get_field_type(self, field_type: str, parent_name: str) -> str: + """ + Maps types encountered in Feast to corresponding schema types. + + Parameters + ---------- + field_type: + type of a Feast object + parent_name: + name of table (for logging) + """ + enum_type = _field_type_mapping.get(field_type) + + if enum_type is None: + self.report.report_warning( + parent_name, f"unable to map type {field_type} to metadata schema" + ) + enum_type = MLFeatureDataType.UNKNOWN + + return enum_type + + def get_entity_wu(self, ingest_table, ingest_entity): + """ + Generate an MLPrimaryKey workunit for a Feast entity. + + Parameters + ---------- + ingest_table: + ingested Feast table + ingest_entity: + ingested Feast entity + """ + + # create snapshot instance for the entity + entity_snapshot = MLPrimaryKeySnapshot( + urn=builder.make_ml_primary_key_urn( + ingest_table["name"], ingest_entity["name"] + ), + aspects=[], + ) + + entity_sources = [] + + if ingest_entity["batch_source"] is not None: + entity_sources.append( + builder.make_dataset_urn( + ingest_entity["batch_source_platform"], + ingest_entity["batch_source_name"], + self.config.env, + ) + ) + + if ingest_entity["stream_source"] is not None: + entity_sources.append( + builder.make_dataset_urn( + ingest_entity["stream_source_platform"], + ingest_entity["stream_source_name"], + self.config.env, + ) + ) + + # append entity name and type + entity_snapshot.aspects.append( + MLPrimaryKeyPropertiesClass( + description=ingest_entity["description"], + dataType=self.get_field_type( + ingest_entity["type"], ingest_entity["name"] + ), + sources=entity_sources, + ) + ) + + # make the MCE and workunit + mce = MetadataChangeEvent(proposedSnapshot=entity_snapshot) + return MetadataWorkUnit(id=ingest_entity["name"], mce=mce) + + def get_feature_wu(self, ingest_table, ingest_feature): + """ + Generate an MLFeature workunit for a Feast feature. + + Parameters + ---------- + ingest_table: + ingested Feast table + ingest_feature: + ingested Feast feature + """ + + # create snapshot instance for the feature + feature_snapshot = MLFeatureSnapshot( + urn=builder.make_ml_feature_urn( + ingest_table["name"], ingest_feature["name"] + ), + aspects=[], + ) + + feature_sources = [] + + if ingest_feature["batch_source"] is not None: + feature_sources.append( + builder.make_dataset_urn( + ingest_feature["batch_source_platform"], + ingest_feature["batch_source_name"], + self.config.env, + ) + ) + + if ingest_feature["stream_source"] is not None: + feature_sources.append( + builder.make_dataset_urn( + ingest_feature["stream_source_platform"], + ingest_feature["stream_source_name"], + self.config.env, + ) + ) + + # append feature name and type + feature_snapshot.aspects.append( + MLFeaturePropertiesClass( + dataType=self.get_field_type( + ingest_feature["type"], ingest_feature["name"] + ), + sources=feature_sources, + ) + ) + + # make the MCE and workunit + mce = MetadataChangeEvent(proposedSnapshot=feature_snapshot) + return MetadataWorkUnit(id=ingest_feature["name"], mce=mce) + + def get_feature_table_wu(self, ingest_table): + """ + Generate an MLFeatureTable workunit for a Feast feature table. + + Parameters + ---------- + ingest_table: + ingested Feast table + """ + + featuretable_snapshot = MLFeatureTableSnapshot( + urn=builder.make_ml_feature_table_urn("feast", ingest_table["name"]), + aspects=[ + BrowsePathsClass(paths=[f"feast/{ingest_table['name']}"]), + ], + ) + + featuretable_snapshot.aspects.append( + MLFeatureTablePropertiesClass( + mlFeatures=[ + builder.make_ml_feature_urn( + ingest_table["name"], + feature["name"], + ) + for feature in ingest_table["features"] + ], + # a feature table can have multiple primary keys, which then act as a composite key + mlPrimaryKeys=[ + builder.make_ml_primary_key_urn( + ingest_table["name"], entity["name"] + ) + for entity in ingest_table["entities"] + ], + ) + ) + + # make the MCE and workunit + mce = MetadataChangeEvent(proposedSnapshot=featuretable_snapshot) + return MetadataWorkUnit(id=ingest_table["name"], mce=mce) + + def get_workunits(self) -> Iterable[MetadataWorkUnit]: + with tempfile.NamedTemporaryFile(suffix=".json") as tf: + + docker_client = docker.from_env() + + feast_image = HOSTED_FEAST_IMAGE + + # build the image locally if specified + if self.config.use_local_build: + dirname = os.path.dirname(__file__) + image_directory = os.path.join(dirname, "feast_image/") + + image, _ = docker_client.images.build(path=image_directory) + + feast_image = image.id + + docker_client.containers.run( + feast_image, + f"python3 ingest.py --core_url={quote(self.config.core_url)} --output_path=/out.json", + # allow the image to access the core URL if on host + network_mode="host", + # mount the tempfile so the Docker image has access + volumes={ + tf.name: {"bind": "/out.json", "mode": "rw"}, + }, + ) + + ingest = json.load(tf) + + # ingest tables + for ingest_table in ingest: + + # ingest entities in table + for ingest_entity in ingest_table["entities"]: + + wu = self.get_entity_wu(ingest_table, ingest_entity) + self.report.report_workunit(wu) + yield wu + + # ingest features in table + for ingest_feature in ingest_table["features"]: + + wu = self.get_feature_wu(ingest_table, ingest_feature) + self.report.report_workunit(wu) + yield wu + + wu = self.get_feature_table_wu(ingest_table) + self.report.report_workunit(wu) + yield wu + + def get_report(self) -> FeastSourceReport: + return self.report + + def close(self): + return diff --git a/metadata-ingestion/tests/integration/feast/core/core.yml b/metadata-ingestion/tests/integration/feast-legacy/core/core.yml similarity index 100% rename from metadata-ingestion/tests/integration/feast/core/core.yml rename to metadata-ingestion/tests/integration/feast-legacy/core/core.yml diff --git a/metadata-ingestion/tests/integration/feast/docker-compose.yml b/metadata-ingestion/tests/integration/feast-legacy/docker-compose.yml similarity index 100% rename from metadata-ingestion/tests/integration/feast/docker-compose.yml rename to metadata-ingestion/tests/integration/feast-legacy/docker-compose.yml diff --git a/metadata-ingestion/tests/integration/feast/feast_mces_golden.json b/metadata-ingestion/tests/integration/feast-legacy/feast_mces_golden.json similarity index 100% rename from metadata-ingestion/tests/integration/feast/feast_mces_golden.json rename to metadata-ingestion/tests/integration/feast-legacy/feast_mces_golden.json diff --git a/metadata-ingestion/tests/integration/feast/make_tests.py b/metadata-ingestion/tests/integration/feast-legacy/make_tests.py similarity index 85% rename from metadata-ingestion/tests/integration/feast/make_tests.py rename to metadata-ingestion/tests/integration/feast-legacy/make_tests.py index 3d5edf4a47..59ec7c89d0 100644 --- a/metadata-ingestion/tests/integration/feast/make_tests.py +++ b/metadata-ingestion/tests/integration/feast-legacy/make_tests.py @@ -1,14 +1,23 @@ import socket -from feast import Client -from feast.data_format import ParquetFormat -from feast.data_source import FileSource -from feast.entity import Entity -from feast.feature import Feature -from feast.feature_table import FeatureTable -from feast.value_type import ValueType +import feast + +FEAST_MIN_VERSION = "0.18.0" +if feast.__version__ <= FEAST_MIN_VERSION: + from feast import Client # type: ignore + from feast.data_format import ParquetFormat + from feast.data_source import FileSource # type: ignore + from feast.entity import Entity + from feast.feature import Feature + from feast.feature_table import FeatureTable # type: ignore + from feast.value_type import ValueType + if __name__ == "__main__": + if feast.__version__ > FEAST_MIN_VERSION: + raise Exception( + f"this code does not work with feast > {FEAST_MIN_VERSION}. Found {feast.__version__}" + ) test_client = Client(core_url="testfeast:6565") diff --git a/metadata-ingestion/tests/integration/feast/serving/online-serving.yml b/metadata-ingestion/tests/integration/feast-legacy/serving/online-serving.yml similarity index 100% rename from metadata-ingestion/tests/integration/feast/serving/online-serving.yml rename to metadata-ingestion/tests/integration/feast-legacy/serving/online-serving.yml diff --git a/metadata-ingestion/tests/integration/feast/test_feast.py b/metadata-ingestion/tests/integration/feast-legacy/test_feast.py similarity index 96% rename from metadata-ingestion/tests/integration/feast/test_feast.py rename to metadata-ingestion/tests/integration/feast-legacy/test_feast.py index 2435d89a67..2f8a9abe04 100644 --- a/metadata-ingestion/tests/integration/feast/test_feast.py +++ b/metadata-ingestion/tests/integration/feast-legacy/test_feast.py @@ -15,7 +15,7 @@ FROZEN_TIME = "2020-04-14 07:00:00" @freeze_time(FROZEN_TIME) @pytest.mark.integration def test_feast_ingest(docker_compose_runner, pytestconfig, tmp_path): - test_resources_dir = pytestconfig.rootpath / "tests/integration/feast" + test_resources_dir = pytestconfig.rootpath / "tests/integration/feast-legacy" with docker_compose_runner( test_resources_dir / "docker-compose.yml", "feast" @@ -32,7 +32,7 @@ def test_feast_ingest(docker_compose_runner, pytestconfig, tmp_path): { "run_id": "feast-test", "source": { - "type": "feast", + "type": "feast-legacy", "config": { "core_url": "localhost:6565", "use_local_build": True, diff --git a/metadata-ingestion/tests/integration/feast/wait-for-it.sh b/metadata-ingestion/tests/integration/feast-legacy/wait-for-it.sh similarity index 100% rename from metadata-ingestion/tests/integration/feast/wait-for-it.sh rename to metadata-ingestion/tests/integration/feast-legacy/wait-for-it.sh diff --git a/metadata-ingestion/tests/integration/feast/feast_repository_mces_golden.json b/metadata-ingestion/tests/integration/feast/feast_repository_mces_golden.json new file mode 100644 index 0000000000..860d4e4769 --- /dev/null +++ b/metadata-ingestion/tests/integration/feast/feast_repository_mces_golden.json @@ -0,0 +1,321 @@ +[ +{ + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.MLPrimaryKeySnapshot": { + "urn": "urn:li:mlPrimaryKey:(feature_store.driver_hourly_stats,driver_id)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.ml.metadata.MLPrimaryKeyProperties": { + "description": "Driver ID", + "dataType": "ORDINAL", + "version": null, + "sources": [ + "urn:li:dataset:(urn:li:dataPlatform:file,data.driver_stats_with_string.parquet,PROD)" + ] + } + } + ] + } + }, + "proposedDelta": null, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "feast-repository-test", + "registryName": null, + "registryVersion": null, + "properties": null + } +}, +{ + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.MLFeatureSnapshot": { + "urn": "urn:li:mlFeature:(feature_store.driver_hourly_stats,conv_rate)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": { + "description": "Conv rate", + "dataType": "CONTINUOUS", + "version": null, + "sources": [ + "urn:li:dataset:(urn:li:dataPlatform:file,data.driver_stats_with_string.parquet,PROD)" + ] + } + } + ] + } + }, + "proposedDelta": null, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "feast-repository-test", + "registryName": null, + "registryVersion": null, + "properties": null + } +}, +{ + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.MLFeatureSnapshot": { + "urn": "urn:li:mlFeature:(feature_store.driver_hourly_stats,acc_rate)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": { + "description": "Acc rate", + "dataType": "CONTINUOUS", + "version": null, + "sources": [ + "urn:li:dataset:(urn:li:dataPlatform:file,data.driver_stats_with_string.parquet,PROD)" + ] + } + } + ] + } + }, + "proposedDelta": null, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "feast-repository-test", + "registryName": null, + "registryVersion": null, + "properties": null + } +}, +{ + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.MLFeatureSnapshot": { + "urn": "urn:li:mlFeature:(feature_store.driver_hourly_stats,avg_daily_trips)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": { + "description": "Avg daily trips", + "dataType": "ORDINAL", + "version": null, + "sources": [ + "urn:li:dataset:(urn:li:dataPlatform:file,data.driver_stats_with_string.parquet,PROD)" + ] + } + } + ] + } + }, + "proposedDelta": null, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "feast-repository-test", + "registryName": null, + "registryVersion": null, + "properties": null + } +}, +{ + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.MLFeatureSnapshot": { + "urn": "urn:li:mlFeature:(feature_store.driver_hourly_stats,string_feature)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": { + "description": "String feature", + "dataType": "TEXT", + "version": null, + "sources": [ + "urn:li:dataset:(urn:li:dataPlatform:file,data.driver_stats_with_string.parquet,PROD)" + ] + } + } + ] + } + }, + "proposedDelta": null, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "feast-repository-test", + "registryName": null, + "registryVersion": null, + "properties": null + } +}, +{ + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.MLFeatureTableSnapshot": { + "urn": "urn:li:mlFeatureTable:(urn:li:dataPlatform:feast,feature_store.driver_hourly_stats)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.BrowsePaths": { + "paths": [ + "/feast/feature_store/feature_store.driver_hourly_stats" + ] + } + }, + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.ml.metadata.MLFeatureTableProperties": { + "customProperties": {}, + "description": null, + "mlFeatures": [ + "urn:li:mlFeature:(feature_store.driver_hourly_stats,conv_rate)", + "urn:li:mlFeature:(feature_store.driver_hourly_stats,acc_rate)", + "urn:li:mlFeature:(feature_store.driver_hourly_stats,avg_daily_trips)", + "urn:li:mlFeature:(feature_store.driver_hourly_stats,string_feature)" + ], + "mlPrimaryKeys": [ + "urn:li:mlPrimaryKey:(feature_store.driver_hourly_stats,driver_id)" + ] + } + } + ] + } + }, + "proposedDelta": null, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "feast-repository-test", + "registryName": null, + "registryVersion": null, + "properties": null + } +}, +{ + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.MLFeatureSnapshot": { + "urn": "urn:li:mlFeature:(feature_store.transformed_conv_rate,conv_rate_plus_val1)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": { + "description": null, + "dataType": "CONTINUOUS", + "version": null, + "sources": [ + "urn:li:dataset:(urn:li:dataPlatform:request,vals_to_add,PROD)", + "urn:li:dataset:(urn:li:dataPlatform:file,data.driver_stats_with_string.parquet,PROD)" + ] + } + } + ] + } + }, + "proposedDelta": null, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "feast-repository-test", + "registryName": null, + "registryVersion": null, + "properties": null + } +}, +{ + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.MLFeatureSnapshot": { + "urn": "urn:li:mlFeature:(feature_store.transformed_conv_rate,conv_rate_plus_val2)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": { + "description": null, + "dataType": "CONTINUOUS", + "version": null, + "sources": [ + "urn:li:dataset:(urn:li:dataPlatform:request,vals_to_add,PROD)", + "urn:li:dataset:(urn:li:dataPlatform:file,data.driver_stats_with_string.parquet,PROD)" + ] + } + } + ] + } + }, + "proposedDelta": null, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "feast-repository-test", + "registryName": null, + "registryVersion": null, + "properties": null + } +}, +{ + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.MLFeatureTableSnapshot": { + "urn": "urn:li:mlFeatureTable:(urn:li:dataPlatform:feast,feature_store.transformed_conv_rate)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.BrowsePaths": { + "paths": [ + "/feast/feature_store/feature_store.transformed_conv_rate" + ] + } + }, + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.ml.metadata.MLFeatureTableProperties": { + "customProperties": {}, + "description": null, + "mlFeatures": [ + "urn:li:mlFeature:(feature_store.transformed_conv_rate,conv_rate_plus_val1)", + "urn:li:mlFeature:(feature_store.transformed_conv_rate,conv_rate_plus_val2)" + ], + "mlPrimaryKeys": [] + } + } + ] + } + }, + "proposedDelta": null, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "feast-repository-test", + "registryName": null, + "registryVersion": null, + "properties": null + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/feast/feature_store/data/driver_stats_with_string.parquet b/metadata-ingestion/tests/integration/feast/feature_store/data/driver_stats_with_string.parquet new file mode 100644 index 0000000000..83b8c31aa5 Binary files /dev/null and b/metadata-ingestion/tests/integration/feast/feature_store/data/driver_stats_with_string.parquet differ diff --git a/metadata-ingestion/tests/integration/feast/feature_store/data/online_store.db b/metadata-ingestion/tests/integration/feast/feature_store/data/online_store.db new file mode 100644 index 0000000000..5f58681702 Binary files /dev/null and b/metadata-ingestion/tests/integration/feast/feature_store/data/online_store.db differ diff --git a/metadata-ingestion/tests/integration/feast/feature_store/data/registry.db b/metadata-ingestion/tests/integration/feast/feature_store/data/registry.db new file mode 100644 index 0000000000..dbc0380179 Binary files /dev/null and b/metadata-ingestion/tests/integration/feast/feature_store/data/registry.db differ diff --git a/metadata-ingestion/tests/integration/feast/feature_store/feature_store.yaml b/metadata-ingestion/tests/integration/feast/feature_store/feature_store.yaml new file mode 100644 index 0000000000..f68061d824 --- /dev/null +++ b/metadata-ingestion/tests/integration/feast/feature_store/feature_store.yaml @@ -0,0 +1,10 @@ +project: feature_store +provider: local +online_store: + type: sqlite + path: data/online_store.db +offline_store: + type: file +flags: + alpha_features: true + on_demand_transforms: true diff --git a/metadata-ingestion/tests/integration/feast/feature_store/features.py b/metadata-ingestion/tests/integration/feast/feature_store/features.py new file mode 100644 index 0000000000..b91318a823 --- /dev/null +++ b/metadata-ingestion/tests/integration/feast/feature_store/features.py @@ -0,0 +1,69 @@ +from datetime import timedelta + +import pandas as pd +from feast import Entity, Feature, FeatureView, FileSource, ValueType +from feast.data_source import RequestDataSource +from feast.on_demand_feature_view import on_demand_feature_view + +driver_hourly_stats_source = FileSource( + path="data/driver_stats_with_string.parquet", + event_timestamp_column="event_timestamp", + created_timestamp_column="created", +) + +driver_entity = Entity( + name="driver_id", value_type=ValueType.INT64, description="Driver ID" +) + +driver_hourly_stats_view = FeatureView( + name="driver_hourly_stats", + entities=["driver_id"], + ttl=timedelta(days=7), + features=[ + Feature( + name="conv_rate", + dtype=ValueType.FLOAT, + labels=dict(description="Conv rate"), + ), + Feature( + name="acc_rate", dtype=ValueType.FLOAT, labels=dict(description="Acc rate") + ), + Feature( + name="avg_daily_trips", + dtype=ValueType.INT64, + labels=dict(description="Avg daily trips"), + ), + Feature( + name="string_feature", + dtype=ValueType.STRING, + labels=dict(description="String feature"), + ), + ], + online=True, + batch_source=driver_hourly_stats_source, + tags={}, +) + +input_request = RequestDataSource( + name="vals_to_add", + schema={"val_to_add": ValueType.INT64, "val_to_add_2": ValueType.INT64}, +) + + +@on_demand_feature_view( # type: ignore + inputs={ + "driver_hourly_stats": driver_hourly_stats_view, + "vals_to_add": input_request, + }, + features=[ + Feature(name="conv_rate_plus_val1", dtype=ValueType.DOUBLE), + Feature(name="conv_rate_plus_val2", dtype=ValueType.DOUBLE), + ], +) +def transformed_conv_rate(inputs: pd.DataFrame) -> pd.DataFrame: + df = pd.DataFrame() + + df["conv_rate_plus_val1"] = inputs["conv_rate"] + inputs["val_to_add"] + df["conv_rate_plus_val2"] = inputs["conv_rate"] + inputs["val_to_add_2"] + + return df diff --git a/metadata-ingestion/tests/integration/feast/test_feast_repository.py b/metadata-ingestion/tests/integration/feast/test_feast_repository.py new file mode 100644 index 0000000000..7121a01642 --- /dev/null +++ b/metadata-ingestion/tests/integration/feast/test_feast_repository.py @@ -0,0 +1,50 @@ +import os +import sys + +import pytest +from freezegun import freeze_time + +from datahub.ingestion.run.pipeline import Pipeline +from tests.test_helpers import mce_helpers + +FROZEN_TIME = "2020-04-14 07:00:00" + + +@freeze_time(FROZEN_TIME) +@pytest.mark.skipif( + os.getenv("AIRFLOW1_TEST") == "true", reason="feast requires Airflow 2.0 or newer" +) +@pytest.mark.skipif( + sys.version_info < (3, 7), reason="feast requires Python 3.7 or newer" +) +def test_feast_repository_ingest(pytestconfig, tmp_path, mock_time): + test_resources_dir = pytestconfig.rootpath / "tests/integration/feast" + output_path = tmp_path / "feast_repository_mces.json" + + pipeline = Pipeline.create( + { + "run_id": "feast-repository-test", + "source": { + "type": "feast", + "config": { + "path": str(test_resources_dir / "feature_store"), + "environment": "PROD", + }, + }, + "sink": { + "type": "file", + "config": { + "filename": str(output_path), + }, + }, + } + ) + + pipeline.run() + pipeline.raise_from_status() + + mce_helpers.check_golden_file( + pytestconfig, + output_path=output_path, + golden_path=test_resources_dir / "feast_repository_mces_golden.json", + ) diff --git a/metadata-ingestion/tox.ini b/metadata-ingestion/tox.ini index 978d33818b..060f7f5a3f 100644 --- a/metadata-ingestion/tox.ini +++ b/metadata-ingestion/tox.ini @@ -36,6 +36,9 @@ deps = .[dev-airflow1] -c tests/airflow1-constraints.txt +setenv = + AIRFLOW1_TEST = true + [testenv:py3-full] deps = .[dev]