In addition to fuzzing amounts, this shifts all dates to the past by a random number of days (between 7 and 14).

// Import required modules
import { Client } from "@db/postgres";
import { load } from "@std/dotenv";
import { crypto } from "@std/crypto";

// Load environment variables from .env file or system environment
const env = await load();

// Validate required environment variables
for (const key of ["DB_USER", "DB_PASSWORD", "DB_NAME", "ANONYMIZATION_KEY"]) {
  if (!env[key]) {
    console.error(`Missing ${key} in .env or environment variables`);
    Deno.exit(1);
  }
}

// Number of users processed concurrently per batch; adjust based on database capacity
const BATCH_SIZE = parseInt(env.BATCH_SIZE ?? "", 10) || 10;

// Maximum number of settled invoices to process per user;
// users above this limit are skipped (likely the case for streaming payments)
const MAX_INVOICES = parseInt(env.MAX_INVOICES ?? "", 10) || 21000;

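// Note: each user in a batch opens its own database connection (see
// processUsersInParallel below), so BATCH_SIZE also bounds the number of
// concurrent connections; keep it below the server's connection limit.
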
// Database connection configuration from .env or environment variables
const dbConfig = {
  user: env.DB_USER,
  password: env.DB_PASSWORD,
  database: env.DB_NAME,
  hostname: env.DB_HOST || "localhost",
  port: parseInt(env.DB_PORT || "5432", 10),
};

const client = new Client(dbConfig);

// Connect to the database
try {
  await client.connect();
  console.log("Connected to database");
} catch (err) {
  console.error("Failed to connect to database:", err);
  Deno.exit(1);
}

// Fetch all user IDs
const userIdsResult = await client.queryObject<{ id: number }>("SELECT id FROM users");
const userIds = userIdsResult.rows.map((row) => row.id);

// Shuffle the userIds array in place (Fisher-Yates) to randomize processing order
function shuffleArray<T>(array: T[]): T[] {
  for (let i = array.length - 1; i > 0; i--) {
    const j = Math.floor(Math.random() * (i + 1));
    [array[i], array[j]] = [array[j], array[i]]; // Swap elements
  }
  return array;
}
shuffleArray(userIds);

// CSV file path
const csvFilePath = "./daily-stats.csv";

// CSV headers
const csvHeaders = [
  "User ID", // HMAC-SHA256 hash (truncated to 12 hex chars)
  "Date",
  "Balance Start of Day",
  "Balance End of Day",
  "Balance Max Day",
  "Balance Min Day",
  "Total Flow Out",
  "Total Flow In",
];

// Write headers to the CSV (creates the file or overwrites it if it exists)
await Deno.writeTextFile(csvFilePath, csvHeaders.join(",") + "\n");

// Compute an HMAC-SHA256 hash of the user ID, truncated to 48 bits (12 hex chars)
async function anonymizeUserId(userId: number | string): Promise<string> {
  const keyData = new TextEncoder().encode(env.ANONYMIZATION_KEY);
  const data = new TextEncoder().encode(userId.toString());
  const key = await crypto.subtle.importKey(
    "raw",
    keyData,
    { name: "HMAC", hash: "SHA-256" },
    false,
    ["sign"],
  );
  const signature = await crypto.subtle.sign("HMAC", key, data);
  const hashArray = Array.from(new Uint8Array(signature).slice(0, 6)); // Take first 6 bytes (48 bits)
  return hashArray.map((b) => b.toString(16).padStart(2, "0")).join("");
}

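// Note: truncating to 48 bits keeps the IDs short while, by the birthday bound,
// collisions only become likely around 2^24 (~16.7 million) users; using a keyed
// HMAC rather than a plain hash prevents re-identifying users by brute-forcing
// candidate IDs, as long as ANONYMIZATION_KEY stays secret.
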
// Generate a random fuzzing factor using a cryptographically secure RNG
function getFuzzFactor(amount: number): number {
  const bytes = new Uint8Array(4);
  crypto.getRandomValues(bytes);
  const randomValue = new Uint32Array(bytes.buffer)[0];
  const normalized = randomValue / 0xFFFFFFFF; // Map to [0, 1]
  if (amount < 100) {
    return normalized < 0.5 ? -1 : 1; // ±1 satoshi for small amounts
  }
  return normalized * 0.02 - 0.01; // -1% to +1% for larger amounts
}

// Randomly choose a single date shift between 7 and 14 days, applied to all users
const dateShiftDays = Math.floor(Math.random() * 8) + 7;

// Shift a date into the past by the given number of days, returning YYYY-MM-DD
function shiftDate(date: Date, days: number): string {
  const shifted = new Date(date);
  shifted.setDate(shifted.getDate() - days); // Subtract days to shift into the past
  return shifted.toISOString().split("T")[0]; // YYYY-MM-DD
}

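// Example: shiftDate(new Date("2024-03-15T12:00:00Z"), 10) returns "2024-03-05"
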
// Process a single user and return CSV rows
async function processUser(userId: number, client: Client): Promise<string[]> {
  // Check the number of settled invoices for the user
  const countResult = await client.queryObject<{ count: bigint }>(
    `SELECT COUNT(*) AS count
     FROM invoices
     WHERE user_id = $1 AND settled_at IS NOT NULL`,
    [userId],
  );
  const invoiceCount = Number(countResult.rows[0].count);

  if (invoiceCount > MAX_INVOICES) {
    const anonymizedUserId = await anonymizeUserId(userId);
    console.warn(
      `Skipping user ${anonymizedUserId}: ${invoiceCount} invoices exceed the limit of ${MAX_INVOICES}`,
    );
    return [];
  }

  // Fetch settled invoices for the user
  // (amount and fee columns are assumed to be BIGINT, which decodes to BigInt)
  const invoicesResult = await client.queryObject<{
    settled_at: Date;
    amount: bigint;
    type: string;
    service_fee: bigint;
    routing_fee: bigint;
  }>(
    `SELECT settled_at, amount, type, service_fee, routing_fee
     FROM invoices
     WHERE user_id = $1 AND settled_at IS NOT NULL
     ORDER BY settled_at ASC`,
    [userId],
  );
  const invoices = invoicesResult.rows;

  if (invoices.length === 0) return [];

  // Aggregate daily statistics using BigInt to avoid floating-point precision loss
  const dailyData: Record<string, {
    balance_start_of_day: bigint;
    balance_end_of_day: bigint;
    balance_max_day: bigint;
    balance_min_day: bigint;
    total_flow_in: bigint;
    total_flow_out: bigint;
  }> = {};
  let runningBalance = BigInt(0);

  for (const invoice of invoices) {
    const day = shiftDate(invoice.settled_at, dateShiftDays);

    // Get the fuzzing factor for this invoice
    const fuzzFactor = getFuzzFactor(Number(invoice.amount));

    // Convert amounts to BigInt
    const amount = BigInt(invoice.amount);
    const serviceFee = BigInt(invoice.service_fee);
    const routingFee = BigInt(invoice.routing_fee);

    // Apply fuzzing: ±1 sat for small amounts, otherwise
    // value * (1 + fuzzFactor) rounded to the nearest integer
    let fuzzedAmount = amount < 100
      ? amount + BigInt(fuzzFactor)
      : BigInt(Math.round(Number(amount) * (1 + fuzzFactor)));

    // Calculate the effective amount: include fees for outgoing invoices
    let effectiveAmount = invoice.type === "incoming"
      ? fuzzedAmount
      : fuzzedAmount + serviceFee + routingFee; // Add fees for outgoing
    let signedAmount = invoice.type === "incoming" ? effectiveAmount : -effectiveAmount;

    // Prevent a negative running balance for outgoing invoices
    if (invoice.type === "outgoing" && runningBalance + signedAmount < 0) {
      // Adjust the fuzzed amount to spend only up to the available balance
      const maxSpend = runningBalance;
      fuzzedAmount = maxSpend - serviceFee - routingFee;
      effectiveAmount = fuzzedAmount + serviceFee + routingFee;
      signedAmount = -effectiveAmount;
    }

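    // Note: if the fees exceed the remaining balance, fuzzedAmount goes negative
    // here, but effectiveAmount still equals the remaining balance, so the
    // running balance bottoms out at exactly zero rather than going negative.
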
    if (!dailyData[day]) {
      dailyData[day] = {
        balance_start_of_day: runningBalance,
        balance_end_of_day: runningBalance,
        balance_max_day: runningBalance,
        balance_min_day: runningBalance,
        total_flow_in: BigInt(0),
        total_flow_out: BigInt(0),
      };
    }

    // Update the running balance
    runningBalance += signedAmount;
    dailyData[day].balance_end_of_day = runningBalance;

    // Update the min/max balance
    if (runningBalance > dailyData[day].balance_max_day) {
      dailyData[day].balance_max_day = runningBalance;
    }
    if (runningBalance < dailyData[day].balance_min_day) {
      dailyData[day].balance_min_day = runningBalance;
    }

    // Update the flows
    if (signedAmount > 0) {
      dailyData[day].total_flow_in += signedAmount;
    } else {
      dailyData[day].total_flow_out += -signedAmount; // Record outflow as a positive value
    }
  }

  // Generate CSV rows for this user with the anonymized user ID
  const anonymizedUserId = await anonymizeUserId(userId);
  const rows: string[] = [];
  for (const [day, stats] of Object.entries(dailyData)) {
    rows.push([
      anonymizedUserId,
      day,
      stats.balance_start_of_day.toString(),
      stats.balance_end_of_day.toString(),
      stats.balance_max_day.toString(),
      stats.balance_min_day.toString(),
      stats.total_flow_out.toString(),
      stats.total_flow_in.toString(),
    ].join(","));
  }

  return rows;
}

// Process users in parallel with batching
async function processUsersInParallel(userIds: number[]): Promise<void> {
  for (let i = 0; i < userIds.length; i += BATCH_SIZE) {
    const batch = userIds.slice(i, i + BATCH_SIZE);
    console.log(`Processing batch ${i / BATCH_SIZE + 1} of ${Math.ceil(userIds.length / BATCH_SIZE)}`);

    // Process users in parallel, each with its own connection, and collect rows
    const promises = batch.map(async (userId) => {
      const batchClient = new Client(dbConfig); // Use the shared dbConfig
      await batchClient.connect();
      try {
        return await processUser(userId, batchClient);
      } finally {
        await batchClient.end();
      }
    });

    // Wait for all users in the batch to complete
    const batchRows = (await Promise.all(promises)).flat();

    // Write all rows for the batch to the CSV in one operation
    if (batchRows.length > 0) {
      await Deno.writeTextFile(csvFilePath, batchRows.join("\n") + "\n", { append: true });
    }
  }
}

// Run parallel processing
await processUsersInParallel(userIds);

// Close the main database connection
await client.end();

console.log("Daily statistics written to", csvFilePath);
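
// A sketch of how this might be run; the filename is an assumption, and the
// permission flags match the APIs used above (.env read, env access, DB
// network access, CSV write):
//   deno run --allow-env --allow-read --allow-write --allow-net daily-stats.ts
// with @db/postgres, @std/dotenv, and @std/crypto mapped in deno.json.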