Staking Helpers
Contains main business logic for staking metrics
Here are the snippets for staking helpers that contain the main logic that fetches and updates the data for the staking endpoints:
distribution.py
This script is the base of the staking logic. It interacts with the Distribution.sol contract and processes the target events to extract claim, stake, stake-time, reward and multiplier data into CSV files, which the endpoints then import to update and serve the data.
It tracks the last processed block number in each CSV and periodically extends the data up to the current block, so all records stay up to date without repeating the expensive scan from the first block on every run.
import json
from web3 import Web3
import os
import logging
from datetime import datetime, timedelta
import csv
from app.core.config import ETH_RPC_URL, distribution_contract, DISTRIBUTION_PROXY_ADDRESS
from collections import defaultdict
# Configure logging for this module and create its module-level logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# RPC endpoint handed to the Web3 HTTP providers created by the classes below.
RPC_URL = ETH_RPC_URL
# Block used as the scan start when no usable CSV exists yet (see
# EventProcessor.process_events).
# NOTE(review): presumably the Distribution contract deployment block - confirm.
START_BLOCK = 20180927
# Number of blocks added to a batch start to obtain the batch end when an event
# query is split into ranges (see EventProcessor.get_events_in_batches).
BATCH_SIZE = 1000000
class EventProcessor:
    """Fetch events of the distribution contract and persist them as CSV files.

    Each event type is stored in ``<event_name.lower()>_events.csv`` inside
    ``output_dir``. The highest ``BlockNumber`` found in an existing file is
    used as the resume point, so later runs only query newer blocks.
    """

    def __init__(self, output_dir):
        """Create the Web3 client and remember where CSV files are written.

        Args:
            output_dir: Directory that holds the per-event CSV files.
        """
        self.web3 = Web3(Web3.HTTPProvider(RPC_URL))
        self.contract = distribution_contract
        self.distribution_abi = self.contract.abi
        self.output_dir = output_dir

    def get_events_in_batches(self, start_block, end_block, event_name):
        """Yield events for ``event_name`` between two blocks, batch by batch.

        Args:
            start_block: First block to query (inclusive).
            end_block: Last block to query (inclusive).
            event_name: Name of the contract event.

        Yields:
            The event entries returned by :meth:`get_events` for each batch.
        """
        current_start = start_block
        while current_start <= end_block:
            current_end = min(current_start + BATCH_SIZE, end_block)
            try:
                yield from self.get_events(current_start, current_end, event_name)
            except Exception as e:
                logger.error(f"Error getting events from block {current_start} to {current_end}: {str(e)}")
            # NOTE(review): a failed batch is only logged and the scan moves on,
            # so its events are skipped; once later events are written the CSV
            # resume point lies beyond the gap.
            current_start = current_end + 1

    def get_events(self, from_block, to_block, event_name):
        """Return all entries of ``event_name`` in the given block range.

        Errors are logged and reported as an empty list.
        """
        try:
            event_filter = getattr(self.contract.events, event_name).create_filter(
                from_block=from_block,
                to_block=to_block,
            )
            return event_filter.get_all_entries()
        except Exception as e:
            logger.error(f"Error getting events for {event_name} from block {from_block} to {to_block}: {str(e)}")
            return []

    def write_to_csv(self, events, filename, headers, mode='w'):
        """Write events to a CSV file in ``output_dir``.

        Args:
            events: Iterable of event entries exposing ``blockNumber``,
                ``transactionHash`` and ``args``.
            filename: Name of the CSV file inside ``output_dir``.
            headers: Column names for the ``csv.DictWriter``.
            mode: File mode; the header row is only written for ``'w'``.
        """
        filepath = os.path.join(self.output_dir, filename)
        # Several events often share a block; look each block up only once
        # instead of issuing one RPC request per event.
        block_timestamps = {}
        with open(filepath, mode, newline='') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=headers)
            if mode == 'w':
                writer.writeheader()
            for event in events:
                block_number = event['blockNumber']
                if block_number not in block_timestamps:
                    block_timestamps[block_number] = self.web3.eth.get_block(block_number)['timestamp']
                row = {
                    # fromtimestamp() without tz yields local time, as before.
                    'Timestamp': datetime.fromtimestamp(block_timestamps[block_number]).isoformat(),
                    'TransactionHash': event['transactionHash'].hex(),
                    'BlockNumber': block_number
                }
                row.update(event['args'])
                writer.writerow(row)
        logger.info(f"CSV file {filename} has been updated successfully.")

    def get_event_headers(self, event_name):
        """Return the CSV column names for ``event_name``.

        Raises:
            ValueError: If the ABI contains no event with that name.
        """
        event_abi = next(
            (e for e in self.distribution_abi if e['type'] == 'event' and e['name'] == event_name),
            None,
        )
        if not event_abi:
            raise ValueError(f"Event {event_name} not found in ABI")
        return ['Timestamp', 'TransactionHash', 'BlockNumber'] + [arg['name'] for arg in event_abi['inputs']]

    def get_last_block_from_csv(self, filename):
        """Return the highest ``BlockNumber`` stored in ``filename``.

        Returns:
            The highest block number, or ``None`` when the file is missing,
            empty, or has no usable ``BlockNumber`` column.
        """
        try:
            filepath = os.path.join(self.output_dir, filename)
            with open(filepath, 'r') as csvfile:
                reader = csv.DictReader(csvfile)
                return max(int(row['BlockNumber']) for row in reader)
        except FileNotFoundError:
            return None
        except (ValueError, KeyError, TypeError):
            # ValueError: no rows or a non-numeric value. KeyError/TypeError:
            # the BlockNumber column is missing or a row is too short.
            logger.warning(f"CSV file {filename} is empty or corrupted. Starting from default block.")
            return None

    def process_events(self, event_name):
        """Fetch new ``event_name`` events and add them to the event's CSV.

        Starts at ``START_BLOCK`` and rewrites the file when no resume point
        exists; otherwise appends events from the block after the last stored
        one. Any exception is logged and not propagated.
        """
        try:
            latest_block = self.web3.eth.get_block('latest')['number']
            headers = self.get_event_headers(event_name)
            filename = f"{event_name.lower()}_events.csv"
            last_processed_block = self.get_last_block_from_csv(filename)
            if last_processed_block is None:
                start_block = START_BLOCK
                mode = 'w'
            else:
                start_block = last_processed_block + 1
                mode = 'a'
            events = list(self.get_events_in_batches(start_block, latest_block, event_name))
            logger.info(f"Processing {len(events)} new {event_name} events from block {start_block} to {latest_block}")
            if events:
                self.write_to_csv(events, filename, headers, mode)
            else:
                logger.info(f"No new events found for {event_name}.")
        except Exception as e:
            logger.error(f"An error occurred in process_events: {str(e)}")
            logger.exception("Exception details:")
