離散事象シミュレーションのアニメーション化

シミュレーションアニメ

待ち行列モデルの離散事象シミュレーションを行うイベント駆動型プログラミングSimPyを使って、シミュレーション結果を評価することができる。「Pythonコンピュータシミュレーション入門-人文・自然・社会科学の数理モデル」橋本 洋志/牧野 浩二・共著(オーム社)を参考にした。しかし、その途中過程を視覚で確認することができないが、今回、このSimPyのReal-time simulationsを使い、イベント発生に従って事象が変化するアニメーション動画を作成したので、その手順を記録に残すことにした。アニメーション動画はpygameを使った。尚、このプログラムはRaspberry PiでなくてもWindows PCでも動作するが、将来、ラズパイのGPIOと組み合わせると更に興味深いプログラムを作ることができると考えている。

●シミュレーションのシナリオ

シミュレーションシナリオ
シミュレーションシナリオ

ある頻度で到着する車がガソリンスタンドでガソリンを入れる事象を想定している。給油所に到着した車は2台ある給油ポンプの1台からガソリンを注入するが、給油所のタンク残量が車の給油要求量を満たせない時には車は待機することになる。
給油所のタンク残量がある規定値以下になると、補給タンク車を呼びガソリンを給油所タンクに補給する。
2台の給油ポンプが使用中、後続する車は給油ポンプが空くまで行列待ちとなる。
更に、車が給油所に到着する前に信号機があり、赤信号の時には車は決められた時間停止する。
このシナリオ自体あまり現実性はないが、このシナリオの元は、Documentation for SimPy (https://simpy.readthedocs.io/en/latest/contents.html)のプログラムサンプルを参考にして、SimPyの使い方を追求する中で、信号待ちのシナリオを追加した。

●ラズパイのプログラム環境

Raspberry Pi 4B(4GB)+ MicroSDHC(32GB)にフルセットでRaspberry Pi OSをインストールした後、モジュールSimPyとpygameをインストールした。

●燃料補給シミュレーションの設定条件

 ここでの時間管理は秒(sec)を単位としているが、シミュレーション時間の10秒を実際の時間では1分と考えれば、現実に近い数字となる。
 (1)車の設定条件
  発生頻度(車の到着頻度):平均時間間隔40(sec)のポアソン到着(λ=1/40)
  車のガソリンタンク容量 : 50(liters)
  車のガソリン残量(給油時) : 5~25(liters)の離散一様分布
 (2)給油所の設定条件
  給油所タンクの最大容量 : 300(liters)
  給油所タンクの残容量が最大容量の30(%)を下回ったら、補給タンク車を呼んで満杯まで補給
  車へのガソリン補給速度 : 2 (liters / sec.)
  補給タンク車を呼んでから給油所に到着するまでの時間 : 150~350(sec.)の離散一様分布
  補給タンク車から給油所タンクに補給する平均速度 10(liters / sec.)
 (3)シミュレーション時間 : 3000(sec.)

●プログラムの構成

(1)リアルタイムシミュレーションの設定
 イベント駆動型のシミュレーションはイベント発生事象だけを出力するので、その出力間隔は実際の時間とは異なる。しかし、リアルタイムシミュレーションでは、実際の時間間隔でイベントが出力されるので、シミュレーション時間が60分なら、実時間で60分かかることになる。これではシミュレーション結果を評価するには時間がかかるために、管理する時間単位をスケールダウンしたり、環境設定で用意されているfactorで調整することができる。リアルタイムシミュレーションを使うための設定は次の通り。
 import simpy.rt
 env = simpy.rt.RealtimeEnvironment(initial_time=0, factor=0.1, strict=False)
 リアルタイムシミュレーションは、simpyではなくsimpy.rtをimportする。
 ここで、factor=0.1はシミュレーション時間を0.1倍で実行する。例えば、シミュレーション時間1secを0.1secに早めてシミュレーションを実行することになる。
 早めることでプログラム処理が追い付かない場合はerrorで停止するので、追いつかない場合に時間遅れを無視するため、厳密な時間管理をstrict=Falseに設定する。

(2)給油所(GasStation)クラスの設定
 gasStation = GasStation(env, 2)
 2台の給油ポンプをResoruceで設定してそれぞれの給油ポンプのイベントを管理。シミュレーションでは台数管理だけでよいが、アニメーション化をする場合には、2台の給油ポンプを識別する必要があり、def pumpOccupy(self):で識別管理している。
 給油所タンクはContainerで設定して、ガソリンの最大容量、使用量と残容量を管理する。給油所タンクの残量を監視するために、次の様に10sec毎にEventを発生させている。
 yield env.timeout(10)
 残量が規定値を下回った場合には、次の補給タンク車を呼ぶプロセスを作っている。
 yield env.process(self.tankTruck(env))

(3)信号機(Traffic Signal)クラスの設定
 traficSignal = TraficSignal(env, blueTime, redTime)
 青信号、赤信号の状態管理は、青信号の時間blueTime、赤信号の時間redTimeを使って次のEventを交互に発生させている。
 yield env.timeout(self.blueTime)
 yield env.timeout(self.redTime)
 青信号で通過し、赤信号で停止する管理には、信号機にContainer設定してイベント管理している。青信号の場合には、Containerに最大通過台数(putAmaount)をセットし①、赤信号の場合には、Containerの残量(getAmount)を取出し②、残量を0(ゼロ)にする。
 ①yield self.passBlueSignal.put(putAmount)
 ②yield self.passBlueSignal.get(getAmount)
これにより、車が信号を通過する際に、青信号の場合は③のようにget(1)の通過要求を出すとその要求が満たされ通過できるが、赤信号の場合は、残量が0(ゼロ)なので、③の要求は次の青信号まで保留され行列待ちすることになる。
 ③yield trafficSignal.passBlueSignal.get(1)

(4)車(Car)クラスインスタンスの発生プロセスを開始
 env.process(carGenerator(env))
 指数分布を使ってランダムにEventを発生させ、Carインスタンスを作っている。
 Carクラスのアトリビュートには、アニメーション描画に必要な変数を持たせている。
  self.posTime = 現時点での車の待ち行列の位置(ステージ)と到着時間
    ステージ: 0=発生、1=交通信号、2=給油所、3/4=給油ポンプ(0/1)、5=システム外
  self.reqGas = 給油ポンプでの状態(要求中/注入中)
  self.litersRequired = ガソリン給油要求量
  self.carColor = アニメ用の車画像の番号(ランダムに発生)
 Carクラスのメソッドdef drive(self, env)には、車のプロセスを順に設定

(5)1sec単位で、シミュレーション状態をpygameで描画
 env.process(drawSim(env))
 上記で記載したCarクラス、GasStationクラス、TrafficSignalクラスのインスタンス保持データを使って描画している。

(6)シミュレーション開始と時間
 env.run(until=SIM_TIME)

<プログラム例> refuelingRealTimeSimulation.py
main()プログラム部分
# -*- coding: utf-8 -*-
import itertools
import random
import simpy.rt
import pygame
import time

RANDOM_SEED = 123
LAMBDA = 1 / 40	# 車の単位時間到着台数、1/λ時間ごとに1台
GAS_STATION_SIZE = 300     # Gas StationのTank最大容量(liters)
THRESHOLD = 30             # Gas補給車を呼ぶタイミング:タンクレベル (in %)
CAR_FUEL_TANK_SIZE = 50        # 車のGasタンク容量(liters)
CAR_FUEL_TANK_LEVEL = [5, 25]  # 補給時の車の残量(Min/Max in liters)
REFUELING_SPEED = 2        # 平均補給速度(liters / second)
TANK_TRUCK_TIME = [150, 350]      # CALLしてからGas補給車が到着するまでのMin/Max時間(sec.)
GAS_POURING_SPEED = 10		# 補給車からタンクに補給する平均速度(Liters / sec)
SIM_TIME = 3000            # Simulation time in seconds
global cars
global gasStation
global trafficSignal

def main():
    global gasStation
    global trafficSignal
    print('Gas Station Simulation Start')
    random.seed(RANDOM_SEED)
    env = simpy.rt.RealtimeEnvironment(initial_time=0, factor=0.1, strict=False)
    gasStation = GasStation(env, 2)
    trafficSignal = TrafficSignal(env, 60, 60)
    env.process(carGenerator(env))
    env.process(drawSim(env))
    env.run(until=SIM_TIME)

def drawSim(env):
    pygame.init()
    
    #<<<<< 省略  >>>>>
    
    while True:
        screen.fill(WHITE)
          以下省略
        # Draw Simulation Time
        # Draw GasStation(Pumps)
        # Draw Gas Tank
        # Draw Tank Truck
        # Draw Queue Image and Traffic Signal
        # Draw Cars
        
        pygame.display.update()
        yield env.timeout(1)
    pygame.quit()

# <<<< ここに、Car, GasStation, TrafficSignal クラスを入れる >>>

def carGenerator(env):
    global cars
    cars = []
    for i in itertools.count():
        yield env.timeout(random.expovariate(LAMBDA))	# 指数分布確率でCarを発生
        cars.append(Car(env, i))

if __name__ == '__main__':
    main()
    
◆Car クラス部分
class Car(object):
    def __init__(self, env, no):
        self.name = 'Car-' + str(no)
        self.exist = True
        self.posTime = [0, env.now]	# 主なステージと到着時間
        self.reqGas = 0		# Fuel Pumpでの状態:0=要求中/1=注入中
        self.litersRequired = 0	# 要求Fuel量
        self.carColor = random.randint(0, 5)	# アニメ用車の画像番号
        env.process(self.drive(env))
        
    def drive(self, env):
        global gasStation
        global trafficSignal
        yield env.timeout(10)	# 発生~信号機までの時間
        self.posTime = [1, env.now]	# 信号機待ち
        yield env.timeout(5)	# 信号機確認の時間
        yield trafficSignal.passBlueSignal.get(1)	# 青信号機確認
        self.posTime = [2, env.now]	# Gas Station待ち
        yield env.timeout(10)	# 信号機~Gas Stationまでの時間
        request = gasStation.fuelPumps.request()	# Fule Pump利用要求
        yield request	# Fule Pump利用要求
        pumpNo = gasStation.pumpOccupy()	# 利用可能な Fule Pump No
        self.posTime = [3 + pumpNo, env.now]		# Fuel注入待ち
        carFuelTankLevel = random.randint(*CAR_FUEL_TANK_LEVEL)
        self.litersRequired = CAR_FUEL_TANK_SIZE - carFuelTankLevel	# CarのFule要求量
        self.reqGas = 0		# Fuel要求中
        yield gasStation.fuelTank.get(self.litersRequired)		# Fuel要求
        self.reqGas = 1		# Fuel注入中
        yield env.timeout(self.litersRequired / REFUELING_SPEED)		# Fuel注入に必要な時間
        self.exist = False
        self.posTime = [5, env.now]	# シーンから退去
        gasStation.fuelPumps.release(request)	# Resourceのリリース
        gasStation.pumpRelease(pumpNo)

◆GasStationクラス部分
class GasStation(object):
    def __init__(self, env, num):
        self.numPumps = num
        self.pumpBusy = [False] * num 	# Fuel Pumpの利用状況
        self.callTime = [0, env.now]	# 
        self.fuelPumps = simpy.Resource(env, num)
        self.fuelTank = simpy.Container(env, GAS_STATION_SIZE, init=GAS_STATION_SIZE)
        env.process(self.gasControl(env))

    def pumpOccupy(self):	# Fuel Pumpの識別管理
        for no in range(self.numPumps):
            if self.pumpBusy[no] == False:
                self.pumpBusy[no] = True
                return no
    
    def pumpRelease(self, no):
        self.pumpBusy[no] = False

    def gasControl(self, env):
        while True:
            if self.fuelTank.level / self.fuelTank.capacity * 100 < THRESHOLD:	# Fuel Tank残量の確認
                self.callTime = [1, env.now]		# Tank Truck Call Request
                yield env.process(self.tankTruck(env))	# Tank Truckのプロセス
            yield env.timeout(10)  # 10sec毎にFuel Tank量のチェック

    def tankTruck(self, env):
        yield env.timeout(random.randint(*TANK_TRUCK_TIME))	# Tank Truckが到着するまでの時間
        self.callTime = [2, env.now]		# Tank Truck Arrived
        ammount = self.fuelTank.capacity - self.fuelTank.level
        yield env.timeout(ammount / GAS_POURING_SPEED)	# Fuel Tankを満杯にするまでの時間
        yield self.fuelTank.put(ammount)	# Fuel Tankを満杯にセット
        self.callTime = [0, env.now]	# Normal Operation

◆TrafficSignalクラス部分
class TrafficSignal(object):
    def __init__(self, env, blueTime, redTime):
        self.blueTime = blueTime
        self.redTime = redTime
        self.blueLight = True 
        self.passCapacity = blueTime		# 青信号で通過できる最大台数(1台/sec)
        self.passBlueSignal = simpy.Container(env, self.passCapacity, init=self.passCapacity)
        env.process(self.signalControl(env))

    def signalControl(self, env):
        while True:
            yield env.timeout(self.blueTime)
            self.blueLight = False
            getAmount = self.passBlueSignal.level
            yield self.passBlueSignal.get(getAmount)		# 赤信号では通過台数をゼロにセット
            yield env.timeout(self.redTime)
            self.blueLight = True
            putAmount = self.passCapacity - self.passBlueSignal.level
            yield self.passBlueSignal.put(putAmount)	# 青信号では通過台数を最大にセット

 

●シミュレーションのアニメーション画像

シミュレーション時間3000secをfactor=0.1(10倍速)のリアルタイムで記録したアニメーション動画をYouTube動画で示します。