Pythonで物体の動きを可視化する方法::Optical flow画像をPySimpleGUIで表示させ自動保存できるようにしてみた話

2020年12月28日

Optical flow Image

今日は物体の動きを可視化するプログラムを紹介します。

可視化にはオプティカルフローを使います。

オプティカルフローとは、2枚の連続する画像間で物体が移動した軌跡を線として表示するものです。移動量が2Dのベクトル場として確認できます。

コンピュータビジョンでは昔から研究されてきた分野で、動き推定や物体追跡などに応用されています。

動き推定(人、動物、車、流体、風向きなど)や物体追跡(畑を荒らす動物の自動撮影、防犯カメラで犯人の自動撮影など)したい方にはメリットがあるかもしれません。

アイデア次第で色々作れると思います。参考にしてみてください。

本プログラムの特徴

  • オプティカルフロー画面と通常画面(ライブ画面)の二画面構成
  • それぞれの画面をCaptureボタンで保存可能
  • Parameterチェックボタンにチェックを入れると、物体の移動量や基準点間隔をスライダーで変更可能
  • Captureチェックボタンにチェックを入れると、設定したパラメータに応じて画像の自動保存が可能

実行画面

以下の映像をご覧ください。

プログラム

以下のコードをみてください。

import cv2
import time
import datetime
import numpy as np
import PySimpleGUI as sg

### window の色を設定 ###
sg.theme('Dark Blue 3')

### 初期パラメータ設定 ###
c = []        # 計算した移動量を保存するリスト
no = 0        # キャプチャした画像のナンバリング用 
step_0 = 16   # optical flow画面の点と点の間隔(初期値)
line_0 = 50   # 移動量の大きさ(初期値)
cam_ID = 3    # USB cameraのID

### ボタン関係の設定 ###

# テキスト設定
s_moji_1 = sg.Text('Point step', size=(15, 1), font=('メイリオ', 13))
s_moji_2 = sg.Text('Line length', size=(15, 1), font=('メイリオ', 13))

# 線設定
s_sen_1 = sg.Text('_'  * 50)
s_sen_2 = sg.Text('_'  * 50)
s_sen_3 = sg.Text('_'  * 50)
s_sen_4 = sg.Text('_'  * 50)

# チェックボタン設定
s_check_1 = sg.Checkbox('Parameter ON', default=False, size=(15,1), font=('メイリオ', 13), key='-optical flow-') 
s_check_2 = sg.Checkbox('Capture ON', default=False, size=(15,1), font=('メイリオ', 13), key='-capture optical flow-') 

# スライダー設定
s_Slider_1 = sg.Slider((0, 30), 16, 1, orientation='h', size=(50, 15), key='-point_step-')
s_Slider_2 = sg.Slider((0, 100), 50, 1, orientation='h', size=(50, 15), key='-line_length-')

# 実行ボタン設定
s_button_1 = sg.Submit(button_text='Capture of Optical flow image', size=(50, 7))
s_button_2 = sg.Submit(button_text='Capture of Live image', size=(50, 7))
s_button_3 = sg.Submit(button_text='Quit', size=(50, 7), button_color=('black', '#4adcd6'))

# 文字出力ウィンドウ設定
s_out_1 = sg.Output(size=(250,20), key='-OUTPUT-')

# image
s_image_1 = sg.Image(filename='', key='-IMAGE-')
s_image_2 = sg.Image(filename='', key='-IMAGE_2-')

### 画面レイアウト設定 ###
""" 設定パラメータはフレームでまとめた方が便利 """

layout_0 = sg.Frame('',
                    [
                     [s_check_1, s_check_2],
                     [s_sen_1],
                     [s_moji_1],
                     [s_Slider_1],
                     [s_sen_2],
                     [s_moji_2],
                     [s_Slider_2],
                     [s_sen_3],  
                     [s_button_1],
                     [s_button_2],
                     [s_sen_4],
                     [s_button_3]
                    ],
                    relief=sg.RELIEF_SUNKEN
                   )

### レイアウト設定 ###
layout_1 = [
            [s_image_1, s_image_2, layout_0],
            [s_out_1]
           ]

### ウィンドウ生成 ###
window = sg.Window('Optical flow Image', layout_1,
                                    location=(0, 0),
                                    alpha_channel=1.0,
                                    no_titlebar=False,
                                    grab_anywhere=False).Finalize()
