added-named-parameter (#2680)

* added-named-parameter

* metadata-config-fix-and-connector-tested
This commit is contained in:
codingwithabhi 2022-02-11 21:23:46 +05:30 committed by GitHub
parent 6447e34bff
commit b51c0d92b1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 68 additions and 50 deletions

View File

@ -46,7 +46,11 @@ class DeltaLakeSource(Source):
super().__init__(ctx) super().__init__(ctx)
self.config = config self.config = config
self.metadata_config = metadata_config self.metadata_config = metadata_config
self.service = get_database_service_or_create(config, metadata_config) self.service = get_database_service_or_create(
config=config,
metadata_config=metadata_config,
service_name=config.service_name,
)
self.status = SQLSourceStatus() self.status = SQLSourceStatus()
# spark session needs to initiated outside the workflow and pass it through WorkflowContext # spark session needs to initiated outside the workflow and pass it through WorkflowContext
self.spark = ctx.spark self.spark = ctx.spark

View File

@ -46,7 +46,9 @@ class DynamodbSource(Source[Entity]):
self.metadata_config = metadata_config self.metadata_config = metadata_config
self.metadata = OpenMetadata(metadata_config) self.metadata = OpenMetadata(metadata_config)
self.service = get_database_service_or_create( self.service = get_database_service_or_create(
config, metadata_config, self.config.service_name config=config,
metadata_config=metadata_config,
service_name=self.config.service_name,
) )
self.dynamodb = AWSClient(self.config).get_resource("dynamodb") self.dynamodb = AWSClient(self.config).get_resource("dynamodb")

View File

@ -68,8 +68,11 @@ class GcsSource(Source[Entity]):
self.config = config self.config = config
self.status = SourceStatus() self.status = SourceStatus()
self.service = get_storage_service_or_create( self.service = get_storage_service_or_create(
{"name": self.config.service_name, "serviceType": StorageServiceType.GCS}, service_json={
metadata_config, "name": self.config.service_name,
"serviceType": StorageServiceType.GCS,
},
metadata_config=metadata_config,
) )
self.gcs = storage.Client() self.gcs = storage.Client()

View File

@ -62,7 +62,9 @@ class GlueSource(Source[Entity]):
self.metadata_config = metadata_config self.metadata_config = metadata_config
self.metadata = OpenMetadata(metadata_config) self.metadata = OpenMetadata(metadata_config)
self.service = get_database_service_or_create( self.service = get_database_service_or_create(
config, metadata_config, self.config.service_name config=config,
metadata_config=metadata_config,
service_name=self.config.service_name,
) )
self.storage_service = get_storage_service_or_create( self.storage_service = get_storage_service_or_create(
{"name": self.config.storage_service_name, "serviceType": "S3"}, {"name": self.config.storage_service_name, "serviceType": "S3"},

View File

@ -73,11 +73,11 @@ class KafkaSource(Source[CreateTopicRequest]):
self.metadata_config = metadata_config self.metadata_config = metadata_config
self.status = KafkaSourceStatus() self.status = KafkaSourceStatus()
self.service = get_messaging_service_or_create( self.service = get_messaging_service_or_create(
config.service_name, service_name=config.service_name,
MessagingServiceType.Kafka.name, message_service_type=MessagingServiceType.Kafka.name,
config.schema_registry_url, schema_registry_url=config.schema_registry_url,
config.bootstrap_servers.split(","), brokers=config.bootstrap_servers.split(","),
metadata_config, metadata_config=metadata_config,
) )
self.schema_registry_client = SchemaRegistryClient( self.schema_registry_client = SchemaRegistryClient(
{"url": self.config.schema_registry_url} {"url": self.config.schema_registry_url}

View File

@ -85,12 +85,12 @@ class LookerSource(Source[Entity]):
self.metadata_config = metadata_config self.metadata_config = metadata_config
self.client = self.looker_client() self.client = self.looker_client()
self.service = get_dashboard_service_or_create( self.service = get_dashboard_service_or_create(
config.service_name, service_name=config.service_name,
DashboardServiceType.Looker.name, dashboard_service_type=DashboardServiceType.Looker.name,
config.username, username=config.username,
config.password.get_secret_value(), password=config.password.get_secret_value(),
config.url, dashboard_url=config.url,
metadata_config, metadata_config=metadata_config,
) )
def check_env(self, env_key): def check_env(self, env_key):

View File

@ -68,12 +68,12 @@ class RedashSource(Source[Entity]):
self.status = RedashSourceStatus() self.status = RedashSourceStatus()
self.client = Redash(self.config.uri, self.config.api_key) self.client = Redash(self.config.uri, self.config.api_key)
self.service = get_dashboard_service_or_create( self.service = get_dashboard_service_or_create(
config.service_name, service_name=config.service_name,
DashboardServiceType.Redash.name, dashboard_service_type=DashboardServiceType.Redash.name,
config.username, username=config.username,
config.api_key, password=config.api_key,
config.uri, dashboard_url=config.uri,
metadata_config, metadata_config=metadata_config,
) )
self.dashboards_to_charts = {} self.dashboards_to_charts = {}

View File

@ -52,8 +52,11 @@ class S3Source(Source[Entity]):
self.metadata_config = metadata_config self.metadata_config = metadata_config
self.status = SourceStatus() self.status = SourceStatus()
self.service = get_storage_service_or_create( self.service = get_storage_service_or_create(
{"name": self.config.service_name, "serviceType": StorageServiceType.S3}, service_json={
metadata_config, "name": self.config.service_name,
"serviceType": StorageServiceType.S3,
},
metadata_config=metadata_config,
) )
self.s3 = AWSClient(self.config).get_client("s3") self.s3 = AWSClient(self.config).get_client("s3")

View File

@ -72,7 +72,9 @@ class SalesforceSource(Source[OMetaDatabaseAndTable]):
): ):
super().__init__(ctx) super().__init__(ctx)
self.config = config self.config = config
self.service = get_database_service_or_create(config, metadata_config) self.service = get_database_service_or_create(
config=config, metadata_config=metadata_config
)
self.status = SalesforceSourceStatus() self.status = SalesforceSourceStatus()
self.sf = Salesforce( self.sf = Salesforce(
username=self.config.username, username=self.config.username,

View File

@ -178,8 +178,8 @@ class SampleDataSource(Source[Entity]):
open(self.config.sample_data_folder + "/locations/locations.json", "r") open(self.config.sample_data_folder + "/locations/locations.json", "r")
) )
self.storage_service = get_storage_service_or_create( self.storage_service = get_storage_service_or_create(
self.storage_service_json, service_json=self.storage_service_json,
metadata_config, metadata_config=metadata_config,
) )
self.glue_storage_service_json = json.load( self.glue_storage_service_json = json.load(
open(self.config.sample_data_folder + "/glue/storage_service.json", "r") open(self.config.sample_data_folder + "/glue/storage_service.json", "r")
@ -194,8 +194,8 @@ class SampleDataSource(Source[Entity]):
open(self.config.sample_data_folder + "/glue/tables.json", "r") open(self.config.sample_data_folder + "/glue/tables.json", "r")
) )
self.glue_database_service = get_database_service_or_create_v2( self.glue_database_service = get_database_service_or_create_v2(
self.glue_database_service_json, service_json=self.glue_database_service_json,
metadata_config, metadata_config=metadata_config,
) )
self.glue_storage_service = get_storage_service_or_create( self.glue_storage_service = get_storage_service_or_create(
self.glue_storage_service_json, self.glue_storage_service_json,
@ -211,7 +211,7 @@ class SampleDataSource(Source[Entity]):
open(self.config.sample_data_folder + "/datasets/tables.json", "r") open(self.config.sample_data_folder + "/datasets/tables.json", "r")
) )
self.database_service = get_database_service_or_create( self.database_service = get_database_service_or_create(
config, self.metadata_config config=config, metadata_config=self.metadata_config
) )
self.kafka_service_json = json.load( self.kafka_service_json = json.load(
open(self.config.sample_data_folder + "/topics/service.json", "r") open(self.config.sample_data_folder + "/topics/service.json", "r")
@ -220,11 +220,11 @@ class SampleDataSource(Source[Entity]):
open(self.config.sample_data_folder + "/topics/topics.json", "r") open(self.config.sample_data_folder + "/topics/topics.json", "r")
) )
self.kafka_service = get_messaging_service_or_create( self.kafka_service = get_messaging_service_or_create(
self.kafka_service_json.get("name"), service_name=self.kafka_service_json.get("name"),
self.kafka_service_json.get("serviceType"), message_service_type=self.kafka_service_json.get("serviceType"),
self.kafka_service_json.get("schemaRegistry"), schema_registry_url=self.kafka_service_json.get("schemaRegistry"),
self.kafka_service_json.get("brokers"), brokers=self.kafka_service_json.get("brokers"),
self.metadata_config, metadata_config=self.metadata_config,
) )
self.dashboard_service_json = json.load( self.dashboard_service_json = json.load(
open(self.config.sample_data_folder + "/dashboards/service.json", "r") open(self.config.sample_data_folder + "/dashboards/service.json", "r")
@ -236,12 +236,12 @@ class SampleDataSource(Source[Entity]):
open(self.config.sample_data_folder + "/dashboards/dashboards.json", "r") open(self.config.sample_data_folder + "/dashboards/dashboards.json", "r")
) )
self.dashboard_service = get_dashboard_service_or_create( self.dashboard_service = get_dashboard_service_or_create(
self.dashboard_service_json.get("name"), service_name=self.dashboard_service_json.get("name"),
self.dashboard_service_json.get("serviceType"), dashboard_service_type=self.dashboard_service_json.get("serviceType"),
self.dashboard_service_json.get("username"), username=self.dashboard_service_json.get("username"),
self.dashboard_service_json.get("password"), password=self.dashboard_service_json.get("password"),
self.dashboard_service_json.get("dashboardUrl"), dashboard_url=self.dashboard_service_json.get("dashboardUrl"),
metadata_config, metadata_config=metadata_config,
) )
self.pipeline_service_json = json.load( self.pipeline_service_json = json.load(
open(self.config.sample_data_folder + "/pipelines/service.json", "r") open(self.config.sample_data_folder + "/pipelines/service.json", "r")
@ -250,8 +250,8 @@ class SampleDataSource(Source[Entity]):
open(self.config.sample_data_folder + "/pipelines/pipelines.json", "r") open(self.config.sample_data_folder + "/pipelines/pipelines.json", "r")
) )
self.pipeline_service = get_pipeline_service_or_create( self.pipeline_service = get_pipeline_service_or_create(
self.pipeline_service_json, service_json=self.pipeline_service_json,
metadata_config, metadata_config=metadata_config,
) )
self.lineage = json.load( self.lineage = json.load(
open(self.config.sample_data_folder + "/lineage/lineage.json", "r") open(self.config.sample_data_folder + "/lineage/lineage.json", "r")

View File

@ -41,7 +41,9 @@ class SampleUsageSource(Source[TableQuery]):
self.query_log_csv = config.sample_data_folder + "/datasets/query_log" self.query_log_csv = config.sample_data_folder + "/datasets/query_log"
with open(self.query_log_csv, "r") as fin: with open(self.query_log_csv, "r") as fin:
self.query_logs = [dict(i) for i in csv.DictReader(fin)] self.query_logs = [dict(i) for i in csv.DictReader(fin)]
self.service = get_database_service_or_create(self.config, metadata_config) self.service = get_database_service_or_create(
config=self.config, metadata_config=metadata_config
)
@classmethod @classmethod
def create(cls, config_dict, metadata_config_dict, ctx): def create(cls, config_dict, metadata_config_dict, ctx):

View File

@ -176,12 +176,12 @@ class SupersetSource(Source[Entity]):
self.status = SourceStatus() self.status = SourceStatus()
self.client = SupersetAPIClient(self.config) self.client = SupersetAPIClient(self.config)
self.service = get_dashboard_service_or_create( self.service = get_dashboard_service_or_create(
config.service_name, service_name=config.service_name,
DashboardServiceType.Superset.name, dashboard_service_type=DashboardServiceType.Superset.name,
config.username, username=config.username,
config.password.get_secret_value(), password=config.password.get_secret_value(),
config.url, dashboard_url=config.url,
metadata_config, metadata_config=metadata_config,
) )
@classmethod @classmethod