diff --git a/wherehows-backend/app/models/daos/DatasetInfoDao.java b/wherehows-backend/app/models/daos/DatasetInfoDao.java
index 152203d16b..7b8a1c1ad7 100644
--- a/wherehows-backend/app/models/daos/DatasetInfoDao.java
+++ b/wherehows-backend/app/models/daos/DatasetInfoDao.java
@@ -1238,6 +1238,7 @@ public class DatasetInfoDao {
     record.setSchemaType(rec.getOriginalSchema().getFormat());
     record.setFields((String) StringUtil.objectToJsonString(rec.getFieldSchema()));
     record.setSource("API");
+    record.setIsActive(true);
 
     Urn urnType = new Urn(urn);
     record.setDatasetType(urnType.datasetType);
diff --git a/wherehows-backend/app/models/kafka/GobblinTrackingAuditProcessor.java b/wherehows-backend/app/models/kafka/GobblinTrackingAuditProcessor.java
index 8ee8fb3cf6..797992840d 100644
--- a/wherehows-backend/app/models/kafka/GobblinTrackingAuditProcessor.java
+++ b/wherehows-backend/app/models/kafka/GobblinTrackingAuditProcessor.java
@@ -121,6 +121,7 @@ public class GobblinTrackingAuditProcessor extends KafkaConsumerProcessor {
     dataset.setSource("Hdfs");
     dataset.setParentName(getParentName(datasetName));
     dataset.setDatasetType("hdfs");
+    dataset.setIsActive(true);
     dataset.setSourceModifiedTime(getsourceModifiedTime(metadata.get("modificationTime")));
 
     Matcher matcher = LOCATION_PREFIX_PATTERN.matcher(datasetName);
diff --git a/wherehows-common/src/main/java/wherehows/common/schemas/DatasetInstanceRecord.java b/wherehows-common/src/main/java/wherehows/common/schemas/DatasetInstanceRecord.java
index f861416329..3a549a80e5 100644
--- a/wherehows-common/src/main/java/wherehows/common/schemas/DatasetInstanceRecord.java
+++ b/wherehows-common/src/main/java/wherehows/common/schemas/DatasetInstanceRecord.java
@@ -27,7 +27,7 @@ public class DatasetInstanceRecord extends AbstractRecord {
   String dataCenter;
   String serverCluster;
   String slice;
-  Integer statusId;
+  Boolean isActive;
   String nativeName;
   String logicalName;
   String version;
@@ -36,16 +36,15 @@ public class DatasetInstanceRecord extends AbstractRecord {
   String viewExpandedText;
   String abstractedDatasetUrn;
 
-  public DatasetInstanceRecord(String datasetUrn, String deploymentTier, String dataCenter,
-      String serverCluster, String slice, Integer statusId, String nativeName,
-      String logicalName, String version, Long instanceCreatedUnixtime,
-      String schema, String viewExpandedText, String abstractedDatasetUrn) {
+  public DatasetInstanceRecord(String datasetUrn, String deploymentTier, String dataCenter, String serverCluster,
+      String slice, Boolean isActive, String nativeName, String logicalName, String version,
+      Long instanceCreatedUnixtime, String schema, String viewExpandedText, String abstractedDatasetUrn) {
     this.datasetUrn = datasetUrn;
     this.deploymentTier = deploymentTier;
     this.dataCenter = dataCenter;
     this.serverCluster = serverCluster;
     this.slice = slice;
-    this.statusId = statusId;
+    this.isActive = isActive;
     this.nativeName = nativeName;
     this.logicalName = logicalName;
     this.version = version;
@@ -63,7 +62,7 @@ public class DatasetInstanceRecord extends AbstractRecord {
     allFields.add(dataCenter);
     allFields.add(serverCluster);
     allFields.add(slice);
-    allFields.add(statusId);
+    allFields.add(isActive);
     allFields.add(nativeName);
     allFields.add(logicalName);
     allFields.add(version);
@@ -73,6 +72,4 @@ public class DatasetInstanceRecord extends AbstractRecord {
     allFields.add(abstractedDatasetUrn);
     return allFields;
   }
-
-
 }
diff --git a/wherehows-common/src/main/java/wherehows/common/schemas/DatasetRecord.java b/wherehows-common/src/main/java/wherehows/common/schemas/DatasetRecord.java
index a0101990b8..9395802617 100644
--- a/wherehows-common/src/main/java/wherehows/common/schemas/DatasetRecord.java
+++ b/wherehows-common/src/main/java/wherehows/common/schemas/DatasetRecord.java
@@ -35,7 +35,7 @@ public class DatasetRecord extends AbstractRecord {
   String locationPrefix;
   String refDatasetUrn;
   Integer refDatasetId;
-  Integer statusId;
+  Boolean isActive;
   Character isPartitioned;
   String samplePartitionFullPath;
   Integer partitionLayoutPatternId;
@@ -62,7 +62,7 @@ public class DatasetRecord extends AbstractRecord {
     allFields.add(parentName);
     allFields.add(storageType);
     allFields.add(refDatasetId);
-    allFields.add(statusId);
+    allFields.add(isActive);
     allFields.add(datasetType);
     allFields.add(hiveSerdesClass);
     allFields.add(isPartitioned);
@@ -76,6 +76,7 @@ public class DatasetRecord extends AbstractRecord {
     allFields.add(null);
     return allFields;
   }
+
   public List fillAllFieldNames() {
     List allFieldNames = new ArrayList<>();
     allFieldNames.add("id");
@@ -90,7 +91,7 @@ public class DatasetRecord extends AbstractRecord {
     allFieldNames.add("parent_name");
     allFieldNames.add("storage_type");
     allFieldNames.add("ref_dataset_id");
-    allFieldNames.add("status_id");
+    allFieldNames.add("is_active");
     allFieldNames.add("dataset_type");
     allFieldNames.add("hive_serdes_class");
     allFieldNames.add("is_partitioned");
@@ -111,9 +112,9 @@ public class DatasetRecord extends AbstractRecord {
     StringBuilder sb = new StringBuilder();
     for (int i = 0; i < allFields.size(); i++) {
       Object o = allFields.get(i);
-      if (o != null) {
+      if (o != null) {
         String fieldName = allFieldNames.get(i);
-        sb.append("`"+fieldName+"`");
+        sb.append("`" + fieldName + "`");
         sb.append("=");
         sb.append(StringUtil.toDbString(o));
         sb.append(",");
@@ -121,10 +122,8 @@ public class DatasetRecord extends AbstractRecord {
     }
     sb.deleteCharAt(sb.length() - 1);
     return sb.toString();
-
   }
-
   public Integer getId() {
     return id;
   }
 
@@ -221,6 +220,14 @@ public class DatasetRecord extends AbstractRecord {
     this.refDatasetUrn = refDatasetUrn;
   }
 
+  public Boolean getIsActive() {
+    return isActive;
+  }
+
+  public void setIsActive(Boolean isActive) {
+    this.isActive = isActive;
+  }
+
   public Character getIsPartitioned() {
     return isPartitioned;
   }
@@ -277,14 +284,6 @@ public class DatasetRecord extends AbstractRecord {
     this.refDatasetId = refDatasetId;
   }
 
-  public Integer getStatusId() {
-    return statusId;
-  }
-
-  public void setStatusId(Integer statusId) {
-    this.statusId = statusId;
-  }
-
   public Integer getPartitionLayoutPatternId() {
     return partitionLayoutPatternId;
   }
diff --git a/wherehows-data-model/DDL/ETL_DDL/dataset_metadata.sql b/wherehows-data-model/DDL/ETL_DDL/dataset_metadata.sql
index 875983cd49..0830670a30 100644
--- a/wherehows-data-model/DDL/ETL_DDL/dataset_metadata.sql
+++ b/wherehows-data-model/DDL/ETL_DDL/dataset_metadata.sql
@@ -30,7 +30,8 @@ CREATE TABLE `stg_dict_dataset` (
   `storage_type`      ENUM('Table', 'View', 'Avro', 'ORC', 'RC', 'Sequence', 'Flat File', 'JSON', 'XML', 'Thrift', 'Parquet', 'Protobuff') NULL,
   `ref_dataset_name`  VARCHAR(200) NULL,
   `ref_dataset_id`    INT(11) UNSIGNED NULL COMMENT 'Refer to Master/Main dataset for Views/ExternalTables',
-  `status_id`         SMALLINT(6) UNSIGNED NULL COMMENT 'Reserve for dataset status',
+  `is_active`         BOOLEAN NULL COMMENT 'is the dataset active / exist ?',
+  `is_deprecated`     BOOLEAN NULL COMMENT 'is the dataset deprecated by user ?',
   `dataset_type`      VARCHAR(30) NULL COMMENT 'hdfs, hive, kafka, teradata, mysql, sqlserver, file, nfs, pinot, salesforce, oracle, db2, netezza, cassandra, hbase, qfs, zfs',
   `hive_serdes_class` VARCHAR(300) NULL,
@@ -69,8 +70,8 @@ CREATE TABLE `dict_dataset` (
   `storage_type`      ENUM('Table', 'View', 'Avro', 'ORC', 'RC', 'Sequence', 'Flat File', 'JSON', 'XML', 'Thrift', 'Parquet', 'Protobuff') NULL,
   `ref_dataset_id`    INT(11) UNSIGNED NULL COMMENT 'Refer to Master/Main dataset for Views/ExternalTables',
-  `status_id`         SMALLINT(6) UNSIGNED NULL
-  COMMENT 'Reserve for dataset status',
+  `is_active`         BOOLEAN NULL COMMENT 'is the dataset active / exist ?',
+  `is_deprecated`     BOOLEAN NULL COMMENT 'is the dataset deprecated by user ?',
   `dataset_type`      VARCHAR(30) NULL COMMENT 'hdfs, hive, kafka, teradata, mysql, sqlserver, file, nfs, pinot, salesforce, oracle, db2, netezza, cassandra, hbase, qfs, zfs',
   `hive_serdes_class` VARCHAR(300) NULL,
@@ -282,7 +283,8 @@ CREATE TABLE dict_dataset_instance (
   data_center     varchar(30) COMMENT 'data center code: lva1, ltx1, dc2, dc3...' NULL DEFAULT '*',
   server_cluster  varchar(150) COMMENT 'sfo1-bigserver, jfk3-sqlserver03' NULL DEFAULT '*',
   slice           varchar(50) COMMENT 'virtual group/tenant id/instance tag' NOT NULL DEFAULT '*',
-  status_id       smallint(6) UNSIGNED COMMENT 'Reserve for dataset status' NULL,
+  is_active       BOOLEAN NULL COMMENT 'is the dataset active / exist ?',
+  is_deprecated   BOOLEAN NULL COMMENT 'is the dataset deprecated by user ?',
   native_name     varchar(250) NOT NULL,
   logical_name    varchar(250) NOT NULL,
   version         varchar(30) COMMENT '1.2.3 or 0.3.131' NULL,
@@ -324,7 +326,8 @@ CREATE TABLE stg_dict_dataset_instance (
   data_center     varchar(30) COMMENT 'data center code: lva1, ltx1, dc2, dc3...' NULL DEFAULT '*',
   server_cluster  varchar(150) COMMENT 'sfo1-bigserver' NULL DEFAULT '*',
   slice           varchar(50) COMMENT 'virtual group/tenant id/instance tag' NOT NULL DEFAULT '*',
-  status_id       smallint(6) UNSIGNED COMMENT 'Reserve for dataset status' NULL,
+  is_active       BOOLEAN NULL COMMENT 'is the dataset active / exist ?',
+  is_deprecated   BOOLEAN NULL COMMENT 'is the dataset deprecated by user ?',
   native_name     varchar(250) NOT NULL,
   logical_name    varchar(250) NOT NULL,
   version         varchar(30) COMMENT '1.2.3 or 0.3.131' NULL,
diff --git a/wherehows-etl/src/main/resources/jython/HdfsLoad.py b/wherehows-etl/src/main/resources/jython/HdfsLoad.py
index f5947fd6ef..8cb67e7c29 100644
--- a/wherehows-etl/src/main/resources/jython/HdfsLoad.py
+++ b/wherehows-etl/src/main/resources/jython/HdfsLoad.py
@@ -36,7 +36,8 @@ class HdfsLoad:
        (`name`, `schema`, properties, fields, urn, source, @dataset_type, @storage_type, sample_partition_full_path, source_created_time, source_modified_time)
        SET db_id = {db_id},
-       wh_etl_exec_id = {wh_etl_exec_id};
+       is_active = TRUE,
+       wh_etl_exec_id = {wh_etl_exec_id};
 
        -- clear
        DELETE FROM stg_dict_dataset
@@ -87,7 +88,7 @@ class HdfsLoad:
        data_center,
        server_cluster,
        slice,
-       status_id,
+       is_active,
        native_name,
        logical_name,
        `version`,
@@ -98,13 +99,13 @@ class HdfsLoad:
        schema_text
        )
        select s.urn, {db_id}, d.deployment_tier, d.data_center, d.cluster,
-       '*', 0, s.name, s.name, 0, s.source_created_time, s.created_time,
+       '*', s.is_active, s.name, s.name, 0, s.source_created_time, s.created_time,
        {wh_etl_exec_id}, s.urn, s.schema
        from stg_dict_dataset s JOIN cfg_database d on s.db_id = d.db_id
        where s.db_id = {db_id}
        on duplicate key update
        deployment_tier=d.deployment_tier, data_center=d.data_center,
-       server_cluster=d.cluster, native_name=s.name, logical_name=s.name,
+       server_cluster=d.cluster, is_active=s.is_active, native_name=s.name, logical_name=s.name,
        instance_created_time=s.source_created_time, created_time=s.created_time,
        wh_etl_exec_id={wh_etl_exec_id}, abstract_dataset_urn=s.urn, schema_text=s.schema;
@@ -121,7 +122,7 @@ class HdfsLoad:
        parent_name,
        storage_type,
        ref_dataset_id,
-       status_id,
+       is_active,
        dataset_type,
        hive_serdes_class,
        is_partitioned,
@@ -135,7 +136,7 @@ class HdfsLoad:
        select s.name, s.schema, s.schema_type, s.fields, s.properties,
        s.urn, s.source, s.location_prefix, s.parent_name,
-       s.storage_type, s.ref_dataset_id, s.status_id,
+       s.storage_type, s.ref_dataset_id, s.is_active,
        s.dataset_type, s.hive_serdes_class, s.is_partitioned,
        s.partition_layout_pattern_id, s.sample_partition_full_path,
        s.source_created_time, s.source_modified_time, UNIX_TIMESTAMP(now()),
@@ -145,7 +146,7 @@ class HdfsLoad:
        on duplicate key update
        `name`=s.name, `schema`=s.schema, schema_type=s.schema_type, `fields`=s.fields,
        properties=s.properties, `source`=s.source, location_prefix=s.location_prefix, parent_name=s.parent_name,
-       storage_type=s.storage_type, ref_dataset_id=s.ref_dataset_id, status_id=s.status_id,
+       storage_type=s.storage_type, ref_dataset_id=s.ref_dataset_id, is_active=s.is_active,
        dataset_type=s.dataset_type, hive_serdes_class=s.hive_serdes_class, is_partitioned=s.is_partitioned,
        partition_layout_pattern_id=s.partition_layout_pattern_id, sample_partition_full_path=s.sample_partition_full_path,
        source_created_time=s.source_created_time, source_modified_time=s.source_modified_time,
@@ -166,7 +167,7 @@ class HdfsLoad:
        data_center,
        server_cluster,
        slice,
-       status_id,
+       is_active,
        native_name,
        logical_name,
        version,
@@ -178,7 +179,7 @@ class HdfsLoad:
        wh_etl_exec_id
        )
        select s.dataset_id, s.db_id, s.deployment_tier, s.data_center,
-       s.server_cluster, s.slice, s.status_id, s.native_name, s.logical_name, s.version,
+       s.server_cluster, s.slice, s.is_active, s.native_name, s.logical_name, s.version,
        case when s.version regexp '[0-9]+\.[0-9]+\.[0-9]+'
          then cast(substring_index(s.version, '.', 1) as unsigned) * 100000000 +
               cast(substring_index(substring_index(s.version, '.', 2), '.', -1) as unsigned) * 10000 +
@@ -190,7 +191,7 @@ class HdfsLoad:
        where s.db_id = {db_id}
        on duplicate key update
        deployment_tier=s.deployment_tier, data_center=s.data_center, server_cluster=s.server_cluster, slice=s.slice,
-       status_id=s.status_id, native_name=s.native_name, logical_name=s.logical_name, version=s.version,
+       is_active=s.is_active, native_name=s.native_name, logical_name=s.logical_name, version=s.version,
        schema_text=s.schema_text, ddl_text=s.ddl_text,
        instance_created_time=s.instance_created_time, created_time=s.created_time, wh_etl_exec_id=s.wh_etl_exec_id
        ;
diff --git a/wherehows-etl/src/main/resources/jython/HiveLoad.py b/wherehows-etl/src/main/resources/jython/HiveLoad.py
index cc00cad5fd..dc14a7e4a2 100644
--- a/wherehows-etl/src/main/resources/jython/HiveLoad.py
+++ b/wherehows-etl/src/main/resources/jython/HiveLoad.py
@@ -31,9 +31,10 @@ class HiveLoad:
        FIELDS TERMINATED BY '\Z' ESCAPED BY '\0'
        (`name`, `schema`, properties, fields, urn, source, dataset_type, storage_type, @sample_partition_full_path, source_created_time, @source_modified_time)
        SET db_id = {db_id},
-       source_modified_time=nullif(@source_modified_time,''),
-       sample_partition_full_path=nullif(@sample_partition_full_path,''),
-       wh_etl_exec_id = {wh_etl_exec_id};
+       source_modified_time=nullif(@source_modified_time,''),
+       sample_partition_full_path=nullif(@sample_partition_full_path,''),
+       is_active = TRUE,
+       wh_etl_exec_id = {wh_etl_exec_id};
 
        -- SELECT COUNT(*) FROM stg_dict_dataset;
 
        -- clear
@@ -64,7 +65,7 @@ class HiveLoad:
        parent_name,
        storage_type,
        ref_dataset_id,
-       status_id,
+       is_active,
        dataset_type,
        hive_serdes_class,
        is_partitioned,
@@ -78,7 +79,7 @@ class HiveLoad:
        select s.name, s.schema, s.schema_type, s.fields, s.properties,
        s.urn, s.source, s.location_prefix, s.parent_name,
-       s.storage_type, s.ref_dataset_id, s.status_id,
+       s.storage_type, s.ref_dataset_id, s.is_active,
        s.dataset_type, s.hive_serdes_class, s.is_partitioned,
        s.partition_layout_pattern_id, s.sample_partition_full_path,
        s.source_created_time, s.source_modified_time, UNIX_TIMESTAMP(now()),
@@ -88,11 +89,11 @@ class HiveLoad:
        on duplicate key update
        `name`=s.name, `schema`=s.schema, schema_type=s.schema_type, fields=s.fields,
        properties=s.properties, source=s.source, location_prefix=s.location_prefix, parent_name=s.parent_name,
-       storage_type=s.storage_type, ref_dataset_id=s.ref_dataset_id, status_id=s.status_id,
-       dataset_type=s.dataset_type, hive_serdes_class=s.hive_serdes_class, is_partitioned=s.is_partitioned,
+       storage_type=s.storage_type, ref_dataset_id=s.ref_dataset_id, is_active=s.is_active,
+       dataset_type=s.dataset_type, hive_serdes_class=s.hive_serdes_class, is_partitioned=s.is_partitioned,
        partition_layout_pattern_id=s.partition_layout_pattern_id, sample_partition_full_path=s.sample_partition_full_path,
        source_created_time=s.source_created_time, source_modified_time=s.source_modified_time,
-       modified_time=UNIX_TIMESTAMP(now()), wh_etl_exec_id=s.wh_etl_exec_id
+       modified_time=UNIX_TIMESTAMP(now()), wh_etl_exec_id=s.wh_etl_exec_id
        ;
        """.format(source_file=self.input_schema_file, db_id=self.db_id, wh_etl_exec_id=self.wh_etl_exec_id)
@@ -275,11 +276,11 @@ class HiveLoad:
        INTO TABLE stg_dict_dataset_instance
        FIELDS TERMINATED BY '\x1a' ESCAPED BY '\0'
        (dataset_urn, deployment_tier, data_center, server_cluster, slice,
-        status_id, native_name, logical_name, version, instance_created_time,
+        is_active, native_name, logical_name, version, instance_created_time,
         schema_text, ddl_text, abstract_dataset_urn)
-       SET db_id = {db_id},
-       created_time=unix_timestamp(now()),
-       wh_etl_exec_id = {wh_etl_exec_id};
+       SET db_id = {db_id},
+       created_time=unix_timestamp(now()),
+       wh_etl_exec_id = {wh_etl_exec_id};
 
        -- update dataset_id
        update stg_dict_dataset_instance sdi, dict_dataset d
@@ -294,7 +295,7 @@ class HiveLoad:
        data_center,
        server_cluster,
        slice,
-       status_id,
+       is_active,
        native_name,
        logical_name,
        version,
@@ -306,7 +307,7 @@ class HiveLoad:
        select s.dataset_id, s.db_id, s.deployment_tier, c.data_center, c.cluster,
-       s.slice, s.status_id, s.native_name, s.logical_name, s.version,
+       s.slice, s.is_active, s.native_name, s.logical_name, s.version,
        case when s.version regexp '[0-9]+\.[0-9]+\.[0-9]+'
          then cast(substring_index(s.version, '.', 1) as unsigned) * 100000000 +
              cast(substring_index(substring_index(s.version, '.', 2), '.', -1) as unsigned) * 10000 +
@@ -319,10 +320,10 @@ class HiveLoad:
        where s.db_id = {db_id}
        on duplicate key update
        deployment_tier=s.deployment_tier, data_center=s.data_center, server_cluster=s.server_cluster, slice=s.slice,
-       status_id=s.status_id, native_name=s.native_name, logical_name=s.logical_name, version=s.version,
+       is_active=s.is_active, native_name=s.native_name, logical_name=s.logical_name, version=s.version,
        schema_text=s.schema_text, ddl_text=s.ddl_text,
-       instance_created_time=s.instance_created_time, created_time=s.created_time, wh_etl_exec_id=s.wh_etl_exec_id
-       ;
+       instance_created_time=s.instance_created_time, created_time=s.created_time, wh_etl_exec_id=s.wh_etl_exec_id
+       ;
        """.format(source_file=self.input_instance_file, db_id=self.db_id, wh_etl_exec_id=self.wh_etl_exec_id)
 
        self.executeCommands(load_cmd)
diff --git a/wherehows-etl/src/main/resources/jython/HiveTransform.py b/wherehows-etl/src/main/resources/jython/HiveTransform.py
index 9d95e2d372..f7e04d9a68 100644
--- a/wherehows-etl/src/main/resources/jython/HiveTransform.py
+++ b/wherehows-etl/src/main/resources/jython/HiveTransform.py
@@ -188,7 +188,7 @@ class HiveTransform:
                                            '',
                                            '',
                                            '*',
-                                           0,
+                                           True,
                                            table['native_name'],
                                            table['logical_name'],
                                            table['version'],
diff --git a/wherehows-etl/src/main/resources/jython/OracleLoad.py b/wherehows-etl/src/main/resources/jython/OracleLoad.py
index 6c31f835d1..253c690343 100644
--- a/wherehows-etl/src/main/resources/jython/OracleLoad.py
+++ b/wherehows-etl/src/main/resources/jython/OracleLoad.py
@@ -41,7 +41,7 @@ class OracleLoad:
     lock_wait_time = args[Constant.INNODB_LOCK_WAIT_TIMEOUT]
     self.conn_cursor.execute("SET innodb_lock_wait_timeout = %s;" % lock_wait_time)
 
-    temp_dir = FileUtil.etl_temp_dir(args, "ORACLE");
+    temp_dir = FileUtil.etl_temp_dir(args, "ORACLE")
    self.input_table_file = os.path.join(temp_dir, args[Constant.ORA_SCHEMA_OUTPUT_KEY])
     self.input_field_file = os.path.join(temp_dir, args[Constant.ORA_FIELD_OUTPUT_KEY])
     self.input_sample_file = os.path.join(temp_dir, args[Constant.ORA_SAMPLE_OUTPUT_KEY])
@@ -62,7 +62,8 @@ class OracleLoad:
      (`name`, `schema`, `schema_type`, `properties`, `urn`, `source`, `location_prefix`, `parent_name`,
       `storage_type`, `dataset_type`, `is_partitioned`)
      SET db_id = {db_id},
-     wh_etl_exec_id = {wh_etl_exec_id};
+     wh_etl_exec_id = {wh_etl_exec_id},
+     is_active = TRUE;
 
      -- insert into final table
      INSERT INTO dict_dataset
@@ -77,7 +78,7 @@ class OracleLoad:
      parent_name,
      storage_type,
      ref_dataset_id,
-     status_id,
+     is_active,
      dataset_type,
      hive_serdes_class,
      is_partitioned,
@@ -90,7 +91,7 @@ class OracleLoad:
      select s.name, s.schema, s.schema_type, s.fields, s.properties,
      s.urn, s.source, s.location_prefix, s.parent_name,
-     s.storage_type, s.ref_dataset_id, s.status_id,
+     s.storage_type, s.ref_dataset_id, s.is_active,
      s.dataset_type, s.hive_serdes_class, s.is_partitioned,
      s.partition_layout_pattern_id, s.sample_partition_full_path,
      s.source_created_time, s.source_modified_time, UNIX_TIMESTAMP(now()),
@@ -100,7 +101,7 @@ class OracleLoad:
      on duplicate key update
      `name`=s.name, `schema`=s.schema, schema_type=s.schema_type, `fields`=s.fields,
      properties=s.properties, `source`=s.source, location_prefix=s.location_prefix, parent_name=s.parent_name,
-     storage_type=s.storage_type, ref_dataset_id=s.ref_dataset_id, status_id=s.status_id,
+     storage_type=s.storage_type, ref_dataset_id=s.ref_dataset_id, is_active=s.is_active,
      dataset_type=s.dataset_type, hive_serdes_class=s.hive_serdes_class, is_partitioned=s.is_partitioned,
      partition_layout_pattern_id=s.partition_layout_pattern_id, sample_partition_full_path=s.sample_partition_full_path,
      source_created_time=s.source_created_time, source_modified_time=s.source_modified_time,
diff --git a/wherehows-etl/src/main/resources/jython/TeradataLoad.py b/wherehows-etl/src/main/resources/jython/TeradataLoad.py
index dd0fdb37f7..a544b02f19 100644
--- a/wherehows-etl/src/main/resources/jython/TeradataLoad.py
+++ b/wherehows-etl/src/main/resources/jython/TeradataLoad.py
@@ -37,8 +37,9 @@ class TeradataLoad:
        FIELDS TERMINATED BY '\Z' ESCAPED BY '\0'
        (`name`, `schema`, properties, fields, urn, source, sample_partition_full_path, source_created_time, source_modified_time)
        SET db_id = {db_id},
-       storage_type = 'Table',
-       wh_etl_exec_id = {wh_etl_exec_id};
+       storage_type = 'Table',
+       is_active = TRUE,
+       wh_etl_exec_id = {wh_etl_exec_id};
 
        -- SELECT COUNT(*) FROM stg_dict_dataset;
 
        -- clear
@@ -67,7 +68,7 @@ class TeradataLoad:
        data_center,
        server_cluster,
        slice,
-       status_id,
+       is_active,
        native_name,
        logical_name,
        `version`,
@@ -78,13 +79,13 @@ class TeradataLoad:
        schema_text
        )
        select s.urn, {db_id}, d.deployment_tier, d.data_center, d.cluster,
-       '*', 0, s.name, s.name, 0, s.source_created_time, s.created_time,
+       '*', s.is_active, s.name, s.name, 0, s.source_created_time, s.created_time,
        {wh_etl_exec_id}, s.urn, s.schema
        from stg_dict_dataset s JOIN cfg_database d on s.db_id = d.db_id
        where s.db_id = {db_id}
        on duplicate key update
        deployment_tier=d.deployment_tier, data_center=d.data_center,
-       server_cluster=d.cluster, native_name=s.name, logical_name=s.name,
+       server_cluster=d.cluster, is_active=s.is_active, native_name=s.name, logical_name=s.name,
        instance_created_time=s.source_created_time, created_time=s.created_time,
        wh_etl_exec_id={wh_etl_exec_id}, abstract_dataset_urn=s.urn, schema_text=s.schema;
@@ -101,7 +102,7 @@ class TeradataLoad:
        parent_name,
        storage_type,
        ref_dataset_id,
-       status_id,
+       is_active,
        dataset_type,
        hive_serdes_class,
        is_partitioned,
@@ -115,7 +116,7 @@ class TeradataLoad:
        select s.name, s.schema, s.schema_type, s.fields, s.properties,
        s.urn, s.source, s.location_prefix, s.parent_name,
-       s.storage_type, s.ref_dataset_id, s.status_id,
+       s.storage_type, s.ref_dataset_id, s.is_active,
        s.dataset_type, s.hive_serdes_class, s.is_partitioned,
        s.partition_layout_pattern_id, s.sample_partition_full_path,
        s.source_created_time, s.source_modified_time, UNIX_TIMESTAMP(now()),
@@ -125,11 +126,11 @@ class TeradataLoad:
        on duplicate key update
        `name`=s.name, `schema`=s.schema, schema_type=s.schema_type, fields=s.fields,
        properties=s.properties, source=s.source, location_prefix=s.location_prefix, parent_name=s.parent_name,
-       storage_type=s.storage_type, ref_dataset_id=s.ref_dataset_id, status_id=s.status_id,
-       dataset_type=s.dataset_type, hive_serdes_class=s.hive_serdes_class, is_partitioned=s.is_partitioned,
+       storage_type=s.storage_type, ref_dataset_id=s.ref_dataset_id, is_active=s.is_active,
+       dataset_type=s.dataset_type, hive_serdes_class=s.hive_serdes_class, is_partitioned=s.is_partitioned,
        partition_layout_pattern_id=s.partition_layout_pattern_id, sample_partition_full_path=s.sample_partition_full_path,
        source_created_time=s.source_created_time, source_modified_time=s.source_modified_time,
-       modified_time=UNIX_TIMESTAMP(now()), wh_etl_exec_id=s.wh_etl_exec_id
+       modified_time=UNIX_TIMESTAMP(now()), wh_etl_exec_id=s.wh_etl_exec_id
        ;
 
        analyze table dict_dataset;
@@ -146,7 +147,7 @@ class TeradataLoad:
        data_center,
        server_cluster,
        slice,
-       status_id,
+       is_active,
        native_name,
        logical_name,
        version,
@@ -158,7 +159,7 @@ class TeradataLoad:
        select s.dataset_id, s.db_id, s.deployment_tier, s.data_center,
-       s.server_cluster, s.slice, s.status_id, s.native_name, s.logical_name, s.version,
+       s.server_cluster, s.slice, s.is_active, s.native_name, s.logical_name, s.version,
        case when s.version regexp '[0-9]+\.[0-9]+\.[0-9]+'
          then cast(substring_index(s.version, '.', 1) as unsigned) * 100000000 +
              cast(substring_index(substring_index(s.version, '.', 2), '.', -1) as unsigned) * 10000 +
@@ -170,7 +171,7 @@ class TeradataLoad:
        where s.db_id = {db_id}
        on duplicate key update
        deployment_tier=s.deployment_tier, data_center=s.data_center, server_cluster=s.server_cluster, slice=s.slice,
-       status_id=s.status_id, native_name=s.native_name, logical_name=s.logical_name, version=s.version,
+       is_active=s.is_active, native_name=s.native_name, logical_name=s.logical_name, version=s.version,
        schema_text=s.schema_text, ddl_text=s.ddl_text,
        instance_created_time=s.instance_created_time, created_time=s.created_time, wh_etl_exec_id=s.wh_etl_exec_id
        ;
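
Migration note (not part of the diff above): the patch only rewrites the CREATE TABLE statements and the loaders, so a database provisioned from the old DDL would still carry status_id. A hypothetical in-place migration, reusing the column names, types, and comments from the new dataset_metadata.sql, could look like the following sketch; the staging tables stg_dict_dataset and stg_dict_dataset_instance would need the same change.

-- Hypothetical migration for an existing WhereHows database; not included in this patch.
-- Column definitions mirror the dataset_metadata.sql DDL introduced above.
ALTER TABLE dict_dataset
  DROP COLUMN status_id,
  ADD COLUMN is_active     BOOLEAN NULL COMMENT 'is the dataset active / exist ?',
  ADD COLUMN is_deprecated BOOLEAN NULL COMMENT 'is the dataset deprecated by user ?';

ALTER TABLE dict_dataset_instance
  DROP COLUMN status_id,
  ADD COLUMN is_active     BOOLEAN NULL COMMENT 'is the dataset active / exist ?',
  ADD COLUMN is_deprecated BOOLEAN NULL COMMENT 'is the dataset deprecated by user ?';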