17 离散事件模拟SimJulia

对于离散事件系统模拟, 比如排队系统的模拟, SimJulia包可以很容易地建模进行模拟。

一般的独立样本抽样、重要性抽样等可以直接利用Julia的随机数发生器模拟。

贝叶斯推断问题一般使用MCMC, Julia的Stan包调用Stan程序库进行MCMC计算, Stan程序会将模型转换为C++然后编译成本地二进制代码运行。

Julia的Mamba包是纯Julia语言进行MCMC模拟的扩展包, 这样模型的描述可以利用Julia程序的所有功能。

17.1 SimJulia介绍

Julia的SimJulia库是一个模仿Python语言SimPy库的软件包,采用过程模拟方法实现了离散事件模拟功能。

离散事件模拟经常用于后勤、社会科学、通信等问题的研究。 设要研究的系统可以用有限个变量的状态来刻画, 而状态仅在有“事件”发生时才发生改变, “事件”仅在离散的时间点上发生, 两个时间点之间状态不改变(或以某种确定性规律改变)。 例如,一个银行的取款机, 其工作状态以及取款的人数、排队人数构成了系统状态, 状态仅在有新顾客到来、顾客开始取款、顾客离开系统时改变。

将SimJulia的离散事件模拟看作一场戏剧表演, 需要一个“环境”作为舞台, 用“过程”作为演员,过程用“可恢复函数”实现, 演员与环境之间、演员之间的交互用“事件”实现。

过程是一种特殊的函数, 声明为@resumable function, 称为过程函数, 这种函数的特点是可以在运行过程中搁置运行, 遇到适当信号后从搁置的位置恢复运行。 SimJulia提供了一个@yield宏命令, 意思是提交一个事件(或多个事件), 并在提交后搁置运行, 在被搁置的时间段内其它过程函数仍可运行。 这些被提交的事件被依次序列在模拟系统的事件队列中, 有模拟系统依序激活这些事件, 从而使得提交了这些事件的过程函数恢复运行。 允许多个过程函数提交同一事件并等待此事件激活。

常见的事件是“超时”(timeout)事件, 这样的事件不立即激活, 从实际效果来看, 提交了超时事件以后, 过程函数将等待或者保持当前状态指定的时间, 然后超时事件被激活, 提交了该超时事件的过程函数恢复运行。 其它类型的事件一般设置为生成时激活。 这些事件都用事件构建函数产生, 事件构建函数以模拟环境作为第一个自变量。

SimJulia另一个重要命令是@process命令, 用意是在模拟系统中注册一个过程函数。 可以在模拟的控制层面用@process注册有限个个模拟任务必要的过程函数。 @process另外一种用法是在一个过程函数中利用循环调用@process, 如果这个过程函数又用了超时事件延迟这些@process的实际运行, 则可以用这种方法产生不停到来的事件列, 比如不停到来的顾客, 产生一个事件“发生器”的效果。

过程可以要求某种资源, 并在资源被其它过程占用时搁置起来等待资源, 一旦资源空闲就自动占用资源并恢复运行。 资源类型有Resources, Containers, Stores。 Resources资源典型例子是银行柜员, 顾客占用后其它顾客只能排队等候。 Containers的例子类似于加油站的油库, 有一定的最大储量, 有当前储量, 可以提取、存入一定数量, 数量可以取连续值或离散值。 Store的典型例子是物品仓库, 可以存入、取出若干物品, 物品可以是同类或不同类, 没有要求先入先出。

这一章的例子主要来自SimJulia的文档, 和SimPy的文档:

using Random, ResumableFunctions, SimJulia, Distributions

17.2 基本使用范例

17.2.1 最小的例子

@yield@process是SimJulia最基本的元素。 @yield在一个“过程函数”中等待一个事件, @process在模拟系统中注册一个过程函数的实例。

设有一辆汽车, 每5小时经过一个路口并留下经过的信息, 经过3次。

@resumable function car(env::Environment)
    println("$(now(env)) Here I am.")
    @yield timeout(env, 5)
    println("$(now(env)) Here I am.")
    @yield timeout(env, 5)
    println("$(now(env)) Here I am.")
end 

上面的程序定义了一个“过程函数”, 仅是定义,不开始运行。 这个函数表面的运行基本上和正常的Julia函数类似, 先显示经过信息, 然后等待5小时并显示第二次经过信息。 然后,再等待5小时,显示第三次经过信息。 后面的例子则要比这个简单例子要难理解一些, 因为在@yield timeout执行等待时一般会有其他的过程函数在运行。

自变量env的类型是模拟环境Environment, 一般是一个Simulation对象。 所有的过程函数都应该以模拟环境为第一个自变量。

now(env)返回模拟运行中间的模拟时钟值, 是虚拟的而不是真实的时间。 模拟时钟值是无量纲的, 可以看成是任意的时间单位。

下面的程序构建模拟运行环境(戏剧演出舞台):

sim = Simulation()

下面的程序注册过程函数car的一个实例。 过程函数可以生成多个实例, 本例仅生成一个。 注册过程函数也不不会使得该函数马上运行, 而是会将其初始化, 把该过程函数的第一个事件添加到模拟系统维持的事件队列末尾。

@process car(sim)

下面的程序运行整个模拟。 一般应该指定模拟运行到虚拟时间的什么时刻为止, 但是这个简单模拟只有3个事件, 在只有有限个事件时, 也可以在所有事件都发生过以后停止模拟。

run(sim)

运行结果:

0.0 Here I am.
5.0 Here I am.
10.0 Here I am.

17.2.2 反复激活一种事件的例子

@yield命令在过程函数中将某个事件加入到事件队列, 暂时停止过程函数的执行去执行其它的过程, 在提交事件被激活后恢复该过程函数的执行。

典型的事件是用timeout定义的超时事件, 直观理解是等待指定的时间后才继续执行。 一定要注意程序逻辑只是增加了一个允许搁置运行, @yield timeout命令后面的命令还是必须在@yield timeout命令完成且指定时间已过时才会被执行。

作为简单例子, 假设有一辆汽车每隔5个小时从某个路口经过, 但不像前面的简单例子那样指定经过有限次, 这个行为会不断重复, 直到模拟系统决定停止模拟运行。

下面的程序用@resumable function定义了一个过程函数, 会每隔5个小时显示经过信息:

@resumable function car(env::Environment)
    duration = 5
    while true
        println("Here I am: ", now(env))
        @yield timeout(env, duration)
    end
end

每次用@yield激活事件后, 过程函数的运行将被暂时搁置, 直到该事件被激活。 因为事件是“超时”事件, 所以模拟系统的时钟(不是实际时钟)经过指定的时间(比如,每5小时)后激活超时事件, 过程函数得以在@yield命令之后恢复运行。

表面上看, 上面的程序while true ... end好像是进行了无限的循环, 但因为在激活事件之前过程函数并不会继续运行, 只有一个事件被激活后, 下一个事件才会被添加到模拟系统控制的事件队列中, 所以无限循环并不会造成事件队列中有无穷多个事件。 如果只有一辆汽车, 模拟系统的事件队列在同一时刻最多有一个事件。

建立模拟环境:

sim = Simulation()

下面在模拟环境中注册一辆汽车的实例。

@process car(sim)

上面的程序用@process在模拟环境sim中注册了一辆汽车的实例, 但现在汽车过程函数并没有实际运行, 只是对“汽车”实例进行了必要的初始化, 将它最初的事件添加到模拟系统的事件队列中。 注意car用了while true ... end可以不停地制造超时事件, 但制造这些事件是激活了一个超时事件才向事件队列添加下一个超时事件的, 所以在用@process注册时仅注册第一个超时事件。

run函数开始模拟的运行(戏剧表演开始了), 并控制在多长时间以后结束:

run(sim, 15.0)

结果:

Here I am: 0.0
Here I am: 5.0
Here I am: 10.0

注意在第15小时的时候函数已经停止运行, 所以run(sim, T)停止运行时间是恰好在T之前, 即模拟系统继续模拟是while 模拟系统时钟 < T这样的做法。

