Capital Helpers
Contains main business logic for fetching essential capital data
Here are the snippets for capital helpers that contain the main logic that fetches and updates the data for the capital endpoints:
1. capital_metrics.py
capital_metrics.py
def process_transactions(df):
balances = defaultdict(float)
for _, row in df.iterrows():
balances[row['User']] += row['Amount']
return balances
def safe_divide(df, column, divisor):
try:
df[column] = pd.to_numeric(df[column], errors='coerce')
df[column] = df[column] / divisor
except Exception as e:
logger.error(f"Error converting and dividing {column}: {str(e)}")
return df
def get_total_supply_and_staker_info():
try:
user_staked_df = get_data_from_db(USER_STAKED_NAME)
user_withdrawn_df = get_data_from_db(USER_WITHDRAWN_NAME)
user_staked_df = safe_divide(user_staked_df, 'Amount', 1e18)
user_withdrawn_df = safe_divide(user_withdrawn_df, 'Amount', 1e18)
except Exception as e:
logger.error(f"Error reading sheets: {str(e)}")
return OrderedDict(), {}, OrderedDict(), OrderedDict(), OrderedDict(), OrderedDict(), pd.DataFrame()
required_columns = ["Timestamp", "TransactionHash", "BlockNumber", "PoolId", "User", "Amount"]
if not all(col in user_staked_df.columns
and
col in user_withdrawn_df.columns for col in required_columns):
logger.error(f"DataFrames are missing required columns. Available columns: "
f"Staked: {user_staked_df.columns}, Withdrawn: {user_withdrawn_df.columns}")
return OrderedDict(), {}, OrderedDict(), OrderedDict(), OrderedDict(), OrderedDict(), pd.DataFrame()
try:
user_staked_df['Timestamp'] = pd.to_datetime(user_staked_df['Timestamp'], errors='coerce')
user_withdrawn_df['Timestamp'] = pd.to_datetime(user_withdrawn_df['Timestamp'], errors='coerce')
user_staked_df = user_staked_df.dropna(subset=['Timestamp'])
user_withdrawn_df = user_withdrawn_df.dropna(subset=['Timestamp'])
staked_balances = process_transactions(user_staked_df)
withdrawn_balances = process_transactions(user_withdrawn_df)
final_balances = defaultdict(float)
for address, amount in staked_balances.items():
final_balances[address] = amount - withdrawn_balances.get(address, 0)
user_staked_df['Date'] = user_staked_df['Timestamp'].dt.date
user_withdrawn_df['Date'] = user_withdrawn_df['Timestamp'].dt.date
daily_staked = user_staked_df.groupby('Date')['Amount'].sum().reset_index()
daily_withdrawn = user_withdrawn_df.groupby('Date')['Amount'].sum().reset_index()
daily_net = pd.merge(daily_staked, daily_withdrawn, on='Date', how='outer', suffixes=('_staked', '_withdrawn'))
daily_net = daily_net.fillna(0)
daily_net['Net_Staked'] = daily_net['Amount_staked'] - daily_net['Amount_withdrawn']
daily_net['Cumulative_Net_Staked'] = daily_net['Net_Staked'].cumsum()
daily_net = daily_net.sort_values('Date')
json_output = OrderedDict()
total_stakers_by_date = OrderedDict()
active_stakers_by_date = OrderedDict()
currently_staked_by_date = OrderedDict()
total_staked_by_date = OrderedDict()
cumulative_stakers = set()
cumulative_staked = 0
unique_users_staked_df = user_staked_df.drop_duplicates(subset=['User'])
for _, row in daily_net.iterrows():
date_str = row['Date'].strftime('%d/%m/%Y')
json_output[date_str] = {
'Staked': round(row['Amount_staked'], 4),
'Withdrawn': round(row['Amount_withdrawn'], 4),
'Net_Staked': round(row['Net_Staked'], 4),
'Cumulative_Net_Staked': round(row['Cumulative_Net_Staked'], 4)
}
daily_stakers = set(unique_users_staked_df[unique_users_staked_df['Date'] == row['Date']]['User'])
cumulative_stakers.update(daily_stakers)
cumulative_staked += row['Amount_staked']
total_stakers_by_date[date_str] = len(cumulative_stakers)
unique_active_stakers = len(set(addr for addr in cumulative_stakers if final_balances[addr] > 0))
active_stakers_by_date[date_str] = unique_active_stakers
currently_staked_by_date[date_str] = round(row['Cumulative_Net_Staked'], 4)
total_staked_by_date[date_str] = round(cumulative_staked, 4)
return (json_output, dict(
final_balances), total_stakers_by_date, active_stakers_by_date, currently_staked_by_date,
total_staked_by_date, daily_net)
except Exception as e:
logger.error(f"Error processing data: {str(e)}")
return OrderedDict(), {}, OrderedDict(), OrderedDict(), OrderedDict(), OrderedDict(), pd.DataFrame()
def get_bridged_overplus_amounts_by_date():
try:
bridged_df = get_data_from_db(OVERPLUS_BRIDGED_NAME)
except Exception as e:
logger.error(f"Error reading sheets: {str(e)}")
return OrderedDict(), pd.DataFrame()
required_columns = ["Timestamp", "TransactionHash", "BlockNumber", "amount", "uniqueId"]
if not all(col in bridged_df.columns for col in required_columns):
logger.error(f"DataFrame is missing required columns")
return OrderedDict(), pd.DataFrame()
try:
bridged_df['Timestamp'] = pd.to_datetime(bridged_df['Timestamp'], errors='coerce')
bridged_df = bridged_df.dropna(subset=['Timestamp'])
bridged_df['amount'] = bridged_df['amount'].astype(str)
bridged_df['amount'] = bridged_df['amount'].apply(lambda x: float(x) / 1e18)
bridged_df['Date'] = bridged_df['Timestamp'].dt.date
daily_bridged = bridged_df.groupby('Date')['amount'].sum().reset_index()
daily_bridged['Cumulative_Bridged'] = daily_bridged['amount'].cumsum()
daily_bridged = daily_bridged.sort_values('Date')
json_output = OrderedDict()
for _, row in daily_bridged.iterrows():
date_str = row['Date'].strftime('%d/%m/%Y')
json_output[date_str] = {
'Daily_Bridged': round(row['amount'], 4),
'Cumulative_Bridged': round(row['Cumulative_Bridged'], 4)
}
return json_output
except Exception as e:
logger.error(f"Error processing data: {str(e)}")
return OrderedDict(), pd.DataFrame()
def get_all_claim_metrics():
today = datetime.today()
emissions_data = {}
claimed_capital_rewards = 0
claimed_code_rewards = 0
try:
emissions_data = read_emission_schedule(today, EMISSIONS_SHEET_NAME)
except Exception as e:
print(f"An error occurred: {str(e)}")
total_code_emissions = emissions_data['total_emissions']['Code Emission']
total_capital_emissions = emissions_data['total_emissions']['Capital Emission']
claimed_filter = distribution_contract.events.UserClaimed.create_filter(from_block=MAINNET_BLOCK_1ST_JAN_2024,
to_block='latest')
events = claimed_filter.get_all_entries()
for event in events:
amount = (event['args']['amount'] / 1e18)
pool_id = int(event['args']['poolId'])
if pool_id == 0:
claimed_capital_rewards += amount
elif pool_id == 1:
claimed_code_rewards += amount
else:
continue
unclaimed_capital_emissions = total_capital_emissions - claimed_capital_rewards
unclaimed_code_emissions = total_code_emissions - claimed_code_rewards
total_emissions = total_capital_emissions + total_code_emissions
total_claimed_rewards = claimed_capital_rewards + claimed_code_rewards
total_unclaimed_rewards = total_emissions - total_claimed_rewards
stakereward_analysis = calculate_pool_rewards_summary()
stakereward_analysis = {str(key): value for key, value in stakereward_analysis.items()}
total_capital_staked_reward_sum = stakereward_analysis["0"]["total_current_user_reward_sum"]
total_code_staked_reward_sum = stakereward_analysis["1"]["total_current_user_reward_sum"]
claim_metrics = {
"capital": {
"claimed_capital_rewards": claimed_capital_rewards,
"unclaimed_capital_emissions": unclaimed_capital_emissions,
"total_capital_staked_reward_sum": total_capital_staked_reward_sum,
"total_capital_emissions": total_capital_emissions,
},
"code": {
"claimed_code_rewards": claimed_code_rewards,
"unclaimed_code_emissions": unclaimed_code_emissions,
"total_code_staked_reward_sum": total_code_staked_reward_sum,
"total_code_emissions": total_code_emissions,
},
"total": {
"total_claimed_rewards": total_claimed_rewards,
"total_unclaimed_rewards": total_unclaimed_rewards,
"total_staked_reward_sum": total_capital_staked_reward_sum + total_code_staked_reward_sum,
"total_emissions": total_emissions
}
}
return claim_metrics
def get_capital_metrics():
result = get_total_supply_and_staker_info()
if len(result) == 7:
json_output, final_balances, total_stakers, active_stakers, currently_staked, total_staked, daily_net = result
capital_master_dict = {
"detailed_daily_staking_data": json_output,
"number_of_total_capital_providers_by_date": total_stakers,
"number_of_active_capital_providers_by_date": active_stakers,
"total_staked_steth_amount_by_date": total_staked,
"currently_staked_steth_amount_by_date": currently_staked,
"bridged_overplus_amount_by_date": get_bridged_overplus_amounts_by_date(),
"claim_metrics": get_all_claim_metrics()
}
return capital_master_dict
else:
print("An error occurred while processing the data.")
if __name__ == '__main__':
get_capital_metrics()