HSMensaW/HSMensaW_botA.py

510 lines
20 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/python3
# -*- coding: utf-8 -*-
# since 3.5: type hints
# def <name>(<args) -> <returntype>:
2020-10-15 20:39:02 +02:00
import asyncio
# write concurrent code (not threading. not multiprocessing)
# it's cooperative multitasking, no parallelism
# coroutines: suspend execution before return and pass control to another coroutine
# use await only in async functions
import datetime
import json
import signal
import sys
import traceback
import urllib.request
import xml.etree.ElementTree as ET
2019-09-27 18:43:56 +02:00
from enum import Enum
from re import sub
2019-09-27 19:57:32 +02:00
from time import time
from typing import Any
from urllib.error import HTTPError
import telepot
from babel.dates import format_date
from googletrans import Translator
from telepot.aio import DelegatorBot
from telepot.aio.delegate import per_chat_id, create_open, pave_event_space
from telepot.aio.loop import MessageLoop
2019-09-27 18:43:56 +02:00
from telepot.delegate import include_callback_query_chat_id
from telepot.exception import BotWasBlockedError
2019-09-27 18:43:56 +02:00
from telepot.namedtuple import InlineKeyboardMarkup, InlineKeyboardButton
class Essen:
    """A single meal of the menu: name, price and category."""

    def __init__(self, name, preis, kategorie) -> None:
        """
        Initialize an Essen object with the given values.

        Parameters:
            name (string): name of the meal
            preis (float): price of the meal; a value <= 0 means "no price known"
            kategorie (string): category of the meal
        """
        self.name = name
        self.preis = preis
        self.kategorie = kategorie

    def __str__(self) -> str:
        """
        Format the attributes of the meal as a markdown snippet.

        Returns:
            string: "*<kategorie>*: `<name>` (<preis> €)"; the price part is
            left out when the price is not positive.
        """
        if self.preis > 0:
            return "*%s*: `%s` (%.2f €)" % (self.kategorie, self.name, self.preis)
        return "*%s*: `%s`" % (self.kategorie, self.name)
2019-09-27 18:43:56 +02:00
class Language(Enum):
    """Languages in which a chat can receive the menu."""

    # The values are also used as the prefix of the inline-keyboard callback
    # data ("<value>::<chat id>"), so they must not contain "::".
    GERMAN = "de"
    ENGLISH = "en"
    GERMAN_ENGLISH = "de+en"
# Path of the JSON configuration file.
config_filename = "config.json"

# Corona notice appended to every German menu message (markdown).
var_corona_msg_de = (
    "Bitte halten Sie sich an die AHA-Regeln in der Mensa und achten Sie auf die "
    "vorgegebenen Markierung und Hinweise. Den aktuellen Corona-Status an der HS Mittweida "
    "können Sie an der [Corona-Ampel](https://www.hs-mittweida.de/) oder im [Corona-Newsticker]"
    "(https://www.hs-mittweida.de/index.php?id=247957) der Hochschule einsehen. Nur *ZUSAMMEN* "
    "bewirken wir etwas und bewältigen die Pandemie. Bleiben Sie gesund."
)

# Corona notice appended to every English menu message (markdown).
# Note the trailing space after "[Corona": adjacent literals are joined
# without any separator, so it must be part of the literal itself.
var_corona_msg_eng = (
    "Please follow the AHA rules in the cafeteria and pay attention to the given markings "
    "and instructions. You can see the current Corona status of HS Mittweida at the [Corona "
    "traffic light](https://www.hs-mittweida.de/) or in the [Corona news ticker]"
    "(https://www.hs-mittweida.de/index.php?id=247957) of the university. Only *TOGETHER* we "
    "make a difference and overcome the pandemic. Stay healthy."
)

