From b4eedce3e03a2fa63d7ba87c17d953b643a786db Mon Sep 17 00:00:00 2001 From: Nicolas Date: Mon, 30 Jun 2025 12:48:06 -0300 Subject: [PATCH] (feat/ledger) Ledger events (#1728) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Nick: ledger init * Update email_notification.ts * Update tracking.ts * Nick: removed unused events * Update email_notification.ts * Apply suggestions from code review * Update tracking.ts * Update tracking.ts * Update email_notification.ts * Nick: conc limit ledger --------- Co-authored-by: Gergő Móricz --- apps/api/src/services/ledger/data-schemas.ts | 49 ++++ .../src/services/ledger/supabase-ledger.ts | 83 ++++++ apps/api/src/services/ledger/tracking.ts | 75 +++++ .../notification/email_notification.ts | 267 +++++++++++------- apps/api/src/services/queue-jobs.ts | 4 +- 5 files changed, 374 insertions(+), 104 deletions(-) create mode 100644 apps/api/src/services/ledger/data-schemas.ts create mode 100644 apps/api/src/services/ledger/supabase-ledger.ts create mode 100644 apps/api/src/services/ledger/tracking.ts diff --git a/apps/api/src/services/ledger/data-schemas.ts b/apps/api/src/services/ledger/data-schemas.ts new file mode 100644 index 000000000..bda764d52 --- /dev/null +++ b/apps/api/src/services/ledger/data-schemas.ts @@ -0,0 +1,49 @@ +/** + * This file defines the data schemas for events tracked in the ledger system. + * These interfaces represent the structure of the 'data' JSONB column in ledger.tracks. 
+ */
+
+/**
+ * Common properties shared across all events
+ */
+export interface BaseEventData {
+  user_id?: string; // User ID if available
+  team_id?: string; // Team ID if available
+}
+
+/**
+ * Concurrent browser limit reached event data
+ * Tracks when a user reaches the concurrent browser limit
+ */
+export interface ConcurrentBrowserLimitReachedEventData extends BaseEventData {
+  team_id: string; // The team ID
+}
+/**
+ * Map of event definition slugs to their data types
+ */
+export interface EventDataMap {
+  "concurrent-browser-limit-reached": ConcurrentBrowserLimitReachedEventData;
+}
+
+/**
+ * Event definition slugs
+ */
+export type EventDefinitionSlug = keyof EventDataMap;
+
+/**
+ * Helper type to extract the data type for a specific event
+ */
+export type EventData<T extends EventDefinitionSlug> = EventDataMap[T];
+
+/**
+ * Creates a properly typed event data object with current timestamp
+ */
+export function createEventData<T extends EventDefinitionSlug>(
+  eventType: T,
+  data: EventDataMap[T],
+): EventDataMap[T] {
+  return {
+    ...data,
+  } as EventDataMap[T];
+}
+ 
\ No newline at end of file
diff --git a/apps/api/src/services/ledger/supabase-ledger.ts b/apps/api/src/services/ledger/supabase-ledger.ts
new file mode 100644
index 000000000..543bcdc68
--- /dev/null
+++ b/apps/api/src/services/ledger/supabase-ledger.ts
@@ -0,0 +1,83 @@
+import { createClient, SupabaseClient } from "@supabase/supabase-js";
+import { logger } from "../../lib/logger";
+import { configDotenv } from "dotenv";
+configDotenv();
+
+// SupabaseLedgerService class initializes the Supabase client for the ledger schema
+class SupabaseLedgerService {
+  private client: SupabaseClient | null = null;
+  private rrClient: SupabaseClient | null = null;
+
+  constructor() {
+    const supabaseUrl = process.env.SUPABASE_URL;
+    const supabaseReplicaUrl = process.env.SUPABASE_REPLICA_URL;
+    const supabaseServiceToken = process.env.SUPABASE_SERVICE_TOKEN;
+    const useDbAuthentication = process.env.USE_DB_AUTHENTICATION === "true";
+
// Only initialize the Supabase client if both URL and Service Token are provided. + if (!useDbAuthentication) { + // Warn the user that Authentication is disabled by setting the client to null + logger.warn( + "Authentication is disabled. Supabase ledger client will not be initialized.", + ); + this.client = null; + } else if (!supabaseUrl || !supabaseServiceToken || !supabaseReplicaUrl) { + logger.error( + "Supabase environment variables aren't configured correctly. Supabase ledger client will not be initialized. Fix ENV configuration or disable DB authentication with USE_DB_AUTHENTICATION env variable", + ); + } else { + this.client = createClient(supabaseUrl, supabaseServiceToken, { + global: { + headers: { + "sb-lb-routing-mode": "alpha-all-services", + }, + }, + db: { + //@ts-ignore + schema: "ledger", + }, + }); + + this.rrClient = createClient(supabaseReplicaUrl, supabaseServiceToken, { + db: { + //@ts-ignore + schema: "ledger", + }, + }); + } + } + + // Provides access to the initialized Supabase client, if available. + getClient(): SupabaseClient | null { + return this.client; + } + + getRRClient(): SupabaseClient | null { + return this.rrClient; + } +} + +const ledgerServ = new SupabaseLedgerService(); + +// Using a Proxy to handle dynamic access to the Supabase ledger client or service methods. +// This approach ensures that if Supabase is not configured, any attempt to use it will result in a clear error. +export const supabase_ledger_service: SupabaseClient = new Proxy( + ledgerServ, + { + get: function (target, prop, receiver) { + const client = target.getClient(); + // If the Supabase client is not initialized, intercept property access to provide meaningful error feedback. + if (client === null) { + return () => { + throw new Error("Supabase ledger client is not configured."); + }; + } + // Direct access to SupabaseLedgerService properties takes precedence. 
+      if (prop in target) {
+        return Reflect.get(target, prop, receiver);
+      }
+      // Otherwise, delegate access to the Supabase client.
+      return Reflect.get(client, prop, receiver);
+    },
+  },
+) as unknown as SupabaseClient;
\ No newline at end of file
diff --git a/apps/api/src/services/ledger/tracking.ts b/apps/api/src/services/ledger/tracking.ts
new file mode 100644
index 000000000..6ce9c6c48
--- /dev/null
+++ b/apps/api/src/services/ledger/tracking.ts
@@ -0,0 +1,75 @@
+// @ts-nocheck - Schema not in types_db yet
+import { EventDataMap, EventDefinitionSlug } from "./data-schemas";
+import { supabase_ledger_service } from "./supabase-ledger";
+import { getValue, setValue } from "../redis";
+import { logger } from "../../lib/logger";
+
+/**
+ * Track an event in the ledger system
+ * @param definitionSlug The provider definition slug
+ * @param data Additional data to store with the track
+ * @returns The tracked event ID or null if tracking failed
+ */
+export async function trackEvent<T extends EventDefinitionSlug>(
+  definitionSlug: T,
+  data: EventDataMap[T],
+): Promise<string | null> {
+  try {
+    // Get the provider definition ID from cache or database
+    const cacheKey = `provider_definition_${definitionSlug}_`;
+    let providerDefinition: any = null;
+    let definitionError: any = null;
+
+    // Try to get from Redis cache first
+    const cachedData = await getValue(cacheKey);
+    if (cachedData) {
+      providerDefinition = JSON.parse(cachedData);
+    } else {
+      // If not in cache, fetch from database
+      const result = await supabase_ledger_service
+        .from("provider_definitions")
+        .select("id")
+        .eq("slug", definitionSlug)
+        .single();
+
+      definitionError = result.error;
+      providerDefinition = result.data;
+
+      // Cache the result for 24 hours (1440 minutes)
+      if (!definitionError && providerDefinition) {
+        await setValue(cacheKey, JSON.stringify(providerDefinition), 60 * 60 * 24);
+      }
+    }
+
+    if (definitionError || !providerDefinition) {
+      logger.error("Error finding provider definition:", definitionError);
+      return null;
+ } + + // Create the track + const { data: track, error: trackError } = await supabase_ledger_service + .from("tracks") + //@ts-ignore + .insert({ + created_at: new Date().toISOString(), + //@ts-ignore + provider_definition_id: providerDefinition.id, + data: data, + }) + .select("uuid") + .single(); + + if (trackError || !track) { + logger.error("Error creating track:", trackError); + return null; + } + //@ts-ignore + return track.uuid; + } catch (error) { + logger.error("Error tracking event:", error); + return null; + } +} + +// data schemas? +// everything that sends an email, move to tracks diff --git a/apps/api/src/services/notification/email_notification.ts b/apps/api/src/services/notification/email_notification.ts index 33b46a4d7..dabb9463d 100644 --- a/apps/api/src/services/notification/email_notification.ts +++ b/apps/api/src/services/notification/email_notification.ts @@ -8,6 +8,7 @@ import { getNotificationString } from "./notification_string"; import { AuthCreditUsageChunk } from "../../controllers/v1/types"; import { redlock } from "../redlock"; import { redisEvictConnection } from "../redis"; +import { trackEvent } from "../ledger/tracking"; const emailTemplates: Record< NotificationType, @@ -51,14 +52,17 @@ const emailTemplates: Record< }; // Map notification types to email categories -const notificationToEmailCategory: Record = { - [NotificationType.APPROACHING_LIMIT]: 'system_alerts', - [NotificationType.LIMIT_REACHED]: 'system_alerts', - [NotificationType.RATE_LIMIT_REACHED]: 'rate_limit_warnings', - [NotificationType.AUTO_RECHARGE_SUCCESS]: 'system_alerts', - [NotificationType.AUTO_RECHARGE_FAILED]: 'system_alerts', - [NotificationType.CONCURRENCY_LIMIT_REACHED]: 'rate_limit_warnings', - [NotificationType.AUTO_RECHARGE_FREQUENT]: 'system_alerts', +const notificationToEmailCategory: Record< + NotificationType, + "rate_limit_warnings" | "system_alerts" +> = { + [NotificationType.APPROACHING_LIMIT]: "system_alerts", + 
[NotificationType.LIMIT_REACHED]: "system_alerts", + [NotificationType.RATE_LIMIT_REACHED]: "rate_limit_warnings", + [NotificationType.AUTO_RECHARGE_SUCCESS]: "system_alerts", + [NotificationType.AUTO_RECHARGE_FAILED]: "system_alerts", + [NotificationType.CONCURRENCY_LIMIT_REACHED]: "rate_limit_warnings", + [NotificationType.AUTO_RECHARGE_FREQUENT]: "system_alerts", }; export async function sendNotification( @@ -68,6 +72,7 @@ export async function sendNotification( endDateString: string | null, chunk: AuthCreditUsageChunk, bypassRecentChecks: boolean = false, + is_ledger_enabled: boolean = false, ) { return withAuth(sendNotificationInternal, undefined)( team_id, @@ -76,6 +81,7 @@ export async function sendNotification( endDateString, chunk, bypassRecentChecks, + is_ledger_enabled, ); } @@ -112,7 +118,9 @@ async function sendEmailNotification( // If user has unsubscribed from all emails or we can't find their preferences, don't send if (!preferences || preferences.unsubscribed_all) { - logger.debug(`User ${email} has unsubscribed from all emails or preferences not found`); + logger.debug( + `User ${email} has unsubscribed from all emails or preferences not found`, + ); return { success: true }; // Return success since this is an expected case } @@ -125,7 +133,9 @@ async function sendEmailNotification( Array.isArray(preferences.email_preferences) && !preferences.email_preferences.includes(emailCategory) ) { - logger.debug(`User ${email} has unsubscribed from ${emailCategory} emails`); + logger.debug( + `User ${email} has unsubscribed from ${emailCategory} emails`, + ); return { success: true }; // Return success since this is an expected case } @@ -149,6 +159,19 @@ async function sendEmailNotification( } } +async function sendLedgerEvent( + team_id: string, + notificationType: NotificationType, +) { + if (notificationType === NotificationType.CONCURRENCY_LIMIT_REACHED) { + trackEvent("concurrent-browser-limit-reached", { + team_id, + }).catch((error) => { + 
logger.warn("Error tracking event", { module: "email_notification", method: "sendLedgerEvent", error }); + }); + } +} + async function sendNotificationInternal( team_id: string, notificationType: NotificationType, @@ -156,6 +179,7 @@ async function sendNotificationInternal( endDateString: string | null, chunk: AuthCreditUsageChunk, bypassRecentChecks: boolean = false, + is_ledger_enabled: boolean = false, ): Promise<{ success: boolean }> { if (team_id === "preview" || team_id.startsWith("preview_")) { return { success: true }; @@ -209,6 +233,12 @@ async function sendNotificationInternal( console.log( `Sending notification for team_id: ${team_id} and notificationType: ${notificationType}`, ); + + if (is_ledger_enabled) { + sendLedgerEvent(team_id, notificationType).catch((error) => { + logger.warn("Error sending ledger event", { module: "email_notification", method: "sendEmail", error }); + }); + } // get the emails from the user with the team_id const { data: emails, error: emailsError } = await supabase_service .from("users") @@ -220,8 +250,10 @@ async function sendNotificationInternal( return { success: false }; } - for (const email of emails) { - await sendEmailNotification(email.email, notificationType); + if (!is_ledger_enabled) { + for (const email of emails) { + await sendEmailNotification(email.email, notificationType); + } } const { error: insertError } = await supabase_service @@ -255,103 +287,134 @@ async function sendNotificationInternal( ); } - export async function sendNotificationWithCustomDays( team_id: string, notificationType: NotificationType, daysBetweenEmails: number, bypassRecentChecks: boolean = false, + is_ledger_enabled: boolean = false, ) { - return withAuth(async ( - team_id: string, - notificationType: NotificationType, - daysBetweenEmails: number, - bypassRecentChecks: boolean, - ) => { - const redisKey = "notification_sent:" + notificationType + ":" + team_id; + return withAuth( + async ( + team_id: string, + notificationType: 
NotificationType, + daysBetweenEmails: number, + bypassRecentChecks: boolean, + is_ledger_enabled: boolean, + ) => { + const redisKey = "notification_sent:" + notificationType + ":" + team_id; - const didSendRecentNotification = (await redisEvictConnection.get(redisKey)) !== null; + const didSendRecentNotification = + (await redisEvictConnection.get(redisKey)) !== null; + + if (didSendRecentNotification && !bypassRecentChecks) { + logger.debug( + `Notification already sent within the last ${daysBetweenEmails} days for team_id: ${team_id} and notificationType: ${notificationType}`, + ); + return { success: true }; + } + + await redisEvictConnection.set( + redisKey, + "1", + "EX", + daysBetweenEmails * 24 * 60 * 60, + ); + + const now = new Date(); + const pastDate = new Date( + now.getTime() - daysBetweenEmails * 24 * 60 * 60 * 1000, + ); + + const { data: recentNotifications, error: recentNotificationsError } = + await supabase_service + .from("user_notifications") + .select("*") + .eq("team_id", team_id) + .eq("notification_type", notificationType) + .gte("sent_date", pastDate.toISOString()); + + if (recentNotificationsError) { + logger.debug( + `Error fetching recent notifications: ${recentNotificationsError}`, + ); + await redisEvictConnection.del(redisKey); // free up redis, let it try again + return { success: false }; + } + + if (recentNotifications.length > 0 && !bypassRecentChecks) { + logger.debug( + `Notification already sent within the last ${daysBetweenEmails} days for team_id: ${team_id} and notificationType: ${notificationType}`, + ); + await redisEvictConnection.set( + redisKey, + "1", + "EX", + daysBetweenEmails * 24 * 60 * 60, + ); + return { success: true }; + } + + logger.info( + `Sending notification for team_id: ${team_id} and notificationType: ${notificationType}`, + ); + // get the emails from the user with the team_id + const { data: emails, error: emailsError } = await supabase_service + .from("users") + .select("email") + .eq("team_id", 
team_id); + + if (emailsError) { + logger.debug(`Error fetching emails: ${emailsError}`); + await redisEvictConnection.del(redisKey); // free up redis, let it try again + return { success: false }; + } + + if (is_ledger_enabled) { + sendLedgerEvent(team_id, notificationType).catch((error) => { + logger.warn("Error sending ledger event", { module: "email_notification", method: "sendEmail", error }); + }); + } + + if (!is_ledger_enabled) { + for (const email of emails) { + await sendEmailNotification(email.email, notificationType); + } + } + + const { error: insertError } = await supabase_service + .from("user_notifications") + .insert([ + { + team_id: team_id, + notification_type: notificationType, + sent_date: new Date().toISOString(), + timestamp: new Date().toISOString(), + }, + ]); + + if ( + process.env.SLACK_ADMIN_WEBHOOK_URL && + emails.length > 0 && + notificationType !== NotificationType.CONCURRENCY_LIMIT_REACHED + ) { + sendSlackWebhook( + `${getNotificationString(notificationType)}: Team ${team_id}, with email ${emails[0].email}.`, + false, + process.env.SLACK_ADMIN_WEBHOOK_URL, + ).catch((error) => { + logger.debug(`Error sending slack notification: ${error}`); + }); + } + + if (insertError) { + logger.debug(`Error inserting notification record: ${insertError}`); + await redisEvictConnection.del(redisKey); // free up redis, let it try again + return { success: false }; + } - if (didSendRecentNotification && !bypassRecentChecks) { - logger.debug(`Notification already sent within the last ${daysBetweenEmails} days for team_id: ${team_id} and notificationType: ${notificationType}`); return { success: true }; - } - - await redisEvictConnection.set(redisKey, "1", "EX", daysBetweenEmails * 24 * 60 * 60); - - const now = new Date(); - const pastDate = new Date(now.getTime() - daysBetweenEmails * 24 * 60 * 60 * 1000); - - const { data: recentNotifications, error: recentNotificationsError } = await supabase_service - .from("user_notifications") - .select("*") - 
.eq("team_id", team_id) - .eq("notification_type", notificationType) - .gte("sent_date", pastDate.toISOString()); - - if (recentNotificationsError) { - logger.debug(`Error fetching recent notifications: ${recentNotificationsError}`); - await redisEvictConnection.del(redisKey); // free up redis, let it try again - return { success: false }; - } - - if (recentNotifications.length > 0 && !bypassRecentChecks) { - logger.debug(`Notification already sent within the last ${daysBetweenEmails} days for team_id: ${team_id} and notificationType: ${notificationType}`); - await redisEvictConnection.set(redisKey, "1", "EX", daysBetweenEmails * 24 * 60 * 60); - return { success: true }; - } - - console.log( - `Sending notification for team_id: ${team_id} and notificationType: ${notificationType}`, - ); - // get the emails from the user with the team_id - const { data: emails, error: emailsError } = await supabase_service - .from("users") - .select("email") - .eq("team_id", team_id); - - if (emailsError) { - logger.debug(`Error fetching emails: ${emailsError}`); - await redisEvictConnection.del(redisKey); // free up redis, let it try again - return { success: false }; - } - - for (const email of emails) { - await sendEmailNotification(email.email, notificationType); - } - - const { error: insertError } = await supabase_service - .from("user_notifications") - .insert([ - { - team_id: team_id, - notification_type: notificationType, - sent_date: new Date().toISOString(), - timestamp: new Date().toISOString(), - }, - ]); - - if (process.env.SLACK_ADMIN_WEBHOOK_URL && emails.length > 0 && notificationType !== NotificationType.CONCURRENCY_LIMIT_REACHED) { - sendSlackWebhook( - `${getNotificationString(notificationType)}: Team ${team_id}, with email ${emails[0].email}.`, - false, - process.env.SLACK_ADMIN_WEBHOOK_URL, - ).catch((error) => { - logger.debug(`Error sending slack notification: ${error}`); - }); - } - - if (insertError) { - logger.debug(`Error inserting notification record: 
${insertError}`); - await redisEvictConnection.del(redisKey); // free up redis, let it try again - return { success: false }; - } - - return { success: true }; - }, undefined)( - team_id, - notificationType, - daysBetweenEmails, - bypassRecentChecks, - ); + }, + undefined, + )(team_id, notificationType, daysBetweenEmails, bypassRecentChecks, is_ledger_enabled ); } diff --git a/apps/api/src/services/queue-jobs.ts b/apps/api/src/services/queue-jobs.ts index 3a1a7ca78..b7cef75f4 100644 --- a/apps/api/src/services/queue-jobs.ts +++ b/apps/api/src/services/queue-jobs.ts @@ -127,7 +127,7 @@ async function addScrapeJobRaw( // Only send notification if it's not a crawl or batch scrape const shouldSendNotification = await shouldSendConcurrencyLimitNotification(webScraperOptions.team_id); if (shouldSendNotification) { - sendNotificationWithCustomDays(webScraperOptions.team_id, NotificationType.CONCURRENCY_LIMIT_REACHED, 15, false).catch((error) => { + sendNotificationWithCustomDays(webScraperOptions.team_id, NotificationType.CONCURRENCY_LIMIT_REACHED, 15, false, true).catch((error) => { logger.error("Error sending notification (concurrency limit reached)", { error }); }); } @@ -309,7 +309,7 @@ export async function addScrapeJobs( if (!isCrawlOrBatchScrape(jobs[0].data)) { const shouldSendNotification = await shouldSendConcurrencyLimitNotification(jobs[0].data.team_id); if (shouldSendNotification) { - sendNotificationWithCustomDays(jobs[0].data.team_id, NotificationType.CONCURRENCY_LIMIT_REACHED, 15, false).catch((error) => { + sendNotificationWithCustomDays(jobs[0].data.team_id, NotificationType.CONCURRENCY_LIMIT_REACHED, 15, false, true).catch((error) => { logger.error("Error sending notification (concurrency limit reached)", { error }); }); }