"""Barron's stock-picks crawler with e-mail / webhook / Discord notifications.

The module polls the Barron's stock-picks page on a fixed interval, compares
the scraped headlines with the previously stored ones, sends notifications for
new headlines, and exposes a small Flask API for health, statistics, manual
checks and notification tests.
"""

import hashlib
import json
import logging
import os
import signal
import smtplib
import sys
import threading
import time
from datetime import datetime
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

import requests
import schedule
from bs4 import BeautifulSoup
from flask import Flask, jsonify, request


class EnhancedBarronsCrawler:
    """Scrape Barron's stock picks and notify configured channels about new ones.

    Configuration is read from environment variables in ``__init__`` and
    ``load_email_config``. State between runs is persisted as JSON in
    ``self.data_file``.
    """

    # Timeout (seconds) applied to every outbound HTTP and SMTP connection so
    # that a stalled remote end cannot block the check loop indefinitely.
    REQUEST_TIMEOUT = 30

    # Prefix used to turn site-relative links into absolute URLs.
    BASE_URL = "https://www.barrons.com"

    # Location of the log file written in addition to the console output.
    LOG_FILE = '/app/logs/barrons_crawler.log'

    def __init__(self):
        """Read configuration from the environment and set up logging and stats."""
        self.url = "https://www.barrons.com/market-data/stocks/stock-picks?mod=BOL_TOPNAV"
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }
        self.data_file = "/app/data/barrons_data.json"
        self.running = True

        # Settings read from environment variables.
        self.check_interval = int(os.getenv('CHECK_INTERVAL', 300))
        self.email_config = self.load_email_config()
        self.webhook_url = os.getenv('WEBHOOK_URL')
        self.discord_webhook = os.getenv('DISCORD_WEBHOOK')

        # Whether the very first check after start-up should send the current
        # list even when nothing is new.
        self.always_notify_on_startup = os.getenv(
            'ALWAYS_NOTIFY_ON_STARTUP', 'false'
        ).lower() in ('1', 'true', 'yes')
        self._first_check_done = False

        # run_check() is invoked both by the scheduler loop and by the Flask
        # thread (/check); the lock keeps two checks from overlapping.
        self._check_lock = threading.Lock()

        # Logging setup. Accept any letter case for LOG_LEVEL and fall back to
        # INFO when the name is not a numeric logging level.
        log_level = getattr(logging, os.getenv('LOG_LEVEL', 'INFO').upper(), logging.INFO)
        if not isinstance(log_level, int):
            log_level = logging.INFO
        # FileHandler raises if the directory is missing, so create it first.
        os.makedirs(os.path.dirname(self.LOG_FILE), exist_ok=True)
        logging.basicConfig(
            level=log_level,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(self.LOG_FILE),
                logging.StreamHandler(),
            ],
        )
        self.logger = logging.getLogger(__name__)

        # Runtime statistics, also served by the /stats endpoint.
        self.stats = {
            'start_time': datetime.now().isoformat(),
            'total_checks': 0,
            'new_picks_found': 0,
            'last_check': None,
            'last_notification': None,
            'errors': 0,
        }

    def load_email_config(self):
        """Build the e-mail configuration from environment variables.

        Returns:
            A dict with keys ``smtp_server``, ``smtp_port``, ``smtp_security``
            (``'ssl'`` | ``'starttls'`` | ``'none'``), ``from_email``,
            ``to_email``, ``username`` and ``password``; or ``None`` when any
            of the required variables is missing or empty.
        """
        required = {
            'smtp_server': 'EMAIL_SMTP_SERVER',
            'from_email': 'EMAIL_FROM',
            'to_email': 'EMAIL_TO',
            'username': 'EMAIL_USERNAME',
            'password': 'EMAIL_PASSWORD',
        }
        values = {key: os.getenv(env_name) for key, env_name in required.items()}
        if not all(values.values()):
            return None

        security = os.getenv('EMAIL_SMTP_SECURITY', 'starttls').lower()
        # Derive the default port from the security mode.
        default_port = {'ssl': 465, 'starttls': 587}.get(security, 25)
        smtp_port = int(os.getenv('EMAIL_SMTP_PORT', default_port))

        return {
            'smtp_server': values['smtp_server'],
            'smtp_port': smtp_port,
            'smtp_security': security,
            'from_email': values['from_email'],
            'to_email': values['to_email'],
            'username': values['username'],
            'password': values['password'],
        }

    def fetch_page(self):
        """Download the stock-picks page.

        Returns:
            The response body as text, or ``None`` when the request fails
            (the failure is logged and counted in ``stats['errors']``).
        """
        try:
            response = requests.get(
                self.url, headers=self.headers, timeout=self.REQUEST_TIMEOUT
            )
            response.raise_for_status()
            return response.text
        except requests.RequestException as e:
            self.logger.error(f"獲取網頁失敗: {e}")
            self.stats['errors'] += 1
            return None

    def _extract_link(self, element):
        """Return the absolute URL for *element*, or ``None`` if it has no link."""
        # An <a> element carries its own href; otherwise use the first nested
        # anchor that has one.
        anchor = element if element.name == 'a' else element.find('a', href=True)
        href = anchor.get('href') if anchor is not None else None
        if isinstance(href, str) and href.startswith('/'):
            href = self.BASE_URL + href
        return href

    def parse_stock_picks(self, html_content):
        """Extract up to ten stock-pick headlines from the page HTML.

        Args:
            html_content: HTML text of the stock-picks page.

        Returns:
            A list of dicts with keys ``title``, ``link`` (may be ``None``),
            ``scraped_at`` (ISO timestamp) and ``hash`` (first 8 hex digits of
            the MD5 of the title). Returns ``[]`` when parsing fails.
        """
        soup = BeautifulSoup(html_content, 'html.parser')
        stock_picks = []

        try:
            # Selector strategies, tried in order; the first one that matches
            # anything wins.
            selectors = [
                'article[data-module="ArticleItem"]',
                '.WSJTheme--headline',
                '.MarketDataModule-headline',
                'h3 a, h4 a',
                '[data-module] a[href*="articles"]',
            ]

            elements = []
            for selector in selectors:
                elements = soup.select(selector)
                if elements:
                    self.logger.info(f"使用選擇器找到內容: {selector}")
                    break

            for element in elements[:10]:  # at most 10 items
                title = element.get_text(strip=True)
                # Skip empty or very short titles.
                if not title or len(title) <= 10:
                    continue
                stock_picks.append({
                    'title': title,
                    'link': self._extract_link(element),
                    'scraped_at': datetime.now().isoformat(),
                    'hash': hashlib.md5(title.encode()).hexdigest()[:8],
                })

            return stock_picks

        except Exception as e:
            self.logger.error(f"解析網頁內容失敗: {e}")
            self.stats['errors'] += 1
            return []

    def load_previous_data(self):
        """Load the previously stored state.

        Returns:
            The stored dict, or an empty default state when the file is
            missing, is not valid JSON, or does not contain a JSON object.
        """
        empty_state = {'content_hash': None, 'last_update': None, 'stock_picks': []}
        try:
            with open(self.data_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except FileNotFoundError:
            return empty_state
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            # A corrupt file would otherwise make every later check fail.
            self.logger.warning(f"先前資料檔無法解析,將視為無歷史資料: {e}")
            return empty_state
        return data if isinstance(data, dict) else empty_state

    def save_data(self, data):
        """Write *data* as JSON to ``self.data_file``; errors are logged and counted."""
        try:
            os.makedirs(os.path.dirname(self.data_file), exist_ok=True)
            with open(self.data_file, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
        except Exception as e:
            self.logger.error(f"儲存資料失敗: {e}")
            self.stats['errors'] += 1

    def send_notifications(self, new_picks):
        """Send *new_picks* through every configured channel.

        A failure in one channel is logged and does not stop the others.
        ``stats['last_notification']`` is updated when at least one channel
        succeeded.
        """
        channels = (
            # (configuration, sender, log prefix on failure)
            (self.email_config, self.send_email_notification, "電子郵件通知失敗"),
            (self.webhook_url, self.send_webhook_notification, "Webhook 通知失敗"),
            (self.discord_webhook, self.send_discord_notification, "Discord 通知失敗"),
        )

        notification_sent = False
        for configured, sender, failure_prefix in channels:
            if not configured:
                continue
            try:
                sender(new_picks)
            except Exception as e:
                self.logger.error(f"{failure_prefix}: {e}")
            else:
                notification_sent = True

        if notification_sent:
            self.stats['last_notification'] = datetime.now().isoformat()

    def send_email_notification(self, new_picks):
        """Send the picks as a plain-text e-mail using ``self.email_config``.

        Raises:
            smtplib.SMTPException or OSError: when connecting, logging in or
                sending fails. The connection is closed in every case.
        """
        msg = MIMEMultipart()
        msg['From'] = self.email_config['from_email']
        msg['To'] = self.email_config['to_email']
        msg['Subject'] = f"📈 Barron's 新股票推薦 ({len(new_picks)}條)"

        parts = [f"發現 {len(new_picks)} 條新的股票推薦:\n\n"]
        for pick in new_picks:
            parts.append(f"📊 {pick['title']}\n")
            if pick.get('link'):
                parts.append(f"🔗 {pick['link']}\n")
            parts.append(f"🕒 {pick['scraped_at']}\n")
            parts.append("-" * 60 + "\n")
        msg.attach(MIMEText("".join(parts), 'plain', 'utf-8'))

        smtp_server = self.email_config['smtp_server']
        smtp_port = self.email_config['smtp_port']
        security = self.email_config.get('smtp_security', 'starttls')

        if security == 'ssl':
            connection = smtplib.SMTP_SSL(smtp_server, smtp_port, timeout=self.REQUEST_TIMEOUT)
        else:
            connection = smtplib.SMTP(smtp_server, smtp_port, timeout=self.REQUEST_TIMEOUT)

        # The context manager issues QUIT and closes the socket even when
        # login or sending raises.
        with connection as server:
            if security != 'ssl':
                server.ehlo()
                if security == 'starttls':
                    server.starttls()
                    server.ehlo()
            server.login(self.email_config['username'], self.email_config['password'])
            server.send_message(msg)

    def send_webhook_notification(self, new_picks):
        """Post the picks to the generic webhook (Slack/Teams style ``text`` payload).

        Raises:
            requests.RequestException: on connection errors, timeouts, or a
                non-success HTTP status.
        """
        lines = [f"🚨 發現 {len(new_picks)} 條新的 Barron's 股票推薦!\n\n"]
        for pick in new_picks[:5]:  # cap at 5 to keep the message short
            lines.append(f"📊 {pick['title']}\n")
            if pick.get('link'):
                lines.append(f"🔗 {pick['link']}\n")

        payload = {"text": "".join(lines)}
        response = requests.post(self.webhook_url, json=payload, timeout=self.REQUEST_TIMEOUT)
        # Without this an HTTP error would be reported as a sent notification.
        response.raise_for_status()

    def send_discord_notification(self, new_picks):
        """Post the picks to the Discord webhook as a single embed.

        Raises:
            requests.RequestException: on connection errors, timeouts, or a
                non-success HTTP status.
        """
        embed = {
            "title": "📈 Barron's 新股票推薦",
            "description": f"發現 {len(new_picks)} 條新推薦",
            "color": 0x00ff00,
            "fields": [
                {
                    "name": pick['title'][:256],
                    # 'link' may be present but None; `or` covers both the
                    # missing and the None case.
                    "value": (pick.get('link') or '無連結')[:1024],
                    "inline": False,
                }
                for pick in new_picks[:5]
            ],
        }

        payload = {"embeds": [embed]}
        response = requests.post(self.discord_webhook, json=payload, timeout=self.REQUEST_TIMEOUT)
        response.raise_for_status()

    def find_new_picks(self, current_picks, previous_picks):
        """Return the picks in *current_picks* whose hash is not in *previous_picks*."""
        previous_hashes = {
            pick['hash']
            for pick in previous_picks
            if isinstance(pick, dict) and 'hash' in pick
        }
        return [pick for pick in current_picks if pick['hash'] not in previous_hashes]

    def _persist_picks(self, current_picks):
        """Store *current_picks* together with a timestamp and the current stats."""
        self.save_data({
            'last_update': datetime.now().isoformat(),
            'stock_picks': current_picks,
            'stats': self.stats,
        })

    def run_check(self):
        """Run one fetch / compare / notify cycle.

        Returns:
            The list of picks that were notified, ``[]`` when nothing is new,
            or ``None`` when the page could not be fetched, no picks were
            found, or an unexpected error occurred.
        """
        # Serialize checks coming from the scheduler and from the /check route.
        with self._check_lock:
            return self._run_check_unlocked()

    def _run_check_unlocked(self):
        """Body of run_check(); the caller must hold ``self._check_lock``."""
        self.logger.info("開始檢查 Barron's 股票推薦...")
        self.stats['total_checks'] += 1
        self.stats['last_check'] = datetime.now().isoformat()

        try:
            # Fetch and parse the page.
            html_content = self.fetch_page()
            if not html_content:
                return None

            current_picks = self.parse_stock_picks(html_content)
            if not current_picks:
                self.logger.warning("未找到股票推薦內容")
                return None

            # Compare with the stored picks.
            previous_data = self.load_previous_data()
            previous_picks = previous_data.get('stock_picks') or []
            new_picks = self.find_new_picks(current_picks, previous_picks)

            if new_picks:
                self.logger.info(f"🚨 發現 {len(new_picks)} 條新推薦")
                self.stats['new_picks_found'] += len(new_picks)
                self.send_notifications(new_picks)
                self._persist_picks(current_picks)
                return new_picks

            # First check after start-up with ALWAYS_NOTIFY_ON_STARTUP set:
            # send the full current list even though nothing is new.
            if not self._first_check_done and self.always_notify_on_startup:
                self.logger.info("🟢 啟動首次檢查:沒有新內容,但已依設定寄出目前清單")
                self.send_notifications(current_picks)
                self._persist_picks(current_picks)
                return current_picks

            self.logger.info("✅ 沒有發現新內容")
            return []

        except Exception as e:
            self.logger.error(f"檢查過程中發生錯誤: {e}")
            self.stats['errors'] += 1
            return None

    def signal_handler(self, signum, frame):
        """Handle SIGINT/SIGTERM by asking the main loop to stop."""
        self.logger.info("收到停止信號,正在關閉...")
        self.running = False

    def run(self):
        """Main loop: run one check immediately, then one every ``check_interval`` seconds."""
        # Register signal handlers.
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)

        # Scheduling is delegated to the `schedule` library.
        schedule.every(self.check_interval).seconds.do(self.run_check)

        self.logger.info(f"🚀 爬蟲已啟動,每 {self.check_interval} 秒檢查一次")

        # Run one check right away; only this one counts as the "first check".
        self.run_check()
        self._first_check_done = True

        while self.running:
            schedule.run_pending()
            time.sleep(1)

        self.logger.info("爬蟲已停止")


# Flask web API
app = Flask(__name__)
crawler_instance = None


@app.route('/health')
def health_check():
    """Health-check endpoint."""
    return jsonify({"status": "healthy", "timestamp": datetime.now().isoformat()})


@app.route('/stats')
def get_stats():
    """Return the crawler statistics."""
    if not crawler_instance:
        # Same status code as /notify_test uses for this condition.
        return jsonify({"error": "Crawler not initialized"}), 500
    return jsonify(crawler_instance.stats)


@app.route('/check')
def manual_check():
    """Trigger a check manually and report how many new picks were found."""
    if not crawler_instance:
        return jsonify({"error": "Crawler not initialized"}), 500
    result = crawler_instance.run_check()
    return jsonify({"result": f"Found {len(result) if result else 0} new picks"})


@app.route('/notify_test')
def notify_test():
    """Send a test notification (e-mail by default).

    The channel can be chosen with ``?channel=email|webhook|discord``.
    """
    if not crawler_instance:
        return jsonify({"error": "Crawler not initialized"}), 500

    channel = (request.args.get('channel') or 'email').lower()
    test_pick = [{
        'title': f"[測試] Barron's 通知發送 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        'link': 'https://example.com/test',
        'scraped_at': datetime.now().isoformat(),
        'hash': hashlib.md5(str(datetime.now().timestamp()).encode()).hexdigest()[:8],
    }]

    # channel -> (configuration value, sender, error text when not configured)
    channels = {
        'email': (
            crawler_instance.email_config,
            crawler_instance.send_email_notification,
            "Email config not set",
        ),
        'webhook': (
            crawler_instance.webhook_url,
            crawler_instance.send_webhook_notification,
            "Webhook URL not set",
        ),
        'discord': (
            crawler_instance.discord_webhook,
            crawler_instance.send_discord_notification,
            "Discord webhook not set",
        ),
    }

    if channel not in channels:
        return jsonify({"error": f"Unsupported channel: {channel}"}), 400

    configured, sender, missing_message = channels[channel]
    if not configured:
        return jsonify({"error": missing_message}), 400

    try:
        sender(test_pick)
    except Exception as e:
        crawler_instance.logger.error(f"測試通知發送失敗: {e}")
        return jsonify({"error": str(e)}), 500
    return jsonify({"result": f"Test notification sent via {channel}"})


def run_flask_app():
    """Run the Flask application on 0.0.0.0:8080."""
    app.run(host='0.0.0.0', port=8080, debug=False)


if __name__ == "__main__":
    # Create the crawler instance.
    crawler_instance = EnhancedBarronsCrawler()

    # Serve the Flask API from a background daemon thread.
    flask_thread = threading.Thread(target=run_flask_app, daemon=True)
    flask_thread.start()

    # Run the crawler in the main thread (required for signal handling).
    crawler_instance.run()