Advanced Optimization Examples

This guide demonstrates advanced optimization techniques using Porcupy, including multi-objective optimization, dynamic optimization, and hybrid approaches.

Table of Contents

Multi-Objective Optimization

Weighted Sum Approach

import numpy as np
from porcupy import CPO

def multi_objective_optimization():
    # Define two competing objectives
    def objective1(x):
        return np.sum(x**2)  # Minimize distance from origin
    
    def objective2(x):
        return np.sum((x - 2)**2)  # Minimize distance from [2, 2, ...]
    
    # Generate Pareto front by varying weights
    n_weights = 10
    pareto_front = []
    
    for w in np.linspace(0, 1, n_weights):
        def weighted_objective(x):
            return w * objective1(x) + (1 - w) * objective2(x)
        
        optimizer = CPO(dimensions=3, bounds=([-5]*3, [5]*3), max_iter=100)
        best_solution, best_fitness, _ = optimizer.optimize(weighted_objective)
        
        # Store both objectives
        pareto_front.append((objective1(best_solution), objective2(best_solution)))
    
    return np.array(pareto_front)

Constraint-Based Multi-Objective

def constrained_multi_objective():
    def objective1(x):
        return np.sum(x**2)
    
    def objective2(x):
        return np.sum((x - 1)**2)
    
    def constraint(x):
        return np.sum(x) - 1  # sum(x) >= 1
    
    # Optimize first objective with constraint
    optimizer = CPO(dimensions=3, bounds=([0]*3, [1]*3))
    x1, f1, _ = optimizer.optimize(
        objective_func=objective1,
        f_ieqcons=constraint
    )
    
    # Optimize second objective with constraint
    x2, f2, _ = optimizer.optimize(
        objective_func=objective2,
        f_ieqcons=constraint
    )
    
    return {"obj1": (x1, f1), "obj2": (x2, f2)}

Dynamic Optimization

Time-Varying Objective

