音声強化版PaPeRo i で音声認識を行う

 音声強化版PaPeRo i は、使われているPaPeRo i 自体は従来と同じものですがお腹にTinker Board S が格納されており、このTinker Board S に

・マイクアレイ
・単語だけでなく文章を認識可能な音声認識エンジン
・自動応答機能

が接続・搭載されていて、従来よりも高機能な音声認識アプリを作ることができます。
この音声強化版PaPeRo i で音声認識を行う方法を紹介します。なお音声強化版PaPeRo i ではアプリはTinker Board S 上で動作するPythonプログラムとして作成します。

音声認識する方法

 音声強化版PaPeRo i の音声認識機能はPaPeRo i 本体のロボットプラットフォームには含まれず、Tinker Board S にプリインストールされているPythonパッケージとして提供されます。音声認識機能と自動応答機能が統合されて「PaPeRo i おしゃべり機能」として提供されており、パッケージ名はpapebotです。

 まず比較対象として、従来の本体の音声認識機能を使って単に音声認識した言葉を画面に表示するだけの簡単なプログラムです。

speechrecog_body.py:

import sys
from logging import (getLogger, INFO, basicConfig)

import pypapero

logger = getLogger(__name__)


def main():
    simid, simname, url = pypapero.get_params_from_commandline(sys.argv)
    papero = pypapero.Papero(simid, simname, url)
    if papero.errOccurred != 0:
        logger.error('errOccurred: {}'.format(papero.errOccurred))
        return
    logger.info('start.')
    dn = '/opt/papero/lib/Standard.mrg'
    logger.info('dictionary={}'.format(dn))
    papero.send_stop_speech_recognition()
    papero.send_free_dictionary('')
    papero.send_read_dictionary(dn)
    papero.send_add_speech_recognition_rule('Standard')
    papero.send_start_speech_recognition()
    papero.send_turn_led_on('ear', ["W2W2", str(int(500 / 100)), "NN", str(int(500 / 100))], repeat=True)
    while True:
        msgs = papero.papero_robot_message_recv(1.0)
        if (msgs is  None) or (0 == len(msgs)):
            continue
        msg0 = msgs[0]
        name = msg0.get("Name")
        if name == "detectPhrase":
            if "Expression" in msg0:
                txt = msg0["Expression"]
                if txt != "Reject":
                    # 音声認識した言葉を表示
                    logger.info(txt)
                else:
                    pass
        elif name == "detectButton":
            # 座布団ボタンでプログラム終了
            logger.info('end.')
            break
    papero.send_stop_speech_recognition()
    papero.send_turn_led_off('ear')
    papero.send_free_dictionary('')
    papero.papero_cleanup()


if __name__ == "__main__":
    basicConfig(level=INFO, format='%(asctime)s %(levelname)s %(name)s.%(funcName)s %(message)s')
    main()

なお起動は

python3 speechrecog_body.py -wssvr ws://192.168.5.1:8088/papero

の様にパペロのアドレスを指定する必要があります。

 Tinker Board S 上でpapebotパッケージを使用してこれと同じことをやると以下の様になります(Tinker Board S 上で実行する必要があります)。

speechrecog_tinker.py:

import os
import sys
from logging import (getLogger, INFO, basicConfig)

import papebot

import pypapero

logger = getLogger(__name__)


def main():
    simid, simname, url = pypapero.get_params_from_commandline(sys.argv)
    papero = pypapero.Papero(simid, simname, url)
    if papero.errOccurred != 0:
        logger.error('errOccurred: {}'.format(papero.errOccurred))
        return
    logger.info('start.')
    papebot.init(papero)
    papebot.set_triggerword_onoff(True)  # トリガーワード必要
    papebot.start_listening(chat_talk=False)
    is1st = True
    while True:
        msgs = papero.papero_robot_message_recv(1.0)
        if (msgs is  None) or (0 == len(msgs)):
            continue
        msg0 = msgs[0]
        name = msg0.get("Name")
        if name == "papebotRecogStart":
            # トリガーワード「パペロ」を認識したら耳を点滅
            papero.send_turn_led_on('ear', ["W2W2", str(int(500 / 100)), "NN", str(int(500 / 100))], repeat=True)
        elif name == "papebotRecognized":
            if is1st:
                # 以後はトリガーワード不要に設定
                papebot.set_triggerword_onoff(False)
                is1st = False
            txt = msg0.get("Sentence")
            if (txt is not None) and (0 < len(txt)):
                # 音声認識した言葉を表示
                logger.info(txt)
            else:
                pass
        elif name == "detectButton":
            # 座布団ボタンでプログラム終了
            logger.info('end.')
            break
    papebot.stop_listening()
    papero.send_turn_led_off('ear')
    papero.papero_cleanup()


