Fix #1306: Evolve Location entity integration (#1442)

Alberto Miorin committed on 2021-12-03 02:13:13 +01:00 (via GitHub)
parent 249643b79f
commit f9436c65f0
15 changed files with 287 additions and 31 deletions

View File

@@ -40,7 +40,7 @@ generate:
 run_ometa_integration_tests:
 	cd ingestion; \
-	pytest -c setup.cfg --override-ini=testpaths=tests/integration/ometa
+	pytest -c setup.cfg --override-ini=testpaths="tests/integration/ometa tests/unit/stage_test.py"
 
 publish:
 	make install_dev generate
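This wires the new unit test added at the bottom of this commit into the ometa integration run, so the stage/source compatibility check executes together with the integration suite.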

View File

@@ -0,0 +1,9 @@
+{
+  "id": null,
+  "name": "default",
+  "description": "This **mock** database contains tables related to the Glue service",
+  "service": {
+    "id": "b946d870-03b2-4d33-a075-13665a7a76b9",
+    "type": "GLUE"
+  }
+}

View File

@@ -0,0 +1,11 @@
+{
+  "id": "89229e26-7f74-4443-a568-85512eaeaa07",
+  "name": "glue",
+  "serviceType": "Glue",
+  "description": "Glue service used for table with location",
+  "href": "null",
+  "jdbc": {
+    "driverClass": "jdbc",
+    "connectionUrl": "jdbc://localhost"
+  }
+}

View File

@@ -0,0 +1,6 @@
+{
+  "name": "glue_s3",
+  "serviceType": "S3",
+  "description": "S3 Object storage used for the Glue Catalog",
+  "href": "null"
+}

View File

@@ -0,0 +1,25 @@
+{
+  "tables": [
+    {
+      "name": "sales",
+      "description": "Sales data",
+      "tableType": "Regular",
+      "columns": [
+        {
+          "name": "SKU",
+          "dataType": "NUMERIC",
+          "dataTypeDisplay": "numeric",
+          "description": "Article SKU",
+          "ordinalPosition": 1
+        },
+        {
+          "name": "Quantity",
+          "dataType": "NUMERIC",
+          "dataTypeDisplay": "numeric",
+          "description": "Quantity of SKU",
+          "ordinalPosition": 2
+        }
+      ]
+    }
+  ]
+}
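These Glue tables deliberately ship without sampleData: ingest_glue in sample_data.py below backfills it via GenerateFakeSampleData, which is presumably why faker is added to the dev requirements in this commit.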

View File

@@ -0,0 +1,22 @@
+{
+  "locations": [
+    {
+      "name": "s3://bucket-a",
+      "displayName": "s3://bucket-a",
+      "description": "Bucket A",
+      "locationType": "Bucket"
+    },
+    {
+      "name": "s3://bucket-b",
+      "displayName": "s3://bucket-b",
+      "description": "Bucket B",
+      "locationType": "Bucket"
+    },
+    {
+      "name": "s3://bucket-a/user/hive/dwh",
+      "displayName": "s3://bucket-a/user/hive/dwh",
+      "description": "Bucket A prefix",
+      "locationType": "Prefix"
+    }
+  ]
+}
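The locationType values used here, Bucket and Prefix, are members of the LocationType enum that sample_data.py imports below; the Glue ingestion additionally uses LocationType.Table for per-table locations.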

View File

@@ -0,0 +1,6 @@
+{
+  "name": "sample_s3",
+  "serviceType": "S3",
+  "description": "S3 Object storage",
+  "href": "null"
+}

View File

@@ -2,3 +2,4 @@ black
 isort
 pre-commit
 pylint
+faker

View File

@@ -8,17 +8,20 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+from typing import Optional
 
 from pydantic import BaseModel
 
 from metadata.generated.schema.entity.data.database import Database
 from metadata.generated.schema.entity.data.dbtmodel import DbtModel
+from metadata.generated.schema.entity.data.location import Location
 from metadata.generated.schema.entity.data.table import Table
 
 
 class OMetaDatabaseAndTable(BaseModel):
-    table: Table
     database: Database
+    table: Table
+    location: Optional[Location]
 
 
 class OMetaDatabaseAndModel(BaseModel):
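For orientation, a minimal usage sketch of the evolved model; db, tbl, and loc are hypothetical stand-ins, not names from this commit:

    # Hypothetical sketch: db, tbl and loc stand in for entities built elsewhere.
    record = OMetaDatabaseAndTable(
        database=db,   # Database the table belongs to
        table=tbl,     # the Table entity itself
        location=loc,  # new: optional physical Location; may be omitted/None
    )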

View File

@@ -6,6 +6,7 @@ To be used by OpenMetadata class
 import logging
 from typing import List
 
+from metadata.generated.schema.entity.data.location import Location
 from metadata.generated.schema.entity.data.table import (
     Table,
     TableData,
@@ -27,6 +28,18 @@ class OMetaTableMixin:
 
     client: REST
 
+    def add_location(self, table: Table, location: Location) -> None:
+        """
+        PUT location for a table
+
+        :param table: Table Entity to update
+        :param location: Location Entity to add
+        """
+        resp = self.client.put(
+            f"{self.get_suffix(Table)}/{table.id.__root__}/location",
+            data=str(location.id.__root__),
+        )
+
     def ingest_table_sample_data(
         self, table: Table, sample_data: TableData
     ) -> TableData:
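A hedged usage sketch of the new mixin method; metadata_config, my_table, and my_location are assumed to already exist:

    # Sketch: link an existing Location entity to a Table through the REST client.
    metadata = OpenMetadata(metadata_config)  # client that mixes in OMetaTableMixin
    metadata.add_location(table=my_table, location=my_location)
    # per the method above, this PUTs the location id to tables/{table.id}/location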

View File

@@ -130,65 +130,76 @@ class MetadataRestSink(Sink):
                 f"Ignoring the record due to unknown Record type {type(record)}"
             )
 
-    def write_tables(self, table_and_db: OMetaDatabaseAndTable):
+    def write_tables(self, db_and_table: OMetaDatabaseAndTable):
         try:
             db_request = CreateDatabaseEntityRequest(
-                name=table_and_db.database.name,
-                description=table_and_db.database.description,
+                name=db_and_table.database.name,
+                description=db_and_table.database.description,
                 service=EntityReference(
-                    id=table_and_db.database.service.id, type="databaseService"
+                    id=db_and_table.database.service.id,
+                    type="databaseService",
                 ),
             )
             db = self.metadata.create_or_update(db_request)
             table_request = CreateTableEntityRequest(
-                name=table_and_db.table.name,
-                tableType=table_and_db.table.tableType,
-                columns=table_and_db.table.columns,
-                description=table_and_db.table.description,
+                name=db_and_table.table.name,
+                tableType=db_and_table.table.tableType,
+                columns=db_and_table.table.columns,
+                description=db_and_table.table.description,
                 database=db.id,
             )
-            if (
-                table_and_db.table.viewDefinition is not None
-                and table_and_db.table.viewDefinition != ""
-            ):
+            if db_and_table.table.viewDefinition:
                 table_request.viewDefinition = (
-                    table_and_db.table.viewDefinition.__root__
+                    db_and_table.table.viewDefinition.__root__
                 )
             created_table = self.metadata.create_or_update(table_request)
-            if table_and_db.table.sampleData is not None:
-                self.metadata.ingest_table_sample_data(
-                    table=created_table, sample_data=table_and_db.table.sampleData
+            if db_and_table.location is not None:
+                location_request = CreateLocationEntityRequest(
+                    name=db_and_table.location.name,
+                    description=db_and_table.location.description,
+                    service=EntityReference(
+                        id=db_and_table.location.service.id,
+                        type="storageService",
+                    ),
                 )
-            if table_and_db.table.tableProfile is not None:
-                for tp in table_and_db.table.tableProfile:
+                location = self.metadata.create_or_update(location_request)
+                self.metadata.add_location(table=created_table, location=location)
+            if db_and_table.table.sampleData is not None:
+                self.metadata.ingest_table_sample_data(
+                    table=created_table,
+                    sample_data=db_and_table.table.sampleData,
+                )
+            if db_and_table.table.tableProfile is not None:
+                for tp in db_and_table.table.tableProfile:
                     for pd in tp:
                         if pd[0] == "columnProfile":
                             for col in pd[1]:
                                 col.name = col.name.replace(".", "_DOT_")
                 self.metadata.ingest_table_profile_data(
                     table=created_table,
-                    table_profile=table_and_db.table.tableProfile,
+                    table_profile=db_and_table.table.tableProfile,
                 )
             logger.info(
                 "Successfully ingested table {}.{}".format(
-                    table_and_db.database.name.__root__, created_table.name.__root__
+                    db_and_table.database.name.__root__,
+                    created_table.name.__root__,
                 )
             )
             self.status.records_written(
-                f"Table: {table_and_db.database.name.__root__}.{created_table.name.__root__}"
+                f"Table: {db_and_table.database.name.__root__}.{created_table.name.__root__}"
            )
         except (APIError, ValidationError) as err:
             logger.error(
                 "Failed to ingest table {} in database {} ".format(
-                    table_and_db.table.name.__root__,
-                    table_and_db.database.name.__root__,
+                    db_and_table.table.name.__root__,
+                    db_and_table.database.name.__root__,
                 )
             )
             logger.error(err)
-            self.status.failure(f"Table: {table_and_db.table.name.__root__}")
+            self.status.failure(f"Table: {db_and_table.table.name.__root__}")
 
     def write_dbt_models(self, model_and_db: OMetaDatabaseAndModel):
         try:
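Taken together with the mixin change above: when a record carries a Location, the sink first create-or-updates the Location entity against its storage service, then calls add_location to link it to the freshly created table.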

View File

@@ -13,7 +13,6 @@ import csv
 import json
 import logging
 import os
-import random
 import uuid
 from collections import namedtuple
 from dataclasses import dataclass, field
@@ -25,8 +24,8 @@ from pydantic import ValidationError
 from metadata.config.common import ConfigModel
 from metadata.generated.schema.api.data.createTopic import CreateTopicEntityRequest
 from metadata.generated.schema.api.lineage.addLineage import AddLineage
-from metadata.generated.schema.api.teams.createUser import CreateUserEntityRequest
 from metadata.generated.schema.entity.data.database import Database
+from metadata.generated.schema.entity.data.location import Location, LocationType
 from metadata.generated.schema.entity.data.mlmodel import MlModel
 from metadata.generated.schema.entity.data.pipeline import Pipeline
 from metadata.generated.schema.entity.data.table import Table
@@ -36,7 +35,9 @@ from metadata.generated.schema.type.entityLineage import EntitiesEdge
 from metadata.generated.schema.type.entityReference import EntityReference
 from metadata.ingestion.api.common import Record
 from metadata.ingestion.api.source import Source, SourceStatus
-from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
+from metadata.ingestion.models.ometa_table_db import (
+    OMetaDatabaseAndTable,
+)
 from metadata.ingestion.models.table_metadata import Chart, Dashboard
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
 from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
@@ -45,6 +46,8 @@ from metadata.utils.helpers import (
     get_database_service_or_create,
     get_messaging_service_or_create,
     get_pipeline_service_or_create,
+    get_storage_service_or_create,
+    get_database_service_or_create_v2,
 )
 
 logger: logging.Logger = logging.getLogger(__name__)
logger: logging.Logger = logging.getLogger(__name__)
@@ -221,6 +224,36 @@ class SampleDataSource(Source):
         self.config = config
         self.metadata_config = metadata_config
         self.metadata = OpenMetadata(metadata_config)
+        self.storage_service_json = json.load(
+            open(self.config.sample_data_folder + "/locations/service.json", "r")
+        )
+        self.locations = json.load(
+            open(self.config.sample_data_folder + "/locations/locations.json", "r")
+        )
+        self.storage_service = get_storage_service_or_create(
+            self.storage_service_json,
+            metadata_config,
+        )
+        self.glue_storage_service_json = json.load(
+            open(self.config.sample_data_folder + "/glue/storage_service.json", "r")
+        )
+        self.glue_database_service_json = json.load(
+            open(self.config.sample_data_folder + "/glue/database_service.json", "r")
+        )
+        self.glue_database = json.load(
+            open(self.config.sample_data_folder + "/glue/database.json", "r")
+        )
+        self.glue_tables = json.load(
+            open(self.config.sample_data_folder + "/glue/tables.json", "r")
+        )
+        self.glue_database_service = get_database_service_or_create_v2(
+            self.glue_database_service_json,
+            metadata_config,
+        )
+        self.glue_storage_service = get_storage_service_or_create(
+            self.glue_storage_service_json,
+            metadata_config,
+        )
         self.database_service_json = json.load(
             open(self.config.sample_data_folder + "/datasets/service.json", "r")
         )
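For orientation, the sample_data_folder referenced in these json.load calls is expected to contain at least the following files (paths taken verbatim from this constructor; examples/sample_data/ is the folder the new unit test points at):

    examples/sample_data/
    ├── datasets/service.json
    ├── glue/
    │   ├── database.json
    │   ├── database_service.json
    │   ├── storage_service.json
    │   └── tables.json
    └── locations/
        ├── locations.json
        └── service.json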
@@ -293,6 +326,8 @@
         pass
 
     def next_record(self) -> Iterable[Record]:
+        yield from self.ingest_locations()
+        yield from self.ingest_glue()
         yield from self.ingest_tables()
         yield from self.ingest_topics()
         yield from self.ingest_charts()
@@ -302,6 +337,52 @@
         yield from self.ingest_users()
         yield from self.ingest_mlmodels()
 
+    def ingest_locations(self) -> Iterable[Location]:
+        for location in self.locations["locations"]:
+            location_ev = Location(
+                id=uuid.uuid4(),
+                name=location["name"],
+                displayName=location["displayName"],
+                description=location["description"],
+                locationType=location["locationType"],
+                service=EntityReference(
+                    id=self.storage_service.id, type="storageService"
+                ),
+            )
+            yield location_ev
+
+    def ingest_glue(self) -> Iterable[OMetaDatabaseAndTable]:
+        db = Database(
+            id=uuid.uuid4(),
+            name=self.glue_database["name"],
+            description=self.glue_database["description"],
+            service=EntityReference(
+                id=self.glue_database_service.id,
+                type=self.glue_database_service.serviceType.value,
+            ),
+        )
+        for table in self.glue_tables["tables"]:
+            if not table.get("sampleData"):
+                table["sampleData"] = GenerateFakeSampleData.check_columns(
+                    table["columns"]
+                )
+            table["id"] = uuid.uuid4()
+            table_metadata = Table(**table)
+            location_metadata = Location(
+                id=uuid.uuid4(),
+                name="s3://glue_bucket/dwh/schema/" + table["name"],
+                description=table["description"],
+                locationType=LocationType.Table,
+                service=EntityReference(
+                    id=self.glue_storage_service.id, type="storageService"
+                ),
+            )
+            db_table_location = OMetaDatabaseAndTable(
+                database=db, table=table_metadata, location=location_metadata
+            )
+            self.status.scanned("table", table_metadata.name.__root__)
+            yield db_table_location
+
     def ingest_tables(self) -> Iterable[OMetaDatabaseAndTable]:
         db = Database(
             id=uuid.uuid4(),
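Note how ingest_glue mutates the raw table dict before Table(**table): sampleData is synthesized when absent (GenerateFakeSampleData is presumably imported elsewhere in this file; its import is not visible in the displayed hunks), and ids are assigned client-side with uuid.uuid4(), which is why glue/database.json above can ship with "id": null.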

View File

@@ -14,7 +14,6 @@ import logging
 import pathlib
 
 from metadata.config.common import ConfigModel
-from metadata.generated.schema.entity.data.table import Table
 from metadata.ingestion.api.common import WorkflowContext
 from metadata.ingestion.api.stage import Stage, StageStatus
 from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
@@ -52,7 +51,7 @@ class FileStage(Stage):
         metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict)
         return cls(ctx, config, metadata_config)
 
-    def stage_record(self, record: Table) -> None:
+    def stage_record(self, record) -> None:
         json_record = json.loads(record.json())
         self.file.write(json.dumps(json_record))
         self.file.write("\n")

View File

@@ -24,10 +24,14 @@ from metadata.generated.schema.api.services.createMessagingService import (
 from metadata.generated.schema.api.services.createPipelineService import (
     CreatePipelineServiceEntityRequest,
 )
+from metadata.generated.schema.api.services.createStorageService import (
+    CreateStorageServiceEntityRequest,
+)
 from metadata.generated.schema.entity.services.dashboardService import DashboardService
 from metadata.generated.schema.entity.services.databaseService import DatabaseService
 from metadata.generated.schema.entity.services.messagingService import MessagingService
 from metadata.generated.schema.entity.services.pipelineService import PipelineService
+from metadata.generated.schema.entity.services.storageService import StorageService
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
@@ -132,6 +136,30 @@ def get_pipeline_service_or_create(service_json, metadata_config) -> PipelineService:
     return created_service
 
 
+def get_storage_service_or_create(service_json, metadata_config) -> StorageService:
+    metadata = OpenMetadata(metadata_config)
+    service = metadata.get_by_name(entity=StorageService, fqdn=service_json["name"])
+    if service is not None:
+        return service
+    else:
+        created_service = metadata.create_or_update(
+            CreateStorageServiceEntityRequest(**service_json)
+        )
+        return created_service
+
+
+def get_database_service_or_create_v2(service_json, metadata_config) -> DatabaseService:
+    metadata = OpenMetadata(metadata_config)
+    service = metadata.get_by_name(entity=DatabaseService, fqdn=service_json["name"])
+    if service is not None:
+        return service
+    else:
+        created_service = metadata.create_or_update(
+            CreateDatabaseServiceEntityRequest(**service_json)
+        )
+        return created_service
+
+
 def convert_epoch_to_iso(seconds_since_epoch):
     dt = datetime.utcfromtimestamp(seconds_since_epoch)
     iso_format = dt.isoformat() + "Z"
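A small usage sketch of the new helper, mirroring the constructor changes in sample_data.py above; metadata_config is assumed to be an already-built MetadataServerConfig:

    # Sketch: idempotent lookup-or-create of the S3 storage service.
    service_json = {           # contents mirror locations/service.json above
        "name": "sample_s3",
        "serviceType": "S3",
        "description": "S3 Object storage",
        "href": "null",
    }
    storage_service = get_storage_service_or_create(service_json, metadata_config)
    # returns the existing service when "sample_s3" already resolves,
    # otherwise creates it via CreateStorageServiceEntityRequest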

View File

@@ -0,0 +1,41 @@
+import json
+from unittest import TestCase
+
+from metadata.ingestion.api.workflow import Workflow
+
+config = """
+{
+  "source": {
+    "type": "sample-data",
+    "config": {
+      "sample_data_folder": "examples/sample_data/"
+    }
+  },
+  "stage": {
+    "type": "file",
+    "config": {
+      "filename": "/tmp/stage_test"
+    }
+  },
+  "metadata_server": {
+    "type": "metadata-server",
+    "config": {
+      "api_endpoint": "http://localhost:8585/api",
+      "auth_provider_type": "no-auth"
+    }
+  }
+}
+"""
+
+
+class WorkflowTest(TestCase):
+    def test_execute_200(self):
+        """
+        stage/file.py must be compatible with source/sample_data.py;
+        this test tries to catch one becoming incompatible with the other
+        by running a workflow that includes both of them.
+        """
+        workflow = Workflow.create(json.loads(config))
+        workflow.execute()
+        workflow.stop()
+        self.assertTrue(True)
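As wired into the Makefile target at the top of this commit, this test runs together with the ometa integration suite; it can also be invoked on its own from the ingestion directory:

    pytest -c setup.cfg --override-ini=testpaths=tests/unit/stage_test.py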