Module orca_whirlpool.internal.accounts.keyed_account_converter

Expand source code
from typing import Optional
from solders.pubkey import Pubkey
from ..anchor.accounts import WhirlpoolsConfig as AnchorWhirlpoolsConfig, FeeTier as AnchorFeeTier
from ..anchor.accounts import Whirlpool as AnchorWhirlpool, TickArray as AnchorTickArray, Position as AnchorPosition
from ..anchor.accounts import PositionBundle as AnchorPositionBundle, TokenBadge as AnchorTokenBadge
from ..anchor.accounts import WhirlpoolsConfigExtension as AnchorWhirlpoolsConfigExtension
from .types import WhirlpoolsConfig, FeeTier, Whirlpool, TickArray, Position, PositionBundle, WhirlpoolsConfigExtension
from .types import TokenBadge, AccountInfo, MintInfo
from ..utils.token_util import RawMintInfo, RawAccountInfo


class KeyedAccountConverter:
    @staticmethod
    def to_keyed_fee_tier(pubkey: Pubkey, account: Optional[AnchorFeeTier]) -> Optional[FeeTier]:
        if account is None:
            return None
        return FeeTier(
            pubkey=pubkey,
            whirlpools_config=account.whirlpools_config,
            tick_spacing=account.tick_spacing,
            default_fee_rate=account.default_fee_rate,
        )

    @staticmethod
    def to_keyed_position(pubkey: Pubkey, account: Optional[AnchorPosition]) -> Optional[Position]:
        if account is None:
            return None
        return Position(
            pubkey=pubkey,
            whirlpool=account.whirlpool,
            position_mint=account.position_mint,
            liquidity=account.liquidity,
            tick_lower_index=account.tick_lower_index,
            tick_upper_index=account.tick_upper_index,
            fee_growth_checkpoint_a=account.fee_growth_checkpoint_a,
            fee_owed_a=account.fee_owed_a,
            fee_growth_checkpoint_b=account.fee_growth_checkpoint_b,
            fee_owed_b=account.fee_owed_b,
            reward_infos=account.reward_infos,
        )

    @staticmethod
    def to_keyed_tick_array(pubkey: Pubkey, account: Optional[AnchorTickArray]) -> Optional[TickArray]:
        if account is None:
            return None
        return TickArray(
            pubkey=pubkey,
            start_tick_index=account.start_tick_index,
            ticks=account.ticks,
            whirlpool=account.whirlpool,
        )

    @staticmethod
    def to_keyed_whirlpool(pubkey: Pubkey, account: Optional[AnchorWhirlpool]) -> Optional[Whirlpool]:
        if account is None:
            return None
        return Whirlpool(
            pubkey=pubkey,
            whirlpools_config=account.whirlpools_config,
            whirlpool_bump=account.whirlpool_bump,
            tick_spacing=account.tick_spacing,
            tick_spacing_seed=account.tick_spacing_seed,
            fee_rate=account.fee_rate,
            protocol_fee_rate=account.protocol_fee_rate,
            liquidity=account.liquidity,
            sqrt_price=account.sqrt_price,
            tick_current_index=account.tick_current_index,
            protocol_fee_owed_a=account.protocol_fee_owed_a,
            protocol_fee_owed_b=account.protocol_fee_owed_b,
            token_mint_a=account.token_mint_a,
            token_vault_a=account.token_vault_a,
            fee_growth_global_a=account.fee_growth_global_a,
            token_mint_b=account.token_mint_b,
            token_vault_b=account.token_vault_b,
            fee_growth_global_b=account.fee_growth_global_b,
            reward_last_updated_timestamp=account.reward_last_updated_timestamp,
            reward_infos=account.reward_infos,
        )

    @staticmethod
    def to_keyed_whirlpools_config(pubkey: Pubkey, account: Optional[AnchorWhirlpoolsConfig]) -> Optional[WhirlpoolsConfig]:
        if account is None:
            return None
        return WhirlpoolsConfig(
            pubkey=pubkey,
            fee_authority=account.fee_authority,
            collect_protocol_fees_authority=account.collect_protocol_fees_authority,
            reward_emissions_super_authority=account.reward_emissions_super_authority,
            default_protocol_fee_rate=account.default_protocol_fee_rate,
        )

    @staticmethod
    def to_keyed_position_bundle(pubkey: Pubkey, account: Optional[AnchorPositionBundle]) -> Optional[PositionBundle]:
        if account is None:
            return None
        return PositionBundle(
            pubkey=pubkey,
            position_bundle_mint=account.position_bundle_mint,
            position_bitmap=account.position_bitmap,
        )

    @staticmethod
    def to_keyed_whirlpools_config_extension(pubkey: Pubkey, account: Optional[AnchorWhirlpoolsConfigExtension]) -> Optional[WhirlpoolsConfigExtension]:
        if account is None:
            return None
        return WhirlpoolsConfigExtension(
            pubkey=pubkey,
            whirlpools_config=account.whirlpools_config,
            config_extension_authority=account.config_extension_authority,
            token_badge_authority=account.token_badge_authority,
        )

    @staticmethod
    def to_keyed_token_badge(pubkey: Pubkey, account: Optional[AnchorTokenBadge]) -> Optional[TokenBadge]:
        if account is None:
            return None
        return TokenBadge(
            pubkey=pubkey,
            whirlpools_config=account.whirlpools_config,
            token_mint=account.token_mint,
        )

    @staticmethod
    def to_keyed_token_mint(pubkey: Pubkey, account: Optional[RawMintInfo]) -> Optional[MintInfo]:
        if account is None:
            return None
        return MintInfo(
            pubkey=pubkey,
            token_program_id=account.token_program_id,
            mint_authority=account.mint_authority,
            supply=account.supply,
            decimals=account.decimals,
            is_initialized=account.is_initialized,
            freeze_authority=account.freeze_authority,
            tlv_data=account.tlv_data,
        )

    @staticmethod
    def to_keyed_token_account(pubkey: Pubkey, account: Optional[RawAccountInfo]) -> Optional[AccountInfo]:
        if account is None:
            return None
        return AccountInfo(
            pubkey=pubkey,
            token_program_id=account.token_program_id,
            mint=account.mint,
            owner=account.owner,
            amount=account.amount,
            delegate=account.delegate,
            delegated_amount=account.delegated_amount,
            is_initialized=account.is_initialized,
            is_frozen=account.is_frozen,
            is_native=account.is_native,
            close_authority=account.close_authority,
            tlv_data=account.tlv_data,
        )

