2025-05-24 11:33:50 +08:00

501 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
from astrbot.api.event import filter, AstrMessageEvent, MessageEventResult
from astrbot.api.star import Context, Star, register
from astrbot.api import logger
import random
from .back import time_long, volume, isUserExist, insertUser, seconds_to_hms, ml_to_l_ml, get_user_name
import pymysql
import matplotlib
import matplotlib.pyplot as plt
from .Tool import get_tool_name
import astrbot.api.message_components as Comp
from astrbot.core.utils.session_waiter import (
session_waiter,
SessionController,
)
import aiohttp # 需要异步HTTP客户端
from .TicTacToeGame import TicTacToeGame
@register("helloworld", "YourName", "一个简单的 Hello World 插件", "1.0.0")
class MyPlugin(Star):
def __init__(self, context: Context):
super().__init__(context)
@filter.command("打打我的")
async def dajiao(self, event: AstrMessageEvent):
if isUserExist(event.get_sender_id()) != True:
insertUser(event.get_sender_id())
time = round(random.uniform(1, 600), 2)
V = round(random.uniform(0.01,100), 2)
a = time_long(time)
b = volume(V)
user_name = event.get_sender_id()
conn=pymysql.connect(host = '192.168.31.9' # 连接名称默认127.0.0.1
,user = 'saipo' # 用户名
,passwd='Grasste0403' # 密码
,port= 3306 # 端口默认为3306
,db='saipo' # 数据库名称
,charset='utf8' # 字符编码
)
cur = conn.cursor()
sql = "INSERT INTO dajiao (openid, timelong, volume) VALUES (%s, %s, %s)"
values = (user_name,time,V)
cur.execute(sql,values)
conn.commit()
cur.close()
conn.close()
yield event.plain_result(f"Hello,{get_user_name(event.get_sender_id())}, 你坚持了{time}s哦{a}.射出{V}ml,{b}!")
@filter.command("帮我连打十次")
async def dajiaoX(self, event: AstrMessageEvent):
if isUserExist(event.get_sender_id()) != True:
insertUser(event.get_sender_id())
user_name = event.get_sender_id()
conn=pymysql.connect(host = '192.168.31.9' # 连接名称默认127.0.0.1
,user = 'saipo' # 用户名
,passwd='Grasste0403' # 密码
,port= 3306 # 端口默认为3306
,db='saipo' # 数据库名称
,charset='utf8' # 字符编码
)
cur = conn.cursor()
time = [];V = [];a = [];b = []
for i in range(0,10):
time.append(round(random.uniform(1, 600), 2))
V.append(round(random.uniform(0.01,100), 2)) if i==0 else V.append(round(random.uniform(1, V[i-1]+1), 2))
a.append(time_long(time[i]))
b.append(volume(V[i]))
sql = "INSERT INTO dajiao (openid, timelong, volume) VALUES (%s, %s, %s),(%s, %s, %s),(%s, %s, %s),(%s, %s, %s),(%s, %s, %s),(%s, %s, %s),(%s, %s, %s),(%s, %s, %s),(%s, %s, %s),(%s, %s, %s)"
values = (user_name,time[0],V[0],user_name,time[1],V[1],user_name,time[2],V[2],user_name,time[3],V[3],user_name,time[4],V[4],user_name,time[5],V[5],user_name,time[6],V[6],user_name,time[7],V[7],user_name,time[8],V[8],user_name,time[9],V[9])
cur.execute(sql,values)
conn.commit()
cur.close()
conn.close()
yield event.plain_result(f"Hello,{get_user_name(event.get_sender_id())},准备开始挑战十连咯!\n第一次你坚持了{time[0]}s哦,{a[0]}.射出{V[0]}ml,{b[0]}!\n第二次你坚持了{time[1]}s哦,{a[1]}.射出{V[1]}ml,{b[1]}!\n第三次你坚持了{time[2]}s哦,{a[2]}.射出{V[2]}ml,{b[2]}!\n第四次你坚持了{time[3]}s哦,{a[3]}.射出{V[3]}ml,{b[3]}!\n第五次你坚持了{time[4]}s哦,{a[4]}.射出{V[4]}ml,{b[4]}!\n第六次你坚持了{time[5]}s哦,{a[5]}.射出{V[5]}ml,{b[5]}!\n第七次你坚持了{time[6]}s哦,{a[6]}.射出{V[6]}ml,{b[6]}!\n第八次你坚持了{time[7]}s哦,{a[7]}.射出{V[7]}ml,{b[7]}!\n第九次你坚持了{time[8]}s哦,{a[8]}.射出{V[8]}ml,{b[8]}!\n第十次你坚持了{time[9]}s哦,{a[9]}.射出{V[9]}ml,{b[9]}!\n哎呀,一点也打不出来了!杂鱼!")
@filter.command("打打你的")
async def dajiao_other(self, event: AstrMessageEvent, message: str):
if isUserExist(event.get_sender_id()) != True:
insertUser(event.get_sender_id())
time = round(random.uniform(1, 600), 2)
V = round(random.uniform(0.01,100), 2)
a = time_long(time)
b = volume(V)
conn=pymysql.connect(host = '192.168.31.9' # 连接名称默认127.0.0.1
,user = 'saipo' # 用户名
,passwd='Grasste0403' # 密码
,port= 3306 # 端口默认为3306
,db='saipo' # 数据库名称
,charset='utf8' # 字符编码
)
sql = "SELECT openid FROM dajiaouser WHERE nameid = %s"
cur = conn.cursor()
cur.execute(sql, (message,))
user_name_dajiao = cur.fetchall()[0][0]
if user_name_dajiao is None:
cur.close()
conn.close()
yield event.plain_result(f"没有找到这个人哦!")
sql = "INSERT INTO dajiao (openid, timelong, volume) VALUES (%s, %s, %s)"
values = (user_name_dajiao,time,V)
cur.execute(sql,values)
conn.commit()
cur.close()
conn.close()
yield event.plain_result(f"Hello,{get_user_name(event.get_sender_id())},你帮助{get_user_name(user_name_dajiao)}进行了一次打胶, TA坚持了{time}s哦{a}.射出{V}ml,{b}!")
@filter.command("我的时长成就")
async def timebeat(self, event: AstrMessageEvent):
if isUserExist(event.get_sender_id()) != True:
insertUser(event.get_sender_id())
conn=pymysql.connect(host = '192.168.31.9' # 连接名称默认127.0.0.1
,user = 'saipo' # 用户名
,passwd='Grasste0403' # 密码
,port= 3306 # 端口默认为3306
,db='saipo' # 数据库名称
,charset='utf8' # 字符编码
)
cur = conn.cursor()
sql = "SELECT eventTime, openid, timelong, volume FROM dajiao WHERE openid = %s AND timelong = (SELECT MAX(timelong) FROM dajiao WHERE openid = %s) LIMIT 1"
openid = event.get_sender_id()
cur.execute(sql,(openid,openid))
date1 = cur.fetchone()
sql2 = "SELECT eventTime, openid, timelong, volume FROM dajiao WHERE openid = %s AND timelong = (SELECT MIN(timelong) FROM dajiao WHERE openid = %s) LIMIT 1"
cur.execute(sql2,(openid,openid))
date2 = cur.fetchone()
cur.close()
conn.close()
yield event.plain_result(f"Hello,{get_user_name(event.get_sender_id())}\n你的时长最佳记录为{date1[2]}s,发生在{date1[0]},射出{date1[3]}ml\n你的时长最弱记录为{date2[2]}s,发生在{date2[0]},射出{date2[3]}ml")
@filter.command("我的产量成就")
async def volumebeat(self, event: AstrMessageEvent):
if isUserExist(event.get_sender_id()) != True:
insertUser(event.get_sender_id())
conn=pymysql.connect(host = '192.168.31.9' # 连接名称默认127.0.0.1
,user = 'saipo' # 用户名
,passwd='Grasste0403' # 密码
,port= 3306 # 端口默认为3306
,db='saipo' # 数据库名称
,charset='utf8' # 字符编码
)
cur = conn.cursor()
sql = "SELECT eventTime, openid, timelong, volume FROM dajiao WHERE openid = %s AND volume = (SELECT MAX(volume) FROM dajiao WHERE openid = %s) LIMIT 1"
openid = event.get_sender_id()
cur.execute(sql,(openid,openid))
date1 = cur.fetchone()
sql2 = "SELECT eventTime, openid, timelong, volume FROM dajiao WHERE openid = %s AND volume = (SELECT MIN(volume) FROM dajiao WHERE openid = %s) LIMIT 1"
cur.execute(sql2,(openid,openid))
date2 = cur.fetchone()
cur.close()
conn.close()
yield event.plain_result(f"Hello,{get_user_name(event.get_sender_id())}\n你的产量最佳记录为{date1[3]}ml,发生在{date1[0]},时长为{date1[2]}s\n你的产量最弱记录为{date2[3]}ml,发生在{date2[0]},时长为{date2[2]}s")
@filter.command("我的个人信息")
async def userinfo(self, event: AstrMessageEvent):
if isUserExist(event.get_sender_id()) != True:
insertUser(event.get_sender_id())
conn=pymysql.connect(host = '192.168.31.9' # 连接名称,默认
,user = 'saipo'
,passwd='Grasste0403' # 密码
,port= 3306 # 端口默认为3306
,db='saipo' # 数据库名称
,charset='utf8' # 字符编码
)
cur = conn.cursor()
sql = "SELECT uid, openid , nameid FROM dajiaouser WHERE openid = %s"
openid = event.get_sender_id()
cur.execute(sql,(openid,))
date1 = cur.fetchone()
sql2 = "SELECT sum(timelong),sum(volume) FROM dajiao WHERE openid = %s group by openid"
cur.execute(sql2,(openid,))
date2 = cur.fetchone()
cur.close()
conn.close()
yield event.plain_result(f"个人信息\nuid{date1[0]}\nopenid{date1[1]}\nname{date1[2]}\n总时长:{seconds_to_hms(date2[0])}\n总产量:{ml_to_l_ml(date2[1])}")
@filter.command("修改昵称")
async def updatename(self, event: AstrMessageEvent, name: str):
if isUserExist(event.get_sender_id()) != True:
insertUser(event.get_sender_id())
conn=pymysql.connect(host = '192.168.31.9' # 连接名称,默认
,user = 'saipo'
,passwd='Grasste0403' # 密码
,port= 3306 # 端口默认为3306
,db='saipo' # 数据库名称
,charset='utf8' # 字符编码
)
cur = conn.cursor()
sql = "UPDATE dajiaouser SET nameid = %s WHERE openid = %s"
openid = event.get_sender_id()
cur.execute(sql,(name,openid))
conn.commit()
cur.close()
conn.close()
yield event.plain_result(f"昵称修改成功!\n新的昵称为:{name}")
@filter.command("我的背包")
async def bag(self, event: AstrMessageEvent):
if isUserExist(event.get_sender_id()) != True:
insertUser(event.get_sender_id())
conn=pymysql.connect(host = '192.168.31.9' # 连接名称,默认
,user = 'saipo'
,passwd='Grasste0403' # 密码
,port= 3306 # 端口默认为3306
,db='saipo' # 数据库名称
,charset='utf8' # 字符编码
)
cur = conn.cursor()
sql = "SELECT baglist FROM dajiaouser WHERE openid = %s"
openid = event.get_sender_id()
cur.execute(sql,(openid,))
date = cur.fetchone()
baglist = json.loads(date[0])
baglist_str = ""
if len(baglist["baglist"]) == 0:
baglist_str = "背包空空如也"
else:
for item in baglist["baglist"]:
if get_tool_name(item["id"]) is None:
cur.close()
conn.close()
yield event.plain_result(f"背包存在异常!")
else:
baglist_str += f"{get_tool_name(item['id'])} x {item['num']}\n"
cur.close()
conn.close()
yield event.plain_result(f"背包列表:\n{baglist_str}")
@filter.command("产量成就排行")
async def volumerank(self, event: AstrMessageEvent):
conn=pymysql.connect(host = '192.168.31.9' # 连接名称,默认
,user = 'saipo'
,passwd='Grasste0403' # 密码
,port= 3306 # 端口默认为3306
,db='saipo' # 数据库名称
,charset='utf8' # 字符编码
)
cur = conn.cursor()
sql = "SELECT du.uid, du.nameid, t.max_volume, MAX(dj.eventTime) AS eventTime FROM dajiaouser du LEFT JOIN (SELECT openid, MAX(volume) AS max_volume FROM dajiao GROUP BY openid) t ON du.openid = t.openid LEFT JOIN dajiao dj ON du.openid = dj.openid AND t.max_volume = dj.volume GROUP BY du.uid, du.nameid, t.max_volume;"
cur.execute(sql)
date = cur.fetchall()
ranklist = []
for i in range(len(date)):
ranklist.append((date[i][0], date[i][1], date[i][2], date[i][3]))
ranklist.sort(key=lambda x: x[2], reverse=True)
ranklist_str = "产量最高排名\n"
for i in range(len(ranklist)):
ranklist_str += f"{i+1} uid:{ranklist[i][0]} nameid:{ranklist[i][1]} 最高产量:{ranklist[i][2]}ml 打胶时间:{ranklist[i][3]}\n"
ranklist_str += "\n"
sql2 = "SELECT du.uid, du.nameid, t.min_volume, MAX(dj.eventTime) AS eventTime FROM dajiaouser du LEFT JOIN (SELECT openid, MIN(volume) AS min_volume FROM dajiao GROUP BY openid) t ON du.openid = t.openid LEFT JOIN dajiao dj ON du.openid = dj.openid AND t.min_volume = dj.volume GROUP BY du.uid, du.nameid, t.min_volume;"
cur.execute(sql2)
date2 = cur.fetchall()
ranklist2 = []
for i in range(len(date2)):
ranklist2.append((date2[i][0], date2[i][1], date2[i][2], date2[i][3]))
ranklist2.sort(key=lambda x: x[2])
ranklist_str += "产量最低排名\n"
for i in range(len(ranklist2)):
ranklist_str += f"{i+1} uid:{ranklist2[i][0]} nameid:{ranklist2[i][1]} 最低产量:{ranklist2[i][2]}ml 打胶时间:{ranklist2[i][3]}\n"
yield event.plain_result(ranklist_str)
cur.close()
conn.close()
@filter.command("时长成就排行")
async def timerank(self, event: AstrMessageEvent):
conn=pymysql.connect(host = '192.168.31.9' # 连接名称,默认
,user = 'saipo'
,passwd='Grasste0403' # 密码
,port= 3306 # 端口默认为3306
,db='saipo' # 数据库名称
,charset='utf8' # 字符编码
)
cur = conn.cursor()
sql = "SELECT du.uid, du.nameid, t.max_timelong, MAX(dj.eventTime) AS eventTime FROM dajiaouser du LEFT JOIN (SELECT openid, MAX(timelong) AS max_timelong FROM dajiao GROUP BY openid) t ON du.openid = t.openid LEFT JOIN dajiao dj ON du.openid = dj.openid AND t.max_timelong = dj.timelong GROUP BY du.uid, du.nameid, t.max_timelong;"
cur.execute(sql)
date = cur.fetchall()
ranklist = []
for i in range(len(date)):
ranklist.append((date[i][0], date[i][1], date[i][2], date[i][3]))
ranklist.sort(key=lambda x: x[2], reverse=True)
ranklist_str = "时长最长排名\n"
for i in range(len(ranklist)):
ranklist_str += f"{i+1} uid:{ranklist[i][0]} {ranklist[i][1]} 最长时长:{ranklist[i][2]}s 打胶时间:{ranklist[i][3]}\n"
ranklist_str += "\n"
sql2 = "SELECT du.uid, du.nameid, t.min_timelong, MAX(dj.eventTime) AS eventTime FROM dajiaouser du LEFT JOIN (SELECT openid, MIN(timelong) AS min_timelong FROM dajiao GROUP BY openid) t ON du.openid = t.openid LEFT JOIN dajiao dj ON du.openid = dj.openid AND t.min_timelong = dj.timelong GROUP BY du.uid, du.nameid, t.min_timelong;"
cur.execute(sql2)
date2 = cur.fetchall()
ranklist2 = []
for i in range(len(date2)):
ranklist2.append((date2[i][0], date2[i][1], date2[i][2], date2[i][3]))
ranklist2.sort(key=lambda x: x[2])
ranklist_str += "时长最短排名\n"
for i in range(len(ranklist2)):
ranklist_str += f"{i+1} uid:{ranklist2[i][0]} {ranklist2[i][1]} 最短时长:{ranklist2[i][2]}s 打胶时间:{ranklist2[i][3]}\n"
yield event.plain_result(ranklist_str)
cur.close()
conn.close()
@filter.command("我的日产量")
async def mydayvolume(self, event: AstrMessageEvent):
if isUserExist(event.get_sender_id()) != True:
insertUser(event.get_sender_id())
conn=pymysql.connect(host = '192.168.31.9' # 连接名称,默认
,user = 'saipo'
,passwd='Grasste0403' # 密码
,port= 3306 # 端口默认为3306
,db='saipo' # 数据库名称
,charset='utf8' # 字符编码
)
cur = conn.cursor()
sql = "SELECT DAY(eventTime),round(sum(volume),2) FROM dajiao WHERE openid = %s and DATE_SUB(CURDATE(), INTERVAL 7 DAY) <= date(eventTime) GROUP BY DAY(eventTime);"
openid = event.get_sender_id()
cur.execute(sql,(openid,))
date = cur.fetchall()
if len(date) == 0:
cur.close()
conn.close()
yield event.plain_result(f"没有找到数据哦!")
day = []
volume = []
for i in range(len(date)):
day.append(date[i][0])
volume.append(date[i][1])
plt.plot(day, volume)
plt.title('dayvolume')
plt.xlabel('day')
plt.ylabel('volume')
plt.xticks(day)
#设置显示数据
for x, y in zip(day, volume):
plt.text(x, y, f'{y}', ha='center', va='bottom', fontsize=10)
plt.grid()
plt.savefig('dayvolume.png')
plt.close()
with open('dayvolume.png', 'rb') as f:
image_data = f.read()
image = Comp.Image.fromBytes(image_data)
chain = [
Comp.At(qq=event.get_sender_id()),
Comp.Plain("日产量折线图:"),
image,
]
yield event.chain_result(chain)
cur.close()
conn.close()
testNumber = 0
@filter.command("测试")
async def test(self, event: AstrMessageEvent):
global testNumber
testNumber = testNumber + 1
yield event.plain_result(f"累积结果: {testNumber}")
@filter.command("Dora")
async def Dora(self, event: AstrMessageEvent):
async with aiohttp.ClientSession() as session:
async with session.get('https://www.doro.asia/api/random-sticker') as resp:
if resp.status == 200:
data = await resp.json()
if data.get('success', False):
image_url = data['sticker']['url']
chain = [
Comp.At(qq=event.get_sender_id()), # At 消息发送者
Comp.Plain("来看这个图:"),
Comp.Image.fromURL(image_url), # 使用API返回的图片URL
]
yield event.chain_result(chain)
else:
yield "获取表情包失败"
else:
yield "API请求失败"
@filter.command("来句骚话")
async def lovelive(self, event: AstrMessageEvent):
async with aiohttp.ClientSession() as session:
async with session.get('https://api.lovelive.tools/api/SweetNothings') as resp:
if resp.status == 200:
text = await resp.text() # 获取返回的文本内容
chain = [
Comp.At(qq=event.get_sender_id()), # At 消息发送者
Comp.Plain(text) # 使用API返回的文本
]
yield event.chain_result(chain)
else:
yield "获取骚话失败"
@filter.command("本地目录测试")
async def Juno(self, event: AstrMessageEvent):
yield event.plain_result("本地目录测试:")
yield event.image_result("path/to/M.png") # 发送图片
@filter.command("部署")
async def bushutest(self, event: AstrMessageEvent):
yield event.plain_result("自动部署成功!!")
@filter.command("井字棋")
async def tic_tac_toe_handler(self, event: AstrMessageEvent):
"""井字棋游戏处理器"""
try:
# 初始化游戏
game = TicTacToeGame()
# 发送初始说明和空棋盘
instructions = (
"欢迎来到井字棋游戏!\n"
"输入1-9的数字来选择位置:\n"
" 1 | 2 | 3 \n"
"-----------\n"
" 4 | 5 | 6 \n"
"-----------\n"
" 7 | 8 | 9 \n"
"\n当前棋盘:" + game.print_board()
)
yield event.plain_result(instructions)
@session_waiter(timeout=60, record_history_chains=False)
async def game_waiter(controller: SessionController, event: AstrMessageEvent):
nonlocal game
idiom = event.message_str # 用户发来的成语,假设是 "一马当先"
# 玩家回合
if game.current_player == game.player_symbol:
move = int(idiom) - 1
if not game.make_move(move):
await event.send(event.make_result(chain=[Comp.Plain("无效移动,请重试。")]))
controller.keep(timeout=60, reset_timeout=True)
return
else:
# 电脑回合
move = game.computer_move()
game.board[move] = game.current_player
# 检查游戏状态
winner = game.check_winner()
if winner:
result = event.make_result()
result.chain = [
Comp.Plain(f"{game.print_board()}\n"),
Comp.Plain("恭喜你赢了!" if winner == game.player_symbol else "电脑赢了!")
]
await event.send(result)
controller.stop()
return
if game.is_board_full():
result = event.make_result()
result.chain = [
Comp.Plain(f"{game.print_board()}\n"),
Comp.Plain("平局!")
]
await event.send(result)
controller.stop()
return
# 切换玩家并继续游戏
game.switch_player()
# 发送更新后的棋盘
result = event.make_result()
if game.current_player == game.player_symbol:
result.chain = [
Comp.Plain(f"{game.print_board()}\n"),
Comp.Plain("轮到你走了请输入1-9:")
]
else:
result.chain = [
Comp.Plain(f"{game.print_board()}\n"),
Comp.Plain("电脑思考中...")
]
await event.send(result)
# 重置超时时间
controller.keep(timeout=60, reset_timeout=True)
try:
await game_waiter(event)
except TimeoutError:
yield event.plain_result("井字棋游戏超时结束!")
except Exception as e:
yield event.plain_result(f"游戏出错: {str(e)}")
finally:
event.stop_event()
except Exception as e:
logger.error(f"井字棋游戏错误: {str(e)}")
yield event.plain_result("游戏发生错误,请稍后再试。")