import datetime
import re
import time
from random import randint

import requests
import scrapy
from scrapy.crawler import CrawlerProcess
from scrapy.exceptions import NotSupported, CloseSpider
from scrapy.linkextractors import LinkExtractor

from django.conf import settings
from django.db import transaction, IntegrityError
from django.utils import timezone

from . import models
from .utils import clean_url, append_to_results, get_domain

FACEBOOK_DOMAINS = ("facebook.com", "facebook.net", "fbcdn.net")
TWITTER_DOMAINS = ("twitter.com", "twimg.com", "twttr.net", "twttr.com",
                   "abs.twimg.com")
INSTAGRAM_DOMAINS = ("instagram.com", "cdninstagram.com")
YOUTUBE_DOMAINS = ("youtu.be", "youtube.com")
DAILYMOTION_DOMAINS = ("dailymotion.com",)
VIMEO_DOMAINS = ("vimeo.com",)

VIDEO_EXTS = (".webm", ".mkv", ".flv", ".ogv", ".mov", ".wmv", ".avi",
              ".mpg", ".mp4", ".m4v", ".mp2", ".mpeg")
AUDIO_EXTS = (".aac", ".flac", ".m4a", ".mp3", ".ogg", ".oga", ".opus",
              ".wma", ".webm")
OFFICE_EXTS = (".csv", ".doc", ".docx", ".odt", ".rtf", ".ods", ".xls",
               ".xlsx")

CALENDAR_KEYS = ["agenda", "calendar"]
DATE_REG = re.compile(r'20[0-9]{2}-[0-9]{2}-[0-9]{2}')
DATE_FORMATS = ['%Y-%m-%d', '%Y/%m/%d', '%d/%m/%Y']
DATE_MONTH_DELAY = 1

MAX_LINKS = None  # no limit when None
TIMEOUT = datetime.timedelta(minutes=settings.CRAWL_TIMEOUT)
NUMBER_PER_PAGE = settings.NUMBER_PER_SESSION
ONLY_FIRST_PAGE = True


class DefaultSpider:
    name = None
    start_urls = None
    allowed_domains = []
    excluded_domains = []
    crawl_id = None
    target_id = None
    crawl_result = None
    links_reached = set()
    redirect = None

    def start_requests(self):
        q = {
            "crawl_id": self.crawl_id,
            "target_id": self.target_id,
            "status__in": ["F", "T"],
        }
        if models.CrawlResult.objects.filter(**q).count():
            # already finished or timed out: nothing to do
            return []
        q.pop("status__in")
        if models.CrawlResult.objects.filter(**q).count():
            # delete a previous interrupted attempt
            res = models.CrawlResult.objects.get(**q)
            res.delete()
        for url in self.start_urls:
            yield scrapy.Request(url, self.parse)

    def _parse_image(self, response, result):
        if "images" not in result:
            result["images"] = []
        for img in response.css('img'):
            attributes = img.attrib
            if "src" not in attributes:
                continue
            src = attributes["src"]
            is_a_real_src = src.startswith("http") or src.startswith("/")
            if not src or not is_a_real_src or src in result["images"]:
                continue
            result["images"].append(src)

    def _parse_iframe(self, response, result):
        for iframe in response.css('iframe'):
            attributes = iframe.attrib
            if "src" not in attributes:
                continue
            src = attributes["src"]
            is_a_real_src = src.startswith("http") or src.startswith("/")
            if not src or not is_a_real_src:
                continue
            current_domain = get_domain(src)
            if current_domain in YOUTUBE_DOMAINS:
                append_to_results(result, "youtube", src)
            elif current_domain in DAILYMOTION_DOMAINS:
                append_to_results(result, "dailymotion", src)
            elif current_domain in VIMEO_DOMAINS:
                append_to_results(result, "vimeo", src)

    def _parse_internal_files(self, url, result):
        types = (("video", VIDEO_EXTS),
                 ("audio", AUDIO_EXTS),
                 ("internal_pdf", (".pdf",)),
                 ("internal_office", OFFICE_EXTS))
        return self._parse_files(url, result, types)

    def _parse_external_files(self, url, result):
        types = (("external_pdf", (".pdf",)),
                 ("external_office", OFFICE_EXTS))
        return self._parse_files(url, result, types)

    def _parse_files(self, url, result, types):
        """
        Parse the URL for a known file extension.

        :return: True if the URL points to a file
        """
        url = url.lower()
        for content_type, extensions in types:
            if any(url.endswith(ext) for ext in extensions):
                append_to_results(result, content_type, url)
                return True
        return False

    def timeout(self):
        if not self.crawl_result:
            q = {
                "crawl_id": self.crawl_id,
                "target_id": self.target_id,
            }
            if not models.CrawlResult.objects.filter(**q).count():
                return
            self.crawl_result = models.CrawlResult.objects.get(**q)
        duration = timezone.now() - self.crawl_result.started
        if duration < TIMEOUT:
            return
        with transaction.atomic():
            result = models.CrawlResult.objects.select_for_update().get(
                pk=self.crawl_result.pk)
            result.status = "T"
            result.duration = (timezone.now() - result.started)
            result.save()
        raise CloseSpider('timeout')

    def parse(self, response):
        result = {
            "url": response.url,
        }
        self.timeout()
        for domain in self.excluded_domains:
            if domain in response.url:
                result["is_online"] = False
        if result.get("is_online", None) is False:
            yield result
        else:
            result["is_online"] = True
            try:
                self._parse_image(response, result)
                self._parse_iframe(response, result)
                for link in LinkExtractor().extract_links(response):
                    full_url = link.url
                    url = clean_url(link.url)
                    if url is None or url in self.links_reached:
                        continue
                    # the date can appear anywhere in the URL
                    match = DATE_REG.search(full_url)
                    calendar_view, too_old = False, False
                    if match:
                        for calendar_key in CALENDAR_KEYS:
                            if calendar_key in url:
                                calendar_view = True
                        if calendar_view:
                            for date_format in DATE_FORMATS:
                                try:
                                    d = datetime.datetime.strptime(
                                        match.group(), date_format)
                                    if datetime.date.today() - d.date() > \
                                            datetime.timedelta(
                                                days=31 * DATE_MONTH_DELAY):
                                        too_old = True
                                except ValueError:
                                    pass
                    if too_old:
                        continue
                    is_internal = False
                    for domain in self.allowed_domains:
                        if domain in url:
                            is_internal = True
                    self.links_reached.add(url)
                    is_file = self._parse_internal_files(url, result)
                    if not is_file and \
                            not self.timeout() and (
                            not MAX_LINKS or
                            len(self.links_reached) < MAX_LINKS):
                        yield response.follow(link.url, self.parse)
                    if not is_internal:
                        current_domain = get_domain(url)
                        if current_domain in FACEBOOK_DOMAINS:
                            append_to_results(result, "facebook", url)
                        elif current_domain in TWITTER_DOMAINS:
                            append_to_results(result, "twitter", url)
                        elif current_domain in INSTAGRAM_DOMAINS:
                            append_to_results(result, "instagram", url)
                        else:
                            is_file = self._parse_external_files(url, result)
                            if not is_file:
                                append_to_results(
                                    result, "external_link", url)
            except NotSupported:
                print("No response", response.url)
            yield result

    def closed(self, reason):
        DbPipeline().close(self)

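# ---------------------------------------------------------------------------
# Illustrative shape of the item yielded by DefaultSpider.parse() and
# consumed by DbPipeline.process_item() below (example values only, not an
# exhaustive key list):
#
#   {
#       "url": "https://www.example.org/page/",
#       "is_online": True,
#       "images": ["/static/logo.png"],
#       "youtube": ["https://www.youtube.com/embed/xyz"],
#       "facebook": ["https://www.facebook.com/examplepage"],
#       "external_link": ["https://other-site.example/"],
#       "internal_pdf": ["https://www.example.org/docs/report.pdf"],
#   }
#
# Every key except "url" and "is_online" holds a list of URLs and is only
# present when something was found; DbPipeline deduplicates the lists and
# stores per-key counters (nb_*) on the CrawlResult row.
# ---------------------------------------------------------------------------
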
class DbPipeline:
    BASE_KEYS = ["url", "crawl_id", "target_id"]
    NB_KEYS = ["external_link", "images", "facebook", "twitter", "instagram",
               "youtube", "dailymotion", "vimeo", "video", "audio",
               "internal_pdf", "external_pdf", "internal_office",
               "external_office"]

    def _get_result_pk(self, spider):
        """
        Atomic creation.

        :param spider: current spider
        :return: result_pk, created
        """
        pks = {
            "crawl_id": spider.crawl_id,
            "target_id": spider.target_id,
        }
        created = False
        try:
            result = models.CrawlResult.objects.get(**pks)
        except models.CrawlResult.DoesNotExist:
            try:
                with transaction.atomic():
                    if spider.redirect:
                        pks["redirection"] = spider.redirect
                    result = models.CrawlResult.objects.create(**pks)
                    created = True
            except IntegrityError:
                result = models.CrawlResult.objects.get(**pks)
        return result.pk, created

    def _update(self, result_pk, item, result_created):
        """
        Atomic update.
        """
        with transaction.atomic():
            result = models.CrawlResult.objects.select_for_update().get(
                pk=result_pk)
            crawl_result = result.crawl_result
            if crawl_result:
                crawl_result = crawl_result[0]
            else:
                crawl_result = {}
            if "urls" not in crawl_result:
                crawl_result["urls"] = []
            url = item.pop("url")
            if url in crawl_result["urls"]:
                return
            crawl_result["urls"].append(url)
            for k, value in item.items():
                if k == "is_online":
                    if result_created:
                        # only update on the first link
                        result.is_online = value
                elif k in self.NB_KEYS:
                    if k not in crawl_result:
                        crawl_result[k] = []
                    for subvalue in value:
                        if subvalue in crawl_result[k]:
                            continue
                        crawl_result[k].append(subvalue)
                    setattr(result, "nb_" + k, len(crawl_result[k]))
            result.nb_internal_link = len(crawl_result["urls"]) - 1
            result.crawl_result = [crawl_result]
            result.save()
        return True

    def process_item(self, item, spider):
        result_pk, created = self._get_result_pk(spider)
        self._update(result_pk, item, created)
        return item

    def close(self, spider):
        result_pk, created = self._get_result_pk(spider)
        with transaction.atomic():
            result = models.CrawlResult.objects.select_for_update().get(
                pk=result_pk)
            if result.status == "P":
                result.status = "F"
                result.duration = (timezone.now() - result.started)
                result.save()


def create_spider(name, urls, crawl, target, excluded_domains=None,
                  redirect=None):
    if not excluded_domains:
        excluded_domains = []
    return type(
        name, (DefaultSpider, scrapy.Spider),
        {"name": name,
         "start_urls": urls,
         "allowed_domains": [get_domain(url) for url in urls],
         "crawl_id": crawl.pk,
         "target_id": target.pk,
         "links_reached": set(),
         "excluded_domains": excluded_domains,
         "redirect": redirect}
    )


def launch_match(crawl_item):
    # reset relations from a previous run of this crawl
    models.CrawlRelation.objects.filter(crawl_id=crawl_item.pk).delete()
    for result in crawl_item.results.values(
            "pk", "crawl_result", "target_id", "target__url").all():
        if not result["crawl_result"] or \
                "external_link" not in result["crawl_result"][0]:
            continue
        domains = [
            get_domain(link)
            for link in result["crawl_result"][0]["external_link"]
        ]
        for subresult in crawl_item.results.values(
                "pk", "target_id", "target__url").all():
            if subresult["pk"] == result["pk"]:
                continue
            url = subresult["target__url"]
            if url and get_domain(url) in domains:
                rel, created = models.CrawlRelation.objects.get_or_create(
                    crawl_id=crawl_item.pk,
                    source_id=result["target_id"],
                    destination_id=subresult["target_id"])
                if not created:  # multiple links between the two targets
                    rel.number += 1
                    rel.save()
        crawl_item.progression = (crawl_item.progression or 0) + 1
        crawl_item.save()
    crawl_item.ended = timezone.now()
    crawl_item.status = "F"
    crawl_item.save()


def update_db_result(result_dct, values):
    result, __ = models.CrawlResult.objects.get_or_create(**result_dct)
    for k, value in values.items():
        setattr(result, k, value)
    result.save()

def launch_crawl(crawl_item, excluded_domains=None):
    if not excluded_domains:
        excluded_domains = []
    scrap_settings = settings.SCRAPPY_SETTINGS.copy()
    crawl_item.started = timezone.now()
    crawl_item.pre_crawl_ended = None
    crawl_item.crawl_ended = None
    crawl_item.ended = None
    crawl_item.progression = 0
    crawl_item.status = "P"
    crawl_item.save()

    # slice the targets into pages of NUMBER_PER_PAGE
    q = crawl_item.targets
    total = q.count()
    targets = q.values("id")
    page = 0
    page_number = total // NUMBER_PER_PAGE
    while page <= page_number and not (ONLY_FIRST_PAGE and page):
        process = CrawlerProcess(settings=scrap_settings)
        idx = 0
        current_idx = page * NUMBER_PER_PAGE
        has_url_to_process = False
        while current_idx < total and idx < NUMBER_PER_PAGE:
            target = models.Target.objects.filter(
                pk=targets[current_idx]['id'])
            idx += 1
            current_idx = idx + page * NUMBER_PER_PAGE
            if not target.count():  # target has disappeared
                continue
            target = target.all()[0]
            result_dct = {
                "crawl_id": crawl_item.pk,
                "target_id": target.pk,
            }
            response, verify_ssl = None, True
            url = target.url
            while response is None:
                try:
                    response = requests.get(target.url, verify=verify_ssl,
                                            timeout=45)
                except requests.exceptions.SSLError:
                    if not verify_ssl:  # new error on SSL
                        response = False
                    else:
                        update_db_result(result_dct, {"bad_ssl": True})
                        verify_ssl = False
                        time.sleep(
                            settings.SCRAPPY_SETTINGS["DOWNLOAD_DELAY"] - 1
                            + randint(0, 20) / 10)
                except requests.exceptions.RequestException:
                    update_db_result(result_dct,
                                     {"is_online": False, "status": "F"})
                    response = False
            if response is False:
                # scrapy is more permissive - try it
                pass
            elif response.status_code == 404:
                update_db_result(result_dct,
                                 {"is_online": False, "status": "F"})
                continue
            else:
                url = target.url
                has_url_to_process = True
            redirect = None
            if response and getattr(response, 'history', None):
                url = response.url
                redirect = url
            domain = get_domain(url)
            if domain in excluded_domains:
                update_db_result(
                    result_dct,
                    {"is_online": False, "status": "F",
                     "redirection": redirect})
                continue
            process.crawl(
                create_spider(
                    "Crawl{}Target{}".format(crawl_item.pk, target.pk),
                    [url], crawl_item, target, excluded_domains, redirect
                )
            )
        if has_url_to_process:
            process.start(stop_after_crawl=ONLY_FIRST_PAGE)
        page += 1

    crawl_item.crawl_ended = timezone.now()
    crawl_item.status = "M"
    crawl_item.save()
    launch_match(crawl_item)
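
# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of this module). launch_crawl() is
# blocking and CrawlerProcess starts a Twisted reactor that cannot be
# restarted in the same process, so it fits a one-shot Django management
# command or a worker task rather than a request handler. The app/module
# paths, the "Crawl" model lookup and the EXCLUDED_DOMAINS setting below are
# assumptions made for this example only.
#
#   # myapp/management/commands/run_crawl.py
#   from django.conf import settings
#   from django.core.management.base import BaseCommand
#
#   from myapp import models
#   from myapp.crawler import launch_crawl  # wherever this module lives
#
#
#   class Command(BaseCommand):
#       help = "Run the crawl and the matching phase for a given Crawl id"
#
#       def add_arguments(self, parser):
#           parser.add_argument("crawl_id", type=int)
#
#       def handle(self, *args, **options):
#           crawl = models.Crawl.objects.get(pk=options["crawl_id"])
#           launch_crawl(
#               crawl,
#               excluded_domains=getattr(settings, "EXCLUDED_DOMAINS", []),
#           )
# ---------------------------------------------------------------------------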