Juliusをリモートから再起動する

 Juliusで独自辞書を使った音声認識を行うなどの実験をしていて気づいたのですが、PCやラズパイでJuliusを動かしPaPeRo iのマイクを使った音声認識をする、つまり

  • moduleモード(音声認識サーバ動作)
  • adinnet使用(音声データをJuliusとは別のホストから送る)

という使い方だと、何らかのきっかけで音声認識ができなくなることがあるようで、その場合、Juliusを再起動しないと確実には復帰できませんでした。音声認識ができなくなった場合に最悪PaPeRo iのボタン操作で復帰させようとしても、別ホストなのでプロセスを探して再起動するといった方法は使えません。
 そこで、

  • Juliusを実行できる
  • WebブラウザからJuliusのオプションを変更できる
  • WebSocket通信でJuliusを再起動できる

という機能を持ったツールexectrlを作成しました。

外部コマンド制御ツール

 このツールの作成にはgolangを使用しましたのでビルドオプションを指定して再ビルドすればPaPeRo i本体を含めgolangがサポートしている全てのホストで動作させることが可能です。
 すぐに試せるように実行するJuliusのパスもWebブラウザから指定できる様にしてありますが、システムを破壊するコマンドも指定できてしまいますので、万一実際に利用する場合にはオプションのみをWebブラウザから変更できる様にするか、Web画面自体を無効にして下さい。
 全ソースはこちらからダウンロードできます。以下抜粋してご紹介します。

Web画面定義

 WebブラウザからJuliusのオプションを変更可能とするため、Webから変更できるアプリの設定用Pythonライブラリのgolang版を作成しました。このライブラリを使用して今回のツールの画面定義をするコードは以下の様になりました。

