from __future__ import annotations import hashlib import os from datetime import datetime from typing import List, Dict, Optional import requests from bs4 import BeautifulSoup from app.crawlers.base import BaseCrawler class OpenInsiderTopCrawler(BaseCrawler): """Crawler for OpenInsider Top-of-the-day pages. Pages: - http://openinsider.com/top-insider-sales-of-the-day - http://openinsider.com/top-insider-purchases-of-the-day - http://openinsider.com/top-officer-purchases-of-the-day Filters rows where Value/Amount >= 1,000,000 (default, configurable via env var INSIDER_MIN_AMOUNT). """ DEFAULT_URLS = [ "http://openinsider.com/top-insider-sales-of-the-day", "http://openinsider.com/top-insider-purchases-of-the-day", "http://openinsider.com/top-officer-purchases-of-the-day", ] def __init__(self, config, logger, urls: Optional[List[str]] = None, min_amount: Optional[int] = None): super().__init__( name="OpenInsider 當日大額內部人交易", config=config, logger=logger, data_filename="openinsider_top.json", ) self.urls = urls or self.DEFAULT_URLS # Allow override via env var INSIDER_MIN_AMOUNT env_min = os.getenv('INSIDER_MIN_AMOUNT') self.min_amount = ( int(env_min) if (env_min and env_min.isdigit()) else (min_amount if min_amount is not None else 1_000_000) ) self.headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) ' 'AppleWebKit/537.36 (KHTML, like Gecko) ' 'Chrome/114.0 Safari/537.36' } # For multi-page crawler, fetch_page is not used; keep for interface def fetch_page(self) -> Optional[str]: return "" def _fetch_one(self, url: str) -> Optional[str]: try: resp = requests.get(url, headers=self.headers, timeout=30) resp.raise_for_status() return resp.text except requests.RequestException as e: self.logger.error(f"獲取 OpenInsider 頁面失敗: {e} ({url})") self.stats['errors'] += 1 return None @staticmethod def _parse_money(val: str) -> Optional[int]: if not val: return None s = val.strip() # Remove $ and commas and any parentheses for ch in ['$', ',', '(', ')', '+']: s = s.replace(ch, '') # Some cells may include text like "$1,234,567 (incl. options)" # Keep only leading numeric part num = '' for c in s: if c.isdigit(): num += c elif c in ' .': continue else: break if not num: return None try: return int(num) except ValueError: return None def parse_items_from_html(self, html_content: str, url: str) -> List[Dict]: soup = BeautifulSoup(html_content, 'html.parser') items: List[Dict] = [] # Find table with headers we care about tables = soup.find_all('table') target_table = None expected_any = {'value', 'amount', 'qty', 'ticker', 'transaction', 'trans type', 'trade date'} for tbl in tables: headers = [th.get_text(strip=True).lower() for th in tbl.find_all('th')] if not headers: continue hset = set(headers) if any(h in hset for h in expected_any): target_table = tbl break if not target_table: return items header_map = {} headers = [th.get_text(strip=True).lower() for th in target_table.find_all('th')] for idx, h in enumerate(headers): header_map[h] = idx def find_idx(possible): for k in possible: if k in header_map: return header_map[k] for k, v in header_map.items(): if any(p in k for p in possible): return v return None idx_ticker = find_idx(['ticker']) idx_insider = find_idx(['insider', 'insider name', 'name']) idx_type = find_idx(['trans type', 'transaction', 'type']) idx_qty = find_idx(['qty', 'quantity', 'shares']) idx_price = find_idx(['price']) idx_value = find_idx(['value', 'amount']) idx_trade_date = find_idx(['trade date', 'date']) rows = [r for r in target_table.find_all('tr') if r.find('td')] for row in rows: cols = row.find_all('td') def cell(i): if i is None or i >= len(cols): return '' return cols[i].get_text(strip=True) ticker = (cell(idx_ticker) or '').upper() insider = cell(idx_insider) trans_type = cell(idx_type) qty = cell(idx_qty) price = cell(idx_price) value_text = cell(idx_value) trade_date = cell(idx_trade_date) amount = self._parse_money(value_text) if amount is None or amount < self.min_amount: continue title = f"{ticker} {trans_type} - {insider} qty {qty} @ {price} value ${amount:,} on {trade_date}" hash_src = f"{ticker}|{insider}|{trans_type}|{qty}|{price}|{trade_date}|{amount}|{url}" items.append({ 'title': title, 'link': url, 'scraped_at': datetime.now().isoformat(), 'hash': hashlib.md5(hash_src.encode('utf-8')).hexdigest()[:12], }) return items def parse_items(self, html_content: str) -> List[Dict]: # Not used; we fetch multiple pages in run_check return [] # Override run_check to handle multiple pages and combine results def run_check(self) -> Optional[List[Dict]]: self.logger.info(f"開始檢查 {self.name} (閾值 ${self.min_amount:,}) ...") self.stats['total_checks'] += 1 self.stats['last_check'] = datetime.now().isoformat() try: combined: List[Dict] = [] for url in self.urls: html = self._fetch_one(url) if not html: continue items = self.parse_items_from_html(html, url) combined.extend(items) if not combined: self.logger.info("✅ 沒有符合金額門檻的交易") return [] prev = self._load_previous() new_items = self.find_new(combined, prev) if new_items: self.logger.info(f"🚨 發現 {len(new_items)} 筆新交易(>= ${self.min_amount:,})") self.stats['new_picks_found'] += len(new_items) self._send_notifications(new_items) self._save_current(combined) return new_items # Notify on first run if requested if (not self._first_check_done) and self.config.always_notify_on_startup and combined: self.logger.info("🟢 啟動首次檢查:無新內容,但依設定寄出目前清單") self._send_notifications(combined) self._save_current(combined) return combined self.logger.info("✅ 沒有發現新內容") return [] except Exception as e: self.logger.error(f"檢查過程錯誤: {e}") self.stats['errors'] += 1 return None def _load_previous(self) -> List[Dict]: from app.services import storage return storage.load_json(self.data_path).get('stock_picks', []) def _save_current(self, items: List[Dict]) -> None: from app.services import storage storage.save_json(self.data_path, { 'last_update': datetime.now().isoformat(), 'stock_picks': items, 'stats': self.stats, }) def _build_email(self, items: List[Dict]): subject = f"OpenInsider 當日大額內部人交易(≥${self.min_amount:,}) - {len(items)} 筆" lines = [] for it in items[:10]: lines.append(f"• {it.get('title','')}") body = ( f"發現 {len(items)} 筆符合金額門檻的內部人交易(OpenInsider):\n\n" + "\n".join(lines) + "\n\n" f"抓取時間:{datetime.now().isoformat()}\n來源:\n- " + "\n- ".join(self.urls) ) return subject, body