major refactor; cleaned up some wonky architecture; passing unit tests for comic downloads
This commit is contained in:
@@ -1,66 +0,0 @@
|
|||||||
import os
|
|
||||||
from qbittorrent import Client
|
|
||||||
from yoink.provider import PirateBay, Provider, ReadAllComics
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class Downloader:
|
|
||||||
def __init__(self) -> None:
|
|
||||||
self.qb = Client('http://127.0.0.1:8080')
|
|
||||||
self.qb.login('admin', 'adminadmin')
|
|
||||||
self.headers = {'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.75 Safari/537.36'}
|
|
||||||
self.limit = 1
|
|
||||||
self.queue = []
|
|
||||||
self.config_path = self.set_path(os.path.abspath(os.path.join(os.environ.get('HOME'), '.config/yoink')))
|
|
||||||
self.root_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
|
|
||||||
self.download_path = self.set_path(os.path.join(os.environ.get('HOME'), 'yoink/downloads'))
|
|
||||||
|
|
||||||
|
|
||||||
def __download_torrent(self, magnetlink):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
def set_path(self, path):
|
|
||||||
if path.strip() == '': raise ValueError('Path cannot be an empty string')
|
|
||||||
|
|
||||||
if not os.path.exists(path):
|
|
||||||
os.makedirs(path)
|
|
||||||
|
|
||||||
return path
|
|
||||||
|
|
||||||
def empty_queue(self):
|
|
||||||
self.queue = []
|
|
||||||
|
|
||||||
def add(self, item):
|
|
||||||
self.queue.append(item)
|
|
||||||
|
|
||||||
def download(self, file):
|
|
||||||
if isinstance(file, ReadAllComics):
|
|
||||||
pass
|
|
||||||
elif isinstance(file, PirateBay):
|
|
||||||
pass
|
|
||||||
else:
|
|
||||||
raise TypeError('Downloads from this site are not yet supported')
|
|
||||||
|
|
||||||
|
|
||||||
class Bounty:
|
|
||||||
def __init__(self, url):
|
|
||||||
self.provider = Provider(site=url)
|
|
||||||
self.downloader = Downloader()
|
|
||||||
|
|
||||||
def plunder(self, *args, **kwargs):
|
|
||||||
if isinstance(self.provider, ReadAllComics):
|
|
||||||
pass
|
|
||||||
else:
|
|
||||||
raise TypeError(f'{self.provider} is not a valid provider')
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
item = Bounty('http://readallcomics.com/static-season-one-4-2021/')
|
|
||||||
# downloader = Downloader()
|
|
||||||
# print(downloader.download_path)
|
|
||||||
item.provider.download()
|
|
||||||
@@ -1,5 +1,7 @@
|
|||||||
import click
|
import click
|
||||||
|
|
||||||
|
from yoink.common import qb_client, app_root, library_path, config_path
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@click.group()
|
@click.group()
|
||||||
|
|||||||
100
yoink/comic.py
Normal file
100
yoink/comic.py
Normal file
@@ -0,0 +1,100 @@
|
|||||||
|
from click import format_filename
|
||||||
|
from soupsieve import select
|
||||||
|
from yoink.common import required_comic_files, skippable_images, library_path
|
||||||
|
from yoink.scraper import Scrapable
|
||||||
|
|
||||||
|
import os
import shutil
import urllib
import urllib.request
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
class Comic(Scrapable):
    """A single comic issue scraped from a readallcomics.com page."""

    def __init__(self, url) -> None:
        super().__init__(url)

    def __get_image_src(self, comic):
        # A matched element is either an <img> itself (has attrs) or a
        # container whose first child <img> carries the page image.
        # Returns None when neither case yields a 'src'.
        if comic.attrs:
            return comic.attrs.get('src')

        for image in comic:
            return image.attrs.get('src')

    def __parse_soup(self):
        # The site uses several page layouts; try each known selector in
        # order and return the first that yields results.  Returns None
        # when no layout matches.
        soup = {
            'default': self.soup.find_all('div', class_='separator'),
            'no-div': self.soup.find_all('img', attrs={'width': '1000px'}),
            'excaliber': self.soup.find_all('img')
        }

        for case in soup.keys():
            comics = soup.get(case)

            if len(comics) > 0:
                return comics

    @property
    def filelist(self):
        """Image URLs for every page, with site chrome images filtered out.

        BUG FIX: the original crashed with TypeError when no layout
        matched (mapping over None) or when an element had no 'src'
        (calling .endswith on None); both cases are now skipped.
        """
        comics = self.__parse_soup()
        if not comics:
            return []
        return [comic for comic in map(self.__get_image_src, comics)
                if comic is not None and not comic.endswith(skippable_images)]

    @property
    def title(self):
        """Page title stripped of site branding and filesystem-hostile characters."""
        return (self.soup.title.string
                .replace(' | Read All Comics Online For Free', '')
                .replace('…', '')
                .replace('#', '')
                .replace(':', '')
                .strip())

    @property
    def category(self):
        """The comic's category (series name) taken from the post's category tag."""
        data = self.soup.find('a', attrs={'rel': 'category tag'})
        return data.text

    def can_remove(self, filename):
        """True when *filename* is an intermediate file safe to delete."""
        return not filename.endswith(required_comic_files)
