from __future__ import annotations

import hashlib
from datetime import datetime
from typing import List, Dict, Optional

import requests
from bs4 import BeautifulSoup

from app.crawlers.base import BaseCrawler


class OpenInsiderCrawler(BaseCrawler):
    """Crawler for OpenInsider search results.

    Source: http://openinsider.com/search?q={symbol}
    Parses the HTML results table and emits one item per insider transaction.
    """

    def __init__(self, config, logger, symbol: str = "PLTR"):
        super().__init__(
            name=f"OpenInsider insider trades: {symbol}",
            config=config,
            logger=logger,
            data_filename=f"openinsider_{symbol}.json",
        )
        self.symbol = symbol.upper()
        self.url = f"http://openinsider.com/search?q={self.symbol}"
        # Browser-like User-Agent; some sites reject requests without one.
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                          'AppleWebKit/537.36 (KHTML, like Gecko) '
                          'Chrome/114.0 Safari/537.36'
        }

    def fetch_page(self) -> Optional[str]:
        try:
            resp = requests.get(self.url, headers=self.headers, timeout=30)
            resp.raise_for_status()
            return resp.text
        except requests.RequestException as e:
            self.logger.error(f"Failed to fetch OpenInsider page: {e}")
            self.stats['errors'] += 1
            return None

    def parse_items(self, html_content: str) -> List[Dict]:
        soup = BeautifulSoup(html_content, 'html.parser')

        # Find the main results table by looking for expected headers.
        best_table = None
        candidate_tables = soup.find_all('table')
        self.logger.info(f"OpenInsider: found {len(candidate_tables)} <table> elements")
        expected_headers = {
            'insider', 'insider name', 'ticker', 'trans type',
            'transaction', 'trade date', 'filing date',
        }
        for tbl in candidate_tables:
            headers = [th.get_text(strip=True).lower() for th in tbl.find_all('th')]
            if not headers:
                continue
            hset = set(headers)
            if any(h in hset for h in expected_headers):
                best_table = tbl
                break
        # Fall back to the first table if no header matched.
        if not best_table and candidate_tables:
            best_table = candidate_tables[0]

        if not best_table:
            self.logger.warning("OpenInsider: results table not found")
            return []

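        # Typical openinsider.com result-table headers (layout may change):
        #   X | Filing Date | Trade Date | Ticker | Insider Name | Title |
        #   Trade Type | Price | Qty | Owned | ΔOwn | Value
        # Column order is not guaranteed, hence the index map below.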
        # Build a header -> column-index map (robust match).
        header_map: Dict[str, int] = {}
        header_texts = [th.get_text(strip=True).lower() for th in best_table.find_all('th')]
        for idx, text in enumerate(header_texts):
            header_map[text] = idx

        def find_idx(possible: List[str]) -> Optional[int]:
            for key in possible:
                if key in header_map:
                    return header_map[key]
            # Fuzzy fallback: substring containment.
            for k, v in header_map.items():
                if any(p in k for p in possible):
                    return v
            return None

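        # Exact keys win: find_idx(['trade date', 'date']) returns the
        # 'trade date' column if present, else the first header containing 'date'.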
        idx_insider = find_idx(['insider name', 'insider', 'name'])
        idx_type = find_idx(['trans type', 'transaction', 'type'])
        idx_qty = find_idx(['qty', 'quantity', 'shares'])
        idx_price = find_idx(['price'])
        idx_ticker = find_idx(['ticker'])
        idx_trade_date = find_idx(['trade date', 'date'])
        idx_filing_date = find_idx(['filing date', 'filed'])

        rows = best_table.find_all('tr')
        # Keep only data rows; header rows contain <th>, data rows contain <td>.
        data_rows = [r for r in rows if r.find('td')]

        items: List[Dict] = []
        for row in data_rows[:100]:  # cap at the first 100 rows per run
            cols = row.find_all('td')

            def col_text(i: Optional[int]) -> str:
                if i is None or i >= len(cols):
                    return ''
                return cols[i].get_text(strip=True)

            insider = col_text(idx_insider) or 'Unknown Insider'
            trans_type = col_text(idx_type) or 'N/A'
            qty = col_text(idx_qty) or 'N/A'
            price = col_text(idx_price) or 'N/A'
            ticker = (col_text(idx_ticker) or '').upper()
            trade_date = col_text(idx_trade_date)
            filing_date = col_text(idx_filing_date)

            # Keep results aligned to the queried symbol; the search page
            # can also return rows for other tickers.
            if ticker and self.symbol not in ticker:
                continue

            title = f"{self.symbol} {trans_type} - {insider} qty {qty} @ {price} on {trade_date}"
            if filing_date:
                title += f" (filed {filing_date})"
            # Content hash gives each transaction a stable short identifier.
            hash_src = f"{self.symbol}|{insider}|{trans_type}|{qty}|{price}|{trade_date}|{filing_date}"
            items.append({
                'title': title,
                'link': self.url,
                'scraped_at': datetime.now().isoformat(),
                'hash': hashlib.md5(hash_src.encode('utf-8')).hexdigest()[:12],
            })

        self.logger.info(f"OpenInsider: parsing done, extracted {len(items)} transactions")
        return items

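    # Each emitted item carries: 'title' (human-readable summary), 'link'
    # (the search URL), 'scraped_at' (ISO timestamp of the scrape), and
    # 'hash' (12-char key derived from the transaction fields).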
    # Use BaseCrawler._send_notifications for the unified notification flow.

    def _build_email(self, items: List[Dict]):
        subject = f"OpenInsider insider-trade updates - {self.symbol} ({len(items)} items)"
        lines = []
        for it in items[:10]:  # cap the email body at 10 entries
            lines.append(f"• {it.get('title', '')}")
        body = (
            f"Found {len(items)} new insider transactions (OpenInsider):\n\n"
            + "\n".join(lines)
            + f"\n\nScraped at: {datetime.now().isoformat()}\nSource: {self.url}"
        )
        return subject, body
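

# --- Usage sketch (illustrative) ---------------------------------------------
# A minimal way to run this crawler by hand. The constructor arguments depend
# on the project's real config/logger objects; the SimpleNamespace config and
# stdlib logger below are placeholder assumptions, not BaseCrawler's actual API.
if __name__ == "__main__":
    import logging
    from types import SimpleNamespace

    logging.basicConfig(level=logging.INFO)
    crawler = OpenInsiderCrawler(
        config=SimpleNamespace(),  # placeholder; substitute your real config
        logger=logging.getLogger("openinsider"),
        symbol="PLTR",
    )
    html = crawler.fetch_page()
    if html:
        for item in crawler.parse_items(html)[:5]:
            print(item['title'])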