Source code for porcupy.cpo_class

import numpy as np
import multiprocessing as mp
from collections import deque
from typing import Callable, Tuple, List, Optional, Dict, Any, Union
import warnings

from .base import Optimizer


[docs]class CPO(Optimizer): """ CPO (Crested Porcupine Optimizer) for optimization problems. This class implements the CPO algorithm, a nature-inspired metaheuristic that mimics the defensive behaviors of crested porcupines (sight, sound, odor, physical attack) to balance exploration and exploitation, with cyclic population reduction for convergence. Parameters ---------- dimensions : int Number of dimensions in the search space. bounds : tuple of numpy.ndarray A tuple of size 2 where the first entry is the minimum bound while the second entry is the maximum bound. Each array must be of shape (dimensions,). pop_size : int, optional Number of search agents (porcupines) in the initial population (default: 30). min_pop_size : int, optional Minimum population size during reduction cycles (default: pop_size // 2). max_iter : int, optional Maximum number of iterations (default: 100). cycles : int, optional Number of cycles for population reduction (default: 2). alpha : float, optional Convergence rate for fourth defense mechanism (default: 0.2). tf : float, optional Tradeoff threshold between third and fourth mechanisms (default: 0.8). ftol : float, optional Relative error in objective_func(best_pos) acceptable for convergence (default: -np.inf). ftol_iter : int, optional Number of iterations over which the relative error in objective_func(best_pos) is acceptable for convergence (default: 1). Attributes ---------- min_pop_size : int Minimum population size during reduction cycles. cycles : int Number of cycles for population reduction. alpha : float Convergence rate for fourth defense mechanism. tf : float Tradeoff threshold between third and fourth mechanisms. positions : ndarray Current positions of all porcupines in the population. fitness : ndarray Current fitness values of all porcupines. personal_best_pos : ndarray Personal best positions found by each porcupine. """ def __init__( self, dimensions: int, bounds: Tuple[np.ndarray, np.ndarray], pop_size: int = 30, min_pop_size: Optional[int] = None, max_iter: int = 100, cycles: int = 2, alpha: float = 0.2, tf: float = 0.8, ftol: float = -np.inf, ftol_iter: int = 1, ): # Initialize base class super(CPO, self).__init__( dimensions=dimensions, bounds=bounds, pop_size=pop_size, max_iter=max_iter, options={"alpha": alpha, "tf": tf, "cycles": cycles}, ftol=ftol, ftol_iter=ftol_iter, ) # Set CPO-specific parameters
[docs] self.min_pop_size = min_pop_size if min_pop_size is not None else max(10, pop_size // 2)
[docs] self.cycles = cycles
[docs] self.alpha = alpha
[docs] self.tf = tf
# Initialize population attributes
[docs] self.positions = None
[docs] self.fitness = None
[docs] self.personal_best_pos = None
# Tracking attributes for visualization
[docs] self.positions_history = []
[docs] self.defense_types_history = []
[docs] self.pop_size_history = []
[docs] self.best_positions_history = []
[docs] self.fitness_history = []
# Initialize the population self._init_population() def _init_population(self): """Initialize the population of porcupines""" lb, ub = self.bounds # Initialize positions within bounds self.positions = np.zeros((self.pop_size, self.dimensions)) for i in range(self.dimensions): self.positions[:, i] = np.random.uniform(lb[i], ub[i], self.pop_size) # Initialize fitness and personal best self.fitness = np.full(self.pop_size, np.inf) self.personal_best_pos = self.positions.copy() def _evaluate_population(self, objective_func, f_ieqcons=None, pool=None, **kwargs): """Evaluate the fitness of all porcupines in the population Parameters ---------- objective_func : callable The objective function to minimize f_ieqcons : callable, optional Constraint function pool : multiprocessing.Pool, optional Pool for parallel evaluation **kwargs Additional arguments for the objective function """ if pool is None: # Serial evaluation for i in range(len(self.positions)): if f_ieqcons is None or np.all(np.array(f_ieqcons(self.positions[i])) >= 0): self.fitness[i] = objective_func(self.positions[i], **kwargs) else: # Parallel evaluation results = [] for i in range(len(self.positions)): if f_ieqcons is None or np.all(np.array(f_ieqcons(self.positions[i])) >= 0): results.append(pool.apply_async(objective_func, (self.positions[i],), kwargs)) else: results.append(None) # Collect results for i, result in enumerate(results): if result is not None: self.fitness[i] = result.get() def _update_best_solution(self): """Update the best solution found so far""" best_idx = np.argmin(self.fitness) if self.fitness[best_idx] < self.best_cost: self.best_pos = self.positions[best_idx].copy() self.best_cost = self.fitness[best_idx] def _calculate_current_pop_size(self, iteration): """Calculate the current population size based on cyclic reduction Parameters ---------- iteration : int Current iteration number Returns ------- int Current population size """ cycle_progress = (iteration % (self.max_iter // self.cycles)) / (self.max_iter // self.cycles) current_pop_size = max(1, int(self.min_pop_size + (self.pop_size - self.min_pop_size) * (1 - cycle_progress))) # Ensure population size does not exceed current array size return min(current_pop_size, len(self.positions)) def _apply_first_defense(self, i, current_pop_size): """Apply first defense mechanism (sight) Parameters ---------- i : int Index of the current porcupine current_pop_size : int Current population size """ # Midpoint between current and random position y = (self.positions[i] + self.positions[np.random.randint(current_pop_size)]) / 2 # Update with random perturbation toward global best self.positions[i] += np.random.randn() * np.abs(2 * np.random.random() * self.best_pos - y) def _apply_second_defense(self, i, current_pop_size, u1): """Apply second defense mechanism (sound) Parameters ---------- i : int Index of the current porcupine current_pop_size : int Current population size u1 : ndarray Binary mask for position updates """ # Midpoint with another random position y = (self.positions[i] + self.positions[np.random.randint(current_pop_size)]) / 2 # Update using binary mask and random differences rand_diff = (self.positions[np.random.randint(current_pop_size)] - self.positions[np.random.randint(current_pop_size)]) self.positions[i] = u1 * self.positions[i] + (1 - u1) * (y + np.random.random() * rand_diff) def _apply_third_defense(self, i, current_pop_size, u1, yt, s): """Apply third defense mechanism (odor) Parameters ---------- i : int Index of the current porcupine current_pop_size : int Current population size u1 : ndarray Binary mask for position updates yt : float Time-dependent factor s : ndarray Random direction vector """ # Exponential scaling based on relative fitness fitness_sum = np.sum(self.fitness[:current_pop_size]) if np.isfinite(self.fitness[i]) and fitness_sum > 0: st = np.exp(self.fitness[i] / (fitness_sum + np.finfo(float).eps)) else: st = 1.0 # Default scaling if fitness is invalid s = s * yt * st # Update with random position and scaled difference rand_diff = (self.positions[np.random.randint(current_pop_size)] - self.positions[np.random.randint(current_pop_size)]) self.positions[i] = ((1 - u1) * self.positions[i] + u1 * (self.positions[np.random.randint(current_pop_size)] + st * rand_diff - s)) def _apply_fourth_defense(self, i, current_pop_size, u2, yt, s): """Apply fourth defense mechanism (physical attack) Parameters ---------- i : int Index of the current porcupine current_pop_size : int Current population size u2 : ndarray Random direction vector yt : float Time-dependent factor s : ndarray Random direction vector """ # Exponential movement factor fitness_sum = np.sum(self.fitness[:current_pop_size]) if np.isfinite(self.fitness[i]) and fitness_sum > 0: mt = np.exp(self.fitness[i] / (fitness_sum + np.finfo(float).eps)) else: mt = 1.0 # Default scaling if fitness is invalid vt = self.positions[i] vtp = self.positions[np.random.randint(current_pop_size)] ft = np.random.random(self.dimensions) * (mt * (-vt + vtp)) s = s * yt * ft # Update toward global best with convergence rate r2 = np.random.random() self.positions[i] = (self.best_pos + (self.alpha * (1 - r2) + r2) * (u2 * self.best_pos - self.positions[i]) - s)
[docs] def optimize( self, objective_func: Callable, f_ieqcons: Optional[Callable] = None, n_processes: Optional[int] = None, verbose: bool = False, track_history: bool = True, **kwargs ) -> Tuple[np.ndarray, float, np.ndarray]: """Optimize the objective function using Crested Porcupine Optimizer Parameters ---------- objective_func : callable The objective function to be minimized f_ieqcons : callable, optional Constraint function returning a 1D array of inequality constraints (g(x) >= 0) n_processes : int, optional Number of processes to use for parallel evaluation verbose : bool, optional Whether to display progress information **kwargs Additional arguments to pass to the objective function Returns ------- tuple A tuple containing (best_position, best_cost, cost_history) """ # Reset the optimizer self.reset() self._init_population() # Reset history tracking if track_history: self.positions_history = [] self.defense_types_history = [] self.pop_size_history = [] self.best_positions_history = [] self.fitness_history = [] # Setup Pool of processes for parallel evaluation pool = None if n_processes is None else mp.Pool(n_processes) # Initial evaluation self._evaluate_population(objective_func, f_ieqcons, pool, **kwargs) self._update_best_solution() # Initialize convergence tracking ftol_history = deque(maxlen=self.ftol_iter) cost_history = np.zeros(self.max_iter) # Main optimization loop for t in range(self.max_iter): # Calculate current population size current_pop_size = self._calculate_current_pop_size(t) # Initialize defense types tracking for this iteration if track_history: defense_types = [""] * current_pop_size # Update each porcupine for i in range(current_pop_size): # Random binary mask for position updates u1 = np.random.random(self.dimensions) > np.random.random() if np.random.random() < np.random.random(): # Exploration phase if np.random.random() < np.random.random(): # First defense mechanism (sight) self._apply_first_defense(i, current_pop_size) if track_history: defense_types[i] = "sight" else: # Second defense mechanism (sound) self._apply_second_defense(i, current_pop_size, u1) if track_history: defense_types[i] = "sound" else: # Exploitation phase # Time-dependent factor for scaling updates yt = 2 * np.random.random() * (1 - t / self.max_iter) ** (t / self.max_iter) # Random direction vector u2 = (np.random.random(self.dimensions) < 0.5) * 2 - 1 s = np.random.random() * u2 if np.random.random() < self.tf: # Third defense mechanism (odor) self._apply_third_defense(i, current_pop_size, u1, yt, s) if track_history: defense_types[i] = "odor" else: # Fourth defense mechanism (physical attack) self._apply_fourth_defense(i, current_pop_size, u2, yt, s) if track_history: defense_types[i] = "physical" # Clip positions to bounds lb, ub = self.bounds self.positions[i] = np.clip(self.positions[i], lb, ub) # Evaluate new fitness prev_best_cost = self.best_cost self._evaluate_population(objective_func, f_ieqcons, pool, **kwargs) # Update personal and global bests for i in range(current_pop_size): if self.fitness[i] < objective_func(self.personal_best_pos[i], **kwargs): self.personal_best_pos[i] = self.positions[i].copy() self._update_best_solution() # Trim population if size reduced if current_pop_size < len(self.positions): self.positions = self.positions[:current_pop_size].copy() self.personal_best_pos = self.personal_best_pos[:current_pop_size].copy() self.fitness = self.fitness[:current_pop_size].copy() # Update history cost_history[t] = self.best_cost hist = self.ToHistory( best_cost=self.best_cost, mean_cost=np.mean(self.fitness), position=self.positions.copy(), population_size=current_pop_size ) self._populate_history(hist) # Store detailed history for visualization if track_history: self.positions_history.append(self.positions[:current_pop_size].copy()) self.defense_types_history.append(defense_types) self.pop_size_history.append(current_pop_size) self.best_positions_history.append(self.best_pos.copy()) self.fitness_history.append(self.fitness[:current_pop_size].copy()) # Check convergence relative_measure = self.ftol * (1 + np.abs(prev_best_cost)) delta = np.abs(self.best_cost - prev_best_cost) < relative_measure if t < self.ftol_iter: ftol_history.append(delta) else: ftol_history.append(delta) if all(ftol_history): break # Print progress if verbose if verbose: print(f"Iteration {t+1}/{self.max_iter}: Best Cost = {self.best_cost:.6f}, " f"Population Size = {current_pop_size}") # Close Pool of Processes if n_processes is not None: pool.close() return self.best_pos, self.best_cost, np.array(self.cost_history)