class OptimizedMultiplierCalculator:
    """Add a ``multiplier`` column to an event CSV using the distribution contract."""

    def __init__(self):
        """Set up the Web3 client and the contract handle."""
        provider = Web3.HTTPProvider(RPC_URL)
        self.web3 = Web3(provider)
        self.contract = distribution_contract

    def get_user_multipliers(self, input_csv, output_csv):
        """Copy ``input_csv`` to ``output_csv`` with an extra ``multiplier`` column.

        For every input row the contract's ``getCurrentUserMultiplier`` is
        called for the row's pool and user at the block returned by
        :meth:`get_block_number`. Rows that fail are logged and left out of
        the output; errors outside the per-row handling are logged and
        re-raised.

        Args:
            input_csv: Path of the CSV to read.
            output_csv: Path of the CSV to (over)write.
        """
        zero_address = '0x0000000000000000000000000000000000000000'
        try:
            with open(input_csv, 'r') as infile, open(output_csv, 'w', newline='') as outfile:
                reader = csv.DictReader(infile)
                source_columns = reader.fieldnames
                if source_columns is None:
                    # No header row could be read: fall back to the expected layout.
                    default_fieldnames = ['Timestamp', 'TransactionHash', 'BlockNumber', 'user', 'poolId']
                    logger.warning(f"No headers found in {input_csv}. Using default headers: {default_fieldnames}")
                    source_columns = default_fieldnames
                fieldnames = source_columns + ['multiplier']

                writer = csv.DictWriter(outfile, fieldnames=fieldnames)
                writer.writeheader()

                for row in reader:
                    try:
                        pool_id = int(row.get('poolId', 0))
                        user = self.web3.to_checksum_address(row.get('user', zero_address))
                        timestamp = datetime.fromisoformat(row.get('Timestamp', datetime.now().isoformat()))
                        block_number = self.get_block_number(timestamp)
                        multiplier_call = self.contract.functions.getCurrentUserMultiplier(pool_id, user)
                        row['multiplier'] = multiplier_call.call(block_identifier=block_number)
                        writer.writerow(row)
                    except Exception as e:
                        logger.error(f"Error processing row {row}: {str(e)}")
        except Exception as e:
            logger.error(f"Error in get_user_multipliers: {str(e)}")
            raise

    def get_block_number(self, timestamp):
        """Return the number of the latest block.

        NOTE(review): ``timestamp`` is accepted but not used; every lookup
        resolves to the chain head rather than a historical block.
        """
        latest = self.web3.eth.get_block('latest', full_transactions=False)
        return latest['number']
