Event logging, return Table from followers, joins PUT/DELETE operations

This commit is contained in:
Suresh Srinivas 2021-08-05 12:24:38 -07:00
parent 55d7cb41bf
commit 09eee3648b
3 changed files with 56 additions and 31 deletions

View File

@ -23,15 +23,15 @@ then
fi
base_dir=$(dirname $0)/..
CATALOG_HOME=$base_dir
OPENMETADATA_HOME=$base_dir
# OpenMetadata env script
. $CATALOG_HOME/conf/catalog-env.sh
. $OPENMETADATA_HOME/conf/openmetadata-env.sh
if [ "x$CATALOG_HEAP_OPTS" = "x" ]; then
export CATALOG_HEAP_OPTS="-Xmx1G -Xms1G"
if [ "x$OPENMETADATA_HEAP_OPTS" = "x" ]; then
export OPENMETADATA_HEAP_OPTS="-Xmx1G -Xms1G"
fi
EXTRA_ARGS="-name CatalogServer"
EXTRA_ARGS="-name OpenMetadataServer"
# create logs directory
if [ "x$LOG_DIR" = "x" ]; then
@ -79,7 +79,7 @@ fi
# Set Debug options if enabled
if [ "x$CATALOG_DEBUG" != "x" ]; then
if [ "x$OPENMETADATA_DEBUG" != "x" ]; then
# Use default ports
DEFAULT_JAVA_DEBUG_PORT="5005"
@ -95,24 +95,24 @@ if [ "x$CATALOG_DEBUG" != "x" ]; then
fi
echo "Enabling Java debug options: $JAVA_DEBUG_OPTS"
CATALOG_OPTS="$JAVA_DEBUG_OPTS $CATALOG_OPTS"
OPENMETADATA_OPTS="$JAVA_DEBUG_OPTS $OPENMETADATA_OPTS"
fi
# GC options
GC_LOG_FILE_NAME='catalog-gc.log'
if [ -z "$CATALOG_GC_LOG_OPTS" ]; then
GC_LOG_FILE_NAME='openmetadata-gc.log'
if [ -z "$OPENMETADATA_GC_LOG_OPTS" ]; then
JAVA_MAJOR_VERSION=$($JAVA -version 2>&1 | sed -E -n 's/.* version "([0-9]*).*$/\1/p')
if [[ "$JAVA_MAJOR_VERSION" -ge "9" ]] ; then
CATALOG_GC_LOG_OPTS="-Xlog:gc*:file=$LOG_DIR/$GC_LOG_FILE_NAME:time,tags:filecount=10,filesize=102400"
OPENMETADATA_GC_LOG_OPTS="-Xlog:gc*:file=$LOG_DIR/$GC_LOG_FILE_NAME:time,tags:filecount=10,filesize=102400"
else
CATALOG_GC_LOG_OPTS="-Xloggc:$LOG_DIR/$GC_LOG_FILE_NAME -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M"
OPENMETADATA_GC_LOG_OPTS="-Xloggc:$LOG_DIR/$GC_LOG_FILE_NAME -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M"
fi
fi
# JVM performance options
if [ -z "$CATALOG_JVM_PERFORMANCE_OPTS" ]; then
CATALOG_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true"
if [ -z "$OPENMETADATA_JVM_PERFORMANCE_OPTS" ]; then
OPENMETADATA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true"
fi
#Application classname
@ -120,7 +120,7 @@ APP_CLASS="org.openmetadata.catalog.CatalogApplication"
# Launch mode
if [ "x$DAEMON_MODE" = "xtrue" ]; then
nohup $JAVA $CATALOG_HEAP_OPTS $CATALOG_JVM_PERFORMANCE_OPTS -cp $CLASSPATH $CATALOG_OPTS "$APP_CLASS" "server" "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
nohup $JAVA $OPENMETADATA_HEAP_OPTS $OPENMETADATA_JVM_PERFORMANCE_OPTS -cp $CLASSPATH $OPENMETADATA_OPTS "$APP_CLASS" "server" "$@" > "$CONSOLE_OUTPUT_FILE" 2>&1 < /dev/null &
else
exec $JAVA $CATALOG_HEAP_OPTS $CATALOG_JVM_PERFORMANCE_OPTS -cp $CLASSPATH $CATALOG_OPTS "$APP_CLASS" "server" "$@"
exec $JAVA $OPENMETADATA_HEAP_OPTS $OPENMETADATA_JVM_PERFORMANCE_OPTS -cp $CLASSPATH $OPENMETADATA_OPTS "$APP_CLASS" "server" "$@"
fi

View File

