Skip to main content

Streaming STT - gRPC

This guide shows how to implement the gRPC mode for streaming STT.

Example

Examples use a local audio file as the source. For microphones or other devices, replace file reads with device input code. The proto file is available.

Authentication token

Obtain a token via the Authentication guide.

DecoderConfig

See Common DecoderConfig/Parameters.

StreamingRecognitionResult

{
  start_at: integer,
  duration: integer,
  is_final: bool,
  alternatives: [
    {
      text: string,
      confidence: float,
      words?: [
        {
          text: string,
          start_at: integer,
          duration: integer,
          confidence: float
        }
      ]
    }
  ]
}

Sample code

package main

import (
"context"
"encoding/json"
"flag"
"fmt"
"io"
"log"
"net/http"
"os"
"path/filepath"
"strings"
"sync"
"time"

"github.com/grpc-ecosystem/go-grpc-middleware/util/metautils"
pb "github.com/vito-ai/go-genproto/vito-openapi/stt"
"github.com/xfrr/goffmpeg/transcoder"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
)

// ServerHost is the host:port of the gRPC endpoint that main dials.
const ServerHost = "grpc-openapi.vito.ai:443"

// API credentials, read from the environment once at package initialization
// and posted to the authentication endpoint by main.
var (
	ClientId     = os.Getenv("RTZR_CLIENT_ID")
	ClientSecret = os.Getenv("RTZR_CLIENT_SECRET")
)

// Format of the audio that is streamed to the server. SAMPLE_RATE is both the
// transcoding target rate and the rate declared in the decoder config;
// together with BYTES_PER_SAMPLE it determines the pacing of FileStreamer.Read.
const (
	SAMPLE_RATE      int = 8000
	BYTES_PER_SAMPLE int = 2
)

// Addressable booleans: main passes their addresses for the pointer-typed
// option fields of the decoder config.
var (
	False = false
	True  = true
)

// FileStreamer simulates a streaming input by reading an audio file.
//
// This is for the example only: when using the API you need to provide a
// real-time audio stream, such as microphone input.
type FileStreamer struct {
	// file is the opened audio file that Read pulls from and Close releases.
	file *os.File
}

// Read copies at most 1024 bytes from the underlying file into p, then blocks
// for the playback duration of the bytes it actually read so that callers
// receive audio at roughly real-time speed.
//
// The byte count and error of the underlying file read are returned unchanged.
func (fs *FileStreamer) Read(p []byte) (int, error) {
	// Upper bound on the number of bytes handed out per call.
	const maxChunk = 1024
	if len(p) > maxChunk {
		p = p[:maxChunk]
	}

	n, err := fs.file.Read(p)
	if n > 0 {
		// Pace by the bytes delivered, not the bytes requested: a short final
		// chunk sleeps proportionally less and an empty read does not sleep.
		bytesPerSecond := SAMPLE_RATE * BYTES_PER_SAMPLE
		time.Sleep(time.Duration(n) * time.Second / time.Duration(bytesPerSecond))
	}
	return n, err
}

// Close closes the underlying file and then deletes it from disk.
//
// Only the error from closing the file is reported; the result of the removal
// is ignored.
func (fs *FileStreamer) Close() error {
	// Capture the path before closing, then remove the file once it is closed.
	path := fs.file.Name()
	closeErr := fs.file.Close()
	os.Remove(path)
	return closeErr
}

// OpenAudioFile transcodes audioFile into a WAV file in the OS temp directory
// and returns a FileStreamer over the transcoded file.
//
// The output is named "<stem>_<SAMPLE_RATE>.wav", where <stem> is the base
// name of audioFile without its extension. The transcoder is configured with
// SAMPLE_RATE as the audio rate, one audio channel, video skipped, and the
// "aresample=resampler=soxr" audio filter.
//
// An error is returned if the transcoder cannot be initialized, the
// transcoding run fails, or the transcoded file cannot be opened. Closing the
// returned reader also deletes the transcoded file.
func OpenAudioFile(audioFile string) (io.ReadCloser, error) {
	// TrimSuffix+Ext handles names without any "." (Ext returns ""), where
	// slicing at strings.LastIndex's -1 result would panic.
	base := filepath.Base(audioFile)
	stem := strings.TrimSuffix(base, filepath.Ext(base))
	// Build the whole file name before joining so that an empty stem still
	// yields a path inside the temp directory.
	audioFileName8K := filepath.Join(os.TempDir(), fmt.Sprintf("%s_%d.%s", stem, SAMPLE_RATE, "wav"))

	trans := new(transcoder.Transcoder)
	if err := trans.Initialize(audioFile, audioFileName8K); err != nil {
		// Report through the error return like every other failure here,
		// instead of terminating the process from inside a helper.
		return nil, fmt.Errorf("initialize transcoder failed: %w", err)
	}

	trans.MediaFile().SetAudioRate(SAMPLE_RATE)
	trans.MediaFile().SetAudioChannels(1)
	trans.MediaFile().SetSkipVideo(true)
	trans.MediaFile().SetAudioFilter("aresample=resampler=soxr")

	// Run returns a channel; receiving from it yields the run's error, if any.
	if err := <-trans.Run(false); err != nil {
		return nil, fmt.Errorf("transcode audio file failed: %w", err)
	}

	file, err := os.Open(audioFileName8K)
	if err != nil {
		return nil, fmt.Errorf("open audio file failed: %w", err)
	}

	return &FileStreamer{file: file}, nil
}

