爬虫:获取ins指定用户的粉丝数

操作流程

1.分析网页URL

浏览器F12进入开发者模式,点击Network—>XHR或JS,或者burpsuite抓包,分析目标数据所在的真实URL。

2.模拟请求网页数据

2.1 HTTP 消息结构

HTTP 消息是客户端和服务器之间通信的基础,它们由一系列的文本行组成,遵循特定的格式和结构。
HTTP消息分为两种类型:请求消息和响应消息。

2.1.1 客户端请求消息

请求行、请求头部、空行和请求数据
请求报文的一般格式:

  • 请求行
    • 请求方法:GET、POST、PUT、DELETE等。
    • 请求URL:请求的资源路径,通常包括主机名、端口号(如果非默认)、路径和查询字符串。
    • HTTP 版本:如 HTTP/1.1 或 HTTP/2
      请求行的格式示例:GET /index.html HTTP/1.1
  • 请求头
    在模拟请求时,如果不设置请求头,很容易被网站发现是爬虫脚本,从而对这种模拟请求进行拒绝。使用 params 关键字参数,以一个字符串字典来提供这些参数。
  • 空行
    请求头和请求体之间的分隔符,表示请求头的结束。
  • 请求体(可选)
    在某些类型的HTTP请求(如 POST 和 PUT)中,请求体包含要发送给服务器的数据。
2.1.2 服务器请求消息

状态行、消息报头、空行和响应正文

  • 状态行
    • HTTP 版本:与请求消息中的版本相匹配。
    • 状态码:三位数,表示请求的处理结果。
      • 1xx(信息性状态码):表示接收的请求正在处理。
      • 2xx(成功状态码):表示请求正常处理完毕。
      • 3xx(重定向状态码):需要后续操作才能完成这一请求。
      • 4xx(客户端错误状态码):表示请求包含语法错误或无法完成。
      • 5xx(服务器错误状态码):服务器在处理请求的过程中发生了错误。
    • 状态信息:状态码的简短描述。
      状态行的格式示例:HTTP/1.1 200 OK
  • 响应头
    • 包含了服务器环境信息、响应体的大小、服务器支持的压缩类型等。
  • 空行
    • 响应头和响应体之间的分隔符,表示响应头的结束。
  • 响应体(可选)
    • 包含服务器返回的数据,如请求的网页内容、图片、JSON数据等

2.2 定制请求头

headers = {
        "Host": "www.instagram.com",
        "Connection": "close",
        "sec-ch-ua-mobile": "?0",
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36",
        "Accept": "*/*",
        "sec-ch-ua-platform-version": "\"12.2.1\"",
        "X-Requested-With": "XMLHttpRequest",
        "X-ASBD-ID": "129477",
        "X-CSRFToken": "aMFPLyFGuRw9PXGLAg8CXx",
        "X-IG-App-ID": "936619743392459",
        "Referer": f"https://www.instagram.com/{sanitized_username}/",
        "Accept-Encoding": "gzip, deflate",
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8"
    }

3.获取数据

if 'data' in data and 'user' in data['data'] and 'edge_followed_by' in data['data']['user']:
follower_count = data['data']['user']['edge_followed_by']['count']

过程中遇到的问题

1.登录状态时,无法有效模拟请求

经大神指点,发现登录状态和不登录状态下用户粉丝数所在的URL不同,请求方法也不同。

因此选择退出登录,使用不登录状态下的请求URL。

2.如何远程连接服务器

任务是获取100万个用户的粉丝数,在笔记本上跑代码效率太低,因此申请了一台服务器。之前只接触过用向日葵远程操控服务器,因此在搜索资料后,我只进行了远程服务器与pycharm的连接。后经大神指点,下载了Termius,远程操作方便又快捷。除此之外,通过nohup命令让服务器在后台运行,实现本机自由。

3.Error:429


因此需要走代理。

拿到以上代理,设置如下:

HTTP/HTTPS代理适用于网络请求,而SOCKS5代理支持更多协议和应用场景(如邮件、FTP等)
每次请求时,从代理池中随机选择一个代理服务器,使用相应的协议和认证信息发起请求。请求由代理服务器转发到目标服务器,目标服务器的响应再通过代理服务器返回给客户端。这种中转机制有效避免了IP被封禁,并且可以通过多次轮换代理来模拟多个客户端的行为。

4.爬虫效率较低

