callbackmsglogic.go 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. package callback
  2. import (
  3. "callback/model"
  4. "callback/pkg/lock"
  5. "context"
  6. "encoding/json"
  7. "encoding/xml"
  8. "fmt"
  9. "github.com/elliotchance/pie/pie"
  10. "github.com/valyala/fastjson"
  11. "io/ioutil"
  12. "net/http"
  13. "strconv"
  14. "time"
  15. "callback/internal/svc"
  16. "callback/internal/types"
  17. "github.com/zeromicro/go-zero/core/logx"
  18. )
  19. type CallbackMsgLogic struct {
  20. logx.Logger
  21. ctx context.Context
  22. svcCtx *svc.ServiceContext
  23. }
  24. func NewCallbackMsgLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CallbackMsgLogic {
  25. return &CallbackMsgLogic{
  26. Logger: logx.WithContext(ctx),
  27. ctx: ctx,
  28. svcCtx: svcCtx,
  29. }
  30. }
  31. func (l *CallbackMsgLogic) CallbackMsg(req *types.CallbackMsgRequest, w http.ResponseWriter, r *http.Request) (err error) {
  32. wxcpt := l.svcCtx.Wxcpt
  33. defer r.Body.Close()
  34. bts, err := ioutil.ReadAll(r.Body)
  35. //logx.Info(fmt.Sprintf("--> OriginalParam: %+v ,Error: %v\n", req, err))
  36. //logx.Info(fmt.Sprintf("--> OriginalBody: %v ,Error: %v\n", string(bts), err))
  37. //xml转json
  38. var callbackMsg types.CallbackMsg
  39. err = xml.Unmarshal(bts, &callbackMsg)
  40. if err != nil {
  41. return
  42. }
  43. bts, err = json.Marshal(callbackMsg)
  44. //解密消息
  45. msg, err_ := wxcpt.DecryptMsg(req.MsgSignature, req.Timestamp, req.Nonce, bts)
  46. if err_ != nil {
  47. logx.Error("DecryptMsg Error: ", err_)
  48. }
  49. logx.Info(fmt.Sprintf("--> DecryptMsg: %v ,Error: %v\n", string(msg), err_))
  50. //xml转结构体
  51. var ct types.EventContent
  52. err = xml.Unmarshal(msg, &ct)
  53. if nil != err {
  54. logx.Error("Unmarshal fail ", err)
  55. } else {
  56. //事件处理
  57. if ct.MsgType == types.Event && ct.Event == types.KfMsgOrEvent {
  58. //新消息事件
  59. go KfMsgOrEventHandle(l.svcCtx, ct)
  60. } else if ct.MsgType == types.Event && ct.Event == types.KfAccountAuthChange {
  61. //客服账号变动事件
  62. go KfAccountAuthChangeHandle(l.svcCtx, ct)
  63. }
  64. }
  65. //加密返回
  66. now := time.Now()
  67. encryptMsg, cryptErr := wxcpt.EncryptMsg("", strconv.FormatInt(now.Unix(), 10), strconv.FormatInt(now.Unix(), 10))
  68. if nil != cryptErr {
  69. logx.Error("DecryptMsg fail ", cryptErr)
  70. }
  71. _, err = w.Write(encryptMsg)
  72. return
  73. }
  74. // KfMsgOrEventHandle 新消息或者事件上报处理
  75. func KfMsgOrEventHandle(svcCtx *svc.ServiceContext, ct types.EventContent) {
  76. //获取分布式锁
  77. rl := lock.NewRedisLock(ct.OpenKfId, svcCtx.Redis)
  78. if err := rl.Lock(); err != nil {
  79. logx.Error("获取锁失败", err)
  80. }
  81. defer rl.UnLock()
  82. cursor, err := svcCtx.Redis.Get(fmt.Sprintf("cb_cursor:%v", ct.OpenKfId))
  83. if err != nil || cursor == "" {
  84. var cbService *model.CbService
  85. cbService, err = svcCtx.CbServiceModel.GetServiceByOpenKfid(ct.OpenKfId)
  86. cursor = cbService.NextCursor
  87. if err != nil {
  88. go KfAccountAuthChangeHandle(svcCtx, ct)
  89. }
  90. }
  91. for true {
  92. //查询消息列表
  93. err, more, nextCursor, list := svcCtx.WxApi.GetMsgList(cursor, ct.Token, ct.OpenKfId)
  94. if err != nil {
  95. logx.Error("GetMsgList fail ", err)
  96. break
  97. }
  98. //更新游标
  99. cursor = nextCursor
  100. err = svcCtx.Redis.Set(fmt.Sprintf("cb_cursor:%v", ct.OpenKfId), cursor)
  101. if err != nil {
  102. logx.Error(err)
  103. }
  104. _ = svcCtx.CbServiceModel.UpdateCursorByOpenKfid(ct.OpenKfId, cursor)
  105. //保存消息到数据库
  106. var userIds []string
  107. for _, msg := range list {
  108. userIds = append(userIds, msg.ExternalUserid)
  109. cbMsg := &model.CbMsg{}
  110. cbMsg.Msgid = msg.Msgid
  111. cbMsg.OpenKfid = msg.OpenKfid
  112. cbMsg.ExternalUserid = msg.ExternalUserid
  113. cbMsg.SendTime = int64(msg.SendTime)
  114. cbMsg.Origin = int64(msg.Origin)
  115. cbMsg.ServicerUserid = msg.ServicerUserid
  116. cbMsg.Msgtype = msg.Msgtype
  117. cbMsg.DataInfo = msg.DataInfo
  118. now := time.Now()
  119. cbMsg.CreatedAt = now
  120. cbMsg.UpdatedAt = now
  121. _, err = svcCtx.CbMsgModel.Insert(nil, cbMsg)
  122. if err != nil {
  123. logx.Error("Insert cb_msg fail ", err)
  124. }
  125. //消息额外处理
  126. if cbMsg.Msgtype == types.MsgTypeText {
  127. msgTypeTextHandle(svcCtx, cbMsg)
  128. }
  129. //事件消息额外处理
  130. if cbMsg.Msgtype == types.MsgTypeEvent {
  131. msgTypeEventHandle(svcCtx, cbMsg)
  132. }
  133. }
  134. go updateCustomerList(svcCtx, userIds)
  135. if more == 0 {
  136. break
  137. }
  138. }
  139. return
  140. }
  141. // KfAccountAuthChangeHandle 管理员权限修改事件上报处理
  142. func KfAccountAuthChangeHandle(svcCtx *svc.ServiceContext, ct types.EventContent) {
  143. err, list := svcCtx.WxApi.GetServiceList()
  144. if err != nil {
  145. logx.Error("GetServiceList fail ", err)
  146. return
  147. }
  148. for _, s := range list {
  149. cbService, err := svcCtx.CbServiceModel.GetServiceByOpenKfid(ct.OpenKfId)
  150. //保存客服账号
  151. if err != nil {
  152. cbService.NextCursor, _ = svcCtx.Redis.Get(fmt.Sprintf("cb_cursor:%v", ct.OpenKfId))
  153. cbService.Corpid = svcCtx.Config.Wxwork.Corpid
  154. cbService.OpenKfid = s.OpenKfid
  155. cbService.Name = s.Name
  156. cbService.Avatar = s.Avatar
  157. now := time.Now()
  158. cbService.CreatedAt = now
  159. cbService.UpdatedAt = now
  160. _, err := svcCtx.CbServiceModel.Insert(nil, cbService)
  161. if err != nil {
  162. logx.Error("Insert cb_service fail ", err)
  163. }
  164. }
  165. //保存接待人员列表
  166. err, accounts := svcCtx.WxApi.GetServicerList(s.OpenKfid)
  167. if err != nil {
  168. logx.Error("GetServicerList fail ", err)
  169. }
  170. for _, a := range accounts {
  171. err := svcCtx.CbServicerModel.DeleteServicerByOpenKfid(s.OpenKfid)
  172. if err != nil {
  173. logx.Error("DeleteServicerByOpenKfid fail ", err)
  174. }
  175. cbServicer := &model.CbServicer{}
  176. cbServicer.OpenKfid = s.OpenKfid
  177. cbServicer.Corpid = svcCtx.Config.Wxwork.Corpid
  178. cbServicer.Userid = a.Userid
  179. cbServicer.Status = a.Status
  180. cbServicer.DepartmentId = a.DepartmentId
  181. now := time.Now()
  182. cbServicer.CreatedAt = now
  183. cbServicer.UpdatedAt = now
  184. _, err = svcCtx.CbServicerModel.Insert(nil, cbServicer)
  185. if err != nil {
  186. logx.Error("Insert cb_servicer fail ", err)
  187. }
  188. //保存成员信息
  189. staff, err := svcCtx.CbStaffModel.GetStaff(svcCtx.Config.Wxwork.Corpid, a.Userid)
  190. if err != nil {
  191. err, d := svcCtx.WxApi.GetStaffList(a.Userid)
  192. if err != nil {
  193. logx.Error("GetStaffList fail ", err)
  194. } else {
  195. staff.Corpid = svcCtx.Config.Wxwork.Corpid
  196. staff.Userid = d.Userid
  197. staff.Name = d.Name
  198. staff.Status = d.Status
  199. staff.MainDepartment = d.MainDepartment
  200. now := time.Now()
  201. staff.CreatedAt = now
  202. staff.UpdatedAt = now
  203. _, err := svcCtx.CbStaffModel.Insert(nil, staff)
  204. if err != nil {
  205. logx.Error("Insert cb_servicer fail ", err)
  206. continue
  207. }
  208. }
  209. }
  210. }
  211. }
  212. return
  213. }
  214. // 更新客户信息
  215. func updateCustomerList(svcCtx *svc.ServiceContext, userIds pie.Strings) {
  216. for _, uid := range userIds.Unique() {
  217. if uid == "" {
  218. continue
  219. }
  220. _, err := svcCtx.CbCustomerModel.GetCustomerByExternalUserid(uid)
  221. key := fmt.Sprintf("cb_user_state:%v", uid)
  222. v, _ := svcCtx.Redis.Get(key)
  223. if v == "4" {
  224. continue
  225. }
  226. if err != nil {
  227. //此处因为该接口查询多个用户时,如果其中一个超过24小时不回复,会导致所有信息都不会返回,因此这里单独请求
  228. err, list := svcCtx.WxApi.GetCustomerList([]string{uid})
  229. if err != nil || len(list) == 0 {
  230. logx.Error("GetCustomerList fail ", err)
  231. continue
  232. }
  233. c := &model.CbCustomer{}
  234. c.ExternalUserid = list[0].ExternalUserid
  235. c.Nickname = list[0].Nickname
  236. c.Avatar = list[0].Avatar
  237. c.Gender = int64(list[0].Gender)
  238. now := time.Now()
  239. c.CreatedAt = now
  240. c.UpdatedAt = now
  241. _, err = svcCtx.CbCustomerModel.Insert(nil, c)
  242. if err != nil {
  243. logx.Error("Insert cb_customer fail ", err)
  244. }
  245. }
  246. }
  247. }
  248. // 消息额外处理
  249. func msgTypeTextHandle(svcCtx *svc.ServiceContext, msg *model.CbMsg) {
  250. key := fmt.Sprintf("cb_user_state:%v", msg.ExternalUserid)
  251. v, _ := svcCtx.Redis.Get(key)
  252. if v == "" || v == "4" {
  253. //获取会话状态
  254. err, sessionState := svcCtx.WxApi.GetSessionState(msg.OpenKfid, msg.ExternalUserid)
  255. if err != nil {
  256. logx.Error("GetSessionState fail ", err)
  257. }
  258. _ = svcCtx.Redis.Set(key, fmt.Sprintf("%v", sessionState.ServiceState))
  259. v = fmt.Sprintf("%v", sessionState.ServiceState)
  260. }
  261. if v == "0" || v == "1" {
  262. //进入会话事件,加入消息流转池
  263. err := svcCtx.WxApi.UpdateSessionState(msg.OpenKfid, msg.ExternalUserid, "", 2)
  264. if err != nil {
  265. logx.Error("UpdateSessionState fail ", err)
  266. }
  267. //获取会话状态
  268. err, sessionState := svcCtx.WxApi.GetSessionState(msg.OpenKfid, msg.ExternalUserid)
  269. if err != nil {
  270. logx.Error("GetSessionState fail ", err)
  271. }
  272. _ = svcCtx.Redis.Set(key, fmt.Sprintf("%v", sessionState.ServiceState))
  273. return
  274. }
  275. return
  276. }
  277. // 事件消息额外处理
  278. func msgTypeEventHandle(svcCtx *svc.ServiceContext, msg *model.CbMsg) {
  279. content, err := fastjson.Parse(msg.DataInfo)
  280. if err != nil {
  281. logx.Error("fastjson.Parse(cbMsg.DataInfo) fail ", err)
  282. }
  283. eventType := content.GetStringBytes("event_type")
  284. switch string(eventType) {
  285. case types.EventTypeSessionStatusChange:
  286. key := fmt.Sprintf("cb_user_state:%v", msg.ExternalUserid)
  287. _, _ = svcCtx.Redis.Del(key)
  288. //获取会话状态
  289. err, sessionState := svcCtx.WxApi.GetSessionState(msg.OpenKfid, msg.ExternalUserid)
  290. if err != nil {
  291. logx.Error("GetSessionState fail ", err)
  292. }
  293. _ = svcCtx.Redis.Set(key, fmt.Sprintf("%v", sessionState.ServiceState))
  294. return
  295. case types.EventTypeEnterSession:
  296. }
  297. }