if __name__ == "__main__":
    basicConfig(level=INFO, format='%(asctime)s %(levelname)s %(name)s.%(funcName)s %(message)s')
    main()

ここではAPIの使い方の説明も兼ねて、本体版より少し複雑なことをしています。順に説明すると、

(1) 初期化:init()
 papebotパッケージの初期化API init()にはpypapero.Paperoのインスタンスを渡します。以後papebotのイベントもパペロのイベントと同様papero.papero_robot_message_recv()で受け取れるようになります。

(2) トリガーワードON/OFF:set_triggerword_onoff()
 音声強化版パペロにはトリガーワード機能があり、そのON/OFFを指定できます。set_triggerword_onoff(True)とするとトリガーワード機能がONになり、トリガーワード「パペロ」を認識するまで他の言葉は音声認識しません。この場合「パペロ」を認識するとイベント”papebotRecogStart”を発行し、続けて言葉を認識すると”papebotRecognized”イベントを1回発行し、再び「パペロ」待ちになります。set_triggerword_onoff(False)とした場合は言葉を音声認識するたびに”papebotRecognized”イベントを連続して発行します。
 上記プログラムでは最初はトリガーワード機能ONとし、一度「パペロ」を認識して、続けて言葉を音声認識した時点でset_triggerword_onoff(False)としてトリガーワード機能をOFFにしており、つまり最初の一回だけ「パペロ」が必要で以後は「パペロ」なしで連続して音声認識する様にしています。

(3) 動作開始:start_listening()
 音声認識動作を開始します。引数は動作モード指定で、start_listening(True)とすると音声認識すると自動応答する動作になります(おしゃべりモード)。start_listening(False)とすると、単に音声認識する動作になります(音声認識モード)。ここでは音声認識モードを指定しています。

他の機能の紹介

 上記サンプルプログラムからは外れますが、他の機能を紹介します。
(1) ミュート制御:voice_mute()/voice_unmute()
 音声認識のON/OFFを一時的に制御するAPIです。特にパペロに発話させる間は、自分の声を聞き取ってしまわない様にミュートする必要がありますが、papebotのAPIを使って発話させる場合、ミュート制御は自動で行われるのでこれらのAPIを明示的に使用する必要はありません。発話中以外でミュートしたい場合に使用します。

(2) おしゃべり発話:do_chatbot_operation()
 音声認識モードで使用中に一時的におしゃべりモードの動作をさせることができるAPIです。つまりアプリで特定の言葉についてはそのアプリ固有の応答や制御を行うが、それ以外の言葉には自動応答させる、ということができます。応答の発話までさせるか、応答のテキストだけを取得するかも引数によって制御でき、発話までさせる場合ミュート制御も行われます。

(3) 発話指示:auto_talk()
 ミュート制御が行われる発話指示APIです。なお本APIだけでなくpapebotのAPIで発話させた場合はミュート制御の他に発話終了イベント”papebotChatTalkEnd”が発行されるので、従来必要だったアプリで発話終了をポーリングする必要がなくなります。

(4) 自動動作設定:set_auto_operation()
 papebotでは耳、口、頬のLED制御および首の動作も自動で実行する機能を持っており、本APIで有効・無効を指示します。

papebotを使うプログラムをPCでデバッグするためのダミーライブラリ

 papebotを利用するプログラムをPC上でVisual Studio CodePyCharmなどを使ってデバッグしようとしても、PCにはpapebotパッケージが存在しないためimport部分で例外となりプログラムを動かすことができません。それでは何かと不便なのでpapebotと同じAPIを持つダミーライブラリを作成したので紹介します(PaPeRo i おしゃべり機能API仕様書0.7版互換)。
 但しおしゃべりモードの実際の発話動作や発話終了イベント”papebotChatTalkEnd”を発行する機能は実装していないため、単に音声認識機能だけを利用し、発話やミュート制御をアプリ側で行う場合しか使えないのでご注意ください。
 使い方は、一つの例ですが”/home/linaro”が存在する場合はTinker Board S 上で実行されていると判断して本物のpapebotをimportし、そうでなければ下記dummypapebot.pyをpapebotとしてimportする様にすれば、Tinker Board S とPCどちらでも動作するプログラムにできます。

RUN_ON_TINKER = os.path.exists('/home/linaro')
if RUN_ON_TINKER:
    import papebot
