From 219246b78ee877644fa853b6ccb100983191ad14 Mon Sep 17 00:00:00 2001
From: Ayush Shah
Date: Sun, 21 Nov 2021 02:16:18 +0530
Subject: [PATCH] Glue pagination added (#1282)

---
 ingestion/examples/workflows/glue.json        |  3 +-
 ingestion/setup.py                            |  2 +-
 .../metadata/ingestion/sink/elasticsearch.py  |  5 +-
 .../src/metadata/ingestion/source/glue.py     | 72 ++++++++++++++-----
 .../src/metadata/utils/column_helpers.py      |  9 ++-
 ingestion/src/metadata/utils/helpers.py       | 23 +++++-
 6 files changed, 89 insertions(+), 25 deletions(-)

diff --git a/ingestion/examples/workflows/glue.json b/ingestion/examples/workflows/glue.json
index 0597acb07cf..259a5310e55 100644
--- a/ingestion/examples/workflows/glue.json
+++ b/ingestion/examples/workflows/glue.json
@@ -4,7 +4,8 @@
     "config": {
       "aws_access_key_id": "aws_access_key_id",
       "aws_secret_access_key": "aws_secret_access_key",
-      "database": "database",
+      "db_service_name": "local_glue_db",
+      "pipeline_service_name": "local_glue_pipeline",
       "region_name": "region_name",
       "endpoint_url": "endpoint_url",
       "service_name": "local_glue"
diff --git a/ingestion/setup.py b/ingestion/setup.py
index b1d2b113b98..ae885a47980 100644
--- a/ingestion/setup.py
+++ b/ingestion/setup.py
@@ -126,7 +126,7 @@ plugins: Dict[str, Set[str]] = {
 build_options = {"includes": ["_cffi_backend"]}
 setup(
     name="openmetadata-ingestion",
-    version="0.4.1",
+    version="0.4.4",
     url="https://open-metadata.org/",
     author="OpenMetadata Committers",
     license="Apache License 2.0",
diff --git a/ingestion/src/metadata/ingestion/sink/elasticsearch.py b/ingestion/src/metadata/ingestion/sink/elasticsearch.py
index 08fa83f547f..86e612b76af 100644
--- a/ingestion/src/metadata/ingestion/sink/elasticsearch.py
+++ b/ingestion/src/metadata/ingestion/sink/elasticsearch.py
@@ -213,6 +213,9 @@ class ElasticsearchSink(Sink):
         if table.followers:
             for follower in table.followers.__root__:
                 table_followers.append(str(follower.id.__root__))
+        table_type = None
+        if hasattr(table.tableType, "name"):
+            table_type = table.tableType.name
         table_doc = TableESDocument(
             table_id=str(table.id.__root__),
             database=str(database_entity.name.__root__),
@@ -222,7 +225,7 @@
             table_name=table.name.__root__,
             suggest=suggest,
             description=table.description,
-            table_type=table.tableType.name,
+            table_type=table_type,
             last_updated_timestamp=timestamp,
             column_names=column_names,
             column_descriptions=column_descriptions,
diff --git a/ingestion/src/metadata/ingestion/source/glue.py b/ingestion/src/metadata/ingestion/source/glue.py
index 058cb61da2d..3e936b36a1e 100644
--- a/ingestion/src/metadata/ingestion/source/glue.py
+++ b/ingestion/src/metadata/ingestion/source/glue.py
@@ -18,10 +18,12 @@ from metadata.ingestion.api.source import Source, SourceStatus
 from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
 from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
-from metadata.ingestion.source.sample_data import get_pipeline_service_or_create
 from metadata.ingestion.source.sql_source import SQLSourceStatus
 from metadata.utils.column_helpers import check_column_complex_type
-from metadata.utils.helpers import get_database_service_or_create
+from metadata.utils.helpers import (
+    get_database_service_or_create,
+    get_pipeline_service_or_create,
+)
 
 logger: logging.Logger = logging.getLogger(__name__)
 
@@ -32,9 +34,10 @@ class GlueSourceConfig(ConfigModel):
     aws_secret_access_key: str
     endpoint_url: str
     region_name: str
-    database: str
-    service_name: str
+    service_name: str = ""
     host_port: str = ""
+    db_service_name: str
+    pipeline_service_name: str
     filter_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all()
 
     def get_service_type(self) -> DatabaseServiceType:
@@ -50,11 +53,13 @@ class GlueSource(Source):
         self.config = config
         self.metadata_config = metadata_config
         self.metadata = OpenMetadata(metadata_config)
-        self.service = get_database_service_or_create(config, metadata_config)
+        self.service = get_database_service_or_create(
+            config, metadata_config, self.config.db_service_name
+        )
         self.task_id_mapping = {}
         self.pipeline_service = get_pipeline_service_or_create(
             {
-                "name": self.config.service_name,
+                "name": self.config.pipeline_service_name,
                 "serviceType": "Glue",
                 "pipelineUrl": self.config.endpoint_url,
             },
@@ -67,6 +72,8 @@
             region_name=self.config.region_name,
             endpoint_url=self.config.endpoint_url,
         )
+        self.database_name = None
+        self.next_db_token = None
 
     @classmethod
     def create(cls, config_dict, metadata_config_dict, ctx):
@@ -77,8 +84,27 @@
     def prepare(self):
         pass
 
+    def assign_next_token_db(self, glue_db_resp):
+        if "NextToken" in glue_db_resp:
+            self.next_db_token = glue_db_resp["NextToken"]
+        else:
+            self.next_db_token = "break"
+
     def next_record(self) -> Iterable[Record]:
-        yield from self.ingest_tables()
+        while True:
+            if self.next_db_token == "break":
+                break
+            elif self.next_db_token:
+                glue_db_resp = self.glue.get_databases(
+                    NextToken=self.next_db_token, ResourceShareType="ALL"
+                )
+                self.assign_next_token_db(glue_db_resp)
+            else:
+                glue_db_resp = self.glue.get_databases(ResourceShareType="ALL")
+                self.assign_next_token_db(glue_db_resp)
+            for db in glue_db_resp["DatabaseList"]:
+                self.database_name = db["Name"]
+                yield from self.ingest_tables()
         yield from self.ingest_pipelines()
 
     def get_columns(self, columnData):
@@ -95,7 +121,7 @@
                 self.status, self.dataset_name, column["Type"].lower(), column["Name"]
             )
             yield Column(
-                name=column["Name"],
+                name=column["Name"][:63],
                 description="",
                 dataType=col_type,
                 dataTypeDisplay="{}({})".format(col_type, 1)
@@ -104,14 +130,19 @@
                 ordinalPosition=row_order,
                 children=children,
                 arrayDataType=arr_data_type,
+                dataLength=1,
             )
             row_order += 1
 
-    def ingest_tables(self) -> Iterable[OMetaDatabaseAndTable]:
+    def ingest_tables(self, next_tables_token=None) -> Iterable[OMetaDatabaseAndTable]:
         try:
-            for tables in self.glue.get_tables(DatabaseName=self.config.database)[
-                "TableList"
-            ]:
+            if next_tables_token is not None:
+                glue_resp = self.glue.get_tables(
+                    DatabaseName=self.database_name, NextToken=next_tables_token
+                )
+            else:
+                glue_resp = self.glue.get_tables(DatabaseName=self.database_name)
+            for tables in glue_resp["TableList"]:
                 if not self.config.filter_pattern.included(tables["Name"]):
                     self.status.filter(
                         "{}.{}".format(self.config.get_service_name(), tables["Name"]),
@@ -120,16 +151,16 @@
                     continue
                 database_entity = Database(
                     name=tables["DatabaseName"],
-                    service=EntityReference(
-                        id=self.service.id, type=self.config.service_type
-                    ),
+                    service=EntityReference(id=self.service.id, type="databaseService"),
+                )
+                fqn = (
+                    f"{self.config.service_name}.{self.database_name}.{tables['Name']}"
                 )
-                fqn = f"{self.config.service_name}.{self.config.database}.{tables['Name']}"
                 self.dataset_name = fqn
                 table_columns = self.get_columns(tables["StorageDescriptor"])
                 table_entity = Table(
                     id=uuid.uuid4(),
-                    name=tables["Name"],
+                    name=tables["Name"][:64],
                     description=tables["Description"]
                     if hasattr(tables, "Description")
                     else "",
@@ -140,6 +171,8 @@
                     table=table_entity, database=database_entity
                 )
                 yield table_and_db
+            if "NextToken" in glue_resp:
+                yield from self.ingest_tables(glue_resp["NextToken"])
         except Exception as err:
             logger.error(traceback.format_exc())
             logger.error(traceback.print_exc())
@@ -155,14 +188,15 @@
                         list(self.task_id_mapping.values()).index(
                             edges["DestinationId"]
                         )
-                    ]
+                    ][:63]
                 )
         return downstreamTasks
 
     def get_tasks(self, tasks):
         taskList = []
         for task in tasks["Graph"]["Nodes"]:
-            self.task_id_mapping[task["Name"]] = task["UniqueId"]
+            task_name = task["Name"][:63]
+            self.task_id_mapping[task_name] = task["UniqueId"]
         for task in tasks["Graph"]["Nodes"]:
             taskList.append(
                 Task(
diff --git a/ingestion/src/metadata/utils/column_helpers.py b/ingestion/src/metadata/utils/column_helpers.py
index 32cd3e36bb0..1bf00e70e9f 100644
--- a/ingestion/src/metadata/utils/column_helpers.py
+++ b/ingestion/src/metadata/utils/column_helpers.py
@@ -34,6 +34,7 @@ _column_type_mapping: Dict[Type[types.TypeEngine], str] = {
     types.JSON: "JSON",
     types.CHAR: "CHAR",
     types.DECIMAL: "DECIMAL",
+    types.Interval: "INTERVAL",
 }
 
 _column_string_mapping = {
@@ -64,6 +65,12 @@
     "TIMESTAMP WITHOUT TIME ZONE": "TIMESTAMP",
     "FLOAT64": "DOUBLE",
     "DECIMAL": "DECIMAL",
+    "DOUBLE": "DOUBLE",
+    "INTERVAL": "INTERVAL",
+    "SET": "SET",
+    "BINARY": "BINARY",
+    "SMALLINT": "SMALLINT",
+    "TINYINT": "TINYINT",
 }
 
 _known_unknown_column_types: Set[Type[types.TypeEngine]] = {
@@ -123,7 +130,7 @@ def get_column_type(status: SourceStatus, dataset_name: str, column_type: Any) -> str:
             type_class = "NULL"
             break
     for col_type in _column_string_mapping.keys():
-        if str(column_type).split("(")[0].upper() in col_type:
+        if str(column_type).split("(")[0].split("<")[0].upper() in col_type:
             type_class = _column_string_mapping.get(col_type)
             break
     else:
diff --git a/ingestion/src/metadata/utils/helpers.py b/ingestion/src/metadata/utils/helpers.py
index 0b9fbf99c62..1d7ca9d55bd 100644
--- a/ingestion/src/metadata/utils/helpers.py
+++ b/ingestion/src/metadata/utils/helpers.py
@@ -24,9 +24,13 @@ from metadata.generated.schema.api.services.createDatabaseService import (
 from metadata.generated.schema.api.services.createMessagingService import (
     CreateMessagingServiceEntityRequest,
 )
+from metadata.generated.schema.api.services.createPipelineService import (
+    CreatePipelineServiceEntityRequest,
+)
 from metadata.generated.schema.entity.services.dashboardService import DashboardService
 from metadata.generated.schema.entity.services.databaseService import DatabaseService
 from metadata.generated.schema.entity.services.messagingService import MessagingService
+from metadata.generated.schema.entity.services.pipelineService import PipelineService
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
 
 
@@ -47,10 +51,13 @@ def snake_to_camel(s):
     return "".join(a)
 
 
-def get_database_service_or_create(config, metadata_config) -> DatabaseService:
+def get_database_service_or_create(
+    config, metadata_config, service_name=None
+) -> DatabaseService:
     metadata = OpenMetadata(metadata_config)
+    config.service_name = service_name if service_name else config.service_name
     service = metadata.get_by_name(entity=DatabaseService, fqdn=config.service_name)
-    if service is not None:
+    if service:
         return service
     else:
         service = {
@@ -116,6 +123,18 @@ def get_dashboard_service_or_create(
     return created_service
 
 
+def get_pipeline_service_or_create(service_json, metadata_config) -> PipelineService:
+    metadata = OpenMetadata(metadata_config)
+    service = metadata.get_by_name(entity=PipelineService, fqdn=service_json["name"])
+    if service is not None:
+        return service
+    else:
+        created_service = metadata.create_or_update(
+            CreatePipelineServiceEntityRequest(**service_json)
+        )
+        return created_service
+
+
 def convert_epoch_to_iso(seconds_since_epoch):
     dt = datetime.utcfromtimestamp(seconds_since_epoch)
     iso_format = dt.isoformat() + "Z"
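
Note on the pagination pattern: the heart of this patch is the NextToken paging that next_record and ingest_tables now perform against the Glue APIs. The snippet below is a minimal, self-contained sketch of that same boto3 pagination loop, written outside the source class for clarity; the client setup, the region value, and the iter_databases/iter_tables helper names are illustrative assumptions and are not code from the patch itself.

    import boto3

    # Placeholder region; credentials resolve through the normal boto3 lookup chain.
    glue = boto3.client("glue", region_name="us-east-1")

    def iter_databases(client):
        # Keep calling get_databases, passing NextToken back, until the
        # response no longer contains a NextToken key.
        token = None
        while True:
            kwargs = {"ResourceShareType": "ALL"}
            if token:
                kwargs["NextToken"] = token
            resp = client.get_databases(**kwargs)
            yield from resp["DatabaseList"]
            token = resp.get("NextToken")
            if not token:
                break

    def iter_tables(client, database_name):
        # Same pattern for the tables of a single database.
        token = None
        while True:
            kwargs = {"DatabaseName": database_name}
            if token:
                kwargs["NextToken"] = token
            resp = client.get_tables(**kwargs)
            yield from resp["TableList"]
            token = resp.get("NextToken")
            if not token:
                break

    # Walk every table in every database, one page at a time.
    for db in iter_databases(glue):
        for table in iter_tables(glue, db["Name"]):
            print(db["Name"], table["Name"])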