センサー値をリアルタイムグラフ表示してみたところ、人感センサー(ir)は手を近づけたりすると敏感に反応することが観察できました。この値を使ってパペロの首を動かしたらどうなるのか?という思いつきを試してみました。
PaPeRo iの人感センサーについて
PaPeRo iの人感センサーは
(1) 赤外線センサ自体の温度素子
(2) 上下左右に配置された4つの素子の検出値
(3) センサーコンポーネント内で判定した位置情報“Center”(中央)、”Left”(左)、”Right”(右)、”Lost”(不在)
の6値を返します。今回は(2)の値の、上下の差、左右の差の値に単に係数を掛けて首の絶対位置(角度)として指定しました。
プログラムについて
リアルタイムグラフ表示のプログラムに首の制御を追加します。これはグラフ表示のための状態遷移プログラムになっていて、首の動きとは同期させづらいので、首は簡単にタイマーで2秒に1回、最新のセンサー値で動かすことにします。このため、状態とは無関係にイベントごとにstpypaperoから呼び出されるメソッドevent_hook()に処理を記述します。
def event_hook(self, name, msg):
if name == '_timer':
if msg.get('TimerID') == 'head':
self.do_move()
def do_move(self):
if self.irlst is None:
return
s2x = 0.5
s2y = 0.5
ofx = 0
ofy = -10
x = s2x * (self.irlst[1] - self.irlst[3]) + ofx
y = s2y * (self.irlst[4] - self.irlst[2]) + ofy
logger.info((x, y))
self.move_to(x, y, sfx="S50L")
self.set_timer(2, 'head')
センサー値から首の指令値に変換するための係数(0.5, 0.5)とオフセット(0, -10)は、動作を見て適当に決めたものです。
動作確認
オリジナル同様、このプログラムはPaPeRo i上でもラズパイでもPCでも、pythonが動作する環境であれば動かすことが出来ます。センサー値をリアルタイムグラフ表示が動作する状態から、末尾記載のファイルpaperograph.pyとstpypapero.pyを上書きして下さい。PaPeRo iのアドレスはpaperograph.pyに直書きしているので必要なら修正し、以下で起動して下さい。
> python3 paperograph.py
座布団前面中央の人感センサーの周りに手をかざすと、かざした方に首が向いてくれました。手は使わず、顔をぐっと近づけるだけでも、だいたいこちらを向いてくれるような動きになりました。グラフ通り、ノイズ(?)により不安定な動きになりますが、そのランダムさがこの場合には良い方に働く様です。
アプリの中で、単に首をランダムに動かしている場合には、この方法に置き換えると、そこそこ周りの人の動きを反映する動きになり、面白いかも知れません。ただし、2秒に1回の動作を連続で続けるのは単調な動作音がうるさく、もう少しメリハリを付けるような工夫が必要かも知れません。
ソース
(1) paperograph.py
# -*- coding: utf-8 -*-
# センサー値をリアルタイムグラフ表示する
# 0.1.0 2018/05/31 IRセンサー値で首を移動する試み
# 0.0.0 2018/05/25 初版
import os
from logging import (getLogger, debug, info, warn, error, critical,
DEBUG, INFO, WARN, ERROR, CRITICAL, basicConfig)
import tornado.ioloop
import tornado.web
import tornado.websocket
from stpypapero import StPyPaperoThread as StPyPapero
logger = getLogger(__name__)
class HtmlHandler(tornado.web.RequestHandler):
def get(self):
self.render('paperograph.html')
class JsHandler(tornado.web.RequestHandler):
def get(self):
self.render('paperograph.js')
class Js2Handler(tornado.web.RequestHandler):
def get(self):
self.render('smoothie.js')
class WsHandler(tornado.websocket.WebSocketHandler):
clients = []
def initialize(self):
""" initialize (called by tornado) """
self.clients.append(self)
def open(self, *args, **kwargs):
logger.debug("WsHandler args:{} kwargs:{}".format(args, kwargs))
def on_message(self, msg):
logger.debug("WsHandler msg:{}".format(msg))
def on_close(self):
logger.debug("WsHandler")
self.clients.remove(self)
def send(self, msg, binary=False):
logger.debug("WsHandler msg:{}".format(msg))
super().write_message(msg, binary)
@classmethod
def broadcast(cls, msg):
for con in cls.clients:
con.send(msg)
@classmethod
def invoke_broadcast(cls, msg):
tornado.ioloop.IOLoop.current().add_callback(cls.broadcast, msg)
def start_web():
app = tornado.web.Application([
(r'/paperograph.html', HtmlHandler),
(r'/paperograph.js', JsHandler),
(r'/smoothie.js', Js2Handler),
(r'/__ws', WsHandler)
],
template_path=os.path.dirname(__file__),
)
app.listen(8888)
tornado.ioloop.IOLoop.current().start()
class SensorApp(StPyPapero):
def initialize(self):
self.handler = self.in_init
self.irlst = None
# states are:
# init, sensors, time_wait, lum_start, lum_get, lum_stop
# 処理メソッド
def trans_state(self, newhandler):
# ログを抑制するためのオーバーライド
self.handler = newhandler
def do_move(self):
if self.irlst is None:
return
s2x = 0.5
s2y = 0.5
ofx = 0
ofy = -10
x = s2x * (self.irlst[1] - self.irlst[3]) + ofx
y = s2y * (self.irlst[4] - self.irlst[2]) + ofy
logger.info((x, y))
self.move_to(x, y, sfx="S50L")
self.set_timer(2, 'head')
# 状態遷移メソッド
def go_lum_start(self):
self.papero.send_start_lum_sensor()
self.trans_state(self.in_lum_start)
def go_lum_get(self):
self.papero.send_get_lum_sensor_value()
self.trans_state(self.in_lum_get)
def go_lum_stop(self):
self.papero.send_stop_lum_sensor()
self.trans_state(self.in_lum_stop)
def go_sensors(self):
self.papero.send_get_sensor_value()
self.trans_state(self.in_sensors)
def go_time_wait(self):
self.set_timer(0.1, 'tm0')
self.trans_state(self.in_time_wait)
# 状態ごとのイベント処理メソッド
def in_init(self, name, msg):
if name == 'Ready':
# self.go_sensors()
self.set_timer(2, 'head')
self.go_lum_start()
def in_lum_start(self, name, msg):
logger.debug('event: {}'.format(name))
if name == 'startLumSensorRes':
self.go_lum_get()
def in_lum_get(self, name, msg):
logger.debug('event: {}'.format(name))
if name == 'getLumSensorValueRes':
lx = msg.get('Return')
WsHandler.invoke_broadcast({'lx': lx})
#self.go_lum_stop()
self.go_sensors()
def in_lum_stop(self, name, msg):
logger.debug('event: {}'.format(name))
if name == 'stopLumSensorRes':
self.go_sensors()
def in_sensors(self, name, msg):
logger.debug('event: {}'.format(name))
if name == 'getSensorValueRes':
if 'ret' not in msg or 0 <= msg.get('ret'):
temp = msg.get('TEM')
rh = msg.get('HYG')
ir = msg.get('IR')
irlst = ir.split(',') if ir else []
WsHandler.invoke_broadcast({'temp': temp, 'rh': rh, 'ir': irlst})
self.irlst = [int(s) for j, s in enumerate(irlst) if j < 5 ]
else:
logger.info('error ret={}'.format(msg.get('ret')))
self.go_time_wait()
def in_time_wait(self, name, msg):
logger.debug('event: {}'.format(name))
if name == '_timer':
if msg.get('TimerID') == 'tm0':
self.go_lum_start()
# self.go_sensors()
# 状態にかかわらず実行するイベントフック
def event_hook(self, name, msg):
if name == '_timer':
if msg.get('TimerID') == 'head':
self.do_move()
def main():
#url = 'ws://192.168.1.188:8088/papero'
url = 'ws://192.168.4.1:8088/papero'
papero = SensorApp(url=url)
papero.start()
#threading.Thread(target=start_web).start()
start_web()
if __name__ == '__main__':
basicConfig(format='%(asctime)-15s %(levelname)s %(module)s.%(funcName)s %(message)s')
logger = getLogger() # root logger
# logger.setLevel(DEBUG)
logger.setLevel(INFO)
logger.info('start')
main()
(2) stpypapero.py
# -*- coding: utf-8 -*-
# pypaperoラッパークラスstpypapero
# (c) 2017-2018 Sophia Planning Inc.
# LICENSE: MIT
#
# Ver Date Description
# 0.3.1 2018/05/31 event_hook()が機能しなかった不具合修正
# 0.3.0 2018/05/29 put_ex_event(), move_to()、event_hook()追加
# 0.2.0 2018/05/22 状態変数を廃止、状態処理メソッドself.handlerを書き換える方法に変更
# 0.1.1 2018/02/20 speech()にmouth_en, head_en追加
# 0.1.0 2018/02/09 名称変更paperoappbase->stpypapero、on_startup削除、
# メッセージのリスト(現状要素数常に1)ではなく要素を渡すよう変更、
# paperoの引数指定追加、処理関数の返値をTRUEでプログラム終了(return書かなければ継続)に反転
# タイマー機能追加
# 0.0.2 2018/02/02 基本クラスAppBaseと派生クラスThreadAppBaseに分離
# 0.0.1 2017/12/19 Initial version
"""
使い方:
(1) StPyPaperoを継承したクラスを作る
(2) in_xxxで各状態ごとの処理を記述
(3) initialize()でself.handlerに初期状態用のin_xxxを設定
(4) 状態遷移時にはself.handlerを別のin_xxxに書き換える
機能:
(1) 終話検知
stpypapero.speech(text)
...
def in_xxxx(self, name, msg):
if name == '_endOfSpeech':
...
イベント: ''
(2) タイマー
stpypapero.set_timer(delay, timer_id)
stpypapero.cancel_timer(timer_id)
...
def in_xxxx(self, name, msg):
if name == '_timer':
tid = msg.get('TimerID')
if tid == timer_id:
...
(3) 外部イベント(パペロスレッドにパペロ以外のイベントを送る)
stpypapero.put_ex_event('__exEvent1', {'data': [1,2,3,4]})
...
def in_xxxx(self, name, msg):
if name == '__exEvent1':
...
"""
import sys
import threading
import random
import json
from logging import (getLogger, basicConfig, DEBUG, INFO, WARN, ERROR,)
import pypapero
logger = getLogger(__name__)
# mouth data for speech
MOUTH_DAT = [
[
"G3" + "G3" + "G3" + "G3" + "G3" + "G3" + "G3" + "G3" + "G3", "3",
"NNN" + "G3" + "G3" + "G3" + "NNN", "3",
"NNNN" + "G3" + "NNNN", "3",
"N" + "G3" + "G3" + "G3" + "G3" + "G3" + "G3" + "G3" + "N", "3",
"NN" + "G3" + "G3" + "G3" + "G3" + "G3" + "NN", "3",
]
]
# head data for speech (random select)
HEAD_DAT = [
[ # 0
["A4T300L", "R0T300L", "R0T300L", "R0T300L", "A0T300L", ], # x
["A3T300L", "A-3T300L", "A3T300L", "A-6T300L", "A0T300L", ], # y
], [ # 1
["A-4T300L", "R0T300L", "R0T300L", "R0T300L", "A0T300L", ], # x
["A-3T300L", "A3T300L", "A-3T300L", "A6T300L", "A0T300L", ], # y
], [ # 2
["A-3T300L", "R0T300L", "R5T300L", "R0T300L", "A0T300L", ], # x
["A-3T300L", "A3T300L", "A-2T300L", "A4T300L", "A0T300L", ], # y
]
]
class StPyPapero(object):
# papero application class
def __init__(self, simid='', simname='', url=None, polling_sec=0.2, papero=None):
if url is None:
simid, simname, url = pypapero.get_params_from_commandline(sys.argv)
self._polling_sec = polling_sec
self._eos_wait_res = False # send_get_speech_staus and waiting response
self._eos_polling = False
self._eos_callback = None # end of speech callback
self._timer_dic = {}
if papero is None:
papero = pypapero.Papero(simid, simname, url)
self.papero = papero
self.handler = None
self.initialize()
if 0 < self.papero.errOccurred:
logger.error(self.papero.errDetail)
else:
self._inner_event_handler('Ready')
"""virtual
def initialize(self):
self.handler = self.in_init
"""
def trans_state(self, newhandler):
# transit state
logger.info('state: {} -> {}'.format(self.handler, newhandler))
self.handler = newhandler
def speech(self, text,
language=None, speaker_id=None,
pitch=None, speed=None, volume=None,
pause=None, comma_pause=None, urgent=None, priority="normal",
eos_callback=None, mouth_en=True, head_en=True):
# speech with head and mouth
logger.debug('start speech')
self._eos_polling = True
self._eos_callback = eos_callback
if mouth_en:
self.papero.send_turn_led_on('mouth', MOUTH_DAT[0], repeat=True)
if head_en:
hidx = random.randint(0, len(HEAD_DAT) - 1)
self.papero.send_move_head(HEAD_DAT[hidx][1], HEAD_DAT[hidx][0])
self.papero.send_start_speech(text, language=language, speaker_id=speaker_id,
pitch=pitch, speed=speed, volume=volume,
pause=pause, comma_pause=comma_pause,
urgent=urgent, priority=priority)
def recv_msg(self):
msg = self.papero.papero_robot_message_recv(self._polling_sec)
return self._inner_event_handler(msg)
def _inner_event_handler(self, msg):
if self.papero.errOccurred != 0:
return True
# msg = self.recv_msg()
if msg is None:
if self._eos_polling and not self._eos_wait_res:
# poll end of speech
self.papero.send_get_speech_status()
self._eos_wait_res = True
msg = self._timeout_hook()
if msg is None:
return False
else:
res = self._handle_msg(msg)
return res
def run(self):
while True:
if self.recv_msg():
break
self.papero.papero_cleanup()
def _timeout_hook(self):
return None
"""
def in_init(self, name, msg):
if name == 'Ready':
self.trans_state(self.State.IDLE)
def in_idle(self, name, msg):
pass
"""
def _handle_msg(self, msg):
if msg is None:
return False
if isinstance(msg, str):
name = msg
elif isinstance(msg, list):
if 1 == len(msg):
msg = msg[0]
name = msg.get('Name')
else:
name = 'MsgList'
else:
name = msg.get('Name')
if name == "getSpeechStatusRes":
self._eos_wait_res = False
ret = msg.get("Return")
if self._eos_polling and (ret == 0):
logger.debug('end of speech')
self._eos_polling = False
self.papero.send_turn_led_off('mouth')
if callable(self._eos_callback):
self._eos_callback()
# change name
name = "_endOfSpeech"
if hasattr(self, 'event_hook'):
self.event_hook(name, msg)
# call function for this state
if callable(self.handler):
self.handler(name, msg)
# callbackが別スレッドで動く事に注意
def _set_timeout(self, delay, callback, *args, **kwargs):
hdl = threading.Timer(delay, callback, args, kwargs)
hdl.start()
return hdl
def _remove_timeout(self, hdl):
hdl.cancel()
def _send_timeout_msg(self, timer_id):
msg = ('{{"Name": "RobotMessage", "Messages": [' +
'{{"Name": "_timer", "TimerID": "{}"}}] }}').format(timer_id)
self.papero.queFromCom.put(msg)
def set_timer(self, delay, timer_id):
hdl = self._set_timeout(delay, self._send_timeout_msg, timer_id)
self._timer_dic[timer_id] = hdl
def cancel_timer(self, timer_id):
if timer_id in self._timer_dic:
hdl = self._timer_dic[timer_id]
self._remove_timeout(hdl)
def put_ex_event(self, name, msg=None):
if msg is None:
msg = {}
if name is not None:
msg['Name'] = name
dic = {
'Name': 'RobotMessage',
"Messages": [msg, ],
}
s = json.dumps(dic)
self.papero.queFromCom.put(s)
@staticmethod
def limit(x, lb, ub):
if x < lb:
return lb
if x > ub:
return ub
return x
def move_to(self, xy, y=None, pfx='A', sfx='S40L'):
if y is None:
x, y = xy
else:
x = xy
x = self.limit(x, -128, 128)
y = self.limit(y, -20, 55)
hs = ['{}{}{}'.format(pfx, int(x), sfx), ]
vs = ['{}{}{}'.format(pfx, int(y), sfx), ]
self.papero.send_move_head(vs, hs)
return x, y
class StPyPaperoThread(threading.Thread, StPyPapero):
def __init__(self, simid='', simname='', url=None, polling_sec=0.2, papero=None):
if url is None:
simid, simname, url = pypapero.get_params_from_commandline(sys.argv)
# threading.Thread.__init__()
super(StPyPaperoThread, self).__init__()
# PyPaperoAppBase.__init__()
super(threading.Thread, self).__init__(simid=simid, simname=simname,
url=url, polling_sec=polling_sec,
papero=papero)
def run(self):
return super(threading.Thread, self).run()