| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164 |
- package job
import (
	"bytes"
	"encoding/json"
	"errors"
	"io"
	"net/http"
	"os"
	"strings"
	"sync"
	"time"

	"audio_transcoder/internal/svc"
	"audio_transcoder/internal/types"
	"github.com/google/uuid"
	"github.com/zeromicro/go-zero/core/logx"
	"silk2audio/silk"
)
- type CallbackReq struct {
- types.CallbackRequest
- FailNum int `json:"fail_num"`
- }
- // AutoTranscoder 自动处理队列
- func AutoTranscoder(ctx *svc.ServiceContext) {
- sdk := ctx.BoltSdk
- for {
- key, value, err := sdk.GetFirst()
- if err != nil || key == "" || value == "" {
- continue
- }
- logx.Info("转码信息 data:", value)
- var req CallbackReq
- err = json.Unmarshal([]byte(value), &req)
- if err != nil {
- logx.Error(err)
- continue
- }
- //业务处理
- resp, err := transcoder(ctx, &req.CallbackRequest)
- logx.Info("转码结果 data:", resp)
- if err != nil {
- logx.Error(err)
- _ = sdk.Delete(key)
- //重新放入队列
- req.FailNum++
- if req.FailNum < 10 {
- b, _ := json.Marshal(&req)
- logx.Info("转码失败!重新放入队列 data:", string(b))
- _ = sdk.Set(uuid.NewString(), string(b))
- }
- continue
- }
- _ = sdk.Delete(key)
- //数据回调
- if req.CallbackUrl != "" {
- bts, _ := json.Marshal(resp)
- body, err := http.Post(req.CallbackUrl, "application/json", bytes.NewBuffer(bts))
- if err != nil {
- logx.Error(err)
- continue
- }
- if body.StatusCode != http.StatusOK {
- logx.Errorf("post %v fail, ErrorCode %v", req.CallbackUrl, body.StatusCode)
- continue
- }
- }
- }
- }
- func transcoder(svcCtx *svc.ServiceContext, req *types.CallbackRequest) (resp *types.OriginDataResponse, err error) {
- if req.Path == "" {
- err = errors.New("path is empty")
- return
- }
- resp = &types.OriginDataResponse{OriginData: req.OriginData}
- // 发起HTTP GET请求
- resp1, err := http.Get(req.Path)
- if err != nil {
- logx.Errorf("下载文件失败 error:", err.Error())
- return
- }
- defer resp1.Body.Close()
- if resp1.StatusCode != http.StatusOK {
- err = errors.New("download file failed")
- logx.Errorf("下载文件失败 error:", resp1.StatusCode)
- return
- }
- // 读取响应体内容
- silkPath := uuid.NewString() + ".silk" // 本地保存的文件路径
- silkFile, err := os.Create(silkPath)
- if err != nil {
- logx.Errorf("读取silk文件失败 error:", err.Error())
- return
- }
- defer func() {
- silkFile.Close()
- silk.FileRemove(silkPath)
- }()
- _, err = io.Copy(silkFile, resp1.Body)
- if err != nil {
- logx.Errorf("读取文件失败 error:", err.Error())
- return
- }
- // 转码
- wavPath, pcmPath := silk.TransSilkToWav(silkPath)
- wavFile, err := os.Open(wavPath)
- if err != nil {
- logx.Errorf("读取wav文件失败 error:", err.Error())
- return
- }
- defer func() {
- wavFile.Close()
- silk.FileRemove(wavPath)
- }()
- pcmFile, err := os.Open(pcmPath)
- if err != nil {
- logx.Errorf("读取pcm文件失败 error:", err.Error())
- return
- }
- defer func() {
- pcmFile.Close()
- silk.FileRemove(pcmPath)
- }()
- //多线程处理上传录音和语音识别
- var wg sync.WaitGroup
- wg.Add(2)
- //构建返回数据
- var err1, err2 error
- //上传录音
- go func() {
- defer wg.Done()
- bts, _ := io.ReadAll(wavFile)
- url, err := svcCtx.QiNiuSdk.Upload(bts, int64(len(bts)), wavPath)
- if err != nil {
- logx.Errorf("上传七牛云失败 error:", err.Error())
- return
- }
- err1 = err
- resp.Path = url
- }()
- //识别录音
- go func() {
- defer wg.Done()
- bts, _ := io.ReadAll(pcmFile)
- asrResp, err := svcCtx.AsrSdk.Asr(bts, "pcm", 16000)
- if err != nil {
- logx.Errorf("语音识别失败 error:", err.Error())
- return
- }
- err2 = err
- resp.Message = strings.Join(asrResp.Result, "\n")
- }()
- wg.Wait()
- //返回数据
- if err1 != nil {
- return nil, err1
- }
- if err2 != nil {
- return nil, err2
- }
- return
- }
|