【実験】PaPeRo i の温度計の値から実際の温度をニューラルネットワークに推測させてみる

PaPeRo i の温度計による計測値から実際の温度をニューラルネットワークに推測させる方法について検討しました。

実験条件

本検討で行った実験は、いずれもエアコンの効いた室内の環境(気温25~30℃)で行いました。
実際の温度の参考値を得る為、BME280温湿度・気圧センサーモジュール(温度精度=±1℃、湿度精度=±3%)を使用しました。

データ記録方法

学習及び検証の為のデータを得る為、PaPeRo i のセンサーで計測した温湿度及び、その時のBME280による温湿度を記録しました。
首動作の有無による計測値の違いを考慮し、首を動かした場合と動かさなかった場合について記録を行いました。

記録に使用したプログラムを以下に示します。

import sys
import time
import codecs
from enum import Enum

import serial

import pypapero


class State(Enum):
    st0 = 10
    st1 = 11
    st2 = 12
    end = 999


def get_now_datetime_str():
    localtime_now = time.localtime()
    year = localtime_now.tm_year
    month = localtime_now.tm_mon
    day = localtime_now.tm_mday
    hour = localtime_now.tm_hour
    minute = localtime_now.tm_min
    sec =localtime_now.tm_sec
    str_datetime = str(year) + "/" + str(month) + "/" + str(day) + " " + \
        str(hour) + ":" + str(minute) + ":" + str(sec)
    return str_datetime


def append_to_file(fname, out_str):
    f = None
    try:
       f = codecs.open(fname, "a", "shift_jis")
    except IOError:
        print("Cannot open file " + str(fname))
    if f is not None:
        f.write(out_str + "\r\n")
        f.close()


def get_sensor_value_from_serial(serial_line):
    list_name_value = serial_line.split(":")
    tem_serial = None
    hyg_serial = None
    atm_serial = None
    for name_value in list_name_value:
        if name_value[:3] == "tm=":
            tem_serial = int(name_value[3:])/100
        elif name_value[:3] == "hu=":
            hyg_serial = int(name_value[3:])/100
        elif name_value[:3] == "at=":
            atm_serial = int(name_value[3:])
    return tem_serial, hyg_serial, atm_serial


def main(papero, head_move):
    serial_buf = b""
    state = State.st0
    tem_serial = None
    hyg_serial = None
    atm_serial = None
    state_motor = State.st0
    while state != State.end:
        try:
            serial_byte = ser.read()
            if serial_byte == b'\n':
                serial_line = serial_buf.decode('UTF-8')
                serial_buf = b""
                tem, hyg, atm = get_sensor_value_from_serial(serial_line)
                if (tem is not None) and (hyg is not None) and (atm is not None):
                    print(serial_line)
                    tem_serial = tem
                    hyg_serial = hyg
                    atm_serial = atm
                    state = State.st1
            else:
                serial_buf += serial_byte
        except:
            pass
        messages = papero.papero_robot_message_recv(0.1)
        if messages is not None:
            msg_dic_rcv = messages[0]
        else:
            msg_dic_rcv = None
        if papero.errOccurred != 0:
            print("------Error occured(main()). Detail : " + papero.errDetail)
            break
        if state == State.st1:
            papero.send_get_sensor_value()
            state = State.st2
        elif state == State.st2:
            if (msg_dic_rcv is not None) and (msg_dic_rcv["Name"] == "getSensorValueRes"):
                if "TEM" in msg_dic_rcv:
                    tem_papero = msg_dic_rcv["TEM"]
                    hyg_papero = msg_dic_rcv["HYG"]
                    res_str = str(get_now_datetime_str()) + "," + \
                              str(tem_papero) + "," + str(hyg_papero)
                    res_str += "," + str(tem_serial) + "," + str(hyg_serial) + "," + str(atm_serial)
                    res_str += "," + str(head_move)
                    append_to_file("log.csv", res_str)
                    print(res_str)
                state = State.st0
        if head_move == 1:
            if state_motor == State.st0:
                papero.send_move_head(["A30T1000L", "A0T1000L", "R0T4000L"],
                                      ["A0T1000L", "R0T1000L", "A80T1000L", "A0T1000L", "A-80T1000L", "A0T1000L"])
                state_motor = State.st1
            elif state_motor == State.st1:
                if (msg_dic_rcv is not None) and (msg_dic_rcv["Name"] == "moveFinish"):
                    state_motor = State.st0


