【実験】学習済のSSDを使ってPaPeRo i に複数種類の物体を数えさせる

PaPeRo i のカメラで撮影した画像に含まれる複数の物体の種類と数を PaPeRo iに発話させる方法の一例を紹介します。
画像からの物体検出には、SSD(Single Shot MultiBox Detector)のKerasによる実装の一つを使用します。
SSDは深層学習を利用して、画像に含まれる複数の物体の種類と位置を検出する技術の一つですが、今回は自前での学習を行わず、インターネット上に公開されている学習済モデルを使用します。
SSDはWindows PC 上で動作させます。
今回の方法では認識できる物体の種類は学習済の20種類のみであるため実用性はあまりありませんが、自前で学習させる事により任意の物体を認識できるようになれば、様々な応用が期待できるのではないかと思います。

手順(PaPeRo i 側)

(1) PaPeRo i 制御用WebSocket通信アドオンシナリオをまだインストールしていない場合は、「PaPeRo iをRaspberry Pi上のpythonから操作する」の「PaPeRo iにアドオンシナリオをインストール」に従ってインストール作業を行います。

手順(Windows PC側)

(1) https://conda.io/miniconda.html から、Python3.7用のWindows 64-bit (exe installer) をダウンロードします。

(2) ダウンロードしたファイル(Miniconda3-latest-Windows-x86_64.exe)を実行します。

(3) 画面の指示に従って、インストールを完了させます。

(4) Windowsのスタートボタン→Anaconda3 (64bit)→Anaconda Prompt の順にクリックします。

(5) keras 専用の環境を作る為、Anaconda Prompt 上で、下記のコマンドを入力します。

conda create -n mykeras

(6) (5)で作った環境に移行する為、Anaconda Prompt 上で、下記のコマンドを入力します。

activate mykeras

プロンプトの先頭が、(base)から(mykeras)に変わります。

(7) 環境に、今回使用するSSDを動作させる為に必要なライブラリやツール等をインストールする為、Anaconda Prompt 上で下記のコマンドを入力します。

conda install keras
conda install opencv
conda install matplotlib
conda install imageio

(8) 今回使用するSSDをダウンロードします。
ダウンロード元は https://github.com/rykov8/ssd_keras です。
「Clone or download」ボタンを押し、現れる吹き出しの「Download ZIP」をクリックすると、zipファイルがダウンロードされます。

(9) ダウンロードしたzipファイルに含まれる ssd_keras-master フォルダを、任意の場所に配置します(配置場所によっては、後で説明する方法でサンプルコードを参照する事ができないため、ドキュメントフォルダ配下の場所を推奨します)。

(10) https://mega.nz/folder/7RowVLCL#q3cEVRK9jyOSB9el3SssIA から、weights_SSD300.hdf5 をダウンロードし、(9)で配置したssd_keras-masterの中に置きます。

(11) (9)でダウンロードしたSSDは、Keras v1.2.2、Tensorflow v1.0.0 の環境で動作確認が行われたものですが、2020年6月現在、(7)のコマンドでインストールされるライブラリのバージョン(Keras v2.3.1、Tensorflow v2.1.0)ではそのままでは動作しません。
対処方法として、

A ライブラリのバージョンを合わせる
B ダウンロードしたSSDを現在のライブラリで動作するように修正する

の2つが考えられますが、今回はBの方法を取ります。

まず ssd.pyについて、kerasの新しいバージョンに適合するように変更したファイルが https://gist.github.com/anonymous/4c3105119a233cb33926651c3ea1966c で公開されていますので、それををダウンロードし、(9)で配置したssd_keras-masterの中の ssd.py と置き換えます。
その後、置き換えたssd.pyについて、更に以下の変更を行います。

(変更前)
if K.image_dim_ordering() == 'tf':
↓
(変更後)
if K.image_data_format() == 'channels_last':

(12) (11)と同様の理由により、ssd_keras-masterの中の、ssd_layers.py について、以下の変更を行います。

(変更前)(2箇所)
if K.image_dim_ordering() == 'tf':
↓
(変更後)(2箇所とも)
if K.image_data_format() == 'channels_last':

