Glue pagination added (#1282)

This commit is contained in:
Ayush Shah 2021-11-21 02:16:18 +05:30 committed by GitHub
parent ea79b47c45
commit 219246b78e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 89 additions and 25 deletions

View File

@ -4,7 +4,8 @@
"config": {
"aws_access_key_id": "aws_access_key_id",
"aws_secret_access_key": "aws_secret_access_key",
"database": "database",
"db_service_name": "local_glue_db",
"pipeline_service_name": "local_glue_pipeline",
"region_name": "region_name",
"endpoint_url": "endpoint_url",
"service_name": "local_glue"

View File

@ -126,7 +126,7 @@ plugins: Dict[str, Set[str]] = {
build_options = {"includes": ["_cffi_backend"]}
setup(
name="openmetadata-ingestion",
version="0.4.1",
version="0.4.4",
url="https://open-metadata.org/",
author="OpenMetadata Committers",
license="Apache License 2.0",

View File

@ -213,6 +213,9 @@ class ElasticsearchSink(Sink):
if table.followers:
for follower in table.followers.__root__:
table_followers.append(str(follower.id.__root__))
table_type = None
if hasattr(table.tableType, "name"):
table_type = table.tableType.name
table_doc = TableESDocument(
table_id=str(table.id.__root__),
database=str(database_entity.name.__root__),
@ -222,7 +225,7 @@ class ElasticsearchSink(Sink):
table_name=table.name.__root__,
suggest=suggest,
description=table.description,
table_type=table.tableType.name,
table_type=table_type,
last_updated_timestamp=timestamp,
column_names=column_names,
column_descriptions=column_descriptions,

View File

@ -18,10 +18,12 @@ from metadata.ingestion.api.source import Source, SourceStatus
from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.ingestion.source.sample_data import get_pipeline_service_or_create
from metadata.ingestion.source.sql_source import SQLSourceStatus
from metadata.utils.column_helpers import check_column_complex_type
from metadata.utils.helpers import get_database_service_or_create
from metadata.utils.helpers import (
get_database_service_or_create,
get_pipeline_service_or_create,
)
logger: logging.Logger = logging.getLogger(__name__)
@ -32,9 +34,10 @@ class GlueSourceConfig(ConfigModel):
aws_secret_access_key: str
endpoint_url: str
region_name: str
database: str
service_name: str
service_name: str = ""
host_port: str = ""
db_service_name: str
pipeline_service_name: str
filter_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all()
def get_service_type(self) -> DatabaseServiceType:
@ -50,11 +53,13 @@ class GlueSource(Source):
self.config = config
self.metadata_config = metadata_config
self.metadata = OpenMetadata(metadata_config)
self.service = get_database_service_or_create(config, metadata_config)
self.service = get_database_service_or_create(
config, metadata_config, self.config.db_service_name
)
self.task_id_mapping = {}
self.pipeline_service = get_pipeline_service_or_create(
{
"name": self.config.service_name,
"name": self.config.pipeline_service_name,
"serviceType": "Glue",
"pipelineUrl": self.config.endpoint_url,
},
@ -67,6 +72,8 @@ class GlueSource(Source):
region_name=self.config.region_name,
endpoint_url=self.config.endpoint_url,
)
self.database_name = None
self.next_db_token = None
@classmethod
def create(cls, config_dict, metadata_config_dict, ctx):
@ -77,8 +84,27 @@ class GlueSource(Source):
def prepare(self):
pass
def assign_next_token_db(self, glue_db_resp):
    """Store the pagination token from a Glue ``get_databases`` response.

    Saves ``NextToken`` on ``self.next_db_token`` when another page is
    available; otherwise stores the sentinel string ``"break"`` so the
    pagination loop in ``next_record`` knows to stop.
    """
    has_more_pages = "NextToken" in glue_db_resp
    self.next_db_token = glue_db_resp["NextToken"] if has_more_pages else "break"
def next_record(self) -> Iterable[Record]:
    """Yield every Glue record: tables for each database page, then pipelines.

    Pages through ``glue.get_databases`` using the continuation token kept in
    ``self.next_db_token``; ``assign_next_token_db`` stores the sentinel
    string ``"break"`` once the last page has been fetched.
    """
    # NOTE(review): this unconditional yield runs before any database name
    # has been set by the pagination loop below — it appears to be the
    # pre-pagination line left interleaved by the diff rendering; confirm
    # against the applied version of the file before relying on it.
    yield from self.ingest_tables()
    while True:
        # Sentinel set by assign_next_token_db after the final page.
        if self.next_db_token == "break":
            break
        elif self.next_db_token:
            # Subsequent pages: resume from the stored continuation token.
            glue_db_resp = self.glue.get_databases(
                NextToken=self.next_db_token, ResourceShareType="ALL"
            )
            self.assign_next_token_db(glue_db_resp)
        else:
            # First page: no token accumulated yet.
            glue_db_resp = self.glue.get_databases(ResourceShareType="ALL")
            self.assign_next_token_db(glue_db_resp)
        # Ingest the tables of every database on the current page; the token
        # for the page fetched above is consumed on the next iteration.
        for db in glue_db_resp["DatabaseList"]:
            self.database_name = db["Name"]
            yield from self.ingest_tables()
    yield from self.ingest_pipelines()
def get_columns(self, columnData):
@ -95,7 +121,7 @@ class GlueSource(Source):
self.status, self.dataset_name, column["Type"].lower(), column["Name"]
)
yield Column(
name=column["Name"],
name=column["Name"][:63],
description="",
dataType=col_type,
dataTypeDisplay="{}({})".format(col_type, 1)
@ -104,14 +130,19 @@ class GlueSource(Source):
ordinalPosition=row_order,
children=children,
arrayDataType=arr_data_type,
dataLength=1,
)
row_order += 1
def ingest_tables(self) -> Iterable[OMetaDatabaseAndTable]:
def ingest_tables(self, next_tables_token=None) -> Iterable[OMetaDatabaseAndTable]:
try:
for tables in self.glue.get_tables(DatabaseName=self.config.database)[
"TableList"
]:
if next_tables_token is not None:
glue_resp = self.glue.get_tables(
DatabaseName=self.database_name, NextToken=next_tables_token
)
else:
glue_resp = self.glue.get_tables(DatabaseName=self.database_name)
for tables in glue_resp["TableList"]:
if not self.config.filter_pattern.included(tables["Name"]):
self.status.filter(
"{}.{}".format(self.config.get_service_name(), tables["Name"]),
@ -120,16 +151,16 @@ class GlueSource(Source):
continue
database_entity = Database(
name=tables["DatabaseName"],
service=EntityReference(
id=self.service.id, type=self.config.service_type
),
service=EntityReference(id=self.service.id, type="databaseService"),
)
fqn = (
f"{self.config.service_name}.{self.database_name}.{tables['Name']}"
)
fqn = f"{self.config.service_name}.{self.config.database}.{tables['Name']}"
self.dataset_name = fqn
table_columns = self.get_columns(tables["StorageDescriptor"])
table_entity = Table(
id=uuid.uuid4(),
name=tables["Name"],
name=tables["Name"][:64],
description=tables["Description"]
if hasattr(tables, "Description")
else "",
@ -140,6 +171,8 @@ class GlueSource(Source):
table=table_entity, database=database_entity
)
yield table_and_db
if "NextToken" in glue_resp:
yield from self.ingest_tables(glue_resp["NextToken"])
except Exception as err:
logger.error(traceback.format_exc())
logger.error(traceback.print_exc())
@ -155,14 +188,15 @@ class GlueSource(Source):
list(self.task_id_mapping.values()).index(
edges["DestinationId"]
)
]
][:63]
)
return downstreamTasks
def get_tasks(self, tasks):
taskList = []
for task in tasks["Graph"]["Nodes"]:
self.task_id_mapping[task["Name"]] = task["UniqueId"]
task_name = task["Name"][:63]
self.task_id_mapping[task_name] = task["UniqueId"]
for task in tasks["Graph"]["Nodes"]:
taskList.append(
Task(

View File

@ -34,6 +34,7 @@ _column_type_mapping: Dict[Type[types.TypeEngine], str] = {
types.JSON: "JSON",
types.CHAR: "CHAR",
types.DECIMAL: "DECIMAL",
types.Interval: "INTERVAL",
}
_column_string_mapping = {
@ -64,6 +65,12 @@ _column_string_mapping = {
"TIMESTAMP WITHOUT TIME ZONE": "TIMESTAMP",
"FLOAT64": "DOUBLE",
"DECIMAL": "DECIMAL",
"DOUBLE": "DOUBLE",
"INTERVAL": "INTERVAL",
"SET": "SET",
"BINARY": "BINARY",
"SMALLINT": "SMALLINT",
"TINYINT": "TINYINT",
}
_known_unknown_column_types: Set[Type[types.TypeEngine]] = {
@ -123,7 +130,7 @@ def get_column_type(status: SourceStatus, dataset_name: str, column_type: Any) -
type_class = "NULL"
break
for col_type in _column_string_mapping.keys():
if str(column_type).split("(")[0].upper() in col_type:
if str(column_type).split("(")[0].split("<")[0].upper() in col_type:
type_class = _column_string_mapping.get(col_type)
break
else:

View File

@ -24,9 +24,13 @@ from metadata.generated.schema.api.services.createDatabaseService import (
from metadata.generated.schema.api.services.createMessagingService import (
CreateMessagingServiceEntityRequest,
)
from metadata.generated.schema.api.services.createPipelineService import (
CreatePipelineServiceEntityRequest,
)
from metadata.generated.schema.entity.services.dashboardService import DashboardService
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.entity.services.messagingService import MessagingService
from metadata.generated.schema.entity.services.pipelineService import PipelineService
from metadata.ingestion.ometa.ometa_api import OpenMetadata
@ -47,10 +51,13 @@ def snake_to_camel(s):
return "".join(a)
def get_database_service_or_create(config, metadata_config) -> DatabaseService:
def get_database_service_or_create(
config, metadata_config, service_name=None
) -> DatabaseService:
metadata = OpenMetadata(metadata_config)
config.service_name = service_name if service_name else config.service_name
service = metadata.get_by_name(entity=DatabaseService, fqdn=config.service_name)
if service is not None:
if service:
return service
else:
service = {
@ -116,6 +123,18 @@ def get_dashboard_service_or_create(
return created_service
def get_pipeline_service_or_create(service_json, metadata_config) -> PipelineService:
    """Return the pipeline service named in *service_json*, creating it if absent.

    Looks up an existing ``PipelineService`` by name through the metadata
    API; when none exists, registers a new one built from *service_json*
    and returns the created entity.
    """
    metadata = OpenMetadata(metadata_config)
    existing = metadata.get_by_name(entity=PipelineService, fqdn=service_json["name"])
    if existing is not None:
        return existing
    return metadata.create_or_update(
        CreatePipelineServiceEntityRequest(**service_json)
    )
def convert_epoch_to_iso(seconds_since_epoch):
dt = datetime.utcfromtimestamp(seconds_since_epoch)
iso_format = dt.isoformat() + "Z"