| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181 |
- package job
import (
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net/http"
	"os"
	"strings"
	"sync"
	"time"

	"audio_transcoder/internal/svc"
	"audio_transcoder/internal/types"
	"silk2audio/silk"

	"github.com/google/uuid"
	"github.com/zeromicro/go-zero/core/logx"
)
// StopTranscoderError is the sentinel returned by Transcoder when a job
// cannot be processed at all (empty path or unrecognised file header).
// AutoTranscoder does not re-queue jobs that fail with this error.
var StopTranscoderError = errors.New("stop transcoder")
- type CallbackReq struct {
- types.CallbackRequest
- FailNum int `json:"fail_num"`
- }
- // AutoTranscoder 自动处理队列
- func AutoTranscoder(ctx *svc.ServiceContext) {
- sdk := ctx.BoltSdk
- for {
- key, value, err := sdk.GetFirst()
- if err != nil || key == "" || value == "" {
- continue
- }
- logx.Info("转码信息 data:", value)
- var req CallbackReq
- err = json.Unmarshal([]byte(value), &req)
- if err != nil {
- logx.Error(err)
- continue
- }
- //业务处理
- resp, err := Transcoder(ctx, &req.CallbackRequest)
- logx.Info("转码结果 data:", resp, err)
- if err != nil {
- logx.Error(err)
- _ = sdk.Delete(key)
- //重新放入队列
- req.FailNum++
- if err != StopTranscoderError && req.FailNum < 3 {
- b, _ := json.Marshal(&req)
- logx.Info("转码失败!重新放入队列 data:", string(b))
- _ = sdk.Set(uuid.NewString(), string(b))
- }
- continue
- }
- _ = sdk.Delete(key)
- //数据回调
- if req.CallbackUrl != "" {
- bts, _ := json.Marshal(resp)
- body, err := http.Post(req.CallbackUrl, "application/json", bytes.NewBuffer(bts))
- if err != nil {
- logx.Error(err)
- continue
- }
- if body.StatusCode != http.StatusOK {
- logx.Errorf("post %v fail, ErrorCode %v", req.CallbackUrl, body.StatusCode)
- continue
- }
- }
- }
- }
- // Transcoder 转码
- func Transcoder(svcCtx *svc.ServiceContext, req *types.CallbackRequest) (resp *types.OriginDataResponse, err error) {
- fmt.Println("转码中...")
- if req.Path == "" {
- err = StopTranscoderError
- return
- }
- // 去除空格
- req.Path = strings.TrimSpace(req.Path)
- req.CallbackUrl = strings.TrimSpace(req.CallbackUrl)
- resp = &types.OriginDataResponse{OriginData: req.OriginData}
- // 发起HTTP GET请求
- resp1, err := http.Get(req.Path)
- if err != nil {
- logx.Errorf("下载文件失败 error:%s", err.Error())
- return
- }
- defer resp1.Body.Close()
- if resp1.StatusCode != http.StatusOK {
- err = errors.New("download file failed")
- logx.Errorf("下载文件失败 error:%v", resp1.StatusCode)
- return
- }
- // 读取响应体内容
- inputPath := uuid.NewString() // 本地保存的文件路径
- inputFile, err := os.Create(inputPath)
- if err != nil {
- logx.Errorf("读取文件失败 error:%s", err.Error())
- return
- }
- _, err = io.Copy(inputFile, resp1.Body)
- inputFile.Close()
- if err != nil {
- logx.Errorf("读取文件失败 error:%s", err.Error())
- return
- }
- //判断文件类型,如果不是指定语音则返回
- inputFile, _ = os.Open(inputPath)
- var inputType int // 1:silk 2:mp3
- var buf = make([]byte, 10)
- if _, err = inputFile.Read(buf); err == nil {
- inputFile.Close()
- if bytes.Equal(buf, []byte{2, 35, 33, 83, 73, 76, 75, 95, 86, 51}) {
- os.Rename(inputPath, inputPath+".silk")
- inputType = 1
- inputPath += ".silk"
- } else if bytes.Equal(buf[:5], []byte{35, 33, 65, 77, 82}) {
- os.Rename(inputPath, inputPath+".mp3")
- inputType = 2
- inputPath += ".mp3"
- }
- defer silk.FileRemove(inputPath)
- }
- if inputType == 0 {
- err = StopTranscoderError
- return
- }
- // 转码
- var wavPath, pcmPath string
- if inputType == 1 {
- wavPath, pcmPath = silk.TransSilkToWav(inputPath)
- } else if inputType == 2 {
- wavPath, pcmPath = silk.TransMp3ToWav(inputPath)
- }
- defer silk.FileRemove(wavPath)
- defer silk.FileRemove(pcmPath)
- //多线程处理上传录音和语音识别
- var wg sync.WaitGroup
- wg.Add(2)
- //构建返回数据
- var err1, err2 error
- //上传录音
- go func() {
- defer wg.Done()
- url, err := svcCtx.QiNiuSdk.Upload(wavPath, wavPath)
- if err != nil {
- logx.Errorf("上传七牛云失败 error:%s", err.Error())
- return
- }
- err1 = err
- resp.Path = url
- }()
- //识别录音
- go func() {
- defer wg.Done()
- bts, _ := os.ReadFile(pcmPath)
- asrResp, err := svcCtx.AsrSdk.Asr(bts, "pcm", 16000)
- if err != nil {
- logx.Errorf("语音识别失败 error:%s", err.Error())
- return
- }
- err2 = err
- resp.Message = strings.Join(asrResp.Result, "\n")
- }()
- wg.Wait()
- //返回数据
- if err1 != nil {
- return nil, err1
- }
- if err2 != nil {
- return nil, err2
- }
- fmt.Println("已转码...")
- return
- }
|