每天必须在 WebSocket 端进入房间才能领取当日荧光棒,而这一流程基本上等价于在斗鱼发送弹幕。
前言
- 搜了一下,斗鱼弹幕发送的 Python 实现全部都是基于 selenium 的,就很离谱……
- 参考了https://hub.fastgit.org/ligb888/ligb-douyu
douyu_danmu.py
from collections import OrderedDict
import random
import re, time, hashlib, json, datetime
from struct import pack
class DouyuHelper:
    """Build and parse the packets exchanged with the Douyu websocket servers.

    Every ``get*`` builder assembles an ordered key/value mapping, serialises
    it with :meth:`encodeMsg` and frames it with :meth:`generateDyMsg`.
    All methods are static; the class only serves as a namespace.
    """

    # Alphabet used for the random device id / keep-alive token.
    _ID_ALPHABET = "0123456789abcdefghijklmnopqrstuvwxyz"

    # Fixed string placed between the timestamp and the device id before the
    # login signature ("vk") is hashed.
    _VK_SALT = "r5*^5;}2#${XF[h+;'./.Q'1;,-]f'p["

    # Cookie name -> login parameter name.
    _COOKIE_FIELDS = {
        "acf_username": "username",
        "acf_ltkid": "ltkid",
        "acf_biz": "biz",
        "acf_stk": "stk",
        "acf_ct": "ct",
    }

    # A message runs from "type@=" up to the next NUL byte.  DOTALL lets the
    # payload contain newlines; without it such messages are silently dropped.
    _MSG_PATTERN = re.compile(rb"(type@=.*?)\x00", re.DOTALL)

    @staticmethod
    def _escape(text: str) -> str:
        """Escape the two reserved characters: '@' -> '@A', '/' -> '@S'."""
        # '@' must be handled first, otherwise the '@' of '@S' would be
        # escaped a second time.
        return text.replace("@", "@A").replace("/", "@S")

    @staticmethod
    def _randomId() -> str:
        """Return 'b' followed by 31 random lowercase alphanumerics."""
        return "b" + "".join(random.choices(DouyuHelper._ID_ALPHABET, k=31))

    @staticmethod
    def generateDyMsg(data: str) -> bytes:
        """Frame an already-encoded message string as a binary packet.

        Layout: length (4 bytes, written twice), the bytes ``b1 02 00 00``
        (689 as a little-endian 16-bit value followed by two zero bytes),
        the UTF-8 payload and a terminating NUL.

        Args:
            data: Message text, normally the output of :meth:`encodeMsg`.

        Returns:
            The framed packet.
        """
        payload = data.encode("utf-8")
        # The length field counts everything after the first length field:
        # second length (4) + type bytes (4) + payload + NUL (1).
        # '<i' pins little-endian so the header does not depend on the host
        # byte order (the type bytes above are little-endian as well).
        length = pack("<i", 9 + len(payload))
        return b"".join((length, length, b"\xb1\x02\x00\x00", payload, b"\x00"))

    @staticmethod
    def encodeMsg(params: OrderedDict) -> str:
        """Serialise a mapping as ``key@=value/`` pairs followed by NUL.

        Keys and values are escaped with '@' -> '@A' and '/' -> '@S'.

        Args:
            params: Mapping of string keys to string values; iteration order
                is the order of the fields in the output.

        Returns:
            The encoded message, including a trailing ``'\\0'``.
        """
        escape = DouyuHelper._escape
        fields = [
            escape(key) + "@=" + escape(value) + "/"
            for key, value in params.items()
        ]
        return "".join(fields) + "\0"

    @staticmethod
    def decodeMsg(data: bytes) -> list:
        """Extract every ``type@=...`` message contained in a received frame.

        Args:
            data: Raw bytes received from the websocket.

        Returns:
            A list of decoded message strings (without the NUL terminator),
            in the order they appear in ``data``.  Bytes that are not valid
            UTF-8 are replaced rather than aborting the whole frame.
        """
        return [
            raw.decode("utf-8", errors="replace")
            for raw in DouyuHelper._MSG_PATTERN.findall(data)
        ]

    @staticmethod
    def getLoginData(roomId: str, cookies: str) -> bytes:
        """Build the login packet for the danmu-sending connection.

        Args:
            roomId: Room id to log in to.
            cookies: JSON text of a list of ``{"name": ..., "value": ...}``
                objects.  Only acf_username, acf_ltkid, acf_biz, acf_stk and
                acf_ct are used; entries without a value are skipped.

        Returns:
            The framed ``loginreq`` packet.

        Raises:
            ValueError: If ``cookies`` is not valid JSON (raised by
                ``json.loads``).
        """
        device_id = DouyuHelper._randomId()
        timestamp = str(int(time.time()))
        signature_input = timestamp + DouyuHelper._VK_SALT + device_id
        vk = hashlib.md5(signature_input.encode("utf8")).hexdigest()

        params = OrderedDict()
        params["type"] = "loginreq"
        params["password"] = ""
        params["roomid"] = roomId
        for cookie in json.loads(cookies):
            name = cookie.get("name")
            value = cookie.get("value")
            field = (
                DouyuHelper._COOKIE_FIELDS.get(name)
                if isinstance(name, str)
                else None
            )
            # A cookie without a value cannot be encoded; leave the field out
            # instead of failing inside encodeMsg.
            if field is not None and value is not None:
                params[field] = str(value)
        params["devid"] = device_id
        params["rt"] = timestamp
        params["pt"] = "2"
        params["vk"] = vk
        params["ver"] = "20180222"
        params["aver"] = "219032101"
        params["dmbt"] = "mobile safari"
        params["dmbv"] = "11"
        return DouyuHelper.generateDyMsg(DouyuHelper.encodeMsg(params))

    @staticmethod
    def getH5ckreq(roomId: str, type: str = "h5ckreq") -> bytes:
        """Build the "enter room" packet for the danmu-sending connection.

        Args:
            roomId: Room id to enter.
            type: Value of the ``type`` field.  The name shadows the builtin
                but is kept because it is part of the public signature.

        Returns:
            The framed packet; its ``ti`` field is "2501" followed by the
            current local date as YYYYMMDD.
        """
        params = OrderedDict()
        params["type"] = type
        params["rid"] = roomId
        params["ti"] = "2501" + datetime.datetime.now().strftime("%Y%m%d")
        return DouyuHelper.generateDyMsg(DouyuHelper.encodeMsg(params))

    @staticmethod
    def getMsgData(msg: str) -> bytes:
        """Build the packet that sends one chat message (danmu).

        Args:
            msg: Text of the chat message.

        Returns:
            The framed ``chatmessage`` packet.
        """
        params = OrderedDict()
        # NOTE(review): the content is escaped here and then again by
        # encodeMsg, so '@' and '/' go out double-escaped.  Kept as-is to
        # preserve the wire format -- confirm against the server's behaviour.
        params["content"] = DouyuHelper._escape(msg)
        params["col"] = "0"
        params["type"] = "chatmessage"
        params["ifs"] = "0"
        params["nc"] = "0"
        params["dat"] = "0"
        params["rev"] = "0"
        params["admzq"] = "0"
        params["cst"] = str(int(time.time() * 1000))  # client time in ms
        params["dmt"] = "3"
        return DouyuHelper.generateDyMsg(DouyuHelper.encodeMsg(params))

    @staticmethod
    def getSendKeepAliveData() -> bytes:
        """Build the heartbeat packet for the danmu-sending connection.

        Returns:
            The framed ``keeplive`` packet carrying the current Unix time in
            ``tick`` and a fresh random token in ``kd``.
        """
        params = OrderedDict()
        params["type"] = "keeplive"
        params["tick"] = str(int(time.time()))
        params["vbw"] = "0"
        params["cdn"] = ""
        params["kd"] = DouyuHelper._randomId()
        return DouyuHelper.generateDyMsg(DouyuHelper.encodeMsg(params))

    @staticmethod
    def getReceiveKeepAliveData() -> bytes:
        """Build the heartbeat packet for the danmu-receiving connection.

        Returns:
            The framed ``mrkl`` packet.
        """
        params = OrderedDict()
        params["type"] = "mrkl"
        return DouyuHelper.generateDyMsg(DouyuHelper.encodeMsg(params))

    @staticmethod
    def getDanmuLoginData(roomId: str) -> bytes:
        """Build the anonymous login packet for the danmu-receiving connection.

        Args:
            roomId: Room id to log in to.

        Returns:
            The framed ``loginreq`` packet with a random "visitor" username
            and a random uid.
        """
        params = OrderedDict()
        params["type"] = "loginreq"
        params["roomid"] = roomId
        # Already-escaped nested value; encodeMsg escapes it once more.
        params["dfl"] = "sn@AA=105@ASss@AA=1"
        params["username"] = "visitor" + "".join(random.choices("0123456789", k=7))
        params["uid"] = "11" + "".join(random.choices("0123456789", k=8))
        params["ver"] = "20190610"
        params["aver"] = "218101901"
        params["ct"] = "0"
        return DouyuHelper.generateDyMsg(DouyuHelper.encodeMsg(params))

    @staticmethod
    def getJoinGroupData(roomId: str, groupId: str = "-9999") -> bytes:
        """Build the "join group" packet for the danmu-receiving connection.

        Args:
            roomId: Room id to join.
            groupId: Value of the ``gid`` field (defaults to "-9999").

        Returns:
            The framed ``joingroup`` packet.
        """
        params = OrderedDict()
        params["type"] = "joingroup"
        params["rid"] = roomId
        params["gid"] = groupId
        return DouyuHelper.generateDyMsg(DouyuHelper.encodeMsg(params))
