datalake-csv-files-ingestion-added (#5343)

Abhishek Pandey 2022-06-15 12:27:21 +05:30 committed by GitHub
parent 8ec2b985c8
commit e8975aac01
10 changed files with 597 additions and 1 deletion

View File

@ -0,0 +1,83 @@
{
"$id": "https://open-metadata.org/schema/entity/services/connections/database/datalakeConnection.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "DatalakeConnection",
"description": "Datalake Connection Config",
"type": "object",
"javaType": "org.openmetadata.catalog.services.connections.database.DatalakeConnection",
"definitions": {
"datalakeType": {
"description": "Service type.",
"type": "string",
"enum": ["Datalake"],
"default": "Datalake"
},
"GCSConfig": {
"title": "DataLake GCS Config Source",
"description": "DataLake GCS bucket configuration. Metadata is read from data files (CSV, TSV, JSON or Parquet) stored in the bucket.",
"properties": {
"securityConfig": {
"title": "DataLake GCS Security Config",
"$ref": "../../../../security/credentials/gcsCredentials.json"
}
}
},
"S3Config": {
"title": "DataLake S3 Config Source",
"description": "DataLake S3 bucket configuration. Metadata is read from data files (CSV, TSV, JSON or Parquet) stored in the bucket.",
"properties": {
"securityConfig": {
"title": "DataLake S3 Security Config",
"$ref": "../../../../security/credentials/awsCredentials.json"
}
}
}
},
"properties": {
"type": {
"title": "Service Type",
"description": "Service Type",
"$ref": "#/definitions/datalakeType",
"default": "Datalake"
},
"configSource": {
"title": "DataLake Configuration Source",
"description": "Available sources to fetch files.",
"oneOf": [
{
"$ref": "#/definitions/S3Config"
},
{
"$ref": "#/definitions/GCSConfig"
}
]
},
"bucketName": {
"title": "Bucket Name",
"description": "Bucket Name of the data source.",
"type": "string",
"default": ""
},
"prefix": {
"title": "Prefix",
"description": "Prefix of the data source.",
"type": "string",
"default": ""
},
"connectionOptions": {
"title": "Connection Options",
"$ref": "../connectionBasicType.json#/definitions/connectionOptions"
},
"connectionArguments": {
"title": "Connection Arguments",
"$ref": "../connectionBasicType.json#/definitions/connectionArguments"
},
"supportsMetadataExtraction": {
"title": "Supports Metadata Extraction",
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
}
},
"additionalProperties": false,
"required": ["configSource"]
}
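
For orientation, here is a minimal sketch of a configuration matching this schema, parsed through the generated pydantic model that the ingestion code in this change imports (the module path comes from those imports; the credential values are placeholders):

from metadata.generated.schema.entity.services.connections.database.datalakeConnection import (
    DatalakeConnection,
)

# Hypothetical S3-backed configuration; only configSource is required by the schema.
config_dict = {
    "type": "Datalake",
    "configSource": {
        "securityConfig": {
            "awsAccessKeyId": "my-access-key-id",
            "awsSecretAccessKey": "my-secret-access-key",
            "awsRegion": "us-east-1",
        }
    },
    "bucketName": "demo-bucket",
    "prefix": "raw/",
}

connection = DatalakeConnection.parse_obj(config_dict)
print(type(connection.configSource))  # expected to resolve to the S3Config variant of the oneOf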

View File

@ -36,7 +36,8 @@
"DeltaLake",
"Salesforce",
"SampleData",
"PinotDB",
"Datalake"
],
"javaEnums": [
{
@ -116,6 +117,9 @@
},
{
"name": "PinotDB"
},
{
"name": "Datalake"
}
]
},
@ -202,6 +206,9 @@
},
{
"$ref": "./connections/database/pinotDBConnection.json"
},
{
"$ref": "./connections/database/datalakeConnection.json"
}
]
}

View File

@ -0,0 +1,26 @@
source:
  type: datalake
  serviceName: local_datalake4
  serviceConnection:
    config:
      type: Datalake
      configSource:
        securityConfig:
          awsAccessKeyId: aws access key id
          awsSecretAccessKey: aws secret access key
          awsRegion: aws region
      bucketName: bucket name
      prefix: prefix
  sourceConfig:
    config:
      tableFilterPattern:
        includes:
          - ''
sink:
  type: metadata-rest
  config: {}
workflowConfig:
  openMetadataServerConfig:
    hostPort: http://localhost:8585/api
    authProvider: no-auth
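
A sketch of driving this YAML through the ingestion framework; the Workflow class path and method names are assumed from the standard OpenMetadata runner rather than taken from this diff, and "datalake.yaml" is a hypothetical path for the configuration above:

import yaml

from metadata.ingestion.api.workflow import Workflow

with open("datalake.yaml", encoding="utf-8") as config_file:
    workflow_config = yaml.safe_load(config_file)

workflow = Workflow.create(workflow_config)
workflow.execute()            # runs the datalake source against the metadata-rest sink
workflow.raise_from_status()  # fail loudly if any record errored
workflow.stop()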

View File

@ -80,6 +80,15 @@ plugins: Dict[str, Set[str]] = {
    "bigquery-usage": {"google-cloud-logging", "cachetools"},
    "docker": {"python_on_whales==0.34.0"},
    "backup": {"boto3~=1.19.12"},
    "datalake": {
        "google-cloud-storage==1.43.0",
        "pandas==1.3.5",
        "gcsfs==2022.5.0",
        "s3fs==0.4.2",
        "dask==2022.2.0",
        "pyarrow==6.0.1",
        "boto3~=1.19.12",
    },
    "dbt": {"google-cloud", "boto3", "google-cloud-storage==1.43.0"},
    "druid": {"pydruid>=0.6.2"},
    "elasticsearch": {"elasticsearch==7.13.1"},

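With this extra in place, the connector's dependencies can be pulled in through the usual extras mechanism, for example: pip install "openmetadata-ingestion[datalake]" (the package name is assumed from the ingestion module this setup.py builds).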
View File

@ -0,0 +1,280 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
DataLake connector to fetch metadata from files stored in S3, GCS and HDFS
"""
import traceback
import uuid
from typing import Iterable, Optional

from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.table import Column, Table, TableData
from metadata.generated.schema.entity.services.connections.database.datalakeConnection import (
    DatalakeConnection,
    GCSConfig,
    S3Config,
)
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
    OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import (
    DatabaseServiceMetadataPipeline,
)
from metadata.generated.schema.metadataIngestion.workflow import (
    Source as WorkflowSource,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.common_db_source import SQLSourceStatus
from metadata.utils.connections import get_connection, test_connection
from metadata.utils.filters import filter_by_table
from metadata.utils.gcs_utils import (
    read_csv_from_gcs,
    read_json_from_gcs,
    read_parquet_from_gcs,
    read_tsv_from_gcs,
)
from metadata.utils.logger import ingestion_logger
from metadata.utils.s3_utils import (
    read_csv_from_s3,
    read_json_from_s3,
    read_parquet_from_s3,
    read_tsv_from_s3,
)

logger = ingestion_logger()


class DatalakeSource(Source[Entity]):
    def __init__(self, config: WorkflowSource, metadata_config: OpenMetadataConnection):
        super().__init__()
        self.status = SQLSourceStatus()
        self.config = config
        self.source_config: DatabaseServiceMetadataPipeline = (
            self.config.sourceConfig.config
        )
        self.metadata_config = metadata_config
        self.metadata = OpenMetadata(metadata_config)
        self.service_connection = self.config.serviceConnection.__root__.config
        self.service = self.metadata.get_service_or_create(
            entity=DatabaseService, config=config
        )
        self.connection = get_connection(self.service_connection)
        self.client = self.connection.client

    @classmethod
    def create(cls, config_dict, metadata_config: OpenMetadataConnection):
        config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
        connection: DatalakeConnection = config.serviceConnection.__root__.config
        if not isinstance(connection, DatalakeConnection):
            raise InvalidSourceException(
                f"Expected DatalakeConnection, but got {connection}"
            )
        return cls(config, metadata_config)

    def prepare(self):
        pass

    def next_record(self) -> Iterable[Entity]:
        try:
            bucket_name = self.service_connection.bucketName
            prefix = self.service_connection.prefix
            if isinstance(self.service_connection.configSource, GCSConfig):
                if bucket_name:
                    yield from self.get_gcs_files(bucket_name, prefix)
                else:
                    for bucket in self.client.list_buckets():
                        yield from self.get_gcs_files(bucket.name, prefix)
            if isinstance(self.service_connection.configSource, S3Config):
                if bucket_name:
                    yield from self.get_s3_files(bucket_name, prefix)
                else:
                    for bucket in self.client.list_buckets()["Buckets"]:
                        yield from self.get_s3_files(bucket["Name"], prefix)
        except Exception as err:
            logger.error(traceback.format_exc())
            logger.error(err)

    def get_gcs_files(self, bucket_name, prefix):
        bucket = self.client.get_bucket(bucket_name)
        for key in bucket.list_blobs(prefix=prefix):
            try:
                # GCS blobs expose the object path via .name (they are not subscriptable)
                if filter_by_table(
                    self.config.sourceConfig.config.tableFilterPattern, key.name
                ):
                    self.status.filter(
                        "{}".format(key.name),
                        "Table pattern not allowed",
                    )
                    continue
                if key.name.endswith(".csv"):
                    df = read_csv_from_gcs(key, bucket_name)
                    yield from self.ingest_tables(key.name, df, bucket_name)
                if key.name.endswith(".tsv"):
                    df = read_tsv_from_gcs(key, bucket_name)
                    yield from self.ingest_tables(key.name, df, bucket_name)
                if key.name.endswith(".json"):
                    df = read_json_from_gcs(key)
                    yield from self.ingest_tables(key.name, df, bucket_name)
                if key.name.endswith(".parquet"):
                    df = read_parquet_from_gcs(key, bucket_name)
                    yield from self.ingest_tables(key.name, df, bucket_name)
            except Exception as err:
                logger.debug(traceback.format_exc())
                logger.error(err)

    def get_s3_files(self, bucket_name, prefix):
        kwargs = {"Bucket": bucket_name}
        if prefix:
            # boto3 list_objects expects the capitalised "Prefix" parameter
            kwargs["Prefix"] = prefix
        for key in self.client.list_objects(**kwargs)["Contents"]:
            try:
                if filter_by_table(
                    self.config.sourceConfig.config.tableFilterPattern, key["Key"]
                ):
                    self.status.filter(
                        "{}".format(key["Key"]),
                        "Table pattern not allowed",
                    )
                    continue
                if key["Key"].endswith(".csv"):
                    df = read_csv_from_s3(self.client, key, bucket_name)
                    yield from self.ingest_tables(key["Key"], df, bucket_name)
                if key["Key"].endswith(".tsv"):
                    df = read_tsv_from_s3(self.client, key, bucket_name)
                    yield from self.ingest_tables(key["Key"], df, bucket_name)
                if key["Key"].endswith(".json"):
                    df = read_json_from_s3(self.client, key, bucket_name)
                    yield from self.ingest_tables(key["Key"], df, bucket_name)
                if key["Key"].endswith(".parquet"):
                    df = read_parquet_from_s3(self.client, key, bucket_name)
                    yield from self.ingest_tables(key["Key"], df, bucket_name)
            except Exception as err:
                logger.debug(traceback.format_exc())
                logger.error(err)

    def ingest_tables(self, key, df, bucket_name) -> Iterable[OMetaDatabaseAndTable]:
        try:
            table_columns = self.get_columns(df)
            database_entity = Database(
                id=uuid.uuid4(),
                name="default",
                service=EntityReference(id=self.service.id, type="databaseService"),
            )
            table_entity = Table(
                id=uuid.uuid4(),
                name=key,
                description="",
                columns=table_columns,
            )
            schema_entity = DatabaseSchema(
                id=uuid.uuid4(),
                name=bucket_name,
                database=EntityReference(id=database_entity.id, type="database"),
                service=EntityReference(id=self.service.id, type="databaseService"),
            )
            table_and_db = OMetaDatabaseAndTable(
                table=table_entity,
                database=database_entity,
                database_schema=schema_entity,
            )
            yield table_and_db
        except Exception as err:
            logger.debug(traceback.format_exc())
            logger.error(err)

    def fetch_sample_data(self, df, table: str) -> Optional[TableData]:
        try:
            cols = []
            table_columns = self.get_columns(df)
            for col in table_columns:
                cols.append(col.name.__root__)
            table_rows = df.values.tolist()
            return TableData(columns=cols, rows=table_rows)
        # Catch any errors and continue the ingestion
        except Exception as err:  # pylint: disable=broad-except
            logger.debug(traceback.format_exc())
            logger.error(f"Failed to generate sample data for {table} - {err}")
        return None

    def get_columns(self, df):
        df_columns = list(df.columns)
        for column in df_columns:
            try:
                # Map the pandas/dask dtype to an OpenMetadata type;
                # anything other than a clean int64 falls back to STRING.
                if hasattr(df[column], "dtypes"):
                    if df[column].dtypes.name == "int64":
                        data_type = "INT"
                    else:
                        data_type = "STRING"
                else:
                    data_type = "STRING"
                parsed_string = {}
                parsed_string["dataTypeDisplay"] = column
                parsed_string["dataType"] = data_type
                parsed_string["name"] = column[:64]
                parsed_string["dataLength"] = parsed_string.get("dataLength", 1)
                yield Column(**parsed_string)
            except Exception as err:
                logger.debug(traceback.format_exc())
                logger.error(err)

    def close(self):
        pass

    def get_status(self) -> SourceStatus:
        return self.status

    def test_connection(self) -> None:
        test_connection(self.connection)
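
For completeness, a loose sketch of driving the source directly, which is normally the Workflow runner's job; config_dict and metadata_config here are hypothetical stand-ins for the parsed "source" section of the YAML example above and an OpenMetadataConnection:

source = DatalakeSource.create(config_dict, metadata_config)
source.prepare()
for record in source.next_record():
    print(record)  # OMetaDatabaseAndTable entries, one per ingested file
print(source.get_status())
source.close()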

View File

@ -84,3 +84,10 @@ class PowerBiClient:
class LookerClient:
    def __init__(self, client) -> None:
        self.client = client


@dataclass
class DatalakeClient:
    def __init__(self, client, config) -> None:
        self.client = client
        self.config = config

View File

@ -16,6 +16,7 @@ import json
import logging
import os
import traceback
from distutils.command.config import config
from functools import singledispatch
from typing import Union
@ -53,6 +54,11 @@ from metadata.generated.schema.entity.services.connections.database.bigQueryConn
from metadata.generated.schema.entity.services.connections.database.databricksConnection import (
    DatabricksConnection,
)
from metadata.generated.schema.entity.services.connections.database.datalakeConnection import (
    DatalakeConnection,
    GCSConfig,
    S3Config,
)
from metadata.generated.schema.entity.services.connections.database.deltaLakeConnection import (
    DeltaLakeConnection,
)
@ -73,6 +79,7 @@ from metadata.generated.schema.entity.services.connections.messaging.kafkaConnec
)
from metadata.orm_profiler.orm.functions.conn_test import ConnTestFn
from metadata.utils.connection_clients import (
    DatalakeClient,
    DeltaLakeClient,
    DynamoClient,
    GlueClient,
@ -565,3 +572,63 @@ def _(connection: LookerClient) -> None:
        raise SourceConnectionException(
            f"Unknown error connecting with {connection} - {err}."
        )


@test_connection.register
def _(connection: DatalakeClient) -> None:
    """
    Test that we can connect to the source using the given config
    :param connection: DatalakeClient wrapping a GCS or S3 client
    :return: None or raise an exception if we cannot connect
    """
    from botocore.client import ClientError

    try:
        config = connection.config.configSource
        if isinstance(config, GCSConfig):
            if connection.config.bucketName:
                connection.client.get_bucket(connection.config.bucketName)
            else:
                connection.client.list_buckets()
        if isinstance(config, S3Config):
            if connection.config.bucketName:
                connection.client.list_objects(Bucket=connection.config.bucketName)
            else:
                connection.client.list_buckets()
    except ClientError as err:
        raise SourceConnectionException(
            f"Connection error for {connection} - {err}. Check the connection details."
        )


@singledispatch
def get_datalake_client(config):
    if config:
        raise NotImplementedError(
            f"Config not implemented for type {type(config)}: {config}"
        )


@get_connection.register
def _(connection: DatalakeConnection, verbose: bool = False) -> DatalakeClient:
    datalake_connection = get_datalake_client(connection.configSource)
    return DatalakeClient(client=datalake_connection, config=connection)


@get_datalake_client.register
def _(config: S3Config):
    from metadata.utils.aws_client import AWSClient

    s3_client = AWSClient(config.securityConfig).get_client(service_name="s3")
    return s3_client


@get_datalake_client.register
def _(config: GCSConfig):
    from google.cloud import storage

    set_google_credentials(gcs_credentials=config.securityConfig)
    gcs_client = storage.Client()
    return gcs_client
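
A sketch of exercising the new registrations end to end. The AWSCredentials import path and field names are assumed from the awsCredentials.json schema referenced by the connection definition; the credential values and bucket name are placeholders:

from metadata.generated.schema.entity.services.connections.database.datalakeConnection import (
    DatalakeConnection,
    S3Config,
)
from metadata.generated.schema.security.credentials.awsCredentials import AWSCredentials
from metadata.utils.connections import get_connection, test_connection

connection_config = DatalakeConnection(
    configSource=S3Config(
        securityConfig=AWSCredentials(
            awsAccessKeyId="my-access-key-id",
            awsSecretAccessKey="my-secret-access-key",
            awsRegion="us-east-1",
        )
    ),
    bucketName="demo-bucket",
)

client = get_connection(connection_config)  # dispatches to the DatalakeConnection handler above
test_connection(client)                     # raises SourceConnectionException if the bucket is unreachable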

View File

@ -0,0 +1,58 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


def read_csv_from_gcs(key, bucket_name):
    import dask.dataframe as dd

    df = dd.read_csv(f"gs://{bucket_name}/{key.name}")
    return df


def read_tsv_from_gcs(key, bucket_name):
    import dask.dataframe as dd

    df = dd.read_csv(f"gs://{bucket_name}/{key.name}", sep="\t")
    return df


def read_json_from_gcs(key):
    import json
    import traceback

    import pandas as pd

    from metadata.utils.logger import utils_logger

    logger = utils_logger()
    try:
        data = key.download_as_string().decode()
        df = pd.DataFrame.from_dict(json.loads(data))
        return df
    except ValueError as verr:
        logger.debug(traceback.format_exc())
        logger.error(verr)


def read_parquet_from_gcs(key, bucket_name):
    import gcsfs
    import pyarrow.parquet as pq

    gs = gcsfs.GCSFileSystem()
    arrow_df = pq.ParquetDataset(f"gs://{bucket_name}/{key.name}", filesystem=gs)
    df = arrow_df.read_pandas().to_pandas()
    return df
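
A hedged usage sketch for these helpers with a google-cloud-storage blob; the bucket and object names are placeholders, and GOOGLE_APPLICATION_CREDENTIALS is assumed to be set in the environment:

from google.cloud import storage

client = storage.Client()
bucket = client.get_bucket("demo-bucket")  # placeholder bucket
blob = bucket.blob("raw/users.json")       # placeholder object key

df = read_json_from_gcs(blob)              # pandas DataFrame, or None if the JSON could not be parsed
if df is not None:
    print(df.head())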

View File

@ -0,0 +1,57 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


def read_csv_from_s3(client, key, bucket_name):
    from io import StringIO

    import pandas as pd

    csv_obj = client.get_object(Bucket=bucket_name, Key=key["Key"])
    body = csv_obj["Body"]
    csv_string = body.read().decode("utf-8")
    df = pd.read_csv(StringIO(csv_string))
    return df


def read_tsv_from_s3(client, key, bucket_name):
    from io import StringIO

    import pandas as pd

    csv_obj = client.get_object(Bucket=bucket_name, Key=key["Key"])
    body = csv_obj["Body"]
    csv_string = body.read().decode("utf-8")
    df = pd.read_csv(StringIO(csv_string), sep="\t")
    return df


def read_json_from_s3(client, key, bucket_name):
    import json

    import pandas as pd

    obj = client.get_object(Bucket=bucket_name, Key=key["Key"])
    json_text = obj["Body"].read().decode("utf-8")
    data = json.loads(json_text)
    df = pd.DataFrame.from_dict(data)
    return df


def read_parquet_from_s3(client, key, bucket_name):
    import dask.dataframe as dd

    df = dd.read_parquet(f"s3://{bucket_name}/{key['Key']}")
    return df
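
A hedged usage sketch with a plain boto3 client; the bucket, key and region are placeholders. Note that these helpers take the raw listing entry (a dict with a "Key" field) exactly as DatalakeSource.get_s3_files passes it:

import boto3

client = boto3.client("s3", region_name="us-east-1")  # assumes AWS credentials in the environment
listing_entry = {"Key": "raw/users.csv"}              # same shape as a list_objects "Contents" item

df = read_csv_from_s3(client, listing_entry, "demo-bucket")
print(df.head())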

View File

@ -8,6 +8,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Hosts the singledispatch to build source URLs
"""