#!/usr/bin/python3 # -*- coding: utf-8 -*- import asyncio import datetime import json import signal import sys import urllib.request import xml.etree.ElementTree as ET from urllib.error import HTTPError import telepot from telepot.aio.delegate import per_chat_id, create_open, pave_event_space from telepot.aio.loop import MessageLoop class Essen: def __init__(self, name, preis, kategorie): self.name = name self.preis = preis self.kategorie = kategorie def __str__(self): if self.preis > 0: return str("*%s*: %s (%.2f €)" % (self.kategorie, self.name, self.preis)) else: return str("*%s*: %s" % (self.kategorie, self.name)) config_filename = "config.json" monate = ["Januar", "Februar", "März", "April", "Mai", "Juni", "August", "September", "Oktober", "November", "Dezember"] wochentage = ["Montag", "Dienstag", "Mittwoch", "Donnerstag", "Freitag", "Samstag", "Sonntag"] info_str = "*Mensa-Bot der Hochschule Mittweida (beta)*\nDieser Bot versendet jeden Tag um 10 Uhr den aktuellen " \ "Mensa-Speiseplan. Er wird über /start für den aktuellen Chat oder die aktuelle Gruppe gestartet, /stop " \ "beendet ihn wieder. Mit /essen, /mensa und /speiseplan kann der aktuelle Speiseplan manuell abgerufen " \ "werden.\n\n_Haftungsausschluss: Dieser Bot steht in keiner Verbindung mit der Hochschule Mittweida oder" \ " dem Studentenwerk Freiberg. Alle Angaben ohne Gewähr._" status = "" essen = [] var = True class HSMensaW(telepot.aio.helper.ChatHandler): async def on_chat_message(self, msg): global config, essen, status, var content_type, chat_type, chat_id = telepot.glance(msg) if content_type == "text": text = str(msg["text"]).lower() if text.startswith("/start"): if chat_id not in ids: ids.append(chat_id) config['ids'] = ids await bot.sendMessage(chat_id, "Bot wurde aktiviert") chat = get_chat_name(msg) await send_status("Bot aktiviert für Chat %s (ID: %i)" % (chat, chat_id)) with open(config_filename, 'w') as outfile: json.dump(config, outfile) else: await bot.sendMessage(chat_id, "Bot war bereits aktiviert") elif text.startswith("/stop"): if chat_id in ids: ids.remove(chat_id) config['ids'] = ids await bot.sendMessage(chat_id, "Bot wurde deaktiviert") chat = get_chat_name(msg) await send_status("Bot deaktiviert für Chat %s (ID: %i)" % (chat, chat_id)) with open(config_filename, 'w') as outfile: json.dump(config, outfile) else: await bot.sendMessage(chat_id, "Bot war nicht aktiv") elif text.startswith("/essen") or text.startswith("/mensa") or text.startswith("/speiseplan"): chat = get_chat_name(msg) await send_status("Essen angefordert für Chat %s (ID: %i)" % (chat, chat_id)) await get_essen() if len(essen) == 0: if var: await bot.sendMessage(chat_id, "Es ist ein Fehler aufgetreten. Bitte später erneut versuchen.") else: await bot.sendMessage(chat_id, "Für heute ist leider kein Speiseplan verfügbar.") else: await send_essen(chat_id) await send_status("Essen versendet für Chat %s (ID: %i)" % (chat, chat_id)) elif text.startswith("/status") and chat_id in config_ids: await bot.sendMessage(chat_id, status) elif text.startswith("/info"): await bot.sendMessage(chat_id, info_str, "markdown") elif content_type == "new_chat_member": if msg["new_chat_participant"]["id"] == get_bot_id(): await bot.sendMessage(chat_id, info_str, "markdown") elif content_type == "left_chat_member": if msg["left_chat_participant"]["id"] == get_bot_id(): if chat_id in ids: ids.remove(chat_id) config['ids'] = ids with open(config_filename, 'w') as outfile: json.dump(config, outfile) async def send_essen(chat_id): global datum, essen nachricht = "Speiseplan am WOCHENTAG, den %s:\n" % datum.strftime("%d. MONAT %Y") nachricht = nachricht.replace("MONAT", monate[(datum.month - 1) % 12]) nachricht = nachricht.replace("WOCHENTAG", wochentage[datum.weekday()]) for i in essen: nachricht += "- " + str(i).replace(".", ",") + "\n\n" await bot.sendMessage(chat_id, nachricht, "markdown") async def send_status(text): global config_ids for chat_id in config_ids: await bot.sendMessage(chat_id, text) async def get_essen(): global datum, essen, var # , ctx essen = [] try: # response = urllib.request.urlopen(url, context=ctx) response = urllib.request.urlopen(url) except HTTPError: var = False return except Exception as expt: await send_status("Es ist ein Fehler aufgetreten:\n%s" % str(expt)) print("Fehler:\n%s" % str(expt)) var = True return data = response.read() response.close() text = data.decode('utf-8') et = ET.fromstring(text) date = et.findall("./menus/day/date")[0].text datum = datetime.date(int(date[:4]), int(date[5:7]), int(date[8:10])) menus = et.findall("./menus/day/menu") for i in menus: kategorie = i.findall("type")[0].text name = i.findall("description")[0].text.strip("()1234567890, ") try: preis = float(i.findall("./prices/price[label='Studenten']/value")[0].text.replace(",", ".")) except ValueError: preis = -1 essen.append(Essen(name, preis, kategorie)) def get_bot_id(): api_url = "https://api.telegram.org/bot" + telegram_bot_token + "/getMe" with urllib.request.urlopen(api_url) as response: data = json.load(response) bot_id = data["result"]["id"] return bot_id def get_chat_name(msg): if msg["chat"]["type"] == "group": chat = msg["chat"]["title"] + " (g)" elif msg["chat"]["type"] == "private": chat = msg["chat"]["first_name"] + " " + msg["chat"]["last_name"] + " (p)" else: chat = "null" return chat def shutdown(signum, frame): print('Signal handler called with signal', signum) ml.close() loop.stop() loop.close() sys.stdout.close() sys.stderr.close() sys.exit(-1) async def essen_loop(): global status, essen, ids while True: now = datetime.datetime.today() next_day = datetime.datetime(now.year, now.month, now.day) + datetime.timedelta(1, 36000) await send_status("Wartezeit: %i Sekunden" % (next_day - now).seconds) status = "Warten bis %s" % next_day await asyncio.sleep((next_day - now).seconds) await send_status("Aufwachen um 10 Uhr") status = "Essen abrufen" await get_essen() await send_status("%i Essen gefunden" % len(essen)) status = "Essen senden" if len(essen) > 0: await send_status("Essen werden gesendet") for i in ids: await send_essen(i) await send_status("Abgeschlossen, warte 30 Sekunden") status = "Warte 30 Sekunden" await asyncio.sleep(30) sys.stdout = open("out.log", "a") sys.stderr = open("err.log", "a") signal.signal(signal.SIGTERM, shutdown) signal.signal(signal.SIGINT, shutdown) try: with open(config_filename, 'r') as config_file: config = json.load(config_file) except FileNotFoundError as e: print('Error: config file "{}" not found: {}'.format(config_filename, e)) sys.exit(-1) except ValueError as e: print('Error: invalid config file "{}": {}'.format(config_filename, e)) sys.exit(-1) telegram_bot_token = config.get("telegram_bot_token") url = config.get("url") ids = config.get("ids") config_ids = config.get("config_ids") bot = telepot.aio.DelegatorBot(telegram_bot_token, [ pave_event_space()( per_chat_id(), create_open, HSMensaW, timeout=10 ), ]) loop = asyncio.get_event_loop() # ml = MessageLoop(bot, handle).run_forever() ml = MessageLoop(bot).run_forever() loop.create_task(ml) loop.create_task(essen_loop()) loop.run_forever() # ctx = ssl.create_default_context() # ctx.check_hostname = False # ctx.verify_mode = ssl.CERT_NONE