大一的时候看到果果自己写了一个qq机器人,就想着什么时候自己也实现一个,刚好因为疫情的原因在宿舍封了两周,就动手实现一下自己久远的小愿望。

开发环境配置

系统: Windows

框架: go-cqhttp

语言: python3.8

编译器: Visual Studio Code

go-cqhttp 框架下载和配置

下载与配置

qq机器人的框架是基于 mirai 以及 MiraiGo 开发的 go-cqhttp

到 Github上下载即可。下载链接

选择 go-cqhttp_windows_amd64.zip,下载并解压。

点击exe文件,出现小黑框后一路确定就行。

会生成一个 .bat批处理文件,点击它,在小黑框中选择 0:HTTP通信。

得到 comfig.yml 配置文件,打开并去掉最后两行前的#。

配置完成后,点击 go-cqhttp.bat 批处理文件就可以扫码登录机器人的qq号了。

简介

这个框架就是一个实现收发消息的工具,相当于前后端之间的桥梁。

剩下的部分就是用python写出我们想要的逻辑,也就是后端。

python实现功能

为了方便程序的再开发和管理,将后端分层。

底层: 包括收发消息,从数据文件中写入或读出数据。

字符串处理层: 处理传入的字符串,返回要发送的字符串。

功能层: 实现一些特别的功能,如定时发送消息功能等。

服务层: 字符串处理层的上一层,将不同特点的字符串进行不同的处理。

domain层:实现一个msg_talker()类,将服务封装成它的方法,各种数据封装为它的属性。

main函数:一个无限循环,用于最后的运行。

底层

消息发送

收发消息是最基本的功能,也是其他功能实现的基础,将其部署在最底层,封装成函数供服务调用。

分为私聊群聊

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import requests

def send_message(msg,qq_id,qq_type):
if qq_type == "private":
data = {
'user_id':qq_id,
'message':msg,
'auto_escape':False
}
cq_url = "http://127.0.0.1:5700/send_private_msg"
rev = requests.post(cq_url,data=data)
elif qq_type == "group":
data = {
'group_id':qq_id,
'message':msg,
'auto_escape':False
}
cq_url = "http://127.0.0.1:5700/send_group_msg"
rev = requests.post(cq_url,data=data)
else:
return False
if rev.json()['status'] == 'ok':
return True
return False

消息获取

监听端口,获取收到的消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import socket
import json

ListenSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
ListenSocket.bind(('localhost', 5701))
ListenSocket.listen(100)

HttpResponseHeader = '''HTTP/1.1 200 OK
Content-Type: text/html\r\n\r\n
'''

def request_to_json(msg):
for i in range(len(msg)):
if msg[i]=="{" and msg[-1]=="\n":
return json.loads(msg[i:])
return None

#需要循环执行,返回值为json格式
def rev_msg():# json or None
Client, Address = ListenSocket.accept()
Request = Client.recv(1024).decode(encoding='utf-8')
#print(Request)
rev_json=request_to_json(Request)
Client.sendall((HttpResponseHeader).encode(encoding='utf-8'))
Client.close()
return rev_json

数据读出

读出放在data文件夹中的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import os
import json
def read_file():
data = []
for count ,line in enumerate(open("./data/talk_data/words", 'r', encoding='UTF-8')):
temp = line.strip().split("|")
temp = [temp[0],temp[1].split("/")[:-1]]
data.append(temp)
return data
def read_one():
data = []
for count ,line in enumerate(open("./data/talk_data/oneword", 'r', encoding='UTF-8')):
temp = line.strip().split("|")
temp = [temp[0],temp[1].split("/")[:-1]]
data.append(temp)
return data
def read_imglist():
img_list=[]
for count,line in enumerate(open("./data/imglist", 'r', encoding='UTF-8')):
temp = line.strip()
img_list.append(temp)
return img_list

字符串处理层

本地实现的字符串处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import requests
import json
import os
import re
from random import choice
from data.load_data import read_one
from data.load_data import read_imglist
group = json.load(open("./config.json", encoding='utf-8'))["group"]
apikey= json.load(open("./config.json", encoding='utf-8'))["apikey"]
ban_words = json.load(open("./config.json", encoding='utf-8'))["ban_words"]
mao_path = json.load(open("./config.json", encoding='utf-8'))["mao_path"]
rr_path = json.load(open("./config.json", encoding='utf-8'))["rr_path"]
key_words = json.load(open("./config.json", encoding='utf-8'))["key_words"]

