手をかざしてパペロの首を動かせるようにする

 センサー値をリアルタイムグラフ表示してみたところ、人感センサー(ir)は手を近づけたりすると敏感に反応することが観察できました。この値を使ってパペロの首を動かしたらどうなるのか?という思いつきを試してみました。

PaPeRo iの人感センサーについて

 PaPeRo iの人感センサーは
(1) 赤外線センサ自体の温度素子
(2) 上下左右に配置された4つの素子の検出値
(3) センサーコンポーネント内で判定した位置情報“Center”(中央)、”Left”(左)、”Right”(右)、”Lost”(不在)
の6値を返します。今回は(2)の値の、上下の差、左右の差の値に単に係数を掛けて首の絶対位置(角度)として指定しました。

プログラムについて

 リアルタイムグラフ表示のプログラムに首の制御を追加します。これはグラフ表示のための状態遷移プログラムになっていて、首の動きとは同期させづらいので、首は簡単にタイマーで2秒に1回、最新のセンサー値で動かすことにします。このため、状態とは無関係にイベントごとにstpypaperoから呼び出されるメソッドevent_hook()に処理を記述します。

    def event_hook(self, name, msg):
        if name == '_timer':
            if msg.get('TimerID') == 'head':
                self.do_move()

    def do_move(self):
        if self.irlst is None:
            return
        s2x = 0.5
        s2y = 0.5
        ofx = 0
        ofy = -10
        x = s2x * (self.irlst[1] - self.irlst[3]) + ofx
        y = s2y * (self.irlst[4] - self.irlst[2]) + ofy
        logger.info((x, y))
        self.move_to(x, y, sfx="S50L")
        self.set_timer(2, 'head')

センサー値から首の指令値に変換するための係数(0.5, 0.5)とオフセット(0, -10)は、動作を見て適当に決めたものです。

動作確認

 オリジナル同様、このプログラムはPaPeRo i上でもラズパイでもPCでも、pythonが動作する環境であれば動かすことが出来ます。センサー値をリアルタイムグラフ表示が動作する状態から、末尾記載のファイルpaperograph.pyとstpypapero.pyを上書きして下さい。PaPeRo iのアドレスはpaperograph.pyに直書きしているので必要なら修正し、以下で起動して下さい。

> python3 paperograph.py

 座布団前面中央の人感センサーの周りに手をかざすと、かざした方に首が向いてくれました。手は使わず、顔をぐっと近づけるだけでも、だいたいこちらを向いてくれるような動きになりました。グラフ通り、ノイズ(?)により不安定な動きになりますが、そのランダムさがこの場合には良い方に働く様です。
 アプリの中で、単に首をランダムに動かしている場合には、この方法に置き換えると、そこそこ周りの人の動きを反映する動きになり、面白いかも知れません。ただし、2秒に1回の動作を連続で続けるのは単調な動作音がうるさく、もう少しメリハリを付けるような工夫が必要かも知れません。

ソース

(1) paperograph.py

# -*- coding: utf-8 -*-
# センサー値をリアルタイムグラフ表示する
# 0.1.0  2018/05/31 IRセンサー値で首を移動する試み
# 0.0.0  2018/05/25 初版
import os
from logging import (getLogger, debug, info, warn, error, critical,
                     DEBUG, INFO, WARN, ERROR, CRITICAL, basicConfig)

import tornado.ioloop
import tornado.web
import tornado.websocket

from stpypapero import StPyPaperoThread as StPyPapero


logger = getLogger(__name__)


class HtmlHandler(tornado.web.RequestHandler):
    def get(self):
        self.render('paperograph.html')


class JsHandler(tornado.web.RequestHandler):
    def get(self):
        self.render('paperograph.js')


class Js2Handler(tornado.web.RequestHandler):
    def get(self):
        self.render('smoothie.js')


class WsHandler(tornado.websocket.WebSocketHandler):
    clients = []

    def initialize(self):
        """ initialize (called by tornado) """
        self.clients.append(self)

    def open(self, *args, **kwargs):
        logger.debug("WsHandler args:{} kwargs:{}".format(args, kwargs))

    def on_message(self, msg):
        logger.debug("WsHandler msg:{}".format(msg))

    def on_close(self):
        logger.debug("WsHandler")
        self.clients.remove(self)

    def send(self, msg, binary=False):
        logger.debug("WsHandler msg:{}".format(msg))
        super().write_message(msg, binary)

    @classmethod
    def broadcast(cls, msg):
        for con in cls.clients:
            con.send(msg)

    @classmethod
    def invoke_broadcast(cls, msg):
        tornado.ioloop.IOLoop.current().add_callback(cls.broadcast, msg)