douyu_dm_sender.py
import websocket
from collections import OrderedDict
from douyu_danmu import DouyuHelper
# Most recent WebSocketApp created by start_send_msg (None until it is called).
wsSend = None
# Set to True once a "loginres" message has been seen on the sending socket.
isLogin = False


def start_send_msg(room_id: str, cookies: str, headers: dict = None, trace: bool = False):
    """Log in over the sending websocket, enter the room and send one danmu.

    The call runs the websocket event loop via ``run_forever()`` and returns
    when that loop ends.  After a "loginres" message arrives the enter-room
    packet and a single chat message are sent, then the socket is closed.

    Args:
        room_id: Room id to enter.
        cookies: Cookie JSON text, passed to ``DouyuHelper.getLoginData``.
        headers: Optional extra HTTP headers for the websocket handshake.
        trace: Forwarded to ``websocket.enableTrace``.
    """

    def on_open(ws):
        global isLogin
        print("发送登录数据")
        isLogin = False
        print("发送登录数据2")
        try:
            ws.send(DouyuHelper.getLoginData(room_id, cookies))
        except Exception as e:
            # Best effort: report the failure and keep the event loop alive.
            print(e)

    def on_message(ws, message):
        global isLogin
        for msg in DouyuHelper.decodeMsg(message):
            if not msg.startswith("type@=loginres"):
                continue
            isLogin = True
            print("发送进入房间数据")
            ws.send(DouyuHelper.getH5ckreq(room_id))
            print("发送弹幕666")
            ws.send(DouyuHelper.getMsgData("哈"))
            ws.close()

    def on_error(ws, error):
        print(error)

    def on_close(ws, *args):
        # Newer websocket-client releases also pass the close status code and
        # close message; older ones pass only ``ws``.  Accept both so the
        # callback does not fail with a TypeError.
        print("### websocket closed ###")

    websocket.enableTrace(trace)
    global wsSend
    wsSend = websocket.WebSocketApp(
        "wss://wsproxy.douyu.com:6672",
        header=headers,
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )
    wsSend.run_forever()