17.2.3 等待随机长度的例子

上面的例子是等时间间隔地激活事件。 下面的程序做了小的修改, 间隔随机长度的时间激活事件。 这里用了Distribution包提供的指数分布和指数分布随机数。

@resumable function car(env::Environment)
    λ = 0.5
    interval_dist = Exponential(1/λ)
    while true
        duration = rand(interval_dist)
        println("Here I am: ", now(env))
        @yield timeout(env, duration)
    end
end

sim = Simulation()
@process car(sim)
run(sim, 15.0)

结果:

Here I am: 0.0
Here I am: 0.9166323913946255
Here I am: 1.189959582695258
Here I am: 1.6295248782234217
Here I am: 5.330939643042086
Here I am: 8.970607309675186

17.2.4 同一个过程函数激活不同事件的例子

下面的程序用@resumable function定义了一个过程函数, 这个过程函数反复不停地激活两种事件:停车和行驶。

@resumable function car(env::Environment)
    parking_duration = 5
    trip_duration = 2
    while true
        println("Start parking at ", now(env))
        @yield timeout(env, parking_duration)
        println("Start driving at ", now(env))
        @yield timeout(env, trip_duration)
    end
end

与前面例子相比, 这里有两种事件。 但是, 因为只有前一个事件已经被激发后, 才提交下一个事件到模拟系统的事件队列中, 所以在只有一辆汽车的情况下, 模拟系统的事件队列在同一时刻最多有一个事件。

建立模拟环境(舞台):

sim = Simulation()

下面在模拟环境中注册一辆汽车, 开始模拟运行:

@process car(sim)
run(sim, 15.0)

结果:

Start parking at 0.0
Start driving at 5.0
Start parking at 7.0
Start driving at 12.0
Start parking at 14.0

17.2.5 两个实例简单交替激活的例子

下面的例子生成表示两辆汽车的过程函数实例, 每辆汽车都按一定时间间隔定时通过某一特定路口并显示提示消息, 两辆汽车反复不停地提交通过路口的事件, 这些事件被依次存储在事件队列中并适当按时激活。

代表一辆汽车的过程函数:

@resumable function car(
    env::Environment, id::String, duration::Number)
    while true
        println("Car $(id) passing: ", now(env))
        @yield timeout(env, duration)
    end
end

下面的@process命令在模拟环境中注册两个代表汽车的过程函数实例, 相当于有两辆汽车, 分别每2小时和每5小时通过指定路口一次。

sim = Simulation()
@process car(sim, "A", 2) # 每2小时通过一次
@process car(sim, "B", 5) # 每5小时通过一次
run(sim, 15) # 执行15小时

结果:

Car A passing: 0.0
Car B passing: 0.0
Car A passing: 2.0
Car A passing: 4.0
Car B passing: 5.0
Car A passing: 6.0
Car A passing: 8.0
Car B passing: 10.0
Car A passing: 10.0
Car A passing: 12.0
Car A passing: 14.0

模拟开始运行后, 两个汽车过程函数实例会向队列中添加激活事件, 但每个激活事件被实际激活之前, 因为过程函数的运行被搁置, 所以同一汽车过程函数实例并不会在模拟系统的事件队列中添加下一个激活事件。 所以, 在每个给定时刻, 模拟系统的队列中至多有两个待激活的事件, 分别属于两个汽车过程实例。

17.2.6 等待另一过程的例子

@process在模拟系统注册生成过程函数的实例(对象), 返回值的类型是Process, 可以保存其返回值, 用来进行过程之间的交互。

例如, “电动汽车”是一个过程实例, “充电”是一种表示动作的过程, 汽车在充电时会等待“充电”过程的完成事件。 @yield充电过程, 就会等待充电过程完成的事件才继续运行。 这是在过程函数中调用@process的例子, 这样可以动态地注册多个甚至无穷多个过程实例。

代表充电的过程:

@resumable function charge(
    env::Environment, duration::Number)
    @yield timeout(env, duration)
end

代表汽车的过程:

@resumable function car(env::Environment)
    charge_duration = 5.0
    trip_duration = 2.0
    while true
        println("Start parking and charging at $(now(env))")
        charge_proc = @process charge(env, charge_duration)
        @yield charge_proc # 等待充电过程结束
        println("Start driving at $(now(env))")
        @yield timeout(env, trip_duration)
    end
end

汽车的过程函数反复添加两种事件:充电事件和行驶事件。 充电事件是用另一个过程函数表示的, 每次用@process charge(env, charge_duration)重新注册生成一个充电过程函数实例, 保存到变量charge_proc中, 这同时也是一个事件, 在执行@process charge(env, charge_duration)完毕时这个事件就被添加到模拟系统的事件队列中, car的程序中用@yield charge_proc使得car过程函数可以等待充电事件完成时被恢复运行。 所以@process构建并向模拟系统注册一个充电过程实例, 而@yield charge_proc是汽车过程函数等待充电完成事件。

下面的程序中仅用@process注册并生成了一辆汽车的过程函数实例, 但并没有用@process去注册生成充电的过程函数实例, 充电的过程函数实例是在模拟运行过程中由car函数实时地反复注册生成的, 模拟过程中注册生成了多个充电过程函数实例,但同一时刻至多有一个实例存在。

运行:

sim = Simulation()
@process car(sim)
run(sim, 15.0)

注意在运行前仅注册了car过程, 另一个charge过程是被car函数注册并作为事件提交的。

结果:

Start parking and charging at 0.0
Start driving at 5.0
Start parking and charging at 7.0
Start driving at 12.0
Start parking and charging at 14.0

17.2.7 中断另一过程运行的例子

在上述汽车充电的过程中, 增加一个“司机”过程, 此过程在模拟开始后的3小时, 提交一个“中断”代表汽车的过程函数执行的事件, 该事件使得car函数接收到“异常”(exception)信号, 异常类型是Interrupt, 需要car函数用try ... catch ... end结构进行异常处理。 进行异常处理时, 可以决定是继续执行函数中后续命令, 还是提交新的事件。

“司机”过程函数:

@resumable function driver(
    env::Environment, car_proc::Process)
    @yield timeout(env, 3.0)
    @yield interrupt(car_proc)
end

@yield interrupt(car_proc)命令执行时, 会使得正在执行的car_proc函数接收到异常信号, 需要用try ... catch ... end结构进行异常处理。

充电过程函数:

@resumable function charge(
    env::Environment, duration::Float64)
    @yield timeout(env, duration)
end

汽车充电、行驶过程函数:

@resumable function car(env::Environment)
    charge_duration = 5.0
    trip_duration = 2.0
    while true
        println("Start parking and charging at $(now(env))")
        charge_proc = @process charge(env, charge_duration)
        try
            @yield charge_proc
        catch exc
            println("Was interrupted. Hopefully, the battery is full enough ...")
        end
        println("Start driving at $(now(env))")
        @yield timeout(env, trip_duration)
    end
end

汽车过程函数反复地提交充电过程、行驶过程。 但是, 因为充电过程会在第3小时被司机过程中断, 所以用了try去提交充电事件, 等待充电事件完成; 如果在等待时收到中断信号, 则catch部分的作用是显示被中断的提示信息, 然后继续执行后面的命令而不需要出错停止。 “司机”过程仅中断了一次“汽车”过程。

运行:

sim = Simulation()
proc_car = @process car(sim)
proc_dr = @process driver(sim, proc_car)
run(sim, 15.0)

结果:

Start parking and charging at 0.0
Was interrupted. Hopefully, the battery is full enough ...
Start driving at 3.0
Start parking and charging at 5.0
Start driving at 10.0
Start parking and charging at 12.0

注意停车充电的间隔本来是5个小时, 结果中仅充电3个小时就被“司机”过程中断了, “汽车”过程处理中断的方式是继续运行函数后续的命令, 即进入行驶阶段。

这个程序仅作为简单的中断演示, 是已知中断发生的时候汽车正处于充电过程中。 如果不确知这样, 则car函数定义中@yield timeout(env, trip_duration)也需要保护在try块中。