由于数据规模较大,要提升爬虫效率,可以使用多线程。典型的应用场景包括:

  • I/O密集型任务:如文件读写、网络通信等。
  • 并发处理:同时处理多个用户请求。
  • 异步编程:利用多线程进行非阻塞的异步操作。
    Python的threading模块简化了多线程编程。以下是一个简单的多线程示例:
    import threading
    import time

    # 定义一个简单的线程任务:每隔一秒打印当前线程的名字和计数器的值
    def thread_task():
    for i in range(5):
    time.sleep(1)
    print(f"Thread {threading.current_thread().name}: Count {i}")

    # 创建两个线程
    thread1 = threading.Thread(target=thread_task, name='Thread-1')
    thread2 = threading.Thread(target=thread_task, name='Thread-2')

    # 启动线程
    thread1.start()
    thread2.start()

    # 等待线程结束
    thread1.join()
    thread2.join()
    在多线程编程中,线程之间可能需要进行数据交换或通信。使用queue模块实现。
    import threading
    import time

    # 定义一个线程安全的队列
    shared_queue = queue.Queue()

    # 定义生产者任务
    def producer():
    for i in range(5):
    shared_queue.put(i)
    time.sleep(1)

    # 定义消费者任务
    def consumer():
    while True:
    data = shared_queue.get()
    if data is None:
    break
    print(f"Consumed: {data}")

    # 创建生产者和消费者线程
    producer_thread = threading.Thread(target=producer)
    consumer_thread = threading.Thread(target=consumer)

    # 启动线程
    producer_thread.start() consumer_thread.start()

    # 等待生产者线程结束
    producer_thread.join()

    # 在队列中放入结束标志,通知消费者线程结束
    shared_queue.put(None)

    # 等待消费者线程结束
    consumer_thread.join()
    多线程编程需要注意线程安全性,防止多个线程同时修改共享数据。Python提供了一些线程安全的数据结构,如queueLock等,用于解决多线程并发访问共享资源的问题。

5.进程killed

分析原因可能是该进程超出了资源限制,设置最大queue_size=10000,强制队列大小保持在一定范围内。

代码实现

# coding=utf-8
import time
import requests
import csv
import urllib3
import threading
from queue import Queue
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry

# 避免输出不必要的HTTPS安全警告信息
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


# 用户名处理函数:去掉`@`符号
def sanitize_username(username):
    if username.startswith('@'):
        return username.lstrip('@')  
    elif '@' in username:
        return username.split('@')[0]  
    return username


# 创建带重试机制的请求会话
def create_session_with_retries():
    session = requests.Session()
    retries = Retry(total=5, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504])
    session.mount('http://', HTTPAdapter(max_retries=retries))
    session.mount('https://', HTTPAdapter(max_retries=retries))
    return session


# 获取Instagram粉丝数量
def get_instagram_follower_count(username, proxies):
    sanitized_username = sanitize_username(username)
    url = "https://www.instagram.com/api/v1/users/web_profile_info/"
    params = {
        "username": sanitized_username
    }
   
    headers = {
        "Host": "www.instagram.com",
        "Connection": "close",
        "sec-ch-ua-mobile": "?0",
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36",
        "Accept": "*/*",
        "sec-ch-ua-platform-version": "\"12.2.1\"",
        "X-Requested-With": "XMLHttpRequest",
        "X-ASBD-ID": "129477",
        "X-CSRFToken": "aMFPLyFGuRw9PXGLAg8CXx",
        "X-IG-App-ID": "936619743392459",
        "Referer": f"https://www.instagram.com/{sanitized_username}/",
        "Accept-Encoding": "gzip, deflate",
        "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8"
    }
   
    session = create_session_with_retries()
   
    time.sleep(1)  
   
    try:
   
        response = session.get(url, headers=headers, params=params, proxies=proxies, verify=False, timeout=10)
       
        if response.status_code == 200:
            try:
           
                data = response.json()
               
                if isinstance(data, dict) and data is not None:
                    if 'data' in data and 'user' in data['data'] and 'edge_followed_by' in data['data']['user']:
                   
                        follower_count = data['data']['user']['edge_followed_by']['count']
                       
                        return follower_count
                    else:
                        return None
                else:
                    return None  
            except ValueError:
                return None
        else:
            return None  
    except requests.RequestException:
        return None  


# 线程工作函数
def worker(queue, output_file, proxies, lock):
    while True:
   
        username = queue.get()
       
        if username is None:
            break
        try:
       
            result = get_instagram_follower_count(username, proxies)
           
            if result is not None:
           
                with lock:
               
                    with open(output_file, mode='a', newline='', encoding='utf-8') as outfile:
                   
                        writer = csv.writer(outfile)
                        writer.writerow([username, result])
                       
        except Exception:
            pass  
        queue.task_done()


# 读取用户名并创建线程处理
def process_usernames(input_csv, output_csv, num_threads=4, queue_size=10000):
    queue = Queue(maxsize=queue_size)
    lock = threading.Lock()
    proxies = {
        'http': ' ',
        'https': ' '
    }
   
    # Write the header to the output CSV file
    with open(output_csv, mode='w', newline='', encoding='utf-8') as outfile:
        writer = csv.writer(outfile)
        writer.writerow(['username', 'follower_counts'])

    # Start threads
    threads = []
    for _ in range(num_threads):
        thread = threading.Thread(target=worker, args=(queue, output_csv, proxies, lock))
        thread.start()
        threads.append(thread)
       
    # Read usernames from the input CSV file and process them
    with open(input_csv, mode='r', encoding='utf-8') as infile:
        reader = csv.reader(infile)
        next(reader)  # Skip the header row
        for row in reader:
            username = row[0]
            queue.put(username)

    # Stop workers
    for _ in range(num_threads):
        queue.put(None)  # Add a None for each worker to signal them to exit

    # Wait for all threads to finish
    for thread in threads:
        thread.join()
       
    print(f"Results have been written to {output_csv}")

if __name__ == "__main__":

    input_csv = 'instagram.csv'
    output_csv = 'output.csv'
    process_usernames(input_csv, output_csv)