if __name__ == "__main__":
    simulator_id = ""
    robot_name = ""
    ws_server_addr = "ws://localhost:8088/papero"
    papero = pypapero.Papero(simulator_id, robot_name, ws_server_addr)
    if papero.errOccurred == 0:
        ser = serial.Serial("/dev/ttyUSB0", 115200)
        head_move = 0
        if (len(sys.argv) > 1) and sys.argv[1] == "-m":
            head_move = 1
        main(papero, head_move)
    papero.papero_cleanup()

動作させるには、まず「無線モジュールTWE-Liteを使う」に従い、TWE-Lite経由でBME280のセンサー値を取得できるようにします。
次に、上記プログラムをPaPeRo i の/tmpの下に sensorrec.py として保存し、同ディレクトリに「PaPeRo iをRaspberry Pi上のpythonから操作する」の「Raspberry Piへ通信ライブラリをインストール」にあるリンクからダウンロードした「PaPeRo制御用Pythonライブラリ」(pypapero.py)を同じく/tmpの下に配置した上で、

# python3 sensorrec.py > /dev/null &

で実行すると、首を動かさずに計測及び記録が行われます。
首を動かしながら計測及び記録を行うには、

# python3 sensorrec.py -m > /dev/null &

とします。

実行中、PaPeRo i は首動作と発話を繰り返し、BME280からセンサー値を受信すると、PaPeRo i で計測した温湿度と共に、下記のようなcsvファイル(log.csv)に出力します。

2016/1/1 0:7:7,21,55,27.42,46.45,1006,0
2016/1/1 0:8:17,21,55,27.41,46.45,1006,0
2016/1/1 0:10:27,21,55,27.42,46.44,1006,0
2016/1/1 0:11:37,21,60,27.42,46.43,1006,0

「,」で区切られた値の並びは、左からPaPeRo i の時刻、PaPeRo i の温度(℃)、PaPeRo i の湿度(%)、BME280の温度(℃)、BME280の湿度(%)、BME280の気圧(ヘクトパスカル)、首動作(0=首動作なし、1=首動作あり)です。
時刻から 2016/1/1 0:0:0 を引く事により、PaPeRoiが立ち上がってからの経過時間が分かります。
記録の中断は、Linux の kill コマンドで行います。
記録したデータのうち、
・PaPeRo i の温度
・BME280の温度
・首動作
を、この後の学習で使用します。

計測結果

実際に計測を行った結果は、こちらからダウンロードして頂けます。
ダウンロードされたsensordata.zipファイルの中のsensordataフォルダにある log1.csv~log9.csv が、計測結果のCSVファイルです。

計測結果のうち、log1.csv の温度(PaPeRo i 計測値及びBME280計測値)をグラフ化したものを、下図に示します。

この図から、PaPeRo i 計測温度に関して以下の事が分かります。

・立ち上げ直後はBME280の温度に比べて大きく下回っている(※)
・時間の経過に伴い上昇し、BME280の値に近づく
・首を動かすと若干上昇する

従って、PaPeRo i による計測温度から実際の温度を推測する為には、温度の時系列及び、首を動かしているかどうかに関する情報が利用できそうです。

(※) PaPeRo i の計測温度は、設定ファイルにより補正された値となっている為、立ち上げ直後は実際より低い値となります。

ニューラルネットワークによる学習

