This code is designed to scrape Stack Overflow and GitHub, extract information for a user-specified programming language, and process the data into a format suitable for AI training.
It uses a number of techniques, such as caching, proxies, and throttling, to avoid overloading the websites and getting blocked. It also uses multithreading to speed up the job and make the program more efficient.
It writes the output as JSON and CSV files to a user-specified folder.
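A typical run might look like the following (the script filename scraper.py is an assumption; the post does not name the file):

python scraper.py --language python --pages 5 --output_dir output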
import requests
from bs4 import BeautifulSoup
import pandas as pd
import numpy as np
import re
import threading
from user_agent import generate_user_agent
import time
import os
import sys
import pymongo
import argparse
import random
import json
import hashlib
import pickle
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer, PorterStemmer
# Download the NLTK resources used by clean_data() if they are missing
nltk.download('stopwords', quiet=True)
nltk.download('punkt', quiet=True)
nltk.download('wordnet', quiet=True)
# Define the user agent pool for the requests
user_agent_pool = [generate_user_agent() for _ in range(100)]
parser = argparse.ArgumentParser(description='Web scraper for Github and Stack Overflow')
parser.add_argument('--language', type=str, help='Programming language to scrape for', required=True)
parser.add_argument('--pages', type=int, help='Number of pages to scrape for', default=10)
parser.add_argument('--output_dir', type=str, help='Directory to save output file', default="output")
args = parser.parse_args()
# Check if output directory exists, create it if it doesn't
if not os.path.exists(args.output_dir):
os.makedirs(args.output_dir)
def get_random_user_agent():
    """
    Return a random user agent string to be used in the request headers.
    """
    # Reuse the pool built above with generate_user_agent()
    return random.choice(user_agent_pool)
def get_proxies():
proxy_url = "https://api.getproxylist.com/proxy"
proxy_list = []
for i in range(5): # Generate 5 proxies
try:
response = requests.get(proxy_url)
proxy_data = response.json()
proxy = {
"http": "http://" + proxy_data["ip"] + ":" + str(proxy_data["port"]),
"https": "https://" + proxy_data["ip"] + ":" + str(proxy_data["port"])
}
proxy_list.append(proxy)
        except (requests.RequestException, ValueError, KeyError):
            # Skip proxies that cannot be fetched or parsed
            continue
return proxy_list
def get_random_proxy(proxy_list):
return random.choice(proxy_list)
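# Illustrative usage of the proxy helpers (not executed here; assumes the
# getproxylist API above is reachable):
#
#   proxies = get_proxies()
#   if proxies:
#       resp = requests.get("https://example.com",
#                           headers={'User-Agent': get_random_user_agent()},
#                           proxies=get_random_proxy(proxies))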
def scrape_with_threads(data_list, scrape_func, num_threads=5):
"""
Scrape data using multiple threads.
:param data_list: List of data to be scraped
:param scrape_func: Function to be used for scraping
:param num_threads: Number of threads to use
:return: List of scraped data
"""
results = []
threads = []
# Define worker function
def worker():
while True:
try:
# Get data from the queue
data = data_list.pop()
except IndexError:
# Queue is empty, thread can exit
return
# Scrape data using the given function
result = scrape_func(data)
# Add result to the results list
results.append(result)
# Start threads
for i in range(num_threads):
t = threading.Thread(target=worker)
t.start()
        threads.append(t)
# Wait for threads to finish
for t in threads:
t.join()
return results
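# Illustrative usage of scrape_with_threads (not executed here): fetch several
# pages concurrently, using cache_request() defined further below as the scrape function.
#
#   urls = [f"https://stackoverflow.com/questions/tagged/python?page={p}" for p in range(1, 6)]
#   pages = scrape_with_threads(urls, cache_request, num_threads=5)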
def scrape_github_data(language, num_pages, proxy_pool, user_agent_pool):
# Initialize the temporary data frame
df_temp = pd.DataFrame(columns=['Language', 'Title', 'Link', 'Content'])
# Define the base URL for Github search
base_url = "https://github.com/search?p={page}&q=language%3A{language}&type=Repositories"
# Iterate over the specified number of pages
for i in range(1, num_pages+1):
# Define the URL for the current page
url = base_url.format(page=i, language=language)
# Select a random user agent from the pool
user_agent = random.choice(user_agent_pool)
# Select a random proxy from the pool
proxy = random.choice(proxy_pool)
# Define the headers for the request
headers = {'User-Agent': user_agent}
try:
# Send a GET request to the URL with the headers and proxy
            response = requests.get(url, headers=headers, proxies=proxy)
# Parse the HTML content with BeautifulSoup
soup = BeautifulSoup(response.content, 'html.parser')
# Find all the search result items
items = soup.find_all('li', class_='repo-list-item hx_hit-repo d-flex flex-justify-start py-4 public source')
# Iterate over the search result items
for item in items:
# Get the title and link for the item
title = item.find('a', class_='v-align-middle').text.strip()
link = "https://github.com" + item.find('a', class_='v-align-middle')['href']
                # Get the content (repository description) for the item, which may be missing
                content_tag = item.find('p', class_='col-9 text-gray my-1 pr-4')
                content = content_tag.text.strip() if content_tag else ""
                # Append the row to the temporary data frame
                df_temp = pd.concat([df_temp, pd.DataFrame([{'Language': language, 'Title': title, 'Link': link, 'Content': content}])], ignore_index=True)
# Wait for a random time to avoid being detected and blocked
time.sleep(random.uniform(1, 3))
        except Exception as e:
            # Print an error message if there is an issue with the request
            print(f"Error scraping page {i} of {num_pages} for {language} from Github: {e}")
            continue
# Return the temporary data frame
return df_temp
def scrape_stackoverflow(language, pages, proxies):
base_url="https://stackoverflow.com/questions/tagged/{}?page={}&sort=votes&pagesize=50"
data = []
for i in range(1, pages+1):
url = base_url.format(language, i)
headers = {'User-Agent': get_random_user_agent()}
try:
            response = requests.get(url, headers=headers, proxies=random.choice(proxies) if proxies else None)
soup = BeautifulSoup(response.content, 'html.parser')
questions = soup.find_all('div', {'class': 'question-summary'})
for q in questions:
title = q.find('a', {'class': 'question-hyperlink'}).text.strip()
votes = q.find('div', {'class': 'votes'}).text.strip()
                answer_status = q.find('div', {'class': 'status answered'})
                answers = answer_status.find('strong').text.strip() if answer_status else '0'
tags = [t.text.strip() for t in q.find_all('a', {'class': 'post-tag'})]
code_snippets = []
for answer in q.find_all('div', {'class': 'answer'}):
code = answer.find('code')
if code:
code_snippets.append(code.text.strip())
data.append({'title': title, 'votes': votes, 'answers': answers, 'tags': tags, 'code_snippets': code_snippets})
except Exception as e:
print(f'Error while scraping {url}: {e}')
time.sleep(random.uniform(1, 3))
df = pd.DataFrame(data)
return df
def handle_edge_cases(language, data):
"""Handle edge cases for rare programming languages or unusual code structures"""
if language == 'Prolog':
# Prolog code is often organized in a tree structure, so we'll flatten it to make it easier to process
data = flatten_prolog_code(data)
elif language == 'Assembly':
# Assembly code often has a lot of special characters, so we'll remove them for better readability
data = remove_special_characters(data)
return data
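# remove_special_characters() is referenced above but not defined in this snippet.
# A minimal sketch, assuming "data" is a list of code strings and that only word
# characters, whitespace and basic punctuation should be kept (the exact character
# set is an assumption):
def remove_special_characters(data):
    """Strip unusual special characters from each code string (rough sketch)."""
    return [re.sub(r'[^\w\s.,;:()\[\]]', ' ', text) for text in data]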
def handle_exceptions(e):
"""Handle unexpected errors or exceptions"""
print(f"An error occurred: {str(e)}")
# Optionally, log the error for future reference
# logging.error(f"An error occurred: {str(e)}")
# Return None so the program can continue running without crashing
return None
CACHE_DIR = "cache"
def cache_request(url, params=None):
"""
Check if a response for a given request has been cached.
If it has, return the cached response. Otherwise, send the request and cache the response.
"""
if not os.path.exists(CACHE_DIR):
os.mkdir(CACHE_DIR)
# Create a unique filename for this request
request_key = hashlib.md5(pickle.dumps((url, params))).hexdigest()
cache_file = os.path.join(CACHE_DIR, request_key)
# Check if we have a cached response
if os.path.exists(cache_file):
with open(cache_file, "rb") as f:
response, timestamp = pickle.load(f)
# Check if the cached response has expired (1 hour by default)
if time.time() - timestamp < 3600:
return response
# If we don't have a cached response, send the request and cache the response
response = requests.get(url, params=params)
with open(cache_file, "wb") as f:
pickle.dump((response, time.time()), f)
return response
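# Illustrative usage of cache_request (not executed here): repeated calls within
# one hour are served from the local cache directory instead of hitting the site again.
#
#   resp = cache_request("https://stackoverflow.com/questions/tagged/python", params={"page": 1})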
#Define a function to manage database
class DBManager:
def __init__(self, dbname, collection_name):
self.client = pymongo.MongoClient()
self.dbname = dbname
self.collection_name = collection_name
self.db = self.client[dbname]
self.collection = self.db[collection_name]
def insert_many(self, data):
if data:
try:
result = self.collection.insert_many(data)
return result
except pymongo.errors.BulkWriteError as e:
print(f"Failed to insert data into database: {e.details}")
else:
print("No data to insert.")
def find_one(self, query):
return self.collection.find_one(query)
def find_many(self, query):
return self.collection.find(query)
def delete_many(self, query):
return self.collection.delete_many(query)
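# Illustrative usage of DBManager (not executed here; assumes a MongoDB instance
# is running locally on the default port):
#
#   db = DBManager("scraper_db", "python_snippets")
#   db.insert_many([{"title": "Example question", "votes": "42"}])
#   doc = db.find_one({"title": "Example question"})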
def clean_data(data):
stop_words = set(stopwords.words('english'))
lemmatizer = WordNetLemmatizer()
stemmer = PorterStemmer()
clean_text = []
for text in data:
# remove unwanted characters and numbers
text = re.sub('[^a-zA-Z]', ' ', text)
# convert to lowercase
text = text.lower()
# tokenization
tokens = word_tokenize(text)
# remove stopwords
tokens = [word for word in tokens if not word in stop_words]
# stemming
stemmed_tokens = [stemmer.stem(token) for token in tokens]
# lemmatization
lemmatized_tokens = [lemmatizer.lemmatize(token) for token in stemmed_tokens]
# join tokens to form cleaned text
cleaned_text=" ".join(lemmatized_tokens)
clean_text.append(cleaned_text)
return clean_text
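# Illustrative usage of clean_data (not executed here):
#
#   clean_data(["How do I reverse a list in Python 3?"])
#   # -> roughly ["revers list python"] after stopword removal, stemming and lemmatization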
def split_data(data, train_ratio, val_ratio, test_ratio, output_dir):
"""
Splits the preprocessed data into training, validation, and test sets.
:param data: The preprocessed data to split.
:param train_ratio: The ratio of the data to use for training.
:param val_ratio: The ratio of the data to use for validation.
:param test_ratio: The ratio of the data to use for testing.
:param output_dir: The directory to save the resulting data sets.
:return: The resulting training, validation, and test data sets.
"""
# Shuffle the data
data = data.sample(frac=1, random_state=42)
# Calculate the sizes of each set
num_samples = len(data)
num_train = int(num_samples * train_ratio)
num_val = int(num_samples * val_ratio)
num_test = num_samples - num_train - num_val
# Split the data
train_data = data[:num_train]
val_data = data[num_train:num_train + num_val]
test_data = data[num_train + num_val:]
# Save the data sets to disk
os.makedirs(output_dir, exist_ok=True)
train_data.to_csv(os.path.join(output_dir, 'train.csv'), index=False)
val_data.to_csv(os.path.join(output_dir, 'val.csv'), index=False)
test_data.to_csv(os.path.join(output_dir, 'test.csv'), index=False)
return train_data, val_data, test_data
def main():
# Define command-line arguments
parser = argparse.ArgumentParser(description="Scrape code snippets and other beneficial data for a specified programming language from GitHub and Stack Overflow.")
parser.add_argument("--language", type=str, required=True, help="the programming language to scrape for")
parser.add_argument("--pages", type=int, default=5, help="the number of pages to scrape for each website")
parser.add_argument("--output_dir", type=str, required=True, help="the directory to save the output files")
parser.add_argument("--train_ratio", type=float, default=0.8, help="the ratio of data to use for training")
parser.add_argument("--valid_ratio", type=float, default=0.1, help="the ratio of data to use for validation")
parser.add_argument("--test_ratio", type=float, default=0.1, help="the ratio of data to use for testing")
args = parser.parse_args()
language = args.language
pages = args.pages
output_dir = args.output_dir
train_ratio = args.train_ratio
valid_ratio = args.valid_ratio
test_ratio = args.test_ratio
    # Build the user agent pool for the requests
    user_agent_pool = [generate_user_agent() for _ in range(100)]
# Define the proxy pool
proxy_pool = get_proxies()
    # Scrape data from GitHub and Stack Overflow
    github_data = scrape_github_data(language, pages, proxy_pool, user_agent_pool)
    stackoverflow_data = scrape_stackoverflow(language, pages, proxy_pool)
    # Merge the scraped data frames and clean the free-text columns with clean_data()
    merged_data = pd.concat([github_data, stackoverflow_data], ignore_index=True)
    preprocessed_data = merged_data.copy()
    for col in ('Content', 'title'):
        if col in preprocessed_data.columns:
            preprocessed_data[col] = clean_data(preprocessed_data[col].fillna('').astype(str).tolist())
    # Split the preprocessed data into training, validation, and test sets
    # (split_data also writes train.csv, val.csv and test.csv to output_dir)
    train_data, valid_data, test_data = split_data(preprocessed_data, train_ratio, valid_ratio, test_ratio, output_dir)
    # Save the final processed data as CSV and JSON to the user-specified folder
    final_data = pd.concat([train_data, valid_data, test_data])
    final_data.to_csv(os.path.join(output_dir, "final_data.csv"), index=False)
    final_data.to_json(os.path.join(output_dir, "final_data.json"), orient="records")

if __name__ == "__main__":
    main()
Роберт3737