# Help text sent for /help and when the bot is added to a chat (markdown).
info_str = "*Inoffizieller Mensa-Bot der Hochschule Mittweida*\nDieser Bot versendet jeden Tag um 10 Uhr den aktuellen " \
           "Mensa-Speiseplan. Er wird über /start für den aktuellen Chat oder die aktuelle Gruppe gestartet, " \
           "/stop beendet ihn wieder. Mit /essen, /mensa und /speiseplan (optional gefolgt von _en_ oder _de_) kann " \
           "der aktuelle Speiseplan manuell abgerufen werden. Mit /settings kann (von Gruppenadmins) die Sprache " \
           "verändert werden.\n\n_Haftungsausschluss: Dieser Bot steht in keiner Verbindung mit der Hochschule " \
           "Mittweida oder dem Studentenwerk Freiberg. Alle Angaben ohne Gewähr._\n\nGrafik bereitgestellt von [" \
           "vecteezy.com](https://de.vecteezy.com) "

# Human-readable state of the daily loop, shown by the /status command.
status = ""
# Cached meals of the current plan (German originals / English translations).
essen = []
essen_eng = []
# Error flag of the last download: True = unexpected error, False = the
# server answered with an HTTP error (treated as "no plan available").
var = True
# Date the cached meals belong to (None until a plan was loaded).
datum = None
# time() of the last download attempt, used for rate limiting.
last_updated = 0.0
# Cached Telegram id of the bot itself; -1 = not queried yet.
botID = -1
class EnumEncode(json.JSONEncoder):
    """JSON encoder that can serialize Language members."""

    def default(self, obj):
        """
        Serialize a Language member as its member name (e.g. "GERMAN").

        Every other object is passed on to the base class, which raises
        TypeError for types it cannot serialize.
        """
        if isinstance(obj, Language):
            return obj.name
        return super().default(obj)
