dxc 2 anni fa
parent
commit
afbea6f126

+ 3 - 0
callback.go

@@ -1,6 +1,7 @@
 package main
 
 import (
+	"callback/internal/logic/callback"
 	"callback/internal/middleware"
 	"flag"
 	"fmt"
@@ -28,6 +29,8 @@ func main() {
 
 	server.Use(middleware.NewCorsMiddleware().Handle)
 	ctx := svc.NewServiceContext(c)
+	//初始化时同步状态
+	callback.InitCustomerState(ctx)
 	handler.RegisterHandlers(server, ctx)
 
 	fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port)

+ 27 - 1
doc/cb_customer.sql

@@ -1,11 +1,37 @@
+/*
+ Navicat Premium Data Transfer
 
+ Source Server         : 10.8.230.17
+ Source Server Type    : MySQL
+ Source Server Version : 50635
+ Source Host           : 10.8.230.17:3308
+ Source Schema         : wechat_cs_online
+
+ Target Server Type    : MySQL
+ Target Server Version : 50635
+ File Encoding         : 65001
+
+ Date: 02/04/2024 09:25:30
+*/
+
+SET NAMES utf8mb4;
+SET FOREIGN_KEY_CHECKS = 0;
+
+-- ----------------------------
+-- Table structure for cb_customer
+-- ----------------------------
+DROP TABLE IF EXISTS `cb_customer`;
 CREATE TABLE `cb_customer`  (
   `id` bigint(20) NOT NULL AUTO_INCREMENT,
+  `open_kfid` varchar(40) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '客服ID',
   `external_userid` varchar(40) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '微信客户的external_userid',
   `nickname` varchar(40) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '昵称',
   `avatar` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '图片',
   `gender` tinyint(4) NOT NULL COMMENT '性别',
+  `service_state` tinyint(4) NOT NULL DEFAULT -1 COMMENT '会话状态:0未处理,1由智能助手接待,2待接入池排队中,3由人工接待,4已结束/未开始',
   `created_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
   `updated_at` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
   PRIMARY KEY (`id`) USING BTREE
-) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = '客户信息表' ROW_FORMAT = Compact;
+) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = '客户信息表' ROW_FORMAT = Compact;
+
+SET FOREIGN_KEY_CHECKS = 1;

+ 3 - 0
doc/command.sh

@@ -0,0 +1,3 @@
+goctl api go --api .\api\callback.api --dir .
+
+goctl model mysql ddl --src ./doc/cb_customer.sql --dir .\model\

+ 90 - 41
internal/logic/callback/callbackmsglogic.go

@@ -7,7 +7,6 @@ import (
 	"encoding/json"
 	"encoding/xml"
 	"fmt"
-	"github.com/elliotchance/pie/pie"
 	"github.com/valyala/fastjson"
 	"io/ioutil"
 	"net/http"
@@ -111,9 +110,8 @@ func KfMsgOrEventHandle(svcCtx *svc.ServiceContext, ct types.EventContent) {
 		}
 		_ = svcCtx.CbServiceModel.UpdateCursorByOpenKfid(ct.OpenKfId, cursor)
 		//保存消息到数据库
-		var userIds []string
 		for _, msg := range list {
-			userIds = append(userIds, msg.ExternalUserid)
+			//保存消息
 			cbMsg := &model.CbMsg{}
 			cbMsg.Msgid = msg.Msgid
 			cbMsg.OpenKfid = msg.OpenKfid
@@ -139,7 +137,6 @@ func KfMsgOrEventHandle(svcCtx *svc.ServiceContext, ct types.EventContent) {
 				msgTypeEventHandle(svcCtx, cbMsg)
 			}
 		}
-		go updateCustomerList(svcCtx, userIds)
 		if more == 0 {
 			break
 		}
@@ -155,10 +152,13 @@ func KfAccountAuthChangeHandle(svcCtx *svc.ServiceContext, ct types.EventContent
 		return
 	}
 	for _, s := range list {
-		cbService, err := svcCtx.CbServiceModel.GetServiceByOpenKfid(ct.OpenKfId)
+		if s.OpenKfid != ct.OpenKfId {
+			continue
+		}
+		cbService, err := svcCtx.CbServiceModel.GetServiceByOpenKfid(s.OpenKfid)
 		//保存客服账号
 		if err != nil {
-			cbService.NextCursor, _ = svcCtx.Redis.Get(fmt.Sprintf("cb_cursor:%v", ct.OpenKfId))
+			cbService.NextCursor, _ = svcCtx.Redis.Get(fmt.Sprintf("cb_cursor:%v", s.OpenKfid))
 			cbService.Corpid = svcCtx.Config.Wxwork.Corpid
 			cbService.OpenKfid = s.OpenKfid
 			cbService.Name = s.Name
@@ -221,51 +221,50 @@ func KfAccountAuthChangeHandle(svcCtx *svc.ServiceContext, ct types.EventContent
 	return
 }
 
-// 更新客户信息
-func updateCustomerList(svcCtx *svc.ServiceContext, userIds pie.Strings) {
-	for _, uid := range userIds.Unique() {
-		if uid == "" {
-			continue
-		}
-		_, err := svcCtx.CbCustomerModel.GetCustomerByExternalUserid(uid)
-		key := fmt.Sprintf("cb_user_state:%v", uid)
-		v, _ := svcCtx.Redis.Get(key)
-		if v == "4" {
-			continue
-		}
-		if err != nil {
+// 消息额外处理
+func msgTypeTextHandle(svcCtx *svc.ServiceContext, msg *model.CbMsg) {
+	key := fmt.Sprintf("cb_user_state:%v", msg.ExternalUserid)
+	v, err := svcCtx.Redis.Get(key)
+	if err != nil || v == "" {
+		//新增用户
+		c, err := svcCtx.CbCustomerModel.GetCustomerByExternalUserid(msg.OpenKfid, msg.ExternalUserid)
+		if err != nil || c.Id == 0 {
 			//此处因为该接口查询多个用户时,如果其中一个超过24小时不回复,会导致所有信息都不会返回,因此这里单独请求
-			err, list := svcCtx.WxApi.GetCustomerList([]string{uid})
-			if err != nil || len(list) == 0 {
+			err, clist := svcCtx.WxApi.GetCustomerList([]string{msg.ExternalUserid})
+			if err != nil || len(clist) == 0 {
 				logx.Error("GetCustomerList fail ", err)
-				continue
+				return
 			}
-			c := &model.CbCustomer{}
-			c.ExternalUserid = list[0].ExternalUserid
-			c.Nickname = list[0].Nickname
-			c.Avatar = list[0].Avatar
-			c.Gender = int64(list[0].Gender)
-			now := time.Now()
-			c.CreatedAt = now
-			c.UpdatedAt = now
+			c.ServiceState = -1
+			c.OpenKfid = msg.OpenKfid
+			c.ExternalUserid = clist[0].ExternalUserid
+			c.Nickname = clist[0].Nickname
+			c.Avatar = clist[0].Avatar
+			c.Gender = int64(clist[0].Gender)
+			cnow := time.Now()
+			c.CreatedAt = cnow
+			c.UpdatedAt = cnow
 			_, err = svcCtx.CbCustomerModel.Insert(nil, c)
 			if err != nil {
 				logx.Error("Insert cb_customer fail ", err)
+				return
+			} else {
+				_ = svcCtx.Redis.Set(key, fmt.Sprintf("%v", c.ServiceState))
+				v = fmt.Sprintf("%v", c.ServiceState)
 			}
-
 		}
 	}
-}
-
-// 消息额外处理
-func msgTypeTextHandle(svcCtx *svc.ServiceContext, msg *model.CbMsg) {
-	key := fmt.Sprintf("cb_user_state:%v", msg.ExternalUserid)
-	v, _ := svcCtx.Redis.Get(key)
-	if v == "" || v == "4" {
+	if v == "-1" || v == "4" {
 		//获取会话状态
 		err, sessionState := svcCtx.WxApi.GetSessionState(msg.OpenKfid, msg.ExternalUserid)
 		if err != nil {
 			logx.Error("GetSessionState fail ", err)
+			return
+		}
+		err = svcCtx.CbCustomerModel.UpdateServiceState(msg.OpenKfid, msg.ExternalUserid, sessionState.ServiceState)
+		if err != nil {
+			logx.Error("UpdateServiceState fail ", err)
+			return
 		}
 		_ = svcCtx.Redis.Set(key, fmt.Sprintf("%v", sessionState.ServiceState))
 		v = fmt.Sprintf("%v", sessionState.ServiceState)
@@ -280,9 +279,15 @@ func msgTypeTextHandle(svcCtx *svc.ServiceContext, msg *model.CbMsg) {
 		err, sessionState := svcCtx.WxApi.GetSessionState(msg.OpenKfid, msg.ExternalUserid)
 		if err != nil {
 			logx.Error("GetSessionState fail ", err)
+			return
+		}
+		err = svcCtx.CbCustomerModel.UpdateServiceState(msg.OpenKfid, msg.ExternalUserid, sessionState.ServiceState)
+		if err != nil {
+			logx.Error("UpdateServiceState fail ", err)
+			return
 		}
 		_ = svcCtx.Redis.Set(key, fmt.Sprintf("%v", sessionState.ServiceState))
-		return
+		v = fmt.Sprintf("%v", sessionState.ServiceState)
 	}
 	return
 }
@@ -297,14 +302,58 @@ func msgTypeEventHandle(svcCtx *svc.ServiceContext, msg *model.CbMsg) {
 	switch string(eventType) {
 	case types.EventTypeSessionStatusChange:
 		key := fmt.Sprintf("cb_user_state:%v", msg.ExternalUserid)
-		_, _ = svcCtx.Redis.Del(key)
 		//获取会话状态
 		err, sessionState := svcCtx.WxApi.GetSessionState(msg.OpenKfid, msg.ExternalUserid)
 		if err != nil {
 			logx.Error("GetSessionState fail ", err)
+			return
+		}
+		err = svcCtx.CbCustomerModel.UpdateServiceState(msg.OpenKfid, msg.ExternalUserid, sessionState.ServiceState)
+		if err != nil {
+			logx.Error("UpdateServiceState fail ", err)
+			return
 		}
 		_ = svcCtx.Redis.Set(key, fmt.Sprintf("%v", sessionState.ServiceState))
-		return
 	case types.EventTypeEnterSession:
 	}
 }
+
+// 同步状态
+func InitCustomerState(svcCtx *svc.ServiceContext) {
+	page := 1
+	size := 1000
+	for {
+		all, _ := svcCtx.CbCustomerModel.GetCustomerByPage(page, size)
+		if len(all) == 0 {
+			return
+		}
+		for _, customer := range all {
+			key := fmt.Sprintf("cb_user_state:%v", customer.ExternalUserid)
+			v, _ := svcCtx.Redis.Get(key)
+			//状态不一致则同步
+			if v != fmt.Sprintf("%v", customer.ServiceState) {
+				//获取会话状态
+				err, sessionState := svcCtx.WxApi.GetSessionState(customer.OpenKfid, customer.ExternalUserid)
+				if err != nil {
+					logx.Error("GetSessionState fail ", err)
+					continue
+				}
+				if sessionState.ServiceState == 0 {
+					//进入会话事件,加入消息流转池
+					err := svcCtx.WxApi.UpdateSessionState(customer.OpenKfid, customer.ExternalUserid, "", 2)
+					if err != nil {
+						logx.Error("UpdateSessionState fail ", err)
+					}
+					sessionState.ServiceState = 2
+				}
+				err = svcCtx.CbCustomerModel.UpdateServiceState(customer.OpenKfid, customer.ExternalUserid, sessionState.ServiceState)
+				if err != nil {
+					logx.Error("UpdateServiceState fail ", err)
+					continue
+				}
+				_ = svcCtx.Redis.Set(key, fmt.Sprintf("%v", sessionState.ServiceState))
+			}
+		}
+		page++
+	}
+}

+ 18 - 4
model/cbcustomermodel.go

@@ -12,7 +12,9 @@ type (
 	// and implement the added methods in customCbCustomerModel.
 	CbCustomerModel interface {
 		cbCustomerModel
-		GetCustomerByExternalUserid(externalUserid string) (d *CbCustomer, err error)
+		GetCustomerByExternalUserid(openKfid, externalUserid string) (d *CbCustomer, err error)
+		GetCustomerByPage(page, size int) (list []CbCustomer, err error)
+		UpdateServiceState(openKfid, externalUserid string, state int) (err error)
 	}
 
 	customCbCustomerModel struct {
@@ -27,10 +29,22 @@ func NewCbCustomerModel(conn sqlx.SqlConn) CbCustomerModel {
 	}
 }
 
-func (m *customCbCustomerModel) GetCustomerByExternalUserid(externalUserid string) (d *CbCustomer, err error) {
-	query := fmt.Sprintf("select * from %s where `external_userid` = ? limit 1", m.table)
+func (m *customCbCustomerModel) GetCustomerByExternalUserid(openKfid, externalUserid string) (d *CbCustomer, err error) {
+	query := fmt.Sprintf("select * from %s where `open_kfid` = ? AND `external_userid` = ? limit 1", m.table)
 	var resp CbCustomer
-	err = m.conn.QueryRow(&resp, query, externalUserid)
+	err = m.conn.QueryRow(&resp, query, openKfid, externalUserid)
 	d = &resp
 	return
 }
+
+func (m *customCbCustomerModel) UpdateServiceState(openKfid, externalUserid string, state int) (err error) {
+	query := fmt.Sprintf("update %s set `service_state` = ? where `open_kfid` = ? AND `external_userid` = ?", m.table)
+	_, err = m.conn.Exec(query, state, openKfid, externalUserid)
+	return err
+}
+
+func (m *customCbCustomerModel) GetCustomerByPage(page, size int) (list []CbCustomer, err error) {
+	query := fmt.Sprintf("select * from %s where service_state != 4 limit ?,?", m.table)
+	err = m.conn.QueryRows(&list, query, (page-1)*size, size)
+	return
+}

+ 5 - 3
model/cbcustomermodel_gen.go

@@ -37,10 +37,12 @@ type (
 
 	CbCustomer struct {
 		Id             int64     `db:"id"`
+		OpenKfid       string    `db:"open_kfid"`       // 客服ID
 		ExternalUserid string    `db:"external_userid"` // 微信客户的external_userid
 		Nickname       string    `db:"nickname"`        // 昵称
 		Avatar         string    `db:"avatar"`          // 图片
 		Gender         int64     `db:"gender"`          // 性别
+		ServiceState   int64     `db:"service_state"`   // 会话状态:0未处理,1由智能助手接待,2待接入池排队中,3由人工接待,4已结束/未开始
 		CreatedAt      time.Time `db:"created_at"`
 		UpdatedAt      time.Time `db:"updated_at"`
 	}
@@ -81,14 +83,14 @@ func (m *defaultCbCustomerModel) FindOne(ctx context.Context, id int64) (*CbCust
 }
 
 func (m *defaultCbCustomerModel) Insert(ctx context.Context, data *CbCustomer) (sql.Result, error) {
-	query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?)", m.table, cbCustomerRowsExpectAutoSet)
-	ret, err := m.conn.ExecCtx(ctx, query, data.ExternalUserid, data.Nickname, data.Avatar, data.Gender)
+	query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?)", m.table, cbCustomerRowsExpectAutoSet)
+	ret, err := m.conn.ExecCtx(ctx, query, data.OpenKfid, data.ExternalUserid, data.Nickname, data.Avatar, data.Gender, data.ServiceState)
 	return ret, err
 }
 
 func (m *defaultCbCustomerModel) Update(ctx context.Context, data *CbCustomer) error {
 	query := fmt.Sprintf("update %s set %s where `id` = ?", m.table, cbCustomerRowsWithPlaceHolder)
-	_, err := m.conn.ExecCtx(ctx, query, data.ExternalUserid, data.Nickname, data.Avatar, data.Gender, data.Id)
+	_, err := m.conn.ExecCtx(ctx, query, data.OpenKfid, data.ExternalUserid, data.Nickname, data.Avatar, data.Gender, data.ServiceState, data.Id)
 	return err
 }