17.2.8 排队等待某一资源的例子

一种资源是类似银行柜员、电动车充电桩之类, 在提供服务时有独占性, 但不计算消耗、库存, 用Resource()表示。

例如, 设有一个电动车充电桩, 有四辆汽车在不同时间要求充电, 如果充电桩被占用就排队等待。

汽车的过程函数:

@resumable function car(
    env::Environment, 
    name::Int, 
    bcs::Resource, 
    driving_time::Float64, 
    charge_duration::Float64)
    # 先行驶
    @yield timeout(env, driving_time)
    # 到达充电桩
    println("$name arriving at $(now(env))")
    @yield request(bcs) # 要求资源,没有则等待
    # 获得资源,占用
    println("$name starting to charge at $(now(env))")
    @yield timeout(env, charge_duration)
    # 充电结束,释放资源
    println("$name leaving the bcs at $(now(env))")
    @yield release(bcs)
end

上面的函数用来产生汽车过程函数的实例, 除了过程函数必要的模拟环境输入以外, 还输入name用来区分不同的汽车实例, 用bsc表示要使用的充电桩资源, 用driving_time表示要行驶的时间, 用charging_duration表示充电需要的时间。 每个汽车实例在模拟时先行驶, 然后用@yield request(bcs)请求占用充电桩, 如果当前充电桩没有空闲, 就排队等待; 如果充电桩空闲或充电桩被释放且自己在队首, 就自动获得充电桩的资源, 充电完毕后用@yield release(bcs)释放对资源的独占。

注意每个汽车实例只有四个事件要激活, 而不像前面的程序那样反复产生新的事件。

下面的程序产生模拟环境, 注册产生有两个充电桩的资源(排一个队), 注册产生四辆电动汽车实例, 然后运行模拟。 因为只有有限的事件, 所以不需要指定模拟的终止时间, 所有事件都发生以后模拟自动终止。

sim = Simulation()
bcs = Resource(sim, 2) # 两个充电桩资源
for i=1:4 # 注册四辆车的实例
    @process car(sim, i, bcs, 2.0*i, 5.0)
end
run(sim)

结果:

1 arriving at 2.0
1 starting to charge at 2.0
2 arriving at 4.0
2 starting to charge at 4.0
3 arriving at 6.0
1 leaving the bcs at 7.0
3 starting to charge at 7.0
4 arriving at 8.0
2 leaving the bcs at 9.0
4 starting to charge at 9.0
3 leaving the bcs at 12.0
4 leaving the bcs at 14.0

注意3号汽车在第6小时到达充电桩, 但这时2个充电桩都被占用, 只有在第7小时1号汽车充电完毕后3号汽车才开始充电。

17.2.9 依赖于多个事件的例子

一个过程函数可以同时依赖于多个事件的激发, 这时可以用|&连接不同的事件, |表示只要两个事件之一被激活就可以恢复运行, &表示必须两个事件都激活才可以恢复运行。 下面的程序产生了两个充电桩空闲的事件, 然后汽车过程依赖于这两个事件任意一个先发生就可以恢复运行。

@resumable function charger(env::Environment, 
    name::String)
    λ = 0.5
    interval_dist = Exponential(1/λ)
    @yield timeout(env, rand(interval_dist))
    println("$(name): empty at $(now(env))")
end

@resumable function car(env::Environment)
    @yield ch1 | ch2
    ch1used = state(ch1) == SimJulia.processed
    ch2used = state(ch2) == SimJulia.processed
    if ch1used
        println("May charge with Charger 1: $(now(env))")
    end
    if ch2used
        println("May charge with Charger 2: $(now(env))")
    end
end

sim = Simulation()
ch1 = @process charger(sim, "Charger 1")
ch2 = @process charger(sim, "Charger 2")
@process car(sim)
run(sim, 15)

在上面的程序中, ch1 = @process charger(sim, "Charger 1")ch2 = @process charger(sim, "Charger 2")在模拟系统的事件队列中添加了两个超市事件。 在car过程函数中, 用@yield ch1 | ch2等待ch1ch2中较早的一个事件。 用state(ch1)访问过程函数ch1的当前状态, 分为SimJulia.idleSimJulia.scheduledSimJulia.processed三种状态, 其中SimJulia.processed表示事件已经完成, 从模拟系统的事件队列中取出了。

结果:

Charger 1: empty at 2.5264952086720447
May charge with Charger 1: 2.5264952086720447
Charger 2: empty at 3.468118800288168

以上的程序在反复重复运行的时候, 有时第一个充电桩被使用, 有时第二个充电桩被使用。

将前面程序中的@yield ch1 | ch2改成@yield ch1 & ch2, 则汽车过程等待两个充电桩同时空闲时才恢复运行。 程序结果如:

Charger 1: empty at 0.1635556209901166
Charger 2: empty at 1.6720793256630084
May charge with Charger 1: 1.6720793256630084
May charge with Charger 2: 1.6720793256630084

17.2.10 Container资源例子

Container是一种“容器”资源, 它有一个capacity(容量)、level(存货量), 以及其它一些属性。 用@yield put(con, amount)补充存货, 用@yield get(con, amount)取货。 取货时如果要求的数量超过存货量就排队等待。

下面的程序短期模拟一个加油站, 有一辆油罐车向加油站注油一次, 有两辆汽车从加油站加油各一次。

# 油罐车在60分钟后补货
@resumable function tanker(env, tank)
    @yield timeout(env, 60)
    println("$(now(env)) tanker filling tank, level before = $(tank.level)")
    @yield put(tank, 1000)
    @yield timeout(env, 0)
    println("$(now(env)) tanker filled tank, level after = $(tank.level)")
end 

# 小汽车在指定时间加油
@resumable function car(env,
    name, time_arrival, tank)
    @yield timeout(env, time_arrival)
    println("$(now(env)) car $(name) arrived, level before = $(tank.level)")
    @yield get(tank, 40)
    @yield timeout(env, 0)
    println("$(now(env)) car $(name) refilled, level after = $(tank.level)")
end

sim = Simulation()
tank = Container(sim, 10000; level = 2000)
@process tanker(sim, tank)
@process car(sim, "A", 10, tank)
@process car(sim, "B", 20, tank)
run(sim)

结果:

10.0 car A arrived, level before = 2000
10.0 car A refilled, level after = 1960
20.0 car B arrived, level before = 1960
20.0 car B refilled, level after = 1920
60.0 tanker filling tank, level before = 1920
60.0 tanker filled tank, level after = 2920

17.2.11 Store类型

Container资源类型是仅有一种无区别的物资, 存量可以取连续值或离散值。 Store资源类型可以放入、取出一个一个有区分的物品, 如商店存货、生产者和消费者模型等。

例如, 有一个发送端向信道发送信息, 有两个接收端用排队方式从信道接收信息, 信道用Store类型资源, Store没有设置容量限制。


@resumable function producer(
    env::Environment, store::Store)
    for i=1:100
        # 发送间隔
        @yield timeout(env, 1)
        @yield put(store, "Message $(i)")
        println(now(env),"  message produced.")
    end
end

# 信号接收端
@resumable function consumer(
    env::Environment, name, store::Store)
    while true
        @yield timeout(env, 3) # 接收延迟
        println(now(env), "  ", name, " receiving...")
        item = @yield get(store)
        println(now(env), "  ", name, " got ", item)
    end
end

sim = Simulation()
store = Store{String}(sim)
@process producer(sim, store)
consumer1 = @process consumer(sim, 1, store)
consumer2 = @process consumer(sim, 2, store)
run(sim, 10)

结果:

1.0  message produced.
2.0  message produced.
3.0  1 receiving...
3.0  2 receiving...
3.0  1 got Message 1
3.0  2 got Message 2
3.0  message produced.
4.0  message produced.
5.0  message produced.
6.0  1 receiving...
6.0  2 receiving...
6.0  1 got Message 3
6.0  2 got Message 5
6.0  message produced.
7.0  message produced.
8.0  message produced.
9.0  1 receiving...
9.0  2 receiving...
9.0  1 got Message 6
9.0  2 got Message 8
9.0  message produced.
["Message 9", "Message 4", "Message 7"]

