Market Data Processing Service
This module provides a comprehensive service for processing market data using PyArrow-based operations. The service orchestrates the complete pipeline for loading, processing, and transforming market data from various input sources.
- class MarketDataProcessingService(field_mapping: FieldMapping, field_mapping_dict: dict)
Bases: object
A comprehensive service for processing market data using PyArrow-based operations.
This service handles the complete pipeline for loading, processing, and transforming market data from various input sources. It utilizes PyArrow tables for efficient data manipulation and supports field mapping for flexible data structure handling.
The service provides functionality to:
- Load market data from multiple input handlers
- Process and validate data for multiple tickers
- Apply field mappings and transformations
- Calculate derived metrics (e.g., returns)
- Handle errors and missing data gracefully
- Variables:
_field_mapping (FieldMapping) – The field mapping configuration object containing column name mappings and data structure definitions.
field_mapping_dict (dict) – A dictionary representation of field mappings used for data transformation and column identification.
Example
>>> field_mapping = FieldMapping(...)
>>> field_mapping_dict = {"adjusted_close": "adj_close", "date": "timestamp"}
>>> service = MarketDataProcessingService(field_mapping, field_mapping_dict)
>>> data = service.load_and_process_market_data(handlers, tickers, config)
- add_calculated_metrics(data_dict: dict[str, Table]) → dict[str, Table]
Calculate and add derived metrics to the market data dictionary.
This method enhances the market data by computing additional metrics derived from the base data. Currently, it calculates returns from adjusted close prices using efficient PyArrow operations.
- Parameters:
data_dict (dict[str, pa.Table]) – Dictionary containing metric tables where keys are metric names (typically user’s column names) and values are PyArrow tables.
- Returns:
The updated data dictionary with calculated metrics added. If adjusted close data is available, a returns table is added under CalculatedMetric.RETURNS.value.
- Return type:
dict[str, pa.Table]
Example
>>> data_dict = {'m_close_adjusted': price_table, 'm_vwap': vwap_table}
>>> result = service.add_calculated_metrics(data_dict)
>>> CalculatedMetric.RETURNS.value in result
True
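The source does not state which return definition is used, so the following is only a minimal sketch that assumes simple period-over-period returns, p[t] / p[t-1] - 1. It uses plain Python lists and dicts as stand-ins for PyArrow columns and tables; the `simple_returns` helper and the sample prices are hypothetical and not part of the service.

```python
def simple_returns(prices):
    """Return p[t] / p[t-1] - 1 for each t >= 1; the first entry is None."""
    if not prices:
        return []
    returns = [None]  # the first observation has no previous price
    for prev, curr in zip(prices, prices[1:]):
        if prev is None or curr is None or prev == 0:
            returns.append(None)  # propagate gaps instead of raising
        else:
            returns.append(curr / prev - 1.0)
    return returns


# Hypothetical metric table: a date column plus one column per ticker.
adjusted_close = {
    "date": ["2024-01-02", "2024-01-03", "2024-01-04"],
    "AAPL": [100.0, 110.0, 99.0],
    "MSFT": [200.0, None, 210.0],
}

# Build the returns table column by column, keeping the date axis unchanged.
returns_table = {"date": adjusted_close["date"]}
for column, values in adjusted_close.items():
    if column != "date":
        returns_table[column] = simple_returns(values)
```

In this sketch a missing price invalidates both the return for that date and the return for the following date, which is one possible way to handle gaps; the real service may treat them differently.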
- extract_metric_tables(market_data_tables: dict[str, MarketDataTable]) → dict[str, Table]
Convert MarketDataTable entities to metric-wise PyArrow Tables.
This method reorganizes ticker-based data into metric-based tables. Each resulting table has dates as the first column and tickers as subsequent columns.
- Parameters:
market_data_tables (dict[str, MarketDataTable]) – Dictionary mapping ticker symbols to their MarketDataTable entities.
- Returns:
Dictionary where keys are user’s column names (e.g., “m_close_dividend_and_split_adjusted”) and values are PyArrow tables with a date column plus one column per ticker.
- Return type:
dict[str, pa.Table]
Example:
# Input
{'AAPL': MarketDataTable(...), 'MSFT': MarketDataTable(...)}
# Output
{
    'm_close_adjusted': Table with columns [date, AAPL, MSFT],
    'm_vwap': Table with columns [date, AAPL, MSFT],
    ...
}
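The reshaping from ticker-keyed data to metric-keyed tables can be illustrated with a short sketch. It uses plain dicts of lists in place of MarketDataTable entities and PyArrow tables, and it assumes that dates are aligned on the union of all tickers' dates with None for missing values; that alignment rule, the `pivot_to_metric_tables` helper, and the sample data are assumptions, not the service's documented behavior.

```python
def pivot_to_metric_tables(ticker_tables, date_column="date"):
    """Turn {ticker: {column: values}} into {metric: {date, ticker1, ...}}."""
    # Union of all dates, sorted so every metric table shares one date axis.
    all_dates = sorted(
        {d for table in ticker_tables.values() for d in table[date_column]}
    )
    metric_tables = {}
    for ticker in sorted(ticker_tables):
        table = ticker_tables[ticker]
        # Map each date to its row index within this ticker's table.
        row_of = {d: i for i, d in enumerate(table[date_column])}
        for metric, values in table.items():
            if metric == date_column:
                continue
            out = metric_tables.setdefault(metric, {date_column: all_dates})
            out[ticker] = [
                values[row_of[d]] if d in row_of else None for d in all_dates
            ]
    return metric_tables


# Hypothetical per-ticker data; MSFT is missing the first date.
ticker_tables = {
    "AAPL": {
        "date": ["2024-01-02", "2024-01-03"],
        "m_close_adjusted": [100.0, 101.0],
        "m_vwap": [99.5, 100.5],
    },
    "MSFT": {
        "date": ["2024-01-03"],
        "m_close_adjusted": [200.0],
        "m_vwap": [199.0],
    },
}

metric_tables = pivot_to_metric_tables(ticker_tables)
```

Each resulting table has the date column first and one column per ticker after it, matching the layout described above.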
- load_and_process_market_data(input_handlers: list[InputHandlerInterface], tickers: set[str], configuration: Configuration) → dict[str, Table]
Execute the complete market data processing pipeline for multiple tickers.
This method orchestrates the entire process of loading, transforming, and enriching market data. It handles multiple tickers simultaneously and applies field mappings to standardize data structure across different input sources.
The processing pipeline includes:
1. Loading raw market data tables for each ticker
2. Extracting and organizing metric tables based on field mappings
3. Adding calculated metrics (returns, etc.) to the dataset
- Parameters:
input_handlers (list[InputHandlerInterface]) – A list of input handler objects that implement the InputHandlerInterface.
tickers (set[str]) – A set of ticker symbols for which to load and process market data.
configuration (Configuration) – A configuration object containing settings and parameters for data loading and processing operations.
- Returns:
A dictionary where keys are metric names (user’s column names or calculated metric names) and values are PyArrow tables.
- Return type:
dict[str, pa.Table]
- Raises:
ReadingCSVError – If there are issues reading data files.
ValueError – If input handlers return unexpected data types.
See also
- Data Formats
Expected column layout of market data files (date + commission/execution/mark-to-market price columns, CSV/Parquet conventions).
- load_market_data_tables
Step 1 of this pipeline (raw load).
- add_calculated_metrics
Step 3 of this pipeline (derived metrics).
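The three pipeline steps compose in a straight line, which the following sketch makes explicit. It is not the service's implementation: the steps are passed in as callables, the `fake_*` stubs are hypothetical stand-ins for load_market_data_tables, extract_metric_tables, and add_calculated_metrics, and the `"returns"` key is a placeholder for whatever CalculatedMetric.RETURNS.value actually is.

```python
def run_pipeline(load_tables, extract_metrics, add_metrics,
                 input_handlers, tickers, configuration):
    """Chain the three documented steps; each step is passed in as a callable."""
    market_data_tables = load_tables(input_handlers, tickers, configuration)  # step 1
    metric_tables = extract_metrics(market_data_tables)                       # step 2
    return add_metrics(metric_tables)                                         # step 3


# Hypothetical stubs that only record the call order and pass data through.
calls = []


def fake_load(input_handlers, tickers, configuration):
    calls.append("load")
    return {ticker: {"m_close_adjusted": [1.0, 2.0]} for ticker in tickers}


def fake_extract(market_data_tables):
    calls.append("extract")
    return {
        "m_close_adjusted": {
            ticker: table["m_close_adjusted"]
            for ticker, table in market_data_tables.items()
        }
    }


def fake_add(metric_tables):
    calls.append("add")
    # "returns" is a placeholder key, not the real enum value.
    return {**metric_tables, "returns": {}}


result = run_pipeline(fake_load, fake_extract, fake_add, [], {"AAPL"}, None)
```

Because step 1 raises when any ticker fails to load, an error there stops the pipeline before steps 2 and 3 run.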
- load_market_data_tables(input_handlers: list[InputHandlerInterface], tickers: set[str], configuration: Configuration) → dict[str, MarketDataTable]
Load market data for multiple tickers and create MarketDataTable entities.
This method iterates through all provided tickers and attempts to load their market data using the available input handlers. Each successfully loaded ticker’s data is wrapped in a MarketDataTable entity. If any tickers fail to load, all errors are collected and reported together.
- Parameters:
input_handlers (list[InputHandlerInterface]) – A list of input handler instances that implement the InputHandlerInterface. Handlers are tried in order until one successfully loads the data.
tickers (set[str]) – The set of ticker symbols for which to load market data.
configuration (Configuration) – The configuration object containing backtest parameters such as date ranges and file paths.
- Returns:
A dictionary mapping ticker symbols to their corresponding MarketDataTable entities containing the loaded PyArrow tables.
- Return type:
dict[str, MarketDataTable]
- Raises:
ReadingCSVError – If any tickers fail to load. The error message contains a formatted summary of all failures, categorized by error type (missing files, CSV reading errors, or unexpected errors).
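The "try handlers in order, collect every failure, report once" behavior described for this method can be sketched as follows. Everything here is illustrative: handlers are modeled as plain callables instead of InputHandlerInterface implementations, `LoadError` is a hypothetical stand-in for ReadingCSVError, and the message layout is an assumption.

```python
class LoadError(Exception):
    """Hypothetical stand-in for the library's ReadingCSVError."""


def load_all(handlers, tickers):
    """Load every ticker, trying handlers in order and aggregating failures."""
    loaded, failures = {}, {}
    for ticker in sorted(tickers):
        errors = []
        for handler in handlers:
            try:
                loaded[ticker] = handler(ticker)
                break  # the first handler that succeeds wins
            except FileNotFoundError as exc:
                errors.append(f"missing file: {exc}")
            except Exception as exc:
                errors.append(f"unexpected error: {exc}")
        else:
            # No handler succeeded (or none were given): record the failure.
            failures[ticker] = errors
    if failures:
        summary = "\n".join(
            f"{ticker}: {'; '.join(errs)}" for ticker, errs in failures.items()
        )
        raise LoadError(f"Failed to load {len(failures)} ticker(s):\n{summary}")
    return loaded


# Hypothetical handlers: each one knows about a single ticker.
def csv_handler(ticker):
    if ticker != "AAPL":
        raise FileNotFoundError(f"{ticker}.csv")
    return [("2024-01-02", 100.0)]


def parquet_handler(ticker):
    if ticker != "MSFT":
        raise FileNotFoundError(f"{ticker}.parquet")
    return [("2024-01-02", 200.0)]


tables = load_all([csv_handler, parquet_handler], {"AAPL", "MSFT"})

message = ""
try:
    load_all([csv_handler, parquet_handler], {"AAPL", "TSLA", "NVDA"})
except LoadError as exc:
    message = str(exc)  # one error that names both failed tickers
```

Collecting failures before raising means a caller sees every problematic ticker in a single run instead of fixing them one at a time.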