Classes

class KeyedAccountConverter
Expand source code
class KeyedAccountConverter:
    @staticmethod
    def to_keyed_fee_tier(pubkey: Pubkey, account: Optional[AnchorFeeTier]) -> Optional[FeeTier]:
        if account is None:
            return None
        return FeeTier(
            pubkey=pubkey,
            whirlpools_config=account.whirlpools_config,
            tick_spacing=account.tick_spacing,
            default_fee_rate=account.default_fee_rate,
        )

    @staticmethod
    def to_keyed_position(pubkey: Pubkey, account: Optional[AnchorPosition]) -> Optional[Position]:
        if account is None:
            return None
        return Position(
            pubkey=pubkey,
            whirlpool=account.whirlpool,
            position_mint=account.position_mint,
            liquidity=account.liquidity,
            tick_lower_index=account.tick_lower_index,
            tick_upper_index=account.tick_upper_index,
            fee_growth_checkpoint_a=account.fee_growth_checkpoint_a,
            fee_owed_a=account.fee_owed_a,
            fee_growth_checkpoint_b=account.fee_growth_checkpoint_b,
            fee_owed_b=account.fee_owed_b,
            reward_infos=account.reward_infos,
        )

    @staticmethod
    def to_keyed_tick_array(pubkey: Pubkey, account: Optional[AnchorTickArray]) -> Optional[TickArray]:
        if account is None:
            return None
        return TickArray(
            pubkey=pubkey,
            start_tick_index=account.start_tick_index,
            ticks=account.ticks,
            whirlpool=account.whirlpool,
        )

    @staticmethod
    def to_keyed_whirlpool(pubkey: Pubkey, account: Optional[AnchorWhirlpool]) -> Optional[Whirlpool]:
        if account is None:
            return None
        return Whirlpool(
            pubkey=pubkey,
            whirlpools_config=account.whirlpools_config,
            whirlpool_bump=account.whirlpool_bump,
            tick_spacing=account.tick_spacing,
            tick_spacing_seed=account.tick_spacing_seed,
            fee_rate=account.fee_rate,
            protocol_fee_rate=account.protocol_fee_rate,
            liquidity=account.liquidity,
            sqrt_price=account.sqrt_price,
            tick_current_index=account.tick_current_index,
            protocol_fee_owed_a=account.protocol_fee_owed_a,
            protocol_fee_owed_b=account.protocol_fee_owed_b,
            token_mint_a=account.token_mint_a,
            token_vault_a=account.token_vault_a,
            fee_growth_global_a=account.fee_growth_global_a,
            token_mint_b=account.token_mint_b,
            token_vault_b=account.token_vault_b,
            fee_growth_global_b=account.fee_growth_global_b,
            reward_last_updated_timestamp=account.reward_last_updated_timestamp,
            reward_infos=account.reward_infos,
        )

    @staticmethod
    def to_keyed_whirlpools_config(pubkey: Pubkey, account: Optional[AnchorWhirlpoolsConfig]) -> Optional[WhirlpoolsConfig]:
        if account is None:
            return None
        return WhirlpoolsConfig(
            pubkey=pubkey,
            fee_authority=account.fee_authority,
            collect_protocol_fees_authority=account.collect_protocol_fees_authority,
            reward_emissions_super_authority=account.reward_emissions_super_authority,
            default_protocol_fee_rate=account.default_protocol_fee_rate,
        )

    @staticmethod
    def to_keyed_position_bundle(pubkey: Pubkey, account: Optional[AnchorPositionBundle]) -> Optional[PositionBundle]:
        if account is None:
            return None
        return PositionBundle(
            pubkey=pubkey,
            position_bundle_mint=account.position_bundle_mint,
            position_bitmap=account.position_bitmap,
        )

    @staticmethod
    def to_keyed_whirlpools_config_extension(pubkey: Pubkey, account: Optional[AnchorWhirlpoolsConfigExtension]) -> Optional[WhirlpoolsConfigExtension]:
        if account is None:
            return None
        return WhirlpoolsConfigExtension(
            pubkey=pubkey,
            whirlpools_config=account.whirlpools_config,
            config_extension_authority=account.config_extension_authority,
            token_badge_authority=account.token_badge_authority,
        )

    @staticmethod
    def to_keyed_token_badge(pubkey: Pubkey, account: Optional[AnchorTokenBadge]) -> Optional[TokenBadge]:
        if account is None:
            return None
        return TokenBadge(
            pubkey=pubkey,
            whirlpools_config=account.whirlpools_config,
            token_mint=account.token_mint,
        )

    @staticmethod
    def to_keyed_token_mint(pubkey: Pubkey, account: Optional[RawMintInfo]) -> Optional[MintInfo]:
        if account is None:
            return None
        return MintInfo(
            pubkey=pubkey,
            token_program_id=account.token_program_id,
            mint_authority=account.mint_authority,
            supply=account.supply,
            decimals=account.decimals,
            is_initialized=account.is_initialized,
            freeze_authority=account.freeze_authority,
            tlv_data=account.tlv_data,
        )

    @staticmethod
    def to_keyed_token_account(pubkey: Pubkey, account: Optional[RawAccountInfo]) -> Optional[AccountInfo]:
        if account is None:
            return None
        return AccountInfo(
            pubkey=pubkey,
            token_program_id=account.token_program_id,
            mint=account.mint,
            owner=account.owner,
            amount=account.amount,
            delegate=account.delegate,
            delegated_amount=account.delegated_amount,
            is_initialized=account.is_initialized,
            is_frozen=account.is_frozen,
            is_native=account.is_native,
            close_authority=account.close_authority,
            tlv_data=account.tlv_data,
        )