可以看出,Store类型作为信道是不太合格的, 如果接收端速度较慢, 物品并不能做到先进先出(FIFO), 有些内容被保留在信道中了。

Store类型也可以有容量限制。 增加信道容量限制后, 就可以使得发送端发送速度变慢, 以适应接收端的速度。 将上面程序中生成store的命令改成

store = Store{String}(sim; capacity = UInt(2))

结果为:

1.0  message produced.
2.0  message produced.
3.0  1 receiving...
3.0  2 receiving...
3.0  1 got Message 1
3.0  2 got Message 2
3.0  message produced.
4.0  message produced.
6.0  1 receiving...
6.0  2 receiving...
6.0  1 got Message 3
6.0  2 got Message 4
6.0  message produced.
7.0  message produced.
9.0  1 receiving...
9.0  2 receiving...
9.0  1 got Message 5
9.0  2 got Message 6
9.0  message produced.
["Message 7"]

17.3 模拟运行

sim是运行环境, 一般用run(sim, T)开始运行并在模拟系统的模拟时钟达到T之前结束运行。

如果仅注册了有限个事件, 可以用run(sim)运行, 并在队列中所有事件都激活过以后结束运行。

还可以用step(sim)一个事件一个事件地激发运行, 这需要知道要激活多少个事件, 或用now(sim)模拟时钟值控制到何时结束。

在物理世界中, 事件通常不会“同时”发生, 但是在计算机中因为时间的离散化或者数值表示的离散化, 可能会有事件被调度为同时发生。 SimJulia还是会按加入队列先后给这些事件排序, 而不会同时恢复这些事件要激活的过程函数。

“事件”有三种状态:

  • SimJulia.idle,还没有放到模拟系统的事件队列中;
  • SimJulia.scheduled,已经放到模拟系统的事件队列中指定了要发生的时间, 还没有触发;
  • SimJulia.processed,已经触发,不在事件队列中了。

每个事件都要依序经过这三种状态。 可以用state(event)查询某一事件的当前状态。

可以用ev = Event(sim)直接生成事件, 这时事件状态为SimJulia.idle; 过程函数可以用@yield等待这样的事件, 用succeed(ev)可以触发此事件, 这时等待这个事件的过程函数被恢复。 多个过程函数实例可以等待同一个事件。

17.4 数据收集和统计分析

离散事件模拟系统的目的是研究比较各种控制策略, 比较所模拟的系统的不同参数设置的性能。 SimJulia目前(0.8.2版本)没有提供自动的数据收集和分析功能, 可以自己收集数据并编写分析代码。

一般可以在关心的事件发生时,记录下发生时间和事件类型, 以及一些关心的量,比如随机服务系统中排队人数。 记录方法可选:

  • 用全局变量(或模块全局变量)保存。 在更新这些量的程序位置用global声明全局变量并保存当前值。 保存数组时,push!(array, value)是很有效的做法。
  • 保存在一个日志文件中。 这也是比较方便的做法, 不需要利用全局变量, 格式也很自由, 但是设计输出内容时要考虑到如何用程序读入进行统计分析。
  • 保存到一个数据库中。

17.5 示例:M/M/c服务系统模拟

假设有一个银行, 有c个柜员, 顾客以速率λ的泊松过程到来(间隔时间为均值\(1/\lambda\)的指数分布), 如果有柜员空闲就直接接受服务, 否则就排成一队等待服务, 接受服务时服务时间服从均值为\(1/\mu\)的指数分布。 这样一个随机服务系统称为“M/M/c”模型, 这里M代表无记忆(memoryless), 即到达间隔和服务时间分布都是指数分布。 如果仅有一个柜员,就称为M/M/1随机服务系统。

每个顾客平均的停留时间的理论公式为 \[ \text{平均停留时间} = \frac{1}{c \mu - \lambda} . \]

更复杂的系统难以得到各种问题的理论结果, 所以需要能够用模拟软件研究。 下面的程序模拟银行足够长的时间, 然后用每个人的平均停留时间估计理论值。

为了分析模拟过程产生的数据, 在模拟运行中需要记录相应的数据。 SimJulia当前的版本(0.8.2)还没有提供数据收集和统计分析功能, 所以下面的程序用了全局变量来保存模拟过程中收集的数据。 因为用了全局变量来保存每个顾客的到来时间、开始服务时间、结束服务时间, 所以用了模块(module), 来克服使用全局变量的缺点。

17.5.1 有限顾客版本

这个例子演示如何等待一个资源, 以及一次性注册所有要模拟的事件。

这个版本仅产生有限个顾客, 所有顾客到达、服务事件都处理过后模拟就结束了, 仅显示顾客的各种事件的时间信息。 直接在模拟开始的时候就将所有的顾客实例都注册到模拟系统中了。 如果事先不知道有多少顾客就不能用这种办法, 而是逐步地注册产生新的顾客实例。

程序:

using Random, Distributions
using SimJulia, ResumableFunctions
using Printf

# 仅产生有限个顾客
num_customers = 5
# 柜员个数设置
num_servers = 2 
# 服务完成速率
mu = 1.0 / 2 
# 顾客到来速率
lam = 0.9 
# 到来时间间隔分布
arrival_dist = Exponential(1 / lam) # interarrival time distriubtion
# 每位顾客接受服务的时间分布
service_dist = Exponential(1 / mu)  # service time distribution

# 显示一定精度的系统模拟时间
function timefmt(tm::Number)
    @sprintf "%8.4f" float(tm)
end
function timefmt(env::Environment)
    timefmt(now(env))
end

# 定义顾客行为
@resumable function customer(
    env::Environment, server::Resource, 
    id::Integer, time_arr::Float64, dist_serve::Distribution)
    # 在指定的时刻到来,注意不是到来间隔
    @yield timeout(env, time_arr)
    println(timefmt(env), "  Customer $id arrived.")
    # 请求柜员资源,如果都忙就排队等待
    @yield request(server) 
    # 获得柜员资源,开始接受服务,占用柜员资源随机服务时间
    println(timefmt(env), "  Customer $id entered service.")
    @yield timeout(env, rand(dist_serve)) 
    # 服务结束,释放资源,离开系统
    @yield release(server) 
    println(timefmt(env),  "  Customer $id exited service.")
end

# 为了模拟可重复,设置随机数种子
Random.seed!(101)
# 设置模拟环境
sim = Simulation()
# 生成柜员资源
server = Resource(sim, num_servers) # initialize servers
# 注册有限个顾客在事件队列中
arrival_time = 0.0
for i = 1:num_customers # initialize customers
    global arrival_time 
    arrival_time += rand(arrival_dist)
    @process customer(sim, server, i, arrival_time, service_dist)
end
# 运行直到没有顾客为止
run(sim) 

结果:

  1.0854  Customer 1 arrived.
  1.0854  Customer 1 entered service.
  1.5966  Customer 2 arrived.        
  1.5966  Customer 2 entered service.
  2.4742  Customer 2 exited service. 
  2.9570  Customer 3 arrived.        
  2.9570  Customer 3 entered service.
  3.8253  Customer 4 arrived.        
  4.6549  Customer 5 arrived.        
  5.7325  Customer 1 exited service. 
  5.7325  Customer 4 entered service.
  6.1697  Customer 4 exited service. 
  6.1697  Customer 5 entered service.
  6.9065  Customer 5 exited service. 
 10.7595  Customer 3 exited service. 

17.5.2 无限顾客的版本

这个例子演示了如何等待一个资源(本例为柜员), 如何源源不断地产生某种过程函数的实例, 如何在模拟运行中收集数据, 在模拟完成后进行统计数据分析。

随机服务系统一般都模拟很长时间, 所以需要不限制顾客人数。 增加一个反复注册产生新顾客的过程函数customer_generator, 因为使用了@yield timeout产生延迟, 所以每个新顾客到达事件发生后才注册产生下一个顾客的到达, 不会在队列中一下子增加无穷多个顾客。