|
||||||
|
|
||||||
|
|
||||||
|
class ComicArchiver:
    """Downloads a Comic's pages and packs them into a comic-book archive."""

    def __init__(self, comic: Comic) -> None:
        self.comic = comic
        # Per-comic working directory under the shared library path.
        self.worktree = os.path.join(library_path, f'comics/{self.comic.title}')

    def download(self):
        """Fetch every page image of the comic into the worktree."""
        if not os.path.exists(self.worktree):
            os.makedirs(self.worktree, mode=0o777)

        # install_opener() is process-global, so building and installing
        # the spoofed User-Agent opener once is enough — the original
        # redundantly re-installed it on every loop iteration.
        opener = urllib.request.build_opener()
        opener.addheaders = [('User-Agent', 'Mozilla/5.0')]
        urllib.request.install_opener(opener)

        for index, url in enumerate(self.comic.filelist):
            if not url.endswith('.jpg'):
                # No usable page number in the URL: synthesize one from
                # the position in the filelist.
                formatted_file = os.path.join(
                    self.worktree,
                    f'{self.comic.title} ' + ''.join([str(index).zfill(3), '.jpg']))
                print(formatted_file, end='\r')
                urllib.request.urlretrieve(url, filename=formatted_file)
            else:
                # The page number and extension are encoded in the URL's
                # final path component.
                page_number = url.split('/')[-1].split('.')[0].zfill(3)
                file_extension = url.split('/')[-1].split('.')[1]
                urllib.request.urlretrieve(
                    url,
                    filename=os.path.join(
                        self.worktree,
                        f'{self.comic.title}{page_number}.{file_extension}'))

    def generate_archive(self, archive_format='.cbr'):
        """Zip the worktree into an archive named after the comic.

        Skips work when the archive already exists.  NOTE(review):
        make_archive writes '<title>.zip' into the current working
        directory before the rename moves it into the worktree.
        """
        if os.path.exists(os.path.join(self.worktree, f'{self.comic.title}{archive_format}')):
            return

        output = shutil.make_archive(self.comic.title, 'zip', self.worktree)
        os.rename(output, os.path.join(self.worktree, f'{self.comic.title}{archive_format}'))

    def cleanup_worktree(self):
        """Delete every intermediate image, keeping archives and cover pages."""
        for image in os.listdir(self.worktree):
            if not image.endswith(required_comic_files):
                os.remove(os.path.join(self.worktree, image))
|
||||||
|
if __name__ == '__main__':
    # Ad-hoc smoke test: scrape a known issue and show its category.
    comic = Comic('http://www.readallcomics.com/static-season-one-4-2021/')
    print(comic.category)