(変更前)
def get_output_shape_for(self, input_shape):
↓
(変更後)
def compute_output_shape(self, input_shape):

(13) (11)と同様の理由により、ssd_keras-masterの中の、ssd_utils.py について、以下の変更を行います。

(変更前)
import tensorflow as tf
↓
(変更後)
import tensorflow.compat.v1 as tf
tf.disable_v2_behavior()

(14) 下記のコードをコピー・ペーストして、ssd_predict_papero.py というファイルを作成し、ssd_keras-master の中に置きます。
※ ****ユーザ名****、****パスワード**** の部分につきましては、PaPeRo i に一般ユーザでログインする際に使用するユーザ名とパスワードに置き換えて下さい。

import argparse
import time
from enum import Enum

import cv2
import keras
from keras.applications.imagenet_utils import preprocess_input
from keras.models import Model
from keras.preprocessing import image
import matplotlib.pyplot as plt
import numpy as np
from imageio import imread
import ws4py
from paramiko import SSHClient,AutoAddPolicy
from scp import SCPClient

from ssd import SSD300
from ssd_utils import BBoxUtility
import pypapero


class State(Enum):
    st0 = 10
    st1 = 11
    st2 = 12
    st3 = 13
    st4 = 14
    end = 999


def predict(img_path):
    img = image.load_img(img_path, target_size=(300, 300))
    img = image.img_to_array(img)
    inputs = [img]
    image_to_show = imread(img_path);
    inputs = preprocess_input(np.array(inputs))
    preds = model.predict(inputs, batch_size=1, verbose=1)
    results = bbox_util.detection_out(preds)

    # Parse the outputs.
    det_label = results[0][:, 0]
    det_conf = results[0][:, 1]
    det_xmin = results[0][:, 2]
    det_ymin = results[0][:, 3]
    det_xmax = results[0][:, 4]
    det_ymax = results[0][:, 5]

    # Get detections with confidence higher than 0.6.
    top_indices = [i for i, conf in enumerate(det_conf) if conf >= 0.6]

    top_conf = det_conf[top_indices]
    top_label_indices = det_label[top_indices].tolist()
    top_xmin = det_xmin[top_indices]
    top_ymin = det_ymin[top_indices]
    top_xmax = det_xmax[top_indices]
    top_ymax = det_ymax[top_indices]

    if do_show:
        colors = plt.cm.hsv(np.linspace(0, 1, 21)).tolist()
        plt.clf()
        plt.imshow(image_to_show / 255.)
        currentAxis = plt.gca()

    label_counts = []
    for i in range(len(voc_classes)):
        label_counts.append(0)

    for i in range(top_conf.shape[0]):
        score = top_conf[i]
        label = int(top_label_indices[i])
        label_counts[label - 1] += 1
        if do_show:
            xmin = int(round(top_xmin[i] * image_to_show.shape[1]))
            ymin = int(round(top_ymin[i] * image_to_show.shape[0]))
            xmax = int(round(top_xmax[i] * image_to_show.shape[1]))
            ymax = int(round(top_ymax[i] * image_to_show.shape[0]))
            label_name = voc_classes[label - 1][0]
            display_txt = '{:0.2f}, {}'.format(score, label_name)
            coords = (xmin, ymin), xmax-xmin+1, ymax-ymin+1
            color = colors[label]
            currentAxis.add_patch(plt.Rectangle(*coords, fill=False, edgecolor=color, linewidth=2))
            currentAxis.text(xmin, ymin, display_txt, bbox={'facecolor':color, 'alpha':0.5})

    if do_show:
        plt.pause(0.01)

    return label_counts


def make_str_speech(label_counts):
    rtnstr = ""
    for i in range(len(voc_classes)):
        if(label_counts[i] > 0):
            if(rtnstr != ""):
                rtnstr += "と、"
            rtnstr += voc_classes[i][1] + "が" + str(label_counts[i]) + str(voc_classes[i][2])
    if(rtnstr == ""):
        rtnstr = "認識できる物は何も見えません"
    else:
        rtnstr += "見えます"
    return rtnstr


