Asynchronous Solving

The Solver class provides an asynchronous API with callbacks for monitoring search progress, capturing solutions as they're found, and controlling solver execution.

Basic Async Solve

Create a Solver instance and use it to solve models:

import optalcp as cp

solver = cp.Solver()
result = await solver.solve(model, params)

The Solver class provides everything model.solve() does, plus:

  • Callbacks for solutions, bounds, logs, warnings, and errors
  • Solution injection during search
  • Cancellation

Event Loop

Do not block the event loop while the solver is running. Communication with the solver process happens through the event loop—if blocked, messages cannot be sent or received, and callbacks cannot fire.
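
As a minimal sketch of what this means in practice, blocking work that must run while a solve is in progress can be moved to a worker thread with `asyncio.to_thread` (Python 3.9+), which leaves the loop free to exchange messages with the solver. Here `fake_solve` is a hypothetical stand-in for `solver.solve(model, params)` and `heavy_report` is an illustrative blocking function; neither is part of OptalCP.

```python
import asyncio
import time


async def fake_solve() -> str:
    # Hypothetical stand-in for `await solver.solve(model, params)`.
    # Like the real call, it only makes progress while the event loop runs.
    for _ in range(5):
        await asyncio.sleep(0.01)
    return "done"


def heavy_report() -> int:
    # Illustrative blocking work; calling it directly from a coroutine or a
    # callback would stall the loop for its whole duration.
    time.sleep(0.05)
    return 42


async def main() -> tuple[str, int]:
    solve_task = asyncio.create_task(fake_solve())
    # Run the blocking function in a worker thread so the loop stays responsive.
    report = await asyncio.to_thread(heavy_report)
    result = await solve_task
    return result, report
```

Calling `asyncio.run(main())` runs both pieces of work side by side. The same pattern applies to blocking work triggered from inside a callback.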

Solution Callback

Receive notifications when solutions are found:

import optalcp as cp

solver = cp.Solver()

def on_solution(event: cp.SolutionEvent):
    print(f"[{event.solve_time:.2f}s] Solution: {event.solution.get_objective()}")

    # Access solution values
    for var in model.get_interval_vars():
        if event.solution.is_present(var):
            start = event.solution.get_start(var)
            end = event.solution.get_end(var)
            print(f"  {var.name}: {start} -> {end}")

solver.on_solution = on_solution

result = await solver.solve(model, params)

SolutionEvent

The SolutionEvent is passed to the solution callback when a new solution is found:

class SolutionEvent:
    solve_time: float    # Time when solution was found (seconds)
    solution: Solution   # The solution object
    valid: bool | None   # Verification result (see below)

The valid field is populated when the verifySolution parameter is enabled. It is True if verification passed, or None (undefined in JavaScript) if verification was not enabled.
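
Because the field can be None, a plain truthiness check would lump "not verified" together with any falsy value. The hypothetical helper below sketches an explicit check. Treating False as a failed verification is an assumption; the text above documents only True and None.

```python
from typing import Optional


def verification_label(valid: Optional[bool]) -> str:
    # None means verification was not enabled, so test for it explicitly
    # instead of relying on truthiness.
    if valid is None:
        return "not verified"
    # Assumption: a False value would indicate failed verification; only
    # True and None are documented.
    return "verified" if valid else "verification failed"
```

Inside a solution callback this could be called as `verification_label(event.valid)`.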

Objective Bound Callback

Track improvements to the objective bound:

import optalcp as cp

solver = cp.Solver()

def on_bound(event: cp.ObjectiveBoundEntry):
    print(f"[{event.solve_time:.2f}s] Bound: {event.value}")

solver.on_objective_bound = on_bound

result = await solver.solve(model, params)

ObjectiveBoundEntry

The ObjectiveBoundEntry is passed to the bound callback when the objective bound improves:

class ObjectiveBoundEntry:
    solve_time: float   # Time when bound was proved (seconds)
    value: int          # Bound value

The bound is a lower bound for minimization problems and an upper bound for maximization problems.
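
Since the bound sits on different sides of the objective depending on the direction of optimization, a gap calculation can take the sense into account. The helper below is an illustrative sketch, not part of the OptalCP API. It assumes the sense strings 'minimize' and 'maximize', and reports a relative gap of 0 when the objective is 0.

```python
def optimality_gap(objective: int, bound: int, sense: str) -> tuple[int, float]:
    """Return (absolute gap, relative gap in percent)."""
    if sense == "minimize":
        gap = objective - bound  # the bound is a lower bound
    elif sense == "maximize":
        gap = bound - objective  # the bound is an upper bound
    else:
        raise ValueError(f"unknown sense: {sense!r}")
    # Avoid dividing by zero when the objective itself is 0.
    relative = 100.0 * gap / abs(objective) if objective != 0 else 0.0
    return gap, relative
```

For example, `optimality_gap(80, 100, "maximize")` gives an absolute gap of 20 and a relative gap of 25%.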

Log, Warning, and Error Callbacks

Capture log messages, warnings, and errors:

solver = cp.Solver()

# Regular log messages
solver.on_log = lambda msg: print(f"LOG: {msg}")

# Warnings
solver.on_warning = lambda msg: print(f"WARNING: {msg}")

# Errors
solver.on_error = lambda msg: print(f"ERROR: {msg}")

result = await solver.solve(model, params)

All callbacks receive a single string parameter.

Callbacks vs printLog

These callbacks fire regardless of the printLog setting. Even with printLog: false, log, warning, and error events are still emitted—they just aren't written to any stream by default.

To reduce the number of log events at the source, use logLevel. With logLevel: 0, only warning and error events are emitted.
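
Because the events are emitted regardless of printLog, they can be routed to Python's standard `logging` module. The sketch below assumes only the three callback attributes described above. `attach_logging` is a hypothetical helper, and the `SimpleNamespace` object stands in for a real `cp.Solver()`.

```python
import logging
from types import SimpleNamespace


def attach_logging(solver, logger: logging.Logger) -> None:
    # Each callback receives a single string, so it maps directly onto a
    # logger method of the matching severity.
    solver.on_log = logger.info
    solver.on_warning = logger.warning
    solver.on_error = logger.error


# Usage with a stand-in object; with OptalCP this would be cp.Solver().
demo_solver = SimpleNamespace()
attach_logging(demo_solver, logging.getLogger("optalcp"))
```

Filtering then happens in the logging configuration (levels, handlers) rather than in the callbacks themselves.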

Error Handling

Errors fall into two categories:

  1. Recoverable errors: The solver reports the error through the callback, then continues and reports "no solution". These are model or configuration problems.

  2. Solver crashes: If the solver process terminates unexpectedly, an exception is thrown from solve().

Errors reported through callbacks accumulate and are thrown together as a single exception when solve() completes.
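
One way to handle both categories in the same place is to collect the callback messages and wrap solve() in a try/except. This is a sketch under stated assumptions: the exception class is not specified above, so it catches `Exception` broadly, and `FakeSolver` is a hypothetical stand-in that mimics the described behavior rather than the real Solver.

```python
import asyncio


async def solve_collecting_errors(solver, model, params):
    """Return (result, error_messages, exception); result is None on failure."""
    messages: list[str] = []
    solver.on_error = messages.append  # replaces any previously set callback
    try:
        result = await solver.solve(model, params)
    except Exception as exc:  # exact exception class is not stated above
        return None, messages, exc
    return result, messages, None


class FakeSolver:
    # Hypothetical stand-in that mimics the documented behavior: report the
    # error through the callback, then raise when solve() completes.
    on_error = None

    async def solve(self, model, params):
        self.on_error("Invalid model")
        raise RuntimeError("Invalid model")


async def demo():
    return await solve_collecting_errors(FakeSolver(), model=None, params={})
```

Running `asyncio.run(demo())` returns no result, the collected message list, and the exception that was raised.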

Summary Callback

Receive final summary when solving completes:

import optalcp as cp

solver = cp.Solver()

def on_summary(summary: cp.SolveSummary):
    print(f"Completed in {summary.duration:.2f}s")
    print(f"Solutions found: {summary.nb_solutions}")
    print(f"Branches: {summary.nb_branches}")
    print(f"Proof: {summary.proof}")

solver.on_summary = on_summary

result = await solver.solve(model, params)

SolveSummary

The SolveSummary is passed to the summary callback when solving completes:

class SolveSummary:
    # Search statistics
    nb_solutions: int             # Number of solutions found
    proof: bool                   # Optimality/infeasibility proved?
    duration: float               # Total solve time (seconds)
    nb_branches: int              # Search tree branches
    nb_fails: int                 # Failed search nodes
    nb_lns_steps: int             # LNS iterations
    nb_restarts: int              # Search restarts
    memory_used: int              # Memory used (bytes)

    # Objective
    objective: int | None         # Best objective value (None if no solution)
    objective_bound: int | None   # Proved bound (None if not proved)
    objective_sense: str | None   # 'minimize', 'maximize', or None

    # Model info
    nb_interval_vars: int         # Number of IntervalVars
    nb_constraints: int           # Number of constraints

    # Environment
    solver: str                   # Solver version (e.g., "OptalCP 1.0.0")
    actual_workers: int           # Workers used
    cpu: str                      # CPU model

SolveSummary contains the same fields as SolveResult, minus the solution object and history arrays.
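
As an illustration of working with these fields, the hypothetical helper below condenses a summary into one line and derives a branches-per-second rate. It relies only on field names listed above. The `SimpleNamespace` object is a stand-in with made-up values, not a real SolveSummary.

```python
from types import SimpleNamespace


def summary_line(summary) -> str:
    # Guard against a zero duration before computing a rate.
    rate = summary.nb_branches / summary.duration if summary.duration > 0 else 0.0
    # objective is None when no solution was found.
    objective = "none" if summary.objective is None else str(summary.objective)
    status = "proved" if summary.proof else "not proved"
    return (f"{summary.nb_solutions} solutions, objective {objective} ({status}), "
            f"{rate:.0f} branches/s")


# Stand-in carrying only the fields used above; the values are made up.
example = SimpleNamespace(nb_solutions=3, objective=120, proof=True,
                          nb_branches=5000, duration=2.0)
```

The helper can be assigned through a wrapper such as `solver.on_summary = lambda s: print(summary_line(s))`.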

Cancellation

Stop the solver during execution:

import asyncio
import optalcp as cp

solver = cp.Solver()

# Start solve in background
solve_task = asyncio.create_task(solver.solve(model, params))

# Wait a bit
await asyncio.sleep(5)

# Stop the solver
solver.stop("Manual cancellation")

# Wait for result
result = await solve_task

print(f"Stopped after {result.duration:.2f}s")
print(f"Best solution: {result.objective}")

The stop() method only initiates the stop—it returns immediately without waiting for the solver to actually terminate. The solver will stop as soon as possible and solve() will return with the best solution found so far.

Calling stop() when the solver is not running is silently ignored. Calling stop() a second time while the solver is still shutting down will forcefully kill the solver subprocess.
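
The two-stage behavior of stop() suggests a graceful-then-forceful pattern, sketched below. `solve_with_deadline` and `FakeSolver` are hypothetical names. The stand-in only imitates a solver that runs until stopped, and the sketch assumes nothing beyond `solve()` and `stop(reason)` as described above.

```python
import asyncio


async def solve_with_deadline(solver, model, params, seconds: float, grace: float):
    task = asyncio.ensure_future(solver.solve(model, params))
    done, _ = await asyncio.wait({task}, timeout=seconds)
    if not done:
        solver.stop("Deadline reached")  # only initiates the stop
        done, _ = await asyncio.wait({task}, timeout=grace)
        if not done:
            # A second stop() during shutdown force-kills the subprocess.
            solver.stop("Grace period expired")
    # May raise if the solver was killed; how solve() reports that case is
    # not covered above.
    return await task


class FakeSolver:
    # Hypothetical stand-in: solve() runs until stop() is called.
    def __init__(self):
        self.stop_reasons = []
        self._stopped = None

    async def solve(self, model, params):
        self._stopped = asyncio.Event()
        await self._stopped.wait()
        return "best so far"

    def stop(self, reason):
        self.stop_reasons.append(reason)
        if self._stopped is not None:
            self._stopped.set()


async def demo():
    solver = FakeSolver()
    result = await solve_with_deadline(solver, None, {}, seconds=0.01, grace=1.0)
    return result, solver.stop_reasons
```

For a plain time budget the timeLimit parameter is simpler. A helper like this is mainly useful when the stop is triggered by an application event and a hung shutdown must not block the caller.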

Complete Callback Example

import optalcp as cp

solver = cp.Solver()

# Track search progress
best_objective = None
best_bound = None

def on_solution(event):
    global best_objective
    best_objective = event.solution.get_objective()
    gap = "N/A"
    if best_bound is not None:
        gap_value = abs(best_objective - best_bound)
        gap_pct = 100 * gap_value / abs(best_objective) if best_objective != 0 else 0
        gap = f"{gap_value} ({gap_pct:.1f}%)"
    print(f"[{event.solve_time:.2f}s] Solution: {best_objective}, Gap: {gap}")

def on_bound(event):
    global best_bound
    best_bound = event.value
    gap = "N/A"
    if best_objective is not None:
        gap_value = abs(best_objective - best_bound)
        gap_pct = 100 * gap_value / abs(best_objective) if best_objective != 0 else 0
        gap = f"{gap_value} ({gap_pct:.1f}%)"
    print(f"[{event.solve_time:.2f}s] Bound: {best_bound}, Gap: {gap}")

def on_summary(summary):
    print("\nSearch complete:")
    print(f"  Time: {summary.duration:.2f}s")
    print(f"  Solutions: {summary.nb_solutions}")
    print(f"  Branches: {summary.nb_branches}")
    print(f"  Optimal: {summary.proof}")

solver.on_solution = on_solution
solver.on_objective_bound = on_bound
solver.on_summary = on_summary

params: cp.Parameters = {'timeLimit': 60, 'logLevel': 0} # suppresses logs (not warnings/errors)
result = await solver.solve(model, params)

if result.solution is not None:
    print(f"\nFinal objective: {result.objective}")

Async Callbacks

Callbacks can be async functions:

import asyncio
import optalcp as cp

solver = cp.Solver()

async def on_solution(event):
    # Do async work
    await asyncio.sleep(0.1)
    print(f"Solution: {event.solution.get_objective()}")

solver.on_solution = on_solution

result = await solver.solve(model, params)

The solver doesn't wait for async callbacks to complete. Callbacks run concurrently with the search.

In Python, an unhandled exception in an async callback propagates and terminates the solve.
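
If a failing callback should not end the search, one option is to wrap it so that exceptions are logged instead of propagating. The `guarded` decorator below is a hypothetical helper, not part of OptalCP. It is a minimal sketch that deliberately does not catch `asyncio.CancelledError`, so cancellation still works.

```python
import asyncio
import functools
import logging

logger = logging.getLogger("solver.callbacks")


def guarded(callback):
    """Wrap an async callback so its exceptions are logged, not propagated."""
    @functools.wraps(callback)
    async def wrapper(event):
        try:
            await callback(event)
        except Exception:
            # Logs the traceback; CancelledError is a BaseException and
            # therefore passes through untouched.
            logger.exception("Callback %s failed", callback.__name__)
    return wrapper


async def flaky_on_solution(event):
    # Illustrative callback that always fails.
    raise ValueError("cannot reach dashboard")


async def demo() -> str:
    await guarded(flaky_on_solution)(event=None)
    return "search continues"
```

With a real solver the wrapper would be applied at assignment time, for example `solver.on_solution = guarded(on_solution)`.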

Web Service Example

Using callbacks in a web service:

from fastapi import FastAPI, WebSocket
import optalcp as cp

app = FastAPI()

@app.websocket("/solve")
async def solve_endpoint(websocket: WebSocket):
    await websocket.accept()

    # Receive model from client
    model_json = await websocket.receive_text()
    model, params, _ = cp.Model.from_json(model_json)

    # Create solver with callbacks
    solver = cp.Solver()

    async def on_solution(event):
        await websocket.send_json({
            "type": "solution",
            "objective": event.solution.get_objective(),
            "time": event.solve_time
        })

    async def on_bound(event):
        await websocket.send_json({
            "type": "bound",
            "value": event.value,
            "time": event.solve_time
        })

    solver.on_solution = on_solution
    solver.on_objective_bound = on_bound

    # Solve with logLevel=0 to suppress log output
    if params is None:
        params = {}
    params['logLevel'] = 0
    result = await solver.solve(model, params)

    # Send final result
    await websocket.send_json({
        "type": "complete",
        "objective": result.objective,
        "proof": result.proof,
        "duration": result.duration
    })

All Callbacks Summary

| Callback | Event Type | When Fired | Use Case |
|---|---|---|---|
| on_solution / onSolution | SolutionEvent | New solution found | Display progress, save intermediate solutions |
| on_objective_bound / onObjectiveBound | ObjectiveBoundEntry | Bound improved | Track optimality gap |
| on_log / onLog | string | Log message | Custom logging, filtering |
| on_warning / onWarning | string | Warning issued | Alert on model issues |
| on_error / onError | string | Error occurred | Error handling |
| on_summary / onSummary | SolveSummary | Solve complete | Final statistics |

See Also