学習は、Windows10(64bit)のインストールされたPC上で、以下の手順により行いました。

(1) https://conda.io/miniconda.html から、Python3.6用のWindows 64-bit (exe installer) をダウンロードします。

(2) ダウンロードしたファイル(Miniconda3-latest-Windows-x86_64.exe)を実行します。

(3) 画面の指示に従って、インストールを完了させます。

(4) Windowsのスタートボタン→Anaconda3 (64bit)→Anaconda Prompt の順にクリックします。

(5) keras 専用の環境を作る為、Anaconda Prompt 上で、下記のコマンドを入力します。

conda create -n mykeras

(6) 先ほど作った環境に移行する為、下記のコマンドを入力します。

activate mykeras

プロンプトの先頭が、(base)から(mykeras)に変わります。

(7) 環境に、ニューラルネットワークのライブラリであるKerasをインストールします。

conda install keras

これにより、NumPy や TensorFlow を含む依存ライブラリも同時にインストールされます。

(8) 下記のコードをコピー・ペーストし、gru_tem.py という名前で任意のフォルダに置きます。

import sys
import codecs

import numpy as np
from keras.layers.recurrent import GRU
from keras.models import Sequential
from keras.optimizers import Adam
from keras.layers import Dense, Activation
from keras.models import model_from_json


def get_csv_data(file_path):
    csv_data = None
    f_in = None
    try:
        f_in = codecs.open(file_path, "r", "shift_jis")
        csv_data = []
    except IOError:
        print("ファイル" + file_path + "のオープンに失敗しました");
        f_in = None
    while f_in is not None:
        try:
            line = f_in.readline()
        except:
            line = False
            err = True
        if line:
            csv_line = (line.strip()).split(",")
            csv_data.append(csv_line)
        else:
            f_in.close()
            f_in = None
    return csv_data


def get_csv_filenames(file_path):
    csv_filepathes = []
    try:
        f_in = codecs.open(file_path, "r", "shift_jis")
    except IOError:
        print("ファイル" + file_path + "のオープンに失敗しました");
        f_in = None
    while f_in is not None:
        try:
            line = f_in.readline()
        except:
            line = False
            err = True
        if line:
            csv_filepathes.append(line.strip())
        else:
            f_in.close()
            f_in = None
    return csv_filepathes


def get_csv_dataset(csv_filenames):
    csv_dataset = []
    for csvfname in csv_filenames:
        csv_data = get_csv_data(csvfname)
        csv_dataset.append(csv_data)
    return csv_dataset


def train(csv_dataset, timesteps):
    list_data_x = []
    list_data_y = []
    total_num_data = 0
    for i_log in range(len(csv_dataset)):
        csv_data = csv_dataset[i_log]
        if csv_data is not None:
            num_data = max(len(csv_data) - timesteps + 1, 0)
            for i_time in range(num_data):
                for i_dat in range(timesteps):
                    j = i_time + i_dat
                    list_data_x.append(csv_data[j][1])
                    list_data_x.append(csv_data[j][6])
                    list_data_y.append(csv_data[j][3])
            total_num_data += num_data
    train_x = np.array(list_data_x).reshape(total_num_data, timesteps, 2)
    train_y = np.array(list_data_y).reshape(total_num_data, timesteps, 1)
    test_x = train_x
    test_y = train_y

    n_in_dim = 2
    n_hidden = 20
    n_out = 1
    epochs = 100
    batch_size = 10
    model=Sequential()
    model.add(GRU(n_hidden, input_shape=(timesteps, n_in_dim),
                  kernel_initializer='random_normal', return_sequences=True))
    model.add(Dense(n_out, kernel_initializer='random_normal'))
    model.add(Activation('linear'))
    model.compile(loss='mean_squared_error', optimizer=Adam(lr=0.01, beta_1=0.9, beta_2=0.999))
    model.summary();
    model.fit(train_x, train_y, batch_size=batch_size, epochs=epochs, validation_data=(test_x, test_y))
    return model


