mirror of
https://github.com/datahub-project/datahub.git
synced 2025-11-06 05:43:44 +00:00
feat(telemetry): add server side telemetry (#4925)
Co-authored-by: Kevin Hu <kevinhuwest@gmail.com>
This commit is contained in:
parent
259b3453a0
commit
c3cfaf8e3c
@ -99,6 +99,7 @@ project.ext.externalDependency = [
|
||||
'lombok': 'org.projectlombok:lombok:1.18.12',
|
||||
'mariadbConnector': 'org.mariadb.jdbc:mariadb-java-client:2.6.0',
|
||||
'mavenArtifact': "org.apache.maven:maven-artifact:$mavenVersion",
|
||||
'mixpanel': 'com.mixpanel:mixpanel-java:1.4.4',
|
||||
'mockito': 'org.mockito:mockito-core:3.0.0',
|
||||
'mockitoInline': 'org.mockito:mockito-inline:3.0.0',
|
||||
'mockServer': 'org.mock-server:mockserver-netty:5.11.2',
|
||||
|
||||
@ -26,6 +26,9 @@ services:
|
||||
|
||||
datahub-gms:
|
||||
env_file: datahub-gms/env/docker-without-neo4j.env
|
||||
environment:
|
||||
- DATAHUB_TELEMETRY_ENABLED=${DATAHUB_TELEMETRY_ENABLED}
|
||||
- DATAHUB_FOLDER_PATH=/etc/datahub
|
||||
depends_on:
|
||||
- mysql
|
||||
volumes:
|
||||
|
||||
@ -39,6 +39,8 @@ services:
|
||||
APP_ENV: dev
|
||||
environment:
|
||||
- SKIP_ELASTICSEARCH_CHECK=false
|
||||
- DATAHUB_TELEMETRY_ENABLED=${DATAHUB_TELEMETRY_ENABLED}
|
||||
- DATAHUB_FOLDER_PATH=/etc/datahub
|
||||
volumes:
|
||||
- ./datahub-gms/start.sh:/datahub/datahub-gms/scripts/start.sh
|
||||
- ./datahub-gms/jetty.xml:/datahub/datahub-gms/scripts/jetty.xml
|
||||
|
||||
@ -29,6 +29,9 @@ services:
|
||||
env_file: datahub-gms/env/docker.env
|
||||
depends_on:
|
||||
- mysql
|
||||
environment:
|
||||
- DATAHUB_TELEMETRY_ENABLED=${DATAHUB_TELEMETRY_ENABLED}
|
||||
- DATAHUB_FOLDER_PATH=/etc/datahub
|
||||
volumes:
|
||||
- ${HOME}/.datahub/plugins/:/etc/datahub/plugins
|
||||
|
||||
|
||||
@ -78,6 +78,8 @@ services:
|
||||
- ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
|
||||
- MAE_CONSUMER_ENABLED=true
|
||||
- MCE_CONSUMER_ENABLED=true
|
||||
- DATAHUB_TELEMETRY_ENABLED=${DATAHUB_TELEMETRY_ENABLED}
|
||||
- DATAHUB_FOLDER_PATH=/etc/datahub
|
||||
- PE_CONSUMER_ENABLED=true
|
||||
hostname: datahub-gms
|
||||
image: linkedin/datahub-gms:${DATAHUB_VERSION:-head}
|
||||
|
||||
@ -63,6 +63,8 @@ services:
|
||||
depends_on:
|
||||
- mysql
|
||||
environment:
|
||||
- DATAHUB_TELEMETRY_ENABLED=${DATAHUB_TELEMETRY_ENABLED}
|
||||
- DATAHUB_FOLDER_PATH=/etc/datahub
|
||||
- DATASET_ENABLE_SCSI=false
|
||||
- EBEAN_DATASOURCE_USERNAME=datahub
|
||||
- EBEAN_DATASOURCE_PASSWORD=datahub
|
||||
|
||||
@ -65,6 +65,8 @@ services:
|
||||
depends_on:
|
||||
- mysql
|
||||
environment:
|
||||
- DATAHUB_TELEMETRY_ENABLED=${DATAHUB_TELEMETRY_ENABLED}
|
||||
- DATAHUB_FOLDER_PATH=/etc/datahub
|
||||
- DATASET_ENABLE_SCSI=false
|
||||
- EBEAN_DATASOURCE_USERNAME=datahub
|
||||
- EBEAN_DATASOURCE_PASSWORD=datahub
|
||||
|
||||
@ -289,6 +289,7 @@ module.exports = {
|
||||
"docs/deploy/aws",
|
||||
"docs/deploy/gcp",
|
||||
"docs/deploy/confluent-cloud",
|
||||
"docs/deploy/telemetry",
|
||||
// Purposely not including the following:
|
||||
// - "docker/datahub-frontend/README",
|
||||
// - "docker/datahub-gms/README",
|
||||
|
||||
@ -45,7 +45,7 @@ On GMS start up, retention policies are initialized with:
|
||||
1. First, the default provided **version-based** retention to keep **20 latest aspects** for all entity-aspect pairs.
|
||||
2. Second, we read YAML files from the `/etc/datahub/plugins/retention` directory and overlay them on the default set of policies we provide.
|
||||
|
||||
For docker, we set docker-compose to mount `${HOME}/.datahub/plugins` directory to `/etc/datahub/plugins` directory
|
||||
For docker, we set docker-compose to mount `${HOME}/.datahub` directory to `/etc/datahub` directory
|
||||
within the containers, so you can customize the initial set of retention policies by creating
|
||||
a `${HOME}/.datahub/plugins/retention/retention.yaml` file.
|
||||
|
||||
|
||||
10
docs/deploy/telemetry.md
Normal file
10
docs/deploy/telemetry.md
Normal file
@ -0,0 +1,10 @@
|
||||
# DataHub Telemetry
|
||||
|
||||
## Overview of DataHub Telemetry
|
||||
|
||||
To effectively build and maintain the DataHub Project, we must understand how end-users work within DataHub. Beginning in version X.X.X, DataHub collects anonymous usage statistics and errors to inform our roadmap priorities and to enable us to proactively address errors.
|
||||
|
||||
Deployments are assigned a UUID which is sent along with event details, Java version, OS, and timestamp. Telemetry collection is enabled by default and can be disabled by setting `DATAHUB_TELEMETRY_ENABLED=false` in your Docker Compose config.
|
||||
|
||||
|
||||
The source code is available [here](../../metadata-service/factories/src/main/java/com/linkedin/gms/factory/telemetry/TelemetryUtils.java).
|
||||
@ -36,6 +36,7 @@ dependencies {
|
||||
compile externalDependency.springContext
|
||||
compile externalDependency.swaggerAnnotations
|
||||
swaggerCodegen 'io.swagger.codegen.v3:swagger-codegen-cli:3.0.33'
|
||||
compile externalDependency.mixpanel
|
||||
|
||||
annotationProcessor externalDependency.lombok
|
||||
|
||||
|
||||
@ -283,7 +283,7 @@ public abstract class EntityService {
|
||||
|
||||
|
||||
@Nonnull
|
||||
private UpdateAspectResult wrappedIngestAspectToLocalDB(@Nonnull final Urn urn, @Nonnull final String aspectName,
|
||||
protected UpdateAspectResult wrappedIngestAspectToLocalDB(@Nonnull final Urn urn, @Nonnull final String aspectName,
|
||||
@Nonnull final Function<Optional<RecordTemplate>, RecordTemplate> updateLambda,
|
||||
@Nonnull final AuditStamp auditStamp, @Nonnull final SystemMetadata systemMetadata) {
|
||||
validateUrn(urn);
|
||||
@ -337,7 +337,7 @@ public abstract class EntityService {
|
||||
@Nonnull final AuditStamp auditStamp, @Nonnull final SystemMetadata providedSystemMetadata);
|
||||
|
||||
@Nonnull
|
||||
private SystemMetadata generateSystemMetadataIfEmpty(SystemMetadata systemMetadata) {
|
||||
protected SystemMetadata generateSystemMetadataIfEmpty(SystemMetadata systemMetadata) {
|
||||
if (systemMetadata == null) {
|
||||
systemMetadata = new SystemMetadata();
|
||||
systemMetadata.setRunId(DEFAULT_RUN_ID);
|
||||
@ -380,7 +380,7 @@ public abstract class EntityService {
|
||||
* @return the {@link RecordTemplate} representation of the written aspect object
|
||||
*/
|
||||
public RecordTemplate ingestAspect(@Nonnull final Urn urn, @Nonnull final String aspectName,
|
||||
@Nonnull final RecordTemplate newValue, @Nonnull final AuditStamp auditStamp, SystemMetadata systemMetadata) {
|
||||
@Nonnull final RecordTemplate newValue, @Nonnull final AuditStamp auditStamp, @Nonnull SystemMetadata systemMetadata) {
|
||||
|
||||
log.debug("Invoked ingestAspect with urn: {}, aspectName: {}, newValue: {}", urn, aspectName, newValue);
|
||||
|
||||
@ -393,7 +393,26 @@ public abstract class EntityService {
|
||||
return sendEventForUpdateAspectResult(urn, aspectName, result);
|
||||
}
|
||||
|
||||
private RecordTemplate sendEventForUpdateAspectResult(@Nonnull final Urn urn, @Nonnull final String aspectName,
|
||||
/**
|
||||
* Ingests (inserts) the first version of an entity aspect only when no version of that aspect is stored yet for the urn.
|
||||
*
|
||||
* Implementations run the read -> write in a single transaction, this is to prevent multiple IDs from being created.
|
||||
*
|
||||
* Note that in general, this should not be used externally. It is currently serving upgrade scripts and
|
||||
* is as such public.
|
||||
*
|
||||
* @param urn an urn associated with the new aspect
|
||||
* @param aspectName name of the aspect being inserted
|
||||
* @param newValue value of the aspect to insert when the aspect is not present yet
|
||||
* @param auditStamp an {@link AuditStamp} containing metadata about the writer & current time
|
||||
* @param systemMetadata system metadata to associate with the aspect being inserted
|
||||
* @return the {@link RecordTemplate} representation of the written aspect object; may be {@code null}
|
||||
*/
|
||||
@Nullable
|
||||
public abstract RecordTemplate ingestAspectIfNotPresent(@Nonnull final Urn urn, @Nonnull final String aspectName,
|
||||
@Nonnull final RecordTemplate newValue, @Nonnull final AuditStamp auditStamp, @Nonnull SystemMetadata systemMetadata);
|
||||
|
||||
protected RecordTemplate sendEventForUpdateAspectResult(@Nonnull final Urn urn, @Nonnull final String aspectName,
|
||||
@Nonnull UpdateAspectResult result) {
|
||||
|
||||
final RecordTemplate oldValue = result.getOldValue();
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
package com.linkedin.metadata.entity.cassandra;
|
||||
|
||||
import com.codahale.metrics.Timer;
|
||||
import com.datahub.util.RecordUtils;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
@ -35,6 +36,7 @@ import com.linkedin.metadata.utils.AuditStampUtils;
|
||||
import com.linkedin.metadata.utils.EntityKeyUtils;
|
||||
import com.linkedin.metadata.utils.PegasusUtils;
|
||||
import com.linkedin.metadata.utils.SystemMetadataUtils;
|
||||
import com.linkedin.metadata.utils.metrics.MetricUtils;
|
||||
import com.linkedin.mxe.MetadataAuditOperation;
|
||||
import com.linkedin.mxe.SystemMetadata;
|
||||
import com.linkedin.util.Pair;
|
||||
@ -331,6 +333,33 @@ public class CassandraEntityService extends EntityService {
|
||||
}, DEFAULT_MAX_TRANSACTION_RETRY);
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public RecordTemplate ingestAspectIfNotPresent(@Nonnull Urn urn, @Nonnull String aspectName,
|
||||
@Nonnull RecordTemplate newValue, @Nonnull AuditStamp auditStamp, @Nonnull SystemMetadata systemMetadata) {
|
||||
log.debug("Invoked ingestAspectIfNotPresent with urn: {}, aspectName: {}, newValue: {}", urn, aspectName, newValue);
|
||||
|
||||
final SystemMetadata internalSystemMetadata = generateSystemMetadataIfEmpty(systemMetadata);
|
||||
|
||||
Timer.Context ingestToLocalDBTimer = MetricUtils.timer(this.getClass(), "ingestAspectToLocalDB").time();
|
||||
|
||||
UpdateAspectResult result = _aspectDao.runInTransactionWithRetry(() -> {
|
||||
final CassandraAspect latest = _aspectDao.getLatestAspect(urn.toString(), aspectName);
|
||||
if (latest == null) {
|
||||
long nextVersion = _aspectDao.getNextVersion(urn.toString(), aspectName);
|
||||
|
||||
return ingestAspectToLocalDBNoTransaction(urn, aspectName, ignored -> newValue, auditStamp,
|
||||
internalSystemMetadata, latest, nextVersion);
|
||||
}
|
||||
RecordTemplate oldValue = EntityUtils.toAspectRecord(urn, aspectName, latest.getMetadata(), getEntityRegistry());
|
||||
SystemMetadata oldMetadata = EntityUtils.parseSystemMetadata(latest.getSystemMetadata());
|
||||
return new UpdateAspectResult(urn, oldValue, oldValue, oldMetadata, oldMetadata, MetadataAuditOperation.UPDATE, auditStamp,
|
||||
latest.getVersion());
|
||||
}, DEFAULT_MAX_TRANSACTION_RETRY);
|
||||
|
||||
return sendEventForUpdateAspectResult(urn, aspectName, result);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Nonnull
|
||||
public RecordTemplate updateAspect(
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
package com.linkedin.metadata.entity.ebean;
|
||||
|
||||
import com.codahale.metrics.Timer;
|
||||
import com.datahub.util.RecordUtils;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
@ -36,6 +37,7 @@ import com.linkedin.metadata.run.AspectRowSummary;
|
||||
import com.linkedin.metadata.utils.EntityKeyUtils;
|
||||
import com.linkedin.metadata.utils.GenericRecordUtils;
|
||||
import com.linkedin.metadata.utils.PegasusUtils;
|
||||
import com.linkedin.metadata.utils.metrics.MetricUtils;
|
||||
import com.linkedin.mxe.MetadataAuditOperation;
|
||||
import com.linkedin.mxe.MetadataChangeProposal;
|
||||
import com.linkedin.mxe.SystemMetadata;
|
||||
@ -382,6 +384,34 @@ public class EbeanEntityService extends EntityService {
|
||||
}, DEFAULT_MAX_TRANSACTION_RETRY);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Nullable
|
||||
public RecordTemplate ingestAspectIfNotPresent(@Nonnull Urn urn, @Nonnull String aspectName,
|
||||
@Nonnull RecordTemplate newValue, @Nonnull AuditStamp auditStamp, @Nonnull SystemMetadata systemMetadata) {
|
||||
log.debug("Invoked ingestAspectIfNotPresent with urn: {}, aspectName: {}, newValue: {}", urn, aspectName, newValue);
|
||||
|
||||
final SystemMetadata internalSystemMetadata = generateSystemMetadataIfEmpty(systemMetadata);
|
||||
|
||||
Timer.Context ingestToLocalDBTimer = MetricUtils.timer(this.getClass(), "ingestAspectToLocalDB").time();
|
||||
UpdateAspectResult result = _aspectDao.runInTransactionWithRetry(() -> {
|
||||
final String urnStr = urn.toString();
|
||||
final EbeanAspectV2 latest = _aspectDao.getLatestAspect(urnStr, aspectName);
|
||||
if (latest == null) {
|
||||
long nextVersion = _aspectDao.getNextVersion(urnStr, aspectName);
|
||||
|
||||
return ingestAspectToLocalDBNoTransaction(urn, aspectName, ignored -> newValue, auditStamp,
|
||||
internalSystemMetadata, latest, nextVersion);
|
||||
}
|
||||
RecordTemplate oldValue = EntityUtils.toAspectRecord(urn, aspectName, latest.getMetadata(), getEntityRegistry());
|
||||
SystemMetadata oldMetadata = EntityUtils.parseSystemMetadata(latest.getSystemMetadata());
|
||||
return new UpdateAspectResult(urn, oldValue, oldValue, oldMetadata, oldMetadata, MetadataAuditOperation.UPDATE, auditStamp,
|
||||
latest.getVersion());
|
||||
}, DEFAULT_MAX_TRANSACTION_RETRY);
|
||||
ingestToLocalDBTimer.stop();
|
||||
|
||||
return sendEventForUpdateAspectResult(urn, aspectName, result);
|
||||
}
|
||||
|
||||
@Nonnull
|
||||
private UpdateAspectResult ingestAspectToLocalDBNoTransaction(@Nonnull final Urn urn,
|
||||
@Nonnull final String aspectName, @Nonnull final Function<Optional<RecordTemplate>, RecordTemplate> updateLambda,
|
||||
|
||||
@ -7,7 +7,7 @@ import lombok.Data;
|
||||
@Data
|
||||
public class TelemetryConfiguration {
|
||||
/**
|
||||
* Whether cli telemtry is enabled
|
||||
* Whether cli telemetry is enabled
|
||||
*/
|
||||
public boolean enabledCli;
|
||||
/**
|
||||
@ -18,4 +18,9 @@ public class TelemetryConfiguration {
|
||||
* Whether or not third party logging should be enabled for this instance
|
||||
*/
|
||||
public boolean enableThirdPartyLogging;
|
||||
|
||||
/**
|
||||
* Whether or not server telemetry should be enabled
|
||||
*/
|
||||
public boolean enabledServer;
|
||||
}
|
||||
@ -727,6 +727,43 @@ abstract public class EntityServiceTestBase<T_AD extends AspectDao, T_ES extends
|
||||
assertEquals(_entityService.listLatestAspects(entityUrn.getEntityType(), aspectName2, 0, 10).getTotalCount(), 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIngestAspectIfNotPresent() throws Exception {
|
||||
Urn entityUrn = Urn.createFromString("urn:li:corpuser:test1");
|
||||
|
||||
SystemMetadata metadata1 = new SystemMetadata();
|
||||
metadata1.setLastObserved(1625792689);
|
||||
metadata1.setRunId("run-123");
|
||||
|
||||
String aspectName = PegasusUtils.getAspectNameFromSchema(new CorpUserInfo().schema());
|
||||
|
||||
// Ingest CorpUserInfo Aspect
|
||||
CorpUserInfo writeAspect1 = createCorpUserInfo("email@test.com");
|
||||
_entityService.ingestAspectIfNotPresent(entityUrn, aspectName, writeAspect1, TEST_AUDIT_STAMP, metadata1);
|
||||
CorpUserInfo writeAspect1a = createCorpUserInfo("email_a@test.com");
|
||||
_entityService.ingestAspectIfNotPresent(entityUrn, aspectName, writeAspect1a, TEST_AUDIT_STAMP, metadata1);
|
||||
CorpUserInfo writeAspect1b = createCorpUserInfo("email_b@test.com");
|
||||
_entityService.ingestAspectIfNotPresent(entityUrn, aspectName, writeAspect1b, TEST_AUDIT_STAMP, metadata1);
|
||||
|
||||
String aspectName2 = PegasusUtils.getAspectNameFromSchema(new Status().schema());
|
||||
// Ingest Status Aspect
|
||||
Status writeAspect2 = new Status().setRemoved(true);
|
||||
_entityService.ingestAspectIfNotPresent(entityUrn, aspectName2, writeAspect2, TEST_AUDIT_STAMP, metadata1);
|
||||
Status writeAspect2a = new Status().setRemoved(false);
|
||||
_entityService.ingestAspectIfNotPresent(entityUrn, aspectName2, writeAspect2a, TEST_AUDIT_STAMP, metadata1);
|
||||
Status writeAspect2b = new Status().setRemoved(true);
|
||||
_entityService.ingestAspectIfNotPresent(entityUrn, aspectName2, writeAspect2b, TEST_AUDIT_STAMP, metadata1);
|
||||
|
||||
assertEquals(_entityService.getAspect(entityUrn, aspectName, 0), writeAspect1);
|
||||
assertEquals(_entityService.getAspect(entityUrn, aspectName2, 0), writeAspect2);
|
||||
|
||||
assertNull(_entityService.getAspect(entityUrn, aspectName, 1));
|
||||
assertNull(_entityService.getAspect(entityUrn, aspectName2, 1));
|
||||
|
||||
assertEquals(_entityService.listLatestAspects(entityUrn.getEntityType(), aspectName, 0, 10).getTotalCount(), 1);
|
||||
assertEquals(_entityService.listLatestAspects(entityUrn.getEntityType(), aspectName2, 0, 10).getTotalCount(), 1);
|
||||
}
|
||||
|
||||
protected static AuditStamp createTestAuditStamp() {
|
||||
try {
|
||||
return new AuditStamp().setTime(123L).setActor(Urn.createFromString("urn:li:principal:tester"));
|
||||
|
||||
@ -0,0 +1,14 @@
|
||||
namespace com.linkedin.metadata.key
|
||||
|
||||
/**
|
||||
* Key for the telemetry client ID, only one should ever exist
|
||||
*/
|
||||
@Aspect = {
|
||||
"name": "telemetryKey"
|
||||
}
|
||||
record TelemetryKey {
|
||||
/**
|
||||
* The telemetry entity name, which serves as a unique id
|
||||
*/
|
||||
name: string
|
||||
}
|
||||
@ -0,0 +1,14 @@
|
||||
namespace com.linkedin.telemetry
|
||||
|
||||
/**
|
||||
* A simple wrapper around a String to persist the client ID for telemetry in DataHub's backend DB
|
||||
*/
|
||||
@Aspect = {
|
||||
"name": "telemetryClientId"
|
||||
}
|
||||
record TelemetryClientId {
|
||||
/**
|
||||
* A string representing the telemetry client ID
|
||||
*/
|
||||
clientId: string
|
||||
}
|
||||
@ -209,4 +209,9 @@ entities:
|
||||
- glossaryTerms
|
||||
- editableMlPrimaryKeyProperties
|
||||
- domains
|
||||
- name: telemetry
|
||||
category: internal
|
||||
keyAspect: telemetryKey
|
||||
aspects:
|
||||
- telemetryClientId
|
||||
events:
|
||||
|
||||
@ -27,4 +27,7 @@ dependencies {
|
||||
annotationProcessor externalDependency.lombok
|
||||
|
||||
compile spec.product.pegasus.restliSpringBridge
|
||||
|
||||
testCompile externalDependency.mockito
|
||||
testCompile externalDependency.testng
|
||||
}
|
||||
|
||||
@ -0,0 +1,112 @@
|
||||
package com.linkedin.gms.factory.telemetry;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.linkedin.datahub.graphql.analytics.service.AnalyticsService;
|
||||
import com.linkedin.datahub.graphql.generated.DateRange;
|
||||
import com.linkedin.gms.factory.config.ConfigurationProvider;
|
||||
import com.linkedin.metadata.entity.EntityService;
|
||||
import com.linkedin.metadata.utils.elasticsearch.IndexConvention;
|
||||
import com.mixpanel.mixpanelapi.MessageBuilder;
|
||||
import com.mixpanel.mixpanelapi.MixpanelAPI;
|
||||
import java.io.IOException;
|
||||
import java.util.Optional;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.elasticsearch.client.RestHighLevelClient;
|
||||
import org.joda.time.DateTime;
|
||||
import org.json.JSONObject;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
|
||||
import static com.linkedin.gms.factory.telemetry.TelemetryUtils.*;
|
||||
|
||||
@Slf4j
|
||||
public class DailyReport {
|
||||
|
||||
private final IndexConvention _indexConvention;
|
||||
private final RestHighLevelClient _elasticClient;
|
||||
private final ConfigurationProvider _configurationProvider;
|
||||
private final EntityService _entityService;
|
||||
|
||||
private static final String MIXPANEL_TOKEN = "5ee83d940754d63cacbf7d34daa6f44a";
|
||||
private MixpanelAPI mixpanel;
|
||||
private MessageBuilder mixpanelBuilder;
|
||||
|
||||
public DailyReport(IndexConvention indexConvention, RestHighLevelClient elasticClient,
|
||||
ConfigurationProvider configurationProvider, EntityService entityService) {
|
||||
this._indexConvention = indexConvention;
|
||||
this._elasticClient = elasticClient;
|
||||
this._configurationProvider = configurationProvider;
|
||||
this._entityService = entityService;
|
||||
try {
|
||||
String clientId = getClientId(entityService);
|
||||
|
||||
// initialize MixPanel instance and message builder
|
||||
mixpanel = new MixpanelAPI();
|
||||
mixpanelBuilder = new MessageBuilder(MIXPANEL_TOKEN);
|
||||
|
||||
// set user-level properties
|
||||
JSONObject props = new JSONObject();
|
||||
props.put("java_version", System.getProperty("java.version"));
|
||||
props.put("os", System.getProperty("os.name"));
|
||||
JSONObject update = mixpanelBuilder.set(clientId, props);
|
||||
try {
|
||||
mixpanel.sendMessage(update);
|
||||
} catch (IOException e) {
|
||||
log.error("Error sending telemetry profile:", e);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.warn("Unable to set up telemetry.", e);
|
||||
}
|
||||
}
|
||||
|
||||
// statistics to send daily
|
||||
@Scheduled(fixedDelay = 24 * 60 * 60 * 1000)
|
||||
public void dailyReport() {
|
||||
AnalyticsService analyticsService = new AnalyticsService(_elasticClient, _indexConvention);
|
||||
|
||||
DateTime endDate = DateTime.now();
|
||||
DateTime yesterday = endDate.minusDays(1);
|
||||
DateTime lastWeek = endDate.minusWeeks(1);
|
||||
DateTime lastMonth = endDate.minusMonths(1);
|
||||
|
||||
DateRange dayRange = new DateRange(String.valueOf(yesterday.getMillis()), String.valueOf(endDate.getMillis()));
|
||||
DateRange weekRange = new DateRange(String.valueOf(lastWeek.getMillis()), String.valueOf(endDate.getMillis()));
|
||||
DateRange monthRange = new DateRange(String.valueOf(lastMonth.getMillis()), String.valueOf(endDate.getMillis()));
|
||||
|
||||
int dailyActiveUsers =
|
||||
analyticsService.getHighlights(analyticsService.getUsageIndexName(), Optional.of(dayRange),
|
||||
ImmutableMap.of(), ImmutableMap.of(), Optional.of("browserId"));
|
||||
int weeklyActiveUsers =
|
||||
analyticsService.getHighlights(analyticsService.getUsageIndexName(), Optional.of(weekRange),
|
||||
ImmutableMap.of(), ImmutableMap.of(), Optional.of("browserId"));
|
||||
int monthlyActiveUsers =
|
||||
analyticsService.getHighlights(analyticsService.getUsageIndexName(), Optional.of(monthRange),
|
||||
ImmutableMap.of(), ImmutableMap.of(), Optional.of("browserId"));
|
||||
|
||||
// floor to nearest power of 10
|
||||
dailyActiveUsers = (int) Math.pow(10, (int) Math.log10(dailyActiveUsers + 1));
|
||||
weeklyActiveUsers = (int) Math.pow(10, (int) Math.log10(weeklyActiveUsers + 1));
|
||||
monthlyActiveUsers = (int) Math.pow(10, (int) Math.log10(monthlyActiveUsers + 1));
|
||||
|
||||
// set user-level properties
|
||||
JSONObject report = new JSONObject();
|
||||
report.put("dau", dailyActiveUsers);
|
||||
report.put("wau", weeklyActiveUsers);
|
||||
report.put("mau", monthlyActiveUsers);
|
||||
|
||||
ping("service-daily", report);
|
||||
}
|
||||
|
||||
public void ping(String eventName, JSONObject properties) {
|
||||
if (mixpanel == null || mixpanelBuilder == null) {
|
||||
log.error("Unable to send telemetry metrics, MixPanel API not initialized");
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
JSONObject event = mixpanelBuilder.event(getClientId(_entityService), eventName, properties);
|
||||
mixpanel.sendMessage(event);
|
||||
} catch (IOException e) {
|
||||
log.error("Error reporting telemetry:", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,28 @@
|
||||
package com.linkedin.gms.factory.telemetry;
|
||||
|
||||
import com.linkedin.gms.factory.common.IndexConventionFactory;
|
||||
import com.linkedin.gms.factory.config.ConfigurationProvider;
|
||||
import com.linkedin.metadata.entity.EntityService;
|
||||
import com.linkedin.metadata.utils.elasticsearch.IndexConvention;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.elasticsearch.client.RestHighLevelClient;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||
|
||||
|
||||
@Slf4j
|
||||
@Configuration
|
||||
@EnableScheduling
|
||||
public class ScheduledAnalyticsFactory {
|
||||
|
||||
@Bean
|
||||
@ConditionalOnProperty("telemetry.enabledServer")
|
||||
public DailyReport dailyReport(@Qualifier("elasticSearchRestHighLevelClient") RestHighLevelClient elasticClient,
|
||||
@Qualifier(IndexConventionFactory.INDEX_CONVENTION_BEAN) IndexConvention indexConvention,
|
||||
ConfigurationProvider configurationProvider, EntityService entityService) {
|
||||
return new DailyReport(indexConvention, elasticClient, configurationProvider, entityService);
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,45 @@
|
||||
package com.linkedin.gms.factory.telemetry;
|
||||
|
||||
import com.linkedin.common.AuditStamp;
|
||||
import com.linkedin.telemetry.TelemetryClientId;
|
||||
import com.linkedin.common.urn.UrnUtils;
|
||||
import com.linkedin.data.template.RecordTemplate;
|
||||
import com.linkedin.metadata.Constants;
|
||||
import com.linkedin.metadata.entity.EntityService;
|
||||
import java.util.UUID;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
|
||||
@Slf4j
|
||||
public final class TelemetryUtils {
|
||||
|
||||
public static final String CLIENT_ID_URN = "urn:li:telemetry:clientId";
|
||||
public static final String CLIENT_ID_ASPECT = "clientId";
|
||||
|
||||
private static String _clientId;
|
||||
|
||||
public static String getClientId(EntityService entityService) {
|
||||
if (_clientId == null) {
|
||||
createClientIdIfNotPresent(entityService);
|
||||
RecordTemplate clientIdTemplate = entityService.getLatestAspect(UrnUtils.getUrn(CLIENT_ID_URN), CLIENT_ID_ASPECT);
|
||||
// Should always be present here from above, so no need for null check
|
||||
_clientId = ((TelemetryClientId) clientIdTemplate).getClientId();
|
||||
}
|
||||
return _clientId;
|
||||
}
|
||||
|
||||
private static void createClientIdIfNotPresent(EntityService entityService) {
|
||||
String uuid = UUID.randomUUID().toString();
|
||||
TelemetryClientId clientId = new TelemetryClientId().setClientId(uuid);
|
||||
final AuditStamp clientIdStamp = new AuditStamp();
|
||||
clientIdStamp.setActor(UrnUtils.getUrn(Constants.SYSTEM_ACTOR));
|
||||
clientIdStamp.setTime(System.currentTimeMillis());
|
||||
entityService.ingestAspectIfNotPresent(UrnUtils.getUrn(CLIENT_ID_URN), CLIENT_ID_ASPECT, clientId, clientIdStamp, null);
|
||||
}
|
||||
|
||||
private TelemetryUtils() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
}
|
||||
@ -46,6 +46,7 @@ telemetry:
|
||||
enabledCli: ${CLI_TELEMETRY_ENABLED:true}
|
||||
enabledIngestion: ${INGESTION_REPORTING_ENABLED:false}
|
||||
enableThirdPartyLogging: ${ENABLE_THIRD_PARTY_LOGGING:false}
|
||||
enabledServer: ${DATAHUB_TELEMETRY_ENABLED:true}
|
||||
|
||||
secretService:
|
||||
encryptionKey: ${SECRET_SERVICE_ENCRYPTION_KEY:ENCRYPTION_KEY}
|
||||
|
||||
@ -0,0 +1,28 @@
|
||||
package io.datahubproject.telemetry;
|
||||
|
||||
import com.linkedin.gms.factory.telemetry.TelemetryUtils;
|
||||
import com.linkedin.metadata.entity.EntityService;
|
||||
import com.linkedin.telemetry.TelemetryClientId;
|
||||
import org.mockito.Mockito;
|
||||
import org.testng.annotations.BeforeMethod;
|
||||
import org.testng.annotations.Test;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import static org.mockito.ArgumentMatchers.*;
|
||||
|
||||
|
||||
public class TelemetryUtilsTest {
|
||||
|
||||
EntityService _entityService;
|
||||
|
||||
@BeforeMethod
|
||||
public void init() {
|
||||
_entityService = Mockito.mock(EntityService.class);
|
||||
Mockito.when(_entityService.getLatestAspect(any(), anyString())).thenReturn(new TelemetryClientId().setClientId("1234"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void getClientIdTest() {
|
||||
assertEquals("1234", TelemetryUtils.getClientId(_entityService));
|
||||
}
|
||||
}
|
||||
@ -47,6 +47,8 @@ import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.function.Function;
|
||||
import javax.annotation.Nonnull;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
|
||||
import static entities.EntitiesControllerTest.*;
|
||||
|
||||
@ -163,6 +165,13 @@ public class MockEntityService extends EntityService {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
// Stub of ingestAspectIfNotPresent for this mock service: ignores every argument.
@Nullable
|
||||
@Override
|
||||
public RecordTemplate ingestAspectIfNotPresent(@NotNull Urn urn, @NotNull String aspectName,
|
||||
@NotNull RecordTemplate newValue, @NotNull AuditStamp auditStamp, SystemMetadata systemMetadata) {
|
||||
// Nothing is stored or emitted; callers always receive null.
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RecordTemplate updateAspect(@Nonnull Urn urn, @Nonnull String entityName, @Nonnull String aspectName,
|
||||
@Nonnull AspectSpec aspectSpec, @Nonnull RecordTemplate newValue, @Nonnull AuditStamp auditStamp,
|
||||
|
||||
@ -1,7 +1,14 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xmlns:context="http://www.springframework.org/schema/context"
|
||||
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd">
|
||||
xmlns:task="http://www.springframework.org/schema/task"
|
||||
xsi:schemaLocation="http://www.springframework.org/schema/beans
|
||||
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
|
||||
http://www.springframework.org/schema/context
|
||||
http://www.springframework.org/schema/context/spring-context-3.0.xsd
|
||||
http://www.springframework.org/schema/task
|
||||
http://www.springframework.org/schema/task/spring-task-3.0.xsd"
|
||||
>
|
||||
|
||||
<context:component-scan base-package="com.linkedin.gms.factory,com.linkedin.metadata,com.linkedin.metadata.boot,com.datahub.authentication,com.datahub.event" />
|
||||
|
||||
@ -32,4 +39,6 @@
|
||||
</bean>
|
||||
|
||||
<context:property-placeholder properties-ref="yamlProperties"/>
|
||||
|
||||
<task:annotation-driven/>
|
||||
</beans>
|
||||
@ -20,7 +20,7 @@ pip install --upgrade pip wheel setuptools
|
||||
pip install -r requirements.txt
|
||||
|
||||
echo "DATAHUB_VERSION = $DATAHUB_VERSION"
|
||||
datahub docker quickstart
|
||||
datahub docker quickstart --dump-logs-on-failure
|
||||
|
||||
(cd tests/cypress ; yarn install)
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user