class OptimizedRewardCalculator:
    """Add reward columns to a staker CSV using the distribution contract."""

    def __init__(self):
        """Set up the Web3 client and the contract handle."""
        provider = Web3.HTTPProvider(RPC_URL)
        self.web3 = Web3(provider)
        self.contract = distribution_contract

    def calculate_rewards(self, input_csv, output_csv):
        """Copy ``input_csv`` to ``output_csv`` with two extra reward columns.

        Adds ``daily_reward`` (from :meth:`calculate_daily_reward`) and
        ``total_current_user_reward`` (the contract's ``getCurrentUserReward``)
        to every row. Rows that fail are logged and left out of the output;
        errors outside the per-row handling are logged and re-raised.

        Args:
            input_csv: Path of the CSV to read.
            output_csv: Path of the CSV to (over)write.
        """
        zero_address = '0x0000000000000000000000000000000000000000'
        reward_columns = ['daily_reward', 'total_current_user_reward']
        try:
            with open(input_csv, 'r') as infile, open(output_csv, 'w', newline='') as outfile:
                reader = csv.DictReader(infile)
                source_columns = reader.fieldnames
                if source_columns is None:
                    # No header row could be read: fall back to the expected layout.
                    default_fieldnames = ['Timestamp', 'TransactionHash', 'BlockNumber', 'user', 'poolId', 'multiplier']
                    logger.warning(f"No headers found in {input_csv}. Using default headers: {default_fieldnames}")
                    source_columns = default_fieldnames
                fieldnames = source_columns + reward_columns

                writer = csv.DictWriter(outfile, fieldnames=fieldnames)
                writer.writeheader()

                for row in reader:
                    try:
                        address = self.web3.to_checksum_address(row.get('user', zero_address))
                        pool_id = int(row.get("poolId", 0))
                        timestamp = datetime.fromisoformat(row.get('Timestamp', datetime.now().isoformat()))
                        block_number = self.get_block_number(timestamp)
                        daily_reward = self.calculate_daily_reward(pool_id, address, block_number)
                        reward_call = self.contract.functions.getCurrentUserReward(pool_id, address)
                        total_reward = reward_call.call(block_identifier=block_number)
                        row['daily_reward'] = daily_reward
                        row['total_current_user_reward'] = total_reward
                        writer.writerow(row)
                    except Exception as e:
                        logger.error(f"Error processing row {row}: {str(e)}")
        except Exception as e:
            logger.error(f"Error in calculate_rewards: {str(e)}")
            raise

    def calculate_daily_reward(self, pool_id, address, block_number):
        """Estimate the user's reward per day in a pool at ``block_number``.

        The pool's reward for the current interval is the initial reward
        reduced by ``reward_decrease`` for every full ``decrease_interval``
        since ``payout_start`` (never below zero). The user's share is
        proportional to their deposit relative to the pool's total virtual
        deposit, scaled from one interval to 86400 seconds.

        Returns:
            The integer daily reward, or 0 when nothing is deposited in the pool.
        """
        functions = self.contract.functions
        total_virtual_deposited = functions.poolsData(pool_id).call(block_identifier=block_number)[2]
        (payout_start, decrease_interval, _, _, _,
         initial_reward, reward_decrease, _, _) = functions.pools(pool_id).call(block_identifier=block_number)
        deposited = functions.usersData(address, pool_id).call(block_identifier=block_number)[1]

        block_time = self.web3.eth.get_block(block_number)['timestamp']
        elapsed_intervals = (block_time - payout_start) // decrease_interval
        interval_reward = max(0, initial_reward - (elapsed_intervals * reward_decrease))

        if total_virtual_deposited <= 0:
            return 0
        return (interval_reward * deposited * 86400) // (total_virtual_deposited * decrease_interval)

    def get_block_number(self, timestamp):
        """Return the number of the latest block.

        NOTE(review): ``timestamp`` is accepted but not used; every lookup
        resolves to the chain head rather than a historical block.
        """
        latest = self.web3.eth.get_block('latest', full_transactions=False)
        return latest['number']

    def get_virtual_steth_pool(self, pool_id):
        """Return the third field of ``poolsData(pool_id)`` divided by 1e18."""
        total_virtual = self.contract.functions.poolsData(pool_id).call()[2]
        return total_virtual / 1e18
# Example usage
# if __name__ == "__main__":
# calculator = OptimizedRewardCalculator()
# calculator.calculate_rewards(
# input_csv='usermultiplier.csv',
# output_csv='usermultiplier2.csv'
# )
# #processor = EventProcessor()
# # Example usage:
# #processor.process_events("UserStaked")
# #processor.process_events("UserClaimLocked")
# multiplier = OptimizedMultiplierCalculator()
# multiplier.get_user_multipliers("userClaimLocked_events.csv","usermultiplier.csv")
1.1 response_distribution.py
Fetches the updated data from the CSVs and calculates the average power multiplier, stake time, distribution of stakers, reward claim metrics and rewards summary.
It returns a final JSON output with all staker-related metrics to the main.py API script.
import json
import os
import logging
from datetime import datetime, timedelta
import csv
from collections import defaultdict
from decimal import Decimal
import requests
import ipdb
from helpers.staking_general_helpers.distribution import OptimizedMultiplierCalculator, OptimizedRewardCalculator
import numpy as np
import logging
# Configure timestamped log output and create the module-level logger used by
# the helpers below.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def is_valid_stake(row, now=None):
    """Return whether a CSV row describes a stake with an active claim lock.

    A row is valid when both lock bounds are non-zero and the lock end lies
    strictly in the future but no more than 25 years ahead.

    Args:
        row: Mapping with ``claimLockStart`` and ``claimLockEnd`` values that
            ``int()`` can parse (Unix timestamps in seconds).
        now: Optional Unix timestamp to evaluate against; defaults to the
            current time. Mainly useful for deterministic checks.

    Returns:
        bool: True if the row passes all checks.

    Raises:
        KeyError: If a lock column is missing from ``row``.
        ValueError: If a lock value is not an integer.
    """
    current_time = int(datetime.now().timestamp()) if now is None else int(now)
    claim_lock_start = int(row['claimLockStart'])
    claim_lock_end = int(row['claimLockEnd'])
    # The cutoff is 25 years, exactly as the arithmetic always was; the earlier
    # variable name and comment said "20 years", which did not match the code.
    latest_allowed_lock_end = current_time + (25 * 365 * 24 * 60 * 60)
    return (claim_lock_start != 0 and
            claim_lock_end != 0 and
            current_time < claim_lock_end <= latest_allowed_lock_end)
