Module orca_whirlpool.internal.anchor.accounts.whirlpools_config_extension
Expand source code
import typing
from dataclasses import dataclass
from solders.pubkey import Pubkey
from solana.rpc.async_api import AsyncClient
from solana.rpc.commitment import Commitment
import borsh_construct as borsh
from anchorpy.coder.accounts import ACCOUNT_DISCRIMINATOR_SIZE
from anchorpy.error import AccountInvalidDiscriminator
from anchorpy.utils.rpc import get_multiple_accounts
from anchorpy.borsh_extension import BorshPubkey
from ..program_id import PROGRAM_ID
class WhirlpoolsConfigExtensionJSON(typing.TypedDict):
whirlpools_config: str
config_extension_authority: str
token_badge_authority: str
@dataclass
class WhirlpoolsConfigExtension:
discriminator: typing.ClassVar = b"\x02c\xd7\xa3\xf0\x1a\x99:"
layout: typing.ClassVar = borsh.CStruct(
"whirlpools_config" / BorshPubkey,
"config_extension_authority" / BorshPubkey,
"token_badge_authority" / BorshPubkey,
)
whirlpools_config: Pubkey
config_extension_authority: Pubkey
token_badge_authority: Pubkey
@classmethod
async def fetch(
cls,
conn: AsyncClient,
address: Pubkey,
commitment: typing.Optional[Commitment] = None,
program_id: Pubkey = PROGRAM_ID,
) -> typing.Optional["WhirlpoolsConfigExtension"]:
resp = await conn.get_account_info(address, commitment=commitment)
info = resp.value
if info is None:
return None
if info.owner != program_id:
raise ValueError("Account does not belong to this program")
bytes_data = info.data
return cls.decode(bytes_data)
@classmethod
async def fetch_multiple(
cls,
conn: AsyncClient,
addresses: list[Pubkey],
commitment: typing.Optional[Commitment] = None,
program_id: Pubkey = PROGRAM_ID,
) -> typing.List[typing.Optional["WhirlpoolsConfigExtension"]]:
infos = await get_multiple_accounts(conn, addresses, commitment=commitment)
res: typing.List[typing.Optional["WhirlpoolsConfigExtension"]] = []
for info in infos:
if info is None:
res.append(None)
continue
if info.account.owner != program_id:
raise ValueError("Account does not belong to this program")
res.append(cls.decode(info.account.data))
return res
@classmethod
def decode(cls, data: bytes) -> "WhirlpoolsConfigExtension":
if data[:ACCOUNT_DISCRIMINATOR_SIZE] != cls.discriminator:
raise AccountInvalidDiscriminator(
"The discriminator for this account is invalid"
)
dec = WhirlpoolsConfigExtension.layout.parse(data[ACCOUNT_DISCRIMINATOR_SIZE:])
return cls(
whirlpools_config=dec.whirlpools_config,
config_extension_authority=dec.config_extension_authority,
token_badge_authority=dec.token_badge_authority,
)
def to_json(self) -> WhirlpoolsConfigExtensionJSON:
return {
"whirlpools_config": str(self.whirlpools_config),
"config_extension_authority": str(self.config_extension_authority),
"token_badge_authority": str(self.token_badge_authority),
}
@classmethod
def from_json(
cls, obj: WhirlpoolsConfigExtensionJSON
) -> "WhirlpoolsConfigExtension":
return cls(
whirlpools_config=Pubkey.from_string(obj["whirlpools_config"]),
config_extension_authority=Pubkey.from_string(
obj["config_extension_authority"]
),
token_badge_authority=Pubkey.from_string(obj["token_badge_authority"]),
)
Classes
class WhirlpoolsConfigExtension (whirlpools_config: solders.pubkey.Pubkey, config_extension_authority: solders.pubkey.Pubkey, token_badge_authority: solders.pubkey.Pubkey)
-
WhirlpoolsConfigExtension(whirlpools_config: solders.pubkey.Pubkey, config_extension_authority: solders.pubkey.Pubkey, token_badge_authority: solders.pubkey.Pubkey)
Expand source code
@dataclass class WhirlpoolsConfigExtension: discriminator: typing.ClassVar = b"\x02c\xd7\xa3\xf0\x1a\x99:" layout: typing.ClassVar = borsh.CStruct( "whirlpools_config" / BorshPubkey, "config_extension_authority" / BorshPubkey, "token_badge_authority" / BorshPubkey, ) whirlpools_config: Pubkey config_extension_authority: Pubkey token_badge_authority: Pubkey @classmethod async def fetch( cls, conn: AsyncClient, address: Pubkey, commitment: typing.Optional[Commitment] = None, program_id: Pubkey = PROGRAM_ID, ) -> typing.Optional["WhirlpoolsConfigExtension"]: resp = await conn.get_account_info(address, commitment=commitment) info = resp.value if info is None: return None if info.owner != program_id: raise ValueError("Account does not belong to this program") bytes_data = info.data return cls.decode(bytes_data) @classmethod async def fetch_multiple( cls, conn: AsyncClient, addresses: list[Pubkey], commitment: typing.Optional[Commitment] = None, program_id: Pubkey = PROGRAM_ID, ) -> typing.List[typing.Optional["WhirlpoolsConfigExtension"]]: infos = await get_multiple_accounts(conn, addresses, commitment=commitment) res: typing.List[typing.Optional["WhirlpoolsConfigExtension"]] = [] for info in infos: if info is None: res.append(None) continue if info.account.owner != program_id: raise ValueError("Account does not belong to this program") res.append(cls.decode(info.account.data)) return res @classmethod def decode(cls, data: bytes) -> "WhirlpoolsConfigExtension": if data[:ACCOUNT_DISCRIMINATOR_SIZE] != cls.discriminator: raise AccountInvalidDiscriminator( "The discriminator for this account is invalid" ) dec = WhirlpoolsConfigExtension.layout.parse(data[ACCOUNT_DISCRIMINATOR_SIZE:]) return cls( whirlpools_config=dec.whirlpools_config, config_extension_authority=dec.config_extension_authority, token_badge_authority=dec.token_badge_authority, ) def to_json(self) -> WhirlpoolsConfigExtensionJSON: return { "whirlpools_config": str(self.whirlpools_config), "config_extension_authority": str(self.config_extension_authority), "token_badge_authority": str(self.token_badge_authority), } @classmethod def from_json( cls, obj: WhirlpoolsConfigExtensionJSON ) -> "WhirlpoolsConfigExtension": return cls( whirlpools_config=Pubkey.from_string(obj["whirlpools_config"]), config_extension_authority=Pubkey.from_string( obj["config_extension_authority"] ), token_badge_authority=Pubkey.from_string(obj["token_badge_authority"]), )
Class variables
var discriminator : ClassVar
var layout : ClassVar
var whirlpools_config : solders.pubkey.Pubkey
Static methods
def decode(data: bytes) ‑> WhirlpoolsConfigExtension
-
Expand source code
@classmethod def decode(cls, data: bytes) -> "WhirlpoolsConfigExtension": if data[:ACCOUNT_DISCRIMINATOR_SIZE] != cls.discriminator: raise AccountInvalidDiscriminator( "The discriminator for this account is invalid" ) dec = WhirlpoolsConfigExtension.layout.parse(data[ACCOUNT_DISCRIMINATOR_SIZE:]) return cls( whirlpools_config=dec.whirlpools_config, config_extension_authority=dec.config_extension_authority, token_badge_authority=dec.token_badge_authority, )
async def fetch(conn: solana.rpc.async_api.AsyncClient, address: solders.pubkey.Pubkey, commitment: Optional[solana.rpc.commitment.Commitment] = None, program_id: solders.pubkey.Pubkey = Pubkey( whirLbMiicVdio4qvUfM5KAg6Ct8VwpYzGff3uctyCc, )) ‑> Optional[WhirlpoolsConfigExtension]
-
Expand source code
@classmethod async def fetch( cls, conn: AsyncClient, address: Pubkey, commitment: typing.Optional[Commitment] = None, program_id: Pubkey = PROGRAM_ID, ) -> typing.Optional["WhirlpoolsConfigExtension"]: resp = await conn.get_account_info(address, commitment=commitment) info = resp.value if info is None: return None if info.owner != program_id: raise ValueError("Account does not belong to this program") bytes_data = info.data return cls.decode(bytes_data)
async def fetch_multiple(conn: solana.rpc.async_api.AsyncClient, addresses: list[solders.pubkey.Pubkey], commitment: Optional[solana.rpc.commitment.Commitment] = None, program_id: solders.pubkey.Pubkey = Pubkey( whirLbMiicVdio4qvUfM5KAg6Ct8VwpYzGff3uctyCc, )) ‑> List[Optional[WhirlpoolsConfigExtension]]
-
Expand source code
@classmethod async def fetch_multiple( cls, conn: AsyncClient, addresses: list[Pubkey], commitment: typing.Optional[Commitment] = None, program_id: Pubkey = PROGRAM_ID, ) -> typing.List[typing.Optional["WhirlpoolsConfigExtension"]]: infos = await get_multiple_accounts(conn, addresses, commitment=commitment) res: typing.List[typing.Optional["WhirlpoolsConfigExtension"]] = [] for info in infos: if info is None: res.append(None) continue if info.account.owner != program_id: raise ValueError("Account does not belong to this program") res.append(cls.decode(info.account.data)) return res
def from_json(obj: WhirlpoolsConfigExtensionJSON) ‑> WhirlpoolsConfigExtension
-
Expand source code
@classmethod def from_json( cls, obj: WhirlpoolsConfigExtensionJSON ) -> "WhirlpoolsConfigExtension": return cls( whirlpools_config=Pubkey.from_string(obj["whirlpools_config"]), config_extension_authority=Pubkey.from_string( obj["config_extension_authority"] ), token_badge_authority=Pubkey.from_string(obj["token_badge_authority"]), )
Methods
def to_json(self) ‑> WhirlpoolsConfigExtensionJSON
-
Expand source code
def to_json(self) -> WhirlpoolsConfigExtensionJSON: return { "whirlpools_config": str(self.whirlpools_config), "config_extension_authority": str(self.config_extension_authority), "token_badge_authority": str(self.token_badge_authority), }
class WhirlpoolsConfigExtensionJSON (*args, **kwargs)
-
dict() -> new empty dictionary dict(mapping) -> new dictionary initialized from a mapping object's (key, value) pairs dict(iterable) -> new dictionary initialized as if via: d = {} for k, v in iterable: d[k] = v dict(**kwargs) -> new dictionary initialized with the name=value pairs in the keyword argument list. For example: dict(one=1, two=2)
Expand source code
class WhirlpoolsConfigExtensionJSON(typing.TypedDict): whirlpools_config: str config_extension_authority: str token_badge_authority: str
Ancestors
- builtins.dict
Class variables
var whirlpools_config : str