Initial library code commit

This commit is contained in:
2020-08-11 20:51:24 -07:00
parent 5c66aa8c7a
commit 871fbd6d0e
12 changed files with 732 additions and 0 deletions
+2
View File
@@ -0,0 +1,2 @@
from archivers import ArchiveLi
from archivers import ArchiveOrg
+3
View File
@@ -0,0 +1,3 @@
from base import ArchiveBase
from org import ArchiveOrg
from li import ArchiveLi
+193
View File
@@ -0,0 +1,193 @@
import json
import os
import time
import urllib
from selenium.common.exceptions import NoSuchElementException
from timer import ArcTimer
from driver import get_driver, DEAD_DRIVER_TUPLE
class ArchiveBase(object):
    '''
    Archiver base class.

    Submits URLs to a web archive through a Selenium webdriver and keeps the
    results in self.out, a list of {'original': ..., 'archived': ...} dicts.
    Subclasses must implement is_done() and may override neterror_handler().
    '''

    def __init__(self, base_url=None, new_driver=False, print_sleep=3, request_sleep=0, pageLoadStrategy=None, timeout=None, **kwargs):
        '''
        Args:
            base_url (str): Base archive url, should have a "{url}" format string
                wherever the target URL is supposed to go.
            new_driver (bool): whether to make a new webdriver or
                use the existing global one
            print_sleep (int): seconds to wait before updating the console readout
            request_sleep (int): seconds to wait between ALL requests
            pageLoadStrategy (str): look up pageLoadStrategy in Selenium
            timeout (int): seconds to wait for the current archive to complete
                before starting the next one
            **kwargs: arbitrary keyword arguments to set as object attributes
        '''
        self.base_url = base_url
        self.print_sleep = print_sleep
        self.request_sleep = request_sleep
        # Forward the strategy so a driver created here is configured the
        # same way as one created later by restart_driver().
        self.driver = get_driver(new_driver, pageLoadStrategy=pageLoadStrategy)
        self.timeout = timeout
        self.pageLoadStrategy = pageLoadStrategy
        self.out = []
        for k, v in kwargs.items():
            setattr(self, k, v)

    def archive_all(self, urls=None, file=None, unique=True):
        '''
        Archive multiple URLs. If no URLs are specified, will attempt to load
        a list of URLs to archive from the specified file.

        Args:
            urls (list, [optional]): list of URLs to archive
            file (str, [optional]): path to a JSON to save/load from
            unique (bool, [optional]): if true then ignore URLs that are
                already present in self.out (and duplicates within urls);
                if false, every entry of urls is queued as given

        Returns:
            list: self.out, one {'original', 'archived'} dict per URL

        Raises:
            ValueError: if urls is None and the file could not be loaded
        '''
        if urls is not None:
            if unique:
                # Keep the caller's ordering while dropping known URLs.
                seen = set(entry['original'] for entry in self.out)
                urls_todo = []
                for url in urls:
                    if url not in seen:
                        seen.add(url)
                        urls_todo.append(url)
            else:
                urls_todo = list(urls)
            self.out += [{'original': url, 'archived': None} for url in urls_todo]
        elif file is not None:
            try:
                self.out = self.load(file)
            except Exception as err:
                raise ValueError('Could not load URL list from {!r}: {}'.format(file, err))
        todo = [entry for entry in self.out if entry['archived'] is None]
        total = len(todo)
        for index, entry in enumerate(todo):
            result = self.archive_one(entry['original'], index + 1, total)
            entry.update(result)
            # Persist after every URL so an interrupted run can be resumed.
            if file:
                self.save(self.out, file)
            time.sleep(self.request_sleep)
        return self.out

    def archive_one(self, url, cur=1, max=1, **kwargs):
        '''
        Archive a single URL and report progress on the console.

        Args:
            url (str): URL to archive
            cur (int, [optional]): current index for the console printout
            max (int, [optional]): maximum index for the console printout
            **kwargs: keyword args to use for URL formatting

        Returns:
            dict: dictionary with original URL + archive URL
        '''
        print("Archiving ({cur}/{max}): {url}".format(cur=cur, max=max, url=url))
        archive_end_url = self.get_looper(url, **kwargs)
        print("\nORIGINAL: {}\nARCHIVED: {}".format(url, archive_end_url))
        return {'original': url, 'archived': archive_end_url}

    def get(self, url, **kwargs):
        '''
        Get an archive of the specified URL

        Loads base_url with the percent-quoted target URL substituted in.
        If the loaded page is a browser network-error page,
        neterror_handler() is called (which may itself call get() again).
        If the driver died, it is restarted and the request is retried.

        Args:
            url (str): URL to archive
            **kwargs: keyword args to use for URL formatting
        '''
        # urllib.quote only exists on Python 2; fall back to urllib.parse.
        try:
            from urllib import quote
        except ImportError:
            from urllib.parse import quote
        target_url = self.base_url.format(url=quote(url), **kwargs)
        try:
            self.driver.get(target_url)
            page_ok = self.neterror_check()
        except DEAD_DRIVER_TUPLE:
            self.restart_driver()
            self.get(url, **kwargs)
            return
        if not page_ok:
            self.neterror_handler(url=url, **kwargs)

    def get_looper(self, url, timer=None, **kwargs):
        '''
        Makes sure that the driver gets restarted if it dies partway through

        Args:
            url (str): URL to initiate the archive process
            timer (timer.ArcTimer, [optional]): timer object; monitors timeouts
            **kwargs: keyword args to use for URL formatting

        Returns:
            str: current page URL once is_done() reports completion
        '''
        if timer is None:
            timer = ArcTimer(sleep=self.print_sleep, timeout=self.timeout)
        try:
            self.get(url, **kwargs)
            # Both url and timer are passed by keyword so that subclasses
            # can pick whichever they need and absorb the rest in **kwargs.
            while not self.is_done(url=url, timer=timer):
                timer.report()
            return self.driver.current_url
        except DEAD_DRIVER_TUPLE:
            self.restart_driver()
            # Reuse the same timer so the timeout spans the restart.
            return self.get_looper(url, timer, **kwargs)

    def is_done(self, **kwargs):
        '''
        Used by get_looper to determine when the archive is done

        Called with url= and timer= keyword arguments.
        '''
        raise NotImplementedError('ERROR: Must implement "is_done" method!')

    def load(self, path):
        '''
        Loads data from a JSON file

        Args:
            path (str): path to the JSON file ("~" is expanded)

        Returns:
            the decoded JSON content
        '''
        path = os.path.expanduser(path)
        with open(path) as file:
            return json.load(file)

    def neterror_check(self):
        '''
        Makes sure a neterror hasn't ruined the page load

        Returns:
            bool: False if the current page contains an element with class
                "neterror", True otherwise
        '''
        try:
            self.driver.find_element_by_class_name('neterror')
        except NoSuchElementException:
            return True
        return False

    def neterror_handler(self, url, **kwargs):
        '''
        placeholder for class-specific neterror handler
        '''
        pass

    def restart_driver(self):
        '''
        Restarts the webdriver
        '''
        try:
            self.driver.quit()
        except Exception:
            # Best effort: the old driver is usually already dead here.
            pass
        self.driver = get_driver(new=True, pageLoadStrategy=self.pageLoadStrategy)

    def save(self, data, path, indent=4):
        '''
        Saves the archived URLs

        Args:
            data (dict/list): JSON-compatible archive data object (e.g. dict, list)
            path (str): desired save path for the archive JSON
            indent (int, [optional]): JSON indent to pass into json.dumps
        '''
        path = os.path.expanduser(path)
        with open(path, 'w+') as file:
            file.write(json.dumps(data, indent=indent))
        print('Saved data to file: {}'.format(path))
