PythonでOptical flow映像から最も動きの大きな部分を検出し別画面表示できるようにしてみた

Optical flow Image

今日はOptical flowの応用編を紹介します。

以前ご紹介したOptical flowプログラムは物体の移動量を可視化するだけでした。

今回は物体の移動量が最も大きな部分を計算し、その検出点周りを枠で囲って別画面に表示できるようにしてみました。

前回プログラムをベースに細かな修正を加えています。比較しながら見て頂くと新しい気づきがあると思います。

本プログラムの特徴

  • オプティカルフロー画面と通常画面(ライブ画面)と最大移動量検出画面の三画面構成
  • それぞれの画面をCaptureボタンで保存可能
  • 物体の移動量や基準点間隔および検出枠のサイズをスライダーで変更可能

実行画面

以下の映像をご覧ください。

プログラム

以下のコードをみてください。

import cv2
import time
import datetime
import numpy as np
import PySimpleGUI as sg
from PIL import Image


### window の色を設定 ###
sg.theme('Dark Blue 3')

### 初期パラメータ設定 ###
c = []        # 計算した移動量を保存するリスト
c1 = []       # 検出点を保存するリスト
no = 0        # キャプチャした画像のナンバリング用 
step_0 = 16   # optical flow画面の点と点の間隔(初期値)
line_0 = 30   # 移動量の大きさ(初期値)
cam_ID = 0    # USB cameraのID
X_box = 50    # X方向のボックスサイズ
Y_box = 50    # Y方向のボックスサイズ

### ボタン関係の設定 ###

# テキスト設定
s_moji_1 = sg.Text('Point step', size=(15, 1), font=('メイリオ', 10))
s_moji_2 = sg.Text('Line length', size=(15, 1), font=('メイリオ', 10))
s_moji_3 = sg.Text('X box size', size=(15, 1), font=('メイリオ', 10))
s_moji_4 = sg.Text('Y box size', size=(15, 1), font=('メイリオ', 10))

# 線設定
s_sen_1 = sg.Text('_'  * 50)
s_sen_2 = sg.Text('_'  * 50)

# スライダー設定
s_Slider_1 = sg.Slider((0, 30), 16, 1, orientation='h', size=(50, 14), key='-point_step-')
s_Slider_2 = sg.Slider((0, 100), 30, 1, orientation='h', size=(50, 14), key='-line_length-')
s_Slider_3 = sg.Slider((0, 200), 50, 1, orientation='h', size=(50, 14), key='-X_box-')
s_Slider_4 = sg.Slider((0, 200), 50, 1, orientation='h', size=(50, 14), key='-Y_box-')

# 実行ボタン設定
s_button_1 = sg.Submit(button_text='Capture of Optical flow image', size=(50, 3))
s_button_2 = sg.Submit(button_text='Capture of Live image', size=(50, 3))
s_button_3 = sg.Submit(button_text='Capture of Box image', size=(50, 3))
s_button_4 = sg.Submit(button_text='Quit', size=(50, 3), button_color=('black', '#4adcd6'))

# 文字出力ウィンドウ設定
s_out_1 = sg.Output(size=(130,20), font=('メイリオ', 12) , key='-OUTPUT-')

# image
s_image_1 = sg.Image(filename='', key='-IMAGE-')
s_image_2 = sg.Image(filename='', key='-IMAGE_2-')
s_image_3 = sg.Image(filename='', key='-IMAGE_3-')

### 画面レイアウト設定 ###
""" 設定パラメータはフレームでまとめた方が便利 """

layout_0 = sg.Frame('',
                    [
                     [s_image_3],
                     [s_sen_1],
                     [s_moji_1],
                     [s_Slider_1],
                     [s_moji_2],
                     [s_Slider_2],
                     [s_moji_3],
                     [s_Slider_3],
                     [s_moji_4],
                     [s_Slider_4],
                     [s_sen_2],
                     [s_button_1],
                     [s_button_2],
                     [s_button_3],
                     [s_button_4]
                    ],
                    relief=sg.RELIEF_SUNKEN
                   )

layout_2 = sg.Frame('',
                    [
                     [s_image_1, s_image_2],
                     [s_out_1]
                    ],
                    relief=sg.RELIEF_SUNKEN
                   )


