Compare commits
9 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 7b09d1a348 | |||
| 01e5208cba | |||
| 9d97b10c20 | |||
| 8739f889d3 | |||
| 0e368ade2f | |||
| 9acb6b04eb | |||
| 2a33bc70b7 | |||
| a775a55291 | |||
| c365cd1ff2 |
+2
-1
@@ -1,8 +1,9 @@
|
||||
FROM python:3.13.12-slim-trixie
|
||||
FROM python:3.13.13-slim-trixie
|
||||
|
||||
ENV PATH=/usr/local/bin:$PATH
|
||||
WORKDIR /app
|
||||
COPY requirements.txt .
|
||||
RUN pip install --no-cache-dir -r requirements.txt
|
||||
RUN apt-get update && apt-get install -y --no-install-recommends ffmpeg && rm -rf /var/lib/apt/lists/*
|
||||
COPY . .
|
||||
CMD ["python", "-u", "main.py"]
|
||||
@@ -10,13 +10,16 @@ import os
|
||||
import re
|
||||
import logging
|
||||
import random
|
||||
import yt_player
|
||||
import xml.etree.ElementTree as ET
|
||||
|
||||
# this is a bot which posts the latest image post from ich_iel
|
||||
# the code probably sucks, but it works, so I don't care
|
||||
|
||||
load_dotenv()
|
||||
prefix = os.getenv("COMMAND_PREFIX", "/")
|
||||
bot = fluxer.Bot(command_prefix=prefix, intents=fluxer.Intents.GUILD_MESSAGES | fluxer.Intents.GUILDS | fluxer.Intents.MESSAGE_CONTENT)
|
||||
bot = fluxer.Bot(command_prefix=prefix, intents=fluxer.Intents.GUILD_MESSAGES | fluxer.Intents.GUILDS | fluxer.Intents.MESSAGE_CONTENT | fluxer.Intents.GUILD_VOICE_STATES)
|
||||
yt_player.setup(bot)
|
||||
|
||||
task = None
|
||||
|
||||
@@ -51,32 +54,30 @@ async def post_reddit_periodically():
|
||||
|
||||
async def get_latest_post(subreddit):
|
||||
post_limit = os.getenv("POST_LIMIT", 20)
|
||||
url = f"https://www.reddit.com/r/{subreddit}/hot.json?limit={post_limit}"
|
||||
headers = {"User-Agent": "Mozilla/5.0 (compatible; ich_iel-Bot/0.3)"}
|
||||
url = f"https://www.reddit.com/r/{subreddit}/hot.rss?limit={post_limit}"
|
||||
headers = {"User-Agent": "Mozilla/5.0 (compatible; ich_iel-Bot/0.6.1)"}
|
||||
response = requests.get(url, headers=headers)
|
||||
if response.status_code == 200:
|
||||
if response.headers.get("Content-Type", "").startswith("application/json"):
|
||||
try:
|
||||
data = response.json()
|
||||
logging.debug(f"Fetched data from Reddit: {data}")
|
||||
posts = []
|
||||
if data["data"]["children"]:
|
||||
for child in data["data"]["children"]:
|
||||
post = child["data"]
|
||||
if post.get("post_hint") == "image" and post.get("url", "").endswith((".jpg", ".png", ".jpeg", ".gif")):
|
||||
logging.debug(f"Found image post: {post['title']} - {post['url']}")
|
||||
posts.append((post["title"], post["url"]))
|
||||
return posts
|
||||
except (KeyError, json.JSONDecodeError):
|
||||
logging.error(f"Error parsing Reddit response: {response.text}")
|
||||
return []
|
||||
else:
|
||||
logging.warning(f"Unexpected content type from Reddit: {response.headers.get('Content-Type')}")
|
||||
logging.warning(f"Response content: {response.text}")
|
||||
return []
|
||||
else:
|
||||
logging.error(f"Failed to fetch Reddit data (maybe a block?): {response.status_code}")
|
||||
if response.status_code != 200:
|
||||
logging.error(f"Failed to fetch RSS feed: {response.status_code}")
|
||||
return []
|
||||
posts = []
|
||||
try:
|
||||
root = ET.fromstring(response.text)
|
||||
ns = {"atom": "http://www.w3.org/2005/Atom"}
|
||||
for entry in root.findall("atom:entry", ns):
|
||||
post_title = entry.find("atom:title", ns)
|
||||
content = entry.find("atom:content", ns)
|
||||
if post_title is None or content is None or content.text is None:
|
||||
continue
|
||||
link_match = re.search(r'<a href="([^"]+)">\[link\]</a>', content.text)
|
||||
if link_match and link_match.group(1).lower().endswith(('.jpg', '.jpeg', '.png', '.gif')):
|
||||
logging.debug(f"Found image post: {post_title.text} - {link_match.group(1)}")
|
||||
posts.append((post_title.text, link_match.group(1)))
|
||||
except ET.ParseError as e:
|
||||
logging.error(f"Error parsing RSS feed: {e}")
|
||||
return []
|
||||
|
||||
return posts
|
||||
|
||||
async def init_db():
|
||||
try:
|
||||
@@ -120,7 +121,7 @@ async def setChannel(message):
|
||||
|
||||
@bot.command()
|
||||
async def version(message):
|
||||
await message.channel.send("Version 0.5.1 is running\nSource code: https://github.com/michelleDeko/ich_iel-bot")
|
||||
await message.channel.send("Version 0.6.1 is running\nSource code: https://github.com/michelleDeko/ich_iel-bot")
|
||||
|
||||
# the cat bot died, so i wanted to add this command to this bot
|
||||
@bot.command()
|
||||
@@ -175,6 +176,28 @@ async def racoon(message):
|
||||
return
|
||||
await message.channel.send(file=fluxer.File(image_bytes, filename="racoon.jpg"))
|
||||
|
||||
# i just watched beastars and i realised that this bot needs a bnuy command
|
||||
# maybe I will switch the API later since this one seems to only have gifs
|
||||
@bot.command()
|
||||
async def bnuy(message):
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get("https://api.bunnies.io/v2/loop/random/?media=gif,png") as response:
|
||||
data = await response.json()
|
||||
if response.status != 200 or not data or "media" not in data:
|
||||
await message.channel.send("Failed to fetch image of a bunny")
|
||||
return
|
||||
image_url = data["media"].get("gif")
|
||||
if not image_url:
|
||||
await message.channel.send("Failed to fetch image of a bunny")
|
||||
return
|
||||
file_ext = "gif" if image_url.lower().endswith(".gif") else "png"
|
||||
async with session.get(image_url) as image_response:
|
||||
if image_response.status != 200:
|
||||
await message.channel.send("Failed to fetch image of a bunny")
|
||||
return
|
||||
image_bytes = await image_response.read()
|
||||
await message.channel.send(file=fluxer.File(image_bytes, filename=f"bnuy.{file_ext}"))
|
||||
|
||||
async def post_reddit():
|
||||
subreddit = os.getenv("SUBREDDIT", "ich_iel")
|
||||
posts = await get_latest_post(subreddit)
|
||||
|
||||
+2
-1
@@ -1,5 +1,6 @@
|
||||
fluxer.py
|
||||
fluxer.py[voice]
|
||||
requests
|
||||
asyncio
|
||||
dotenv
|
||||
aiohttp
|
||||
yt-dlp
|
||||
@@ -0,0 +1,43 @@
|
||||
# at this point this bot isn't just a reddit bot anymore, maybe i should start renaming it lol
|
||||
import yt_dlp
|
||||
import logging
|
||||
import os
|
||||
|
||||
AUDIO_DIR = "data/audio"
|
||||
|
||||
_bot = None
|
||||
|
||||
def setup(bot):
|
||||
global _bot
|
||||
_bot = bot
|
||||
os.makedirs(AUDIO_DIR, exist_ok=True)
|
||||
bot.command()(play)
|
||||
|
||||
async def play(ctx, *, url: str):
|
||||
guild_id = ctx._guild.id
|
||||
voice_state = _bot.get_voice_state(guild_id, ctx.author.id)
|
||||
|
||||
if voice_state is None or voice_state.channel_id is None:
|
||||
await ctx.reply("You're not in a voice channel!")
|
||||
return
|
||||
channel = await _bot.fetch_channel(str(voice_state.channel_id))
|
||||
logging.info(f"Playing {url}")
|
||||
ydl_opts = {
|
||||
'format': 'm4a/bestaudio/best',
|
||||
'outtmpl': f'{AUDIO_DIR}/%(id)s.%(ext)s',
|
||||
'postprocessors': [{ # Extract audio using ffmpeg
|
||||
'key': 'FFmpegExtractAudio',
|
||||
'preferredcodec': 'm4a',
|
||||
}]
|
||||
}
|
||||
|
||||
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
|
||||
info = ydl.extract_info(url, download=True)
|
||||
filename = ydl.prepare_filename(info).rsplit('.', 1)[0] + '.m4a'
|
||||
title = info.get('title', 'Unknown Title')
|
||||
logging.info(f"Downloaded to {filename}")
|
||||
|
||||
await ctx.reply(f"Playing {title} in {channel.mention}")
|
||||
|
||||
async with await channel.connect(_bot) as vc:
|
||||
await vc.play_file(filename)
|
||||
Reference in New Issue
Block a user