+82
View File
@@ -0,0 +1,82 @@
import re
from base import ArchiveBase
# Matches an archive.<tld> URL whose path starts with "wip/", i.e. a
# work-in-progress page.
REGEX_WIP = r'https://archive\.(li|today|vn|md|[a-z]+)/wip/.+'
# Matches an archive.<tld> URL whose path does NOT start with "?run=1",
# "submit/" or "wip/" (negative lookahead).
REGEX_DONE = r'https://archive\.(li|today|vn|md|[a-z]+)/(?![\?]run=1|submit/|wip/)'
# Two-stage template: "{tld}" is filled first; the doubled braces leave a
# literal "{url}" placeholder for the second format() call.
URL_BASE = "https://archive.{tld}/?run=1&url={{url}}"
# TLDs that ArchiveLi.neterror_handler cycles through, in order.
TLD_ARCHIVE_LI = ['li','today','vn','md','is']
class ArchiveTLD(object):
    '''
    Descriptor class for archive TLDs

    The value is stored on the owning instance as _tld. Assigning a new
    value also rebuilds the instance's base_url from its _base_url
    template, so the owner must define _base_url (containing a "{tld}"
    placeholder) before the first assignment.
    '''

    def __get__(self, obj, objtype=None):
        '''
        Returns the object's _tld attribute

        When accessed on the class itself (obj is None) the descriptor
        object is returned instead of raising AttributeError.
        '''
        if obj is None:
            return self
        return obj._tld

    def __set__(self, obj, value):
        '''
        Sets the object's _tld attribute and updates the object's
        base_url attribute so that the URL uses the new TLD
        '''
        obj._tld = value
        obj.base_url = obj._base_url.format(tld=value)