def dynamic_optimization():
    class DynamicObjective:
        def __init__(self):
            self.time = 0
            self.period = 50  # Change every 50 iterations
        
        def __call__(self, x):
            phase = 2 * np.pi * (self.time // self.period)
            offset = 5 * np.sin(phase)  # Moving target
            return np.sum((x - offset)**2)
        
        def update(self):
            self.time += 1
    
    # Initialize
    obj = DynamicObjective()
    optimizer = CPO(dimensions=2, bounds=([-10]*2, [10]*2), max_iter=200)
    
    # Track best solutions over time
    solutions = []
    
    # Run optimization with dynamic updates
    for _ in range(4):  # 4 dynamic updates
        x, f, _ = optimizer.optimize(obj)
        solutions.append({
            'time': obj.time,
            'solution': x.copy(),
            'fitness': f
        })
        
        # Update the objective
        obj.update()
        
        # Re-initialize population around current best
        optimizer.initialize_population_around(x, radius=2.0)
    
    return solutions

Hybrid Optimization

Surrogate-Assisted Optimization

Using Gaussian Process Surrogate

from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import RBF, ConstantKernel

def surrogate_assisted_optimization():
    # Initialize surrogate model
    kernel = ConstantKernel(1.0) * RBF(length_scale=1.0)
    surrogate = GaussianProcessRegressor(kernel=kernel)
    
    # Initial samples (Latin Hypercube Sampling)
    n_initial = 20
    X_train = np.random.uniform(-5, 5, (n_initial, 2))
    y_train = np.array([ackley(x) for x in X_train])
    
    # Main optimization loop
    n_iterations = 10
    for i in range(n_iterations):
        # Update surrogate model
        surrogate.fit(X_train, y_train)
        
        # Optimize acquisition function (expected improvement)
        def acquisition(x):
            x = np.array(x).reshape(1, -1)
            mu, sigma = surrogate.predict(x, return_std=True)
            return -(mu - 0.1 * sigma)  # Exploration-exploitation trade-off
        
        optimizer = CPO(dimensions=2, bounds=([-5, -5], [5, 5]))
        x_next, _, _ = optimizer.optimize(acquisition)
        
        # Evaluate true objective
        y_next = ackley(x_next)
        
        # Update training data
        X_train = np.vstack([X_train, x_next])
        y_train = np.append(y_train, y_next)
    
    # Return best solution found
    best_idx = np.argmin(y_train)
    return X_train[best_idx], y_train[best_idx]

Distributed Optimization

Parallel Evaluation with Dask

def distributed_optimization():
    from dask.distributed import Client
    
    # Start a local Dask cluster
    client = Client()
    
    # Define objective function
    def expensive_objective(x):
        import time
        time.sleep(0.1)  # Simulate expensive computation
        return np.sum(x**2)
    
    # Scatter data to workers
    x_scattered = client.scatter([np.random.rand(10) for _ in range(100)])
    
    # Define parallel evaluation function
    def evaluate_population(positions):
        futures = [client.submit(expensive_objective, x) for x in positions]
        return np.array(client.gather(futures))
    
    # Initialize optimizer
    optimizer = CPO(
        dimensions=10,
        bounds=([0]*10, [1]*10),
        pop_size=100,
        max_iter=20
    )
    
    # Run optimization with parallel evaluation
    best_solution, best_fitness, _ = optimizer.optimize(
        objective_func=None,  # Will use parallel evaluation
        parallel_func=evaluate_population
    )
    
    client.close()
    return best_solution, best_fitness

Custom Optimization Loop

Manual Iteration Control

def custom_optimization_loop():
    # Initialize optimizer
    optimizer = CPO(
        dimensions=5,
        bounds=([-5]*5, [5]*5),
        pop_size=30,
        max_iter=100
    )
    
    # Manual iteration loop
    best_fitness_history = []
    
    # Initialize population
    optimizer.initialize_population()
    
    for iteration in range(optimizer.max_iter):
        # Evaluate current population
        fitness = np.array([ackley(ind) for ind in optimizer.positions])
        
        # Update best solution
        best_idx = np.argmin(fitness)
        best_fitness_history.append(fitness[best_idx])
        
        # Print progress
        if iteration % 10 == 0:
            print(f"Iteration {iteration}: Best fitness = {fitness[best_idx]:.6f}")
        
        # Update population
        optimizer.step(fitness)
        
        # Custom termination condition
        if iteration > 10 and (best_fitness_history[-11] - best_fitness_history[-1]) < 1e-6:
            print(f"Early stopping at iteration {iteration}")
            break
    
    # Get final results
    best_idx = np.argmin([ackley(ind) for ind in optimizer.positions])
    best_solution = optimizer.positions[best_idx]
    best_fitness = ackley(best_solution)
    
    return best_solution, best_fitness, best_fitness_history

Advanced Constraint Handling

Adaptive Constraint Handling

def adaptive_constraint_handling():
    # Define objective and constraints
    def objective(x):
        return x[0]**2 + x[1]**2 + x[2]**2
    
    def constraint1(x):
        return x[0] + x[1] + x[2] - 1  # x + y + z >= 1
    
    def constraint2(x):
        return 2 - x[0] - x[1]  # x + y <= 2
    
    # Initialize optimizer with adaptive penalty
    class AdaptivePenalty:
        def __init__(self, initial_penalty=1.0, max_penalty=1e6):
            self.penalty = initial_penalty
            self.max_penalty = max_penalty
            self.best_feasible = None
            self.best_violation = float('inf')
        
        def __call__(self, x):
            # Calculate constraint violations
            c1 = max(0, -constraint1(x))  # c1 >= 0
            c2 = max(0, constraint2(x))    # c2 <= 0
            violation = c1 + c2
            
            # Update best feasible solution
            if violation < 1e-6:  # Feasible solution
                if self.best_feasible is None or objective(x) < objective(self.best_feasible):
                    self.best_feasible = x.copy()
            
            # Update best violation
            if violation > 0 and violation < self.best_violation:
                self.best_violation = violation
                self.best_infeasible = x.copy()
            
            # Apply penalty
            return objective(x) + self.penalty * violation**2
        
        def update_penalty(self):
            # Increase penalty if no feasible solution found
            if self.best_feasible is None:
                self.penalty = min(self.penalty * 10, self.max_penalty)
    
    # Initialize
    penalty_func = AdaptivePenalty()
    optimizer = CPO(dimensions=3, bounds=([0]*3, [5]*3), max_iter=200)
    
    # Main optimization loop
    for _ in range(5):  # 5 penalty updates
        x, f, _ = optimizer.optimize(penalty_func)
        print(f"Penalty: {penalty_func.penalty}, Best feasible: {penalty_func.best_feasible is not None}")
        penalty_func.update_penalty()
        
        # Re-initialize population
        if penalty_func.best_feasible is not None:
            optimizer.initialize_population_around(penalty_func.best_feasible, radius=0.5)
        else:
            optimizer.initialize_population_around(penalty_func.best_infeasible, radius=0.5)
    
    # Return best solution found
    if penalty_func.best_feasible is not None:
        return penalty_func.best_feasible, objective(penalty_func.best_feasible)
    else:
        return penalty_func.best_infeasible, objective(penalty_func.best_infeasible)

Performance Tips for Advanced Usage

  1. For High-Dimensional Problems:

    • Use GPU acceleration for dimensions > 20

    • Consider using dimension reduction techniques

    • Increase population size with dimension

  2. For Noisy Objectives:

    • Use multiple evaluations per solution

    • Implement resampling strategies

    • Consider surrogate modeling

  3. For Constrained Problems:

    • Use adaptive penalty methods

    • Consider feasibility rules

    • Implement repair operators for infeasible solutions

  4. For Multi-Modal Problems:

    • Use niching or crowding techniques

    • Implement restart strategies

    • Consider island models for parallel exploration

  5. For Expensive Objectives:

    • Use surrogate models

    • Implement efficient parallel evaluation

    • Consider early stopping for poor solutions