HSMensaW/HSMensaW_botA.py

510 lines
20 KiB
Python

#!/usr/bin/python3
# -*- coding: utf-8 -*-
# since 3.5: type hints
# def <name>(<args) -> <returntype>:
import asyncio
# write concurrent code (not threading. not multiprocessing)
# it's cooperative multitasking, no parallelism
# coroutines: suspend execution before return and pass control to another coroutine
# use await only in async functions
import datetime
import json
import signal
import sys
import traceback
import urllib.request
import xml.etree.ElementTree as ET
from enum import Enum
from re import sub
from time import time
from typing import Any
from urllib.error import HTTPError
import telepot
from babel.dates import format_date
from googletrans import Translator
from telepot.aio import DelegatorBot
from telepot.aio.delegate import per_chat_id, create_open, pave_event_space
from telepot.aio.loop import MessageLoop
from telepot.delegate import include_callback_query_chat_id
from telepot.exception import BotWasBlockedError
from telepot.namedtuple import InlineKeyboardMarkup, InlineKeyboardButton
class Essen:
"""Class which represents a Essen-Object"""
def __init__(self, name, preis, kategorie) -> None:
"""
Initialize a Essen-Object with given values
Parameters:
name (string): name of the meal
preis (float): price of the meal
kategorie (string): category of the meal
Returns:
None
"""
self.name = name
self.preis = preis
self.kategorie = kategorie
return None
def __str__(self) -> str:
"""
Formats the attribute of the Essen-Object to a string
Parameters:
None
Returns:
_ (string): formatted String of attributes
"""
if self.preis > 0:
return str("*%s*: `%s` (%.2f €)" % (self.kategorie, self.name, self.preis))
else:
return str("*%s*: `%s`" % (self.kategorie, self.name))
class Language(Enum):
GERMAN = "de"
ENGLISH = "en"
GERMAN_ENGLISH = "de+en"
config_filename = "config.json"
var_corona_msg_de = "Bitte halten Sie sich an die AHA-Regeln in der Mensa und achten sie auf die "\
"vorgegebenen Markierung und Hinweise. Den aktuellen Corona-Status an der HS Mittweida "\
"können Sie an der [Corona-Ampel](https://www.hs-mittweida.de/) oder im [Corona-Newsticker]"\
"(https://www.hs-mittweida.de/index.php?id=247957) der Hochschule einsehen. Nur *ZUSAMMEN* "\
"bewirken wir etwas und bewätigen die Pandemie. Bleiben Sie gesund."
var_corona_msg_eng = "Please follow the AHA rules in the cafeteria and pay attention to the given markings "\
"and instructions. You can see the current Corona status of HS Mittweida at the [Corona"\
"traffic light](https://www.hs-mittweida.de/) or in the [Corona news ticker]"\
"(https://www.hs-mittweida.de/index.php?id=247957) of the university. Only *TOGETHER* we "\
"make a difference and overcome the pandemic. Stay healthy."
info_str = "*Inoffizieller Mensa-Bot der Hochschule Mittweida*\nDieser Bot versendet jeden Tag um 10 Uhr den aktuellen " \
"Mensa-Speiseplan. Er wird über /start für den aktuellen Chat oder die aktuelle Gruppe gestartet, " \
"/stop beendet ihn wieder. Mit /essen, /mensa und /speiseplan (optional gefolgt von _en_ oder _de_) kann " \
"der aktuelle Speiseplan manuell abgerufen werden. Mit /settings kann (von Gruppenadmins) die Sprache " \
"verändert werden.\n\n_Haftungsausschluss: Dieser Bot steht in keiner Verbindung mit der Hochschule " \
"Mittweida oder dem Studentenwerk Freiberg. Alle Angaben ohne Gewähr._\n\nGrafik bereitgestellt von [" \
"vecteezy.com](https://de.vecteezy.com) "
status = ""
essen = []
essen_eng = []
var = True
datum = None
last_updated = 0.0
botID = -1
class EnumEncode(json.JSONEncoder):
def default(self, obj):
if type(obj) is Language:
return str(obj.name)
return json.JSONEncoder.default(self, obj)
class HSMensaW(telepot.aio.helper.ChatHandler):
async def on_chat_message(self, msg: dict) -> None:
global config, essen, status, var, logging_enabled
content_type, chat_type, chat_id = telepot.glance(msg)
if content_type == "text":
text = str(msg["text"]).lower()
if text.startswith("/start"):
if chat_id not in ids:
ids[chat_id] = Language.GERMAN
config['ids'] = ids
await send_message(bot_obj=bot, chat_id=chat_id, msg="Bot wurde aktiviert")
chat = get_chat_name(msg)
await send_status("Bot aktiviert für Chat %s (ID: %i)" % (chat, chat_id))
write_config()
else:
await send_message(bot_obj=bot, chat_id=chat_id, msg="Bot war bereits aktiviert")
elif text.startswith("/stop"):
if ids.pop(chat_id, None) is not None:
config['ids'] = ids
await send_message(bot_obj=bot, chat_id=chat_id, msg="Bot wurde deaktiviert")
chat = get_chat_name(msg)
await send_status("Bot deaktiviert für Chat %s (ID: %i)" % (chat, chat_id))
write_config()
else:
await send_message(bot_obj=bot, chat_id=chat_id, msg="Bot war nicht aktiv")
elif text.startswith("/essen") or text.startswith("/mensa") or text.startswith("/speiseplan"):
chat = get_chat_name(msg)
await send_status("Essen angefordert für Chat %s (ID: %i)" % (chat, chat_id))
await get_essen(False)
if len(essen) == 0:
if var:
await send_message(bot_obj=bot, chat_id=chat_id,
msg="Es ist ein Fehler aufgetreten. Bitte später erneut versuchen.")
else:
await send_message(bot_obj=bot, chat_id=chat_id,
msg="Für heute ist leider kein Speiseplan verfügbar.")
else:
if text.endswith(" en"):
language = Language.ENGLISH
elif text.endswith(" de"):
language = Language.GERMAN
else:
if chat_id in ids.keys():
language = ids[chat_id]
else:
language = Language.GERMAN
await send_essen(chat_id, language)
await send_status("Essen versendet für Chat %s (ID: %i)" % (chat, chat_id))
elif text.startswith("/help"):
await send_message(bot_obj=bot, chat_id=chat_id, msg=info_str, parse_mode="markdown")
elif text.startswith("/settings"):
if chat_id in ids:
callback_chat_id = "::" + str(chat_id)
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="Deutsch", callback_data=Language.GERMAN.value + callback_chat_id)],
[InlineKeyboardButton(text="English", callback_data=Language.ENGLISH.value + callback_chat_id)],
[InlineKeyboardButton(text="Deutsch + English",
callback_data=Language.GERMAN_ENGLISH.value + callback_chat_id)],
])
await send_message(bot_obj=bot, chat_id=chat_id, msg="Bitte Sprache(n) auswählen/Please select "
"language(s)", reply_markup=keyboard)
else:
await send_message(bot_obj=bot, chat_id=chat_id,
msg="Bitte starten Sie zuerst den Bot/Please start the bot first.\n/start")
elif text.startswith("/status") and chat_id in config_ids:
await send_message(bot_obj=bot, chat_id=chat_id, msg=status, parse_mode="markdown")
elif text.startswith("/logging") and chat_id in config_ids:
if text.endswith("on") or text.endswith("1") or text.lower().endswith("true"):
logging_enabled = True
config["logging_enabled"] = True
write_config()
await send_message(bot_obj=bot, chat_id=chat_id, msg="logging enabled", parse_mode="markdown")
elif text.endswith("off") or text.endswith("0") or text.lower().endswith("false"):
logging_enabled = False
config["logging_enabled"] = False
write_config()
await send_message(bot_obj=bot, chat_id=chat_id, msg="logging disabled", parse_mode="markdown")
else:
message = "logging enabled: %s (change with on/off or true/false or 1/0)" % logging_enabled
await send_message(bot_obj=bot, chat_id=chat_id, msg=message, parse_mode="markdown")
elif content_type == "new_chat_member":
if msg["new_chat_participant"]["id"] == get_bot_id():
await send_message(bot_obj=bot, chat_id=chat_id, msg=info_str, parse_mode="markdown")
elif content_type == "left_chat_member":
if msg["left_chat_participant"]["id"] == get_bot_id():
if ids.pop(chat_id, None) is not None:
config['ids'] = ids
write_config()
async def on_callback_query(self, msg):
global config, ids
query_id, from_id, query_data = telepot.glance(msg, flavor='callback_query')
lang = query_data[:query_data.index("::")]
chat_id = int(query_data[query_data.index("::") + 2:])
chat_member = await bot.getChatMember(chat_id=chat_id, user_id=from_id)
member_status = chat_member["status"]
if msg["message"]["chat"]["type"] != "private" and member_status != "creator" and \
member_status != "administrator":
await bot.answerCallbackQuery(callback_query_id=query_id, text="Keine Berechtigung. Bitten Sie einen "
"Admin, die Sprache zu ändern.")
return
name = ""
user_id = msg["from"]["id"]
if "first_name" in msg["from"].keys():
name = msg["from"]["first_name"] + " "
if "last_name" in msg["from"].keys():
name += msg["from"]["last_name"]
if len(name) == 0:
name = "User " + user_id
msg = "[" + name + "](tg://user?id=" + str(user_id) + ") hat die Sprache geändert nach "
message = "Fehler beim Setzen der Sprache"
if lang == Language.GERMAN.value:
message = msg + "_Deutsch_."
elif lang == Language.ENGLISH.value:
message = msg + "_English_."
elif lang == Language.GERMAN_ENGLISH.value:
message = msg + "_Deutsch + Englisch_."
if chat_id in ids:
ids[chat_id] = Language(lang)
config['ids'] = ids
write_config()
else:
message = "Fehler: Bot nicht aktiviert für diesen Chat!"
await send_message(bot_obj=bot, chat_id=chat_id, msg=message, parse_mode="markdown")
async def send_essen(chat_id: int, sprache: Language = Language.GERMAN) -> None:
global datum, essen, essen_eng
if sprache == Language.GERMAN or sprache == Language.GERMAN_ENGLISH:
nachricht = "Speiseplan am %s:\n" % format_date(datum, format="full", locale="de_DE")
for i in essen:
nachricht += "- " + str(i).replace(".", ",") + "\n\n"
nachricht += var_corona_msg_de
await send_message(bot_obj=bot, chat_id=chat_id, msg=nachricht, parse_mode="markdown")
if sprache == Language.ENGLISH or sprache == Language.GERMAN_ENGLISH:
nachricht = "Menu on %s:\n" % format_date(datum, format="full", locale="en")
for i in essen_eng:
nachricht += "- " + str(i) + "\n\n"
nachricht += var_corona_msg_eng
await send_message(bot_obj=bot, chat_id=chat_id, msg=nachricht, parse_mode="markdown")
async def send_status(text: str, ignore_flag: bool = False) -> None:
"""
Sends a status message to all chats specified in config_ids
:param text: Message
:param ignore_flag: Should the logging_enabled flag be ignored (should a message be sent even if it is set to False)?
:return:
"""
global config_ids
message_log.write("[%s] %s\n" % (get_now(), text))
if ignore_flag or logging_enabled:
for chat_id in config_ids:
await send_message(bot_obj=bot, chat_id=chat_id, msg=text)
async def send_message(bot_obj: DelegatorBot, chat_id: int, msg: str, parse_mode: Any = None,
disable_web_page_preview: bool = None, disable_notification: bool = None,
reply_to_message_id: int = None, reply_markup: Any = None) -> None:
"""Sends a message with bot_obj to chat_id. See https://core.telegram.org/bots/api#sendmessage for details."""
try:
await bot_obj.sendMessage(chat_id=chat_id, text=msg, parse_mode=parse_mode,
disable_web_page_preview=disable_web_page_preview,
disable_notification=disable_notification, reply_to_message_id=reply_to_message_id,
reply_markup=reply_markup)
except BotWasBlockedError:
if ids.pop(chat_id, None) is not None:
config['ids'] = ids
await send_status("Bot wurde blockiert für Chat ID %i" % chat_id)
write_config()
def write_config() -> None:
with open(config_filename, 'w') as outfile:
json.dump(config, outfile, indent=4, sort_keys=True, cls=EnumEncode)
async def get_essen(only_today: bool) -> None:
"""
Downloads the plan for the next day and parses it
:param only_today: only current day; no download if the mensa is closed
:return:
"""
global datum, essen, essen_eng, var, last_updated # , ctx
if time() < last_updated + 30 or (len(essen) > 0 and time() < last_updated + 900):
return
last_updated = time()
essen = []
essen_eng = []
try:
# response = urllib.request.urlopen(url, context=ctx)
response = urllib.request.urlopen(url)
except HTTPError:
var = False
return
except Exception as expt:
await send_status("Es ist ein Fehler aufgetreten:\n[%s] %s" % (sys.exc_info()[0], str(expt)), ignore_flag=True)
print("[%s] Fehler:\n%s" % (get_now(), traceback.format_exc()),
file=sys.stderr)
var = True
return
data = response.read()
response.close()
text = data.decode('utf-8')
et = ET.fromstring(text)
dates = et.findall("./menus/day/date")
if datetime.datetime.now().hour > 13:
day = datetime.date.today() + datetime.timedelta(1)
else:
day = datetime.date.today()
if len(dates) == 0:
return
date_dt = None
for i in dates:
date_xml = i.text
date_dt = datetime.date(int(date_xml[:4]), int(date_xml[5:7]), int(date_xml[8:10]))
if date_dt >= day:
break
if date_dt < day or (only_today and date_dt != day):
return
datum = date_dt
menus = et.findall("./menus/day[date='" + datum.isoformat() + "']/menu")
essen_transl = []
preise = []
for i in menus:
kategorie = i.findall("type")[0].text
essen_transl.append(kategorie)
name = sub("\\s*\\([0-9A-Z, ]*\\)", "", i.findall("description")[0].text)
essen_transl.append(name)
try:
preis = float(i.findall("./prices/price[label='Studenten']/value")[0].text.replace(",", "."))
except ValueError:
preis = -1
preise.append(preis)
essen.append(Essen(name, preis, kategorie))
translations = translator.translate(essen_transl)
for i in range(int(len(translations) / 2)):
essen_eng.append(Essen(translations[i * 2 + 1].text, preise[i], translations[i * 2].text))
def get_bot_id() -> int:
global botID
if botID == -1:
api_url = "https://api.telegram.org/bot" + telegram_bot_token + "/getMe"
with urllib.request.urlopen(api_url) as response:
data = json.loads(response.read().decode('utf-8'))
botID = data["result"]["id"]
return botID
def get_chat_name(msg: dict) -> str:
chat = ""
if msg["chat"]["type"] == "group":
chat = msg["chat"]["title"] + " (g)"
elif msg["chat"]["type"] == "private":
if "first_name" in msg["chat"].keys():
chat = msg["chat"]["first_name"] + " "
if "last_name" in msg["chat"].keys():
chat += msg["chat"]["last_name"] + " "
chat += "(p)"
else:
chat = "null"
return chat
def shutdown(signum, frame):
print('Signal handler called with signal', signum)
ml.close()
loop.stop()
loop.close()
message_log.flush()
message_log.close()
sys.stdout.flush()
sys.stdout.close()
sys.stderr.flush()
sys.stderr.close()
sys.exit(-1)
async def essen_loop() -> None:
global status, essen, ids, var
while True:
now = datetime.datetime.today()
next_day = datetime.datetime(now.year, now.month, now.day) + datetime.timedelta(1, 36000)
await send_status("Wartezeit: %i Sekunden" % (next_day - now).seconds)
status = "Warten bis %s" % next_day
await asyncio.sleep((next_day - now).seconds)
await send_status("Aufwachen um 10 Uhr")
status = "Essen abrufen"
await get_essen(True)
# try again if error
if len(essen) == 0 and var:
await send_status("Fehler, warte 30 Sekunden")
await asyncio.sleep(30)
await get_essen(True)
await send_status("%i Essen gefunden" % len(essen))
status = "Essen senden"
if len(essen) > 0:
await send_status("Essen werden gesendet")
for i in ids.keys():
await send_essen(i, ids[i])
await send_status("Abgeschlossen, warte 30 Sekunden")
status = "Warte 30 Sekunden"
await asyncio.sleep(30)
def get_now() -> str:
return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
def process_json(d):
if isinstance(d, dict):
try:
return {int(k): getattr(Language, v) for k, v in d.items()}
except (ValueError, TypeError, AttributeError):
return d
return d
sys.stdout = open("out.log", "a")
sys.stderr = open("err.log", "a")
message_log = open("msg.log", "a")
signal.signal(signal.SIGTERM, shutdown)
signal.signal(signal.SIGINT, shutdown)
translator = Translator()
try:
with open(config_filename, 'r') as config_file:
config = json.load(config_file, object_hook=process_json)
except FileNotFoundError as e:
print('Error: config file "{}" not found: {}'.format(config_filename, e))
sys.exit(-1)
except ValueError as e:
print('Error: invalid config file "{}": {}'.format(config_filename, e))
sys.exit(-1)
telegram_bot_token = config.get("telegram_bot_token")
url = config.get("url")
ids = config.get("ids")
config_ids = config.get("config_ids")
logging_enabled = config.get("logging_enabled")
bot = telepot.aio.DelegatorBot(telegram_bot_token, [
include_callback_query_chat_id(
pave_event_space())(
per_chat_id(), create_open, HSMensaW, timeout=10
),
])
loop = asyncio.get_event_loop()
# ml = MessageLoop(bot, handle).run_forever()
ml = MessageLoop(bot).run_forever()
loop.create_task(ml)
loop.create_task(essen_loop())
loop.run_forever()
# ctx = ssl.create_default_context()
# ctx.check_hostname = False
# ctx.verify_mode = ssl.CERT_NONE