#!/usr/bin/python3 # -*- coding: utf-8 -*- import asyncio import datetime import json import signal import sys import urllib.request import xml.etree.ElementTree as ET from typing import Any from urllib.error import HTTPError import telepot from telepot.aio import DelegatorBot from telepot.aio.delegate import per_chat_id, create_open, pave_event_space from telepot.aio.loop import MessageLoop from telepot.exception import BotWasBlockedError class Essen: def __init__(self, name, preis, kategorie): self.name = name self.preis = preis self.kategorie = kategorie def __str__(self): if self.preis > 0: return str("*%s*: `%s` (%.2f €)" % (self.kategorie, self.name, self.preis)) else: return str("*%s*: `%s`" % (self.kategorie, self.name)) config_filename = "config.json" monate = ["Januar", "Februar", "März", "April", "Mai", "Juni", "August", "September", "Oktober", "November", "Dezember"] wochentage = ["Montag", "Dienstag", "Mittwoch", "Donnerstag", "Freitag", "Samstag", "Sonntag"] info_str = "*Mensa-Bot der Hochschule Mittweida (beta)*\nDieser Bot versendet jeden Tag um 10 Uhr den aktuellen " \ "Mensa-Speiseplan. Er wird über /start für den aktuellen Chat oder die aktuelle Gruppe gestartet, /stop " \ "beendet ihn wieder. Mit /essen, /mensa und /speiseplan kann der aktuelle Speiseplan manuell abgerufen " \ "werden.\n\n_Haftungsausschluss: Dieser Bot steht in keiner Verbindung mit der Hochschule Mittweida oder" \ " dem Studentenwerk Freiberg. Alle Angaben ohne Gewähr._\n\nGrafik bereitgestellt von " \ "[vecteezy.com](https://de.vecteezy.com)" status = "" essen = [] var = True datum = None class HSMensaW(telepot.aio.helper.ChatHandler): @staticmethod async def on_chat_message(msg: dict) -> None: global config, essen, status, var content_type, chat_type, chat_id = telepot.glance(msg) if content_type == "text": text = str(msg["text"]).lower() if text.startswith("/start"): if chat_id not in ids: ids.append(chat_id) config['ids'] = ids await send_message(bot_obj=bot, chat_id=chat_id, msg="Bot wurde aktiviert") chat = get_chat_name(msg) await send_status("Bot aktiviert für Chat %s (ID: %i)" % (chat, chat_id)) write_config() else: await send_message(bot_obj=bot, chat_id=chat_id, msg="Bot war bereits aktiviert") elif text.startswith("/stop"): if chat_id in ids: ids.remove(chat_id) config['ids'] = ids await send_message(bot_obj=bot, chat_id=chat_id, msg="Bot wurde deaktiviert") chat = get_chat_name(msg) await send_status("Bot deaktiviert für Chat %s (ID: %i)" % (chat, chat_id)) write_config() else: await send_message(bot_obj=bot, chat_id=chat_id, msg="Bot war nicht aktiv") elif text.startswith("/essen") or text.startswith("/mensa") or text.startswith("/speiseplan"): chat = get_chat_name(msg) await send_status("Essen angefordert für Chat %s (ID: %i)" % (chat, chat_id)) await get_essen(False) if len(essen) == 0: if var: await send_message(bot_obj=bot, chat_id=chat_id, msg="Es ist ein Fehler aufgetreten. Bitte später erneut versuchen.") else: await send_message(bot_obj=bot, chat_id=chat_id, msg="Für heute ist leider kein Speiseplan verfügbar.") else: await send_essen(chat_id) await send_status("Essen versendet für Chat %s (ID: %i)" % (chat, chat_id)) elif text.startswith("/status") and chat_id in config_ids: await send_message(bot_obj=bot, chat_id=chat_id, msg=status, parse_mode="markdown") elif text.startswith("/info"): await send_message(bot_obj=bot, chat_id=chat_id, msg=info_str, parse_mode="markdown") elif content_type == "new_chat_member": if msg["new_chat_participant"]["id"] == get_bot_id(): await send_message(bot_obj=bot, chat_id=chat_id, msg=info_str, parse_mode="markdown") elif content_type == "left_chat_member": if msg["left_chat_participant"]["id"] == get_bot_id(): if chat_id in ids: ids.remove(chat_id) config['ids'] = ids write_config() async def send_essen(chat_id: int) -> None: global datum, essen nachricht = "Speiseplan am %s, den %s:\n" % (wochentage[datum.weekday()], datum.strftime("%d. MONAT %Y")) nachricht = nachricht.replace("MONAT", monate[(datum.month - 1) % 12]) for i in essen: nachricht += "- " + str(i).replace(".", ",") + "\n\n" send_message(bot_obj=bot, chat_id=chat_id, msg=nachricht, parse_mode="markdown") async def send_status(text: str) -> None: global config_ids for chat_id in config_ids: send_message(bot_obj=bot, chat_id=chat_id, msg=text) def send_message(bot_obj: DelegatorBot, chat_id: int, msg: str, parse_mode: Any = None, disable_web_page_preview: bool = None, disable_notification: bool = None, reply_to_message_id: int = None, reply_markup: Any = None) -> None: """Sends a message with bot_obj to chat_id. See https://core.telegram.org/bots/api#sendmessage for details.""" try: bot_obj.sendMessage(chat_id=chat_id, text=msg, parse_mode=parse_mode, disable_web_page_preview=disable_web_page_preview, disable_notification=disable_notification, reply_to_message_id=reply_to_message_id, reply_markup=reply_markup) except BotWasBlockedError: if chat_id in ids: ids.remove(chat_id) config['ids'] = ids await send_status("Bot wurde blockiert für Chat ID %i)" % chat_id) write_config() def write_config() -> None: with open(config_filename, 'w') as outfile: json.dump(config, outfile) async def get_essen(only_today: bool) -> None: global datum, essen, var # , ctx essen = [] try: # response = urllib.request.urlopen(url, context=ctx) response = urllib.request.urlopen(url) except HTTPError: var = False return except Exception as expt: await send_status("Es ist ein Fehler aufgetreten:\n%s" % str(expt)) print("Fehler:\n%s" % str(expt)) var = True return data = response.read() response.close() text = data.decode('utf-8') et = ET.fromstring(text) dates = et.findall("./menus/day/date") if datetime.datetime.now().hour > 13: day = datetime.date.today() + datetime.timedelta(1) else: day = datetime.date.today() if len(dates) == 0: return date_dt = None for i in dates: date_xml = i.text date_dt = datetime.date(int(date_xml[:4]), int(date_xml[5:7]), int(date_xml[8:10])) if date_dt >= day: break if date_dt < day or (only_today and date_dt != day): return datum = date_dt menus = et.findall("./menus/day[date='" + datum.isoformat() + "']/menu") for i in menus: kategorie = i.findall("type")[0].text name = i.findall("description")[0].text.strip("()1234567890, ") try: preis = float(i.findall("./prices/price[label='Studenten']/value")[0].text.replace(",", ".")) except ValueError: preis = -1 essen.append(Essen(name, preis, kategorie)) def get_bot_id() -> int: api_url = "https://api.telegram.org/bot" + telegram_bot_token + "/getMe" with urllib.request.urlopen(api_url) as response: data = json.load(response) bot_id = data["result"]["id"] return bot_id def get_chat_name(msg: dict) -> str: if msg["chat"]["type"] == "group": chat = msg["chat"]["title"] + " (g)" elif msg["chat"]["type"] == "private": chat = msg["chat"]["first_name"] + " " + msg["chat"]["last_name"] + " (p)" else: chat = "null" return chat def shutdown(signum, frame): print('Signal handler called with signal', signum) ml.close() loop.stop() loop.close() sys.stdout.close() sys.stderr.close() sys.exit(-1) async def essen_loop() -> None: global status, essen, ids while True: now = datetime.datetime.today() next_day = datetime.datetime(now.year, now.month, now.day) + datetime.timedelta(1, 36000) await send_status("Wartezeit: %i Sekunden" % (next_day - now).seconds) status = "Warten bis %s" % next_day await asyncio.sleep((next_day - now).seconds) await send_status("Aufwachen um 10 Uhr") status = "Essen abrufen" await get_essen(True) await send_status("%i Essen gefunden" % len(essen)) status = "Essen senden" if len(essen) > 0: await send_status("Essen werden gesendet") for i in ids: await send_essen(i) await send_status("Abgeschlossen, warte 30 Sekunden") status = "Warte 30 Sekunden" await asyncio.sleep(30) sys.stdout = open("out.log", "a") sys.stderr = open("err.log", "a") signal.signal(signal.SIGTERM, shutdown) signal.signal(signal.SIGINT, shutdown) try: with open(config_filename, 'r') as config_file: config = json.load(config_file) except FileNotFoundError as e: print('Error: config file "{}" not found: {}'.format(config_filename, e)) sys.exit(-1) except ValueError as e: print('Error: invalid config file "{}": {}'.format(config_filename, e)) sys.exit(-1) telegram_bot_token = config.get("telegram_bot_token") url = config.get("url") ids = config.get("ids") config_ids = config.get("config_ids") bot = telepot.aio.DelegatorBot(telegram_bot_token, [ pave_event_space()( per_chat_id(), create_open, HSMensaW, timeout=10 ), ]) loop = asyncio.get_event_loop() # ml = MessageLoop(bot, handle).run_forever() ml = MessageLoop(bot).run_forever() loop.create_task(ml) loop.create_task(essen_loop()) loop.run_forever() # ctx = ssl.create_default_context() # ctx.check_hostname = False # ctx.verify_mode = ssl.CERT_NONE