Data Pipeline Executor

Data Pipeline Executor Module

This module provides the core data pipeline functionality for the backtesting engine. It centralizes all data validation, loading, and preparation logic to ensure consistent data processing across different entry points (main function, factory methods, etc.).

The pipeline handles:
  • Portfolio configuration and loading

  • Market data ingestion and validation

  • Benchmark construction

  • Data integrity checking

  • Field mapping and standardization

All data processing follows a strict validation sequence to ensure data quality before any backtesting operations begin.

class DataPipelineResult(portfolio_entity: PortfolioEntity, portfolio_df: DataFrame, start_date_backtest: Timestamp, market_data_bundle: MarketDataBundlePyarrow, data_dict: dict[str, Table], benchmark: Table, portfolio_dict: dict[Timestamp, dict[str, Scalar]], equivalence_dict: dict[str, str])

Bases: object

Container for data pipeline execution results.

This class encapsulates all outputs from the data pipeline execution, providing a clean interface for accessing validated and processed data. Using a dedicated result class improves type safety and makes the pipeline output contract explicit.

Variables:
  • portfolio_entity – Entity object representing the portfolio configuration

  • portfolio_df – DataFrame containing portfolio composition and weights

  • start_date_backtest – Calculated start date for backtest execution

  • market_data_bundle – Bundle of PyArrow tables with market data

  • data_dict – Dictionary mapping price field names to PyArrow tables

  • benchmark – PyArrow table containing benchmark time series data

  • portfolio_dict – Dictionary mapping dates to portfolio weight dictionaries

  • equivalence_dict – Dictionary mapping standard field names to actual column names
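The container pattern described above can be sketched as a frozen dataclass. This is a simplified illustration, not the actual class: `PipelineResultSketch` is hypothetical, most fields are omitted, and stdlib `date`/`float` stand in for `pandas.Timestamp` and `pyarrow.Scalar`.

```python
from dataclasses import FrozenInstanceError, dataclass, field
from datetime import date
from typing import Dict


# Hypothetical, simplified stand-in for DataPipelineResult: stdlib types
# replace pandas.Timestamp and pyarrow.Scalar, and most fields are omitted.
@dataclass(frozen=True)
class PipelineResultSketch:
    start_date_backtest: date
    # Rebalance date -> {ticker: weight}
    portfolio_dict: Dict[date, Dict[str, float]]
    # Standard field name -> column name found in the source data
    equivalence_dict: Dict[str, str] = field(default_factory=dict)

    def get_start_date_backtest(self) -> date:
        return self.start_date_backtest

    def get_portfolio_dict(self) -> Dict[date, Dict[str, float]]:
        return self.portfolio_dict

    def get_equivalence_dict(self) -> Dict[str, str]:
        return self.equivalence_dict


result = PipelineResultSketch(
    start_date_backtest=date(2024, 1, 2),
    portfolio_dict={date(2024, 1, 2): {"AAPL": 0.6, "MSFT": 0.4}},
    equivalence_dict={"adjusted_close": "Adj Close"},
)

# frozen=True blocks rebinding a field after construction.
try:
    result.start_date_backtest = date(2025, 1, 1)
    rebinding_blocked = False
except FrozenInstanceError:
    rebinding_blocked = True
```

Note that `frozen=True` only prevents fields from being reassigned; the dicts held inside remain mutable, which is why the accessors on this class ask callers not to mutate the returned objects.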

get_portfolio_entity() → PortfolioEntity

Return the parsed portfolio entity.

The entity is the typed, validated representation of the portfolio definition (file or dict). It carries configuration metadata (name, source path) and the raw weight schedule before any post-processing such as date filtering or zero-weight pruning.

Returns:

Portfolio entity with configuration metadata. Same instance stored on the result; do not mutate.

Return type:

PortfolioEntity

See also

get_portfolio_df

Tabular view of the same data.

get_portfolio_dict

Date-indexed weights derived from the entity.

get_portfolio_df() → DataFrame

Return the portfolio composition as a DataFrame.

The DataFrame holds the post-processed portfolio (after date-range filtering and zero-weight pruning). It is the version that downstream backtesters consume directly.
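The two post-processing steps named above can be sketched on a plain nested dict. This assumes date-range filtering keeps rebalance dates inside an inclusive [start, end] window and that pruning drops tickers whose weight is exactly zero; the pipeline's actual boundary and tolerance rules may differ, and `postprocess_schedule` is a hypothetical helper.

```python
from datetime import date
from typing import Dict

# Rebalance date -> {ticker: weight}
Schedule = Dict[date, Dict[str, float]]


def postprocess_schedule(raw: Schedule, start: date, end: date) -> Schedule:
    """Filter rebalance dates to [start, end] and prune zero weights (sketch)."""
    processed: Schedule = {}
    for rebalance_date in sorted(raw):
        if not (start <= rebalance_date <= end):
            continue  # outside the backtest window
        # Keep only tickers with a non-zero target weight.
        processed[rebalance_date] = {
            ticker: weight
            for ticker, weight in raw[rebalance_date].items()
            if weight != 0.0
        }
    return processed


raw = {
    date(2023, 12, 1): {"AAPL": 1.0},
    date(2024, 1, 2): {"AAPL": 0.5, "MSFT": 0.5, "TSLA": 0.0},
    date(2024, 2, 1): {"AAPL": 0.0, "MSFT": 1.0},
}
processed = postprocess_schedule(raw, date(2024, 1, 1), date(2024, 12, 31))
```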

Returns:

DataFrame with rebalance dates as the index and ticker columns; values are the target weights (decimals).

Return type:

pandas.DataFrame

See also

get_portfolio_dict

Same data as a nested dict keyed by date.

get_portfolio_entity

Raw entity prior to processing.

get_start_date_backtest() → Timestamp

Return the resolved start date of the backtest period.

This is the effective start date after applying "auto" resolution against the portfolio rebalance dates. When the user-provided start date is explicit, this returns that value verbatim; when it was "auto", this returns the earliest rebalance date.
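The resolution rule can be sketched as a small function. Here `datetime.date` stands in for `pandas.Timestamp`, and the `ValueError` branches are assumptions for inputs this page does not describe; `resolve_start_date` is a hypothetical name.

```python
from datetime import date
from typing import Iterable, Union


def resolve_start_date(requested: Union[date, str], rebalance_dates: Iterable[date]) -> date:
    """Resolve an explicit or "auto" start date (illustrative sketch)."""
    if requested == "auto":
        dates = list(rebalance_dates)
        if not dates:
            # Assumption: "auto" cannot be resolved without a schedule.
            raise ValueError('cannot resolve "auto" without rebalance dates')
        return min(dates)  # earliest rebalance date
    if isinstance(requested, date):
        return requested  # explicit value is returned verbatim
    raise ValueError(f"unsupported start date: {requested!r}")
```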

Returns:

First date in the resolved backtest window.

Return type:

pandas.Timestamp

See also

get_portfolio_dict

Rebalance schedule used to resolve "auto".

get_market_data_bundle() → MarketDataBundlePyarrow

Return the bundle of price tables consumed by the backtester.

The bundle groups the columnar price tables (open, close, VWAP, adjusted close, etc.) loaded from the configured input handlers. It is passed directly to PyArrowBacktester and guarantees that all tables share the same date index and ticker set.
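The alignment guarantee (same date index and ticker set across every table) can be checked with a sketch like the following. It assumes each table has a date column literally named "date" and uses a dict of column lists as a stand-in for `pyarrow.Table`; `check_aligned` is hypothetical and not part of this module.

```python
from typing import Dict, List

# Stand-in for a PyArrow table: column name -> list of values.
Table = Dict[str, List[object]]


def check_aligned(tables: Dict[str, Table], date_column: str = "date") -> None:
    """Raise ValueError unless all tables share one date index and ticker set."""
    if not tables:
        return
    reference_name, reference = next(iter(tables.items()))
    ref_dates = reference[date_column]
    ref_tickers = set(reference) - {date_column}
    for name, table in tables.items():
        if table[date_column] != ref_dates:
            raise ValueError(f"{name!r}: date index differs from {reference_name!r}")
        tickers = set(table) - {date_column}
        if tickers != ref_tickers:
            diff = sorted(tickers ^ ref_tickers)
            raise ValueError(f"{name!r}: tickers {diff} not shared with {reference_name!r}")


dates = ["2024-01-02", "2024-01-03"]
vwap = {"date": dates, "AAPL": [1.0, 2.0], "MSFT": [3.0, 4.0]}
close = {"date": dates, "AAPL": [1.1, 2.1], "MSFT": [3.1, 4.1]}
check_aligned({"vwap": vwap, "close": close})  # passes silently
```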

Returns:

Container with VWAP and adjusted-close price tables. Same instance stored on the result; do not mutate.

Return type:

MarketDataBundlePyarrow

See also

get_data_dict

Raw price tables before bundling.

get_data_dict() → dict[str, Table]

Return the raw dictionary of price tables keyed by field name.

Each value is a PyArrow Table with one column per ticker and a date column. This is the unbundled view of the same data exposed via get_market_data_bundle; useful when callers need direct access to a specific price field (e.g. for custom metric computation).
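As an example of a custom metric computed directly from one price field, the sketch below derives day-over-day simple returns for a single ticker. A dict of column lists stands in for `pyarrow.Table`; with a real table, `data_dict[field].column(ticker).to_pylist()` would produce the same list of prices. `simple_returns` is a hypothetical helper.

```python
from typing import Dict, List

# Stand-in for a PyArrow table: column name -> list of values.
Table = Dict[str, List[object]]


def simple_returns(data_dict: Dict[str, Table], field: str, ticker: str) -> List[float]:
    """Day-over-day simple returns for one ticker from one price field."""
    prices = data_dict[field][ticker]
    # r_t = P_t / P_{t-1} - 1 for each consecutive pair of prices.
    return [curr / prev - 1.0 for prev, curr in zip(prices, prices[1:])]


data_dict = {
    "close": {"date": ["2024-01-02", "2024-01-03", "2024-01-04"], "AAPL": [100.0, 125.0, 100.0]},
}
returns = simple_returns(data_dict, "close", "AAPL")
```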

Returns:

Mapping from standardized field name (e.g. "close", "vwap") to its PyArrow table.

Return type:

dict[str, pyarrow.Table]

See also

get_market_data_bundle

Higher-level bundle view of the same tables.

get_benchmark() → Table

Return the benchmark price table.

The benchmark is loaded only when Configuration.benchmark is configured. If no benchmark was requested, this returns the empty (sentinel) benchmark table produced by the pipeline rather than None, so callers should not assume the table is non-empty.
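Because the returned table may be empty, a caller can normalize it before use. This sketch assumes the sentinel is recognizable by having zero rows (its exact shape is not specified on this page) and relies only on `num_rows`, which `pyarrow.Table` does expose; `benchmark_or_none` and `FakeTable` are hypothetical.

```python
from dataclasses import dataclass
from typing import Optional, Protocol


class HasNumRows(Protocol):
    """Anything exposing a row count, as pyarrow.Table does via num_rows."""
    num_rows: int


def benchmark_or_none(benchmark: HasNumRows) -> Optional[HasNumRows]:
    """Treat an empty benchmark table as "no benchmark configured" (sketch)."""
    return benchmark if benchmark.num_rows > 0 else None


@dataclass
class FakeTable:
    """Hypothetical stand-in so the sketch runs without pyarrow."""
    num_rows: int
```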

Returns:

Benchmark time-series. May be empty if no benchmark was configured.

Return type:

pyarrow.Table

See also

PyArrowBacktester.run_with_benchmark

Primary consumer.

get_portfolio_dict() → dict[Timestamp, dict[str, Scalar]]

Return the rebalance schedule as a nested dict.

Each key is a rebalance date and each value is a mapping {ticker: weight} for that date. This is the structure consumed directly by PyArrowBacktester via the portfolio_dict parameter.
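One way a consumer might read this nested structure is to look up the target weights in force on a given day, meaning those from the most recent rebalance on or before that day. This is an illustration only, not PyArrowBacktester's holding logic; `active_weights` is hypothetical, `date` stands in for `pandas.Timestamp`, and plain floats stand in for `pyarrow.Scalar` (a real scalar converts via `.as_py()`).

```python
from bisect import bisect_right
from datetime import date
from typing import Dict, Optional

# Rebalance date -> {ticker: weight}
Schedule = Dict[date, Dict[str, float]]


def active_weights(schedule: Schedule, as_of: date) -> Optional[Dict[str, float]]:
    """Weights from the most recent rebalance on or before `as_of` (sketch)."""
    dates = sorted(schedule)
    # bisect_right counts rebalance dates <= as_of.
    idx = bisect_right(dates, as_of)
    if idx == 0:
        return None  # before the first rebalance: no target weights yet
    return schedule[dates[idx - 1]]


schedule = {
    date(2024, 1, 2): {"AAPL": 1.0},
    date(2024, 2, 1): {"MSFT": 1.0},
}
```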

Returns:

Date-keyed mapping of ticker weights. Weights are PyArrow scalars to preserve precision through the pipeline.

Return type:

dict[pandas.Timestamp, dict[str, pyarrow.Scalar]]

See also

get_portfolio_df

Tabular view of the same data.

get_equivalence_dict() → dict[str, str]

Return the mapping from standard field names to source columns.

The pipeline normalizes heterogeneous input CSV / Parquet column names to the project’s StandardField vocabulary. This dictionary preserves the original mapping so downstream code (e.g. error messages, debug logs) can refer back to the actual source column name.
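A minimal sketch of how such a mapping could be built and then used in an error message. The alias table, the case-insensitive matching rule, and both helper names are hypothetical; the real StandardField vocabulary and matching logic are not documented here.

```python
from typing import Dict, Iterable, List

# Hypothetical alias table: standard field name -> accepted source spellings.
ALIASES: Dict[str, List[str]] = {
    "adjusted_close": ["adjusted_close", "adj close", "adj_close"],
    "vwap": ["vwap"],
}


def build_equivalence_dict(source_columns: Iterable[str]) -> Dict[str, str]:
    """Map each standard field to the first matching source column (case-insensitive)."""
    lookup = {col.strip().lower(): col for col in source_columns}
    equivalence: Dict[str, str] = {}
    for standard, candidates in ALIASES.items():
        for alias in candidates:
            if alias in lookup:
                equivalence[standard] = lookup[alias]  # keep original spelling
                break
    return equivalence


def missing_field_message(standard: str, equivalence: Dict[str, str]) -> str:
    """Refer back to the source column name when reporting a problem."""
    source = equivalence.get(standard, "<unmapped>")
    return f"missing values in {standard!r} (source column {source!r})"


eq = build_equivalence_dict(["Date", "Adj Close", "VWAP"])
```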

Returns:

Mapping from standard field name (e.g. "adjusted_close") to the column name found in the source data.

Return type:

dict[str, str]

See also

get_market_data_bundle

Data normalized via this mapping.

execute_data_pipeline(configuration: Configuration, input_handlers: list[InputHandlerInterface], portfolio_handlers: list[PortfolioInputHandlerInterface]) → DataPipelineResult

Execute the complete data pipeline with validation and preparation.

This function orchestrates the entire data loading and validation process required before backtesting can begin. It ensures all data is properly formatted, validated, and ready for consumption by the backtesting engine.

The pipeline execution follows these steps:
  1. Load and process portfolio configuration

  2. Extract tickers from portfolio weights

  3. Validate benchmark availability

  4. Load and process market data for all tickers

  5. Build benchmark time series

  6. Perform data integrity checks

  7. Bundle market data into standardized structure
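Steps 2 and 6 can be sketched with stdlib types. This assumes the integrity rule is "every ticker needs a non-missing price on every trading day"; the real check may be narrower (for example, only while a ticker is held). `MissingData` below is a local stand-in for the pipeline's exception, and `extract_tickers` and `check_integrity` are hypothetical helpers.

```python
from datetime import date
from typing import Dict, List, Set


class MissingData(Exception):
    """Local stand-in for the pipeline's MissingData exception."""


# Rebalance date -> {ticker: weight}
Schedule = Dict[date, Dict[str, float]]
# Stand-in price data: ticker -> {date: price}
Prices = Dict[str, Dict[date, float]]


def extract_tickers(schedule: Schedule) -> Set[str]:
    """Step 2: union of every ticker appearing in any rebalance."""
    return {ticker for weights in schedule.values() for ticker in weights}


def check_integrity(tickers: Set[str], prices: Prices, trading_days: List[date]) -> None:
    """Step 6: raise MissingData if any ticker lacks a price on any trading day."""
    problems = []
    for ticker in sorted(tickers):
        series = prices.get(ticker, {})
        missing = [d for d in trading_days if series.get(d) is None]
        if missing:
            problems.append(f"{ticker}: {len(missing)} missing date(s), first {missing[0]}")
    if problems:
        raise MissingData("; ".join(problems))
```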

Parameters:
  • configuration – Configuration object containing all backtest parameters including dates, file paths, execution settings, and commission rates

  • input_handlers – List of handler objects for reading market data from various sources (CSV, Parquet, Excel, etc.)

  • portfolio_handlers – List of handler objects for reading portfolio configuration and weight data

Returns:

Container object with all validated and processed data ready for backtesting, including portfolio configuration, market data, and benchmark information.

Return type:

DataPipelineResult

Raises:
  • MissingData – If data integrity validation fails, indicating missing dates or prices for required tickers

  • MissingBenchmark – If the benchmark file or data cannot be located in the provided input handlers

  • ConfigurationError – If the configuration object contains invalid parameters or incompatible settings

  • ReadingExcelPortfolioError – If the portfolio Excel file cannot be read or parsed correctly

  • ReadingCSVError – If a market data CSV file cannot be read

Side Effects:

Logs progress information including:

  • Data loading completion times

  • Number of tickers being processed

  • Validation results

  • Performance metrics

Note

This function performs comprehensive validation and will raise exceptions rather than proceed with invalid data. All exceptions should be caught and handled by the caller.

Example

>>> config = Configuration(...)
>>> handlers = [CSVInputHandler(), ParquetInputHandler()]
>>> portfolio_handlers = [ExcelPortfolioHandler()]
>>> result = execute_data_pipeline(config, handlers, portfolio_handlers)
>>> backtester = PyArrowBacktester.create_from_configuration(...)

See also

Quick Start Guide

End-to-end walkthrough for setting up and running a backtest from scratch.

Running from Python

Run a backtest from a Python script, including this pipeline as part of the flow.

Data Formats

File formats expected by the input and portfolio handlers used here.