Title
import requests
from bs4 import BeautifulSoup
import json
_URL = "https://multimedia.europarl.europa.eu/ga/search?sn=true&st=EPV_EDITED_VIDEOS-WS_VIDEO&ut=EPV_REPLAY-EPV_VIDEO_FOOTAGE-EPV_PHOTO-EPV_AUDIO&ol=EPV_EDITED_VIDEOS&lg=ga_IE&at=1&p_p_id=advanced_search_portlet_AdvancedSearchPortlet&_advanced_search_portlet_AdvancedSearchPortlet_p="
def get_soup(num = "1"):
req = requests.get(_URL + num)
if req.status_code != 200:
raise Exception("Problem scraping page " + num)
return BeautifulSoup(req.content, "lxml")
def get_last_page_number(soup):
for last_candidate in soup.find_all("li", {"class": "last"}):
anchors = last_candidate.find_all("a")
for anchor in anchors:
# if "href" in anchor and "AdvancedSearchPortlet_p" in anchor["href"]:
if "AdvancedSearchPortlet_p" in anchor["href"]:
eq_pos = anchor["href"].rfind("=")
return anchor["href"][eq_pos + 1:]
def get_video_urls(videos):
video_urls = []
no_url = []
for video in videos:
item = {}
if "europarltv-link" not in video.text:
no_url.append(video)
vid_url = video.find("a", {"class", "europarltv-link"})
item["url"] = "https://multimedia.europarl.europa.eu" + vid_url["href"]
data_divs = video.find_all("div", {"class": "media-quick-actions"})
for data_div in data_divs:
if data_div.has_attr("data-id"):
item["data_id"] = data_div["data-id"]
video_urls.append(item)
return video_urls
def scrape_video_page(num = "1", soup = None):
if soup is None:
req = requests.get(_URL + num)
if req.status_code != 200:
raise Exception("Problem scraping page " + num)
soup = BeautifulSoup(req.content, "lxml")
videos = soup.find_all("div", {"class": "media-preview"})
return get_video_urls(videos)
soup = get_soup()
last = get_last_page_number(soup)
videos = scrape_video_page(num = "1")
for num in range(2, int(last) + 1):
videos += scrape_video_page(num = str(num))
_JSON_REQUEST = """
{
"1":{"service":"session","action":"startWidgetSession","widgetId":"_102"},
"2":{"service":"baseEntry","action":"list","ks":"{1:result:ks}",
"filter":{"redirectFromEntryId":"DUMMY_ENTRY_ID"},
"responseProfile":{"type":1,
"fields":"id,referenceId,name,description,thumbnailUrl,dataUrl,duration,msDuration,flavorParamsIds,mediaType,type,tags,dvrStatus,externalSourceType,status"}},
"3":{"service":"baseEntry","action":"getPlaybackContext",
"entryId":"{2:result:objects:0:id}","ks":"{1:result:ks}",
"contextDataParams":{"objectType":"KalturaContextDataParams","flavorTags":"all"}},
"4":{"service":"metadata_metadata","action":"list",
"filter":{"objectType":"KalturaMetadataFilter",
"objectIdEqual":"DUMMY_ENTRY_ID","metadataObjectTypeEqual":"1"},
"ks":"{1:result:ks}"},"apiVersion":"3.3.0","format":1,"ks":"",
"clientTag":"html5:v0.53.7","partnerId":102
}
"""
_MULT_HEADERS = {
"Content-Type": "application/json",
"Origin": "https://multimedia.europarl.europa.eu",
"Sec-Fetch-Site": "same-site",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Dest": "empty"
}
def get_vid_id(url):
response = requests.get(url)
if response.status_code != 200:
#print("Problem scraping page " + url)
return None
soup = BeautifulSoup(response.content, "lxml")
ogvid = soup.find("meta", {"property": "og:video"})
if ogvid and ogvid.has_attr("content"):
cont = ogvid["content"]
cont = cont.split("entryId/")[1]
cont = cont.split("/v/")[0]
return cont
else:
return None
def get_vid_id2(content_id):
response = requests.get(f"https://multimedia.europarl.europa.eu/ga/c/portal/layout?p_l_id=39691&p_p_id=media_quick_actions_portlet_MediaQuickActionsPortlet&p_p_lifecycle=0&p_p_state=exclusive&_media_quick_actions_portlet_MediaQuickActionsPortlet_mvcPath=%2Fhtml%2Ftogglers%2Fpreview_toggler.jsp&_media_quick_actions_portlet_MediaQuickActionsPortlet_mediaId={content_id}&_media_quick_actions_portlet_MediaQuickActionsPortlet_arrowDivXPositionStart=-912.4258792266845&_media_quick_actions_portlet_MediaQuickActionsPortlet_arrowDivXLength=461")
if response.status_code != 200:
return None
if not "kalturaPlayer.loadMedia" in response.text:
return None
prune = response.text.split("kalturaPlayer.loadMedia")[1]
return prune.split("'")[1]
def get_json_body(vid_id):
actual_json = _JSON_REQUEST.replace("\n", "").replace("DUMMY_ENTRY_ID", vid_id)
response = requests.post("https://kmc.europarltv.europa.eu/api_v3/service/multirequest", headers=_MULT_HEADERS, data=actual_json)
body = json.loads(response.content)
return body
def get_subtitles(body):
subtitles = {}
for part in body:
if 'playbackCaptions' in part:
for subtitle in part['playbackCaptions']:
if 'languageCode' in subtitle:
lang_code = subtitle['languageCode']
else:
lang_code = None
if 'webVttUrl' in subtitle:
webvtt = subtitle['webVttUrl']
else:
webvtt = None
if webvtt is not None and lang_code is not None:
subtitles[lang_code] = webvtt
else:
continue
return subtitles
def get_video(body):
for part in body:
if "sources" in part:
for source in part["sources"]:
if source["url"].endswith(".mp4"):
return source["url"]
data = []
for item in videos:
url = item["url"]
item["id"] = get_vid_id2(item["data_id"])
if "id" not in item or item["id"] is None:
item["id"] = get_vid_id(url)
if "id" not in item or item["id"] is None:
print(url)
continue
body = get_json_body(item["id"])
item["video"] = get_video(body)
item["vtts"] = get_subtitles(body)
data.append(item)
with open('europarl.json', 'w') as outfile:
json.dump(data, outfile)