class HSMensaW(telepot.aio.helper.ChatHandler):
    """Chat handler: processes bot commands, join/leave events and language-keyboard callbacks."""

    async def on_chat_message(self, msg: dict) -> None:
        """
        React to a chat message.

        Text commands (matched case-insensitively by prefix):
          /start, /stop                 subscribe / unsubscribe the chat
          /essen, /mensa, /speiseplan   send the current plan (optional suffix " en" / " de")
          /help                         send the info text
          /settings                     show the language keyboard (subscribed chats only)
          /status, /logging             only honoured for chats listed in config_ids

        The info text is also sent when the bot itself is added to a chat, and the
        chat is unsubscribed when the bot itself is removed from it.

        :param msg: message dict as passed in by telepot
        """
        global logging_enabled
        content_type, chat_type, chat_id = telepot.glance(msg)

        if content_type == "text":
            text = str(msg["text"]).lower()

            if text.startswith("/start"):
                if chat_id not in ids:
                    ids[chat_id] = Language.GERMAN
                    config['ids'] = ids
                    await send_message(bot_obj=bot, chat_id=chat_id, msg="Bot wurde aktiviert")
                    chat = get_chat_name(msg)
                    await send_status("Bot aktiviert für Chat %s (ID: %i)" % (chat, chat_id))
                    write_config()
                else:
                    await send_message(bot_obj=bot, chat_id=chat_id, msg="Bot war bereits aktiviert")

            elif text.startswith("/stop"):
                if ids.pop(chat_id, None) is not None:
                    config['ids'] = ids
                    await send_message(bot_obj=bot, chat_id=chat_id, msg="Bot wurde deaktiviert")
                    chat = get_chat_name(msg)
                    await send_status("Bot deaktiviert für Chat %s (ID: %i)" % (chat, chat_id))
                    write_config()
                else:
                    await send_message(bot_obj=bot, chat_id=chat_id, msg="Bot war nicht aktiv")

            elif text.startswith(("/essen", "/mensa", "/speiseplan")):
                chat = get_chat_name(msg)
                await send_status("Essen angefordert für Chat %s (ID: %i)" % (chat, chat_id))
                await get_essen(False)
                if not essen:
                    # "var" distinguishes an unexpected error from "no plan available"
                    if var:
                        await send_message(bot_obj=bot, chat_id=chat_id,
                                           msg="Es ist ein Fehler aufgetreten. Bitte später erneut versuchen.")
                    else:
                        await send_message(bot_obj=bot, chat_id=chat_id,
                                           msg="Für heute ist leider kein Speiseplan verfügbar.")
                else:
                    # an explicit suffix wins over the language stored for the chat
                    if text.endswith(" en"):
                        language = Language.ENGLISH
                    elif text.endswith(" de"):
                        language = Language.GERMAN
                    else:
                        language = ids.get(chat_id, Language.GERMAN)
                    await send_essen(chat_id, language)
                    await send_status("Essen versendet für Chat %s (ID: %i)" % (chat, chat_id))

            elif text.startswith("/help"):
                await send_message(bot_obj=bot, chat_id=chat_id, msg=info_str, parse_mode="markdown")

            elif text.startswith("/settings"):
                if chat_id in ids:
                    # callback data has the form "<language value>::<chat id>"
                    callback_chat_id = "::" + str(chat_id)
                    keyboard = InlineKeyboardMarkup(inline_keyboard=[
                        [InlineKeyboardButton(text="Deutsch",
                                              callback_data=Language.GERMAN.value + callback_chat_id)],
                        [InlineKeyboardButton(text="English",
                                              callback_data=Language.ENGLISH.value + callback_chat_id)],
                        [InlineKeyboardButton(text="Deutsch + English",
                                              callback_data=Language.GERMAN_ENGLISH.value + callback_chat_id)],
                    ])
                    await send_message(bot_obj=bot, chat_id=chat_id, msg="Bitte Sprache(n) auswählen/Please select "
                                                                         "language(s)", reply_markup=keyboard)
                else:
                    await send_message(bot_obj=bot, chat_id=chat_id,
                                       msg="Bitte starten Sie zuerst den Bot/Please start the bot first.\n/start")

            elif text.startswith("/status") and chat_id in config_ids:
                await send_message(bot_obj=bot, chat_id=chat_id, msg=status, parse_mode="markdown")

            elif text.startswith("/logging") and chat_id in config_ids:
                # "text" is already lower-cased, so no further normalisation is needed
                if text.endswith(("on", "1", "true")):
                    logging_enabled = True
                    config["logging_enabled"] = True
                    write_config()
                    await send_message(bot_obj=bot, chat_id=chat_id, msg="logging enabled", parse_mode="markdown")
                elif text.endswith(("off", "0", "false")):
                    logging_enabled = False
                    config["logging_enabled"] = False
                    write_config()
                    await send_message(bot_obj=bot, chat_id=chat_id, msg="logging disabled", parse_mode="markdown")
                else:
                    message = "logging enabled: %s (change with on/off or true/false or 1/0)" % logging_enabled
                    await send_message(bot_obj=bot, chat_id=chat_id, msg=message, parse_mode="markdown")

        elif content_type == "new_chat_member":
            if msg["new_chat_participant"]["id"] == get_bot_id():
                await send_message(bot_obj=bot, chat_id=chat_id, msg=info_str, parse_mode="markdown")

        elif content_type == "left_chat_member":
            if msg["left_chat_participant"]["id"] == get_bot_id():
                if ids.pop(chat_id, None) is not None:
                    config['ids'] = ids
                    write_config()

    async def on_callback_query(self, msg):
        """
        Handle a press on one of the language buttons created by /settings.

        The callback data has the form "<language value>::<chat id>". Outside of
        private chats only the creator or an administrator may change the
        language. The result is posted to the chat as a markdown message.

        :param msg: callback query dict as passed in by telepot
        """
        query_id, from_id, query_data = telepot.glance(msg, flavor='callback_query')
        lang, _, raw_chat_id = query_data.partition("::")
        # raises ValueError for malformed callback data (no "::" or no numeric id)
        chat_id = int(raw_chat_id)

        chat_member = await bot.getChatMember(chat_id=chat_id, user_id=from_id)
        member_status = chat_member["status"]
        is_private = msg["message"]["chat"]["type"] == "private"
        if not is_private and member_status not in ("creator", "administrator"):
            await bot.answerCallbackQuery(callback_query_id=query_id, text="Keine Berechtigung. Bitten Sie einen "
                                                                           "Admin, die Sprache zu ändern.")
            return

        sender = msg["from"]
        user_id = sender["id"]
        name = " ".join(part for part in (sender.get("first_name"), sender.get("last_name")) if part)
        if not name:
            # user_id is an int and has to be converted before concatenation
            name = "User " + str(user_id)
        prefix = "[" + name + "](tg://user?id=" + str(user_id) + ") hat die Sprache geändert nach "

        labels = {
            Language.GERMAN.value: "_Deutsch_.",
            Language.ENGLISH.value: "_English_.",
            Language.GERMAN_ENGLISH.value: "_Deutsch + Englisch_.",
        }
        if chat_id not in ids:
            message = "Fehler: Bot nicht aktiviert für diesen Chat!"
        elif lang not in labels:
            # unknown language value: report it instead of failing in Language(lang)
            message = "Fehler beim Setzen der Sprache"
        else:
            ids[chat_id] = Language(lang)
            config['ids'] = ids
            write_config()
            message = prefix + labels[lang]

        await send_message(bot_obj=bot, chat_id=chat_id, msg=message, parse_mode="markdown")
        # Acknowledge the query; the Bot API asks for this even when no
        # notification is shown, otherwise the client keeps a progress indicator.
        await bot.answerCallbackQuery(callback_query_id=query_id)