### レイアウト設定 ###
layout_1 = [
            [layout_2, layout_0],
           ]

### ウィンドウ生成 ###
window = sg.Window('Optical flow Image', layout_1,
                                    location=(0, 0),
                                    alpha_channel=1.0,
                                    no_titlebar=False,
                                    grab_anywhere=False).Finalize()
window.Maximize()


### optical flowを計算するための関数 ###
def draw_flow(im, flow, step=step_0):
    global c, c1

    h, w = im.shape[:2]
    y, x = np.mgrid[step//2:h:step, step//2:w:step].reshape(2, -1)
    fx, fy = flow[y, x].T

    lines = np.vstack([x, y, x+fx, y+fy]).T.reshape(-1, 2, 2)
    lines = np.int32(lines)

    #print('lines=', lines)

    vis = cv2.cvtColor(im, cv2.COLOR_GRAY2BGR)

    for (x1, y1), (x2, y2) in lines:
        cv2.line(vis, (x1, y1), (x2, y2), (0, 255, 0), 1)
        cv2.circle(vis, (x1, y1), 1, (0, 255, 0), -1)

        # 変化前後で出来る軌跡の長さを計算
        # 軌跡が長いと変化が大きいことを示している
        r = np.sqrt((x1-x2)**2 + (y1-y2)**2)
        c.append(r)
        c1.append([x1, x2, y1, y2])
    
    # printした文字は、文字出力ウィンドウに表示される
    print('Line length :', np.amax(c), 'index =', np.argmax(c))

    print('c1=', c1[np.argmax(c)])
    print('c1[np.argmax(c)][0]=', c1[np.argmax(c)][0])
    
    return vis

### capture セクション ###
cap = cv2.VideoCapture(cam_ID)

cap.set(3, 1280)
cap.set(4, 720)
cap.set(5, 30)

# 初回キャプチャ
frame = cap.read()[1]

# 720×720ピクセルサイズにスライシング
frame = frame[0:720, 280:1000]

# 初回画像のBGRの色順の画像をgray色の画像に変換
prev_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)