def analyze_mor_stakers(csv_file_path):
    """Summarise unique stakers and lock durations for pools 0 and 1.

    Only rows accepted by ``is_valid_stake`` are counted.

    Args:
        csv_file_path: Path of a CSV with ``Timestamp``, ``poolId``, ``user``,
            ``claimLockStart`` and ``claimLockEnd`` columns.

    Returns:
        dict with ``total_unique_stakers``, ``daily_unique_stakers`` (keyed by
        ``date``), ``average_stake_time`` and ``combined_average_stake_time``
        (``timedelta`` values) and ``total_stakes``.

    Raises:
        Exception: Any error is logged and re-raised unchanged.
    """
    # NOTE(review): only pools 0 and 1 are pre-registered below, so a row with
    # any other poolId raises KeyError and aborts the whole analysis.
    tracked_pools = (0, 1)
    unique_stakers = {pid: set() for pid in tracked_pools}
    stakers_per_day = defaultdict(lambda: defaultdict(set))
    lock_time_totals = {pid: timedelta() for pid in tracked_pools}
    stake_totals = {pid: 0 for pid in tracked_pools}

    try:
        with open(csv_file_path, 'r') as csvfile:
            for record in csv.DictReader(csvfile):
                if not is_valid_stake(record):
                    continue

                day = datetime.fromisoformat(record['Timestamp']).date()
                pid = int(record['poolId'])
                staker = record['user']

                unique_stakers[pid].add(staker)
                stakers_per_day[day][pid].add(staker)

                # Lock duration = claimLockEnd - claimLockStart (seconds).
                lock_seconds = int(record['claimLockEnd']) - int(record['claimLockStart'])
                lock_time_totals[pid] += timedelta(seconds=lock_seconds)
                stake_totals[pid] += 1

        logger.info(f"Successfully analyzed MOR stakers from file: {csv_file_path}")

        # Per-pool average lock duration (zero duration when a pool has no stakes).
        avg_stake_time = {}
        for pid, pool_total in lock_time_totals.items():
            n_stakes = stake_totals[pid]
            avg_stake_time[pid] = pool_total / n_stakes if n_stakes > 0 else timedelta()

        # Average across both pools together.
        all_lock_time = sum(lock_time_totals.values(), timedelta())
        all_stakes = sum(stake_totals.values())
        if all_stakes > 0:
            combined_avg_stake_time = all_lock_time / all_stakes
        else:
            combined_avg_stake_time = timedelta()

        daily_unique = defaultdict(lambda: {'pool_0': 0, 'pool_1': 0, 'combined': 0})
        for day, pools in stakers_per_day.items():
            daily_unique[day] = {
                'pool_0': len(pools[0]),
                'pool_1': len(pools[1]),
                'combined': len(pools[0] | pools[1])
            }

        results = {
            'total_unique_stakers': {
                'pool_0': len(unique_stakers[0]),
                'pool_1': len(unique_stakers[1]),
                'combined': len(unique_stakers[0] | unique_stakers[1])
            },
            'daily_unique_stakers': daily_unique,
            'average_stake_time': avg_stake_time,
            'combined_average_stake_time': combined_avg_stake_time,
            'total_stakes': stake_totals
        }
    except FileNotFoundError:
        logger.error(f"File not found: {csv_file_path}")
        raise
    except PermissionError:
        logger.error(f"Permission denied when trying to read file: {csv_file_path}")
        raise
    except Exception as e:
        logger.error(f"Unexpected error when analyzing MOR stakers from file {csv_file_path}: {str(e)}")
        raise

    return results
def calculate_average_multipliers(csv_file_path):
    """Average the power multiplier of valid stakes, overall and per pool.

    Only rows accepted by ``is_valid_stake`` are counted. Pool ``'0'`` is
    reported as the capital pool and pool ``'1'`` as the code pool; rows of
    other pools only contribute to the overall average.

    Args:
        csv_file_path: Path of a CSV with ``poolId``, ``multiplier`` and the
            claim-lock columns.

    Returns:
        dict with ``overall_average``, ``capital_average`` and
        ``code_average`` as ``Decimal`` values (0 when nothing was counted).
    """
    # The raw multiplier is a fixed-point integer. This file's
    # get_wallet_stake_info scales the same column by 1e25, so the former 1e18
    # divisor here produced averages 1e7 times too large.
    multiplier_scale = Decimal('1e25')

    total_multiplier = Decimal('0')
    capital_multiplier = Decimal('0')
    code_multiplier = Decimal('0')
    total_count = 0
    capital_count = 0
    code_count = 0

    with open(csv_file_path, 'r') as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:
            if not is_valid_stake(row):
                continue
            multiplier = Decimal(row['multiplier']) / multiplier_scale
            total_multiplier += multiplier
            total_count += 1
            if row['poolId'] == '0':  # Capital pool
                capital_multiplier += multiplier
                capital_count += 1
            elif row['poolId'] == '1':  # Code pool
                code_multiplier += multiplier
                code_count += 1

    zero = Decimal('0')
    return {
        'overall_average': total_multiplier / total_count if total_count > 0 else zero,
        'capital_average': capital_multiplier / capital_count if capital_count > 0 else zero,
        'code_average': code_multiplier / code_count if code_count > 0 else zero
    }
def calculate_pool_rewards_summary(csv_file_path):
    """Sum daily and current rewards per pool over all valid stakes.

    Only rows accepted by ``is_valid_stake`` are counted. Reward values are
    divided by 10**18 before being added up.

    Args:
        csv_file_path: Path of a CSV with ``poolId``, ``daily_reward``,
            ``total_current_user_reward`` and the claim-lock columns.

    Returns:
        defaultdict mapping the ``poolId`` string to a dict with
        ``daily_reward_sum`` and ``total_current_user_reward_sum``.
    """
    unit = 10 ** 18
    summary = defaultdict(lambda: {'daily_reward_sum': 0, 'total_current_user_reward_sum': 0})

    with open(csv_file_path, 'r') as csvfile:
        for record in csv.DictReader(csvfile):
            if not is_valid_stake(record):
                continue
            pool_totals = summary[record['poolId']]
            pool_totals['daily_reward_sum'] += float(record['daily_reward']) / unit
            pool_totals['total_current_user_reward_sum'] += float(record['total_current_user_reward']) / unit

    return summary