help_base = "这里是帮助菜单:\n"
help_base += "1.发送猫猫图,返回一张猫猫表情包\n"
help_base += "2.发送若若爆照,可以看到我的自拍\n"
help_base += "3.发送城市+天气,获取城市天气\n"
help_base += "4.私聊调教对话 例如aaa+bbb \n"
help_base += "那么发送aaa就会返回bbb啦~\n"
help_base += "可以发送rmaaa+bbb删除对话哦~\n"
help_base += "快来和若若聊天吧~\n"

def help_menu(msg):
if msg == "help":
return [True,help_base]
return [False]

def add_data(msg,all_data):
one=False
if "+" in msg:
return add_sxt_data(msg,all_data)
if msg.count("+") != 1:
return [False]
if "/" in msg or "|" in msg:
return [True,"不能含有/或|呀~"]
if msg.split("+")[1]=="":
return [False]
msg = msg.split("+")
if len(msg[0])< 2:
one=True
all_data=read_one()
for row in all_data:
if msg[0] == row[0]:
if msg[1] in row[1]:
return [True,"oh,这句话我已经会啦,不用再教我啦~"]
row[1].append(msg[1])
save_data(all_data,one)
return [True,"添加成功!"]
all_data.append([msg[0], [msg[1]]])
save_data(all_data,one)
return [True,"添加成功!"]

def save_data(all_data,one):
if one:
f=open("./data/talk_data/oneword","w",encoding='UTF-8')
else:
f = open("./data/talk_data/words","w",encoding='UTF-8')
for row in all_data:
temp = row[0]+"|"+"".join([i+"/" for i in row[1]])
f.writelines(temp+"\n")
f.close()

def del_data(del_data,all_data):
one=False
if del_data[:2] != "rm":
return [False]
if "+" in del_data:
msg = del_data[2:].split("+")
else:
msg = del_data[2:].split("+")
if len(msg[0])<2:
one=True
all_data=read_one()

for i in range(len(all_data)):
if msg[0] == all_data[i][0]:
if len(all_data[i][1]) == 1:
all_data.pop(i)
save_data(all_data,one)
return [True,"已经删除啦~"]
all_data[i][1].remove(msg[1])
save_data(all_data,one)
return [True,"已经删除啦~"]
return [True,"删除出错啦~"]

def mao_pic(msg):
if msg in ["来张猫猫图", "来张猫图", "猫图", "喵图", "maomao","猫猫图","猫","喵","猫猫"]:
setu_list = os.listdir(mao_path)
local_img_url = "[CQ:image,file=file://"+mao_path+choice(setu_list)+"]"
return [True, local_img_url]
return [False]

def rr_pic(msg):
if "若若爆照" in msg:
setu_list = os.listdir(rr_path)
local_img_url = "[CQ:image,type=flash, file=file://"+rr_path+choice(setu_list)+"]"
return [True, local_img_url]
return [False]

def detect_ban(msg,user_id,group_id):
if group_id not in group:
return [False]

for i in ban_words:
s = re.search(i,msg)
if not s == None:
data = {
'user_id':user_id,
'group_id':group_id,
'duration':60
}
cq_url = "http://127.0.0.1:5700/set_group_ban"
requests.post(cq_url,data=data)
return [True,"不要说不该说的话啦~"]
return [False]


基于网络 api 接口的字符串处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import urllib
from urllib.parse import quote
import string
import requests
def check_with_api(msg):
url = 'http://api.qingyunke.com/api.php?key=free&appid=0&msg=' + msg
s = quote(url, safe=string.printable)
with urllib.request.urlopen(s) as response:
html = response.read()
# 将获取到的响应内容进行解码,并将json字符串内容转换为python字典格式
# 通过下标取到机器人回复的内容
with urllib.request.urlopen(s) as response:
html = response.read()
# 将获取到的响应内容进行解码,并将json字符串内容转换为python字典格式
# 通过下标取到机器人回复的内容
msg=eval(html.decode("utf-8"))["content"].replace('{br}', '\n')
msg=msg.replace('菲菲', '若若')
msg=msg.replace('未获取到相关信息','找若若有什么事吗?')
return msg

服务层

呈现所有对于私聊与群聊的服务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
from send_message.word_detect import *
from random import choice
from data.talk_data.base_talk import others_answer
from random import randint
from data.load_data import read_one
from func.api import check_with_api

def match(msg,talk_data):
if (len(msg)==1):
talk_data=read_one()
return match_oneword(msg,talk_data)
for row in talk_data:
if row[0] in msg:
x=randint(0,len(row[1])-1)
return [True,row[1][x]]
return [False,check_with_api(msg)]

def match_oneword(msg,talk_data):
for row in talk_data:
if msg in row[0]:
x=randint(0,len(row[1])-1)
return [True,row[1][x]]
return [False,choice(others_answer["no_answer"])]


