Staking Helpers

Contains the main business logic for the staking metrics.

Below are the staking helper modules, with the core logic that fetches and updates the data behind the staking endpoints:


  1. distribution.py

This script is the base of the staking logic. It interacts with the Distribution.sol contract and processes the target events to extract claim, stake, stake-time, reward, and multiplier data into CSV files, which the endpoints then import to serve the latest figures. It tracks the last processed block number in each CSV and periodically updates the data up to the current block, so all records stay current without re-running expensive queries from the first block each time. A minimal driver sketch follows the listing.

import csv
import logging
import os
from datetime import datetime

from web3 import Web3

from app.core.config import ETH_RPC_URL, distribution_contract

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

RPC_URL = ETH_RPC_URL
START_BLOCK = 20180927  # first block scanned when no CSV exists yet
BATCH_SIZE = 1000000  # blocks fetched per event-filter batch


class EventProcessor:
    def __init__(self, output_dir):
        self.web3 = Web3(Web3.HTTPProvider(RPC_URL))
        self.contract = distribution_contract
        self.distribution_abi = self.contract.abi
        self.output_dir = output_dir

    def get_events_in_batches(self, start_block, end_block, event_name):
        current_start = start_block
        while current_start <= end_block:
            current_end = min(current_start + BATCH_SIZE, end_block)
            try:
                yield from self.get_events(current_start, current_end, event_name)
            except Exception as e:
                logger.error(f"Error getting events from block {current_start} to {current_end}: {str(e)}")
            current_start = current_end + 1

    def get_events(self, from_block, to_block, event_name):
        try:
            event_filter = getattr(self.contract.events, event_name).create_filter(from_block=from_block,
                                                                                   to_block=to_block)
            return event_filter.get_all_entries()
        except Exception as e:
            logger.error(f"Error getting events for {event_name} from block {from_block} to {to_block}: {str(e)}")
            return []

    def write_to_csv(self, events, filename, headers, mode='w'):
        filepath = os.path.join(self.output_dir, filename)
        with open(filepath, mode, newline='') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=headers)
            if mode == 'w':
                writer.writeheader()
            for event in events:
                # One get_block call per event to recover the timestamp;
                # acceptable because incremental runs only see new events
                row = {
                    'Timestamp': datetime.fromtimestamp(self.web3.eth.get_block(event['blockNumber'])['timestamp']).isoformat(),
                    'TransactionHash': event['transactionHash'].hex(),
                    'BlockNumber': event['blockNumber']
                }
                row.update(event['args'])
                writer.writerow(row)
        logger.info(f"CSV file {filename} has been updated successfully.")

    def get_event_headers(self, event_name):
        event_abi = next((e for e in self.distribution_abi if e['type'] == 'event' and e['name'] == event_name), None)
        if not event_abi:
            raise ValueError(f"Event {event_name} not found in ABI")
        return ['Timestamp', 'TransactionHash', 'BlockNumber'] + [input['name'] for input in event_abi['inputs']]

    def get_last_block_from_csv(self, filename):
        try:
            filepath = os.path.join(self.output_dir, filename)
            with open(filepath, 'r') as csvfile:
                reader = csv.DictReader(csvfile)
                return max(int(row['BlockNumber']) for row in reader)
        except FileNotFoundError:
            return None
        except ValueError:
            logger.warning(f"CSV file {filename} is empty or corrupted. Starting from default block.")
            return None

    def process_events(self, event_name):
        try:
            latest_block = self.web3.eth.get_block('latest')['number']
            headers = self.get_event_headers(event_name)
            filename = f"{event_name.lower()}_events.csv"
            last_processed_block = self.get_last_block_from_csv(filename)

            if last_processed_block is None:
                start_block = START_BLOCK
                mode = 'w'
            else:
                start_block = last_processed_block + 1
                mode = 'a'

            events = list(self.get_events_in_batches(start_block, latest_block, event_name))
            logger.info(f"Processing {len(events)} new {event_name} events from block {start_block} to {latest_block}")

            if events:
                self.write_to_csv(events, filename, headers, mode)
            else:
                logger.info(f"No new events found for {event_name}.")

        except Exception as e:
            logger.error(f"An error occurred in process_events: {str(e)}")
            logger.exception("Exception details:")


class OptimizedMultiplierCalculator:
    def __init__(self):
        self.web3 = Web3(Web3.HTTPProvider(RPC_URL))
        self.contract = distribution_contract

    def get_user_multipliers(self, input_csv, output_csv):
        try:
            with open(input_csv, 'r') as infile, open(output_csv, 'w', newline='') as outfile:
                reader = csv.DictReader(infile)

                if reader.fieldnames is None:
                    default_fieldnames = ['Timestamp', 'TransactionHash', 'BlockNumber', 'user', 'poolId']
                    fieldnames = default_fieldnames + ['multiplier']
                    logger.warning(f"No headers found in {input_csv}. Using default headers: {default_fieldnames}")
                else:
                    fieldnames = reader.fieldnames + ['multiplier']

                writer = csv.DictWriter(outfile, fieldnames=fieldnames)
                writer.writeheader()

                for row in reader:
                    try:
                        pool_id = int(row.get('poolId', 0))
                        user = self.web3.to_checksum_address(
                            row.get('user', '0x0000000000000000000000000000000000000000'))
                        timestamp = datetime.fromisoformat(row.get('Timestamp', datetime.now().isoformat()))
                        block_number = self.get_block_number(timestamp)
                        multiplier = self.contract.functions.getCurrentUserMultiplier(pool_id, user).call(
                            block_identifier=block_number)
                        row['multiplier'] = multiplier
                        writer.writerow(row)
                    except Exception as e:
                        logger.error(f"Error processing row {row}: {str(e)}")
        except Exception as e:
            logger.error(f"Error in get_user_multipliers: {str(e)}")
            raise

    def get_block_number(self, timestamp):
        # NOTE: ignores the timestamp and returns the latest block, so
        # multipliers are evaluated against the current chain state
        return self.web3.eth.get_block('latest', full_transactions=False)['number']


class OptimizedRewardCalculator:
    def __init__(self):
        self.web3 = Web3(Web3.HTTPProvider(RPC_URL))
        self.contract = distribution_contract

    def calculate_rewards(self, input_csv, output_csv):
        try:
            with open(input_csv, 'r') as infile, open(output_csv, 'w', newline='') as outfile:
                reader = csv.DictReader(infile)
                
                if reader.fieldnames is None:
                    default_fieldnames = ['Timestamp', 'TransactionHash', 'BlockNumber', 'user', 'poolId', 'multiplier']
                    fieldnames = default_fieldnames + ['daily_reward', 'total_current_user_reward']
                    logger.warning(f"No headers found in {input_csv}. Using default headers: {default_fieldnames}")
                else:
                    fieldnames = reader.fieldnames + ['daily_reward', 'total_current_user_reward']

                writer = csv.DictWriter(outfile, fieldnames=fieldnames)
                writer.writeheader()

                for row in reader:
                    try:
                        address = self.web3.to_checksum_address(
                            row.get('user', '0x0000000000000000000000000000000000000000'))
                        pool_id = int(row.get("poolId", 0))
                        timestamp = datetime.fromisoformat(row.get('Timestamp', datetime.now().isoformat()))
                        block_number = self.get_block_number(timestamp)

                        daily_reward = self.calculate_daily_reward(pool_id, address, block_number)
                        total_reward = self.contract.functions.getCurrentUserReward(pool_id, address).call(
                            block_identifier=block_number)

                        row['daily_reward'] = daily_reward
                        row['total_current_user_reward'] = total_reward
                        writer.writerow(row)
                    except Exception as e:
                        logger.error(f"Error processing row {row}: {str(e)}")
        except Exception as e:
            logger.error(f"Error in calculate_rewards: {str(e)}")
            raise

    def calculate_daily_reward(self, pool_id, address, block_number):
        pool_data = self.contract.functions.poolsData(pool_id).call(block_identifier=block_number)
        total_virtual_deposited = pool_data[2]

        pool_info = self.contract.functions.pools(pool_id).call(block_identifier=block_number)
        payout_start, decrease_interval, _, _, _, initial_reward, reward_decrease, _, _ = pool_info

        user_data = self.contract.functions.usersData(address, pool_id).call(block_identifier=block_number)
        deposited = user_data[1]

        current_time = self.web3.eth.get_block(block_number)['timestamp']
        intervals_passed = (current_time - payout_start) // decrease_interval
        current_interval_reward = max(0, initial_reward - (intervals_passed * reward_decrease))

        if total_virtual_deposited > 0:
            daily_reward = (current_interval_reward * deposited * 86400) // (
                        total_virtual_deposited * decrease_interval)
        else:
            daily_reward = 0

        return daily_reward

    def get_block_number(self, timestamp):
        # NOTE: ignores the timestamp and returns the latest block, so
        # rewards are evaluated against the current chain state
        return self.web3.eth.get_block('latest', full_transactions=False)['number']

    def get_virtual_steth_pool(self, pool_id):
        pools_data = self.contract.functions.poolsData(pool_id).call()
        return pools_data[2] / 1e18

# Example usage
# if __name__ == "__main__":
#     calculator = OptimizedRewardCalculator()
#     calculator.calculate_rewards(
#         input_csv='usermultiplier.csv',
#         output_csv='usermultiplier2.csv'
#     )
#     # processor = EventProcessor(output_dir=".")
#     # processor.process_events("UserStaked")
#     # processor.process_events("UserClaimLocked")
#     multiplier = OptimizedMultiplierCalculator()
#     multiplier.get_user_multipliers("userclaimlocked_events.csv", "usermultiplier.csv")
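
A minimal end-to-end driver sketch for these classes, assuming the script runs on a schedule (e.g. cron); the output directory and CSV paths here are illustrative, not part of the module:

processor = EventProcessor(output_dir="data/staking")

# Each run appends only blocks newer than the last CSV row,
# so repeated invocations stay cheap.
for event_name in ("UserStaked", "UserClaimLocked"):
    processor.process_events(event_name)

# Enrich the lock events with on-chain multipliers, then rewards.
OptimizedMultiplierCalculator().get_user_multipliers(
    "data/staking/userclaimlocked_events.csv",
    "data/staking/usermultiplier.csv",
)
OptimizedRewardCalculator().calculate_rewards(
    "data/staking/usermultiplier.csv",
    "data/staking/usermultiplier2.csv",
)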

1.1 response_distribution.py

Fetches the updated data from the CSVs and calculates the average power multiplier, stake times, staker distribution, reward-claim metrics, and a rewards summary, returning a final JSON output with all staker-related metrics to the main.py API script. A sketch of how an endpoint might assemble the payload follows the listing.

import csv
import logging
from collections import defaultdict
from datetime import datetime, timedelta
from decimal import Decimal

import numpy as np
import requests

from helpers.staking_general_helpers.distribution import OptimizedRewardCalculator

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


def is_valid_stake(row):
    current_time = int(datetime.now().timestamp())
    claim_lock_start = int(row['claimLockStart'])
    claim_lock_end = int(row['claimLockEnd'])
    max_lock_end = current_time + (25 * 365 * 24 * 60 * 60)  # 25 years in seconds, as a sanity cap

    return (claim_lock_start != 0 and
            claim_lock_end != 0 and
            claim_lock_end > current_time and
            claim_lock_end <= max_lock_end)

def analyze_mor_stakers(csv_file_path):
    # Initialize data structures
    stakers_by_pool = {0: set(), 1: set()}
    stakers_by_pool_and_date = defaultdict(lambda: defaultdict(set))
    total_stake_time = {0: timedelta(), 1: timedelta()}
    stake_count = {0: 0, 1: 0}

    try:
        with open(csv_file_path, 'r') as csvfile:
            reader = csv.DictReader(csvfile)
            for row in reader:
                if not is_valid_stake(row):
                    continue

                timestamp = datetime.fromisoformat(row['Timestamp']).date()
                pool_id = int(row['poolId'])
                user = row['user']

                # Add to overall pool stakers
                stakers_by_pool[pool_id].add(user)

                # Add to date-specific pool stakers
                stakers_by_pool_and_date[timestamp][pool_id].add(user)

                # Calculate stake time
                claim_lock_start = int(row['claimLockStart'])
                claim_lock_end = int(row['claimLockEnd'])
                stake_time = timedelta(seconds=claim_lock_end - claim_lock_start)

                # Add to total stake time
                total_stake_time[pool_id] += stake_time
                stake_count[pool_id] += 1

        logger.info(f"Successfully analyzed MOR stakers from file: {csv_file_path}")

        # Calculate average stake time
        avg_stake_time = {
            pool_id: (total_time / stake_count[pool_id] if stake_count[pool_id] > 0 else timedelta())
            for pool_id, total_time in total_stake_time.items()
        }

        # Calculate combined average stake time
        total_combined_stake_time = sum(total_stake_time.values(), timedelta())
        total_combined_stakes = sum(stake_count.values())
        combined_avg_stake_time = total_combined_stake_time / total_combined_stakes if total_combined_stakes > 0 else timedelta()

        # Prepare results
        results = {
            'total_unique_stakers': {
                'pool_0': len(stakers_by_pool[0]),
                'pool_1': len(stakers_by_pool[1]),
                'combined': len(stakers_by_pool[0] | stakers_by_pool[1])
            },
            'daily_unique_stakers': defaultdict(lambda: {'pool_0': 0, 'pool_1': 0, 'combined': 0}),
            'average_stake_time': avg_stake_time,
            'combined_average_stake_time': combined_avg_stake_time,
            'total_stakes': stake_count
        }

        # Process daily data
        for date, pools in stakers_by_pool_and_date.items():
            results['daily_unique_stakers'][date] = {
                'pool_0': len(pools[0]),
                'pool_1': len(pools[1]),
                'combined': len(pools[0] | pools[1])
            }

    except FileNotFoundError:
        logger.error(f"File not found: {csv_file_path}")
        raise
    except PermissionError:
        logger.error(f"Permission denied when trying to read file: {csv_file_path}")
        raise
    except Exception as e:
        logger.error(f"Unexpected error when analyzing MOR stakers from file {csv_file_path}: {str(e)}")
        raise

    return results


def calculate_average_multipliers(csv_file_path):
    total_multiplier = Decimal('0')
    capital_multiplier = Decimal('0')
    code_multiplier = Decimal('0')
    total_count = 0
    capital_count = 0
    code_count = 0

    with open(csv_file_path, 'r') as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:
            if not is_valid_stake(row):
                continue

            multiplier = Decimal(row['multiplier']) / Decimal('1e18')  # Convert from wei to whole units
            total_multiplier += multiplier
            total_count += 1

            if row['poolId'] == '0':  # Capital pool
                capital_multiplier += multiplier
                capital_count += 1
            elif row['poolId'] == '1':  # Code pool
                code_multiplier += multiplier
                code_count += 1

    # Calculate averages
    average_multiplier = total_multiplier / total_count if total_count > 0 else Decimal('0')
    average_capital_multiplier = capital_multiplier / capital_count if capital_count > 0 else Decimal('0')
    average_code_multiplier = code_multiplier / code_count if code_count > 0 else Decimal('0')

    return {
        'overall_average': average_multiplier,
        'capital_average': average_capital_multiplier,
        'code_average': average_code_multiplier
    }
    
def calculate_pool_rewards_summary(csv_file_path):
    pool_rewards = defaultdict(lambda: {'daily_reward_sum': 0, 'total_current_user_reward_sum': 0})
    
    with open(csv_file_path, 'r') as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:
            if not is_valid_stake(row):
                continue

            pool_id = row['poolId']
            daily_reward = float(row['daily_reward']) / (10 ** 18)
            total_current_user_reward = float(row['total_current_user_reward']) / (10 ** 18)
            
            pool_rewards[pool_id]['daily_reward_sum'] += daily_reward
            pool_rewards[pool_id]['total_current_user_reward_sum'] += total_current_user_reward
    
    return pool_rewards

def calculate_power_factor(staking_period_days):
    # This is a simplified power factor calculation
    # Adjust this based on the actual implementation
    return 1 + (staking_period_days / 365)

def get_crypto_price(crypto_id):
    base_url = "https://api.coingecko.com/api/v3/simple/price"
    params = {
        "ids": crypto_id,
        "vs_currencies": "usd"
    }

    try:
        response = requests.get(base_url, params=params)
        response.raise_for_status()  # Raises an HTTPError for bad responses
        data = response.json()

        if crypto_id in data and "usd" in data[crypto_id]:
            return data[crypto_id]["usd"]
        else:
            return None
    except requests.RequestException as e:
        print(f"An error occurred: {e}")
        return None

def calculate_mor_rewards(mor_daily_emission, staking_period_days, mor_price, eth_price):
    calculator = OptimizedRewardCalculator()
    total_virtual_steth = calculator.get_virtual_steth_pool(0)
    # Calculate power factor
    power_factor = calculate_power_factor(staking_period_days)
    # Calculate APR
    apr = (mor_daily_emission * 365 * mor_price * power_factor) / (total_virtual_steth * eth_price)
    # With compounding once per year, APY equals APR
    apy = (1 + apr) ** 1 - 1
    # Calculate daily MOR rewards per 1 deposited stETH
    daily_mor_rewards = (mor_daily_emission * power_factor) / total_virtual_steth
    return apy, daily_mor_rewards

def give_more_reward_response():
    mor_daily_emission = 3456  # Replace with the actual daily emission
    staking_periods = [0, 365, 730, 1095, 1460, 1825, 2190]  # 0, 1 year, 2 years, 3 years, 4 years, 5 years, 6 years
    
    # print(f"MOR Rewards Calculator")
    # print(f"MOR Daily Emission: {mor_daily_emission}")
    mor_price = get_crypto_price("morpheusai")
    eth_price = get_crypto_price("staked-ether")  # stETH (Lido) CoinGecko id
    
    # print("\nAPY per 1 deposited stETH:")
    rewards_data = {
        "apy_per_steth": [],
        "daily_mor_rewards_per_steth": []
    }
    
    for period in staking_periods:
        apy, daily_mor_rewards = calculate_mor_rewards(mor_daily_emission, period, mor_price, eth_price)
        rewards_data["apy_per_steth"].append({
            "staking_period": period,
            "apy": f"{apy:.2%}"
        })
        rewards_data["daily_mor_rewards_per_steth"].append({
            "staking_period": period,
            "daily_mor_rewards": f"{daily_mor_rewards:.6f}"
        })
    return rewards_data

def get_wallet_stake_info(csv_file_path):
    wallet_info = {}
    # Read and process CSV data
    with open(csv_file_path, 'r') as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:
            if not is_valid_stake(row):
                continue

            wallet = row['user']
        
            # Calculate stake time
            claim_lock_start = int(row['claimLockStart'])
            claim_lock_end = int(row['claimLockEnd'])
            stake_time = timedelta(seconds=claim_lock_end - claim_lock_start)
            
            # Get power multiplier
            power_multiplier = int(row['multiplier'])
            
            # Update wallet info
            if wallet not in wallet_info or stake_time > wallet_info[wallet]['stake_time']:
                wallet_info[wallet] = {
                    'stake_time': stake_time,
                    'power_multiplier': power_multiplier
                }
    
    stake_times = np.array([v["stake_time"].total_seconds() for v in wallet_info.values()])
    power_multipliers = np.array([v["power_multiplier"]/1e25 for v in wallet_info.values()])

    year_in_seconds = 365.25 * 24 * 60 * 60
    stake_times_in_years = stake_times / year_in_seconds

    def bin_data_custom_ranges(data, bins):
        bin_indices = np.digitize(data, bins, right=True)
        frequencies = np.bincount(bin_indices, minlength=len(bins))[1:]
        ranges = [[float(bins[i]), float(bins[i+1]) if i < len(bins)-2 else None] for i in range(len(bins)-1)]
        return ranges, frequencies.tolist()

    stake_time_bins_years = [0, 1, 2, 3, 4, 5, 1000]  # Using 1000 years as an effective "infinity"
    stake_time_ranges, stake_time_frequencies = bin_data_custom_ranges(stake_times_in_years, stake_time_bins_years)

    power_multiplier_bins = np.linspace(np.min(power_multipliers), np.max(power_multipliers), 11)
    power_multiplier_ranges, power_multiplier_frequencies = bin_data_custom_ranges(power_multipliers, power_multiplier_bins)

    output = {
        "stake_time": {
            "ranges": stake_time_ranges,
            "frequencies": stake_time_frequencies
        },
        "power_multiplier": {
            "ranges": power_multiplier_ranges,
            "frequencies": power_multiplier_frequencies
        }
    }

    return output
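
A sketch of how an endpoint might stitch these helpers into the final JSON payload; the CSV paths are illustrative, and the str() conversions are one simple way to make the timedelta, date, and Decimal values JSON-serializable:

import json

staker_analysis = analyze_mor_stakers('userclaimlocked_events.csv')
multipliers = calculate_average_multipliers('usermultiplier.csv')
pool_rewards = calculate_pool_rewards_summary('usermultiplier2.csv')

payload = {
    'staker_analysis': {
        **staker_analysis,
        'average_stake_time': {k: str(v) for k, v in staker_analysis['average_stake_time'].items()},
        'combined_average_stake_time': str(staker_analysis['combined_average_stake_time']),
        'daily_unique_stakers': {str(k): v for k, v in staker_analysis['daily_unique_stakers'].items()},
    },
    'multiplier_analysis': {k: str(v) for k, v in multipliers.items()},
    'reward_summary': dict(pool_rewards),
    'projected_rewards': give_more_reward_response(),  # live CoinGecko call
}
print(json.dumps(payload, indent=2))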


  2. emissions.py

This script imports the raw markdown emission schedule and converts it into a CSV file for reading; it then loads the data from the CSV into a dictionary and calculates and returns each bucket's emissions for the current day, along with total emissions to date, in JSON format. A short usage sketch follows the listing:

import pandas as pd
from typing import List, Dict
from datetime import datetime
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


def read_emission_schedule(today_date: datetime, file_path: str) -> Dict:
    """
    Read the emission schedule CSV file and return a dictionary with processed data.

    Args:
    today_date (datetime): Current date
    file_path (str): Path to the CSV file

    Returns:
    Dict: Dictionary containing processed emission data
    """
    try:
        df = pd.read_csv(file_path, sep='|', skipinitialspace=True)
        df = df.dropna(axis=1, how='all')  # Remove empty columns

        # Strip whitespace from column names and string columns
        df.columns = df.columns.str.strip()
        df = df.apply(lambda x: x.str.strip() if x.dtype == "object" else x)

        df['Date'] = pd.to_datetime(df['Date'], format='%m/%d/%y', errors='coerce')
        numeric_columns = df.columns.drop(['Day', 'Date'])
        df[numeric_columns] = df[numeric_columns].apply(pd.to_numeric, errors='coerce')

        # Filter data up to today's date
        df_until_today = df[df['Date'] <= today_date]

        if df_until_today.empty:
            logger.warning("No data found up to the specified date.")
            return {'new_emissions': {}, 'total_emissions': {}}

        # Calculate new emissions for today
        last_day = df_until_today.iloc[-1]
        previous_day = df_until_today.iloc[-2] if len(df_until_today) > 1 else pd.Series(0, index=last_day.index)

        emission_categories = ['Capital Emission', 'Code Emission', 'Compute Emission', 'Community Emission',
                               'Protection Emission']

        new_emissions = {category: last_day[category] - previous_day[category] for category in emission_categories}
        total_emissions = {category: df_until_today[category].sum() for category in emission_categories}

        new_emissions['Total Emission'] = sum(new_emissions.values())
        total_emissions['Total Emission'] = df_until_today['Total Emission'].sum()

        logger.info(f"Successfully processed emission data up to {today_date}")
        return {
            'new_emissions': new_emissions,
            'total_emissions': total_emissions
        }
    except FileNotFoundError:
        logger.error(f"Emission schedule file not found: {file_path}")
        raise
    except Exception as e:
        logger.error(f"Error processing emission schedule: {str(e)}")
        raise


def calculate_total_emissions(df: pd.DataFrame) -> List[Dict]:
    """
    Calculate the total emissions for each category.

    Args:
    df (pd.DataFrame): DataFrame containing the emission schedule data

    Returns:
    List[Dict]: List of dictionaries containing the total emissions for each category
    """
    emission_categories = ['Capital Emission', 'Code Emission', 'Compute Emission', 'Community Emission',
                           'Protection Emission']
    return [{"category": category, "total": df[category].sum()} for category in emission_categories]

  3. position.py

Calculates the Uniswap V3 pool position for Morpheus in terms of USD, MOR, and stETH equivalents. It fetches the asset balances for each Uniswap position NFT held by a given address and converts them into their USD or token equivalents. A small price-conversion sketch follows the listing.

from web3 import Web3
import math
from collections import defaultdict
import requests
from app.core.config import (ARB_RPC_URL, STETH_TOKEN_ADDRESS, MOR_ARBITRUM_ADDRESS, UNISWAP_V3_POSITIONS_NFT_ADDRESS,
                             UNISWAP_V3_FACTORY_ADDRESS, POSITIONS_NFT_ABI, FACTORY_NFT_ABI, POOL_ABI)

w3 = Web3(Web3.HTTPProvider(ARB_RPC_URL))

MOR_TOKEN_ADDRESS = MOR_ARBITRUM_ADDRESS
DEX_API_URL = "https://api.dexscreener.io/latest/dex/tokens/{}"

positions_nft_contract = w3.eth.contract(address=w3.to_checksum_address(UNISWAP_V3_POSITIONS_NFT_ADDRESS),
                                         abi=POSITIONS_NFT_ABI)
factory_contract = w3.eth.contract(address=w3.to_checksum_address(UNISWAP_V3_FACTORY_ADDRESS),
                                   abi=FACTORY_NFT_ABI)


def fetch_token_price(token_address):
    """Fetches the token price in USD from the Dex Screener API."""
    api_url = DEX_API_URL.format(token_address)
    response = requests.get(api_url)

    if response.status_code == 200:
        data = response.json()
        if 'pairs' in data and len(data['pairs']) > 0 and 'priceUsd' in data['pairs'][0]:
            return float(data['pairs'][0]['priceUsd'])

    return None


def fetch_all_nfts(address):
    """Fetches all NFTs owned by the address."""
    balance = positions_nft_contract.functions.balanceOf(address).call()
    nfts = []
    for i in range(balance):
        token_id = positions_nft_contract.functions.tokenOfOwnerByIndex(address, i).call()
        nfts.append(token_id)
    return nfts


def get_asset_balances(token_id):
    """Fetches the asset balances for a specific NFT position."""
    position = positions_nft_contract.functions.positions(token_id).call()

    # Extract relevant information
    token0 = position[2]
    token1 = position[3]
    fee = position[4]
    tick_lower = position[5]
    tick_upper = position[6]
    liquidity = position[7]

    # Get pool address
    pool_address = factory_contract.functions.getPool(token0, token1, fee).call()
    pool_contract = w3.eth.contract(address=pool_address, abi=POOL_ABI)

    # Fetch current tick and sqrt price
    slot0 = pool_contract.functions.slot0().call()
    sqrt_price_x96 = slot0[0]
    current_tick = slot0[1]

    # Calculate amounts
    amount0, amount1 = calculate_amounts(liquidity, sqrt_price_x96, current_tick, tick_lower, tick_upper)

    return {
        'token0': {'address': token0, 'amount': amount0},
        'token1': {'address': token1, 'amount': amount1},
        'fee': fee,
        'liquidity': liquidity,
        'tick_lower': tick_lower,
        'tick_upper': tick_upper,
        'current_tick': current_tick
    }


def calculate_amounts(liquidity, sqrt_price_x96, tick_current, tick_lower, tick_upper):
    """Calculates the token balances based on the tick and liquidity."""
    sqrt_ratio_a = math.sqrt(1.0001 ** tick_lower)
    sqrt_ratio_b = math.sqrt(1.0001 ** tick_upper)
    sqrt_price = sqrt_price_x96 / (1 << 96)

    if tick_current < tick_lower:
        amount0 = liquidity * (1 / sqrt_ratio_a - 1 / sqrt_ratio_b)
        amount1 = 0
    elif tick_current < tick_upper:
        amount0 = liquidity * (1 / sqrt_price - 1 / sqrt_ratio_b)
        amount1 = liquidity * (sqrt_price - sqrt_ratio_a)
    else:
        amount0 = 0
        amount1 = liquidity * (sqrt_ratio_b - sqrt_ratio_a)

    return amount0 / (10 ** 18), amount1 / (10 ** 18)  # Convert to whole units (assumes both tokens use 18 decimals)


def protocol_liquidity(address):
    """Fetches and calculates the protocol's liquidity and returns it in USD, MOR, and stETH values."""
    nft_ids = fetch_all_nfts(address)

    if not nft_ids:
        # No position NFTs found for this address; nothing to aggregate
        return None

    # Fetch MOR and stETH prices
    mor_price = fetch_token_price(MOR_TOKEN_ADDRESS)
    steth_price = fetch_token_price(STETH_TOKEN_ADDRESS)

    if mor_price is None or steth_price is None:
        raise Exception("Could not fetch MOR or stETH prices.")

    aggregated_positions = defaultdict(lambda: {'token0': {'balance': 0, 'address': None},
                                                'token1': {'balance': 0, 'address': None},
                                                'liquidity': 0})

    total_value_usd = 0

    for nft_id in nft_ids:
        balances = get_asset_balances(nft_id)
        key = f"{balances['token0']['address']}_{balances['token1']['address']}_{balances['fee']}"

        aggregated_positions[key]['token0']['address'] = balances['token0']['address']
        aggregated_positions[key]['token1']['address'] = balances['token1']['address']
        aggregated_positions[key]['token0']['balance'] += balances['token0']['amount']
        aggregated_positions[key]['token1']['balance'] += balances['token1']['amount']
        aggregated_positions[key]['liquidity'] += balances['liquidity']

        # Calculate the total value in USD for the position
        # (assumes token0 is MOR and token1 is stETH in every position)
        token0_value_usd = balances['token0']['amount'] * mor_price
        token1_value_usd = balances['token1']['amount'] * steth_price
        total_value_usd += token0_value_usd + token1_value_usd

    # Convert defaultdict to regular dict for JSON serialization
    aggregated_positions = dict(aggregated_positions)

    # Return USD, MOR, and stETH values
    return {
        "positions": aggregated_positions,
        "total_value_usd": total_value_usd,
        "mor_value": total_value_usd / mor_price if mor_price else 0,
        "steth_value": total_value_usd / steth_price if steth_price else 0
    }

#
# def main():
#     result = protocol_liquidity("0x151c2b49CdEC10B150B2763dF3d1C00D70C90956")
#     print(result)
#
#
# if __name__ == "__main__":
#     main()
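
For intuition on the math in calculate_amounts: a small, self-contained sketch (illustrative numbers only) that converts a pool's sqrtPriceX96 into a token1-per-token0 price, assuming both tokens use 18 decimals as MOR and stETH do:

sqrt_price_x96 = 1914910968134516197677991425216  # made-up slot0[0] value
sqrt_price = sqrt_price_x96 / (1 << 96)  # sqrtPriceX96 encodes sqrt(price) in Q64.96
price_token1_per_token0 = sqrt_price ** 2  # equal decimals, so no further scaling
print(price_token1_per_token0)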

  4. rewards.py

Gets the daily reward, total pending reward, and estimated total earned reward for a specific address across all pools. A worked numeric sketch of the daily-reward arithmetic follows the listing.

from datetime import datetime

from app.core.config import web3, distribution_contract


w3 = web3
contract = distribution_contract


def get_rewards_info(address):
    """
    Get daily reward, total pending reward, and total earned reward for a specific address across all pools.
    
    :param address: Ethereum address of the user
    :return: A dictionary containing rewards info for each pool and total rewards
    """
    rewards_info = {
        'pools': {},
        'total_current_user_reward': 0,
        'total_daily_reward': 0,
        'total_earned_reward': 0
    }

    pool_id = 0
    while True:
        try:
            pool_info = contract.functions.pools(pool_id).call()
        except Exception:
            # If we can't get pool info, we've reached the end of the pools
            break

        # Get current user reward (pending reward) for this pool
        current_user_reward = contract.functions.getCurrentUserReward(pool_id, address).call()
        rewards_info['total_current_user_reward'] += current_user_reward
        
        # Get user data
        user_data = contract.functions.usersData(address, pool_id).call()
        deposited = user_data[1]
        last_stake = user_data[0]
        
        # Get pool data
        pool_data = contract.functions.poolsData(pool_id).call()
        total_virtual_deposited = pool_data[2]
        
        # Unpack pool info
        payout_start, decrease_interval, _, _, _, initial_reward, reward_decrease, _, _ = pool_info
        
        # Calculate daily reward
        current_time = int(datetime.now().timestamp())
        intervals_passed = (current_time - payout_start) // decrease_interval
        current_interval_reward = max(0, initial_reward - (intervals_passed * reward_decrease))
        
        if total_virtual_deposited > 0:
            daily_reward = (current_interval_reward * deposited * 86400) // (total_virtual_deposited * decrease_interval)
        else:
            daily_reward = 0
        
        rewards_info['total_daily_reward'] += daily_reward
        
        # Calculate total earned reward (this is an estimate, as we don't have exact claim history)
        time_staked = current_time - last_stake
        total_earned_reward = (daily_reward * time_staked) // 86400
        rewards_info['total_earned_reward'] += total_earned_reward
        
        # Store pool-specific info
        rewards_info['pools'][pool_id] = {
            'current_user_reward': current_user_reward,
            'daily_reward': daily_reward,
            'earned_reward': total_earned_reward
        }
        
        pool_id += 1

    return rewards_info

# Example usage
# user_address = '0x22E0225540ccf80aB3C4F029F3dE75dB785754A3'

# rewards_info = get_rewards_info(user_address)
# print(f"Total Current User Reward: {rewards_info['total_current_user_reward']}")
# print(f"Total Daily Reward: {rewards_info['total_daily_reward']}")
# print(f"Total Earned Reward (estimate): {rewards_info['total_earned_reward']}")
# print("\nPer-pool breakdown:")
# for pool_id, pool_info in rewards_info['pools'].items():
#     print(f"Pool {pool_id}:")
#     print(f"  Current User Reward: {pool_info['current_user_reward']}")
#     print(f"  Daily Reward: {pool_info['daily_reward']}")
#     print(f"  Earned Reward (estimate): {pool_info['earned_reward']}")