def calculate_power_factor(staking_period_days):
    """Return a simplified power factor: 1 plus the staking period in 365-day years.

    This is a linear approximation; adjust it to the actual implementation
    if that differs.
    """
    staking_period_years = staking_period_days / 365
    return 1 + staking_period_years
def get_crypto_price(crypto_id, timeout=10):
    """Fetch the USD price of a coin from the CoinGecko simple-price API.

    Args:
        crypto_id: CoinGecko coin id, e.g. ``"morpheusai"``.
        timeout: Seconds to wait for the HTTP request before giving up.
            Without a timeout ``requests`` can block indefinitely.

    Returns:
        The USD price from the response, or ``None`` when the request fails,
        the body is not valid JSON, or the price is missing.
    """
    base_url = "https://api.coingecko.com/api/v3/simple/price"
    params = {
        "ids": crypto_id,
        "vs_currencies": "usd"
    }
    try:
        response = requests.get(base_url, params=params, timeout=timeout)
        response.raise_for_status()  # Raises an HTTPError for bad responses
        data = response.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body on requests versions whose JSON
        # error does not derive from RequestException.
        logger.error(f"An error occurred: {e}")
        return None

    if crypto_id in data and "usd" in data[crypto_id]:
        return data[crypto_id]["usd"]
    return None
def calculate_mor_rewards(mor_daily_emission, staking_period_days,mor_price, eth_price):
    """Estimate APY and daily MOR rewards per deposited stETH.

    Reads the pool-0 virtual stETH total through ``OptimizedRewardCalculator``
    and weights the emission with ``calculate_power_factor``.

    Args:
        mor_daily_emission: MOR emitted per day.
        staking_period_days: Lock period in days, used for the power factor.
        mor_price: MOR price in USD.
        eth_price: stETH price in USD.

    Returns:
        Tuple ``(apy, daily_mor_rewards)``.
    """
    reward_calculator = OptimizedRewardCalculator()
    pool_virtual_steth = reward_calculator.get_virtual_steth_pool(0)

    factor = calculate_power_factor(staking_period_days)

    # Yearly reward value relative to the value of the deposited stETH.
    yearly_reward_value = mor_daily_emission * 365 * mor_price * factor
    deposited_value = pool_virtual_steth * eth_price
    apr = yearly_reward_value / deposited_value

    # Compounding once per year.
    apy = (1 + apr) ** 1 - 1

    daily_mor_rewards = (mor_daily_emission * factor) / pool_virtual_steth
    return apy, daily_mor_rewards
def give_more_reward_response():
    """Build APY and daily-reward figures per deposited stETH for several lock periods.

    Returns:
        dict with ``apy_per_steth`` and ``daily_mor_rewards_per_steth``, each
        a list with one formatted entry per staking period.

    Raises:
        RuntimeError: If the MOR or stETH price could not be fetched.
    """
    mor_daily_emission = 3456  # Replace with the actual daily emission
    staking_periods = [0, 365, 730, 1095, 1460, 1825, 2190]  # 0 to 6 years, in days

    mor_price = get_crypto_price("morpheusai")
    eth_price = get_crypto_price("staked-ether")  # price id passed for stETH
    if mor_price is None or eth_price is None:
        # get_crypto_price returns None on failure; stop here with a clear
        # message instead of a TypeError from arithmetic on None further down.
        raise RuntimeError("Could not fetch MOR or stETH prices.")

    rewards_data = {
        "apy_per_steth": [],
        "daily_mor_rewards_per_steth": []
    }
    for period in staking_periods:
        apy, daily_mor_rewards = calculate_mor_rewards(mor_daily_emission, period, mor_price, eth_price)
        rewards_data["apy_per_steth"].append({
            "staking_period": period,
            "apy": f"{apy:.2%}"
        })
        rewards_data["daily_mor_rewards_per_steth"].append({
            "staking_period": period,
            "daily_mor_rewards": f"{daily_mor_rewards:.6f}"
        })
    return rewards_data
def get_wallet_stake_info(csv_file_path):
    """Build histograms of stake time and power multiplier per wallet.

    Only rows accepted by ``is_valid_stake`` are used. For each wallet the row
    with the longest lock duration is kept.

    Args:
        csv_file_path: Path of a CSV with ``user``, ``multiplier`` and the
            claim-lock columns.

    Returns:
        dict with ``stake_time`` and ``power_multiplier`` entries, each
        holding ``ranges`` (``[lower, upper]`` pairs; the last upper bound is
        ``None``) and the matching ``frequencies``. With no valid rows the
        stake-time frequencies are all zero and the power-multiplier lists
        are empty.
    """
    wallet_info = {}

    with open(csv_file_path, 'r') as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:
            if not is_valid_stake(row):
                continue
            wallet = row['user']
            stake_time = timedelta(seconds=int(row['claimLockEnd']) - int(row['claimLockStart']))
            power_multiplier = int(row['multiplier'])
            # Keep the entry with the longest lock for each wallet.
            if wallet not in wallet_info or stake_time > wallet_info[wallet]['stake_time']:
                wallet_info[wallet] = {
                    'stake_time': stake_time,
                    'power_multiplier': power_multiplier
                }

    stake_times = np.array([v["stake_time"].total_seconds() for v in wallet_info.values()], dtype=float)
    power_multipliers = np.array([v["power_multiplier"] / 1e25 for v in wallet_info.values()], dtype=float)

    year_in_seconds = 365.25 * 24 * 60 * 60
    stake_times_in_years = stake_times / year_in_seconds

    def bin_data_custom_ranges(data, bins):
        """Count values per (lower, upper] bin; edge bins absorb outliers."""
        last_bin = len(bins) - 1
        # digitize(right=True) gives 0 for values <= bins[0] and len(bins) for
        # values > bins[-1]. Clipping puts those into the first / last bin.
        # Previously index 0 was sliced away, which dropped every value equal
        # to the lowest edge (always including the minimum multiplier).
        bin_indices = np.clip(np.digitize(data, bins, right=True), 1, last_bin)
        frequencies = np.bincount(bin_indices, minlength=len(bins))[1:]
        ranges = [[float(bins[i]), float(bins[i + 1]) if i < len(bins) - 2 else None]
                  for i in range(len(bins) - 1)]
        return ranges, frequencies.tolist()

    stake_time_bins_years = [0, 1, 2, 3, 4, 5, 1000]  # 1000 years acts as "infinity"
    stake_time_ranges, stake_time_frequencies = bin_data_custom_ranges(
        stake_times_in_years, stake_time_bins_years)

    if power_multipliers.size:
        power_multiplier_bins = np.linspace(np.min(power_multipliers), np.max(power_multipliers), 11)
        power_multiplier_ranges, power_multiplier_frequencies = bin_data_custom_ranges(
            power_multipliers, power_multiplier_bins)
    else:
        # np.min / np.max raise on empty input; report an empty histogram.
        power_multiplier_ranges, power_multiplier_frequencies = [], []

    return {
        "stake_time": {
            "ranges": stake_time_ranges,
            "frequencies": stake_time_frequencies
        },
        "power_multiplier": {
            "ranges": power_multiplier_ranges,
            "frequencies": power_multiplier_frequencies
        }
    }