class ArchiveLi(ArchiveBase):
    '''
    Archiver for archive.li/today/vn/md/is
    '''
    # Assigning self.tld rebuilds self.base_url from self._base_url.
    tld = ArchiveTLD()

    def __init__(self, base_url=URL_BASE, regex=REGEX_DONE, regex_wip=REGEX_WIP, new_driver=False, print_sleep=3, request_sleep=0, tld='li', pageLoadStrategy=None, timeout=60):
        '''
        Args:
            base_url (str): Base archive url, should have a "{url}" format string
                wherever the target URL is supposed to go, and a "{tld}"
                format string where the top-level domain goes.
            regex (str): regex string for completed archive URLs
            regex_wip (str): regex string for URLs that indicate the archive is still going
            new_driver (bool): whether to make a new webdriver or
                use the existing global one
            print_sleep (int): seconds to wait before updating the console readout
            request_sleep (int): seconds to wait between ALL requests
            tld (str, [optional]): top-level domain of the archive site to
                use first (see TLD_ARCHIVE_LI)
            pageLoadStrategy (str): look up pageLoadStrategy in Selenium
            timeout (int): seconds to wait for the current archive to complete
                before starting the next one
        '''
        # Must exist before the base class assigns self.tld, because the
        # ArchiveTLD descriptor formats it into base_url.
        self._base_url = base_url
        # Arguments are forwarded explicitly instead of via locals(), which
        # silently forwards every local name and compared keys with "is".
        super(ArchiveLi, self).__init__(
            base_url=base_url,
            new_driver=new_driver,
            print_sleep=print_sleep,
            request_sleep=request_sleep,
            pageLoadStrategy=pageLoadStrategy,
            timeout=timeout,
            regex=regex,
            regex_wip=regex_wip,
            tld=tld,
        )
        # Replace the pattern strings set by the base class with compiled regexes.
        self.regex = re.compile(regex)
        self.regex_wip = re.compile(regex_wip)

    def is_done(self, timer, **kwargs):
        '''
        Used by get_looper to determine when the archive is done

        Args:
            timer (timer.ArcTimer): timer used to detect the timeout
            **kwargs: ignored

        Returns:
            bool: True when the current URL matches the "done" regex, or
                when the timer has timed out while the current URL still
                matches the work-in-progress regex
        '''
        current_url = self.driver.current_url
        if self.regex.match(current_url):
            return True
        return bool(timer.is_timeout() and self.regex_wip.match(self.driver.current_url))

    def neterror_handler(self, url, **kwargs):
        '''
        Switches to the next TLD in TLD_ARCHIVE_LI (wrapping around to the
        first one) and retries the request.

        Args:
            url (str): URL to archive
            **kwargs: keyword args to use for URL formatting
        '''
        try:
            position = TLD_ARCHIVE_LI.index(self.tld)
        except ValueError:
            # Unknown TLD: behave as if it were the last entry, so the
            # rotation restarts from the first one.
            position = len(TLD_ARCHIVE_LI) - 1
        self.tld = TLD_ARCHIVE_LI[(position + 1) % len(TLD_ARCHIVE_LI)]
        self.get(url, **kwargs)
