Kaynağa Gözat

新增转码回调接口

dxc 1 yıl önce
ebeveyn
işleme
f2e0eb3236

+ 2 - 0
.gitignore

@@ -1 +1,3 @@
 /pkg/silk2audio/silk/libSKP_SILK_SDK.a
+/transcoder.db
+/transcoder.db.lock

+ 1 - 0
etc/transcoder.yaml

@@ -1,6 +1,7 @@
 Name: transcoder
 Host: 0.0.0.0
 Port: 8888
+Timeout: 60000
 
 QiNiuConf:
   AccessKey: "B81Gsvry2StqKVE3txS-7v9GBBfqykC9zhebmxnW"

+ 2 - 0
go.mod

@@ -10,6 +10,8 @@ require (
 
 require (
 	github.com/beorn7/perks v1.0.1 // indirect
+	github.com/boltdb/bolt v1.3.1 // indirect
+	github.com/boltdb/boltd v0.0.0-20150220181201-1f04e2021e45 // indirect
 	github.com/cenkalti/backoff/v4 v4.2.1 // indirect
 	github.com/cespare/xxhash/v2 v2.2.0 // indirect
 	github.com/fatih/color v1.16.0 // indirect

+ 12 - 0
go.sum

@@ -1,5 +1,9 @@
 github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
 github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
+github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4=
+github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps=
+github.com/boltdb/boltd v0.0.0-20150220181201-1f04e2021e45 h1:ep0QrTbZFgXa8e4lTAdd/NPY9ZiSP/ysDf26+UNPogo=
+github.com/boltdb/boltd v0.0.0-20150220181201-1f04e2021e45/go.mod h1:512MO8jN3usqrPdyZ8YtG6vLsQ8wGV5W9cykDwCy1WI=
 github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
 github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
 github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
@@ -11,6 +15,7 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM=
 github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE=
+github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
 github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
 github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY=
 github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
@@ -34,6 +39,9 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.0 h1:RtRsiaGvWxcwd8y3BiRZxsylPT8
 github.com/grpc-ecosystem/grpc-gateway/v2 v2.18.0/go.mod h1:TzP6duP4Py2pHLVPPQp42aoYI92+PCrVotyR5e8Vqlk=
 github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542 h1:2VTzZjLZBgl62/EtslCrtky5vbi9dd7HrQPQIx6wqiw=
 github.com/iancoleman/strcase v0.3.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho=
+github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
+github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
+github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
 github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
 github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
 github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
@@ -50,6 +58,9 @@ github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWE
 github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
 github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg=
 github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k=
+github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
+github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
+github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
 github.com/openzipkin/zipkin-go v0.4.2 h1:zjqfqHjUpPmB3c1GlCvvgsM1G4LkvqQbBDueDOCg/jA=
 github.com/openzipkin/zipkin-go v0.4.2/go.mod h1:ZeVkFjuuBiSy13y8vpSDCjMi9GoI3hPpCJSBx/EYFhY=
 github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
@@ -156,6 +167,7 @@ golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGm
 golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
 golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
 golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de h1:F6qOa9AZTYJXOUEr4jDysRDLrm4PHePlge4v4TGAlxY=
 google.golang.org/genproto/googleapis/api v0.0.0-20240227224415-6ceb2ff114de h1:jFNzHPIeuzhdRwVhbZdiym9q0ory/xY3sA+v2wPg8I0=
 google.golang.org/genproto/googleapis/api v0.0.0-20240227224415-6ceb2ff114de/go.mod h1:5iCWqnniDlqZHrd3neWVTOwvh/v6s3232omMecelax8=

+ 5 - 0
internal/handler/routes.go

@@ -17,6 +17,11 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) {
 				Path:    "/v1/transcoder",
 				Handler: TranscoderHandler(serverCtx),
 			},
+			{
+				Method:  http.MethodPost,
+				Path:    "/v1/transcoder/callback",
+				Handler: TranscoderCallbackHandler(serverCtx),
+			},
 		},
 	)
 }

+ 28 - 0
internal/handler/transcodercallbackhandler.go