|
||||||
19
yoink/common.py
Normal file
19
yoink/common.py
Normal file
@@ -0,0 +1,19 @@
|
|||||||
|
# TODO replace os path with pathlib
import pathlib
import os

from qbittorrent import Client

# Filesystem layout.
app_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
config_path = os.path.abspath(os.path.join(os.environ.get('HOME'), '.config/yoink'))
library_path = os.path.abspath(os.path.join(os.environ.get('HOME'), 'yoink/library'))

# Files that must survive worktree cleanup (archives + cover pages).
required_comic_files = ('.cbr', '.cbz', '000.jpg', '001.jpg')
# Site chrome/navigation images that are never comic pages.
skippable_images = ('logo-1.png', 'logo.png', 'report.png', 'request.png', 'prev.png', 'Next.png', 'Donate.png', '11.png')

torrent_concurrent_download_limit = 1

# BUG FIX: the original bound qb_client to Client(...).login(...), but
# login() returns None on success, leaving qb_client unusable.  Keep the
# Client instance and log in as a separate step; the URL also needs an
# explicit http:// scheme (as the pre-refactor code used).
qb_client = Client('http://127.0.0.1:8080')
qb_client.login('admin', 'adminadmin')

supported_sites = ['readallcomics.com', 'tpb.party']