def start_web():
    app = tornado.web.Application([
        (r'/paperograph.html', HtmlHandler),
        (r'/paperograph.js', JsHandler),
        (r'/smoothie.js', Js2Handler),
        (r'/__ws', WsHandler)
        ],
        template_path=os.path.dirname(__file__),
    )
    app.listen(8888)
    tornado.ioloop.IOLoop.current().start()


class SensorApp(StPyPapero):
    def initialize(self):
        self.handler = self.in_init
        self.irlst = None
        # states are:
        # init, sensors, time_wait, lum_start, lum_get, lum_stop

    # 処理メソッド

    def trans_state(self, newhandler):
        # ログを抑制するためのオーバーライド
        self.handler = newhandler

    def do_move(self):
        if self.irlst is None:
            return
        s2x = 0.5
        s2y = 0.5
        ofx = 0
        ofy = -10
        x = s2x * (self.irlst[1] - self.irlst[3]) + ofx
        y = s2y * (self.irlst[4] - self.irlst[2]) + ofy
        logger.info((x, y))
        self.move_to(x, y, sfx="S50L")
        self.set_timer(2, 'head')

    # 状態遷移メソッド

    def go_lum_start(self):
        self.papero.send_start_lum_sensor()
        self.trans_state(self.in_lum_start)

    def go_lum_get(self):
        self.papero.send_get_lum_sensor_value()
        self.trans_state(self.in_lum_get)

    def go_lum_stop(self):
        self.papero.send_stop_lum_sensor()
        self.trans_state(self.in_lum_stop)

    def go_sensors(self):
        self.papero.send_get_sensor_value()
        self.trans_state(self.in_sensors)

    def go_time_wait(self):
        self.set_timer(0.1, 'tm0')
        self.trans_state(self.in_time_wait)

    # 状態ごとのイベント処理メソッド

    def in_init(self, name, msg):
        if name == 'Ready':
            # self.go_sensors()
            self.set_timer(2, 'head')
            self.go_lum_start()

    def in_lum_start(self, name, msg):
        logger.debug('event: {}'.format(name))
        if name == 'startLumSensorRes':
            self.go_lum_get()

    def in_lum_get(self, name, msg):
        logger.debug('event: {}'.format(name))
        if name == 'getLumSensorValueRes':
            lx = msg.get('Return')
            WsHandler.invoke_broadcast({'lx': lx})
            #self.go_lum_stop()
            self.go_sensors()

    def in_lum_stop(self, name, msg):
        logger.debug('event: {}'.format(name))
        if name == 'stopLumSensorRes':
            self.go_sensors()

    def in_sensors(self, name, msg):
        logger.debug('event: {}'.format(name))
        if name == 'getSensorValueRes':
            if 'ret' not in msg or 0 <= msg.get('ret'):
                temp = msg.get('TEM')
                rh = msg.get('HYG')
                ir = msg.get('IR')
                irlst = ir.split(',') if ir else []
                WsHandler.invoke_broadcast({'temp': temp, 'rh': rh, 'ir': irlst})
                self.irlst = [int(s) for j, s in enumerate(irlst) if j < 5 ]
            else:
                logger.info('error ret={}'.format(msg.get('ret')))
            self.go_time_wait()

    def in_time_wait(self, name, msg):
        logger.debug('event: {}'.format(name))
        if name == '_timer':
            if msg.get('TimerID') == 'tm0':
                self.go_lum_start()
                # self.go_sensors()

    # 状態にかかわらず実行するイベントフック
    def event_hook(self, name, msg):
        if name == '_timer':
            if msg.get('TimerID') == 'head':
                self.do_move()


def main():
    #url = 'ws://192.168.1.188:8088/papero'
    url = 'ws://192.168.4.1:8088/papero'
    papero = SensorApp(url=url)
    papero.start()
    #threading.Thread(target=start_web).start()
    start_web()


if __name__ == '__main__':
    basicConfig(format='%(asctime)-15s %(levelname)s %(module)s.%(funcName)s %(message)s')
    logger = getLogger()  # root logger
    # logger.setLevel(DEBUG)
    logger.setLevel(INFO)
    logger.info('start')
    main()

(2) stpypapero.py