def main(papero, host, do_disp):
    label_counts_prev = None
    prev_time = time.monotonic()
    past_time = 0
    interval_time = 0
    state = State.st0
    print("HOST=" + host)
    PORT = 22
    USER = "****ユーザ名****"
    PSWD = "****パスワード****"
    scp = None
    ssh = SSHClient() 
    ssh.set_missing_host_key_policy(AutoAddPolicy())
    ssh.connect(host, port=PORT, username=USER, password=PSWD)
    scp = SCPClient(ssh.get_transport())
    while state != State.end:
        messages = papero.papero_robot_message_recv(0.1)
        now_time = time.monotonic()
        delta_time = now_time - prev_time
        prev_time = now_time
        if messages is not None:
            msg_dic_rcv = messages[0]
        else:
            msg_dic_rcv = None
        if papero.errOccurred != 0:
            print("------Error occured(main()). Detail : " + papero.errDetail)
            break
        if state == State.st0:
            papero.send_start_speech("認識した物体の種類と数を発話します。座布団のボタンで終了します。")
            past_time = 0.0
            state = State.st1
        elif state == State.st1:
            past_time += delta_time
            if past_time > 0.5:
                papero.send_get_speech_status()
                state = State.st2
        elif state == State.st2:
            if msg_dic_rcv is not None:
                if msg_dic_rcv["Name"] == "getSpeechStatusRes":
                    if str(msg_dic_rcv["Return"]) == "0":
                        state = State.st3
                    else:
                        past_time = 0
                        state = State.st1
        elif state == State.st3:
            past_time += delta_time
            if past_time >= interval_time:
                papero.send_take_picture("JPEG", filename="tmp.jpg", camera="VGA")
                past_time = 0
                state = State.st4
        elif state == State.st4:
            if msg_dic_rcv is not None:
                if msg_dic_rcv["Name"] == "takePictureRes":
                    scp.get("/tmp/tmp.jpg")
                    label_counts_now = predict("tmp.jpg")
                    if (label_counts_prev is None) or (label_counts_now != label_counts_prev):
                        str_speech = make_str_speech(label_counts_now)
                        papero.send_start_speech(str_speech)
                        label_counts_prev = label_counts_now
                    past_time = 0.0
                    state = State.st1
        if msg_dic_rcv is not None:
            if msg_dic_rcv["Name"] == "detectButton":
                state = State.end


if __name__ == "__main__":
    np.set_printoptions(suppress=True)
    voc_classes = [
        #(名前(表示用), 名前(発話用), 数え方(発話用))
        ('Aeroplane', '飛行機', '台'),
        ('Bicycle', '自転車', '台'),
        ('Bird', '鳥', '羽'),
        ('Boat', 'ボート', '台'),
        ('Bottle', '瓶', '本'),
        ('Bus', 'バス', '台'),
        ('Car', '車', '台'),
        ('Cat', '猫', '匹'),
        ('Chair', '椅子', '台'),
        ('Cow', '牛', '頭'),
        ('Diningtable', 'ダイニングテーブル', '台'),
        ('Dog', '犬', '匹'),
        ('Horse', '馬', '頭'),
        ('Motorbike', 'モーターバイク', '台'),
        ('Person', '人', '人'),
        ('Pottedplant', '鉢植え', '鉢'),
        ('Sheep', '羊', '匹'),
        ('Sofa', 'ソファー', '匹'),
        ('Train', '列車', '本'),
        ('Tvmonitor', 'テレビモニター', '台')]
    NUM_CLASSES = len(voc_classes) + 1
    input_shape=(300, 300, 3)

    parser = argparse.ArgumentParser(description = "Usage:")
    parser.add_argument("host", type=str, help = "Host IP address")
    parser.add_argument("-img", help = "Display image", action='store_true')
    command_arguments = parser.parse_args()

    model = SSD300(input_shape, num_classes=NUM_CLASSES)
    model.load_weights('weights_SSD300.hdf5', by_name=True)
    bbox_util = BBoxUtility(NUM_CLASSES)

    simulator_id = ""
    robot_name = ""
    host = command_arguments.host
    do_show = command_arguments.img
    if do_show:
        plt.rcParams['figure.figsize'] = (8, 8)
        plt.rcParams['image.interpolation'] = 'nearest'

    ws_server_addr = "ws://" + host + ":8088/papero"
    papero = pypapero.Papero(simulator_id, robot_name, ws_server_addr)
    main(papero, host, do_show)
    papero.papero_cleanup()

