Merge pull request #447 from open-metadata/Issue-446

Fix #446: Add DataProfiler to ingestion and APIs
Suresh Srinivas 2021-09-09 11:25:56 -07:00 committed by GitHub
commit 30c1e5a14c
79 changed files with 690 additions and 99 deletions
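
For reference, a minimal sketch of exercising the new endpoint once this change is deployed. The host, port, and auth headers are placeholders (the call is restricted to admin/bot roles), the payload mirrors the tableProfile/columnProfile schema added below, and re-sending a profile for the same profileDate replaces the stored entry.

import requests

BASE_URL = "http://localhost:8585/api/v1"          # placeholder; adjust to your deployment
HEADERS = {"Content-Type": "application/json"}     # plus whatever admin/bot auth the server expects

table_id = "00000000-0000-0000-0000-000000000000"  # placeholder table UUID
profile = {
    "profileDate": "2021-09-09",
    "rowCount": 6,
    "columnCount": 3,
    "columnProfile": [
        {"name": "c1", "min": "10.0", "max": "100.0", "uniqueCount": 100},
        {"name": "c2", "min": "20.0", "max": "99.0", "uniqueCount": 89},
    ],
}

resp = requests.put(f"{BASE_URL}/tables/{table_id}/tableProfile", json=profile, headers=HEADERS)
resp.raise_for_status()
print(resp.json().get("tableProfile"))             # list of stored profiles, one per profileDate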

View File

@ -31,11 +31,13 @@ import org.openmetadata.catalog.resources.databases.TableResource;
import org.openmetadata.catalog.resources.databases.TableResource.TableList;
import org.openmetadata.catalog.type.Column;
import org.openmetadata.catalog.type.ColumnJoin;
import org.openmetadata.catalog.type.ColumnProfile;
import org.openmetadata.catalog.type.DailyCount;
import org.openmetadata.catalog.type.EntityReference;
import org.openmetadata.catalog.type.JoinedWith;
import org.openmetadata.catalog.type.TableData;
import org.openmetadata.catalog.type.TableJoins;
import org.openmetadata.catalog.type.TableProfile;
import org.openmetadata.catalog.type.TagLabel;
import org.openmetadata.catalog.util.EntityUtil;
import org.openmetadata.catalog.util.EntityUtil.Fields;
@ -273,6 +275,29 @@ public abstract class TableRepository {
JsonUtils.pojoToJson(tableData));
}
@Transaction
public void addTableProfileData(String tableId, TableProfile tableProfile) throws IOException {
// Validate the request content
Table table = EntityUtil.validate(tableId, tableDAO().findById(tableId), Table.class);
List<TableProfile> storedTableProfiles = getTableProfile(table);
Map<String, TableProfile> storedMapTableProfiles = new HashMap<>();
if (storedTableProfiles != null) {
for (TableProfile profile : storedTableProfiles) {
storedMapTableProfiles.put(profile.getProfileDate(), profile);
}
}
//validate all the columns
for (ColumnProfile columnProfile: tableProfile.getColumnProfile()) {
validateColumn(table, columnProfile.getName());
}
storedMapTableProfiles.put(tableProfile.getProfileDate(), tableProfile);
List<TableProfile> updatedProfiles = new ArrayList<>(storedMapTableProfiles.values());
entityExtensionDAO().insert(tableId, "table.tableProfile", "tableProfile",
JsonUtils.pojoToJson(updatedProfiles));
}
@Transaction
public void deleteFollower(String tableId, String userId) {
EntityUtil.validateUser(userDAO(), userId);
@ -436,6 +461,7 @@ public abstract class TableRepository {
table.setJoins(fields.contains("joins") ? getJoins(table) : null);
table.setSampleData(fields.contains("sampleData") ? getSampleData(table) : null);
table.setViewDefinition(fields.contains("viewDefinition") ? table.getViewDefinition() : null);
table.setTableProfile(fields.contains("tableProfile") ? getTableProfile(table): null);
return table;
}
@ -650,6 +676,11 @@ public abstract class TableRepository {
TableData.class);
}
private List<TableProfile> getTableProfile(Table table) throws IOException {
return JsonUtils.readObjects(entityExtensionDAO().getExtension(table.getId().toString(),
"table.tableProfile"),
TableProfile.class);
}
public interface TableDAO {
@SqlUpdate("INSERT INTO table_entity (json) VALUES (:json)")
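
The new addTableProfileData keys stored profiles by profileDate, so a second profile for the same date overwrites the earlier one. A minimal Python sketch of that replace-by-date behavior, with illustrative values taken from the tests later in this change:

existing_profiles = [{"profileDate": "2021-09-08", "rowCount": 7.0}]
new_profile = {"profileDate": "2021-09-08", "rowCount": 21.0}

stored = {p["profileDate"]: p for p in existing_profiles}
stored[new_profile["profileDate"]] = new_profile    # same profileDate -> replace the stored entry
print(list(stored.values()))                        # [{'profileDate': '2021-09-08', 'rowCount': 21.0}]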

View File

@ -35,6 +35,7 @@ import org.openmetadata.catalog.resources.Collection;
import org.openmetadata.catalog.security.CatalogAuthorizer;
import org.openmetadata.catalog.security.SecurityUtil;
import org.openmetadata.catalog.type.EntityReference;
import org.openmetadata.catalog.type.TableProfile;
import org.openmetadata.catalog.util.EntityUtil;
import org.openmetadata.catalog.util.EntityUtil.Fields;
import org.openmetadata.catalog.util.RestUtil;
@ -116,7 +117,7 @@ public class TableResource {
}
static final String FIELDS = "columns,tableConstraints,usageSummary,owner," +
"database,tags,followers,joins,sampleData,viewDefinition";
"database,tags,followers,joins,sampleData,viewDefinition,tableProfile";
public static final List<String> FIELD_LIST = Arrays.asList(FIELDS.replaceAll(" ", "")
.split(","));
@ -349,6 +350,21 @@ public class TableResource {
return addHref(uriInfo, table);
}
@PUT
@Path("/{id}/tableProfile")
@Operation(summary = "Add table profile data", tags = "tables",
description = "Add table profile data to the table.")
public Table addDataProfiler(@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@Parameter(description = "Id of the table", schema = @Schema(type = "string"))
@PathParam("id") String id, TableProfile tableProfile) throws IOException, ParseException {
SecurityUtil.checkAdminOrBotRole(authorizer, securityContext);
Fields fields = new Fields(FIELD_LIST, "tableProfile");
dao.addTableProfileData(id, tableProfile);
Table table = dao.get(id, fields);
return addHref(uriInfo, table);
}
@DELETE
@Path("/{id}/followers/{userId}")
@Operation(summary = "Remove a follower", tags = "tables",

View File

@ -240,6 +240,80 @@
}
},
"additionalProperties": false
},
"columnProfile": {
"type": "object",
"javaType": "org.openmetadata.catalog.type.ColumnProfile",
"description": "This schema defines the type to capture the table's column profile.",
"properties": {
"name": {
"description": "Column Name.",
"type": "string"
},
"uniqueCount": {
"description": "No. of unique values in the column",
"type": "number"
},
"uniqueProportion": {
"description": "Proportion of number of unique values in a column",
"type": "number"
},
"nullCount": {
"description": "No.of null values in a column",
"type": "number"
},
"nullProportion": {
"description": "No.of null value proportion in columns",
"type": "number"
},
"min": {
"description": "Minimum value in a column.",
"type": "string"
},
"max": {
"description": "Maximum value in a column.",
"type": "string"
},
"mean": {
"description": "Avg value in a column.",
"type": "string"
},
"median": {
"description": "Median value in a column.",
"type": "string"
},
"stddev": {
"description": "Standard deviation of a column.",
"type": "number"
}
}
},
"tableProfile": {
"type": "object",
"javaType": "org.openmetadata.catalog.type.TableProfile",
"description": "This schema defines the type to capture the table's data profile.",
"properties": {
"profileDate": {
"description": "Data one which profile is taken.",
"$ref": "../../type/basic.json#/definitions/date"
},
"columnCount": {
"description": "No.of columns in the table.",
"type": "number"
},
"rowCount": {
"description": "No.of rows in the table.",
"type": "number"
},
"columnProfile": {
"description": "List of local column profiles of the table.",
"type": "array",
"items": {
"$ref": "#/definitions/columnProfile"
}
}
},
"additionalProperties": false
}
},
"properties": {
@ -321,6 +395,14 @@
"description": "Sample data for a table.",
"$ref": "#/definitions/tableData",
"default": null
},
"tableProfile": {
"description": "Data profile for a table.",
"type": "array",
"items": {
"$ref": "#/definitions/tableProfile"
},
"default": null
}
},
"required": [

View File

@ -44,12 +44,14 @@ import org.openmetadata.catalog.resources.teams.UserResourceTest;
import org.openmetadata.catalog.type.Column;
import org.openmetadata.catalog.type.ColumnDataType;
import org.openmetadata.catalog.type.ColumnJoin;
import org.openmetadata.catalog.type.ColumnProfile;
import org.openmetadata.catalog.type.EntityReference;
import org.openmetadata.catalog.type.JoinedWith;
import org.openmetadata.catalog.type.TableConstraint;
import org.openmetadata.catalog.type.TableConstraint.ConstraintType;
import org.openmetadata.catalog.type.TableData;
import org.openmetadata.catalog.type.TableJoins;
import org.openmetadata.catalog.type.TableProfile;
import org.openmetadata.catalog.type.TableType;
import org.openmetadata.catalog.type.TagLabel;
import org.openmetadata.catalog.util.EntityUtil;
@ -610,6 +612,55 @@ public class TableResourceTest extends CatalogApplicationTest {
"TableType View, SecureView or MaterializedView");
}
@Test
public void put_tableProfile_200(TestInfo test) throws HttpResponseException {
Table table = createAndCheckTable(create(test), adminAuthHeaders());
List<String> columns = Arrays.asList("c1", "c2", "c3");
ColumnProfile c1Profile = new ColumnProfile().withName("c1").withMax("100.0")
.withMin("10.0").withUniqueCount(100.0);
ColumnProfile c2Profile = new ColumnProfile().withName("c2").withMax("99.0").withMin("20.0").withUniqueCount(89.0);
ColumnProfile c3Profile = new ColumnProfile().withName("c3").withMax("75.0").withMin("25.0").withUniqueCount(77.0);
// Add column profiles
List<ColumnProfile> columnProfiles = List.of(c1Profile, c2Profile, c3Profile);
TableProfile tableProfile = new TableProfile().withRowCount(6.0).withColumnCount(3.0)
.withColumnProfile(columnProfiles).withProfileDate("2021-09-09");
putTableProfileData(table.getId(), tableProfile, adminAuthHeaders());
table = getTable(table.getId(), "tableProfile", adminAuthHeaders());
verifyTableProfileData(table.getTableProfile(), List.of(tableProfile));
// Add new date for TableProfile
TableProfile newTableProfile = new TableProfile().withRowCount(7.0).withColumnCount(3.0)
.withColumnProfile(columnProfiles).withProfileDate("2021-09-08");
putTableProfileData(table.getId(), newTableProfile, adminAuthHeaders());
table = getTable(table.getId(), "tableProfile", adminAuthHeaders());
verifyTableProfileData(table.getTableProfile(), List.of(newTableProfile, tableProfile));
// Replace table profile for a date
TableProfile newTableProfile1 = new TableProfile().withRowCount(21.0).withColumnCount(3.0)
.withColumnProfile(columnProfiles).withProfileDate("2021-09-08");
putTableProfileData(table.getId(), newTableProfile1, adminAuthHeaders());
table = getTable(table.getId(), "tableProfile", adminAuthHeaders());
verifyTableProfileData(table.getTableProfile(), List.of(newTableProfile1, tableProfile));
}
@Test
public void put_tableInvalidTableProfileData_4xx(TestInfo test) throws HttpResponseException {
Table table = createAndCheckTable(create(test), adminAuthHeaders());
ColumnProfile c1Profile = new ColumnProfile().withName("c1").withMax("100").withMin("10.0")
.withUniqueCount(100.0);
ColumnProfile c2Profile = new ColumnProfile().withName("c2").withMax("99.0").withMin("20.0").withUniqueCount(89.0);
ColumnProfile c3Profile = new ColumnProfile().withName("invalidColumn").withMax("75")
.withMin("25").withUniqueCount(77.0);
List<ColumnProfile> columnProfiles = List.of(c1Profile, c2Profile, c3Profile);
TableProfile tableProfile = new TableProfile().withRowCount(6.0).withColumnCount(3.0)
.withColumnProfile(columnProfiles).withProfileDate("2021-09-09");
HttpResponseException exception = assertThrows(HttpResponseException.class, ()
-> putTableProfileData(table.getId(), tableProfile, adminAuthHeaders()));
TestUtils.assertResponseContains(exception, BAD_REQUEST, "Invalid column name invalidColumn");
}
@Test
public void get_nonExistentTable_404_notFound() {
HttpResponseException exception = assertThrows(HttpResponseException.class, () ->
@ -1225,6 +1276,13 @@ public class TableResourceTest extends CatalogApplicationTest {
TestUtils.put(target, data, OK, authHeaders);
}
public static void putTableProfileData(UUID tableId, TableProfile data, Map<String, String> authHeaders)
throws HttpResponseException {
WebTarget target = CatalogApplicationTest.getResource("tables/" + tableId + "/tableProfile");
TestUtils.put(target, data, OK, authHeaders);
}
private void deleteTable(UUID id, Map<String, String> authHeaders) throws HttpResponseException {
TestUtils.delete(CatalogApplicationTest.getResource("tables/" + id), authHeaders);
@ -1355,4 +1413,17 @@ public class TableResourceTest extends CatalogApplicationTest {
public static String getTableName(TestInfo test, int index) {
return String.format("table%d_%s", index, test.getDisplayName());
}
private void verifyTableProfileData(List<TableProfile> actualProfiles, List<TableProfile> expectedProfiles) {
assertEquals(actualProfiles.size(), expectedProfiles.size());
Map<String, TableProfile> tableProfileMap = new HashMap<>();
for(TableProfile profile: actualProfiles) {
tableProfileMap.put(profile.getProfileDate(), profile);
}
for(TableProfile tableProfile: expectedProfiles) {
TableProfile storedProfile = tableProfileMap.get(tableProfile.getProfileDate());
assertNotNull(storedProfile);
assertEquals(tableProfile, storedProfile);
}
}
}

View File

@ -2,9 +2,9 @@
"source": {
"type": "redshift",
"config": {
"host_port": "cluster.name.region.redshift.amazonaws.com:5439",
"username": "username",
"password": "strong_password",
"host_port": "redshift-cluster-1.clot5cqn1cnb.us-west-2.redshift.amazonaws.com:5439",
"username": "awsuser",
"password": "focguC-kaqqe5-nepsok",
"database": "warehouse",
"service_name": "aws_redshift",
"filter_pattern": {

View File

@ -4,9 +4,10 @@
"config": {
"username": "openmetadata_user",
"password": "openmetadata_password",
"database": "openmetadata_db",
"service_name": "local_mysql",
"filter_pattern": {
"excludes": ["mysql.*", "information_schema.*"]
"excludes": ["mysql.*", "information_schema.*", "performance_schema.*", "sys.*"]
}
}
},

View File

@ -16,4 +16,5 @@ confluent_kafka>=1.5.0
fastavro>=1.2.0
google~=3.0.0
okta~=2.0.0
PyMySQL~=1.0.2
PyMySQL~=1.0.2
great-expectations>=0.13.31

View File

@ -58,12 +58,12 @@ base_requirements = {
"email-validator>=1.0.3",
"wheel~=0.36.2",
"python-jose==3.3.0",
"okta==1.7.0",
"pandas~=1.3.1",
"okta>=1.7.0",
"sqlalchemy>=1.3.24",
"sql-metadata~=2.0.0",
"spacy==3.0.5",
"requests~=2.25.1",
"great-expectations>=0.13.31",
"en_core_web_sm@https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.0.0/en_core_web_sm-3.0.0.tar.gz#egg=en_core_web"
}
base_plugins = {

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: data/tags/personalDataTags.json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00
from __future__ import annotations
@ -12,5 +12,5 @@ from pydantic import BaseModel, Field
class Model(BaseModel):
__root__: Any = Field(
...,
description='Tags related classifying **Personal data** as defined by **GDPR.**\n\n\n\n_Note to Legal - This tag category is provided as a starting point. Please review and update the tags based on your company policy. Also, add a reference to your GDPR policy document in this description._',
description='Tags related classifying **Personal data** as defined by **GDPR.**<br/><br/>_Note to Legal - This tag category is provided as a starting point. Please review and update the tags based on your company policy. Also, add a reference to your GDPR policy document in this description._',
)

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: data/tags/piiTags.json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00
from __future__ import annotations
@ -12,5 +12,5 @@ from pydantic import BaseModel, Field
class Model(BaseModel):
__root__: Any = Field(
...,
description='Personally Identifiable Information information that, when used alone or with other relevant data, can identify an individual.\n\n\n\n_Note to Legal - This tag category is provided as a starting point. Please review and update the tags based on your company policy. Also, add a reference to your PII policy document in this description._',
description='Personally Identifiable Information information that, when used alone or with other relevant data, can identify an individual.<br/><br/>_Note to Legal - This tag category is provided as a starting point. Please review and update the tags based on your company policy. Also, add a reference to your PII policy document in this description._',
)

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: data/tags/tierTags.json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: data/tags/userTags.json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/catalogVersion.json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/data/createChart.json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/data/createDashboard.json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/data/createDatabase.json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/data/createTable.json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/data/createTopic.json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00
from __future__ import annotations
@ -38,8 +38,7 @@ class CreateTopic(BaseModel):
description='Topic clean up policy. For Kafka - `cleanup.policy` configuration.',
)
replicationFactor: Optional[int] = Field(
None,
description='Replication Factor in integer (more than 1).',
None, description='Replication Factor in integer (more than 1).'
)
retentionTime: Optional[float] = Field(
None,

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/feed/createThread.json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/services/createDashboardService.json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/services/createDatabaseService.json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/services/createMessagingService.json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/services/updateDashboardService.json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/services/updateDatabaseService.json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/services/updateMessagingService.json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/setOwner.json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/tags/createTag.json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/tags/createTagCategory.json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/teams/createTeam.json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/teams/createUser.json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/bots.json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/chart.json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/dashboard.json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/database.json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/metrics.json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/pipeline.json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/report.json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/table.json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00
from __future__ import annotations
@ -124,6 +124,27 @@ class TableData(BaseModel):
)
class ColumnProfile(BaseModel):
name: Optional[str] = Field(None, description='Column Name.')
uniqueCount: Optional[float] = Field(
None, description='No. of unique values in the column'
)
uniqueProportion: Optional[float] = Field(
None, description='Proportion of number of unique values in a column'
)
nullCount: Optional[float] = Field(
None, description='No. of null values in a column'
)
nullProportion: Optional[float] = Field(
None, description='Proportion of null values in the column'
)
min: Optional[str] = Field(None, description='Minimum value in a column.')
max: Optional[str] = Field(None, description='Maximum value in a column.')
mean: Optional[str] = Field(None, description='Avg value in a column.')
median: Optional[str] = Field(None, description='Median value in a column.')
stddev: Optional[float] = Field(None, description='Standard deviation of a column.')
class TableJoins(BaseModel):
class Config:
extra = Extra.forbid
@ -135,6 +156,22 @@ class TableJoins(BaseModel):
columnJoins: Optional[List[ColumnJoins]] = None
class TableProfile(BaseModel):
class Config:
extra = Extra.forbid
profileDate: Optional[basic.Date] = Field(
None, description='Date on which the profile was taken.'
)
columnCount: Optional[float] = Field(
None, description='No. of columns in the table.'
)
rowCount: Optional[float] = Field(None, description='No. of rows in the table.')
columnProfile: Optional[List[ColumnProfile]] = Field(
None, description='List of local column profiles of the table.'
)
class Column(BaseModel):
name: ColumnName
columnDataType: ColumnDataType = Field(
@ -194,3 +231,6 @@ class Table(BaseModel):
sampleData: Optional[TableData] = Field(
None, description='Sample data for a table.'
)
tableProfile: Optional[List[TableProfile]] = Field(
None, description='Data profile for a table.'
)
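
For reference, a minimal sketch of building these generated models directly; values are illustrative, and profileDate is a plain 'YYYY-MM-DD' string, which is what the profiler added in this change supplies:

from metadata.generated.schema.entity.data.table import ColumnProfile, TableProfile

c1 = ColumnProfile(name="c1", min="10.0", max="100.0", uniqueCount=100.0)
profile = TableProfile(
    profileDate="2021-09-09",   # coerced by pydantic into the basic.Date type
    rowCount=6.0,
    columnCount=3.0,
    columnProfile=[c1],
)
print(profile.json(exclude_none=True))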

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/topic.json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00
from __future__ import annotations
@ -64,6 +64,9 @@ class Topic(BaseModel):
None,
description='Retention time in milliseconds. For Kafka - `retention.ms` configuration.',
)
replicationFactor: Optional[int] = Field(
None, description='Replication Factor in integer (more than 1).'
)
maximumMessageSize: Optional[int] = Field(
None,
description='Maximum message size in bytes. For Kafka - `max.message.bytes` configuration.',

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/feed/thread.json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/services/dashboardService.json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/services/databaseService.json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/services/messagingService.json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/tags/tagCategory.json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/teams/team.json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/teams/user.json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/auditLog.json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/basic.json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/collectionDescriptor.json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/dailyCount.json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/entityReference.json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/entityUsage.json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/jdbcConnection.json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00
from __future__ import annotations

View File

@ -0,0 +1,23 @@
# generated by datamodel-codegen:
# filename: schema/type/paging.json
# timestamp: 2021-09-09T17:49:00+00:00
from __future__ import annotations
from typing import Optional
from pydantic import BaseModel, Field
class Paging(BaseModel):
before: Optional[str] = Field(
None,
description='Before cursor used for getting the previous page (see API pagination for details).',
)
after: Optional[str] = Field(
None,
description='After cursor used for getting the next page (see API pagination for details).',
)
total: int = Field(
..., description='Total number of entries available to page through.'
)

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/profile.json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/schedule.json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/tagLabel.json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/usageDetails.json
# timestamp: 2021-09-02T03:08:31+00:00
# timestamp: 2021-09-09T17:49:00+00:00
from __future__ import annotations

View File

@ -258,3 +258,50 @@ class Dashboard(BaseModel):
charts: List[str]
service: EntityReference
lastModified: int = None
class ValueFrequency(BaseModel):
"""Profiler ValueFrequency"""
value: str
frequency: int
class Histogram(BaseModel):
"""Histogram"""
boundaries: List[str]
heights: List[str]
class Quantile(BaseModel):
"""Quantile"""
quantile: str
value: str
class DatasetColumnProfile(BaseModel):
"""Dataset Column Profile stats """
fqdn: str
unique_count: int = None
unique_proportion: int = None
null_count: int = None
null_proportion: int = None
min: str = None
max: str = None
mean: str = None
median: str = None
stddev: str = None
quantiles: List[Quantile] = None
distinct_value_frequencies: List[ValueFrequency] = None
histogram: List[Histogram] = None
sample_values: List[str] = None
class DatasetProfile(BaseModel):
"""Dataset(table) stats"""
timestamp: int
table_name: str
row_count: int = None
col_count: int = None
col_profiles: List[DatasetColumnProfile] = None

View File

@ -28,7 +28,7 @@ from metadata.generated.schema.api.services.createMessagingService import Create
from metadata.generated.schema.entity.data.chart import Chart
from metadata.generated.schema.entity.data.dashboard import Dashboard
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.table import Table, TableData, TableJoins
from metadata.generated.schema.entity.data.table import Table, TableData, TableJoins, TableProfile
from metadata.generated.schema.entity.data.topic import Topic
from metadata.generated.schema.entity.services.dashboardService import DashboardService
from metadata.generated.schema.entity.services.databaseService import DatabaseService
@ -56,7 +56,7 @@ TableEntities = List[Table]
Tags = List[Tag]
Topics = List[Topic]
Dashboards = List[Dashboard]
TableProfiles = List[TableProfile]
class MetadataServerConfig(ConfigModel):
api_endpoint: str
@ -241,10 +241,15 @@ class OpenMetadataAPIClient(object):
else:
return [Table(**t) for t in resp['data']]
def ingest_sample_data(self, id, sample_data):
resp = self.client.put('/tables/{}/sampleData'.format(id.__root__), data=sample_data.json())
def ingest_sample_data(self, table_id, sample_data):
resp = self.client.put('/tables/{}/sampleData'.format(table_id.__root__), data=sample_data.json())
return TableData(**resp['sampleData'])
def ingest_table_profile_data(self, table_id, table_profile):
print(table_profile.json())
resp = self.client.put('/tables/{}/tableProfile'.format(table_id.__root__), data=table_profile.json())
return [TableProfile(**t) for t in resp['tableProfile']]
def get_table_by_id(self, table_id: str, fields: [] = ['columns']) -> Table:
"""Get Table By ID"""
params = {'fields': ",".join(fields)}
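
A hedged sketch of calling the new client method outside the sink. The MetadataServerConfig fields and the client constructor arguments are assumptions from context (a real deployment may need auth settings); ingest_table_profile_data expects the generated id type, since it reads table_id.__root__:

from metadata.generated.schema.entity.data.table import ColumnProfile, TableProfile
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig, OpenMetadataAPIClient

config = MetadataServerConfig(api_endpoint="http://localhost:8585/api")  # assumed minimal config
client = OpenMetadataAPIClient(config)

table = client.get_table_by_id("...table uuid...", fields=["tableProfile"])
profile = TableProfile(
    profileDate="2021-09-09",
    rowCount=6.0,
    columnCount=3.0,
    columnProfile=[ColumnProfile(name="c1", min="10.0", max="100.0")],
)
stored = client.ingest_table_profile_data(table_id=table.id, table_profile=profile)
print(stored)   # list of TableProfile entries now stored for the table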

View File

@ -14,6 +14,7 @@
# limitations under the License.
import logging
from typing import List
from pydantic import ValidationError
@ -30,7 +31,7 @@ from metadata.ingestion.api.sink import Sink, SinkStatus
from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
from metadata.ingestion.models.table_metadata import Chart, Dashboard
from metadata.ingestion.ometa.client import APIError
from metadata.ingestion.ometa.openmetadata_rest import OpenMetadataAPIClient, MetadataServerConfig
from metadata.ingestion.ometa.openmetadata_rest import OpenMetadataAPIClient, MetadataServerConfig, TableProfiles
logger = logging.getLogger(__name__)
@ -102,7 +103,10 @@ class MetadataRestSink(Sink):
created_table = self.client.create_or_update_table(table_request)
if table_and_db.table.sampleData is not None:
self.client.ingest_sample_data(id=created_table.id, sample_data=table_and_db.table.sampleData)
self.client.ingest_sample_data(table_id=created_table.id, sample_data=table_and_db.table.sampleData)
if table_and_db.table.tableProfile is not None:
self.client.ingest_table_profile_data(table_id=created_table.id,
table_profile=table_and_db.table.tableProfile)
logger.info(
'Successfully ingested table {}.{}'.

View File

@ -28,8 +28,9 @@ from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.table import Table, Column, ColumnConstraint, TableType, TableData
from sqlalchemy import create_engine, inspect
from metadata.generated.schema.entity.data.table import Table, Column, ColumnConstraint, TableType, TableData, \
TableProfile
from sqlalchemy import create_engine
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.sql import sqltypes as types
from sqlalchemy.inspection import inspect
@ -37,8 +38,10 @@ from sqlalchemy.inspection import inspect
from metadata.ingestion.api.common import IncludeFilterPattern, ConfigModel, Record
from metadata.ingestion.api.common import WorkflowContext
from metadata.ingestion.api.source import Source, SourceStatus
from metadata.ingestion.models.table_metadata import DatasetProfile
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
from metadata.utils.helpers import get_database_service_or_create
from metadata.profiler.dataprofiler import DataProfiler
logger: logging.Logger = logging.getLogger(__name__)
@ -72,6 +75,9 @@ class SQLConnectionConfig(ConfigModel):
include_views: Optional[bool] = True
include_tables: Optional[bool] = True
generate_sample_data: Optional[bool] = True
data_profiler_enabled: Optional[bool] = True
data_profiler_offset: Optional[int] = 0
data_profiler_limit: Optional[int] = 50000
filter_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all()
@abstractmethod
@ -152,8 +158,18 @@ def get_column_type(status: SQLSourceStatus, dataset_name: str, column_type: Any
return type_class
class SQLSource(Source):
def _get_table_description(schema: str, table: str, inspector: Inspector) -> str:
description = None
try:
table_info: dict = inspector.get_table_comment(table, schema)
except Exception as err:
logger.error(f"Table Description Error : {err}")
else:
description = table_info["text"]
return description
class SQLSource(Source):
def __init__(self, config: SQLConnectionConfig, metadata_config: MetadataServerConfig,
ctx: WorkflowContext):
super().__init__(ctx)
@ -162,8 +178,12 @@ class SQLSource(Source):
self.service = get_database_service_or_create(config, metadata_config)
self.status = SQLSourceStatus()
self.sql_config = self.config
self.engine = create_engine(self.sql_config.get_connection_url(), **self.sql_config.options)
self.connection_string = self.sql_config.get_connection_url()
self.engine = create_engine(self.connection_string, **self.sql_config.options)
self.connection = self.engine.connect()
if self.config.data_profiler_enabled:
self.data_profiler = DataProfiler(status=self.status,
connection_str=self.connection_string)
def prepare(self):
pass
@ -179,7 +199,7 @@ class SQLSource(Source):
def fetch_sample_data(self, schema: str, table: str):
try:
query = self.config.query.format(schema,table)
query = self.config.query.format(schema, table)
logger.info(query)
results = self.connection.execute(query)
cols = list(results.keys())
@ -211,11 +231,11 @@ class SQLSource(Source):
schema, table_name = self.standardize_schema_table_names(schema, table_name)
if not self.sql_config.filter_pattern.included(table_name):
self.status.filter('{}.{}'.format(self.config.get_service_name(), table_name),
"Table pattern not allowed")
"Table pattern not allowed")
continue
self.status.scanned('{}.{}'.format(self.config.get_service_name(), table_name))
description = self._get_table_description(schema, table_name, inspector)
description = _get_table_description(schema, table_name, inspector)
table_columns = self._get_columns(schema, table_name, inspector)
table_entity = Table(id=uuid.uuid4(),
@ -227,6 +247,10 @@ class SQLSource(Source):
table_data = self.fetch_sample_data(schema, table_name)
table_entity.sampleData = table_data
if self.config.data_profiler_enabled:
profile = self.run_data_profiler(table_name, schema)
table_entity.tableProfile = profile
table_and_db = OMetaDatabaseAndTable(table=table_entity, database=self._get_database(schema))
yield table_and_db
except ValidationError as err:
@ -241,7 +265,7 @@ class SQLSource(Source):
try:
if not self.sql_config.filter_pattern.included(view_name):
self.status.filter('{}.{}'.format(self.config.get_service_name(), view_name),
"View pattern not allowed")
"View pattern not allowed")
continue
try:
view_definition = inspector.get_view_definition(view_name, schema)
@ -249,7 +273,7 @@ class SQLSource(Source):
except NotImplementedError:
view_definition = ""
description = self._get_table_description(schema, view_name, inspector)
description = _get_table_description(schema, view_name, inspector)
table_columns = self._get_columns(schema, view_name, inspector)
table = Table(id=uuid.uuid4(),
name=view_name,
@ -314,15 +338,23 @@ class SQLSource(Source):
return table_columns
def _get_table_description(self, schema: str, table: str, inspector: Inspector) -> str:
description = None
try:
table_info: dict = inspector.get_table_comment(table, schema)
except Exception as err:
logger.error(f"Table Description Error : {err}")
else:
description = table_info["text"]
return description
def run_data_profiler(
self,
table: str,
schema: str
) -> TableProfile:
dataset_name = f"{schema}.{table}"
self.status.scanned(f"profile of {dataset_name}")
logger.info(f"Running Profiling for {dataset_name}. "
f"If you haven't configured offset and limit this process can take longer")
profile = self.data_profiler.run_profiler(
dataset_name=dataset_name,
schema=schema,
table=table,
limit=self.sql_config.data_profiler_limit,
offset=self.sql_config.data_profiler_offset)
logger.debug(f"Finished profiling {dataset_name}")
return profile
def close(self):
if self.connection is not None:
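
The new data_profiler_* options ride along with the existing source config. A hedged sketch of a MySQL source block that turns profiling on, shown as a Python dict mirroring the JSON workflow examples earlier in this change; credentials and service names are placeholders:

source_config = {
    "type": "mysql",
    "config": {
        "username": "openmetadata_user",
        "password": "openmetadata_password",
        "service_name": "local_mysql",
        "generate_sample_data": True,
        "data_profiler_enabled": True,    # defaults to True in SQLConnectionConfig
        "data_profiler_offset": 0,        # forwarded as the profiling batch offset
        "data_profiler_limit": 50000,     # forwarded as the profiling batch limit
        "filter_pattern": {"excludes": ["mysql.*", "information_schema.*", "performance_schema.*", "sys.*"]},
    },
}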

View File

@ -0,0 +1,206 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
from datetime import datetime
from typing import Any, Iterable, Optional
from great_expectations.core.expectation_validation_result import (
ExpectationSuiteValidationResult,
ExpectationValidationResult,
)
from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import (
DataContextConfig,
DatasourceConfig,
InMemoryStoreBackendDefaults,
)
from metadata.generated.schema.entity.data.table import TableProfile, ColumnProfile
from metadata.ingestion.api.source import SourceStatus
from metadata.ingestion.models.table_metadata import DatasetProfile, DatasetColumnProfile, ValueFrequency
from metadata.profiler.util import group_by
logger: logging.Logger = logging.getLogger(__name__)
class DataProfiler:
data_context: BaseDataContext
status: SourceStatus
datasource_name: str = "om_data_source"
def __init__(self, connection_str, status):
self.status = status
self.connection_str = connection_str
data_context_config = DataContextConfig(
datasources={
self.datasource_name: DatasourceConfig(
class_name="SqlAlchemyDatasource",
credentials={
"url": self.connection_str,
},
)
},
store_backend_defaults=InMemoryStoreBackendDefaults(),
anonymous_usage_statistics={
"enabled": False,
},
)
self.data_context = BaseDataContext(project_config=data_context_config)
def run_profiler(
self,
dataset_name: str,
schema: str = None,
table: str = None,
limit: int = None,
offset: int = None,
**kwargs: Any,
) -> TableProfile:
profile_test_results = self._profile_data_asset(
{
"schema": schema,
"table": table,
"limit": limit,
"offset": offset,
**kwargs,
}
)
profile = self._parse_test_results_to_table_profile(profile_test_results, dataset_name=dataset_name)
return profile
def _profile_data_asset(
self,
batch_kwargs: dict
) -> ExpectationSuiteValidationResult:
profile_results = self.data_context.profile_data_asset(
self.datasource_name,
batch_kwargs={
"datasource": self.datasource_name,
**batch_kwargs,
},
)
assert profile_results["success"]
assert len(profile_results["results"]) == 1
test_suite, test_results = profile_results["results"][0]
return test_results
@staticmethod
def _get_column_from_result(result: ExpectationValidationResult) -> Optional[str]:
return result.expectation_config.kwargs.get("column")
def _parse_test_results_to_table_profile(
self, profile_test_results: ExpectationSuiteValidationResult, dataset_name: str
) -> TableProfile:
profile = None
column_profiles = []
for col, col_test_result in group_by(
profile_test_results.results, key=self._get_column_from_result
):
if col is None:
profile = self._parse_table_test_results(col_test_result, dataset_name=dataset_name)
else:
column_profile = self._parse_column_test_results(col, col_test_result, dataset_name=dataset_name)
column_profiles.append(column_profile)
if profile is not None:
profile.columnProfile = column_profiles
return profile
def _parse_table_test_results(
self,
table_test_results: Iterable[ExpectationValidationResult],
dataset_name: str,
) -> TableProfile:
logger.info("generating table stats")
profile = TableProfile(profileDate=datetime.now().strftime("%Y-%m-%d"))
for table_result in table_test_results:
expectation: str = table_result.expectation_config.expectation_type
result: dict = table_result.result
if expectation == "expect_table_row_count_to_be_between":
profile.rowCount = result['observed_value']
elif expectation == "expect_table_columns_to_match_ordered_list":
profile.columnCount = len(result["observed_value"])
else:
self.status.warning(
f"profile of {dataset_name}", f"unknown table mapper {expectation}"
)
return profile
def _parse_column_test_results(
self,
column: str,
col_test_results: Iterable[ExpectationValidationResult],
dataset_name: str,
) -> ColumnProfile:
logger.info(f"Generating Column Stats for {column}")
column_profile = ColumnProfile(name=column)
for col_result in col_test_results:
expectation: str = col_result.expectation_config.expectation_type
result: dict = col_result.result
if not result:
self.status.warning(
f"profile of {dataset_name}", f"{expectation} did not yield any results"
)
continue
if expectation == "expect_column_unique_value_count_to_be_between":
column_profile.uniqueCount = result["observed_value"]
elif expectation == "expect_column_proportion_of_unique_values_to_be_between":
column_profile.uniqueProportion = result["observed_value"]
elif expectation == "expect_column_values_to_not_be_null":
column_profile.nullCount = result["unexpected_count"]
if (
"unexpected_percent" in result
and result["unexpected_percent"] is not None
):
column_profile.nullProportion = result["unexpected_percent"] / 100
elif expectation == "expect_column_values_to_not_match_regex":
pass
elif expectation == "expect_column_mean_to_be_between":
column_profile.mean = str(result["observed_value"])
elif expectation == "expect_column_min_to_be_between":
column_profile.min = str(result["observed_value"])
elif expectation == "expect_column_max_to_be_between":
column_profile.max = str(result["observed_value"])
elif expectation == "expect_column_median_to_be_between":
column_profile.median = str(result["observed_value"])
elif expectation == "expect_column_stdev_to_be_between":
column_profile.stddev = str(result["observed_value"])
elif expectation == "expect_column_quantile_values_to_be_between":
pass
elif expectation == "expect_column_values_to_be_in_set":
#column_profile.sample_values = [
# str(v) for v in result["partial_unexpected_list"]
#]
pass
elif expectation == "expect_column_kl_divergence_to_be_less_than":
pass
elif expectation == "expect_column_distinct_values_to_be_in_set":
pass
elif expectation == "expect_column_values_to_be_in_type_list":
pass
elif expectation == "expect_column_values_to_be_unique":
pass
else:
self.status.warning(
f"profile of {dataset_name}",
f"warning: unknown column mapper {expectation} in col {column}",
)
return column_profile
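
A minimal sketch of running the profiler on its own, outside SQLSource. The module paths and connection string are assumptions; SQLSourceStatus stands in for the status object the source normally passes:

from metadata.ingestion.source.sql_source import SQLSourceStatus   # path assumed from the package layout
from metadata.profiler.dataprofiler import DataProfiler

status = SQLSourceStatus()
profiler = DataProfiler(connection_str="mysql+pymysql://user:pass@localhost:3306/warehouse",
                        status=status)
profile = profiler.run_profiler(dataset_name="warehouse.orders",
                                schema="warehouse", table="orders",
                                limit=50000, offset=0)
print(profile.json(exclude_none=True) if profile else "no table-level results")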

View File

@ -0,0 +1,30 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
from typing import TypeVar, Iterable, Callable, Tuple
T = TypeVar("T")
K = TypeVar("K")
def group_by(
iterable: Iterable[T], key: Callable[[T], K]
) -> Iterable[Tuple[K, Iterable[T]]]:
values = collections.defaultdict(list)
for v in iterable:
values[key(v)].append(v)
return values.items()
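
For reference, a tiny usage sketch of the helper above (importable as metadata.profiler.util.group_by, as dataprofiler.py does):

from metadata.profiler.util import group_by

pairs = [("c1", 1), (None, 2), ("c1", 3)]
for key, items in group_by(pairs, key=lambda p: p[0]):
    print(key, items)
# c1 [('c1', 1), ('c1', 3)]
# None [(None, 2)]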