2019-09-27 18:43:56 +02:00
def _essen_to_german_str(meal) -> str:
    """Render a meal like str(meal), but with a decimal comma in the price only."""
    text = str(meal)
    if meal.preis > 0:
        price = "%.2f" % meal.preis
        # The price is the last thing printed, so replace its last occurrence.
        # Dots inside the meal name or category stay untouched.
        head, found, tail = text.rpartition(price)
        if found:
            text = head + price.replace(".", ",") + tail
    return text


async def send_essen(chat_id: int, sprache: Language = Language.GERMAN) -> None:
    """
    Send the cached plan to a chat in the requested language(s).

    For Language.GERMAN_ENGLISH two messages are sent, German first. Each
    message consists of a header with the plan's date, one entry per meal and
    the Corona notice. Uses the module-level datum / essen / essen_eng, which
    get_essen() fills.

    :param chat_id: chat to send the plan to
    :param sprache: language(s) to send
    :return:
    """
    if sprache in (Language.GERMAN, Language.GERMAN_ENGLISH):
        header = "Speiseplan am %s:\n" % format_date(datum, format="full", locale="de_DE")
        entries = "".join("- " + _essen_to_german_str(meal) + "\n\n" for meal in essen)
        nachricht = header + entries + var_corona_msg_de
        await send_message(bot_obj=bot, chat_id=chat_id, msg=nachricht, parse_mode="markdown")

    if sprache in (Language.ENGLISH, Language.GERMAN_ENGLISH):
        header = "Menu on %s:\n" % format_date(datum, format="full", locale="en")
        entries = "".join("- " + str(meal) + "\n\n" for meal in essen_eng)
        nachricht = header + entries + var_corona_msg_eng
        await send_message(bot_obj=bot, chat_id=chat_id, msg=nachricht, parse_mode="markdown")
async def send_status(text: str, ignore_flag: bool = False) -> None:
    """
    Log a status message and forward it to all chats listed in config_ids.

    The message is always written to the message log; it is only forwarded
    when logging is enabled or ignore_flag is set.

    :param text: Message
    :param ignore_flag: Send the message even if logging_enabled is False
    :return:
    """
    message_log.write("[{}] {}\n".format(get_now(), text))
    if not (ignore_flag or logging_enabled):
        return
    for admin_chat in config_ids:
        await send_message(bot_obj=bot, chat_id=admin_chat, msg=text)
