Orchestrators#
High-level runners that coordinate multiple backtests, sweep parameter variations and aggregate results across portfolios.
Backtest Variations#
BacktestVariationRunner for running multiple backtest variations efficiently.
This module provides the BacktestVariationRunner class which enables running multiple backtests with configuration variations while reusing pre-loaded data from a single pipeline execution.
Key features: - Pre-loaded data reuse (pipeline executes once) - Named configuration variations - Date range helpers for walk-forward analysis - Sequential and parallel execution modes - Progress tracking via logging
- class BacktestVariationRunner(*, base_config: Configuration, pipeline_result: DataPipelineResult, commission_model: CommissionModelInterface | None = None, slippage_model: SlippageModelInterface | None = None)#
Bases:
objectSuite for running multiple backtest variations with shared data.
BacktestVariationRunner optimizes multi-backtest scenarios by loading data once through the pipeline and running multiple backtest variations on the same pre-loaded data. This is significantly faster than running independent backtests that each load data separately.
Key Features#
Data reuse: Pipeline executes once, backtests share loaded data
Named variations: Each variation has a descriptive name for identification
Date range helpers: Built-in support for rolling windows and walk-forward
Parallel execution: Optional concurrent execution via ProcessPoolExecutor (not yet)
Progress tracking: Detailed logging of execution progress
- ivar base_config:
The base configuration used as template for all variations.
- vartype base_config:
Configuration
- ivar pipeline_result:
Pre-loaded data from pipeline execution.
- vartype pipeline_result:
DataPipelineResult
- ivar variations:
List of configuration variations to execute.
- vartype variations:
list[ConfigVariation]
Examples
Basic usage with commission variations:
>>> suite = BacktestVariationRunner( ... base_config=configuration, ... pipeline_result=pipeline_result, ... ) >>> suite.add_variation("low_commission", commission_cents=0.01) >>> suite.add_variation("high_commission", commission_cents=0.05) >>> results = suite.run_all() >>> summary = results.get_summary_dataframe()
With date range variations (rolling windows):
>>> suite.add_rolling_window_variations( ... start_date=datetime.date(2020, 1, 1), ... end_date=datetime.date(2023, 12, 31), ... window_months=12, ... step_months=3, ... )
Parallel execution for faster results:
>>> results = suite.run_parallel(max_workers=4)
Notes
All variations share the same pre-loaded market data and portfolio data
Date range variations are limited to the data available in pipeline_result
Parallel execution uses ProcessPoolExecutor with pickling limitations
- add_commission_grid(commission_values: list[float], *, name_prefix: str = 'comm') BacktestVariationRunner#
Add one variation per commission value in a sweep.
Convenience helper for sensitivity analysis on commission cost. Generates a variation per entry in
commission_valueswith a derived name (<prefix>_0_0100for0.01), leaving every other configuration field identical to the base config.- Parameters:
commission_values (list[float]) – Commission rates (per share, in the same units as
Configuration.commission_cents) to sweep. Order is preserved.name_prefix (str, default "comm") – Prefix used to build each variation name. The numeric commission is appended with dots replaced by underscores for filesystem safety.
- Returns:
self, to enable method chaining.- Return type:
Examples
Sweep four commission levels and run them:
>>> results = ( ... suite ... .add_commission_grid([0.0, 0.01, 0.02, 0.03]) ... .run_all() ... ) >>> [e.name for e in results['entries']] ['comm_0_0000', 'comm_0_0100', 'comm_0_0200', 'comm_0_0300']
Notes
This helper only varies
commission_cents. To sweep other parameters, calladd_variationin a loop or useadd_variationswith a custom list.See also
add_rolling_window_variationsSweep date ranges instead of commissions.
add_variationAdd a single variation with explicit overrides.
- add_rolling_window_variations(*, start_date: date, end_date: date, window_months: int = 12, step_months: int = 3, name_prefix: str = 'window') BacktestVariationRunner#
Add variations for rolling window backtests.
Creates multiple variations with overlapping date ranges, useful for walk-forward analysis and robustness testing.
- Parameters:
start_date (datetime.date) – First possible start date for windows.
end_date (datetime.date) – Last possible end date for windows.
window_months (int, default 12) – Length of each window in months.
step_months (int, default 3) – Step size between window starts in months.
name_prefix (str, default "window") – Prefix for variation names.
- Returns:
Self, for method chaining.
- Return type:
Examples
Create quarterly rolling 1-year windows:
>>> suite.add_rolling_window_variations( ... start_date=datetime.date(2020, 1, 1), ... end_date=datetime.date(2023, 12, 31), ... window_months=12, ... step_months=3, ... )
- add_variation(name: str, *, commission_cents: float | None = None, cash_reserve_percentage: float | None = None, start_date: date | Literal['auto'] | None = None, end_date: date | Literal['auto'] | None = None, initial_capital: int | float | None = None) BacktestVariationRunner#
Add a single configuration variation to the suite.
Each variation defines a delta from the base configuration. Only the fields explicitly passed are recorded as overrides; any unset override falls back to the base configuration value at execution time. The returned runner is the same instance, which lets callers chain multiple
add_*calls fluently.- Parameters:
name (str) – Unique descriptive name for this variation. Used as the key in the resulting
BacktestResultEntryand in the report output. Should be filesystem-safe.commission_cents (float | None, optional) – Override for commission rate per share. Default None (use the base config value).
cash_reserve_percentage (float | None, optional) – Override for cash reserve percentage (0.0 to 1.0). Default None.
start_date (DateConfig | None, optional) – Override for backtest start date. Default None.
end_date (DateConfig | None, optional) – Override for backtest end date. Default None.
initial_capital (int | float | None, optional) – Override for initial capital. Default None.
- Returns:
self, to enable method chaining.- Return type:
Examples
Single override:
>>> suite.add_variation("low_cost", commission_cents=0.01)
Fluent chaining of several variations:
>>> ( ... suite ... .add_variation("low_cost", commission_cents=0.01) ... .add_variation("high_capital", initial_capital=1_000_000) ... .add_variation("zero_reserve", cash_reserve_percentage=0.0) ... )
Notes
Variation names should be unique within a suite. Duplicates are not validated here, but downstream report generation may overwrite or skip duplicates.
See also
add_variationsAdd multiple pre-built variation dicts at once.
clear_variationsRemove every registered variation.
- add_variations(variations: list[ConfigVariation]) BacktestVariationRunner#
Add multiple variations at once.
- Parameters:
variations (list[ConfigVariation]) – List of variation dictionaries to add.
- Returns:
Self, for method chaining.
- Return type:
- property base_config: Configuration#
Get the base configuration.
- clear_variations() BacktestVariationRunner#
Remove every registered variation, keeping the base context.
Useful when reusing the same runner instance for multiple independent suites: after a
run_allcall, callclear_variationsand then register a new set of variations without rebuilding the base configuration and pipeline result.- Returns:
self, to enable method chaining.- Return type:
Examples
Reuse the runner for two independent sweeps:
>>> suite.add_commission_grid([0.0, 0.01]).run_all() >>> suite.clear_variations().add_commission_grid([0.02, 0.03]).run_all()
Notes
Only the variation list is cleared. The base config, pipeline result and default cost models are preserved.
See also
add_variationAdd a single variation after clearing.
- static compute_portfolio_stats(result: dict, config: Configuration) dict#
Compute portfolio statistics and add to result dictionary.
This mirrors the computation done in backtest_engine._generate_metrics(), ensuring BacktestVariationRunner results include the same metrics.
- Parameters:
result (dict) – Raw backtest result dictionary from PyArrowBacktester.run().
config (Configuration) – Configuration used for this backtest.
- Returns:
Result dictionary with ‘portfolio_stats’ added.
- Return type:
dict
- property pipeline_result: DataPipelineResult#
Get the pre-loaded pipeline result.
- run_all(*, progress_callback: Callable[[int, int, str], None] | None = None) SuiteResults#
Execute all variations sequentially.
- Parameters:
progress_callback (Callable[[int, int, str], None] | None, optional) – Optional callback function called after each backtest. Arguments are (completed_count, total_count, variation_name).
- Returns:
Container with all backtest results.
- Return type:
SuiteResults
- Raises:
ValueError – If no variations have been added.
Examples
>>> results = suite.run_all() >>> print(f"Completed {len(results)} backtests")
With progress tracking:
>>> def on_progress(done, total, name): ... print(f"[{done}/{total}] Completed: {name}") >>> results = suite.run_all(progress_callback=on_progress)
- run_parallel(*, max_workers: int | None = None, progress_callback: Callable[[int, int, str], None] | None = None) SuiteResults#
Run every variation, falling back to sequential when needed.
This method is the API entry point for parallel execution and accepts a
max_workersargument so the calling code does not need to special-case parallel vs sequential. Internally, when the loaded data (PyArrow tables) cannot be pickled safely across processes, the method falls back to a serial loop identical torun_alland logs a warning. Callers should therefore treat the parallelism as a best-effort optimisation rather than a contract.- Parameters:
max_workers (int | None, optional) – Upper bound on the number of worker processes. When None (default), uses
min(4, os.cpu_count()). Only used when true parallel execution is possible; otherwise ignored.progress_callback (Callable[[int, int, str], None] | None, optional) – Called after each variation finishes. Receives
(completed_count, total_count, variation_name). Default is None (no callback).
- Returns:
Mapping with
entries(list ofBacktestResultEntry) andtotal_execution_time(wall-clock seconds).- Return type:
SuiteResults
- Raises:
ConfigurationError – If no variations have been added to the suite.
Examples
Run every registered variation with a progress callback:
>>> def on_progress(done, total, name): ... print(f"[{done}/{total}] {name}") >>> results = suite.run_parallel(progress_callback=on_progress)
Notes
At the time of writing, the underlying PyArrow tables cannot be pickled across process boundaries reliably, so this method executes sequentially in-process and emits a warning. The signature is preserved so that switching to true parallelism in the future is a transparent change for callers.
See also
run_allExplicit sequential execution.
- property variations: list[ConfigVariation]#
Get the list of registered variations.
Multi-Portfolio Runner#
MultiPortfolioBacktester — orchestrator for multi-portfolio batch backtesting.
Coordinates the execution of multiple portfolio backtests from a single BatchConfig, reusing shared market data across portfolios for efficiency.
- Workflow:
Validate all configurations upfront
Collect unique tickers across all portfolios
Load shared market data once via SharedDataPool
For each portfolio: a. Load portfolio weights (file, template, or inline) b. Build DataPipelineResult from shared pool c. If variations → delegate to BacktestVariationRunner d. If no variations → run single PyArrowBacktester e. Collect BacktestResultEntry
Aggregate results into SuiteResults
Optionally generate comparative Excel report
- Usage:
>>> from kaxanuk.backtest_engine.backtest.orchestrators.multi_portfolio_runner import MultiPortfolioBacktester >>> from kaxanuk.backtest_engine.config_handlers.multi_backtest_parser import BatchConfigParser >>> >>> batch_config = BatchConfigParser.parse_file("batch_config.yaml") >>> runner = MultiPortfolioBacktester(input_handlers=[...], portfolio_handlers=[...]) >>> results = runner.run(batch_config) >>> report_path = runner.generate_report(results, batch_config)
- class MultiPortfolioBacktester(input_handlers: list[InputHandlerInterface], portfolio_handlers: list[PortfolioInputHandlerInterface])#
Bases:
objectOrchestrator for running multiple portfolio backtests in a batch.
Manages the lifecycle of a batch backtest execution: configuration validation, shared data loading, per-portfolio execution with error isolation, and result aggregation.
- Parameters:
input_handlers (list[InputHandlerInterface]) – Market data input handlers (CSV, Parquet, etc.).
portfolio_handlers (list[PortfolioInputHandlerInterface]) – Portfolio file input handlers (CSV, Excel, etc.).
Examples
>>> runner = MultiPortfolioBacktester( ... input_handlers=[CsvInput(input_dir="./Input/Data")], ... portfolio_handlers=[CsvPortfolioInputHandler(base_dir="./Input/Portfolios")], ... ) >>> results = runner.run(batch_config) >>> summary = results.get_summary_dataframe()
- classmethod from_yaml(yaml_path: str | Path, input_handlers: list[InputHandlerInterface], portfolio_handlers: list[PortfolioInputHandlerInterface]) MultiPortfolioBacktester#
Build a MultiPortfolioBacktester from a YAML configuration path.
Currently this factory simply forwards the handlers to the regular constructor; the YAML path is accepted to match the expected call site shape and to allow future logic (e.g. early schema validation) to live behind a single entry point.
The parsed
BatchConfigis not stored on the returned instance — the caller is expected to parse the YAML separately and pass the resultingBatchConfigtorun().- Parameters:
yaml_path (str | pathlib.Path) – Path to the batch configuration YAML file. Currently informational; not loaded by this factory.
input_handlers (list[InputHandlerInterface]) – Market data input handlers. Same semantics as the regular constructor.
portfolio_handlers (list[PortfolioInputHandlerInterface]) – Portfolio input handlers. Same semantics as the regular constructor.
- Returns:
Configured runner instance ready to receive a parsed
BatchConfigviarun().- Return type:
Examples
>>> runner = MultiPortfolioBacktester.from_yaml( ... yaml_path="./Input/batch.yaml", ... input_handlers=[CsvInput(input_dir="./Input/Data")], ... portfolio_handlers=[CsvPortfolioInputHandler(base_dir="./Input/Portfolios")], ... ) >>> batch_config = BatchConfigParser.parse_yaml("./Input/batch.yaml") >>> results = runner.run(batch_config)
See also
BatchConfigParserParses YAML into the
BatchConfigconsumed byrun().runExecute the batch once a
BatchConfighas been parsed.
- static generate_individual_reports(results: SuiteResults, batch_config: BatchConfig) list[Path]#
Generate individual Excel reports for each successful backtest.
Each portfolio gets a detailed Excel workbook with metrics, drawdown, commissions, shares, daily weights, and orders — the same format as the standard single-backtest Excel report.
- Parameters:
results (SuiteResults) – Results from
run().batch_config (BatchConfig) – Batch configuration (used for output settings).
- Returns:
Paths to the generated Excel report files.
- Return type:
list[Path]
- static generate_report(results: SuiteResults, batch_config: BatchConfig) Path#
Generate a comparative Excel report from batch results.
Writes a single Excel workbook that summarizes every successful backtest in the batch, ranks them by the configured metric, and flags the failed entries. The output filename and directory are resolved from
batch_config.output(with a fallback to the global defaults and finally./Output).- Parameters:
results (SuiteResults) – The mapping returned by
run(). Both successful and failed entries are included; failed entries appear in a dedicated section of the report.batch_config (BatchConfig) – Batch configuration. Only the
outputandglobal_defaultsblocks are read here.
- Returns:
Absolute path to the generated Excel file.
- Return type:
pathlib.Path
Examples
>>> results = runner.run(batch_config) >>> report_path = MultiPortfolioBacktester.generate_report( ... results=results, ... batch_config=batch_config, ... ) >>> report_path.name 'batch_2024_q1.xlsx'
Notes
The output directory is resolved in this priority order:
batch_config.output.directoryif set.batch_config.global_defaults.backtest_results_output_directory."./Output"as last-resort fallback.
The actual workbook is built by
generate_comparative_excel_report; refer to that function for the exact sheet layout.See also
runProduces the
SuiteResultsconsumed here.
- run(batch_config: BatchConfig, *, progress_callback: Callable[[int, int, str], None] | None = None) SuiteResults#
Execute the full batch backtest workflow.
Walks through every portfolio definition in
batch_config, resolving its concreteConfigurationagainst the global defaults and templates, expanding any parametric variations, and running each resulting backtest against a shared in-memory market-data pool. Failures in a single portfolio do not abort the batch — they are captured as failedBacktestResultEntryentries so the rest of the suite can complete.- Parameters:
batch_config (BatchConfig) – Validated batch configuration produced by
BatchConfigParser. Carries the global defaults, the list of portfolio definitions, the parametric sweeps and the output settings.progress_callback (Callable[[int, int, str], None] | None, optional) – Called after each portfolio finishes (success or failure). Receives
(completed_count, total_portfolios, portfolio_name). Useful for driving a CLI progress bar or UI updates. Default is None (no callback).
- Returns:
Mapping with two keys:
entries(list ofBacktestResultEntry— one per portfolio-variation pair) andtotal_execution_time(wall-clock seconds).- Return type:
SuiteResults
- Raises:
ConfigurationError – If the reference configuration cannot be built (e.g. invalid defaults).
BatchExecutionError – For fatal errors that prevent the batch from starting (e.g. shared data pool failure).
Examples
Basic usage:
>>> results = runner.run(batch_config) >>> [e.name for e in results['entries'] if e.success] ['momentum_q1', 'value_q1']
With a progress callback:
>>> def on_progress(done, total, name): ... print(f"[{done}/{total}] finished {name}") >>> results = runner.run(batch_config, progress_callback=on_progress)
Notes
The market data for every distinct ticker across all portfolios is loaded exactly once into a
SharedDataPool, so adding more portfolios that share tickers is cheap. The per-portfolio loop runs serially; use a parallel runner (e.g.BacktestVariationRunner.run_parallel) if you need cross- portfolio concurrency.See also
from_yamlBuild a runner from a YAML config path.
generate_reportProduce a comparative Excel report from the
SuiteResultsreturned here.
Backtest Results Summary#
Service functions for SuiteResults processing.
Provides filtering, summarization, and conversion utilities for SuiteResults TypedDict instances.
- get_failed_entries(results: SuiteResults) list[BacktestResultEntry]#
Get only the failed backtest entries.
- Parameters:
results (SuiteResults) – Suite results dictionary.
- Returns:
List of entries where success=False.
- Return type:
list[BacktestResultEntry]
- get_results_dict(results: SuiteResults) dict[str, BacktestResultDict]#
Get a dictionary mapping variation names to their results.
- Parameters:
results (SuiteResults) – Suite results dictionary.
- Returns:
Dictionary with variation names as keys. Only includes successful backtests.
- Return type:
dict[str, BacktestResultDict]
- get_successful_entries(results: SuiteResults) list[BacktestResultEntry]#
Get only the successful backtest entries.
- Parameters:
results (SuiteResults) – Suite results dictionary.
- Returns:
List of entries where success=True.
- Return type:
list[BacktestResultEntry]
- get_summary_dataframe(results: SuiteResults) DataFrame#
Create a summary DataFrame with key metrics from all backtests.
- Parameters:
results (SuiteResults) – Suite results dictionary.
- Returns:
DataFrame with one row per backtest containing key metrics.
- Return type:
pd.DataFrame