// Import required modules
import { Client } from "@db/postgres";
import { load } from "@std/dotenv";
import { crypto } from "@std/crypto";

/**
 * Parse a configuration value as a strictly positive base-10 integer.
 *
 * @param {string | undefined} value - Raw configuration value.
 * @param {number} fallback - Value used when `value` is missing, not a
 *   number, zero or negative.
 * @returns {number} The parsed integer, or `fallback`.
 */
function parsePositiveInt(value, fallback) {
  const parsed = Number.parseInt(value ?? "", 10);
  return Number.isInteger(parsed) && parsed > 0 ? parsed : fallback;
}

// `load()` only returns what the .env file defines, so the system environment
// is merged in as a fallback. Values from .env keep precedence, which leaves
// existing .env-based setups unchanged.
const fileEnv = await load();
let systemEnv = {};
try {
  systemEnv = Deno.env.toObject();
} catch (error) {
  // Reading the environment can be refused (e.g. no --allow-env); the script
  // then keeps working from the .env file alone.
  console.warn(`System environment not readable, using .env only: ${error.message}`);
}
// Empty .env entries are dropped so they cannot mask a system variable; every
// consumer below already treats "" and undefined the same way.
const env = {
  ...systemEnv,
  ...Object.fromEntries(
    Object.entries(fileEnv).filter(([, value]) => value !== ""),
  ),
};

// Validate required environment variables
for (const key of ["DB_USER", "DB_PASSWORD", "DB_NAME", "ANONYMIZATION_KEY"]) {
  if (!env[key]) {
    console.error(`Missing ${key} in .env or environment variables`);
    Deno.exit(1);
  }
}

// Number of users processed concurrently (one connection each).
// Adjust based on database capacity. Must be positive: a zero or negative
// step would keep the batching loop from ever finishing.
const BATCH_SIZE = parsePositiveInt(env.BATCH_SIZE, 10);

// Maximum number of invoices to process per user.
// Users above the limit are skipped (likely the case for streaming payments e.g.)
const MAX_INVOICES = parsePositiveInt(env.MAX_INVOICES, 21000);

// Database connection configuration from .env or environment variables
const dbConfig = {
  user: env.DB_USER,
  password: env.DB_PASSWORD,
  database: env.DB_NAME,
  hostname: env.DB_HOST || "localhost",
  port: Number.parseInt(env.DB_PORT || "5432", 10),
};

// CSV file path
const csvFilePath = "./daily-stats.csv";

// CSV headers
const csvHeaders = [
  "User ID", // HMAC-SHA256 hash (truncated to 12 hex chars)
  "Date",
  "Balance Start of Day",
  "Balance End of Day",
  "Balance Max Day",
  "Balance Min Day",
  "Total Flow Out",
  "Total Flow In",
];

/**
 * Compute the HMAC-SHA256 of a user ID, keyed with ANONYMIZATION_KEY, and
 * truncate it to 48 bits.
 *
 * @param {*} userId - User identifier; its `toString()` form is hashed.
 * @returns {Promise<string>} 12 lowercase hex characters.
 */
async function anonymizeUserId(userId) {
  const encoder = new TextEncoder();
  const key = await crypto.subtle.importKey(
    "raw",
    encoder.encode(env.ANONYMIZATION_KEY),
    { name: "HMAC", hash: "SHA-256" },
    false,
    ["sign"],
  );
  const signature = await crypto.subtle.sign(
    "HMAC",
    key,
    encoder.encode(userId.toString()),
  );
  // Take first 6 bytes (48 bits)
  const truncated = new Uint8Array(signature).slice(0, 6);
  return Array.from(truncated, (byte) => byte.toString(16).padStart(2, "0")).join("");
}

/**
 * Fold a user's settled invoices into per-day balance statistics.
 *
 * The running balance starts at zero and is carried across days. All amounts
 * are handled as BigInt. Days are keyed by the UTC calendar date
 * (YYYY-MM-DD) of `settled_at`.
 *
 * @param {Array<object>} invoices - Rows with `settled_at`, `amount`, `type`,
 *   `service_fee` and `routing_fee`, ordered by `settled_at` ascending.
 *   `settled_at` must expose `toISOString()` (presumably a Date from the
 *   driver — verify if the column type changes).
 * @returns {Map<string, object>} Day -> statistics, in first-seen order.
 */