async def send_message(bot_obj: DelegatorBot, chat_id: int, msg: str, parse_mode: Any = None,
                       disable_web_page_preview: bool = None, disable_notification: bool = None,
                       reply_to_message_id: int = None, reply_markup: Any = None) -> None:
    """
    Send msg with bot_obj to chat_id. See https://core.telegram.org/bots/api#sendmessage for the options.

    If the bot was blocked by the recipient, the chat is removed from the
    subscriptions, a status message is sent and the config is rewritten.
    """
    options = {
        "parse_mode": parse_mode,
        "disable_web_page_preview": disable_web_page_preview,
        "disable_notification": disable_notification,
        "reply_to_message_id": reply_to_message_id,
        "reply_markup": reply_markup,
    }
    try:
        await bot_obj.sendMessage(chat_id=chat_id, text=msg, **options)
    except BotWasBlockedError:
        was_subscribed = ids.pop(chat_id, None) is not None
        if not was_subscribed:
            return
        config['ids'] = ids
        await send_status("Bot wurde blockiert für Chat ID %i" % chat_id)
        write_config()
def write_config() -> None:
    """
    Write the current configuration to the config file as sorted, indented JSON.

    The configuration is serialized before the file is opened: opening with
    'w' truncates the file, so a serialization error must not happen while it
    is open, or the existing configuration would be lost.
    """
    serialized = json.dumps(config, indent=4, sort_keys=True, cls=EnumEncode)
    with open(config_filename, 'w') as outfile:
        outfile.write(serialized)
async def get_essen(only_today: bool) -> None:
    """
    Download the plan, parse it and cache the meals of the relevant day.

    The relevant day is today, or tomorrow once the local hour is past 13.
    The results are stored in the module-level datum / essen / essen_eng; the
    error flag "var" is True after an unexpected download/parse error and
    False otherwise. Downloads are rate limited: nothing happens within 30
    seconds of the last attempt, or within 15 minutes if meals are cached.

    NOTE(review): urlopen and the translator call are blocking and run inside
    the event loop, as before.

    :param only_today: only accept a plan for exactly the relevant day; otherwise
                       the next available later day is used as well
    :return:
    """
    global datum, essen, essen_eng, var, last_updated  # , ctx
    now = time()
    if now < last_updated + 30 or (essen and now < last_updated + 900):
        return
    last_updated = now
    essen = []
    essen_eng = []

    try:
        # response = urllib.request.urlopen(url, context=ctx)
        with urllib.request.urlopen(url) as response:
            text = response.read().decode('utf-8')
        et = ET.fromstring(text)
    except HTTPError:
        var = False
        return
    except Exception as expt:
        await send_status("Es ist ein Fehler aufgetreten:\n[%s] %s" % (sys.exc_info()[0], str(expt)), ignore_flag=True)
        print("[%s] Fehler:\n%s" % (get_now(), traceback.format_exc()),
              file=sys.stderr)
        var = True
        return
    # Download and parsing succeeded: if no meals are found from here on, that
    # means "no plan available", not "error" - do not keep a stale flag.
    var = False

    dates = et.findall("./menus/day/date")
    if not dates:
        return
    if datetime.datetime.now().hour > 13:
        day = datetime.date.today() + datetime.timedelta(1)
    else:
        day = datetime.date.today()

    # first date in the feed that is not before the relevant day
    date_dt = None
    for date_node in dates:
        date_xml = date_node.text
        candidate = datetime.date(int(date_xml[:4]), int(date_xml[5:7]), int(date_xml[8:10]))
        if candidate >= day:
            date_dt = candidate
            break
    if date_dt is None or (only_today and date_dt != day):
        return
    datum = date_dt

    menus = et.findall("./menus/day[date='" + datum.isoformat() + "']/menu")
    essen_transl = []  # alternating category, name - translated in one request
    preise = []
    for menu in menus:
        kategorie = menu.findall("type")[0].text
        # strip parenthesised code lists such as " (A, G, 1)" from the description
        name = sub(r"\s*\([0-9A-Z, ]*\)", "", menu.findall("description")[0].text)
        price_text = menu.findtext("./prices/price[label='Studenten']/value")
        try:
            preis = float(price_text.replace(",", "."))
        except (AttributeError, ValueError):
            # no student price given or not a number
            preis = -1
        essen_transl.extend((kategorie, name))
        preise.append(preis)
        essen.append(Essen(name, preis, kategorie))

    if not essen_transl:
        return
    try:
        translated = [item.text for item in translator.translate(essen_transl)]
    except Exception as expt:
        # A failing translation must not take the German plan (and the caller)
        # down with it: fall back to the untranslated texts.
        await send_status("Übersetzung fehlgeschlagen, verwende Originaltexte:\n[%s] %s"
                          % (sys.exc_info()[0], str(expt)), ignore_flag=True)
        print("[%s] Fehler:\n%s" % (get_now(), traceback.format_exc()),
              file=sys.stderr)
        translated = essen_transl
    for kategorie_en, name_en, preis in zip(translated[0::2], translated[1::2], preise):
        essen_eng.append(Essen(name_en, preis, kategorie_en))
