diff --git a/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py b/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py index e60087a81a3..96ef322eb1c 100644 --- a/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/datalake/metadata.py @@ -60,7 +60,7 @@ from metadata.readers.dataframe.models import DatalakeTableSchemaWrapper from metadata.readers.dataframe.reader_factory import SupportedTypes from metadata.utils import fqn from metadata.utils.constants import COMPLEX_COLUMN_SEPARATOR, DEFAULT_DATABASE -from metadata.utils.datalake.datalake_utils import fetch_dataframe +from metadata.utils.datalake.datalake_utils import fetch_dataframe, get_file_format_type from metadata.utils.filters import filter_by_schema, filter_by_table from metadata.utils.logger import ingestion_logger from metadata.utils.s3_utils import list_s3_objects @@ -238,7 +238,7 @@ class DatalakeSource(DatabaseServiceSource): def get_tables_name_and_type( # pylint: disable=too-many-branches self, - ) -> Optional[Iterable[Tuple[str, str]]]: + ) -> Iterable[Tuple[str, TableType]]: """ Handle table and views. @@ -371,6 +371,7 @@ class DatalakeSource(DatabaseServiceSource): bucket_name=schema_name, ), ) + # If no data_frame (due to unsupported type), ignore columns = self.get_columns(data_frame[0]) if data_frame else None if columns: @@ -380,6 +381,7 @@ class DatalakeSource(DatabaseServiceSource): columns=columns, tableConstraints=table_constraints if table_constraints else None, databaseSchema=self.context.database_schema.fullyQualifiedName, + fileFormat=get_file_format_type(table_name), ) yield table_request self.register_record(table_request=table_request) @@ -552,7 +554,7 @@ class DatalakeSource(DatabaseServiceSource): complex_col_dict.clear() return cols - def yield_view_lineage(self) -> Optional[Iterable[AddLineageRequest]]: + def yield_view_lineage(self) -> Iterable[AddLineageRequest]: yield from [] def yield_tag(self, schema_name: str) -> Iterable[OMetaTagAndClassification]: diff --git a/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py b/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py index c086eeb1f72..7734953fe61 100644 --- a/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py +++ b/ingestion/src/metadata/ingestion/source/storage/s3/metadata.py @@ -145,6 +145,7 @@ class S3Source(StorageServiceSource): service=self.context.objectstore_service.fullyQualifiedName, parent=container_details.parent, sourceUrl=container_details.sourceUrl, + fileFormats=container_details.file_formats, ) def _generate_container_details( diff --git a/ingestion/src/metadata/utils/datalake/datalake_utils.py b/ingestion/src/metadata/utils/datalake/datalake_utils.py index 5f45dd0de3c..02ecd69f05d 100644 --- a/ingestion/src/metadata/utils/datalake/datalake_utils.py +++ b/ingestion/src/metadata/utils/datalake/datalake_utils.py @@ -62,3 +62,16 @@ def fetch_dataframe( raise err return None + + +def get_file_format_type( + key: str, +) -> Optional[str]: + """ + Method to get file format type + """ + for supported_type in SupportedTypes: + if key.endswith(supported_type.value): + return supported_type.value + + return None diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/databases/TableResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/databases/TableResource.java index 4fcec6a2547..09f7be9a45f 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/databases/TableResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/databases/TableResource.java @@ -923,6 +923,7 @@ public class TableResource extends EntityResource { .withTablePartition(create.getTablePartition()) .withTableType(create.getTableType()) .withTags(create.getTags()) + .withFileFormat(create.getFileFormat()) .withViewDefinition(create.getViewDefinition()) .withTableProfilerConfig(create.getTableProfilerConfig()) .withDatabaseSchema(getEntityReference(Entity.DATABASE_SCHEMA, create.getDatabaseSchema()))) diff --git a/openmetadata-spec/src/main/resources/json/schema/api/data/createTable.json b/openmetadata-spec/src/main/resources/json/schema/api/data/createTable.json index 3299d293532..4397ce77687 100644 --- a/openmetadata-spec/src/main/resources/json/schema/api/data/createTable.json +++ b/openmetadata-spec/src/main/resources/json/schema/api/data/createTable.json @@ -88,6 +88,10 @@ "items" : { "$ref" : "../../type/basic.json#/definitions/fullyQualifiedEntityName" } + }, + "fileFormat": { + "description": "File format in case of file/datalake tables.", + "$ref": "../../entity/data/table.json#/definitions/fileFormat" } }, "required": ["name", "columns", "databaseSchema"], diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/data/table.json b/openmetadata-spec/src/main/resources/json/schema/entity/data/table.json index 93dc0110346..de54f04346d 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/data/table.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/data/table.json @@ -860,6 +860,11 @@ "sql" ], "additionalProperties": false + }, + "fileFormat": { + "description": "File format in case of file/datalake tables.", + "type": "string", + "enum": ["csv", "tsv", "avro", "parquet", "json", "json.gz", "json.zip"] } }, "properties": { @@ -1023,6 +1028,10 @@ "dataProducts" : { "description": "List of data products this entity is part of.", "$ref" : "../../type/entityReferenceList.json" + }, + "fileFormat": { + "description": "File format in case of file/datalake tables.", + "$ref" : "#/definitions/fileFormat" } }, "required": [