diff --git a/.gitignore b/.gitignore index c90768b..d6578f6 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ automations_webhooks.yaml spotify.yaml calendars.yaml +ics_calendars.yaml esphome/common/secrets.yaml esphome/secrets.yaml secrets.yaml diff --git a/configuration.yaml b/configuration.yaml index f7b8c44..d88c389 100644 --- a/configuration.yaml +++ b/configuration.yaml @@ -42,6 +42,7 @@ template: !include template.yaml # calendar integration calendar: !include calendars.yaml +ics_calendar: !include ics_calendars.yaml # DB-recorder configuration recorder: !include recorder.yaml diff --git a/custom_components/ics_calendar/__init__.py b/custom_components/ics_calendar/__init__.py new file mode 100644 index 0000000..b34768a --- /dev/null +++ b/custom_components/ics_calendar/__init__.py @@ -0,0 +1,113 @@ +"""ics Calendar for Home Assistant.""" + +import logging + +import homeassistant.helpers.config_validation as cv +import voluptuous as vol +from homeassistant.const import ( + CONF_EXCLUDE, + CONF_INCLUDE, + CONF_NAME, + CONF_PASSWORD, + CONF_PREFIX, + CONF_URL, + CONF_USERNAME, + Platform, +) +from homeassistant.core import HomeAssistant +from homeassistant.helpers.typing import ConfigType + +from .const import DOMAIN, UPGRADE_URL + +_LOGGER = logging.getLogger(__name__) +PLATFORMS: list[Platform] = [Platform.CALENDAR] + +CONF_DEVICE_ID = "device_id" +CONF_CALENDARS = "calendars" +CONF_DAYS = "days" +CONF_INCLUDE_ALL_DAY = "include_all_day" +CONF_PARSER = "parser" +CONF_DOWNLOAD_INTERVAL = "download_interval" +CONF_USER_AGENT = "user_agent" +CONF_OFFSET_HOURS = "offset_hours" +CONF_ACCEPT_HEADER = "accept_header" + +CONFIG_SCHEMA = vol.Schema( + { + DOMAIN: vol.Schema( + { + # pylint: disable=no-value-for-parameter + vol.Optional(CONF_CALENDARS, default=[]): vol.All( + cv.ensure_list, + vol.Schema( + [ + vol.Schema( + { + vol.Required(CONF_URL): vol.Url(), + vol.Required(CONF_NAME): cv.string, + vol.Optional( + CONF_INCLUDE_ALL_DAY, default=False + ): cv.boolean, + vol.Optional( + CONF_USERNAME, default="" + ): cv.string, + vol.Optional( + CONF_PASSWORD, default="" + ): cv.string, + vol.Optional( + CONF_PARSER, default="rie" + ): cv.string, + vol.Optional( + CONF_PREFIX, default="" + ): cv.string, + vol.Optional( + CONF_DAYS, default=1 + ): cv.positive_int, + vol.Optional( + CONF_DOWNLOAD_INTERVAL, default=15 + ): cv.positive_int, + vol.Optional( + CONF_USER_AGENT, default="" + ): cv.string, + vol.Optional( + CONF_EXCLUDE, default="" + ): cv.string, + vol.Optional( + CONF_INCLUDE, default="" + ): cv.string, + vol.Optional( + CONF_OFFSET_HOURS, default=0 + ): int, + vol.Optional( + CONF_ACCEPT_HEADER, default="" + ): cv.string, + } + ) + ] + ), + ) + } + ) + }, + extra=vol.ALLOW_EXTRA, +) + + +def setup(hass: HomeAssistant, config: ConfigType) -> bool: + """Set up calendars.""" + _LOGGER.debug("Setting up ics_calendar component") + hass.data.setdefault(DOMAIN, {}) + + if DOMAIN in config and config[DOMAIN]: + hass.helpers.discovery.load_platform( + PLATFORMS[0], DOMAIN, config[DOMAIN], config + ) + else: + _LOGGER.error( + "No configuration found! If you upgraded from ics_calendar v3.2.0 " + "or older, you need to update your configuration! See " + "%s for more information.", + UPGRADE_URL, + ) + + return True diff --git a/custom_components/ics_calendar/__pycache__/__init__.cpython-311.pyc b/custom_components/ics_calendar/__pycache__/__init__.cpython-311.pyc new file mode 100644 index 0000000..37a3e01 Binary files /dev/null and b/custom_components/ics_calendar/__pycache__/__init__.cpython-311.pyc differ diff --git a/custom_components/ics_calendar/__pycache__/calendar.cpython-311.pyc b/custom_components/ics_calendar/__pycache__/calendar.cpython-311.pyc new file mode 100644 index 0000000..588e6b0 Binary files /dev/null and b/custom_components/ics_calendar/__pycache__/calendar.cpython-311.pyc differ diff --git a/custom_components/ics_calendar/__pycache__/calendardata.cpython-311.pyc b/custom_components/ics_calendar/__pycache__/calendardata.cpython-311.pyc new file mode 100644 index 0000000..4eb2f05 Binary files /dev/null and b/custom_components/ics_calendar/__pycache__/calendardata.cpython-311.pyc differ diff --git a/custom_components/ics_calendar/__pycache__/const.cpython-311.pyc b/custom_components/ics_calendar/__pycache__/const.cpython-311.pyc new file mode 100644 index 0000000..f4a1dbe Binary files /dev/null and b/custom_components/ics_calendar/__pycache__/const.cpython-311.pyc differ diff --git a/custom_components/ics_calendar/__pycache__/filter.cpython-311.pyc b/custom_components/ics_calendar/__pycache__/filter.cpython-311.pyc new file mode 100644 index 0000000..1d59761 Binary files /dev/null and b/custom_components/ics_calendar/__pycache__/filter.cpython-311.pyc differ diff --git a/custom_components/ics_calendar/__pycache__/icalendarparser.cpython-311.pyc b/custom_components/ics_calendar/__pycache__/icalendarparser.cpython-311.pyc new file mode 100644 index 0000000..f036f7a Binary files /dev/null and b/custom_components/ics_calendar/__pycache__/icalendarparser.cpython-311.pyc differ diff --git a/custom_components/ics_calendar/__pycache__/utility.cpython-311.pyc b/custom_components/ics_calendar/__pycache__/utility.cpython-311.pyc new file mode 100644 index 0000000..c45d96a Binary files /dev/null and b/custom_components/ics_calendar/__pycache__/utility.cpython-311.pyc differ diff --git a/custom_components/ics_calendar/calendar.py b/custom_components/ics_calendar/calendar.py new file mode 100644 index 0000000..8fa6b23 --- /dev/null +++ b/custom_components/ics_calendar/calendar.py @@ -0,0 +1,291 @@ +"""Support for ICS Calendar.""" +import logging +from datetime import datetime, timedelta +from typing import Optional + +# import homeassistant.helpers.config_validation as cv +# import voluptuous as vol +from homeassistant.components.calendar import ( + ENTITY_ID_FORMAT, + CalendarEntity, + CalendarEvent, + extract_offset, + is_offset_reached, +) +from homeassistant.const import ( + CONF_EXCLUDE, + CONF_INCLUDE, + CONF_NAME, + CONF_PASSWORD, + CONF_PREFIX, + CONF_URL, + CONF_USERNAME, +) +from homeassistant.core import HomeAssistant +from homeassistant.helpers.entity import generate_entity_id +from homeassistant.helpers.entity_platform import AddEntitiesCallback +from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType +from homeassistant.util import Throttle +from homeassistant.util.dt import now as hanow + +from . import ( + CONF_ACCEPT_HEADER, + CONF_CALENDARS, + CONF_DAYS, + CONF_DOWNLOAD_INTERVAL, + CONF_INCLUDE_ALL_DAY, + CONF_OFFSET_HOURS, + CONF_PARSER, + CONF_USER_AGENT, +) +from .calendardata import CalendarData +from .filter import Filter +from .icalendarparser import ICalendarParser + +_LOGGER = logging.getLogger(__name__) + + +OFFSET = "!!" + + +MIN_TIME_BETWEEN_UPDATES = timedelta(minutes=15) + + +def setup_platform( + hass: HomeAssistant, + config: ConfigType, + add_entities: AddEntitiesCallback, + discovery_info: DiscoveryInfoType | None = None, +): + """Set up ics_calendar platform. + + :param hass: Home Assistant object + :type hass: HomeAssistant + :param config: Config information for the platform + :type config: ConfigType + :param add_entities: Callback to add entities to HA + :type add_entities: AddEntitiesCallback + :param discovery_info: Config information for the platform + :type discovery_info: DiscoveryInfoType | None, optional + """ + _LOGGER.debug("Setting up ics calendars") + if discovery_info is not None: + calendars: list = discovery_info.get(CONF_CALENDARS) + else: + calendars: list = config.get(CONF_CALENDARS) + + calendar_devices = [] + for calendar in calendars: + device_data = { + CONF_NAME: calendar.get(CONF_NAME), + CONF_URL: calendar.get(CONF_URL), + CONF_INCLUDE_ALL_DAY: calendar.get(CONF_INCLUDE_ALL_DAY), + CONF_USERNAME: calendar.get(CONF_USERNAME), + CONF_PASSWORD: calendar.get(CONF_PASSWORD), + CONF_PARSER: calendar.get(CONF_PARSER), + CONF_PREFIX: calendar.get(CONF_PREFIX), + CONF_DAYS: calendar.get(CONF_DAYS), + CONF_DOWNLOAD_INTERVAL: calendar.get(CONF_DOWNLOAD_INTERVAL), + CONF_USER_AGENT: calendar.get(CONF_USER_AGENT), + CONF_EXCLUDE: calendar.get(CONF_EXCLUDE), + CONF_INCLUDE: calendar.get(CONF_INCLUDE), + CONF_OFFSET_HOURS: calendar.get(CONF_OFFSET_HOURS), + CONF_ACCEPT_HEADER: calendar.get(CONF_ACCEPT_HEADER), + } + device_id = f"{device_data[CONF_NAME]}" + entity_id = generate_entity_id(ENTITY_ID_FORMAT, device_id, hass=hass) + calendar_devices.append(ICSCalendarEntity(entity_id, device_data)) + + add_entities(calendar_devices) + + +class ICSCalendarEntity(CalendarEntity): + """A CalendarEntity for an ICS Calendar.""" + + def __init__(self, entity_id: str, device_data): + """Construct ICSCalendarEntity. + + :param entity_id: Entity id for the calendar + :type entity_id: str + :param device_data: dict describing the calendar + :type device_data: dict + """ + _LOGGER.debug( + "Initializing calendar: %s with URL: %s", + device_data[CONF_NAME], + device_data[CONF_URL], + ) + self.data = ICSCalendarData(device_data) + self.entity_id = entity_id + self._event = None + self._name = device_data[CONF_NAME] + self._last_call = None + + @property + def event(self) -> Optional[CalendarEvent]: + """Return the current or next upcoming event or None. + + :return: The current event as a dict + :rtype: dict + """ + return self._event + + @property + def name(self): + """Return the name of the calendar.""" + return self._name + + @property + def should_poll(self): + """Indicate if the calendar should be polled. + + If the last call to update or get_api_events was not within the minimum + update time, then async_schedule_update_ha_state(True) is also called. + :return: True + :rtype: boolean + """ + this_call = hanow() + if ( + self._last_call is None + or (this_call - self._last_call) > MIN_TIME_BETWEEN_UPDATES + ): + self._last_call = this_call + self.async_schedule_update_ha_state(True) + return True + + async def async_get_events( + self, hass: HomeAssistant, start_date: datetime, end_date: datetime + ) -> list[CalendarEvent]: + """Get all events in a specific time frame. + + :param hass: Home Assistant object + :type hass: HomeAssistant + :param start_date: The first starting date to consider + :type start_date: datetime + :param end_date: The last starting date to consider + :type end_date: datetime + """ + _LOGGER.debug( + "%s: async_get_events called; calling internal.", self.name + ) + return await self.data.async_get_events(hass, start_date, end_date) + + def update(self): + """Get the current or next event.""" + self.data.update() + self._event = self.data.event + self._attr_extra_state_attributes = { + "offset_reached": is_offset_reached( + self._event.start_datetime_local, self.data.offset + ) + if self._event + else False + } + + +class ICSCalendarData: # pylint: disable=R0902 + """Class to use the calendar ICS client object to get next event.""" + + def __init__(self, device_data): + """Set up how we are going to connect to the URL. + + :param device_data Information about the calendar + """ + self.name = device_data[CONF_NAME] + self._days = device_data[CONF_DAYS] + self._offset_hours = device_data[CONF_OFFSET_HOURS] + self.include_all_day = device_data[CONF_INCLUDE_ALL_DAY] + self._summary_prefix: str = device_data[CONF_PREFIX] + self.parser = ICalendarParser.get_instance(device_data[CONF_PARSER]) + self.parser.set_filter( + Filter(device_data[CONF_EXCLUDE], device_data[CONF_INCLUDE]) + ) + self.offset = None + self.event = None + + self._calendar_data = CalendarData( + _LOGGER, + self.name, + device_data[CONF_URL], + timedelta(minutes=device_data[CONF_DOWNLOAD_INTERVAL]), + ) + + self._calendar_data.set_headers( + device_data[CONF_USERNAME], + device_data[CONF_PASSWORD], + device_data[CONF_USER_AGENT], + device_data[CONF_ACCEPT_HEADER], + ) + + async def async_get_events( + self, hass: HomeAssistant, start_date: datetime, end_date: datetime + ) -> list[CalendarEvent]: + """Get all events in a specific time frame. + + :param hass: Home Assistant object + :type hass: HomeAssistant + :param start_date: The first starting date to consider + :type start_date: datetime + :param end_date: The last starting date to consider + :type end_date: datetime + """ + event_list = [] + if await hass.async_add_executor_job( + self._calendar_data.download_calendar + ): + _LOGGER.debug("%s: Setting calendar content", self.name) + self.parser.set_content(self._calendar_data.get()) + try: + event_list = self.parser.get_event_list( + start=start_date, + end=end_date, + include_all_day=self.include_all_day, + offset_hours=self._offset_hours, + ) + except: # pylint: disable=W0702 + _LOGGER.error( + "async_get_events: %s: Failed to parse ICS!", + self.name, + exc_info=True, + ) + event_list = [] + + for event in event_list: + event.summary = self._summary_prefix + event.summary + + return event_list + + @Throttle(MIN_TIME_BETWEEN_UPDATES) + def update(self): + """Get the current or next event.""" + _LOGGER.debug("%s: Update was called", self.name) + if self._calendar_data.download_calendar(): + _LOGGER.debug("%s: Setting calendar content", self.name) + self.parser.set_content(self._calendar_data.get()) + try: + self.event = self.parser.get_current_event( + include_all_day=self.include_all_day, + now=hanow(), + days=self._days, + offset_hours=self._offset_hours, + ) + except: # pylint: disable=W0702 + _LOGGER.error( + "update: %s: Failed to parse ICS!", self.name, exc_info=True + ) + if self.event is not None: + _LOGGER.debug( + "%s: got event: %s; start: %s; end: %s; all_day: %s", + self.name, + self.event.summary, + self.event.start, + self.event.end, + self.event.all_day, + ) + (summary, offset) = extract_offset(self.event.summary, OFFSET) + self.event.summary = self._summary_prefix + summary + self.offset = offset + return True + + _LOGGER.debug("%s: No event found!", self.name) + return False diff --git a/custom_components/ics_calendar/calendardata.py b/custom_components/ics_calendar/calendardata.py new file mode 100644 index 0000000..2d870ea --- /dev/null +++ b/custom_components/ics_calendar/calendardata.py @@ -0,0 +1,198 @@ +"""Provide CalendarData class.""" +import zlib +from datetime import timedelta +from gzip import BadGzipFile, GzipFile +from logging import Logger +from threading import Lock +from urllib.error import ContentTooShortError, HTTPError, URLError +from urllib.request import ( + HTTPBasicAuthHandler, + HTTPDigestAuthHandler, + HTTPPasswordMgrWithDefaultRealm, + build_opener, + install_opener, + urlopen, +) + +from homeassistant.util.dt import now as hanow + + +class CalendarData: + """CalendarData class. + + The CalendarData class is used to download and cache calendar data from a + given URL. Use the get method to retrieve the data after constructing your + instance. + """ + + opener_lock = Lock() + + def __init__( + self, logger: Logger, name: str, url: str, min_update_time: timedelta + ): + """Construct CalendarData object. + + :param logger: The logger for reporting problems + :type logger: Logger + :param name: The name of the calendar (used for reporting problems) + :type name: str + :param url: The URL of the calendar + :type url: str + :param min_update_time: The minimum time between downloading data from + the URL when requested + :type min_update_time: timedelta + """ + self._calendar_data = None + self._last_download = None + self._min_update_time = min_update_time + self._opener = None + self.logger = logger + self.name = name + self.url = url + + def download_calendar(self) -> bool: + """Download the calendar data. + + This only downloads data if self.min_update_time has passed since the + last download. + + returns: True if data was downloaded, otherwise False. + rtype: bool + """ + now = hanow() + if ( + self._calendar_data is None + or self._last_download is None + or (now - self._last_download) > self._min_update_time + ): + self._last_download = now + self._calendar_data = None + self.logger.debug( + "%s: Downloading calendar data from: %s", self.name, self.url + ) + self._download_data() + return self._calendar_data is not None + + return False + + def get(self) -> str: + """Get the calendar data that was downloaded. + + :return: The downloaded calendar data. + :rtype: str + """ + return self._calendar_data + + def set_headers( + self, + user_name: str, + password: str, + user_agent: str, + accept_header: str, + ): + """Set a user agent, accept header, and/or user name and password. + + The user name and password will be set into an HTTPBasicAuthHandler an + an HTTPDigestAuthHandler. Both are attached to a new urlopener, so + that HTTP Basic Auth and HTTP Digest Auth will be supported when + opening the URL. + + If the user_agent parameter is not "", a User-agent header will be + added to the urlopener. + + :param user_name: The user name + :type user_name: str + :param password: The password + :type password: str + :param user_agent: The User Agent string to use or "" + :type user_agent: str + :param accept_header: The accept header string to use or "" + :type accept_header: str + """ + if user_name != "" and password != "": + passman = HTTPPasswordMgrWithDefaultRealm() + passman.add_password(None, self.url, user_name, password) + basic_auth_handler = HTTPBasicAuthHandler(passman) + digest_auth_handler = HTTPDigestAuthHandler(passman) + self._opener = build_opener( + digest_auth_handler, basic_auth_handler + ) + + additional_headers = [] + if user_agent != "": + additional_headers.append(("User-agent", user_agent)) + if accept_header != "": + additional_headers.append(("Accept", accept_header)) + if len(additional_headers) > 0: + if self._opener is None: + self._opener = build_opener() + self._opener.addheaders = additional_headers + + def _decode_data(self, conn): + if ( + "Content-Encoding" in conn.headers + and conn.headers["Content-Encoding"] == "gzip" + ): + reader = GzipFile(fileobj=conn) + else: + reader = conn + try: + return self._decode_stream(reader.read()).replace("\0", "") + except zlib.error: + self.logger.error( + "%s: Failed to uncompress gzip data from url(%s): zlib", + self.name, + self.url, + ) + except BadGzipFile as gzip_error: + self.logger.error( + "%s: Failed to uncompress gzip data from url(%s): %s", + self.name, + self.url, + gzip_error.strerror, + ) + return None + + def _decode_stream(self, strm): + for encoding in "utf-8-sig", "utf-8", "utf-16": + try: + return strm.decode(encoding) + except UnicodeDecodeError: + continue + return None + + def _download_data(self): + """Download the calendar data.""" + try: + with CalendarData.opener_lock: + if self._opener is not None: + install_opener(self._opener) + with urlopen(self._make_url()) as conn: + self._calendar_data = self._decode_data(conn) + except HTTPError as http_error: + self.logger.error( + "%s: Failed to open url(%s): %s", + self.name, + self.url, + http_error.reason, + ) + except ContentTooShortError as content_too_short_error: + self.logger.error( + "%s: Could not download calendar data: %s", + self.name, + content_too_short_error.reason, + ) + except URLError as url_error: + self.logger.error( + "%s: Failed to open url: %s", self.name, url_error.reason + ) + except: # pylint: disable=W0702 + self.logger.error( + "%s: Failed to open url!", self.name, exc_info=True + ) + + def _make_url(self): + now = hanow() + return self.url.replace("{year}", f"{now.year:04}").replace( + "{month}", f"{now.month:02}" + ) diff --git a/custom_components/ics_calendar/const.py b/custom_components/ics_calendar/const.py new file mode 100644 index 0000000..524b424 --- /dev/null +++ b/custom_components/ics_calendar/const.py @@ -0,0 +1,7 @@ +"""Constants for ics_calendar platform.""" +VERSION = "4.1.0" +DOMAIN = "ics_calendar" +UPGRADE_URL = ( + "https://github.com/franc6/ics_calendar/blob/releases/" + "UpgradeTo4.0AndLater.md" +) diff --git a/custom_components/ics_calendar/filter.py b/custom_components/ics_calendar/filter.py new file mode 100644 index 0000000..4787ded --- /dev/null +++ b/custom_components/ics_calendar/filter.py @@ -0,0 +1,124 @@ +"""Provide Filter class.""" +import re +from ast import literal_eval +from typing import List, Optional, Pattern + +from homeassistant.components.calendar import CalendarEvent + + +class Filter: + """Filter class. + + The Filter class is used to filter events according to the exclude and + include rules. + """ + + def __init__(self, exclude: str, include: str): + """Construct Filter class. + + :param exclude: The exclude rules + :type exclude: str + :param include: The include rules + :type include: str + """ + self._exclude = Filter.set_rules(exclude) + self._include = Filter.set_rules(include) + + @staticmethod + def set_rules(rules: str) -> List[Pattern]: + """Set the given rules into an array which is returned. + + :param rules: The rules to set + :type rules: str + :return: An array of regular expressions + :rtype: List[Pattern] + """ + arr = [] + if rules != "": + for rule in literal_eval(rules): + if rule.startswith("/"): + re_flags = re.NOFLAG + [expr, flags] = rule[1:].split("/") + for flag in flags: + match flag: + case "i": + re_flags |= re.IGNORECASE + case "m": + re_flags |= re.MULTILINE + case "s": + re_flags |= re.DOTALL + arr.append(re.compile(expr, re_flags)) + else: + arr.append(re.compile(rule, re.IGNORECASE)) + return arr + + def _is_match( + self, summary: str, description: Optional[str], regexes: List[Pattern] + ) -> bool: + """Indicate if the event matches the given list of regular expressions. + + :param summary: The event summary to examine + :type summary: str + :param description: The event description summary to examine + :type description: Optional[str] + :param regexes: The regular expressions to match against + :type regexes: List[] + :return: True if the event matches the exclude filter + :rtype: bool + """ + for regex in regexes: + if regex.search(summary) or ( + description and regex.search(description) + ): + return True + + return False + + def _is_excluded(self, summary: str, description: Optional[str]) -> bool: + """Indicate if the event should be excluded. + + :param summary: The event summary to examine + :type summary: str + :param description: The event description summary to examine + :type description: Optional[str] + :return: True if the event matches the exclude filter + :rtype: bool + """ + return self._is_match(summary, description, self._exclude) + + def _is_included(self, summary: str, description: Optional[str]) -> bool: + """Indicate if the event should be included. + + :param summary: The event summary to examine + :type summary: str + :param description: The event description summary to examine + :type description: Optional[str] + :return: True if the event matches the include filter + :rtype: bool + """ + return self._is_match(summary, description, self._include) + + def filter(self, summary: str, description: Optional[str]) -> bool: + """Check if the event should be included or not. + + :param summary: The event summary to examine + :type summary: str + :param description: The event description summary to examine + :type description: Optional[str] + :return: true if the event should be included, otherwise false + :rtype: bool + """ + add_event = not self._is_excluded(summary, description) + if not add_event: + add_event = self._is_included(summary, description) + return add_event + + def filter_event(self, event: CalendarEvent) -> bool: + """Check if the event should be included or not. + + :param event: The event to examine + :type event: CalendarEvent + :return: true if the event should be included, otherwise false + :rtype: bool + """ + return self.filter(event.summary, event.description) diff --git a/custom_components/ics_calendar/icalendarparser.py b/custom_components/ics_calendar/icalendarparser.py new file mode 100644 index 0000000..dc9e1d0 --- /dev/null +++ b/custom_components/ics_calendar/icalendarparser.py @@ -0,0 +1,97 @@ +"""Provide ICalendarParser class.""" +import importlib +from datetime import datetime +from typing import Optional + +from homeassistant.components.calendar import CalendarEvent + +from .filter import Filter + + +class ICalendarParser: + """Provide interface for various parser classes. + + The class provides a static method , get_instace, to get a parser instance. + The non static methods allow this class to act as an "interface" for the + parser classes. + """ + + @staticmethod + def get_class(parser: str): + """Get the class of the requested parser.""" + parser_module_name = ".parsers.parser_" + parser + parser = "Parser" + parser.upper() + try: + module = importlib.import_module(parser_module_name, __package__) + return getattr(module, parser) + except ImportError: + return None + + @staticmethod + def get_instance(parser: str, *args): + """Get an instance of the requested parser.""" + parser_cls = ICalendarParser.get_class(parser) + if parser_cls is not None: + return parser_cls(*args) + return None + + def set_content(self, content: str): + """Parse content into a calendar object. + + This must be called at least once before get_event_list or + get_current_event. + :param content is the calendar data + :type content str + """ + + def set_filter(self, filt: Filter): + """Set a Filter object to filter events. + + :param filt: The Filter object + :type exclude: Filter + """ + + def get_event_list( + self, + start: datetime, + end: datetime, + include_all_day: bool, + offset_hours: int = 0, + ) -> list[CalendarEvent]: + """Get a list of events. + + Gets the events from start to end, including or excluding all day + events. + :param start the earliest start time of events to return + :type start datetime + :param end the latest start time of events to return + :type end datetime + :param include_all_day if true, all day events will be included. + :type include_all_day boolean + :param offset_hours the number of hours to offset the event + :type offset_hours int + :returns a list of events, or an empty list + :rtype list[CalendarEvent] + """ + + def get_current_event( + self, + include_all_day: bool, + now: datetime, + days: int, + offset_hours: int = 0, + ) -> Optional[CalendarEvent]: + """Get the current or next event. + + Gets the current event, or the next upcoming event with in the + specified number of days, if there is no current event. + :param include_all_day if true, all day events will be included. + :type include_all_day boolean + :param now the current date and time + :type now datetime + :param days the number of days to check for an upcoming event + :type days int + :param offset_hours the number of hours to offset the event + :type offset_hours int + :returns a CalendarEvent or None + """ diff --git a/custom_components/ics_calendar/manifest.json b/custom_components/ics_calendar/manifest.json new file mode 100644 index 0000000..c52135c --- /dev/null +++ b/custom_components/ics_calendar/manifest.json @@ -0,0 +1,13 @@ +{ + + "domain": "ics_calendar", + "name": "ics Calendar", + "codeowners": ["@franc6"], + "dependencies": [], + "documentation": "https://github.com/franc6/ics_calendar", + "integration_type": "service", + "iot_class": "cloud_polling", + "issue_tracker": "https://github.com/franc6/ics_calendar/issues", + "requirements": ["ics>=0.7.2", "recurring_ical_events>=2.0.2", "icalendar>=5.0.4"], + "version": "4.1.0" +} diff --git a/custom_components/ics_calendar/parsers/__init__.py b/custom_components/ics_calendar/parsers/__init__.py new file mode 100644 index 0000000..5fd5428 --- /dev/null +++ b/custom_components/ics_calendar/parsers/__init__.py @@ -0,0 +1 @@ +"""Provide parsers.""" diff --git a/custom_components/ics_calendar/parsers/__pycache__/__init__.cpython-311.pyc b/custom_components/ics_calendar/parsers/__pycache__/__init__.cpython-311.pyc new file mode 100644 index 0000000..a9032b1 Binary files /dev/null and b/custom_components/ics_calendar/parsers/__pycache__/__init__.cpython-311.pyc differ diff --git a/custom_components/ics_calendar/parsers/__pycache__/parser_rie.cpython-311.pyc b/custom_components/ics_calendar/parsers/__pycache__/parser_rie.cpython-311.pyc new file mode 100644 index 0000000..79ed694 Binary files /dev/null and b/custom_components/ics_calendar/parsers/__pycache__/parser_rie.cpython-311.pyc differ diff --git a/custom_components/ics_calendar/parsers/parser_ics.py b/custom_components/ics_calendar/parsers/parser_ics.py new file mode 100644 index 0000000..fc1b5e9 --- /dev/null +++ b/custom_components/ics_calendar/parsers/parser_ics.py @@ -0,0 +1,190 @@ +"""Support for ics parser.""" +import re +from datetime import date, datetime, timedelta +from typing import Optional, Union + +from arrow import Arrow, get as arrowget +from homeassistant.components.calendar import CalendarEvent +from ics import Calendar + +from ..filter import Filter +from ..icalendarparser import ICalendarParser +from ..utility import compare_event_dates + + +class ParserICS(ICalendarParser): + """Class to provide parser using ics module.""" + + def __init__(self): + """Construct ParserICS.""" + self._re_method = re.compile("^METHOD:.*$", flags=re.MULTILINE) + self._calendar = None + self._filter = Filter("", "") + + def set_content(self, content: str): + """Parse content into a calendar object. + + This must be called at least once before get_event_list or + get_current_event. + :param content is the calendar data + :type content str + """ + self._calendar = Calendar(re.sub(self._re_method, "", content)) + + def set_filter(self, filt: Filter): + """Set a Filter object to filter events. + + :param filt: The Filter object + :type exclude: Filter + """ + self._filter = filt + + def get_event_list( + self, start, end, include_all_day: bool, offset_hours: int = 0 + ) -> list[CalendarEvent]: + """Get a list of events. + + Gets the events from start to end, including or excluding all day + events. + :param start the earliest start time of events to return + :type datetime + :param end the latest start time of events to return + :type datetime + :param include_all_day if true, all day events will be included. + :type boolean + :param offset_hours the number of hours to offset the event + :type offset_hours int + :returns a list of events, or an empty list + :rtype list[CalendarEvent] + """ + event_list: list[CalendarEvent] = [] + + if self._calendar is not None: + # ics 0.8 takes datetime not Arrow objects + # ar_start = start + # ar_end = end + ar_start = arrowget(start - timedelta(hours=offset_hours)) + ar_end = arrowget(end - timedelta(hours=offset_hours)) + + for event in self._calendar.timeline.included(ar_start, ar_end): + if event.all_day and not include_all_day: + continue + summary: str = "" + # ics 0.8 uses 'summary' reliably, older versions use 'name' + # if hasattr(event, "summary"): + # summary = event.summary + # elif hasattr(event, "name"): + summary = event.name + calendar_event: CalendarEvent = CalendarEvent( + summary=summary, + start=ParserICS.get_date( + event.begin, event.all_day, offset_hours + ), + end=ParserICS.get_date( + event.end, event.all_day, offset_hours + ), + location=event.location, + description=event.description, + ) + if self._filter.filter_event(calendar_event): + event_list.append(calendar_event) + + return event_list + + def get_current_event( # noqa: $701 + self, + include_all_day: bool, + now: datetime, + days: int, + offset_hours: int = 0, + ) -> Optional[CalendarEvent]: + """Get the current or next event. + + Gets the current event, or the next upcoming event with in the + specified number of days, if there is no current event. + :param include_all_day if true, all day events will be included. + :type boolean + :param now the current date and time + :type datetime + :param days the number of days to check for an upcoming event + :type int + :param offset_hours the number of hours to offset the event + :type int + :returns a CalendarEvent or None + """ + if self._calendar is None: + return None + + temp_event = None + now = now - timedelta(offset_hours) + end = now + timedelta(days=days) + for event in self._calendar.timeline.included( + arrowget(now), arrowget(end) + ): + if event.all_day and not include_all_day: + continue + + if not self._filter.filter(event.name, event.description): + continue + + if temp_event is None or compare_event_dates( + now, + temp_event.end, + temp_event.begin, + temp_event.all_day, + event.end, + event.begin, + event.all_day, + ): + temp_event = event + + if temp_event is None: + return None + # if hasattr(event, "summary"): + # summary = temp_event.summary + # elif hasattr(event, "name"): + summary = temp_event.name + return CalendarEvent( + summary=summary, + start=ParserICS.get_date( + temp_event.begin, temp_event.all_day, offset_hours + ), + end=ParserICS.get_date( + temp_event.end, temp_event.all_day, offset_hours + ), + location=temp_event.location, + description=temp_event.description, + ) + + @staticmethod + def get_date( + arw: Arrow, is_all_day: bool, offset_hours: int + ) -> Union[datetime, date]: + """Get datetime. + + :param arw The arrow object representing the date. + :type Arrow + :param is_all_day If true, the returned datetime will have the time + component set to 0. + :type: bool + :param offset_hours the number of hours to offset the event + :type int + :returns The datetime. + :rtype datetime + """ + # if isinstance(arw, Arrow): + if is_all_day: + return arw.date() + # else: + # if arw.tzinfo is None or arw.tzinfo.utcoffset(arw) is None + # or is_all_day: + # arw = arw.astimezone() + # if is_all_day: + # return arw.date() + # + arw = arw.shift(hours=offset_hours) + + return_value = arw.datetime + if return_value.tzinfo is None: + return_value = return_value.astimezone() + return return_value diff --git a/custom_components/ics_calendar/parsers/parser_rie.py b/custom_components/ics_calendar/parsers/parser_rie.py new file mode 100644 index 0000000..5d71f7a --- /dev/null +++ b/custom_components/ics_calendar/parsers/parser_rie.py @@ -0,0 +1,198 @@ +"""Support for recurring_ical_events parser.""" +from datetime import date, datetime, timedelta +from typing import Optional, Union + +import recurring_ical_events as rie +from homeassistant.components.calendar import CalendarEvent +from icalendar import Calendar + +from ..filter import Filter +from ..icalendarparser import ICalendarParser +from ..utility import compare_event_dates + + +class ParserRIE(ICalendarParser): + """Provide parser using recurring_ical_events.""" + + def __init__(self): + """Construct ParserRIE.""" + self._calendar = None + self.oneday = timedelta(days=1) + self.oneday2 = timedelta(hours=23, minutes=59, seconds=59) + self._filter = Filter("", "") + + def set_content(self, content: str): + """Parse content into a calendar object. + + This must be called at least once before get_event_list or + get_current_event. + :param content is the calendar data + :type content str + """ + self._calendar = Calendar.from_ical(content) + + def set_filter(self, filt: Filter): + """Set a Filter object to filter events. + + :param filt: The Filter object + :type exclude: Filter + """ + self._filter = filt + + def get_event_list( + self, + start: datetime, + end: datetime, + include_all_day: bool, + offset_hours: int = 0, + ) -> list[CalendarEvent]: + """Get a list of events. + + Gets the events from start to end, including or excluding all day + events. + :param start the earliest start time of events to return + :type datetime + :param end the latest start time of events to return + :type datetime + :param include_all_day if true, all day events will be included. + :type boolean + :param offset_hours the number of hours to offset the event + :type offset_hours int + :returns a list of events, or an empty list + :rtype list[CalendarEvent] + """ + event_list: list[CalendarEvent] = [] + + if self._calendar is not None: + for event in rie.of(self._calendar).between( + start - timedelta(hours=offset_hours), + end - timedelta(hours=offset_hours), + ): + start, end, all_day = self.is_all_day(event, offset_hours) + + if all_day and not include_all_day: + continue + + calendar_event: CalendarEvent = CalendarEvent( + summary=event.get("SUMMARY"), + start=start, + end=end, + location=event.get("LOCATION"), + description=event.get("DESCRIPTION"), + ) + if self._filter.filter_event(calendar_event): + event_list.append(calendar_event) + + return event_list + + def get_current_event( # noqa: R701 + self, + include_all_day: bool, + now: datetime, + days: int, + offset_hours: int = 0, + ) -> Optional[CalendarEvent]: + """Get the current or next event. + + Gets the current event, or the next upcoming event with in the + specified number of days, if there is no current event. + :param include_all_day if true, all day events will be included. + :type boolean + :param now the current date and time + :type datetime + :param days the number of days to check for an upcoming event + :type int + :param offset_hours the number of hours to offset the event + :type offset_hours int + :returns a CalendarEvent or None + """ + if self._calendar is None: + return None + + temp_event: CalendarEvent = None + temp_start: date | datetime = None + temp_end: date | datetime = None + temp_all_day: bool = None + end: datetime = now + timedelta(days=days) + for event in rie.of(self._calendar).between( + now - timedelta(hours=offset_hours), + end - timedelta(hours=offset_hours), + ): + start, end, all_day = self.is_all_day(event, offset_hours) + + if all_day and not include_all_day: + continue + + if not self._filter.filter( + event.get("SUMMARY"), event.get("DESCRIPTION") + ): + continue + + if temp_start is None or compare_event_dates( + now, temp_end, temp_start, temp_all_day, end, start, all_day + ): + temp_event = event + temp_start = start + temp_end = end + temp_all_day = all_day + + if temp_event is None: + return None + + return CalendarEvent( + summary=temp_event.get("SUMMARY"), + start=temp_start, + end=temp_end, + location=temp_event.get("LOCATION"), + description=temp_event.get("DESCRIPTION"), + ) + + @staticmethod + def get_date(date_time) -> Union[datetime, date]: + """Get datetime with timezone information. + + If a date object is passed, it will first have a time component added, + set to 0. + :param date_time The date or datetime object + :type date_time datetime or date + :type: bool + :returns The datetime. + :rtype datetime + """ + # Must use type here, since a datetime is also a date! + if isinstance(date_time, date) and not isinstance(date_time, datetime): + date_time = datetime.combine(date_time, datetime.min.time()) + return date_time.astimezone() + + def is_all_day(self, event, offset_hours: int): + """Determine if the event is an all day event. + + Return all day status and start and end times for the event. + :param event The event to examine + :param offset_hours the number of hours to offset the event + :type offset_hours int + """ + start: datetime | date = ParserRIE.get_date(event.get("DTSTART").dt) + end: datetime | date = ParserRIE.get_date(event.get("DTEND").dt) + all_day = False + diff = event.get("DURATION") + if diff is not None: + diff = diff.dt + else: + diff = end - start + if (start == end or diff in {self.oneday, self.oneday2}) and all( + x == 0 for x in [start.hour, start.minute, start.second] + ): + # if all_day, start and end must be date, not datetime! + start = start.date() + end = end.date() + all_day = True + else: + start = start + timedelta(hours=offset_hours) + end = end + timedelta(hours=offset_hours) + if start.tzinfo is None: + start = start.astimezone() + if end.tzinfo is None: + end = end.astimezone() + + return start, end, all_day diff --git a/custom_components/ics_calendar/utility.py b/custom_components/ics_calendar/utility.py new file mode 100644 index 0000000..821e454 --- /dev/null +++ b/custom_components/ics_calendar/utility.py @@ -0,0 +1,37 @@ +"""Utility methods.""" +from datetime import date, datetime + + +def make_datetime(val): + """Ensure val is a datetime, not a date.""" + if isinstance(val, date) and not isinstance(val, datetime): + return datetime.combine(val, datetime.min.time()).astimezone() + return val + + +def compare_event_dates( # pylint: disable=R0913 + now, end2, start2, all_day2, end, start, all_day +) -> bool: + """Determine if end2 and start2 are newer than end and start.""" + # Make sure we only compare datetime values, not dates with datetimes. + # Set each date object to a datetime at midnight. + end = make_datetime(end) + end2 = make_datetime(end2) + start = make_datetime(start) + start2 = make_datetime(start2) + + if all_day2 == all_day: + if end2 == end: + return start2 > start + return end2 > end and start2 >= start + + if now.tzinfo is None: + now = now.astimezone() + + event2_current = start2 <= now <= end2 + event_current = start <= now <= end + + if event_current and event2_current: + return all_day + + return start2 >= start or end2 >= end