Source code for pyindexnum.extension

"""
Extension methods for connecting two different multilateral indices.

This module contains functions for splicing two multilateral price indices
that are calculated on the same window length but shifted by one period.
These methods are used to extend price index series when using rolling windows.
"""

import polars as pl
import numpy as np
from typing import Tuple, List
from datetime import datetime, timedelta


def movement_splice(index1: pl.DataFrame, index2: pl.DataFrame) -> pl.DataFrame:
    """
    Extend ``index1`` by one period using the movement splice.

    The period-on-period change between the two most recent periods of the
    second window is chained onto the most recent level of the first window.

    Args:
        index1: First multilateral index DataFrame with columns "period" and "index_value"
        index2: Second multilateral index DataFrame with columns "period" and "index_value"

    Returns:
        DataFrame holding every row of index1 followed by one new row for the
        latest period of index2.

    Raises:
        ValueError: If input validation fails

    Examples:
        >>> import polars as pl
        >>> from datetime import date
        >>> idx1 = pl.DataFrame({
        ...     "period": [date(2023, 1, 1), date(2023, 2, 1), date(2023, 3, 1)],
        ...     "index_value": [1.0, 1.05, 1.10]
        ... })
        >>> idx2 = pl.DataFrame({
        ...     "period": [date(2023, 2, 1), date(2023, 3, 1), date(2023, 4, 1)],
        ...     "index_value": [1.05, 1.10, 1.15]
        ... })
        >>> result = movement_splice(idx1, idx2)
        >>> # result covers 2023-01-01, 2023-02-01, 2023-03-01 and 2023-04-01
    """
    _validate_indices(index1, index2)

    def _level_at(frame: pl.DataFrame, when):
        # Index level stored in ``frame`` for the single row whose period is ``when``.
        return frame.filter(pl.col("period") == when).select("index_value").item()

    # Anchor of the splice: the most recent level of the first window.
    anchor_period = index1.select(pl.col("period").max()).item()
    anchor_level = _level_at(index1, anchor_period)

    # The two most recent periods of the second window, in chronological order.
    window2 = index2.sort("period")
    newest_period = window2.select(pl.col("period").max()).item()
    previous_period = window2.select(pl.col("period")).to_series()[-2]

    newest_level = _level_at(window2, newest_period)
    previous_level = _level_at(window2, previous_period)

    # Latest period-on-period movement observed in the second window.
    period_on_period = newest_level / previous_level

    extension = pl.DataFrame(
        {
            "period": [newest_period],
            "index_value": [anchor_level * period_on_period],
        }
    )

    # Original series plus the single spliced observation.
    return pl.concat([index1, extension])
def window_splice(index1: pl.DataFrame, index2: pl.DataFrame) -> pl.DataFrame:
    """
    Extend ``index1`` by one period using the window splice.

    The change over the whole second window (first period to last period) is
    chained onto the level that the first window reports for its second
    period.

    Args:
        index1: First multilateral index DataFrame with columns "period" and "index_value"
        index2: Second multilateral index DataFrame with columns "period" and "index_value"

    Returns:
        DataFrame holding every row of index1 followed by one new row for the
        latest period of index2.

    Raises:
        ValueError: If input validation fails

    Examples:
        >>> import polars as pl
        >>> from datetime import date
        >>> idx1 = pl.DataFrame({
        ...     "period": [date(2023, 1, 1), date(2023, 2, 1), date(2023, 3, 1)],
        ...     "index_value": [1.0, 1.05, 1.10]
        ... })
        >>> idx2 = pl.DataFrame({
        ...     "period": [date(2023, 2, 1), date(2023, 3, 1), date(2023, 4, 1)],
        ...     "index_value": [1.05, 1.10, 1.15]
        ... })
        >>> result = window_splice(idx1, idx2)
        >>> # result covers 2023-01-01, 2023-02-01, 2023-03-01 and 2023-04-01
    """
    _validate_indices(index1, index2)

    def _level_at(frame: pl.DataFrame, when):
        # Index level stored in ``frame`` for the single row whose period is ``when``.
        return frame.filter(pl.col("period") == when).select("index_value").item()

    window1 = index1.sort("period")
    window2 = index2.sort("period")

    # Link point: the second period of the first window.
    link_period = window1.select(pl.col("period")).to_series()[1]
    target_period = window2.select(pl.col("period").max()).item()

    link_level = _level_at(window1, link_period)
    opening_level = window2.select(pl.col("index_value")).to_series()[0]
    closing_level = _level_at(window2, target_period)

    # Change across the full second window, first period to last period.
    full_window_change = closing_level / opening_level

    extension = pl.DataFrame(
        {
            "period": [target_period],
            "index_value": [link_level * full_window_change],
        }
    )

    # Original series plus the single spliced observation.
    return pl.concat([index1, extension])
