package job

import (
	"bytes"
	"encoding/json"
	"errors"
	"io"
	"net/http"
	"os"
	"strings"
	"sync"
	"time"

	"github.com/google/uuid"
	"github.com/zeromicro/go-zero/core/logx"

	"audio_transcoder/internal/svc"
	"audio_transcoder/internal/types"
	"silk2audio/silk"
)

const (
	// maxFailNum is the failure count at which a queue entry is dropped
	// instead of being queued again.
	maxFailNum = 10

	// idlePollInterval is how long the worker waits before polling again when
	// the queue is empty or the queue storage reports an error.
	idlePollInterval = 500 * time.Millisecond

	// downloadTimeout bounds the whole download of the source audio file.
	downloadTimeout = 2 * time.Minute

	// callbackTimeout bounds the result callback request.
	callbackTimeout = 30 * time.Second
)

var (
	// downloadHTTPClient fetches source audio; unlike http.DefaultClient it
	// has a timeout, so a stalled server cannot block the worker forever.
	downloadHTTPClient = &http.Client{Timeout: downloadTimeout}

	// callbackHTTPClient delivers transcoding results to the callback URL.
	callbackHTTPClient = &http.Client{Timeout: callbackTimeout}
)

// CallbackReq is the queue payload: the original callback request plus the
// number of times processing it has failed so far.
type CallbackReq struct {
	types.CallbackRequest
	FailNum int `json:"fail_num"`
}

// AutoTranscoder processes the transcoding queue forever.
//
// Each iteration takes the first queue entry, transcodes it, and posts the
// result to the request's callback URL when one is set. A failed entry is
// stored again under a fresh key with FailNum incremented until FailNum
// reaches maxFailNum, after which it is dropped. The function never returns,
// so it is meant to be run in its own goroutine.
func AutoTranscoder(ctx *svc.ServiceContext) {
	sdk := ctx.BoltSdk

	// remove deletes a queue entry. A failed delete leaves the entry at the
	// head of the queue, so back off instead of retrying in a tight loop.
	remove := func(key string) {
		if err := sdk.Delete(key); err != nil {
			logx.Errorf("delete queue entry failed, key: %v error: %v", key, err)
			time.Sleep(idlePollInterval)
		}
	}

	// requeue stores a failed request under a new key and reports whether the
	// original entry may be deleted afterwards.
	requeue := func(req *CallbackReq) bool {
		req.FailNum++
		if req.FailNum >= maxFailNum {
			// Retry budget exhausted: drop the request.
			return true
		}
		b, err := json.Marshal(req)
		if err != nil {
			// A request that cannot be serialized cannot be retried.
			logx.Error(err)
			return true
		}
		logx.Info("转码失败!重新放入队列 data:", string(b))
		if err := sdk.Set(uuid.NewString(), string(b)); err != nil {
			logx.Errorf("requeue failed, keeping original entry, error: %v", err)
			return false
		}
		return true
	}

	for {
		key, value, err := sdk.GetFirst()
		if err != nil || key == "" {
			// Nothing to process (or the storage is unavailable). The error is
			// deliberately not logged because this path is hit on every idle
			// poll; sleep so an idle worker does not spin a CPU core.
			time.Sleep(idlePollInterval)
			continue
		}
		if value == "" {
			// An entry without a payload can never be processed; remove it so
			// it does not block the head of the queue.
			logx.Errorf("dropping queue entry with empty value, key: %v", key)
			remove(key)
			continue
		}

		logx.Info("转码信息 data:", value)
		var req CallbackReq
		if err := json.Unmarshal([]byte(value), &req); err != nil {
			// A malformed payload will never decode; remove it so the queue
			// can make progress.
			logx.Error(err)
			remove(key)
			continue
		}

		// Business processing.
		resp, err := transcoder(ctx, &req.CallbackRequest)
		logx.Info("转码结果 data:", resp)
		if err != nil {
			logx.Error(err)
			// Store the retry copy before deleting the original so a storage
			// failure cannot lose the request.
			if requeue(&req) {
				remove(key)
			} else {
				time.Sleep(idlePollInterval)
			}
			continue
		}
		remove(key)

		// Result callback.
		if req.CallbackUrl != "" {
			sendCallback(req.CallbackUrl, resp)
		}
	}
}

