API Reference

The public import path is finsaber. The detailed API reference below documents the implementation modules that are re-exported through that public package.
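
A typical import through the public package might look like the sketch below, assuming the classes documented on this page are re-exported at the top level of finsaber; adjust to the names your installed version actually exposes:

# Hypothetical top-level imports; see the reference below for the
# implementation modules these names come from.
from finsaber import FinsaberDataset, FinsaberParquetDataset, TradeConfig, FINSABERBt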

backtest

Public interface for the reusable FINSABER backtesting package.

FinsaberDataset

Bases: TradingData

TradingData adapter for FINSABER aggregated date dictionaries.

Expected daily shape is extensible: {date: {"price": {ticker: bar}, "news": {ticker: [...]}, ...}}. Additional modalities are preserved and filtered by ticker when possible.
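
As a minimal sketch of that shape, the adapter can be built directly from an in-memory dictionary (all values are illustrative only, and the top-level import is an assumption):

import datetime
from finsaber import FinsaberDataset  # assuming a top-level re-export

# Two illustrative trading days in the documented daily shape.
data = {
    datetime.date(2020, 1, 2): {
        "price": {"AAPL": {"open": 74.06, "high": 75.15, "low": 73.80,
                           "close": 75.09, "adjusted_close": 73.45,
                           "volume": 135480400}},
        "news": {"AAPL": ["Illustrative headline."]},
    },
    datetime.date(2020, 1, 3): {
        "price": {"AAPL": {"open": 74.29, "high": 75.14, "low": 74.13,
                           "close": 74.36, "adjusted_close": 72.73,
                           "volume": 146322800}},
    },
}

dataset = FinsaberDataset(data=data)
dataset.get_ticker_price_by_date("AAPL", "2020-01-02")  # 73.45 (adjusted_close)
dataset.get_ticker_data_by_date("AAPL", "2020-01-02")   # price bar plus news list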

Source code in backtest/data_util/finsaber_dataset.py
class FinsaberDataset(TradingData):
    """TradingData adapter for FINSABER aggregated date dictionaries.

    Expected daily shape is extensible:
    ``{date: {"price": {ticker: bar}, "news": {ticker: [...]}, ...}}``.
    Additional modalities are preserved and filtered by ticker when possible.
    """

    def __init__(self, pickle_file: str | None = None, data: dict | None = None, price_field: str = "adjusted_close"):
        if pickle_file is None and data is None:
            raise ValueError("Either pickle_file or data must be provided")
        if pickle_file is not None and data is not None:
            raise ValueError("Only one of pickle_file or data must be provided")

        if pickle_file is not None:
            with open(pickle_file, "rb") as file:
                self.data = pickle.load(file)
        else:
            self.data = data

        self.price_field = price_field
        self._tickers_list = None
        self._date_range = sorted(self.data.keys())

    @staticmethod
    def _normalize_date(date):
        if isinstance(date, str):
            return pd.to_datetime(date).date()
        if isinstance(date, pd.Timestamp):
            return date.date()
        return date

    def get_ticker_price_by_date(self, ticker: str, date, price_field: str | None = None) -> float:
        date = self._normalize_date(date)
        price = self.data[date]["price"][ticker]
        if isinstance(price, dict):
            field = price_field or self.price_field
            if field in price:
                return price[field]
            if field.startswith("adjusted_") and "adjusted_close" in price and "close" in price:
                if price["close"] == 0:
                    return 0
                adjustment = price["adjusted_close"] / price["close"]
                raw_field = field.removeprefix("adjusted_")
                if raw_field in price:
                    return price[raw_field] * adjustment
            return price.get("close", price.get("adjusted_close"))
        return price

    def get_ticker_data_by_date(self, ticker: str, date) -> dict[str, Any]:
        daily_data = self.get_data_by_date(date)
        ticker_data = {}
        for modality, values in daily_data.items():
            if isinstance(values, dict) and ticker in values:
                ticker_data[modality] = values[ticker]
        return ticker_data

    def get_data_by_date(self, date) -> dict[str, Any]:
        date = self._normalize_date(date)
        return self.data.get(date, {})

    def get_subset_by_time_range(self, start_date, end_date):
        start_date = self._normalize_date(start_date)
        end_date = self._normalize_date(end_date)
        subset = {
            date: self.data[date]
            for date in self._date_range
            if start_date <= date <= end_date
        }
        return FinsaberDataset(data=subset, price_field=self.price_field) if subset else None

    def get_ticker_subset_by_time_range(self, ticker: str, start_date, end_date):
        start_date = self._normalize_date(start_date)
        end_date = self._normalize_date(end_date)
        subset = {}
        for date in self._date_range:
            if not start_date <= date <= end_date:
                continue
            daily_ticker_data = {}
            for modality, values in self.data[date].items():
                if isinstance(values, dict) and ticker in values:
                    daily_ticker_data[modality] = {ticker: values[ticker]}
            if "price" in daily_ticker_data:
                subset[date] = daily_ticker_data
        return FinsaberDataset(data=subset, price_field=self.price_field) if subset else None

    def get_date_range(self) -> list:
        return list(self._date_range)

    def get_tickers_list(self) -> list[str]:
        if self._tickers_list is None:
            tickers = set()
            for date in self._date_range:
                tickers.update(self.data[date].get("price", {}).keys())
            self._tickers_list = sorted(tickers)
        return self._tickers_list

    def get_price_dataframe(self, tickers=None, date_from=None, date_to=None, adjust: bool = True) -> pd.DataFrame:
        if tickers is None or tickers == "all":
            tickers = set(self.get_tickers_list())
        elif isinstance(tickers, str):
            tickers = {tickers}
        else:
            tickers = set(tickers)
        start_date = self._normalize_date(date_from) if date_from is not None else None
        end_date = self._normalize_date(date_to) if date_to is not None else None

        records = []
        for date in self._date_range:
            if start_date is not None and date < start_date:
                continue
            if end_date is not None and date > end_date:
                continue
            for symbol, price in self.data[date].get("price", {}).items():
                if symbol not in tickers:
                    continue
                if isinstance(price, dict):
                    record = {
                        "date": pd.to_datetime(date),
                        "symbol": symbol,
                        "volume": price.get("volume", 0),
                    }
                    if adjust and "adjusted_close" in price and "close" in price:
                        adjustment = 0 if price["close"] == 0 else price["adjusted_close"] / price["close"]
                        record.update({
                            "open": price.get("adjusted_open", price.get("open", 0) * adjustment),
                            "high": price.get("adjusted_high", price.get("high", 0) * adjustment),
                            "low": price.get("adjusted_low", price.get("low", 0) * adjustment),
                            "close": price["adjusted_close"],
                        })
                    else:
                        record.update({
                            "open": price.get("open", price.get("close", price.get("adjusted_close"))),
                            "high": price.get("high", price.get("close", price.get("adjusted_close"))),
                            "low": price.get("low", price.get("close", price.get("adjusted_close"))),
                            "close": price.get("close", price.get("adjusted_close")),
                        })
                else:
                    record = {
                        "date": pd.to_datetime(date),
                        "symbol": symbol,
                        "open": price,
                        "high": price,
                        "low": price,
                        "close": price,
                        "volume": 0,
                    }
                records.append(record)

        return pd.DataFrame.from_records(records)
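
Continuing the sketch above, derived views come straight off the adapter; note that get_subset_by_time_range returns None when the window holds no data:

df = dataset.get_price_dataframe(tickers="AAPL", adjust=True)
# One row per (date, symbol) with columns date, symbol, volume, open, high,
# low, close; with adjust=True the OHLC fields are back-adjusted by the
# adjusted_close / close ratio.

window = dataset.get_subset_by_time_range("2020-01-02", "2020-01-02")
if window is not None:
    print(window.get_date_range())  # [datetime.date(2020, 1, 2)]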

FinsaberParquetDataset

Bases: TradingData

TradingData adapter for the FINSABER-2 partitioned parquet dataset.

Source code in backtest/data_util/finsaber_parquet_dataset.py
class FinsaberParquetDataset(TradingData):
    """TradingData adapter for the FINSABER-2 partitioned parquet dataset."""

    def __init__(
        self,
        root: str | Path,
        start_date=None,
        end_date=None,
        tickers: Iterable[str] | None = None,
        modalities: Iterable[str] = ("price", "news", "filing_k", "filing_q"),
        price_field: str = "adjusted_close",
    ):
        self.root = Path(root)
        self.start_date = self._normalize_date(start_date)
        self.end_date = self._normalize_date(end_date)
        if isinstance(tickers, str):
            self.tickers = None if tickers == "all" else [tickers]
        else:
            self.tickers = sorted(set(tickers)) if tickers is not None else None
        self.modalities = tuple(modalities)
        self.price_field = price_field
        self._data_cache = None
        self._date_range_cache = None
        self._tickers_cache = None

    @staticmethod
    def _normalize_date(date):
        if date is None:
            return None
        if isinstance(date, str):
            return pd.to_datetime(date).date()
        if isinstance(date, pd.Timestamp):
            return date.date()
        return date

    def _date_filter(self):
        filters = []
        if self.start_date is not None:
            filters.append(ds.field("date") >= self.start_date)
        if self.end_date is not None:
            filters.append(ds.field("date") <= self.end_date)
        if self.tickers is not None:
            filters.append(ds.field("symbol").isin(self.tickers))
        if not filters:
            return None
        expr = filters[0]
        for item in filters[1:]:
            expr = expr & item
        return expr

    def _read_price(self) -> pd.DataFrame:
        columns = ["date", "symbol", "cik", "open", "high", "low", "close", "adjusted_close", "volume", "year"]
        dataset = ds.dataset(str(self.root / "price_daily"), format="parquet", partitioning="hive")
        df = dataset.to_table(columns=columns, filter=self._date_filter()).to_pandas()
        if df.empty:
            return df
        df["date"] = pd.to_datetime(df["date"]).dt.date
        adjustment = df["adjusted_close"] / df["close"]
        adjustment = adjustment.where((df["close"] != 0) & adjustment.notna(), 0)
        for field in ["open", "high", "low"]:
            df[f"adjusted_{field}"] = df[field] * adjustment
        return df.sort_values(["date", "symbol"])

    def _read_news(self) -> pd.DataFrame:
        if "news" not in self.modalities:
            return pd.DataFrame()
        columns = ["date", "symbol", "cik", "item_index", "news_text", "text_len", "text_crc32", "year"]
        dataset = ds.dataset(str(self.root / "news_items"), format="parquet", partitioning="hive")
        df = dataset.to_table(columns=columns, filter=self._date_filter()).to_pandas()
        if df.empty:
            return df
        df["date"] = pd.to_datetime(df["date"]).dt.date
        return df.sort_values(["date", "symbol", "item_index"])

    def _iter_filing_files(self, folder: str):
        for file in sorted((self.root / folder).glob("year=*/part-000.parquet")):
            year = int(file.parent.name.split("=")[-1])
            if self.start_date is not None and year < self.start_date.year:
                continue
            if self.end_date is not None and year > self.end_date.year:
                continue
            yield file

    def _read_filings(self, folder: str) -> pd.DataFrame:
        frames = []
        for file in self._iter_filing_files(folder):
            parquet_file = pq.ParquetFile(file)
            df = parquet_file.read().to_pandas()
            if df.empty:
                continue
            df["date"] = pd.to_datetime(df["date"]).dt.date
            if self.start_date is not None:
                df = df[df["date"] >= self.start_date]
            if self.end_date is not None:
                df = df[df["date"] <= self.end_date]
            if self.tickers is not None:
                df = df[df["symbol"].isin(self.tickers)]
            if not df.empty:
                frames.append(df)
        if not frames:
            return pd.DataFrame()
        return pd.concat(frames, ignore_index=True).sort_values(["date", "symbol", "filing_idx"])

    def _load_data(self) -> dict:
        if self._data_cache is not None:
            return self._data_cache

        data = defaultdict(lambda: defaultdict(dict))
        price = self._read_price()
        for row in price.itertuples(index=False):
            data[row.date]["price"][row.symbol] = {
                "cik": row.cik,
                "open": row.open,
                "high": row.high,
                "low": row.low,
                "close": row.close,
                "adjusted_open": row.adjusted_open,
                "adjusted_high": row.adjusted_high,
                "adjusted_low": row.adjusted_low,
                "adjusted_close": row.adjusted_close,
                "volume": row.volume,
            }

        news = self._read_news()
        for row in news.itertuples(index=False):
            data[row.date]["news"].setdefault(row.symbol, []).append(row.news_text)

        filing_specs = (("filing_k", "filingk"), ("filing_q", "filingq"))
        for modality, folder in filing_specs:
            if modality not in self.modalities:
                continue
            filings = self._read_filings(folder)
            for row in filings.itertuples(index=False):
                current = data[row.date][modality].get(row.symbol)
                if current:
                    data[row.date][modality][row.symbol] = f"{current}\n\n{row.filing_text}"
                else:
                    data[row.date][modality][row.symbol] = row.filing_text

        self._data_cache = {date: dict(values) for date, values in data.items()}
        return self._data_cache

    def get_data_by_date(self, date) -> dict[str, Any]:
        date = self._normalize_date(date)
        return self._load_data().get(date, {})

    def get_ticker_price_by_date(self, ticker: str, date, price_field: str | None = None) -> float:
        date = self._normalize_date(date)
        price = self._load_data()[date]["price"][ticker]
        return price[price_field or self.price_field]

    def get_ticker_data_by_date(self, ticker: str, date) -> dict[str, Any]:
        daily_data = self.get_data_by_date(date)
        return {
            modality: values[ticker]
            for modality, values in daily_data.items()
            if isinstance(values, dict) and ticker in values
        }

    def get_tickers_list(self) -> list[str]:
        if self._tickers_cache is not None:
            return self._tickers_cache
        if self.tickers is not None:
            self._tickers_cache = list(self.tickers)
            return self._tickers_cache
        table = ds.dataset(str(self.root / "price_daily"), format="parquet", partitioning="hive").to_table(
            columns=["symbol"],
            filter=self._date_filter(),
        )
        self._tickers_cache = sorted(pc.unique(table["symbol"]).to_pylist())
        return self._tickers_cache

    def get_subset_by_time_range(self, start_date, end_date):
        subset = FinsaberParquetDataset(
            root=self.root,
            start_date=start_date,
            end_date=end_date,
            tickers=self.tickers,
            modalities=self.modalities,
            price_field=self.price_field,
        )
        return subset if subset.get_date_range() else None

    def get_ticker_subset_by_time_range(self, ticker: str, start_date, end_date):
        subset = FinsaberParquetDataset(
            root=self.root,
            start_date=start_date,
            end_date=end_date,
            tickers=[ticker],
            modalities=self.modalities,
            price_field=self.price_field,
        )
        return subset if subset.get_date_range() else None

    def get_date_range(self) -> list:
        if self._date_range_cache is not None:
            return self._date_range_cache
        price = self._read_price()
        self._date_range_cache = sorted(price["date"].unique().tolist()) if not price.empty else []
        return self._date_range_cache

    def get_price_dataframe(self, tickers=None, date_from=None, date_to=None, adjust: bool = True) -> pd.DataFrame:
        if tickers is None:
            tickers = self.tickers
        subset = FinsaberParquetDataset(
            root=self.root,
            start_date=date_from if date_from is not None else self.start_date,
            end_date=date_to if date_to is not None else self.end_date,
            tickers=None if tickers == "all" else tickers,
            modalities=("price",),
            price_field=self.price_field,
        )
        df = subset._read_price()
        if df.empty:
            return df
        df["date"] = pd.to_datetime(df["date"])
        if adjust:
            df = df.rename(
                columns={
                    "open": "raw_open",
                    "high": "raw_high",
                    "low": "raw_low",
                    "close": "raw_close",
                    "adjusted_open": "open",
                    "adjusted_high": "high",
                    "adjusted_low": "low",
                    "adjusted_close": "close",
                }
            )
        return df[["date", "symbol", "open", "high", "low", "close", "volume"]].copy()
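
A short instantiation sketch follows; the root path is hypothetical, and per the reader above it must contain the hive-partitioned folders price_daily/, news_items/, filingk/ and filingq/:

from finsaber import FinsaberParquetDataset  # assuming a top-level re-export

loader = FinsaberParquetDataset(
    root="/data/finsaber2",           # hypothetical path
    start_date="2020-01-01",
    end_date="2020-12-31",
    tickers=["AAPL", "MSFT"],
    modalities=("price", "news"),     # omit filings so they are never read
)
prices = loader.get_price_dataframe(adjust=True)  # adjusted OHLC columns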

TradeConfig dataclass

Source code in backtest/toolkit/trade_config.py
@dataclass
class TradeConfig:
    tickers: Union[List[str], str]  # Either a list of ticker symbols or the string 'all'
    date_from: str = "2004-01-01"
    date_to: str = "2024-01-01"
    cash: float = 100000.0
    risk_free_rate: float = 0.03
    commission_per_share: float = 0.0049
    min_commission: float = 0.99
    max_commission_rate: float = 0.01
    execution_timing: str = "next_open"
    slippage_perc: float = 0.0
    slippage_impact: float = 0.0
    liquidity_lookback_days: int = 20
    liquidity_min_history_days: int = 1
    liquidity_cap_pct: float = 0.0
    llm_cost_as_trade_cost: bool = True
    print_trades_table: bool = False
    silence: bool = False
    rolling_window_size: int = 2
    rolling_window_step: int = 1
    training_years: int = None
    selection_strategy: BaseSelector = None
    setup_name: str = None
    result_filename: str = None
    save_results: bool = True
    log_base_dir: str = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "output")
    data_loader: Any = None

    def __post_init__(self):
        # Validate and manage the tickers field
        if isinstance(self.tickers, str):
            if self.tickers.lower() != "all":
                raise ValueError("tickers can either be a list of tickers or the string 'all'")
        elif not isinstance(self.tickers, list) or not all(isinstance(t, str) for t in self.tickers):
            raise ValueError("tickers must be a list of strings")

        # Validate the date_from and date_to fields
        if self.date_from > self.date_to:
            raise ValueError("date_from must be earlier than date_to")

        if self.execution_timing not in {"same_close", "next_open"}:
            raise ValueError("execution_timing must be one of: same_close, next_open")

        if self.slippage_perc < 0 or self.slippage_impact < 0:
            raise ValueError("slippage_perc and slippage_impact must be non-negative")

        if not 0 <= self.liquidity_cap_pct <= 1:
            raise ValueError("liquidity_cap_pct must be between 0 and 1")

        if self.liquidity_lookback_days < 1:
            raise ValueError("liquidity_lookback_days must be at least 1")

        if not 1 <= self.liquidity_min_history_days <= self.liquidity_lookback_days:
            raise ValueError("liquidity_min_history_days must be between 1 and liquidity_lookback_days")


    @classmethod
    def from_dict(cls, config_dict):
        """ Initialize a TradeConfig object from a dictionary """
        config_dict = dict(config_dict)
        if "commission" in config_dict and "commission_per_share" not in config_dict:
            config_dict["commission_per_share"] = config_dict.pop("commission")
        return cls(**config_dict)

    def to_dict(self):
        """ Convert the TradeConfig object to a dictionary """
        return {item.name: getattr(self, item.name) for item in fields(self)}

from_dict(config_dict) classmethod

Initialize a TradeConfig object from a dictionary

Source code in backtest/toolkit/trade_config.py
@classmethod
def from_dict(cls, config_dict):
    """ Initialize a TradeConfig object from a dictionary """
    config_dict = dict(config_dict)
    if "commission" in config_dict and "commission_per_share" not in config_dict:
        config_dict["commission_per_share"] = config_dict.pop("commission")
    return cls(**config_dict)

to_dict()

Convert the TradeConfig object to a dictionary

Source code in backtest/toolkit/trade_config.py
def to_dict(self):
    """ Convert the TradeConfig object to a dictionary """
    return {item.name: getattr(self, item.name) for item in fields(self)}
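
A short round-trip sketch (top-level import assumed); from_dict accepts the legacy commission key and remaps it onto commission_per_share:

from finsaber import TradeConfig  # assuming a top-level re-export

config = TradeConfig.from_dict({
    "tickers": ["AAPL"],
    "date_from": "2020-01-01",
    "date_to": "2021-01-01",
    "commission": 0.005,  # legacy key, remapped to commission_per_share
})
assert config.commission_per_share == 0.005
as_dict = config.to_dict()  # one entry per dataclass field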

TradingData

Bases: ABC

Minimal interface for pluggable market datasets.

Implementations may store additional modalities such as news, filings, earnings calls, or transcripts. Backtest engines should only depend on this interface, not on a specific storage format.

Source code in backtest/data_util/trading_data.py
class TradingData(ABC):
    """Minimal interface for pluggable market datasets.

    Implementations may store additional modalities such as news, filings,
    earnings calls, or transcripts. Backtest engines should only depend on this
    interface, not on a specific storage format.
    """

    @abstractmethod
    def get_data_by_date(self, date) -> dict[str, Any]:
        raise NotImplementedError

    @abstractmethod
    def get_ticker_price_by_date(self, ticker: str, date, price_field: str | None = None) -> float:
        raise NotImplementedError

    @abstractmethod
    def get_ticker_data_by_date(self, ticker: str, date) -> dict[str, Any]:
        raise NotImplementedError

    @abstractmethod
    def get_tickers_list(self) -> list[str]:
        raise NotImplementedError

    def get_ticker_list(self) -> list[str]:
        return self.get_tickers_list()

    @abstractmethod
    def get_subset_by_time_range(self, start_date, end_date):
        raise NotImplementedError

    def get_data_by_time_range(self, start_date, end_date):
        return self.get_subset_by_time_range(start_date, end_date)

    @abstractmethod
    def get_ticker_subset_by_time_range(self, ticker: str, start_date, end_date):
        raise NotImplementedError

    def get_ticker_data_by_time_range(self, ticker: str, start_date, end_date):
        return self.get_ticker_subset_by_time_range(ticker, start_date, end_date)

    @abstractmethod
    def get_date_range(self) -> list:
        raise NotImplementedError

    def get_modalities(self) -> list[str]:
        dates = self.get_date_range()
        if not dates:
            return []
        return list(self.get_data_by_date(dates[0]).keys())
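
Because engines should depend only on this interface, helpers can be written against TradingData and reused with any backend; a minimal illustrative sketch:

def last_price(data: TradingData, ticker: str, field: str | None = None):
    """Illustrative helper: latest available price for a ticker, or None.

    Written against the abstract interface only, so it works with both
    FinsaberDataset and FinsaberParquetDataset.
    """
    for date in reversed(data.get_date_range()):
        if ticker in data.get_data_by_date(date).get("price", {}):
            return data.get_ticker_price_by_date(ticker, date, field)
    return None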

backtest.toolkit.execution

backtest.toolkit.result_writer

backtest.finsaber_bt.FINSABERBt

Source code in backtest/finsaber_bt.py
class FINSABERBt:
    def __init__(
            self,
            config: dict,
    ):
        """
        :param config: The configuration for the trade operator
        """
        self.trade_config = TradeConfig.from_dict(config)
        self._price_data_cache = {}


    def run_rolling_window(self, strategy: bt.Strategy, process: callable = None, **kwargs):
        """
        Call run_iterative_tickers or execute_all for each rolling window
        :param strategy: The strategy to execute
        :param process: The function to process the data
        :param kwargs: Additional arguments for the strategy
        """
        # divide the date into rolling windows
        rolling_window_size = self.trade_config.rolling_window_size # in years
        rolling_window_step = self.trade_config.rolling_window_step # in years


        # e.g. 2000-01-01 to 2005-01-01, rolling_window_size=2, rolling_window_step=1, then the rolling windows are:
        # 2000-01-01 to 2002-01-01, 2001-01-01 to 2003-01-01, 2002-01-01 to 2004-01-01, 2003-01-01 to 2005-01-01
        date_from = pd.to_datetime(self.trade_config.date_from)
        date_to = pd.to_datetime(self.trade_config.date_to)

        # check selection strategy
        if self.trade_config.setup_name in ["selected_4", "selected_5", "cherry_pick_both_finmem"]:
            stock_selector = FinMemSelector()
        else:
            # TODO: implement other selection strategies
            stock_selector = self.trade_config.selection_strategy

        rolling_windows = []
        while date_from + pd.DateOffset(years=rolling_window_size) <= date_to:
            rolling_windows.append((date_from, date_from + pd.DateOffset(years=rolling_window_size)))
            date_from += pd.DateOffset(years=rolling_window_step)

        eval_metrics = {}
        windows_loop = tqdm(rolling_windows, disable=self.trade_config.silence)

        for window in windows_loop:
            windows_loop.set_description(f"Processing window {window[0].strftime('%Y')} to {window[1].strftime('%Y')}")

            self.trade_config.tickers = stock_selector.select(
                self.trade_config.data_loader,
                window[0].strftime("%Y-%m-%d"),
                window[1].strftime("%Y-%m-%d")
            )
            if not self.trade_config.silence:
                print(f"Selected tickers for the period {window[0].strftime('%Y')} to {window[1].strftime('%Y')}: {self.trade_config.tickers}")

            test_config = self.trade_config.to_dict()
            test_config["date_from"] = window[0].strftime("%Y-%m-%d")
            test_config["date_to"] = window[1].strftime("%Y-%m-%d")

            eval_metrics[f"{window[0].strftime('%Y-%m-%d')}_{window[1].strftime('%Y-%m-%d')}"] \
                = self.run_iterative_tickers(strategy, process, test_config=test_config, **kwargs)

        # export the evaluation metrics
        if self.trade_config.save_results:
            output_dir = os.path.join(self.trade_config.log_base_dir, self.trade_config.setup_name.replace(":", "_"), strategy.__name__)
            filename = f"{self.trade_config.date_from}_{self.trade_config.date_to}.pkl" if self.trade_config.result_filename is None else self.trade_config.result_filename
            os.makedirs(output_dir, exist_ok=True)
            with open(os.path.join(output_dir, filename), "wb") as f:
                pickle.dump(eval_metrics, f)
            write_result_artifacts(output_dir, self.trade_config.to_dict(), eval_metrics)

        return eval_metrics

    def run_iterative_tickers(self, strategy: bt.Strategy, process: callable = None, test_config: dict = None, **kwargs):
        """
        Execute the strategy
        :param strategy: The strategy to execute
        :param process: The function to process the data
        :param test_config: The configuration if different from the global configuration
        :param kwargs: Additional arguments for the strategy
        """

        if test_config is None:
            test_config = self.trade_config
        else:
            test_config = TradeConfig.from_dict(test_config)

        eval_metrics = {}

        tickers_loop = (
            test_config.data_loader.get_tickers_list()
            if test_config.tickers == "all" and test_config.data_loader is not None
            else test_config.tickers
        )
        if test_config.tickers == "all" and test_config.data_loader is None:
            tickers_loop = get_tickers_price("all", return_original=True)["symbol"].unique()

        for ticker in tickers_loop:

            # print(f"Processing ticker {ticker}...")

            cerebro = bt.Cerebro()

            pd_data = self._get_ticker_price_data(
                ticker=ticker,
                date_from=test_config.date_from,
                date_to=test_config.date_to,
                test_config=test_config,
            )
            train_data = None
            strategy_kwargs = dict(kwargs)

            for additional_arg in strategy_kwargs:
                # if it is callable, call it
                if callable(strategy_kwargs[additional_arg]):
                    strategy_kwargs[additional_arg] = strategy_kwargs[additional_arg](pd_data)

            if "prior_period" in vars(strategy.params).keys():
                if test_config.training_years is not None:
                    strategy.params.prior_period = test_config.training_years * 252

                if strategy.params.prior_period % 252 != 0:
                    raise ValueError("prior_period must be a multiple of 252")

                prior_year = strategy.params.prior_period // 252
                prior_data = self._get_ticker_price_data(
                    ticker,
                    date_from=(pd.to_datetime(test_config.date_from) - pd.DateOffset(years=prior_year)).strftime("%Y-%m-%d"),
                    date_to=(pd.to_datetime(test_config.date_from) - pd.DateOffset(days=1)).strftime("%Y-%m-%d"),
                    test_config=test_config,
                )

                if prior_data is not None:
                    if prior_data.index.min().year > pd.to_datetime(test_config.date_from).year - prior_year:
                        if not test_config.silence:
                            print(f"Prior data for {ticker} is not enough at year {pd.to_datetime(test_config.date_from).year}")
                        continue
                else:
                    if not test_config.silence:
                        print(f"No prior data for {ticker} at year {pd.to_datetime(test_config.date_from).year}")
                    continue

            # if the model needs to be trained, set the training data that are not used for backtesting
            if "train_period" in vars(strategy.params).keys():
                if test_config.training_years is not None:
                    strategy.params.train_period = test_config.training_years * 252

                if strategy.params.train_period % 252 != 0:
                    raise ValueError("train_period must be a multiple of 252")

                train_year = strategy.params.train_period // 252
                train_data = self._get_ticker_price_data(
                    ticker,
                    date_from=(pd.to_datetime(test_config.date_from) - pd.DateOffset(years=train_year)).strftime("%Y-%m-%d"),
                    date_to=(pd.to_datetime(test_config.date_from) - pd.DateOffset(days=1)).strftime("%Y-%m-%d"),
                    test_config=test_config,
                )

                strategy_kwargs["train_data"] = train_data

            # Explicit backtests should not require January-start windows; only require enough bars.
            min_bars = 2
            if pd_data is None or pd_data.empty or len(pd_data) < min_bars:
                if not test_config.silence:
                    print(f"No usable data in the period {test_config.date_from} to {test_config.date_to} for {ticker}")
                continue

            # skip if no enough data for training
            if train_data is not None:
                if train_data.index.min().year > pd.to_datetime(test_config.date_from).year - train_year:
                    if not test_config.silence:
                        print(f"Train data for {ticker} is not enough at year {pd.to_datetime(test_config.date_from).year}")
                    continue

            # detect if the stock is delisted in the middle of the period, if it is, assign 0 price to the missing dates
            # This indicates a complete loss of the stock
            end_date = pd.to_datetime(test_config.date_to) - pd.DateOffset(days=1)
            all_expected_trading_days = pd.bdate_range(start=pd_data.index.min(), end=end_date)
            last_expected_date = all_expected_trading_days[-1]
            last_data_date = pd_data.index.max()

            if last_data_date < last_expected_date - pd.DateOffset(days=3):
                # If the last data date is more than 3 days before the last expected date (avoid weekend or holidays), we assume the stock is delisted
                if not test_config.silence:
                    print(
                        f"{ticker} appears to be delisted on {last_data_date.strftime('%Y-%m-%d')}, applying 7 days delisting announcement period.")

                # remove the last 7 days of data
                pd_data = pd_data[pd_data.index <= last_data_date - pd.DateOffset(days=7)]

            # check again after potential delisting truncation
            if pd_data is None or pd_data.empty or len(pd_data) < min_bars:
                if not test_config.silence:
                    print(f"Not enough data in the period {test_config.date_from} to {test_config.date_to} for {ticker}")
                continue

            add_tickers_data(cerebro, pd_data)

            # Add a strategy
            cerebro.addstrategy(strategy, total_days=len(set(pd_data.index.tolist())), **strategy_kwargs)

            # Set our desired cash start
            cerebro.broker.setcash(test_config.cash)
            commission_scheme = USStockCommission(
                commission_per_share=test_config.commission_per_share,
                min_commission=test_config.min_commission,
                max_commission_rate=test_config.max_commission_rate,
            )
            cerebro.broker.addcommissioninfo(commission_scheme)
            cerebro.broker.set_shortcash(False)
            if test_config.execution_timing == "same_close":
                cerebro.broker.set_coc(True)  # cheat-on-close: fill orders at the same bar's close
            if test_config.slippage_perc > 0:
                cerebro.broker.set_slippage_perc(
                    test_config.slippage_perc,
                    slip_open=True,
                    slip_limit=True,
                    slip_match=True,
                    slip_out=False,
                )
            if test_config.liquidity_cap_pct > 0:
                cerebro.broker.set_filler(
                    MovingAverageVolumePercFiller(
                        cap_pct=test_config.liquidity_cap_pct,
                        lookback_days=test_config.liquidity_lookback_days,
                        min_history_days=test_config.liquidity_min_history_days,
                    )
                )

            # Add observers
            cerebro.addobserver(bt.observers.Value)

            # Add analyzers for Sharpe Ratio and Drawdown
            cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name='mysharpe', riskfreerate=test_config.risk_free_rate, timeframe=bt.TimeFrame.Days, annualize=True)
            cerebro.addanalyzer(bt.analyzers.DrawDown, _name='mydrawdown')
            cerebro.addanalyzer(bt.analyzers.Returns, _name='myreturns')
            # cerebro.addanalyzer(bt.analyzers.AnnualReturn, _name='myannualreturn')
            # cerebro.addanalyzer(bt.analyzers.VWR, _name='myvwr')  # Annualized volatility

            # Run over everything
            results = cerebro.run()
            strat = results[0]

            if not test_config.silence:
                # Print out the final result
                eval_metrics[ticker] = self._analyze_results(
                    strat,
                    test_config=test_config,
                    ticker=ticker,
                    print_trades_table=test_config.print_trades_table,
                )
            else:
                eval_metrics[ticker] = self._analyze_results(
                    strat,
                    test_config=test_config,
                    ticker=ticker,
                    print_trades_table=False,
                    print_annual_metrics=False,
                    print_details=test_config.print_trades_table
                )

            # Obtain the equity curve
            equity_with_time = pd.DataFrame(
                {
                    "datetime": strat.equity_date,
                    "equity": strat.equity
                }
            )

            eval_metrics[ticker]["equity_with_time"] = equity_with_time

            if not test_config.silence:
                # Plot the result
                plt.figure(figsize=(10, 6))
                plt.plot(equity_with_time["datetime"], equity_with_time["equity"], label="Equity Curve")
                plt.title(f"Equity Curve for {ticker}")
                plt.xlabel("Date")
                plt.ylabel("Equity")
                plt.legend()
                plt.show()

        if "cherry_pick" in test_config.setup_name and test_config.save_results:
            # store the results using pickle
            output_dir = os.path.join(test_config.log_base_dir, test_config.setup_name.replace(":", "_"), strategy.__name__)
            filename = f"{test_config.date_from}_{test_config.date_to}.pkl" if test_config.result_filename is None else test_config.result_filename
            os.makedirs(output_dir, exist_ok=True)
            with open(os.path.join(output_dir, filename), "wb") as f:
                output_results = {f"{test_config.date_from}_{test_config.date_to}": eval_metrics}
                pickle.dump(output_results, f)
            write_result_artifacts(output_dir, test_config.to_dict(), output_results)
        return eval_metrics

    def _get_ticker_price_data(self, ticker, date_from, date_to, test_config):
        cache_key = (ticker, date_from, date_to, id(test_config.data_loader))
        if cache_key in self._price_data_cache:
            return self._price_data_cache[cache_key]

        if test_config.data_loader is not None:
            price_data = get_tickers_price_from_data_loader(
                test_config.data_loader,
                ticker,
                date_from=date_from,
                date_to=date_to,
            )
        else:
            price_data = get_tickers_price(ticker, date_from=date_from, date_to=date_to)

        self._price_data_cache[cache_key] = price_data
        return price_data

    def _analyze_results(self,
                         strategy: bt.Strategy,
                         ticker: str,
                         test_config: TradeConfig,
                         print_details=True,
                         print_annual_metrics=True,
                         print_trades_table=False):

        if strategy is None:
            print("No strategy results to analyze")
            return None

        max_drawdown = strategy.analyzers.mydrawdown.get_analysis().max.drawdown
        total_return = strategy.broker.getvalue() / test_config.cash - 1
        total_return_cash = strategy.broker.getvalue() - test_config.cash
        total_commission = sum(order.get("commission", 0) for order in getattr(strategy, "executed_orders", []))
        annual_metrics = self._calculate_annualized_metrics(strategy, test_config=test_config)

        if print_details:
            print("\n" + "=" * 50)
            print(f"Period: {test_config.date_from} to {test_config.date_to}")
            print(f"Ticker: {ticker}")
            print("-" * 50)
            print(f"Initial cash: {test_config.cash}")
            print(f"Final cash: {strategy.broker.getvalue():.2f}")
            print(f"Total return (cash): {total_return_cash:.2f}")
            print(f"Total return (%): {total_return:.2%}")
            print(f"Max drawdown (%): {max_drawdown:.2f}%")
            print(f"Number of trades: {len(strategy.trades)}")
            print(f"Total commission: {total_commission:.2f}")

        if print_annual_metrics:
            print("-" * 50)
            print(f"Annual return: {annual_metrics['Annual Return']:.2%}")
            print(f"Annual volatility: {annual_metrics['Annual Volatility']:.2%}")
            print(f"Sharpe ratio: {annual_metrics['Sharpe Ratio']:.4f}")
            print(f"Sortino ratio: {annual_metrics['Sortino Ratio']:.4f}")

        if print_trades_table:
            trades = []
            for trade in strategy.trades:
                trades.append([trade.open_datetime().date(), trade.close_datetime().date(), trade.price, trade.pnl, trade.pnlcomm])
            trades_df = pd.DataFrame(trades, columns=['Open Date', 'Close Date', "Price", 'Profit/Loss',
                                                      'PnL (incl. commission)'])
            print("-" * 50)
            print("Trades:")
            print(tabulate(trades_df, headers='keys', tablefmt='psql'))

        if not test_config.silence:
            print("="*50)

        return {
            'sharpe_ratio': annual_metrics['Sharpe Ratio'],
            'annual_return': annual_metrics['Annual Return'],
            'annual_volatility': annual_metrics['Annual Volatility'],
            'sortino_ratio': annual_metrics['Sortino Ratio'],
            'max_drawdown': max_drawdown,
            'total_return': total_return,
            'total_commission': total_commission,
            'total_slippage': 0.0,  # broker-applied slippage is baked into fill prices, not tracked per order
            'total_trading_cost': total_commission,
            'executed_orders': pd.DataFrame(getattr(strategy, "executed_orders", [])),
        }


    def _calculate_annualized_metrics(
            self,
            strategy: bt.Strategy,
            test_config: TradeConfig):

        # Calculate the daily returns from the equity curve
        daily_returns = pd.Series(strategy.equity).pct_change().dropna()

        # average_daily_return = daily_returns.mean()
        # daily_risk_free_rate = (1 + test_config.risk_free_rate) ** (1 / (252)) - 1
        # excess_daily_return = average_daily_return - daily_risk_free_rate
        # self_calculate_sharpe_ratio = excess_daily_return / daily_returns.std() * np.sqrt(252)
        # print("Self calculated Sharpe ratio: ", self_calculate_sharpe_ratio)

        if not daily_returns.empty and daily_returns.any():
            if strategy.broker.getvalue() < 0:
                print("Negative value in equity curve")
                final_value = 0
            else:
                final_value = strategy.broker.getvalue()

            total_return = (final_value / test_config.cash) - 1
            total_periods = len(daily_returns)
            annual_return = (1 + total_return) ** (252 / total_periods) - 1
            # check if annual return is float
            try:
                assert isinstance(annual_return, float), f"Annual return is not float: {annual_return}"
            except AssertionError as e:
                print("value", strategy.broker.getvalue())
                print("cash", test_config.cash)
                print("total return", total_return)
                print("total periods", total_periods)
                print("annual return", annual_return)
                # print stock symbol
                print("stock symbol", strategy.datas[0]._name)
                raise e

            # Calculate annual volatility
            annual_volatility = metrics.calculate_annual_volatility(daily_returns)

            sortino_ratio = metrics.calculate_sortino_ratio(daily_returns, risk_free_rate=test_config.risk_free_rate)

            # Use the analyzer's Sharpe ratio if available
            sharpe_ratio = strategy.analyzers.mysharpe.get_analysis()['sharperatio']
        else:
            annual_return = annual_volatility = sharpe_ratio = sortino_ratio = 0

        return {
            "Annual Return": annual_return,
            "Annual Volatility": annual_volatility,
            "Sharpe Ratio": sharpe_ratio,
            "Sortino Ratio": sortino_ratio,
        }
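
For reference, a minimal sketch of the annualization arithmetic used in _calculate_annualized_metrics, assuming a 252-trading-day year and a toy equity curve (the numbers are illustrative only; the library delegates volatility to metrics.calculate_annual_volatility, whose exact definition may differ):

import pandas as pd

equity = pd.Series([100_000, 101_000, 100_500, 102_010])     # toy equity curve
daily_returns = equity.pct_change().dropna()

total_return = equity.iloc[-1] / equity.iloc[0] - 1          # 0.0201
periods = len(daily_returns)                                 # 3 daily returns
annual_return = (1 + total_return) ** (252 / periods) - 1    # geometric annualization, as above
annual_volatility = daily_returns.std() * 252 ** 0.5         # common sqrt(252) convention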

__init__(config)

:param config: The configuration for the trade operator

Source code in backtest\finsaber_bt.py
def __init__(
        self,
        config: dict,
):
    """
    :param config: The configuration for the trade operator
    """
    self.trade_config = TradeConfig.from_dict(config)
    self._price_data_cache = {}
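
A minimal construction sketch. The class name FINSABERBt and the config keys below are assumptions inferred from the surrounding source, not a confirmed public API:

config = {
    "cash": 100_000,             # initial cash, consumed via TradeConfig.from_dict
    "date_from": "2015-01-01",
    "date_to": "2020-01-01",
    "tickers": ["AAPL", "MSFT"],
    "silence": True,
}
operator = FINSABERBt(config)    # hypothetical class name for illustration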

run_iterative_tickers(strategy, process=None, test_config=None, **kwargs)

Execute the strategy

:param strategy: The strategy to execute
:param process: The function to process the data
:param test_config: The configuration if different from the global configuration
:param kwargs: Additional arguments for the strategy

Source code in backtest\finsaber_bt.py
def run_iterative_tickers(self, strategy: bt.Strategy, process: callable = None, test_config: dict = None, **kwargs):
    """
    Execute the strategy
    :param strategy: The strategy to execute
    :param process: The function to process the data
    :param test_config: The configuration if different from the global configuration
    :param kwargs: Additional arguments for the strategy
    """

    if test_config is None:
        test_config = self.trade_config
    else:
        test_config = TradeConfig.from_dict(test_config)

    eval_metrics = {}

    tickers_loop = (
        test_config.data_loader.get_tickers_list()
        if test_config.tickers == "all" and test_config.data_loader is not None
        else test_config.tickers
    )
    if test_config.tickers == "all" and test_config.data_loader is None:
        tickers_loop = get_tickers_price("all", return_original=True)["symbol"].unique()

    for ticker in tickers_loop:

        # print(f"Processing ticker {ticker}...")

        cerebro = bt.Cerebro()

        pd_data = self._get_ticker_price_data(
            ticker=ticker,
            date_from=test_config.date_from,
            date_to=test_config.date_to,
            test_config=test_config,
        )
        train_data = None
        strategy_kwargs = dict(kwargs)

        for additional_arg in strategy_kwargs:
            # callable kwargs are resolved by calling them with the loaded price data
            if callable(strategy_kwargs[additional_arg]):
                strategy_kwargs[additional_arg] = strategy_kwargs[additional_arg](pd_data)

        if "prior_period" in vars(strategy.params).keys():
            if test_config.training_years is not None:
                strategy.params.prior_period = test_config.training_years * 252

            if strategy.params.prior_period % 252 != 0:
                raise ValueError("prior_period must be a multiple of 252")

            prior_year = strategy.params.prior_period // 252
            prior_data = self._get_ticker_price_data(
                ticker,
                date_from=(pd.to_datetime(test_config.date_from) - pd.DateOffset(years=prior_year)).strftime("%Y-%m-%d"),
                date_to=(pd.to_datetime(test_config.date_from) - pd.DateOffset(days=1)).strftime("%Y-%m-%d"),
                test_config=test_config,
            )

            if prior_data is not None:
                if prior_data.index.min().year > pd.to_datetime(test_config.date_from).year - prior_year:
                    if not test_config.silence:
                        print(f"Prior data for {ticker} is not enough at year {pd.to_datetime(test_config.date_from).year}")
                    continue
            else:
                if not test_config.silence:
                    print(f"No prior data for {ticker} at year {pd.to_datetime(test_config.date_from).year}")
                continue

        # if the model needs training, reserve training data from before the backtest window
        if "train_period" in vars(strategy.params):
            if test_config.training_years is not None:
                strategy.params.train_period = test_config.training_years * 252

            if strategy.params.train_period % 252 != 0:
                raise ValueError("train_period must be a multiple of 252")

            train_year = strategy.params.train_period // 252
            train_data = self._get_ticker_price_data(
                ticker,
                date_from=(pd.to_datetime(test_config.date_from) - pd.DateOffset(years=train_year)).strftime("%Y-%m-%d"),
                date_to=(pd.to_datetime(test_config.date_from) - pd.DateOffset(days=1)).strftime("%Y-%m-%d"),
                test_config=test_config,
            )

            strategy_kwargs["train_data"] = train_data

        # Explicit backtests should not require January-start windows; only require enough bars.
        min_bars = 2
        if pd_data is None or pd_data.empty or len(pd_data) < min_bars:
            if not test_config.silence:
                print(f"No usable data in the period {test_config.date_from} to {test_config.date_to} for {ticker}")
            continue

        # skip if there is not enough data for training
        if train_data is not None:
            if train_data.index.min().year > pd.to_datetime(test_config.date_from).year - train_year:
                if not test_config.silence:
                    print(f"Train data for {ticker} is not enough at year {pd.to_datetime(test_config.date_from).year}")
                continue

        # detect whether the stock was delisted in the middle of the period; if so, missing
        # dates are treated as a zero price, i.e. a complete loss of the position
        end_date = pd.to_datetime(test_config.date_to) - pd.DateOffset(days=1)
        all_expected_trading_days = pd.bdate_range(start=pd_data.index.min(), end=end_date)
        last_expected_date = all_expected_trading_days[-1]
        last_data_date = pd_data.index.max()

        if last_data_date < last_expected_date - pd.DateOffset(days=3):
            # If the last data date is more than 3 days before the last expected date (a buffer for weekends and holidays), assume the stock was delisted
            if not test_config.silence:
                print(
                    f"{ticker} appears to be delisted on {last_data_date.strftime('%Y-%m-%d')}, applying 7 days delisting announcement period.")

            # remove the last 7 days of data
            pd_data = pd_data[pd_data.index <= last_data_date - pd.DateOffset(days=7)]

        # check again after potential delisting truncation
        if pd_data is None or pd_data.empty or len(pd_data) < min_bars:
            if not test_config.silence:
                print(f"Not enough data in the period {test_config.date_from} to {test_config.date_to} for {ticker}")
            continue

        add_tickers_data(cerebro, pd_data)

        # Add a strategy
        cerebro.addstrategy(strategy, total_days=len(set(pd_data.index.tolist())), **strategy_kwargs)

        # Set our desired cash start
        cerebro.broker.setcash(test_config.cash)
        commission_scheme = USStockCommission(
            commission_per_share=test_config.commission_per_share,
            min_commission=test_config.min_commission,
            max_commission_rate=test_config.max_commission_rate,
        )
        cerebro.broker.addcommissioninfo(commission_scheme)
        cerebro.broker.set_shortcash(False)
        if test_config.execution_timing == "same_close":
            cerebro.broker.set_coc(True)  # cheat-on-close: fill orders at the same bar's close
        if test_config.slippage_perc > 0:
            cerebro.broker.set_slippage_perc(
                test_config.slippage_perc,
                slip_open=True,
                slip_limit=True,
                slip_match=True,
                slip_out=False,
            )
        if test_config.liquidity_cap_pct > 0:
            cerebro.broker.set_filler(
                MovingAverageVolumePercFiller(
                    cap_pct=test_config.liquidity_cap_pct,
                    lookback_days=test_config.liquidity_lookback_days,
                    min_history_days=test_config.liquidity_min_history_days,
                )
            )

        # Add observers
        cerebro.addobserver(bt.observers.Value)

        # Add analyzers for Sharpe Ratio and Drawdown
        cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name='mysharpe', riskfreerate=test_config.risk_free_rate, timeframe=bt.TimeFrame.Days, annualize=True)
        cerebro.addanalyzer(bt.analyzers.DrawDown, _name='mydrawdown')
        cerebro.addanalyzer(bt.analyzers.Returns, _name='myreturns')
        # cerebro.addanalyzer(bt.analyzers.AnnualReturn, _name='myannualreturn')
        # cerebro.addanalyzer(bt.analyzers.VWR, _name='myvwr')  # Annualized volatility

        # Run over everything
        results = cerebro.run()
        strat = results[0]

        if not test_config.silence:
            # Print out the final result
            eval_metrics[ticker] = self._analyze_results(
                strat,
                test_config=test_config,
                ticker=ticker,
                print_trades_table=test_config.print_trades_table,
            )
        else:
            eval_metrics[ticker] = self._analyze_results(
                strat,
                test_config=test_config,
                ticker=ticker,
                print_trades_table=False,
                print_annual_metrics=False,
                print_details=test_config.print_trades_table
            )

        # Obtain the equity curve
        equity_with_time = pd.DataFrame(
            {
                "datetime": strat.equity_date,
                "equity": strat.equity
            }
        )

        eval_metrics[ticker]["equity_with_time"] = equity_with_time

        if not test_config.silence:
            # Plot the result
            plt.figure(figsize=(10, 6))
            plt.plot(equity_with_time["datetime"], equity_with_time["equity"], label="Equity Curve")
            plt.title(f"Equity Curve for {ticker}")
            plt.xlabel("Date")
            plt.ylabel("Equity")
            plt.legend()
            plt.show()

    if "cherry_pick" in test_config.setup_name and test_config.save_results:
        # store the results using pickle
        output_dir = os.path.join(test_config.log_base_dir, test_config.setup_name.replace(":", "_"), strategy.__name__)
        filename = f"{test_config.date_from}_{test_config.date_to}.pkl" if test_config.result_filename is None else test_config.result_filename
        os.makedirs(output_dir, exist_ok=True)
        with open(os.path.join(output_dir, filename), "wb") as f:
            output_results = {f"{test_config.date_from}_{test_config.date_to}": eval_metrics}
            pickle.dump(output_results, f)
        write_result_artifacts(output_dir, test_config.to_dict(), output_results)
    return eval_metrics
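
A hedged usage sketch: any keyword argument supplied as a callable is invoked with the ticker's loaded price DataFrame before the strategy is added (see the strategy_kwargs loop above). MyStrategy, operator, and the lookback heuristic are illustrative placeholders:

results = operator.run_iterative_tickers(
    MyStrategy,                                            # a bt.Strategy subclass (placeholder)
    lookback=lambda pd_data: min(60, len(pd_data) // 2),   # resolved per ticker from its price data
)
sharpe_by_ticker = {ticker: m["sharpe_ratio"] for ticker, m in results.items()}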

run_rolling_window(strategy, process=None, **kwargs)

Call run_iterative_tickers or execute_all for each rolling window

:param strategy: The strategy to execute
:param process: The function to process the data
:param kwargs: Additional arguments for the strategy

Source code in backtest\finsaber_bt.py
def run_rolling_window(self, strategy: bt.Strategy, process: callable = None, **kwargs):
    """
    Call run_iterative_tickers or execute_all for each rolling window
    :param strategy: The strategy to execute
    :param process: The function to process the data
    :param kwargs: Additional arguments for the strategy
    """
    # divide the date range into rolling windows
    rolling_window_size = self.trade_config.rolling_window_size  # in years
    rolling_window_step = self.trade_config.rolling_window_step  # in years

    # e.g. 2000-01-01 to 2005-01-01, rolling_window_size=2, rolling_window_step=1, then the rolling windows are:
    # 2000-01-01 to 2002-01-01, 2001-01-01 to 2003-01-01, 2002-01-01 to 2004-01-01, 2003-01-01 to 2005-01-01
    date_from = pd.to_datetime(self.trade_config.date_from)
    date_to = pd.to_datetime(self.trade_config.date_to)

    # check selection strategy
    if self.trade_config.setup_name in ["selected_4", "selected_5", "cherry_pick_both_finmem"]:
        stock_selector = FinMemSelector()
    else:
        # TODO: implement other selection strategies
        stock_selector = self.trade_config.selection_strategy

    rolling_windows = []
    while date_from + pd.DateOffset(years=rolling_window_size) <= date_to:
        rolling_windows.append((date_from, date_from + pd.DateOffset(years=rolling_window_size)))
        date_from += pd.DateOffset(years=rolling_window_step)

    eval_metrics = {}
    windows_loop = tqdm(rolling_windows, disable=self.trade_config.silence)

    for window in windows_loop:
        windows_loop.set_description(f"Processing window {window[0].strftime('%Y')} to {window[1].strftime('%Y')}")

        self.trade_config.tickers = stock_selector.select(
            self.trade_config.data_loader,
            window[0].strftime("%Y-%m-%d"),
            window[1].strftime("%Y-%m-%d")
        )
        if not self.trade_config.silence:
            print(f"Selected tickers for the period {window[0].strftime('%Y')} to {window[1].strftime('%Y')}: {self.trade_config.tickers}")

        test_config = self.trade_config.to_dict()
        test_config["date_from"] = window[0].strftime("%Y-%m-%d")
        test_config["date_to"] = window[1].strftime("%Y-%m-%d")

        eval_metrics[f"{window[0].strftime('%Y-%m-%d')}_{window[1].strftime('%Y-%m-%d')}"] \
            = self.run_iterative_tickers(strategy, process, test_config=test_config, **kwargs)

    # export the evaluation metrics
    if self.trade_config.save_results:
        output_dir = os.path.join(self.trade_config.log_base_dir, self.trade_config.setup_name.replace(":", "_"), strategy.__name__)
        filename = f"{self.trade_config.date_from}_{self.trade_config.date_to}.pkl" if self.trade_config.result_filename is None else self.trade_config.result_filename
        os.makedirs(output_dir, exist_ok=True)
        with open(os.path.join(output_dir, filename), "wb") as f:
            pickle.dump(eval_metrics, f)
        write_result_artifacts(output_dir, self.trade_config.to_dict(), eval_metrics)

    return eval_metrics
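
The window arithmetic can be checked in isolation; a minimal sketch reproducing the worked example in the comment above (2000-01-01 to 2005-01-01 with size 2 and step 1):

import pandas as pd

date_from, date_to = pd.to_datetime("2000-01-01"), pd.to_datetime("2005-01-01")
size, step = 2, 1  # years

windows = []
while date_from + pd.DateOffset(years=size) <= date_to:
    windows.append((date_from, date_from + pd.DateOffset(years=size)))
    date_from += pd.DateOffset(years=step)

print([(a.year, b.year) for a, b in windows])
# [(2000, 2002), (2001, 2003), (2002, 2004), (2003, 2005)]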

backtest.finsaber.FINSABER

Source code in backtest\finsaber.py
class FINSABER:
    def __init__(self, trade_config: dict):
        self.trade_config = TradeConfig.from_dict(trade_config)
        self.framework = FINSABERFrameworkHelper(
            initial_cash=self.trade_config.cash,
            risk_free_rate=self.trade_config.risk_free_rate,
            commission_per_share=self.trade_config.commission_per_share,
            min_commission=self.trade_config.min_commission,
            max_commission_rate=self.trade_config.max_commission_rate,
            execution_timing=self.trade_config.execution_timing,
            slippage_perc=self.trade_config.slippage_perc,
            slippage_impact=self.trade_config.slippage_impact,
            liquidity_lookback_days=self.trade_config.liquidity_lookback_days,
            liquidity_min_history_days=self.trade_config.liquidity_min_history_days,
            liquidity_cap_pct=self.trade_config.liquidity_cap_pct,
        )
        self.data_loader = self.trade_config.data_loader


    def run_rolling_window(self, strategy_class, rolling_window_size=None, rolling_window_step=None, strat_params=None):
        rolling_window_size = rolling_window_size or self.trade_config.rolling_window_size
        rolling_window_step = rolling_window_step or self.trade_config.rolling_window_step # in years

        date_from = pd.to_datetime(self.trade_config.date_from)
        date_to = pd.to_datetime(self.trade_config.date_to)
        total_years = (date_to.year - date_from.year) + 1

        rolling_windows = []

        # get the first year
        start_year = date_from.year
        # get rolling windows
        for i in range(0, total_years - rolling_window_size, rolling_window_step):
            # get yyyy-mm-dd
            start_date = f"{start_year + i}-01-01"
            end_date = f"{start_year + i + rolling_window_size}-01-01"
            rolling_windows.append((start_date, end_date))

        if not self.trade_config.silence:
            print(f"Rolling windows: {rolling_windows}")

        if self.trade_config.setup_name in ["selected_4", "selected_5", "cherry_pick_both_finmem"]:
            stock_selector = FinMemSelector()
        else:
            # TODO: implement other selection strategies
            stock_selector = self.trade_config.selection_strategy


        eval_metrics = {}
        for window in tqdm(rolling_windows, disable=self.trade_config.silence):
            # subset_data = {date: self.all_data[date] for date in window}
            strat_params["date_from"] = window[0]
            strat_params["date_to"] = window[-1]
            self.trade_config.tickers = stock_selector.select(self.trade_config.data_loader, window[0], window[1])
            if not self.trade_config.silence:
                print(f"Selected tickers for the period {window[0]} to {window[1]}: {self.trade_config.tickers}")

            self.trade_config.date_from = window[0]
            self.trade_config.date_to = window[-1]

            metrics = self.run_iterative_tickers(strategy_class, strat_params, tickers=self.trade_config.tickers, delist_check=True)

            # window_key = f"{window[0]}_{window[-1]}"
            eval_metrics.update(metrics)

        # Save results if required
        if self.trade_config.save_results:
            output_dir = os.path.join(
                self.trade_config.log_base_dir,
                self.trade_config.setup_name.replace(":", "_"),
                strategy_class.__name__,
            )
            filename = f"{date_from.date()}_{date_to.date()}.pkl" if self.trade_config.result_filename is None else self.trade_config.result_filename
            os.makedirs(output_dir, exist_ok=True)
            with open(os.path.join(output_dir, filename), "wb") as f:
                pickle.dump(eval_metrics, f)
            write_result_artifacts(output_dir, self.trade_config.to_dict(), eval_metrics)

        return eval_metrics

    def _print_results(self, metrics, ticker):
        max_drawdown = metrics.get("max_drawdown", 0)
        total_return = metrics.get("total_return", 0)
        annual_return = metrics.get("annual_return", 0)
        annual_volatility = metrics.get("annual_volatility", 0)
        sharpe_ratio = metrics.get("sharpe_ratio", 0)
        sortino_ratio = metrics.get("sortino_ratio", 0)
        total_commission = metrics.get("total_commission", 0)
        total_slippage = metrics.get("total_slippage", 0)
        total_llm_cost = metrics.get("total_llm_cost", 0)

        print("\n" + "=" * 50)
        print(f"Ticker: {ticker}")
        print(f"Total Return (%): {total_return:.3%}")
        print(f"Annual Return (%): {annual_return:.3%}")
        print(f"Max Drawdown (%): {-max_drawdown:.3f}%")
        print(f"Annual Volatility (%): {annual_volatility:.3%}")
        print(f"Sharpe Ratio: {sharpe_ratio:.3f}")
        print(f"Sortino Ratio: {sortino_ratio:.3f}")
        print(f"Total Commission: ${total_commission:.3f}")
        print(f"Total Slippage: ${total_slippage:.3f}")
        print(f"Total LLM Cost: ${total_llm_cost:.3f}")
        print("=" * 50)

    def _plot_equity_curve(self, equity_with_time, ticker):
        plt.figure(figsize=(10, 6))
        plt.plot(equity_with_time["datetime"], equity_with_time["equity"], label="Equity Curve")
        plt.title(f"Equity Curve for {ticker}")
        plt.xlabel("Date")
        plt.ylabel("Equity")
        plt.legend()
        plt.show()

    def run_iterative_tickers(self, strategy_class, strat_params=None, tickers=None, delist_check=False):
        reset_llm_cost()
        tickers = tickers or self.trade_config.tickers
        if isinstance(tickers, str) and tickers.lower() == "all":
            tickers = self.data_loader.get_tickers_list()

        eval_metrics = {}
        with Progress() as progress:  # rich progress bar over tickers
            task = progress.add_task("Iterative Tickers Backtesting", total=len(tickers))
            for ticker in tickers:
                llm_cost_before = get_llm_cost()
                llm_ledger_start = len(get_llm_cost_ledger())
                progress.update(task,
                                description=f"Backtesting {ticker} on {self.trade_config.date_from} to {self.trade_config.date_to}")

                subset_data = self.data_loader.get_subset_by_time_range(self.trade_config.date_from, self.trade_config.date_to)

                # check that data exists for the period; the ticker membership check follows
                try:
                    first_day = subset_data.get_date_range()[0]
                except (AttributeError, IndexError):
                    if not self.trade_config.silence:
                        print(f"No data found for the period {self.trade_config.date_from} to {self.trade_config.date_to}. Skipping...")
                    continue
                if ticker not in subset_data.get_tickers_list():
                    if not self.trade_config.silence:
                        print(f"Ticker {ticker} not in the data for the period {self.trade_config.date_from} to {self.trade_config.date_to}. Skipping...")
                    continue

                self.framework.reset()
                success_or_not = self.framework.load_backtest_data_single_ticker(
                    subset_data,
                    ticker,
                    start_date=self.trade_config.date_from,
                    end_date=self.trade_config.date_to
                )

                if not success_or_not:
                    if not self.trade_config.silence:
                        print(f"Ticker {ticker} not in the data for the period {self.trade_config.date_from} to {self.trade_config.date_to}. Skipping...")
                    continue

                resolved_params = self.auto_resolve_params(
                    strat_params,
                    {
                        "date_from": self.trade_config.date_from,
                        "date_to": self.trade_config.date_to,
                        "symbol": ticker
                    }
                )

                # check if there's valid data for the testing and training period
                try:
                    strategy = strategy_class(**resolved_params)
                except InsufficientTrainingDataException:
                    if not self.trade_config.silence:
                        print(f"Insufficient training data for {ticker} in the period {self.trade_config.date_from} to {self.trade_config.date_to}. Skipping...")
                    continue

                # train the strategy if it supports training; skip the ticker on insufficient data
                try:
                    if hasattr(strategy, "train"):
                        strategy.train()
                except InsufficientTrainingDataException:
                    if not self.trade_config.silence:
                        print(f"Insufficient training data for {ticker} in the period {self.trade_config.date_from} to {self.trade_config.date_to}. Skipping...")
                    continue

                status = self.framework.run(strategy, delist_check=delist_check)

                if not status:
                    if not self.trade_config.silence:
                        print(f"Skipping {ticker}...")
                    continue

                metrics = self.framework.evaluate(strategy)
                total_llm_cost = get_llm_cost() - llm_cost_before
                metrics["total_llm_cost"] = total_llm_cost
                metrics["llm_cost_records"] = pd.DataFrame(get_llm_cost_ledger()[llm_ledger_start:])
                if self.trade_config.llm_cost_as_trade_cost:
                    metrics["total_trading_cost"] = metrics.get("total_trading_cost", 0) + total_llm_cost
                equity_with_time = pd.DataFrame({
                    "datetime": strategy.equity_date,
                    "equity": strategy.equity
                })
                metrics["equity_with_time"] = equity_with_time
                metrics["trades"] = pd.DataFrame(self.framework.history)
                metrics["rejected_orders"] = pd.DataFrame(self.framework.rejected_orders)
                eval_metrics[ticker] = metrics

                if not self.trade_config.silence:
                    self._print_results(metrics, ticker)
                    self._plot_equity_curve(equity_with_time, ticker)


        if self.trade_config.save_results:
            eval_metrics = {f"{self.trade_config.date_from}_{self.trade_config.date_to}": eval_metrics}
            output_dir = os.path.join(
                self.trade_config.log_base_dir,
                self.trade_config.setup_name.replace(":", "_"),
                strategy_class.__name__,
            )
            filename = f"{self.trade_config.date_from}_{self.trade_config.date_to}.pkl" if self.trade_config.result_filename is None else self.trade_config.result_filename
            os.makedirs(output_dir, exist_ok=True)
            with open(os.path.join(output_dir, filename), "wb") as f:
                pickle.dump(eval_metrics, f)
            write_result_artifacts(output_dir, self.trade_config.to_dict(), eval_metrics)

            aggregate_results_one_strategy(self.trade_config.setup_name.replace(":", "_"), strategy_class.__name__)

        # print the estimated cost
        if not self.trade_config.silence:
            print(f"Finish backtesting period {self.trade_config.date_from} to {self.trade_config.date_to}. Estimated cost: ${get_llm_cost()}")
        return eval_metrics

    def auto_resolve_params(self, strat_params, trade_config):
        resolved_params = {}

        for key, value in strat_params.items():
            if isinstance(value, str) and value.startswith("$"):
                if value[1:] in trade_config:
                    resolved_params[key] = trade_config[value[1:]]
                else:
                    raise ValueError(f"Unsupported dynamic parameter: {key}")
            else:
                resolved_params[key] = value

        return resolved_params
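
A short sketch of the $-prefix convention implemented by auto_resolve_params: values beginning with "$" are looked up in the per-run trade context, everything else passes through unchanged. finsaber here is assumed to be a FINSABER instance:

strat_params = {"symbol": "$symbol", "date_from": "$date_from", "window": 20}
context = {"symbol": "TSLA", "date_from": "2021-01-01", "date_to": "2021-12-31"}

resolved = finsaber.auto_resolve_params(strat_params, context)
# {"symbol": "TSLA", "date_from": "2021-01-01", "window": 20}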