emissions.py
This script imports the raw markdown emission schedule and converts it into a CSV file for reading. It then loads the data from the CSV into a dictionary, and calculates and returns the emissions from each bucket for the current day, along with the total emissions so far, in JSON format:
import pandas as pd
from typing import List, Dict
from datetime import datetime
import logging
# Configure timestamped log output and create the module-level logger used by
# read_emission_schedule.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def read_emission_schedule(today_date: datetime, file_path: str) -> Dict:
    """
    Read the pipe-separated emission schedule and summarise it up to a date.

    Args:
        today_date (datetime): Rows dated after this are ignored
        file_path (str): Path to the pipe-separated schedule file

    Returns:
        Dict: ``new_emissions`` (last row minus the previous row, per category,
        plus their sum as 'Total Emission') and ``total_emissions`` (column
        sums up to the date). Both are empty when no row qualifies.

    Raises:
        Exception: Any error is logged and re-raised unchanged.
    """
    # NOTE(review): "new" emissions are a difference of consecutive rows while
    # totals are column sums; the two imply different column semantics
    # (cumulative vs. per-day). Confirm against the schedule file.
    emission_categories = ['Capital Emission', 'Code Emission', 'Compute Emission', 'Community Emission',
                           'Protection Emission']
    try:
        schedule = pd.read_csv(file_path, sep='|', skipinitialspace=True)
        schedule = schedule.dropna(axis=1, how='all')  # drop all-empty columns

        # Trim whitespace around headers and string cells.
        schedule.columns = schedule.columns.str.strip()
        schedule = schedule.apply(lambda col: col.str.strip() if col.dtype == "object" else col)

        schedule['Date'] = pd.to_datetime(schedule['Date'], format='%m/%d/%y', errors='coerce')
        value_columns = schedule.columns.drop(['Day', 'Date'])
        schedule[value_columns] = schedule[value_columns].apply(pd.to_numeric, errors='coerce')

        elapsed = schedule[schedule['Date'] <= today_date]
        if elapsed.empty:
            logger.warning("No data found up to the specified date.")
            return {'new_emissions': {}, 'total_emissions': {}}

        latest_row = elapsed.iloc[-1]
        if len(elapsed) > 1:
            prior_row = elapsed.iloc[-2]
        else:
            # Only one row so far: compare against zeros.
            prior_row = pd.Series(0, index=latest_row.index)

        new_emissions = {}
        for category in emission_categories:
            new_emissions[category] = latest_row[category] - prior_row[category]

        total_emissions = {}
        for category in emission_categories:
            total_emissions[category] = elapsed[category].sum()

        new_emissions['Total Emission'] = sum(new_emissions.values())
        total_emissions['Total Emission'] = elapsed['Total Emission'].sum()

        logger.info(f"Successfully processed emission data up to {today_date}")
        return {
            'new_emissions': new_emissions,
            'total_emissions': total_emissions
        }
    except FileNotFoundError:
        logger.error(f"Emission schedule file not found: {file_path}")
        raise
    except Exception as e:
        logger.error(f"Error processing emission schedule: {str(e)}")
        raise
def calculate_total_emissions(df: pd.DataFrame) -> List[Dict]:
    """
    Sum each emission category column of the schedule.

    Args:
        df (pd.DataFrame): DataFrame containing the emission schedule data

    Returns:
        List[Dict]: One ``{"category": ..., "total": ...}`` entry per category,
        in the fixed category order
    """
    totals = []
    for category in ('Capital Emission', 'Code Emission', 'Compute Emission', 'Community Emission',
                     'Protection Emission'):
        column_total = df[category].sum()
        totals.append({"category": category, "total": column_total})
    return totals