def run_as_service(room_id: str, cookies: str):
    """Run the callback-based sender for ``room_id`` with default options.

    Thin wrapper around ``start_send_msg``: no extra headers, tracing off.
    """
    start_send_msg(room_id=room_id, cookies=cookies)
def run_as_client(room_id: str, cookies: str, chat_msg: str = None):
    """Enter a room (and optionally send one danmu) over a blocking websocket.

    Flow: connect, send the login packet, wait for "loginres", send the
    enter-room packet, wait for "h5ckres", then either finish or send
    ``chat_msg`` and wait for "chatres".  At most 50 received frames are
    examined.

    Args:
        room_id: Room id to enter.
        cookies: Cookie JSON text, passed to ``DouyuHelper.getLoginData``.
        chat_msg: Optional chat text; when falsy the function stops after the
            room has been entered.

    Returns:
        A dict ``{"err": int, "msg": str}``:
        ``0`` on success, ``-2`` when the login reply lacks ``roomgroup@=1``,
        ``-1`` when no conclusive reply was obtained (including connection
        errors and timeouts, which are not raised to the caller).
    """
    import logging

    result = {"err": -1, "msg": "未能在规定时间内等到恰当的回复"}
    ws = websocket.WebSocket()
    try:
        ws.connect("wss://wsproxy.douyu.com:6672", timeout=10)
        ws.send(DouyuHelper.getLoginData(room_id, cookies))
        for _ in range(50):
            data = ws.recv()
            if not data:
                break
            for msg in DouyuHelper.decodeMsg(data):
                if msg.startswith("type@=loginres"):
                    if "roomgroup@=1" not in msg:
                        return {"err": -2, "msg": "cookie鉴权失败"}
                    ws.send(DouyuHelper.getH5ckreq(room_id))
                elif msg.startswith("type@=h5ckres"):
                    if not chat_msg:
                        return {"err": 0, "msg": "进入房间%s成功" % room_id}
                    ws.send(DouyuHelper.getMsgData(chat_msg))
                elif msg.startswith("type@=chatres"):
                    return {"err": 0, "msg": "消息发送成功"}
    except Exception as exc:
        # Deliberately best-effort: failures are reported through the err=-1
        # result rather than raised, but leave a trace for debugging.
        logging.getLogger(__name__).debug("run_as_client aborted: %r", exc)
    finally:
        # Always release the socket, whichever way the function exits.
        ws.close()
    return result
