Module orca_whirlpool.internal.quote.swap_simulator.tick_array_sequence
Expand source code
import dataclasses
from typing import List, Optional
from solders.pubkey import Pubkey
from ...invariant import invariant
from ...errors import WhirlpoolError, SwapErrorCode
from ...accounts.types import TickArray
from ...types.enums import SwapDirection, TickArrayReduction
from ...anchor.types import Tick
from ...constants import MIN_TICK_INDEX, MAX_TICK_INDEX, TICK_ARRAY_SIZE
from ...utils.swap_util import SwapUtil
@dataclasses.dataclass(frozen=True)
class InitializedTick:
tick_index: int
tick_array_index: int
data: Tick
def is_consecutive_tick_arrays(
tick_array: TickArray,
next_tick_array: TickArray,
tick_spacing: int,
direction: SwapDirection
) -> bool:
ticks_in_array = TICK_ARRAY_SIZE * tick_spacing
if direction.is_price_up:
expected_start_tick_index = tick_array.start_tick_index + ticks_in_array
else:
expected_start_tick_index = tick_array.start_tick_index - ticks_in_array
return next_tick_array.start_tick_index == expected_start_tick_index
def get_initialized_ticks(
tick_array: TickArray,
tick_array_index: int,
tick_spacing: int,
direction: SwapDirection,
has_next: bool
) -> List[InitializedTick]:
start_tick_index = tick_array.start_tick_index
last_tick_index_appended = False
if direction.is_price_up:
last_tick_index = min(start_tick_index + tick_spacing * TICK_ARRAY_SIZE - 1, MAX_TICK_INDEX)
ticks = list(enumerate(tick_array.ticks))
else:
last_tick_index = max(start_tick_index, MIN_TICK_INDEX)
ticks = reversed(list(enumerate(tick_array.ticks)))
initialized_ticks = []
for i, tick in ticks:
if tick.initialized:
tick_index = start_tick_index + i*tick_spacing
initialized_ticks.append(InitializedTick(tick_index, tick_array_index, tick))
if tick_index == last_tick_index:
last_tick_index_appended = True
if not has_next and not last_tick_index_appended:
initialized_ticks.append(InitializedTick(last_tick_index, tick_array_index, Tick(False, 0, 0, 0, 0, [])))
return initialized_ticks
class TickArraySequence:
def __init__(
self,
tick_arrays: List[Optional[TickArray]],
tick_current_index: int,
tick_spacing: int,
direction: SwapDirection,
max_swap_tick_arrays: int,
):
self.tick_spacing = tick_spacing
self.direction = direction
self.max_swap_tick_arrays = max_swap_tick_arrays
self.tick_arrays = []
for i, ta in enumerate(tick_arrays[0:max_swap_tick_arrays]):
if ta is None:
break # discontiguous TickArrays are not allowed.
if i > 0 and not is_consecutive_tick_arrays(self.tick_arrays[i-1], ta, tick_spacing, direction):
raise WhirlpoolError(SwapErrorCode.TickArraySequenceInvalid)
self.tick_arrays.append(ta)
if len(self.tick_arrays) == 0:
raise WhirlpoolError(SwapErrorCode.TickArray0MustBeInitialized)
if not SwapUtil.is_valid_tick_array_0(
self.tick_arrays[0],
tick_current_index,
self.tick_spacing,
self.direction):
raise WhirlpoolError(SwapErrorCode.TickArraySequenceInvalid)
self.max_touched_tick_array_index = 0
self.initialized_ticks = []
for tick_array_index, tick_array in enumerate(self.tick_arrays):
has_next = tick_array_index+1 < len(self.tick_arrays)
self.initialized_ticks.extend(get_initialized_ticks(
tick_array,
tick_array_index,
self.tick_spacing,
self.direction,
has_next,
))
def get_next_initialized_tick_index(self, current_tick_index: int) -> int:
for tick in self.initialized_ticks:
if self.direction.is_price_up and tick.tick_index > current_tick_index: # not inclusive
self.max_touched_tick_array_index = max(self.max_touched_tick_array_index, tick.tick_array_index)
return tick.tick_index
if self.direction.is_price_down and tick.tick_index <= current_tick_index: # inclusive
self.max_touched_tick_array_index = max(self.max_touched_tick_array_index, tick.tick_array_index)
return tick.tick_index
raise WhirlpoolError(SwapErrorCode.TickArraySequenceInvalid)
def get_tick(self, tick_index: int) -> Tick:
for tick in self.initialized_ticks:
if tick.tick_index == tick_index:
return tick.data
invariant(False, "unreachable - tick_index is not in initialized_ticks")
def get_tick_array_pubkeys(self, reduction: TickArrayReduction) -> List[Pubkey]:
# reduction
max_touched = self.max_touched_tick_array_index
if reduction == TickArrayReduction.Aggressive:
end = max_touched + 1
elif reduction == TickArrayReduction.Conservative:
end = max_touched + 1 + 1
else:
end = self.max_swap_tick_arrays
result = [ta.pubkey for ta in self.tick_arrays[0:end]]
# padding
last = result[-1]
while len(result) < self.max_swap_tick_arrays:
result.append(last)
return result
Functions
def get_initialized_ticks(tick_array: TickArray, tick_array_index: int, tick_spacing: int, direction: SwapDirection, has_next: bool) ‑> List[InitializedTick]
-
Expand source code
def get_initialized_ticks( tick_array: TickArray, tick_array_index: int, tick_spacing: int, direction: SwapDirection, has_next: bool ) -> List[InitializedTick]: start_tick_index = tick_array.start_tick_index last_tick_index_appended = False if direction.is_price_up: last_tick_index = min(start_tick_index + tick_spacing * TICK_ARRAY_SIZE - 1, MAX_TICK_INDEX) ticks = list(enumerate(tick_array.ticks)) else: last_tick_index = max(start_tick_index, MIN_TICK_INDEX) ticks = reversed(list(enumerate(tick_array.ticks))) initialized_ticks = [] for i, tick in ticks: if tick.initialized: tick_index = start_tick_index + i*tick_spacing initialized_ticks.append(InitializedTick(tick_index, tick_array_index, tick)) if tick_index == last_tick_index: last_tick_index_appended = True if not has_next and not last_tick_index_appended: initialized_ticks.append(InitializedTick(last_tick_index, tick_array_index, Tick(False, 0, 0, 0, 0, []))) return initialized_ticks
def is_consecutive_tick_arrays(tick_array: TickArray, next_tick_array: TickArray, tick_spacing: int, direction: SwapDirection) ‑> bool
-
Expand source code
def is_consecutive_tick_arrays( tick_array: TickArray, next_tick_array: TickArray, tick_spacing: int, direction: SwapDirection ) -> bool: ticks_in_array = TICK_ARRAY_SIZE * tick_spacing if direction.is_price_up: expected_start_tick_index = tick_array.start_tick_index + ticks_in_array else: expected_start_tick_index = tick_array.start_tick_index - ticks_in_array return next_tick_array.start_tick_index == expected_start_tick_index
Classes
class InitializedTick (tick_index: int, tick_array_index: int, data: Tick)
-
InitializedTick(tick_index: int, tick_array_index: int, data: orca_whirlpool.internal.anchor.types.tick.Tick)
Expand source code
@dataclasses.dataclass(frozen=True) class InitializedTick: tick_index: int tick_array_index: int data: Tick
Class variables
var data : Tick
var tick_array_index : int
var tick_index : int
class TickArraySequence (tick_arrays: List[Optional[TickArray]], tick_current_index: int, tick_spacing: int, direction: SwapDirection, max_swap_tick_arrays: int)
-
Expand source code
class TickArraySequence: def __init__( self, tick_arrays: List[Optional[TickArray]], tick_current_index: int, tick_spacing: int, direction: SwapDirection, max_swap_tick_arrays: int, ): self.tick_spacing = tick_spacing self.direction = direction self.max_swap_tick_arrays = max_swap_tick_arrays self.tick_arrays = [] for i, ta in enumerate(tick_arrays[0:max_swap_tick_arrays]): if ta is None: break # discontiguous TickArrays are not allowed. if i > 0 and not is_consecutive_tick_arrays(self.tick_arrays[i-1], ta, tick_spacing, direction): raise WhirlpoolError(SwapErrorCode.TickArraySequenceInvalid) self.tick_arrays.append(ta) if len(self.tick_arrays) == 0: raise WhirlpoolError(SwapErrorCode.TickArray0MustBeInitialized) if not SwapUtil.is_valid_tick_array_0( self.tick_arrays[0], tick_current_index, self.tick_spacing, self.direction): raise WhirlpoolError(SwapErrorCode.TickArraySequenceInvalid) self.max_touched_tick_array_index = 0 self.initialized_ticks = [] for tick_array_index, tick_array in enumerate(self.tick_arrays): has_next = tick_array_index+1 < len(self.tick_arrays) self.initialized_ticks.extend(get_initialized_ticks( tick_array, tick_array_index, self.tick_spacing, self.direction, has_next, )) def get_next_initialized_tick_index(self, current_tick_index: int) -> int: for tick in self.initialized_ticks: if self.direction.is_price_up and tick.tick_index > current_tick_index: # not inclusive self.max_touched_tick_array_index = max(self.max_touched_tick_array_index, tick.tick_array_index) return tick.tick_index if self.direction.is_price_down and tick.tick_index <= current_tick_index: # inclusive self.max_touched_tick_array_index = max(self.max_touched_tick_array_index, tick.tick_array_index) return tick.tick_index raise WhirlpoolError(SwapErrorCode.TickArraySequenceInvalid) def get_tick(self, tick_index: int) -> Tick: for tick in self.initialized_ticks: if tick.tick_index == tick_index: return tick.data invariant(False, "unreachable - tick_index is not in initialized_ticks") def get_tick_array_pubkeys(self, reduction: TickArrayReduction) -> List[Pubkey]: # reduction max_touched = self.max_touched_tick_array_index if reduction == TickArrayReduction.Aggressive: end = max_touched + 1 elif reduction == TickArrayReduction.Conservative: end = max_touched + 1 + 1 else: end = self.max_swap_tick_arrays result = [ta.pubkey for ta in self.tick_arrays[0:end]] # padding last = result[-1] while len(result) < self.max_swap_tick_arrays: result.append(last) return result
Methods
def get_next_initialized_tick_index(self, current_tick_index: int) ‑> int
-
Expand source code
def get_next_initialized_tick_index(self, current_tick_index: int) -> int: for tick in self.initialized_ticks: if self.direction.is_price_up and tick.tick_index > current_tick_index: # not inclusive self.max_touched_tick_array_index = max(self.max_touched_tick_array_index, tick.tick_array_index) return tick.tick_index if self.direction.is_price_down and tick.tick_index <= current_tick_index: # inclusive self.max_touched_tick_array_index = max(self.max_touched_tick_array_index, tick.tick_array_index) return tick.tick_index raise WhirlpoolError(SwapErrorCode.TickArraySequenceInvalid)
def get_tick(self, tick_index: int) ‑> Tick
-
Expand source code
def get_tick(self, tick_index: int) -> Tick: for tick in self.initialized_ticks: if tick.tick_index == tick_index: return tick.data invariant(False, "unreachable - tick_index is not in initialized_ticks")
def get_tick_array_pubkeys(self, reduction: TickArrayReduction) ‑> List[solders.pubkey.Pubkey]
-
Expand source code
def get_tick_array_pubkeys(self, reduction: TickArrayReduction) -> List[Pubkey]: # reduction max_touched = self.max_touched_tick_array_index if reduction == TickArrayReduction.Aggressive: end = max_touched + 1 elif reduction == TickArrayReduction.Conservative: end = max_touched + 1 + 1 else: end = self.max_swap_tick_arrays result = [ta.pubkey for ta in self.tick_arrays[0:end]] # padding last = result[-1] while len(result) < self.max_swap_tick_arrays: result.append(last) return result