def half_splice(index1: pl.DataFrame, index2: pl.DataFrame) -> pl.DataFrame:
    """
    Calculate the half splice extension method.

    The half splice method links the two windows at the middle period of the
    first window: the 1-based position T/2 when the window length T is even
    and (T+1)/2 when it is odd. The change from that period to the last
    period of the second window is applied to the first window's level at
    the same period.

    Args:
        index1: First multilateral index DataFrame with columns "period" and "index_value"
        index2: Second multilateral index DataFrame with columns "period" and "index_value"

    Returns:
        DataFrame with the full extended index series including all periods from index1
        plus the spliced period

    Raises:
        ValueError: If input validation fails, or if the window has fewer
            than 3 periods (the middle period would then be the first period
            of index1, which is not part of index2)

    Examples:
        >>> import polars as pl
        >>> from datetime import date
        >>> idx1 = pl.DataFrame({
        ...     "period": [date(2023, 1, 1), date(2023, 2, 1), date(2023, 3, 1)],
        ...     "index_value": [1.0, 1.05, 1.10]
        ... })
        >>> idx2 = pl.DataFrame({
        ...     "period": [date(2023, 2, 1), date(2023, 3, 1), date(2023, 4, 1)],
        ...     "index_value": [1.05, 1.10, 1.15]
        ... })
        >>> result = half_splice(idx1, idx2)
        >>> # Returns the full extended index series including periods
        >>> # 2023-01-01, 2023-02-01, 2023-03-01, and 2023-04-01
    """
    _validate_indices(index1, index2)

    window_length = index1.height
    # With only two periods the middle position is the first period of index1,
    # which index2 does not contain. Fail with a clear message instead of
    # letting the lookup below fail on an empty filter result.
    if window_length < 3:
        raise ValueError(
            "Half splice requires a window length of at least 3 periods"
        )

    # Zero-based middle position: T/2 - 1 for even T, T//2 for odd T.
    middle_idx = (window_length - 1) // 2

    sorted_idx1 = index1.sort("period")
    sorted_idx2 = index2.sort("period")

    middle_period = sorted_idx1.select(pl.col("period")).to_series()[middle_idx]
    middle_index_idx1 = (
        sorted_idx1.filter(pl.col("period") == middle_period)
        .select("index_value")
        .item()
    )

    # Level of the second window at that same (overlapping) period.
    middle_index_idx2 = (
        sorted_idx2.filter(pl.col("period") == middle_period)
        .select("index_value")
        .item()
    )

    # Last period of the second window: the period being added.
    last_period_idx2 = sorted_idx2.select(pl.col("period").max()).item()
    last_index_idx2 = (
        sorted_idx2.filter(pl.col("period") == last_period_idx2)
        .select("index_value")
        .item()
    )

    # Change in index2 from the middle period to its last period, chained onto
    # the level of index1 at the middle period.
    middle_to_last_rate = last_index_idx2 / middle_index_idx2
    spliced_index = middle_index_idx1 * middle_to_last_rate

    spliced_df = pl.DataFrame(
        {
            "period": [last_period_idx2],
            "index_value": [spliced_index],
        }
    )

    # Return the full extended series
    return pl.concat([index1, spliced_df])
