Searching Stack Overflow and GitHub for code in a specified language

This code scrapes Stack Overflow and GitHub, extracts information for the programming language specified by the user, and processes the data into a format suitable for training AI models.

It uses a number of techniques, such as caching, proxies, and throttling, to avoid overloading the websites and getting blocked. It also uses multithreading to speed the job up and make the program more efficient.

It writes the data out as JSON and CSV files to a folder specified by the user.
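
For example, assuming the script is saved as scraper.py (the file name is not given in the original), a typical run might look like this:

    python scraper.py --language python --pages 5 --output_dir output --train_ratio 0.8 --valid_ratio 0.1 --test_ratio 0.1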

import requests
from bs4 import BeautifulSoup
import pandas as pd
import numpy as np
import re
import threading
from user_agent import generate_user_agent
import time
import os
import sys
import pymongo
import argparse
import random
import json
import hashlib
import pickle
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer, PorterStemmer

# The NLTK resources used below have to be downloaded once, e.g.:
# nltk.download('stopwords'); nltk.download('punkt'); nltk.download('wordnet')

# Define the user agent pool for the requests
user_agent_pool = [generate_user_agent() for _ in range(100)]

parser = argparse.ArgumentParser(description='Web scraper for Github and Stack Overflow')
parser.add_argument('--language', type=str, help='Programming language to scrape for', required=True)
parser.add_argument('--pages', type=int, help='Number of pages to scrape for', default=10)
parser.add_argument('--output_dir', type=str, help='Directory to save output file', default="output")
# main() defines the full argument parser (including the split ratios), so ignore
# any flags that are unknown at this point instead of erroring out
args, _ = parser.parse_known_args()

# Check if output directory exists, create it if it doesn't
if not os.path.exists(args.output_dir):
    os.makedirs(args.output_dir)

def get_random_user_agent():
    """
    Return a random user agent string to be used for the request headers.
    """
    return random.choice(user_agent_pool)

def get_proxies():
    proxy_url = "https://api.getproxylist.com/proxy"
    proxy_list = []
    for i in range(5):  # Generate 5 proxies
        try:
            response = requests.get(proxy_url, timeout=10)
            proxy_data = response.json()
            proxy = {
                "http": "http://" + proxy_data["ip"] + ":" + str(proxy_data["port"]),
                "https": "https://" + proxy_data["ip"] + ":" + str(proxy_data["port"])
            }
            proxy_list.append(proxy)
        except (requests.RequestException, ValueError, KeyError):
            # Skip this attempt if the request fails or the response is malformed
            continue
    return proxy_list

def get_random_proxy(proxy_list):
    return random.choice(proxy_list)

def scrape_with_threads(data_list, scrape_func, num_threads=5):
    """
    Scrape data using multiple threads.

    :param data_list: List of data to be scraped
    :param scrape_func: Function to be used for scraping
    :param num_threads: Number of threads to use
    :return: List of scraped data
    """
    results = []
    threads = []

    # Define worker function
    def worker():
        while True:
            try:
                # Get data from the queue
                data = data_list.pop()
            except IndexError:
                # Queue is empty, thread can exit
                return

            # Scrape data using the given function
            result = scrape_func(data)

            # Add result to the results list
            results.append(result)

    # Start threads
    for i in range(num_threads):
        t = threading.Thread(target=worker)
        t.start()
        threads.append(t)

    # Wait for threads to finish
    for t in threads:
        t.join()

    return results
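
# Usage sketch (illustrative only; this helper is not wired into main() below, and the
# fetch_page helper here is an assumption, not part of the original script):
#
#   def fetch_page(page_number):
#       url = "https://stackoverflow.com/questions/tagged/python?page={}".format(page_number)
#       return requests.get(url, headers={'User-Agent': get_random_user_agent()}, timeout=10)
#
#   responses = scrape_with_threads(list(range(1, 11)), fetch_page, num_threads=5)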


def scrape_github_data(language, num_pages, proxy_pool, user_agent_pool):
    # Collect the rows in a list and build the data frame at the end
    # (DataFrame.append was removed in pandas 2.0)
    rows = []

    # Define the base URL for Github search
    base_url = "https://github.com/search?p={page}&q=language%3A{language}&type=Repositories"

    # Iterate over the specified number of pages
    for i in range(1, num_pages+1):
        # Define the URL for the current page
        url = base_url.format(page=i, language=language)

        # Select a random user agent from the pool
        user_agent = random.choice(user_agent_pool)

        # Select a random proxy from the pool
        proxy = random.choice(proxy_pool)

        # Define the headers for the request
        headers = {'User-Agent': user_agent}

        try:
            # Send a GET request to the URL with the headers and proxy
            # (each entry in proxy_pool is already an {"http": ..., "https": ...} dict)
            response = requests.get(url, headers=headers, proxies=proxy, timeout=10)

            # Parse the HTML content with BeautifulSoup
            soup = BeautifulSoup(response.content, 'html.parser')

            # Find all the search result items
            items = soup.find_all('li', class_='repo-list-item hx_hit-repo d-flex flex-justify-start py-4 public source')

            # Iterate over the search result items
            for item in items:
                # Get the title and link for the item
                title = item.find('a', class_='v-align-middle').text.strip()
                link = "https://github.com" + item.find('a', class_='v-align-middle')['href']

                # Get the content for the item
                content = item.find('p', class_='col-9 text-gray my-1 pr-4').text.strip()

                # Append the data to the row list
                rows.append({'Language': language, 'Title': title, 'Link': link, 'Content': content})

            # Wait for a random time to avoid being detected and blocked
            time.sleep(random.uniform(1, 3))

        except Exception:
            # Print an error message if there is an issue with the request
            print(f"Error scraping page {i} of {num_pages} for {language} from Github")
            continue

    # Build and return the data frame
    return pd.DataFrame(rows, columns=['Language', 'Title', 'Link', 'Content'])
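
# Note: the HTML class names used above depend on GitHub's current markup and break when
# the site changes. GitHub also exposes a JSON search API, which is more stable. A minimal
# sketch, not part of the original script (unauthenticated requests are rate-limited):
#
#   api_response = requests.get(
#       "https://api.github.com/search/repositories",
#       params={"q": "language:" + language, "page": 1},
#       headers={"Accept": "application/vnd.github+json"},
#       timeout=10,
#   )
#   repositories = api_response.json().get("items", [])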

def scrape_stackoverflow(language, pages, proxies):
    base_url="https://stackoverflow.com/questions/tagged/{}?page={}&sort=votes&pagesize=50"
    data = []
    for i in range(1, pages+1):
        url = base_url.format(language, i)
        headers = {'User-Agent': get_random_user_agent()}
        # Pick one proxy dict from the pool for this request
        proxy = get_random_proxy(proxies)
        try:
            response = requests.get(url, headers=headers, proxies=proxy, timeout=10)
            soup = BeautifulSoup(response.content, 'html.parser')
            questions = soup.find_all('div', {'class': 'question-summary'})
            for q in questions:
                title = q.find('a', {'class': 'question-hyperlink'}).text.strip()
                votes = q.find('div', {'class': 'votes'}).text.strip()
                # Unanswered questions have no 'status answered' element, so guard against None
                answered = q.find('div', {'class': 'status answered'})
                answers = answered.find('strong').text.strip() if answered else '0'
                tags = [t.text.strip() for t in q.find_all('a', {'class': 'post-tag'})]
                # The tag listing page only shows question summaries, not answer bodies,
                # so this list usually stays empty; fetching each question page would be
                # needed to collect real code snippets
                code_snippets = []
                for answer in q.find_all('div', {'class': 'answer'}):
                    code = answer.find('code')
                    if code:
                        code_snippets.append(code.text.strip())
                data.append({'title': title, 'votes': votes, 'answers': answers, 'tags': tags, 'code_snippets': code_snippets})
        except Exception as e:
            print(f'Error while scraping {url}: {e}')
        time.sleep(random.uniform(1, 3))
    df = pd.DataFrame(data)
    return df
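
# Note: like the GitHub scraper, the class names above are tied to Stack Overflow's HTML
# and change over time. The Stack Exchange API returns the same question metadata as JSON
# and is less brittle. A minimal sketch (the public 2.3 API, not part of the original script):
#
#   api_response = requests.get(
#       "https://api.stackexchange.com/2.3/questions",
#       params={"site": "stackoverflow", "tagged": language, "sort": "votes", "pagesize": 50},
#       timeout=10,
#   )
#   questions_json = api_response.json().get("items", [])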

def handle_edge_cases(language, data):
    """Handle edge cases for rare programming languages or unusual code structures"""
    if language == 'Prolog':
        # Prolog code is often organized in a tree structure, so we'll flatten it to make it easier to process
        data = flatten_prolog_code(data)
    elif language == 'Assembly':
        # Assembly code often has a lot of special characters, so we'll remove them for better readability
        data = remove_special_characters(data)
    return data

def handle_exceptions(e):
    """Handle unexpected errors or exceptions"""
    print(f"An error occurred: {str(e)}")
    # Optionally, log the error for future reference
    # logging.error(f"An error occurred: {str(e)}")
    # Return None so the program can continue running without crashing
    return None


CACHE_DIR = "cache"

def cache_request(url, params=None):
    """
    Check if a response for a given request has been cached.
    If it has, return the cached response. Otherwise, send the request and cache the response.
    """
    if not os.path.exists(CACHE_DIR):
        os.mkdir(CACHE_DIR)

    # Create a unique filename for this request
    request_key = hashlib.md5(pickle.dumps((url, params))).hexdigest()
    cache_file = os.path.join(CACHE_DIR, request_key)

    # Check if we have a cached response
    if os.path.exists(cache_file):
        with open(cache_file, "rb") as f:
            response, timestamp = pickle.load(f)
        # Check if the cached response has expired (1 hour by default)
        if time.time() - timestamp < 3600:
            return response

    # If we don't have a fresh cached response, send the request and cache the result
    response = requests.get(url, params=params, timeout=10)
    with open(cache_file, "wb") as f:
        pickle.dump((response, time.time()), f)

    return response
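
# Usage sketch (illustrative only; cache_request is not wired into the scrapers above):
#
#   response = cache_request("https://stackoverflow.com/questions/tagged/python",
#                            params={"page": 1})
#   html = response.content   # served from cache/ if the same request was made within an hour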

#Define a function to manage database

class DBManager:
    def __init__(self, dbname, collection_name):
        self.client = pymongo.MongoClient()
        self.dbname = dbname
        self.collection_name = collection_name
        self.db = self.client[dbname]
        self.collection = self.db[collection_name]
        
    def insert_many(self, data):
        if data:
            try:
                result = self.collection.insert_many(data)
                return result
            except pymongo.errors.BulkWriteError as e:
                print(f"Failed to insert data into database: {e.details}")
        else:
            print("No data to insert.")
            
    def find_one(self, query):
        return self.collection.find_one(query)
    
    def find_many(self, query):
        return self.collection.find(query)
    
    def delete_many(self, query):
        return self.collection.delete_many(query)


def clean_data(data):
    stop_words = set(stopwords.words('english'))
    lemmatizer = WordNetLemmatizer()
    stemmer = PorterStemmer()
    clean_text = []
    for text in data:
        # remove unwanted characters and numbers
        text = re.sub('[^a-zA-Z]', ' ', text)
        # convert to lowercase
        text = text.lower()
        # tokenization
        tokens = word_tokenize(text)
        # remove stopwords
        tokens = [word for word in tokens if word not in stop_words]
        # stemming
        stemmed_tokens = [stemmer.stem(token) for token in tokens]
        # lemmatization
        lemmatized_tokens = [lemmatizer.lemmatize(token) for token in stemmed_tokens]
        # join tokens to form cleaned text
        cleaned_text=" ".join(lemmatized_tokens)
        clean_text.append(cleaned_text)
    return clean_text
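
# Usage sketch (illustrative input; output depends on the installed NLTK data):
#
#   clean_data(["How do I reverse a list in Python 3?"])
#   # -> roughly ['revers list python']  (punctuation and digits stripped, stop words
#   #                                     removed, remaining tokens stemmed and lemmatized)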

def split_data(data, train_ratio, val_ratio, test_ratio, output_dir):
    """
    Splits the preprocessed data into training, validation, and test sets.
    :param data: The preprocessed data to split.
    :param train_ratio: The ratio of the data to use for training.
    :param val_ratio: The ratio of the data to use for validation.
    :param test_ratio: The ratio of the data to use for testing.
    :param output_dir: The directory to save the resulting data sets.
    :return: The resulting training, validation, and test data sets.
    """
    # Shuffle the data
    data = data.sample(frac=1, random_state=42)

    # Calculate the sizes of each set
    num_samples = len(data)
    num_train = int(num_samples * train_ratio)
    num_val = int(num_samples * val_ratio)
    num_test = num_samples - num_train - num_val

    # Split the data
    train_data = data[:num_train]
    val_data = data[num_train:num_train + num_val]
    test_data = data[num_train + num_val:]

    # Save the data sets to disk
    os.makedirs(output_dir, exist_ok=True)
    train_data.to_csv(os.path.join(output_dir, 'train.csv'), index=False)
    val_data.to_csv(os.path.join(output_dir, 'val.csv'), index=False)
    test_data.to_csv(os.path.join(output_dir, 'test.csv'), index=False)

    return train_data, val_data, test_data
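
# Usage sketch (assuming `df` is a pandas DataFrame of scraped rows):
#
#   train_df, val_df, test_df = split_data(df, 0.8, 0.1, 0.1, "output")
#   # also writes output/train.csv, output/val.csv and output/test.csv as a side effect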

def main():
    # Define command-line arguments
    parser = argparse.ArgumentParser(description="Scrape code snippets and other beneficial data for a specified programming language from GitHub and Stack Overflow.")
    parser.add_argument("--language", type=str, required=True, help="the programming language to scrape for")
    parser.add_argument("--pages", type=int, default=5, help="the number of pages to scrape for each website")
    parser.add_argument("--output_dir", type=str, required=True, help="the directory to save the output files")
    parser.add_argument("--train_ratio", type=float, default=0.8, help="the ratio of data to use for training")
    parser.add_argument("--valid_ratio", type=float, default=0.1, help="the ratio of data to use for validation")
    parser.add_argument("--test_ratio", type=float, default=0.1, help="the ratio of data to use for testing")

    args = parser.parse_args()

    language = args.language
    pages = args.pages
    output_dir = args.output_dir
    train_ratio = args.train_ratio
    valid_ratio = args.valid_ratio
    test_ratio = args.test_ratio

    # Define the user agent pool for the requests
    user_agent_pool = [generate_user_agent() for _ in range(100)]

    # Define the proxy pool; fall back to direct connections if no proxies could be fetched
    proxy_pool = get_proxies() or [None]

    # Scrape data from GitHub and Stack Overflow
    github_data = scrape_github_data(language, pages, proxy_pool, user_agent_pool)
    stackoverflow_data = scrape_stackoverflow(language, pages, proxy_pool)

    # Merge the scraped data; the two sources have different columns, so missing
    # fields are filled with NaN
    merged_data = pd.concat([github_data, stackoverflow_data], ignore_index=True, sort=False)

    # Preprocess the free-text columns with clean_data()
    for column in ('Content', 'Title', 'title'):
        if column in merged_data.columns:
            merged_data[column] = clean_data(merged_data[column].fillna('').astype(str))

    # Split the preprocessed data into training, validation, and test sets
    # (split_data also writes train.csv, val.csv and test.csv to output_dir)
    train_data, valid_data, test_data = split_data(merged_data, train_ratio, valid_ratio, test_ratio, output_dir)

    # Save the final processed data as CSV and JSON to the user-specified folder
    final_data = pd.concat([train_data, valid_data, test_data])
    final_data.to_csv(os.path.join(output_dir, "final_data.csv"), index=False)
    final_data.to_json(os.path.join(output_dir, "final_data.json"), orient="records")


if __name__ == "__main__":
    main()

 
