// Import required modules
import { Client } from "@db/postgres";
import { load } from "@std/dotenv";
import { crypto } from "@std/crypto";

// Load environment variables from .env file or system environment
const env = await load();

// Validate required environment variables
for (const key of ["DB_USER", "DB_PASSWORD", "DB_NAME", "ANONYMIZATION_KEY"]) {
  if (!env[key]) {
    console.error(`Missing ${key} in .env or environment variables`);
    Deno.exit(1);
  }
}

// Adjust based on database capacity
// (always pass a radix to parseInt; an unset/garbage value falls back via ||)
const BATCH_SIZE = Number.parseInt(env.BATCH_SIZE, 10) || 10;

// Maximum number of invoices to process per user
// Skip user if exceeded (likely the case for streaming payments)
const MAX_INVOICES = Number.parseInt(env.MAX_INVOICES, 10) || 21000;

// Database connection configuration from .env or environment variables
const dbConfig = {
  user: env.DB_USER,
  password: env.DB_PASSWORD,
  database: env.DB_NAME,
  hostname: env.DB_HOST || "localhost",
  port: Number.parseInt(env.DB_PORT || "5432", 10),
};

const client = new Client(dbConfig);

// Connect to the database
try {
  await client.connect();
  console.log("Connected to database");
} catch (err) {
  // Report the underlying cause instead of swallowing it, and log to stderr.
  console.error("Failed to connect to database", err);
  Deno.exit(1);
}

// Fetch all user IDs
const userIdsResult = await client.queryObject("SELECT id FROM users");
const userIds = userIdsResult.rows.map((row) => row.id);

/**
 * In-place Fisher-Yates shuffle, used to randomize processing order.
 * @param {Array} array - mutated in place
 * @returns {Array} the same array, shuffled
 */
function shuffleArray(array) {
  for (let i = array.length - 1; i > 0; i--) {
    const j = Math.floor(Math.random() * (i + 1));
    [array[i], array[j]] = [array[j], array[i]]; // Swap elements
  }
  return array;
}
shuffleArray(userIds);

// CSV file path
const csvFilePath = "./daily-stats.csv";

// CSV headers
const csvHeaders = [
  "User ID", // HMAC-SHA256 hash (truncated to 12 hex chars)
  "Date",
  "Balance Start of Day",
  "Balance End of Day",
  "Balance Max Day",
  "Balance Min Day",
  "Total Flow Out",
  "Total Flow In",
];

// Write headers to CSV (create file or overwrite if it exists)
await Deno.writeTextFile(csvFilePath, csvHeaders.join(",") + "\n");

/**
 * Compute HMAC-SHA256 hash of user_id, truncated to 48 bits (12 hex chars).
 * Keyed with ANONYMIZATION_KEY so IDs are stable within a run but not
 * reversible without the key.
 * @param {number|string} userId
 * @returns {Promise<string>} 12 lowercase hex characters
 */
async function anonymizeUserId(userId) {
  const keyData = new TextEncoder().encode(env.ANONYMIZATION_KEY);
  const data = new TextEncoder().encode(userId.toString());
  const key = await crypto.subtle.importKey(
    "raw",
    keyData,
    { name: "HMAC", hash: "SHA-256" },
    false,
    ["sign"],
  );
  const signature = await crypto.subtle.sign("HMAC", key, data);
  const hashArray = Array.from(new Uint8Array(signature).slice(0, 6)); // Take first 6 bytes (48 bits)
  return hashArray.map((b) => b.toString(16).padStart(2, "0")).join("");
}

/**
 * Generate a random fuzzing factor using cryptographic randomness.
 * @param {number} amount - invoice amount in satoshis
 * @returns {number} ±1 (absolute satoshis) for amounts < 100,
 *                   otherwise a relative factor in [-1%, +1%]
 */
function getFuzzFactor(amount) {
  const bytes = new Uint8Array(4);
  crypto.getRandomValues(bytes);
  const randomValue = new Uint32Array(bytes.buffer)[0];
  const normalized = randomValue / 0xFFFFFFFF;
  if (amount < 100) {
    return normalized < 0.5 ? -1 : 1; // ±1 satoshi for small amounts
  }
  return normalized * 0.02 - 0.01; // -1% to +1% for larger amounts
}

// Randomly choose a date shift between 7 and 14 days (constant for the run,
// so per-user day ordering is preserved)
const dateShiftDays = Math.floor(Math.random() * 8) + 7;

/**
 * Shift a date into the past by a number of days and format as YYYY-MM-DD.
 * Uses UTC accessors throughout: toISOString() renders in UTC, so mixing in
 * local-time getDate()/setDate() (as before) could be off by one day for
 * timestamps near midnight in non-UTC timezones.
 * @param {Date} date
 * @param {number} days
 * @returns {string} YYYY-MM-DD
 */
function shiftDate(date, days) {
  const shifted = new Date(date);
  shifted.setUTCDate(shifted.getUTCDate() - days); // Subtract days to shift to past
  return shifted.toISOString().split("T")[0]; // YYYY-MM-DD
}

/**
 * Process a single user: aggregate settled invoices into per-day balance and
 * flow statistics (all arithmetic in BigInt satoshis), with fuzzing applied
 * per invoice.
 * @param {number|string} userId
 * @param {Client} client - dedicated DB connection for this worker
 * @returns {Promise<string[]>} CSV rows (one per active day), [] if skipped
 */
