Refactor
This commit is contained in:
parent
47da73aec6
commit
b9a7dd298c
6
.gitignore
vendored
6
.gitignore
vendored
@ -1,6 +1,8 @@
|
||||
config.py
|
||||
/__pycache__/*
|
||||
/images/*
|
||||
__pycache__/
|
||||
garfpy/__pycache__/
|
||||
*.venv*
|
||||
*.json
|
||||
*.old
|
||||
*.log*
|
||||
meows.py
|
||||
|
@ -13,4 +13,4 @@ RUN pip3 install openai
|
||||
RUN pip3 install aiohttp
|
||||
RUN pip3 install requests
|
||||
|
||||
CMD [ "python", "garfbot.py" ]
|
||||
CMD [ "python", "garfmain.py" ]
|
||||
|
428
garfbot.py
428
garfbot.py
@ -1,428 +0,0 @@
|
||||
import os
|
||||
import json
|
||||
import config
|
||||
import random
|
||||
import openai
|
||||
import logging
|
||||
import aiohttp
|
||||
import asyncio
|
||||
import discord
|
||||
import requests
|
||||
import ipaddress
|
||||
import subprocess
|
||||
from base64 import b64encode
|
||||
from openai import AsyncOpenAI
|
||||
from datetime import datetime
|
||||
from collections import defaultdict
|
||||
from operator import itemgetter
|
||||
from logging.handlers import TimedRotatingFileHandler
|
||||
|
||||
|
||||
# Log setup
|
||||
logger = logging.getLogger('garflog')
|
||||
logger.setLevel(logging.INFO)
|
||||
formatter=logging.Formatter(
|
||||
'%(asctime)s [%(levelname)s] %(message)s',
|
||||
datefmt='%Y-%m-%d %H:%M:%S'
|
||||
)
|
||||
file_handler = TimedRotatingFileHandler(
|
||||
'garfbot.log',
|
||||
when='midnight',
|
||||
interval=1,
|
||||
backupCount=7,
|
||||
delay=True # Counter-intuitively, this will flush output immediately
|
||||
)
|
||||
file_handler.setFormatter(formatter)
|
||||
console_handler = logging.StreamHandler()
|
||||
console_handler.setFormatter(formatter)
|
||||
logger.addHandler(file_handler)
|
||||
logger.addHandler(console_handler)
|
||||
|
||||
|
||||
# Bot Setup
|
||||
openaikey = config.OPENAI_TOKEN
|
||||
gapikey = config.GIF_TOKEN
|
||||
garfkey = config.GARFBOT_TOKEN
|
||||
txtmodel = "gpt-4o-mini"
|
||||
imgmodel = "dall-e-3"
|
||||
|
||||
intents = discord.Intents.default()
|
||||
intents.members = True
|
||||
intents.messages = True
|
||||
intents.message_content = True
|
||||
garfbot = discord.Client(intents=intents)
|
||||
|
||||
|
||||
# Network Utils Setup
|
||||
def is_private(target):
|
||||
try:
|
||||
ip_obj = ipaddress.ip_address(target)
|
||||
if ip_obj.is_private:
|
||||
return True
|
||||
except ValueError:
|
||||
if "crate.lan" in target.lower():
|
||||
return True
|
||||
if "crate.zip" in target.lower():
|
||||
return True
|
||||
if "memtec.org" in target.lower():
|
||||
return True
|
||||
if "crateit.net" in target.lower():
|
||||
return True
|
||||
if "garfbot.art" in target.lower():
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
# Kroger Setup
|
||||
client_id = config.CLIENT_ID
|
||||
client_secret = config.CLIENT_SECRET
|
||||
|
||||
auth = b64encode(f"{client_id}:{client_secret}".encode()).decode()
|
||||
|
||||
def kroger_token():
|
||||
headers = {
|
||||
'Content-Type': 'application/x-www-form-urlencoded',
|
||||
'Authorization': f'Basic {auth}'
|
||||
}
|
||||
|
||||
response = requests.post('https://api.kroger.com/v1/connect/oauth2/token', headers=headers, data={
|
||||
'grant_type': 'client_credentials',
|
||||
'scope': 'product.compact'
|
||||
})
|
||||
|
||||
response.raise_for_status()
|
||||
return response.json()['access_token']
|
||||
|
||||
def find_store(zipcode, kroken):
|
||||
headers = {
|
||||
'Authorization': f'Bearer {kroken}',
|
||||
}
|
||||
params = {
|
||||
'filter.zipCode.near': zipcode,
|
||||
'filter.limit': 1,
|
||||
}
|
||||
response = requests.get('https://api.kroger.com/v1/locations', headers=headers, params=params)
|
||||
return response.json()
|
||||
|
||||
def search_product(product, loc_id, kroken):
|
||||
headers = {
|
||||
'Authorization': f'Bearer {kroken}',
|
||||
}
|
||||
params = {
|
||||
'filter.term': product,
|
||||
'filter.locationId': loc_id,
|
||||
'filter.limit': 10
|
||||
}
|
||||
response = requests.get('https://api.kroger.com/v1/products', headers=headers, params=params)
|
||||
return response.json()
|
||||
|
||||
|
||||
# Meows Json Handling
|
||||
meows_file = "meow_counts.json"
|
||||
stats_file = "user_stats.json"
|
||||
|
||||
def json_load(file_path, default):
|
||||
if os.path.isfile(file_path):
|
||||
with open(file_path, "r") as f:
|
||||
return json.load(f)
|
||||
else:
|
||||
return default
|
||||
|
||||
meow_counts = defaultdict(int, json_load(meows_file, {}))
|
||||
user_stats = json_load(stats_file, {})
|
||||
|
||||
|
||||
# GarfBot Ready!
|
||||
@garfbot.event
|
||||
async def on_ready():
|
||||
asyncio.create_task(process_image_requests()) # Important!
|
||||
logger.info(f"Logged in as {garfbot.user.name} running {txtmodel} and {imgmodel}.")
|
||||
|
||||
|
||||
# GarfChats
|
||||
async def generate_chat_response(question):
|
||||
try:
|
||||
client = AsyncOpenAI(api_key = openaikey)
|
||||
response = await client.chat.completions.create(
|
||||
model=txtmodel,
|
||||
messages=[
|
||||
{"role": "system", "content": "Pretend you are sarcastic Garfield."},
|
||||
{"role": "user", "content": f"{question}"}
|
||||
],
|
||||
max_tokens=400
|
||||
)
|
||||
answer = response.choices[0].message.content
|
||||
return answer.replace("an AI language model", "a cartoon animal")
|
||||
except openai.BadRequestError as e:
|
||||
return f"`GarfBot Error: {e}`"
|
||||
except openai.APIError as e:
|
||||
logger.info(e, flush=True)
|
||||
return f"`GarfBot Error: Monday`"
|
||||
except Exception as e:
|
||||
logger.info(e, flush=True)
|
||||
return f"`GarfBot Error: Lasagna`"
|
||||
|
||||
|
||||
# GarfPics
|
||||
async def generate_image(prompt):
|
||||
try:
|
||||
client = AsyncOpenAI(api_key = openaikey)
|
||||
response = await client.images.generate(
|
||||
model=imgmodel,
|
||||
prompt=prompt,
|
||||
n=1,
|
||||
size="1024x1024"
|
||||
)
|
||||
image_url = response.data[0].url
|
||||
return image_url
|
||||
except openai.BadRequestError as e:
|
||||
return f"`GarfBot Error: ({e.status_code}) - Your request was rejected as a result of our safety system.`"
|
||||
except openai.InternalServerError as e:
|
||||
logger.error(e)
|
||||
return f"`GarfBot Error: ({e.status_code}) - Monday`"
|
||||
except Exception as e:
|
||||
logger.error(e)
|
||||
return f"`GarfBot Error: Lasagna`"
|
||||
|
||||
image_request_queue = asyncio.Queue()
|
||||
|
||||
async def process_image_requests():
|
||||
async with aiohttp.ClientSession() as session:
|
||||
while True:
|
||||
request = await image_request_queue.get()
|
||||
message = request['message']
|
||||
prompt = request['prompt']
|
||||
image_url = await generate_image(prompt)
|
||||
if "GarfBot Error" not in image_url:
|
||||
async with session.get(image_url) as resp:
|
||||
if resp.status == 200:
|
||||
image_data = await resp.read()
|
||||
timestamp = message.created_at.strftime('%Y%m%d%H%M%S')
|
||||
save_filename = f"{timestamp}_generated_image.png"
|
||||
with open(save_filename, "wb") as f:
|
||||
f.write(image_data)
|
||||
with open(save_filename, "rb") as f:
|
||||
await message.channel.send(file=discord.File(f, save_filename))
|
||||
os.remove(save_filename)
|
||||
else:
|
||||
await message.channel.send("`GarfBot Error: Odie`")
|
||||
else:
|
||||
await message.channel.send(image_url)
|
||||
image_request_queue.task_done()
|
||||
await asyncio.sleep(2)
|
||||
|
||||
|
||||
# Message Listener
|
||||
@garfbot.event
|
||||
async def on_message(message):
|
||||
if message.author == garfbot.user:
|
||||
return
|
||||
|
||||
if message.content.lower().startswith("hey garfield") or isinstance(message.channel, discord.DMChannel):
|
||||
question = message.content[12:] if message.content.lower().startswith("hey garfield") else message.content
|
||||
answer = await generate_chat_response(question)
|
||||
await message.channel.send(answer)
|
||||
|
||||
if message.content.lower().startswith('garfpic '):
|
||||
user = message.author.name
|
||||
server = message.guild.name if message.guild else "Direct Message"
|
||||
prompt = message.content[8:]
|
||||
logger.info(f"Image Request - User: {user}, Server: {server}, Prompt: {prompt}")
|
||||
await message.channel.send(f"`Please wait... image generation queued: {prompt}`")
|
||||
await image_request_queue.put({'message': message, 'prompt': prompt})
|
||||
|
||||
if message.content.lower() == "lasagna":
|
||||
await send_gif(message, "garfield lasagna")
|
||||
|
||||
if message.content.lower() == "monday":
|
||||
await send_gif(message, "garfield monday")
|
||||
|
||||
if message.content.lower().startswith("garfgif "):
|
||||
search_term = message.content[8:]
|
||||
await send_gif(message, search_term)
|
||||
|
||||
if message.content.lower().startswith("garfping "):
|
||||
try:
|
||||
query = message.content.split()
|
||||
user = message.author.name
|
||||
server = message.guild.name if message.guild else "Direct Message"
|
||||
target = query[-1]
|
||||
logger.info(f"Ping Request - User: {user}, Server: {server}, Target: {target}", flush=True)
|
||||
if is_private(target):
|
||||
rejection = await generate_chat_response("Hey Garfield, explain to me why I am dumb for trying to hack your private computer network.")
|
||||
await message.channel.send(rejection)
|
||||
else:
|
||||
result = subprocess.run(['ping', '-c', '4', target], capture_output=True, text=True)
|
||||
await message.channel.send(f"`Ping result for {target}:`\n```\n{result.stdout}\n```")
|
||||
except Exception as e:
|
||||
await message.channel.send(f"`GarfBot Error: {str(e)}`")
|
||||
|
||||
if message.content.lower().startswith("garfdns "):
|
||||
try:
|
||||
query = message.content.split()
|
||||
user = message.author.name
|
||||
server = message.guild.name if message.guild else "Direct Message"
|
||||
target = query[-1]
|
||||
logger.info(f"NSLookup Request - User: {user}, Server: {server}, Target: {target}", flush=True)
|
||||
if is_private(target):
|
||||
rejection = await generate_chat_response("Hey Garfield, explain to me why I am dumb for trying to hack your private computer network.")
|
||||
await message.channel.send(rejection)
|
||||
else:
|
||||
result = subprocess.run(['nslookup', target], capture_output=True, text=True)
|
||||
await message.channel.send(f"`NSLookup result for {target}:`\n```\n{result.stdout}\n```")
|
||||
except Exception as e:
|
||||
await message.channel.send(f"`GarfBot Error: {str(e)}`")
|
||||
|
||||
if message.content.lower().startswith("garfhack "):
|
||||
try:
|
||||
query = message.content.split()
|
||||
user = message.author.name
|
||||
server = message.guild.name if message.guild else "Direct Message"
|
||||
target = query[-1]
|
||||
logger.info(f"Nmap Request - User: {user}, Server: {server}, Target: {target}", flush=True)
|
||||
if is_private(target):
|
||||
rejection = await generate_chat_response("Hey Garfield, explain to me why I am dumb for trying to hack your private computer network.")
|
||||
await message.channel.send(rejection)
|
||||
else:
|
||||
await message.channel.send(f"`Scanning {target}...`")
|
||||
result = subprocess.run(['nmap', '-Pn', '-O', '-v', target], capture_output=True, text=True)
|
||||
await message.channel.send(f"`Ping result for {target}:`\n```\n{result.stdout}\n```")
|
||||
except Exception as e:
|
||||
await message.channel.send(f"`GarfBot Error: {str(e)}`")
|
||||
|
||||
# Kroger Shopping
|
||||
if message.content.lower().startswith("garfshop "):
|
||||
try:
|
||||
kroken = kroger_token()
|
||||
kroger_query = message.content.split()
|
||||
product = " ".join(kroger_query[1:-1])
|
||||
zipcode = kroger_query[-1]
|
||||
loc_data = find_store(zipcode, kroken)
|
||||
loc_id = loc_data['data'][0]['locationId']
|
||||
store_name = loc_data['data'][0]['name']
|
||||
product_query = search_product(product, loc_id, kroken)
|
||||
products = product_query['data']
|
||||
sorted_products = sorted(products, key=lambda item: item['items'][0]['price']['regular'])
|
||||
response = f"Prices for `{product}` at `{store_name}` near `{zipcode}`:\n"
|
||||
for item in sorted_products:
|
||||
product_name = item['description']
|
||||
price = item['items'][0]['price']['regular']
|
||||
response += f"- `${price}`: {product_name} \n"
|
||||
await message.channel.send(response)
|
||||
except Exception as e:
|
||||
await message.channel.send(f"`GarfBot Error: {str(e)}`")
|
||||
|
||||
# Army of Dawn Server only!!
|
||||
if message.guild and message.guild.id == 719605634772893757:
|
||||
|
||||
if "meow" in message.content.lower():
|
||||
logger.info(f"Meow detected! {message.author.name} said: {message.content}")
|
||||
# print(f"Meow detected! {message.author.name} said: {message.content}", flush=True)
|
||||
|
||||
meow_counts[str(message.author.id)] += 1
|
||||
|
||||
with open(meows_file, "w") as f:
|
||||
json.dump(dict(meow_counts), f)
|
||||
|
||||
if message.content.lower() == "meowcount":
|
||||
response = f"My records show that <@{message.author.id}> has meowed {meow_counts[str(message.author.id)]} time(s). Have a nice day."
|
||||
await message.channel.send(response)
|
||||
|
||||
if message.content.lower() == "top meowers":
|
||||
top_meowers = sorted(meow_counts.items(), key=itemgetter(1), reverse=True)[:10]
|
||||
embed = discord.Embed(title="Top Meowers :cat:", color=0x000000)
|
||||
for i, (user_id, meow_count) in enumerate(top_meowers):
|
||||
user = await garfbot.fetch_user(int(user_id))
|
||||
embed.add_field(name=f"{i+1}. {user.name}", value=f"{meow_count} meows", inline=False)
|
||||
await message.channel.send(embed=embed)
|
||||
|
||||
if message.content.lower() == "checking in":
|
||||
user_id = str(message.author.id)
|
||||
if user_id in user_stats and user_stats[user_id]["check_in_time"] is not None:
|
||||
await message.channel.send(f"{message.author.mention} You are already checked in. Please check out first.")
|
||||
return
|
||||
|
||||
check_in_time = datetime.utcnow().timestamp()
|
||||
if user_id not in user_stats:
|
||||
user_stats[user_id] = {"check_ins": 0, "total_time": 0, "check_in_time": None}
|
||||
user_stats[user_id]["check_in_time"] = check_in_time
|
||||
await message.channel.send(f"{message.author.mention} You have been checked in. Please mute your microphone.")
|
||||
|
||||
elif message.content.lower() == "checking out":
|
||||
user_id = str(message.author.id)
|
||||
if user_id not in user_stats or user_stats[user_id]["check_in_time"] is None:
|
||||
await message.channel.send(f"{message.author.mention} You have not checked in yet. Please check in first.")
|
||||
return
|
||||
|
||||
check_out_time = datetime.utcnow().timestamp()
|
||||
check_in_time = user_stats[user_id]["check_in_time"]
|
||||
time_delta = check_out_time - check_in_time
|
||||
user_stats[user_id]["check_ins"] += 1
|
||||
user_stats[user_id]["total_time"] += time_delta
|
||||
user_stats[user_id]["check_in_time"] = None
|
||||
|
||||
with open("user_stats.json", "w") as f:
|
||||
json.dump(user_stats, f)
|
||||
await message.channel.send(f"{message.author.mention} You have been checked out. Your session was {time_delta:.2f} seconds.")
|
||||
|
||||
elif message.content.lower() == "stats":
|
||||
stats_embed = discord.Embed(title="User stats :trophy:", color=0x000000)
|
||||
sorted_user_stats = sorted(user_stats.items(), key=lambda x: x[1]["total_time"], reverse=True)
|
||||
table_rows = [["Name", "Check-ins", "Total Time"]]
|
||||
for user_id, stats in sorted_user_stats:
|
||||
if stats["check_in_time"] is None:
|
||||
total_time_seconds = stats["total_time"]
|
||||
hours, total_time_seconds = divmod(total_time_seconds, 3600)
|
||||
minutes, total_time_seconds = divmod(total_time_seconds, 60)
|
||||
seconds, fractions = divmod(total_time_seconds, 1)
|
||||
fractions_str = f"{fractions:.3f}"[2:]
|
||||
username = garfbot.get_user(int(user_id)).name
|
||||
table_rows.append([username, str(stats["check_ins"]), f"{int(hours)}h {int(minutes)}m {int(seconds)}s {fractions_str}ms"])
|
||||
else:
|
||||
username = garfbot.get_user(int(user_id)).name
|
||||
table_rows.append([username, "Currently checked in", "-"])
|
||||
table_columns = list(zip(*table_rows[1:]))
|
||||
table_fields = table_rows[0]
|
||||
for field, values in zip(table_fields, table_columns):
|
||||
stats_embed.add_field(name=field, value="\n".join(values), inline=True)
|
||||
await message.channel.send(embed=stats_embed)
|
||||
|
||||
|
||||
# GarfGifs
|
||||
@garfbot.event
|
||||
async def send_gif(message, search_term):
|
||||
lmt = 50
|
||||
ckey = "garfbot"
|
||||
r = requests.get(f"https://tenor.googleapis.com/v2/search?q={search_term}&key={gapikey}&client_key={ckey}&limit={lmt}")
|
||||
if r.status_code == 200:
|
||||
top_50gifs = json.loads(r.content)
|
||||
gif_url = random.choice(top_50gifs["results"])["itemurl"]
|
||||
logger.info(gif_url)
|
||||
# print(gif_url)
|
||||
try:
|
||||
await message.channel.send(gif_url)
|
||||
except KeyError:
|
||||
await message.channel.send("Oops, something went wrong.")
|
||||
else:
|
||||
await message.channel.send(f"`Oops, something went wrong. Error code: {r.status_code}`")
|
||||
|
||||
|
||||
# discord.py Error Handling
|
||||
@garfbot.event
|
||||
async def on_error(event, *args, **kwargs):
|
||||
logger.error(f'GarfBot Error: {event}')
|
||||
|
||||
|
||||
# Run GarfBot!
|
||||
async def garfbot_connect():
|
||||
while True:
|
||||
try:
|
||||
await garfbot.start(garfkey)
|
||||
except Exception as e:
|
||||
e = str(e)
|
||||
logger.error(f"Garfbot couldn't connect! {e}")
|
||||
await asyncio.sleep(60)
|
||||
|
||||
if __name__ == "__main__":
|
||||
loop = asyncio.get_event_loop()
|
||||
loop.run_until_complete(garfbot_connect())
|
169
garfmain.py
Normal file
169
garfmain.py
Normal file
@ -0,0 +1,169 @@
|
||||
import config
|
||||
# import random
|
||||
import asyncio
|
||||
import discord
|
||||
import subprocess
|
||||
|
||||
from garfpy import(
|
||||
logger, is_private,
|
||||
kroger_token, find_store, search_product,
|
||||
picture_time, process_image_requests, generate_chat,
|
||||
aod_message)
|
||||
|
||||
|
||||
gapikey = config.GIF_TOKEN
|
||||
garfkey = config.GARFBOT_TOKEN
|
||||
txtmodel = config.TXT_MODEL
|
||||
imgmodel = config.IMG_MODEL
|
||||
|
||||
intents = discord.Intents.default()
|
||||
intents.members = True
|
||||
intents.messages = True
|
||||
intents.message_content = True
|
||||
garfbot = discord.Client(intents=intents)
|
||||
|
||||
|
||||
@garfbot.event
|
||||
async def on_ready():
|
||||
try:
|
||||
asyncio.create_task(process_image_requests())
|
||||
logger.info(f"Logged in as {garfbot.user.name} running {txtmodel} and {imgmodel}.")
|
||||
except Exception as e:
|
||||
logger.error(e)
|
||||
|
||||
|
||||
@garfbot.event
|
||||
async def on_message(message):
|
||||
if message.author == garfbot.user:
|
||||
return
|
||||
|
||||
if message.content.lower().startswith("hey garfield") or isinstance(message.channel, discord.DMChannel):
|
||||
question = message.content[12:] if message.content.lower().startswith("hey garfield") else message.content
|
||||
answer = await generate_chat(question)
|
||||
await message.channel.send(answer)
|
||||
|
||||
if message.content.lower().startswith('garfpic '):
|
||||
user = message.author.name
|
||||
server = message.guild.name if message.guild else "Direct Message"
|
||||
prompt = message.content[8:]
|
||||
logger.info(f"Image Request - User: {user}, Server: {server}, Prompt: {prompt}")
|
||||
await message.channel.send(f"`Please wait... image generation queued: {prompt}`")
|
||||
await picture_time(message, prompt)
|
||||
|
||||
# if message.content.lower() == "lasagna":
|
||||
# await send_gif(message, "garfield lasagna")
|
||||
|
||||
# if message.content.lower() == "monday":
|
||||
# await send_gif(message, "garfield monday")
|
||||
|
||||
# if message.content.lower().startswith("garfgif "):
|
||||
# search_term = message.content[8:]
|
||||
# await send_gif(message, search_term)
|
||||
|
||||
if message.content.lower().startswith("garfping "):
|
||||
try:
|
||||
query = message.content.split()
|
||||
user = message.author.name
|
||||
server = message.guild.name if message.guild else "Direct Message"
|
||||
target = query[-1]
|
||||
logger.info(f"Ping Request - User: {user}, Server: {server}, Target: {target}")
|
||||
if is_private(target):
|
||||
rejection = await generate_chat("Hey Garfield, explain to me why I am dumb for trying to hack your private computer network.")
|
||||
await message.channel.send(rejection)
|
||||
else:
|
||||
result = subprocess.run(['ping', '-c', '4', target], capture_output=True, text=True)
|
||||
await message.channel.send(f"`Ping result for {target}:`\n```\n{result.stdout}\n```")
|
||||
except Exception as e:
|
||||
await message.channel.send(f"`GarfBot Error: {str(e)}`")
|
||||
|
||||
if message.content.lower().startswith("garfdns "):
|
||||
try:
|
||||
query = message.content.split()
|
||||
user = message.author.name
|
||||
server = message.guild.name if message.guild else "Direct Message"
|
||||
target = query[-1]
|
||||
logger.info(f"NSLookup Request - User: {user}, Server: {server}, Target: {target}")
|
||||
if is_private(target):
|
||||
rejection = await generate_chat("Hey Garfield, explain to me why I am dumb for trying to hack your private computer network.")
|
||||
await message.channel.send(rejection)
|
||||
else:
|
||||
result = subprocess.run(['nslookup', target], capture_output=True, text=True)
|
||||
await message.channel.send(f"`NSLookup result for {target}:`\n```\n{result.stdout}\n```")
|
||||
except Exception as e:
|
||||
await message.channel.send(f"`GarfBot Error: {str(e)}`")
|
||||
|
||||
if message.content.lower().startswith("garfhack "):
|
||||
try:
|
||||
query = message.content.split()
|
||||
user = message.author.name
|
||||
server = message.guild.name if message.guild else "Direct Message"
|
||||
target = query[-1]
|
||||
logger.info(f"Nmap Request - User: {user}, Server: {server}, Target: {target}")
|
||||
if is_private(target):
|
||||
rejection = await generate_chat("Hey Garfield, explain to me why I am dumb for trying to hack your private computer network.")
|
||||
await message.channel.send(rejection)
|
||||
else:
|
||||
await message.channel.send(f"`Scanning {target}...`")
|
||||
result = subprocess.run(['nmap', '-Pn', '-O', '-v', target], capture_output=True, text=True)
|
||||
await message.channel.send(f"`Ping result for {target}:`\n```\n{result.stdout}\n```")
|
||||
except Exception as e:
|
||||
await message.channel.send(f"`GarfBot Error: {str(e)}`")
|
||||
|
||||
# Kroger Shopping
|
||||
if message.content.lower().startswith("garfshop "):
|
||||
try:
|
||||
kroken = kroger_token()
|
||||
kroger_query = message.content.split()
|
||||
product = " ".join(kroger_query[1:-1])
|
||||
zipcode = kroger_query[-1]
|
||||
loc_data = find_store(zipcode, kroken)
|
||||
loc_id = loc_data['data'][0]['locationId']
|
||||
store_name = loc_data['data'][0]['name']
|
||||
product_query = search_product(product, loc_id, kroken)
|
||||
products = product_query['data']
|
||||
sorted_products = sorted(products, key=lambda item: item['items'][0]['price']['regular'])
|
||||
response = f"Prices for `{product}` at `{store_name}` near `{zipcode}`:\n"
|
||||
for item in sorted_products:
|
||||
product_name = item['description']
|
||||
price = item['items'][0]['price']['regular']
|
||||
response += f"- `${price}`: {product_name} \n"
|
||||
await message.channel.send(response)
|
||||
except Exception as e:
|
||||
await message.channel.send(f"`GarfBot Error: {str(e)}`")
|
||||
|
||||
# Army of Dawn Server only!!
|
||||
if message.guild and message.guild.id == 719605634772893757:
|
||||
|
||||
await aod_message(garfbot, message)
|
||||
|
||||
|
||||
# # GarfGifs
|
||||
# @garfbot.event
|
||||
# async def send_gif(message, search_term):
|
||||
# lmt = 50
|
||||
# ckey = "garfbot"
|
||||
# r = requests.get(f"https://tenor.googleapis.com/v2/search?q={search_term}&key={gapikey}&client_key={ckey}&limit={lmt}")
|
||||
# if r.status_code == 200:
|
||||
# top_50gifs = json.loads(r.content)
|
||||
# gif_url = random.choice(top_50gifs["results"])["itemurl"]
|
||||
# logger.info(gif_url)
|
||||
# # logger.info(gif_url)
|
||||
# try:
|
||||
# await message.channel.send(gif_url)
|
||||
# except KeyError:
|
||||
# await message.channel.send("Oops, something went wrong.")
|
||||
# else:
|
||||
# await message.channel.send(f"`Oops, something went wrong. Error code: {r.status_code}`")
|
||||
|
||||
|
||||
async def garfbot_connect():
|
||||
while True:
|
||||
try:
|
||||
await garfbot.start(garfkey)
|
||||
except Exception as e:
|
||||
e = str(e)
|
||||
logger.error(f"Garfbot couldn't connect! {e}")
|
||||
await asyncio.sleep(60)
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(garfbot_connect())
|
12
garfpy/__init__.py
Normal file
12
garfpy/__init__.py
Normal file
@ -0,0 +1,12 @@
|
||||
# garfpy/__init__.py
|
||||
from .log import logger
|
||||
from .kroger import(
|
||||
kroger_token, find_store, search_product
|
||||
)
|
||||
from .garfai import(
|
||||
picture_time,
|
||||
process_image_requests,
|
||||
generate_chat
|
||||
)
|
||||
from .iputils import is_private
|
||||
from .aod import aod_message
|
94
garfpy/aod.py
Normal file
94
garfpy/aod.py
Normal file
@ -0,0 +1,94 @@
|
||||
import os
|
||||
import json
|
||||
import discord
|
||||
from garfpy import logger
|
||||
from datetime import datetime
|
||||
from operator import itemgetter
|
||||
from collections import defaultdict
|
||||
|
||||
|
||||
# Meows Json Handling
|
||||
meows_file = "meow_counts.json"
|
||||
stats_file = "user_stats.json"
|
||||
|
||||
def json_load(file_path, default):
|
||||
if os.path.isfile(file_path):
|
||||
with open(file_path, "r") as f:
|
||||
return json.load(f)
|
||||
else:
|
||||
return default
|
||||
|
||||
meow_counts = defaultdict(int, json_load(meows_file, {}))
|
||||
user_stats = json_load(stats_file, {})
|
||||
|
||||
async def aod_message(garfbot, message):
|
||||
if "meow" in message.content.lower():
|
||||
logger.info(f"Meow detected! {message.author.name} said: {message.content}")
|
||||
|
||||
meow_counts[str(message.author.id)] += 1
|
||||
|
||||
with open(meows_file, "w") as f:
|
||||
json.dump(dict(meow_counts), f)
|
||||
|
||||
if message.content.lower() == "meowcount":
|
||||
response = f"My records show that <@{message.author.id}> has meowed {meow_counts[str(message.author.id)]} time(s). Have a nice day."
|
||||
await message.channel.send(response)
|
||||
|
||||
if message.content.lower() == "top meowers":
|
||||
top_meowers = sorted(meow_counts.items(), key=itemgetter(1), reverse=True)[:10]
|
||||
embed = discord.Embed(title="Top Meowers :cat:", color=0x000000)
|
||||
for i, (user_id, meow_count) in enumerate(top_meowers):
|
||||
user = await garfbot.fetch_user(int(user_id))
|
||||
embed.add_field(name=f"{i+1}. {user.name}", value=f"{meow_count} meows", inline=False)
|
||||
await message.channel.send(embed=embed)
|
||||
|
||||
if message.content.lower() == "checking in":
|
||||
user_id = str(message.author.id)
|
||||
if user_id in user_stats and user_stats[user_id]["check_in_time"] is not None:
|
||||
await message.channel.send(f"{message.author.mention} You are already checked in. Please check out first.")
|
||||
return
|
||||
|
||||
check_in_time = datetime.now().timestamp()
|
||||
if user_id not in user_stats:
|
||||
user_stats[user_id] = {"check_ins": 0, "total_time": 0, "check_in_time": None}
|
||||
user_stats[user_id]["check_in_time"] = check_in_time
|
||||
await message.channel.send(f"{message.author.mention} You have been checked in. Please mute your microphone.")
|
||||
|
||||
elif message.content.lower() == "checking out":
|
||||
user_id = str(message.author.id)
|
||||
if user_id not in user_stats or user_stats[user_id]["check_in_time"] is None:
|
||||
await message.channel.send(f"{message.author.mention} You have not checked in yet. Please check in first.")
|
||||
return
|
||||
|
||||
check_out_time = datetime.now().timestamp()
|
||||
check_in_time = user_stats[user_id]["check_in_time"]
|
||||
time_delta = check_out_time - check_in_time
|
||||
user_stats[user_id]["check_ins"] += 1
|
||||
user_stats[user_id]["total_time"] += time_delta
|
||||
user_stats[user_id]["check_in_time"] = None
|
||||
|
||||
with open("user_stats.json", "w") as f:
|
||||
json.dump(user_stats, f)
|
||||
await message.channel.send(f"{message.author.mention} You have been checked out. Your session was {time_delta:.2f} seconds.")
|
||||
|
||||
elif message.content.lower() == "stats":
|
||||
stats_embed = discord.Embed(title="User stats :trophy:", color=0x000000)
|
||||
sorted_user_stats = sorted(user_stats.items(), key=lambda x: x[1]["total_time"], reverse=True)
|
||||
table_rows = [["Name", "Check-ins", "Total Time"]]
|
||||
for user_id, stats in sorted_user_stats:
|
||||
if stats["check_in_time"] is None:
|
||||
total_time_seconds = stats["total_time"]
|
||||
hours, total_time_seconds = divmod(total_time_seconds, 3600)
|
||||
minutes, total_time_seconds = divmod(total_time_seconds, 60)
|
||||
seconds, fractions = divmod(total_time_seconds, 1)
|
||||
fractions_str = f"{fractions:.3f}"[2:]
|
||||
username = garfbot.get_user(int(user_id)).name
|
||||
table_rows.append([username, str(stats["check_ins"]), f"{int(hours)}h {int(minutes)}m {int(seconds)}s {fractions_str}ms"])
|
||||
else:
|
||||
username = garfbot.get_user(int(user_id)).name
|
||||
table_rows.append([username, "Currently checked in", "-"])
|
||||
table_columns = list(zip(*table_rows[1:]))
|
||||
table_fields = table_rows[0]
|
||||
for field, values in zip(table_fields, table_columns):
|
||||
stats_embed.add_field(name=field, value="\n".join(values), inline=True)
|
||||
await message.channel.send(embed=stats_embed)
|
90
garfpy/garfai.py
Normal file
90
garfpy/garfai.py
Normal file
@ -0,0 +1,90 @@
|
||||
import io
|
||||
import openai
|
||||
import config
|
||||
import aiohttp
|
||||
import asyncio
|
||||
import discord
|
||||
from openai import AsyncOpenAI
|
||||
from garfpy import logger
|
||||
|
||||
openaikey = config.OPENAI_TOKEN
|
||||
|
||||
txtmodel = config.TXT_MODEL
|
||||
imgmodel = config.IMG_MODEL
|
||||
|
||||
# GarfPics
|
||||
image_request_queue = asyncio.Queue()
|
||||
|
||||
async def picture_time(message, prompt):
|
||||
await image_request_queue.put({'message': message, 'prompt': prompt})
|
||||
|
||||
async def generate_image(prompt):
|
||||
try:
|
||||
client = AsyncOpenAI(api_key = openaikey)
|
||||
response = await client.images.generate(
|
||||
model=imgmodel,
|
||||
prompt=prompt,
|
||||
n=1,
|
||||
size="1024x1024"
|
||||
)
|
||||
image_url = response.data[0].url
|
||||
return image_url
|
||||
except openai.BadRequestError as e:
|
||||
return f"`GarfBot Error: ({e.status_code}) - Your request was rejected as a result of our safety system.`"
|
||||
except openai.InternalServerError as e:
|
||||
logger.error(e)
|
||||
return f"`GarfBot Error: ({e.status_code}) - Monday`"
|
||||
except Exception as e:
|
||||
logger.error(e)
|
||||
return f"`GarfBot Error: Lasagna`"
|
||||
|
||||
async def process_image_requests():
|
||||
async with aiohttp.ClientSession() as session:
|
||||
while True:
|
||||
request = await image_request_queue.get()
|
||||
message = request['message']
|
||||
prompt = request['prompt']
|
||||
image_url = await generate_image(prompt)
|
||||
if "GarfBot Error" not in image_url:
|
||||
logger.info("Downloading & sending image...")
|
||||
async with session.get(image_url) as resp:
|
||||
if resp.status == 200:
|
||||
image_data = await resp.read()
|
||||
ram_image = io.BytesIO(image_data)
|
||||
ram_image.seek(0)
|
||||
timestamp = message.created_at.strftime('%Y%m%d%H%M%S')
|
||||
filename = f"{timestamp}_generated_image.png"
|
||||
sendfile = discord.File(fp=ram_image, filename=filename)
|
||||
try:
|
||||
await message.channel.send(file=sendfile)
|
||||
except Exception as e:
|
||||
logger.error(e)
|
||||
else:
|
||||
await message.channel.send("`GarfBot Error: Odie`")
|
||||
else:
|
||||
await message.channel.send(image_url)
|
||||
image_request_queue.task_done()
|
||||
await asyncio.sleep(2)
|
||||
|
||||
# GarfChats
|
||||
async def generate_chat(question):
|
||||
try:
|
||||
client = AsyncOpenAI(api_key = openaikey)
|
||||
response = await client.chat.completions.create(
|
||||
model=txtmodel,
|
||||
messages=[
|
||||
{"role": "system", "content": "Pretend you are sarcastic Garfield."},
|
||||
{"role": "user", "content": f"{question}"}
|
||||
],
|
||||
max_tokens=400
|
||||
)
|
||||
answer = response.choices[0].message.content
|
||||
return answer.replace("an AI language model", "a cartoon animal")
|
||||
except openai.BadRequestError as e:
|
||||
return f"`GarfBot Error: {e}`"
|
||||
except openai.APIError as e:
|
||||
logger.info(e, flush=True)
|
||||
return f"`GarfBot Error: Monday`"
|
||||
except Exception as e:
|
||||
logger.info(e, flush=True)
|
||||
return f"`GarfBot Error: Lasagna`"
|
19
garfpy/iputils.py
Normal file
19
garfpy/iputils.py
Normal file
@ -0,0 +1,19 @@
|
||||
import ipaddress
|
||||
|
||||
def is_private(target):
|
||||
try:
|
||||
ip_obj = ipaddress.ip_address(target)
|
||||
if ip_obj.is_private:
|
||||
return True
|
||||
except ValueError:
|
||||
if "crate.lan" in target.lower():
|
||||
return True
|
||||
if "crate.zip" in target.lower():
|
||||
return True
|
||||
if "memtec.org" in target.lower():
|
||||
return True
|
||||
if "crateit.net" in target.lower():
|
||||
return True
|
||||
if "garfbot.art" in target.lower():
|
||||
return True
|
||||
return False
|
48
garfpy/kroger.py
Normal file
48
garfpy/kroger.py
Normal file
@ -0,0 +1,48 @@
|
||||
import config
|
||||
import requests
|
||||
from base64 import b64encode
|
||||
from garfpy import logger
|
||||
|
||||
|
||||
client_id = config.CLIENT_ID
|
||||
client_secret = config.CLIENT_SECRET
|
||||
|
||||
auth = b64encode(f"{client_id}:{client_secret}".encode()).decode()
|
||||
|
||||
def kroger_token():
|
||||
headers = {
|
||||
'Content-Type': 'application/x-www-form-urlencoded',
|
||||
'Authorization': f'Basic {auth}'
|
||||
}
|
||||
|
||||
response = requests.post('https://api.kroger.com/v1/connect/oauth2/token', headers=headers, data={
|
||||
'grant_type': 'client_credentials',
|
||||
'scope': 'product.compact'
|
||||
})
|
||||
|
||||
response.raise_for_status()
|
||||
return response.json()['access_token']
|
||||
|
||||
def find_store(zipcode, kroken):
|
||||
headers = {
|
||||
'Authorization': f'Bearer {kroken}',
|
||||
}
|
||||
params = {
|
||||
'filter.zipCode.near': zipcode,
|
||||
'filter.limit': 1,
|
||||
}
|
||||
response = requests.get('https://api.kroger.com/v1/locations', headers=headers, params=params)
|
||||
return response.json()
|
||||
|
||||
def search_product(product, loc_id, kroken):
|
||||
logger.info(f"Searching for {product}...")
|
||||
headers = {
|
||||
'Authorization': f'Bearer {kroken}',
|
||||
}
|
||||
params = {
|
||||
'filter.term': product,
|
||||
'filter.locationId': loc_id,
|
||||
'filter.limit': 10
|
||||
}
|
||||
response = requests.get('https://api.kroger.com/v1/products', headers=headers, params=params)
|
||||
return response.json()
|
25
garfpy/log.py
Normal file
25
garfpy/log.py
Normal file
@ -0,0 +1,25 @@
|
||||
import logging
|
||||
from logging.handlers import TimedRotatingFileHandler
|
||||
|
||||
logger = logging.getLogger('garflog')
|
||||
logger.setLevel(logging.INFO)
|
||||
formatter=logging.Formatter(
|
||||
'%(asctime)s [%(levelname)s] %(message)s',
|
||||
datefmt='%Y-%m-%d %H:%M:%S'
|
||||
)
|
||||
file_handler = TimedRotatingFileHandler(
|
||||
'garfbot.log',
|
||||
when='midnight',
|
||||
interval=1,
|
||||
backupCount=7,
|
||||
delay=True # Counter-intuitively, this will flush output immediately
|
||||
)
|
||||
file_handler.setFormatter(formatter)
|
||||
console_handler = logging.StreamHandler()
|
||||
console_handler.setFormatter(formatter)
|
||||
logger.addHandler(file_handler)
|
||||
logger.addHandler(console_handler)
|
||||
|
||||
if not logger.hasHandlers():
|
||||
logger.addHandler(file_handler)
|
||||
logger.addHandler(console_handler)
|
Loading…
Reference in New Issue
Block a user