lndhub.go-stats/generate-stats.ts
Râu Cao 0e8228a0bb
Fuzzing for dates
In addition to fuzzing amounts, this shifts all dates to the past by a
random number of days (between 7 and 14).
2025-05-03 15:19:28 +04:00

261 lines
8.3 KiB
TypeScript

// Import required modules
import { Client } from "@db/postgres";
import { load } from "@std/dotenv";
import { crypto } from "@std/crypto";
// Load settings through @std/dotenv's load(); every option below is read from this object
const env = await load();

// Abort early when a required setting is missing or empty
for (const key of ["DB_USER", "DB_PASSWORD", "DB_NAME", "ANONYMIZATION_KEY"]) {
  if (!env[key]) {
    console.error(`Missing ${key} in .env or environment variables`);
    Deno.exit(1);
  }
}

// Parse a base-10 integer setting, falling back to `fallback` when the value is
// missing, not a number, zero or negative. A negative batch size would otherwise
// make the batching loop step backwards forever.
function parsePositiveInt(value, fallback) {
  const parsed = Number.parseInt(value, 10);
  return Number.isInteger(parsed) && parsed > 0 ? parsed : fallback;
}

// Number of users processed concurrently; adjust based on database capacity
const BATCH_SIZE = parsePositiveInt(env.BATCH_SIZE, 10);

// Maximum number of invoices to process per user
// Skip user if exceeded (likely the case for streaming payments)
const MAX_INVOICES = parsePositiveInt(env.MAX_INVOICES, 21000);

// Database connection settings shared by every client this script creates
const dbConfig = {
  user: env.DB_USER,
  password: env.DB_PASSWORD,
  database: env.DB_NAME,
  hostname: env.DB_HOST || "localhost",
  port: Number.parseInt(env.DB_PORT || "5432", 10),
};

const client = new Client(dbConfig);

// Connect to the database; report the underlying reason on failure instead of hiding it
try {
  await client.connect();
  console.log(`Connected to database`);
} catch (err) {
  console.error(`Failed to connect to database`, err instanceof Error ? err.message : err);
  Deno.exit(1);
}

// Fetch all user IDs
const userIdsResult = await client.queryObject("SELECT id FROM users");
const userIds = userIdsResult.rows.map((row) => row.id);
// In-place Fisher-Yates shuffle driven by Math.random().
// Walks from the last slot towards the front, swapping each slot with a randomly
// chosen slot at or before it. Mutates `array` and returns the same reference.
function shuffleArray(array) {
  let remaining = array.length;
  while (remaining > 1) {
    // Pick among the slots that have not been fixed yet
    const pick = Math.floor(Math.random() * remaining);
    remaining -= 1;
    const held = array[remaining];
    array[remaining] = array[pick];
    array[pick] = held;
  }
  return array;
}
// Randomize the order in which users are processed (shuffles userIds in place)
shuffleArray(userIds);

// Location of the generated CSV file
const csvFilePath = "./daily-stats.csv";

// Column titles of the CSV
const csvHeaders = [
  "User ID", // HMAC-SHA256 hash (truncated to 12 hex chars)
  "Date",
  "Balance Start of Day",
  "Balance End of Day",
  "Balance Max Day",
  "Balance Min Day",
  "Total Flow Out",
  "Total Flow In",
];

