Fix #2403: Add support for deleted entities to be published into Slack Events (#2404)

* Fix #2403: Add support for deleted entities to be published into Slack Events

* Fix #2403: Add support for deleted entities to be published into Slack Events
This commit is contained in:
Sriharsha Chintalapani 2022-01-25 01:09:12 -08:00 committed by GitHub
parent 0e736012a9
commit b47f49e418
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 26 additions and 14 deletions

View File

@ -54,7 +54,7 @@ public final class AirflowUtils {
public static final String INGESTION_MARK_DELETED_TABLES = "mark_deleted_tables_as_deleted";
public static final String INGESTION_USAGE_DURATION = "duration";
public static final String INGESTION_OPTIONS = "options";
public static final String INGESTION_CONNECTION_ARGS = "connection_args";
public static final String INGESTION_CONNECTION_ARGS = "connect_args";
public static final String INGESTION_USAGE_STAGE_FILE_PATH = "filename";
public static final String INGESTION_STATUS = "status";
@ -71,10 +71,12 @@ public final class AirflowUtils {
dbConfig.put(INGESTION_PASSWORD, databaseConnection.getPassword());
dbConfig.put(INGESTION_DATABASE, databaseConnection.getDatabase());
dbConfig.put(INGESTION_SERVICE_NAME, databaseService.getName());
if (databaseConnection.getConnectionOptions() != null) {
if (databaseConnection.getConnectionOptions() != null
&& !databaseConnection.getConnectionOptions().getAdditionalProperties().isEmpty()) {
dbConfig.put(INGESTION_OPTIONS, databaseConnection.getConnectionOptions().getAdditionalProperties());
}
if (databaseConnection.getConnectionArguments() != null) {
if (databaseConnection.getConnectionArguments() != null
&& !databaseConnection.getConnectionArguments().getAdditionalProperties().isEmpty()) {
dbConfig.put(INGESTION_CONNECTION_ARGS, databaseConnection.getConnectionArguments().getAdditionalProperties());
}
String ingestionType = databaseService.getServiceType().value().toLowerCase(Locale.ROOT);
@ -85,8 +87,12 @@ public final class AirflowUtils {
dbConfig.put(INGESTION_ENABLE_DATA_PROFILER, databaseServiceMetadataPipeline.getEnableDataProfiler());
dbConfig.put(INGESTION_GENERATE_SAMPLE_DATA, databaseServiceMetadataPipeline.getGenerateSampleData());
dbConfig.put(INGESTION_INCLUDE_VIEWS, databaseServiceMetadataPipeline.getIncludeViews());
dbConfig.put(INGESTION_SCHEMA_FILTER_PATTERN, databaseServiceMetadataPipeline.getSchemaFilterPattern());
dbConfig.put(INGESTION_TABLE_FILTER_PATTERN, databaseServiceMetadataPipeline.getTableFilterPattern());
if (databaseServiceMetadataPipeline.getSchemaFilterPattern() != null) {
dbConfig.put(INGESTION_SCHEMA_FILTER_PATTERN, databaseServiceMetadataPipeline.getSchemaFilterPattern());
}
if (databaseServiceMetadataPipeline.getTableFilterPattern() != null) {
dbConfig.put(INGESTION_TABLE_FILTER_PATTERN, databaseServiceMetadataPipeline.getTableFilterPattern());
}
dbConfig.put(INGESTION_MARK_DELETED_TABLES, databaseServiceMetadataPipeline.getMarkDeletedTables());
dbConfig.put(INGESTION_DBT_CATALOG_FILE_PATH, databaseServiceMetadataPipeline.getDbtCatalogFilePath());
dbConfig.put(INGESTION_DBT_MANIFEST_FILE_PATH, databaseServiceMetadataPipeline.getDbtManifestFilePath());

View File

@ -97,7 +97,7 @@ public class SlackWebhookEventPublisher extends AbstractEventPublisher {
operation = "created";
} else if (event.getEventType().equals(EventType.ENTITY_UPDATED)) {
operation = "updated";
} else if (event.getEventType().equals(EventType.ENTITY_DELETED)) {
} else if (event.getEventType().equals(EventType.ENTITY_SOFT_DELETED)) {
operation = "deleted";
}
return String.format(headerTxt, event.getUserName(), operation, entityUrl);
@ -137,14 +137,17 @@ public class SlackWebhookEventPublisher extends AbstractEventPublisher {
ChangeDescription changeDescription = event.getChangeDescription();
if (changeDescription.getFieldsUpdated() != null && !changeDescription.getFieldsUpdated().isEmpty()) {
for (FieldChange fieldChange : changeDescription.getFieldsUpdated()) {
SlackAttachment attachment = new SlackAttachment();
attachment.setTitle("Updated " + fieldChange.getName());
if (fieldChange.getName().equals("owner")) {
attachment.setText(parseOwnership((String) fieldChange.getOldValue(), (String) fieldChange.getNewValue()));
} else {
attachment.setText((String) fieldChange.getNewValue());
// when the entity is deleted we will get deleted set as true. We do not need to parse this for slack messages.
if (!fieldChange.getName().equals("deleted")) {
SlackAttachment attachment = new SlackAttachment();
attachment.setTitle("Updated " + fieldChange.getName());
if (fieldChange.getName().equals("owner")) {
attachment.setText(parseOwnership((String) fieldChange.getOldValue(), (String) fieldChange.getNewValue()));
} else {
attachment.setText((String) fieldChange.getNewValue());
}
attachments.add(attachment);
}
attachments.add(attachment);
}
}
return attachments;

View File

@ -130,7 +130,7 @@ airflowConfiguration:
slackEventPublishers:
- name: "slack events"
webhookUrl: "slackIncomingWebhook"
webhookUrl: "slackIncomingWebhook URL"
openMetadataUrl: http://${SERVER_HOST:-localhost}:${SERVER_PORT:-8585}
filters:
- eventType: "entityCreated"
@ -139,6 +139,9 @@ slackEventPublishers:
- eventType: "entityUpdated"
entities:
- "*"
- eventType: "entitySoftDeleted"
entities:
- "*"
- eventType: "entityDeleted"
entities:
- "*"