メインコンテンツまでスキップ

ストリーミング STT - gRPC

gRPC 方式でストリーミング STT を実装する方法を説明します。

以下の例はローカル音声ファイルを入力とします。デバイス入力に置き換えることで同様に利用できます。proto ファイルも公開しています。

認証トークン

認証ガイドに従ってトークンを取得してください。

DecoderConfig

共通 DecoderConfig/Parameters を参照してください。

StreamingRecognitionResult

{
start_at: integer,
duration: integer,
is_final: bool,
alternatives: [
{
text: string,
confidence: float,
words?: [
{
text: string,
start_at: integer,
duration: integer,
confidence: float
}
]
}
]
}

サンプルコード

package main

import (
"context"
"encoding/json"
"flag"
"fmt"
"io"
"log"
"net/http"
"os"
"path/filepath"
"strings"
"sync"
"time"

"github.com/grpc-ecosystem/go-grpc-middleware/util/metautils"
pb "github.com/vito-ai/go-genproto/vito-openapi/stt"
"github.com/xfrr/goffmpeg/transcoder"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
)

const ServerHost = "grpc-openapi.vito.ai:443"

var ClientId = os.Getenv("RTZR_CLIENT_ID")
var ClientSecret = os.Getenv("RTZR_CLIENT_SECRET")

const SAMPLE_RATE int = 8000
const BYTES_PER_SAMPLE int = 2

var False = false
var True = true

/*
In this example, we simulate streaming input by reading an audio file.
When using the API, you need to provide a real-time audio stream, such as microphone input.
*/
type FileStreamer struct {
file *os.File
}

func (fs *FileStreamer) Read(p []byte) (int, error) {
byteSize := len(p)
maxSize := 1024
if byteSize > maxSize {
byteSize = maxSize
}

defer time.Sleep(time.Duration(byteSize/((SAMPLE_RATE*BYTES_PER_SAMPLE)/1000)) * time.Millisecond)
return fs.file.Read(p[:byteSize])
}

func (fs *FileStreamer) Close() error {
defer os.Remove(fs.file.Name())
return fs.file.Close()
}

func OpenAudioFile(audioFile string) (io.ReadCloser, error) {
fileName := filepath.Base(audioFile)
i := strings.LastIndex(fileName, ".")
audioFileName8K := filepath.Join(os.TempDir(), fileName[:i]) + fmt.Sprintf("_%d.%s", SAMPLE_RATE, "wav")
trans := new(transcoder.Transcoder)
if err := trans.Initialize(audioFile, audioFileName8K); err != nil {
log.Fatal(err)
}

trans.MediaFile().SetAudioRate(SAMPLE_RATE)
trans.MediaFile().SetAudioChannels(1)
trans.MediaFile().SetSkipVideo(true)
trans.MediaFile().SetAudioFilter("aresample=resampler=soxr")

err := <-trans.Run(false)
if err != nil {
return nil, fmt.Errorf("transcode audio file failed: %w", err)
}

file, err := os.Open(audioFileName8K)
if err != nil {
return nil, fmt.Errorf("open audio file failed: %w", err)
}

return &FileStreamer{file: file}, nil
}

func main() {
flag.Usage = func() {
fmt.Fprintf(os.Stderr, "Usage: %s <AUDIOFILE>\n", filepath.Base(os.Args[0]))
fmt.Fprintf(os.Stderr, "<AUDIOFILE> must be a path to a local audio file. Audio file must be a 16-bit signed little-endian encoded with a sample rate of 16000.\n")

}
flag.Parse()
if len(flag.Args()) != 1 {
log.Fatal("Please pass path to your local audio file as a command line argument")
}
audioFile := flag.Arg(0)

data := map[string][]string{
"client_id": []string{ClientId},
"client_secret": []string{ClientSecret},
}
resp, _ := http.PostForm("https://openapi.vito.ai/v1/authenticate", data)

if resp.StatusCode != 200 {
panic("Failed to authenticate")
}

bytes, _ := io.ReadAll(resp.Body)
var result struct {
Token string `json:"access_token"`
}
json.Unmarshal(bytes, &result)

var dialOpts []grpc.DialOption
dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")))
dialOpts = append(dialOpts, grpc.WithBlock())
dialOpts = append(dialOpts, grpc.WithTimeout(10*time.Second))
conn, err := grpc.Dial(ServerHost, dialOpts...)
if err != nil {
log.Fatalf("fail to dial: %v", err)
}
defer conn.Close()

md := metadata.Pairs("authorization", fmt.Sprintf("%s %v", "bearer", result.Token))
ctx := context.Background()
nCtx := metautils.NiceMD(md).ToOutgoing(ctx)
client := pb.NewOnlineDecoderClient(conn)
stream, err := client.Decode(nCtx)
if err != nil {
log.Printf("Failed to create stream: %v\n", err)
log.Fatal(err)
}

// Send the initial configuration message.
if err := stream.Send(&pb.DecoderRequest{
StreamingRequest: &pb.DecoderRequest_StreamingConfig{
StreamingConfig: &pb.DecoderConfig{
SampleRate: int32(SAMPLE_RATE),
Encoding: pb.DecoderConfig_LINEAR16,
UseItn: &True,
UseDisfluencyFilter: &False,
UseProfanityFilter: &False,
},
},
}); err != nil {
log.Fatal(err)
}

streamingFile, err := OpenAudioFile(audioFile)
if err != nil {
log.Fatal(err)
}
defer streamingFile.Close()

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
buf := make([]byte, 1024)
for {
n, err := streamingFile.Read(buf)
if n > 0 {
if err := stream.Send(&pb.DecoderRequest{
StreamingRequest: &pb.DecoderRequest_AudioContent{
AudioContent: buf[:n],
},
}); err != nil {
log.Printf("Could not send audio: %v", err)
}
}
if err == io.EOF {
// Nothing else to pipe, close the stream.
if err := stream.CloseSend(); err != nil {
log.Fatalf("Could not close stream: %v", err)
}
return
}
if err != nil {
log.Printf("Could not read from %s: %v", audioFile, err)
continue
}
}
}()

_, err = stream.Recv()
if err != nil {
log.Fatalf("failed to recv: %v", err)
}

for {
resp, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Printf("Cannot stream results: %v", err)
break
}

if err := resp.Error; err {
log.Printf("Could not recognize: %v", err)
break
}
for _, result := range resp.Results {
if result.IsFinal {
fmt.Printf("final: %v\n", result.Alternatives[0].Text)
} else {
fmt.Printf("%v\n", result.Alternatives[0].Text)
}
}
}
wg.Wait()
}

エラーコード

gRPC のエラーコードでエラーを示します。

CodeDescriptionNotes
16Unauthenticated認証失敗
3InvalidArgumentパラメータ不正
8ResourceExhausted使用量超過またはカード登録必要
13Internalサーバーエラー

備考

音声ファイルの変換は、バッチ STT の方が簡便な場合があります。