elerium-tooling/elerium_ordinals_daemon.py
nick 46058a453d Update elerium_ordinals_daemon.py
In our updated script, we added a check to ensure that ordinalsid is not None before proceeding with updating the invoice. This prevents the script from trying to update invoices with incomplete or invalid data.
2023-12-05 09:36:49 +00:00

231 lines
7.0 KiB
Python

import requests
import json
import sqlite3
import logging
import time
from requests.exceptions import RequestException
import subprocess
import tempfile
import os
# Configuration
# Number of seconds to sleep between two polling passes.
CHECK_INTERVAL = 60
# A transaction can safely carry up to 95 KB of metadata; stay well below
# that and cap it at 50 KB (expressed in bytes).
MAX_METADATA_SIZE_BYTES = 50 * 1024

# All deployment-specific settings are read from config.json.
with open('config.json', 'r') as config_file:
    config = json.load(config_file)

# Daemon settings: BTCPay Server access, local database and log file.
_btcpay_cfg = config['btcpay']
BTCPAY_INSTANCE = _btcpay_cfg['instance']
STORE_ID = _btcpay_cfg['store_id']
API_KEY = _btcpay_cfg['api_key']
_database_cfg = config['database']
DB_PATH = _database_cfg['path']
LOG_FILE = _database_cfg['logging']

# bitcoin-cli settings.
_bitcoin_cli_cfg = config['bitcoin_cli']
BITCOIN_CLI_PATH = _bitcoin_cli_cfg['path']
# Currently disabled:
# BITCOIN_CLI_DATADIR = config['bitcoin_cli']['data_dir']
BITCOIN_CLI_RPC_PORT = _bitcoin_cli_cfg['rpc_port']

# ord settings. The RPC credentials are split on whitespace so each piece
# can be passed to ord as a separate command-line argument.
# Currently disabled:
# ORD_COOKIE_FILE = config['ord_command']['cookie_file']
# ORD_TESTNET = config['ord_command']['testnet']
_ord_cfg = config['ord_command']
ORD_RPC_USER = _ord_cfg['rpc_user'].split()
ORD_RPC_PASS = _ord_cfg['rpc_pass'].split()

# Send INFO-and-above records to the configured log file.
logging.basicConfig(
    filename=LOG_FILE,
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)

# Open the local SQLite database and make sure the orders table exists.
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute(
    'CREATE TABLE IF NOT EXISTS orders\n'
    '(invoice_id TEXT PRIMARY KEY, status TEXT)'
)
conn.commit()

# Headers sent with every BTCPay Server API request.
headers = {
    "Authorization": f"token {API_KEY}",
    "Content-Type": "application/json",
}
def get_fee_rate():
    """Estimate the fee rate to use for an inscription, in sat/B.

    Runs ``bitcoin-cli estimatesmartfee 3 conservative`` and converts the
    reported ``feerate`` (BTC/kvB) to a whole number of satoshis per byte.

    Returns:
        int: The estimated fee rate, never lower than 1. The default of 1
        is returned when bitcoin-cli cannot be started, exits with an
        error, prints something that is not JSON, or reports no positive
        ``feerate``.
    """
    default_fee_rate = 1
    command = [
        BITCOIN_CLI_PATH,
        #BITCOIN_CLI_DATADIR,
        #BITCOIN_CLI_RPC_PORT,
        "estimatesmartfee",
        "3",  # conf_target for 3 blocks
        "conservative",  # estimate_mode
    ]
    try:
        result = subprocess.run(command, capture_output=True, text=True, check=True)
        data = json.loads(result.stdout)
    except subprocess.CalledProcessError as e:
        logging.error(f"Fee rate estimation failed: {e.stderr}")
        return default_fee_rate
    except (OSError, ValueError) as e:
        # OSError: the binary could not be launched.
        # ValueError: the output was not valid JSON.
        logging.error(f"Fee rate estimation failed: {e}")
        return default_fee_rate
    print(data)

    # Without a usable "feerate" the conversion below would produce 0,
    # which is not a valid fee rate, so fall back to the default instead.
    feerate = data.get("feerate") if isinstance(data, dict) else None
    if not isinstance(feerate, (int, float)) or feerate <= 0:
        logging.error(f"Fee rate estimation returned no usable feerate: {data}")
        return default_fee_rate

    # Convert BTC/kvB to sat/B and round to the nearest integer. Plain
    # int() truncation would turn e.g. 1.9 sat/B into 1.
    btc_per_kvB = "{:.8f}".format(feerate)  # fixed 8 decimal places
    print(btc_per_kvB)
    sat_per_B = max(default_fee_rate, round(float(btc_per_kvB) * 100000000 / 1000))
    print(sat_per_B)
    return sat_per_B
