| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311 |
- package callback
- import (
- "callback/model"
- "callback/pkg/lock"
- "context"
- "encoding/json"
- "encoding/xml"
- "fmt"
- "github.com/elliotchance/pie/pie"
- "github.com/valyala/fastjson"
- "io/ioutil"
- "net/http"
- "strconv"
- "time"
- "callback/internal/svc"
- "callback/internal/types"
- "github.com/zeromicro/go-zero/core/logx"
- )
- type CallbackMsgLogic struct {
- logx.Logger
- ctx context.Context
- svcCtx *svc.ServiceContext
- }
- func NewCallbackMsgLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CallbackMsgLogic {
- return &CallbackMsgLogic{
- Logger: logx.WithContext(ctx),
- ctx: ctx,
- svcCtx: svcCtx,
- }
- }
- func (l *CallbackMsgLogic) CallbackMsg(req *types.CallbackMsgRequest, w http.ResponseWriter, r *http.Request) (err error) {
- wxcpt := l.svcCtx.Wxcpt
- defer r.Body.Close()
- bts, err := ioutil.ReadAll(r.Body)
- //logx.Info(fmt.Sprintf("--> OriginalParam: %+v ,Error: %v\n", req, err))
- //logx.Info(fmt.Sprintf("--> OriginalBody: %v ,Error: %v\n", string(bts), err))
- //xml转json
- var callbackMsg types.CallbackMsg
- err = xml.Unmarshal(bts, &callbackMsg)
- if err != nil {
- return
- }
- bts, err = json.Marshal(callbackMsg)
- //解密消息
- msg, err_ := wxcpt.DecryptMsg(req.MsgSignature, req.Timestamp, req.Nonce, bts)
- if err_ != nil {
- logx.Error("DecryptMsg Error: ", err_)
- }
- logx.Info(fmt.Sprintf("--> DecryptMsg: %v ,Error: %v\n", string(msg), err_))
- //xml转结构体
- var ct types.EventContent
- err = xml.Unmarshal(msg, &ct)
- if nil != err {
- logx.Error("Unmarshal fail ", err)
- } else {
- //事件处理
- if ct.MsgType == types.Event && ct.Event == types.KfMsgOrEvent {
- //新消息事件
- go KfMsgOrEventHandle(l.svcCtx, ct)
- } else if ct.MsgType == types.Event && ct.Event == types.KfAccountAuthChange {
- //客服账号变动事件
- go KfAccountAuthChangeHandle(l.svcCtx, ct)
- }
- }
- //加密返回
- now := time.Now()
- encryptMsg, cryptErr := wxcpt.EncryptMsg("", strconv.FormatInt(now.Unix(), 10), strconv.FormatInt(now.Unix(), 10))
- if nil != cryptErr {
- logx.Error("DecryptMsg fail ", cryptErr)
- }
- _, err = w.Write(encryptMsg)
- return
- }
- // KfMsgOrEventHandle 新消息或者事件上报处理
- func KfMsgOrEventHandle(svcCtx *svc.ServiceContext, ct types.EventContent) {
- //获取分布式锁
- rl := lock.NewRedisLock(ct.OpenKfId, svcCtx.Redis)
- if err := rl.Lock(); err != nil {
- logx.Error("获取锁失败", err)
- }
- defer rl.UnLock()
- cursor, err := svcCtx.Redis.Get(fmt.Sprintf("cb_cursor:%v", ct.OpenKfId))
- if err != nil || cursor == "" {
- var cbService *model.CbService
- cbService, err = svcCtx.CbServiceModel.GetServiceByOpenKfid(ct.OpenKfId)
- cursor = cbService.NextCursor
- if err != nil {
- go KfAccountAuthChangeHandle(svcCtx, ct)
- }
- }
- for true {
- //查询消息列表
- err, more, nextCursor, list := svcCtx.WxApi.GetMsgList(cursor, ct.Token, ct.OpenKfId)
- if err != nil {
- logx.Error("GetMsgList fail ", err)
- break
- }
- //更新游标
- cursor = nextCursor
- err = svcCtx.Redis.Set(fmt.Sprintf("cb_cursor:%v", ct.OpenKfId), cursor)
- if err != nil {
- logx.Error(err)
- }
- _ = svcCtx.CbServiceModel.UpdateCursorByOpenKfid(ct.OpenKfId, cursor)
- //保存消息到数据库
- var userIds []string
- for _, msg := range list {
- userIds = append(userIds, msg.ExternalUserid)
- cbMsg := &model.CbMsg{}
- cbMsg.Msgid = msg.Msgid
- cbMsg.OpenKfid = msg.OpenKfid
- cbMsg.ExternalUserid = msg.ExternalUserid
- cbMsg.SendTime = int64(msg.SendTime)
- cbMsg.Origin = int64(msg.Origin)
- cbMsg.ServicerUserid = msg.ServicerUserid
- cbMsg.Msgtype = msg.Msgtype
- cbMsg.DataInfo = msg.DataInfo
- now := time.Now()
- cbMsg.CreatedAt = now
- cbMsg.UpdatedAt = now
- _, err = svcCtx.CbMsgModel.Insert(nil, cbMsg)
- if err != nil {
- logx.Error("Insert cb_msg fail ", err)
- }
- //消息额外处理
- if cbMsg.Msgtype == types.MsgTypeText {
- msgTypeTextHandle(svcCtx, cbMsg)
- }
- //事件消息额外处理
- if cbMsg.Msgtype == types.MsgTypeEvent {
- msgTypeEventHandle(svcCtx, cbMsg)
- }
- }
- go updateCustomerList(svcCtx, userIds)
- if more == 0 {
- break
- }
- }
- return
- }
- // KfAccountAuthChangeHandle 管理员权限修改事件上报处理
- func KfAccountAuthChangeHandle(svcCtx *svc.ServiceContext, ct types.EventContent) {
- err, list := svcCtx.WxApi.GetServiceList()
- if err != nil {
- logx.Error("GetServiceList fail ", err)
- return
- }
- for _, s := range list {
- cbService, err := svcCtx.CbServiceModel.GetServiceByOpenKfid(ct.OpenKfId)
- //保存客服账号
- if err != nil {
- cbService.NextCursor, _ = svcCtx.Redis.Get(fmt.Sprintf("cb_cursor:%v", ct.OpenKfId))
- cbService.Corpid = svcCtx.Config.Wxwork.Corpid
- cbService.OpenKfid = s.OpenKfid
- cbService.Name = s.Name
- cbService.Avatar = s.Avatar
- now := time.Now()
- cbService.CreatedAt = now
- cbService.UpdatedAt = now
- _, err := svcCtx.CbServiceModel.Insert(nil, cbService)
- if err != nil {
- logx.Error("Insert cb_service fail ", err)
- }
- }
- //保存接待人员列表
- err, accounts := svcCtx.WxApi.GetServicerList(s.OpenKfid)
- if err != nil {
- logx.Error("GetServicerList fail ", err)
- }
- for _, a := range accounts {
- err := svcCtx.CbServicerModel.DeleteServicerByOpenKfid(s.OpenKfid)
- if err != nil {
- logx.Error("DeleteServicerByOpenKfid fail ", err)
- }
- cbServicer := &model.CbServicer{}
- cbServicer.OpenKfid = s.OpenKfid
- cbServicer.Corpid = svcCtx.Config.Wxwork.Corpid
- cbServicer.Userid = a.Userid
- cbServicer.Status = a.Status
- cbServicer.DepartmentId = a.DepartmentId
- now := time.Now()
- cbServicer.CreatedAt = now
- cbServicer.UpdatedAt = now
- _, err = svcCtx.CbServicerModel.Insert(nil, cbServicer)
- if err != nil {
- logx.Error("Insert cb_servicer fail ", err)
- }
- //保存成员信息
- staff, err := svcCtx.CbStaffModel.GetStaff(svcCtx.Config.Wxwork.Corpid, a.Userid)
- if err != nil {
- err, d := svcCtx.WxApi.GetStaffList(a.Userid)
- if err != nil {
- logx.Error("GetStaffList fail ", err)
- } else {
- staff.Corpid = svcCtx.Config.Wxwork.Corpid
- staff.Userid = d.Userid
- staff.Name = d.Name
- staff.Status = d.Status
- staff.MainDepartment = d.MainDepartment
- now := time.Now()
- staff.CreatedAt = now
- staff.UpdatedAt = now
- _, err := svcCtx.CbStaffModel.Insert(nil, staff)
- if err != nil {
- logx.Error("Insert cb_servicer fail ", err)
- continue
- }
- }
- }
- }
- }
- return
- }
- // 更新客户信息
- func updateCustomerList(svcCtx *svc.ServiceContext, userIds pie.Strings) {
- for _, uid := range userIds.Unique() {
- if uid == "" {
- continue
- }
- _, err := svcCtx.CbCustomerModel.GetCustomerByExternalUserid(uid)
- key := fmt.Sprintf("cb_user_state:%v", uid)
- v, _ := svcCtx.Redis.Get(key)
- if v == "4" {
- continue
- }
- if err != nil {
- //此处因为该接口查询多个用户时,如果其中一个超过24小时不回复,会导致所有信息都不会返回,因此这里单独请求
- err, list := svcCtx.WxApi.GetCustomerList([]string{uid})
- if err != nil || len(list) == 0 {
- logx.Error("GetCustomerList fail ", err)
- continue
- }
- c := &model.CbCustomer{}
- c.ExternalUserid = list[0].ExternalUserid
- c.Nickname = list[0].Nickname
- c.Avatar = list[0].Avatar
- c.Gender = int64(list[0].Gender)
- now := time.Now()
- c.CreatedAt = now
- c.UpdatedAt = now
- _, err = svcCtx.CbCustomerModel.Insert(nil, c)
- if err != nil {
- logx.Error("Insert cb_customer fail ", err)
- }
- }
- }
- }
- // 消息额外处理
- func msgTypeTextHandle(svcCtx *svc.ServiceContext, msg *model.CbMsg) {
- key := fmt.Sprintf("cb_user_state:%v", msg.ExternalUserid)
- v, _ := svcCtx.Redis.Get(key)
- if v == "" || v == "4" {
- //获取会话状态
- err, sessionState := svcCtx.WxApi.GetSessionState(msg.OpenKfid, msg.ExternalUserid)
- if err != nil {
- logx.Error("GetSessionState fail ", err)
- }
- _ = svcCtx.Redis.Set(key, fmt.Sprintf("%v", sessionState.ServiceState))
- v = fmt.Sprintf("%v", sessionState.ServiceState)
- }
- if v == "0" || v == "1" {
- //进入会话事件,加入消息流转池
- err := svcCtx.WxApi.UpdateSessionState(msg.OpenKfid, msg.ExternalUserid, "", 2)
- if err != nil {
- logx.Error("UpdateSessionState fail ", err)
- }
- //获取会话状态
- err, sessionState := svcCtx.WxApi.GetSessionState(msg.OpenKfid, msg.ExternalUserid)
- if err != nil {
- logx.Error("GetSessionState fail ", err)
- }
- _ = svcCtx.Redis.Set(key, fmt.Sprintf("%v", sessionState.ServiceState))
- return
- }
- return
- }
- // 事件消息额外处理
- func msgTypeEventHandle(svcCtx *svc.ServiceContext, msg *model.CbMsg) {
- content, err := fastjson.Parse(msg.DataInfo)
- if err != nil {
- logx.Error("fastjson.Parse(cbMsg.DataInfo) fail ", err)
- }
- eventType := content.GetStringBytes("event_type")
- switch string(eventType) {
- case types.EventTypeSessionStatusChange:
- key := fmt.Sprintf("cb_user_state:%v", msg.ExternalUserid)
- _, _ = svcCtx.Redis.Del(key)
- //获取会话状态
- err, sessionState := svcCtx.WxApi.GetSessionState(msg.OpenKfid, msg.ExternalUserid)
- if err != nil {
- logx.Error("GetSessionState fail ", err)
- }
- _ = svcCtx.Redis.Set(key, fmt.Sprintf("%v", sessionState.ServiceState))
- return
- case types.EventTypeEnterSession:
- }
- }
|