change to create an ics file instead of using google calender
This commit is contained in:
251
main.py
251
main.py
@@ -1,188 +1,185 @@
|
||||
import requests
|
||||
from bs4 import BeautifulSoup
|
||||
from datetime import datetime, timezone
|
||||
from googleapiclient.discovery import build
|
||||
from httplib2 import Http
|
||||
from oauth2client import file, client, tools
|
||||
import pytz
|
||||
import hashlib
|
||||
import os
|
||||
|
||||
# We don't store this url in the source, as it is sensitive
|
||||
URL = open("timetableurl").read().strip()
|
||||
# ------------------------------------------------------------
|
||||
# Config
|
||||
# ------------------------------------------------------------
|
||||
|
||||
RESPONSE_FILE = "response.txt"
|
||||
OLD_CALENDAR = "old_calendar.ics"
|
||||
NEW_CALENDAR = "calendar.ics"
|
||||
LOCAL_TZ = pytz.timezone("Europe/London")
|
||||
|
||||
# ------------------------------------------------------------
|
||||
# Parsing timetable JS
|
||||
# ------------------------------------------------------------
|
||||
|
||||
def parse_events(events_data):
|
||||
# Replace date objects with tuples, easier to parse
|
||||
# Replace JS date objects
|
||||
events_data = events_data.replace("new Date", "")
|
||||
|
||||
cleaned_data = ""
|
||||
|
||||
# Remove comments, properties to keys
|
||||
for line in events_data.split("\n"):
|
||||
comment_pos = line.find("//")
|
||||
if comment_pos != -1:
|
||||
line = line[:comment_pos]
|
||||
|
||||
|
||||
if ":" in line:
|
||||
line_values = line.split(":")
|
||||
line = "'" + line_values[0] + "': " + line_values[1]
|
||||
|
||||
cleaned_data += line + "\n"
|
||||
key, val = line.split(":", 1)
|
||||
line = f"'{key}': {val}"
|
||||
|
||||
cleaned_data += line + "\n"
|
||||
|
||||
# Parse the event, as if it were a dict
|
||||
parsed_data = eval(cleaned_data)
|
||||
|
||||
# Parse the datetime info
|
||||
for event in parsed_data:
|
||||
if "start" in event:
|
||||
event["start"] = list(event["start"])
|
||||
event["start"][1] += 1
|
||||
event["start"].append(0)
|
||||
event["start"] = datetime(*event["start"])
|
||||
event["start"] = pytz.timezone("Europe/London").localize(event["start"])
|
||||
|
||||
s = list(event["start"])
|
||||
s[1] += 1
|
||||
s.append(0)
|
||||
event["start"] = LOCAL_TZ.localize(datetime(*s))
|
||||
|
||||
if "end" in event:
|
||||
event["end"] = list(event["end"])
|
||||
event["end"][1] += 1
|
||||
event["end"].append(0)
|
||||
event["end"] = datetime(*event["end"])
|
||||
event["end"] = pytz.timezone("Europe/London").localize(event["end"])
|
||||
e = list(event["end"])
|
||||
e[1] += 1
|
||||
e.append(0)
|
||||
event["end"] = LOCAL_TZ.localize(datetime(*e))
|
||||
|
||||
return parsed_data
|
||||
|
||||
|
||||
def get_events_data(url):
|
||||
page_data = requests.get(url).text
|
||||
def get_events_data_from_file(path):
|
||||
with open(path, "r", encoding="utf-8") as f:
|
||||
page_data = f.read()
|
||||
|
||||
soup = BeautifulSoup(page_data, features="html.parser")
|
||||
|
||||
source = ""
|
||||
for script in soup.head.findAll("script", {"type": "text/javascript"}):
|
||||
for script in soup.head.find_all("script", {"type": "text/javascript"}):
|
||||
if not script.has_attr("src"):
|
||||
source = script.text
|
||||
break
|
||||
|
||||
events_data = source.split("events:")[1].split("]")[0] +"]"
|
||||
else:
|
||||
raise RuntimeError("Could not find inline timetable script")
|
||||
|
||||
return events_data
|
||||
return source.split("events:")[1].split("]")[0] + "]"
|
||||
|
||||
# ------------------------------------------------------------
|
||||
# ICS helpers
|
||||
# ------------------------------------------------------------
|
||||
|
||||
def ics_time(dt):
|
||||
return dt.astimezone(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
|
||||
|
||||
|
||||
def create_google_event(event):
|
||||
new_event = event.copy()
|
||||
|
||||
if not new_event:
|
||||
return new_event
|
||||
|
||||
# Make the event the correct format
|
||||
new_event["summary"] = event["moduleDesc"] + " - " + event["title"]
|
||||
new_event["description"] = event["lecturer"] + " - " + event["room"]
|
||||
|
||||
new_event["end"] = {"dateTime": str(event["end"].isoformat()), "timeZone": "Europe/London"}
|
||||
new_event["start"] = {"dateTime": str(event["start"].isoformat()), "timeZone": "Europe/London"}
|
||||
|
||||
new_event["reminders"] = {'useDefault': False,
|
||||
'overrides': [{'method': 'popup', 'minutes': 30}]}
|
||||
return new_event
|
||||
def make_uid(event):
|
||||
key = f"{event['moduleDesc']}|{event['title']}|{event['start'].isoformat()}"
|
||||
return hashlib.sha1(key.encode()).hexdigest() + "@timetable"
|
||||
|
||||
|
||||
# Read and write access
|
||||
SCOPES = "https://www.googleapis.com/auth/calendar"
|
||||
def create_ics_event(event):
|
||||
return {
|
||||
"uid": make_uid(event),
|
||||
"summary": f"{event['moduleDesc']} - {event['title']}",
|
||||
"description": f"{event['lecturer']} - {event['room']}",
|
||||
"start": event["start"],
|
||||
"end": event["end"],
|
||||
}
|
||||
|
||||
def get_calendar_service():
|
||||
'''
|
||||
Connect to the google calendar service, and return the
|
||||
service object
|
||||
'''
|
||||
# ------------------------------------------------------------
|
||||
# Load existing calendar (UIDs only)
|
||||
# ------------------------------------------------------------
|
||||
|
||||
store = file.Storage("token.json")
|
||||
creds = store.get()
|
||||
def load_existing_uids(path):
|
||||
uids = set()
|
||||
|
||||
# Run prompt to get the google credentials
|
||||
if not creds or creds.invalid:
|
||||
flow = client.flow_from_clientsecrets("redentials.json", SCOPES)
|
||||
creds = tools.run_flow(flow, store)
|
||||
if not os.path.exists(path):
|
||||
return uids
|
||||
|
||||
return build("calendar", "v3", http=creds.authorize(Http()))
|
||||
with open(path, "r", encoding="utf-8") as f:
|
||||
current_uid = None
|
||||
cancelled = False
|
||||
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
|
||||
def execute_batch(service, commands):
|
||||
batch = service.new_batch_http_request()
|
||||
batch_count = 0
|
||||
if line == "BEGIN:VEVENT":
|
||||
current_uid = None
|
||||
cancelled = False
|
||||
|
||||
for command in commands:
|
||||
batch.add(command)
|
||||
batch_count += 1
|
||||
|
||||
if batch_count > 999:
|
||||
batch.execute()
|
||||
elif line.startswith("UID:"):
|
||||
current_uid = line[4:]
|
||||
|
||||
batch = service.new_batch_http_request()
|
||||
batch_count = 0
|
||||
|
||||
if batch_count > 0:
|
||||
batch.execute()
|
||||
elif line == "STATUS:CANCELLED":
|
||||
cancelled = True
|
||||
|
||||
elif line == "END:VEVENT" and current_uid and not cancelled:
|
||||
uids.add(current_uid)
|
||||
|
||||
return uids
|
||||
|
||||
# ------------------------------------------------------------
|
||||
# Main
|
||||
# ------------------------------------------------------------
|
||||
|
||||
def main():
|
||||
type_to_color = {}
|
||||
# A queue of colors, where a color is removed when
|
||||
# when an event we have not seen before exists
|
||||
color_queue = list(range(0, 12, 3))
|
||||
events_js = get_events_data_from_file(RESPONSE_FILE)
|
||||
parsed_events = parse_events(events_js)
|
||||
|
||||
service = get_calendar_service()
|
||||
new_events = [
|
||||
create_ics_event(e)
|
||||
for e in parsed_events
|
||||
if e
|
||||
]
|
||||
|
||||
# Get a list of all events in the future
|
||||
results = service.events().list(timeMin=datetime.now().isoformat() + 'Z', calendarId='primary').execute()
|
||||
future_events = results.get("items", [])
|
||||
old_uids = load_existing_uids(OLD_CALENDAR)
|
||||
new_uids = {e["uid"] for e in new_events}
|
||||
|
||||
cov_events = parse_events(get_events_data(URL))
|
||||
new_events = []
|
||||
now = ics_time(datetime.now(timezone.utc))
|
||||
|
||||
new_summaries = set()
|
||||
lines = [
|
||||
"BEGIN:VCALENDAR",
|
||||
"VERSION:2.0",
|
||||
"PRODID:-//Timetable Sync//EN",
|
||||
"CALSCALE:GREGORIAN",
|
||||
]
|
||||
|
||||
for event in cov_events:
|
||||
if not event:
|
||||
continue
|
||||
# Add / update events
|
||||
for ev in new_events:
|
||||
lines.extend([
|
||||
"BEGIN:VEVENT",
|
||||
f"UID:{ev['uid']}",
|
||||
f"DTSTAMP:{now}",
|
||||
f"DTSTART:{ics_time(ev['start'])}",
|
||||
f"DTEND:{ics_time(ev['end'])}",
|
||||
f"SUMMARY:{ev['summary']}",
|
||||
f"DESCRIPTION:{ev['description']}",
|
||||
"END:VEVENT",
|
||||
])
|
||||
|
||||
new_event = create_google_event(event)
|
||||
# Cancel removed events
|
||||
for uid in old_uids - new_uids:
|
||||
lines.extend([
|
||||
"BEGIN:VEVENT",
|
||||
f"UID:{uid}",
|
||||
f"DTSTAMP:{now}",
|
||||
"STATUS:CANCELLED",
|
||||
"END:VEVENT",
|
||||
])
|
||||
|
||||
color_type = new_event["mainColor"]
|
||||
|
||||
if color_type in type_to_color:
|
||||
colorId = type_to_color[color_type]
|
||||
else:
|
||||
colorId = color_queue.pop(0)
|
||||
color_queue.append(colorId)
|
||||
type_to_color[color_type] = colorId
|
||||
lines.append("END:VCALENDAR")
|
||||
|
||||
new_event["colorId"] = colorId
|
||||
with open(NEW_CALENDAR, "w", encoding="utf-8") as f:
|
||||
f.write("\n".join(lines))
|
||||
|
||||
new_events.append(new_event)
|
||||
new_summaries.add(new_event["summary"])
|
||||
|
||||
# Make sure we remove old events so as not to create duplicates
|
||||
if not future_events:
|
||||
print('No existing events found')
|
||||
else:
|
||||
deletes = []
|
||||
for existing_event in future_events:
|
||||
if "summary" in existing_event and existing_event["summary"] in new_summaries:
|
||||
deletes.append(service.events()
|
||||
.delete(calendarId='primary',
|
||||
eventId=existing_event['id']))
|
||||
|
||||
|
||||
print(f'Removing {len(deletes)} existing events')
|
||||
execute_batch(service, deletes)
|
||||
|
||||
inserts = []
|
||||
for new_event in new_events:
|
||||
inserts.append(service.events()
|
||||
.insert(body=new_event,
|
||||
calendarId='primary'))
|
||||
print(f"Inserting {len(inserts)} new events")
|
||||
execute_batch(service, inserts)
|
||||
|
||||
print(f"Added / updated: {len(new_events)}")
|
||||
print(f"Removed: {len(old_uids - new_uids)}")
|
||||
print(f"Wrote {NEW_CALENDAR}")
|
||||
|
||||
# ------------------------------------------------------------
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
Reference in New Issue
Block a user