def _parse_inscription_id(output):
    """Return the id of the first inscription listed in ord's JSON output.

    Raises:
        ValueError: If ``output`` is not JSON or has no
            ``inscriptions[0]["id"]`` entry. The raw output is logged
            first so it is not lost.
    """
    try:
        data = json.loads(output)
        return data["inscriptions"][0]["id"]
    except (ValueError, LookupError, TypeError) as e:
        logging.error(f"Unexpected ord output after inscribing: {output!r}")
        raise ValueError("Unexpected response format") from e


def inscribe(metadata):
    """Inscribe ``metadata`` as a JSON file via ``ord wallet inscribe``.

    The metadata is written to a temporary ``.json`` file which is handed
    to ord with ``--file``; the fee rate comes from ``get_fee_rate()``.
    The temporary file is removed once ord has finished.

    Args:
        metadata: JSON-serialisable object to inscribe.

    Returns:
        The id of the first inscription reported by ord, or ``None`` when
        the serialised metadata exceeds ``MAX_METADATA_SIZE_BYTES`` or the
        ord command fails or cannot be started.

    Raises:
        ValueError: If ord exits successfully but its output cannot be
            interpreted. This is deliberately not turned into ``None``:
            the inscription may already exist, and a caller that retries
            on ``None`` would inscribe again.
    """
    metadata_size_bytes = len(json.dumps(metadata).encode('utf-8'))
    if metadata_size_bytes > MAX_METADATA_SIZE_BYTES:
        logging.error(
            f"Metadata size {metadata_size_bytes} bytes exceeds maximum limit of "
            f"{MAX_METADATA_SIZE_BYTES // 1024} KB."
        )
        return None

    fee_rate = get_fee_rate()
    print("FEE RATE: " + str(fee_rate))

    # delete=False: the file must outlive this `with` so ord can read it.
    with tempfile.NamedTemporaryFile(mode='w+', delete=False, suffix='.json') as temp_file:
        json.dump(metadata, temp_file)
        temp_file_path = temp_file.name

    command = [
        "ord",
        #ORD_COOKIE_FILE,
        #ORD_TESTNET,
        *ORD_RPC_USER,
        *ORD_RPC_PASS,
        "wallet",
        "inscribe",
        "--fee-rate",
        str(fee_rate),  # Use dynamic fee rate
        "--file",
        temp_file_path,
    ]
    try:
        result = subprocess.run(command, capture_output=True, text=True, check=True)
    except subprocess.CalledProcessError as e:
        # Log the error and return None to continue the script
        logging.error(f"Error during inscribe operation: {e.stderr}")
        return None
    except OSError as e:
        # ord could not be launched at all, so nothing was inscribed.
        logging.error(f"Error during inscribe operation: {e}")
        return None
    finally:
        # Clean up the temporary file
        os.remove(temp_file_path)

    return _parse_inscription_id(result.stdout)
def fetch_invoices(timeout=30):
    """Fetch the store's invoices from the BTCPay Server API.

    Args:
        timeout: Seconds to wait for the server before giving up. Without
            a timeout a stalled connection would block the daemon forever.

    Returns:
        The decoded JSON body of
        ``GET {BTCPAY_INSTANCE}/api/v1/stores/{STORE_ID}/invoices``, or an
        empty list when the request fails, returns an error status, or the
        body is not valid JSON. Failures are logged and printed.
    """
    endpoint = f"{BTCPAY_INSTANCE}/api/v1/stores/{STORE_ID}/invoices"
    try:
        response = requests.get(endpoint, headers=headers, timeout=timeout)
        response.raise_for_status()
        invoices = response.json()
    except (RequestException, ValueError) as e:
        # ValueError covers a non-JSON body on requests versions whose
        # decode error is not a RequestException subclass.
        logging.error(f"Error fetching invoices: {e}")
        print(f"Error fetching invoices: {e}")
        return []
    return invoices