用了全局变量来保存关心的量, 比如每位顾客的到来时间、开始服务时间、结束服务时间, 这样可以进行统计分析。 因为使用了全局变量,所以将主要代码写成了一个模块(module)。

程序:

module mmc_simjulia
using ResumableFunctions, SimJulia, Distributions 
using Printf

export mmc_sim, mmc_stat, debug!

# 设置全局变量DEBUG为true
function debug!()
    global DEBUG
    DEBUG = true
end

# 显示一定精度的系统模拟时间
function timefmt(tm::Number)
    @sprintf "%8.4f" float(tm)
end
function timefmt(env::Environment)
    timefmt(now(env))
end

# 顾客行为
@resumable function customer(
    env::Environment, # 模拟环境
    id::Integer,      # 编号
    server::Resource, # 要求的柜员 
    dist_serve::Distribution) # 服务分布
    global time_arr, time_ser, time_lea

    # 到达事件被触发
    t0 = now(env)
    DEBUG && println(timefmt(env), "  Customer $id arrived.")
    # 柜员要求
    @yield request(server) 
    # 获得空闲柜员
    t1 = now(env)
    DEBUG && println(timefmt(env), "  Customer $id entered service.")
    # 占用柜员随机的服务时间
    @yield timeout(env, rand(dist_serve)) 
    # 释放柜员资源
    @yield release(server) # customer exits service
    # 终止服务离开
    t2 = now(env)
    DEBUG && println(timefmt(env), "  Customer $id exited service.")

    push!(time_arr, t0)
    push!(time_ser, t1)
    push!(time_lea, t2)
end

@resumable function customer_generator(
    env::Environment, 
    server::Resource, 
    arrival_dist,
    service_dist)

    i = 0 # 顾客编号初值
    arrival_time =  0.0 # 实际到达时间
    while true 
        i += 1
        # 注意arrival_time是各个顾客的实际到达时间而不是间隔时间
        tint = rand(arrival_dist)
        arrival_time += tint 
        @yield timeout(env, tint)
        proc = @process customer(env, i, server, service_dist)
    end
end

function mmc_sim= 0.9, μ = 0.5, c = 2, T=1_000)
    # 顾客到来时间间隔分布
    arrival_dist = Exponential(1 / λ)
    # 接受服务持续时间分布
    service_dist = Exponential(1 / μ)

    sim = Simulation() # 生成一个模拟控制环境
    server = Resource(sim, c) # 柜员资源
    @process customer_generator(sim, 
        server, arrival_dist, service_dist)

    run(sim, T)
end

# 比较模拟的平均等待时间与理论期望等待时间
function mmc_stat= 0.9, μ = 0.5, c = 2)
    mwait = 1 / (c*μ - λ)
    arrs = [time_arr, time_ser, time_lea]
    n = minimum(length, arrs)
    arrs = [x[1:n] for x in arrs]
    time_stay = arrs[3] .- arrs[1]
    sampavg = mean(time_stay)
    println("理论期望等待时间=$(mwait), 模拟估计=$(sampavg)")
end

# 利用全局变量保存结果
time_arr = []
time_ser = []
time_lea = []
DEBUG = false 

end # module

短时间测试运行:

using .mmc_simjulia
using Random 

debug!()
Random.seed!(101)
mmc_sim(0.9, 0.5, 2, 5)
mmc_stat(0.9, 0.5, 2)

结果:

  1.0854  Customer 1 arrived.
  1.0854  Customer 1 entered service.
  1.5966  Customer 2 arrived.
  1.5966  Customer 2 entered service.
  2.4649  Customer 3 arrived.
  3.0901  Customer 2 exited service.
  3.0901  Customer 3 entered service.
  3.5341  Customer 1 exited service.
  3.9676  Customer 3 exited service.
理论期望等待时间=10.000000000000002, 模拟估计=1.8149625102353653

长时间运行:

Random.seed!(101)
mmc_sim(0.9, 0.5, 2, 100_000)
mmc_stat(0.9, 0.5, 2)
## 理论期望等待时间=10.000000000000002, 模拟估计=9.963842803897457

17.6 示例:洗车房模拟

这个例子演示了Resource资源的使用, 以及如何等待一个过程函数。

设某个洗车房有两台洗车机, 待洗车辆到来后如果有空机器就马上洗车, 否则排成一队等待。 模拟开始时就有4辆车辆需要洗车, 然后以平均每7分钟的速度到来新的车辆。 洗车需要5分钟。

using Random, ResumableFunctions, SimJulia, Distributions

下面的过程函数不用来代表一个实体, 而是代表一个动作。 后面的car过程将注册并等待它的完成事件。

@resumable function wash(
    sim::Simulation, name::String, washtime::Number)
    println("$(now(sim)) car $name wash starting.")
    @yield timeout(sim, washtime)
    println("$(now(sim)) car $name wash complete.")
end 

下面的过程函数car产生一辆车洗车的实例, 包括请求、释放资源和调用过程函数wash。 一定要注意, 调用另一个过程函数wash的写法, 是@yield @process wash, 而不是@yield wash。 因为wash也是一个过程函数, 所以需要用@process才能产生它的实例, 然后才用@yield等待它的完成事件, 即代表洗车完成的事件。

@resumable function car(
    sim::Simulation, name::String, 
    machine::Resource, washtime::Number)
    println("$(now(sim)) car $name arrived.")
    @yield request(machine)

    @yield @process wash(sim, name, washtime)

    @yield release(machine)
end 

下面的过程函数用来产生源源不断到来的待洗车辆。 一开始就注册4辆待洗车辆, 然后以平均7分钟一辆的速度注册产生新的待洗车辆。 要注意仅进行了@process car, 而没有像car函数那样@yield @process car, 所以仅定期产生待洗车辆, 而不会等待洗车完毕再产生新的待洗车辆。

@resumable function car_generator(
    sim::Simulation, 
    machine::Resource,
    washtime::Number)
    # 先注册4辆车的实例,这4个是直接进入模拟系统的事件队列的
    # 仅注册,不马上开始洗车
    for i=1:4
        @process car(sim, "$i", machine, washtime)
    end 

    # 然后再源源不绝地产生待洗车辆,
    # 但有一定产生速度,所以不会使得事件队列无穷增长
    t_arrival = 7
    i = 4
    while true
        i += 1
        @yield timeout(sim, rand((t_arrival - 2):(t_arrival + 2)))
        @process car(sim, "$i", machine, washtime)
    end
end 

模拟的主控函数:

function car_sim(num_machines=2, washtime = 5, T=20)
    # 模拟环境:
    sim = Simulation()

    # 洗车机资源:
    machine = Resource(sim, num_machines)

    # 不停产生待洗车辆的过程:
    @process car_generator(sim, machine, washtime)

    run(sim, T)
end

car_sim()

结果:

0.0 car 1 arrived.
0.0 car 2 arrived.
0.0 car 3 arrived.
0.0 car 4 arrived.
0.0 car 1 wash starting.
0.0 car 2 wash starting.
5.0 car 1 wash complete.
5.0 car 2 wash complete.
5.0 car 3 wash starting.
5.0 car 4 wash starting.
8.0 car 5 arrived.
10.0 car 3 wash complete.
10.0 car 4 wash complete.
10.0 car 5 wash starting.
15.0 car 5 wash complete.
17.0 car 6 arrived.
17.0 car 6 wash starting.

17.7 示例:加油站模拟

此例演示Container资源的使用。

此例模拟一个加油站, 有一个监控过程每1分钟检查油量是否小于某个比例, 然后就开始补货过程,需要一定时间。 汽车到来并加油,使得油库存货量变少。

程序:

# 加油站补货过程。
@resumable function tanker(env, tank)
    while true
        if tank.level < tank.capacity*0.2
            # 加油
            println("$(now(env)) Tanker on road.")
            amount = tank.capacity - tank.level
            @yield timeout(env, 60)
            @yield put(tank, amount)
            println("$(now(env)) Tanker refill, level: ", tank.level)
        end
        # 每隔1分钟检查剩余量
        @yield timeout(env, 1)
    end
