diff --git a/ingestion/src/metadata/ingestion/source/deltalake.py b/ingestion/src/metadata/ingestion/source/deltalake.py
index 3f1fe6ec4d8..1b201fade6e 100644
--- a/ingestion/src/metadata/ingestion/source/deltalake.py
+++ b/ingestion/src/metadata/ingestion/source/deltalake.py
@@ -46,7 +46,11 @@ class DeltaLakeSource(Source):
         super().__init__(ctx)
         self.config = config
         self.metadata_config = metadata_config
-        self.service = get_database_service_or_create(config, metadata_config)
+        self.service = get_database_service_or_create(
+            config=config,
+            metadata_config=metadata_config,
+            service_name=config.service_name,
+        )
         self.status = SQLSourceStatus()
         # spark session needs to initiated outside the workflow and pass it through WorkflowContext
         self.spark = ctx.spark
diff --git a/ingestion/src/metadata/ingestion/source/dynamodb.py b/ingestion/src/metadata/ingestion/source/dynamodb.py
index fce57324d8e..50f3ff279bf 100644
--- a/ingestion/src/metadata/ingestion/source/dynamodb.py
+++ b/ingestion/src/metadata/ingestion/source/dynamodb.py
@@ -46,7 +46,9 @@ class DynamodbSource(Source[Entity]):
         self.metadata_config = metadata_config
         self.metadata = OpenMetadata(metadata_config)
         self.service = get_database_service_or_create(
-            config, metadata_config, self.config.service_name
+            config=config,
+            metadata_config=metadata_config,
+            service_name=self.config.service_name,
         )
         self.dynamodb = AWSClient(self.config).get_resource("dynamodb")

diff --git a/ingestion/src/metadata/ingestion/source/gcs.py b/ingestion/src/metadata/ingestion/source/gcs.py
index c31eb846e30..eeef07eb880 100644
--- a/ingestion/src/metadata/ingestion/source/gcs.py
+++ b/ingestion/src/metadata/ingestion/source/gcs.py
@@ -68,8 +68,11 @@ class GcsSource(Source[Entity]):
         self.config = config
         self.status = SourceStatus()
         self.service = get_storage_service_or_create(
-            {"name": self.config.service_name, "serviceType": StorageServiceType.GCS},
-            metadata_config,
+            service_json={
+                "name": self.config.service_name,
+                "serviceType": StorageServiceType.GCS,
+            },
+            metadata_config=metadata_config,
         )
         self.gcs = storage.Client()

diff --git a/ingestion/src/metadata/ingestion/source/glue.py b/ingestion/src/metadata/ingestion/source/glue.py
index 0f4266f966d..c2ecdbf3930 100644
--- a/ingestion/src/metadata/ingestion/source/glue.py
+++ b/ingestion/src/metadata/ingestion/source/glue.py
@@ -62,7 +62,9 @@ class GlueSource(Source[Entity]):
         self.metadata_config = metadata_config
         self.metadata = OpenMetadata(metadata_config)
         self.service = get_database_service_or_create(
-            config, metadata_config, self.config.service_name
+            config=config,
+            metadata_config=metadata_config,
+            service_name=self.config.service_name,
         )
         self.storage_service = get_storage_service_or_create(
             {"name": self.config.storage_service_name, "serviceType": "S3"},
diff --git a/ingestion/src/metadata/ingestion/source/kafka.py b/ingestion/src/metadata/ingestion/source/kafka.py
index b5d0300688a..721879860be 100644
--- a/ingestion/src/metadata/ingestion/source/kafka.py
+++ b/ingestion/src/metadata/ingestion/source/kafka.py
@@ -73,11 +73,11 @@ class KafkaSource(Source[CreateTopicRequest]):
         self.metadata_config = metadata_config
         self.status = KafkaSourceStatus()
         self.service = get_messaging_service_or_create(
-            config.service_name,
-            MessagingServiceType.Kafka.name,
-            config.schema_registry_url,
-            config.bootstrap_servers.split(","),
-            metadata_config,
+            service_name=config.service_name,
+            message_service_type=MessagingServiceType.Kafka.name,
+            schema_registry_url=config.schema_registry_url,
+            brokers=config.bootstrap_servers.split(","),
+            metadata_config=metadata_config,
         )
         self.schema_registry_client = SchemaRegistryClient(
             {"url": self.config.schema_registry_url}
diff --git a/ingestion/src/metadata/ingestion/source/looker.py b/ingestion/src/metadata/ingestion/source/looker.py
index 34a46f476fa..dd98bb5c5c3 100644
--- a/ingestion/src/metadata/ingestion/source/looker.py
+++ b/ingestion/src/metadata/ingestion/source/looker.py
@@ -85,12 +85,12 @@ class LookerSource(Source[Entity]):
         self.metadata_config = metadata_config
         self.client = self.looker_client()
         self.service = get_dashboard_service_or_create(
-            config.service_name,
-            DashboardServiceType.Looker.name,
-            config.username,
-            config.password.get_secret_value(),
-            config.url,
-            metadata_config,
+            service_name=config.service_name,
+            dashboard_service_type=DashboardServiceType.Looker.name,
+            username=config.username,
+            password=config.password.get_secret_value(),
+            dashboard_url=config.url,
+            metadata_config=metadata_config,
         )

     def check_env(self, env_key):
diff --git a/ingestion/src/metadata/ingestion/source/redash.py b/ingestion/src/metadata/ingestion/source/redash.py
index fc21abd8dea..dfce1fb0794 100644
--- a/ingestion/src/metadata/ingestion/source/redash.py
+++ b/ingestion/src/metadata/ingestion/source/redash.py
@@ -68,12 +68,12 @@ class RedashSource(Source[Entity]):
         self.status = RedashSourceStatus()
         self.client = Redash(self.config.uri, self.config.api_key)
         self.service = get_dashboard_service_or_create(
-            config.service_name,
-            DashboardServiceType.Redash.name,
-            config.username,
-            config.api_key,
-            config.uri,
-            metadata_config,
+            service_name=config.service_name,
+            dashboard_service_type=DashboardServiceType.Redash.name,
+            username=config.username,
+            password=config.api_key,
+            dashboard_url=config.uri,
+            metadata_config=metadata_config,
         )
         self.dashboards_to_charts = {}

diff --git a/ingestion/src/metadata/ingestion/source/s3.py b/ingestion/src/metadata/ingestion/source/s3.py
index 5338bfb18f3..92df37fdef5 100644
--- a/ingestion/src/metadata/ingestion/source/s3.py
+++ b/ingestion/src/metadata/ingestion/source/s3.py
@@ -52,8 +52,11 @@ class S3Source(Source[Entity]):
         self.metadata_config = metadata_config
         self.status = SourceStatus()
         self.service = get_storage_service_or_create(
-            {"name": self.config.service_name, "serviceType": StorageServiceType.S3},
-            metadata_config,
+            service_json={
+                "name": self.config.service_name,
+                "serviceType": StorageServiceType.S3,
+            },
+            metadata_config=metadata_config,
         )
         self.s3 = AWSClient(self.config).get_client("s3")

diff --git a/ingestion/src/metadata/ingestion/source/salesforce.py b/ingestion/src/metadata/ingestion/source/salesforce.py
index af785086894..c2c2833f6a4 100644
--- a/ingestion/src/metadata/ingestion/source/salesforce.py
+++ b/ingestion/src/metadata/ingestion/source/salesforce.py
@@ -72,7 +72,9 @@ class SalesforceSource(Source[OMetaDatabaseAndTable]):
     ):
         super().__init__(ctx)
         self.config = config
-        self.service = get_database_service_or_create(config, metadata_config)
+        self.service = get_database_service_or_create(
+            config=config, metadata_config=metadata_config
+        )
         self.status = SalesforceSourceStatus()
         self.sf = Salesforce(
             username=self.config.username,
diff --git a/ingestion/src/metadata/ingestion/source/sample_data.py b/ingestion/src/metadata/ingestion/source/sample_data.py
index 3fe9cd35630..a13609b0b10 100644
--- a/ingestion/src/metadata/ingestion/source/sample_data.py
+++ b/ingestion/src/metadata/ingestion/source/sample_data.py
@@ -178,8 +178,8 @@ class SampleDataSource(Source[Entity]):
             open(self.config.sample_data_folder + "/locations/locations.json", "r")
         )
         self.storage_service = get_storage_service_or_create(
-            self.storage_service_json,
-            metadata_config,
+            service_json=self.storage_service_json,
+            metadata_config=metadata_config,
         )
         self.glue_storage_service_json = json.load(
             open(self.config.sample_data_folder + "/glue/storage_service.json", "r")
@@ -194,8 +194,8 @@ class SampleDataSource(Source[Entity]):
             open(self.config.sample_data_folder + "/glue/tables.json", "r")
         )
         self.glue_database_service = get_database_service_or_create_v2(
-            self.glue_database_service_json,
-            metadata_config,
+            service_json=self.glue_database_service_json,
+            metadata_config=metadata_config,
         )
         self.glue_storage_service = get_storage_service_or_create(
             self.glue_storage_service_json,
@@ -211,7 +211,7 @@ class SampleDataSource(Source[Entity]):
             open(self.config.sample_data_folder + "/datasets/tables.json", "r")
         )
         self.database_service = get_database_service_or_create(
-            config, self.metadata_config
+            config=config, metadata_config=self.metadata_config
         )
         self.kafka_service_json = json.load(
             open(self.config.sample_data_folder + "/topics/service.json", "r")
@@ -220,11 +220,11 @@ class SampleDataSource(Source[Entity]):
             open(self.config.sample_data_folder + "/topics/topics.json", "r")
         )
         self.kafka_service = get_messaging_service_or_create(
-            self.kafka_service_json.get("name"),
-            self.kafka_service_json.get("serviceType"),
-            self.kafka_service_json.get("schemaRegistry"),
-            self.kafka_service_json.get("brokers"),
-            self.metadata_config,
+            service_name=self.kafka_service_json.get("name"),
+            message_service_type=self.kafka_service_json.get("serviceType"),
+            schema_registry_url=self.kafka_service_json.get("schemaRegistry"),
+            brokers=self.kafka_service_json.get("brokers"),
+            metadata_config=self.metadata_config,
         )
         self.dashboard_service_json = json.load(
             open(self.config.sample_data_folder + "/dashboards/service.json", "r")
@@ -236,12 +236,12 @@ class SampleDataSource(Source[Entity]):
             open(self.config.sample_data_folder + "/dashboards/dashboards.json", "r")
         )
         self.dashboard_service = get_dashboard_service_or_create(
-            self.dashboard_service_json.get("name"),
-            self.dashboard_service_json.get("serviceType"),
-            self.dashboard_service_json.get("username"),
-            self.dashboard_service_json.get("password"),
-            self.dashboard_service_json.get("dashboardUrl"),
-            metadata_config,
+            service_name=self.dashboard_service_json.get("name"),
+            dashboard_service_type=self.dashboard_service_json.get("serviceType"),
+            username=self.dashboard_service_json.get("username"),
+            password=self.dashboard_service_json.get("password"),
+            dashboard_url=self.dashboard_service_json.get("dashboardUrl"),
+            metadata_config=metadata_config,
         )
         self.pipeline_service_json = json.load(
             open(self.config.sample_data_folder + "/pipelines/service.json", "r")
@@ -250,8 +250,8 @@ class SampleDataSource(Source[Entity]):
             open(self.config.sample_data_folder + "/pipelines/pipelines.json", "r")
         )
         self.pipeline_service = get_pipeline_service_or_create(
-            self.pipeline_service_json,
-            metadata_config,
+            service_json=self.pipeline_service_json,
+            metadata_config=metadata_config,
         )
         self.lineage = json.load(
             open(self.config.sample_data_folder + "/lineage/lineage.json", "r")
diff --git a/ingestion/src/metadata/ingestion/source/sample_usage.py b/ingestion/src/metadata/ingestion/source/sample_usage.py
index e6ccd2ba1ab..a4669fce97c 100644
--- a/ingestion/src/metadata/ingestion/source/sample_usage.py
+++ b/ingestion/src/metadata/ingestion/source/sample_usage.py
@@ -41,7 +41,9 @@ class SampleUsageSource(Source[TableQuery]):
         self.query_log_csv = config.sample_data_folder + "/datasets/query_log"
         with open(self.query_log_csv, "r") as fin:
             self.query_logs = [dict(i) for i in csv.DictReader(fin)]
-        self.service = get_database_service_or_create(self.config, metadata_config)
+        self.service = get_database_service_or_create(
+            config=self.config, metadata_config=metadata_config
+        )

     @classmethod
     def create(cls, config_dict, metadata_config_dict, ctx):
diff --git a/ingestion/src/metadata/ingestion/source/superset.py b/ingestion/src/metadata/ingestion/source/superset.py
index 42e1f9425ae..a59dc8b63a2 100644
--- a/ingestion/src/metadata/ingestion/source/superset.py
+++ b/ingestion/src/metadata/ingestion/source/superset.py
@@ -176,12 +176,12 @@ class SupersetSource(Source[Entity]):
         self.status = SourceStatus()
         self.client = SupersetAPIClient(self.config)
         self.service = get_dashboard_service_or_create(
-            config.service_name,
-            DashboardServiceType.Superset.name,
-            config.username,
-            config.password.get_secret_value(),
-            config.url,
-            metadata_config,
+            service_name=config.service_name,
+            dashboard_service_type=DashboardServiceType.Superset.name,
+            username=config.username,
+            password=config.password.get_secret_value(),
+            dashboard_url=config.url,
+            metadata_config=metadata_config,
        )

     @classmethod
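
Note (not part of the patch): every hunk above switches the get_*_service_or_create helpers from positional to keyword arguments, so a caller can no longer silently swap arguments of the same type (e.g. a broker list passed where a schema registry URL was expected). The minimal, self-contained Python sketch below illustrates that call pattern; the helper body and types are placeholders, and only the parameter names (service_name, message_service_type, schema_registry_url, brokers, metadata_config) are taken from the call sites in this diff -- the real helpers live in the OpenMetadata ingestion utilities and may differ.

# Illustrative sketch only -- stand-ins for the real OpenMetadata helpers.
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class MessagingService:
    # Placeholder for the MessagingService entity returned by the real helper.
    name: str
    service_type: str
    brokers: List[str]


def get_messaging_service_or_create(
    service_name: str,
    message_service_type: str,
    schema_registry_url: Optional[str],
    brokers: List[str],
    metadata_config: dict,
) -> MessagingService:
    # The real helper looks the service up via the OpenMetadata API and creates
    # it if missing; this stub just echoes the inputs back.
    return MessagingService(
        name=service_name,
        service_type=message_service_type,
        brokers=brokers,
    )


# Keyword arguments make the call self-documenting and order-independent:
service = get_messaging_service_or_create(
    service_name="sample_kafka",
    message_service_type="Kafka",
    schema_registry_url="http://localhost:8081",
    brokers="localhost:9092".split(","),
    metadata_config={"api_endpoint": "http://localhost:8585/api"},
)
print(service)

In the old positional form, swapping brokers and schema_registry_url would only surface as a runtime error (or bad metadata) deep inside the helper; with keyword arguments the mix-up is visible at the call site, which is the point of this refactor.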