function aggregateDailyStats(invoices) {
  const dailyStats = new Map();
  let runningBalance = 0n;

  for (const invoice of invoices) {
    const day = invoice.settled_at.toISOString().split("T")[0]; // YYYY-MM-DD

    // Convert amounts to BigInt, treating NULL (or otherwise falsy) fees as zero
    const amount = BigInt(invoice.amount);
    const serviceFee = invoice.service_fee ? BigInt(invoice.service_fee) : 0n;
    const routingFee = invoice.routing_fee ? BigInt(invoice.routing_fee) : 0n;

    // Fees are paid on top of outgoing payments only; incoming invoices
    // credit the plain amount.
    const isIncoming = invoice.type === "incoming";
    const signedAmount = isIncoming ? amount : -(amount + serviceFee + routingFee);

    let stats = dailyStats.get(day);
    if (!stats) {
      // First invoice of the day: every balance starts from the balance
      // carried over from the previous activity.
      stats = {
        balance_start_of_day: runningBalance,
        balance_end_of_day: runningBalance,
        balance_max_day: runningBalance,
        balance_min_day: runningBalance,
        total_flow_in: 0n,
        total_flow_out: 0n,
      };
      dailyStats.set(day, stats);
    }

    // Update running balance
    runningBalance += signedAmount;
    stats.balance_end_of_day = runningBalance;

    // Update min/max balance
    if (runningBalance > stats.balance_max_day) {
      stats.balance_max_day = runningBalance;
    }
    if (runningBalance < stats.balance_min_day) {
      stats.balance_min_day = runningBalance;
    }

    // Update flows
    if (signedAmount > 0n) {
      stats.total_flow_in += signedAmount;
    } else {
      stats.total_flow_out += -signedAmount; // Positive outflow value
    }
  }

  return dailyStats;
}

/**
 * Process a single user and return its CSV rows.
 *
 * Users with more than MAX_INVOICES settled invoices are skipped with a
 * warning; users without settled invoices yield no rows.
 *
 * @param {*} userId - ID of the user whose invoices are read.
 * @param {Client} client - Connected database client used for the queries.
 * @returns {Promise<string[]>} One comma-joined row per day with activity,
 *   in the column order of `csvHeaders`.
 */
async function processUser(userId, client) {
  // Check the number of settled invoices for the user
  const countResult = await client.queryObject(
    `SELECT COUNT(*) AS count FROM invoices WHERE user_id = $1 AND settled_at IS NOT NULL`,
    [userId],
  );
  const invoiceCount = Number(countResult.rows[0].count);
  if (invoiceCount > MAX_INVOICES) {
    const anonymizedUserId = await anonymizeUserId(userId);
    console.warn(`Skipping user ${anonymizedUserId}: ${invoiceCount} invoices exceed limit of ${MAX_INVOICES}`);
    return [];
  }

  // Fetch settled invoices for the user
  const invoicesResult = await client.queryObject(
    `SELECT settled_at, amount, type, service_fee, routing_fee FROM invoices WHERE user_id = $1 AND settled_at IS NOT NULL ORDER BY settled_at ASC`,
    [userId],
  );
  const invoices = invoicesResult.rows;
  if (invoices.length === 0) return [];

  const dailyStats = aggregateDailyStats(invoices);

  // Generate CSV rows for this user with anonymized user_id
  const anonymizedUserId = await anonymizeUserId(userId);
  const rows = [];
  for (const [day, stats] of dailyStats) {
    rows.push([
      anonymizedUserId,
      day,
      stats.balance_start_of_day.toString(),
      stats.balance_end_of_day.toString(),
      stats.balance_max_day.toString(),
      stats.balance_min_day.toString(),
      stats.total_flow_out.toString(),
      stats.total_flow_in.toString(),
    ].join(","));
  }
  return rows;
}

/**
 * Process users in batches of BATCH_SIZE, running each batch in parallel and
 * appending its rows to the CSV file.
 *
 * Every user gets its own short-lived database connection. A failure for any
 * user rejects the whole run; rows of earlier batches stay in the file.
 *
 * @param {Array<*>} userIds - IDs of the users to process.
 * @returns {Promise<void>}
 */
async function processUsersInParallel(userIds) {
  const totalBatches = Math.ceil(userIds.length / BATCH_SIZE);

  for (let i = 0; i < userIds.length; i += BATCH_SIZE) {
    const batch = userIds.slice(i, i + BATCH_SIZE);
    console.log(`Processing batch ${i / BATCH_SIZE + 1} of ${totalBatches}`);

    // Process users in parallel and collect rows
    const promises = batch.map(async (userId) => {
      const batchClient = new Client(dbConfig); // Use shared dbConfig
      await batchClient.connect();
      try {
        return await processUser(userId, batchClient);
      } finally {
        await batchClient.end();
      }
    });

    // Wait for all users in the batch to complete
    const batchRows = (await Promise.all(promises)).flat();

    // Write all rows for the batch to CSV in one operation
    if (batchRows.length > 0) {
      await Deno.writeTextFile(csvFilePath, batchRows.join("\n") + "\n", { append: true });
    }
  }
}

const client = new Client(dbConfig);

// Connect to the database
try {
  await client.connect();
  console.log(`Connected to database`);
} catch (error) {
  // Surface the underlying cause; a bare message makes failures undiagnosable.
  console.error("Failed to connect to database:", error);
  Deno.exit(1);
}

try {
  // Fetch all user IDs
  const userIdsResult = await client.queryObject("SELECT id FROM users");
  const userIds = userIdsResult.rows.map((row) => row.id);

  // Write headers to CSV (create file or overwrite if it exists)
  await Deno.writeTextFile(csvFilePath, csvHeaders.join(",") + "\n");

  // Run parallel processing
  await processUsersInParallel(userIds);
} finally {
  // Close the main database connection, even when processing failed
  await client.end();
}

console.log("Daily statistics written to", csvFilePath);