// Start the file with the header line (creates the file, or overwrites an existing one)
const csvHeaderLine = `${csvHeaders.join(",")}\n`;
await Deno.writeTextFile(csvFilePath, csvHeaderLine);
// Derive the anonymized identifier for a user: HMAC-SHA256 of the stringified
// user_id keyed with env.ANONYMIZATION_KEY, cut down to the first 6 bytes and
// returned as 12 lowercase hex characters.
async function anonymizeUserId(userId) {
  const encoder = new TextEncoder();
  const secretBytes = encoder.encode(env.ANONYMIZATION_KEY);
  const idBytes = encoder.encode(userId.toString());

  const hmacKey = await crypto.subtle.importKey(
    "raw",
    secretBytes,
    { name: "HMAC", hash: "SHA-256" },
    false,
    ["sign"],
  );
  const mac = await crypto.subtle.sign("HMAC", hmacKey, idBytes);

  // Only the leading 48 bits of the MAC are kept
  let hex = "";
  for (const byte of new Uint8Array(mac, 0, 6)) {
    hex += byte.toString(16).padStart(2, "0");
  }
  return hex;
}
// Draw a random fuzzing value for an invoice amount.
// Amounts below 100 get either -1 or 1 (an absolute ±1 satoshi offset); all other
// amounts get a relative factor between -0.01 and +0.01 (±1%).
function getFuzzFactor(amount) {
  // One 32-bit sample from the crypto RNG, scaled into the range 0..1
  const [sample] = crypto.getRandomValues(new Uint32Array(1));
  const unit = sample / 0xFFFFFFFF;
  return amount < 100
    ? (unit < 0.5 ? -1 : 1)
    : unit * 0.02 - 0.01;
}
// Pick one date shift for the whole run: a whole number of days from 7 to 14 inclusive.
// Drawn from the crypto RNG (like the amount fuzzing) rather than Math.random(), because
// the shift is part of the anonymization. 256 is divisible by 8, so the modulo adds no bias.
const dateShiftDays = (crypto.getRandomValues(new Uint8Array(1))[0] % 8) + 7;
// Shift a date into the past by `days` days and return it as a YYYY-MM-DD string.
// `date` is copied, never mutated. The returned day is the UTC calendar day.
function shiftDate(date, days) {
  const shifted = new Date(date);
  // Subtract in UTC: the result is formatted with toISOString(), which is UTC, so
  // local-time setDate()/getDate() could move the timestamp by an hour across a DST
  // change and push the reported day over midnight.
  shifted.setUTCDate(shifted.getUTCDate() - days);
  return shifted.toISOString().split("T")[0]; // YYYY-MM-DD
}
// Build the daily-statistics CSV rows for one user.
//
// userId: id used to filter the invoices table and to derive the anonymized id.
// client: database client; only its queryObject(sql, args) method is used.
//
// Returns an array of comma-joined row strings, one per (shifted) day on which the
// user has at least one settled invoice, in order of first appearance. Returns an
// empty array when the user has no settled invoices or has more than MAX_INVOICES.
async function processUser(userId, client) {
  // Count first, so users with too many invoices are skipped without loading them
  const countSql = [
    "SELECT COUNT(*) AS count",
    "FROM invoices",
    "WHERE user_id = $1 AND settled_at IS NOT NULL",
  ].join("\n");
  const counted = await client.queryObject(countSql, [userId]);
  const invoiceCount = Number(counted.rows[0].count);
  if (invoiceCount > MAX_INVOICES) {
    const skippedId = await anonymizeUserId(userId);
    console.warn(`Skipping user ${skippedId}: ${invoiceCount} invoices exceed limit of ${MAX_INVOICES}`);
    return [];
  }

  // Load the settled invoices, oldest first, so the balance can be replayed in order
  const listSql = [
    "SELECT settled_at, amount, type, service_fee, routing_fee",
    "FROM invoices",
    "WHERE user_id = $1 AND settled_at IS NOT NULL",
    "ORDER BY settled_at ASC",
  ].join("\n");
  const { rows: invoices } = await client.queryObject(listSql, [userId]);
  if (invoices.length === 0) return [];

  // Per-day aggregates keyed by shifted date; all money values are BigInt
  const perDay = new Map();
  let balance = 0n;

  for (const { settled_at, amount: rawAmount, type, service_fee, routing_fee } of invoices) {
    const day = shiftDate(settled_at, dateShiftDays);
    const fuzz = getFuzzFactor(Number(rawAmount));

    const sats = BigInt(rawAmount);
    const fees = BigInt(service_fee) + BigInt(routing_fee);

    // Below 100 the fuzz is an absolute ±1; otherwise it is a relative factor,
    // applied in floating point and rounded to the nearest integer
    let fuzzed;
    if (sats < 100) {
      fuzzed = sats + BigInt(fuzz);
    } else {
      fuzzed = BigInt(Math.round(Number(sats) * (1 + fuzz)));
    }

    // Signed balance change: incoming adds the fuzzed amount, anything else
    // subtracts the fuzzed amount plus both fees
    let delta = type === "incoming" ? fuzzed : -(fuzzed + fees);

    // An outgoing invoice may spend at most the current balance
    if (type === "outgoing" && balance + delta < 0) {
      delta = -balance;
    }

    // First invoice of a day: every balance figure starts at the balance carried in
    let stats = perDay.get(day);
    if (stats === undefined) {
      stats = {
        balance_start_of_day: balance,
        balance_end_of_day: balance,
        balance_max_day: balance,
        balance_min_day: balance,
        total_flow_in: 0n,
        total_flow_out: 0n,
      };
      perDay.set(day, stats);
    }

    balance += delta;
    stats.balance_end_of_day = balance;
    if (balance > stats.balance_max_day) stats.balance_max_day = balance;
    if (balance < stats.balance_min_day) stats.balance_min_day = balance;

    if (delta > 0) {
      stats.total_flow_in += delta;
    } else {
      stats.total_flow_out -= delta; // stored as a positive outflow
    }
  }

  // Emit one row per day, tagged with the anonymized user id
  const anonymizedUserId = await anonymizeUserId(userId);
  return Array.from(perDay, ([day, stats]) =>
    [
      anonymizedUserId,
      day,
      String(stats.balance_start_of_day),
      String(stats.balance_end_of_day),
      String(stats.balance_max_day),
      String(stats.balance_min_day),
      String(stats.total_flow_out),
      String(stats.total_flow_in),
    ].join(",")
  );
}
// Process all users in consecutive batches of BATCH_SIZE.
// Users inside a batch run concurrently, each on its own database connection;
// the rows of a finished batch are appended to the CSV file in a single write.
async function processUsersInParallel(userIds) {
  const totalBatches = Math.ceil(userIds.length / BATCH_SIZE);

  // Run one user on a dedicated connection that is always closed afterwards
  const runOnOwnConnection = async (userId) => {
    const connection = new Client(dbConfig); // Use shared dbConfig
    await connection.connect();
    try {
      return await processUser(userId, connection);
    } finally {
      await connection.end();
    }
  };

  let batchNumber = 0;
  for (let start = 0; start < userIds.length; start += BATCH_SIZE) {
    batchNumber += 1;
    const batch = userIds.slice(start, start + BATCH_SIZE);
    console.log(`Processing batch ${batchNumber} of ${totalBatches}`);

    // Wait for every user of the batch, then merge their rows in user order
    const rowsPerUser = await Promise.all(batch.map((userId) => runOnOwnConnection(userId)));
    const batchRows = rowsPerUser.flat();

    // Nothing is written for a batch that produced no rows
    if (batchRows.length > 0) {
      await Deno.writeTextFile(csvFilePath, `${batchRows.join("\n")}\n`, { append: true });
    }
  }
}
// Run the batched export over all users
try {
  await processUsersInParallel(userIds);
} finally {
  // Close the main database connection even when processing throws,
  // so a failed run does not leave it open
  await client.end();
}
console.log("Daily statistics written to", csvFilePath);