def pred_and_output(model, csv_data, timesteps, csv_filename):
    line_set = []
    f_out = codecs.open("pred_" + csv_filename, "w")
    for csv_line in csv_data:
        val_x1 = csv_line[1]
        val_x2 = csv_line[6]
        line_set.append(csv_line)
        if len(line_set) > timesteps:
            del line_set[0]
        dat_x = np.zeros((1, timesteps, 2))
        for i in range(len(line_set)):
            dat_x[0, i, 0] = line_set[i][1]
            dat_x[0, i, 1] = line_set[i][6]
        dat_y = model.predict(dat_x)
        val_y = dat_y[0, len(line_set) - 1, 0]
        val_t = csv_line[3]
        print("val_x1 =", val_x1, "val_x2 =", val_x2, "val_y =", val_y, "val_t =", val_t)
        outstr = csv_line[0] + "," + str(val_x1) + "," + str(val_y) + "," + str(val_t) + "\n"
        f_out.write(outstr)
    f_out.close()


if __name__ == "__main__":
    if len(sys.argv) > 2:
        csv_filenames = get_csv_filenames(sys.argv[1])
        csv_dataset = get_csv_dataset(csv_filenames)
        timesteps = 20
        if sys.argv[2] == "-l":
            model = train(csv_dataset, timesteps)
            # save model and weights
            model_json_str = model.to_json()
            open('gru_tem_model.json', 'w').write(model_json_str)
            model.save_weights('gru_tem_weights.h5')
        elif sys.argv[2] == "-p":
            # Load model
            model = model_from_json(open('gru_tem_model.json').read())
            model.load_weights('gru_tem_weights.h5')
            model.summary();
            for i in range(len(csv_filenames)):
                pred_and_output(model, csv_dataset[i], timesteps, csv_filenames[i])
    else:
        print("Usage: " + sys.argv[0] + " csv_file_list -l  --> learn")
        print("Usage: " + sys.argv[0] + " csv_file_list -p  --> predict")

(9) 実際の計測データ(sensordata.zipファイルの中のsensordataフォルダにあるlog1.csv~log9.csv)を、gru_tem.pyと同じフォルダに置きます。

(10) 学習に使用するcsvファイルを下記のように記述した、csvs_learn.txt ファイルを、gru_tem.pyと同じフォルダに置きます(このファイルは、sensordata.zipファイルの中のsensordataフォルダにも含まれています)。

log1.csv
log2.csv
log3.csv
log4.csv
log5.csv
log6.csv

(11) Anaconda Prompt 上で cd コマンドにより gru_tem.py を置いたディレクトリに移動し、下記のコマンドを実行します。

python gru_tem.py csvs_learn.txt -l

これにより、csvs_learn.txt に含まれるCSVファイルに記録されたデータのうち、
・PaPeRo i の温度
・BME280の温度
・首動作
を用いて学習が行われ、学習結果として gru_tem_model.json 及び gru_tem_weights.h5 がgru_tem.pyと同じフォルダに出力されます。
なお、sensordata.zipファイルには、こちらで学習を行った学習結果が同梱されています。
学習にかかる時間は、Intel(R) Core(TM) i7-3630QM CPU @ 2.40GHz、メモリ8GB のPC で、3分程度です。
Kerasを上記手順でインストールした場合、GPUは使用されません。

(12) 学習結果を利用して計測データのCSVから推測結果を出力する為、推測対象とするcsvファイルを下記のように記述した、csv_pred.txt ファイルを、gru_tem.py と同じフォルダに置きます(このファイルも、sensordata.zipファイルの中のsensordataフォルダにも含まれています)。

log1.csv
log2.csv
log3.csv
log4.csv
log5.csv
log6.csv
log7.csv
log8.csv
log9.csv