@@ -0,0 +1,28 @@
+package handler
+
+import (
+	"net/http"
+
+	"audio_transcoder/internal/logic"
+	"audio_transcoder/internal/svc"
+	"audio_transcoder/internal/types"
+	"github.com/zeromicro/go-zero/rest/httpx"
+)
+
+func TranscoderCallbackHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
+	return func(w http.ResponseWriter, r *http.Request) {
+		var req types.CallbackRequest
+		if err := httpx.Parse(r, &req); err != nil {
+			httpx.ErrorCtx(r.Context(), w, err)
+			return
+		}
+
+		l := logic.NewTranscoderCallbackLogic(r.Context(), svcCtx)
+		resp, err := l.TranscoderCallback(&req)
+		if err != nil {
+			httpx.ErrorCtx(r.Context(), w, err)
+		} else {
+			httpx.OkJsonCtx(r.Context(), w, resp)
+		}
+	}
+}

+ 39 - 0
internal/logic/transcodercallbacklogic.go

@@ -0,0 +1,39 @@
+package logic
+
+import (
+	"audio_transcoder/internal/svc"
+	"audio_transcoder/internal/types"
+	"context"
+	"encoding/json"
+	"errors"
+	"github.com/google/uuid"
+
+	"github.com/zeromicro/go-zero/core/logx"
+)
+
+type TranscoderCallbackLogic struct {
+	logx.Logger
+	ctx    context.Context
+	svcCtx *svc.ServiceContext
+}
+
+func NewTranscoderCallbackLogic(ctx context.Context, svcCtx *svc.ServiceContext) *TranscoderCallbackLogic {
+	return &TranscoderCallbackLogic{
+		Logger: logx.WithContext(ctx),
+		ctx:    ctx,
+		svcCtx: svcCtx,
+	}
+}
+
+func (l *TranscoderCallbackLogic) TranscoderCallback(req *types.CallbackRequest) (resp *types.CallbackResponse, err error) {
+	if req.Path == "" {
+		err = errors.New("path is empty")
+		return
+	}
+	bts, err := json.Marshal(req)
+	if err != nil {
+		return
+	}
+	err = l.svcCtx.BoltSdk.Set(uuid.NewString(), string(bts))
+	return
+}

+ 3 - 0
internal/svc/servicecontext.go

@@ -4,12 +4,14 @@ import (
 	"audio_transcoder/internal/config"
 	"audio_transcoder/pkg/baidu"
 	"audio_transcoder/pkg/oss"
+	"audio_transcoder/pkg/storage"
 )
 
 type ServiceContext struct {
 	Config   config.Config
 	QiNiuSdk *oss.QiNiuSdk
 	AsrSdk   *baidu.AsrSdk
+	BoltSdk  *storage.BoltSdk
 }
 
 func NewServiceContext(c config.Config) *ServiceContext {
@@ -17,5 +19,6 @@ func NewServiceContext(c config.Config) *ServiceContext {
 		Config:   c,
 		QiNiuSdk: oss.NewQiNiuSdk(c.QiNiuConf.AccessKey, c.QiNiuConf.SecretKey, c.QiNiuConf.Bucket, c.QiNiuConf.HostUrl),
 		AsrSdk:   baidu.NewAsrSdk(c.BaiduAsrConf.AppID, c.BaiduAsrConf.APIKey, c.BaiduAsrConf.SecretKey),
+		BoltSdk:  storage.NewBoltSdk(),
 	}
 }

+ 15 - 0
internal/types/types.go

@@ -9,3 +9,18 @@ type Response struct {
 	Path    string `json:"path"`
 	Message string `json:"message"`
 }
+
+type CallbackRequest struct {
+	Path        string            `json:"path"`
+	CallbackUrl string            `json:"callback_url"` //回调地址
+	OriginData  map[string]string `json:"origin_data"`  //透传数据
+}
+
+type CallbackResponse struct {
+}
+
+type OriginDataResponse struct {
+	Path       string            `json:"path"`
+	Message    string            `json:"message"`
+	OriginData map[string]string `json:"origin_data"` //透传数据
+}

+ 163 - 0
job/job.go