else:
    import dummypapebot as papebot 

また、キーボードでEnterが入力されたタイミングでそれまでに入力された文字列を認識語として音声認識イベントを発行する様にしています。start_listening()など正しい初期化が行われたかどうかは判断しておらず、キー入力があればイベントを発行します。

dummypapebot.py:

from logging import (getLogger, debug, info, warn, error, critical,
DEBUG, INFO, WARN, ERROR, CRITICAL, basicConfig)
import json
import threading

logger = getLogger(__name__)

MODE_IDLE = 'idle'
MODE_CHATBOT = 'chatbot'
MODE_ASR = 'asr'


class DummyPapebot:
    # ダミーpapebotクラス
    def __init__(self):
        self._mode = MODE_IDLE
        self._papero = None
        self._triggerword_onoff = True
        self._ear_led = True
        self._head_move = False
        self._mouth_led = True
        self._cheek_led = False
        self._system_running = False
        self._muted = False
        self._micarray_volume = 100
        self._micarray_width = 180
        self._micarray_angle = 0
        self._config_file = None
        self._config_data = None
        self._recording = False

    def init(self, papero, config_file=None, config_data=None):
        # エンジン初期設定
        logger.info('papero={} config_file={} config_data={}'.format(
            papero, config_file, config_data))
        self._papero = papero
        self._config_file = config_file
        self._config_data = config_data
        threading.Thread(target=self._key_wait_loop).start()

    def set_auto_operation(self, dic):
        # 自動動作設定
        ear = dic.get('EarLED')
        head = dic.get('HeadMove')
        mouth = dic.get('MouthLED')
        cheek = dic.get('CheekLED')
        if ear is not None:
            self._ear_led = ear
        if head is not None:
            self._head_move = head
        if mouth is not None:
            self._mouth_led = mouth
        if cheek is not None:
            self._cheek_led = cheek
        logger.info('dic={} ear={} head={} mouth={} cheek={}'.format(
            dic, self._ear_led, self._head_move, self._mouth_led, self._cheek_led
        ))

    def start_listening(self, chat_talk=True):
        # 動作開始
        logger.info(chat_talk)
        mode = MODE_CHATBOT if chat_talk else MODE_ASR
        self._mode = mode
        self._system_running = True
        res = True
        # res = False
        return res

    def stop_listening(self):
        # 動作停止
        logger.info('mode={}'.format(self._mode))
        self._system_running = False
        if self._mode == MODE_IDLE:
            return False
        self._mode = MODE_IDLE
        res = True
        # res = False
        return res

    def get_mode(self):
        # 動作モード取得
        res = self._mode
        logger.info('mode={}'.format(res))
        return res

    def get_system_running(self):
        # 動作状態取得
        res = self._system_running
        # res = True
        # res = False
        logger.info('system_running={}'.format(res))
        return res

    def do_chatbot_operation(self, sentence, talk=True):
        # おしゃべり発話実行
        logger.info('sentence={} talk={}'.format(sentence, talk))
        return sentence + 'って聞こえたよ'

    def voice_mute(self):
        # 聞き取りミュート
        logger.info('muted={}'.format(self._muted))
        self._muted = True

    def voice_unmute(self):
        # 聞き取りミュート解除
        logger.info('muted={}'.format(self._muted))
        self._muted = False

    def is_papero_micarray_present(self):
        # 本体マイクアレイ制御可否
        res = True
        # res = False
        logger.info('res={}'.format(res))
        return res

    def get_papero_micarray_volume(self):
        # マイク音量設定取得
        res = self._micarray_volume
        logger.info('res={}'.format(res))
        return res

    def set_papero_micarray_volume(self, value):
        # マイク音量設定
        logger.info(value)
        if isinstance(value, int) and  0 <= value <= 100:
            self._micarray_volume = value
        else:
            raise Exception('volume out of range: {}'.format(value))

    @staticmethod
    def _clip_value_close(value, vmin, vmax):
        if vmin > value:
            return vmin
        elif vmax < value:
            return vmax
        return value

    def set_papero_micarray_beam(self, *, width=None, angle=None):
        # 本体マイクビーム設定
        if isinstance(width, int):
            self._micarray_width = self._clip_value_close(width, 23, 180)
        if isinstance(angle, int):
            self._micarray_angle = self._clip_value_close(angle, -90, 90)
        logger.info('arg: width={} angle={} stat: width={} angle={}'.format(
            width, angle, self._micarray_width, self._micarray_angle))

    def send_move_head(self, vertical, horizontal):
        # 首動作指示
        logger.info('vertical={} horizontal={}'.format(vertical, horizontal))

    def auto_talk(self, message):
        # 発話指示
        logger.info(message)

    def add_module(self, name):
        # おしゃべりプラグインのロード指示
        logger.info(name)
        res = True
        # res = False
        return res

    def del_module(self, name):
        # おしゃべりプラグインのアンロード指示
        logger.info(name)
        res = True
        # res = False
        return res

    def get_config_item(self, name, attrname):
        # おしゃべりプラグイン設定取得
        res = name + attrname
        logger.info('name={} attrname={} res={}'.format(name, attrname, res))
        return res

    def set_config_item(self, name, attrname, value):
        # おしゃべりプラグイン設定項目更新
        logger.info('name={} attrname={} value={}'.format(name, attrname, value))
        res = True
        # res = False
        return res

    def set_secondary_triggerword_off(self, value, ntime=None, ncount=None):
        # トリガーワード自動ON/OFF機能の有効化・無効化
        logger.info('value={} ntime={} ncount={}'.format(value, ntime, ncount))
        res = True
        # res = False
        return res

    def set_triggerword_onoff(self, value):
        # トリガーワードON/OFF
        logger.info('value={}'.format(value))
        self._triggerword_onoff = value
        res = True
        # res = False
        return res

    def get_triggerword_onoff(self):
        # トリガーワードON/OFF状態取得
        logger.info('')
        res = self._triggerword_onoff
        return res

    def set_recording_onoff(self, value):
        # 認識音声録音ON/OFF
        logger.info('value={}'.format(value))
        self._recording = value
        res = True
        # res = False
        return res

    def get_recording_onoff(self):
        # 認識音声録音ON/OFF
        v = self._recording
        logger.info('value={}'.format(v))
        return v

    def _key_wait_loop(self):
        while True:
            user_input = input("音声入力の代わり:")
            cmd = 'papebotRecognized'
            if user_input == ' ':
                cmd = 'papebotRecogStart'
            msg = {
                'Name': cmd,
                'Sentence': user_input
            }
            dic = {
                'Name': 'RobotMessage',
                "Messages": [msg, ],
            }
            s = json.dumps(dic)
            papero = self._papero
            if papero is not None:
                papero.queFromCom.put(s)