+35
View File
@@ -0,0 +1,35 @@
import re
from base import ArchiveBase
# Matches a web.archive.org snapshot URL prefix: "/web/" followed by a
# 14-digit number and a slash.
# NOTE(review): the dots in the host name are unescaped, so they match any character.
REGEX_DONE = r'https://web.archive.org/web/[0-9]{14}/'
# Template for the save endpoint; "{url}" is replaced with the target URL.
URL_BASE = 'https://web.archive.org/save/{url}'
class ArchiveOrg(ArchiveBase):
    '''
    Archiver class for Internet Archive (archive.org)
    '''

    def __init__(self, base_url=URL_BASE, new_driver=False, print_sleep=3, request_sleep=10, pageLoadStrategy=None, timeout=None):
        '''
        Args:
            base_url (str): Base archive url, should have a "{url}" format string
                wherever the target URL is supposed to go.
            new_driver (bool): whether to make a new webdriver or
                use the existing global one
            print_sleep (int): seconds to wait before updating the console readout
            request_sleep (int): seconds to wait between ALL requests
            pageLoadStrategy (str): look up pageLoadStrategy in Selenium
            timeout (int): seconds to wait for the current archive to complete
                before starting the next one
        '''
        # Forwarded explicitly; the previous locals() filter compared keys
        # with "is", which tests identity rather than equality.
        super(ArchiveOrg, self).__init__(
            base_url=base_url,
            new_driver=new_driver,
            print_sleep=print_sleep,
            request_sleep=request_sleep,
            pageLoadStrategy=pageLoadStrategy,
            timeout=timeout,
        )
        self.regex = re.compile(REGEX_DONE)

    def is_done(self, url=None, **kwargs):
        '''
        Used by get_looper to determine when the archive is done

        Args:
            url (str, [optional]): original URL; when given, the current
                page URL must also end with it. Optional because callers
                may invoke is_done() with only a timer keyword.
            **kwargs: ignored (e.g. timer)

        Returns:
            bool: True when the current URL matches the snapshot regex
                (and ends with url, if provided)
        '''
        current_url = self.driver.current_url
        if not self.regex.match(current_url):
            return False
        return url is None or current_url.endswith(url)
+24
View File
@@ -0,0 +1,24 @@
from bs4 import BeautifulSoup
from driver import Driven
class ChomperBase(Driven):
    '''
    Web parser/ingester base class

    Attributes:
        soup (bs4.BeautifulSoup): parsed source of the current driver page,
            set by make_soup()
    '''

    def __init__(self, new_driver=False):
        '''
        Args:
            new_driver (bool): whether to make a new webdriver or
                use the existing global one
        '''
        super(ChomperBase, self).__init__(new_driver=new_driver)

    def main(self, url):
        '''
        Loads url, parses the page source and returns the result of parse().
        '''
        self.get(url)
        self.make_soup()
        return self.parse()

    def make_soup(self):
        '''
        Parses the driver's current page source into self.soup.
        '''
        self.soup = BeautifulSoup(self.driver.page_source, 'lxml')

    def parse(self):
        '''
        Extracts data from self.soup; must be provided by subclasses.
        '''
        raise NotImplementedError('Inheriting class must implement parse!')
+18
View File
@@ -0,0 +1,18 @@
from driver import Driven
from bs4 import BeautifulSoup
class CrawlerBase(Driven):
    '''
    Webcrawler base class

    Collected results are accumulated in self.data.
    '''

    def __init__(self, new_driver=False, **kwargs):
        '''
        Args:
            new_driver (bool): forwarded to Driven
            **kwargs: forwarded to Driven
        '''
        kwargs['new_driver'] = new_driver
        super(CrawlerBase, self).__init__(**kwargs)
        self.data = list()

    def crawl(self):
        '''
        Performs the crawl; must be provided by subclasses.
        '''
        raise NotImplementedError('Inheriting class must implement crawl!')

    def make_soup(self):
        '''
        Parses the driver's current page source into self.soup.
        '''
        page_source = self.driver.page_source
        self.soup = BeautifulSoup(page_source, 'lxml')
+45
View File
@@ -0,0 +1,45 @@
import socket
from selenium import webdriver
from selenium.common.exceptions import NoSuchFrameException, NoSuchWindowException, RemoteDriverServerException, TimeoutException, WebDriverException
#from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
#DRIVER_CAPS = DesiredCapabilities.CHROME
#DRIVER_CAPS['pageLoadStrategy'] = 'eager'
# Exception types that callers catch in order to restart the webdriver and retry.
DEAD_DRIVER_TUPLE = (NoSuchWindowException, NoSuchFrameException, RemoteDriverServerException, WebDriverException, TimeoutException, socket.error)
# pageLoadStrategy used by make_driver() when none is given.
PAGE_LOAD_STRATEGY = 'normal'
def get_driver(new=False, pageLoadStrategy=None):
    '''
    Returns a webdriver.

    Args:
        new (bool): if true, always create and return a fresh driver
            (the module-level shared driver is left untouched)
        pageLoadStrategy (str, [optional]): passed to make_driver() when a
            driver has to be created

    Returns:
        the fresh driver, or the module-level shared driver (created on
        first use)
    '''
    global driver
    if new:
        return make_driver(pageLoadStrategy)
    # Explicit lookup instead of assert/NameError: asserts are stripped
    # when Python runs with -O.
    if not globals().get('driver'):
        driver = make_driver(pageLoadStrategy)
    return driver