def mean_splice(index1: pl.DataFrame, index2: pl.DataFrame) -> pl.DataFrame:
    """
    Calculate the mean splice extension method (Diewert and Fox, 2018).

    Every period that appears in both windows is used as a link period. For
    each one, the change in index2 from that period to its last period is
    applied to the level index1 reports for the same period. The spliced
    value is the geometric mean of these candidates.

    Args:
        index1: First multilateral index DataFrame with columns "period" and "index_value"
        index2: Second multilateral index DataFrame with columns "period" and "index_value"

    Returns:
        DataFrame with the full extended index series including all periods from index1
        plus the spliced period

    Raises:
        ValueError: If input validation fails or no overlapping period yields
            a splicing candidate

    Examples:
        >>> import polars as pl
        >>> from datetime import date
        >>> idx1 = pl.DataFrame({
        ...     "period": [date(2023, 1, 1), date(2023, 2, 1), date(2023, 3, 1)],
        ...     "index_value": [1.0, 1.05, 1.10]
        ... })
        >>> idx2 = pl.DataFrame({
        ...     "period": [date(2023, 2, 1), date(2023, 3, 1), date(2023, 4, 1)],
        ...     "index_value": [1.05, 1.10, 1.15]
        ... })
        >>> result = mean_splice(idx1, idx2)
        >>> # Returns the full extended index series including periods
        >>> # 2023-01-01, 2023-02-01, 2023-03-01, and 2023-04-01
    """
    _validate_indices(index1, index2)

    # One period -> level mapping per window: each lookup below is then a dict
    # access instead of a fresh DataFrame filter.
    levels_idx1 = dict(
        zip(
            index1.get_column("period").to_list(),
            index1.get_column("index_value").to_list(),
        )
    )
    levels_idx2 = dict(
        zip(
            index2.get_column("period").to_list(),
            index2.get_column("index_value").to_list(),
        )
    )

    # Walk the shared periods chronologically so the floating-point summation
    # order (and therefore the result) is the same on every run.
    overlapping_periods = sorted(levels_idx1.keys() & levels_idx2.keys())

    # Last period of the second window: the period being added.
    last_period_idx2 = max(levels_idx2)
    last_index_idx2 = levels_idx2[last_period_idx2]

    # Candidate spliced value for each link period: level of index1 at that
    # period times the change in index2 from that period to its last period.
    splice_list = [
        levels_idx1[period] * (last_index_idx2 / levels_idx2[period])
        for period in overlapping_periods
    ]

    if not splice_list:
        raise ValueError("No valid splicing index calculated")

    # Geometric mean of the candidates, computed in log space.
    spliced_index = float(np.exp(np.mean(np.log(splice_list))))

    spliced_df = pl.DataFrame(
        {
            "period": [last_period_idx2],
            "index_value": [spliced_index],
        }
    )

    # Return the full extended series
    return pl.concat([index1, spliced_df])
def fixed_base_rolling_window(index1: pl.DataFrame, index2: pl.DataFrame, base_period: str) -> pl.DataFrame:
    """
    Calculate the fixed base rolling window extension method.

    The change in the second window from the base period to its last period
    is applied to the level the first window reports for the base period:

        spliced = index1[base] * index2[last] / index2[base]

    Args:
        index1: First multilateral index DataFrame with columns "period" and "index_value"
        index2: Second multilateral index DataFrame with columns "period" and "index_value"
        base_period: string indicating the date of the base period in YYYY-MM-DD format;
            the period must be present in both index1 and index2

    Returns:
        DataFrame with the full extended index series including all periods from index1
        plus the spliced period

    Raises:
        ValueError: If input validation fails, if base_period is not a valid
            YYYY-MM-DD date, or if the base period is missing from either window

    Examples:
        >>> import polars as pl
        >>> from datetime import date
        >>> idx1 = pl.DataFrame({
        ...     "period": [date(2023, 1, 1), date(2023, 2, 1), date(2023, 3, 1)],
        ...     "index_value": [1.0, 1.05, 1.10]
        ... })
        >>> idx2 = pl.DataFrame({
        ...     "period": [date(2023, 2, 1), date(2023, 3, 1), date(2023, 4, 1)],
        ...     "index_value": [1.05, 1.10, 1.15]
        ... })
        >>> result = fixed_base_rolling_window(idx1, idx2, "2023-02-01")
        >>> # Returns the full extended index series including periods
        >>> # 2023-01-01, 2023-02-01, 2023-03-01, and 2023-04-01
    """
    _validate_indices(index1, index2)

    # strptime raises ValueError itself when the string is not YYYY-MM-DD.
    base_period_date = datetime.strptime(base_period, "%Y-%m-%d").date()

    base_rows_idx1 = index1.filter(pl.col("period") == base_period_date)
    base_rows_idx2 = index2.filter(pl.col("period") == base_period_date)
    # Validation guarantees unique periods, so each filter yields 0 or 1 rows.
    # Report a missing base period explicitly instead of failing inside .item().
    if base_rows_idx1.height != 1 or base_rows_idx2.height != 1:
        raise ValueError(
            f"Base period {base_period} must be present in both indices"
        )

    base_index_idx1 = base_rows_idx1.select("index_value").item()
    base_index_idx2 = base_rows_idx2.select("index_value").item()

    # Last period of the second window: the period being added.
    last_period_idx2 = index2.select(pl.col("period").max()).item()
    last_index_idx2 = (
        index2.filter(pl.col("period") == last_period_idx2)
        .select("index_value")
        .item()
    )

    # Change from the base period to the last period, measured entirely
    # within index2 so the ratio is internally consistent.
    link_rate = last_index_idx2 / base_index_idx2

    # Chain that change onto the level index1 reports for the base period.
    spliced_index = base_index_idx1 * link_rate

    spliced_df = pl.DataFrame(
        {
            "period": [last_period_idx2],
            "index_value": [spliced_index],
        }
    )

    # Return the full extended series
    return pl.concat([index1, spliced_df])