Static methods

def to_keyed_fee_tier(pubkey: solders.pubkey.Pubkey, account: Optional[FeeTier]) ‑> Optional[FeeTier]
Expand source code
@staticmethod
def to_keyed_fee_tier(pubkey: Pubkey, account: Optional[AnchorFeeTier]) -> Optional[FeeTier]:
    if account is None:
        return None
    return FeeTier(
        pubkey=pubkey,
        whirlpools_config=account.whirlpools_config,
        tick_spacing=account.tick_spacing,
        default_fee_rate=account.default_fee_rate,
    )
def to_keyed_position(pubkey: solders.pubkey.Pubkey, account: Optional[Position]) ‑> Optional[Position]
Expand source code
@staticmethod
def to_keyed_position(pubkey: Pubkey, account: Optional[AnchorPosition]) -> Optional[Position]:
    if account is None:
        return None
    return Position(
        pubkey=pubkey,
        whirlpool=account.whirlpool,
        position_mint=account.position_mint,
        liquidity=account.liquidity,
        tick_lower_index=account.tick_lower_index,
        tick_upper_index=account.tick_upper_index,
        fee_growth_checkpoint_a=account.fee_growth_checkpoint_a,
        fee_owed_a=account.fee_owed_a,
        fee_growth_checkpoint_b=account.fee_growth_checkpoint_b,
        fee_owed_b=account.fee_owed_b,
        reward_infos=account.reward_infos,
    )