@@ -0,0 +1,163 @@
+package job
+
+import (
+	"audio_transcoder/internal/svc"
+	"audio_transcoder/internal/types"
+	"bytes"
+	"encoding/json"
+	"errors"
+	"github.com/google/uuid"
+	"github.com/zeromicro/go-zero/core/logx"
+	"io"
+	"net/http"
+	"os"
+	"silk2audio/silk"
+	"strings"
+	"sync"
+)
+
+type CallbackReq struct {
+	types.CallbackRequest
+	FailNum int `json:"fail_num"`
+}
+
+// AutoTranscoder 自动处理队列
+func AutoTranscoder(ctx *svc.ServiceContext) {
+	sdk := ctx.BoltSdk
+	for {
+		key, value, err := sdk.GetFirst()
+		if err != nil || key == "" || value == "" {
+			continue
+		}
+		logx.Info("转码信息 data:", value)
+		var req CallbackReq
+		err = json.Unmarshal([]byte(value), &req)
+		if err != nil {
+			logx.Error(err)
+			continue
+		}
+		//业务处理
+		resp, err := transcoder(ctx, &req.CallbackRequest)
+		if err != nil {
+			logx.Error(err)
+			_ = sdk.Delete(key)
+			//重新放入队列
+			req.FailNum++
+			if req.FailNum < 10 {
+				b, _ := json.Marshal(&req)
+				logx.Info("转码失败!重新放入队列 data:", string(b))
+				_ = sdk.Set(uuid.NewString(), string(b))
+			}
+			continue
+		}
+		_ = sdk.Delete(key)
+		//数据回调
+		if req.CallbackUrl != "" {
+			bts, _ := json.Marshal(resp)
+			body, err := http.Post(req.CallbackUrl, "application/json", bytes.NewBuffer(bts))
+			if err != nil {
+				logx.Error(err)
+				return
+			}
+			if body.StatusCode != http.StatusOK {
+				logx.Errorf("post %v fail, ErrorCode %v", req.CallbackUrl, body.StatusCode)
+				return
+			}
+		}
+	}
+}
+
+func transcoder(svcCtx *svc.ServiceContext, req *types.CallbackRequest) (resp *types.OriginDataResponse, err error) {
+	if req.Path == "" {
+		err = errors.New("path is empty")
+		return
+	}
+	resp = &types.OriginDataResponse{OriginData: req.OriginData}
+	// 发起HTTP GET请求
+	resp1, err := http.Get(req.Path)
+	if err != nil {
+		logx.Errorf("下载文件失败 error:", err.Error())
+		return
+	}
+	defer resp1.Body.Close()
+	if resp1.StatusCode != http.StatusOK {
+		err = errors.New("download file failed")
+		logx.Errorf("下载文件失败 error:", resp1.StatusCode)
+		return
+	}
+	// 读取响应体内容
+	silkPath := uuid.NewString() + ".silk" // 本地保存的文件路径
+	silkFile, err := os.Create(silkPath)
+	if err != nil {
+		logx.Errorf("读取silk文件失败 error:", err.Error())
+		return
+	}
+	defer func() {
+		silkFile.Close()
+		silk.FileRemove(silkPath)
+	}()
+	_, err = io.Copy(silkFile, resp1.Body)
+	if err != nil {
+		logx.Errorf("读取文件失败 error:", err.Error())
+		return
+	}
+	// 转码
+	wavPath, pcmPath := silk.TransSilkToWav(silkPath)
+	wavFile, err := os.Open(wavPath)
+	if err != nil {
+		logx.Errorf("读取wav文件失败 error:", err.Error())
+		return
+	}
+	defer func() {
+		wavFile.Close()
+		silk.FileRemove(wavPath)
+	}()
+	pcmFile, err := os.Open(pcmPath)
+	if err != nil {
+		logx.Errorf("读取pcm文件失败 error:", err.Error())
+		return
+	}
+	defer func() {
+		pcmFile.Close()
+		silk.FileRemove(pcmPath)
+	}()
+	//多线程处理上传录音和语音识别
+	var wg sync.WaitGroup
+	wg.Add(2)
+	//构建返回数据
+	resp = &types.OriginDataResponse{}
+	var err1, err2 error
+	//上传录音
+	go func() {
+		defer wg.Done()
+		bts, _ := io.ReadAll(wavFile)
+		url, err := svcCtx.QiNiuSdk.Upload(bts, int64(len(bts)), wavPath)
+		if err != nil {
+			logx.Errorf("上传七牛云失败 error:", err.Error())
+			return
+		}
+		err1 = err
+		resp.Path = url
+	}()
+	//识别录音
+	go func() {
+		defer wg.Done()
+		bts, _ := io.ReadAll(pcmFile)
+		asrResp, err := svcCtx.AsrSdk.Asr(bts, "pcm", 16000)
+		if err != nil {
+			logx.Errorf("语音识别失败 error:", err.Error())
+			return
+		}
+		err2 = err
+		resp.Message = strings.Join(asrResp.Result, "\n")
+	}()
+	wg.Wait()
+	//返回数据
+	if err1 != nil {
+		return nil, err1
+	}
+	if err2 != nil {
+		return nil, err2
+	}
+	return
+}