def is_inscribed(invoice):
    """Tell whether an invoice already carries an ordinals id.

    Args:
        invoice: Invoice dict; its optional ``metadata`` mapping may hold
            an ``ordinalsId`` entry.

    Returns:
        bool: ``True`` when ``metadata["ordinalsId"]`` is not ``None`` and
        is longer than one character, ``False`` otherwise, including when
        ``metadata`` is missing or ``None``.
    """
    # `or {}` also covers an explicit `"metadata": null`, which
    # `.get('metadata', {})` would pass through as None.
    metadata = invoice.get('metadata') or {}
    ordinals_id = metadata.get('ordinalsId', '')
    # Check if ordinals_id is not None and has a length greater than 1
    inscribed = ordinals_id is not None and len(ordinals_id) > 1
    # Debugging print statement
    print(f"Invoice ID: {invoice.get('id')}, Ordinals ID: {ordinals_id}, Is inscribed: {inscribed}")
    return inscribed
def process_invoices(invoices):
    """Inscribe every settled invoice that has no ordinals id yet.

    For each invoice with status ``'Settled'`` that ``is_inscribed()``
    rejects and that has no row in the local ``orders`` table, the invoice
    metadata is inscribed and the resulting id is written back to BTCPay
    Server under ``metadata['ordinalsId']``.

    The outcome is always recorded in ``orders``: ``'inscribed'`` when the
    BTCPay update succeeded, ``'update_failed'`` when it did not. Invoices
    with a row are skipped on later passes, so a failed update can never
    cause the same invoice to be inscribed (and paid for) a second time.
    The ordinals id of a failed update is written to the log so it can be
    reconciled by hand.

    Args:
        invoices: Iterable of invoice dicts with ``id`` and ``status``
            keys and an optional ``metadata`` mapping.
    """
    for invoice in invoices:
        if invoice['status'] != 'Settled':
            continue  # Skip non-settled invoices
        invoice_id = invoice['id']

        if is_inscribed(invoice):
            print("Already inscribed order: " + str(invoice))
            continue

        # An existing row means this invoice was already inscribed locally,
        # even though BTCPay does not show an ordinals id for it.
        cursor.execute("SELECT status FROM orders WHERE invoice_id = ?", (invoice_id,))
        row = cursor.fetchone()
        if row is not None:
            print(f"Invoice {invoice_id} already recorded locally as '{row[0]}'. Skipping.")
            continue

        print("Found non-inscribed order: " + str(invoice))
        # `or {}` also covers an explicit `"metadata": null`.
        metadata = invoice.get('metadata') or {}
        ordinalsid = inscribe(metadata=metadata)
        # Only proceed if inscribe was successful
        if ordinalsid is None:
            logging.error(f"Inscribe operation failed for invoice {invoice_id}. Skipping update.")
            continue

        update_data = invoice.copy()
        update_data['metadata'] = metadata.copy()
        update_data['metadata']['ordinalsId'] = ordinalsid
        update_endpoint = f"{BTCPAY_INSTANCE}/api/v1/stores/{STORE_ID}/invoices/{invoice_id}"
        try:
            response = requests.put(update_endpoint, json=update_data,
                                    headers=headers, timeout=30)
            updated = response.status_code == 200
            failure_detail = response.text
        except RequestException as e:
            # The inscription already exists; a network error must not
            # abort the pass or lose the ordinals id.
            updated = False
            failure_detail = str(e)

        if updated:
            logging.info(f"Updated invoice {invoice_id} with ordinalsId {ordinalsid}")
            print(f"Updated invoice {invoice_id} with ordinalsId.")
            status = 'inscribed'
        else:
            logging.error(f"Failed to update invoice {invoice_id}: {failure_detail}")
            print(f"Failed to update invoice {invoice_id}: {failure_detail}")
            logging.error(
                f"Invoice {invoice_id} was inscribed as {ordinalsid} but BTCPay "
                f"was not updated; manual reconciliation required."
            )
            status = 'update_failed'

        cursor.execute("INSERT OR REPLACE INTO orders (invoice_id, status) VALUES (?, ?)",
                       (invoice_id, status))
        conn.commit()
def main_loop():
    """Poll for invoices forever, pausing CHECK_INTERVAL seconds per pass.

    Each pass fetches the invoice list and, when it is non-empty, hands it
    to ``process_invoices``. The loop never returns on its own.
    """
    while True:
        fetched = fetch_invoices()
        if fetched:
            process_invoices(fetched)
        time.sleep(CHECK_INTERVAL)
if __name__ == "__main__":
    # Run the daemon until interrupted; always release the database
    # connection on the way out.
    try:
        main_loop()
    except KeyboardInterrupt:
        _stop_notice = "Elerium Daemon stopped manually."
        logging.info(_stop_notice)
        print(_stop_notice)
    finally:
        conn.close()