Advanced Optimization Examples
This guide demonstrates advanced optimization techniques using Porcupy, including multi-objective optimization, dynamic optimization, and hybrid approaches.
Table of Contents
Multi-Objective Optimization
Weighted Sum Approach
import numpy as np
from porcupy import CPO
def multi_objective_optimization():
# Define two competing objectives
def objective1(x):
return np.sum(x**2) # Minimize distance from origin
def objective2(x):
return np.sum((x - 2)**2) # Minimize distance from [2, 2, ...]
# Generate Pareto front by varying weights
n_weights = 10
pareto_front = []
for w in np.linspace(0, 1, n_weights):
def weighted_objective(x):
return w * objective1(x) + (1 - w) * objective2(x)
optimizer = CPO(dimensions=3, bounds=([-5]*3, [5]*3), max_iter=100)
best_solution, best_fitness, _ = optimizer.optimize(weighted_objective)
# Store both objectives
pareto_front.append((objective1(best_solution), objective2(best_solution)))
return np.array(pareto_front)
Constraint-Based Multi-Objective
def constrained_multi_objective():
def objective1(x):
return np.sum(x**2)
def objective2(x):
return np.sum((x - 1)**2)
def constraint(x):
return np.sum(x) - 1 # sum(x) >= 1
# Optimize first objective with constraint
optimizer = CPO(dimensions=3, bounds=([0]*3, [1]*3))
x1, f1, _ = optimizer.optimize(
objective_func=objective1,
f_ieqcons=constraint
)
# Optimize second objective with constraint
x2, f2, _ = optimizer.optimize(
objective_func=objective2,
f_ieqcons=constraint
)
return {"obj1": (x1, f1), "obj2": (x2, f2)}
Dynamic Optimization
Time-Varying Objective
def dynamic_optimization():
class DynamicObjective:
def __init__(self):
self.time = 0
self.period = 50 # Change every 50 iterations
def __call__(self, x):
phase = 2 * np.pi * (self.time // self.period)
offset = 5 * np.sin(phase) # Moving target
return np.sum((x - offset)**2)
def update(self):
self.time += 1
# Initialize
obj = DynamicObjective()
optimizer = CPO(dimensions=2, bounds=([-10]*2, [10]*2), max_iter=200)
# Track best solutions over time
solutions = []
# Run optimization with dynamic updates
for _ in range(4): # 4 dynamic updates
x, f, _ = optimizer.optimize(obj)
solutions.append({
'time': obj.time,
'solution': x.copy(),
'fitness': f
})
# Update the objective
obj.update()
# Re-initialize population around current best
optimizer.initialize_population_around(x, radius=2.0)
return solutions
Hybrid Optimization
CPO with Local Search
from scipy.optimize import minimize
def hybrid_cpo_local_search():
# Define objective (Rastrigin function)
def rastrigin(x):
n = len(x)
return 10*n + np.sum(x**2 - 10*np.cos(2*np.pi*x))
# Global search with CPO
optimizer = CPO(
dimensions=5,
bounds=([-5.12]*5, [5.12]*5),
pop_size=50,
max_iter=50
)
x0, f0, _ = optimizer.optimize(rastrigin)
# Local refinement with L-BFGS-B
result = minimize(
rastrigin,
x0,
method='L-BFGS-B',
bounds=[(-5.12, 5.12)]*5,
options={'maxiter': 100}
)
return {
'cpo_solution': (x0, f0),
'refined_solution': (result.x, result.fun)
}
Surrogate-Assisted Optimization
Using Gaussian Process Surrogate
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import RBF, ConstantKernel
def surrogate_assisted_optimization():
# Initialize surrogate model
kernel = ConstantKernel(1.0) * RBF(length_scale=1.0)
surrogate = GaussianProcessRegressor(kernel=kernel)
# Initial samples (Latin Hypercube Sampling)
n_initial = 20
X_train = np.random.uniform(-5, 5, (n_initial, 2))
y_train = np.array([ackley(x) for x in X_train])
# Main optimization loop
n_iterations = 10
for i in range(n_iterations):
# Update surrogate model
surrogate.fit(X_train, y_train)
# Optimize acquisition function (expected improvement)
def acquisition(x):
x = np.array(x).reshape(1, -1)
mu, sigma = surrogate.predict(x, return_std=True)
return -(mu - 0.1 * sigma) # Exploration-exploitation trade-off
optimizer = CPO(dimensions=2, bounds=([-5, -5], [5, 5]))
x_next, _, _ = optimizer.optimize(acquisition)
# Evaluate true objective
y_next = ackley(x_next)
# Update training data
X_train = np.vstack([X_train, x_next])
y_train = np.append(y_train, y_next)
# Return best solution found
best_idx = np.argmin(y_train)
return X_train[best_idx], y_train[best_idx]
Distributed Optimization
Parallel Evaluation with Dask
def distributed_optimization():
from dask.distributed import Client
# Start a local Dask cluster
client = Client()
# Define objective function
def expensive_objective(x):
import time
time.sleep(0.1) # Simulate expensive computation
return np.sum(x**2)
# Scatter data to workers
x_scattered = client.scatter([np.random.rand(10) for _ in range(100)])
# Define parallel evaluation function
def evaluate_population(positions):
futures = [client.submit(expensive_objective, x) for x in positions]
return np.array(client.gather(futures))
# Initialize optimizer
optimizer = CPO(
dimensions=10,
bounds=([0]*10, [1]*10),
pop_size=100,
max_iter=20
)
# Run optimization with parallel evaluation
best_solution, best_fitness, _ = optimizer.optimize(
objective_func=None, # Will use parallel evaluation
parallel_func=evaluate_population
)
client.close()
return best_solution, best_fitness
Custom Optimization Loop
Manual Iteration Control
def custom_optimization_loop():
# Initialize optimizer
optimizer = CPO(
dimensions=5,
bounds=([-5]*5, [5]*5),
pop_size=30,
max_iter=100
)
# Manual iteration loop
best_fitness_history = []
# Initialize population
optimizer.initialize_population()
for iteration in range(optimizer.max_iter):
# Evaluate current population
fitness = np.array([ackley(ind) for ind in optimizer.positions])
# Update best solution
best_idx = np.argmin(fitness)
best_fitness_history.append(fitness[best_idx])
# Print progress
if iteration % 10 == 0:
print(f"Iteration {iteration}: Best fitness = {fitness[best_idx]:.6f}")
# Update population
optimizer.step(fitness)
# Custom termination condition
if iteration > 10 and (best_fitness_history[-11] - best_fitness_history[-1]) < 1e-6:
print(f"Early stopping at iteration {iteration}")
break
# Get final results
best_idx = np.argmin([ackley(ind) for ind in optimizer.positions])
best_solution = optimizer.positions[best_idx]
best_fitness = ackley(best_solution)
return best_solution, best_fitness, best_fitness_history
Advanced Constraint Handling
Adaptive Constraint Handling
def adaptive_constraint_handling():
# Define objective and constraints
def objective(x):
return x[0]**2 + x[1]**2 + x[2]**2
def constraint1(x):
return x[0] + x[1] + x[2] - 1 # x + y + z >= 1
def constraint2(x):
return 2 - x[0] - x[1] # x + y <= 2
# Initialize optimizer with adaptive penalty
class AdaptivePenalty:
def __init__(self, initial_penalty=1.0, max_penalty=1e6):
self.penalty = initial_penalty
self.max_penalty = max_penalty
self.best_feasible = None
self.best_violation = float('inf')
def __call__(self, x):
# Calculate constraint violations
c1 = max(0, -constraint1(x)) # c1 >= 0
c2 = max(0, constraint2(x)) # c2 <= 0
violation = c1 + c2
# Update best feasible solution
if violation < 1e-6: # Feasible solution
if self.best_feasible is None or objective(x) < objective(self.best_feasible):
self.best_feasible = x.copy()
# Update best violation
if violation > 0 and violation < self.best_violation:
self.best_violation = violation
self.best_infeasible = x.copy()
# Apply penalty
return objective(x) + self.penalty * violation**2
def update_penalty(self):
# Increase penalty if no feasible solution found
if self.best_feasible is None:
self.penalty = min(self.penalty * 10, self.max_penalty)
# Initialize
penalty_func = AdaptivePenalty()
optimizer = CPO(dimensions=3, bounds=([0]*3, [5]*3), max_iter=200)
# Main optimization loop
for _ in range(5): # 5 penalty updates
x, f, _ = optimizer.optimize(penalty_func)
print(f"Penalty: {penalty_func.penalty}, Best feasible: {penalty_func.best_feasible is not None}")
penalty_func.update_penalty()
# Re-initialize population
if penalty_func.best_feasible is not None:
optimizer.initialize_population_around(penalty_func.best_feasible, radius=0.5)
else:
optimizer.initialize_population_around(penalty_func.best_infeasible, radius=0.5)
# Return best solution found
if penalty_func.best_feasible is not None:
return penalty_func.best_feasible, objective(penalty_func.best_feasible)
else:
return penalty_func.best_infeasible, objective(penalty_func.best_infeasible)
Performance Tips for Advanced Usage
For High-Dimensional Problems:
Use GPU acceleration for dimensions > 20
Consider using dimension reduction techniques
Increase population size with dimension
For Noisy Objectives:
Use multiple evaluations per solution
Implement resampling strategies
Consider surrogate modeling
For Constrained Problems:
Use adaptive penalty methods
Consider feasibility rules
Implement repair operators for infeasible solutions
For Multi-Modal Problems:
Use niching or crowding techniques
Implement restart strategies
Consider island models for parallel exploration
For Expensive Objectives:
Use surrogate models
Implement efficient parallel evaluation
Consider early stopping for poor solutions