if __name__ == '__main__':
    # Demo: enter the room and send a single danmu.
    # Only acf_username, acf_ltkid, acf_biz, acf_stk and acf_ct are read from
    # the cookies.  Convert your own browser cookies to the JSON list format
    # shown below; the value here is a placeholder (the trailing "..." is not
    # valid JSON) and must be replaced before running.
    room_id = "262537"
    cookies = '[{"name": "dy_auth", "value": "xxx"}, {"name": "dy_did", "value": "xxx"}, ...]'
    print(run_as_client(room_id, cookies, chat_msg="6"))
douyu_dm_reciever.py
import websocket
from collections import OrderedDict
from douyu_danmu import DouyuHelper
# Most recent WebSocketApp created by start_receive_msg (None until called).
wsReceive = None


def start_receive_msg(room_id: str, headers: dict = None, trace: bool = False,
                      msg_filter: str = "224961119"):
    """Log in anonymously to the danmu server and print matching messages.

    The call runs the websocket event loop via ``run_forever()`` and returns
    when that loop ends.  After "loginres" arrives the join-group packet is
    sent; every later message containing ``msg_filter`` is printed.

    Args:
        room_id: Room id whose danmu stream should be joined.
        headers: Optional extra HTTP headers for the websocket handshake.
        trace: Forwarded to ``websocket.enableTrace``.
        msg_filter: Substring a decoded message must contain to be printed.
            The default is the value that used to be hard-coded (presumably a
            user id -- confirm).  Pass ``""`` to print every message.
    """

    def on_open(ws):
        login_msg = DouyuHelper.getDanmuLoginData(room_id)
        print("发送登录数据")
        ws.send(login_msg)

    def on_message(ws, message):
        for msg in DouyuHelper.decodeMsg(message):
            if msg.startswith("type@=loginres"):
                join_group_msg = DouyuHelper.getJoinGroupData(room_id, "-9999")
                print("发送进入房间数据")
                ws.send(join_group_msg)
            elif msg_filter in msg:
                print(msg)

    def on_error(ws, error):
        print(error)

    def on_close(ws, *args):
        # Newer websocket-client releases also pass the close status code and
        # close message; older ones pass only ``ws``.  Accept both so the
        # callback does not fail with a TypeError.
        print("### closed ###")

    websocket.enableTrace(trace)
    global wsReceive
    wsReceive = websocket.WebSocketApp(
        "wss://danmuproxy.douyu.com:8501",
        header=headers,
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )
    wsReceive.run_forever()
# Script entry: start listening to the demo room with frame tracing enabled.
# NOTE(review): this runs at import time as well, because there is no
# ``if __name__ == '__main__'`` guard here.
room_id = "262537"
start_receive_msg(room_id=room_id, trace=True)