| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186 |
- import time
- from typing import Any
- import requests
- from net.task_api import task_api
- from tools.log import logger
- class WyIP:
- def __init__(self):
- self.token = None
- self.local_node_list = []
- self.node_list = []
- self._initialize_node_list()
- self._initialize_local_node_list()
- self.error_count = 0
- def _initialize_node_list(self):
- t = 0
- while True:
- result, _list = self.get_node_list()
- if not result:
- t += 1
- if t == 10:
- task_api.notify('无忧IP取账号节点列表失败')
- logger.error('无忧IP取账号节点列表失败,5秒后重新获取')
- time.sleep(5)
- continue
- else:
- self.node_list = _list
- break
- def _initialize_local_node_list(self):
- t = 0
- while True:
- self.local_node_list = self.get_local_node_list()
- if len(self.local_node_list) < 1:
- t += 1
- if t == 10:
- task_api.notify('无忧IP取本地节点列表失败')
- logger.error('无忧IP取本地节点列表失败,10秒后重新获取')
- time.sleep(10)
- continue
- else:
- break
- def set_wy_token(self):
- try:
- result = requests.get('http://120.92.154.154:8888/loging/getWuYToken',timeout=10)
- if result.status_code == 200:
- data = result.json()
- if data['code'] == 0:
- self.token = data['data']['token']
- return
- raise Exception('获取token失败')
- except Exception as e:
- raise Exception(e)
- def get_node_list(self) -> tuple[bool, str] | tuple[bool, list[Any]]:
- self.set_wy_token()
- if not self.token:
- return False, 'token为空'
- try:
- node_list = []
- for i in range(1, 5):
- header = {
- 'content-type': 'application/x-www-form-urlencoded',
- 'cookie': self.token,
- }
- data = f'rows=500&page={i}&NodeWheres%5BType%5D=&NodeWheres%5BNodeType%5D=all&NodeWheres%5BDeviceType%5D=all&NodeWheres%5BItemNumber%5D=all&NodeWheres%5BRegionState%5D=all&NodeWheres%5BRegionNumber%5D=all&NodeWheres%5BNodeIsUsed%5D=all&NodeWheres%5BGroupNumber%5D=all&NodeWheres%5BNodePrice%5D=all&NodeWheres%5BIsGlobalDynamic%5D=all&TimeDesc='
- result = requests.post('https://user.wuyouip.com/Node/Home/ListUserNodewData', data=data,
- headers=header,timeout=10)
- if result.status_code == 200:
- text = result.text
- if 'DOCTYPE' in text:
- return False, 'get_node_list:登录已过期,请重新登录'
- data = result.json()
- if data['success']:
- node_list.extend(data['rows'])
- if data['total'] <= 500 * i:
- return True, node_list
- return False, '获取账号节点列表失败'
- except Exception as e:
- return False, str(e)
- @staticmethod
- def get_local_node_list() -> list[Any]:
- try:
- result = requests.get('http://127.0.0.1:54321/api/v1/getNodeBindDataList', timeout=10)
- if result.status_code == 200:
- data = result.json()
- if data['message'] == '':
- return data['data']
- return []
- except Exception as e:
- return []
- def switch_node_area(self, node_id: str, price_id: str, region: str) -> tuple[bool, str]:
- header = {
- 'content-type': 'application/x-www-form-urlencoded',
- 'cookie': self.token
- }
- data = f'id={node_id}&pId={price_id}®ions={region}'
- try:
- result = requests.post('https://user.wuyouip.com/Node/Home/SwitchNodeAreaV2', data=data, headers=header, timeout=10)
- if result.status_code == 200:
- text = result.text
- logger.info(f'{node_id}-{region}-{price_id}-切换地区接口返回:{text}')
- if 'DOCTYPE' in text:
- return False, 'switch_node_area:登录已过期,请重新登录'
- data = result.json()
- if data['success']:
- return True, 'switch_node_area:切换成功'
- if data['error']:
- return False, f'switch_node_area:{data["message"]}'
- return False, '切换失败'
- except Exception as e:
- return False, f'switch_node_area:{e}'
- def get_node_id_by_emulator_index(self, emulator_index: int) -> tuple[bool, str] | tuple[
- bool, tuple[Any, Any | None]]:
- node_order = None
- for node in self.local_node_list:
- for order in node['simulatorOrders']:
- if int(order) == emulator_index:
- node_order = node['nodeOrder']
- if not node_order:
- return False, 'get_node_id_by_emulator_index:未找到对应的本地节点'
- for node in self.node_list:
- if node['UserNodeBuyOrder'] == node_order:
- return True, (node['NodeId'], node_order)
- return False, 'get_node_id_by_emulator_index:未找到对应的账号节点'
- def switch_emulator_area(self, emulator_index: int, price_id: str, region: str) -> tuple[bool, str]:
- if not self.token:
- self.error_count += 1
- return False, '未登录'
- if not self.node_list:
- self._initialize_node_list()
- if not self.local_node_list:
- self._initialize_local_node_list()
- result, tup = self.get_node_id_by_emulator_index(emulator_index)
- if not result:
- self.error_count += 1
- if self.error_count == 5:
- task_api.notify(f'模拟器{emulator_index}切换地区失败,请检查')
- return result, tup
- result, message = self.switch_node_area(tup[0], price_id, region)
- if not result:
- self.error_count += 1
- if self.error_count == 5:
- task_api.notify(f'模拟器{emulator_index}切换地区失败,请检查')
- return result, message
- self.error_count = 0
- time.sleep(5)
- result, message = self.update_node(tup[1])
- return True, message
- def queryProcessProxyRegion(self,pid) -> tuple[bool, str]:
- try:
- result = requests.get(f'http://127.0.0.1:54321/api/v1/queryProcessIsProxy?pid={pid}', timeout=10)
- if result.status_code == 200:
- data = result.json()
- if data['result'] == 0:
- return True, data['data']['region']
- return False, f'queryProcessIsProxy 错误:{data["message"]}'
- return False, f'queryProcessIsProxy 错误:{result.status_code}'
- except Exception as e:
- return False, f'queryProcessIsProxy 错误:{str(e)}'
- def update_node(self, node_order) -> tuple[bool, Any]:
- try:
- result = requests.get(f'http://127.0.0.1:54321/api/v1/nodeRequestSwitch?nodeOrder={node_order}', timeout=10)
- if result.status_code == 200:
- data = result.json()
- if data['result'] == 0:
- return True, data['data']
- return False, f'update_node 错误:{data["message"]}'
- return False, f'update_node 错误:{result.status_code}'
- except Exception as e:
- return False, f'update_node 错误:{str(e)}'
|