func exeCtrlConfDef(param *wc.ConfDefPrm) (*wc.ConfDefSt, func()) {
    // 画面要素のID
    var (
        idBasePath   = "BasePath"
        idExePath    = "ExePath"
        idExeOption  = "ExeOption"
        idStartBtn   = "StartBtn"
        idStopBtn    = "StopBtn"
        idAutoChk    = "AutoChk"
        idStatus     = "Status"
        idStdout     = "Stdout"
        idDefaultBtn = "DefaultBtn"
        idReloadBtn  = "ReloadBtn"
        idSaveBtn    = "SaveBtn"
    )

    // ...(中略)...

    // 画面/データ定義
    genitems := func() []wc.ItemPtr {
        return []wc.ItemPtr{
            wc.Text(&wc.ItemPrm{ID: idBasePath, Title: "ベースパス(同じパスを複数回使う場合に指定します)", Val: defaultBasePath, Size: 79}),
            wc.Text(&wc.ItemPrm{ID: idExePath, Title: "コマンド(%sはベースパスで置換します)", Val: defaultExePath, Size: 79}),
            wc.Textarea(&wc.ItemPrm{ID: idExeOption, Title: "オプション(%sはベースパスで置換します。%sは複数使用できます)", Val: defaultExeOption, Size: 80, Rows: 4}),
            wc.Button(&wc.ItemPrm{ID: idStartBtn, Title: "開始", Eol: " ", Handler: func(s *wc.ConfDefSt, item *wc.ItemSt, val string, dic map[string]string) bool {
                doStart(s)
                return true
            }}),
            wc.Button(&wc.ItemPrm{ID: idStopBtn, Title: "停止", Eol: " ", Handler: func(s *wc.ConfDefSt, item *wc.ItemSt, val string, dic map[string]string) bool {
                doStop(s)
                return true
            }}),
            wc.Checkbox(&wc.ItemPrm{ID: idAutoChk, Title: "起動時に実行開始する", Eot: " ", Val: ""}),
            wc.Text(&wc.ItemPrm{ID: idStatus, Title: "実行状態", Nosave: true, Val: "init"}),
            wc.Textarea(&wc.ItemPrm{ID: idStdout, Title: "stdout", Nosave: true, Val: "", Size: 80, Rows: 24}),
            wc.Button(&wc.ItemPrm{ID: idDefaultBtn, Title: "デフォルトに戻す", Eol: " ", Handler: func(s *wc.ConfDefSt, item *wc.ItemSt, val string, dic map[string]string) bool {
                s.SetDefault()
                return true
            }}),
            wc.Button(&wc.ItemPrm{ID: idReloadBtn, Title: "ファイルから読み直す", Eol: " ", Handler: func(s *wc.ConfDefSt, item *wc.ItemSt, val string, dic map[string]string) bool {
                s.Load("")
                return true
            }}),
            wc.Button(&wc.ItemPrm{ID: idSaveBtn, Title: "保存", Handler: func(s *wc.ConfDefSt, item *wc.ItemSt, val string, dic map[string] string) bool {
                s.Dump("")
                return true
            }}),
        }
    }

    // ...(後略)...

Windows PCで実行した場合のWebブラウザの表示は以下の様になります。

WebSocketサーバ

 再起動指令を受け取るために同時にWebSocketサーバを動かす必要がありますが、これには今回もgorilla/websocketを使用しました。
golangの開発環境で

> go get github.com/gorilla/websocket

とし、

import "github.com/gorilla/websocket"

で使用できます。WebSocketから再起動指示を受け取ったらコールバックdoRestart()を呼び出すWebSocketサーバは以下の様になりました。

type wsMsg struct {
    Name string
}

func genCtrlWsHandler(doRestart func()) func(w http.ResponseWriter, r *http.Request) {
    var upgrader = websocket.Upgrader{} // use default options
    wsHandler := func(w http.ResponseWriter, r *http.Request) {
        ws, err := upgrader.Upgrade(w, r, nil)
        if err != nil {
            log.Print("upgrade:", err)
            return
        }
        defer ws.Close()
        log.Printf("ws connect %p", ws)
        for {
            _, dat, err := ws.ReadMessage()
            if err != nil {
                log.Println("ws read error:", err)
                break
            }
            msg := wsMsg{}
            err = json.Unmarshal(dat, &msg)
            if err != nil {
                log.Println("unmarshal err:", err)
                continue
            }
            if msg.Name == "RestartReq" {
                log.Println("recv RestartReq.")
                doRestart()
                break
            }
        }
    }
    return wsHandler
}

コマンド実行・停止

 golangで外部コマンド文字列cmdstr、オプションlstを実行するには

ctx := context.Background()
cmd = exec.CommandContext(ctx, cmdstr, lst...)
outpipe, err := cmd.StdoutPipe()
cmd.Start()

停止するには

cmd.Process.Kill()
cmd.Wait()

とします。実行開始ごとに標準出力StdoutPipeを画面に反映するループを実行するgoroutineを生成しています。

動作確認

動作確認用プログラム

 Juliusに接続し、音声認識結果をオウム返しするPythonプログラムです。座布団真ん中ボタンでexectrlに対してJuliusの再起動要求を送ります。こちらで作成したpypapero.pyのラッパークラスstpypapero.pyを使用しています。JuliusManagerスレッドでJuliusとの接続を管理し、音声認識イベントはstpypaperoのput_ex_event()を利用してパペロアプリのイベントループに送ります。

juliusdemo.py:

# -*- coding: utf-8 -*-
# juliusデモ(exectrl版)
# (c) 2018 Sophia Planning Inc.
# LICENCE: MIT
#
# (exectrlにより起動されている)juliusサーバ(moduleモード)に接続し、認識語をオウム返しする
# 座布団真ん中ボタンでexectrlに対しjuliusの再起動を要求する
#
import argparse
import enum
import select
import socket
import threading
import time
import xml.etree.cElementTree as ET
import subprocess
from logging import (getLogger, debug, info, warn, error, critical,
                     DEBUG, INFO, WARN, ERROR, CRITICAL, basicConfig)

from ws4py.client.threadedclient import WebSocketClient

from stpypapero import StPyPaperoThread as SxPyPapero

logger = getLogger(__name__)


class JuliusManager(threading.Thread):
    # Juliusマネージャ
    # moduleモードのjuliusに接続し、音声認識時、コールバックrecog_cbを呼ぶ
    # exectrlの再起動要求機能
    TIMEOUT = 1
    RECV_MAX = 4096
    RETRY_PERIOD = 3

    class Status(enum.Enum):
        INIT = 1
        FIN = 2
        CONNECT = 3
        ERR = 4

    def __init__(self, host, recog_cb, port=10500, exectrl_port=8860,
                 exectrl_path='_server_control', adintool=''):
        self.host = host
        self.port = port
        self.recog_cb = recog_cb
        self.exectrl_port = exectrl_port
        self.exectrl_path = exectrl_path
        self.adintool = adintool.strip().format(host).strip('"')
        self.adin_proc = None
        self.sock = None
        self.recv_lines = []
        self.buf = b''
        self.status = self.Status.INIT
        super().__init__()

    def connect(self):
        # 接続指示
        self.__init__(host=self.host, recog_cb=self.recog_cb, port=self.port,
                      exectrl_port=self.exectrl_port,
                      exectrl_path=self.exectrl_path,
                      adintool=self.adintool)
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.sock.connect((self.host, self.port))
            logger.info('connect')
            self.status = self.Status.CONNECT
        except Exception as e:
            logger.error(e)
            self.status = self.Status.ERR
            return
        if 0 < len(self.adintool):
            try:
                self.adin_proc = subprocess.Popen(self.adintool.split(' '))
            except Exception as e:
                logger.error(e)
                self.adin_proc = None


    def closed(self):
        # 切断時処理
        if self.adin_proc is not None:
            if self.adin_proc.poll() is None:
                self.adin_proc.terminate()

    def recv_msg(self):
        # sockから受信し\nがあれば行をrecv_linesに移す
        ready_rd, ready_wr, in_err = select.select([self.sock], [], [],
                                                   self.TIMEOUT)
        if self.sock not in ready_rd:
            return False
        try:
            dat = self.sock.recv(self.RECV_MAX)
        except Exception as e:
            logger.error(e)
            self.status = self.Status.ERR
            self.closed()
            return False
        if not dat:
            logger.error('connection closed')
            self.status = self.Status.FIN
            self.closed()
            return False
        self.buf += dat
        lines = self.buf.split(b'\n')
        cnt = len(lines)
        for j in range(cnt - 1):
            self.recv_lines.append(lines[j].decode('utf-8'))
        self.buf = lines[cnt - 1]
        return cnt > 1

    def get_blocks(self):
        # recv_linesから'.'で区切られたブロックを取り出す
        txt = ''
        last_idx = -1
        res = []
        for j, line in enumerate(self.recv_lines):
            if line == '.':
                last_idx = j
                if 0 < len(txt):
                    txt = self.extract_xml(txt).strip()
                    if 0 < len(txt):
                        res.append(txt)
                txt = ''
            else:
                if 0 < len(line):
                    txt += line + '\n'
        if 0 <= last_idx:
            if last_idx < len(self.recv_lines) - 1:
                self.recv_lines = self.recv_lines[last_idx+1:]
            else:
                self.recv_lines = []
        return res

    # 置換辞書
    repl_map = {'<': '&lt;', '>': '&gt;'}

    def replace_xml(self, txt):
        # '"'で囲まれた'<'と'>'を置換する
        res = ''
        quoted = False
        for c in txt:
            if quoted:
                if c in self.repl_map:
                    c = self.repl_map[c]
                elif c == '"':
                    quoted = False
            else:
                if c == '"':
                    quoted = True
            res += c
        return res

    # 削除リスト
    del_words = ['[s]', '[/s]', '<s>', '</s>', '&lt;s&gt;', '&lt;/s&gt;']

    def extract_xml(self, txt):
        # 認識語を取り出す
        txt = self.replace_xml(txt)
        res = ''
        try:
            objs = ET.fromstring('<Root>' + txt + '</Root>')
        except Exception as e:
            logger.error('txt: {} error:{}'.format(txt, e))
            return ''
        lst = objs.findall('.//SHYPO')
        for tgt in lst:
            for elm in tgt:
                if elm.tag == 'WHYPO':
                    word = elm.attrib['WORD']
                    res += word
        if 0 < len(res):
            # 削除対象文字列を削除
            for dw in self.del_words:
                res = res.replace(dw, '')
        return res

    def send(self, cmd):
        # 送信(送信と受信が別スレッドは許される)
        if self.status != self.Status.CONNECT:
            return
        dat = (cmd + '\n').encode('utf-8')
        try:
            self.sock.sendall(dat)
        except Exception as e:
            logger.error(e)
            self.status = self.Status.ERR
            self.closed()

    def close(self):
        # 切断
        self.sock.close()

    def recv_proc(self):
        # 受信処理
        if self.recv_msg():
            blk = self.get_blocks()
            for word in blk:
                # 音声認識コールバック
                self.recog_cb(word)

    def reconnect(self):
        # 再接続
        time.sleep(self.RETRY_PERIOD)
        self.connect()

    def run(self):
        # 実行ループ
        self.connect()
        while True:
            if self.status == self.Status.CONNECT:
                self.recv_proc()
            if self.status != self.Status.CONNECT:
                self.reconnect()

    def recog_pause(self):
        # juliusに対して音声認識停止を要求する
        logger.info('pause')
        self.send('PAUSE')

    def recog_resume(self):
        # juliusに対して音声認識再開を要求する
        logger.info('resume')
        self.send('RESUME')

    def restart_req(self):
        # exectrlに対し、juliusの再起動を要求する
        logger.info('restart')
        url = 'ws://{}:{}/{}'.format(self.host, self.exectrl_port,
                                     self.exectrl_path)
        try:
            self.server_ctrl = WebSocketClient(url=url)
            self.server_ctrl.connect()
            self.server_ctrl.send('{ "Name": "RestartReq" }', binary=False)
            logger.info('send restart req to {}'.format(url))
        except Exception as e:
            logger.error(e)


class JuliusDemo(SxPyPapero):
    # julius demo パペロアプリ
    def __init__(self, simid='', simname='', url=None, polling_sec=0.2, papero=None,
                 julius_ip='', adintool=None, julius_port=10500, exectrl_port=8860):
        # julius_ipなどのパラメータのため__init__も定義する
        self.julius_ip = julius_ip
        self.julius_port = julius_port
        self.exectrl_port = exectrl_port
        self.adintool = adintool
        self.julius_mgr = None
        super().__init__(simid=simid, simname=simname, url=url, polling_sec=polling_sec, papero=papero)

    def initialize(self):
        self.handler = self.in_init

        # 音声認識コールバック
        def recog_handler(txt):
            # 外部イベント発行(queueへのputはスレッドセーフ)
            self.put_ex_event('__recog', {'data': txt})

        self.julius_mgr = JuliusManager(host=self.julius_ip, recog_cb=recog_handler,
                                        port=self.julius_port,
                                        exectrl_port=self.exectrl_port,
                                        adintool=self.adintool)
        self.julius_mgr.start()

    def in_init(self, name, msg):
        # 初期状態
        if name == 'Ready':
            # 準備完了:起動発話
            self.speech('ジュリアスデモ')
            self.trans_state(self.in_speech)

    def in_wait(self, name, msg):
        # アイドル状態
        logger.debug('event: {}'.format(name))
        if name == 'detectButton':
            logger.info("APP detect button")
            status = msg.get('Status')
            if status == 'C':
                # 真ん中ボタン 再起動要求
                self.julius_mgr.restart_req()
        elif name == '__recog':
            # 音声認識結果受信
            txt = msg.get('data')
            if not txt:
                return
            logger.info('recog: {}'.format(txt))
            # 音声認識停止
            self.julius_mgr.recog_pause()
            # 発話
            self.speech_fin = False
            self.head_fin = False
            self.speech(txt, head_en=False)
            self.trans_state(self.in_speech)

    def in_speech(self, name, msg):
        # 発話中状態
        if name == '_endOfSpeech':
            # 発話終了:音声認識再開
            self.julius_mgr.recog_resume()
            self.trans_state(self.in_wait)


def arg_parse():
    paperoip='192.168.1.189'
    #paperoip='192.168.1.1'
    ap = argparse.ArgumentParser()
    ap.add_argument('-sim', type=str, help='simulator id', default='')
    ap.add_argument('-robot', type=str, help='robot name', default='')
    # ap.add_argument('-wssvr', type=str, help='papero url', default='wss://smilerobo.com:8000/papero')
    ap.add_argument('-wssvr', type=str, help='papero url', default='ws://{}:8088/papero'.format(paperoip))
    ap.add_argument('-julius', type=str, help='julius ip address', default='{}'.format(paperoip))
    ap.add_argument('-adintool', type=str, help='adintool', default='"/Extension/local/bin/adintool -in mic -out adinnet -server {}"')
    return ap.parse_args()


def main():
    args = arg_parse()
    url = args.wssvr
    sim_id = args.sim
    sim_name = args.robot
    julius = args.julius
    adintool = args.adintool
    papero = JuliusDemo(url=url, simid=sim_id, simname=sim_name,
                        julius_ip=julius, adintool=adintool)
    papero.run()


if __name__ == '__main__':
    basicConfig(format='%(asctime)-15s %(levelname)s %(module)s.%(funcName)s %(message)s')
    logger = getLogger()  # root logger
    logger.setLevel(INFO)
    logger.info('start')
    main()

juliusdemo.pyはstpypapero.py、pypapero.pyと同じディレクトリに置いてください。

動作確認手順

 WindowsでJuliusとexectrlを動かし、PaPeRo iで動作確認用Pythonアプリを動かす手順です。

(1) Windows PCでJuliusディクテーションキットversion 4.4(http://julius.osdn.jp/index.php?q=dictation-kit.htmlからダウンロードできます)をC:\juliusフォルダに展開します(=exectrlのパラメータと一致させます)。
(2) Windows PCで独自辞書mydict.*をC:\julius\mydictフォルダに配置します(=同じくexectrlのパラメータと一致させます)。
(3) Windows PCでexectrl.exeを実行、ブラウザでlocalhost:8860/exectrlを開いてstartボタンをクリックします。

> exectrl.exe

(4) PaPeRo iで動作確認用Pythonアプリを実行します。

# python3 juliusdemo.py -wssvr ws://localhost:8088/papero -julius PCのIPアドレス

パペロに「こんにちは」など独自辞書に登録された単語を話しかけるとパペロがオウム返しします。
exectrlのブラウザ画面でstopボタンで音声認識できない状態にしても、PaPeRo iの座布団真ん中ボタンを押すことでJuliusが起動し、音声認識できる様になります。
アプリをラズパイなどPaPeRo i以外で動作させる場合には、Juliusと接続するごとにPaPeRo i上でadintoolを実行する様に修正する必要があります。