async function processUser(userId, client) {
  // Check the number of settled invoices for the user
  const countResult = await client.queryObject(
    `SELECT COUNT(*) AS count FROM invoices WHERE user_id = $1 AND settled_at IS NOT NULL`,
    [userId],
  );
  const invoiceCount = Number(countResult.rows[0].count);
  if (invoiceCount > MAX_INVOICES) {
    const anonymizedUserId = await anonymizeUserId(userId);
    console.warn(`Skipping user ${anonymizedUserId}: ${invoiceCount} invoices exceed limit of ${MAX_INVOICES}`);
    return [];
  }

  // Fetch settled invoices for the user
  const invoicesResult = await client.queryObject(
    `SELECT settled_at, amount, type, service_fee, routing_fee FROM invoices WHERE user_id = $1 AND settled_at IS NOT NULL ORDER BY settled_at ASC`,
    [userId],
  );
  const invoices = invoicesResult.rows;
  if (invoices.length === 0) return [];

  // Aggregate daily statistics using BigInt
  const dailyData = {};
  let runningBalance = BigInt(0);
  for (const invoice of invoices) {
    const day = shiftDate(invoice.settled_at, dateShiftDays);

    // Get fuzzing factor for this invoice
    const fuzzFactor = getFuzzFactor(Number(invoice.amount));

    // Convert amounts to BigInt
    const amount = BigInt(invoice.amount);
    const serviceFee = BigInt(invoice.service_fee);
    const routingFee = BigInt(invoice.routing_fee);

    // Apply fuzzing: value * (1 + fuzzFactor) or ±1 sat, rounded to nearest integer
    // (BigInt < Number comparison is well-defined here)
    let fuzzedAmount = amount < 100
      ? amount + BigInt(fuzzFactor)
      : BigInt(Math.round(Number(amount) * (1 + fuzzFactor)));

    // Calculate effective amount: include fees for outgoing
    let effectiveAmount = invoice.type === "incoming"
      ? fuzzedAmount
      : fuzzedAmount + serviceFee + routingFee; // Add fees for outgoing
    let signedAmount = invoice.type === "incoming"
      ? effectiveAmount
      : -effectiveAmount;

    // Prevent negative running balance for outgoing invoices
    if (invoice.type === "outgoing" && runningBalance + signedAmount < 0) {
      // Adjust fuzzed amount to spend only up to available balance
      // NOTE(review): if fees exceed the remaining balance, fuzzedAmount goes
      // negative here and the day still ends at exactly 0 — confirm intended.
      const maxSpend = runningBalance;
      fuzzedAmount = maxSpend - serviceFee - routingFee;
      effectiveAmount = fuzzedAmount + serviceFee + routingFee;
      signedAmount = -effectiveAmount;
    }

    if (!dailyData[day]) {
      dailyData[day] = {
        balance_start_of_day: runningBalance,
        balance_end_of_day: runningBalance,
        balance_max_day: runningBalance,
        balance_min_day: runningBalance,
        total_flow_in: BigInt(0),
        total_flow_out: BigInt(0),
      };
    }

    // Update running balance
    runningBalance += signedAmount;
    dailyData[day].balance_end_of_day = runningBalance;

    // Update min/max balance
    dailyData[day].balance_max_day = runningBalance > dailyData[day].balance_max_day
      ? runningBalance
      : dailyData[day].balance_max_day;
    dailyData[day].balance_min_day = runningBalance < dailyData[day].balance_min_day
      ? runningBalance
      : dailyData[day].balance_min_day;

    // Update flows
    if (signedAmount > 0) {
      dailyData[day].total_flow_in += signedAmount;
    } else {
      dailyData[day].total_flow_out += -signedAmount; // Positive outflow value
    }
  }

  // Generate CSV rows for this user with anonymized user_id
  const anonymizedUserId = await anonymizeUserId(userId);
  const rows = [];
  for (const [day, stats] of Object.entries(dailyData)) {
    rows.push([
      anonymizedUserId,
      day,
      stats.balance_start_of_day.toString(),
      stats.balance_end_of_day.toString(),
      stats.balance_max_day.toString(),
      stats.balance_min_day.toString(),
      stats.total_flow_out.toString(),
      stats.total_flow_in.toString(),
    ].join(","));
  }
  return rows;
}

/**
 * Process users in sequential batches; within a batch, users are processed in
 * parallel, each on its own DB connection. Rows are appended to the CSV once
 * per batch (batches run sequentially, so appends never interleave).
 * @param {Array} userIds
 */
async function processUsersInParallel(userIds) {
  for (let i = 0; i < userIds.length; i += BATCH_SIZE) {
    const batch = userIds.slice(i, i + BATCH_SIZE);
    console.log(`Processing batch ${i / BATCH_SIZE + 1} of ${Math.ceil(userIds.length / BATCH_SIZE)}`);

    // Process users in parallel and collect rows
    const promises = batch.map(async (userId) => {
      const batchClient = new Client(dbConfig); // Use shared dbConfig
      await batchClient.connect();
      try {
        return await processUser(userId, batchClient);
      } finally {
        await batchClient.end(); // Always release the connection
      }
    });

    // Wait for all users in the batch to complete
    const batchRows = (await Promise.all(promises)).flat();

    // Write all rows for the batch to CSV in one operation
    if (batchRows.length > 0) {
      await Deno.writeTextFile(csvFilePath, batchRows.join("\n") + "\n", { append: true });
    }
  }
}

// Run parallel processing
await processUsersInParallel(userIds);

// Close the main database connection
await client.end();
console.log("Daily statistics written to", csvFilePath);