headers = {'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.75 Safari/537.36'}
||||||
@@ -1,126 +0,0 @@
|
|||||||
import os
|
|
||||||
import requests
|
|
||||||
import urllib
|
|
||||||
from bs4 import BeautifulSoup
|
|
||||||
from urllib.parse import urlparse
|
|
||||||
|
|
||||||
|
|
||||||
root_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
|
|
||||||
config_dir = os.path.abspath(os.environ.get('HOME'))
|
|
||||||
|
|
||||||
class Downloadable(object):
|
|
||||||
stopped_state = ('pausedUP', 'stalledUP', 'uploading', 'seeding')
|
|
||||||
|
|
||||||
def __init__(self, uri) -> None:
|
|
||||||
self.uri = uri
|
|
||||||
self.headers = {'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.75 Safari/537.36'}
|
|
||||||
|
|
||||||
@property
|
|
||||||
def markup(self):
|
|
||||||
return requests.get(self.uri)
|
|
||||||
|
|
||||||
@property
|
|
||||||
def soup(self):
|
|
||||||
return BeautifulSoup(self.markup.content, 'html.parser')
|
|
||||||
|
|
||||||
def download(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class PirateBay(Downloadable):
|
|
||||||
@property
|
|
||||||
def magnet(self):
|
|
||||||
self.soup.find('', attrs={'title': 'Get this torrent'}).attrs['href']
|
|
||||||
|
|
||||||
class ReadAllComics(Downloadable):
|
|
||||||
|
|
||||||
def __init__(self, uri) -> None:
|
|
||||||
super().__init__(uri)
|
|
||||||
self.filelist = self.__get_comic_filelist()
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def get_frontpage_links(cls):
|
|
||||||
markup = requests.get('http://www.readallcomics.com')
|
|
||||||
soup = BeautifulSoup(markup.content, 'html.parser')
|
|
||||||
posts = soup.find_all('div', class_='type-post')
|
|
||||||
links = []
|
|
||||||
|
|
||||||
for post in posts:
|
|
||||||
links.append({
|
|
||||||
'title': post.find('h2').text,
|
|
||||||
'image': post.find('img', height='250').attrs['src'],
|
|
||||||
'uri': post.find('a', class_='font-link').attrs['href']
|
|
||||||
})
|
|
||||||
|
|
||||||
return links
|
|
||||||
|
|
||||||
@property
|
|
||||||
def title(self):
|
|
||||||
return self.soup.title.string.replace(' | Read All Comics Online For Free', '').replace('…', '').replace('#', '').replace(':', '').strip()
|
|
||||||
|
|
||||||
@property
|
|
||||||
def category(self):
|
|
||||||
data = self.soup.find('a', attrs={'rel': 'category tag'})
|
|
||||||
return data.text
|
|
||||||
|
|
||||||
def __can_remove(self, filename):
|
|
||||||
ignore = ('.cbr', '.cbz', '000.jpg', '001.jpg')
|
|
||||||
return not filename.endswith(ignore)
|
|
||||||
|
|
||||||
def __get_image_src(self, comic):
|
|
||||||
if comic.attrs:
|
|
||||||
return comic.attrs['src']
|
|
||||||
|
|
||||||
for image in comic:
|
|
||||||
return image.attrs['src']
|
|
||||||
|
|
||||||
def __parse_soup(self):
|
|
||||||
soup = {
|
|
||||||
'default': self.soup.find_all('div', class_='separator'),
|
|
||||||
'no-div': self.soup.find_all('img', attrs={'width': '1000px'}),
|
|
||||||
'excaliber': self.soup.find_all('img')
|
|
||||||
}
|
|
||||||
|
|
||||||
for case in soup.keys():
|
|
||||||
comics = soup.get(case)
|
|
||||||
|
|
||||||
if len(comics) > 0:
|
|
||||||
return comics
|
|
||||||
|
|
||||||
def __get_comic_filelist(self):
|
|
||||||
comics = self.__parse_soup()
|
|
||||||
return list(map(self.__get_image_src, comics))
|
|
||||||
|
|
||||||
|
|
||||||
def download(self):
|
|
||||||
skippable_files = ('logo-1.png', 'logo.png', 'report.png', 'request.png', 'prev.png', 'Next.png', 'Donate.png', '11.png')
|
|
||||||
|
|
||||||
for url in self.filelist:
|
|
||||||
opener = urllib.request.build_opener()
|
|
||||||
opener.addheaders = [('User-agent', self.headers['user-agent'])]
|
|
||||||
urllib.request.install_opener(opener)
|
|
||||||
|
|
||||||
if url.endswith(skippable_files):
|
|
||||||
continue
|
|
||||||
|
|
||||||
if not url.endswith('.jpg'):
|
|
||||||
urllib.request.urlretrieve(url, filename=os.path.join(self.download_path + f'/{self.title}', f'{self.title}'.join([str(url.index(url)).zfill(3), '.jpg'])))
|
|
||||||
else:
|
|
||||||
page_number = url.split('/')[-1].split('.')[0].zfill(3)
|
|
||||||
file_extension = url.split('/')[-1].split('.')[1]
|
|
||||||
urllib.request.urlretrieve(url, filename=os.path.join(self.download_path + f'/{self.title}', f'{self.title}{page_number}.{file_extension}'))
|
|
||||||
|
|
||||||
|
|
||||||
def Provider(site='http://readallcomics.com'):
|
|
||||||
providers = {
|
|
||||||
'readallcomics': ReadAllComics
|
|
||||||
}
|
|
||||||
|
|
||||||
domain = urlparse(site)
|
|
||||||
|
|
||||||
name=domain.netloc.split('.')[0]
|
|
||||||
|
|
||||||
if name not in providers:
|
|
||||||
raise ValueError('Downloads for this site are not yet supported')
|
|
||||||
|
|
||||||
return providers[name](uri=site)
|
|
||||||
25
yoink/scraper.py
Normal file
25
yoink/scraper.py
Normal file
@@ -0,0 +1,25 @@
|
|||||||
|
import requests
|
||||||
|
from bs4 import BeautifulSoup
|
||||||
|
|
||||||
|
|
||||||
|
from yoink.common import supported_sites
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
class Scrapable:
    """Base class for objects built from a scraped web page.

    Raises ValueError at construction time when *url* does not belong to
    one of the supported sites.
    """

    def __init__(self, url) -> None:
        self.url = url

        # BUG FIX: the original loop raised as soon as a supported site
        # did NOT appear in the URL, so only the first entry of
        # supported_sites could ever pass validation.  Accept the URL
        # when ANY supported site matches.
        if not any(site in self.url for site in supported_sites):
            raise ValueError('Unsupported site')

    @property
    def markup(self) -> str:
        """Raw page content fetched from self.url."""
        return requests.get(self.url).content

    @property
    def soup(self) -> BeautifulSoup:
        """Parsed BeautifulSoup document for the page."""
        return BeautifulSoup(self.markup, 'html.parser')
||||||
@@ -1,42 +1,41 @@
|
|||||||
|
import imp
|
||||||
|
from bs4 import BeautifulSoup
|
||||||
|
|
||||||
import os
|
import os
|
||||||
import unittest
|
import unittest
|
||||||
from bs4 import BeautifulSoup
|
|
||||||
from yoink.bounty import Bounty, Downloader
|
from yoink.common import app_root, library_path, config_path, skippable_images, supported_sites, qb_client, required_comic_files, torrent_concurrent_download_limit, headers
|
||||||
from yoink.provider import Provider, ReadAllComics
|
from yoink.comic import Comic, ComicArchiver
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class BasicTestCase(unittest.TestCase):
|
class BasicTestCase(unittest.TestCase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
self.test_comic = 'http://readallcomics.com/static-season-one-4-2021/'
|
self.test_comic = 'http://readallcomics.com/static-season-one-4-2021/'
|
||||||
self.item = Bounty(self.test_comic)
|
self.comic = Comic(self.test_comic)
|
||||||
|
self.archiver = ComicArchiver(self.comic)
|
||||||
|
|
||||||
def test_000_provider_generates_or_fails_correctly(self):
|
def test_000_comic_generates_valid_markup(self):
|
||||||
# ensure valid comic link returns correct factory
|
self.assertTrue('!DOCTYPE html' in str(self.comic.markup))
|
||||||
self.assertTrue(isinstance(self.item.provider, ReadAllComics))
|
|
||||||
|
|
||||||
# ensure invalid comic link raises ValueError stating lack of support
|
def test_001_comic_has_valid_title(self):
|
||||||
def busted():
|
self.assertEqual('Static Season One 4 (2021)', self.comic.title)
|
||||||
return Bounty('http://viz.com')
|
|
||||||
|
|
||||||
with self.assertRaises(ValueError) as context:
|
def test_002_comic_has_valid_category(self):
|
||||||
busted()
|
self.assertEqual('Static: Season One', self.comic.category)
|
||||||
|
|
||||||
self.assertTrue('Downloads for this site are not yet supported' in context.exception)
|
def test_003_empty_comic_folder(self):
|
||||||
|
self.assertEqual(len(os.listdir(os.path.join(library_path, 'comics'))), 0)
|
||||||
|
|
||||||
|
def test_004_comic_folder_created_and_populated(self):
|
||||||
|
self.archiver.download()
|
||||||
|
self.assertTrue(os.path.exists(os.path.join(library_path, f'comics/{self.comic.title}')))
|
||||||
|
self.assertGreater(len(os.listdir(os.path.join(library_path, f'comics/{self.comic.title}'))), 0)
|
||||||
|
|
||||||
def test_001_provider_markup_returns_200(self):
|
def test_005_comic_archive_generated(self):
|
||||||
self.assertEqual(self.item.provider.markup.status_code, 200)
|
self.archiver.generate_archive()
|
||||||
|
self.assertTrue(os.path.exists(os.path.join(library_path, f'comics/{self.comic.title}/{self.comic.title}.cbr')))
|
||||||
|
|
||||||
def test_002_provider_soup_object_exists(self):
|
|
||||||
self.assertTrue(isinstance(self.item.provider.soup, BeautifulSoup))
|
|
||||||
|
|
||||||
|
|
||||||
def test_003_downloader_object_exists(self):
|
|
||||||
self.assertTrue(isinstance(self.item.downloader, Downloader))
|
|
||||||
|
|
||||||
def test_004_downloader_paths_exist(self):
|
|
||||||
self.assertTrue(os.path.exists(self.item.downloader.root_path))
|
|
||||||
self.assertTrue(os.path.exists(self.item.downloader.config_path))
|
|
||||||
|
|
||||||
|
def test_006_folder_cleaned_after_archive_generation(self):
|
||||||
|
self.archiver.cleanup_worktree()
|
||||||
|
self.assertAlmostEqual(len(os.listdir(os.path.join(library_path, f'comics/{self.comic.title}'))), 3)
|
||||||
76
yoink/torrent.py
Normal file
76
yoink/torrent.py
Normal file
@@ -0,0 +1,76 @@
|
|||||||
|
from bs4 import BeautifulSoup
|
||||||
|
import requests
|
||||||
|
|
||||||
|
import os
|
||||||
|
|
||||||
|
from yoink.common import qb_client, library_path, config_path, app_root, headers
|
||||||
|
from yoink.scraper import Scrapable
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
stopped_state = ('pausedUP', 'stalledUP', 'uploading', 'seeding')
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
class TorrentDownloader:
    """Queues torrents and hands them to the shared qBittorrent client."""

    def __init__(self) -> None:
        self.limit = 1                      # max concurrent downloads
        self.queue = []
        self.download_path = self.set_path(os.path.join(library_path, 'downloads'))

    @classmethod
    def create_torrent(cls, url):
        """Alternate constructor: wrap *url* in a Torrent."""
        return Torrent(url)

    @classmethod
    def get_torrent(cls, name):
        """Return the qBittorrent entry whose name matches exactly.

        Raises ValueError when no such torrent exists (the original
        produced a bare IndexError from [0] on an empty list).
        """
        matches = [torrent for torrent in qb_client.torrents() if name == torrent['name']]
        if not matches:
            raise ValueError(f'No torrent named {name!r}')
        return matches[0]

    @classmethod
    def quick_download(cls, url):
        """Start downloading *url*, resolving a page URL to its magnet link."""
        if not isinstance(url, str):
            raise TypeError('URL string expected')

        if url.startswith('magnet'):
            qb_client.download_from_link(url)
            return

        markup = requests.get(url, headers=headers).content
        soup = BeautifulSoup(markup, 'html.parser')
        # BUG FIX: the original called .attrs on the attrs-filter dict
        # ({'title': ...}.attrs['href']), which raises AttributeError;
        # the attrs belong to the <a> tag that find() returns.
        magnet_link = soup.find('a', attrs={'title': 'Get this torrent'}).attrs['href']
        qb_client.download_from_link(magnet_link)

    def set_path(self, path):
        """Create *path* if needed and return it; reject empty strings."""
        if path.strip() == '':
            raise ValueError('Path cannot be an empty string')

        if not os.path.exists(path):
            os.makedirs(path)

        return path

    def empty_queue(self):
        """Drop every queued torrent."""
        self.queue = []

    def add(self, torrent):
        """Queue a Torrent for later download."""
        if not isinstance(torrent, Torrent):
            raise TypeError('Not a valid torrent')

        self.queue.append(torrent)

    def download(self):
        """Hand every queued torrent to qBittorrent.

        BUG FIX: the original looped `while len(self.queue) > 0` but
        never removed items, so it re-submitted the same torrents
        forever.  The queue is now drained as it is processed.
        """
        while self.queue:
            torrent = self.queue.pop(0)
            if not isinstance(torrent, Torrent):
                raise TypeError('Not a valid torrent')

            qb_client.download_from_link(torrent.magnet_link)
||||||
|
class Torrent(Scrapable):
    """A torrent page exposing its display name and magnet link."""

    def __init__(self, url) -> None:
        super().__init__(url)

    @property
    def name(self) -> str:
        """Display name of the torrent.

        BUG FIX: the original returned the bs4 Tag object despite the
        declared -> str; extract the tag's text to honour the contract.
        """
        return self.soup.find('div', attrs={'id': 'title'}).text.strip()

    @property
    def magnet_link(self) -> str:
        """Magnet URI taken from the page's 'Get this torrent' anchor."""
        return self.soup.find('a', attrs={'title': 'Get this torrent'}).attrs['href']
||||||
Reference in New Issue
Block a user