window.Maximize()


### optical flowを計算するための関数 ###
def draw_flow(im, flow, step=step_0):
    global c

    h, w = im.shape[:2]
    y, x = np.mgrid[step//2:h:step, step//2:w:step].reshape(2, -1)
    fx, fy = flow[y, x].T

    lines = np.vstack([x, y, x+fx, y+fy]).T.reshape(-1, 2, 2)
    lines = np.int32(lines)

    vis = cv2.cvtColor(im, cv2.COLOR_GRAY2BGR)

    for (x1, y1), (x2, y2) in lines:
        cv2.line(vis, (x1, y1), (x2, y2), (0, 255, 0), 1)
        cv2.circle(vis, (x1, y1), 1, (0, 255, 0), -1)

        # 変化前後で出来る軌跡の長さを計算
        # 軌跡が長いと変化が大きいことを示している
        r = np.sqrt((x1-x2)**2 + (y1-y2)**2)
        c.append(r)
    
    # printした文字は、文字出力ウィンドウに表示される
    print('Line length :', np.amax(c))
    
    return vis

### capture セクション ###
cap = cv2.VideoCapture(cam_ID)

cap.set(3, 1280)
cap.set(4, 720)
cap.set(5, 30)

# 初回キャプチャ
_, frame = cap.read()

# 720×720ピクセルサイズにスライシング
frame = frame[0:720, 280:1000]

# 初回画像のBGRの色順の画像をgray色の画像に変換
prev_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

### イベントループ ###
while True:
    # 時間測定開始
    t1 = time.perf_counter()

    event, values = window.read(timeout=20)

    if event == 'Quit' or event == sg.WIN_CLOSED:
        break

    _, frame_1 = cap.read()

    # 720×720ピクセルサイズにスライシング
    frame_2 = frame_1[0:720, 280:1000]

    # BGRの色順の画像をgray色の画像に変換
    gray = cv2.cvtColor(frame_2, cv2.COLOR_BGR2GRAY)

    # オプティカルフロー関数を使用
    flow = cv2.calcOpticalFlowFarneback(prev_gray, gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)
    prev_gray = gray
    
    # 前に定義したdraw_flow関数で計算して結果をframe_dに代入
    frame_d = draw_flow(gray, flow, step_0)

    if event is None:
        print('exit')
        break
    
    # optical flow ONのチェックボタンがチェックされたときに実行
    if values['-optical flow-']:
        if values['-point_step-']:
            step_0 = int(values['-point_step-'])

        elif values['-line_length-']:
            line_0 = int(values['-line_length-'])

    # capture ONのチェックボタンがチェックされたときに実行
    if values['-capture optical flow-']:
        if np.amax(c) >= line_0:
            cv2.imwrite("./cnn_act/capture/" + str(no) + ".jpg", frame_d)
            no += 1
            c = []

    ### 画像の保存 ###

    # 日付の取得
    d_today = datetime.date.today()
    dt_now = datetime.datetime.now()

    if event == 'Capture of Optical flow image':
        cv2.imwrite('./cnn_act/capture/' +
                    str(d_today) + str("_") +
                    str(dt_now.hour) + str("_") +
                    str(dt_now.minute) + str("_") +
                    str(dt_now.second) + '.jpg', frame_d)

    elif event == 'Capture of Live image':
        cv2.imwrite('./cnn_act/capture/' +
                    str(d_today) + str("_") +
                    str(dt_now.hour) + str("_") +
                    str(dt_now.minute) + str("_") +
                    str(dt_now.second) + '.jpg', frame_2)

    ### FPS 計算 ###
    elapsedTime = time.perf_counter() - t1
    fps = "{:.0f}FPS".format(1/elapsedTime)

    # 画面にfpsを表示
    frame_2 = cv2.putText(frame_2, fps, (15, 35),
                          cv2.FONT_HERSHEY_PLAIN, 2, (0, 255, 255), 2, cv2.LINE_AA)

    # 映像をpng画像に変換してwindow画面を更新
    imgbytes = cv2.imencode('.png', frame_d)[1].tobytes()
    window['-IMAGE-'].update(data=imgbytes)

    imgbytes_2 = cv2.imencode('.png', frame_2)[1].tobytes()
    window['-IMAGE_2-'].update(data=imgbytes_2)

    # 初期化
    c = []

    if event == 'Quit':
        print('Quit')
        break

cap.release()
cv2.destroyAllWindows()

# ウィンドウの破棄と終了
window.close()

ポイント解説

1.コードの先頭付近でパラメータをまとめて定義する

パラメータは後で数値を変えたり追加する場合があります。

コードの先頭付近に記載されていると、追加や修正が容易になるのでおすすめです。

2.ボタン類は先にまとめて定義しておく

PySimpleGUIでコードを記載するときは、ボタン類を先にまとめて定義しておきます。

理由は、画面レイアウトを作成するとき楽になるからです。

画面レイアウトはリストで作成するため、リストの中のコードが長いと見にくいです。

リストの中のコードを短くすることで視認性が上がり修正忘れが防げます。

3.Optical flow の計算

def draw_flow( ) の部分でOptical flow を計算します。

Optical flow の計算部分はこちらの本を参考に作成しています。

ただ、コードが古いためそのままでは動かないので少し修正しています。

また、画像の変化前後で出来る軌跡の長さを三平方の定理で計算しています。

今回は物体の移動量の大きさに応じで画像を自動保存できるようにしたかったので、私の方でコードを追加しました。

r = np.sqrt((x1-x2)**2 + (y1-y2)**2)
c.append(r)

変化前後の座標情報から長さを計算しています。

また、c.append(r) の部分で計算した長さをリスト c に保存しています。

次に、以下のprint( ) 文で リスト c の中で最も大きな値を出力するようにしています。

リスト c の中で最も大きな値を計算するときは、np.amax(c) を使います。

print('Line length :', np.amax(c))

NumPyには便利な関数が多数用意されています。

自分のやりたいことを速く見つけたいときは、ググるのも良いですがハンドブック的なものを一つ持っておくと便利です。参考までに私はこれを使っています。

なお、print( ) の出力は以下で定義した文字出力ウィンドウに表示されます。

s_out_1 = sg.Output(size=(250,20), key='-OUTPUT-')

print( ) で出力しておくと物体の移動量の大きさをリアルタイムで確認できるので便利です。

4.オプティカルフロー関数

オプティカルフローはOpenCVの関数を使用します。以下の部分です。

flow = cv2.calcOpticalFlowFarneback(prev_gray, gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)

計算には連続した2枚の画像が必要です。

このため、繰り返し処理を定義している while True : の前に一枚目の画像を準備しておく必要があります。

それが以下の部分です。

# 初回キャプチャ
_, frame = cap.read()

# 720×720ピクセルサイズにスライシング
frame = frame[0:720, 280:1000]

# 初回画像のBGRの色順の画像をgray色の画像に変換
prev_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

cap.read( ) の部分でUSBカメラからキャプチャした画像をframeに代入し、720×720ピクセルサイズにスライシングしてからグレイ画像に変換しています。

グレイ画像にする理由は、オプティカルフローはグレイ画像で計算するためです。

まずこの prev_gray 画像が一枚目の画像になります。

次に、以下の部分で二枚目の画像(gray)を準備します。

_, frame_1 = cap.read()

# 720×720ピクセルサイズにスライシング
frame_2 = frame_1[0:720, 280:1000]

# BGRの色順の画像をgray色の画像に変換
gray = cv2.cvtColor(frame_2, cv2.COLOR_BGR2GRAY)

# オプティカルフロー関数を使用
flow = cv2.calcOpticalFlowFarneback(prev_gray, gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)
prev_gray = gray
    
# 前に定義したdraw_flow関数で計算して結果をframe_dに代入
frame_d = draw_flow(gray, flow, step_0)

一枚目の画像 prev_gray と二枚目の画像 gray からオプティカルフロー関数(cv2.calcOpticalFlowFarneback( ))で変化後の座標(flow)を計算します。

その後、draw_flow( ) 関数でオプティカルフロー画像(frame_d)を計算します。

ちなみに、prev_gray = gray の部分は、二枚目の画像であるgrayを次回計算時に一枚目の画像prev_grayとして扱うための準備です。

ちょっと複雑ですがやっていることは単純です。

コーヒーでも飲みながらゆっくりとコードをながめてみてください。

まとめ

  • コードの先頭付近でパラメータをまとめて定義する
  • ボタン類は先にまとめて定義しておく
  • 物体の変化前後の座標情報から長さを計算し画像保存するかしないかの判定に使用
  • オプティカルフローの計算はOpenCVの関数を使用