position.py
Calculates Uniswap V3 pool position for Morpheus in terms of USD, MOR and stETH equivalent. It fetches the asset balances for the given Uniswap NFT and then converts them into the USD or token equivalent.
from web3 import Web3
import os, json
import math
from collections import defaultdict
import requests
from app.core.config import (ARB_RPC_URL, STETH_TOKEN_ADDRESS, MOR_ARBITRUM_ADDRESS, UNISWAP_V3_POSITIONS_NFT_ADDRESS,
UNISWAP_V3_FACTORY_ADDRESS, POSITIONS_NFT_ABI, FACTORY_NFT_ABI, POOL_ABI)
# Web3 client bound to the configured Arbitrum RPC endpoint.
w3 = Web3(Web3.HTTPProvider(ARB_RPC_URL))
# Token address used for the MOR price lookup in protocol_liquidity.
MOR_TOKEN_ADDRESS = MOR_ARBITRUM_ADDRESS
# Dex Screener endpoint template; "{}" is replaced with a token address.
DEX_API_URL = "https://api.dexscreener.io/latest/dex/tokens/{}"
# Contract handle for the Uniswap V3 positions NFT (balanceOf,
# tokenOfOwnerByIndex and positions are called on it below).
positions_nft_contract = w3.eth.contract(address=w3.to_checksum_address(UNISWAP_V3_POSITIONS_NFT_ADDRESS),
                                         abi=POSITIONS_NFT_ABI)
# Contract handle for the Uniswap V3 factory (getPool is called on it below).
factory_contract = w3.eth.contract(address=w3.to_checksum_address(UNISWAP_V3_FACTORY_ADDRESS),
                                   abi=FACTORY_NFT_ABI)
def fetch_token_price(token_address, timeout=10):
    """Fetch the token price in USD from the Dex Screener API.

    Args:
        token_address: Address of the token to look up.
        timeout: Seconds to wait for the HTTP request before giving up.
            Without a timeout ``requests`` can block indefinitely.

    Returns:
        float price of the first listed pair, or ``None`` when the response
        status is not 200 or carries no usable price.
    """
    api_url = DEX_API_URL.format(token_address)
    response = requests.get(api_url, timeout=timeout)
    if response.status_code != 200:
        return None

    data = response.json()
    # "pairs" may be missing or null; treat both as "no price available"
    # instead of failing in len(None).
    pairs = data.get('pairs') if isinstance(data, dict) else None
    if not pairs:
        return None

    price = pairs[0].get('priceUsd')
    if price is None:
        return None
    return float(price)
def fetch_all_nfts(address):
    """Return the ids of all position NFTs owned by ``address``.

    Reads the owner's balance and then resolves each index to a token id via
    ``tokenOfOwnerByIndex``.
    """
    owned_count = positions_nft_contract.functions.balanceOf(address).call()
    return [
        positions_nft_contract.functions.tokenOfOwnerByIndex(address, index).call()
        for index in range(owned_count)
    ]
def get_asset_balances(token_id):
    """Return token amounts and range data for one position NFT.

    Looks up the position, resolves its pool through the factory, reads the
    pool's ``slot0`` and converts the liquidity into token amounts with
    ``calculate_amounts``.

    Returns:
        dict with ``token0``/``token1`` (address and amount), ``fee``,
        ``liquidity``, ``tick_lower``, ``tick_upper`` and ``current_tick``.
    """
    position = positions_nft_contract.functions.positions(token_id).call()

    # Fields 2..7 of the positions() result, in order.
    token0, token1, fee = position[2], position[3], position[4]
    tick_lower, tick_upper, liquidity = position[5], position[6], position[7]

    # Resolve the pool that backs this position.
    pool_address = factory_contract.functions.getPool(token0, token1, fee).call()
    pool = w3.eth.contract(address=pool_address, abi=POOL_ABI)

    # slot0 carries the current sqrt price (index 0) and tick (index 1).
    slot0 = pool.functions.slot0().call()
    sqrt_price_x96, current_tick = slot0[0], slot0[1]

    amount0, amount1 = calculate_amounts(liquidity, sqrt_price_x96, current_tick, tick_lower, tick_upper)

    balances = {
        'token0': {'address': token0, 'amount': amount0},
        'token1': {'address': token1, 'amount': amount1},
    }
    balances['fee'] = fee
    balances['liquidity'] = liquidity
    balances['tick_lower'] = tick_lower
    balances['tick_upper'] = tick_upper
    balances['current_tick'] = current_tick
    return balances
def calculate_amounts(liquidity, sqrt_price_x96, tick_current, tick_lower, tick_upper):
    """Convert position liquidity into token amounts for the current tick.

    Below the range the position is entirely token0, at or above the upper
    tick entirely token1, and inside the range a mix based on the current
    sqrt price.

    Returns:
        Tuple ``(amount0, amount1)``, each divided by 10**18.
    """
    token_unit = 10 ** 18
    lower_sqrt = math.sqrt(1.0001 ** tick_lower)
    upper_sqrt = math.sqrt(1.0001 ** tick_upper)
    current_sqrt = sqrt_price_x96 / (1 << 96)  # undo the Q96 fixed-point scaling

    if tick_current < tick_lower:
        raw0 = liquidity * (1 / lower_sqrt - 1 / upper_sqrt)
        raw1 = 0
    elif tick_current >= tick_upper:
        raw0 = 0
        raw1 = liquidity * (upper_sqrt - lower_sqrt)
    else:
        raw0 = liquidity * (1 / current_sqrt - 1 / upper_sqrt)
        raw1 = liquidity * (current_sqrt - lower_sqrt)

    return raw0 / token_unit, raw1 / token_unit