def to_keyed_position_bundle(pubkey: solders.pubkey.Pubkey, account: Optional[PositionBundle]) ‑> Optional[PositionBundle]
Expand source code
@staticmethod
def to_keyed_position_bundle(pubkey: Pubkey, account: Optional[AnchorPositionBundle]) -> Optional[PositionBundle]:
    if account is None:
        return None
    return PositionBundle(
        pubkey=pubkey,
        position_bundle_mint=account.position_bundle_mint,
        position_bitmap=account.position_bitmap,
    )
def to_keyed_tick_array(pubkey: solders.pubkey.Pubkey, account: Optional[TickArray]) ‑> Optional[TickArray]
Expand source code
@staticmethod
def to_keyed_tick_array(pubkey: Pubkey, account: Optional[AnchorTickArray]) -> Optional[TickArray]:
    if account is None:
        return None
    return TickArray(
        pubkey=pubkey,
        start_tick_index=account.start_tick_index,
        ticks=account.ticks,
        whirlpool=account.whirlpool,
    )
def to_keyed_token_account(pubkey: solders.pubkey.Pubkey, account: Optional[RawAccountInfo]) ‑> Optional[AccountInfo]
Expand source code
@staticmethod
def to_keyed_token_account(pubkey: Pubkey, account: Optional[RawAccountInfo]) -> Optional[AccountInfo]:
    if account is None:
        return None
    return AccountInfo(
        pubkey=pubkey,
        token_program_id=account.token_program_id,
        mint=account.mint,
        owner=account.owner,
        amount=account.amount,
        delegate=account.delegate,
        delegated_amount=account.delegated_amount,
        is_initialized=account.is_initialized,
        is_frozen=account.is_frozen,
        is_native=account.is_native,
        close_authority=account.close_authority,
        tlv_data=account.tlv_data,
    )