(13) 下記のコマンドを実行する事により、CSVファイルの PaPeRo i の計測温度から実際の温度の推測が行われ、推測結果がBME280計測結果と共に別なCSVファイルに出力されます。出力先のCSVファイルのファイル名は、入力元のCSVファイル名の先頭に、”pred_”を付加したものになります(例:log1.csv → pred_log1.csv)。

python gru_tem.py csvs_pred.txt -p

(14) sensordata.zipファイルの中のsensordataフォルダにある 推測結果CSVグラフ化.xlsm を任意のフォルダに置き、Excelで開きます。「CSV読み込み」ボタンをクリックし、表示されるファイル選択ダイアログで、(13)で出力されたCSVファイル(pred_log*.csv)を選択すると、読み込み及びグラフ化が行われます。

推測結果について

学習に使用した、log1.csv のPaPeRo i 計測温度から、実際の温度を推測した結果(pred_log1.csv)のグラフを、下図に示します。

推測された温度は PaPeRo i 計測温度に比べて、BME280による温度に近くなっている事が分かります。

次に、学習に使用しなかった、log9.csv のPaPeRo i 計測温度から、実際の温度を推測した結果(pred_log9.csv)のグラフを、下図に示します。

こちらについても、推測された温度は PaPeRo i 計測温度に比べ、BME280による温度に近くなっており、学習データを計測した条件に近い環境であれば、PaPeRo i の計測温度から実際に近い温度を推測できる事が分かります。

使用したニューラルネットワークについて

上記 gru_tem.py では、PaPeRo i 計測温度の時系列から実際の温度を推測する為、時系列データを学習する事が可能なリカレントニューラルネットワークの一つである、GRU(Gated Recurrent Unit)を使用します。
GRUのニューロン数は、20としております。
GRUにはPaPeRo i 計測温度及び首動作(0~1)を入力し、GRUからの20次元の出力を全結合層により1次元に変換する事により、温度の推測値を求め、それがBME280の計測値に近づくように、全結合層及びGRUの学習を行います。
学習回数は、100回となっています。
ネットワークの構成図を、下図に示します。

GRUからの現在の出力には、現在の入力だけでなく、過去から現在までの入力の時系列が反映されます。
時系列の長さはあらかじめ決めておく必要があり、gru_tem.py では20となっています。
計測された温度の時間間隔は、平均約80秒なので、最大約26分(=(20-1)×80秒)前から現在までの温度・首動作の時系列を使って、現在のBME280の温度値を推測する事になります。

PaPeRo i アプリケーションにおける温度推測

学習結果を使用し、PaPeRo i の温度計測値から、実際の温度の値を推測して発話するプログラム例を、以下に示します。

import sys
import time
from enum import Enum

import numpy as np
from keras.models import model_from_json

import pypapero


class State(Enum):
    st0 = 10
    st1 = 11
    st2 = 12
    end = 999


class Prediction:
    def __init__(self):
        self.model = model_from_json(open('gru_tem_model.json').read())
        self.model.load_weights('gru_tem_weights.h5')
        self.model.summary();
        self.list_src_data = []
        self.timesteps = 20
        self.last_measure_time = time.monotonic()
        self.st_measure = State.st0
        self.tem_pred = 0
        self.valid_tem_pred = False

    def measure_and_record(self, papero, msg_dic_rcv, head_move):
        if self.st_measure == State.st0:
            now_time = time.monotonic()
            if (len(self.list_src_data) <= 0) or (now_time - self.last_measure_time) >= 80:
                papero.send_get_sensor_value()
                self.last_measure_time = now_time
                self.st_measure = State.st1
        elif self.st_measure == State.st1:
            if (msg_dic_rcv is not None) and (msg_dic_rcv["Name"] == "getSensorValueRes"):
                if "TEM" in msg_dic_rcv:
                    tem_papero = int(msg_dic_rcv["TEM"])
                    self.list_src_data.append([tem_papero, head_move])
                    if len(self.list_src_data) > self.timesteps:
                        del self.list_src_data[0]
                    self.predict()
                    self.st_measure = State.st0
                else:
                    papero.send_get_sensor_value()

    def predict(self):
        dat_x = np.zeros((1, self.timesteps, 2))
        for i in range(len(self.list_src_data)):
            dat_x[0, i, 0] = self.list_src_data[i][0]
            dat_x[0, i, 1] = self.list_src_data[i][1]
        dat_y = self.model.predict(dat_x)
        val_y = dat_y[0, len(self.list_src_data) - 1, 0]
        self.tem_pred = val_y
        self.valid_tem_pred = True


