commit e4634fedf292b6abc38dc0ade302da27ec54d1fe Author: Râu Cao Date: Fri May 2 14:40:06 2025 +0400 Export daily stats per user to CSV diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..9596a7c --- /dev/null +++ b/.env.example @@ -0,0 +1,8 @@ +DB_USER=lndhub +DB_PASSWORD=12345678 +DB_NAME=lndhub +DB_HOST=10.1.1.1 +# DB_PORT=5432 +# BATCH_SIZE=10 +# MAX_INVOICES=21000 # per user +# ANONYMIZATION_KEY=12345678 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..3f69639 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +.env +csv/ +*.csv +sql/ diff --git a/export-stats.ts b/export-stats.ts new file mode 100644 index 0000000..1b51639 --- /dev/null +++ b/export-stats.ts @@ -0,0 +1,210 @@ +// Import required modules +import { Client } from "jsr:@db/postgres"; +import { load } from "jsr:@std/dotenv"; +import { crypto } from "jsr:@std/crypto"; + +// Load environment variables from .env file or system environment +const env = await load(); + +// Validate required environment variables +for (const key of ["DB_USER", "DB_PASSWORD", "DB_NAME", "ANONYMIZATION_KEY"]) { + if (!env[key]) { + console.error(`Missing ${key} in .env or environment variables`); + Deno.exit(1); + } +} + +// Adjust based on database capacity +const BATCH_SIZE = parseInt(env.BATCH_SIZE) || 10; +// Maximum number of invoices to process per user +// Skip user if exceeded (likely the case for streaming payments e.g.) +const MAX_INVOICES = parseInt(env.MAX_INVOICES) || 21000; + +// Database connection configuration from .env or environment variables +const dbConfig = { + user: env.DB_USER, + password: env.DB_PASSWORD, + database: env.DB_NAME, + hostname: env.DB_HOST || "localhost", + port: parseInt(env.DB_PORT || "5432", 10), +}; + +const client = new Client(dbConfig); + +// Connect to the database +try { + await client.connect(); + console.log(`Connected to database`); +} catch { + console.log(`Failed to connect to database`); + Deno.exit(1); +} + +// Fetch all user IDs +const userIdsResult = await client.queryObject("SELECT id FROM users"); +const userIds = userIdsResult.rows.map((row) => row.id); + +// CSV file path +const csvFilePath = "./daily-stats.csv"; + +// CSV headers +const csvHeaders = [ + "User ID", // HMAC-SHA256 hash (truncated to 12 hex chars) + "Date", + "Balance Start of Day", + "Balance End of Day", + "Balance Max Day", + "Balance Min Day", + "Total Flow Out", + "Total Flow In", +]; + +// Write headers to CSV (create file or overwrite if it exists) +await Deno.writeTextFile(csvFilePath, csvHeaders.join(",") + "\n"); + +// Function to compute HMAC-SHA256 hash of user_id, truncated to 48 bits (12 hex chars) +async function anonymizeUserId(userId) { + const keyData = new TextEncoder().encode(env.ANONYMIZATION_KEY); + const data = new TextEncoder().encode(userId.toString()); + const key = await crypto.subtle.importKey( + "raw", + keyData, + { name: "HMAC", hash: "SHA-256" }, + false, + ["sign"] + ); + const signature = await crypto.subtle.sign("HMAC", key, data); + const hashArray = Array.from(new Uint8Array(signature).slice(0, 6)); // Take first 6 bytes (48 bits) + return hashArray.map((b) => b.toString(16).padStart(2, "0")).join(""); +} + +// Function to process a single user and return CSV rows +async function processUser(userId, client) { + // Check the number of settled invoices for the user + const countResult = await client.queryObject( + `SELECT COUNT(*) AS count + FROM invoices + WHERE user_id = $1 AND settled_at IS NOT NULL`, + [userId] + ); + const invoiceCount = Number(countResult.rows[0].count); + + if (invoiceCount > MAX_INVOICES) { + const anonymizedUserId = await anonymizeUserId(userId); + console.warn(`Skipping user ${anonymizedUserId}: ${invoiceCount} invoices exceed limit of ${MAX_INVOICES}`); + return []; + } + + // Fetch settled invoices for the user + const invoicesResult = await client.queryObject( + `SELECT settled_at, amount, type, service_fee, routing_fee + FROM invoices + WHERE user_id = $1 AND settled_at IS NOT NULL + ORDER BY settled_at ASC`, + [userId] + ); + const invoices = invoicesResult.rows; + + if (invoices.length === 0) return []; + + // Aggregate daily statistics using BigInt + const dailyData = {}; + let runningBalance = BigInt(0); // Initialize as BigInt + + for (const invoice of invoices) { + const day = invoice.settled_at.toISOString().split("T")[0]; // YYYY-MM-DD + // Convert amounts to BigInt, handling NULL fees + const amount = BigInt(invoice.amount); + const serviceFee = invoice.service_fee ? BigInt(invoice.service_fee) : BigInt(0); + const routingFee = invoice.routing_fee ? BigInt(invoice.routing_fee) : BigInt(0); + + // Calculate effective amount: include fees for outgoing, not for incoming + const effectiveAmount = invoice.type === "incoming" + ? amount + : amount + serviceFee + routingFee; // Add fees for outgoing + const signedAmount = invoice.type === "incoming" ? effectiveAmount : -effectiveAmount; + + if (!dailyData[day]) { + dailyData[day] = { + balance_start_of_day: runningBalance, + balance_end_of_day: runningBalance, + balance_max_day: runningBalance, + balance_min_day: runningBalance, + total_flow_in: BigInt(0), + total_flow_out: BigInt(0), + }; + } + + // Update running balance + runningBalance += signedAmount; + dailyData[day].balance_end_of_day = runningBalance; + + // Update min/max balance + dailyData[day].balance_max_day = runningBalance > dailyData[day].balance_max_day + ? runningBalance + : dailyData[day].balance_max_day; + dailyData[day].balance_min_day = runningBalance < dailyData[day].balance_min_day + ? runningBalance + : dailyData[day].balance_min_day; + + // Update flows + if (signedAmount > 0) { + dailyData[day].total_flow_in += signedAmount; + } else { + dailyData[day].total_flow_out += -signedAmount; // Positive outflow value + } + } + + // Generate CSV rows for this user with anonymized user_id + const anonymizedUserId = await anonymizeUserId(userId); + const rows = []; + for (const [day, stats] of Object.entries(dailyData)) { + rows.push([ + anonymizedUserId, + day, + stats.balance_start_of_day.toString(), + stats.balance_end_of_day.toString(), + stats.balance_max_day.toString(), + stats.balance_min_day.toString(), + stats.total_flow_out.toString(), + stats.total_flow_in.toString(), + ].join(",")); + } + + return rows; +} + +// Process users in parallel with batching +async function processUsersInParallel(userIds) { + for (let i = 0; i < userIds.length; i += BATCH_SIZE) { + const batch = userIds.slice(i, i + BATCH_SIZE); + console.log(`Processing batch ${i / BATCH_SIZE + 1} of ${Math.ceil(userIds.length / BATCH_SIZE)}`); + + // Process users in parallel and collect rows + const promises = batch.map(async (userId) => { + const batchClient = new Client(dbConfig); // Use shared dbConfig + await batchClient.connect(); + try { + return await processUser(userId, batchClient); + } finally { + await batchClient.end(); + } + }); + + // Wait for all users in the batch to complete + const batchRows = (await Promise.all(promises)).flat(); + + // Write all rows for the batch to CSV in one operation + if (batchRows.length > 0) { + await Deno.writeTextFile(csvFilePath, batchRows.join("\n") + "\n", { append: true }); + } + } +} + +// Run parallel processing +await processUsersInParallel(userIds); + +// Close the main database connection +await client.end(); + +console.log("Daily statistics written to", csvFilePath);