def to_keyed_token_badge(pubkey: solders.pubkey.Pubkey, account: Optional[TokenBadge]) ‑> Optional[TokenBadge]
Expand source code
@staticmethod
def to_keyed_token_badge(pubkey: Pubkey, account: Optional[AnchorTokenBadge]) -> Optional[TokenBadge]:
    if account is None:
        return None
    return TokenBadge(
        pubkey=pubkey,
        whirlpools_config=account.whirlpools_config,
        token_mint=account.token_mint,
    )
def to_keyed_token_mint(pubkey: solders.pubkey.Pubkey, account: Optional[RawMintInfo]) ‑> Optional[MintInfo]
Expand source code
@staticmethod
def to_keyed_token_mint(pubkey: Pubkey, account: Optional[RawMintInfo]) -> Optional[MintInfo]:
    if account is None:
        return None
    return MintInfo(
        pubkey=pubkey,
        token_program_id=account.token_program_id,
        mint_authority=account.mint_authority,
        supply=account.supply,
        decimals=account.decimals,
        is_initialized=account.is_initialized,
        freeze_authority=account.freeze_authority,
        tlv_data=account.tlv_data,
    )
def to_keyed_whirlpool(pubkey: solders.pubkey.Pubkey, account: Optional[Whirlpool]) ‑> Optional[Whirlpool]
Expand source code
@staticmethod
def to_keyed_whirlpool(pubkey: Pubkey, account: Optional[AnchorWhirlpool]) -> Optional[Whirlpool]:
    if account is None:
        return None
    return Whirlpool(
        pubkey=pubkey,
        whirlpools_config=account.whirlpools_config,
        whirlpool_bump=account.whirlpool_bump,
        tick_spacing=account.tick_spacing,
        tick_spacing_seed=account.tick_spacing_seed,
        fee_rate=account.fee_rate,
        protocol_fee_rate=account.protocol_fee_rate,
        liquidity=account.liquidity,
        sqrt_price=account.sqrt_price,
        tick_current_index=account.tick_current_index,
        protocol_fee_owed_a=account.protocol_fee_owed_a,
        protocol_fee_owed_b=account.protocol_fee_owed_b,
        token_mint_a=account.token_mint_a,
        token_vault_a=account.token_vault_a,
        fee_growth_global_a=account.fee_growth_global_a,
        token_mint_b=account.token_mint_b,
        token_vault_b=account.token_vault_b,
        fee_growth_global_b=account.fee_growth_global_b,
        reward_last_updated_timestamp=account.reward_last_updated_timestamp,
        reward_infos=account.reward_infos,
    )
def to_keyed_whirlpools_config(pubkey: solders.pubkey.Pubkey, account: Optional[WhirlpoolsConfig]) ‑> Optional[WhirlpoolsConfig]
Expand source code
@staticmethod
def to_keyed_whirlpools_config(pubkey: Pubkey, account: Optional[AnchorWhirlpoolsConfig]) -> Optional[WhirlpoolsConfig]:
    if account is None:
        return None
    return WhirlpoolsConfig(
        pubkey=pubkey,
        fee_authority=account.fee_authority,
        collect_protocol_fees_authority=account.collect_protocol_fees_authority,
        reward_emissions_super_authority=account.reward_emissions_super_authority,
        default_protocol_fee_rate=account.default_protocol_fee_rate,
    )
def to_keyed_whirlpools_config_extension(pubkey: solders.pubkey.Pubkey, account: Optional[WhirlpoolsConfigExtension]) ‑> Optional[WhirlpoolsConfigExtension]
Expand source code
@staticmethod
def to_keyed_whirlpools_config_extension(pubkey: Pubkey, account: Optional[AnchorWhirlpoolsConfigExtension]) -> Optional[WhirlpoolsConfigExtension]:
    if account is None:
        return None
    return WhirlpoolsConfigExtension(
        pubkey=pubkey,
        whirlpools_config=account.whirlpools_config,
        config_extension_authority=account.config_extension_authority,
        token_badge_authority=account.token_badge_authority,
    )