From e8975aac011ceada784c87f6ed99bc0ec7445f0b Mon Sep 17 00:00:00 2001 From: Abhishek Pandey Date: Wed, 15 Jun 2022 12:27:21 +0530 Subject: [PATCH] datalake-csv-files-ingestion-added (#5343) datalake-csv-files-ingestion-added (#5343) --- .../database/datalakeConnection.json | 83 ++++++ .../entity/services/databaseService.json | 9 +- ingestion/examples/workflows/datalake.yaml | 26 ++ ingestion/setup.py | 9 + .../ingestion/source/database/datalake.py | 280 ++++++++++++++++++ .../src/metadata/utils/connection_clients.py | 7 + ingestion/src/metadata/utils/connections.py | 67 +++++ ingestion/src/metadata/utils/gcs_utils.py | 58 ++++ ingestion/src/metadata/utils/s3_utils.py | 57 ++++ .../src/metadata/utils/source_connections.py | 2 + 10 files changed, 597 insertions(+), 1 deletion(-) create mode 100644 catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/datalakeConnection.json create mode 100644 ingestion/examples/workflows/datalake.yaml create mode 100644 ingestion/src/metadata/ingestion/source/database/datalake.py create mode 100644 ingestion/src/metadata/utils/gcs_utils.py create mode 100644 ingestion/src/metadata/utils/s3_utils.py diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/datalakeConnection.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/datalakeConnection.json new file mode 100644 index 00000000000..ab6c4cfa418 --- /dev/null +++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/connections/database/datalakeConnection.json @@ -0,0 +1,83 @@ +{ + "$id": "https://open-metadata.org/schema/entity/services/connections/database/datalakeConnection.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "DatalakeConnection", + "description": "Datalake Connection Config", + "type": "object", + "javaType": "org.openmetadata.catalog.services.connections.database.DatalakeConnection", + "definitions": { + "datalakeType": { + "description": "Service type.", + "type": "string", + "enum": ["Datalake"], + "default": "Datalake" + }, + + "GCSConfig": { + "title": "DataLake GCS Config Source", + "description": "DataLake Catalog and Manifest files in GCS storage. We will search for catalog.json and manifest.json.", + "properties": { + "securityConfig": { + "title": "DataLake GCS Security Config", + "$ref": "../../../../security/credentials/gcsCredentials.json" + } + } + }, + "S3Config": { + "title": "DataLake S3 Config Source", + "description": "DataLake Catalog and Manifest files in S3 bucket. 
We will search for catalog.json and manifest.json.", + "properties": { + "securityConfig": { + "title": "DataLake S3 Security Config", + "$ref": "../../../../security/credentials/awsCredentials.json" + } + } + } + }, + "properties": { + "type": { + "title": "Service Type", + "description": "Service Type", + "$ref": "#/definitions/datalakeType", + "default": "Datalake" + }, + "configSource": { + "title": "DataLake Configuration Source", + "description": "Available sources to fetch files.", + "oneOf": [ + { + "$ref": "#/definitions/S3Config" + }, + { + "$ref": "#/definitions/GCSConfig" + } + ] + }, + "bucketName": { + "title": "Bucket Name", + "description": "Bucket Name of the data source.", + "type": "string", + "default": "" + }, + "prefix": { + "title": "Prefix", + "description": "Prefix of the data source.", + "type": "string", + "default": "" + }, + "connectionOptions": { + "title": "Connection Options", + "$ref": "../connectionBasicType.json#/definitions/connectionOptions" + }, + "connectionArguments": { + "title": "Connection Arguments", + "$ref": "../connectionBasicType.json#/definitions/connectionArguments" + }, + "supportsMetadataExtraction": { + "title": "Supports Metadata Extraction", + "$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction" + } + }, + "additionalProperties": false, + "required": ["configSource"] +} diff --git a/catalog-rest-service/src/main/resources/json/schema/entity/services/databaseService.json b/catalog-rest-service/src/main/resources/json/schema/entity/services/databaseService.json index 44d0d5eac2d..7076ac4bc87 100644 --- a/catalog-rest-service/src/main/resources/json/schema/entity/services/databaseService.json +++ b/catalog-rest-service/src/main/resources/json/schema/entity/services/databaseService.json @@ -36,7 +36,8 @@ "DeltaLake", "Salesforce", "SampleData", - "PinotDB" + "PinotDB", + "Datalake" ], "javaEnums": [ { @@ -116,6 +117,9 @@ }, { "name": "PinotDB" + }, + { + "name": "Datalake" } ] }, @@ -202,6 +206,9 @@ }, { "$ref": "./connections/database/pinotDBConnection.json" + }, + { + "$ref": "./connections/database/datalakeConnection.json" } ] } diff --git a/ingestion/examples/workflows/datalake.yaml b/ingestion/examples/workflows/datalake.yaml new file mode 100644 index 00000000000..2202586bb94 --- /dev/null +++ b/ingestion/examples/workflows/datalake.yaml @@ -0,0 +1,26 @@ +source: + type: datalake + serviceName: local_datalake4 + serviceConnection: + config: + type: Datalake + configSource: + securityConfig: + awsAccessKeyId: aws access key id + awsSecretAccessKey: aws secret access key + awsRegion: aws region + bucketName: bucket name + prefix: prefix + sourceConfig: + config: + tableFilterPattern: + includes: + - '' +sink: + type: metadata-rest + config: {} +workflowConfig: + openMetadataServerConfig: + hostPort: http://localhost:8585/api + authProvider: no-auth + \ No newline at end of file diff --git a/ingestion/setup.py b/ingestion/setup.py index 2bd5e44e317..c1d0caca8ca 100644 --- a/ingestion/setup.py +++ b/ingestion/setup.py @@ -80,6 +80,15 @@ plugins: Dict[str, Set[str]] = { "bigquery-usage": {"google-cloud-logging", "cachetools"}, "docker": {"python_on_whales==0.34.0"}, "backup": {"boto3~=1.19.12"}, + "datalake": { + "google-cloud-storage==1.43.0", + "pandas==1.3.5", + "gcsfs==2022.5.0", + "s3fs==0.4.2", + "dask==2022.2.0", + "pyarrow==6.0.1", + "boto3~=1.19.12", + }, "dbt": {"google-cloud", "boto3", "google-cloud-storage==1.43.0"}, "druid": {"pydruid>=0.6.2"}, "elasticsearch": {"elasticsearch==7.13.1"}, diff --git 
a/ingestion/src/metadata/ingestion/source/database/datalake.py b/ingestion/src/metadata/ingestion/source/database/datalake.py
new file mode 100644
index 00000000000..4fb5481f9e7
--- /dev/null
+++ b/ingestion/src/metadata/ingestion/source/database/datalake.py
@@ -0,0 +1,280 @@
+# Copyright 2021 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+DataLake connector to fetch metadata from files stored in S3, GCS and HDFS
+"""
+import traceback
+import uuid
+from typing import Iterable, Optional
+
+from metadata.generated.schema.entity.data.database import Database
+from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
+from metadata.generated.schema.entity.data.table import Column, Table, TableData
+from metadata.generated.schema.entity.services.connections.database.datalakeConnection import (
+    DatalakeConnection,
+    GCSConfig,
+    S3Config,
+)
+from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
+    OpenMetadataConnection,
+)
+from metadata.generated.schema.entity.services.databaseService import DatabaseService
+from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import (
+    DatabaseServiceMetadataPipeline,
+)
+from metadata.generated.schema.metadataIngestion.workflow import (
+    Source as WorkflowSource,
+)
+from metadata.generated.schema.type.entityReference import EntityReference
+from metadata.ingestion.api.common import Entity
+from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
+from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
+from metadata.ingestion.ometa.ometa_api import OpenMetadata
+from metadata.ingestion.source.database.common_db_source import SQLSourceStatus
+from metadata.utils.connections import get_connection, test_connection
+from metadata.utils.filters import filter_by_table
+from metadata.utils.gcs_utils import (
+    read_csv_from_gcs,
+    read_json_from_gcs,
+    read_parquet_from_gcs,
+    read_tsv_from_gcs,
+)
+from metadata.utils.logger import ingestion_logger
+from metadata.utils.s3_utils import (
+    read_csv_from_s3,
+    read_json_from_s3,
+    read_parquet_from_s3,
+    read_tsv_from_s3,
+)
+
+logger = ingestion_logger()
+
+
+class DatalakeSource(Source[Entity]):
+    def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection):
+        super().__init__()
+        self.status = SQLSourceStatus()
+
+        self.config = config
+        self.source_config: DatabaseServiceMetadataPipeline = (
+            self.config.sourceConfig.config
+        )
+        self.metadata_config = metadata_config
+        self.metadata = OpenMetadata(metadata_config)
+        self.service_connection = self.config.serviceConnection.__root__.config
+        self.service = self.metadata.get_service_or_create(
+            entity=DatabaseService, config=config
+        )
+
+        self.connection = get_connection(self.service_connection)
+
+        self.client = self.connection.client
+
+    @classmethod
+    def create(cls, config_dict, metadata_config: OpenMetadataConnection):
+        config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
+        connection: DatalakeConnection = config.serviceConnection.__root__.config
+        if not isinstance(connection, DatalakeConnection):
+            raise InvalidSourceException(
+                f"Expected DatalakeConnection, but got {connection}"
+            )
+        return cls(config, metadata_config)
+
+    def prepare(self):
+        pass
+
+    def next_record(self) -> Iterable[Entity]:
+        try:
+
+            bucket_name = self.service_connection.bucketName
+            prefix = self.service_connection.prefix
+
+            if isinstance(self.service_connection.configSource, GCSConfig):
+                if bucket_name:
+                    yield from self.get_gcs_files(bucket_name, prefix)
+                else:
+                    for bucket in self.client.list_buckets():
+                        yield from self.get_gcs_files(bucket.name, prefix)
+
+            if isinstance(self.service_connection.configSource, S3Config):
+                if bucket_name:
+                    yield from self.get_s3_files(bucket_name, prefix)
+                else:
+                    for bucket in self.client.list_buckets()["Buckets"]:
+                        yield from self.get_s3_files(bucket["Name"], prefix)
+
+        except Exception as err:
+            logger.error(traceback.format_exc())
+            logger.error(err)
+
+    def get_gcs_files(self, bucket_name, prefix):
+
+        bucket = self.client.get_bucket(bucket_name)
+
+        for key in bucket.list_blobs(prefix=prefix):
+            try:
+                if filter_by_table(
+                    self.config.sourceConfig.config.tableFilterPattern, key.name
+                ):
+                    self.status.filter(
+                        "{}".format(key.name),
+                        "Table pattern not allowed",
+                    )
+                    continue
+
+                if key.name.endswith(".csv"):
+
+                    df = read_csv_from_gcs(key, bucket_name)
+
+                    yield from self.ingest_tables(key.name, df, bucket_name)
+
+                if key.name.endswith(".tsv"):
+
+                    df = read_tsv_from_gcs(key, bucket_name)
+
+                    yield from self.ingest_tables(key.name, df, bucket_name)
+
+                if key.name.endswith(".json"):
+
+                    df = read_json_from_gcs(key)
+
+                    yield from self.ingest_tables(key.name, df, bucket_name)
+
+                if key.name.endswith(".parquet"):
+
+                    df = read_parquet_from_gcs(key, bucket_name)
+
+                    yield from self.ingest_tables(key.name, df, bucket_name)
+
+            except Exception as err:
+                logger.debug(traceback.format_exc())
+                logger.error(err)
+
+    def get_s3_files(self, bucket_name, prefix):
+        kwargs = {"Bucket": bucket_name}
+        if prefix:
+            kwargs["Prefix"] = prefix
+        for key in self.client.list_objects(**kwargs)["Contents"]:
+            try:
+                if filter_by_table(
+                    self.config.sourceConfig.config.tableFilterPattern, key["Key"]
+                ):
+                    self.status.filter(
+                        "{}".format(key["Key"]),
+                        "Table pattern not allowed",
+                    )
+                    continue
+                if key["Key"].endswith(".csv"):
+
+                    df = read_csv_from_s3(self.client, key, bucket_name)
+
+                    yield from self.ingest_tables(key["Key"], df, bucket_name)
+
+                if key["Key"].endswith(".tsv"):
+
+                    df = read_tsv_from_s3(self.client, key, bucket_name)
+
+                    yield from self.ingest_tables(key["Key"], df, bucket_name)
+
+                if key["Key"].endswith(".json"):
+
+                    df = read_json_from_s3(self.client, key, bucket_name)
+
+                    yield from self.ingest_tables(key["Key"], df, bucket_name)
+
+                if key["Key"].endswith(".parquet"):
+
+                    df = read_parquet_from_s3(self.client, key, bucket_name)
+
+                    yield from self.ingest_tables(key["Key"], df, bucket_name)
+
+            except Exception as err:
+                logger.debug(traceback.format_exc())
+                logger.error(err)
+
+    def ingest_tables(self, key, df, bucket_name) -> Iterable[OMetaDatabaseAndTable]:
+        try:
+            table_columns = self.get_columns(df)
+            database_entity = Database(
+                id=uuid.uuid4(),
+                name="default",
+                service=EntityReference(id=self.service.id, type="databaseService"),
+            )
+            table_entity = Table(
+                id=uuid.uuid4(),
+                name=key,
+                description="",
+                columns=table_columns,
+            )
+            schema_entity = DatabaseSchema(
+                id=uuid.uuid4(),
+                name=bucket_name,
+                database=EntityReference(id=database_entity.id, type="database"),
+                service=EntityReference(id=self.service.id, type="databaseService"),
+            )
+            table_and_db = OMetaDatabaseAndTable(
+                table=table_entity,
+                database=database_entity,
+                database_schema=schema_entity,
+            )
+
+            yield table_and_db
+
+        except Exception as err:
+            logger.debug(traceback.format_exc())
+            logger.error(err)
+
+    def fetch_sample_data(self, df, table: str) -> Optional[TableData]:
+        try:
+            cols = []
+            table_columns = self.get_columns(df)
+
+            for col in table_columns:
+                cols.append(col.name.__root__)
+            table_rows = df.values.tolist()
+
+            return TableData(columns=cols, rows=table_rows)
+        # Catch any errors and continue the ingestion
+        except Exception as err:  # pylint: disable=broad-except
+            logger.debug(traceback.format_exc())
+            logger.error(f"Failed to generate sample data for {table} - {err}")
+            return None
+
+    def get_columns(self, df):
+        df_columns = list(df.columns)
+        for column in df_columns:
+            try:
+                if hasattr(df[column], "dtypes"):
+                    if df[column].dtypes.name == "int64":
+                        data_type = "INT"
+                    else:
+                        data_type = "STRING"
+                else:
+                    data_type = "STRING"
+                parsed_string = {}
+                parsed_string["dataTypeDisplay"] = column
+                parsed_string["dataType"] = data_type
+                parsed_string["name"] = column[:64]
+                parsed_string["dataLength"] = parsed_string.get("dataLength", 1)
+                yield Column(**parsed_string)
+            except Exception as err:
+                logger.debug(traceback.format_exc())
+                logger.error(err)
+
+    def close(self):
+        pass
+
+    def get_status(self) -> SourceStatus:
+        return self.status
+
+    def test_connection(self) -> None:
+        test_connection(self.connection)
diff --git a/ingestion/src/metadata/utils/connection_clients.py b/ingestion/src/metadata/utils/connection_clients.py
index 8a9063abb93..31eaf3838ec 100644
--- a/ingestion/src/metadata/utils/connection_clients.py
+++ b/ingestion/src/metadata/utils/connection_clients.py
@@ -84,3 +84,10 @@ class PowerBiClient:
 class LookerClient:
     def __init__(self, client) -> None:
         self.client = client
+
+
+@dataclass
+class DatalakeClient:
+    def __init__(self, client, config) -> None:
+        self.client = client
+        self.config = config
diff --git a/ingestion/src/metadata/utils/connections.py b/ingestion/src/metadata/utils/connections.py
index 08b8f23865f..a070af74d5d 100644
--- a/ingestion/src/metadata/utils/connections.py
+++ b/ingestion/src/metadata/utils/connections.py
@@ -16,6 +16,7 @@ import json
 import logging
 import os
 import traceback
+from distutils.command.config import config
 from functools import singledispatch
 from typing import Union
 
@@ -53,6 +54,11 @@ from metadata.generated.schema.entity.services.connections.database.bigQueryConn
 from metadata.generated.schema.entity.services.connections.database.databricksConnection import (
     DatabricksConnection,
 )
+from metadata.generated.schema.entity.services.connections.database.datalakeConnection import (
+    DatalakeConnection,
+    GCSConfig,
+    S3Config,
+)
 from metadata.generated.schema.entity.services.connections.database.deltaLakeConnection import (
     DeltaLakeConnection,
 )
@@ -73,6 +79,7 @@ from metadata.generated.schema.entity.services.connections.messaging.kafkaConnec
 )
 from metadata.orm_profiler.orm.functions.conn_test import ConnTestFn
 from metadata.utils.connection_clients import (
+    DatalakeClient,
     DeltaLakeClient,
     DynamoClient,
     GlueClient,
@@ -565,3 +572,63 @@ def _(connection: LookerClient) -> None:
         raise SourceConnectionException(
             f"Unknown error connecting with {connection} - {err}."
         )
+
+
+@test_connection.register
+def _(connection: DatalakeClient) -> None:
+    """
+    Test that we can connect to the source using the given GCS or S3 configuration
+    :param connection: DatalakeClient wrapping the GCS or boto3 S3 client to test
+    :return: None or raise an exception if we cannot connect
+    """
+    from botocore.client import ClientError
+
+    try:
+        config = connection.config.configSource
+        if isinstance(config, GCSConfig):
+            if connection.config.bucketName:
+                connection.client.get_bucket(connection.config.bucketName)
+            else:
+                connection.client.list_buckets()
+
+        if isinstance(config, S3Config):
+            if connection.config.bucketName:
+                connection.client.list_objects(Bucket=connection.config.bucketName)
+            else:
+                connection.client.list_buckets()
+
+    except ClientError as err:
+        raise SourceConnectionException(
+            f"Connection error for {connection} - {err}. Check the connection details."
+        )
+
+
+@singledispatch
+def get_datalake_client(config):
+    if config:
+        raise NotImplementedError(
+            f"Config not implemented for type {type(config)}: {config}"
+        )
+
+
+@get_connection.register
+def _(connection: DatalakeConnection, verbose: bool = False) -> DatalakeClient:
+    datalake_connection = get_datalake_client(connection.configSource)
+    return DatalakeClient(client=datalake_connection, config=connection)
+
+
+@get_datalake_client.register
+def _(config: S3Config):
+    from metadata.utils.aws_client import AWSClient
+
+    s3_client = AWSClient(config.securityConfig).get_client(service_name="s3")
+    return s3_client
+
+
+@get_datalake_client.register
+def _(config: GCSConfig):
+    from google.cloud import storage
+
+    set_google_credentials(gcs_credentials=config.securityConfig)
+    gcs_client = storage.Client()
+    return gcs_client
diff --git a/ingestion/src/metadata/utils/gcs_utils.py b/ingestion/src/metadata/utils/gcs_utils.py
new file mode 100644
index 00000000000..e9d18df6bed
--- /dev/null
+++ b/ingestion/src/metadata/utils/gcs_utils.py
@@ -0,0 +1,58 @@
+# Copyright 2021 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+ + +def read_csv_from_gcs(key, bucket_name): + import dask.dataframe as dd + + df = dd.read_csv(f"gs://{bucket_name}/{key.name}") + + return df + + +def read_tsv_from_gcs(key, bucket_name): + + import dask.dataframe as dd + + df = dd.read_csv(f"gs://{bucket_name}/{key.name}", sep="\t") + + return df + + +def read_json_from_gcs(key): + + import pandas as pd + + from metadata.utils.logger import utils_logger + + logger = utils_logger() + import json + import traceback + + try: + + data = key.download_as_string().decode() + df = pd.DataFrame.from_dict(json.loads(data)) + return df + + except ValueError as verr: + logger.debug(traceback.format_exc()) + logger.error(verr) + + +def read_parquet_from_gcs(key, bucket_name): + import gcsfs + import pyarrow.parquet as pq + + gs = gcsfs.GCSFileSystem() + arrow_df = pq.ParquetDataset(f"gs://{bucket_name}/{key.name}", filesystem=gs) + df = arrow_df.read_pandas().to_pandas() + return df diff --git a/ingestion/src/metadata/utils/s3_utils.py b/ingestion/src/metadata/utils/s3_utils.py new file mode 100644 index 00000000000..75c72fedc0e --- /dev/null +++ b/ingestion/src/metadata/utils/s3_utils.py @@ -0,0 +1,57 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def read_csv_from_s3(client, key, bucket_name): + from io import StringIO + + import pandas as pd + + csv_obj = client.get_object(Bucket=bucket_name, Key=key["Key"]) + body = csv_obj["Body"] + csv_string = body.read().decode("utf-8") + df = pd.read_csv(StringIO(csv_string)) + + return df + + +def read_tsv_from_s3(client, key, bucket_name): + from io import StringIO + + import pandas as pd + + csv_obj = client.get_object(Bucket=bucket_name, Key=key["Key"]) + body = csv_obj["Body"] + csv_string = body.read().decode("utf-8") + df = pd.read_csv(StringIO(csv_string), sep="\t") + + return df + + +def read_json_from_s3(client, key, bucket_name): + import json + + import pandas as pd + + obj = client.get_object(Bucket=bucket_name, Key=key["Key"]) + json_text = obj["Body"].read().decode("utf-8") + data = json.loads(json_text) + df = pd.DataFrame.from_dict(data) + + return df + + +def read_parquet_from_s3(client, key, bucket_name): + import dask.dataframe as dd + + df = dd.read_parquet(f"s3://{bucket_name}/{key['Key']}") + + return df diff --git a/ingestion/src/metadata/utils/source_connections.py b/ingestion/src/metadata/utils/source_connections.py index d7df09358f9..4c481b6228a 100644 --- a/ingestion/src/metadata/utils/source_connections.py +++ b/ingestion/src/metadata/utils/source_connections.py @@ -8,6 +8,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + + """ Hosts the singledispatch to build source URLs """
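For reference, a minimal usage sketch of the new connector, assuming the standard Workflow runner from metadata.ingestion.api.workflow and the example ingestion/examples/workflows/datalake.yaml added above; it also assumes the "datalake" extras declared in setup.py are installed (pip install "openmetadata-ingestion[datalake]"):

# Sketch only: load the example datalake workflow config and run it programmatically.
import yaml

from metadata.ingestion.api.workflow import Workflow

# Path to the example workflow shipped with this change; adjust as needed.
with open("ingestion/examples/workflows/datalake.yaml") as config_file:
    workflow_config = yaml.safe_load(config_file)

# Build and execute the ingestion workflow, then report its status.
workflow = Workflow.create(workflow_config)
workflow.execute()
workflow.raise_from_status()
workflow.print_status()
workflow.stop()

With an S3 configSource, DatalakeSource lists the objects under bucketName/prefix, reads each .csv, .tsv, .json, or .parquet file into a dataframe via s3_utils, and registers one table per file under a "default" database and a schema named after the bucket; the GCS path behaves the same way through gcs_utils.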