def get_bot_id() -> int:
    """
    Return the Telegram id of the bot itself.

    The id is requested from the getMe endpoint on first use and cached in the
    module-level botID afterwards (-1 means "not requested yet").
    """
    global botID
    if botID != -1:
        return botID
    api_url = "https://api.telegram.org/bot" + telegram_bot_token + "/getMe"
    with urllib.request.urlopen(api_url) as response:
        payload = response.read().decode('utf-8')
    botID = json.loads(payload)["result"]["id"]
    return botID
def get_chat_name(msg: dict) -> str:
    """
    Build a short display name of the chat a message belongs to.

    Groups and supergroups yield "<title> (g)", private chats yield
    "<first name> <last name> (p)" (missing name parts are left out), every
    other chat type yields "null".

    :param msg: message dict containing a "chat" entry
    :return: display name of the chat
    """
    chat_info = msg["chat"]
    chat_type = chat_info["type"]
    # supergroups carry a title just like basic groups
    if chat_type in ("group", "supergroup"):
        return chat_info["title"] + " (g)"
    if chat_type == "private":
        chat = ""
        if "first_name" in chat_info:
            chat = chat_info["first_name"] + " "
        if "last_name" in chat_info:
            chat += chat_info["last_name"] + " "
        return chat + "(p)"
    return "null"
def shutdown(signum, frame):
    """
    Signal handler: stop the bot, flush and close all log files, then exit.

    :param signum: number of the received signal
    :param frame: current stack frame (unused)
    """
    print('Signal handler called with signal', signum)
    try:
        ml.close()
    except (RuntimeError, ValueError):
        # best effort: the message loop coroutine could not be closed right
        # now; the process exits below anyway
        pass
    loop.stop()
    # An event loop must not be closed while it is running (close() raises
    # RuntimeError then), which would skip the clean-up below.
    if not loop.is_running():
        loop.close()
    for stream in (message_log, sys.stdout, sys.stderr):
        try:
            stream.flush()
            stream.close()
        except (OSError, ValueError):
            # already closed or not writable any more - nothing left to save
            pass
    sys.exit(-1)
