mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-08-24 17:08:28 +00:00
Refactored Presto based on schema changes (#3990)
This commit is contained in:
parent
3402b5f3e2
commit
a5546090e7
@ -1,22 +1,31 @@
|
|||||||
{
|
{
|
||||||
"source": {
|
"source": {
|
||||||
"type": "presto",
|
"type": "presto",
|
||||||
"config": {
|
"serviceName": "local_presto",
|
||||||
"service_name": "local_presto",
|
"serviceConnection": {
|
||||||
"host_port": "localhost:8080",
|
"config": {
|
||||||
"catalog": "tpcds"
|
"type": "Presto",
|
||||||
|
"hostPort": "localhost:8080",
|
||||||
|
"catalog": "tpcds",
|
||||||
|
"username": "admin",
|
||||||
|
"password": "password",
|
||||||
|
"database": "database"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"sourceConfig": {
|
||||||
|
"config": {
|
||||||
|
"generateSampleData": false
|
||||||
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"sink": {
|
"sink": {
|
||||||
"type": "metadata-rest",
|
"type": "metadata-rest",
|
||||||
"config": {
|
"config": {}
|
||||||
}
|
|
||||||
},
|
},
|
||||||
"metadata_server": {
|
"workflowConfig": {
|
||||||
"type": "metadata-server",
|
"openMetadataServerConfig": {
|
||||||
"config": {
|
"hostPort": "http://localhost:8585/api",
|
||||||
"api_endpoint": "http://localhost:8585/api",
|
"authProvider": "no-auth"
|
||||||
"auth_provider_type": "no-auth"
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -16,13 +16,6 @@ import logging
|
|||||||
|
|
||||||
from pydantic import SecretStr
|
from pydantic import SecretStr
|
||||||
|
|
||||||
from metadata.config.common import ConfigModel
|
|
||||||
from metadata.generated.schema.entity.services.connections.dashboard.supersetConnection import (
|
|
||||||
SupersetConnection,
|
|
||||||
)
|
|
||||||
from metadata.generated.schema.entity.services.dashboardService import (
|
|
||||||
DashboardServiceType,
|
|
||||||
)
|
|
||||||
from metadata.generated.schema.metadataIngestion.workflow import (
|
from metadata.generated.schema.metadataIngestion.workflow import (
|
||||||
Source as WorkflowSource,
|
Source as WorkflowSource,
|
||||||
)
|
)
|
||||||
|
@ -27,7 +27,6 @@ from metadata.generated.schema.metadataIngestion.workflow import (
|
|||||||
from metadata.generated.schema.type.entityReference import EntityReference
|
from metadata.generated.schema.type.entityReference import EntityReference
|
||||||
from metadata.ingestion.api.source import InvalidSourceException
|
from metadata.ingestion.api.source import InvalidSourceException
|
||||||
from metadata.ingestion.source.sql_source import SQLSource
|
from metadata.ingestion.source.sql_source import SQLSource
|
||||||
from metadata.ingestion.source.sql_source_common import SQLConnectionConfig
|
|
||||||
|
|
||||||
|
|
||||||
class OracleSource(SQLSource):
|
class OracleSource(SQLSource):
|
||||||
|
@ -19,8 +19,11 @@ from sqlalchemy.engine import reflection
|
|||||||
from metadata.generated.schema.metadataIngestion.workflow import (
|
from metadata.generated.schema.metadataIngestion.workflow import (
|
||||||
OpenMetadataServerConfig,
|
OpenMetadataServerConfig,
|
||||||
)
|
)
|
||||||
|
from metadata.generated.schema.metadataIngestion.workflow import (
|
||||||
|
Source as WorkflowSource,
|
||||||
|
)
|
||||||
|
from metadata.ingestion.api.source import InvalidSourceException
|
||||||
from metadata.ingestion.source.sql_source import SQLSource
|
from metadata.ingestion.source.sql_source import SQLSource
|
||||||
from metadata.ingestion.source.sql_source_common import SQLConnectionConfig
|
|
||||||
|
|
||||||
_type_map.update(
|
_type_map.update(
|
||||||
{
|
{
|
||||||
@ -75,26 +78,16 @@ from metadata.generated.schema.entity.services.connections.database.prestoConnec
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
class PrestoConfig(PrestoConnection, SQLConnectionConfig):
|
|
||||||
def get_connection_url(self):
|
|
||||||
url = f"{self.scheme}://"
|
|
||||||
if self.username:
|
|
||||||
url += f"{quote_plus(self.username)}"
|
|
||||||
if self.password:
|
|
||||||
url += f":{quote_plus(self.password.get_secret_value())}"
|
|
||||||
url += "@"
|
|
||||||
url += f"{self.hostPort}"
|
|
||||||
url += f"/{self.catalog}"
|
|
||||||
if self.database:
|
|
||||||
url += f"?schema={quote_plus(self.database)}"
|
|
||||||
return url
|
|
||||||
|
|
||||||
|
|
||||||
class PrestoSource(SQLSource):
|
class PrestoSource(SQLSource):
|
||||||
def __init__(self, config, metadata_config):
|
def __init__(self, config, metadata_config):
|
||||||
super().__init__(config, metadata_config)
|
super().__init__(config, metadata_config)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def create(cls, config_dict, metadata_config: OpenMetadataServerConfig):
|
def create(cls, config_dict, metadata_config: OpenMetadataServerConfig):
|
||||||
config = PrestoConfig.parse_obj(config_dict)
|
config = WorkflowSource.parse_obj(config_dict)
|
||||||
|
connection: PrestoConnection = config.serviceConnection.__root__.config
|
||||||
|
if not isinstance(connection, PrestoConnection):
|
||||||
|
raise InvalidSourceException(
|
||||||
|
f"Expected PrestoConnection, but got {connection}"
|
||||||
|
)
|
||||||
return cls(config, metadata_config)
|
return cls(config, metadata_config)
|
||||||
|
@ -258,21 +258,25 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
|
|||||||
schema_names = inspector.get_schema_names()
|
schema_names = inspector.get_schema_names()
|
||||||
for schema in schema_names:
|
for schema in schema_names:
|
||||||
# clear any previous source database state
|
# clear any previous source database state
|
||||||
self.database_source_state.clear()
|
try:
|
||||||
if filter_by_schema(
|
self.database_source_state.clear()
|
||||||
self.source_config.schemaFilterPattern, schema_name=schema
|
if filter_by_schema(
|
||||||
):
|
self.source_config.schemaFilterPattern, schema_name=schema
|
||||||
self.status.filter(schema, "Schema pattern not allowed")
|
):
|
||||||
continue
|
self.status.filter(schema, "Schema pattern not allowed")
|
||||||
|
continue
|
||||||
|
|
||||||
if self.source_config.includeTables:
|
if self.source_config.includeTables:
|
||||||
yield from self.fetch_tables(inspector, schema)
|
yield from self.fetch_tables(inspector, schema)
|
||||||
|
|
||||||
if self.source_config.includeViews:
|
if self.source_config.includeViews:
|
||||||
yield from self.fetch_views(inspector, schema)
|
yield from self.fetch_views(inspector, schema)
|
||||||
if self.source_config.markDeletedTables:
|
if self.source_config.markDeletedTables:
|
||||||
schema_fqdn = f"{self.config.serviceName}.{schema}"
|
schema_fqdn = f"{self.config.serviceName}.{schema}"
|
||||||
yield from self.delete_tables(schema_fqdn)
|
yield from self.delete_tables(schema_fqdn)
|
||||||
|
except Exception as err:
|
||||||
|
logger.debug(traceback.format_exc())
|
||||||
|
logger.error(err)
|
||||||
|
|
||||||
def fetch_tables(
|
def fetch_tables(
|
||||||
self, inspector: Inspector, schema: str
|
self, inspector: Inspector, schema: str
|
||||||
|
@ -43,6 +43,9 @@ from metadata.generated.schema.entity.services.connections.database.oracleConnec
|
|||||||
from metadata.generated.schema.entity.services.connections.database.postgresConnection import (
|
from metadata.generated.schema.entity.services.connections.database.postgresConnection import (
|
||||||
PostgresConnection,
|
PostgresConnection,
|
||||||
)
|
)
|
||||||
|
from metadata.generated.schema.entity.services.connections.database.prestoConnection import (
|
||||||
|
PrestoConnection,
|
||||||
|
)
|
||||||
from metadata.generated.schema.entity.services.connections.database.redshiftConnection import (
|
from metadata.generated.schema.entity.services.connections.database.redshiftConnection import (
|
||||||
RedshiftConnection,
|
RedshiftConnection,
|
||||||
)
|
)
|
||||||
@ -167,6 +170,21 @@ def _(connection: DatabricksConnection):
|
|||||||
return url
|
return url
|
||||||
|
|
||||||
|
|
||||||
|
@get_connection_url.register
|
||||||
|
def _(connection: PrestoConnection):
|
||||||
|
url = f"{connection.scheme.value}://"
|
||||||
|
if connection.username:
|
||||||
|
url += f"{quote_plus(connection.username)}"
|
||||||
|
if connection.password:
|
||||||
|
url += f":{quote_plus(connection.password.get_secret_value())}"
|
||||||
|
url += "@"
|
||||||
|
url += f"{connection.hostPort}"
|
||||||
|
url += f"/{connection.catalog}"
|
||||||
|
if connection.database:
|
||||||
|
url += f"?schema={quote_plus(connection.database)}"
|
||||||
|
return url
|
||||||
|
|
||||||
|
|
||||||
@singledispatch
|
@singledispatch
|
||||||
def get_connection_args(connection):
|
def get_connection_args(connection):
|
||||||
if connection.connectionArguments:
|
if connection.connectionArguments:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user