Skip to content

Commit 4fe444c

Browse files
committed
feat: update DSM context passing
1 parent f54475d commit 4fe444c

4 files changed

Lines changed: 40 additions & 20 deletions

File tree

src/pricing-service/src/observability/observability.ts

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ import * as os from "os";
55

66
const textEncoder = new TextEncoder();
77
const logger = new Logger({});
8-
const cpuCount = os.cpus().length;
8+
9+
export const DSM_PROPAGATION_KEY_BASE_64 = "dd-pathway-ctx-base64";
910

1011
export enum MessagingType {
1112
PUBLIC,
@@ -23,13 +24,21 @@ export interface SemanticConventions {
2324

2425
export function startProcessSpanWithSemanticConventions(
2526
evt: CloudEvent<any>,
26-
conventions: SemanticConventions
27+
conventions: SemanticConventions,
2728
): Span {
2829
const messageProcessingSpan = tracer.startSpan(`process ${evt.type}`, {
2930
childOf: conventions.parentSpan ?? undefined,
3031
});
31-
const headers = {};
32-
tracer.dataStreamsCheckpointer.setConsumeCheckpoint("eventbridge", evt.type, headers);
32+
33+
// Extract the DSM-specific header from the event data to set the consume checkpoint, enabling Datadog's Data Streams Monitoring to track this message consumption
34+
const dsmHeader = (evt.data as Record<string, unknown>)[
35+
DSM_PROPAGATION_KEY_BASE_64
36+
];
37+
tracer.dataStreamsCheckpointer.setConsumeCheckpoint(
38+
"eventbridge",
39+
evt.type,
40+
dsmHeader,
41+
);
3342

3443
try {
3544
messageProcessingSpan.addTags({
@@ -49,7 +58,7 @@ export function startProcessSpanWithSemanticConventions(
4958
"messaging.message.envelope.size": textEncoder.encode(JSON.stringify(evt))
5059
.length,
5160
"messaging.message.body.size": textEncoder.encode(
52-
JSON.stringify(evt.data)
61+
JSON.stringify(evt.data),
5362
).length,
5463
});
5564

@@ -73,18 +82,19 @@ export function startProcessSpanWithSemanticConventions(
7382

7483
export function startPublishSpanWithSemanticConventions(
7584
evt: CloudEvent<any>,
76-
conventions: SemanticConventions
77-
): Span {
85+
conventions: SemanticConventions,
86+
): { span: Span; carrier: Record<string, string> } {
7887
const messagingSpan = tracer.startSpan(`publish ${evt.type}`, {
7988
childOf: conventions.parentSpan ?? undefined,
8089
});
8190

91+
const carrier = {};
92+
8293
try {
83-
const headers = {};
8494
tracer.dataStreamsCheckpointer.setProduceCheckpoint(
8595
"eventbridge",
8696
evt.type,
87-
headers,
97+
carrier,
8898
);
8999

90100
messagingSpan.addTags({
@@ -102,7 +112,7 @@ export function startPublishSpanWithSemanticConventions(
102112
"messaging.message.envelope.size": textEncoder.encode(JSON.stringify(evt))
103113
.length,
104114
"messaging.message.body.size": textEncoder.encode(
105-
JSON.stringify(evt.data)
115+
JSON.stringify(evt.data),
106116
).length,
107117
"messaging.operation.name": "send",
108118
"messaging.message.conversation_id": conventions.conversationId ?? "",
@@ -116,7 +126,7 @@ export function startPublishSpanWithSemanticConventions(
116126
logger.error(JSON.stringify(e));
117127
}
118128

119-
return messagingSpan;
129+
return { span: messagingSpan, carrier };
120130
}
121131

122132
export function addDefaultServiceTagsTo(span: Span | undefined | null) {

src/pricing-service/src/pricing-api/adapters/eventBridgeEventPublisher.ts

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import { Span, tracer } from "dd-trace";
1414
import { CloudEvent } from "cloudevents";
1515
import { Logger } from "@aws-lambda-powertools/logger";
1616
import {
17+
DSM_PROPAGATION_KEY_BASE_64,
1718
MessagingType,
1819
startPublishSpanWithSemanticConventions,
1920
} from "../../observability/observability";
@@ -33,7 +34,7 @@ export class EventBridgeEventPublisher implements EventPublisher {
3334

3435
async publishPriceCalculatedEvent(
3536
evt: PriceCalculatedEventV1,
36-
linkedTraceparent?: string
37+
linkedTraceparent?: string,
3738
): Promise<boolean> {
3839
const parentSpan = tracer.scope().active();
3940

@@ -48,21 +49,27 @@ export class EventBridgeEventPublisher implements EventPublisher {
4849
traceparent: parentSpan?.context().toTraceparent(),
4950
});
5051

51-
messagingSpan = startPublishSpanWithSemanticConventions(
52+
const publishSpan = startPublishSpanWithSemanticConventions(
5253
cloudEventWrapper,
5354
{
5455
publicOrPrivate: MessagingType.PUBLIC,
5556
messagingSystem: "eventbridge",
5657
destinationName: process.env.EVENT_BUS_NAME ?? "",
5758
parentSpan: parentSpan,
5859
linkedTraceparent,
59-
}
60+
},
6061
);
62+
messagingSpan = publishSpan.span;
6163

64+
// Ensure the DSM context is injected
6265
const evtEntries: PutEventsRequestEntry[] = [
6366
{
6467
EventBusName: process.env.EVENT_BUS_NAME,
65-
Detail: JSON.stringify(cloudEventWrapper),
68+
Detail: JSON.stringify({
69+
...cloudEventWrapper,
70+
DSM_PROPAGATION_KEY_BASE_64:
71+
publishSpan.carrier[DSM_PROPAGATION_KEY_BASE_64],
72+
}),
6673
DetailType: "pricing.pricingCalculated.v1",
6774
Source: `${process.env.ENV}.pricing`,
6875
},
@@ -71,12 +78,14 @@ export class EventBridgeEventPublisher implements EventPublisher {
7178
await this.client.send(
7279
new PutEventsCommand({
7380
Entries: evtEntries,
74-
})
81+
}),
7582
);
7683

7784
messagingSpan?.finish();
7885
} catch (error: unknown) {
79-
this.logger.error(error instanceof Error ? error.message : JSON.stringify(error));
86+
this.logger.error(
87+
error instanceof Error ? error.message : JSON.stringify(error),
88+
);
8089
if (error instanceof Error) {
8190
const e = error as Error;
8291
const stack = e.stack!.split("\n").slice(1, 4).join("\n");

src/product-management-service/src/observability/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ type CloudEvent[T any] struct {
2929
Time string `json:"time"`
3030
TraceParent string `json:"traceparent"`
3131
Datadog map[string]string `json:"_datadog,omitempty"`
32+
DsmContext string `json:"dd-pathway-ctx-base64,omitempty"` // for backward compatibility with older services using the _datadog field as a simple string carrier
3233
}
3334

3435
func NewCloudEvent[T any](ctx context.Context, evtType string, data T) CloudEvent[T] {

src/product-management-service/src/product-acl/pricing-changed-handler/main.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -132,10 +132,10 @@ func processMessage(ctx context.Context, record events.SQSMessage) error {
132132

133133
// Extract DSM context using the incoming ctx so the checkpoint is linked to
134134
// the current trace, not an orphaned background context.
135-
_, _ = tracer.SetDataStreamsCheckpointWithParams(datastreams.ExtractFromBase64Carrier(ctx, &evt), options.CheckpointParams{
135+
dsm_ctx, _ := tracer.SetDataStreamsCheckpointWithParams(datastreams.ExtractFromBase64Carrier(ctx, &evt), options.CheckpointParams{
136136
ServiceOverride: "productservice-acl",
137137
}, "direction:in", productcore.ExternalPubSubName, "topic:"+evt.Type, "manual_checkpoint:true")
138-
processSpan, _ := tracer.StartSpanFromContext(ctx, fmt.Sprintf("process %s", evt.Type), tracer.WithSpanLinks(spanLinks))
138+
processSpan, _ := tracer.StartSpanFromContext(dsm_ctx, fmt.Sprintf("process %s", evt.Type), tracer.WithSpanLinks(spanLinks))
139139
defer processSpan.Finish()
140140

141141
processSpan.SetTag("product.id", evt.Data.ProductId)
@@ -146,7 +146,7 @@ func processMessage(ctx context.Context, record events.SQSMessage) error {
146146
processSpan.SetTag("messaging.operation.type", "process")
147147
processSpan.SetTag("messaging.system", "aws_sqs")
148148

149-
_, err := eventTranslator.HandleProductPricingChanged(ctx, evt.Data)
149+
_, err := eventTranslator.HandleProductPricingChanged(dsm_ctx, evt.Data)
150150

151151
if err != nil {
152152
processSpan.SetTag("error", true)

0 commit comments

Comments
 (0)