package job import ( "audio_transcoder/internal/svc" "audio_transcoder/internal/types" "bytes" "encoding/json" "errors" "fmt" "github.com/google/uuid" "github.com/zeromicro/go-zero/core/logx" "io" "net/http" "os" "silk2audio/silk" "strings" "sync" ) var ( StopTranscoderError = errors.New("stop transcoder") ) type CallbackReq struct { types.CallbackRequest FailNum int `json:"fail_num"` } // AutoTranscoder 自动处理队列 func AutoTranscoder(ctx *svc.ServiceContext) { sdk := ctx.BoltSdk for { key, value, err := sdk.GetFirst() if err != nil || key == "" || value == "" { continue } logx.Info("转码信息 data:", value) var req CallbackReq err = json.Unmarshal([]byte(value), &req) if err != nil { logx.Error(err) continue } //业务处理 resp, err := Transcoder(ctx, &req.CallbackRequest) logx.Info("转码结果 data:", resp, err) if err != nil { logx.Error(err) _ = sdk.Delete(key) //重新放入队列 req.FailNum++ if err != StopTranscoderError && req.FailNum < 3 { b, _ := json.Marshal(&req) logx.Info("转码失败!重新放入队列 data:", string(b)) _ = sdk.Set(uuid.NewString(), string(b)) } continue } _ = sdk.Delete(key) //数据回调 if req.CallbackUrl != "" { bts, _ := json.Marshal(resp) body, err := http.Post(req.CallbackUrl, "application/json", bytes.NewBuffer(bts)) if err != nil { logx.Error(err) continue } if body.StatusCode != http.StatusOK { logx.Errorf("post %v fail, ErrorCode %v", req.CallbackUrl, body.StatusCode) continue } } } } // Transcoder 转码 func Transcoder(svcCtx *svc.ServiceContext, req *types.CallbackRequest) (resp *types.OriginDataResponse, err error) { fmt.Println("转码中...") if req.Path == "" { err = StopTranscoderError return } // 去除空格 req.Path = strings.TrimSpace(req.Path) req.CallbackUrl = strings.TrimSpace(req.CallbackUrl) resp = &types.OriginDataResponse{OriginData: req.OriginData} // 发起HTTP GET请求 resp1, err := http.Get(req.Path) if err != nil { logx.Errorf("下载文件失败 error:%s", err.Error()) return } defer resp1.Body.Close() if resp1.StatusCode != http.StatusOK { err = errors.New("download file failed") logx.Errorf("下载文件失败 error:%v", resp1.StatusCode) return } // 读取响应体内容 inputPath := uuid.NewString() // 本地保存的文件路径 inputFile, err := os.Create(inputPath) if err != nil { logx.Errorf("读取文件失败 error:%s", err.Error()) return } _, err = io.Copy(inputFile, resp1.Body) inputFile.Close() if err != nil { logx.Errorf("读取文件失败 error:%s", err.Error()) return } //判断文件类型,如果不是指定语音则返回 inputFile, _ = os.Open(inputPath) var inputType int // 1:silk 2:mp3 var buf = make([]byte, 10) if _, err = inputFile.Read(buf); err == nil { inputFile.Close() if bytes.Equal(buf, []byte{2, 35, 33, 83, 73, 76, 75, 95, 86, 51}) { os.Rename(inputPath, inputPath+".silk") inputType = 1 inputPath += ".silk" } else if bytes.Equal(buf[:5], []byte{35, 33, 65, 77, 82}) { os.Rename(inputPath, inputPath+".mp3") inputType = 2 inputPath += ".mp3" } defer silk.FileRemove(inputPath) } if inputType == 0 { err = StopTranscoderError return } // 转码 var wavPath, pcmPath string if inputType == 1 { wavPath, pcmPath = silk.TransSilkToWav(inputPath) } else if inputType == 2 { wavPath, pcmPath = silk.TransMp3ToWav(inputPath) } defer silk.FileRemove(wavPath) defer silk.FileRemove(pcmPath) //多线程处理上传录音和语音识别 var wg sync.WaitGroup wg.Add(2) //构建返回数据 var err1, err2 error //上传录音 go func() { defer wg.Done() url, err := svcCtx.QiNiuSdk.Upload(wavPath, wavPath) if err != nil { logx.Errorf("上传七牛云失败 error:%s", err.Error()) return } err1 = err resp.Path = url }() //识别录音 go func() { defer wg.Done() bts, _ := os.ReadFile(pcmPath) asrResp, err := svcCtx.AsrSdk.Asr(bts, "pcm", 16000) if err != nil { logx.Errorf("语音识别失败 error:%s", err.Error()) return } err2 = err resp.Message = strings.Join(asrResp.Result, "\n") }() wg.Wait() //返回数据 if err1 != nil { return nil, err1 } if err2 != nil { return nil, err2 } fmt.Println("已转码...") return }