# API Tracing

## Introduction

DataHub's asynchronous APIs enable high-volume data operations, particularly for bulk ingestion processes. While these
APIs optimize throughput, they previously lacked built-in validation mechanisms for operation status. Consequently,
detecting processing issues required direct monitoring of backend system metrics and logs.

To address this limitation, DataHub implemented a trace/request ID system that enables end-to-end tracking of write
operations. This tracing mechanism is particularly crucial given DataHub's multi-stage write architecture, where data
propagates through multiple components and persists across distinct storage systems. The trace ID maintains continuity
throughout this complex processing pipeline, providing visibility into the operation's status at each stage of execution.

The system effectively balances the performance benefits of asynchronous processing with the operational necessity of
request tracking and validation. This enhancement significantly improves observability without compromising the
throughput advantages of bulk operations.
## Architecture Overview

Shown below is the write path for an asynchronous write within DataHub. For more information about MCPs, please see
the documentation on [MetadataChangeProposal & MetadataChangeLog Events](/docs/advanced/mcp-mcl.md).

<p align="center">
  <img width="90%" src="https://raw.githubusercontent.com/datahub-project/static-assets/main/imgs/advanced/mcp-mcl/async-ingestion.svg"/>
</p>

A successful write operation requires data persistence in at least one storage system, though typically both primary and
search storage systems must be updated. The storage architecture consists of two main components:

- Primary Storage: MySQL, Postgres, or Cassandra, serving as the persistent store for all non-Timeseries aspects.
- Search Storage: Either Elasticsearch or OpenSearch.

In most operational scenarios, write operations must successfully complete across both storage layers to maintain system
consistency and ensure complete data availability.
## Trace API

Known Limitations:

- Tracing can fail in cases where a batch of aspects introduces a conflict between a system-generated aspect and one
  included within the batch. For example, submitting the `upstreamLineage` and `siblings` aspects together may conflict
  with the system-generated `siblings` aspect normally derived from `upstreamLineage` in an MCL hook.

The trace API's status retrieval functionality requires three key identifiers to locate specific write operations:
the trace ID (unique to the request), the URN, and the aspect name. This combination of identifiers ensures precise
operation tracking within the system.

For batch operations involving multiple URNs and aspects, a single trace ID is assigned to monitor the entire request.
In asynchronous mode, the system maintains independent status tracking for each aspect within the batch, allowing for
granular operation monitoring.

The API returns a comprehensive status report that includes:

- Per-aspect success/failure status
- Detailed status breakdowns for each storage system
- Write states as defined in the [Write States](#write-states) section below
- Error information from MCP processing, when applicable, to facilitate debugging

This structured approach to status reporting enables precise monitoring of complex write operations across the system's
various components.
### Retrieving the `trace id`

DataHub's asynchronous APIs provide trace ID information through two distinct mechanisms:

- HTTP Response Header: A W3C-compliant `traceparent` header is included in all API responses.
  The complete header value serves as a valid trace ID.
- System Metadata: For OpenAPI v3 APIs and others that return `systemMetadata`, the trace ID is accessible via the
  `telemetryTraceId` property within `systemMetadata`.

While these two trace ID formats differ structurally—with the `traceparent` adhering to W3C's Trace Context
specification—both formats are fully compatible with the Trace API for operation tracking purposes.

Header Example:

```text
traceparent: 00-00062c53a468cbd8077e7dd079846870-9199effb49910b4e-01
```

`SystemMetadata` Example:

```json
[
  {
    "urn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)",
    "status": {
      "value": {
        "removed": false
      },
      "systemMetadata": {
        "properties": {
          "telemetryLog": "false",
          "telemetryQueueSpanId": "ee9e40edcb66ce4f",
          "telemetryTraceId": "00062c53a468cbd8077e7dd079846870",
          "telemetryEnqueuedAt": "1737587612508"
        }
      }
    }
  }
]
```
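
Combining the two sources, here is a minimal sketch (using Python's `requests`; the response shape follows the examples above) of recovering a Trace API-compatible id from a write response:

```python
import requests

def trace_id_from_response(resp: requests.Response) -> str:
    """Extract a Trace API-compatible id from an asynchronous write response."""
    # W3C traceparent format: <version>-<trace-id>-<span-id>-<flags>.
    # The complete header value is also accepted by the Trace API.
    traceparent = resp.headers.get("traceparent")
    if traceparent:
        return traceparent.split("-")[1]
    # Fall back to systemMetadata for APIs that return it (e.g. OpenAPI v3),
    # following the response shape shown in the example above.
    props = resp.json()[0]["status"]["systemMetadata"]["properties"]
    return props["telemetryTraceId"]
```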

### Write States

As mentioned earlier, there are multiple possible states for an aspect write across both storage systems. These states are as follows:
| Write State             | Description                                                                                             |
| ----------------------- | ------------------------------------------------------------------------------------------------------- |
| `ERROR`                 | An error occurred when processing the write request.                                                     |
| `PENDING`               | The write is queued and the consumer has not yet processed the message.                                  |
| `ACTIVE_STATE`          | The write was successful and is the current value.                                                       |
| `HISTORIC_STATE`        | The write was successful; however, it has been overwritten by a newer value.                             |
| `NO_OP`                 | The write is not applicable for the given storage system.                                                |
| `UNKNOWN`               | We are unable to determine the state of the write, and no record of its failure exists either.           |
| `TRACE_NOT_IMPLEMENTED` | Tracing has not yet been implemented for this aspect type. This applies to Timeseries aspects.           |
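
As a rough illustration (this grouping is an interpretation of the table above, not an official mapping), a client might bucket these states when deciding whether to retry:

```python
# Hypothetical client-side grouping of the write states documented above.
SUCCESS = {"ACTIVE_STATE", "HISTORIC_STATE", "NO_OP"}
IN_FLIGHT = {"PENDING"}
FAILED = {"ERROR"}
INDETERMINATE = {"UNKNOWN", "TRACE_NOT_IMPLEMENTED"}

def classify(write_status: str) -> str:
    """Map a write state onto a coarse outcome bucket."""
    for name, bucket in [("success", SUCCESS), ("in-flight", IN_FLIGHT),
                         ("failed", FAILED), ("indeterminate", INDETERMINATE)]:
        if write_status in bucket:
            return name
    raise ValueError(f"unrecognized write state: {write_status}")
```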

### Using the Trace API

The Trace API is implemented as an OpenAPI endpoint and can be used both programmatically and through the Swagger UI.

Required Values:

- `traceId` - The `trace id` associated with the write request. See the previous [Retrieving the `trace id`](#retrieving-the-trace-id) section for how to find this id.
- URN/Aspect names - These are passed as a POST body and should represent at least a subset of the URNs/aspects from the initial request.
  An example is shown here for a single URN and two aspects [`datasetInfo`, `status`].

  ```json
  {
    "urn:li:dataset:(urn:li:dataPlatform:bigquery,transactions.user_profile,PROD)": [
      "datasetInfo",
      "status"
    ]
  }
  ```

- Authorization token

Optional Parameters:

- `onlyIncludeErrors` (default: `true`) - If set to `true`, the response will only include status information for the failed aspects.
- `detailed` (default: `false`) - If set to `true`, includes detailed exception information for failed MCPs.
- `skipCache` (default: `false`) - If set to `true`, bypasses a short-lived cache of the Kafka consumer group offsets.

The following shows a few example request/response pairs.
- Successful Write

  - Request for URN `urn:li:dataset:(urn:li:dataPlatform:bigquery,transactions.user_profile,PROD)` and aspect `status`

    ```shell
    curl -v 'http://localhost:8080/openapi/v1/trace/write/00062c2b698bcb28e92508f8f311802d?onlyIncludeErrors=false&detailed=true&skipCache=false' \
      -H 'accept: application/json' \
      -H 'Content-Type: application/json' \
      -H 'Authorization: Bearer <TOKEN>' \
      -d '{
        "urn:li:dataset:(urn:li:dataPlatform:bigquery,transactions.user_profile,PROD)": [
          "status"
        ]
      }' | jq
    ```

  - Example response

    ```json
    {
      "urn:li:dataset:(urn:li:dataPlatform:bigquery,transactions.user_profile,PROD)": {
        "status": {
          "success": true,
          "primaryStorage": {
            "writeStatus": "ACTIVE_STATE"
          },
          "searchStorage": {
            "writeStatus": "ACTIVE_STATE"
          }
        }
      }
    }
    ```
- Error with exception details

  - Example request

    ```shell
    curl -v 'http://localhost:8080/openapi/v1/trace/write/00062c543e4550c8400e6f6864471a20?onlyIncludeErrors=true&detailed=true&skipCache=false' \
      -H 'accept: application/json' \
      -H 'Content-Type: application/json' \
      -H 'Authorization: Bearer <TOKEN>' \
      -d '{"urn:li:dataset:(urn:li:dataPlatform:bigquery,transactions.user_profile,PROD)": ["status"]}'
    ```

  - Example response

    ```json
    {
      "urn:li:dataset:(urn:li:dataPlatform:bigquery,transactions.user_profile,PROD)": {
        "status": {
          "success": false,
          "primaryStorage": {
            "writeStatus": "ERROR",
            "writeExceptions": [
              {
                "message": "Expected version -100000, actual version -1",
                "exceptionClass": "com.linkedin.metadata.aspect.plugins.validation.AspectValidationException",
                "stackTrace": [
                  "com.linkedin.metadata.aspect.plugins.validation.AspectValidationException.forPrecondition(AspectValidationException.java:33)",
                  "com.linkedin.metadata.aspect.plugins.validation.AspectValidationException.forPrecondition(AspectValidationException.java:25)",
                  "com.linkedin.metadata.aspect.validation.ConditionalWriteValidator.validateVersionPrecondition(ConditionalWriteValidator.java:152)",
                  "com.linkedin.metadata.aspect.validation.ConditionalWriteValidator.lambda$validatePreCommitAspects$2(ConditionalWriteValidator.java:100)",
                  "java.base/java.util.Optional.flatMap(Optional.java:289)",
                  "com.linkedin.metadata.aspect.validation.ConditionalWriteValidator.validatePreCommitAspects(ConditionalWriteValidator.java:98)",
                  "com.linkedin.metadata.aspect.plugins.validation.AspectPayloadValidator.validatePreCommit(AspectPayloadValidator.java:38)",
                  "com.linkedin.metadata.aspect.batch.AspectsBatch.lambda$validatePreCommit$4(AspectsBatch.java:129)",
                  "java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:273)",
                  "java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)",
                  "java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)",
                  "java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)",
                  "java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)",
                  "java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)",
                  "java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)",
                  "java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)",
                  "com.linkedin.metadata.aspect.batch.AspectsBatch.validatePreCommit(AspectsBatch.java:130)"
                ]
              }
            ]
          },
          "searchStorage": {
            "writeStatus": "ERROR",
            "writeMessage": "Primary storage write failed."
          }
        }
      }
    }
    ```
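
For programmatic use, a minimal polling sketch (assuming Python's `requests`; the host, token, and timings are placeholders) that waits until no aspect is still `PENDING` could look like this:

```python
import time
import requests

# Endpoint shown in the curl examples above.
TRACE_URL = "http://localhost:8080/openapi/v1/trace/write/{trace_id}"

def wait_for_write(trace_id: str, urn_aspects: dict, token: str,
                   timeout_s: float = 60.0, poll_s: float = 2.0) -> dict:
    """Poll the Trace API until no aspect reports PENDING, or until timeout."""
    deadline = time.monotonic() + timeout_s
    while True:
        resp = requests.post(
            TRACE_URL.format(trace_id=trace_id),
            params={"onlyIncludeErrors": "false", "detailed": "true"},
            headers={"Authorization": f"Bearer {token}"},
            json=urn_aspects,
        )
        resp.raise_for_status()
        statuses = resp.json()
        # Response shape follows the examples above: urn -> aspect -> status.
        pending = [
            (urn, aspect)
            for urn, aspects in statuses.items()
            for aspect, info in aspects.items()
            if info["primaryStorage"]["writeStatus"] == "PENDING"
            or info["searchStorage"]["writeStatus"] == "PENDING"
        ]
        if not pending or time.monotonic() > deadline:
            return statuses
        time.sleep(poll_s)

# Example usage with the request body shown above:
# wait_for_write("00062c2b698bcb28e92508f8f311802d",
#                {"urn:li:dataset:(urn:li:dataPlatform:bigquery,transactions.user_profile,PROD)": ["status"]},
#                token="<TOKEN>")
```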

### Ingestion Tracing (Experimental)

Known Limitations:

- Only OpenAPI is supported.
- `ASYNC_WAIT`/`ASYNC` - Async modes are impacted by Kafka lag.

Ingestion can be used with tracing enabled; however, this requires the `ASYNC_WAIT` emit mode as well as the `OPENAPI`
sink endpoint. Both can be enabled by setting the environment variables shown below.

```shell
DATAHUB_REST_SINK_DEFAULT_ENDPOINT=OPENAPI \
DATAHUB_EMIT_MODE=ASYNC_WAIT \
datahub ingest ...
```
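
The same settings can be supplied when driving the CLI from a script. A minimal sketch (the recipe filename is a placeholder):

```python
import os
import subprocess

# Run an ingestion recipe with tracing-compatible settings;
# "recipe.yaml" is a hypothetical recipe path.
env = dict(
    os.environ,
    DATAHUB_REST_SINK_DEFAULT_ENDPOINT="OPENAPI",
    DATAHUB_EMIT_MODE="ASYNC_WAIT",
)
subprocess.run(["datahub", "ingest", "-c", "recipe.yaml"], env=env, check=True)
```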

## Trace Performance

The Trace API's performance profile varies based on operation status:

Successful Operations:

- Optimal performance through direct storage access
- Requires only a single lookup each from SQL and Elasticsearch
- Bypasses Kafka interaction entirely

Error State Operations:

- Performance impact due to required Kafka topic inspection
- Optimization mechanisms implemented:
  - Timestamp-based offset seeking for efficient topic traversal
  - Parallel trace processing with controlled concurrency
  - Offset caching to improve response times
  - Cache bypass available via the `skipCache` parameter when data currency is critical

The performance differential between success and error states stems primarily from the additional overhead of Kafka
topic inspection required for error tracking and diagnosis.

For more detail, please see the [Design Notes](#design-notes) section.
### Real World Test

A test was executed with 627,712 aspects in `ASYNC_BATCH` mode and averaged over 2 runs. The overhead from tracing introduced
a 3.84% increase in runtime. Example test runs are shown below.

Without Tracing:

```json
{
  "total_records_written": 627712,
  "records_written_per_second": 376,
  "total_duration_in_seconds": 1665.25,
  "mode": "ASYNC_BATCH",
  "max_threads": 15,
  "gms_version": "v0.3.9.3",
  "pending_requests": 0,
  "async_batches_prepared": 6290,
  "async_batches_split": 0,
  "main_thread_blocking_timer": "1186.301 seconds"
}
```

With Tracing:

```json
{
  "total_records_written": 627612,
  "records_written_per_second": 368,
  "total_duration_in_seconds": 1701.51,
  "mode": "ASYNC_BATCH",
  "max_threads": 15,
  "gms_version": "v0.3.9.3",
  "pending_requests": 0,
  "async_batches_prepared": 6291,
  "async_batches_split": 0,
  "main_thread_blocking_timer": "1224.738 seconds"
}
```
## Trace Exporters

At the foundation of the trace instrumentation is OpenTelemetry, which has been a part of DataHub for quite some time. As
documented in the [Monitoring](/docs/advanced/monitoring.md) section, OpenTelemetry can be configured to export traces
to external systems. For the Trace API to function, such an external system is NOT required.
### Trace Log Export

A special log-based OpenTelemetry exporter was implemented for debugging purposes. When selectively activated for a given
request, it prints `trace id`s and detailed timing information as the request traverses the different components of DataHub.
These logs are likewise not required for the Trace API to function; they simply leverage the same underlying OpenTelemetry
foundation.

Trace logging is activated for a request using one of the following methods:

- HTTP Header: `X-Enable-Trace-Log: true`
- Cookie: `enable-trace-log: true`
- JavaScript: `document.cookie = "enable-trace-log=true";`
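
As an illustration, here is a hedged sketch of enabling the trace log for a single programmatic write via the HTTP header (the host, token, and request body shape are assumptions; the endpoint mirrors the OpenAPI v3 call visible in the example logs below):

```python
import requests

# Hypothetical single write with trace logging enabled via the header above.
resp = requests.post(
    "http://localhost:8080/openapi/v3/entity/dataset/"
    "urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Cclimate.daily_temperature%2CPROD%29/status",
    headers={
        "Authorization": "Bearer <TOKEN>",
        "X-Enable-Trace-Log": "true",
    },
    json={"value": {"removed": False}},  # assumed body shape for the status aspect
)
print(resp.status_code, resp.headers.get("traceparent"))
```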

Example logs for a single request with trace logging enabled:

- GMS

  ```text
  i.d.metadata.context.RequestContext:53 - RequestContext{actorUrn='urn:li:corpuser:datahub', sourceIP='172.18.0.5', requestAPI=OPENAPI, requestID='createAspect([dataset])', userAgent='Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36'}
  i.d.metadata.context.TraceContext:366 - Trace: 00062c2c3e1403109bbaf3d2e39adcd0, SpanId: a2898a18f9f0c4f1, ParentId: dd746f079d1232ba, Name: ingestTimeseriesProposal, Duration: 0.03 ms
  i.d.metadata.context.TraceContext:376 - Trace: 00062c2c3e1403109bbaf3d2e39adcd0, Attributes: AttributesMap{data={async=true, batch.size=1}, capacity=128, totalAddedValues=2}
  i.d.metadata.context.TraceContext:366 - Trace: 00062c2c3e1403109bbaf3d2e39adcd0, SpanId: 02e058ff616e4c99, ParentId: 7ed88659811a8fdb, Name: produceMetadataChangeProposal, Duration: 0.03 ms
  i.d.metadata.context.TraceContext:376 - Trace: 00062c2c3e1403109bbaf3d2e39adcd0, Attributes: AttributesMap{data={messaging.destination_kind=topic, messaging.system=kafka, messaging.destination=MetadataChangeProposal_v1, messaging.operation=publish, queue.enqueued_at=1737418391958}, capacity=128, totalAddedValues=5}
  i.d.metadata.context.TraceContext:366 - Trace: 00062c2c3e1403109bbaf3d2e39adcd0, SpanId: 7ed88659811a8fdb, ParentId: dd746f079d1232ba, Name: ingestProposalAsync, Duration: 2.57 ms
  i.d.metadata.context.TraceContext:376 - Trace: 00062c2c3e1403109bbaf3d2e39adcd0, Attributes: AttributesMap{data={batch.size=1}, capacity=128, totalAddedValues=1}
  ```

- MCE Consumer

  ```text
  c.l.m.k.MetadataChangeProposalsProcessor:89 - Got MCP event key: urn:li:dataset:(urn:li:dataPlatform:snowflake,climate.daily_temperature,PROD), topic: MetadataChangeProposal_v1, partition: 0, offset: 75, value size: 412, timestamp: 1737418391959
  i.d.metadata.context.TraceContext:366 - Trace: 00062c2c3e1403109bbaf3d2e39adcd0, SpanId: a65075fe0982d873, ParentId: 02e058ff616e4c99, Name: consume, Duration: 0.01 ms
  i.d.metadata.context.TraceContext:376 - Trace: 00062c2c3e1403109bbaf3d2e39adcd0, Attributes: AttributesMap{data={messaging.destination_kind=topic, queue.duration_ms=4, messaging.system=kafka, messaging.destination=MetadataChangeProposal_v1, messaging.operation=receive, queue.enqueued_at=1737418391958}, capacity=128, totalAddedValues=6}
  i.d.metadata.context.TraceContext:366 - Trace: 00062c2c3e1403109bbaf3d2e39adcd0, SpanId: dd746f079d1232ba, ParentId: 0000000000000000, Name: POST /openapi/v3/entity/dataset/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Cclimate.daily_temperature%2CPROD%29/status, Duration: 16.18 ms
  i.d.metadata.context.TraceContext:376 - Trace: 00062c2c3e1403109bbaf3d2e39adcd0, Attributes: AttributesMap{data={request.api=OPENAPI, http.status_code=202, user.id=urn:li:corpuser:datahub, http.url=/openapi/v3/entity/dataset/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Cclimate.daily_temperature%2CPROD%29/status, request.id=createAspect([dataset]), http.method=POST}, capacity=128, totalAddedValues=6}
  i.d.metadata.context.TraceContext:366 - Trace: 00062c2c3e1403109bbaf3d2e39adcd0, SpanId: 94a019b95154c0e7, ParentId: 0cb378fe4f5ad185, Name: ingestProposalSync, Duration: 0.01 ms
  i.d.metadata.context.TraceContext:376 - Trace: 00062c2c3e1403109bbaf3d2e39adcd0, Attributes: AttributesMap{data={batch.size=0}, capacity=128, totalAddedValues=1}
  i.d.metadata.context.TraceContext:366 - Trace: 00062c2c3e1403109bbaf3d2e39adcd0, SpanId: 0cb378fe4f5ad185, ParentId: 68df6bc4729dc0a2, Name: ingestTimeseriesProposal, Duration: 0.25 ms
  i.d.metadata.context.TraceContext:376 - Trace: 00062c2c3e1403109bbaf3d2e39adcd0, Attributes: AttributesMap{data={async=false, batch.size=1}, capacity=128, totalAddedValues=2}
  c.l.m.entity.EntityServiceImpl:988 - Ingesting aspects batch to database: AspectsBatchImpl{items=[ChangeMCP{changeType=UPSERT, urn=urn:li:dataset:(urn:li:dataPlatform:snowflake,climate.daily_temperature,PROD), aspectName='status', recordTemplate={removed=false}, systemMetadata={lastObserved=1737418391954, version=1, properties={telemetryLog=true, telemetryQueueSpanId=02e058ff616e4c99, telemetryEnqueu...}]}
  i.d.metadata.context.TraceContext:366 - Trace: 00062c2c3e1403109bbaf3d2e39adcd0, SpanId: 4754a1c02dadec4c, ParentId: ef383b26f0040fc5, Name: retentionService, Duration: 0.09 ms
  i.d.metadata.context.TraceContext:376 - Trace: 00062c2c3e1403109bbaf3d2e39adcd0, Attributes: AttributesMap{data={batch.size=1}, capacity=128, totalAddedValues=1}
  i.d.metadata.context.TraceContext:366 - Trace: 00062c2c3e1403109bbaf3d2e39adcd0, SpanId: ef383b26f0040fc5, ParentId: 7ae629151400fc18, Name: ingestAspectsToLocalDB, Duration: 18.64 ms
  i.d.metadata.context.TraceContext:376 - Trace: 00062c2c3e1403109bbaf3d2e39adcd0, Attributes: AttributesMap{data={batch.size=1, dwizName=com.linkedin.metadata.entity.EntityServiceImpl.ingestAspectsToLocalDB}, capacity=128, totalAddedValues=2}
  c.l.m.entity.EntityServiceImpl:1900 - Producing MCL for ingested aspect status, urn urn:li:dataset:(urn:li:dataPlatform:snowflake,climate.daily_temperature,PROD)
  i.d.metadata.context.TraceContext:366 - Trace: 00062c2c3e1403109bbaf3d2e39adcd0, SpanId: f1a8a1da99f1ae23, ParentId: c5f8b3884060722c, Name: produceMetadataChangeLog, Duration: 0.10 ms
  i.d.metadata.context.TraceContext:376 - Trace: 00062c2c3e1403109bbaf3d2e39adcd0, Attributes: AttributesMap{data={messaging.destination_kind=topic, messaging.system=kafka, messaging.destination=MetadataChangeLog_Versioned_v1, messaging.operation=publish, queue.enqueued_at=1737418391982}, capacity=128, totalAddedValues=5}
  i.d.metadata.context.TraceContext:366 - Trace: 00062c2c3e1403109bbaf3d2e39adcd0, SpanId: c5f8b3884060722c, ParentId: 7ae629151400fc18, Name: emitMCL, Duration: 14.32 ms
  i.d.metadata.context.TraceContext:376 - Trace: 00062c2c3e1403109bbaf3d2e39adcd0, Attributes: AttributesMap{data={batch.size=1}, capacity=128, totalAddedValues=1}
  i.d.metadata.context.TraceContext:366 - Trace: 00062c2c3e1403109bbaf3d2e39adcd0, SpanId: 7ae629151400fc18, ParentId: 68df6bc4729dc0a2, Name: ingestProposalSync, Duration: 37.90 ms
  i.d.metadata.context.TraceContext:376 - Trace: 00062c2c3e1403109bbaf3d2e39adcd0, Attributes: AttributesMap{data={batch.size=1}, capacity=128, totalAddedValues=1}
  c.l.m.k.MetadataChangeProposalsProcessor:128 - Successfully processed MCP event urn: urn:li:dataset:(urn:li:dataPlatform:snowflake,climate.daily_temperature,PROD)
  i.d.metadata.context.TraceContext:366 - Trace: 00062c2c3e1403109bbaf3d2e39adcd0, SpanId: 68df6bc4729dc0a2, ParentId: 02e058ff616e4c99, Name: consume, Duration: 39.11 ms
  i.d.metadata.context.TraceContext:376 - Trace: 00062c2c3e1403109bbaf3d2e39adcd0, Attributes: AttributesMap{data={batch.size=1, dwizName=com.linkedin.metadata.kafka.MetadataChangeProposalsProcessor.consume}, capacity=128, totalAddedValues=2}
  i.d.metadata.context.TraceContext:366 - Trace: 00062c2c3e1403109bbaf3d2e39adcd0, SpanId: 04dc44653b634df2, ParentId: 02e058ff616e4c99, Name: consume, Duration: 0.03 ms
  ```

- MAE Consumer

  ```text
  i.d.metadata.context.TraceContext:376 - Trace: 00062c2c3e1403109bbaf3d2e39adcd0, Attributes: AttributesMap{data={messaging.destination_kind=topic, queue.duration_ms=22, messaging.system=kafka, messaging.destination=MetadataChangeLog_Versioned_v1, messaging.operation=receive, queue.enqueued_at=1737418391982}, capacity=128, totalAddedValues=6}
  c.l.metadata.kafka.MCLKafkaListener:96 - Invoking MCL hooks for consumer: generic-mae-consumer-job-client urn: urn:li:dataset:(urn:li:dataPlatform:snowflake,climate.daily_temperature,PROD), aspect name: status, entity type: dataset, change type: UPSERT
  i.d.metadata.context.TraceContext:366 - Trace: 00062c2c3e1403109bbaf3d2e39adcd0, SpanId: 3c3c055c360dc8e4, ParentId: 1de99215a0e82697, Name: FormAssignmentHook, Duration: 0.06 ms
  i.d.metadata.context.TraceContext:376 - Trace: 00062c2c3e1403109bbaf3d2e39adcd0, Attributes: AttributesMap{data={dwizName=com.linkedin.metadata.kafka.MCLKafkaListener.FormAssignmentHook_latency}, capacity=128, totalAddedValues=1}
  i.d.metadata.context.TraceContext:366 - Trace: 00062c2c3e1403109bbaf3d2e39adcd0, SpanId: 8e238d0156baacc4, ParentId: 1de99215a0e82697, Name: IngestionSchedulerHook, Duration: 0.05 ms
  i.d.metadata.context.TraceContext:376 - Trace: 00062c2c3e1403109bbaf3d2e39adcd0, Attributes: AttributesMap{data={dwizName=com.linkedin.metadata.kafka.MCLKafkaListener.IngestionSchedulerHook_latency}, capacity=128, totalAddedValues=1}
  c.l.m.s.e.update.ESBulkProcessor:85 - Added request id: urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Asnowflake%2Cclimate.daily_temperature%2CPROD%29, operation type: UPDATE, index: datasetindex_v2
  c.l.m.s.e.update.ESBulkProcessor:85 - Added request id: SIHRXj1ktF7qkwPBZO8w0A==, operation type: UPDATE, index: system_metadata_service_v1
  c.l.m.s.e.update.ESBulkProcessor:85 - Added request id: 2p3742l4sFS3wcL82Qh2lQ==, operation type: UPDATE, index: system_metadata_service_v1
  c.l.m.s.e.update.ESBulkProcessor:85 - Added request id: CfZKRLsf25/e3p3mURzlnA==, operation type: UPDATE, index: system_metadata_service_v1
  c.l.m.s.e.update.ESBulkProcessor:85 - Added request id: 8tvhG5ARd5BOdEbqaZkE0g==, operation type: UPDATE, index: system_metadata_service_v1
  c.l.m.s.e.update.ESBulkProcessor:85 - Added request id: rAvQOOBItiKAym622S4dcQ==, operation type: UPDATE, index: system_metadata_service_v1
  c.l.m.s.e.update.ESBulkProcessor:85 - Added request id: YqT6TNy7MAMOAyVXh6abMA==, operation type: UPDATE, index: system_metadata_service_v1
  i.d.metadata.context.TraceContext:366 - Trace: 00062c2c3e1403109bbaf3d2e39adcd0, SpanId: 054ac726204b449c, ParentId: 1de99215a0e82697, Name: UpdateIndicesHook, Duration: 47.31 ms
  i.d.metadata.context.TraceContext:376 - Trace: 00062c2c3e1403109bbaf3d2e39adcd0, Attributes: AttributesMap{data={dwizName=com.linkedin.metadata.kafka.MCLKafkaListener.UpdateIndicesHook_latency}, capacity=128, totalAddedValues=1}
  i.d.metadata.context.TraceContext:366 - Trace: 00062c2c3e1403109bbaf3d2e39adcd0, SpanId: 14d9ded49a94c7b8, ParentId: 1de99215a0e82697, Name: IncidentsSummaryHook, Duration: 0.09 ms
  i.d.metadata.context.TraceContext:376 - Trace: 00062c2c3e1403109bbaf3d2e39adcd0, Attributes: AttributesMap{data={dwizName=com.linkedin.metadata.kafka.MCLKafkaListener.IncidentsSummaryHook_latency}, capacity=128, totalAddedValues=1}
  i.d.metadata.context.TraceContext:366 - Trace: 00062c2c3e1403109bbaf3d2e39adcd0, SpanId: a92d9e54ade6073b, ParentId: 1de99215a0e82697, Name: EntityChangeEventGeneratorHook, Duration: 9.10 ms
  i.d.metadata.context.TraceContext:376 - Trace: 00062c2c3e1403109bbaf3d2e39adcd0, Attributes: AttributesMap{data={dwizName=com.linkedin.metadata.kafka.MCLKafkaListener.EntityChangeEventGeneratorHook_latency}, capacity=128, totalAddedValues=1}
  i.d.metadata.context.TraceContext:366 - Trace: 00062c2c3e1403109bbaf3d2e39adcd0, SpanId: c06dc7f131e57fca, ParentId: 1de99215a0e82697, Name: SiblingAssociationHook, Duration: 0.07 ms
  i.d.metadata.context.TraceContext:376 - Trace: 00062c2c3e1403109bbaf3d2e39adcd0, Attributes: AttributesMap{data={dwizName=com.linkedin.metadata.kafka.MCLKafkaListener.SiblingAssociationHook_latency}, capacity=128, totalAddedValues=1}
  c.l.metadata.kafka.MCLKafkaListener:139 - Successfully completed MCL hooks for consumer: generic-mae-consumer-job-client urn: urn:li:dataset:(urn:li:dataPlatform:snowflake,climate.daily_temperature,PROD)
  i.d.metadata.context.TraceContext:366 - Trace: 00062c2c3e1403109bbaf3d2e39adcd0, SpanId: 1de99215a0e82697, ParentId: 02e058ff616e4c99, Name: consume, Duration: 58.67 ms
  i.d.metadata.context.TraceContext:376 - Trace: 00062c2c3e1403109bbaf3d2e39adcd0, Attributes: AttributesMap{data={batch.size=1, dwizName=com.linkedin.metadata.kafka.MCLKafkaListener.consume}, capacity=128, totalAddedValues=2}
  ```
## Design Notes

For the initial implementation, no specific OpenTelemetry infrastructure is required; however, existing environment variables
for OpenTelemetry can continue to be used and will export the new spans if configured.

The Trace API implementation does not rely on any additional external systems or infrastructure. Due to this design
choice, the trace is determined by inspecting the three storage systems (Primary Storage (SQL/Cassandra), Elasticsearch/OpenSearch,
and Kafka topics) for the `trace id` or related timestamps.

The `trace id` is stored in systemMetadata in both SQL and ES. For ES specifically, the presence of the `trace id` in
the system metadata index is used as a proxy to determine a successful write to ES.

The tracing feature will additionally fetch messages from the Kafka topics (including the failed MCP topic) for
more detailed error information. Pending states are derived by comparing the offset of a message with the current offsets of the
consumer groups.
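
To illustrate that last point (a simplification, not the actual implementation), the pending check reduces to an offset comparison:

```python
# Simplified sketch: a message is still PENDING while the consumer group's
# committed offset (the next offset to be consumed) has not passed it.
def is_pending(message_offset: int, committed_offset: int) -> bool:
    return committed_offset <= message_offset
```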