Fix #6700: Add support for table properties: file format for datalake (#12920)

* Fix #6700: Add support for table properties: file format for datalake & storage

* pylint fix

* resolve review comments
This commit is contained in:
Mayur Singal 2023-08-22 13:16:22 +05:30 committed by GitHub
parent d140883747
commit f6d5c7413f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 33 additions and 3 deletions

View File

@ -60,7 +60,7 @@ from metadata.readers.dataframe.models import DatalakeTableSchemaWrapper
from metadata.readers.dataframe.reader_factory import SupportedTypes
from metadata.utils import fqn
from metadata.utils.constants import COMPLEX_COLUMN_SEPARATOR, DEFAULT_DATABASE
from metadata.utils.datalake.datalake_utils import fetch_dataframe
from metadata.utils.datalake.datalake_utils import fetch_dataframe, get_file_format_type
from metadata.utils.filters import filter_by_schema, filter_by_table
from metadata.utils.logger import ingestion_logger
from metadata.utils.s3_utils import list_s3_objects
@ -238,7 +238,7 @@ class DatalakeSource(DatabaseServiceSource):
def get_tables_name_and_type( # pylint: disable=too-many-branches
self,
) -> Optional[Iterable[Tuple[str, str]]]:
) -> Iterable[Tuple[str, TableType]]:
"""
Handle table and views.
@ -371,6 +371,7 @@ class DatalakeSource(DatabaseServiceSource):
bucket_name=schema_name,
),
)
# If no data_frame (due to unsupported type), ignore
columns = self.get_columns(data_frame[0]) if data_frame else None
if columns:
@ -380,6 +381,7 @@ class DatalakeSource(DatabaseServiceSource):
columns=columns,
tableConstraints=table_constraints if table_constraints else None,
databaseSchema=self.context.database_schema.fullyQualifiedName,
fileFormat=get_file_format_type(table_name),
)
yield table_request
self.register_record(table_request=table_request)
@ -552,7 +554,7 @@ class DatalakeSource(DatabaseServiceSource):
complex_col_dict.clear()
return cols
def yield_view_lineage(self) -> Optional[Iterable[AddLineageRequest]]:
def yield_view_lineage(self) -> Iterable[AddLineageRequest]:
yield from []
def yield_tag(self, schema_name: str) -> Iterable[OMetaTagAndClassification]:

View File

@ -145,6 +145,7 @@ class S3Source(StorageServiceSource):
service=self.context.objectstore_service.fullyQualifiedName,
parent=container_details.parent,
sourceUrl=container_details.sourceUrl,
fileFormats=container_details.file_formats,
)
def _generate_container_details(

View File

@ -62,3 +62,16 @@ def fetch_dataframe(
raise err
return None
def get_file_format_type(
    key: str,
) -> Optional[str]:
    """
    Return the supported file-format suffix that ``key`` ends with.

    Walks the ``SupportedTypes`` enum in declaration order and returns the
    value of the first member whose string value matches the end of ``key``
    (e.g. ``"csv"`` for ``"sales.csv"``). Returns ``None`` when no supported
    suffix matches.

    NOTE(review): the match is a plain ``endswith`` with no dot separator —
    a key such as ``"mycsv"`` would also match ``"csv"``; confirm intended.
    """
    matching_formats = (
        supported.value
        for supported in SupportedTypes
        if key.endswith(supported.value)
    )
    return next(matching_formats, None)

View File

@ -923,6 +923,7 @@ public class TableResource extends EntityResource<Table, TableRepository> {
.withTablePartition(create.getTablePartition())
.withTableType(create.getTableType())
.withTags(create.getTags())
.withFileFormat(create.getFileFormat())
.withViewDefinition(create.getViewDefinition())
.withTableProfilerConfig(create.getTableProfilerConfig())
.withDatabaseSchema(getEntityReference(Entity.DATABASE_SCHEMA, create.getDatabaseSchema())))

View File

@ -88,6 +88,10 @@
"items" : {
"$ref" : "../../type/basic.json#/definitions/fullyQualifiedEntityName"
}
},
"fileFormat": {
"description": "File format in case of file/datalake tables.",
"$ref": "../../entity/data/table.json#/definitions/fileFormat"
}
},
"required": ["name", "columns", "databaseSchema"],

View File

@ -860,6 +860,11 @@
"sql"
],
"additionalProperties": false
},
"fileFormat": {
"description": "File format in case of file/datalake tables.",
"type": "string",
"enum": ["csv", "tsv", "avro", "parquet", "json", "json.gz", "json.zip"]
}
},
"properties": {
@ -1023,6 +1028,10 @@
"dataProducts" : {
"description": "List of data products this entity is part of.",
"$ref" : "../../type/entityReferenceList.json"
},
"fileFormat": {
"description": "File format in case of file/datalake tables.",
"$ref" : "#/definitions/fileFormat"
}
},
"required": [