stock-info-crawler/app/crawlers/openinsider_top.py
MH Hung e015eef61e fix(openinsider_top): treat negative Value on sales as absolute for threshold parsing
Handle strings like '-$1,234,567' by normalizing and parsing the magnitude; ensures sales rows are included when they exceed INSIDER_MIN_AMOUNT.
2025-09-09 21:26:52 +08:00


from __future__ import annotations

import hashlib
import os
from datetime import datetime
from typing import List, Dict, Optional

import requests
from bs4 import BeautifulSoup

from app.crawlers.base import BaseCrawler
class OpenInsiderTopCrawler(BaseCrawler):
    """Crawler for OpenInsider top-of-the-day pages.

    Pages:
    - http://openinsider.com/top-insider-sales-of-the-day
    - http://openinsider.com/top-insider-purchases-of-the-day
    - http://openinsider.com/top-officer-purchases-of-the-day

    Filters rows where Value/Amount >= 1,000,000 (default, configurable via
    env var INSIDER_MIN_AMOUNT).
    """

    DEFAULT_URLS = [
        "http://openinsider.com/top-insider-sales-of-the-day",
        "http://openinsider.com/top-insider-purchases-of-the-day",
        "http://openinsider.com/top-officer-purchases-of-the-day",
    ]
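
    # Typical usage (sketch; `config` and `logger` are supplied by the host app,
    # and both keyword arguments are optional overrides):
    #   crawler = OpenInsiderTopCrawler(config, logger, min_amount=2_000_000)
    #   new_items = crawler.run_check()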

    def __init__(self, config, logger, urls: Optional[List[str]] = None, min_amount: Optional[int] = None):
        super().__init__(
            name="OpenInsider 當日大額內部人交易",
            config=config,
            logger=logger,
            data_filename="openinsider_top.json",
        )
        self.urls = urls or self.DEFAULT_URLS
        # Allow override via env var INSIDER_MIN_AMOUNT
        env_min = os.getenv('INSIDER_MIN_AMOUNT')
        self.min_amount = (
            int(env_min) if (env_min and env_min.isdigit()) else (min_amount if min_amount is not None else 1_000_000)
        )
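        # Resulting precedence: a digits-only INSIDER_MIN_AMOUNT value wins, then an
        # explicit min_amount argument, then the 1,000,000 default (for example,
        # INSIDER_MIN_AMOUNT=500000 would lower the cutoff to $500k).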
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                          'AppleWebKit/537.36 (KHTML, like Gecko) '
                          'Chrome/114.0 Safari/537.36'
        }

    # fetch_page is not used by this multi-page crawler; kept to satisfy the base interface.
    def fetch_page(self) -> Optional[str]:
        return ""

    def _fetch_one(self, url: str) -> Optional[str]:
        try:
            resp = requests.get(url, headers=self.headers, timeout=30)
            resp.raise_for_status()
            return resp.text
        except requests.RequestException as e:
            self.logger.error(f"獲取 OpenInsider 頁面失敗: {e} ({url})")
            self.stats['errors'] += 1
            return None

    @staticmethod
    def _parse_money(val: str) -> Optional[int]:
        """Parse money text into absolute integer dollars.

        Handles formats like:
        - "$1,234,567"
        - "($1,234,567)" (treat as negative but return magnitude)
        - "-$1,234,567" (treat as negative but return magnitude)
        - "1,234,567"

        Returns None if no digits found.
        """
        if not val:
            return None
        s = val.strip()
        # Detect negative indicators before stripping
        is_negative = s.startswith('-') or '(' in s
        # Normalize: remove currency symbols, commas, parentheses, plus/minus, spaces
        for ch in ['$', ',', '(', ')', '+', '-', ' ']:
            s = s.replace(ch, '')
        # Keep only leading digits
        num = ''
        for c in s:
            if c.isdigit():
                num += c
            elif c == '.':
                # Stop at the decimal point; values are treated as whole dollars,
                # so cents are truncated rather than appended to the integer part.
                break
            else:
                break
        if not num:
            return None
        try:
            value = int(num)
            # We return absolute magnitude; sign is not needed for threshold
            return abs(value)
        except ValueError:
            return None

    def parse_items_from_html(self, html_content: str, url: str) -> List[Dict]:
        soup = BeautifulSoup(html_content, 'html.parser')
        items: List[Dict] = []
        # Find table with headers we care about
        tables = soup.find_all('table')
        target_table = None
        expected_any = {'value', 'amount', 'qty', 'ticker', 'transaction', 'trans type', 'trade date'}
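        # Match loosely: any one of the expected header names is enough to treat a
        # table as the listings table, so minor column changes do not break detection.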
        for tbl in tables:
            headers = [th.get_text(strip=True).lower() for th in tbl.find_all('th')]
            if not headers:
                continue
            hset = set(headers)
            if any(h in hset for h in expected_any):
                target_table = tbl
                break
        if not target_table:
            return items
        header_map = {}
        headers = [th.get_text(strip=True).lower() for th in target_table.find_all('th')]
        for idx, h in enumerate(headers):
            header_map[h] = idx
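        # find_idx resolves a column index by exact header name first, then falls back
        # to substring matching (so a header such as "value ($)" would still map to "value").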
        def find_idx(possible):
            for k in possible:
                if k in header_map:
                    return header_map[k]
            for k, v in header_map.items():
                if any(p in k for p in possible):
                    return v
            return None
        idx_ticker = find_idx(['ticker'])
        idx_insider = find_idx(['insider', 'insider name', 'name'])
        idx_type = find_idx(['trans type', 'transaction', 'type'])
        idx_qty = find_idx(['qty', 'quantity', 'shares'])
        idx_price = find_idx(['price'])
        idx_value = find_idx(['value', 'amount'])
        idx_trade_date = find_idx(['trade date', 'date'])
        rows = [r for r in target_table.find_all('tr') if r.find('td')]
        for row in rows:
            cols = row.find_all('td')

            def cell(i):
                if i is None or i >= len(cols):
                    return ''
                return cols[i].get_text(strip=True)

            ticker = (cell(idx_ticker) or '').upper()
            insider = cell(idx_insider)
            trans_type = cell(idx_type)
            qty = cell(idx_qty)
            price = cell(idx_price)
            value_text = cell(idx_value)
            trade_date = cell(idx_trade_date)
            amount = self._parse_money(value_text)
            if amount is None or amount < self.min_amount:
                continue
            title = f"{ticker} {trans_type} - {insider} qty {qty} @ {price} value ${amount:,} on {trade_date}"
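            # Dedup key: 12 hex chars of an MD5 over the row's identifying fields plus
            # the source URL, intended to recognize the same trade across runs.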
            hash_src = f"{ticker}|{insider}|{trans_type}|{qty}|{price}|{trade_date}|{amount}|{url}"
            items.append({
                'title': title,
                'link': url,
                'scraped_at': datetime.now().isoformat(),
                'hash': hashlib.md5(hash_src.encode('utf-8')).hexdigest()[:12],
            })
        return items

    def parse_items(self, html_content: str) -> List[Dict]:
        # Not used; we fetch multiple pages in run_check
        return []

    # Override run_check to handle multiple pages and combine results
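    # Flow: fetch each configured URL, parse and threshold-filter its rows, diff the
    # combined list against the previously saved snapshot, then notify and persist.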
    def run_check(self) -> Optional[List[Dict]]:
        self.logger.info(f"開始檢查 {self.name} (閾值 ${self.min_amount:,}) ...")
        self.stats['total_checks'] += 1
        self.stats['last_check'] = datetime.now().isoformat()
        try:
            combined: List[Dict] = []
            for url in self.urls:
                html = self._fetch_one(url)
                if not html:
                    continue
                items = self.parse_items_from_html(html, url)
                combined.extend(items)
            if not combined:
                self.logger.info("✅ 沒有符合金額門檻的交易")
                return []
            prev = self._load_previous()
            new_items = self.find_new(combined, prev)
            if new_items:
                self.logger.info(f"🚨 發現 {len(new_items)} 筆新交易(>= ${self.min_amount:,})")
                self.stats['new_picks_found'] += len(new_items)
                self._send_notifications(new_items)
                self._save_current(combined)
                return new_items
            # Notify on first run if requested
            if (not self._first_check_done) and self.config.always_notify_on_startup and combined:
                self.logger.info("🟢 啟動首次檢查:無新內容,但依設定寄出目前清單")
                self._send_notifications(combined)
                self._save_current(combined)
                return combined
            self.logger.info("✅ 沒有發現新內容")
            return []
        except Exception as e:
            self.logger.error(f"檢查過程錯誤: {e}")
            self.stats['errors'] += 1
            return None
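
    # Persistence helpers: snapshots are stored via app.services.storage as JSON with
    # 'last_update', 'stock_picks', and 'stats' keys; 'stock_picks' feeds the next diff.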
    def _load_previous(self) -> List[Dict]:
        from app.services import storage
        return storage.load_json(self.data_path).get('stock_picks', [])

    def _save_current(self, items: List[Dict]) -> None:
        from app.services import storage
        storage.save_json(self.data_path, {
            'last_update': datetime.now().isoformat(),
            'stock_picks': items,
            'stats': self.stats,
        })

    def _build_email(self, items: List[Dict]):
        subject = f"OpenInsider 當日大額內部人交易(≥${self.min_amount:,}) - {len(items)}"
        lines = []
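        # Only the first 10 items are listed in the email body; the subject carries the total count.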
        for it in items[:10]:
            lines.append(f"{it.get('title','')}")
        body = (
            f"發現 {len(items)} 筆符合金額門檻的內部人交易(OpenInsider)\n\n" + "\n".join(lines) + "\n\n"
            f"抓取時間:{datetime.now().isoformat()}\n來源:\n- " + "\n- ".join(self.urls)
        )
        return subject, body