+ 68 - 0
pkg/storage/bolt.go

@@ -0,0 +1,68 @@
+package storage
+
+import (
+	"fmt"
+	"github.com/boltdb/bolt"
+	"log"
+)
+
+type BoltSdk struct {
+	bucket []byte
+	db     *bolt.DB
+}
+
+func NewBoltSdk() *BoltSdk {
+	db, err := bolt.Open("transcoder.db", 0600, nil)
+	if err != nil {
+		log.Fatal(err)
+	}
+	bucket := []byte("transcoder")
+	_ = db.Update(func(tx *bolt.Tx) error {
+		_, err := tx.CreateBucketIfNotExists(bucket)
+		if err != nil && err != bolt.ErrBucketExists {
+			fmt.Println(err)
+		}
+		return err
+	})
+	return &BoltSdk{
+		db:     db,
+		bucket: bucket,
+	}
+}
+
+func (s *BoltSdk) Set(key, value string) (err error) {
+	err = s.db.Update(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket(s.bucket)
+		if bucket == nil {
+			return fmt.Errorf("bucket not found")
+		}
+		return bucket.Put([]byte(key), []byte(value))
+	})
+	return
+}
+
+func (s *BoltSdk) Get(key string) (value string, err error) {
+	err = s.db.View(func(tx *bolt.Tx) error {
+		bts := tx.Bucket(s.bucket).Get([]byte(key))
+		value = string(bts)
+		return nil
+	})
+	return
+}
+
+func (s *BoltSdk) GetFirst() (key, value string, err error) {
+	err = s.db.Update(func(tx *bolt.Tx) error {
+		cursor := tx.Bucket(s.bucket).Cursor()
+		k, v := cursor.First()
+		key = string(k)
+		value = string(v)
+		return nil
+	})
+	return
+}
+func (s *BoltSdk) Delete(key string) (err error) {
+	err = s.db.Update(func(tx *bolt.Tx) error {
+		return tx.Bucket(s.bucket).Delete([]byte(key))
+	})
+	return
+}

+ 27 - 0
pkg/storage/bolt_test.go

@@ -0,0 +1,27 @@
+package storage
+
+import (
+	"fmt"
+	"github.com/google/uuid"
+	"testing"
+)
+
+func TestSet(t *testing.T) {
+	sdk := NewBoltSdk()
+	err := sdk.Set("2", uuid.NewString())
+	if err != nil {
+		fmt.Println(err)
+		return
+	}
+	value, err := sdk.Get("2")
+	fmt.Println(value, err)
+}
+
+func TestGet(t *testing.T) {
+	sdk := NewBoltSdk()
+	value, err := sdk.Get("1")
+	fmt.Println(value, err)
+	sdk.Delete("1")
+	k, v, err := sdk.GetFirst()
+	fmt.Println(k, v, err)
+}

BIN
pkg/storage/transcoder.db


+ 0 - 0
pkg/storage/transcoder.db.lock


+ 18 - 0
transcoder.api

@@ -7,7 +7,25 @@ type Response {
 	Message string `json:"message"`
 }
 
+type CallbackRequest {
+	Path        string            `json:"path"`
+	CallbackUrl string            `json:"callback_url"` //回调地址
+	OriginData  map[string]string `json:"origin_data"`  //透传数据
+}
+
+type CallbackResponse {
+}
+
+type OriginDataResponse {
+	Path       string            `json:"path"`
+	Message    string            `json:"message"`
+	OriginData map[string]string `json:"origin_data"` //透传数据
+}
+
 service transcoder {
 	@handler TranscoderHandler
 	get /v1/transcoder(Request) returns (Response)
+
+	@handler TranscoderCallbackHandler
+	post /v1/transcoder/callback(Request) returns (CallbackResponse)
 }

+ 4 - 0
transcoder.go

@@ -1,6 +1,7 @@
 package main
 
 import (
+	"audio_transcoder/job"
 	"flag"
 	"fmt"
 
@@ -24,6 +25,9 @@ func main() {
 	defer server.Stop()
 
 	ctx := svc.NewServiceContext(c)
+	//处理任务
+	go job.AutoTranscoder(ctx)
+	//注册接口
 	handler.RegisterHandlers(server, ctx)
 
 	fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port)