def make_driver(pageLoadStrategy=None):
    '''
    Creates a Chrome webdriver.

    Args:
        pageLoadStrategy (str, [optional]): value for the
            "pageLoadStrategy" capability; PAGE_LOAD_STRATEGY when None
    '''
    strategy = PAGE_LOAD_STRATEGY if pageLoadStrategy is None else pageLoadStrategy
    capabilities = {'pageLoadStrategy': strategy}
    return webdriver.Chrome(desired_capabilities=capabilities)
class Driven(object):
    '''
    Base class for objects that own a webdriver (self.driver).
    '''

    def __init__(self, new_driver=False, **kwargs):
        '''
        Args:
            new_driver (bool): whether to make a new webdriver or
                use the existing global one
            **kwargs: arbitrary keyword arguments to set as object attributes
        '''
        self.driver = get_driver(new_driver)
        for k, v in kwargs.items():
            setattr(self, k, v)

    def get(self, url):
        '''
        Loads url, restarting the driver and retrying whenever one of the
        DEAD_DRIVER_TUPLE exceptions is raised.

        Returns:
            str: the driver's current URL after the page was loaded
        '''
        # Loop rather than recurse: the result is returned even after a
        # restart, and repeated failures cannot exhaust the call stack.
        while True:
            try:
                self.driver.get(url)
                return self.driver.current_url
            except DEAD_DRIVER_TUPLE:
                self.restart_driver()

    def restart_driver(self):
        '''
        Quits the current driver (best effort) and replaces it with a new one.
        '''
        try:
            self.driver.quit()
        except Exception:
            # The old driver is usually already dead at this point.
            pass
        self.driver = get_driver(new=True)
+13
View File
@@ -0,0 +1,13 @@
from bs4 import BeautifulSoup
def bungie(source):
    '''
    Collects forum links from the table rows of an HTML document.

    For each <tr>, the first <a> inside the first <td> is considered; it
    is kept when its href starts with "http://forums.bungie.org/".

    Args:
        source (str): HTML source to parse

    Returns:
        list: dicts with the link text ('title') and its href ('original')
    '''
    collected = []
    for row in BeautifulSoup(source, 'lxml').find_all('tr'):
        cells = row.find_all('td')
        if not cells:
            continue
        anchors = cells[0].find_all('a')
        if not anchors:
            continue
        anchor = anchors[0]
        attributes = anchor.attrs
        if 'href' in attributes and attributes['href'].startswith('http://forums.bungie.org/'):
            collected.append({'title': anchor.text, 'original': attributes['href']})
    return collected
+137
View File
@@ -0,0 +1,137 @@
from chomper import ChomperBase
from crawler import CrawlerBase
from selenium.common.exceptions import TimeoutException, WebDriverException, NoSuchElementException
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.action_chains import ActionChains
import re
# URL template of a forum's first page; "{forum}" is the forum slug.
FORUM_URL_BASE = 'https://prettyuglylittleliar.net/forum/{forum}/'
# URL template of a numbered forum page; the doubled braces leave a literal
# "{page}" placeholder after "{forum}" has been filled in.
FORUM_URL_PAGE = FORUM_URL_BASE + '?page={{page}}'
# Used with re.sub to strip a "Page N of " prefix and trailing whitespace
# from the pagination text.
PAGE_REGEX = r'Page [0-9]+ of |[\s]+$'
# Forum slugs for categories (not referenced elsewhere in the visible code).
CATEGORIES = [
"1-pretty-ugly-little-liar",
"4-snowflakes",
"6-general"
]
# Forum slugs crawled by PullCrawler, one PullSubforumCrawler each.
SUBFORUMS = [
"2-news-announcements",
"3-introduce-yourself",
"5-dakota-rose-\u30c0\u30b3\u30bf-\u30ed\u30fc\u30ba",
"7-general-discussion",
"8-site-feedback",
"9-kiki-kannibal",
"10-venus-angelic",
"11-kanadajin",
"12-little-snowflakes",
"13-online-personalities",
"14-beauty-fashion",
"15-health-wellbeing",
"16-love-relationships",
"20-johanna-herrstedt",
"21-taylor-r",
"22-jessica-nigri",
"24-movies-television",
"25-music",
"26-gaming",
"27-wylona-hayashi",
"28-skincare",
"29-reading",
"32-entertainment",
"35-yumi-king",
"36-yandev",
"38-simply-kenna-cozy-kitsune",
"39-vic-mignogna",
"42-pokimane"
]
class PullCrawler(CrawlerBase):
    '''
    Crawls every subforum listed in SUBFORUMS and stores each subforum's
    dump in self.data.
    '''

    def __init__(self, new_driver=False):
        '''
        Args:
            new_driver (bool): forwarded to CrawlerBase
        '''
        super(PullCrawler, self).__init__(new_driver=new_driver)

    def crawl(self):
        '''
        Runs each subforum crawler in turn and appends its dump to self.data.
        '''
        total = len(self.subforums)
        for position, crawler in enumerate(self.subforums, 1):
            print('Crawling subforum "{}" ({}/{})...'.format(crawler.subforum, position, total))
            crawler.main()
            self.data.append(crawler.dump())

    def make_subforum_crawlers(self):
        '''
        Builds one PullSubforumCrawler per entry of SUBFORUMS.
        '''
        self.subforums = [PullSubforumCrawler(slug) for slug in SUBFORUMS]

    def main(self):
        '''
        Creates the subforum crawlers, then crawls them.
        '''
        self.make_subforum_crawlers()
        self.crawl()