// sendCallback posts resp as JSON to url. Failures are only logged: the
// transcoding itself succeeded, so the request is not retried.
func sendCallback(url string, resp *types.OriginDataResponse) {
	payload, err := json.Marshal(resp)
	if err != nil {
		logx.Error(err)
		return
	}
	res, err := callbackHTTPClient.Post(url, "application/json", bytes.NewReader(payload))
	if err != nil {
		logx.Error(err)
		return
	}
	defer res.Body.Close()
	// Drain the body so the underlying connection can be reused.
	_, _ = io.Copy(io.Discard, res.Body)

	if res.StatusCode != http.StatusOK {
		logx.Errorf("post %v fail, ErrorCode %v", url, res.StatusCode)
	}
}

// transcoder downloads the silk file at req.Path, converts it to wav and pcm,
// then concurrently uploads the wav and runs speech recognition on the pcm.
//
// On success the response carries req.OriginData, the uploaded file URL in
// Path, and the recognized text lines joined with "\n" in Message. On any
// failure it returns a nil response and the error. Temporary files are
// removed before returning.
func transcoder(svcCtx *svc.ServiceContext, req *types.CallbackRequest) (*types.OriginDataResponse, error) {
	if req.Path == "" {
		return nil, errors.New("path is empty")
	}

	silkPath, err := downloadSilk(req.Path)
	if err != nil {
		return nil, err
	}
	defer silk.FileRemove(silkPath)

	// Transcode. Both outputs are cleaned up no matter which later step fails.
	wavPath, pcmPath := silk.TransSilkToWav(silkPath)
	defer removeIfExists(wavPath)
	defer removeIfExists(pcmPath)

	wavData, err := os.ReadFile(wavPath)
	if err != nil {
		logx.Errorf("读取wav文件失败 error: %v", err)
		return nil, err
	}
	pcmData, err := os.ReadFile(pcmPath)
	if err != nil {
		logx.Errorf("读取pcm文件失败 error: %v", err)
		return nil, err
	}

	resp := &types.OriginDataResponse{OriginData: req.OriginData}

	// Upload and recognition are independent, so run them in parallel. Each
	// goroutine writes only its own error variable and its own response
	// field; wg.Wait orders those writes before the reads below.
	var (
		wg        sync.WaitGroup
		uploadErr error
		asrErr    error
	)
	wg.Add(2)

	// Upload the recording.
	go func() {
		defer wg.Done()
		url, err := svcCtx.QiNiuSdk.Upload(wavData, int64(len(wavData)), wavPath)
		if err != nil {
			logx.Errorf("上传七牛云失败 error: %v", err)
			uploadErr = err
			return
		}
		resp.Path = url
	}()

	// Recognize the recording.
	go func() {
		defer wg.Done()
		asrResp, err := svcCtx.AsrSdk.Asr(pcmData, "pcm", 16000)
		if err != nil {
			logx.Errorf("语音识别失败 error: %v", err)
			asrErr = err
			return
		}
		resp.Message = strings.Join(asrResp.Result, "\n")
	}()

	wg.Wait()

	if uploadErr != nil {
		return nil, uploadErr
	}
	if asrErr != nil {
		return nil, asrErr
	}
	return resp, nil
}

// downloadSilk fetches url into a new "<uuid>.silk" file in the working
// directory and returns its path. The file is closed before returning so the
// transcoder reads complete data; on failure no file is left behind.
func downloadSilk(url string) (string, error) {
	res, err := downloadHTTPClient.Get(url)
	if err != nil {
		logx.Errorf("下载文件失败 error: %v", err)
		return "", err
	}
	defer res.Body.Close()

	if res.StatusCode != http.StatusOK {
		logx.Errorf("下载文件失败 error: %v", res.StatusCode)
		return "", errors.New("download file failed")
	}

	path := uuid.NewString() + ".silk"
	f, err := os.Create(path)
	if err != nil {
		logx.Errorf("读取silk文件失败 error: %v", err)
		return "", err
	}

	_, err = io.Copy(f, res.Body)
	// Always close; a close error after a clean copy still means the data on
	// disk may be incomplete.
	if closeErr := f.Close(); err == nil {
		err = closeErr
	}
	if err != nil {
		logx.Errorf("读取文件失败 error: %v", err)
		silk.FileRemove(path)
		return "", err
	}
	return path, nil
}

// removeIfExists removes a transcoder output file, skipping empty paths and
// files that were never created so silk.FileRemove only sees existing files.
func removeIfExists(path string) {
	if path == "" {
		return
	}
	if _, err := os.Stat(path); err != nil {
		return
	}
	silk.FileRemove(path)
}