# -*- coding: utf-8 -*-
# pypaperoラッパークラスstpypapero
# (c) 2017-2018 Sophia Planning Inc.
# LICENSE: MIT
#
# Ver   Date       Description
# 0.3.1 2018/05/31 event_hook()が機能しなかった不具合修正
# 0.3.0 2018/05/29 put_ex_event(), move_to()、event_hook()追加
# 0.2.0 2018/05/22 状態変数を廃止、状態処理メソッドself.handlerを書き換える方法に変更
# 0.1.1 2018/02/20 speech()にmouth_en, head_en追加
# 0.1.0 2018/02/09 名称変更paperoappbase->stpypapero、on_startup削除、
#                  メッセージのリスト(現状要素数常に1)ではなく要素を渡すよう変更、
#                  paperoの引数指定追加、処理関数の返値をTRUEでプログラム終了(return書かなければ継続)に反転
#                  タイマー機能追加
# 0.0.2 2018/02/02 基本クラスAppBaseと派生クラスThreadAppBaseに分離
# 0.0.1 2017/12/19 Initial version
"""
使い方:
(1) StPyPaperoを継承したクラスを作る
(2) in_xxxで各状態ごとの処理を記述
(3) initialize()でself.handlerに初期状態用のin_xxxを設定
(4) 状態遷移時にはself.handlerを別のin_xxxに書き換える

機能:
(1) 終話検知
  stpypapero.speech(text)
  ...
  def in_xxxx(self, name, msg):
      if name == '_endOfSpeech':
          ...
 イベント: ''
(2) タイマー
  stpypapero.set_timer(delay, timer_id)
  stpypapero.cancel_timer(timer_id)
  ...
  def in_xxxx(self, name, msg):
      if name == '_timer':
          tid = msg.get('TimerID')
          if tid == timer_id:
              ...
(3) 外部イベント(パペロスレッドにパペロ以外のイベントを送る)
  stpypapero.put_ex_event('__exEvent1', {'data': [1,2,3,4]})
  ...
  def in_xxxx(self, name, msg):
      if name == '__exEvent1':
          ...
"""
import sys
import threading
import random
import json
from logging import (getLogger, basicConfig, DEBUG, INFO, WARN, ERROR,)

import pypapero

logger = getLogger(__name__)


# mouth data for speech
MOUTH_DAT = [
    [
        "G3" + "G3" + "G3" + "G3" + "G3" + "G3" + "G3" + "G3" + "G3", "3",
        "NNN" + "G3" + "G3" + "G3" + "NNN", "3",
        "NNNN" + "G3" + "NNNN", "3",
        "N" + "G3" + "G3" + "G3" + "G3" + "G3" + "G3" + "G3" + "N", "3",
        "NN" + "G3" + "G3" + "G3" + "G3" + "G3" + "NN", "3",
    ]
]

# head data for speech (random select)
HEAD_DAT = [
    [ # 0
        ["A4T300L",  "R0T300L",  "R0T300L",  "R0T300L", "A0T300L", ],  # x
        ["A3T300L", "A-3T300L", "A3T300L", "A-6T300L", "A0T300L", ],   # y
    ], [ # 1
        ["A-4T300L",  "R0T300L",  "R0T300L",  "R0T300L", "A0T300L", ],  # x
        ["A-3T300L", "A3T300L", "A-3T300L", "A6T300L", "A0T300L", ],    # y
    ], [ # 2
        ["A-3T300L",  "R0T300L",  "R5T300L",  "R0T300L", "A0T300L", ],  # x
        ["A-3T300L", "A3T300L", "A-2T300L", "A4T300L", "A0T300L", ],    # y
    ]
]


