"""AstrBot plugin: a random-stat mini game stored in MySQL plus a few small utilities."""
import json
import random
from contextlib import contextmanager

import aiohttp  # async HTTP client
import pymysql

import astrbot.api.message_components as Comp
from astrbot.api import logger
from astrbot.api.event import filter, AstrMessageEvent, MessageEventResult
from astrbot.api.star import Context, Star, register
from astrbot.core.utils.session_waiter import (
    session_waiter,
    SessionController,
)

from .back import (
    time_long,
    volume,
    isUserExist,
    insertUser,
    seconds_to_hms,
    ml_to_l_ml,
    get_user_name,
)
from .Tool import get_tool_name
from .TicTacToeGame import TicTacToeGame

# TODO(security): these credentials are hard-coded in source; move them to the
# plugin configuration or the environment and rotate the password.
_DB_CONFIG = {
    "host": "192.168.31.9",  # server address
    "user": "saipo",
    "passwd": "Grasste0403",
    "port": 3306,
    "db": "saipo",  # database name
    "charset": "utf8",
}

_INSERT_RECORD_SQL = "INSERT INTO dajiao (openid, timelong, volume) VALUES (%s, %s, %s)"
_NO_RECORDS_TEXT = "你还没有任何记录哦!"
_ORDINALS = "一二三四五六七八九十"


@contextmanager
def _db_cursor():
    """Open a connection and cursor, yield ``(conn, cur)``, and always close both.

    NOTE(review): PyMySQL is a blocking driver, so every query made through
    this helper stalls the event loop while it runs.
    """
    conn = pymysql.connect(**_DB_CONFIG)
    try:
        cur = conn.cursor()
        try:
            yield conn, cur
        finally:
            cur.close()
    finally:
        conn.close()


def _ensure_registered(openid):
    """Create the user row for *openid* when ``isUserExist`` does not report True."""
    # Kept as an explicit comparison: the helper's return type is not visible
    # here, so a plain truthiness test could change behaviour.
    if isUserExist(openid) != True:  # noqa: E712
        insertUser(openid)


def _random_round():
    """Return one random ``(duration_seconds, volume_ml)`` pair, rounded to 2 places."""
    duration = round(random.uniform(1, 600), 2)
    amount = round(random.uniform(0.01, 100), 2)
    return duration, amount


def _insert_records(openid, rounds):
    """Insert one ``dajiao`` row per ``(duration, amount)`` pair for *openid*."""
    with _db_cursor() as (conn, cur):
        cur.executemany(
            _INSERT_RECORD_SQL,
            [(openid, duration, amount) for duration, amount in rounds],
        )
        conn.commit()


def _extreme_record(cur, openid, aggregate, column):
    """Fetch the row of *openid* whose *column* equals ``aggregate(column)``.

    *aggregate* and *column* are internal constants (never user input), so it
    is safe to interpolate them into the statement. Returns
    ``(eventTime, openid, timelong, volume)`` or ``None`` when the user has no
    rows.
    """
    sql = (
        f"SELECT eventTime, openid, timelong, volume FROM dajiao "
        f"WHERE openid = %s AND {column} = "
        f"(SELECT {aggregate}({column}) FROM dajiao WHERE openid = %s) LIMIT 1"
    )
    cur.execute(sql, (openid, openid))
    return cur.fetchone()


def _rank_sql(aggregate, column):
    """Build the per-user ranking query for ``aggregate(column)``.

    Both arguments are internal constants, never user input.
    """
    alias = f"{aggregate.lower()}_{column}"
    return (
        f"SELECT du.uid, du.nameid, t.{alias}, MAX(dj.eventTime) AS eventTime "
        f"FROM dajiaouser du "
        f"LEFT JOIN (SELECT openid, {aggregate}({column}) AS {alias} "
        f"FROM dajiao GROUP BY openid) t ON du.openid = t.openid "
        f"LEFT JOIN dajiao dj ON du.openid = dj.openid AND t.{alias} = dj.{column} "
        f"GROUP BY du.uid, du.nameid, t.{alias};"
    )


def _ranking_section(rows, header, line_template, *, descending):
    """Render one ranking list: *header* followed by one formatted line per row."""
    # The LEFT JOIN produces a NULL score for users without any record; those
    # rows cannot be ordered (None is not comparable), so they are left out.
    scored = sorted(
        (row for row in rows if row[2] is not None),
        key=lambda row: row[2],
        reverse=descending,
    )
    lines = [
        line_template.format(rank=rank, uid=uid, name=name, value=value, when=when)
        for rank, (uid, name, value, when) in enumerate(scored, start=1)
    ]
    return header + "".join(lines)