def _validate_indices(index1: pl.DataFrame, index2: pl.DataFrame) -> None:
    """
    Validate input indices for extension methods.

    Checks, in order: both inputs are polars DataFrames; both carry the
    columns "period" and "index_value"; "index_value" is numeric and "period"
    is temporal; both windows have the same length of at least 2; periods are
    unique and non-null; index2 is index1 shifted forward by exactly one
    period; and every index value is non-null and strictly positive.

    Whether periods are evenly spaced is not checked.

    Args:
        index1: First index DataFrame
        index2: Second index DataFrame

    Raises:
        ValueError: If validation fails
    """
    indices = [index1, index2]

    # Check DataFrame types
    if not isinstance(index1, pl.DataFrame) or not isinstance(index2, pl.DataFrame):
        raise ValueError("Both inputs must be polars DataFrames")

    # Check required columns
    required_cols = ["period", "index_value"]
    for i, df in enumerate(indices, 1):
        missing_cols = [col for col in required_cols if col not in df.columns]
        if missing_cols:
            raise ValueError(f"Index {i} missing required columns: {missing_cols}")

    # Check data types
    for i, df in enumerate(indices, 1):
        if not df.schema["index_value"].is_numeric():
            raise ValueError(f"Index {i} index_value must be numeric")
        if not df.schema["period"].is_temporal():
            raise ValueError(f"Index {i} period must be a temporal type")

    # Check window lengths are equal
    if index1.height != index2.height:
        raise ValueError("Both indices must have the same window length")

    # Check at least 2 periods
    if index1.height < 2:
        raise ValueError("Indices must have at least 2 periods")

    # Check periods are unique and non-null. A null period cannot be ordered
    # against real dates, so reject it here with a clear message rather than
    # letting min()/max() below fail with a TypeError.
    period_lists = []
    for i, df in enumerate(indices, 1):
        periods = df.get_column("period").to_list()
        if len(periods) != len(set(periods)):
            raise ValueError(f"Index {i} has duplicate periods")
        if any(period is None for period in periods):
            raise ValueError(f"Index {i} has null periods")
        period_lists.append(periods)

    # Check that indices are shifted by exactly one period
    periods1 = set(period_lists[0])
    periods2 = set(period_lists[1])

    overlap = periods1 & periods2
    non_overlap1 = periods1 - periods2
    non_overlap2 = periods2 - periods1

    # The non-overlapping periods must be the beginning of the first window
    # and the end of the second window.
    first_period1 = min(periods1)
    last_period2 = max(periods2)
    if first_period1 not in non_overlap1 or last_period2 not in non_overlap2:
        raise ValueError("The non-overlappting periods must be the first of index1 and last of index2")

    # Exactly one period must be unique to each index (shifted by one period).
    # Windows that share no period at all get their own message.
    if len(non_overlap1) != 1 or len(non_overlap2) != 1:
        if len(overlap) == 0:
            raise ValueError("Indices must be overlapped")
        raise ValueError("Indices must be shifted by exactly one period")

    # Check that all index values are present and strictly positive. Each
    # value is compared individually: NaN never compares greater than zero,
    # so it is rejected as well.
    for i, df in enumerate(indices, 1):
        values = df.get_column("index_value").to_list()
        if any(value is None for value in values):
            raise ValueError(f"Index {i} has null index values")
        if not all(value > 0 for value in values):
            raise ValueError(f"Index {i} must have all positive index values")