(feat/ledger) Ledger events (#1728)

* Nick: ledger init

* Update email_notification.ts

* Update tracking.ts

* Nick: removed unused events

* Update email_notification.ts

* Apply suggestions from code review

* Update tracking.ts

* Update tracking.ts

* Update email_notification.ts

* Nick: conc limit ledger

---------

Co-authored-by: Gergő Móricz <mo.geryy@gmail.com>
This commit is contained in:
Nicolas 2025-06-30 12:48:06 -03:00 committed by GitHub
parent 13f012c583
commit b4eedce3e0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 374 additions and 104 deletions

View File

@ -0,0 +1,49 @@
/**
 * This file defines the data schemas for events tracked in the ledger system.
 * These interfaces represent the structure of the 'data' JSONB column in ledger.tracks.
 */

/**
 * Common properties shared across all events.
 */
export interface BaseEventData {
  user_id?: string; // User ID if available
  team_id?: string; // Team ID if available
}

/**
 * Concurrent browser limit reached event data.
 * Tracks when a team reaches the concurrent browser limit.
 */
export interface ConcurrentBrowserLimitReachedEventData extends BaseEventData {
  team_id: string; // The team ID (required for this event, unlike the base)
}

/**
 * Map of event definition slugs to their data types.
 * Add new events here to get end-to-end typing in trackEvent().
 */
export interface EventDataMap {
  "concurrent-browser-limit-reached": ConcurrentBrowserLimitReachedEventData;
}

/**
 * Event definition slugs.
 */
export type EventDefinitionSlug = keyof EventDataMap;

/**
 * Helper type to extract the data type for a specific event.
 */
export type EventData<T extends EventDefinitionSlug> = EventDataMap[T];

/**
 * Creates a properly typed, shallow copy of an event data object.
 *
 * Note: no timestamp is added here (the previous doc claimed one was) —
 * `created_at` is set at insert time by the tracking layer.
 *
 * @param eventType The event slug; unused at runtime, but kept so callers get
 *                  type inference for `data` tied to the slug they pass.
 * @param data The event payload to copy.
 * @returns A shallow copy of `data`, typed for the given event.
 */
export function createEventData<T extends EventDefinitionSlug>(
  eventType: T,
  data: EventDataMap[T],
): EventDataMap[T] {
  // Shallow copy so callers can't mutate the payload after handing it off.
  return {
    ...data,
  } as EventDataMap[T];
}

View File

@ -0,0 +1,83 @@
import { createClient, SupabaseClient } from "@supabase/supabase-js";
import { logger } from "../../lib/logger";
import { configDotenv } from "dotenv";
configDotenv();
/**
 * SupabaseLedgerService initializes Supabase clients bound to the "ledger"
 * schema: a primary client and a read-replica client.
 *
 * Initialization rules (decided once, in the constructor):
 *  - USE_DB_AUTHENTICATION !== "true"  → both clients stay null (warned).
 *  - Any of SUPABASE_URL / SUPABASE_REPLICA_URL / SUPABASE_SERVICE_TOKEN
 *    missing → both clients stay null (error logged).
 *  - Otherwise both clients are created against the "ledger" schema.
 */
class SupabaseLedgerService {
  // Primary (read/write) client; null when not configured.
  private client: SupabaseClient<any, "ledger", any> | null = null;
  // Read-replica client; null when not configured.
  private rrClient: SupabaseClient<any, "ledger", any> | null = null;
  constructor() {
    const supabaseUrl = process.env.SUPABASE_URL;
    const supabaseReplicaUrl = process.env.SUPABASE_REPLICA_URL;
    const supabaseServiceToken = process.env.SUPABASE_SERVICE_TOKEN;
    const useDbAuthentication = process.env.USE_DB_AUTHENTICATION === "true";
    // Only initialize the Supabase client if both URL and Service Token are provided.
    if (!useDbAuthentication) {
      // Authentication explicitly disabled: leave the client null and warn.
      logger.warn(
        "Authentication is disabled. Supabase ledger client will not be initialized.",
      );
      this.client = null;
    } else if (!supabaseUrl || !supabaseServiceToken || !supabaseReplicaUrl) {
      // Misconfigured environment: both clients remain null (field defaults).
      logger.error(
        "Supabase environment variables aren't configured correctly. Supabase ledger client will not be initialized. Fix ENV configuration or disable DB authentication with USE_DB_AUTHENTICATION env variable",
      );
    } else {
      this.client = createClient(supabaseUrl, supabaseServiceToken, {
        global: {
          headers: {
            // NOTE(review): presumably a load-balancer routing hint for the
            // primary — confirm against infra docs.
            "sb-lb-routing-mode": "alpha-all-services",
          },
        },
        db: {
          // @ts-ignore — "ledger" schema is not in the generated DB types yet.
          schema: "ledger",
        },
      });
      this.rrClient = createClient(supabaseReplicaUrl, supabaseServiceToken, {
        db: {
          // @ts-ignore — "ledger" schema is not in the generated DB types yet.
          schema: "ledger",
        },
      });
    }
  }
  // Provides access to the initialized primary Supabase client, if available.
  getClient(): SupabaseClient<any, "ledger", any> | null {
    return this.client;
  }
  // Provides access to the initialized read-replica client, if available.
  getRRClient(): SupabaseClient<any, "ledger", any> | null {
    return this.rrClient;
  }
}
// Singleton service instance backing the proxied export below.
const ledgerServ = new SupabaseLedgerService();

/**
 * Proxy that exposes the ledger Supabase client while guarding against an
 * unconfigured environment: when no client was initialized, any property
 * access yields a function that throws a clear error instead of a vague
 * null-dereference crash at the call site.
 */
export const supabase_ledger_service: SupabaseClient<any, "ledger", any> = new Proxy(
  ledgerServ,
  {
    get(target, prop, receiver) {
      const underlying = target.getClient();
      // Unconfigured: surface a meaningful error on any attempted use.
      if (underlying === null) {
        return () => {
          throw new Error("Supabase ledger client is not configured.");
        };
      }
      // The service's own members take precedence; everything else is
      // delegated to the real Supabase client.
      const serviceOwnsProp = prop in target;
      return serviceOwnsProp
        ? Reflect.get(target, prop, receiver)
        : Reflect.get(underlying, prop, receiver);
    },
  },
) as unknown as SupabaseClient<any, "ledger", any>;

View File

@ -0,0 +1,75 @@
import { EventDataMap, EventDefinitionSlug } from "./data-schemas";
// NOTE: @ts-nocheck is only effective as the very first line of a file; placed
// here (after an import) it is inert. Kept as context: schema not in types_db yet.
import { supabase_ledger_service } from "./supabase-ledger";
import { getValue, setValue } from "../redis";
import { logger } from "../../lib/logger";
/**
 * Track an event in the ledger system.
 *
 * Resolves the provider definition ID for the given slug — first from a
 * 24-hour Redis cache, falling back to the database — then inserts a row
 * into ledger.tracks with the payload in the `data` JSONB column.
 *
 * Never throws: all failures are logged and reported as a null return.
 *
 * @param definitionSlug The provider definition slug
 * @param data Additional data to store with the track
 * @returns The tracked event UUID, or null if tracking failed
 */
export async function trackEvent<T extends EventDefinitionSlug>(
  definitionSlug: T,
  data: EventDataMap[T],
): Promise<string | null> {
  try {
    // Get the provider definition ID from cache or database
    const cacheKey = `provider_definition_${definitionSlug}_`;
    let providerDefinition: any = null;
    let definitionError: any = null;

    // Try to get from Redis cache first
    const cachedData = await getValue(cacheKey);
    if (cachedData) {
      providerDefinition = JSON.parse(cachedData);
    } else {
      // If not in cache, fetch from database
      const result = await supabase_ledger_service
        .from("provider_definitions")
        .select("id")
        .eq("slug", definitionSlug)
        .single();
      definitionError = result.error;
      providerDefinition = result.data;

      // Cache the result for 24 hours (86400 seconds; assumes setValue's TTL
      // is in seconds — confirm against ../redis). The previous value,
      // 600 * 60 * 24 (~10 days), contradicted the stated 24-hour intent.
      if (!definitionError && providerDefinition) {
        await setValue(cacheKey, JSON.stringify(providerDefinition), 60 * 60 * 24);
      }
    }

    if (definitionError || !providerDefinition) {
      logger.error("Error finding provider definition:", definitionError);
      return null;
    }

    // Create the track row; created_at is stamped here, at insert time.
    const { data: track, error: trackError } = await supabase_ledger_service
      .from("tracks")
      //@ts-ignore — ledger schema not in generated DB types yet
      .insert({
        created_at: new Date().toISOString(),
        //@ts-ignore
        provider_definition_id: providerDefinition.id,
        data: data,
      })
      .select("uuid")
      .single();

    if (trackError || !track) {
      logger.error("Error creating track:", trackError);
      return null;
    }

    //@ts-ignore — uuid column not in generated DB types yet
    return track.uuid;
  } catch (error) {
    logger.error("Error tracking event:", error);
    return null;
  }
}
// TODO: flesh out data schemas for additional ledger events.
// TODO: migrate everything that currently sends an email to ledger tracks.

View File

@ -8,6 +8,7 @@ import { getNotificationString } from "./notification_string";
import { AuthCreditUsageChunk } from "../../controllers/v1/types";
import { redlock } from "../redlock";
import { redisEvictConnection } from "../redis";
import { trackEvent } from "../ledger/tracking";
const emailTemplates: Record<
NotificationType,
@ -51,14 +52,17 @@ const emailTemplates: Record<
};
// Map notification types to email categories
const notificationToEmailCategory: Record<NotificationType, 'rate_limit_warnings' | 'system_alerts'> = {
[NotificationType.APPROACHING_LIMIT]: 'system_alerts',
[NotificationType.LIMIT_REACHED]: 'system_alerts',
[NotificationType.RATE_LIMIT_REACHED]: 'rate_limit_warnings',
[NotificationType.AUTO_RECHARGE_SUCCESS]: 'system_alerts',
[NotificationType.AUTO_RECHARGE_FAILED]: 'system_alerts',
[NotificationType.CONCURRENCY_LIMIT_REACHED]: 'rate_limit_warnings',
[NotificationType.AUTO_RECHARGE_FREQUENT]: 'system_alerts',
const notificationToEmailCategory: Record<
NotificationType,
"rate_limit_warnings" | "system_alerts"
> = {
[NotificationType.APPROACHING_LIMIT]: "system_alerts",
[NotificationType.LIMIT_REACHED]: "system_alerts",
[NotificationType.RATE_LIMIT_REACHED]: "rate_limit_warnings",
[NotificationType.AUTO_RECHARGE_SUCCESS]: "system_alerts",
[NotificationType.AUTO_RECHARGE_FAILED]: "system_alerts",
[NotificationType.CONCURRENCY_LIMIT_REACHED]: "rate_limit_warnings",
[NotificationType.AUTO_RECHARGE_FREQUENT]: "system_alerts",
};
export async function sendNotification(
@ -68,6 +72,7 @@ export async function sendNotification(
endDateString: string | null,
chunk: AuthCreditUsageChunk,
bypassRecentChecks: boolean = false,
is_ledger_enabled: boolean = false,
) {
return withAuth(sendNotificationInternal, undefined)(
team_id,
@ -76,6 +81,7 @@ export async function sendNotification(
endDateString,
chunk,
bypassRecentChecks,
is_ledger_enabled,
);
}
@ -112,7 +118,9 @@ async function sendEmailNotification(
// If user has unsubscribed from all emails or we can't find their preferences, don't send
if (!preferences || preferences.unsubscribed_all) {
logger.debug(`User ${email} has unsubscribed from all emails or preferences not found`);
logger.debug(
`User ${email} has unsubscribed from all emails or preferences not found`,
);
return { success: true }; // Return success since this is an expected case
}
@ -125,7 +133,9 @@ async function sendEmailNotification(
Array.isArray(preferences.email_preferences) &&
!preferences.email_preferences.includes(emailCategory)
) {
logger.debug(`User ${email} has unsubscribed from ${emailCategory} emails`);
logger.debug(
`User ${email} has unsubscribed from ${emailCategory} emails`,
);
return { success: true }; // Return success since this is an expected case
}
@ -149,6 +159,19 @@ async function sendEmailNotification(
}
}
/**
 * Forwards a notification to the ledger as a tracked event, fire-and-forget.
 * Only the concurrency-limit notification currently maps to a ledger event;
 * all other types are a no-op. Tracking failures are logged, never thrown.
 */
async function sendLedgerEvent(
  team_id: string,
  notificationType: NotificationType,
) {
  // Guard clause: no ledger mapping exists for other notification types yet.
  if (notificationType !== NotificationType.CONCURRENCY_LIMIT_REACHED) {
    return;
  }
  trackEvent("concurrent-browser-limit-reached", { team_id }).catch((error) => {
    logger.warn("Error tracking event", {
      module: "email_notification",
      method: "sendLedgerEvent",
      error,
    });
  });
}
async function sendNotificationInternal(
team_id: string,
notificationType: NotificationType,
@ -156,6 +179,7 @@ async function sendNotificationInternal(
endDateString: string | null,
chunk: AuthCreditUsageChunk,
bypassRecentChecks: boolean = false,
is_ledger_enabled: boolean = false,
): Promise<{ success: boolean }> {
if (team_id === "preview" || team_id.startsWith("preview_")) {
return { success: true };
@ -209,6 +233,12 @@ async function sendNotificationInternal(
console.log(
`Sending notification for team_id: ${team_id} and notificationType: ${notificationType}`,
);
if (is_ledger_enabled) {
sendLedgerEvent(team_id, notificationType).catch((error) => {
logger.warn("Error sending ledger event", { module: "email_notification", method: "sendEmail", error });
});
}
// get the emails from the user with the team_id
const { data: emails, error: emailsError } = await supabase_service
.from("users")
@ -220,8 +250,10 @@ async function sendNotificationInternal(
return { success: false };
}
for (const email of emails) {
await sendEmailNotification(email.email, notificationType);
if (!is_ledger_enabled) {
for (const email of emails) {
await sendEmailNotification(email.email, notificationType);
}
}
const { error: insertError } = await supabase_service
@ -255,103 +287,134 @@ async function sendNotificationInternal(
);
}
export async function sendNotificationWithCustomDays(
team_id: string,
notificationType: NotificationType,
daysBetweenEmails: number,
bypassRecentChecks: boolean = false,
is_ledger_enabled: boolean = false,
) {
return withAuth(async (
team_id: string,
notificationType: NotificationType,
daysBetweenEmails: number,
bypassRecentChecks: boolean,
) => {
const redisKey = "notification_sent:" + notificationType + ":" + team_id;
return withAuth(
async (
team_id: string,
notificationType: NotificationType,
daysBetweenEmails: number,
bypassRecentChecks: boolean,
is_ledger_enabled: boolean,
) => {
const redisKey = "notification_sent:" + notificationType + ":" + team_id;
const didSendRecentNotification = (await redisEvictConnection.get(redisKey)) !== null;
const didSendRecentNotification =
(await redisEvictConnection.get(redisKey)) !== null;
if (didSendRecentNotification && !bypassRecentChecks) {
logger.debug(
`Notification already sent within the last ${daysBetweenEmails} days for team_id: ${team_id} and notificationType: ${notificationType}`,
);
return { success: true };
}
await redisEvictConnection.set(
redisKey,
"1",
"EX",
daysBetweenEmails * 24 * 60 * 60,
);
const now = new Date();
const pastDate = new Date(
now.getTime() - daysBetweenEmails * 24 * 60 * 60 * 1000,
);
const { data: recentNotifications, error: recentNotificationsError } =
await supabase_service
.from("user_notifications")
.select("*")
.eq("team_id", team_id)
.eq("notification_type", notificationType)
.gte("sent_date", pastDate.toISOString());
if (recentNotificationsError) {
logger.debug(
`Error fetching recent notifications: ${recentNotificationsError}`,
);
await redisEvictConnection.del(redisKey); // free up redis, let it try again
return { success: false };
}
if (recentNotifications.length > 0 && !bypassRecentChecks) {
logger.debug(
`Notification already sent within the last ${daysBetweenEmails} days for team_id: ${team_id} and notificationType: ${notificationType}`,
);
await redisEvictConnection.set(
redisKey,
"1",
"EX",
daysBetweenEmails * 24 * 60 * 60,
);
return { success: true };
}
logger.info(
`Sending notification for team_id: ${team_id} and notificationType: ${notificationType}`,
);
// get the emails from the user with the team_id
const { data: emails, error: emailsError } = await supabase_service
.from("users")
.select("email")
.eq("team_id", team_id);
if (emailsError) {
logger.debug(`Error fetching emails: ${emailsError}`);
await redisEvictConnection.del(redisKey); // free up redis, let it try again
return { success: false };
}
if (is_ledger_enabled) {
sendLedgerEvent(team_id, notificationType).catch((error) => {
logger.warn("Error sending ledger event", { module: "email_notification", method: "sendEmail", error });
});
}
if (!is_ledger_enabled) {
for (const email of emails) {
await sendEmailNotification(email.email, notificationType);
}
}
const { error: insertError } = await supabase_service
.from("user_notifications")
.insert([
{
team_id: team_id,
notification_type: notificationType,
sent_date: new Date().toISOString(),
timestamp: new Date().toISOString(),
},
]);
if (
process.env.SLACK_ADMIN_WEBHOOK_URL &&
emails.length > 0 &&
notificationType !== NotificationType.CONCURRENCY_LIMIT_REACHED
) {
sendSlackWebhook(
`${getNotificationString(notificationType)}: Team ${team_id}, with email ${emails[0].email}.`,
false,
process.env.SLACK_ADMIN_WEBHOOK_URL,
).catch((error) => {
logger.debug(`Error sending slack notification: ${error}`);
});
}
if (insertError) {
logger.debug(`Error inserting notification record: ${insertError}`);
await redisEvictConnection.del(redisKey); // free up redis, let it try again
return { success: false };
}
if (didSendRecentNotification && !bypassRecentChecks) {
logger.debug(`Notification already sent within the last ${daysBetweenEmails} days for team_id: ${team_id} and notificationType: ${notificationType}`);
return { success: true };
}
await redisEvictConnection.set(redisKey, "1", "EX", daysBetweenEmails * 24 * 60 * 60);
const now = new Date();
const pastDate = new Date(now.getTime() - daysBetweenEmails * 24 * 60 * 60 * 1000);
const { data: recentNotifications, error: recentNotificationsError } = await supabase_service
.from("user_notifications")
.select("*")
.eq("team_id", team_id)
.eq("notification_type", notificationType)
.gte("sent_date", pastDate.toISOString());
if (recentNotificationsError) {
logger.debug(`Error fetching recent notifications: ${recentNotificationsError}`);
await redisEvictConnection.del(redisKey); // free up redis, let it try again
return { success: false };
}
if (recentNotifications.length > 0 && !bypassRecentChecks) {
logger.debug(`Notification already sent within the last ${daysBetweenEmails} days for team_id: ${team_id} and notificationType: ${notificationType}`);
await redisEvictConnection.set(redisKey, "1", "EX", daysBetweenEmails * 24 * 60 * 60);
return { success: true };
}
console.log(
`Sending notification for team_id: ${team_id} and notificationType: ${notificationType}`,
);
// get the emails from the user with the team_id
const { data: emails, error: emailsError } = await supabase_service
.from("users")
.select("email")
.eq("team_id", team_id);
if (emailsError) {
logger.debug(`Error fetching emails: ${emailsError}`);
await redisEvictConnection.del(redisKey); // free up redis, let it try again
return { success: false };
}
for (const email of emails) {
await sendEmailNotification(email.email, notificationType);
}
const { error: insertError } = await supabase_service
.from("user_notifications")
.insert([
{
team_id: team_id,
notification_type: notificationType,
sent_date: new Date().toISOString(),
timestamp: new Date().toISOString(),
},
]);
if (process.env.SLACK_ADMIN_WEBHOOK_URL && emails.length > 0 && notificationType !== NotificationType.CONCURRENCY_LIMIT_REACHED) {
sendSlackWebhook(
`${getNotificationString(notificationType)}: Team ${team_id}, with email ${emails[0].email}.`,
false,
process.env.SLACK_ADMIN_WEBHOOK_URL,
).catch((error) => {
logger.debug(`Error sending slack notification: ${error}`);
});
}
if (insertError) {
logger.debug(`Error inserting notification record: ${insertError}`);
await redisEvictConnection.del(redisKey); // free up redis, let it try again
return { success: false };
}
return { success: true };
}, undefined)(
team_id,
notificationType,
daysBetweenEmails,
bypassRecentChecks,
);
},
undefined,
)(team_id, notificationType, daysBetweenEmails, bypassRecentChecks, is_ledger_enabled );
}

View File

@ -127,7 +127,7 @@ async function addScrapeJobRaw(
// Only send notification if it's not a crawl or batch scrape
const shouldSendNotification = await shouldSendConcurrencyLimitNotification(webScraperOptions.team_id);
if (shouldSendNotification) {
sendNotificationWithCustomDays(webScraperOptions.team_id, NotificationType.CONCURRENCY_LIMIT_REACHED, 15, false).catch((error) => {
sendNotificationWithCustomDays(webScraperOptions.team_id, NotificationType.CONCURRENCY_LIMIT_REACHED, 15, false, true).catch((error) => {
logger.error("Error sending notification (concurrency limit reached)", { error });
});
}
@ -309,7 +309,7 @@ export async function addScrapeJobs(
if (!isCrawlOrBatchScrape(jobs[0].data)) {
const shouldSendNotification = await shouldSendConcurrencyLimitNotification(jobs[0].data.team_id);
if (shouldSendNotification) {
sendNotificationWithCustomDays(jobs[0].data.team_id, NotificationType.CONCURRENCY_LIMIT_REACHED, 15, false).catch((error) => {
sendNotificationWithCustomDays(jobs[0].data.team_id, NotificationType.CONCURRENCY_LIMIT_REACHED, 15, false, true).catch((error) => {
logger.error("Error sending notification (concurrency limit reached)", { error });
});
}