end

# 来加油的车辆
@resumable function car(env, id, tank, arrival)
    @yield timeout(env, arrival) # 指定到达时间而非间隔
    @yield get(tank, 20.0)
    println("$(now(env)) car $(id) refill, left $(tank.level)")
end

# 反复产生来加油的车辆
@resumable function car_series(env, tank)
    i=0
    interval = 10
    arrival = 0.0
    while true
        i += 1
        @yield timeout(env, interval)
        arrival += interval
        @process car(env, i, tank, arrival)
    end
end 

# 当Container的level在同一时刻被并发修改时,结果不合理。
sim = Simulation()
tank = Container(sim, 200.0, level=100.0)
@process tanker(sim, tank)
@process car_series(sim, tank)
run(sim, 200)

结果:

20.0 car 1 refill, left 80.0
40.0 car 2 refill, left 60.0
60.0 car 3 refill, left 40.0
80.0 Tanker on road.
80.0 car 4 refill, left 20.0
100.0 car 5 refill, left 0.0
140.0 Tanker refill, level: 140.0
140.0 car 6 refill, left 140.0
140.0 car 7 refill, left 140.0
160.0 car 8 refill, left 120.0
180.0 car 9 refill, left 100.0

Store资源现在的版本(0.8.2)有一个问题: 同一时刻有多个过程补货、取货时, 水平值无法正确区分各个过程造成的水平变化。

监控过程在80分钟时发现油量不足, 开始补货,需要60分钟才能完成, 所以在140分钟时补货, 但这时已经有6、7两号汽车加油, 所以补货与加油联合的效果是还剩140油量。

为了避免Container并发访问的这个问题, 可以将并发访问的时间略作很小的修改; 或者在补货时中断排队加油的车辆, 这需要在车辆的过程函数中用try ... catch ... end捕获中断异常, 并等待补货过程完成后重新发起加油请求。

17.8 示例:带有信道延迟的信号发送接收

这个例子演示了如何将一个过程函数包装在一个普通函数中, 实现了一定的抽象化, 使得包装后的函数更容易使用。

设有一个信号发送接收系统, 发送端持续、间隔地发出信号, 接收端持续、间隔地接受信号。 如果用Store类型表示信道, 可以用信道的容量限制双方的发送、接收速度。 但是, 直接用Store表示信道, 无法模拟信道造成的延迟。 下面的程序用一个Cable自定义类型将Store资源包裹在内, 并重新定义putget函数实现类似Store的功能, 但增加了信道的时延。

程序中的latency是一个过程函数, 如果直接使用, 就需要犹豫调用方法到底是@yield latency, @process latency, @yield @process latency, 还是简单地直接用latency。 所以, 程序中使用了函数putlatency函数的调用包裹起来, 随后用户使用时只要将put看成一个普通函数, 按普通函数调用即可。

using Random, ResumableFunctions, SimJulia
using Printf

import SimJulia.put # 为了重载
import Base.get     # 为了重载

Random.seed!(8710)  # 为了模拟可重复
const SIM_DURATION = 100.  # 要模拟的时间长度
const SEND_PERIOD = 5.0    # 发送间隔
const RECEIVE_PERIOD = 3.0 # 接受间隔

# 显示一定精度的系统模拟时间
function timefmt(tm::Number)
    @sprintf "%8.4f" float(tm)
end
function timefmt(env::Environment)
    timefmt(now(env))
end

# 自定义Cable类型,包裹Store资源
# 作为带有时延的Store
mutable struct Cable
    env::Simulation # 模拟环境
    delay::Float64  # 时延参数
    store::Store{String} # 包裹Store并重载put和get

    # 自定义数据类型的构建函数
    function Cable(env::Simulation, delay::Float64)
        return new(env, delay, Store{String}(env))
    end
end

# 实现Cable类型时延的过程函数
# 这个函数会产生一个超时事件,实现时延;
# 并产生一个put事件,实现发送
@resumable function latency(
    env::Simulation, cable::Cable, value::String)
    @yield timeout(env, cable.delay)
    @yield put(cable.store, value)
end

# 下面重载了put和get函数,用来实现自定义类型Cable的Store功能
# 这两个函数不是过程函数
# put中的`@process latency`会将时延和信息发送加入到模拟系统事件队列
function put(cable::Cable, value::String)
    @process latency(cable.env, cable, value) 
end

# `get`需要用`@yield get`方式调用,
# 产生接收信息的事件,并返回接收的信息
function get(cable::Cable)
    get(cable.store) # returns an element stored in the cable store
end

# 信号发送端的过程函数。
@resumable function sender(env::Simulation, cable::Cable)
    i = 0
    while true
        i += 1
        # 发送端等间隔发送信号到cable
        @yield timeout(env, SEND_PERIOD)
        value = "$(timefmt(env)) sending $(i)"
        # 注意put不用@yield put,
        # 因为put会用@process增加latency中两个@yield事件,
        # 其中一个代表了信道时延
        put(cable, value)
    end
end

# 信号接收端的过程函数
@resumable function receiver(
    env::Simulation, cable::Cable)
    while true
        # 接收端等间隔探查信道
        @yield timeout(env, RECEIVE_PERIOD)
        # 注意这里用了@yiled get, 
        # 使得取信号能够探查资源是否可用
        msg = @yield get(cable)
        println("$(timefmt(env)) received: $msg")
    end
end

sim = Simulation()
cable = Cable(sim, 10.) # 时延。没有容量限制。
@process sender(sim, cable)
@process receiver(sim, cable)

run(sim, SIM_DURATION)

结果:

  15 received:    5 sending 1
  20 received:   10 sending 2 
  25 received:   15 sending 3 
  30 received:   20 sending 4 
  35 received:   25 sending 5 
  40 received:   30 sending 6 
  45 received:   35 sending 7 
  50 received:   40 sending 8 
  55 received:   45 sending 9 
  60 received:   50 sending 10
  65 received:   55 sending 11
  70 received:   60 sending 12
  75 received:   65 sending 13
  80 received:   70 sending 14
  85 received:   75 sending 15
  90 received:   80 sending 16
  95 received:   85 sending 17

17.9 示例:一对多广播发信的模拟

如果模拟一对一通信, 只要收信速率大于等于发信速率, 只要用Store来代表信道就可以。 如果收信速率低于发信速率, 就需要设置信道容量以限制发信速率。 多个发信、一个收信的通信情况类似。

如果要做一对多通信, 需要发信端和每个收信端单独建立信道。 这可以定义一个新的BroadcastPipe类型, 管理多个信道。

using Random, ResumableFunctions, SimJulia, Distributions
import SimJulia.put # 为了重载

# 自定义数据类型,用于管理一对多的多个信道 
mutable struct BroadcastPipe 
    sim::Simulation 
    capacity::Number
    pipes::Vector{Store}

    function BroadcastPipe(
        sim::Simulation, capacity=typemax(UInt))
        new(sim, capacity, Store{String}[])
    end 
end

# 发送信息时,需要向每个信道发送
# 这是过程函数,所以使用时需要用@process注册。
@resumable function bcp_put(sim::Simulation,
    bcp::BroadcastPipe, value)
    for pipe in bcp.pipes 
        @yield put(pipe, value)
    end
    # 等待所有信道都发送完成
end 

# 将bcp_put包装成一个非过程函数
function put(bcp::BroadcastPipe, value)
    @process bcp_put(bcp.sim, bcp, value)
end

# 提供一个函数,当新的接收端要求连接时,
# 产生一个新信道并记住
function get_output_conn(bcp::BroadcastPipe)
    pipe = Store{String}(bcp.sim, capacity=bcp.capacity)
    push!(bcp.pipes, pipe)
    return pipe 
end 

# 一个发信端的过程函数
@resumable function message_generator(
    sim::Simulation, out_pipe::BroadcastPipe)
    i = 0
    while true 
        i += 1
        @yield timeout(sim, rand(6:9))
        msg = "$(now(sim)) Message $i"
        println(now(sim), " Sending ", i)
        put(out_pipe, msg)
    end