async def essen_loop() -> None:
    """
    Endless daily loop: wait until 10:00, load today's plan and send it to all
    subscribed chats in their configured language.

    The module-level "status" is updated along the way (shown by /status).
    """
    global status
    while True:
        now = datetime.datetime.today()
        next_day = datetime.datetime(now.year, now.month, now.day) + datetime.timedelta(1, 36000)
        # timedelta.seconds ignores whole days, so this is the time until the
        # next 10:00 - today's if it is still ahead, tomorrow's otherwise.
        wait_seconds = (next_day - now).seconds
        wake_up = (now + datetime.timedelta(seconds=wait_seconds)).replace(microsecond=0)
        await send_status("Wartezeit: %i Sekunden" % wait_seconds)
        status = "Warten bis %s" % wake_up
        await asyncio.sleep(wait_seconds)

        await send_status("Aufwachen um 10 Uhr")
        status = "Essen abrufen"
        await get_essen(True)
        # try again if error
        if not essen and var:
            await send_status("Fehler, warte 30 Sekunden")
            await asyncio.sleep(30)
            await get_essen(True)
        await send_status("%i Essen gefunden" % len(essen))

        status = "Essen senden"
        if essen:
            await send_status("Essen werden gesendet")
            # Iterate over a snapshot: "ids" can change while a send is awaited
            # (/start, /stop, blocked bot), which would otherwise raise
            # "dictionary changed size during iteration" and end this task.
            for chat_id in list(ids):
                language = ids.get(chat_id)
                if language is None:
                    # unsubscribed in the meantime
                    continue
                try:
                    await send_essen(chat_id, language)
                except asyncio.CancelledError:
                    raise
                except Exception:
                    # one failing chat must not stop the delivery to the others
                    print("[%s] Fehler:\n%s" % (get_now(), traceback.format_exc()),
                          file=sys.stderr)

        await send_status("Abgeschlossen, warte 30 Sekunden")
        status = "Warte 30 Sekunden"
        await asyncio.sleep(30)
def get_now() -> str:
    """Return the current local date and time formatted as "YYYY-MM-DD HH:MM:SS"."""
    return "{:%Y-%m-%d %H:%M:%S}".format(datetime.datetime.now())
def process_json(d):
    """
    object_hook for json.load: restore the chat-id -> Language mapping.

    A dict whose keys are all integer strings and whose values are all names
    of Language members is converted to {int: Language}. Everything else is
    returned unchanged.

    :param d: object decoded by the JSON parser
    :return: converted mapping or d itself
    """
    if not isinstance(d, dict):
        return d
    try:
        # Language[name] only accepts member names; getattr() would also hand
        # out arbitrary class attributes such as "__doc__".
        return {int(k): Language[v] for k, v in d.items()}
    except (ValueError, TypeError, KeyError):
        return d
# From here on all regular and error output goes to log files.
sys.stdout = open("out.log", "a")
sys.stderr = open("err.log", "a")
message_log = open("msg.log", "a")

signal.signal(signal.SIGTERM, shutdown)
signal.signal(signal.SIGINT, shutdown)

translator = Translator()

try:
    with open(config_filename, 'r') as config_file:
        config = json.load(config_file, object_hook=process_json)
except FileNotFoundError as e:
    print('Error: config file "{}" not found: {}'.format(config_filename, e))
    sys.exit(-1)
except ValueError as e:
    print('Error: invalid config file "{}": {}'.format(config_filename, e))
    sys.exit(-1)

telegram_bot_token = config.get("telegram_bot_token")
if not telegram_bot_token:
    # without a token the bot cannot be created at all
    print('Error: "telegram_bot_token" missing in config file "{}"'.format(config_filename))
    sys.exit(-1)
url = config.get("url")

# Optional entries: fall back to empty containers so that membership tests and
# iteration elsewhere in this file do not fail on None.
ids = config.get("ids")
if ids is None:
    ids = {}
config_ids = config.get("config_ids")
if config_ids is None:
    config_ids = []
logging_enabled = bool(config.get("logging_enabled"))

bot = telepot.aio.DelegatorBot(telegram_bot_token, [
    include_callback_query_chat_id(
        pave_event_space())(
        per_chat_id(), create_open, HSMensaW, timeout=10
    ),
])

loop = asyncio.get_event_loop()
# ml = MessageLoop(bot, handle).run_forever()
ml = MessageLoop(bot).run_forever()
loop.create_task(ml)
loop.create_task(essen_loop())
loop.run_forever()

# ctx = ssl.create_default_context()
# ctx.check_hostname = False
# ctx.verify_mode = ssl.CERT_NONE