import os
import re
import random
import tqdm
import time
import requests
import lxml
from bs4 import BeautifulSoup
import multiprocessing
import threading
from queue import Queue
from IPython.display import clear_output
from google.colab import drive
drive.mount('/content/drive', force_remount=False)
try:
from selenium import webdriver
except:
!pip install selenium
!apt install chromium-chromedriver
!pip install retrying
from selenium import webdriver
from retrying import retry
clear_output()
#=================================================================================================================================================================================================================================================
class Flickr:
    """Bulk-download photos from Flickr albums via headless Chrome + requests.

    `url` may be a user's album index page (".../photos/<user>/albums") or one
    specific album (".../albums/<id>").  Each album is saved into its own
    folder under `path`; numbering resumes after any files already present.
    """

    # Single browser identity shared by the requests session and headless Chrome.
    _USER_AGENT = ("Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                   "AppleWebKit/537.36 (KHTML, like Gecko) "
                   "Chrome/86.0.4240.198 Safari/537.36")

    def __init__(self, url, path):
        self.headers = {"user-agent": self._USER_AGENT}
        # Drop one trailing slash so later regex surgery on the URL is predictable.
        self.url = re.sub(r'/$', '', url)
        self.path = path

    def _build_driver(self):
        """Return a configured headless Chrome webdriver.

        BUGFIX: the original called `add_experimental_option` *after*
        `webdriver.Chrome(...)` was created, so the anti-automation switch was
        silently never applied; all options are now set before instantiation.
        """
        chrome_options = webdriver.ChromeOptions()
        chrome_options.add_argument('--headless')
        chrome_options.add_argument('user-agent=' + self._USER_AGENT)
        chrome_options.add_argument('--no-sandbox')
        chrome_options.add_argument('--disable-dev-shm-usage')
        chrome_options.add_argument("--disable-blink-features=AutomationControlled")
        chrome_options.add_experimental_option('excludeSwitches', ['enable-automation'])
        driver = webdriver.Chrome('chromedriver', options=chrome_options)
        driver.implicitly_wait(10)
        return driver

    def get_album_list(self, url):
        """Scroll a user's /albums page to the bottom and collect every album URL."""
        wd = self._build_driver()
        wd.get(url)
        self.window_scroll_down(wd)
        album_list = wd.execute_script("""return [...document.getElementsByClassName("overlay")].map(item => "https://www.flickr.com" + item.getAttribute("href"))""")
        wd.quit()
        return album_list

    def window_scroll_down(self, wd_chrome):
        """Repeatedly scroll to the bottom of the page until its height stops
        growing, i.e. all lazy-loaded content has been rendered."""
        last_height = wd_chrome.execute_script("return document.body.scrollHeight")
        while True:
            wd_chrome.execute_script("window.scrollTo(0, document.body.scrollHeight);")
            time.sleep(2)  # give lazy-loaded thumbnails time to arrive
            new_height = wd_chrome.execute_script("return document.body.scrollHeight")
            if new_height == last_height:
                break
            last_height = new_height

    def get_album_photo_info(self, driver, url):
        """Walk every page of one album.

        Returns {'album_name': <sanitized name>,
                 'photo_list': <deduped list of original-size photo page URLs>}.
        """
        album_photo_info = {}
        album_photo_list = []
        driver.get(url)
        # "/" would break os.path.join later, so it is replaced by "|".
        album_photo_info['album_name'] = "|".join(driver.execute_script("""return document.getElementsByClassName("album-title")[0].innerText.split(" ").join("_")""").split("/"))
        page = 1
        while True:
            self.window_scroll_down(driver)
            page_photo = driver.execute_script("""return [...document.getElementsByClassName("overlay")].map(item => "https://www.flickr.com" + item.getAttribute("href"))""")
            if not page_photo:  # an empty page means we ran past the last one
                break
            album_photo_list.append(page_photo)
            page += 1
            next_url = re.sub(r"/page\d+|/$", "", url) + "/page%d" % (page)
            driver.get(next_url)
        # Flatten all pages, rewrite each photo URL to its "original size" page,
        # then dedupe while preserving order.
        album_photo_html = list(map(lambda x: re.search(r'(.*)/in/', x).group(1) + "/sizes/o/",
                                    [item for sublist in album_photo_list for item in sublist]))
        album_photo_info['photo_list'] = list(dict.fromkeys(album_photo_html))
        return album_photo_info

    @retry(stop_max_attempt_number=10, wait_fixed=1000)
    def get_Url_Response(self, url):
        """GET `url` (streaming); retried up to 10 times, 1s apart, by @retry."""
        time.sleep(random.random())  # random jitter to look less like a bot
        response = requests.get(url, headers=self.headers, stream=True)
        return response

    def download_album_photo(self, album_path, album_name, photo_list, start_number):
        """Download `photo_list` into `album_path` with 10 worker threads,
        numbering files consecutively from `start_number`."""
        que = Queue()
        for offset, photo_url in enumerate(photo_list):
            que.put([photo_url, str(start_number + offset).zfill(5)])
        threads = [threading.Thread(target=self.download_single_photo,
                                    args=(album_path, album_name, que, ))
                   for _ in range(10)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

    def download_single_photo(self, album_path, album_name, que):
        """Thread worker: drain `que`, resolving each photo's original-size
        image URL and streaming it to disk.

        NOTE(review): a permanently-failing photo is retried forever
        (print + sleep); kept as-is since sibling workers keep draining
        the queue meanwhile.
        """
        while not que.empty():
            photo_html, photo_number = que.get()
            while True:
                try:
                    response = self.get_Url_Response(photo_html)
                    soup = BeautifulSoup(response.text, "lxml")
                    img_url = soup.find("div", {"id": "allsizes-photo"}).select_one("img").get('src')
                    # Page title looks like "Size | <title> | Flickr";
                    # sanitize spaces and "/" for use in a filename.
                    photo_name = "|".join("_".join(soup.title.getText().split("|")[1].split(" ")).split("/"))
                    photo_id = photo_html.split("/")[-4]
                    r = self.get_Url_Response(img_url)
                    img_path = os.path.join(album_path, "{}-{}-{}-{}.jpg".format(photo_number, album_name, photo_name, photo_id))
                    with open(img_path, 'wb') as f:
                        for chunk in r.iter_content(chunk_size=128):
                            f.write(chunk)
                    break
                except Exception as e:
                    print(photo_html, str(e))
                    time.sleep(1.5)

    def execute_download(self, path, queue, album_url, index):
        """Worker-process entry point: download one whole album, then put
        "done" on `queue` so the parent can advance its progress bar."""
        driver = self._build_driver()
        album_photo_info = self.get_album_photo_info(driver, album_url)
        photo_list = album_photo_info['photo_list']
        album_name = album_photo_info['album_name']
        album_path = os.path.join(path, album_name)
        # exist_ok replaces the original bare try/except directory probe.
        os.makedirs(album_path, exist_ok=True)
        # Resume numbering after any files already present in the folder.
        start_number = len(os.listdir(album_path)) + 1
        self.download_album_photo(album_path, album_name, photo_list, start_number)
        print(" "*10+f"No.{index+1}: <{album_name}> 原本有{len(photo_list)}張, 已下載{len(os.listdir(album_path))-start_number+1}張。")
        driver.quit()
        queue.put("done")

    def execute_command(self):
        """Main entry point: resolve the album list, then download up to 4
        albums in parallel processes with a tqdm progress bar."""
        os.makedirs(self.path, exist_ok=True)
        # A URL containing "albums/<digits>" targets one album; anything else
        # is treated as a user's album index page.
        if re.search(r'albums/\d+', self.url):
            album_list = [self.url]
        else:
            album_list = self.get_album_list(self.url)
        print("Photo_album total: ", len(album_list))
        pool = multiprocessing.Pool(4)
        queue = multiprocessing.Manager().Queue()
        for index, album_url in enumerate(album_list):
            pool.apply_async(self.execute_download, args=(self.path, queue, album_url, index, ))
        pool.close()
        for _ in tqdm.tqdm(range(len(album_list))):
            queue.get()  # block until one worker reports "done"
        print("\nfinished")
#=================================================================================================================================================================================================================================================
if __name__ == "__main__":
    # Example inputs:
    #   https://www.flickr.com/photos/191564160@N06/albums
    #   https://www.flickr.com/photos/nobadgers/albums
    albums_url = input("Please enter albums flickr url\n")
    save_root = "/content/drive/MyDrive/flickr_photo/"
    downloader = Flickr(albums_url, save_root)
    downloader.execute_command()