end

# 一个收信端的过程函数。
@resumable function message_consumer(
    sim::Simulation, name::String, 
    in_pipe::Store{String})
    while true
        println(now(sim), " ", name, " Receiving.")
        msg = @yield get(in_pipe)
        println(now(sim), " ", name, " receiving ", msg)
        @yield timeout(sim, rand(4:7))
    end
end

Random.seed!(106)
sim = Simulation()
bcp = BroadcastPipe(sim)
@process message_generator(sim, bcp)
@process message_consumer(sim, "A", get_output_conn(bcp))
@process message_consumer(sim, "B", get_output_conn(bcp))
run(sim, 20)

一定要注意信息发送端没有直接调用bcp_put, 因为这个函数中有@yield, 是一个过程函数, 过程函数被调用时,需要先用@process注册; 程序中用put函数包裹了bcp_put, 使得put可以作为普通函数调用。 这个例子在编程实现时, 因为@yield bcp_put, @process bcp_put, @yield @process bcp_put, bcp_put的取舍作者花费了比较多的时间进行试错。

结果:

0.0 A Receiving.
0.0 B Receiving.
8.0 Sending 1
8.0 A receiving 8.0 Message 1  
8.0 B receiving 8.0 Message 1  
14.0 Sending 2
14.0 B Receiving.
14.0 B receiving 14.0 Message 2
15.0 A Receiving.
15.0 A receiving 14.0 Message 2
18.0 B Receiving.
19.0 A Receiving.

17.10 示例:排队超过耐心模拟

仍模拟一个银行, 仅有一个柜员,服务时间随机, 顾客按随机间隔到来, 但排队超过随机的时间间隔后放弃排队退出。

这个程序演示了如何在两个事件之间选择较早一个响应。 程序:

using Random, Distributions 
using ResumableFunctions, SimJulia
using Printf

# 显示一定精度的系统模拟时间
function timefmt(tm::Number)
    @sprintf "%8.4f" float(tm)
end
function timefmt(env::Environment)
    timefmt(now(env))
end

# 一个顾客行为的过程函数
@resumable function customer(
    env::Environment, # 模拟环境
    id::Integer,      # 编号
    server::Resource) # 要求的柜员 

    # 到达事件被触发
    t0 = now(env)
    DEBUG && println(timefmt(env), "  Customer $id arrived.")

    # 设置一定的排队耐心
    patience = rand(8.0:0.1:12.0)
    # 柜员要求
    req = request(server) # 结果是一个事件
    # 获得空闲柜员,或超过耐心
    @yield req | timeout(sim, patience)
    # 已等待时间
    wait = round(now(env) - t0, digits=1)

    # 判断是获得了柜员还是超过耐心
    if state(req) == SimJulia.processed
        # 获得了柜员服务
        DEBUG && println(timefmt(env), "  Customer $id waited for $(wait)")
        # 占用柜员随机时间
        serv_time = round(rand(Exponential(12.0)), digits=1)
        @yield timeout(env, serv_time)
        @yield release(server)
        DEBUG && println(timefmt(env), "  Customer $id exiting.")
    else # 没有获得柜员服务,耐心超限,退出银行。
        DEBUG && println(timefmt(env), "  Customer $id waited for $(wait), reneging.")
    end
end

# 不停地产生顾客到来
@resumable function customer_generator(
    env::Environment, 
    server::Resource)

    i = 0 # 顾客编号初值
    while true 
        i += 1
        # 两个顾客到来之间的间隔时间:
        tint = round(rand(Exponential(10.0)), digits=1)
        @yield timeout(env, tint)
        @process customer(env, i, server)
        # 注意仅`@process customer`而不要`@yield customer`或`@yield @process customer`
    end
end

Random.seed!(102)
DEBUG=true
sim = Simulation() # 生成一个模拟控制环境
server = Resource(sim, 1) # 柜员资源
@process customer_generator(sim, server)
run(sim, 50)

上面的程序中, 设置了一个不停产生顾客到来的过程函数customer_generator, 它仅需要用@process注册一次。 在这个函数中, 不停地每隔一段时间就用@process新注册产生一个顾客实例, 注意不要用@yield等待这个顾客实例完成, 如果那样就只能完成一个顾客服务才产生下一个顾客了。

程序中,用yield req | timeout(sim, patience)选择两个事件较早的一个响应, 为了获知哪一个事件被响应, 使用了state(req)来访问@yield require的输出, 此输出的类型是事件, 事件可以用state函数查询当前状态, 包括SimJulia.idle(未加入事件队列), SimJulia.scheduled(已加入事件队列但未发生), SimJulia.processed(已从事件队列取出)。 被响应的事件应该状态为SimJulia.processed

结果:

 1.3  Customer 1 arrived.
 1.3  Customer 1 waited for 0.0
15.6  Customer 2 arrived.
21.3  Customer 1 exiting.
21.3  Customer 2 waited for 5.7
21.6  Customer 3 arrived.
26.6  Customer 4 arrived.
30.4  Customer 3 waited for 8.8, reneging.
35.2  Customer 4 waited for 8.6, reneging.
38.0  Customer 5 arrived.
43.1  Customer 2 exiting.
45.6  Customer 6 arrived.
47.0  Customer 5 waited for 9.0, reneging.
48.8  Customer 7 arrived.

17.11 示例:n取k冗余系统可靠性模拟研究

这个例子展示了过程之间用“中断”(interrupt)唤醒其它过程的例子, 展示了Store资源的用法。

设某个设施需要\(n\)台相同的机器同时正常工作才能运作。 为保险, 增加\(s\)台备用机器, 一旦某台在用的机器失效, 就立即用一台备用机器代替, 只有在有在用机器失效但没有备用机器时此设施才宣告崩溃。 只要有失效的机器, 一个维修工就开始维修, 有多台失效时依序进行维修, 每修好一台就将修好的机器作为备用机器。 每台机器不论是否经过修理, 其开始工作到失效之间的时间为随机变量, 相互独立,分布函数为\(F\)。 修理一台机器所需的时间是随机变量, 彼此独立,与工作时间也独立,分布函数为\(G\)。 设\(T\)表示此设施从开始运作到崩溃经过的时间, 要估计其期望值\(ET\)

例子来自:

  • Ross, Simulation 5th edition, Section 7.7, p. 124-126。

程序:

using Random, Distributions
using ResumableFunctions, SimJulia
using Printf

# 运行多次至崩溃然后计算运作时间的样本均值
const RUNS = 100
const N = 10 # 同时工作个数
const S = 3  # 备用机器个数
const SEED = 150 # 随机数种子
const LAMBDA = 100 # 每台机器平均失效前时间
const MU = 5   # 每台机器修复时间

# 失效前时间和修复时间都用指数分布
const F = Exponential(LAMBDA)
const G = Exponential(MU)

# 显示一定精度的系统模拟时间
function timefmt(tm::Number)
    @sprintf "%8.4f" float(tm)
end
function timefmt(env::Environment)
    timefmt(now(env))
end

