Issue-2538: Add Iceberg type for Glue Table + Location Entity (#3210)

This commit is contained in:
Teddy 2022-03-07 17:27:14 +01:00 committed by GitHub
parent b4118c7863
commit 6ee31eb21f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 60 additions and 6 deletions

2
.gitignore vendored
View File

@ -67,6 +67,8 @@ openmetadata-ui/src/main/resources/ui/src/test
#tests #tests
.coverage .coverage
/ingestion/coverage.xml
/ingestion/junit/*
#vscode #vscode
*/.vscode/* */.vscode/*

View File

@ -16,7 +16,7 @@
"javaType": "org.openmetadata.catalog.type.LocationType", "javaType": "org.openmetadata.catalog.type.LocationType",
"description": "This schema defines the type used for describing different types of Location.", "description": "This schema defines the type used for describing different types of Location.",
"type": "string", "type": "string",
"enum": ["Bucket", "Prefix", "Database", "Table"], "enum": ["Bucket", "Prefix", "Database", "Table", "Iceberg"],
"javaEnums": [ "javaEnums": [
{ {
"name": "Bucket" "name": "Bucket"
@ -29,6 +29,9 @@
}, },
{ {
"name": "Table" "name": "Table"
},
{
"name": "Iceberg"
} }
] ]
} }

View File

@ -10,7 +10,7 @@
"javaType": "org.openmetadata.catalog.type.TableType", "javaType": "org.openmetadata.catalog.type.TableType",
"description": "This schema defines the type used for describing different types of tables.", "description": "This schema defines the type used for describing different types of tables.",
"type": "string", "type": "string",
"enum": ["Regular", "External", "View", "SecureView", "MaterializedView"], "enum": ["Regular", "External", "View", "SecureView", "MaterializedView", "Iceberg"],
"javaEnums": [ "javaEnums": [
{ {
"name": "Regular" "name": "Regular"
@ -26,6 +26,9 @@
}, },
{ {
"name": "MaterializedView" "name": "MaterializedView"
},
{
"name": "Iceberg"
} }
] ]
}, },

View File

@ -20,6 +20,30 @@
"ordinalPosition": 2 "ordinalPosition": 2
} }
] ]
},
{
"name": "marketing",
"description": "Marketing data",
"tableType": "Iceberg",
"columns": [
{
"name": "ad_id",
"dataType": "NUMERIC",
"dataTypeDisplay": "bigint",
"description": "Ad ID",
"ordinalPosition": 1
},
{
"name": "campaign_id",
"dataType": "NUMERIC",
"dataTypeDisplay": "bigint",
"description": "campaign ID",
"ordinalPosition": 1
}
],
"Parameters": {
"table_type": "ICEBERG"
}
} }
] ]
} }

View File

@ -152,18 +152,31 @@ class GlueSource(Source[Entity]):
service=EntityReference(id=self.service.id, type="databaseService"), service=EntityReference(id=self.service.id, type="databaseService"),
) )
fqn = f"{self.config.service_name}.{self.database_name}.{table['Name']}" fqn = f"{self.config.service_name}.{self.database_name}.{table['Name']}"
parameters = table.get("Parameters")
location_type = LocationType.Table
if parameters:
# iceberg tables need to pass a key/value pair in the DDL `'table_type'='ICEBERG'`
# https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg-creating-tables.html
location_type = (
location_type
if parameters.get("table_type") != "ICEBERG"
else LocationType.Iceberg
)
self.dataset_name = fqn self.dataset_name = fqn
table_columns = self.get_columns(table["StorageDescriptor"]) table_columns = self.get_columns(table["StorageDescriptor"])
location_entity = Location( location_entity = Location(
name=table["StorageDescriptor"]["Location"], name=table["StorageDescriptor"]["Location"],
locationType=LocationType.Table, locationType=location_type,
service=EntityReference( service=EntityReference(
id=self.storage_service.id, type="storageService" id=self.storage_service.id, type="storageService"
), ),
) )
table_type: TableType = TableType.Regular table_type: TableType = TableType.Regular
if table["TableType"] == "EXTERNAL_TABLE": if location_type == LocationType.Iceberg:
table_type = TableType.Iceberg
elif table["TableType"] == "EXTERNAL_TABLE":
table_type = TableType.External table_type = TableType.External
elif table["TableType"] == "VIRTUAL_VIEW": elif table["TableType"] == "VIRTUAL_VIEW":
table_type = TableType.View table_type = TableType.View

View File

@ -331,12 +331,21 @@ class SampleDataSource(Source[Entity]):
) )
for table in self.glue_tables["tables"]: for table in self.glue_tables["tables"]:
table["id"] = uuid.uuid4() table["id"] = uuid.uuid4()
parameters = table.get("Parameters")
table = {key: val for key, val in table.items() if key != "Parameters"}
table_metadata = Table(**table) table_metadata = Table(**table)
location_type = LocationType.Table
if parameters:
location_type = (
location_type
if parameters.get("table_type") != "ICEBERG"
else LocationType.Iceberg
)
location_metadata = Location( location_metadata = Location(
id=uuid.uuid4(), id=uuid.uuid4(),
name="s3://glue_bucket/dwh/schema/" + table["name"], name="s3://glue_bucket/dwh/schema/" + table["name"],
description=table["description"], description=table["description"],
locationType=LocationType.Table, locationType=location_type,
service=EntityReference( service=EntityReference(
id=self.glue_storage_service.id, type="storageService" id=self.glue_storage_service.id, type="storageService"
), ),

View File

@ -254,7 +254,7 @@ class ColumnTypeParser:
} }
elif ColumnTypeParser._FIXED_STRING.match(s): elif ColumnTypeParser._FIXED_STRING.match(s):
m = ColumnTypeParser._FIXED_STRING.match(s) m = ColumnTypeParser._FIXED_STRING.match(s)
return {"type": "STRING", "dataTypeDisplay": s} return {"dataType": "STRING", "dataTypeDisplay": s}
elif ColumnTypeParser._FIXED_DECIMAL.match(s): elif ColumnTypeParser._FIXED_DECIMAL.match(s):
m = ColumnTypeParser._FIXED_DECIMAL.match(s) m = ColumnTypeParser._FIXED_DECIMAL.match(s)
if m.group(2) is not None: # type: ignore if m.group(2) is not None: # type: ignore