// fetchAccessToken posts ClientId and ClientSecret to the authentication
// endpoint and returns the "access_token" field of the JSON response.
//
// It returns an error if the request fails, the status is not 200, the body
// is not valid JSON, or the token is empty.
func fetchAccessToken() (string, error) {
	form := map[string][]string{
		"client_id":     {ClientId},
		"client_secret": {ClientSecret},
	}

	// http.DefaultClient has no timeout; bound the request so a stalled
	// connection cannot hang the program.
	httpClient := &http.Client{Timeout: 10 * time.Second}
	resp, err := httpClient.PostForm("https://openapi.vito.ai/v1/authenticate", form)
	if err != nil {
		return "", fmt.Errorf("requesting access token: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("unexpected response status %q", resp.Status)
	}

	var payload struct {
		Token string `json:"access_token"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&payload); err != nil {
		return "", fmt.Errorf("decoding authentication response: %w", err)
	}
	if payload.Token == "" {
		return "", fmt.Errorf("authentication response contains no access_token")
	}
	return payload.Token, nil
}

// main streams the audio file named on the command line to the gRPC decoder
// and prints the recognized text.
//
// Steps: obtain an access token, dial ServerHost over TLS, open a Decode
// stream with the token as bearer authorization metadata, send the decoder
// configuration, then send the audio from a goroutine while the main goroutine
// prints results until the server ends the stream or an error occurs.
func main() {
	flag.Usage = func() {
		fmt.Fprintf(os.Stderr, "Usage: %s <AUDIOFILE>\n", filepath.Base(os.Args[0]))
		// NOTE(review): this text says 16000 Hz, while OpenAudioFile transcodes
		// the input to SAMPLE_RATE (8000) — confirm which statement is intended.
		fmt.Fprintf(os.Stderr, "<AUDIOFILE> must be a path to a local audio file. Audio file must be a 16-bit signed little-endian encoded with a sample rate of 16000.\n")
	}
	flag.Parse()
	if len(flag.Args()) != 1 {
		log.Fatal("Please pass path to your local audio file as a command line argument")
	}
	audioFile := flag.Arg(0)

	token, err := fetchAccessToken()
	if err != nil {
		log.Fatalf("Failed to authenticate: %v", err)
	}

	// Blocking dial bounded by a context deadline (replaces the deprecated
	// grpc.WithTimeout dial option).
	dialCtx, cancelDial := context.WithTimeout(context.Background(), 10*time.Second)
	conn, err := grpc.DialContext(dialCtx, ServerHost,
		grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")),
		grpc.WithBlock(),
	)
	cancelDial()
	if err != nil {
		log.Fatalf("fail to dial: %v", err)
	}
	defer conn.Close()

	md := metadata.Pairs("authorization", fmt.Sprintf("%s %v", "bearer", token))
	streamCtx := metautils.NiceMD(md).ToOutgoing(context.Background())
	client := pb.NewOnlineDecoderClient(conn)
	stream, err := client.Decode(streamCtx)
	if err != nil {
		log.Fatalf("Failed to create stream: %v", err)
	}

	// Send the initial configuration message.
	if err := stream.Send(&pb.DecoderRequest{
		StreamingRequest: &pb.DecoderRequest_StreamingConfig{
			StreamingConfig: &pb.DecoderConfig{
				SampleRate:          int32(SAMPLE_RATE),
				Encoding:            pb.DecoderConfig_LINEAR16,
				UseItn:              &True,
				UseDisfluencyFilter: &False,
				UseProfanityFilter:  &False,
			},
		},
	}); err != nil {
		log.Fatal(err)
	}

	streamingFile, err := OpenAudioFile(audioFile)
	if err != nil {
		log.Fatal(err)
	}
	defer streamingFile.Close()

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		buf := make([]byte, 1024)
		for {
			n, readErr := streamingFile.Read(buf)
			if n > 0 {
				if err := stream.Send(&pb.DecoderRequest{
					StreamingRequest: &pb.DecoderRequest_AudioContent{
						AudioContent: buf[:n],
					},
				}); err != nil {
					// A failed Send means the stream is no longer usable;
					// stop instead of failing again on every chunk.
					log.Printf("Could not send audio: %v", err)
					return
				}
			}
			if readErr != nil {
				if readErr != io.EOF {
					// Retrying a failing read could loop forever; treat it
					// as the end of the input.
					log.Printf("Could not read from %s: %v", audioFile, readErr)
				}
				// Nothing else to pipe, close the sending side of the stream.
				if err := stream.CloseSend(); err != nil {
					log.Printf("Could not close stream: %v", err)
				}
				return
			}
		}
	}()

	// NOTE(review): the first response is received and discarded; presumably
	// it carries no recognition results — confirm against the service docs.
	_, err = stream.Recv()
	if err != nil {
		log.Fatalf("failed to recv: %v", err)
	}

	for {
		resp, err := stream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Printf("Cannot stream results: %v", err)
			break
		}

		if resp.Error {
			log.Printf("Could not recognize: %v", resp.Error)
			break
		}
		for _, res := range resp.Results {
			// Guard against a result without alternatives before indexing.
			if len(res.Alternatives) == 0 {
				continue
			}
			text := res.Alternatives[0].Text
			if res.IsFinal {
				fmt.Printf("final: %v\n", text)
			} else {
				fmt.Printf("%v\n", text)
			}
		}
	}
	wg.Wait()
}

Error codes

Errors are indicated via gRPC error codes.

| Code | Description | Notes |
| --- | --- | --- |
| 16 | Unauthenticated | Authentication failed |
| 3 | InvalidArgument | Invalid parameter |
| 8 | ResourceExhausted | Usage exceeded or card required |
| 13 | Internal | Server error |

Notes

For converting audio files to text, Batch STT is often simpler than Streaming STT.