(15) (7) PaPeRo i との通信や、画像ファイル転送の為に必要なライブラリをインストールする為、Anaconda Prompt 上で下記のコマンドを入力します。

conda install ws4py
conda install paramiko
conda install scp

(16)「PaPeRo iをRaspberry Pi上のpythonから操作する」の「Raspberry Piへ通信ライブラリをインストール」に従い、ライブラリ(pypapero.py)を配置します。配置場所は、ssd_keras-master の下とします。

(17) ssd_predict_papero.py を実行する為、Anaconda Prompt 上で、下記のコマンドを入力します。

cd ((9)で配置したssd_keras-masterのパス)
python ssd_predict_papero.py PaPeRoiのIPアドレス -img

実行すると、PaPeRo i が「認識した物体の種類と数を発話します。座布団のボタンで終了します」と発話した後、カメラに映った物体(上記コード中のvoc_classesに含まれる物)の種類と数を「椅子が1台と、人が1人と、テレビモニターが2台見えます」のように発話するようになります。
また、この時の物体認識状況が、下の画像のように表示されます。

画像での確認が不要であれば、コマンド入力時に -imgを省略する事により、画像での表示は行われなくなります。

【補足】使用したSSDに付属するサンプルコードを参照・実行する方法

今回使用したSSDには、付属のサンプル画像に対して物体認識をするためのサンプルコードが付属しており、上記のssd_predict_papero.pyはそれを改造したものです。
サンプルコードとの違い等を確認して頂くために、サンプルコードを参照・実行するための手順を紹介します。

以下の手順に先立ち、「手順(Windows PC側)」の(13)までを既に実施しているものとします。

(1) サンプルコードを参照するために必要なツールである Jupyter Notebook をインストールする為、Anaconda Prompt 上で下記のコマンドを入力します。

conda install notebook

(2) Windowsのスタートボタン→Anaconda3 (64bit)→Jupyter Notebook(mykeras) の順にクリックします。
ブラウザに Jupyter Notebook の画面が表示され、その中にファイルとフォルダの一覧が表示されるので、ssd_keras-masterの中の SSD.ipynb を探してクリックします。
すると、Pythonのサンプルコードと実行結果の一覧が表示されます。
左側にIn[x]と書かれた、背景が灰色の枠内に書かれている部分がサンプルコードです。

(3) サンプルコードの実行は、Jupyter Notebook 上で行う事も可能ですが、ここではPythonのスクリプトファイルとして実行する事にします。
Pythonのサンプルコード部分を順にコピー・ペーストで繋ぎ合わせて一つのテキストファイルを作成し、ファイル名を ssd_predict.py として保存します。

(4) (3)で作成したssd_predict.py について、以下の変更を行います。

(変更前)
from keras.backend.tensorflow_backend import set_session
↓
(削除)

(変更前)
from scipy.misc import imread
↓
(変更後)
from imageio import imread

(変更前)
import tensorflow as tf
↓
(削除)

(変更前)
%matplotlib inline
↓
(削除)

(変更前)
config = tf.ConfigProto()
config.gpu_options.per_process_gpu_memory_fraction = 0.45
set_session(tf.Session(config=config))
↓
(削除)

(変更前)
%%time
a = model.predict(inputs, batch_size=1)
b = bbox_util.detection_out(preds)
↓
(削除)

(5) ssd_predict.py を実行する為、Anaconda Prompt 上で下記のコマンドを入力します。

cd (手順(Windows PC側)の(9)で配置したssd_keras-masterのパス)
python ssd_predict.py

実行すると、ssd_keras-masterの中の pics フォルダ内の5枚の画像について物体認識処理が行われます。
結果は1枚ずつウィンドウに表示され、ウィンドウを閉じる毎に次の画像の結果の表示に進みます。


0