from __future__ import annotations

import hashlib
from datetime import datetime
from typing import List, Dict, Optional

import requests
from bs4 import BeautifulSoup

from app.crawlers.base import BaseCrawler
from app.services import notifications as notif


# Header texts (lower-cased, whitespace-normalized) that identify the results
# table among all <table> elements on the page.
_EXPECTED_HEADERS = frozenset({
    'insider', 'insider name', 'ticker', 'trans type',
    'transaction', 'trade date', 'filing date',
})

# Upper bound on the number of data rows turned into items per parse.
_MAX_ROWS = 100


def _normalize_header(th) -> str:
    """Return the text of a header cell, lower-cased with whitespace collapsed.

    ``str.split()`` without arguments splits on any Unicode whitespace, so a
    non-breaking space (``\\xa0``) inside a header such as "Trade\\xa0Date" is
    turned into a plain space and the header compares equal to 'trade date'.
    NOTE(review): OpenInsider headers are believed to use ``&nbsp;`` -- confirm
    against the live markup.
    """
    return ' '.join(th.get_text(strip=True).split()).lower()


def _find_column_index(header_map: Dict[str, int],
                       candidates: List[str]) -> Optional[int]:
    """Resolve a column index from candidate header names in priority order.

    An exact match on any candidate is preferred. Otherwise a substring match
    is tried, candidate by candidate, so that a specific name ('trade date')
    always beats a generic fallback ('date') regardless of the order in which
    the headers appear in the table.

    Returns None when no header matches any candidate.
    """
    for candidate in candidates:
        if candidate in header_map:
            return header_map[candidate]
    for candidate in candidates:
        for header, idx in header_map.items():
            if candidate in header:
                return idx
    return None


def _cell_text(cols, idx: Optional[int]) -> str:
    """Return the stripped text of ``cols[idx]``, or '' if the index is unusable."""
    if idx is None or idx >= len(cols):
        return ''
    return cols[idx].get_text(strip=True)