# 机器的行为,过程函数
@resumable function machine(
    sim::Simulation, # 模拟环境
    name,
    repair_facility::Resource, # 修理工,资源
    spares::Store{Process}) # 备件,仓储资源
    while true
        try 
            # 这里用来作为机器的初始化,
            # 模拟开始,或者一台机器从备件到在用,
            # 都需要用Interrupt事件从这里跳到后面
            @yield timeout(sim, Inf)
        catch
        end
        DEBUG && println(timefmt(sim), "  ", name, " starts working.")

        # 工作时间
        @yield timeout(sim, rand(F))
        DEBUG && println(timefmt(sim), "  ", name, " stops working, ",
            " spares left: ", num_spares(spares))

        # 尝试获取备用机器
        # @yield中用`|`表示等待两个事件的较早一个被激活时恢复运行
        # 因为是超时0,所以如果不能获取备用机器,
        # 则超时0事件被激活,这代表设施发生崩溃
        get_spare = get(spares)
        @yield get_spare | timeout(sim, 0.0)
        if state(get_spare) != SimJulia.idle
            # 这表明获取备件成功
            # value(get_spare)获取了代表一台备用机器的过程实例
            # 按machine的定义,备件需要用interrupt方法开始启用
            interrupt(value(get_spare))
        else
            # 这表明获取备件失败
            throw(SimJulia.StopSimulation("No more spares!"))
        end

        # 因为当前的机器处于失效状态,所以请求修理工资源
        @yield request(repair_facility)
        DEBUG && println(timefmt(sim), "  ", name, " repair starts.")
        # 修理时间随机
        @yield timeout(sim, rand(G))
        # 修理完毕释放修理工资源
        @yield release(repair_facility)
        DEBUG && println(timefmt(sim), "  ", name, " is repaired.")

        # 将修好的当前机器存入备件仓库
        @yield put(spares, active_process(sim))
    end
end # function machine 

# 模拟初始化
@resumable function start_sim(
    sim::Simulation, 
    repair_facility::Resource, 
    spares::Store{Process})

    procs = Process[]
    for i=1:N
        push!(procs, @process machine(
            sim, i, repair_facility, spares))
        # 从这里看出,`@process`的返回值是过程函数实例,
        # 这里代表一台机器
    end

    for proc in procs
        # 按machine函数定义,这会启用proc代表的机器
        @yield interrupt(proc)
    end
    for i=1:S
        # 备用机器存入备用机器仓库
        @yield put(spares, @process machine(
            sim, N+i, repair_facility, spares))
    end
end

# 模拟一次到崩溃的函数
function sim_repair()
    sim = Simulation()
    repair_facility = Resource(sim)
    spares = Store{Process}(sim)
    @process start_sim(sim, repair_facility, spares)
    msg = run(sim)
    stop_time = now(sim)
    DEBUG && println(timefmt(sim), " crashed.")
    stop_time
end

function num_spares(spares::Store{Process})
    length(spares.items)
end

测试一次运行:

Random.seed!(SEED)
DEBUG=true
sim_repair()

结果:

  0.0000  1 starts working. 
  0.0000  2 starts working. 
  0.0000  3 starts working. 
  0.0000  4 starts working. 
  0.0000  5 starts working. 
  0.0000  6 starts working. 
  0.0000  7 starts working. 
  0.0000  8 starts working. 
  0.0000  9 starts working. 
  0.0000  10 starts working.
 10.3417  9 stops working,  spares left: 3
 10.3417  13 starts working.
 10.3417  9 repair starts.
 12.7194  9 is repaired.
 16.7932  13 stops working,  spares left: 3
 16.7932  11 starts working.
 16.7932  13 repair starts.
 18.4354  13 is repaired.
 28.5850  1 stops working,  spares left: 3 
 28.5850  13 starts working.
 28.5850  1 repair starts.
 39.2907  1 is repaired.
 42.4253  4 stops working,  spares left: 3 
 42.4253  9 starts working.
 42.4253  4 repair starts.
 45.5695  4 is repaired.
 48.4878  2 stops working,  spares left: 3 
 48.4878  4 starts working.
 48.4878  2 repair starts.
 49.8409  4 stops working,  spares left: 2
 49.8409  1 starts working.
 50.5184  5 stops working,  spares left: 1
 50.5184  12 starts working.
 52.6692  12 stops working,  spares left: 0
 52.6692 crashed.

模拟多次,计算设施崩溃时间均值:

Random.seed!(SEED)
DEBUG=false
results = Float64[]
for i=1:RUNS
    push!(results, sim_repair())
end
println("Average crash time: ", sum(results)/RUNS)
## Average crash time: 284.34194386903255

17.12 示例:剧院购票模拟

这个例子演示Resource资源使用, 多个事件选择响应, 事件共享,是比较复杂的例子。

设有一个剧院, 有一个售票处, 正在出售3个表演的票。 顾客不停按随机间隔到来, 随机选择一个表演, 并随机选择购买1到6张票。 售票没有时间, 当顾客选择的演出票售罄时顾客直接离开, 但未售罄而是所需数量时顾客会争吵一定时间, 这时到来的顾客排队等待。

当某个演出的票仅剩一张或没有剩余时触发售罄事件。

程序最后显示因为排队时售罄而离开的人数, 售罄时间。


using Random, Distributions 
using ResumableFunctions, SimJulia
using Printf

TICKETS = 50
SIM_TIME = 120

# 用来保存当前信息的结构
mutable struct Theater
    counter::Resource # 售票员资源
    movies  # 所有的演出
    available # 所有演出分别的余票数
    sold_out  # 所有的演出分别是否售罄
    when_sold_out # 所有的演出分别的售罄时间
    num_renegers  # 所有演出分别因未满足要求离开剧院人数
end 

# 一个顾客的行为的过程函数 
@resumable function moviegoer(
    env::Environment, 
    movie, # 选择的表演
    num_tickets,  # 选择的购票章数
    theater) # 来到时的剧院现状

    # 等待售票员或者欲购演出售罄事件
    req = request(theater.counter)
    @yield req | theater.sold_out[movie]
    if state(req) != SimJulia.processed
        # 因为售罄而退出
        theater.num_renegers[movie] += 1
    else 
        # 没有售罄,可以购票
        if theater.available[movie] < num_tickets
            # 没有售罄但余票不够所需
            @yield timeout(env, 3)
        else 
            # 可以购票
            # 这里没有使用Container资源,因为不需要放入过程
            theater.available[movie] -= num_tickets
            if theater.available[movie] <= 1
                # 立即触发“售罄”事件
                succeed(theater.sold_out[movie])
                theater.when_sold_out[movie] = now(env)
                theater.available[movie] = 0
            end
        end
        # 释放售票员资源
        @yield release(theater.counter)
    end

end # function moviegoer

# 顾客不停到来的过程函数
@resumable function customer_arrivals(
    env::Environment, theater::Theater)
    while true 
        tint = rand(Exponential(1/5))
        @yield timeout(env, tint)
        movie = rand(theater.movies)
        num_tickets = rand(1:6)
        if theater.available[movie] > 0
            # 仅在未售罄时才处理顾客的购票实例
            @process moviegoer(env, movie, num_tickets, theater)
        end
    end
end # end function customer_arrivals

# 判断某个演出是否售罄
function is_soldout(movie, theater)
    return state(theater.sold_out[movie]) == SimJulia.processed
end

# 初始化剧院信息
function make_theater(sim::Environment)
    counter = Resource(sim, 1)
    movies = ["Show A", "Movie B", "Concert C"]
    available = Dict(movie => TICKETS for movie in movies)
    sold_out = Dict(movie => Event(sim) for movie in movies)
    when_sold_out = Dict(movie => Inf for movie in movies)
    num_renegers = Dict(movie => 0 for movie in movies)
    Theater(counter, movies, available,
        sold_out, when_sold_out, num_renegers)
end

# 执行模拟
Random.seed!(101)
sim = Simulation()
theater = make_theater(sim)
@process customer_arrivals(sim, theater)
run(sim, SIM_TIME)

# 显示结果
for movie in theater.movies
    if is_soldout(movie, theater)
        println(movie, " sold out after ", 
            theater.when_sold_out[movie])
        println("Number of people leaving queue : ",
            theater.num_renegers[movie])
    end 
end 

程序中直接用Event(sim)生成了事件, 这些事件一开始的状态是SimJulia.idle, 可以用success(event)将事件event状态设置为SimJulia.processed, 这也会触发于这个事件关联的过程函数。 程序中三个演出的售罄事件是被所有顾客共同响应的。

程序中用了@yield req | theater.sold_out[movie]来响应售票员空闲或者所需票售罄两个事件较早一个, 用state(req) != SimJulia.processed判断是否未获得售票员资源。

结果:

Show A sold out after 10.430318916615349
Number of people leaving queue : 2
Concert C sold out after 10.430318916615349
Number of people leaving queue : 3