1. 程式人生 > >運籌系列19:scheduling模型與python程式碼求解

運籌系列19:scheduling模型與python程式碼求解

1. 問題模型

scheduling問題比assignment問題又要複雜很多。在排程問題中,除了要考慮任務分配外,還要考慮時間約束。
來看一個官方例子:job shop problem。資料如下:
job 0 = [(0, 3), (1, 2), (2, 2)]
job 1 = [(0, 2), (2, 1), (1, 4)]
job 2 = [(1, 4), (2, 3)]
有3臺機器編號0、1、2。job資料總每個tuple的第一個數字表示機器編號,第二個數字表示加工小時數,下圖是方案之一。使用CP求最短需要花費的時間。
在這裡插入圖片描述

2. 求解程式碼

使用CP模型進行求解。ortools可以建立NewIntervalVar(start_var, length, end_var)型別的變數,並且可以呼叫cp_model.AddNoOverlap簡單新增無重疊的約束。
在上述問題中,約束條件主要有兩個:

  • 每臺機器上分配的任務的interval不能重疊(同一時間只能加工一個產品)
  • 每個產品分配的機器有前後順序

目標函式是min{max{interval.end_var}}。可以簡單呼叫cp_model.AddMaxEquality來進行表示。
python程式碼如下:

import collections
from ortools.sat.python import cp_model
# Create the model.
model = cp_model.CpModel()

jobs_data = [  # task = (machine_id, processing_time).
[(0, 3), (1, 2), (2, 2)], # Job0 [(0, 2), (2, 1), (1, 4)], # Job1 [(1, 4), (2, 3)] # Job2 ] machines_count = 1 + max(task[0] for job in jobs_data for task in job) all_machines = range(machines_count) jobs_count = len(jobs_data) all_jobs = range(jobs_count) # Compute horizon. horizon = sum
(task[1] for job in jobs_data for task in job) task_type = collections.namedtuple('task_type', 'start end interval') assigned_task_type = collections.namedtuple('assigned_task_type', 'start job index') # Create jobs. all_tasks = {} for job in all_jobs: for task_id, task in enumerate(jobs_data[job]): start_var = model.NewIntVar(0, horizon, 'start_%i_%i' % (job, task_id)) duration = task[1] end_var = model.NewIntVar(0, horizon, 'end_%i_%i' % (job, task_id)) interval_var = model.NewIntervalVar( start_var, duration, end_var, 'interval_%i_%i' % (job, task_id)) all_tasks[job, task_id] = task_type( start=start_var, end=end_var, interval=interval_var) # Create and add disjunctive constraints. for machine in all_machines: intervals = [] for job in all_jobs: for task_id, task in enumerate(jobs_data[job]): if task[0] == machine: intervals.append(all_tasks[job, task_id].interval) model.AddNoOverlap(intervals) # Add precedence contraints. for job in all_jobs: for task_id in range(0, len(jobs_data[job]) - 1): model.Add(all_tasks[job, task_id + 1].start >= all_tasks[job, task_id].end) # Makespan objective. obj_var = model.NewIntVar(0, horizon, 'makespan') model.AddMaxEquality( obj_var, [all_tasks[(job, len(jobs_data[job]) - 1)].end for job in all_jobs]) model.Minimize(obj_var) # Solve model. solver = cp_model.CpSolver() status = solver.Solve(model) if status == cp_model.OPTIMAL: # Print out makespan. print('Optimal Schedule Length: %i' % solver.ObjectiveValue()) print() # Create one list of assigned tasks per machine. assigned_jobs = [[] for _ in all_machines] for job in all_jobs: for task_id, task in enumerate(jobs_data[job]): machine = task[0] assigned_jobs[machine].append( assigned_task_type( start=solver.Value(all_tasks[job, task_id].start), job=job, index=task_id)) disp_col_width = 10 sol_line = '' sol_line_tasks = '' print('Optimal Schedule', '\n') for machine in all_machines: # Sort by starting time. assigned_jobs[machine].sort() sol_line += 'Machine ' + str(machine) + ': ' sol_line_tasks += 'Machine ' + str(machine) + ': ' for assigned_task in assigned_jobs[machine]: name = 'job_%i_%i' % (assigned_task.job, assigned_task.index) # Add spaces to output to align columns. sol_line_tasks += name + ' ' * (disp_col_width - len(name)) start = assigned_task.start duration = jobs_data[assigned_task.job][assigned_task.index][1] sol_tmp = '[%i,%i]' % (start, start + duration) # Add spaces to output to align columns. sol_line += sol_tmp + ' ' * (disp_col_width - len(sol_tmp)) sol_line += '\n' sol_line_tasks += '\n' print(sol_line_tasks) print('Task Time Intervals\n') print(sol_line)

輸出如下:

Optimal Schedule Length: 11

Optimal Schedule

Machine 0: Job_0_0   Job_1_0
Machine 1: Job_2_0   Job_0_1   Job_1_2
Machine 2: Job_1_1   Job_0_2   Job_2_1

Task Time Intervals

Machine 0: [0,3]     [3,5]
Machine 1: [0,4]     [4,6]     [6,10]
Machine 2: [5,6]     [6,8]     [8,11]

如下圖,用時11小時。
在這裡插入圖片描述