class PullSubforumCrawler(CrawlerBase):
    '''
    Crawls all listing pages of a single subforum and records the threads
    found on each page in self.data.
    '''

    def __init__(self, subforum, new_driver=False):
        '''
        Args:
            subforum (str): forum slug (see SUBFORUMS); also stored as
                self.subforum
            new_driver (bool): forwarded to CrawlerBase
        '''
        # subforum is forwarded as a keyword so the base class stores it as
        # an attribute.
        super(PullSubforumCrawler, self).__init__(new_driver=new_driver, subforum=subforum)
        self.base_url = FORUM_URL_BASE.format(forum=subforum)
        self.page_url = FORUM_URL_PAGE.format(forum=subforum)

    def crawl(self):
        '''
        Visits every URL in self.urls and appends one record per page to
        self.data.
        '''
        total = len(self.urls)
        for index, url in enumerate(self.urls):
            print('Crawling subforum page {}/{}...'.format(index+1, total))
            # Skip the reload when the driver is already on this page.
            if self.driver.current_url != url:
                self.get(url)
            page_threads = self.get_page_threads()
            self.data.append({'url':url, 'page_number':index+1, 'threads':page_threads, 'type':'subforum_page', 'source':self.driver.page_source})

    def dump(self):
        '''
        Returns:
            dict: the subforum's URL, title and crawled pages
        '''
        return {'url':self.base_url, 'title':self.title, 'pages':self.data}

    def get_page_count(self):
        '''
        Reads the page count from the "ipsPagination_pageJump" element into
        self.page_count; 0 when that element is absent.
        '''
        try:
            count_text = self.driver.find_element_by_class_name('ipsPagination_pageJump').text
        except NoSuchElementException:
            self.page_count = 0
        else:
            self.page_count = int(re.sub(PAGE_REGEX, '', count_text))

    def get_page_threads(self):
        '''
        Returns:
            list: one parse_thread() dict per thread on the current page
        '''
        self.make_soup()
        threads = self.soup.find_all('li', attrs={'itemtype':'http://schema.org/Article'})
        return [self.parse_thread(thread) for thread in threads]

    def get_subforum_title(self):
        '''
        Stores the text of the page-title <h1> in self.title.
        '''
        self.make_soup()
        self.title = self.soup.find('h1', attrs={'class':'ipsType_pageTitle'}).text

    def main(self):
        '''
        Loads the subforum, works out its pages and crawls them.
        '''
        self.get(self.base_url)
        self.get_page_count()
        self.get_subforum_title()
        self.make_page_urls()
        self.crawl()

    def make_page_urls(self):
        '''
        Builds self.urls: the base URL (page 1) followed by the "?page=N"
        URLs for pages 2..page_count.
        '''
        # Start at 2: the base URL is already page 1, matching
        # _parse_thread_pages. Starting at 1 would crawl the first page
        # twice and shift every page_number by one.
        self.urls = [self.base_url]
        for x in range(2, self.page_count+1):
            self.urls.append(self.page_url.format(page=x))

    def parse_thread(self, thread):
        '''
        Extracts URL, title and page count from a thread list item.

        Args:
            thread (bs4.element.Tag): <li> element of a thread

        Returns:
            dict: 'urls' (one per thread page), 'title', 'page_count', 'type'
        '''
        head = thread.find_all('h4')[0]
        url = head.find_all('a', attrs={'itemprop':'url'})[0].attrs['href']
        title = head.find_all('span', attrs={'itemprop':'name'})[-1].text.strip()
        try:
            last_page_obj = thread.find_all('li', attrs={'class':'ipsPagination_last'})[0]
            last_page = int(last_page_obj.text.strip())
        except IndexError:
            # No pagination element: single-page thread.
            last_page = 1
        urls = self._parse_thread_pages(url, last_page)
        return {'urls':urls, 'title':title, 'page_count':last_page, 'type':'thread'}

    def _parse_thread_pages(self, url, last_page):
        '''
        Returns the thread URL followed by its "?page=N" URLs for
        N = 2..last_page.
        '''
        out = [url]
        for x in range(2, last_page+1):
            out.append(url+'?page={}'.format(x))
        return out
