Module orca_whirlpool.internal.accounts.account_fetcher
Expand source code
from typing import List, Optional
from solders.account import Account
from solders.pubkey import Pubkey
from solana.rpc.async_api import AsyncClient
from ..types.types import BlockTimestamp
from .types import WhirlpoolsConfig, FeeTier, Whirlpool, TickArray, Position, PositionBundle, MintInfo, AccountInfo
from .types import WhirlpoolsConfigExtension, TokenBadge
from .account_parser import AccountParser
from .keyed_account_converter import KeyedAccountConverter
BULK_FETCH_CHUNK_SIZE = 100
# https://github.com/orca-so/whirlpools/blob/7b9ec351e2048c5504ffc8894c0ec5a9e78dc113/sdk/src/network/public/fetcher.ts
class AccountFetcher:
def __init__(self, connection: AsyncClient):
self._connection = connection
self._cache = {}
async def _get(self, pubkey: Pubkey, parser, keyed_converter, refresh: bool, parse_with_program_id: bool = False):
key = str(pubkey)
if not refresh and key in self._cache:
return self._cache[key]
res = await self._connection.get_account_info(pubkey)
if res.value is None:
return None
parsed = parser(res.value.data, res.value.owner) if parse_with_program_id else parser(res.value.data)
if parsed is None:
return None
keyed = keyed_converter(pubkey, parsed)
self._cache[key] = keyed
return keyed
async def _list(self, pubkeys: List[Pubkey], parser, keyed_converter, refresh: bool, parse_with_program_id: bool = False):
fetch_needed = list(filter(lambda p: refresh or str(p) not in self._cache, pubkeys))
if len(fetch_needed) > 0:
fetched = await self._bulk_fetch(fetch_needed)
for i in range(len(fetch_needed)):
if fetched[i] is None:
continue
parsed = parser(fetched[i].data, fetched[i].owner) if parse_with_program_id else parser(fetched[i].data)
if parsed is None:
continue
keyed = keyed_converter(fetch_needed[i], parsed)
self._cache[str(fetch_needed[i])] = keyed
return list(map(lambda p: self._cache.get(str(p)), pubkeys))
async def _bulk_fetch(self, pubkeys: List[Pubkey]) -> List[Optional[Account]]:
accounts = []
for i in range(0, len(pubkeys), BULK_FETCH_CHUNK_SIZE):
chunk = pubkeys[i:(i+BULK_FETCH_CHUNK_SIZE)]
fetched = await self._connection.get_multiple_accounts(chunk)
accounts.extend(fetched.value)
return accounts
async def get_whirlpool(self, pubkey: Pubkey, refresh: bool = False) -> Optional[Whirlpool]:
return await self._get(pubkey, AccountParser.parse_whirlpool, KeyedAccountConverter.to_keyed_whirlpool, refresh)
async def get_whirlpools_config(self, pubkey: Pubkey, refresh: bool = False) -> Optional[WhirlpoolsConfig]:
return await self._get(pubkey, AccountParser.parse_whirlpools_config, KeyedAccountConverter.to_keyed_whirlpools_config, refresh)
async def get_fee_tier(self, pubkey: Pubkey, refresh: bool = False) -> Optional[FeeTier]:
return await self._get(pubkey, AccountParser.parse_fee_tier, KeyedAccountConverter.to_keyed_fee_tier, refresh)
async def get_position(self, pubkey: Pubkey, refresh: bool = False) -> Optional[Position]:
return await self._get(pubkey, AccountParser.parse_position, KeyedAccountConverter.to_keyed_position, refresh)
async def get_tick_array(self, pubkey: Pubkey, refresh: bool = False) -> Optional[TickArray]:
return await self._get(pubkey, AccountParser.parse_tick_array, KeyedAccountConverter.to_keyed_tick_array, refresh)
async def get_position_bundle(self, pubkey: Pubkey, refresh: bool = False) -> Optional[PositionBundle]:
return await self._get(pubkey, AccountParser.parse_position_bundle, KeyedAccountConverter.to_keyed_position_bundle, refresh)
async def get_whirlpools_config_extension(self, pubkey: Pubkey, refresh: bool = False) -> Optional[WhirlpoolsConfigExtension]:
return await self._get(pubkey, AccountParser.parse_whirlpools_config_extension, KeyedAccountConverter.to_keyed_whirlpools_config_extension, refresh)
async def get_token_badge(self, pubkey: Pubkey, refresh: bool = False) -> Optional[TokenBadge]:
return await self._get(pubkey, AccountParser.parse_token_badge, KeyedAccountConverter.to_keyed_token_badge, refresh)
async def get_token_account(self, pubkey: Pubkey, refresh: bool = False) -> Optional[AccountInfo]:
return await self._get(pubkey, AccountParser.parse_token_account, KeyedAccountConverter.to_keyed_token_account, refresh, True)
async def get_token_mint(self, pubkey: Pubkey, refresh: bool = False) -> Optional[MintInfo]:
return await self._get(pubkey, AccountParser.parse_token_mint, KeyedAccountConverter.to_keyed_token_mint, refresh, True)
async def list_whirlpools(self, pubkeys: List[Pubkey], refresh: bool = False) -> List[Optional[Whirlpool]]:
return await self._list(pubkeys, AccountParser.parse_whirlpool, KeyedAccountConverter.to_keyed_whirlpool, refresh)
async def list_positions(self, pubkeys: List[Pubkey], refresh: bool = False) -> List[Optional[Position]]:
return await self._list(pubkeys, AccountParser.parse_position, KeyedAccountConverter.to_keyed_position, refresh)
async def list_tick_arrays(self, pubkeys: List[Pubkey], refresh: bool = False) -> List[Optional[TickArray]]:
return await self._list(pubkeys, AccountParser.parse_tick_array, KeyedAccountConverter.to_keyed_tick_array, refresh)
async def list_position_bundles(self, pubkeys: List[Pubkey], refresh: bool = False) -> List[Optional[PositionBundle]]:
return await self._list(pubkeys, AccountParser.parse_position_bundle, KeyedAccountConverter.to_keyed_position_bundle, refresh)
async def list_token_badges(self, pubkeys: List[Pubkey], refresh: bool = False) -> List[Optional[TokenBadge]]:
return await self._list(pubkeys, AccountParser.parse_token_badge, KeyedAccountConverter.to_keyed_token_badge, refresh)
async def list_token_accounts(self, pubkeys: List[Pubkey], refresh: bool = False) -> List[Optional[AccountInfo]]:
return await self._list(pubkeys, AccountParser.parse_token_account, KeyedAccountConverter.to_keyed_token_account, refresh, True)
async def list_token_mints(self, pubkeys: List[Pubkey], refresh: bool = False) -> List[Optional[MintInfo]]:
return await self._list(pubkeys, AccountParser.parse_token_mint, KeyedAccountConverter.to_keyed_token_mint, refresh, True)
async def get_latest_block_timestamp(self) -> BlockTimestamp:
res1 = await self._connection.get_latest_blockhash()
slot = res1.context.slot
res2 = await self._connection.get_block_time(slot)
timestamp = res2.value
return BlockTimestamp(slot=slot, timestamp=timestamp)
Classes
class AccountFetcher (connection: solana.rpc.async_api.AsyncClient)
-
Expand source code
class AccountFetcher: def __init__(self, connection: AsyncClient): self._connection = connection self._cache = {} async def _get(self, pubkey: Pubkey, parser, keyed_converter, refresh: bool, parse_with_program_id: bool = False): key = str(pubkey) if not refresh and key in self._cache: return self._cache[key] res = await self._connection.get_account_info(pubkey) if res.value is None: return None parsed = parser(res.value.data, res.value.owner) if parse_with_program_id else parser(res.value.data) if parsed is None: return None keyed = keyed_converter(pubkey, parsed) self._cache[key] = keyed return keyed async def _list(self, pubkeys: List[Pubkey], parser, keyed_converter, refresh: bool, parse_with_program_id: bool = False): fetch_needed = list(filter(lambda p: refresh or str(p) not in self._cache, pubkeys)) if len(fetch_needed) > 0: fetched = await self._bulk_fetch(fetch_needed) for i in range(len(fetch_needed)): if fetched[i] is None: continue parsed = parser(fetched[i].data, fetched[i].owner) if parse_with_program_id else parser(fetched[i].data) if parsed is None: continue keyed = keyed_converter(fetch_needed[i], parsed) self._cache[str(fetch_needed[i])] = keyed return list(map(lambda p: self._cache.get(str(p)), pubkeys)) async def _bulk_fetch(self, pubkeys: List[Pubkey]) -> List[Optional[Account]]: accounts = [] for i in range(0, len(pubkeys), BULK_FETCH_CHUNK_SIZE): chunk = pubkeys[i:(i+BULK_FETCH_CHUNK_SIZE)] fetched = await self._connection.get_multiple_accounts(chunk) accounts.extend(fetched.value) return accounts async def get_whirlpool(self, pubkey: Pubkey, refresh: bool = False) -> Optional[Whirlpool]: return await self._get(pubkey, AccountParser.parse_whirlpool, KeyedAccountConverter.to_keyed_whirlpool, refresh) async def get_whirlpools_config(self, pubkey: Pubkey, refresh: bool = False) -> Optional[WhirlpoolsConfig]: return await self._get(pubkey, AccountParser.parse_whirlpools_config, KeyedAccountConverter.to_keyed_whirlpools_config, refresh) async def get_fee_tier(self, pubkey: Pubkey, refresh: bool = False) -> Optional[FeeTier]: return await self._get(pubkey, AccountParser.parse_fee_tier, KeyedAccountConverter.to_keyed_fee_tier, refresh) async def get_position(self, pubkey: Pubkey, refresh: bool = False) -> Optional[Position]: return await self._get(pubkey, AccountParser.parse_position, KeyedAccountConverter.to_keyed_position, refresh) async def get_tick_array(self, pubkey: Pubkey, refresh: bool = False) -> Optional[TickArray]: return await self._get(pubkey, AccountParser.parse_tick_array, KeyedAccountConverter.to_keyed_tick_array, refresh) async def get_position_bundle(self, pubkey: Pubkey, refresh: bool = False) -> Optional[PositionBundle]: return await self._get(pubkey, AccountParser.parse_position_bundle, KeyedAccountConverter.to_keyed_position_bundle, refresh) async def get_whirlpools_config_extension(self, pubkey: Pubkey, refresh: bool = False) -> Optional[WhirlpoolsConfigExtension]: return await self._get(pubkey, AccountParser.parse_whirlpools_config_extension, KeyedAccountConverter.to_keyed_whirlpools_config_extension, refresh) async def get_token_badge(self, pubkey: Pubkey, refresh: bool = False) -> Optional[TokenBadge]: return await self._get(pubkey, AccountParser.parse_token_badge, KeyedAccountConverter.to_keyed_token_badge, refresh) async def get_token_account(self, pubkey: Pubkey, refresh: bool = False) -> Optional[AccountInfo]: return await self._get(pubkey, AccountParser.parse_token_account, KeyedAccountConverter.to_keyed_token_account, refresh, True) async def get_token_mint(self, pubkey: Pubkey, refresh: bool = False) -> Optional[MintInfo]: return await self._get(pubkey, AccountParser.parse_token_mint, KeyedAccountConverter.to_keyed_token_mint, refresh, True) async def list_whirlpools(self, pubkeys: List[Pubkey], refresh: bool = False) -> List[Optional[Whirlpool]]: return await self._list(pubkeys, AccountParser.parse_whirlpool, KeyedAccountConverter.to_keyed_whirlpool, refresh) async def list_positions(self, pubkeys: List[Pubkey], refresh: bool = False) -> List[Optional[Position]]: return await self._list(pubkeys, AccountParser.parse_position, KeyedAccountConverter.to_keyed_position, refresh) async def list_tick_arrays(self, pubkeys: List[Pubkey], refresh: bool = False) -> List[Optional[TickArray]]: return await self._list(pubkeys, AccountParser.parse_tick_array, KeyedAccountConverter.to_keyed_tick_array, refresh) async def list_position_bundles(self, pubkeys: List[Pubkey], refresh: bool = False) -> List[Optional[PositionBundle]]: return await self._list(pubkeys, AccountParser.parse_position_bundle, KeyedAccountConverter.to_keyed_position_bundle, refresh) async def list_token_badges(self, pubkeys: List[Pubkey], refresh: bool = False) -> List[Optional[TokenBadge]]: return await self._list(pubkeys, AccountParser.parse_token_badge, KeyedAccountConverter.to_keyed_token_badge, refresh) async def list_token_accounts(self, pubkeys: List[Pubkey], refresh: bool = False) -> List[Optional[AccountInfo]]: return await self._list(pubkeys, AccountParser.parse_token_account, KeyedAccountConverter.to_keyed_token_account, refresh, True) async def list_token_mints(self, pubkeys: List[Pubkey], refresh: bool = False) -> List[Optional[MintInfo]]: return await self._list(pubkeys, AccountParser.parse_token_mint, KeyedAccountConverter.to_keyed_token_mint, refresh, True) async def get_latest_block_timestamp(self) -> BlockTimestamp: res1 = await self._connection.get_latest_blockhash() slot = res1.context.slot res2 = await self._connection.get_block_time(slot) timestamp = res2.value return BlockTimestamp(slot=slot, timestamp=timestamp)
Methods
async def get_fee_tier(self, pubkey: solders.pubkey.Pubkey, refresh: bool = False) ‑> Optional[FeeTier]
-
Expand source code
async def get_fee_tier(self, pubkey: Pubkey, refresh: bool = False) -> Optional[FeeTier]: return await self._get(pubkey, AccountParser.parse_fee_tier, KeyedAccountConverter.to_keyed_fee_tier, refresh)
async def get_latest_block_timestamp(self) ‑> BlockTimestamp
-
Expand source code
async def get_latest_block_timestamp(self) -> BlockTimestamp: res1 = await self._connection.get_latest_blockhash() slot = res1.context.slot res2 = await self._connection.get_block_time(slot) timestamp = res2.value return BlockTimestamp(slot=slot, timestamp=timestamp)
async def get_position(self, pubkey: solders.pubkey.Pubkey, refresh: bool = False) ‑> Optional[Position]
-
Expand source code
async def get_position(self, pubkey: Pubkey, refresh: bool = False) -> Optional[Position]: return await self._get(pubkey, AccountParser.parse_position, KeyedAccountConverter.to_keyed_position, refresh)
async def get_position_bundle(self, pubkey: solders.pubkey.Pubkey, refresh: bool = False) ‑> Optional[PositionBundle]
-
Expand source code
async def get_position_bundle(self, pubkey: Pubkey, refresh: bool = False) -> Optional[PositionBundle]: return await self._get(pubkey, AccountParser.parse_position_bundle, KeyedAccountConverter.to_keyed_position_bundle, refresh)
async def get_tick_array(self, pubkey: solders.pubkey.Pubkey, refresh: bool = False) ‑> Optional[TickArray]
-
Expand source code
async def get_tick_array(self, pubkey: Pubkey, refresh: bool = False) -> Optional[TickArray]: return await self._get(pubkey, AccountParser.parse_tick_array, KeyedAccountConverter.to_keyed_tick_array, refresh)
async def get_token_account(self, pubkey: solders.pubkey.Pubkey, refresh: bool = False) ‑> Optional[AccountInfo]
-
Expand source code
async def get_token_account(self, pubkey: Pubkey, refresh: bool = False) -> Optional[AccountInfo]: return await self._get(pubkey, AccountParser.parse_token_account, KeyedAccountConverter.to_keyed_token_account, refresh, True)
async def get_token_badge(self, pubkey: solders.pubkey.Pubkey, refresh: bool = False) ‑> Optional[TokenBadge]
-
Expand source code
async def get_token_badge(self, pubkey: Pubkey, refresh: bool = False) -> Optional[TokenBadge]: return await self._get(pubkey, AccountParser.parse_token_badge, KeyedAccountConverter.to_keyed_token_badge, refresh)
async def get_token_mint(self, pubkey: solders.pubkey.Pubkey, refresh: bool = False) ‑> Optional[MintInfo]
-
Expand source code
async def get_token_mint(self, pubkey: Pubkey, refresh: bool = False) -> Optional[MintInfo]: return await self._get(pubkey, AccountParser.parse_token_mint, KeyedAccountConverter.to_keyed_token_mint, refresh, True)
async def get_whirlpool(self, pubkey: solders.pubkey.Pubkey, refresh: bool = False) ‑> Optional[Whirlpool]
-
Expand source code
async def get_whirlpool(self, pubkey: Pubkey, refresh: bool = False) -> Optional[Whirlpool]: return await self._get(pubkey, AccountParser.parse_whirlpool, KeyedAccountConverter.to_keyed_whirlpool, refresh)
async def get_whirlpools_config(self, pubkey: solders.pubkey.Pubkey, refresh: bool = False) ‑> Optional[WhirlpoolsConfig]
-
Expand source code
async def get_whirlpools_config(self, pubkey: Pubkey, refresh: bool = False) -> Optional[WhirlpoolsConfig]: return await self._get(pubkey, AccountParser.parse_whirlpools_config, KeyedAccountConverter.to_keyed_whirlpools_config, refresh)
async def get_whirlpools_config_extension(self, pubkey: solders.pubkey.Pubkey, refresh: bool = False) ‑> Optional[WhirlpoolsConfigExtension]
-
Expand source code
async def get_whirlpools_config_extension(self, pubkey: Pubkey, refresh: bool = False) -> Optional[WhirlpoolsConfigExtension]: return await self._get(pubkey, AccountParser.parse_whirlpools_config_extension, KeyedAccountConverter.to_keyed_whirlpools_config_extension, refresh)
async def list_position_bundles(self, pubkeys: List[solders.pubkey.Pubkey], refresh: bool = False) ‑> List[Optional[PositionBundle]]
-
Expand source code
async def list_position_bundles(self, pubkeys: List[Pubkey], refresh: bool = False) -> List[Optional[PositionBundle]]: return await self._list(pubkeys, AccountParser.parse_position_bundle, KeyedAccountConverter.to_keyed_position_bundle, refresh)
async def list_positions(self, pubkeys: List[solders.pubkey.Pubkey], refresh: bool = False) ‑> List[Optional[Position]]
-
Expand source code
async def list_positions(self, pubkeys: List[Pubkey], refresh: bool = False) -> List[Optional[Position]]: return await self._list(pubkeys, AccountParser.parse_position, KeyedAccountConverter.to_keyed_position, refresh)
async def list_tick_arrays(self, pubkeys: List[solders.pubkey.Pubkey], refresh: bool = False) ‑> List[Optional[TickArray]]
-
Expand source code
async def list_tick_arrays(self, pubkeys: List[Pubkey], refresh: bool = False) -> List[Optional[TickArray]]: return await self._list(pubkeys, AccountParser.parse_tick_array, KeyedAccountConverter.to_keyed_tick_array, refresh)
async def list_token_accounts(self, pubkeys: List[solders.pubkey.Pubkey], refresh: bool = False) ‑> List[Optional[AccountInfo]]
-
Expand source code
async def list_token_accounts(self, pubkeys: List[Pubkey], refresh: bool = False) -> List[Optional[AccountInfo]]: return await self._list(pubkeys, AccountParser.parse_token_account, KeyedAccountConverter.to_keyed_token_account, refresh, True)
async def list_token_badges(self, pubkeys: List[solders.pubkey.Pubkey], refresh: bool = False) ‑> List[Optional[TokenBadge]]
-
Expand source code
async def list_token_badges(self, pubkeys: List[Pubkey], refresh: bool = False) -> List[Optional[TokenBadge]]: return await self._list(pubkeys, AccountParser.parse_token_badge, KeyedAccountConverter.to_keyed_token_badge, refresh)
async def list_token_mints(self, pubkeys: List[solders.pubkey.Pubkey], refresh: bool = False) ‑> List[Optional[MintInfo]]
-
Expand source code
async def list_token_mints(self, pubkeys: List[Pubkey], refresh: bool = False) -> List[Optional[MintInfo]]: return await self._list(pubkeys, AccountParser.parse_token_mint, KeyedAccountConverter.to_keyed_token_mint, refresh, True)
async def list_whirlpools(self, pubkeys: List[solders.pubkey.Pubkey], refresh: bool = False) ‑> List[Optional[Whirlpool]]
-
Expand source code
async def list_whirlpools(self, pubkeys: List[Pubkey], refresh: bool = False) -> List[Optional[Whirlpool]]: return await self._list(pubkeys, AccountParser.parse_whirlpool, KeyedAccountConverter.to_keyed_whirlpool, refresh)