def talk_to_user(rev,talk_data):#这里可以DIY对私聊和群聊中@白若的操作
msg=rev["raw_message"]
#--------------------------------------------------------------------------------------帮助页面
if_help = help_menu(msg)
if if_help[0] == True:
return if_help[1]
#--------------------------------------------------------------------------------------删除数据
if_del = del_data(msg,talk_data)
if if_del[0] == True:
return if_del[1]
#--------------------------------------------------------------------------------------添加数据
if_add = add_data(msg,talk_data)
if if_add[0] == True:
return if_add[1]
#--------------------------------------------------------------------------------------发送自拍
if_setu = rr_pic(msg)
if if_setu[0] == True:
return if_setu[1]
#--------------------------------------------------------------------------------------发送猫猫图
if_setu = mao_pic(msg)
if if_setu[0] == True:
return if_setu[1]

return match(msg,talk_data)[1]

def talk_to_gourp(rev,talk_data):#这里可以DIY对群聊的操作
msg=rev["raw_message"]
user_id=rev["user_id"]
group_id=rev["group_id"]
#--------------------------------------------------------------------------------------检测关键字禁言
if_ban = detect_ban(msg,user_id,group_id)
if if_ban[0] == True:
return if_ban[1]
#--------------------------------------------------------------------------------------发送猫猫图
if_setu = mao_pic(msg)
if if_setu[0] == True:
return if_setu[1]
#--------------------------------------------------------------------------------------发送自拍
if_setu = rr_pic(msg)
if if_setu[0] == True:
return if_setu[1]

if match(msg,talk_data)[0]==True:
return match(msg,talk_data)[1]

return ""

domain层

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import json
from data.load_data import read_file
from send_message.send_message import send_message
from send_message.talk_to_user import *
from random import randint
self_qq = json.load(open("./config.json", encoding='utf-8'))["self_qq"]
ban_words = json.load(open("./config.json", encoding='utf-8'))["ban_words"]
class msg_talker():
def __init__(self):
self.talk_data = read_file()

def private_msg(self,rev):
if rev["sub_type"] != "friend":
return send_message('你还不是我的好友呀',rev['user_id'],"private")
return send_message(talk_to_user(rev, self.talk_data), rev["user_id"], "private")

def group_msg(self,rev):
if "[CQ:at,qq={}]".format(self_qq) in rev["raw_message"]:
try:
rev['raw_message']=rev['raw_message'].split(" ")[1]
except:
pass
return send_message(talk_to_user(rev, self.talk_data), rev["group_id"], "group")

# if randint(1,10)<4 or rev['raw_message'] in ban_words or rev['raw_message'] in key_words:
return send_message(talk_to_gourp(rev, self.talk_data), rev["group_id"], "group")

#return True

功能层

实现一些其它功能,例如群聊定时发送消息。

1
2
3
4
5
6
7
from send_message.send_message import send_message
import datetime

def time_send(h,m,s,msg,qq):
now=datetime.datetime.now()
if(now.hour==h and now.minute==m and now.second==s):
send_message(msg,qq,"group")

main

最后只要运行main函数就可以了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
from receive import rev_msg
from send_message.send_message import send_message
from massage_flide import msg_talker
import json
from func import time

group = json.load(open("./config.json", encoding='utf-8'))["group"]

talker = msg_talker()
print("start")

while True:
try:
rev = rev_msg()
if rev == None:
continue
except:
continue

#定时模块#
time.time_send(23,40,0,"各位晚安啦,早睡早起身体好[CQ:face,id=75]",qq群号)

#收发消息模块
if rev["post_type"] == "message":
print(rev) #需要功能自己DIY
if rev["message_type"] == "private": #私聊
talker.private_msg(rev)
elif rev["message_type"] == "group": #群聊
group_id=rev["group_id"]
if group_id in group:
talker.group_msg(rev)
else:
continue
elif rev["post_type"] == "notice":
if rev["notice_type"] == "group_upload": # 有人上传群文件
continue
elif rev["notice_type"] == "group_decrease": # 群成员减少
continue
elif rev["notice_type"] == "group_increase": # 群成员增加
continue
else:
continue
elif rev["post_type"] == "request":
if rev["request_type"] == "friend": # 添加好友请求
pass
if rev["request_type"] == "group": # 加群请求
pass
else: # rev["post_type"]=="meta_event":
continue

写在最后

其实也就是一个小小的玩具,在开发的过程中也学到了很多东西。

写后端的时候合理的布局很重要。

欢迎大家来和若若聊天呀!

功能介绍

  1. 私聊调教对话。
  2. 天气预报。
  3. 定时晚安。
  4. 发送自拍。
  5. 简单的对话。

更多功能正在开发中……