運籌系列19:scheduling模型與python程式碼求解
阿新 • • 發佈:2018-11-24
1. 問題模型
scheduling問題比assignment問題又要複雜很多。在排程問題中,除了要考慮任務分配外,還要考慮時間約束。
來看一個官方例子:job shop problem。資料如下:
job 0 = [(0, 3), (1, 2), (2, 2)]
job 1 = [(0, 2), (2, 1), (1, 4)]
job 2 = [(1, 4), (2, 3)]
有3臺機器編號0、1、2。job資料總每個tuple的第一個數字表示機器編號,第二個數字表示加工小時數,下圖是方案之一。使用CP求最短需要花費的時間。
2. 求解程式碼
使用CP模型進行求解。ortools可以建立NewIntervalVar(start_var, length, end_var)型別的變數,並且可以呼叫cp_model.AddNoOverlap簡單新增無重疊的約束。
在上述問題中,約束條件主要有兩個:
- 每臺機器上分配的任務的interval不能重疊(同一時間只能加工一個產品)
- 每個產品分配的機器有前後順序
目標函式是min{max{interval.end_var}}。可以簡單呼叫cp_model.AddMaxEquality來進行表示。
python程式碼如下:
import collections
from ortools.sat.python import cp_model
# Create the model.
model = cp_model.CpModel()
jobs_data = [ # task = (machine_id, processing_time).
[(0, 3), (1, 2), (2, 2)], # Job0
[(0, 2), (2, 1), (1, 4)], # Job1
[(1, 4), (2, 3)] # Job2
]
machines_count = 1 + max(task[0] for job in jobs_data for task in job)
all_machines = range(machines_count)
jobs_count = len(jobs_data)
all_jobs = range(jobs_count)
# Compute horizon.
horizon = sum (task[1] for job in jobs_data for task in job)
task_type = collections.namedtuple('task_type', 'start end interval')
assigned_task_type = collections.namedtuple('assigned_task_type',
'start job index')
# Create jobs.
all_tasks = {}
for job in all_jobs:
for task_id, task in enumerate(jobs_data[job]):
start_var = model.NewIntVar(0, horizon,
'start_%i_%i' % (job, task_id))
duration = task[1]
end_var = model.NewIntVar(0, horizon, 'end_%i_%i' % (job, task_id))
interval_var = model.NewIntervalVar(
start_var, duration, end_var, 'interval_%i_%i' % (job, task_id))
all_tasks[job, task_id] = task_type(
start=start_var, end=end_var, interval=interval_var)
# Create and add disjunctive constraints.
for machine in all_machines:
intervals = []
for job in all_jobs:
for task_id, task in enumerate(jobs_data[job]):
if task[0] == machine:
intervals.append(all_tasks[job, task_id].interval)
model.AddNoOverlap(intervals)
# Add precedence contraints.
for job in all_jobs:
for task_id in range(0, len(jobs_data[job]) - 1):
model.Add(all_tasks[job, task_id +
1].start >= all_tasks[job, task_id].end)
# Makespan objective.
obj_var = model.NewIntVar(0, horizon, 'makespan')
model.AddMaxEquality(
obj_var,
[all_tasks[(job, len(jobs_data[job]) - 1)].end for job in all_jobs])
model.Minimize(obj_var)
# Solve model.
solver = cp_model.CpSolver()
status = solver.Solve(model)
if status == cp_model.OPTIMAL:
# Print out makespan.
print('Optimal Schedule Length: %i' % solver.ObjectiveValue())
print()
# Create one list of assigned tasks per machine.
assigned_jobs = [[] for _ in all_machines]
for job in all_jobs:
for task_id, task in enumerate(jobs_data[job]):
machine = task[0]
assigned_jobs[machine].append(
assigned_task_type(
start=solver.Value(all_tasks[job, task_id].start),
job=job,
index=task_id))
disp_col_width = 10
sol_line = ''
sol_line_tasks = ''
print('Optimal Schedule', '\n')
for machine in all_machines:
# Sort by starting time.
assigned_jobs[machine].sort()
sol_line += 'Machine ' + str(machine) + ': '
sol_line_tasks += 'Machine ' + str(machine) + ': '
for assigned_task in assigned_jobs[machine]:
name = 'job_%i_%i' % (assigned_task.job, assigned_task.index)
# Add spaces to output to align columns.
sol_line_tasks += name + ' ' * (disp_col_width - len(name))
start = assigned_task.start
duration = jobs_data[assigned_task.job][assigned_task.index][1]
sol_tmp = '[%i,%i]' % (start, start + duration)
# Add spaces to output to align columns.
sol_line += sol_tmp + ' ' * (disp_col_width - len(sol_tmp))
sol_line += '\n'
sol_line_tasks += '\n'
print(sol_line_tasks)
print('Task Time Intervals\n')
print(sol_line)
輸出如下:
Optimal Schedule Length: 11
Optimal Schedule
Machine 0: Job_0_0 Job_1_0
Machine 1: Job_2_0 Job_0_1 Job_1_2
Machine 2: Job_1_1 Job_0_2 Job_2_1
Task Time Intervals
Machine 0: [0,3] [3,5]
Machine 1: [0,4] [4,6] [6,10]
Machine 2: [5,6] [6,8] [8,11]
如下圖,用時11小時。