class OpenInsiderCrawler(BaseCrawler):
    """Crawler for OpenInsider search results.

    Source: http://openinsider.com/search?q={symbol}
    Parses the HTML table and emits insider transactions.
    """

    def __init__(self, config, logger, symbol: str = "PLTR"):
        """Set up the crawler for one ticker symbol.

        The display name and data filename passed to the base class use
        ``symbol`` exactly as given; ``self.symbol`` and the query URL use the
        upper-cased form.
        """
        super().__init__(
            name=f"OpenInsider 內部人交易:{symbol}",
            config=config,
            logger=logger,
            data_filename=f"openinsider_{symbol}.json",
        )
        self.symbol = symbol.upper()
        self.url = f"http://openinsider.com/search?q={self.symbol}"
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                          'AppleWebKit/537.36 (KHTML, like Gecko) '
                          'Chrome/114.0 Safari/537.36'
        }

    def fetch_page(self) -> Optional[str]:
        """Download the search page and return its HTML.

        Returns None (after logging and incrementing ``self.stats['errors']``)
        when the request fails or the server answers with an error status.
        """
        try:
            resp = requests.get(self.url, headers=self.headers, timeout=30)
            resp.raise_for_status()
            return resp.text
        except requests.RequestException as e:
            self.logger.error(f"獲取 OpenInsider 頁面失敗: {e}")
            self.stats['errors'] += 1
            return None

    def parse_items(self, html_content: str) -> List[Dict]:
        """Parse the results table into a list of transaction items.

        Each item is a dict with the keys ``title``, ``link``, ``scraped_at``
        and ``hash`` (first 12 hex digits of an MD5 over the transaction
        fields, used as an identity key, not for security).

        Returns an empty list when the page contains no table at all.
        """
        soup = BeautifulSoup(html_content, 'html.parser')

        # Find the main results table by looking for expected headers.
        candidate_tables = soup.find_all('table')
        self.logger.info(f"OpenInsider:發現 {len(candidate_tables)} 個 ")

        best_table = None
        for tbl in candidate_tables:
            headers = {_normalize_header(th) for th in tbl.find_all('th')}
            if headers & _EXPECTED_HEADERS:
                best_table = tbl
                break

        # No table with recognizable headers: fall back to the first table.
        if best_table is None and candidate_tables:
            best_table = candidate_tables[0]
        if best_table is None:
            self.logger.warning("OpenInsider:找不到結果表格")
            return []

        # Map header text -> column position. The first occurrence wins so a
        # repeated header later in the table cannot push an index past the
        # end of a data row.
        header_map: Dict[str, int] = {}
        for idx, th in enumerate(best_table.find_all('th')):
            header_map.setdefault(_normalize_header(th), idx)

        idx_insider = _find_column_index(header_map, ['insider name', 'insider', 'name'])
        idx_type = _find_column_index(header_map, ['trans type', 'transaction', 'type'])
        idx_qty = _find_column_index(header_map, ['qty', 'quantity', 'shares'])
        idx_price = _find_column_index(header_map, ['price'])
        idx_ticker = _find_column_index(header_map, ['ticker'])
        idx_trade_date = _find_column_index(header_map, ['trade date', 'date'])
        idx_filing_date = _find_column_index(header_map, ['filing date', 'filed'])

        # Skip header rows: only rows that contain at least one <td> are data.
        data_rows = [r for r in best_table.find_all('tr') if r.find('td') is not None]

        items: List[Dict] = []
        for row in data_rows[:_MAX_ROWS]:
            cols = row.find_all('td')

            insider = _cell_text(cols, idx_insider) or 'Unknown Insider'
            trans_type = _cell_text(cols, idx_type) or 'N/A'
            qty = _cell_text(cols, idx_qty) or 'N/A'
            price = _cell_text(cols, idx_price) or 'N/A'
            ticker = _cell_text(cols, idx_ticker).upper()
            trade_date = _cell_text(cols, idx_trade_date)
            filing_date = _cell_text(cols, idx_filing_date)

            # Keep results aligned to the symbol query; rows without a ticker
            # cell are kept.
            if ticker and self.symbol not in ticker:
                continue

            title = f"{self.symbol} {trans_type} - {insider} qty {qty} @ {price} on {trade_date}"
            if filing_date:
                title += f" (filed {filing_date})"

            hash_src = f"{self.symbol}|{insider}|{trans_type}|{qty}|{price}|{trade_date}|{filing_date}"
            items.append({
                'title': title,
                'link': self.url,
                'scraped_at': datetime.now().isoformat(),
                'hash': hashlib.md5(hash_src.encode('utf-8')).hexdigest()[:12],
            })

        self.logger.info(f"OpenInsider:解析完成,擷取 {len(items)} 筆交易")
        return items

    def _send_notifications(self, items: List[Dict]) -> None:
        """Notify every configured channel about the given items.

        At most the first 10 item titles are listed in the message. Each
        channel (email, generic webhook, Discord) is attempted independently;
        a failure in one is logged and does not prevent the others. When at
        least one channel succeeds, ``self.stats['last_notification']`` is set
        to the current timestamp.
        """
        subject = f"OpenInsider 內部人交易異動 - {self.symbol} ({len(items)}筆)"
        lines = [f"• {it['title']}" for it in items[:10]]
        body = (
            f"發現 {len(items)} 筆新的內部人交易異動(OpenInsider):\n\n"
            + "\n".join(lines)
            + "\n\n"
            f"抓取時間:{datetime.now().isoformat()}\n來源:{self.url}"
        )

        sent = False

        if self.config.email:
            try:
                notif.send_custom_email(subject, body, self.config.email)
                sent = True
            except Exception as e:
                # Best effort: a failing channel must not block the others.
                self.logger.error(f"電子郵件通知失敗: {e}")

        if self.config.webhook_url:
            try:
                notif.send_text_webhook(subject + "\n\n" + body, self.config.webhook_url)
                sent = True
            except Exception as e:
                self.logger.error(f"Webhook 通知失敗: {e}")

        if self.config.discord_webhook:
            try:
                notif.send_text_discord(
                    title=subject,
                    description=f"{self.symbol} 內部人交易更新(OpenInsider)",
                    lines=lines[:10],
                    webhook=self.config.discord_webhook,
                )
                sent = True
            except Exception as e:
                self.logger.error(f"Discord 通知失敗: {e}")

        if sent:
            self.stats['last_notification'] = datetime.now().isoformat()