feat(ingest): feast - add support for Feast 0.18, deprecate older integration (#4094)

This commit is contained in:
Danilo Peixoto 2022-04-26 18:35:02 -03:00 committed by GitHub
parent 4b913f674c
commit d2a6bc06dc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 1197 additions and 243 deletions

View File

@ -71,7 +71,8 @@ We use a plugin architecture so that you can install only the dependencies you a
| [datahub-business-glossary](../metadata-ingestion/source_docs/business_glossary.md) | _no additional dependencies_ | Business Glossary File source |
| [dbt](../metadata-ingestion/source_docs/dbt.md) | _no additional dependencies_ | dbt source |
| [druid](../metadata-ingestion/source_docs/druid.md) | `pip install 'acryl-datahub[druid]'` | Druid Source |
| [feast](../metadata-ingestion/source_docs/feast.md) | `pip install 'acryl-datahub[feast]'` | Feast source |
| [feast-legacy](../metadata-ingestion/source_docs/feast_legacy.md) | `pip install 'acryl-datahub[feast-legacy]'` | Feast source (legacy) |
| [feast](../metadata-ingestion/source_docs/feast.md) | `pip install 'acryl-datahub[feast]'` | Feast source (0.18.0) |
| [glue](../metadata-ingestion/source_docs/glue.md) | `pip install 'acryl-datahub[glue]'` | AWS Glue source |
| [hive](../metadata-ingestion/source_docs/hive.md) | `pip install 'acryl-datahub[hive]'` | Hive source |
| [kafka](../metadata-ingestion/source_docs/kafka.md) | `pip install 'acryl-datahub[kafka]'` | Kafka source |

View File

@ -0,0 +1,9 @@
# Recipe: ingest ML feature metadata from a local Feast (0.18) feature
# repository and push it to a DataHub instance over REST.
source:
  type: "feast-repository"
  config:
    # Path to the root of the Feast feature repository on disk.
    path: "/path/to/repository/"
    # Environment used when constructing DataHub URNs (default: PROD).
    environment: "PROD"
sink:
  type: "datahub-rest"
  config:
    # URL of the DataHub REST (GMS) endpoint to emit metadata to.
    server: "http://localhost:8080"

View File

@ -172,7 +172,8 @@ plugins: Dict[str, Set[str]] = {
# https://www.elastic.co/guide/en/elasticsearch/client/python-api/current/release-notes.html#rn-7-14-0
# https://github.com/elastic/elasticsearch-py/issues/1639#issuecomment-883587433
"elasticsearch": {"elasticsearch==7.13.4"},
"feast": {"docker"},
"feast-legacy": {"docker"},
"feast": {"feast==0.18.0", "flask-openid>=1.3.0"},
"glue": aws_common,
"hive": sql_common
| {
@ -322,10 +323,30 @@ base_dev_requirements = {
),
}
base_dev_requirements_airflow_1 = base_dev_requirements.copy()
if is_py37_or_newer:
# The lookml plugin only works on Python 3.7 or newer.
# These plugins only work on Python 3.7 or newer.
base_dev_requirements = base_dev_requirements.union(
{dependency for plugin in ["lookml"] for dependency in plugins[plugin]}
{
dependency
for plugin in [
"feast",
"lookml",
]
for dependency in plugins[plugin]
}
)
# These plugins are compatible with Airflow 1.
base_dev_requirements_airflow_1 = base_dev_requirements_airflow_1.union(
{
dependency
for plugin in [
"lookml",
]
for dependency in plugins[plugin]
}
)
dev_requirements = {
@ -340,7 +361,7 @@ dev_requirements_airflow_1_base = {
"WTForms==2.3.3", # make constraint consistent with extras
}
dev_requirements_airflow_1 = {
*base_dev_requirements,
*base_dev_requirements_airflow_1,
*dev_requirements_airflow_1_base,
}
@ -348,11 +369,9 @@ full_test_dev_requirements = {
*list(
dependency
for plugin in [
# Only include Athena for Python 3.7 or newer.
*(["athena"] if is_py37_or_newer else []),
"clickhouse",
"druid",
"feast",
"feast-legacy",
"hive",
"ldap",
"mongodb",
@ -367,6 +386,19 @@ full_test_dev_requirements = {
),
}
if is_py37_or_newer:
# These plugins only work on Python 3.7 or newer.
full_test_dev_requirements = full_test_dev_requirements.union(
{
dependency
for plugin in [
"athena",
"feast",
]
for dependency in plugins[plugin]
}
)
entry_points = {
"console_scripts": ["datahub = datahub.entrypoints:main"],
"datahub.ingestion.source.plugins": [
@ -383,7 +415,8 @@ entry_points = {
"dbt = datahub.ingestion.source.dbt:DBTSource",
"druid = datahub.ingestion.source.sql.druid:DruidSource",
"elasticsearch = datahub.ingestion.source.elastic_search:ElasticsearchSource",
"feast = datahub.ingestion.source.feast:FeastSource",
"feast-legacy = datahub.ingestion.source.feast_legacy:FeastSource",
"feast = datahub.ingestion.source.feast:FeastRepositorySource",
"glue = datahub.ingestion.source.aws.glue:GlueSource",
"sagemaker = datahub.ingestion.source.aws.sagemaker:SagemakerSource",
"hive = datahub.ingestion.source.sql.hive:HiveSource",

View File

@ -2,23 +2,30 @@
For context on getting started with ingestion, check out our [metadata ingestion guide](../README.md).
## Setup
This source is designed for Feast 0.10+ repositories.
**Note: Feast ingestion requires Docker to be installed.**
As of version 0.10+, Feast has changed the architecture from a stack of services to SDK/CLI centric application. Please refer to [Feast 0.9 vs Feast 0.10+](https://docs.feast.dev/project/feast-0.9-vs-feast-0.10+) for further details.
For compatibility with pre-0.10 Feast, see [Feast Legacy](feast_legacy.md) source.
:::note
This source is only compatible with Feast 0.18.0.
:::
## Setup
To install this plugin, run `pip install 'acryl-datahub[feast]'`.
## Capabilities
This plugin extracts the following:
This plugin extracts:
- List of feature tables (modeled as [`MLFeatureTable`](https://github.com/datahub-project/datahub/blob/master/metadata-models/src/main/pegasus/com/linkedin/ml/metadata/MLFeatureTableProperties.pdl)s),
features ([`MLFeature`](https://github.com/datahub-project/datahub/blob/master/metadata-models/src/main/pegasus/com/linkedin/ml/metadata/MLFeatureProperties.pdl)s),
and entities ([`MLPrimaryKey`](https://github.com/datahub-project/datahub/blob/master/metadata-models/src/main/pegasus/com/linkedin/ml/metadata/MLPrimaryKeyProperties.pdl)s)
- Column types associated with each feature and entity
Note: this uses a separate Docker container to extract Feast's metadata into a JSON file, which is then
parsed into DataHub's native objects. This separation was performed because of a dependency conflict in the `feast` module.
- Entities as [`MLPrimaryKey`](https://datahubproject.io/docs/graphql/objects#mlprimarykey)
- Features as [`MLFeature`](https://datahubproject.io/docs/graphql/objects#mlfeature)
- Feature views and on-demand feature views as [`MLFeatureTable`](https://datahubproject.io/docs/graphql/objects#mlfeaturetable)
- Batch and stream source details as [`Dataset`](https://datahubproject.io/docs/graphql/objects#dataset)
- Column types associated with each entity and feature
## Quickstart recipe
@ -26,12 +33,14 @@ Check out the following recipe to get started with ingestion! See [below](#confi
For general pointers on writing and running a recipe, see our [main recipe guide](../README.md#recipes).
```yml
```yaml
source:
type: feast
type: "feast"
config:
# Coordinates
core_url: "localhost:6565"
path: "/path/to/repository/"
# Options
environment: "PROD"
sink:
# sink configs
@ -41,15 +50,14 @@ sink:
Note that a `.` is used to denote nested fields in the YAML recipe.
| Field | Required | Default | Description |
| ----------------- | -------- | ------------------ | ------------------------------------------------------- |
| `core_url` | | `"localhost:6565"` | URL of Feast Core instance. |
| `env` | | `"PROD"` | Environment to use in namespace when constructing URNs. |
| `use_local_build` | | `False` | Whether to build Feast ingestion Docker image locally. |
| Field | Required | Default | Description |
| ------------- | -------- | ------- | ------------------------------------------ |
| `path` | ✅ | | Path to Feast repository. |
| `environment` | | `PROD` | Environment to use when constructing URNs. |
## Compatibility
Coming soon!
This source is compatible with [Feast (==0.18.0)](https://github.com/feast-dev/feast/releases/tag/v0.18.0).
## Questions

View File

@ -0,0 +1,63 @@
# Feast (Legacy)
For context on getting started with ingestion, check out our [metadata ingestion guide](../README.md).
This source is designed for Feast 0.9 core services.
As of version 0.10+, Feast has changed the architecture from a stack of services to SDK/CLI centric application. Please refer to [Feast 0.9 vs Feast 0.10+](https://docs.feast.dev/project/feast-0.9-vs-feast-0.10+) for further details.
See [Feast](feast.md) source for something compatible with the latest Feast versions.
## Setup
**Note: Feast ingestion requires Docker to be installed.**
To install this plugin, run `pip install 'acryl-datahub[feast-legacy]'`.
## Capabilities
This plugin extracts the following:
- Entities as [`MLPrimaryKey`](https://datahubproject.io/docs/graphql/objects#mlprimarykey)
- Features as [`MLFeature`](https://datahubproject.io/docs/graphql/objects#mlfeature)
- Feature tables as [`MLFeatureTable`](https://datahubproject.io/docs/graphql/objects#mlfeaturetable)
- Batch and stream source details as [`Dataset`](https://datahubproject.io/docs/graphql/objects#dataset)
- Column types associated with each entity and feature
Note: this uses a separate Docker container to extract Feast's metadata into a JSON file, which is then
parsed into DataHub's native objects. This separation was performed because of a dependency conflict in the `feast` module.
## Quickstart recipe
Check out the following recipe to get started with ingestion! See [below](#config-details) for full configuration options.
For general pointers on writing and running a recipe, see our [main recipe guide](../README.md#recipes).
```yml
source:
type: feast-legacy
config:
# Coordinates
core_url: "localhost:6565"
sink:
# sink configs
```
## Config details
Note that a `.` is used to denote nested fields in the YAML recipe.
| Field | Required | Default | Description |
| ----------------- | -------- | ------------------ | ------------------------------------------------------- |
| `core_url` | | `"localhost:6565"` | URL of Feast Core instance. |
| `env` | | `"PROD"` | Environment to use in namespace when constructing URNs. |
| `use_local_build` | | `False` | Whether to build Feast ingestion Docker image locally. |
## Compatibility
This source is compatible with [Feast (0.10.5)](https://github.com/feast-dev/feast/releases/tag/v0.10.5) and earlier versions.
## Questions
If you've got any questions on configuring this source, feel free to ping us on [our Slack](https://slack.datahubproject.io/)!

View File

@ -1,11 +1,23 @@
import json
import os
import tempfile
from dataclasses import dataclass, field
from shlex import quote
from typing import Dict, Iterable, List
import sys
from dataclasses import dataclass
from typing import Dict, Iterable, List, Tuple, Union
import docker
if sys.version_info >= (3, 7):
from feast import (
BigQuerySource,
Entity,
Feature,
FeatureStore,
FeatureView,
FileSource,
KafkaSource,
KinesisSource,
OnDemandFeatureView,
ValueType,
)
from feast.data_source import DataSource, RequestDataSource
else:
raise ModuleNotFoundError("The feast plugin requires Python 3.7 or newer.")
import datahub.emitter.mce_builder as builder
from datahub.configuration.common import ConfigModel
@ -25,283 +37,334 @@ from datahub.metadata.schema_classes import (
MLFeaturePropertiesClass,
MLFeatureTablePropertiesClass,
MLPrimaryKeyPropertiesClass,
StatusClass,
)
# map Feast types to DataHub classes
_field_type_mapping: Dict[str, str] = {
"BYTES": MLFeatureDataType.BYTE,
"STRING": MLFeatureDataType.TEXT,
"INT32": MLFeatureDataType.ORDINAL,
"INT64": MLFeatureDataType.ORDINAL,
"DOUBLE": MLFeatureDataType.CONTINUOUS,
"FLOAT": MLFeatureDataType.CONTINUOUS,
"BOOL": MLFeatureDataType.BINARY,
"UNIX_TIMESTAMP": MLFeatureDataType.TIME,
"BYTES_LIST": MLFeatureDataType.SEQUENCE,
"STRING_LIST": MLFeatureDataType.SEQUENCE,
"INT32_LIST": MLFeatureDataType.SEQUENCE,
"INT64_LIST": MLFeatureDataType.SEQUENCE,
"DOUBLE_LIST": MLFeatureDataType.SEQUENCE,
"FLOAT_LIST": MLFeatureDataType.SEQUENCE,
"BOOL_LIST": MLFeatureDataType.SEQUENCE,
"UNIX_TIMESTAMP_LIST": MLFeatureDataType.SEQUENCE,
assert sys.version_info >= (3, 7) # needed for mypy
_field_type_mapping: Dict[ValueType, str] = {
ValueType.UNKNOWN: MLFeatureDataType.UNKNOWN,
ValueType.BYTES: MLFeatureDataType.BYTE,
ValueType.STRING: MLFeatureDataType.TEXT,
ValueType.INT32: MLFeatureDataType.ORDINAL,
ValueType.INT64: MLFeatureDataType.ORDINAL,
ValueType.DOUBLE: MLFeatureDataType.CONTINUOUS,
ValueType.FLOAT: MLFeatureDataType.CONTINUOUS,
ValueType.BOOL: MLFeatureDataType.BINARY,
ValueType.UNIX_TIMESTAMP: MLFeatureDataType.TIME,
ValueType.BYTES_LIST: MLFeatureDataType.SEQUENCE,
ValueType.STRING_LIST: MLFeatureDataType.SEQUENCE,
ValueType.INT32_LIST: MLFeatureDataType.SEQUENCE,
ValueType.INT64_LIST: MLFeatureDataType.SEQUENCE,
ValueType.DOUBLE_LIST: MLFeatureDataType.SEQUENCE,
ValueType.FLOAT_LIST: MLFeatureDataType.SEQUENCE,
ValueType.BOOL_LIST: MLFeatureDataType.SEQUENCE,
ValueType.UNIX_TIMESTAMP_LIST: MLFeatureDataType.SEQUENCE,
ValueType.NULL: MLFeatureDataType.UNKNOWN,
}
# image to use for initial feast extraction
HOSTED_FEAST_IMAGE = "acryldata/datahub-ingestion-feast-wrapper"
class FeastConfig(ConfigModel):
core_url: str = "localhost:6565"
env: str = DEFAULT_ENV
use_local_build: bool = False
class FeastRepositorySourceConfig(ConfigModel):
path: str
environment: str = DEFAULT_ENV
@dataclass
class FeastSourceReport(SourceReport):
filtered: List[str] = field(default_factory=list)
class FeastRepositorySource(Source):
source_config: FeastRepositorySourceConfig
report: SourceReport
feature_store: FeatureStore
def report_dropped(self, name: str) -> None:
self.filtered.append(name)
@dataclass
class FeastSource(Source):
config: FeastConfig
report: FeastSourceReport
def __init__(self, ctx: PipelineContext, config: FeastConfig):
def __init__(self, config: FeastRepositorySourceConfig, ctx: PipelineContext):
super().__init__(ctx)
self.config = config
self.report = FeastSourceReport()
@classmethod
def create(cls, config_dict: dict, ctx: PipelineContext) -> "FeastSource":
config = FeastConfig.parse_obj(config_dict)
return cls(ctx, config)
self.source_config = config
self.report = SourceReport()
self.feature_store = FeatureStore(self.source_config.path)
def get_field_type(self, field_type: str, parent_name: str) -> str:
def _get_field_type(self, field_type: ValueType, parent_name: str) -> str:
"""
Maps types encountered in Feast to corresponding schema types.
Parameters
----------
field_type:
type of a Feast object
parent_name:
name of table (for logging)
"""
enum_type = _field_type_mapping.get(field_type)
if enum_type is None:
ml_feature_data_type = _field_type_mapping.get(field_type)
if ml_feature_data_type is None:
self.report.report_warning(
parent_name, f"unable to map type {field_type} to metadata schema"
)
enum_type = MLFeatureDataType.UNKNOWN
return enum_type
ml_feature_data_type = MLFeatureDataType.UNKNOWN
def get_entity_wu(self, ingest_table, ingest_entity):
return ml_feature_data_type
def _get_data_source_details(self, source: DataSource) -> Tuple[str, str]:
"""
Generate an MLPrimaryKey workunit for a Feast entity.
Parameters
----------
ingest_table:
ingested Feast table
ingest_entity:
ingested Feast entity
Get Feast batch/stream source platform and name.
"""
# create snapshot instance for the entity
platform = "unknown"
name = "unknown"
if isinstance(source, FileSource):
platform = "file"
name = source.path.replace("://", ".").replace("/", ".")
if isinstance(source, BigQuerySource):
platform = "bigquery"
name = source.table
if isinstance(source, KafkaSource):
platform = "kafka"
name = source.kafka_options.topic
if isinstance(source, KinesisSource):
platform = "kinesis"
name = (
f"{source.kinesis_options.region}:{source.kinesis_options.stream_name}"
)
if isinstance(source, RequestDataSource):
platform = "request"
name = source.name
return platform, name
def _get_data_sources(self, feature_view: FeatureView) -> List[str]:
"""
Get data source URN list.
"""
sources = []
if feature_view.batch_source is not None:
batch_source_platform, batch_source_name = self._get_data_source_details(
feature_view.batch_source
)
sources.append(
builder.make_dataset_urn(
batch_source_platform,
batch_source_name,
self.source_config.environment,
)
)
if feature_view.stream_source is not None:
stream_source_platform, stream_source_name = self._get_data_source_details(
feature_view.stream_source
)
sources.append(
builder.make_dataset_urn(
stream_source_platform,
stream_source_name,
self.source_config.environment,
)
)
return sources
def _get_entity_workunit(
self, feature_view: FeatureView, entity: Entity
) -> MetadataWorkUnit:
"""
Generate an MLPrimaryKey work unit for a Feast entity.
"""
feature_view_name = f"{self.feature_store.project}.{feature_view.name}"
entity_snapshot = MLPrimaryKeySnapshot(
urn=builder.make_ml_primary_key_urn(
ingest_table["name"], ingest_entity["name"]
),
aspects=[],
urn=builder.make_ml_primary_key_urn(feature_view_name, entity.name),
aspects=[StatusClass(removed=False)],
)
entity_sources = []
if ingest_entity["batch_source"] is not None:
entity_sources.append(
builder.make_dataset_urn(
ingest_entity["batch_source_platform"],
ingest_entity["batch_source_name"],
self.config.env,
)
)
if ingest_entity["stream_source"] is not None:
entity_sources.append(
builder.make_dataset_urn(
ingest_entity["stream_source_platform"],
ingest_entity["stream_source_name"],
self.config.env,
)
)
# append entity name and type
entity_snapshot.aspects.append(
MLPrimaryKeyPropertiesClass(
description=ingest_entity["description"],
dataType=self.get_field_type(
ingest_entity["type"], ingest_entity["name"]
),
sources=entity_sources,
description=entity.description,
dataType=self._get_field_type(entity.value_type, entity.name),
sources=self._get_data_sources(feature_view),
)
)
# make the MCE and workunit
mce = MetadataChangeEvent(proposedSnapshot=entity_snapshot)
return MetadataWorkUnit(id=ingest_entity["name"], mce=mce)
def get_feature_wu(self, ingest_table, ingest_feature):
return MetadataWorkUnit(id=entity.name, mce=mce)
def _get_feature_workunit(
self,
feature_view: Union[FeatureView, OnDemandFeatureView],
feature: Feature,
) -> MetadataWorkUnit:
"""
Generate an MLFeature workunit for a Feast feature.
Parameters
----------
ingest_table:
ingested Feast table
ingest_feature:
ingested Feast feature
Generate an MLFeature work unit for a Feast feature.
"""
feature_view_name = f"{self.feature_store.project}.{feature_view.name}"
# create snapshot instance for the feature
feature_snapshot = MLFeatureSnapshot(
urn=builder.make_ml_feature_urn(
ingest_table["name"], ingest_feature["name"]
),
aspects=[],
urn=builder.make_ml_feature_urn(feature_view_name, feature.name),
aspects=[StatusClass(removed=False)],
)
feature_sources = []
if ingest_feature["batch_source"] is not None:
feature_sources.append(
builder.make_dataset_urn(
ingest_feature["batch_source_platform"],
ingest_feature["batch_source_name"],
self.config.env,
)
)
if isinstance(feature_view, FeatureView):
feature_sources = self._get_data_sources(feature_view)
elif isinstance(feature_view, OnDemandFeatureView):
if feature_view.input_request_data_sources is not None:
for request_source in feature_view.input_request_data_sources.values():
source_platform, source_name = self._get_data_source_details(
request_source
)
if ingest_feature["stream_source"] is not None:
feature_sources.append(
builder.make_dataset_urn(
ingest_feature["stream_source_platform"],
ingest_feature["stream_source_name"],
self.config.env,
)
)
feature_sources.append(
builder.make_dataset_urn(
source_platform,
source_name,
self.source_config.environment,
)
)
if feature_view.input_feature_view_projections is not None:
for (
feature_view_projection
) in feature_view.input_feature_view_projections.values():
feature_view_source = self.feature_store.get_feature_view(
feature_view_projection.name
)
feature_sources.extend(self._get_data_sources(feature_view_source))
# append feature name and type
feature_snapshot.aspects.append(
MLFeaturePropertiesClass(
dataType=self.get_field_type(
ingest_feature["type"], ingest_feature["name"]
),
description=feature.labels.get("description"),
dataType=self._get_field_type(feature.dtype, feature.name),
sources=feature_sources,
)
)
# make the MCE and workunit
mce = MetadataChangeEvent(proposedSnapshot=feature_snapshot)
return MetadataWorkUnit(id=ingest_feature["name"], mce=mce)
def get_feature_table_wu(self, ingest_table):
return MetadataWorkUnit(id=feature.name, mce=mce)
def _get_feature_view_workunit(self, feature_view: FeatureView) -> MetadataWorkUnit:
"""
Generate an MLFeatureTable workunit for a Feast feature table.
Parameters
----------
ingest_table:
ingested Feast table
Generate an MLFeatureTable work unit for a Feast feature view.
"""
featuretable_snapshot = MLFeatureTableSnapshot(
urn=builder.make_ml_feature_table_urn("feast", ingest_table["name"]),
feature_view_name = f"{self.feature_store.project}.{feature_view.name}"
feature_view_snapshot = MLFeatureTableSnapshot(
urn=builder.make_ml_feature_table_urn("feast", feature_view_name),
aspects=[
BrowsePathsClass(paths=[f"feast/{ingest_table['name']}"]),
BrowsePathsClass(
paths=[f"/feast/{self.feature_store.project}/{feature_view_name}"]
),
StatusClass(removed=False),
],
)
featuretable_snapshot.aspects.append(
feature_view_snapshot.aspects.append(
MLFeatureTablePropertiesClass(
mlFeatures=[
builder.make_ml_feature_urn(
ingest_table["name"],
feature["name"],
feature_view_name,
feature.name,
)
for feature in ingest_table["features"]
for feature in feature_view.features
],
# a feature table can have multiple primary keys, which then act as a composite key
mlPrimaryKeys=[
builder.make_ml_primary_key_urn(
ingest_table["name"], entity["name"]
)
for entity in ingest_table["entities"]
builder.make_ml_primary_key_urn(feature_view_name, entity_name)
for entity_name in feature_view.entities
],
)
)
# make the MCE and workunit
mce = MetadataChangeEvent(proposedSnapshot=featuretable_snapshot)
return MetadataWorkUnit(id=ingest_table["name"], mce=mce)
mce = MetadataChangeEvent(proposedSnapshot=feature_view_snapshot)
return MetadataWorkUnit(id=feature_view_name, mce=mce)
def _get_on_demand_feature_view_workunit(
self, on_demand_feature_view: OnDemandFeatureView
) -> MetadataWorkUnit:
"""
Generate an MLFeatureTable work unit for a Feast on-demand feature view.
"""
on_demand_feature_view_name = (
f"{self.feature_store.project}.{on_demand_feature_view.name}"
)
on_demand_feature_view_snapshot = MLFeatureTableSnapshot(
urn=builder.make_ml_feature_table_urn("feast", on_demand_feature_view_name),
aspects=[
BrowsePathsClass(
paths=[
f"/feast/{self.feature_store.project}/{on_demand_feature_view_name}"
]
),
StatusClass(removed=False),
],
)
on_demand_feature_view_snapshot.aspects.append(
MLFeatureTablePropertiesClass(
mlFeatures=[
builder.make_ml_feature_urn(
on_demand_feature_view_name,
feature.name,
)
for feature in on_demand_feature_view.features
],
mlPrimaryKeys=[],
)
)
mce = MetadataChangeEvent(proposedSnapshot=on_demand_feature_view_snapshot)
return MetadataWorkUnit(id=on_demand_feature_view_name, mce=mce)
@classmethod
def create(cls, config_dict, ctx):
config = FeastRepositorySourceConfig.parse_obj(config_dict)
return cls(config, ctx)
def get_workunits(self) -> Iterable[MetadataWorkUnit]:
with tempfile.NamedTemporaryFile(suffix=".json") as tf:
for feature_view in self.feature_store.list_feature_views():
for entity_name in feature_view.entities:
entity = self.feature_store.get_entity(entity_name)
docker_client = docker.from_env()
work_unit = self._get_entity_workunit(feature_view, entity)
self.report.report_workunit(work_unit)
feast_image = HOSTED_FEAST_IMAGE
yield work_unit
# build the image locally if specified
if self.config.use_local_build:
dirname = os.path.dirname(__file__)
image_directory = os.path.join(dirname, "feast_image/")
for feature in feature_view.features:
work_unit = self._get_feature_workunit(feature_view, feature)
self.report.report_workunit(work_unit)
image, _ = docker_client.images.build(path=image_directory)
yield work_unit
feast_image = image.id
work_unit = self._get_feature_view_workunit(feature_view)
self.report.report_workunit(work_unit)
docker_client.containers.run(
feast_image,
f"python3 ingest.py --core_url={quote(self.config.core_url)} --output_path=/out.json",
# allow the image to access the core URL if on host
network_mode="host",
# mount the tempfile so the Docker image has access
volumes={
tf.name: {"bind": "/out.json", "mode": "rw"},
},
yield work_unit
for on_demand_feature_view in self.feature_store.list_on_demand_feature_views():
for feature in on_demand_feature_view.features:
work_unit = self._get_feature_workunit(on_demand_feature_view, feature)
self.report.report_workunit(work_unit)
yield work_unit
work_unit = self._get_on_demand_feature_view_workunit(
on_demand_feature_view
)
self.report.report_workunit(work_unit)
ingest = json.load(tf)
yield work_unit
# ingest tables
for ingest_table in ingest:
# ingest entities in table
for ingest_entity in ingest_table["entities"]:
wu = self.get_entity_wu(ingest_table, ingest_entity)
self.report.report_workunit(wu)
yield wu
# ingest features in table
for ingest_feature in ingest_table["features"]:
wu = self.get_feature_wu(ingest_table, ingest_feature)
self.report.report_workunit(wu)
yield wu
wu = self.get_feature_table_wu(ingest_table)
self.report.report_workunit(wu)
yield wu
def get_report(self) -> FeastSourceReport:
def get_report(self) -> SourceReport:
return self.report
def close(self):
def close(self) -> None:
return

View File

@ -1,8 +1,16 @@
import json
import click
from feast import Client
from feast.data_source import BigQuerySource, FileSource, KafkaSource, KinesisSource
import feast
if feast.__version__ <= "0.18.0":
from feast import Client # type: ignore
from feast.data_source import ( # type: ignore
BigQuerySource,
FileSource,
KafkaSource,
KinesisSource,
)
@click.command(

View File

@ -0,0 +1,307 @@
import json
import os
import tempfile
from dataclasses import dataclass, field
from shlex import quote
from typing import Dict, Iterable, List
import docker
import datahub.emitter.mce_builder as builder
from datahub.configuration.common import ConfigModel
from datahub.emitter.mce_builder import DEFAULT_ENV
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.com.linkedin.pegasus2avro.common import MLFeatureDataType
from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import (
MLFeatureSnapshot,
MLFeatureTableSnapshot,
MLPrimaryKeySnapshot,
)
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.metadata.schema_classes import (
BrowsePathsClass,
MLFeaturePropertiesClass,
MLFeatureTablePropertiesClass,
MLPrimaryKeyPropertiesClass,
)
# Map Feast value-type names (as emitted in the extraction JSON) to DataHub
# MLFeatureDataType constants. All *_LIST types collapse to SEQUENCE; names
# missing from this table fall back to UNKNOWN in FeastSource.get_field_type.
_field_type_mapping: Dict[str, str] = {
    "BYTES": MLFeatureDataType.BYTE,
    "STRING": MLFeatureDataType.TEXT,
    "INT32": MLFeatureDataType.ORDINAL,
    "INT64": MLFeatureDataType.ORDINAL,
    "DOUBLE": MLFeatureDataType.CONTINUOUS,
    "FLOAT": MLFeatureDataType.CONTINUOUS,
    "BOOL": MLFeatureDataType.BINARY,
    "UNIX_TIMESTAMP": MLFeatureDataType.TIME,
    "BYTES_LIST": MLFeatureDataType.SEQUENCE,
    "STRING_LIST": MLFeatureDataType.SEQUENCE,
    "INT32_LIST": MLFeatureDataType.SEQUENCE,
    "INT64_LIST": MLFeatureDataType.SEQUENCE,
    "DOUBLE_LIST": MLFeatureDataType.SEQUENCE,
    "FLOAT_LIST": MLFeatureDataType.SEQUENCE,
    "BOOL_LIST": MLFeatureDataType.SEQUENCE,
    "UNIX_TIMESTAMP_LIST": MLFeatureDataType.SEQUENCE,
}

# Docker image used for the initial Feast metadata extraction. A separate
# container is used because of a dependency conflict with the `feast` module.
HOSTED_FEAST_IMAGE = "acryldata/datahub-ingestion-feast-wrapper"
class FeastConfig(ConfigModel):
    """Configuration for the legacy Feast (Core-based) source."""

    # Host:port of the Feast Core gRPC endpoint to extract metadata from.
    core_url: str = "localhost:6565"
    # Environment to use in the namespace when constructing URNs.
    env: str = DEFAULT_ENV
    # Build the Feast extraction Docker image locally instead of pulling
    # the hosted HOSTED_FEAST_IMAGE.
    use_local_build: bool = False
@dataclass
class FeastSourceReport(SourceReport):
    """Source report that additionally tracks dropped (filtered) names."""

    # Names reported as dropped during ingestion, for run-summary visibility.
    filtered: List[str] = field(default_factory=list)

    def report_dropped(self, name: str) -> None:
        """Record *name* as dropped so it shows up in the ingestion report."""
        self.filtered.append(name)
@dataclass
class FeastSource(Source):
config: FeastConfig
report: FeastSourceReport
def __init__(self, ctx: PipelineContext, config: FeastConfig):
super().__init__(ctx)
self.config = config
self.report = FeastSourceReport()
@classmethod
def create(cls, config_dict: dict, ctx: PipelineContext) -> "FeastSource":
config = FeastConfig.parse_obj(config_dict)
return cls(ctx, config)
def get_field_type(self, field_type: str, parent_name: str) -> str:
"""
Maps types encountered in Feast to corresponding schema types.
Parameters
----------
field_type:
type of a Feast object
parent_name:
name of table (for logging)
"""
enum_type = _field_type_mapping.get(field_type)
if enum_type is None:
self.report.report_warning(
parent_name, f"unable to map type {field_type} to metadata schema"
)
enum_type = MLFeatureDataType.UNKNOWN
return enum_type
def get_entity_wu(self, ingest_table, ingest_entity):
"""
Generate an MLPrimaryKey workunit for a Feast entity.
Parameters
----------
ingest_table:
ingested Feast table
ingest_entity:
ingested Feast entity
"""
# create snapshot instance for the entity
entity_snapshot = MLPrimaryKeySnapshot(
urn=builder.make_ml_primary_key_urn(
ingest_table["name"], ingest_entity["name"]
),
aspects=[],
)
entity_sources = []
if ingest_entity["batch_source"] is not None:
entity_sources.append(
builder.make_dataset_urn(
ingest_entity["batch_source_platform"],
ingest_entity["batch_source_name"],
self.config.env,
)
)
if ingest_entity["stream_source"] is not None:
entity_sources.append(
builder.make_dataset_urn(
ingest_entity["stream_source_platform"],
ingest_entity["stream_source_name"],
self.config.env,
)
)
# append entity name and type
entity_snapshot.aspects.append(
MLPrimaryKeyPropertiesClass(
description=ingest_entity["description"],
dataType=self.get_field_type(
ingest_entity["type"], ingest_entity["name"]
),
sources=entity_sources,
)
)
# make the MCE and workunit
mce = MetadataChangeEvent(proposedSnapshot=entity_snapshot)
return MetadataWorkUnit(id=ingest_entity["name"], mce=mce)
def get_feature_wu(self, ingest_table, ingest_feature):
"""
Generate an MLFeature workunit for a Feast feature.
Parameters
----------
ingest_table:
ingested Feast table
ingest_feature:
ingested Feast feature
"""
# create snapshot instance for the feature
feature_snapshot = MLFeatureSnapshot(
urn=builder.make_ml_feature_urn(
ingest_table["name"], ingest_feature["name"]
),
aspects=[],
)
feature_sources = []
if ingest_feature["batch_source"] is not None:
feature_sources.append(
builder.make_dataset_urn(
ingest_feature["batch_source_platform"],
ingest_feature["batch_source_name"],
self.config.env,
)
)
if ingest_feature["stream_source"] is not None:
feature_sources.append(
builder.make_dataset_urn(
ingest_feature["stream_source_platform"],
ingest_feature["stream_source_name"],
self.config.env,
)
)
# append feature name and type
feature_snapshot.aspects.append(
MLFeaturePropertiesClass(
dataType=self.get_field_type(
ingest_feature["type"], ingest_feature["name"]
),
sources=feature_sources,
)
)
# make the MCE and workunit
mce = MetadataChangeEvent(proposedSnapshot=feature_snapshot)
return MetadataWorkUnit(id=ingest_feature["name"], mce=mce)
def get_feature_table_wu(self, ingest_table):
"""
Generate an MLFeatureTable workunit for a Feast feature table.
Parameters
----------
ingest_table:
ingested Feast table
"""
featuretable_snapshot = MLFeatureTableSnapshot(
urn=builder.make_ml_feature_table_urn("feast", ingest_table["name"]),
aspects=[
BrowsePathsClass(paths=[f"feast/{ingest_table['name']}"]),
],
)
featuretable_snapshot.aspects.append(
MLFeatureTablePropertiesClass(
mlFeatures=[
builder.make_ml_feature_urn(
ingest_table["name"],
feature["name"],
)
for feature in ingest_table["features"]
],
# a feature table can have multiple primary keys, which then act as a composite key
mlPrimaryKeys=[
builder.make_ml_primary_key_urn(
ingest_table["name"], entity["name"]
)
for entity in ingest_table["entities"]
],
)
)
# make the MCE and workunit
mce = MetadataChangeEvent(proposedSnapshot=featuretable_snapshot)
return MetadataWorkUnit(id=ingest_table["name"], mce=mce)
def get_workunits(self) -> Iterable[MetadataWorkUnit]:
    """Run the dockerized Feast exporter and yield workunits from its output.

    Spins up a helper Docker image that talks to Feast Core, dumps the
    feature-table metadata to a shared temp file as JSON, then converts
    each table (and its entities and features) into MCE workunits.
    """
    # the container writes its JSON output into this host-side temp file
    with tempfile.NamedTemporaryFile(suffix=".json") as tf:
        docker_client = docker.from_env()
        feast_image = HOSTED_FEAST_IMAGE
        # build the image locally if specified
        if self.config.use_local_build:
            dirname = os.path.dirname(__file__)
            image_directory = os.path.join(dirname, "feast_image/")
            image, _ = docker_client.images.build(path=image_directory)
            feast_image = image.id
        # blocks until the container exits; core_url is shell-quoted to
        # avoid injection into the container command line
        docker_client.containers.run(
            feast_image,
            f"python3 ingest.py --core_url={quote(self.config.core_url)} --output_path=/out.json",
            # allow the image to access the core URL if on host
            network_mode="host",
            # mount the tempfile so the Docker image has access
            volumes={
                tf.name: {"bind": "/out.json", "mode": "rw"},
            },
        )
        # the container has written to tf via the bind mount; parse it
        ingest = json.load(tf)
        # ingest tables
        for ingest_table in ingest:
            # ingest entities in table
            for ingest_entity in ingest_table["entities"]:
                wu = self.get_entity_wu(ingest_table, ingest_entity)
                self.report.report_workunit(wu)
                yield wu
            # ingest features in table
            for ingest_feature in ingest_table["features"]:
                wu = self.get_feature_wu(ingest_table, ingest_feature)
                self.report.report_workunit(wu)
                yield wu
            # the table workunit is emitted last, after its members
            wu = self.get_feature_table_wu(ingest_table)
            self.report.report_workunit(wu)
            yield wu
def get_report(self) -> FeastSourceReport:
    """Return the report accumulated while emitting workunits."""
    return self.report
def close(self):
    """No-op: this source holds no resources that outlive get_workunits()."""
    return

View File

@ -1,14 +1,23 @@
import socket
from feast import Client
from feast.data_format import ParquetFormat
from feast.data_source import FileSource
from feast.entity import Entity
from feast.feature import Feature
from feast.feature_table import FeatureTable
from feast.value_type import ValueType
import feast
FEAST_MIN_VERSION = "0.18.0"
if feast.__version__ <= FEAST_MIN_VERSION:
from feast import Client # type: ignore
from feast.data_format import ParquetFormat
from feast.data_source import FileSource # type: ignore
from feast.entity import Entity
from feast.feature import Feature
from feast.feature_table import FeatureTable # type: ignore
from feast.value_type import ValueType
if __name__ == "__main__":
if feast.__version__ > FEAST_MIN_VERSION:
raise Exception(
f"this code does not work with feast > {FEAST_MIN_VERSION}. Found {feast.__version__}"
)
test_client = Client(core_url="testfeast:6565")

View File

@ -15,7 +15,7 @@ FROZEN_TIME = "2020-04-14 07:00:00"
@freeze_time(FROZEN_TIME)
@pytest.mark.integration
def test_feast_ingest(docker_compose_runner, pytestconfig, tmp_path):
test_resources_dir = pytestconfig.rootpath / "tests/integration/feast"
test_resources_dir = pytestconfig.rootpath / "tests/integration/feast-legacy"
with docker_compose_runner(
test_resources_dir / "docker-compose.yml", "feast"
@ -32,7 +32,7 @@ def test_feast_ingest(docker_compose_runner, pytestconfig, tmp_path):
{
"run_id": "feast-test",
"source": {
"type": "feast",
"type": "feast-legacy",
"config": {
"core_url": "localhost:6565",
"use_local_build": True,

View File

@ -0,0 +1,321 @@
[
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.MLPrimaryKeySnapshot": {
"urn": "urn:li:mlPrimaryKey:(feature_store.driver_hourly_stats,driver_id)",
"aspects": [
{
"com.linkedin.pegasus2avro.common.Status": {
"removed": false
}
},
{
"com.linkedin.pegasus2avro.ml.metadata.MLPrimaryKeyProperties": {
"description": "Driver ID",
"dataType": "ORDINAL",
"version": null,
"sources": [
"urn:li:dataset:(urn:li:dataPlatform:file,data.driver_stats_with_string.parquet,PROD)"
]
}
}
]
}
},
"proposedDelta": null,
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "feast-repository-test",
"registryName": null,
"registryVersion": null,
"properties": null
}
},
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.MLFeatureSnapshot": {
"urn": "urn:li:mlFeature:(feature_store.driver_hourly_stats,conv_rate)",
"aspects": [
{
"com.linkedin.pegasus2avro.common.Status": {
"removed": false
}
},
{
"com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": {
"description": "Conv rate",
"dataType": "CONTINUOUS",
"version": null,
"sources": [
"urn:li:dataset:(urn:li:dataPlatform:file,data.driver_stats_with_string.parquet,PROD)"
]
}
}
]
}
},
"proposedDelta": null,
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "feast-repository-test",
"registryName": null,
"registryVersion": null,
"properties": null
}
},
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.MLFeatureSnapshot": {
"urn": "urn:li:mlFeature:(feature_store.driver_hourly_stats,acc_rate)",
"aspects": [
{
"com.linkedin.pegasus2avro.common.Status": {
"removed": false
}
},
{
"com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": {
"description": "Acc rate",
"dataType": "CONTINUOUS",
"version": null,
"sources": [
"urn:li:dataset:(urn:li:dataPlatform:file,data.driver_stats_with_string.parquet,PROD)"
]
}
}
]
}
},
"proposedDelta": null,
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "feast-repository-test",
"registryName": null,
"registryVersion": null,
"properties": null
}
},
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.MLFeatureSnapshot": {
"urn": "urn:li:mlFeature:(feature_store.driver_hourly_stats,avg_daily_trips)",
"aspects": [
{
"com.linkedin.pegasus2avro.common.Status": {
"removed": false
}
},
{
"com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": {
"description": "Avg daily trips",
"dataType": "ORDINAL",
"version": null,
"sources": [
"urn:li:dataset:(urn:li:dataPlatform:file,data.driver_stats_with_string.parquet,PROD)"
]
}
}
]
}
},
"proposedDelta": null,
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "feast-repository-test",
"registryName": null,
"registryVersion": null,
"properties": null
}
},
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.MLFeatureSnapshot": {
"urn": "urn:li:mlFeature:(feature_store.driver_hourly_stats,string_feature)",
"aspects": [
{
"com.linkedin.pegasus2avro.common.Status": {
"removed": false
}
},
{
"com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": {
"description": "String feature",
"dataType": "TEXT",
"version": null,
"sources": [
"urn:li:dataset:(urn:li:dataPlatform:file,data.driver_stats_with_string.parquet,PROD)"
]
}
}
]
}
},
"proposedDelta": null,
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "feast-repository-test",
"registryName": null,
"registryVersion": null,
"properties": null
}
},
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.MLFeatureTableSnapshot": {
"urn": "urn:li:mlFeatureTable:(urn:li:dataPlatform:feast,feature_store.driver_hourly_stats)",
"aspects": [
{
"com.linkedin.pegasus2avro.common.BrowsePaths": {
"paths": [
"/feast/feature_store/feature_store.driver_hourly_stats"
]
}
},
{
"com.linkedin.pegasus2avro.common.Status": {
"removed": false
}
},
{
"com.linkedin.pegasus2avro.ml.metadata.MLFeatureTableProperties": {
"customProperties": {},
"description": null,
"mlFeatures": [
"urn:li:mlFeature:(feature_store.driver_hourly_stats,conv_rate)",
"urn:li:mlFeature:(feature_store.driver_hourly_stats,acc_rate)",
"urn:li:mlFeature:(feature_store.driver_hourly_stats,avg_daily_trips)",
"urn:li:mlFeature:(feature_store.driver_hourly_stats,string_feature)"
],
"mlPrimaryKeys": [
"urn:li:mlPrimaryKey:(feature_store.driver_hourly_stats,driver_id)"
]
}
}
]
}
},
"proposedDelta": null,
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "feast-repository-test",
"registryName": null,
"registryVersion": null,
"properties": null
}
},
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.MLFeatureSnapshot": {
"urn": "urn:li:mlFeature:(feature_store.transformed_conv_rate,conv_rate_plus_val1)",
"aspects": [
{
"com.linkedin.pegasus2avro.common.Status": {
"removed": false
}
},
{
"com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": {
"description": null,
"dataType": "CONTINUOUS",
"version": null,
"sources": [
"urn:li:dataset:(urn:li:dataPlatform:request,vals_to_add,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:file,data.driver_stats_with_string.parquet,PROD)"
]
}
}
]
}
},
"proposedDelta": null,
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "feast-repository-test",
"registryName": null,
"registryVersion": null,
"properties": null
}
},
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.MLFeatureSnapshot": {
"urn": "urn:li:mlFeature:(feature_store.transformed_conv_rate,conv_rate_plus_val2)",
"aspects": [
{
"com.linkedin.pegasus2avro.common.Status": {
"removed": false
}
},
{
"com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": {
"description": null,
"dataType": "CONTINUOUS",
"version": null,
"sources": [
"urn:li:dataset:(urn:li:dataPlatform:request,vals_to_add,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:file,data.driver_stats_with_string.parquet,PROD)"
]
}
}
]
}
},
"proposedDelta": null,
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "feast-repository-test",
"registryName": null,
"registryVersion": null,
"properties": null
}
},
{
"auditHeader": null,
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.MLFeatureTableSnapshot": {
"urn": "urn:li:mlFeatureTable:(urn:li:dataPlatform:feast,feature_store.transformed_conv_rate)",
"aspects": [
{
"com.linkedin.pegasus2avro.common.BrowsePaths": {
"paths": [
"/feast/feature_store/feature_store.transformed_conv_rate"
]
}
},
{
"com.linkedin.pegasus2avro.common.Status": {
"removed": false
}
},
{
"com.linkedin.pegasus2avro.ml.metadata.MLFeatureTableProperties": {
"customProperties": {},
"description": null,
"mlFeatures": [
"urn:li:mlFeature:(feature_store.transformed_conv_rate,conv_rate_plus_val1)",
"urn:li:mlFeature:(feature_store.transformed_conv_rate,conv_rate_plus_val2)"
],
"mlPrimaryKeys": []
}
}
]
}
},
"proposedDelta": null,
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "feast-repository-test",
"registryName": null,
"registryVersion": null,
"properties": null
}
}
]

View File

@ -0,0 +1,10 @@
project: feature_store
provider: local
online_store:
type: sqlite
path: data/online_store.db
offline_store:
type: file
flags:
alpha_features: true
on_demand_transforms: true

View File

@ -0,0 +1,69 @@
from datetime import timedelta
import pandas as pd
from feast import Entity, Feature, FeatureView, FileSource, ValueType
from feast.data_source import RequestDataSource
from feast.on_demand_feature_view import on_demand_feature_view
# Batch (offline) source: historical driver statistics stored as Parquet.
driver_hourly_stats_source = FileSource(
    path="data/driver_stats_with_string.parquet",
    event_timestamp_column="event_timestamp",
    created_timestamp_column="created",
)
# Entity used as the join key for the feature views below.
driver_entity = Entity(
    name="driver_id", value_type=ValueType.INT64, description="Driver ID"
)
# Feature view over the batch source; the per-feature `labels` carry
# human-readable descriptions (surfaced as feature docs downstream).
driver_hourly_stats_view = FeatureView(
    name="driver_hourly_stats",
    entities=["driver_id"],
    ttl=timedelta(days=7),
    features=[
        Feature(
            name="conv_rate",
            dtype=ValueType.FLOAT,
            labels=dict(description="Conv rate"),
        ),
        Feature(
            name="acc_rate", dtype=ValueType.FLOAT, labels=dict(description="Acc rate")
        ),
        Feature(
            name="avg_daily_trips",
            dtype=ValueType.INT64,
            labels=dict(description="Avg daily trips"),
        ),
        Feature(
            name="string_feature",
            dtype=ValueType.STRING,
            labels=dict(description="String feature"),
        ),
    ],
    online=True,
    batch_source=driver_hourly_stats_source,
    tags={},
)
# Request-time data source: values supplied by the caller at serving time,
# consumed by the on-demand feature view below.
input_request = RequestDataSource(
    name="vals_to_add",
    schema={"val_to_add": ValueType.INT64, "val_to_add_2": ValueType.INT64},
)
@on_demand_feature_view(  # type: ignore
    inputs={
        "driver_hourly_stats": driver_hourly_stats_view,
        "vals_to_add": input_request,
    },
    features=[
        Feature(name="conv_rate_plus_val1", dtype=ValueType.DOUBLE),
        Feature(name="conv_rate_plus_val2", dtype=ValueType.DOUBLE),
    ],
)
def transformed_conv_rate(inputs: pd.DataFrame) -> pd.DataFrame:
    """Combine the stored conv_rate with the request-time values."""
    conv_rate = inputs["conv_rate"]
    return pd.DataFrame(
        {
            "conv_rate_plus_val1": conv_rate + inputs["val_to_add"],
            "conv_rate_plus_val2": conv_rate + inputs["val_to_add_2"],
        }
    )

View File

@ -0,0 +1,50 @@
import os
import sys
import pytest
from freezegun import freeze_time
from datahub.ingestion.run.pipeline import Pipeline
from tests.test_helpers import mce_helpers
FROZEN_TIME = "2020-04-14 07:00:00"
@freeze_time(FROZEN_TIME)
@pytest.mark.skipif(
    os.getenv("AIRFLOW1_TEST") == "true", reason="feast requires Airflow 2.0 or newer"
)
@pytest.mark.skipif(
    sys.version_info < (3, 7), reason="feast requires Python 3.7 or newer"
)
def test_feast_repository_ingest(pytestconfig, tmp_path, mock_time):
    """Ingest the sample feature_store repo and compare against the golden MCE file."""
    test_resources_dir = pytestconfig.rootpath / "tests/integration/feast"
    output_path = tmp_path / "feast_repository_mces.json"

    # recipe: feast repository source -> file sink
    recipe = {
        "run_id": "feast-repository-test",
        "source": {
            "type": "feast",
            "config": {
                "path": str(test_resources_dir / "feature_store"),
                "environment": "PROD",
            },
        },
        "sink": {
            "type": "file",
            "config": {
                "filename": str(output_path),
            },
        },
    }

    pipeline = Pipeline.create(recipe)
    pipeline.run()
    pipeline.raise_from_status()

    # diff the emitted MCEs against the checked-in golden file
    mce_helpers.check_golden_file(
        pytestconfig,
        output_path=output_path,
        golden_path=test_resources_dir / "feast_repository_mces_golden.json",
    )

View File

@ -36,6 +36,9 @@ deps =
.[dev-airflow1]
-c tests/airflow1-constraints.txt
setenv =
AIRFLOW1_TEST = true
[testenv:py3-full]
deps =
.[dev]