Fix #7121 - Support Spark metastore DB connection (#7520)

* Fix #7121 - Support Spark metastore DB connection

* appname

* Update docs

* test validation

* Address PR comments

Co-authored-by: Nahuel <nahuel@getcollate.io>
Pere Miquel Brull 2022-09-20 16:47:57 +02:00 committed by GitHub
parent 6b00c75852
commit 1578fa7f1b
14 changed files with 199 additions and 45 deletions

View File

@ -12,4 +12,22 @@ CREATE TABLE IF NOT EXISTS user_tokens (
json JSON NOT NULL,
expiryDate BIGINT UNSIGNED GENERATED ALWAYS AS (json ->> '$.expiryDate') NOT NULL,
PRIMARY KEY (token)
);
UPDATE dbservice_entity
SET json = JSON_INSERT(
JSON_REMOVE(json, '$.connection.config.metastoreHostPort'),
'$.connection.config.metastoreConnection',
JSON_OBJECT('metastoreHostPort', JSON_EXTRACT(json, '$.connection.config.metastoreHostPort'))
)
where serviceType = 'DeltaLake'
and JSON_EXTRACT(json, '$.connection.config.metastoreHostPort') is not null;
UPDATE dbservice_entity
SET json = JSON_INSERT(
JSON_REMOVE(json, '$.connection.config.metastoreFilePath'),
'$.connection.config.metastoreConnection',
JSON_OBJECT('metastoreFilePath', JSON_EXTRACT(json, '$.connection.config.metastoreFilePath'))
)
where serviceType = 'DeltaLake'
and JSON_EXTRACT(json, '$.connection.config.metastoreFilePath') is not null;
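Both this MySQL migration and the Postgres one below apply the same shape change to the stored service JSON: the old top-level metastore key moves under a new `metastoreConnection` object. A minimal sketch of the before/after shape, with illustrative values:

```python
# Shape of the stored DeltaLake connection JSON before the migration
# (illustrative values, trimmed to the relevant keys)
before = {
    "connection": {
        "config": {
            "type": "DeltaLake",
            "metastoreHostPort": "localhost:9083",
        }
    }
}

# Shape after the migration: the same value, nested under metastoreConnection
after = {
    "connection": {
        "config": {
            "type": "DeltaLake",
            "metastoreConnection": {"metastoreHostPort": "localhost:9083"},
        }
    }
}

# The old top-level key is gone once the migration has run
assert "metastoreHostPort" not in after["connection"]["config"]
```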

View File

@ -110,4 +110,4 @@ DELETE FROM role_entity;
DELETE FROM policy_entity;
DELETE FROM field_relationship WHERE fromType IN ('role', 'policy') OR toType IN ('role', 'policy');
DELETE FROM entity_relationship WHERE fromEntity IN ('role', 'policy') OR toEntity IN ('role', 'policy');
ALTER TABLE role_entity DROP COLUMN defaultRole;

View File

@ -12,4 +12,30 @@ CREATE TABLE IF NOT EXISTS user_tokens (
json JSONB NOT NULL,
expiryDate BIGINT GENERATED ALWAYS AS ((json ->> 'expiryDate')::bigint) STORED NOT NULL,
PRIMARY KEY (token)
);
UPDATE dbservice_entity
SET json = jsonb_set(
json,
'{connection,config,metastoreConnection}',
jsonb_build_object('metastoreHostPort', json#>'{connection,config,metastoreHostPort}')
)
WHERE serviceType = 'DeltaLake'
AND json#>'{connection,config,metastoreHostPort}' is not null;
UPDATE dbservice_entity
SET json = json::jsonb #- '{connection,config,metastoreHostPort}'
WHERE serviceType = 'DeltaLake';
UPDATE dbservice_entity
SET json = jsonb_set(
json,
'{connection,config,metastoreConnection}',
jsonb_build_object('metastoreFilePath', json#>'{connection,config,metastoreFilePath}')
)
WHERE serviceType = 'DeltaLake'
AND json#>'{connection,config,metastoreFilePath}' is not null;
UPDATE dbservice_entity
SET json = json::jsonb #- '{connection,config,metastoreFilePath}'
WHERE serviceType = 'DeltaLake';

View File

@ -89,8 +89,6 @@ class CommonDbSourceService(
self.test_connection()
self._connection = None # Lazy init as well
self.data_models = {}
self.dbt_tests = {}
self.table_constraints = None
self.database_source_state = set()
self.context.table_views = []

View File

@ -230,6 +230,10 @@ class DatabaseServiceSource(DBTMixin, TopologyRunnerMixin, Source, ABC):
topology = DatabaseServiceTopology()
context = create_source_context(topology)
# Initialize DBT structures for all Databases
data_models = {}
dbt_tests = {}
def __init__(self):
if hasattr(self.source_config.dbtConfigSource, "dbtSecurityConfig"):
if self.source_config.dbtConfigSource.dbtSecurityConfig is None:

View File

@ -107,10 +107,6 @@ class DeltalakeSource(DatabaseServiceSource):
raise InvalidSourceException(
f"Expected DeltaLakeConnection, but got {connection}"
)
if not connection.metastoreFilePath and not connection.metastoreHostPort:
raise MetaStoreNotFoundException(
"Either of metastoreFilePath or metastoreHostPort is required"
)
return cls(config, metadata_config)
def get_database_names(self) -> Iterable[str]:

View File

@ -297,7 +297,7 @@ def _(connection: DeltaLakeConnection, verbose: bool = False) -> DeltaLakeClient
from delta import configure_spark_with_delta_pip
builder = (
pyspark.sql.SparkSession.builder.appName("random")
pyspark.sql.SparkSession.builder.appName(connection.appName or "OpenMetadata")
.enableHiveSupport()
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config(
@ -307,19 +307,37 @@ def _(connection: DeltaLakeConnection, verbose: bool = False) -> DeltaLakeClient
# Download delta-core jars when creating the SparkSession
.config("spark.jars.packages", "io.delta:delta-core_2.12:2.0.0")
)
if connection.metastoreHostPort:
# Check that the attribute exists and is properly set
if (
hasattr(connection.metastoreConnection, "metastoreHostPort")
and connection.metastoreConnection.metastoreHostPort
):
builder.config(
"hive.metastore.uris",
f"thrift://{connection.metastoreHostPort}",
f"thrift://{connection.metastoreConnection.metastoreHostPort}",
)
elif connection.metastoreFilePath:
if (
hasattr(connection.metastoreConnection, "metastoreDb")
and connection.metastoreConnection.metastoreDb
):
builder.config(
"spark.hadoop.javax.jdo.option.ConnectionURL",
connection.metastoreConnection.metastoreDb,
)
if (
hasattr(connection.metastoreConnection, "metastoreFilePath")
and connection.metastoreConnection.metastoreFilePath
):
# From https://stackoverflow.com/questions/38377188/how-to-get-rid-of-derby-log-metastore-db-from-spark-shell
# derby.system.home is the one in charge of the path for `metastore_db` dir and `derby.log`
# We can use this option to control testing, as well as to properly point to the right
# local database when ingesting data
builder.config(
"spark.driver.extraJavaOptions",
f"-Dderby.system.home={connection.metastoreFilePath}",
f"-Dderby.system.home={connection.metastoreConnection.metastoreFilePath}",
)
if connection.connectionArguments:
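For reference, the three branches above reduce to a simple mapping: each `metastoreConnection` option sets exactly one Spark config key. This is a sketch of that mapping only; the helper `configure_metastore` is hypothetical and not part of this commit.

```python
import pyspark
from types import SimpleNamespace

def configure_metastore(builder, metastore):
    # Sketch: map each metastoreConnection option to one Spark config key
    if getattr(metastore, "metastoreHostPort", None):
        # Remote Hive Metastore service, reached over Thrift
        builder = builder.config(
            "hive.metastore.uris", f"thrift://{metastore.metastoreHostPort}"
        )
    if getattr(metastore, "metastoreDb", None):
        # Direct JDBC connection to the underlying metastore database
        builder = builder.config(
            "spark.hadoop.javax.jdo.option.ConnectionURL", metastore.metastoreDb
        )
    if getattr(metastore, "metastoreFilePath", None):
        # Local Derby metastore: derby.system.home controls where the
        # `metastore_db` directory and `derby.log` are created
        builder = builder.config(
            "spark.driver.extraJavaOptions",
            f"-Dderby.system.home={metastore.metastoreFilePath}",
        )
    return builder

# Illustrative usage with one of the three mutually exclusive options
metastore = SimpleNamespace(metastoreHostPort="localhost:9083")
builder = configure_metastore(
    pyspark.sql.SparkSession.builder.appName("OpenMetadata"), metastore
)
```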

View File

@ -271,8 +271,39 @@ def test_deltalake():
"serviceName": "local_deltalake",
"serviceConnection": {
"config": {
"metastoreHostPort": "localhost:9083",
"metastoreFilePath": "<path_to_metastore>/metastore_db",
"metastoreConnection": {
"metastoreDb": "jdbc:mysql://localhost:3306/demo_hive"
},
"appName": "MyApp",
}
},
"sourceConfig": {"config": {"type": "DatabaseMetadata"}},
}
config: WorkflowSource = WorkflowSource.parse_obj(source)
assert isinstance(config.serviceConnection.__root__.config, DeltaLakeConnection)
source = {
"type": "deltalake",
"serviceName": "local_deltalake",
"serviceConnection": {
"config": {
"metastoreConnection": {"metastoreHostPort": "localhost:9083"},
"appName": "MyApp",
}
},
"sourceConfig": {"config": {"type": "DatabaseMetadata"}},
}
config: WorkflowSource = WorkflowSource.parse_obj(source)
assert isinstance(config.serviceConnection.__root__.config, DeltaLakeConnection)
source = {
"type": "deltalake",
"serviceName": "local_deltalake",
"serviceConnection": {
"config": {
"metastoreConnection": {"metastoreFilePath": "/tmp/metastore.db"},
"appName": "MyApp",
}
},

View File

@ -46,7 +46,9 @@ MOCK_DELTA_CONFIG = {
"serviceConnection": {
"config": {
"type": "DeltaLake",
"metastoreFilePath": METASTORE_PATH,
"metastoreConnection": {
"metastoreFilePath": METASTORE_PATH,
},
"connectionArguments": {
"spark.sql.warehouse.dir": SPARK_SQL_WAREHOUSE,
},

View File

@ -111,6 +111,14 @@ upgrade that, but you want to use OM 0.12, reach out to us.
- DBT Config
- Added: `dbtRunResultsFilePath` and `dbtRunResultsHttpPath` where path of the `run_results.json` file can be passed to get the test results data from DBT.
## In 0.12.1
- DeltaLake:
- Updated the structure of the connection to better reflect the possible options.
- Removed `metastoreHostPort` and `metastoreFilePath`, which are now embedded inside `metastoreConnection`
- Added `metastoreDb` as an option to be passed inside `metastoreConnection`
- You can find more information about the current structure [here](/openmetadata/connectors/database/deltalake/cli#1-define-the-yaml-config)
### Ingestion from CLI
We have stopped updating the service connection parameters when running the ingestion workflow from the CLI.

View File

@ -14,7 +14,7 @@ Configure and schedule Deltalake metadata and profiler workflows from the OpenMe
## Requirements
<InlineCallout color="violet-70" icon="description" bold="OpenMetadata 0.12 or later" href="/deployment">
<InlineCallout color="violet-70" icon="description" bold="OpenMetadata 0.12.1 or later" href="/deployment">
To deploy OpenMetadata, check the <a href="/deployment">Deployment</a> guides.
</InlineCallout>
@ -53,9 +53,11 @@ source:
serviceConnection:
config:
type: DeltaLake
# Choose either metastoreHostPort or metastoreFilePath
metastoreHostPort: "<metastore host port>"
# metastoreFilePath: "<path_to_metastore>/metastore_db"
metastoreConnection:
# Pick only one of the three
metastoreHostPort: "<metastore host port>"
# metastoreDb: jdbc:mysql://localhost:3306/demo_hive
# metastoreFilePath: "<path_to_metastore>/metastore_db"
appName: MyApp
sourceConfig:
config:
@ -134,10 +136,12 @@ workflowConfig:
#### Source Configuration - Service Connection
- **Metastore Host Port**: Enter the Host & Port of Hive Metastore to configure the Spark Session. Either
of `metastoreHostPort` or `metastoreFilePath` is required.
- **Metastore Host Port**: Enter the Host & Port of Hive Metastore Service to configure the Spark Session. Either
of `metastoreHostPort`, `metastoreDb` or `metastoreFilePath` is required.
- **Metastore File Path**: Enter the file path to the local Metastore in case the Spark cluster is running locally. Either
of `metastoreHostPort` or `metastoreFilePath` is required.
of `metastoreHostPort`, `metastoreDb` or `metastoreFilePath` is required.
- **Metastore DB**: The JDBC connection to the underlying Hive metastore DB. Either
of `metastoreHostPort`, `metastoreDb` or `metastoreFilePath` is required.
- **appName (Optional)**: Enter the app name of the Spark session.
- **Connection Arguments (Optional)**: Key-Value pairs that will be used to pass extra `config` elements to the Spark
Session builder.
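Since Connection Arguments are plain key-value pairs forwarded to the builder, the forwarding amounts to one `.config(...)` call per pair. A minimal sketch with an illustrative warehouse path:

```python
import pyspark

# Illustrative connectionArguments; any Spark config key/value pair works here
connection_arguments = {"spark.sql.warehouse.dir": "/tmp/spark-warehouse"}

builder = pyspark.sql.SparkSession.builder.appName("MyApp")
for key, value in connection_arguments.items():
    # Each key/value pair becomes one extra config element on the builder
    builder = builder.config(key, value)
spark = builder.getOrCreate()
```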

View File

@ -14,7 +14,7 @@ Configure and schedule Deltalake metadata and profiler workflows from the OpenMe
## Requirements
<InlineCallout color="violet-70" icon="description" bold="OpenMetadata 0.12 or later" href="/deployment">
<InlineCallout color="violet-70" icon="description" bold="OpenMetadata 0.12.1 or later" href="/deployment">
To deploy OpenMetadata, check the <a href="/deployment">Deployment</a> guides.
</InlineCallout>
@ -53,9 +53,11 @@ source:
serviceConnection:
config:
type: DeltaLake
# Choose either metastoreHostPort or metastoreFilePath
metastoreHostPort: "<metastore host port>"
# metastoreFilePath: "<path_to_metastore>/metastore_db"
metastoreConnection:
# Pick only one of the three
metastoreHostPort: "<metastore host port>"
# metastoreDb: jdbc:mysql://localhost:3306/demo_hive
# metastoreFilePath: "<path_to_metastore>/metastore_db"
appName: MyApp
sourceConfig:
config:
@ -134,10 +136,12 @@ workflowConfig:
#### Source Configuration - Service Connection
- **Metastore Host Port**: Enter the Host & Port of Hive Metastore to configure the Spark Session. Either
of `metastoreHostPort` or `metastoreFilePath` is required.
- **Metastore Host Port**: Enter the Host & Port of Hive Metastore Service to configure the Spark Session. Either
of `metastoreHostPort`, `metastoreDb` or `metastoreFilePath` is required.
- **Metastore File Path**: Enter the file path to the local Metastore in case the Spark cluster is running locally. Either
of `metastoreHostPort` or `metastoreFilePath` is required.
of `metastoreHostPort`, `metastoreDb` or `metastoreFilePath` is required.
- **Metastore DB**: The JDBC connection to the underlying Hive metastore DB. Either
of `metastoreHostPort`, `metastoreDb` or `metastoreFilePath` is required.
- **appName (Optional)**: Enter the app name of the Spark session.
- **Connection Arguments (Optional)**: Key-Value pairs that will be used to pass extra `config` elements to the Spark
Session builder.

View File

@ -34,7 +34,7 @@ the following docs to connect using Airflow SDK or with the CLI.
## Requirements
<InlineCallout color="violet-70" icon="description" bold="OpenMetadata 0.12 or later" href="/deployment">
<InlineCallout color="violet-70" icon="description" bold="OpenMetadata 0.12.1 or later" href="/deployment">
To deploy OpenMetadata, check the <a href="/deployment">Deployment</a> guides.
</InlineCallout>
@ -130,9 +130,12 @@ the changes.
#### Connection Options
- **Metastore Host Port**: Enter the Host & Port of Hive Metastore to configure the Spark Session. Either of `metastoreHostPort` or `metastoreFilePath` is required.
- **Metastore File Path**: Enter the file path to local Metastore in case Spark cluster is running locally. Either of `metastoreHostPort` or `metastoreFilePath` is required.
- **Metastore Host Port**: Enter the Host & Port of Hive Metastore Service to configure the Spark Session. Either
of `metastoreHostPort`, `metastoreDb` or `metastoreFilePath` is required.
- **Metastore File Path**: Enter the file path to the local Metastore in case the Spark cluster is running locally. Either
of `metastoreHostPort`, `metastoreDb` or `metastoreFilePath` is required.
- **Metastore DB**: The JDBC connection to the underlying Hive metastore DB. Either
of `metastoreHostPort`, `metastoreDb` or `metastoreFilePath` is required.
- **appName (Optional)**: Enter the app name of the Spark session.
- **Connection Arguments (Optional)**: Key-Value pairs that will be used to pass extra `config` elements to the Spark Session builder.

View File

@ -11,6 +11,42 @@
"type": "string",
"enum": ["DeltaLake"],
"default": "DeltaLake"
},
"metastoreHostPortConnection": {
"title": "Hive Metastore Service",
"type":"object",
"properties": {
"metastoreHostPort": {
"title": "Hive Metastore Service",
"description": "Thrift connection to the metastore service. E.g., localhost:9083",
"type": "string"
}
},
"additionalProperties": false
},
"metastoreDbConnection": {
"title": "Hive Metastore Database",
"type":"object",
"properties": {
"metastoreDb": {
"title": "Hive Metastore Database",
"description": "JDBC connection to the metastore database. E.g., jdbc:mysql://localhost:3306/demo_hive",
"type": "string"
}
},
"additionalProperties": false
},
"metastoreFilePathConnection": {
"title": "Hive Metastore File Path",
"type":"object",
"properties": {
"metastoreFilePath": {
"title": "Hive Metastore File Path",
"description": "Local path for the local file with metastore data. E.g., /tmp/metastore.db",
"type": "string"
}
},
"additionalProperties": false
}
},
"properties": {
@ -20,15 +56,20 @@
"$ref": "#/definitions/deltaLakeType",
"default": "DeltaLake"
},
"metastoreHostPort": {
"title": "Metastore Host and Port",
"description": "Host and port of the remote Hive Metastore.",
"type": "string"
},
"metastoreFilePath": {
"title": "Metastore Local File Path",
"description": "File path of the local Hive Metastore.",
"type": "string"
"metastoreConnection": {
"title": "Hive Metastore Connection",
"description": "Hive metastore service, local file path or metastore db.",
"oneOf": [
{
"$ref": "#/definitions/metastoreHostPortConnection"
},
{
"$ref": "#/definitions/metastoreDbConnection"
},
{
"$ref": "#/definitions/metastoreFilePathConnection"
}
]
},
"appName": {
"title": "Application Name",
@ -45,5 +86,6 @@
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
}
},
"additionalProperties": false
"additionalProperties": false,
"required": ["metastoreConnection"]
}
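Because each definition sets `additionalProperties: false`, a payload that mixes two metastore options matches none of the `oneOf` branches and is rejected. A standalone sketch of that behavior, validating an inlined subset of the schema above with the Python `jsonschema` package (the package choice is an assumption for illustration; OpenMetadata itself consumes these schemas through generated models):

```python
from jsonschema import ValidationError, validate

# Inlined subset of the metastoreConnection oneOf above
metastore_connection_schema = {
    "oneOf": [
        {
            "type": "object",
            "properties": {"metastoreHostPort": {"type": "string"}},
            "additionalProperties": False,
        },
        {
            "type": "object",
            "properties": {"metastoreDb": {"type": "string"}},
            "additionalProperties": False,
        },
        {
            "type": "object",
            "properties": {"metastoreFilePath": {"type": "string"}},
            "additionalProperties": False,
        },
    ]
}

# A single option matches exactly one branch -> valid
validate({"metastoreHostPort": "localhost:9083"}, metastore_connection_schema)

try:
    # Mixing options matches no branch (additionalProperties) -> rejected
    validate(
        {
            "metastoreHostPort": "localhost:9083",
            "metastoreFilePath": "/tmp/metastore.db",
        },
        metastore_connection_schema,
    )
except ValidationError as exc:
    print("rejected:", exc.message)
```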