+152
View File
@@ -0,0 +1,152 @@
from chomper import ChomperBase
from crawler import CrawlerBase
from selenium.common.exceptions import TimeoutException, WebDriverException
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.action_chains import ActionChains
class ChomperWikiaProfile(ChomperBase):
    '''
    Parser for Wikia profile pages; parse() is not implemented yet and
    returns None.
    '''

    def __init__(self, new_driver=False):
        '''
        Args:
            new_driver (bool): forwarded to ChomperBase
        '''
        # Forwarded explicitly; the previous locals() filter compared keys
        # with "is", which tests identity rather than equality.
        super(ChomperWikiaProfile, self).__init__(new_driver=new_driver)

    def parse(self):
        pass
class WikiaMessageCrawler(CrawlerBase):
    '''
    Crawls the paginated comment threads of a Wikia page and collects them
    in self.data, one dict per top-level comment with its replies.
    '''

    def __init__(self, new_driver=False):
        '''
        Args:
            new_driver (bool): forwarded to CrawlerBase
        '''
        super(WikiaMessageCrawler, self).__init__(new_driver=new_driver)

    def crawl(self):
        '''
        Collects the comments of the current page and of every following
        page reachable through the "Next" link.

        Returns:
            list: self.data
        '''
        has_more_pages = True
        while has_more_pages:
            self.get_page_comments()
            has_more_pages = self.next_page()
        return self.data

    def main(self, url):
        '''
        Loads url and crawls it; returns self.data.
        '''
        self.get(url)
        return self.crawl()

    def next_page(self):
        '''
        Clicks the "Next" link of the first "Pagination" element.

        Returns:
            bool: False when there is no pagination or no "Next" link,
                True after the link was clicked
        '''
        try:
            pagination = self.driver.find_elements_by_class_name('Pagination')[0]
            page_next = pagination.find_elements_by_link_text('Next')[0]
        except IndexError:
            return False
        self._next_page_click(page_next)
        return True

    def _next_page_click(self, page_next):
        '''
        Clicks page_next; when the click fails, sends ARROW_DOWN to the
        element and tries again.
        '''
        try:
            page_next.click()
        except WebDriverException:
            page_next.send_keys(Keys.ARROW_DOWN)
            # Retry through this method (the former call targeted a
            # non-existent "next_page_click").
            self._next_page_click(page_next)

    def get_page_comments(self):
        '''
        Processes every top-level comment of the current page into self.data.

        Returns:
            list: self.data
        '''
        self._get_page_comment_trees()
        for tree in self.comment_trees:
            self.data.append(self.process_comment_tree(tree))
        return self.data

    def _get_page_comment_trees(self):
        '''
        Finds the top-level comment <li> elements of the current page and
        stores them in self.comment_trees.
        '''
        #get container with all comments
        self.comments_box = self.driver.find_elements_by_class_name('comments')[0]
        #get all top-level comments
        self.comment_trees = []
        for i in self.comments_box.find_elements_by_tag_name('li'):
            if i.get_attribute('class').startswith('SpeechBubble message message-main'):
                self.comment_trees.append(i)
        return self.comment_trees

    def process_comment_tree(self, tree):
        '''
        Converts one top-level comment element into a dict.

        The first message provides the author/body/date/url (and title, if
        any); later messages are appended to 'replies'.

        Args:
            tree: WebElement of a top-level comment

        Returns:
            dict: the comment with a 'replies' list
        '''
        self.comment_tree = {'replies':[]}
        self._process_comment_tree_load_more(tree)
        messages = tree.find_elements_by_class_name('speech-bubble-message')
        for index, message in enumerate(messages):
            try:
                data = self.process_message(message)
            except IndexError as e: #should only happen if it's the reply box
                try: #make sure it's the reply box, break if so
                    message.find_element_by_class_name('replyBody')
                except WebDriverException:
                    # find_element raises (rather than returning a falsy
                    # value) when the element is missing.
                    raise e #raise original exception if it's not the reply box
                break
            if index == 0:
                self.comment_tree.update(data)
                self.comment_tree.update(self._process_message_title(message))
            else:
                self.comment_tree['replies'].append(data)
        return self.comment_tree

    def _process_comment_tree_load_more(self, tree):
        '''
        Clicks the tree's first "load-more" element, if there is one.
        '''
        try:
            load_more = tree.find_elements_by_class_name('load-more')[0]
        except IndexError:
            return
        self.force_click_element(load_more)

    def force_click_element(self, element, retries=100):
        '''
        Clicks element, scrolling with ARROW_DOWN and retrying on failure.

        Args:
            element: WebElement to click
            retries (int): remaining attempts

        Raises:
            TimeoutException: when called with no attempts left. Raised
                from a nested retry, it is caught one level up, which then
                falls back to an ActionChains click.
        '''
        # Explicit guard instead of assert: asserts are stripped under -O.
        if retries <= 0:
            raise TimeoutException('Tried to click element {} times without success!'.format(retries))
        try:
            element.click()
        except WebDriverException:
            try:
                element.send_keys(Keys.ARROW_DOWN)
                self.force_click_element(element, retries=retries-1)
            except WebDriverException:
                actions = ActionChains(self.driver)
                actions.move_to_element(element)
                actions.click()
                actions.perform()

    def process_message(self, message):
        '''
        Extracts author, body, timestamp text and permalink from a message.

        Raises:
            IndexError: when any of the expected child elements is missing
        '''
        #get message author
        author = message.find_elements_by_class_name('edited-by')[0].text
        #get message body
        body = message.find_elements_by_class_name('msg-body')[0].text
        #get permalink and timestamp from timestamp class
        time_ptr = message.find_elements_by_class_name('timestamp')[0].find_elements_by_tag_name('a')[0]
        permalink = time_ptr.get_attribute('href')
        timestamp = time_ptr.text
        return {'author':author, 'body':body, 'date':timestamp, 'url':permalink}

    def _process_message_title(self, message):
        '''
        Returns {'title': ...} for the message's first "msg-title" element,
        or an empty dict when it cannot be read.
        '''
        try:
            title = message.find_elements_by_class_name('msg-title')[0].text
        except (IndexError, WebDriverException):
            return {}
        return {'title':title}