# グローバル変数
_papebot = DummyPapebot()


def init(papero, config_file=None, config_data=None):
    _papebot.init(papero, config_file=config_file, config_data=config_data)


def set_auto_operation(dic):
    _papebot.set_auto_operation(dic)


def start_listening(chat_talk=True):
    return _papebot.start_listening(chat_talk)


def stop_listening():
    return _papebot.stop_listening()


def get_mode():
    return _papebot.get_mode()


def get_system_running():
    return _papebot.get_system_running()


def do_chatbot_operation(sentence, talk=True):
    return _papebot.do_chatbot_operation(sentence=sentence,talk=talk)


def voice_mute():
    _papebot.voice_mute()


def voice_unmute():
    _papebot.voice_unmute()


def is_papero_micarray_present():
    return _papebot.is_papero_micarray_present()


def get_papero_micarray_volume():
    return _papebot.get_papero_micarray_volume()


def set_papero_micarray_volume(volume):
    _papebot.set_papero_micarray_volume(volume)


def set_papero_micarray_beam(*, width=None, angle=None):
    _papebot.set_papero_micarray_beam(width=width, angle=angle)


def send_move_head(vertical, horizontal):
    _papebot.send_move_head(vertical=vertical, horizontal=horizontal)


def auto_talk(message):
    _papebot.auto_talk(message=message)


def add_module(name):
    return _papebot.add_module(name)


def del_module(name):
    return _papebot.del_module(name)


def get_config_item(name, attrname):
    return _papebot.get_config_item(name=name, attrname=attrname)


def set_config_item(name, attrname, value):
    return _papebot.set_config_item(name=name, attrname=attrname, value=value)


def set_secondary_triggerword_off(value, ntime=None, ncount=None):
    return _papebot.set_secondary_triggerword_off(value=value, ntime=ntime,
                                                    ncount=ncount)


def set_triggerword_onoff(value):
    return _papebot.set_triggerword_onoff(value)


def get_triggerword_onoff():
    return _papebot.get_triggerword_onoff()


def set_recording_onoff(value):
    return _papebot.set_recording_onoff(value=value)


def get_recording_onoff():
    return _papebot.get_recording_onoff()