refactor: modularize project structure and separate API from crawlers

- Introduce app/ package with config, services (storage, notifications), API server, and crawler modules
- Add BaseCrawler and BarronsCrawler; extract notifications and storage
- Keep enhanced_crawler.py as back-compat entry delegating to app.runner
- Add template crawler for future sites
- Update README with new structure and usage
- Extend .env.template with DATA_DIR/LOG_DIR options
commit 58cc979b5b (parent 099f156e6f)
Date: 2025-09-04 21:39:24 +08:00
12 changed files with 663 additions and 666 deletions
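
app/config.py is referenced by the new crawlers but its hunk is not shown in this view. As a rough illustration only, the shape below is inferred from the fields BaseCrawler reads off the config object and from the DATA_DIR/LOG_DIR variables this commit adds to .env.template; names, types, and defaults here are assumptions, not the committed code.

# Hypothetical sketch only -- app/config.py is not among the hunks shown in this view.
from __future__ import annotations
import os
from dataclasses import dataclass
from typing import Dict, Optional

@dataclass
class AppConfig:
    data_dir: str = os.getenv("DATA_DIR", "data")    # added to .env.template in this commit
    log_dir: str = os.getenv("LOG_DIR", "logs")      # added to .env.template in this commit
    check_interval: int = int(os.getenv("CHECK_INTERVAL", "3600"))  # seconds between checks (assumed variable name)
    always_notify_on_startup: bool = os.getenv("ALWAYS_NOTIFY_ON_STARTUP", "false").lower() == "true"
    email: Optional[Dict[str, str]] = None           # SMTP settings consumed by notifications.send_email, if configured
    webhook_url: Optional[str] = os.getenv("WEBHOOK_URL")
    discord_webhook: Optional[str] = os.getenv("DISCORD_WEBHOOK")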

app/crawlers/barrons.py (Normal file, 70 lines)

@@ -0,0 +1,70 @@
from __future__ import annotations

import hashlib
from datetime import datetime
from typing import List, Dict, Optional

import requests
from bs4 import BeautifulSoup

from app.crawlers.base import BaseCrawler


class BarronsCrawler(BaseCrawler):
    def __init__(self, config, logger):
        super().__init__(name="Barron's Stock Picks", config=config, logger=logger, data_filename='barrons_data.json')
        self.url = "https://www.barrons.com/market-data/stocks/stock-picks?mod=BOL_TOPNAV"
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }

    def fetch_page(self) -> Optional[str]:
        try:
            resp = requests.get(self.url, headers=self.headers, timeout=30)
            resp.raise_for_status()
            return resp.text
        except requests.RequestException as e:
            self.logger.error(f"Failed to fetch page: {e}")
            self.stats['errors'] += 1
            return None

    def parse_items(self, html_content: str) -> List[Dict]:
        soup = BeautifulSoup(html_content, 'html.parser')
        stock_picks: List[Dict] = []
        try:
            # Try selectors from most to least specific; stop at the first one that matches.
            selectors = [
                'article[data-module="ArticleItem"]',
                '.WSJTheme--headline',
                '.MarketDataModule-headline',
                'h3 a, h4 a',
                '[data-module] a[href*="articles"]',
            ]
            elements = []
            for selector in selectors:
                elements = soup.select(selector)
                if elements:
                    self.logger.info(f"Found content using selector: {selector}")
                    break
            for element in elements[:10]:
                title = element.get_text(strip=True)
                # The matched element may itself be an anchor, or contain one.
                if element.name == 'a':
                    link = element.get('href')
                else:
                    anchor = element.find('a', href=True)
                    link = anchor.get('href') if anchor else None
                if isinstance(link, str) and link.startswith('/'):
                    link = "https://www.barrons.com" + link
                if title and len(title) > 10:
                    stock_picks.append({
                        'title': title,
                        'link': link,
                        'scraped_at': datetime.now().isoformat(),
                        'hash': hashlib.md5(title.encode()).hexdigest()[:8],
                    })
            return stock_picks
        except Exception as e:
            self.logger.error(f"Failed to parse page content: {e}")
            self.stats['errors'] += 1
            return []

app/crawlers/base.py (Normal file, 136 lines)

@@ -0,0 +1,136 @@
from __future__ import annotations

import hashlib
import time
import signal
from abc import ABC, abstractmethod
from datetime import datetime
from typing import List, Dict, Optional

import schedule

from app.config import AppConfig
from app.services import storage
from app.services import notifications as notif


class BaseCrawler(ABC):
    def __init__(self, name: str, config: AppConfig, logger, data_filename: str):
        self.name = name
        self.config = config
        self.logger = logger
        self.data_path = storage.data_file_path(config.data_dir, data_filename)
        self.running = True
        self._first_check_done = False
        self.stats = {
            'start_time': datetime.now().isoformat(),
            'total_checks': 0,
            'new_picks_found': 0,
            'last_check': None,
            'last_notification': None,
            'errors': 0,
        }

    # --- Abstract site-specific hooks ---
    @abstractmethod
    def fetch_page(self) -> Optional[str]:
        ...

    @abstractmethod
    def parse_items(self, html_content: str) -> List[Dict]:
        ...

    # --- Generic helpers ---
    def find_new(self, current: List[Dict], previous: List[Dict]) -> List[Dict]:
        prev_hashes = {p.get('hash') for p in previous if 'hash' in p}
        return [p for p in current if p.get('hash') not in prev_hashes]

    # --- Main check ---
    def run_check(self) -> Optional[List[Dict]]:
        self.logger.info(f"Starting check for {self.name}...")
        self.stats['total_checks'] += 1
        self.stats['last_check'] = datetime.now().isoformat()
        try:
            html = self.fetch_page()
            if not html:
                return []
            current = self.parse_items(html)
            if not current:
                self.logger.warning("No content found")
                return []
            prev = storage.load_json(self.data_path).get('stock_picks', [])
            new_items = self.find_new(current, prev)
            if new_items:
                self.logger.info(f"🚨 Found {len(new_items)} new items")
                self.stats['new_picks_found'] += len(new_items)
                self._send_notifications(new_items)
                storage.save_json(self.data_path, {
                    'last_update': datetime.now().isoformat(),
                    'stock_picks': current,
                    'stats': self.stats,
                })
                return new_items
            # Optionally notify on first run
            if (not self._first_check_done) and self.config.always_notify_on_startup and current:
                self.logger.info("🟢 First check after startup: no new items, but sending the current list as configured")
                self._send_notifications(current)
                storage.save_json(self.data_path, {
                    'last_update': datetime.now().isoformat(),
                    'stock_picks': current,
                    'stats': self.stats,
                })
                return current
            self.logger.info("✅ No new content found")
            return []
        except Exception as e:
            self.logger.error(f"Error during check: {e}")
            self.stats['errors'] += 1
            return None

    def _send_notifications(self, items: List[Dict]) -> None:
        sent = False
        if self.config.email:
            try:
                notif.send_email(items, self.config.email)
                sent = True
            except Exception as e:
                self.logger.error(f"Email notification failed: {e}")
        if self.config.webhook_url:
            try:
                notif.send_webhook(items, self.config.webhook_url)
                sent = True
            except Exception as e:
                self.logger.error(f"Webhook notification failed: {e}")
        if self.config.discord_webhook:
            try:
                notif.send_discord(items, self.config.discord_webhook)
                sent = True
            except Exception as e:
                self.logger.error(f"Discord notification failed: {e}")
        if sent:
            self.stats['last_notification'] = datetime.now().isoformat()

    # --- Run loop ---
    def _signal_handler(self, signum, frame):
        self.logger.info("Received stop signal, shutting down...")
        self.running = False

    def run(self):
        signal.signal(signal.SIGINT, self._signal_handler)
        signal.signal(signal.SIGTERM, self._signal_handler)
        schedule.every(self.config.check_interval).seconds.do(self.run_check)
        self.logger.info(f"🚀 Crawler started, checking every {self.config.check_interval} seconds")
        self.run_check()
        self._first_check_done = True
        while self.running:
            schedule.run_pending()
            time.sleep(1)
        self.logger.info("Crawler stopped")
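
The commit message keeps enhanced_crawler.py as a thin back-compat entry delegating to app.runner, which is also outside the hunks shown here. Below is a minimal sketch of how that wiring could look, assuming AppConfig can be constructed from the environment as sketched earlier; the module layout and function names are illustrative, not the committed API.

# Hypothetical sketch of app/runner.py -- not among the hunks shown in this view.
import logging

from app.config import AppConfig
from app.crawlers.barrons import BarronsCrawler

def main() -> None:
    logging.basicConfig(level=logging.INFO)
    config = AppConfig()                    # assumed to read its values from the environment
    logger = logging.getLogger("barrons")
    # Blocks in the schedule loop until SIGINT/SIGTERM is received.
    BarronsCrawler(config=config, logger=logger).run()

if __name__ == "__main__":
    main()

Under this assumption, enhanced_crawler.py would presumably only need to import and call this main().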

app/crawlers/template.py (Normal file, 53 lines)

@@ -0,0 +1,53 @@
from __future__ import annotations

import hashlib
from datetime import datetime
from typing import List, Dict, Optional

import requests
from bs4 import BeautifulSoup

from app.crawlers.base import BaseCrawler


class TemplateCrawler(BaseCrawler):
    """Template: copy this file and rename it when adding a new site.

    Required implementations: fetch_page and parse_items.
    - parse_items should return a list of dicts with title, optional link, scraped_at, and hash.
    """

    def __init__(self, config, logger):
        super().__init__(name="Template Site", config=config, logger=logger, data_filename='template_site.json')
        self.url = "https://example.com"
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }

    def fetch_page(self) -> Optional[str]:
        try:
            resp = requests.get(self.url, headers=self.headers, timeout=30)
            resp.raise_for_status()
            return resp.text
        except requests.RequestException as e:
            self.logger.error(f"Failed to fetch page: {e}")
            self.stats['errors'] += 1
            return None

    def parse_items(self, html_content: str) -> List[Dict]:
        soup = BeautifulSoup(html_content, 'html.parser')
        items: List[Dict] = []
        # TODO: implement parsing for the target site's structure; the code below is illustrative only
        for a in soup.select('a')[:5]:
            title = a.get_text(strip=True)
            link = a.get('href')
            if title and len(title) > 5:
                items.append({
                    'title': title,
                    'link': link,
                    'scraped_at': datetime.now().isoformat(),
                    'hash': hashlib.md5(title.encode()).hexdigest()[:8],
                })
        return items
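
A quick way to exercise a new crawler's parse_items offline, without the scheduler or the network. This snippet is not part of the commit; it assumes storage.data_file_path does nothing more than build a path under data_dir, and the config attributes simply mirror what BaseCrawler reads.

# Hypothetical usage sketch -- not part of this commit.
import logging
from types import SimpleNamespace

from app.crawlers.template import TemplateCrawler

logging.basicConfig(level=logging.INFO)
cfg = SimpleNamespace(data_dir="data", check_interval=3600, always_notify_on_startup=False,
                      email=None, webhook_url=None, discord_webhook=None)
crawler = TemplateCrawler(config=cfg, logger=logging.getLogger("template"))
html = '<html><body><a href="/picks/1">Example headline that is long enough</a></body></html>'
print(crawler.parse_items(html))   # -> one item with title, link, scraped_at, hash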