From 5e3dffb674e7514439d782cdc8474fc0b8eccbc4 Mon Sep 17 00:00:00 2001 From: Bryan Bailey Date: Wed, 9 Mar 2022 23:01:05 -0500 Subject: [PATCH] major refactor; cleaned up some wonky architecture; passing unit tests for comic downloads --- yoink/bounty.py | 66 -------------------- yoink/cli.py | 2 + yoink/comic.py | 100 ++++++++++++++++++++++++++++++ yoink/common.py | 19 ++++++ yoink/provider.py | 126 -------------------------------------- yoink/scraper.py | 25 ++++++++ yoink/tests/test_basic.py | 53 ++++++++-------- yoink/torrent.py | 76 +++++++++++++++++++++++ 8 files changed, 248 insertions(+), 219 deletions(-) delete mode 100644 yoink/bounty.py create mode 100644 yoink/comic.py create mode 100644 yoink/common.py delete mode 100644 yoink/provider.py create mode 100644 yoink/scraper.py create mode 100644 yoink/torrent.py diff --git a/yoink/bounty.py b/yoink/bounty.py deleted file mode 100644 index ca658c4..0000000 --- a/yoink/bounty.py +++ /dev/null @@ -1,66 +0,0 @@ -import os -from qbittorrent import Client -from yoink.provider import PirateBay, Provider, ReadAllComics - - - -class Downloader: - def __init__(self) -> None: - self.qb = Client('http://127.0.0.1:8080') - self.qb.login('admin', 'adminadmin') - self.headers = {'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.75 Safari/537.36'} - self.limit = 1 - self.queue = [] - self.config_path = self.set_path(os.path.abspath(os.path.join(os.environ.get('HOME'), '.config/yoink'))) - self.root_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) - self.download_path = self.set_path(os.path.join(os.environ.get('HOME'), 'yoink/downloads')) - - - def __download_torrent(self, magnetlink): - pass - - - def set_path(self, path): - if path.strip() == '': raise ValueError('Path cannot be an empty string') - - if not os.path.exists(path): - os.makedirs(path) - - return path - - def empty_queue(self): - self.queue = [] - - def add(self, item): - self.queue.append(item) - - def download(self, file): - if isinstance(file, ReadAllComics): - pass - elif isinstance(file, PirateBay): - pass - else: - raise TypeError('Downloads from this site are not yet supported') - - -class Bounty: - def __init__(self, url): - self.provider = Provider(site=url) - self.downloader = Downloader() - - def plunder(self, *args, **kwargs): - if isinstance(self.provider, ReadAllComics): - pass - else: - raise TypeError(f'{self.provider} is not a valid provider') - - - - - - -if __name__ == '__main__': - item = Bounty('http://readallcomics.com/static-season-one-4-2021/') - # downloader = Downloader() - # print(downloader.download_path) - item.provider.download() \ No newline at end of file diff --git a/yoink/cli.py b/yoink/cli.py index 4c16ca9..a9a546d 100644 --- a/yoink/cli.py +++ b/yoink/cli.py @@ -1,5 +1,7 @@ import click +from yoink.common import qb_client, app_root, library_path, config_path + @click.group() diff --git a/yoink/comic.py b/yoink/comic.py new file mode 100644 index 0000000..2c1961b --- /dev/null +++ b/yoink/comic.py @@ -0,0 +1,100 @@ +from click import format_filename +from soupsieve import select +from yoink.common import required_comic_files, skippable_images, library_path +from yoink.scraper import Scrapable + +import os +import shutil +import urllib + + + +class Comic(Scrapable): + def __init__(self, url) -> None: + super().__init__(url) + + + def __get_image_src(self, comic): + if comic.attrs: + return comic.attrs['src'] + + for image in comic: + return image.attrs['src'] + + def __parse_soup(self): + soup = { + 'default': self.soup.find_all('div', class_='separator'), + 'no-div': self.soup.find_all('img', attrs={'width': '1000px'}), + 'excaliber': self.soup.find_all('img') + } + + for case in soup.keys(): + comics = soup.get(case) + + if len(comics) > 0: + return comics + + @property + def filelist(self): + comics = self.__parse_soup() + return [comic for comic in list(map(self.__get_image_src, comics)) if not comic.endswith(skippable_images)] + + + @property + def title(self): return self.soup.title.string.replace(' | Read All Comics Online For Free', '').replace('…', '').replace('#', '').replace(':', '').strip() + + @property + def category(self): + data = self.soup.find('a', attrs={'rel': 'category tag'} ) + return data.text + + def can_remove(self, filename): + return not filename.endswith(required_comic_files) + + +class ComicArchiver: + def __init__(self, comic : Comic) -> None: + self.comic = comic + self.worktree = os.path.join(library_path, f'comics/{self.comic.title}') + + def download(self): + + if not os.path.exists(self.worktree): + os.makedirs(self.worktree, mode=0o777) + + for index,url in enumerate(self.comic.filelist): + opener = urllib.request.build_opener() + opener.addheaders = [('User-Agent', 'Mozilla/5.0')] + urllib.request.install_opener(opener) + + if not url.endswith('.jpg'): + formatted_file = os.path.join(self.worktree, f'{self.comic.title} ' + ''.join([str(index).zfill(3), '.jpg'])) + print(formatted_file, end='\r') + urllib.request.urlretrieve(url, filename=formatted_file) + else: + page_number = url.split('/')[-1].split('.')[0].zfill(3) + file_extension = url.split('/')[-1].split('.')[1] + urllib.request.urlretrieve(url, filename=os.path.join(self.worktree, f'{self.comic.title}{page_number}.{file_extension}')) + + def generate_archive(self, archive_format='.cbr'): + if os.path.exists(os.path.join(self.worktree, f'{self.comic.title}{archive_format}')): + return + + output = shutil.make_archive(self.comic.title, 'zip', self.worktree) + os.rename(output, os.path.join(self.worktree, f'{self.comic.title}{archive_format}')) + + + def cleanup_worktree(self): + for image in os.listdir(self.worktree): + if not image.endswith(required_comic_files): + os.remove(os.path.join(self.worktree, image)) + +if __name__ == '__main__': + comic = Comic('http://www.readallcomics.com/static-season-one-4-2021/') + # # print(comic.filelist) + # # print(len(comic.filelist)) + # archiver = ComicArchiver(comic) + # archiver.download() + # archiver.generate_archive() + # archiver.cleanup_worktree() + print(comic.category) \ No newline at end of file diff --git a/yoink/common.py b/yoink/common.py new file mode 100644 index 0000000..47906f0 --- /dev/null +++ b/yoink/common.py @@ -0,0 +1,19 @@ +import pathlib + +from qbittorrent import Client + +import pathlib +# TODO replace os path with pathlib +import os + + + +app_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) +config_path = os.path.abspath(os.path.join(os.environ.get('HOME'), '.config/yoink')) +library_path = os.path.abspath(os.path.join(os.environ.get('HOME'), 'yoink/library')) +required_comic_files = ('.cbr', '.cbz', '000.jpg', '001.jpg') +skippable_images = ('logo-1.png', 'logo.png', 'report.png', 'request.png', 'prev.png', 'Next.png', 'Donate.png', '11.png') +torrent_concurrent_download_limit = 1 +qb_client = Client('127.0.0.1:8080').login('admin', 'adminadmin') +supported_sites = ['readallcomics.com', 'tpb.party'] +headers = {'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.75 Safari/537.36'} diff --git a/yoink/provider.py b/yoink/provider.py deleted file mode 100644 index 5b7d895..0000000 --- a/yoink/provider.py +++ /dev/null @@ -1,126 +0,0 @@ -import os -import requests -import urllib -from bs4 import BeautifulSoup -from urllib.parse import urlparse - - -root_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) -config_dir = os.path.abspath(os.environ.get('HOME')) - -class Downloadable(object): - stopped_state = ('pausedUP', 'stalledUP', 'uploading', 'seeding') - - def __init__(self, uri) -> None: - self.uri = uri - self.headers = {'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.75 Safari/537.36'} - - @property - def markup(self): - return requests.get(self.uri) - - @property - def soup(self): - return BeautifulSoup(self.markup.content, 'html.parser') - - def download(self): - pass - - -class PirateBay(Downloadable): - @property - def magnet(self): - self.soup.find('', attrs={'title': 'Get this torrent'}).attrs['href'] - -class ReadAllComics(Downloadable): - - def __init__(self, uri) -> None: - super().__init__(uri) - self.filelist = self.__get_comic_filelist() - - @classmethod - def get_frontpage_links(cls): - markup = requests.get('http://www.readallcomics.com') - soup = BeautifulSoup(markup.content, 'html.parser') - posts = soup.find_all('div', class_='type-post') - links = [] - - for post in posts: - links.append({ - 'title': post.find('h2').text, - 'image': post.find('img', height='250').attrs['src'], - 'uri': post.find('a', class_='font-link').attrs['href'] - }) - - return links - - @property - def title(self): - return self.soup.title.string.replace(' | Read All Comics Online For Free', '').replace('…', '').replace('#', '').replace(':', '').strip() - - @property - def category(self): - data = self.soup.find('a', attrs={'rel': 'category tag'}) - return data.text - - def __can_remove(self, filename): - ignore = ('.cbr', '.cbz', '000.jpg', '001.jpg') - return not filename.endswith(ignore) - - def __get_image_src(self, comic): - if comic.attrs: - return comic.attrs['src'] - - for image in comic: - return image.attrs['src'] - - def __parse_soup(self): - soup = { - 'default': self.soup.find_all('div', class_='separator'), - 'no-div': self.soup.find_all('img', attrs={'width': '1000px'}), - 'excaliber': self.soup.find_all('img') - } - - for case in soup.keys(): - comics = soup.get(case) - - if len(comics) > 0: - return comics - - def __get_comic_filelist(self): - comics = self.__parse_soup() - return list(map(self.__get_image_src, comics)) - - - def download(self): - skippable_files = ('logo-1.png', 'logo.png', 'report.png', 'request.png', 'prev.png', 'Next.png', 'Donate.png', '11.png') - - for url in self.filelist: - opener = urllib.request.build_opener() - opener.addheaders = [('User-agent', self.headers['user-agent'])] - urllib.request.install_opener(opener) - - if url.endswith(skippable_files): - continue - - if not url.endswith('.jpg'): - urllib.request.urlretrieve(url, filename=os.path.join(self.download_path + f'/{self.title}', f'{self.title}'.join([str(url.index(url)).zfill(3), '.jpg']))) - else: - page_number = url.split('/')[-1].split('.')[0].zfill(3) - file_extension = url.split('/')[-1].split('.')[1] - urllib.request.urlretrieve(url, filename=os.path.join(self.download_path + f'/{self.title}', f'{self.title}{page_number}.{file_extension}')) - - -def Provider(site='http://readallcomics.com'): - providers = { - 'readallcomics': ReadAllComics - } - - domain = urlparse(site) - - name=domain.netloc.split('.')[0] - - if name not in providers: - raise ValueError('Downloads for this site are not yet supported') - - return providers[name](uri=site) \ No newline at end of file diff --git a/yoink/scraper.py b/yoink/scraper.py new file mode 100644 index 0000000..bd98c7a --- /dev/null +++ b/yoink/scraper.py @@ -0,0 +1,25 @@ +import requests +from bs4 import BeautifulSoup + + +from yoink.common import supported_sites + + + +class Scrapable: + def __init__(self, url) -> None: + self.url = url + + for link in supported_sites: + if link in self.url: + return + else: + raise ValueError('Unsupported site') + + + + @property + def markup(self) -> str: return requests.get(self.url).content + + @property + def soup(self) -> BeautifulSoup: return BeautifulSoup(self.markup, 'html.parser') \ No newline at end of file diff --git a/yoink/tests/test_basic.py b/yoink/tests/test_basic.py index eb632da..ba2c6ac 100644 --- a/yoink/tests/test_basic.py +++ b/yoink/tests/test_basic.py @@ -1,42 +1,41 @@ +import imp +from bs4 import BeautifulSoup + import os import unittest -from bs4 import BeautifulSoup -from yoink.bounty import Bounty, Downloader -from yoink.provider import Provider, ReadAllComics + +from yoink.common import app_root, library_path, config_path, skippable_images, supported_sites, qb_client, required_comic_files, torrent_concurrent_download_limit, headers +from yoink.comic import Comic, ComicArchiver class BasicTestCase(unittest.TestCase): def setUp(self): self.test_comic = 'http://readallcomics.com/static-season-one-4-2021/' - self.item = Bounty(self.test_comic) + self.comic = Comic(self.test_comic) + self.archiver = ComicArchiver(self.comic) - def test_000_provider_generates_or_fails_correctly(self): - # ensure valid comic link returns correct factory - self.assertTrue(isinstance(self.item.provider, ReadAllComics)) + def test_000_comic_generates_valid_markup(self): + self.assertTrue('!DOCTYPE html' in str(self.comic.markup)) - # ensure invalid comic link raises ValueError stating lack of support - def busted(): - return Bounty('http://viz.com') + def test_001_comic_has_valid_title(self): + self.assertEqual('Static Season One 4 (2021)', self.comic.title) - with self.assertRaises(ValueError) as context: - busted() + def test_002_comic_has_valid_category(self): + self.assertEqual('Static: Season One', self.comic.category) - self.assertTrue('Downloads for this site are not yet supported' in context.exception) + def test_003_empty_comic_folder(self): + self.assertEqual(len(os.listdir(os.path.join(library_path, 'comics'))), 0) + def test_004_comic_folder_created_and_populated(self): + self.archiver.download() + self.assertTrue(os.path.exists(os.path.join(library_path, f'comics/{self.comic.title}'))) + self.assertGreater(len(os.listdir(os.path.join(library_path, f'comics/{self.comic.title}'))), 0) - def test_001_provider_markup_returns_200(self): - self.assertEqual(self.item.provider.markup.status_code, 200) - - - def test_002_provider_soup_object_exists(self): - self.assertTrue(isinstance(self.item.provider.soup, BeautifulSoup)) - - - def test_003_downloader_object_exists(self): - self.assertTrue(isinstance(self.item.downloader, Downloader)) - - def test_004_downloader_paths_exist(self): - self.assertTrue(os.path.exists(self.item.downloader.root_path)) - self.assertTrue(os.path.exists(self.item.downloader.config_path)) + def test_005_comic_archive_generated(self): + self.archiver.generate_archive() + self.assertTrue(os.path.exists(os.path.join(library_path, f'comics/{self.comic.title}/{self.comic.title}.cbr'))) + def test_006_folder_cleaned_after_archive_generation(self): + self.archiver.cleanup_worktree() + self.assertAlmostEqual(len(os.listdir(os.path.join(library_path, f'comics/{self.comic.title}'))), 3) \ No newline at end of file diff --git a/yoink/torrent.py b/yoink/torrent.py new file mode 100644 index 0000000..2695d3d --- /dev/null +++ b/yoink/torrent.py @@ -0,0 +1,76 @@ +from bs4 import BeautifulSoup +import requests + +import os + +from yoink.common import qb_client, library_path, config_path, app_root, headers +from yoink.scraper import Scrapable + + + +stopped_state = ('pausedUP', 'stalledUP', 'uploading', 'seeding') + + + +class TorrentDownloader: + def __init__(self) -> None: + self.limit = 1 + self.queue = [] + self.download_path = self.set_path(os.path.join(library_path, 'downloads')) + + @classmethod + def create_torrent(cls, url): + return Torrent(url) + + @classmethod + def get_torrent(cls, name): + return [torrent for torrent in qb_client.torrents() if name == torrent['name']][0] + + @classmethod + def quick_download(cls, url): + if not isinstance(url, str): + raise TypeError('URL string expected') + + if not url.startswith('magnet'): + markup = requests.get(url, headers=headers).content + soup = BeautifulSoup(markup, 'html.parser') + magnet_link = soup.find('a', attrs={'title': 'Get this torrent'}.attrs['href']) + + qb_client.download_from_link(url if url.startswith('magnet') else magnet_link) + + + def set_path(self, path): + if path.strip() == '': raise ValueError('Path cannot be an empty string') + + if not os.path.exists(path): + os.makedirs(path) + + return path + + def empty_queue(self): + self.queue = [] + + def add(self, torrent): + if not isinstance(torrent, Torrent): + raise TypeError('Not a valid torrent') + + self.queue.append(torrent) + + def download(self): + while len(self.queue) > 0: + for torrent in self.queue: + if not isinstance(torrent, Torrent): + raise TypeError('Not a valid torrent') + + qb_client.download_from_link(torrent.magnet_link) + + +class Torrent(Scrapable): + def __init__(self, url) -> None: + super().__init__(url) + + @property + def name(self) -> str: return self.soup.find('div', attrs={'id': 'title'}) + + @property + def magnet_link(self) -> str: return self.soup.find('a', attrs={'title': 'Get this torrent'}).attrs['href']