class Wikia:
    # NOTE(review): this looks like unfinished scratch code. Neither
    # function takes self, and both use a name "driver" that this block
    # does not define. Confirm whether it is still needed.
    def wikia_crawl():
        #go to wall
        driver.find_elements_by_link_text('Message Wall')[0].click()
        #get container with all comments
        comments_box = driver.find_elements_by_class_name('comments')[0]
        # first element with class "Pagination"; not used further here
        pages = driver.find_elements_by_class_name('Pagination')[0]
    def other():
        # clicks the first "Blog" link, then the first "Contributions" link
        driver.find_elements_by_link_text('Blog')[0].click()
        driver.find_elements_by_link_text('Contributions')[0].click()
+28
View File
@@ -0,0 +1,28 @@
import datetime
import sys
import time
class ArcTimer(object):
    '''
    Timer that measures the seconds elapsed since it was created, reports
    progress on the console and detects an optional timeout.
    '''

    def __init__(self, sleep=3, timeout=None):
        '''
        Args:
            sleep (int): seconds report() sleeps after printing
            timeout (int, [optional]): seconds after which is_timeout()
                returns True; None disables the timeout
        '''
        self.start = datetime.datetime.now()
        self.sleep = sleep
        self.timeout = timeout
        self.seconds = 0.0

    def count_seconds(self):
        '''
        Updates self.seconds with the time elapsed since creation.

        Returns:
            float: the elapsed seconds
        '''
        self.seconds = (datetime.datetime.now() - self.start).total_seconds()
        return self.seconds

    def is_timeout(self):
        '''
        Returns:
            bool: True (after printing a notice) when a timeout is set and
                has been reached, False otherwise
        '''
        # Explicit checks instead of assert + bare except: asserts are
        # stripped under -O. The elapsed time is refreshed here so the
        # result does not depend on report() having been called first.
        if self.timeout is None:
            return False
        if self.count_seconds() < self.timeout:
            return False
        sys.stdout.write("\rHit timeout when waiting for archive to complete ({} seconds)".format(self.seconds))
        sys.stdout.flush()
        return True

    def report(self):
        '''
        Prints the elapsed time on the current console line, then sleeps
        for self.sleep seconds.
        '''
        self.count_seconds()
        sys.stdout.write("\rWaiting for archive to complete ({} seconds)".format(self.seconds))
        sys.stdout.flush()
        time.sleep(self.sleep)