def _ranking_text(column, best_header, best_line, worst_header, worst_line):
    """Query and render the best and worst rankings for *column*."""
    with _db_cursor() as (_conn, cur):
        cur.execute(_rank_sql("MAX", column))
        best_rows = cur.fetchall()
        cur.execute(_rank_sql("MIN", column))
        worst_rows = cur.fetchall()
    return (
        _ranking_section(best_rows, best_header, best_line, descending=True)
        + "\n"
        + _ranking_section(worst_rows, worst_header, worst_line, descending=False)
    )


async def _http_get(url, *, as_json):
    """GET *url* and return the decoded body.

    Returns the parsed JSON (``as_json=True``) or the response text, and
    ``None`` when the status is not 200 or the request fails.
    """
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
                if resp.status != 200:
                    return None
                if as_json:
                    return await resp.json()
                return await resp.text()
    except aiohttp.ClientError as exc:
        logger.error(f"HTTP request to {url} failed: {exc}")
        return None


@register("helloworld", "YourName", "一个简单的 Hello World 插件", "1.0.0")
class MyPlugin(Star):
    """Command handlers for the mini game, rankings, stickers and tic-tac-toe."""

    # Counter reported by the "测试" command; shared by all instances.
    testNumber = 0

    def __init__(self, context: Context):
        super().__init__(context)

    @filter.command("打打我的")
    async def dajiao(self, event: AstrMessageEvent):
        """Record one random round for the sender and report the result."""
        sender = event.get_sender_id()
        _ensure_registered(sender)
        duration, amount = _random_round()
        duration_remark = time_long(duration)
        volume_remark = volume(amount)
        _insert_records(sender, [(duration, amount)])
        yield event.plain_result(
            f"Hello,{get_user_name(sender)}, 你坚持了{duration}s哦,{duration_remark}."
            f"射出{amount}ml,{volume_remark}!"
        )

    @filter.command("帮我连打十次")
    async def dajiaoX(self, event: AstrMessageEvent):
        """Record ten random rounds for the sender and report all of them."""
        sender = event.get_sender_id()
        _ensure_registered(sender)
        rounds = [_random_round() for _ in range(10)]
        remarks = [(time_long(duration), volume(amount)) for duration, amount in rounds]
        # One parameter tuple per row; the previous single execute() passed ten
        # tuples to a statement with thirty placeholders and could not format.
        _insert_records(sender, rounds)

        lines = [f"Hello,{get_user_name(sender)},准备开始挑战十连咯!"]
        for ordinal, (duration, amount), (duration_remark, volume_remark) in zip(
            _ORDINALS, rounds, remarks
        ):
            lines.append(
                f"第{ordinal}次你坚持了{duration}s哦,{duration_remark}."
                f"射出{amount}ml,{volume_remark}!"
            )
        lines.append("哎呀,一点也打不出来了!杂鱼!")
        yield event.plain_result("\n".join(lines))

    @filter.command("打打你的")
    async def dajiao_other(self, event: AstrMessageEvent, message: str):
        """Record one random round for the user whose nickname is *message*."""
        sender = event.get_sender_id()
        _ensure_registered(sender)
        duration, amount = _random_round()
        duration_remark = time_long(duration)
        volume_remark = volume(amount)

        with _db_cursor() as (conn, cur):
            cur.execute("SELECT openid FROM dajiaouser WHERE nameid = %s", (message,))
            row = cur.fetchone()
            target = row[0] if row else None
            if target is not None:
                cur.execute(_INSERT_RECORD_SQL, (target, duration, amount))
                conn.commit()

        if target is None:
            # Unknown nickname: report it and stop instead of inserting.
            yield event.plain_result("没有找到这个人哦!")
            return
        yield event.plain_result(
            f"Hello,{get_user_name(sender)},你帮助{get_user_name(target)}进行了一次打胶, "
            f"TA坚持了{duration}s哦,{duration_remark}.射出{amount}ml,{volume_remark}!"
        )

    @filter.command("我的时长成就")
    async def timebeat(self, event: AstrMessageEvent):
        """Report the sender's longest and shortest recorded durations."""
        openid = event.get_sender_id()
        _ensure_registered(openid)
        with _db_cursor() as (_conn, cur):
            best = _extreme_record(cur, openid, "MAX", "timelong")
            worst = _extreme_record(cur, openid, "MIN", "timelong")
        if best is None or worst is None:
            yield event.plain_result(_NO_RECORDS_TEXT)
            return
        yield event.plain_result(
            f"Hello,{get_user_name(openid)}\n"
            f"你的时长最佳记录为{best[2]}s,发生在{best[0]},射出{best[3]}ml!\n"
            f"你的时长最弱记录为{worst[2]}s,发生在{worst[0]},射出{worst[3]}ml!"
        )

    @filter.command("我的产量成就")
    async def volumebeat(self, event: AstrMessageEvent):
        """Report the sender's largest and smallest recorded volumes."""
        openid = event.get_sender_id()
        _ensure_registered(openid)
        with _db_cursor() as (_conn, cur):
            best = _extreme_record(cur, openid, "MAX", "volume")
            worst = _extreme_record(cur, openid, "MIN", "volume")
        if best is None or worst is None:
            yield event.plain_result(_NO_RECORDS_TEXT)
            return
        yield event.plain_result(
            f"Hello,{get_user_name(openid)}\n"
            f"你的产量最佳记录为{best[3]}ml,发生在{best[0]},时长为{best[2]}s!\n"
            f"你的产量最弱记录为{worst[3]}ml,发生在{worst[0]},时长为{worst[2]}s!"
        )

    @filter.command("我的个人信息")
    async def userinfo(self, event: AstrMessageEvent):
        """Show the sender's profile row and the totals over all their records."""
        openid = event.get_sender_id()
        _ensure_registered(openid)
        with _db_cursor() as (_conn, cur):
            cur.execute(
                "SELECT uid, openid , nameid FROM dajiaouser WHERE openid = %s",
                (openid,),
            )
            profile = cur.fetchone()
            cur.execute(
                "SELECT sum(timelong),sum(volume) FROM dajiao WHERE openid = %s group by openid",
                (openid,),
            )
            totals = cur.fetchone()
        # A user without any record yields no aggregate row; report zero totals.
        total_seconds, total_ml = totals if totals else (0, 0)
        yield event.plain_result(
            f"个人信息\nuid:{profile[0]}\nopenid:{profile[1]}\nname:{profile[2]}\n"
            f"总时长:{seconds_to_hms(total_seconds)}\n总产量:{ml_to_l_ml(total_ml)}"
        )

    @filter.command("修改昵称")
    async def updatename(self, event: AstrMessageEvent, name: str):
        """Set the sender's nickname to *name*."""
        openid = event.get_sender_id()
        _ensure_registered(openid)
        with _db_cursor() as (conn, cur):
            cur.execute(
                "UPDATE dajiaouser SET nameid = %s WHERE openid = %s",
                (name, openid),
            )
            conn.commit()
        yield event.plain_result(f"昵称修改成功!\n新的昵称为:{name}")

    @filter.command("我的背包")
    async def bag(self, event: AstrMessageEvent):
        """List the items stored in the sender's ``baglist`` JSON column."""
        openid = event.get_sender_id()
        _ensure_registered(openid)
        with _db_cursor() as (_conn, cur):
            cur.execute("SELECT baglist FROM dajiaouser WHERE openid = %s", (openid,))
            row = cur.fetchone()

        items = json.loads(row[0])["baglist"]
        if len(items) == 0:
            listing = "背包空空如也"
        else:
            entries = []
            for item in items:
                tool_name = get_tool_name(item["id"])
                if tool_name is None:
                    # Unknown item id: report the problem and stop here rather
                    # than carrying on with a half-built listing.
                    yield event.plain_result("背包存在异常!")
                    return
                entries.append(f"{tool_name} x {item['num']}\n")
            listing = "".join(entries)
        yield event.plain_result(f"背包列表:\n{listing}")

    @filter.command("产量成就排行")
    async def volumerank(self, event: AstrMessageEvent):
        """Rank all users by their highest and by their lowest recorded volume."""
        text = _ranking_text(
            "volume",
            "产量最高排名\n",
            "{rank} uid:{uid} nameid:{name} 最高产量:{value}ml 打胶时间:{when}\n",
            "产量最低排名\n",
            "{rank} uid:{uid} nameid:{name} 最低产量:{value}ml 打胶时间:{when}\n",
        )
        yield event.plain_result(text)

    @filter.command("时长成就排行")
    async def timerank(self, event: AstrMessageEvent):
        """Rank all users by their longest and by their shortest recorded duration."""
        text = _ranking_text(
            "timelong",
            "时长最长排名\n",
            "{rank} uid:{uid} {name} 最长时长:{value}s 打胶时间:{when}\n",
            "时长最短排名\n",
            "{rank} uid:{uid} {name} 最短时长:{value}s 打胶时间:{when}\n",
        )
        yield event.plain_result(text)

    @filter.command("测试")
    async def test(self, event: AstrMessageEvent):
        """Increment the shared counter and report its new value."""
        # The counter lives on the class; a ``global`` statement would look for
        # a module-level name that does not exist.
        cls = type(self)
        cls.testNumber = cls.testNumber + 1
        yield event.plain_result(f"累积结果: {cls.testNumber}")

    @filter.command("Dora")
    async def Dora(self, event: AstrMessageEvent):
        """Fetch a random sticker URL and send it to the sender as an image."""
        data = await _http_get("https://www.doro.asia/api/random-sticker", as_json=True)
        if data is None:
            yield event.plain_result("API请求失败")
            return
        image_url = None
        if data.get("success", False):
            image_url = (data.get("sticker") or {}).get("url")
        if not image_url:
            yield event.plain_result("获取表情包失败")
            return
        chain = [
            Comp.At(qq=event.get_sender_id()),  # mention the sender
            Comp.Plain("来看这个图:"),
            Comp.Image.fromURL(image_url),  # image URL returned by the API
        ]
        yield event.chain_result(chain)

    @filter.command("来句骚话")
    async def lovelive(self, event: AstrMessageEvent):
        """Fetch one line of text from the remote API and send it to the sender."""
        text = await _http_get("https://api.lovelive.tools/api/SweetNothings", as_json=False)
        if text is None:
            yield event.plain_result("获取骚话失败")
            return
        chain = [
            Comp.At(qq=event.get_sender_id()),  # mention the sender
            Comp.Plain(text),  # text returned by the API
        ]
        yield event.chain_result(chain)

    @filter.command("部署")
    async def bushutest(self, event: AstrMessageEvent):
        """Reply with a fixed deployment-check message."""
        yield event.plain_result("自动部署成功!!")

    @filter.command("井字棋")
    async def tic_tac_toe_handler(self, event: AstrMessageEvent):
        """Run an interactive tic-tac-toe session against the computer."""
        try:
            game = TicTacToeGame()

            # Send the instructions together with the empty board.
            instructions = (
                "欢迎来到井字棋游戏!\n"
                "输入1-9的数字来选择位置:\n"
                " 1 | 2 | 3 \n"
                "-----------\n"
                " 4 | 5 | 6 \n"
                "-----------\n"
                " 7 | 8 | 9 \n"
                "\n当前棋盘:" + game.print_board()
            )
            yield event.plain_result(instructions)

            @session_waiter(timeout=60, record_history_chains=False)
            async def game_waiter(controller: SessionController, event: AstrMessageEvent):
                async def reply(*texts):
                    """Send *texts* as consecutive plain-text components."""
                    result = event.make_result()
                    result.chain = [Comp.Plain(text) for text in texts]
                    await event.send(result)

                if game.current_player == game.player_symbol:
                    # Player's turn: the message must be a number from 1 to 9.
                    try:
                        move = int(event.message_str) - 1
                    except ValueError:
                        move = -1
                    if not 0 <= move <= 8 or not game.make_move(move):
                        await reply("无效移动,请重试。")
                        controller.keep(timeout=60, reset_timeout=True)
                        return
                else:
                    # Computer's turn.
                    # NOTE(review): this branch only runs when the next message
                    # arrives, so the user has to send something to let the
                    # computer move - confirm whether that is intended.
                    move = game.computer_move()
                    game.board[move] = game.current_player

                board_text = f"{game.print_board()}\n"

                winner = game.check_winner()
                if winner:
                    outcome = "恭喜你赢了!" if winner == game.player_symbol else "电脑赢了!"
                    await reply(board_text, outcome)
                    controller.stop()
                    return

                if game.is_board_full():
                    await reply(board_text, "平局!")
                    controller.stop()
                    return

                # Hand the turn over and show the updated board.
                game.switch_player()
                if game.current_player == game.player_symbol:
                    prompt = "轮到你走了,请输入1-9:"
                else:
                    prompt = "电脑思考中..."
                await reply(board_text, prompt)

                # Restart the inactivity timeout for the next message.
                controller.keep(timeout=60, reset_timeout=True)

            try:
                await game_waiter(event)
            except TimeoutError:
                yield event.plain_result("井字棋游戏超时结束!")
            except Exception as e:
                yield event.plain_result(f"游戏出错: {str(e)}")
            finally:
                event.stop_event()
        except Exception as e:
            logger.error(f"井字棋游戏错误: {str(e)}")
            yield event.plain_result("游戏发生错误,请稍后再试。")