class StPyPapero(object):
    # papero application class
    def __init__(self, simid='', simname='', url=None, polling_sec=0.2, papero=None):
        if url is None:
            simid, simname, url = pypapero.get_params_from_commandline(sys.argv)
        self._polling_sec = polling_sec
        self._eos_wait_res = False  # send_get_speech_staus and waiting response
        self._eos_polling = False
        self._eos_callback = None  # end of speech callback
        self._timer_dic = {}
        if papero is None:
            papero = pypapero.Papero(simid, simname, url)
        self.papero = papero
        self.handler = None
        self.initialize()
        if 0 < self.papero.errOccurred:
            logger.error(self.papero.errDetail)
        else:
            self._inner_event_handler('Ready')

    """virtual
    def initialize(self):
        self.handler = self.in_init
    """

    def trans_state(self, newhandler):
        # transit state
        logger.info('state: {} -> {}'.format(self.handler, newhandler))
        self.handler = newhandler

    def speech(self, text,
               language=None, speaker_id=None,
               pitch=None, speed=None, volume=None,
               pause=None, comma_pause=None, urgent=None, priority="normal",
               eos_callback=None, mouth_en=True, head_en=True):
        # speech with head and mouth
        logger.debug('start speech')
        self._eos_polling = True
        self._eos_callback = eos_callback
        if mouth_en:
            self.papero.send_turn_led_on('mouth', MOUTH_DAT[0], repeat=True)
        if head_en:
            hidx = random.randint(0, len(HEAD_DAT) - 1)
            self.papero.send_move_head(HEAD_DAT[hidx][1], HEAD_DAT[hidx][0])
        self.papero.send_start_speech(text, language=language, speaker_id=speaker_id,
                                      pitch=pitch, speed=speed, volume=volume,
                                      pause=pause, comma_pause=comma_pause,
                                      urgent=urgent, priority=priority)

    def recv_msg(self):
        msg = self.papero.papero_robot_message_recv(self._polling_sec)
        return self._inner_event_handler(msg)

    def _inner_event_handler(self, msg):
        if self.papero.errOccurred != 0:
            return True
        # msg = self.recv_msg()
        if msg is None:
            if self._eos_polling and not self._eos_wait_res:
                # poll end of speech
                self.papero.send_get_speech_status()
                self._eos_wait_res = True
            msg = self._timeout_hook()
        if msg is None:
            return False
        else:
            res = self._handle_msg(msg)
            return res

    def run(self):
        while True:
            if self.recv_msg():
                break
        self.papero.papero_cleanup()

    def _timeout_hook(self):
        return None

    """
    def in_init(self, name, msg):
        if name == 'Ready':
            self.trans_state(self.State.IDLE)

    def in_idle(self, name, msg):
        pass
    """

    def _handle_msg(self, msg):
        if msg is None:
            return False
        if isinstance(msg, str):
            name = msg
        elif isinstance(msg, list):
            if 1 == len(msg):
                msg = msg[0]
                name = msg.get('Name')
            else:
                name = 'MsgList'
        else:
            name = msg.get('Name')
        if name == "getSpeechStatusRes":
            self._eos_wait_res = False
            ret = msg.get("Return")
            if self._eos_polling and (ret == 0):
                logger.debug('end of speech')
                self._eos_polling = False
                self.papero.send_turn_led_off('mouth')
                if callable(self._eos_callback):
                    self._eos_callback()
                # change name
                name = "_endOfSpeech"
        if hasattr(self, 'event_hook'):
            self.event_hook(name, msg)
        # call function for this state
        if callable(self.handler):
            self.handler(name, msg)

    # callbackが別スレッドで動く事に注意
    def _set_timeout(self, delay, callback, *args, **kwargs):
        hdl = threading.Timer(delay, callback, args, kwargs)
        hdl.start()
        return hdl

    def _remove_timeout(self, hdl):
        hdl.cancel()

    def _send_timeout_msg(self, timer_id):
        msg = ('{{"Name": "RobotMessage", "Messages": [' +
               '{{"Name": "_timer", "TimerID": "{}"}}] }}').format(timer_id)
        self.papero.queFromCom.put(msg)

    def set_timer(self, delay, timer_id):
        hdl = self._set_timeout(delay, self._send_timeout_msg, timer_id)
        self._timer_dic[timer_id] = hdl

    def cancel_timer(self, timer_id):
        if timer_id in self._timer_dic:
            hdl = self._timer_dic[timer_id]
            self._remove_timeout(hdl)

    def put_ex_event(self, name, msg=None):
        if msg is None:
            msg = {}
        if name is not None:
            msg['Name'] = name
        dic = {
            'Name': 'RobotMessage',
            "Messages": [msg, ],
        }
        s = json.dumps(dic)
        self.papero.queFromCom.put(s)

    @staticmethod
    def limit(x, lb, ub):
        if x < lb:
            return lb
        if x > ub:
            return ub
        return x

    def move_to(self, xy, y=None, pfx='A', sfx='S40L'):
        if y is None:
            x, y = xy
        else:
            x = xy
        x = self.limit(x, -128, 128)
        y = self.limit(y, -20, 55)
        hs = ['{}{}{}'.format(pfx, int(x), sfx), ]
        vs = ['{}{}{}'.format(pfx, int(y), sfx), ]
        self.papero.send_move_head(vs, hs)
        return x, y


class StPyPaperoThread(threading.Thread, StPyPapero):
    def __init__(self, simid='', simname='', url=None, polling_sec=0.2, papero=None):
        if url is None:
            simid, simname, url = pypapero.get_params_from_commandline(sys.argv)
        # threading.Thread.__init__()
        super(StPyPaperoThread, self).__init__()
        # PyPaperoAppBase.__init__()
        super(threading.Thread, self).__init__(simid=simid, simname=simname,
                                               url=url, polling_sec=polling_sec,
                                               papero=papero)

    def run(self):
        return super(threading.Thread, self).run()