lndhub.go-stats/export-stats.ts

211 lines
6.7 KiB
TypeScript

// Import required modules
import { Client } from "jsr:@db/postgres";
import { load } from "jsr:@std/dotenv";
import { crypto } from "jsr:@std/crypto";
// Load environment variables from .env file or system environment
const env = await load();
// Validate required environment variables
for (const key of ["DB_USER", "DB_PASSWORD", "DB_NAME", "ANONYMIZATION_KEY"]) {
if (!env[key]) {
console.error(`Missing ${key} in .env or environment variables`);
Deno.exit(1);
}
}
// Adjust based on database capacity
const BATCH_SIZE = parseInt(env.BATCH_SIZE) || 10;
// Maximum number of invoices to process per user
// Skip user if exceeded (likely the case for streaming payments e.g.)
const MAX_INVOICES = parseInt(env.MAX_INVOICES) || 21000;
// Database connection configuration from .env or environment variables
const dbConfig = {
user: env.DB_USER,
password: env.DB_PASSWORD,
database: env.DB_NAME,
hostname: env.DB_HOST || "localhost",
port: parseInt(env.DB_PORT || "5432", 10),
};
const client = new Client(dbConfig);
// Connect to the database
try {
await client.connect();
console.log(`Connected to database`);
} catch {
console.log(`Failed to connect to database`);
Deno.exit(1);
}
// Fetch all user IDs
const userIdsResult = await client.queryObject("SELECT id FROM users");
const userIds = userIdsResult.rows.map((row) => row.id);
// CSV file path
const csvFilePath = "./daily-stats.csv";
// CSV headers
const csvHeaders = [
"User ID", // HMAC-SHA256 hash (truncated to 12 hex chars)
"Date",
"Balance Start of Day",
"Balance End of Day",
"Balance Max Day",
"Balance Min Day",
"Total Flow Out",
"Total Flow In",
];
// Write headers to CSV (create file or overwrite if it exists)
await Deno.writeTextFile(csvFilePath, csvHeaders.join(",") + "\n");
// Function to compute HMAC-SHA256 hash of user_id, truncated to 48 bits (12 hex chars)
async function anonymizeUserId(userId) {
const keyData = new TextEncoder().encode(env.ANONYMIZATION_KEY);
const data = new TextEncoder().encode(userId.toString());
const key = await crypto.subtle.importKey(
"raw",
keyData,
{ name: "HMAC", hash: "SHA-256" },
false,
["sign"]
);
const signature = await crypto.subtle.sign("HMAC", key, data);
const hashArray = Array.from(new Uint8Array(signature).slice(0, 6)); // Take first 6 bytes (48 bits)
return hashArray.map((b) => b.toString(16).padStart(2, "0")).join("");
}
// Function to process a single user and return CSV rows
async function processUser(userId, client) {
// Check the number of settled invoices for the user
const countResult = await client.queryObject(
`SELECT COUNT(*) AS count
FROM invoices
WHERE user_id = $1 AND settled_at IS NOT NULL`,
[userId]
);
const invoiceCount = Number(countResult.rows[0].count);
if (invoiceCount > MAX_INVOICES) {
const anonymizedUserId = await anonymizeUserId(userId);
console.warn(`Skipping user ${anonymizedUserId}: ${invoiceCount} invoices exceed limit of ${MAX_INVOICES}`);
return [];
}
// Fetch settled invoices for the user
const invoicesResult = await client.queryObject(
`SELECT settled_at, amount, type, service_fee, routing_fee
FROM invoices
WHERE user_id = $1 AND settled_at IS NOT NULL
ORDER BY settled_at ASC`,
[userId]
);
const invoices = invoicesResult.rows;
if (invoices.length === 0) return [];
// Aggregate daily statistics using BigInt
const dailyData = {};
let runningBalance = BigInt(0); // Initialize as BigInt
for (const invoice of invoices) {
const day = invoice.settled_at.toISOString().split("T")[0]; // YYYY-MM-DD
// Convert amounts to BigInt, handling NULL fees
const amount = BigInt(invoice.amount);
const serviceFee = invoice.service_fee ? BigInt(invoice.service_fee) : BigInt(0);
const routingFee = invoice.routing_fee ? BigInt(invoice.routing_fee) : BigInt(0);
// Calculate effective amount: include fees for outgoing, not for incoming
const effectiveAmount = invoice.type === "incoming"
? amount
: amount + serviceFee + routingFee; // Add fees for outgoing
const signedAmount = invoice.type === "incoming" ? effectiveAmount : -effectiveAmount;
if (!dailyData[day]) {
dailyData[day] = {
balance_start_of_day: runningBalance,
balance_end_of_day: runningBalance,
balance_max_day: runningBalance,
balance_min_day: runningBalance,
total_flow_in: BigInt(0),
total_flow_out: BigInt(0),
};
}
// Update running balance
runningBalance += signedAmount;
dailyData[day].balance_end_of_day = runningBalance;
// Update min/max balance
dailyData[day].balance_max_day = runningBalance > dailyData[day].balance_max_day
? runningBalance
: dailyData[day].balance_max_day;
dailyData[day].balance_min_day = runningBalance < dailyData[day].balance_min_day
? runningBalance
: dailyData[day].balance_min_day;
// Update flows
if (signedAmount > 0) {
dailyData[day].total_flow_in += signedAmount;
} else {
dailyData[day].total_flow_out += -signedAmount; // Positive outflow value
}
}
// Generate CSV rows for this user with anonymized user_id
const anonymizedUserId = await anonymizeUserId(userId);
const rows = [];
for (const [day, stats] of Object.entries(dailyData)) {
rows.push([
anonymizedUserId,
day,
stats.balance_start_of_day.toString(),
stats.balance_end_of_day.toString(),
stats.balance_max_day.toString(),
stats.balance_min_day.toString(),
stats.total_flow_out.toString(),
stats.total_flow_in.toString(),
].join(","));
}
return rows;
}
// Process users in parallel with batching
async function processUsersInParallel(userIds) {
for (let i = 0; i < userIds.length; i += BATCH_SIZE) {
const batch = userIds.slice(i, i + BATCH_SIZE);
console.log(`Processing batch ${i / BATCH_SIZE + 1} of ${Math.ceil(userIds.length / BATCH_SIZE)}`);
// Process users in parallel and collect rows
const promises = batch.map(async (userId) => {
const batchClient = new Client(dbConfig); // Use shared dbConfig
await batchClient.connect();
try {
return await processUser(userId, batchClient);
} finally {
await batchClient.end();
}
});
// Wait for all users in the batch to complete
const batchRows = (await Promise.all(promises)).flat();
// Write all rows for the batch to CSV in one operation
if (batchRows.length > 0) {
await Deno.writeTextFile(csvFilePath, batchRows.join("\n") + "\n", { append: true });
}
}
}
// Run parallel processing
await processUsersInParallel(userIds);
// Close the main database connection
await client.end();
console.log("Daily statistics written to", csvFilePath);