Files
stock-info-crawler/enhanced_crawler.py
2025-09-03 16:47:02 +08:00

348 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import requests
from bs4 import BeautifulSoup
import time
import json
import hashlib
from datetime import datetime
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import logging
import os
import schedule
from flask import Flask, jsonify
import threading
import signal
import sys
class EnhancedBarronsCrawler:
    """Poll Barron's stock-picks page, detect new headlines, and fan out
    notifications via email, Slack/Teams webhook, and Discord webhook.

    Configuration is read from environment variables:
      CHECK_INTERVAL   seconds between checks (default 300)
      WEBHOOK_URL      Slack/Teams incoming-webhook URL (optional)
      DISCORD_WEBHOOK  Discord webhook URL (optional)
      LOG_LEVEL        logging level name (default 'INFO')
      EMAIL_*          SMTP settings (see load_email_config)

    State (last-seen picks + stats) is persisted to a JSON file so restarts
    do not re-notify already-seen picks.
    """

    def __init__(self):
        # Target page; a desktop-browser User-Agent so the request is served.
        self.url = "https://www.barrons.com/market-data/stocks/stock-picks?mod=BOL_TOPNAV"
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }
        self.data_file = "/app/data/barrons_data.json"
        self.running = True

        # Load runtime configuration from environment variables.
        self.check_interval = int(os.getenv('CHECK_INTERVAL', 300))
        self.email_config = self.load_email_config()
        self.webhook_url = os.getenv('WEBHOOK_URL')
        self.discord_webhook = os.getenv('DISCORD_WEBHOOK')

        # Configure logging to both a file and the console.
        # Fix: ensure the log directory exists before FileHandler opens it,
        # otherwise startup crashes on a fresh container volume.
        os.makedirs('/app/logs', exist_ok=True)
        log_level = os.getenv('LOG_LEVEL', 'INFO')
        logging.basicConfig(
            level=getattr(logging, log_level),
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('/app/logs/barrons_crawler.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

        # Runtime statistics, exposed through the /stats HTTP endpoint.
        self.stats = {
            'start_time': datetime.now().isoformat(),
            'total_checks': 0,
            'new_picks_found': 0,
            'last_check': None,
            'last_notification': None,
            'errors': 0
        }

    def load_email_config(self):
        """Build the SMTP configuration from environment variables.

        Returns a config dict only when every required EMAIL_* variable is
        set; otherwise returns None (email notifications disabled).
        """
        required = ['EMAIL_SMTP_SERVER', 'EMAIL_FROM', 'EMAIL_TO',
                    'EMAIL_USERNAME', 'EMAIL_PASSWORD']
        if all(os.getenv(key) for key in required):
            return {
                'smtp_server': os.getenv('EMAIL_SMTP_SERVER'),
                'smtp_port': int(os.getenv('EMAIL_SMTP_PORT', 587)),
                'from_email': os.getenv('EMAIL_FROM'),
                'to_email': os.getenv('EMAIL_TO'),
                'username': os.getenv('EMAIL_USERNAME'),
                'password': os.getenv('EMAIL_PASSWORD')
            }
        return None

    def fetch_page(self):
        """Fetch the stock-picks page; return its HTML or None on failure."""
        try:
            response = requests.get(self.url, headers=self.headers, timeout=30)
            response.raise_for_status()
            return response.text
        except requests.RequestException as e:
            self.logger.error(f"獲取網頁失敗: {e}")
            self.stats['errors'] += 1
            return None

    def parse_stock_picks(self, html_content):
        """Extract stock-pick headlines from the page HTML.

        Returns a list of dicts with 'title', 'link', 'scraped_at', and a
        short 'hash' used for de-duplication. Returns [] on parse failure.
        """
        soup = BeautifulSoup(html_content, 'html.parser')
        stock_picks = []
        try:
            # Several selector strategies because the page markup changes over
            # time; the first selector that matches anything wins.
            selectors = [
                'article[data-module="ArticleItem"]',
                '.WSJTheme--headline',
                '.MarketDataModule-headline',
                'h3 a, h4 a',
                '[data-module] a[href*="articles"]'
            ]
            # Fix: initialize explicitly instead of relying on the loop
            # variable's last value when nothing matched.
            elements = []
            for selector in selectors:
                elements = soup.select(selector)
                if elements:
                    self.logger.info(f"使用選擇器找到內容: {selector}")
                    break

            for element in elements[:10]:  # cap at 10 entries per check
                # Fix: the original ternary had two identical branches;
                # get_text works for anchors and container tags alike.
                title = element.get_text(strip=True)

                # The matched element may itself be the <a>, or contain one.
                # Fix: the old `isinstance(link, dict)` branch was dead code
                # (a bs4 Tag is not a dict).
                if element.name == 'a':
                    link = element.get('href')
                else:
                    anchor = element.find('a', href=True)
                    link = anchor.get('href') if anchor is not None else None

                # Make site-relative links absolute.
                if link and link.startswith('/'):
                    link = "https://www.barrons.com" + link

                if title and len(title) > 10:  # skip too-short / nav text
                    stock_picks.append({
                        'title': title,
                        'link': link,
                        'scraped_at': datetime.now().isoformat(),
                        # Short digest of the title, used as the dedup key.
                        'hash': hashlib.md5(title.encode()).hexdigest()[:8]
                    })
            return stock_picks
        except Exception as e:
            self.logger.error(f"解析網頁內容失敗: {e}")
            self.stats['errors'] += 1
            return []

    def load_previous_data(self):
        """Load persisted state; fall back to an empty state when the file
        is missing or corrupted."""
        try:
            with open(self.data_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        # Fix: a corrupted/truncated JSON file used to crash the whole
        # check cycle; treat it the same as a missing file.
        except (FileNotFoundError, json.JSONDecodeError):
            return {'content_hash': None, 'last_update': None, 'stock_picks': []}

    def save_data(self, data):
        """Persist state to the JSON data file, creating the directory
        if needed. Failures are logged and counted, not raised."""
        try:
            os.makedirs(os.path.dirname(self.data_file), exist_ok=True)
            with open(self.data_file, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
        except Exception as e:
            self.logger.error(f"儲存資料失敗: {e}")
            self.stats['errors'] += 1

    def send_notifications(self, new_picks):
        """Fan out notifications over every configured channel.

        Each channel failure is logged independently so one broken channel
        does not block the others.
        """
        notification_sent = False

        # Email notification
        if self.email_config:
            try:
                self.send_email_notification(new_picks)
                notification_sent = True
            except Exception as e:
                self.logger.error(f"電子郵件通知失敗: {e}")

        # Slack/Teams webhook
        if self.webhook_url:
            try:
                self.send_webhook_notification(new_picks)
                notification_sent = True
            except Exception as e:
                self.logger.error(f"Webhook 通知失敗: {e}")

        # Discord webhook
        if self.discord_webhook:
            try:
                self.send_discord_notification(new_picks)
                notification_sent = True
            except Exception as e:
                self.logger.error(f"Discord 通知失敗: {e}")

        if notification_sent:
            self.stats['last_notification'] = datetime.now().isoformat()

    def send_email_notification(self, new_picks):
        """Send an email listing the new picks via SMTP with STARTTLS."""
        msg = MIMEMultipart()
        msg['From'] = self.email_config['from_email']
        msg['To'] = self.email_config['to_email']
        msg['Subject'] = f"📈 Barron's 新股票推薦 ({len(new_picks)}條)"

        body = f"發現 {len(new_picks)} 條新的股票推薦:\n\n"
        for pick in new_picks:
            body += f"📊 {pick['title']}\n"
            if pick.get('link'):
                body += f"🔗 {pick['link']}\n"
            body += f"🕒 {pick['scraped_at']}\n"
            body += "-" * 60 + "\n"
        msg.attach(MIMEText(body, 'plain', 'utf-8'))

        # Fix: use the SMTP context manager so the connection is always
        # closed, even when login or send_message raises.
        with smtplib.SMTP(self.email_config['smtp_server'], self.email_config['smtp_port']) as server:
            server.starttls()
            server.login(self.email_config['username'], self.email_config['password'])
            server.send_message(msg)

    def send_webhook_notification(self, new_picks):
        """Post a plain-text summary to a Slack/Teams incoming webhook."""
        message = f"🚨 發現 {len(new_picks)} 條新的 Barron's 股票推薦!\n\n"
        for pick in new_picks[:5]:  # at most 5 entries to keep the message short
            message += f"📊 {pick['title']}\n"
            if pick.get('link'):
                message += f"🔗 {pick['link']}\n"
        payload = {"text": message}
        # Fix: a timeout so a stuck webhook endpoint cannot hang the crawler.
        requests.post(self.webhook_url, json=payload, timeout=10)

    def send_discord_notification(self, new_picks):
        """Post an embed summary to a Discord webhook."""
        embed = {
            "title": f"📈 Barron's 新股票推薦",
            "description": f"發現 {len(new_picks)} 條新推薦",
            "color": 0x00ff00,
            "fields": []
        }
        for pick in new_picks[:5]:
            embed["fields"].append({
                # Truncate to Discord's field limits (name 256, value 1024).
                "name": pick['title'][:256],
                "value": pick.get('link', '無連結')[:1024],
                "inline": False
            })
        payload = {"embeds": [embed]}
        # Fix: a timeout so a stuck webhook endpoint cannot hang the crawler.
        requests.post(self.discord_webhook, json=payload, timeout=10)

    def find_new_picks(self, current_picks, previous_picks):
        """Return the picks from current_picks whose hash was not seen in
        previous_picks (entries without a 'hash' key are ignored)."""
        previous_hashes = {pick['hash'] for pick in previous_picks if 'hash' in pick}
        return [pick for pick in current_picks if pick['hash'] not in previous_hashes]

    def run_check(self):
        """Run one fetch/parse/diff/notify cycle.

        Returns the list of new picks, [] when nothing new was found, or
        None when the cycle failed (fetch error, empty parse, or exception).
        """
        self.logger.info("開始檢查 Barron's 股票推薦...")
        self.stats['total_checks'] += 1
        self.stats['last_check'] = datetime.now().isoformat()
        try:
            # Fetch and parse the page.
            html_content = self.fetch_page()
            if not html_content:
                return
            current_picks = self.parse_stock_picks(html_content)
            if not current_picks:
                self.logger.warning("未找到股票推薦內容")
                return

            # Diff against the previously persisted picks.
            previous_data = self.load_previous_data()
            previous_picks = previous_data.get('stock_picks', [])
            new_picks = self.find_new_picks(current_picks, previous_picks)

            if new_picks:
                self.logger.info(f"🚨 發現 {len(new_picks)} 條新推薦")
                self.stats['new_picks_found'] += len(new_picks)

                # Notify all configured channels.
                self.send_notifications(new_picks)

                # Persist state only when something changed; note this
                # replaces the stored pick list with the current page's.
                new_data = {
                    'last_update': datetime.now().isoformat(),
                    'stock_picks': current_picks,
                    'stats': self.stats
                }
                self.save_data(new_data)
                return new_picks
            else:
                self.logger.info("✅ 沒有發現新內容")
                return []
        except Exception as e:
            self.logger.error(f"檢查過程中發生錯誤: {e}")
            self.stats['errors'] += 1
            return None

    def signal_handler(self, signum, frame):
        """SIGINT/SIGTERM handler: ask the main loop to stop gracefully."""
        self.logger.info("收到停止信號,正在關閉...")
        self.running = False

    def run(self):
        """Main loop: schedule periodic checks and run until signalled."""
        # Register graceful-shutdown signal handlers.
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)

        # Schedule the recurring check.
        schedule.every(self.check_interval).seconds.do(self.run_check)
        self.logger.info(f"🚀 爬蟲已啟動,每 {self.check_interval} 秒檢查一次")

        # Run one check immediately instead of waiting a full interval.
        self.run_check()
        while self.running:
            schedule.run_pending()
            time.sleep(1)
        self.logger.info("爬蟲已停止")
# Flask Web API: small monitoring surface (/health, /stats, /check) served
# from a background thread; see run_flask_app and the __main__ guard.
app = Flask(__name__)
# Module-level handle to the crawler; assigned in the __main__ guard so the
# route handlers below can reach it (None until then).
crawler_instance = None
@app.route('/health')
def health_check():
    """Liveness probe: always reports healthy with the current timestamp."""
    payload = {
        "status": "healthy",
        "timestamp": datetime.now().isoformat(),
    }
    return jsonify(payload)
@app.route('/stats')
def get_stats():
    """Expose the crawler's runtime statistics as JSON."""
    # Guard clause: the crawler is wired up in the __main__ guard; before
    # that (or when imported standalone) there is nothing to report.
    if not crawler_instance:
        return jsonify({"error": "Crawler not initialized"})
    return jsonify(crawler_instance.stats)
@app.route('/check')
def manual_check():
    """Trigger one crawl cycle on demand and report how many new picks it found."""
    if not crawler_instance:
        return jsonify({"error": "Crawler not initialized"})
    picks = crawler_instance.run_check()
    # run_check yields a list of new picks, [] when nothing is new, or
    # None on failure — the latter two both report zero here.
    count = len(picks) if picks else 0
    return jsonify({"result": f"Found {count} new picks"})
def run_flask_app():
    """Serve the monitoring API on every interface, port 8080 (no debug)."""
    app.run(host='0.0.0.0', port=8080, debug=False)
if __name__ == "__main__":
    # Build the crawler (this also configures logging and statistics).
    crawler_instance = EnhancedBarronsCrawler()

    # Serve the monitoring API from a daemon thread so it terminates
    # together with the main process.
    api_thread = threading.Thread(target=run_flask_app, daemon=True)
    api_thread.start()

    # Block in the crawler's scheduling loop until a stop signal arrives.
    crawler_instance.run()