@ -27,7 +27,9 @@ import org.openmetadata.catalog.CatalogApplicationConfig;
import org.openmetadata.catalog.ElasticSearchConfiguration;
import org.openmetadata.catalog.Entity;
import org.openmetadata.catalog.entity.data.Table;
import org.openmetadata.catalog.jdbi3.TableRepository;
import org.openmetadata.catalog.type.Column;
import org.openmetadata.catalog.type.EntityReference;
import org.skife.jdbi.v2.DBI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -44,6 +46,7 @@ import java.util.Set;
public class ElasticSearchEventHandler implements EventHandler {
private static final Logger LOG = LoggerFactory.getLogger(AuditEventHandler.class);
private RestHighLevelClient client;
private TableRepository tableRepository;
private final ActionListener<UpdateResponse> listener = new ActionListener<>() {
@Override
public void onResponse(UpdateResponse updateResponse) {
@ -68,19 +71,23 @@ public class ElasticSearchEventHandler implements EventHandler {
try {
int responseCode = responseContext.getStatus();
String method = requestContext.getMethod();
LOG.info("request Context "+ requestContext.toString());
if (responseContext.getEntity() != null) {
Object entity = responseContext.getEntity();
if (entity.getClass().toString().toLowerCase().endsWith(Entity.TABLE.toLowerCase())) {
LOG.info("updating elastic search");
Table instance = (Table) entity;
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("description", instance.getDescription());
Set<String> tags = new HashSet<>();
List<String> columnDescriptions = new ArrayList<>();
instance.getTags().forEach(tag -> tags.add(tag.getTagFQN()));
for(Column column: instance.getColumns()) {
column.getTags().forEach(tag -> tags.add(tag.getTagFQN()));
columnDescriptions.add(column.getDescription());
if (instance.getTags() != null) {
instance.getTags().forEach(tag -> tags.add(tag.getTagFQN()));
}
if (instance.getColumns() != null) {
for (Column column : instance.getColumns()) {
column.getTags().forEach(tag -> tags.add(tag.getTagFQN()));
columnDescriptions.add(column.getDescription());
}
}
if (!tags.isEmpty()) {
List<String> tagsList = new ArrayList<>();
@ -90,6 +97,17 @@ public class ElasticSearchEventHandler implements EventHandler {
if (!columnDescriptions.isEmpty()) {
jsonMap.put("column_descriptions", columnDescriptions);
}
if(instance.getOwner() != null) {
jsonMap.put("owner", instance.getOwner().getId().toString());
}
if (instance.getFollowers() != null) {
List<String> followers = new ArrayList<>();
for(EntityReference follower: instance.getFollowers()) {
followers.add(follower.getId().toString());
}
jsonMap.put("followers", followers);
}
jsonMap.put("last_updated_timestamp", System.currentTimeMillis());
UpdateRequest updateRequest = new UpdateRequest("table_search_index", instance.getId().toString());
updateRequest.doc(jsonMap);
client.updateAsync(updateRequest, RequestOptions.DEFAULT, listener);

View File

@ -318,10 +318,11 @@ public class TableResource {
@PathParam("id") String id,
@Parameter(description = "Id of the user to be added as follower",
schema = @Schema(type = "string"))
String userId) throws IOException {
String userId) throws IOException, ParseException {
Fields fields = new Fields(FIELD_LIST, "followers");
Status status = dao.addFollower(id, userId);
return Response.status(status).build();
Table table = dao.get(id, fields);
return Response.status(status).entity(table).build();
}
@PUT
@ -335,42 +336,48 @@ public class TableResource {
@ApiResponse(responseCode = "400", description = "Date range can only include past 30 days starting" +
" today")
})
public Response addJoins(@Context UriInfo uriInfo,
public Table addJoins(@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@Parameter(description = "Id of the table", schema = @Schema(type = "string"))
@PathParam("id") String id, TableJoins joins) throws IOException, ParseException {
SecurityUtil.checkAdminOrBotRole(authorizer, securityContext);
Fields fields = new Fields(FIELD_LIST, "joins");
dao.addJoins(id, joins);
return Response.ok().build();
Table table = dao.get(id, fields);
return addHref(uriInfo, table);
}
@PUT
@Path("/{id}/sampleData")
@Operation(summary = "Add sample data", tags = "tables",
description = "Add sample data to the table." )
public Response addSampleData(@Context UriInfo uriInfo,
public Table addSampleData(@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@Parameter(description = "Id of the table", schema = @Schema(type = "string"))
@PathParam("id") String id, TableData tableData) throws IOException {
@PathParam("id") String id, TableData tableData) throws IOException, ParseException {
SecurityUtil.checkAdminOrBotRole(authorizer, securityContext);
Fields fields = new Fields(FIELD_LIST, "sampleData");
dao.addSampleData(id, tableData);
return Response.ok().build();
Table table = dao.get(id, fields);
return addHref(uriInfo, table);
}
@DELETE
@Path("/{id}/followers/{userId}")
@Operation(summary = "Remove a follower", tags = "tables",
description = "Remove the user identified `userId` as a follower of the table.")
public Response deleteFollower(@Context UriInfo uriInfo,
public Table deleteFollower(@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@Parameter(description = "Id of the table",
schema = @Schema(type = "string"))
@PathParam("id") String id,
@Parameter(description = "Id of the user being removed as follower",
schema = @Schema(type = "string"))
@PathParam("userId") String userId) {
@PathParam("userId") String userId) throws IOException, ParseException {
Fields fields = new Fields(FIELD_LIST, "followers");
dao.deleteFollower(id, userId);
return Response.ok().build();
Table table = dao.get(id, fields);
return addHref(uriInfo, table);
}
public static Table validateNewTable(Table table) {