def protocol_liquidity(address):
    """Aggregate the Uniswap V3 positions of ``address`` and value them.

    Positions are grouped by ``token0_token1_fee``. Every position's token0
    amount is valued at the MOR price and its token1 amount at the stETH
    price.

    Returns:
        ``None`` when the address owns no position NFTs, otherwise a dict with
        ``positions``, ``total_value_usd``, ``mor_value`` and ``steth_value``.

    Raises:
        Exception: If the MOR or stETH price could not be fetched.
    """
    token_ids = fetch_all_nfts(address)
    if not token_ids:
        return None

    mor_price = fetch_token_price(MOR_TOKEN_ADDRESS)
    steth_price = fetch_token_price(STETH_TOKEN_ADDRESS)
    if mor_price is None or steth_price is None:
        raise Exception("Could not fetch MOR or stETH prices.")

    grouped = defaultdict(lambda: {'token0': {'balance': 0, 'address': None},
                                   'token1': {'balance': 0, 'address': None},
                                   'liquidity': 0})
    total_value_usd = 0

    for token_id in token_ids:
        balances = get_asset_balances(token_id)
        first, second = balances['token0'], balances['token1']

        key = f"{balances['token0']['address']}_{balances['token1']['address']}_{balances['fee']}"
        entry = grouped[key]
        entry['token0']['address'] = first['address']
        entry['token1']['address'] = second['address']
        entry['token0']['balance'] += first['amount']
        entry['token1']['balance'] += second['amount']
        entry['liquidity'] += balances['liquidity']

        # USD value of this position.
        token0_value_usd = first['amount'] * mor_price
        token1_value_usd = second['amount'] * steth_price
        total_value_usd += token0_value_usd + token1_value_usd

    # Plain dict so the result can be JSON-serialised.
    return {
        "positions": dict(grouped),
        "total_value_usd": total_value_usd,
        "mor_value": total_value_usd / mor_price if mor_price else 0,
        "steth_value": total_value_usd / steth_price if steth_price else 0
    }
#
# def main():
# result = protocol_liquidity("0x151c2b49CdEC10B150B2763dF3d1C00D70C90956")
# print(result)
#
#
# if __name__ == "__main__":
# main()
rewards.py
Gets daily reward, total pending reward, and total earned reward for a specific address across all pools.
import json
from datetime import datetime, timedelta
import os
import logging
import csv
from collections import defaultdict
from app.core.config import web3, distribution_contract
# Short module-level aliases for the shared Web3 client and the distribution
# contract from the app configuration.
w3=web3
contract=distribution_contract
def get_rewards_info(address):
    """
    Get daily reward, total pending reward, and total earned reward for a specific address across all pools.

    Pools are read by increasing id until the contract call for a pool fails,
    which is taken as the end of the pool list.

    :param address: Ethereum address of the user
    :return: A dictionary containing rewards info for each pool and total rewards
    """
    rewards_info = {
        'pools': {},
        'total_current_user_reward': 0,
        'total_daily_reward': 0,
        'total_earned_reward': 0
    }

    # Whole seconds keep all reward maths in exact integer arithmetic. A float
    # timestamp turned the wei-scale products into floats and lost precision.
    current_time = int(datetime.now().timestamp())

    pool_id = 0
    while True:
        try:
            pool_info = contract.functions.pools(pool_id).call()
        except Exception:
            # If we can't get pool info, we've reached the end of the pools.
            # (Exception rather than a bare except, so KeyboardInterrupt and
            # SystemExit are not swallowed.)
            break

        # Current (pending) reward of the user in this pool.
        current_user_reward = contract.functions.getCurrentUserReward(pool_id, address).call()
        rewards_info['total_current_user_reward'] += current_user_reward

        user_data = contract.functions.usersData(address, pool_id).call()
        last_stake = user_data[0]
        deposited = user_data[1]

        pool_data = contract.functions.poolsData(pool_id).call()
        total_virtual_deposited = pool_data[2]

        payout_start, decrease_interval, _, _, _, initial_reward, reward_decrease, _, _ = pool_info

        # Reward of the current interval: initial reward reduced once per
        # elapsed interval, never below zero. A zero interval would divide by
        # zero, so such a pool is treated as paying nothing.
        if decrease_interval > 0:
            intervals_passed = (current_time - payout_start) // decrease_interval
            current_interval_reward = max(0, initial_reward - (intervals_passed * reward_decrease))
        else:
            current_interval_reward = 0

        if total_virtual_deposited > 0 and decrease_interval > 0:
            daily_reward = (current_interval_reward * deposited * 86400) // (total_virtual_deposited * decrease_interval)
        else:
            daily_reward = 0
        rewards_info['total_daily_reward'] += daily_reward

        # Estimate only: extrapolates today's daily reward over the time since
        # the last stake, because the exact claim history is not available.
        time_staked = current_time - last_stake
        total_earned_reward = (daily_reward * time_staked) // 86400
        rewards_info['total_earned_reward'] += total_earned_reward

        rewards_info['pools'][pool_id] = {
            'current_user_reward': current_user_reward,
            'daily_reward': daily_reward,
            'earned_reward': total_earned_reward
        }
        pool_id += 1

    return rewards_info
# Example usage
# user_address = '0x22E0225540ccf80aB3C4F029F3dE75dB785754A3'
# rewards_info = get_rewards_info(user_address)
# print(f"Total Current User Reward: {rewards_info['total_current_user_reward']}")
# print(f"Total Daily Reward: {rewards_info['total_daily_reward']}")
# print(f"Total Earned Reward (estimate): {rewards_info['total_earned_reward']}")
# print("\nPer-pool breakdown:")
# for pool_id, pool_info in rewards_info['pools'].items():
# print(f"Pool {pool_id}:")
# print(f" Current User Reward: {pool_info['current_user_reward']}")
# print(f" Daily Reward: {pool_info['daily_reward']}")
# print(f" Earned Reward (estimate): {pool_info['earned_reward']}")