### イベントループ ###
while True:
    # 時間測定開始
    t1 = time.perf_counter()

    event, values = window.read(timeout=20)

    if event == 'Quit' or event == sg.WIN_CLOSED:
        break

    frame_1 = cap.read()[1]

    # 720×720ピクセルサイズにスライシング
    frame_2 = frame_1[0:720, 280:1000]

    # BGRの色順の画像をgray色の画像に変換
    gray = cv2.cvtColor(frame_2, cv2.COLOR_BGR2GRAY)

    # オプティカルフロー関数を使用
    flow = cv2.calcOpticalFlowFarneback(prev_gray, gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)
    prev_gray = gray
    
    # 前に定義したdraw_flow関数で計算して結果をframe_dに代入
    frame_d = draw_flow(gray, flow, step_0)

    if event is None:
        print('exit')
        break
    
    # 検出点の間隔を変更
    if values['-point_step-']:
        step_0 = int(values['-point_step-'])

    # 検出長さを変更
    if values['-line_length-']:
        line_0 = int(values['-line_length-'])


    # 検出ボックスのサイズを変更
    if values['-X_box-']:
        a0 = int(values['-X_box-'])

    if values['-Y_box-']:
        b0 = int(values['-Y_box-'])


    ### 画像の保存 ###

    # 日付の取得
    d_today = datetime.date.today()
    dt_now = datetime.datetime.now()

    if event == 'Capture of Optical flow image':
        cv2.imwrite('./cnn_act/capture/' +
                    str(d_today) + str("_") +
                    str(dt_now.hour) + str("_") +
                    str(dt_now.minute) + str("_") +
                    str(dt_now.second) + '.jpg', frame_d)

    elif event == 'Capture of Live image':
        cv2.imwrite('./cnn_act/capture/' +
                    str(d_today) + str("_") +
                    str(dt_now.hour) + str("_") +
                    str(dt_now.minute) + str("_") +
                    str(dt_now.second) + '.jpg', frame_2)

    elif event == 'Capture of Box image':
        cv2.imwrite('./cnn_act/capture/' +
                    str(d_today) + str("_") +
                    str(dt_now.hour) + str("_") +
                    str(dt_now.minute) + str("_") +
                    str(dt_now.second) + '.jpg', frame_3)

    ### FPS 計算 ###
    elapsedTime = time.perf_counter() - t1
    fps = "{:.0f}FPS".format(1/elapsedTime)

    # 画面にfpsを表示
    frame_2 = cv2.putText(frame_2, fps, (15, 35),
                          cv2.FONT_HERSHEY_PLAIN, 2, (0, 255, 255), 2, cv2.LINE_AA)

    ### 最大移動量の点の中心座標を計算 ###
    x1_c = abs(int(((c1[np.argmax(c)][1])+(c1[np.argmax(c)][0]))//2) - a0)
    x2_c = abs(int(((c1[np.argmax(c)][1])+(c1[np.argmax(c)][0]))//2) + a0)
    y1_c = abs(int(((c1[np.argmax(c)][3])+(c1[np.argmax(c)][2]))//2) - b0)
    y2_c = abs(int(((c1[np.argmax(c)][3])+(c1[np.argmax(c)][2]))//2) + b0)

    print('x1_c, x2_c, y1_c, y2_c :', x1_c, x2_c, y1_c, y2_c)

    # 検出画像表示用のブランク画面
    frame_3 = np.zeros((400, 400, 3), np.float32)

    if np.amax(c) > line_0:

        # (image, (x1, y1), (x2, y2), (B,G,R), thickness)
        cv2.rectangle(frame_2, (x1_c, y1_c), (x2_c, y2_c), (0, 255, 255), 2)  # 黄色枠
        
        # frame_2をトリミング
        img0 = frame_2[y1_c:y2_c, x1_c:x2_c]

        h, w = img0.shape[:2]

        if w > h:
            img0 = Image.fromarray(img0)
            img_back = Image.new(img0.mode, (w, w), (0, 0, 0))
            img_back.paste(img0, (0, (w - h)//2))
            img0 = img_back.resize((400, 400), Image.LANCZOS)
            img0 = np.asarray(img0)  # asarrayにするとimg0のコピーを作らず元画像に同期してndarrayに変換

        elif h > w:
            img0 = Image.fromarray(img0)
            img_back = Image.new(img0.mode, (h, h), (0, 0, 0))
            img_back.paste(img0, ((h - w)//2, 0))
            img0 = img_back.resize((400, 400), Image.LANCZOS)
            img0 = np.asarray(img0)

        else:
            img0 = Image.fromarray(img0)
            img_back = Image.new(img0.mode, (w, h), (0, 0, 0))
            img_back.paste(img0, (0, 0))
            img0 = img_back.resize((400, 400), Image.LANCZOS)
            img0 = np.asarray(img0)

        frame_3 = img0


    # 映像をpng画像に変換してwindow画面を更新
    imgbytes = cv2.imencode('.png', frame_d)[1].tobytes()
    window['-IMAGE-'].update(data=imgbytes)

    imgbytes_2 = cv2.imencode('.png', frame_2)[1].tobytes()
    window['-IMAGE_2-'].update(data=imgbytes_2)

    imgbytes_3 = cv2.imencode('.png', frame_3)[1].tobytes()
    window['-IMAGE_3-'].update(data=imgbytes_3)

    # 初期化
    c = []
    c1 = []

    if event == 'Quit':
        print('Quit')
        break

cap.release()
cv2.destroyAllWindows()

# ウィンドウの破棄と終了
window.close()

ポイント解説

1.出力文字のフォントと文字サイズを変更できるように引数を追加

以下のコードをみてください。

s_out_1 = sg.Output(size=(130,20), font=('メイリオ', 12) , key='-OUTPUT-')

第二引数の font=( ) を追加することで、出力文字のフォントと文字サイズを変更することができます。

なお、第二引数を削除するとデフォルトのフォントと文字サイズになります。

2.最大移動量を示す点の中心座標を計算

一例として以下のコードを見てください。

x1_c = abs(int(((c1[np.argmax(c)][1])+(c1[np.argmax(c)][0]))//2) - a0)

まず、np.argmax ( c ) の部分について説明します。

この部分では、 def draw_flow( ) の部分で計算したリスト c について、最大値が保存されている値のインデックス(何番目か)を求めています。

次に、求めたインデックスを使ってリストc1に保存されている座標を調べます。

リストc1 には[ x1, x2, y1, y2 ]の順番で各座標の値がセットで保存されています。

先ほど求めたインデックスをc1[ ][ 1 ]に入れると、最大値が保存されたときのx2座標が取得できます。

同様に、c1[ ][ 0 ]に先ほどのインデックスを入れると、最大値が保存されたときのx1座標が取得できます。

c1[ ][ 2 ]にインデックスを入れるとy1座標、 c1[ ][ 3 ]にインデックスを入れるとy2座標が取得できます。

なお、a0 はBOXのサイズを決める値になります。この値は、スライダーの値を読み込んで変更できるようになっています。

以上の計算から、最終的に x1_c を求めます。

同様に、x2_c、y1_c、y2_c も求めます。

さらっと読むと理解するのが難しいかもしれませんが、焦らずにコードの意味を理解してみてください。

3.検出枠を描く

上記で計算した座標(x1_c, x2_c, y1_c, y2_c)を使って、ライブ画面に検出枠を描きます。

以下のコードをみてください。

cv2.rectangle(frame_2, (x1_c, y1_c), (x2_c, y2_c), (0, 255, 255), 2)  # 黄色枠

このコードで黄色の四角枠が描けます。

第四引数は枠の色指定部分で(Blue, Green, Red)の順番です。

(0, 0, 0)で黒、(255, 255, 255)で白となります。

GreenとRedを255としたので加法混色により黄色となります。

4.検出画像用のブランク画面を用意

何も検出していないときでもブランク画面は表示させておく必要があります。

検出されたときだけ画面表示されるようにすると、画面のレイアウトが崩れとても見にくくなってしまいます。

ブランク画面としては黒背景の画面が便利です。

今回は以下のコードで作成しています。

frame_3 = np.zeros((400, 400, 3), np.float32)

np.zeros( ) で(B, G, R)の全要素がゼロの配列が作成されます。つまり黒画面です。

カッコ内の第二引数をnp.float32にしているのは処理速度が早いからです。

5.検出画像と背景の合成

先ほど用意した黒画面サイズは常に400×400サイズです。

黒画面サイズと検出画像サイズが一致しないと、画像サイズのミスマッチによりエラーが発生します。

これを避けるため、ちょっとした工夫が必要になります。

以下のコードをみてください。

img0 = frame_2[y1_c:y2_c, x1_c:x2_c]

img0 = Image.fromarray(img0)
img_back = Image.new(img0.mode, (w, w), (0, 0, 0))
img_back.paste(img0, (0, (w - h)//2))
img0 = img_back.resize((400, 400), Image.LANCZOS)
img0 = np.asarray(img0)  # asarrayにするとimg0のコピーを作らず元画像に同期してndarrayに変換

一番上のコードで、座標(x1_c, x2_c, y1_c, y2_c)を使って frame_2 からスライス(トリミング)した画像を img0 に代入します。

次に、img0 を Image.fromarray( ) に入れ、画像をnumpy配列からImage形式に変換します。その上で Image.new( ) で黒背景を作り、img_back.paste( ) で検出画像と黒背景を合成して黒画面サイズ(400×400)に合うようにresize( ) します。その後、np.asarray( ) で合成画像をnumpy配列に戻して黒画面に表示できるようにします。

こうすることで、検出画像のサイズによらず常に400X400サイズの画像を表示させることができます。

まとめ

  • 最大移動量を示す点の中心座標を元に検出枠の座標を計算
  • 検出画像を別画面表示させる際は、検出画像サイズが常に変化することを踏まえ画像サイズが常に一定になるような工夫が必要

参考図書

今回のポイントは画像のデータ処理です。そして、画像のデータ処理と言えばNumpyです。Numpyを使う理由は処理速度が速いからです。私の実体験として、Numpyの理解が深まるほどやりたいことができるようになります。私の愛読書です。かなり使える本なのでオススメです。

また、optical flowについてさらに知りたい方には以下の本をオススメします。コンピュータビジョン全般について書かれた本です。参考となるコードが豊富です。 たまに読み返すとアイデアがもらえます。コンピュータビジョンに興味のある方は必携です。