Source code for pyindexnum.multilateral

"""
Multilateral price index functions for the PyIndexNum library.

This module contains functions for calculating multilateral price indices
that compare prices across multiple time periods simultaneously.
"""

import polars as pl
import numpy as np
from scipy.optimize import root_scalar
from typing import Optional
from .bilateral import fisher, tornqvist, jevons


[docs] def geks_fisher(df: pl.DataFrame) -> pl.DataFrame: """ Compute the GEKS-Fisher multilateral price index. The GEKS (Generalized EKS) method uses bilateral Fisher indices between all pairs of periods. The price level for period t relative to period 1 is the geometric mean of all possible bilateral links. Formula: P_geks_t = product_{k=1}^T [P_F(p^k, p^t, q^k, q^t) / P_F(p^k, p^1, q^k, q^1)]^(1/T) Args: df: Polars DataFrame with standardized columns ("product_id", "period", "aggregated_price", "aggregated_quantity") containing data for multiple periods, with each product having exactly one price and quantity per period. Returns: DataFrame with columns "period" (Date) and "index_value" (float), where index_value represents the multilateral price index for each period relative to the base period (first chronological period = 1.0). Raises: ValueError: If DataFrame doesn't meet requirements (see _validate_multilateral_input). Examples: >>> import polars as pl >>> df = pl.DataFrame({ ... "product_id": ["A", "A", "B", "B"], ... "period": [pl.date(2023, 1, 1), pl.date(2023, 2, 1), pl.date(2023, 1, 1), pl.date(2023, 2, 1)], ... "aggregated_price": [100, 110, 200, 210], ... "aggregated_quantity": [10, 10, 20, 20] ... }) >>> result = geks_fisher(df) >>> # Returns DataFrame with period and index_value columns """ return _geks_base(df, fisher)
[docs] def geks_tornqvist(df: pl.DataFrame) -> pl.DataFrame: """ Compute the GEKS-Törnqvist (CCDI) multilateral price index. The GEKS method applied using the Törnqvist index as the underlying bilateral formula. Formula: P_ccdi_t = product_{k=1}^T [P_T(p^k, p^t, s^k, s^t) / P_T(p^k, p^1, s^k, s^1)]^(1/T) Args: df: Polars DataFrame with standardized columns ("product_id", "period", "aggregated_price", "aggregated_quantity") containing data for multiple periods, with each product having exactly one price and quantity per period. Returns: DataFrame with columns "period" (Date) and "index_value" (float), where index_value represents the multilateral price index for each period relative to the base period (first chronological period = 1.0). Raises: ValueError: If DataFrame doesn't meet requirements (see _validate_multilateral_input). Examples: >>> import polars as pl >>> df = pl.DataFrame({ ... "product_id": ["A", "A", "B", "B"], ... "period": [pl.date(2023, 1, 1), pl.date(2023, 2, 1), pl.date(2023, 1, 1), pl.date(2023, 2, 1)], ... "aggregated_price": [100, 110, 200, 210], ... "aggregated_quantity": [10, 10, 20, 20] ... }) >>> result = geks_tornqvist(df) >>> # Returns DataFrame with period and index_value columns """ return _geks_base(df, tornqvist)
[docs] def geks_jevons(df: pl.DataFrame) -> pl.DataFrame: """ Compute the GEKS-Jevons multilateral price index. The GEKS method applied using the Jevons (unweighted geometric mean of price relatives) index as the underlying bilateral formula. Since the Jevons index is unweighted, only price information is required — no quantity column is necessary. Formula: P_geks-J(0,t) = product_{k=0}^{T-1} [P_J(k,t) / P_J(k,0)]^(1/T) where P_J(a,b) is the Jevons bilateral index between periods a and b: P_J(a,b) = [prod_{i=1}^{N} (p_i^b / p_i^a)]^(1/N) GEKS-Jevons is particularly useful for web-scraped data where quantity information is unavailable. Despite being unweighted, it has been found to outperform some weighted bilateral methods in empirical studies. Args: df: Polars DataFrame with columns ("product_id", "period", "aggregated_price") containing data for multiple periods, with each product having exactly one price per period. The "aggregated_quantity" column is optional and, if present, will be ignored (Jevons is an unweighted index). Returns: DataFrame with columns "period" (Date) and "index_value" (float), where index_value represents the multilateral price index for each period relative to the base period (first chronological period = 1.0). Raises: ValueError: If DataFrame doesn't meet requirements (see _validate_multilateral_input). Examples: >>> import polars as pl >>> df = pl.DataFrame({ ... "product_id": ["A", "A", "B", "B"], ... "period": [pl.date(2023, 1, 1), pl.date(2023, 2, 1), pl.date(2023, 1, 1), pl.date(2023, 2, 1)], ... "aggregated_price": [100, 110, 200, 210], ... }) >>> result = geks_jevons(df) >>> # Returns DataFrame with period and index_value columns """ return _geks_base(df, jevons, weighted=False)
def _geks_base(df: pl.DataFrame, bilateral_func, weighted: bool = True) -> pl.DataFrame: """Helper for GEKS indices computation. Args: df: Input DataFrame. bilateral_func: Bilateral index function to use (e.g., fisher, jevons). weighted: If True, require aggregated_quantity column. If False, only require aggregated_price (suitable for unweighted indices like Jevons). """ _validate_multilateral_input(df, weighted) has_quantity = "aggregated_quantity" in df.columns periods = df.select("period").unique().sort("period").to_series().to_list() T = len(periods) if T < 2: raise ValueError("GEKS requires at least 2 periods") # Pre-calculate all bilateral indices P(k, t) # Store in a matrix-like dictionary for easy access bilateral_matrix = {} for i, period_i in enumerate(periods): for j, period_j in enumerate(periods): if i == j: bilateral_matrix[(i, j)] = 1.0 continue # Filter data for the two periods df_i = df.filter(pl.col("period") == period_i) df_j = df.filter(pl.col("period") == period_j) # Important: Ensure periods are in chronological order for the bilateral function # The bilateral functions expect first date as base, second as current. if i < j: base_p, curr_p = period_i, period_j base_df, curr_df = df_i, df_j else: base_p, curr_p = period_j, period_i base_df, curr_df = df_j, df_i # Create bilateral dataframe format expected by bilateral functions if has_quantity: bilateral_df = pl.concat([ base_df.select(["product_id", "aggregated_price", "aggregated_quantity"]) .rename({"aggregated_price": "price", "aggregated_quantity": "quantity"}) .with_columns(pl.lit(base_p).alias("date")), curr_df.select(["product_id", "aggregated_price", "aggregated_quantity"]) .rename({"aggregated_price": "price", "aggregated_quantity": "quantity"}) .with_columns(pl.lit(curr_p).alias("date")) ]) else: bilateral_df = pl.concat([ base_df.select(["product_id", "aggregated_price"]) .rename({"aggregated_price": "price"}) .with_columns(pl.lit(base_p).alias("date")), curr_df.select(["product_id", "aggregated_price"]) .rename({"aggregated_price": "price"}) .with_columns(pl.lit(curr_p).alias("date")) ]) try: # Calculate bilateral index idx = bilateral_func(bilateral_df) # If i > j, we need the inverse because bilateral_func(bilateral_df) # calculated P(j, i) but we want P(i, j) if i < j: bilateral_matrix[(i, j)] = idx else: bilateral_matrix[(i, j)] = 1.0 / idx except ValueError: # In GEKS, we should ideally have a complete graph, # but if data is missing, this is a simplified version. # Here we assume full overlap for simplicity as per requirements. raise # Compute GEKS: P_geks_t = product_{k=0}^{T-1} [P(k, t) / P(k, 0)]^(1/T) # This is equivalent to linking period t to period 0 via all intermediate periods k indices = [] for t in range(T): log_sum = 0.0 for k in range(T): # log(P(k, t) / P(k, 0)) # Note: bilateral_matrix stores P(i, j) = Price in j relative to i # So P(k, t) is price in t relative to k # P(k, 0) is price in 0 relative to k # P(k, t) / P(k, 0) is the link from 0 to t via k log_sum += np.log(bilateral_matrix[(k, t)] / bilateral_matrix[(k, 0)]) geks_t = np.exp(log_sum / T) indices.append({"period": periods[t], "index_value": geks_t}) return pl.DataFrame(indices)
[docs] def geary_khamis(df: pl.DataFrame, max_iter: int = 100, tol: float = 1e-8) -> pl.DataFrame: """ Compute the Geary-Khamis multilateral price index. The Geary-Khamis method is an iterative multilateral index that solves for reference prices and period price levels simultaneously. Args: df: Polars DataFrame with standardized columns ("product_id", "period", "aggregated_price", "aggregated_quantity") containing data for multiple periods, with each product having exactly one price and quantity per period. max_iter: Maximum number of iterations for convergence (default 100). tol: Tolerance for convergence check (default 1e-8). Returns: DataFrame with columns "period" (Date) and "index_value" (float), where index_value represents the multilateral price index for each period relative to the base period (first chronological period = 1.0). Raises: ValueError: If DataFrame doesn't meet requirements or iteration doesn't converge. """ _validate_multilateral_input(df) periods = df.select("period").unique().sort("period").to_series().to_list() products = df.select("product_id").unique().to_series().to_list() T = len(periods) N = len(products) # Convert to matrix for faster iterative computation # Rows: periods, Cols: products pivot_price = df.pivot(index="period", on="product_id", values="aggregated_price").sort("period") pivot_qty = df.pivot(index="period", on="product_id", values="aggregated_quantity").sort("period") # Fill nulls if any (though validation should catch this) product_cols = [str(p) for p in products] P = pivot_price.select(product_cols).to_numpy() Q = pivot_qty.select(product_cols).to_numpy() # Initialize price levels P_GK^t = 1 for all t price_levels = np.ones(T) for iteration in range(max_iter): prev_price_levels = price_levels.copy() # 1. Calculate Reference Prices (alpha_n) # alpha_n = [sum_t (q_n^t * p_n^t / P_GK^t)] / [sum_t q_n^t] numerator_alpha = np.sum((Q * P) / price_levels[:, np.newaxis], axis=0) denominator_alpha = np.sum(Q, axis=0) alpha = numerator_alpha / denominator_alpha # 2. Update Period Price Levels (P_GK^t) # P_GK^t = [sum_n p_n^t * q_n^t] / [sum_n alpha_n * q_n^t] numerator_P = np.sum(P * Q, axis=1) denominator_P = np.sum(alpha * Q, axis=1) price_levels = numerator_P / denominator_P # Normalize price_levels to make the system stable (e.g., first period = 1) # This doesn't change the relatives but helps convergence price_levels /= price_levels[0] # Check convergence if np.max(np.abs(price_levels - prev_price_levels)) < tol: break else: raise ValueError(f"Geary-Khamis did not converge within {max_iter} iterations") indices = [] for t in range(T): indices.append({ "period": periods[t], "index_value": price_levels[t] }) return pl.DataFrame(indices)
[docs] def time_product_dummy(df: pl.DataFrame, weighted: bool = True) -> pl.DataFrame: """ Compute the Time Product Dummy multilateral price index. The Time Product Dummy (TPD) method uses regression analysis to estimate price indices. Time and product dummy variables are included in the model, with the index values derived from the time dummy coefficients. Args: df: Polars DataFrame with standardized columns ("product_id", "period", "aggregated_price") and optionally "aggregated_quantity" if weighted=True. Contains data for multiple periods, with each product having exactly one price per period. weighted: If True, use weighted least squares with expenditure shares (p*q / sum(p*q) per period) as weights. If False, use unweighted OLS. If no quantity column, automatically uses unweighted regardless of this parameter. Returns: DataFrame with columns "period" (Date) and "index_value" (float), where index_value represents the multilateral price index for each period relative to the base period (first chronological period = 1.0). Raises: ValueError: If DataFrame doesn't meet requirements. Examples: >>> import polars as pl >>> df = pl.DataFrame({ ... "product_id": ["A", "A", "B", "B"], ... "period": [pl.date(2023, 1, 1), pl.date(2023, 2, 1), pl.date(2023, 1, 1), pl.date(2023, 2, 1)], ... "aggregated_price": [100, 110, 200, 210], ... "aggregated_quantity": [10, 10, 20, 20] ... }) >>> result = time_product_dummy(df, weighted=True) >>> # Returns DataFrame with period and index_value columns """ _validate_multilateral_input(df, weighted) periods = df.select("period").unique().sort("period").to_series().to_list() products = df.select("product_id").unique().to_series().to_list() if len(periods) < 2 or len(products) < 2: raise ValueError("Time Product Dummy requires at least 2 periods and 2 products") # Create design matrix for regression # Dependent variable: log(price) # Independent variables: time dummies + product dummies # Create time dummy variables (exclude base period) base_period = periods[0] time_dummies = {} for period in periods[1:]: # Skip base period time_dummies[period] = [1 if p == period else 0 for p in df.select("period").to_series()] # Create product dummy variables (exclude one product to avoid multicollinearity) base_product = products[0] product_dummies = {} for product in products[1:]: # Skip base product product_dummies[product] = [1 if p == product else 0 for p in df.select("product_id").to_series()] # Prepare X matrix (design matrix) n_obs = df.height n_time_dummies = len(time_dummies) n_product_dummies = len(product_dummies) n_vars = n_time_dummies + n_product_dummies X = np.zeros((n_obs, n_vars)) # Fill time dummies for i, (period, dummy_vals) in enumerate(time_dummies.items()): X[:, i] = dummy_vals # Fill product dummies for i, (product, dummy_vals) in enumerate(product_dummies.items()): X[:, n_time_dummies + i] = dummy_vals # Add intercept column (for base period and base product) X = np.column_stack([np.ones(n_obs), X]) # Dependent variable: log of prices y = np.log(df.select("aggregated_price").to_series().to_numpy()) # Weights for WLS: expenditure shares per period (p*q / sum(p*q) within each period) weights = None if weighted and "aggregated_quantity" in df.columns: prices_arr = df.select("aggregated_price").to_series().to_numpy() quantities_arr = df.select("aggregated_quantity").to_series().to_numpy() expenditure = prices_arr * quantities_arr periods_arr = df.select("period").to_series() weights = np.empty(n_obs) unique_periods = periods_arr.unique(maintain_order=True).to_list() for p in unique_periods: mask = (periods_arr == p) period_total = expenditure[mask.to_numpy()].sum() weights[mask.to_numpy()] = expenditure[mask.to_numpy()] / period_total # Perform regression using sqrt-weights trick to avoid dense N×N diagonal matrix. # Use QR decomposition via np.linalg.lstsq for numerical stability with # large, potentially ill-conditioned design matrices. if weighted and weights is not None: sqrt_w = np.sqrt(weights) Xw = X * sqrt_w[:, np.newaxis] yw = y * sqrt_w beta, _, _, _ = np.linalg.lstsq(Xw, yw, rcond=None) else: beta, _, _, _ = np.linalg.lstsq(X, y, rcond=None) # Extract time dummy coefficients # beta[0] is intercept (base period) # beta[1:n_time_dummies+1] are time dummy coefficients indices = [{"period": base_period, "index_value": 1.0}] # Base period = 1.0 for i, period in enumerate(periods[1:], 1): index_value = np.exp(beta[i]) # exp(time_dummy_coeff) indices.append({"period": period, "index_value": index_value}) return pl.DataFrame(indices)
def _validate_multilateral_input(df: pl.DataFrame, weighted: bool = True) -> None: """ Validate DataFrame for multilateral index computation. Args: df: DataFrame to validate weighted: Whether weighted regression is requested, defult True Raises: ValueError: If validation fails """ # Check required columns if weighted: required_cols = ["product_id", "period", "aggregated_price", "aggregated_quantity"] else: required_cols = ["product_id", "period", "aggregated_price"] missing_cols = [col for col in required_cols if col not in df.columns] if missing_cols: raise ValueError(f"Missing required columns: {missing_cols}") # Check data types if not df.schema["aggregated_price"].is_numeric(): raise ValueError("aggregated_price must be numeric") # Check no negative or zero prices min_price = df.select(pl.col("aggregated_price").min()).item() if min_price <= 0: raise ValueError("All aggregated_price values must be positive") # Check each product has exactly one observation per period grouped = df.group_by(["product_id", "period"]).len() max_count = grouped.select(pl.col("len").max()).item() if max_count > 1: raise ValueError("Each product must have exactly one observation per period") # Check we have at least 2 periods n_periods = df.select("period").unique().height if n_periods < 2: raise ValueError("Multilateral indices require at least 2 periods") # Check we have at least 2 products n_products = df.select("product_id").unique().height if n_products < 2: raise ValueError("Multilateral indices require at least 2 products") # Additional checks for weighted indices if weighted: # Check aggregated_quantity is numeric if weighted if "aggregated_quantity" in df.columns and not df.schema["aggregated_quantity"].is_numeric(): raise ValueError("aggregated_quantity must be numeric") # Check quantities are positive if present if "aggregated_quantity" in df.columns: min_quantity = df.select(pl.col("aggregated_quantity").min()).item() if min_quantity <= 0: raise ValueError("All aggregated_quantity values must be positive")