Crawl IG hashtag posts, including the post link, main image URL, thumbnail, is_video flag, username, likes, caption, posted time, and user profile image URL. Only the spider implementation is shown; the pipeline needs to be implemented separately.
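Once the spider below is placed in a Scrapy project's spiders package, it can be launched with the crawl command, for example (a sketch; the project setup and the JSON feed output are assumptions, not part of this gist — arguments passed with -a arrive as strings, which the spider handles by converting max_times to an int):

scrapy crawl hashtag_spider -a hashtag=aloha -a max_times=5 -o aloha.json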
import json
import urllib.parse as urlparse

from scrapy import Item, Field, Spider, Request
from scrapy.exceptions import CloseSpider
# ------ Instagram settings ------
IG_SERVER = "https://www.instagram.com"
IG_PATH = '/explore/tags/'
IG_POST_PATH = "p/"
IG_DEFAULT_PARAM = '?__a=1'
IG_CURSOR_PARAM = '&max_id='
IG_CURSOR_NODE = ['graphql', 'hashtag', 'edge_hashtag_to_media', 'page_info', 'end_cursor']
IG_NEXT_NODE = ['graphql', 'hashtag', 'edge_hashtag_to_media', 'page_info', 'has_next_page']
# define post nodes and extract post items
IG_POSTS_NODE = ['graphql', 'hashtag', 'edge_hashtag_to_media', 'edges']
IG_SHORT_CUT = ['shortcode']
IG_ITEM = {
    "url": ['display_url'],
    "thumbnail_url": ['thumbnail_src'],
    "posted": ['taken_at_timestamp'],
    "like": ["edge_media_preview_like", "count"],
    "caption": ['edge_media_to_caption', 'edges', 0, 'node', 'text'],
    "is_video": ['is_video']
}
# define a post node from post page and extract user info
IG_POST_NODE = ['graphql', 'shortcode_media']
IG_USER_NAME = IG_POST_NODE + ['owner', 'username']
IG_USER_PROFILE = IG_POST_NODE + ['owner', 'profile_pic_url']
# ------ End of Instagram settings ------
class ImageItem(Item):
    url = Field()
    thumbnail_url = Field()
    username = Field()
    link = Field()
    posted = Field(serializer=int)
    like = Field(serializer=int)
    caption = Field()
    profile = Field()
    is_video = Field(serializer=bool)
class HashtagSpider(Spider):
    name = "hashtag_spider"

    def __init__(self, *, hashtag=None, max_times=None, **kwargs):
        """
        Crawl IG by hashtag.
        :param hashtag: hashtag name without '#',
            e.g. hashtag='aloha' for posts tagged #aloha
        :param max_times: maximum number of API calls;
            each call returns roughly 60 items,
            so for 300 posts set max_times to 5
        :param kwargs: overrides for the default settings above
        Note: a pipeline can set close_down to True to stop the
        crawler, e.g. to avoid collecting duplicate posts.
        """
        super().__init__(**kwargs)
        for (k, v) in kwargs.items():
            setattr(self, k, v)
        self.SERVER = kwargs.get('server', IG_SERVER)
        self.PATH = kwargs.get('path', IG_PATH)
        self.DEFAULT_PARAM = kwargs.get('param', IG_DEFAULT_PARAM)
        # use a distinct key for the cursor parameter; reusing 'param'
        # here would make it impossible to override the two separately
        self.PARAM_NAME = kwargs.get('cursor_param', IG_CURSOR_PARAM)
        self.POST_PATH = kwargs.get('user_url', IG_POST_PATH)
        self.POST_NODE = kwargs.get('user_node', IG_POST_NODE)
        self.CURSOR_NODE = kwargs.get('id_node', IG_CURSOR_NODE)
        self.NEXT_NODE = kwargs.get('next_node', IG_NEXT_NODE)
        self.NODE = kwargs.get('node', IG_POSTS_NODE)
        self.ITEM = kwargs.get('item', IG_ITEM)
        self.SHORTCUT = kwargs.get('shortcut', IG_SHORT_CUT)
        self.USERNAME = kwargs.get('username', IG_USER_NAME)
        self.USERPROFILE = kwargs.get('profile', IG_USER_PROFILE)
        if hashtag:
            self.start_urls = [self.get_url(hashtag)]
        # max_times may arrive as a string when passed via `scrapy crawl -a`
        self.max = int(max_times) if max_times else 1
        self.count = 0
        self.close_down = False
    def get_item(self, node):
        item = dict()
        for (k, v) in self.ITEM.items():
            item[k] = get_property(node, v)
        item['link'] = urlparse.urljoin(
            self.SERVER, self.POST_PATH + get_property(node, self.SHORTCUT) + '/')
        return ImageItem(item)
    def parse(self, response):
        data = json.loads(response.text)
        next_id = self.get_next_id(data)
        has_next = self.has_next_page(data)
        self.count += 1
        nodes = get_property(data, self.NODE)
        if nodes:
            for node in nodes:
                node = node.get('node')
                item = self.get_item(node)
                # fetch the post page to get the username; if the username
                # is not needed, the item can be yielded directly here
                request = Request(
                    url=self.get_user_url(node.get('shortcode')),
                    callback=self.parse_user, errback=self.parse_user_error)
                request.meta['item'] = item
                yield request
        if next_id and has_next and self.count < self.max:
            yield Request(url=self.get_next_url(response.url, next_id), callback=self.parse)
        if self.close_down:
            raise CloseSpider(reason='Too many duplicate images, maximum number exceeded')
    def parse_user(self, response):
        item = response.meta['item']
        try:
            data = json.loads(response.text)
            item["username"] = get_property(data, self.USERNAME)
            item["profile"] = get_property(data, self.USERPROFILE)
        except ValueError:
            # the post page did not return valid JSON; keep the item
            # without the user info rather than dropping it
            pass
        yield item

    def parse_user_error(self, failure):
        # errbacks receive a Failure, not a Response, so the item has
        # to be recovered from the original request's meta
        yield failure.request.meta['item']
    def get_url(self, hashtag):
        return urlparse.urljoin(self.SERVER, self.PATH + hashtag + '/' + self.DEFAULT_PARAM)

    def get_next_url(self, url, next_id):
        return urlparse.urljoin(url.split('?')[0], self.DEFAULT_PARAM + self.PARAM_NAME + next_id)

    def get_user_url(self, short_code):
        return urlparse.urljoin(self.SERVER, self.POST_PATH + short_code + '/' + self.DEFAULT_PARAM)

    def has_next_page(self, data):
        return get_property(data, self.NEXT_NODE)

    def get_next_id(self, data):
        return get_property(data, self.CURSOR_NODE)
def get_property(data, path):
    """Walk a nested dict/list structure along `path`; return None
    if any step is missing or of the wrong type."""
    v = data
    for p in path:
        if isinstance(p, int):
            v = v[p] if isinstance(v, list) and len(v) > p else None
        elif isinstance(p, str):
            v = v.get(p) if isinstance(v, dict) else None
        if v is None:
            break
    return v
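The docstring above notes that a pipeline may set close_down to True to stop the crawl once duplicates pile up. A minimal sketch of such a pipeline follows (the class name, the in-memory seen set, and the threshold of 20 consecutive duplicates are illustrative assumptions, not part of this gist):

from scrapy.exceptions import DropItem


class DedupPipeline:
    """Drop items whose post link has been seen before, and ask the
    spider to close after too many consecutive duplicates."""

    def __init__(self):
        self.seen = set()    # post links seen during this run (in-memory only)
        self.duplicates = 0  # consecutive duplicates observed so far

    def process_item(self, item, spider):
        link = item.get('link')
        if link in self.seen:
            self.duplicates += 1
            if self.duplicates >= 20:  # threshold chosen for illustration
                spider.close_down = True  # parse() raises CloseSpider on the next page
            raise DropItem('duplicate post: %s' % link)
        self.seen.add(link)
        self.duplicates = 0
        return item

Register it in the project's settings.py, e.g. ITEM_PIPELINES = {'myproject.pipelines.DedupPipeline': 300}, where the module path is a placeholder for wherever the pipeline actually lives.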