Data Pipeline Executor
Data Pipeline Executor Module
This module provides the core data pipeline functionality for the backtesting engine. It centralizes all data validation, loading, and preparation logic to ensure consistent data processing across different entry points (main function, factory methods, etc.).
The pipeline handles:
- Portfolio configuration and loading
- Market data ingestion and validation
- Benchmark construction
- Data integrity checking
- Field mapping and standardization
All data processing follows a strict validation sequence to ensure data quality before any backtesting operations begin.
- class DataPipelineResult(portfolio_entity: PortfolioEntity, portfolio_df: DataFrame, start_date_backtest: Timestamp, market_data_bundle: MarketDataBundlePyarrow, data_dict: dict[str, Table], benchmark: Table, portfolio_dict: dict[Timestamp, dict[str, Scalar]], equivalence_dict: dict[str, str])
Bases: object
Container for data pipeline execution results.
This class encapsulates all outputs from the data pipeline execution, providing a clean interface for accessing validated and processed data. Using a dedicated result class improves type safety and makes the pipeline output contract explicit.
- Variables:
portfolio_entity – Entity object representing the portfolio configuration
portfolio_df – DataFrame containing portfolio composition and weights
start_date_backtest – Calculated start date for backtest execution
market_data_bundle – Bundle of PyArrow tables with market data
data_dict – Dictionary mapping price field names to PyArrow tables
benchmark – PyArrow table containing benchmark time series data
portfolio_dict – Dictionary mapping dates to portfolio weight dictionaries
equivalence_dict – Dictionary mapping standard field names to actual column names
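The idea of an explicit, typed result container can be illustrated with a minimal sketch. `MiniPipelineResult` below is a hypothetical, reduced stand-in and not the real `DataPipelineResult`: it carries only three of the eight fields, uses `datetime.date` and `float` in place of `pandas.Timestamp` and PyArrow scalars, and is declared frozen, which is an assumption made here for illustration rather than a documented property of the real class.

```python
from dataclasses import dataclass, FrozenInstanceError
from datetime import date


@dataclass(frozen=True)
class MiniPipelineResult:
    """Hypothetical, reduced stand-in for DataPipelineResult."""

    start_date_backtest: date
    portfolio_dict: dict[date, dict[str, float]]
    equivalence_dict: dict[str, str]

    def get_start_date_backtest(self) -> date:
        # Accessor mirroring the getter style documented on this page.
        return self.start_date_backtest

    def get_portfolio_dict(self) -> dict[date, dict[str, float]]:
        # Returns the stored instance itself, not a copy.
        return self.portfolio_dict


result = MiniPipelineResult(
    start_date_backtest=date(2024, 1, 2),
    portfolio_dict={date(2024, 1, 2): {"AAA": 0.6, "BBB": 0.4}},
    equivalence_dict={"adjusted_close": "Adj Close"},
)
```

Freezing the dataclass only blocks rebinding of attributes; the dictionaries it holds can still be modified in place, which is one reason the getters below ask callers not to mutate what they return.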
- get_portfolio_entity() -> PortfolioEntity
Return the parsed portfolio entity.
The entity is the typed, validated representation of the portfolio definition (file or dict). It carries configuration metadata (name, source path) and the raw weight schedule before any post-processing such as date filtering or zero-weight pruning.
- Returns:
Portfolio entity with configuration metadata. Same instance stored on the result; do not mutate.
- Return type:
PortfolioEntity
See also
get_portfolio_df: Tabular view of the same data.
get_portfolio_dict: Date-indexed weights derived from the entity.
- get_portfolio_df() -> DataFrame
Return the portfolio composition as a DataFrame.
The DataFrame holds the post-processed portfolio (after date-range filtering and zero-weight pruning). It is the version that downstream backtesters consume directly.
- Returns:
DataFrame with rebalance dates as the index and ticker columns; values are the target weights (decimals).
- Return type:
pandas.DataFrame
See also
get_portfolio_dict: Same data as a nested dict keyed by date.
get_portfolio_entity: Raw entity prior to processing.
- get_start_date_backtest() -> Timestamp
Return the resolved start date of the backtest period.
This is the effective start date after applying "auto" resolution against the portfolio rebalance dates. When the user-provided start date is explicit, this returns that value verbatim; when it was "auto", this returns the earliest rebalance date.
- Returns:
First date in the resolved backtest window.
- Return type:
pandas.Timestamp
See also
get_portfolio_dict: Rebalance schedule used to resolve "auto".
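The "auto" rule described for get_start_date_backtest can be sketched as follows. `resolve_start_date` is a hypothetical helper written for this illustration, not part of the module, and it uses `datetime.date` as a stand-in for `pandas.Timestamp`. The behaviour for an empty schedule is also an assumption.

```python
from datetime import date
from typing import Union


def resolve_start_date(requested: Union[date, str], rebalance_dates: list[date]) -> date:
    """Return the effective backtest start date (illustrative only)."""
    if isinstance(requested, str):
        if requested != "auto":
            raise ValueError(f"unsupported start date keyword: {requested!r}")
        if not rebalance_dates:
            # Assumption: "auto" cannot be resolved without a schedule.
            raise ValueError("cannot resolve 'auto' without rebalance dates")
        # "auto" resolves to the earliest rebalance date.
        return min(rebalance_dates)
    # An explicit date is returned verbatim.
    return requested


schedule = [date(2024, 3, 1), date(2024, 1, 2), date(2024, 2, 1)]
auto_start = resolve_start_date("auto", schedule)
explicit_start = resolve_start_date(date(2023, 12, 15), schedule)
```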
- get_market_data_bundle() -> MarketDataBundlePyarrow
Return the bundle of price tables consumed by the backtester.
The bundle groups the columnar price tables (open, close, VWAP, adjusted close, etc.) loaded from the configured input handlers. It is passed directly to PyArrowBacktester and guarantees that all tables share the same date index and ticker set.
- Returns:
Container with VWAP and adjusted-close price tables. Same instance stored on the result; do not mutate.
- Return type:
MarketDataBundlePyarrow
See also
get_data_dict: Raw price tables before bundling.
- get_data_dict() -> dict[str, Table]
Return the raw dictionary of price tables keyed by field name.
Each value is a PyArrow Table with one column per ticker and a date column. This is the unbundled view of the same data exposed via get_market_data_bundle; it is useful when callers need direct access to a specific price field (e.g. for custom metric computation).
- Returns:
Mapping from standardized field name (e.g. "close", "vwap") to its PyArrow table.
- Return type:
dict[str, pyarrow.Table]
See also
get_market_data_bundle: Higher-level bundle view of the same tables.
- get_benchmark() -> Table
Return the benchmark price table.
The benchmark is loaded only when Configuration.benchmark is configured. If no benchmark was requested, this returns the empty/sentinel benchmark table produced by the pipeline rather than None, so callers should not assume the table is non-empty.
- Returns:
Benchmark time-series. May be empty if no benchmark was configured.
- Return type:
pyarrow.Table
See also
PyArrowBacktester.run_with_benchmark: Primary consumer.
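Because get_benchmark may return an empty table, a caller may want to guard on row count before using it. The sketch below assumes that "empty" means zero rows; the exact shape of the sentinel table is not specified on this page. `FakeTable` is a hypothetical stand-in that exposes only `num_rows`, an attribute that `pyarrow.Table` also provides, so the example runs without PyArrow installed.

```python
from dataclasses import dataclass


@dataclass
class FakeTable:
    """Hypothetical stand-in exposing only `num_rows`, as pyarrow.Table does."""

    num_rows: int


def has_benchmark(benchmark) -> bool:
    """Return True when the benchmark table holds at least one row.

    Assumption: an unconfigured benchmark shows up as a zero-row table.
    """
    return benchmark is not None and benchmark.num_rows > 0


configured = has_benchmark(FakeTable(num_rows=252))
unconfigured = has_benchmark(FakeTable(num_rows=0))
```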
- get_portfolio_dict() -> dict[Timestamp, dict[str, Scalar]]
Return the rebalance schedule as a nested dict.
Each key is a rebalance date and each value is a mapping {ticker: weight} for that date. This is the structure consumed directly by PyArrowBacktester via the portfolio_dict parameter.
- Returns:
Date-keyed mapping of ticker weights. Weights are PyArrow scalars to preserve precision through the pipeline.
- Return type:
dict[pandas.Timestamp, dict[str, pyarrow.Scalar]]
See also
get_portfolio_df: Tabular view of the same data.
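The nested shape returned by get_portfolio_dict can be illustrated with plain Python values. In this sketch `datetime.date` and `float` stand in for `pandas.Timestamp` and PyArrow scalars, the tickers are invented, and both helper functions are hypothetical; they show one way to collect the ticker universe and to flatten the schedule into rows, not how the pipeline does it internally.

```python
from datetime import date

# Rebalance date -> {ticker: target weight}
portfolio_dict = {
    date(2024, 1, 2): {"AAA": 0.6, "BBB": 0.4},
    date(2024, 2, 1): {"AAA": 0.5, "CCC": 0.5},
}


def extract_tickers(schedule: dict[date, dict[str, float]]) -> list[str]:
    """Collect every ticker that appears on any rebalance date, sorted."""
    return sorted({ticker for weights in schedule.values() for ticker in weights})


def to_rows(schedule: dict[date, dict[str, float]]) -> list[tuple[date, str, float]]:
    """Flatten the nested dict into (date, ticker, weight) rows in date order."""
    return [
        (day, ticker, weight)
        for day in sorted(schedule)
        for ticker, weight in schedule[day].items()
    ]


tickers = extract_tickers(portfolio_dict)
rows = to_rows(portfolio_dict)
```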
- get_equivalence_dict() -> dict[str, str]
Return the mapping from standard field names to source columns.
The pipeline normalizes heterogeneous input CSV / Parquet column names to the project’s StandardField vocabulary. This dictionary preserves the original mapping so downstream code (e.g. error messages, debug logs) can refer back to the actual source column name.
- Returns:
Mapping from standard field name (e.g. "adjusted_close") to the column name found in the source data.
- Return type:
dict[str, str]
See also
get_market_data_bundle: Data normalized via this mapping.
- execute_data_pipeline(configuration: Configuration, input_handlers: list[InputHandlerInterface], portfolio_handlers: list[PortfolioInputHandlerInterface]) -> DataPipelineResult
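As an illustration of the "refer back to the source column" use case for get_equivalence_dict, the sketch below formats a field name for a log or error message. The source column names ("Adj Close", "VWAP_px") and the `describe_field` helper are hypothetical and exist only for this example.

```python
# Standard field name -> column name as found in the source file (invented values).
equivalence_dict = {"adjusted_close": "Adj Close", "vwap": "VWAP_px"}


def describe_field(standard_name: str, mapping: dict[str, str]) -> str:
    """Render a field name together with its original source column, if known."""
    source = mapping.get(standard_name)
    if source is None:
        return f"{standard_name!r} (no source column recorded)"
    return f"{standard_name!r} (source column {source!r})"


known = describe_field("adjusted_close", equivalence_dict)
unknown = describe_field("open", equivalence_dict)
```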
Execute the complete data pipeline with validation and preparation.
This function orchestrates the entire data loading and validation process required before backtesting can begin. It ensures all data is properly formatted, validated, and ready for consumption by the backtesting engine.
The pipeline execution follows these steps:
1. Load and process portfolio configuration
2. Extract tickers from portfolio weights
3. Validate benchmark availability
4. Load and process market data for all tickers
5. Build benchmark time series
6. Perform data integrity checks
7. Bundle market data into standardized structure
- Parameters:
configuration – Configuration object containing all backtest parameters including dates, file paths, execution settings, and commission rates
input_handlers – List of handler objects for reading market data from various sources (CSV, Parquet, Excel, etc.)
portfolio_handlers – List of handler objects for reading portfolio configuration and weight data
- Returns:
Container object with all validated and processed data ready for backtesting, including portfolio configuration, market data, and benchmark information
- Return type:
DataPipelineResult
- Raises:
MissingData – If data integrity validation fails, indicating missing dates or prices for required tickers
MissingBenchmark – If the benchmark file or data cannot be located in the provided input handlers
ConfigurationError – If the configuration object contains invalid parameters or incompatible settings
ReadingExcelPortfolioError – If portfolio Excel file cannot be read or parsed correctly
ReadingCSVError – If market data CSV files cannot be read
- Side Effects:
Logs progress information including:
- Data loading completion times
- Number of tickers being processed
- Validation results
- Performance metrics
Note
This function performs comprehensive validation and will raise exceptions rather than proceed with invalid data. All exceptions should be caught and handled by the caller.
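The fail-fast behaviour described in this note can be sketched with a toy integrity check. Everything below is illustrative: `MissingData` is a local stand-in for the project's exception of the same name, `check_integrity` is a hypothetical helper, and the price data is invented. The real validation rules are not documented on this page.

```python
from datetime import date


class MissingData(Exception):
    """Local stand-in for the project's exception of the same name."""


def check_integrity(
    prices: dict[str, dict[date, float]],
    tickers: list[str],
    required_dates: list[date],
) -> None:
    """Raise MissingData if any required (ticker, date) pair has no price."""
    missing = [
        (ticker, day)
        for ticker in tickers
        for day in required_dates
        if prices.get(ticker, {}).get(day) is None
    ]
    if missing:
        raise MissingData(f"{len(missing)} missing price(s), first: {missing[0]}")


prices = {
    "AAA": {date(2024, 1, 2): 10.0, date(2024, 1, 3): 10.5},
    "BBB": {date(2024, 1, 2): 20.0},  # no price on 2024-01-03
}
required = [date(2024, 1, 2), date(2024, 1, 3)]

# The caller decides how to react; here the failure is recorded and the run stops.
try:
    check_integrity(prices, ["AAA", "BBB"], required)
    status = "ok"
except MissingData as exc:
    status = f"aborted: {exc}"
```

Catching the specific exception type, rather than a bare `Exception`, keeps unrelated bugs visible while still letting the caller report data problems cleanly.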
Example
>>> config = Configuration(...)
>>> handlers = [CSVInputHandler(), ParquetInputHandler()]
>>> portfolio_handlers = [ExcelPortfolioHandler()]
>>> result = execute_data_pipeline(config, handlers, portfolio_handlers)
>>> backtester = PyArrowBacktester.create_from_configuration(...)
See also
- Quick Start Guide
End-to-end walkthrough for setting up and running a backtest from scratch.
- Running from Python
Run a backtest from a Python script, including this pipeline as part of the flow.
- Data Formats
File formats expected by the input and portfolio handlers used here.