def main(papero, head_move):
    prediction = Prediction()
    last_time = time.monotonic()
    state = State.st0
    while state != State.end:
        messages = papero.papero_robot_message_recv(0.1)
        if messages is not None:
            msg_dic_rcv = messages[0]
        else:
            msg_dic_rcv = None
        if papero.errOccurred != 0:
            print("------Error occured(main()). Detail : " + papero.errDetail)
            break
        prediction.measure_and_record(papero, msg_dic_rcv, head_move)
        if state == State.st0:
            if prediction.valid_tem_pred:
                state = State.st1
        elif state == State.st1:
            if head_move > 0:
                papero.send_move_head(["A30T1000L", "A0T1000L", "R0T4000L"],
                                      ["A0T1000L", "R0T1000L", "A80T1000L", "A0T1000L",
                                       "A-80T1000L", "A0T1000L"])
            else:
                last_time = time.monotonic()
            tem = int(prediction.tem_pred * 100 + 0.5) / 100
            speech = "現在の温度は" + str(tem) + "度です"
            papero.send_start_speech(speech)
            state = State.st2
        elif state == State.st2:
            if head_move > 0:
                if (msg_dic_rcv is not None) and (msg_dic_rcv["Name"] == "moveFinish"):
                    state = State.st1
            else:
                now_time = time.monotonic()
                if (now_time - last_time) > 6:
                    state = State.st1
        if (msg_dic_rcv is not None) and (msg_dic_rcv["Name"] == "detectButton"):
            state = State.end


if __name__ == "__main__":
    if len(sys.argv) > 1:
        simulator_id = ""
        robot_name = ""
        ws_server_addr = "ws://" + sys.argv[1] + ":8088/papero"
        papero = pypapero.Papero(simulator_id, robot_name, ws_server_addr)
        if papero.errOccurred == 0:
            head_move = 0
            if (len(sys.argv) > 2) and (sys.argv[2] == "-m"):
                head_move = 1
            main(papero, head_move)
        papero.papero_cleanup()
    else:
        print("Usage: " + sys.argv[0] + "ip_addr [-m]")

実行には、学習時と同様に、Keras、TensorFlow、NumPyがインストールされた環境が必要です。
上記 Anaconda Prompt の mykeras 環境で実行するには、ws4pyを下記のコマンドでインストールします。

conda install ws4py

上記のコードを gru_tem_model.json 及び gru_tem_weights.h5 の置かれたディレクトリに gru_ppr_tem.py という名前で保存し、pypapero.py を同ディレクトリに置いた上で、Anaconda Prompt から

python gru_ppr_tem.py PaPeRoiのIPアドレス

で実行すると、推測値の発話を繰り返します。

python gru_ppr_tem.py PaPeRoiのIPアドレス -m

で実行すると、推測値の発話と同時に、首動作が繰り返されます。
PaPeRo i の座布団のボタンを押すと、プログラムが終了します。

注意

sensordata.zip に含まれるCSVファイル及び学習結果は、エアコンの効いた室内の環境(気温25~30℃)でのものです。
上記以外の環境での推測の為には、その環境でのデータ計測及び学習が必要となります。
環境によっては、推測の精度を上げるために、GRUのニューロン数・学習回数・時系列長の変更が必要となる場合もあります。