
Commit

first commit for V0.3.22
bigbrother666sh committed Dec 5, 2024
1 parent 6125154 commit f18d9ba
Showing 10 changed files with 640 additions and 0 deletions.
30 changes: 30 additions & 0 deletions core/custom_process/README.md
@@ -0,0 +1,30 @@
wiseflow aims to handle all pages through one general-purpose pipeline (an agent driven by a visual large language model that can use crawler tools on its own).

However, we also keep the flexibility for users to plug in their own custom processing.

To add a custom processing unit, please follow these conventions:

1. The processing unit should be a function (not a class);

2. It takes exactly two arguments: url (the URL to process; pass a single URL, not a list, because the main function handles the queue logic) and a logger object (which means you should not create a separate logger for your custom unit);

3. It must return the parsed article details (dict), the list of extracted infos (list), and the set of URLs to be added to the work queue, together as a tuple. All three results must always be returned; if one of them is empty, return {}, [], or set() respectively (a minimal skeleton is sketched at the end of this README).

The article dict must contain the four keys 'url' (str), 'title' (str), 'author' (str), and 'publish_date' (a date object; note that it is a date, not a datetime). Optionally, you may add a 'screenshot' key whose value is the path to a PNG file.

4. Register it in core/custom_process/__init__.py, for example:

```python
from .mp_process import mp_crawler
customer_crawler_map = {'mp.weixin.qq.com': mp_crawler}
```

Note that the keys are domain names, which you can obtain with urllib.parse:

```python
from urllib.parse import urlparse
parsed_url = urlparse("site's url")
domain = parsed_url.netloc
```
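
As a rough sketch (not part of the shipped code), a minimal custom unit that satisfies the contract above might look like the following; the function name my_site_process and the example.com check are placeholders:

```python
from datetime import date


async def my_site_process(url: str, logger) -> tuple[dict, list, set]:
    # hypothetical skeleton: replace the check and the parsing with whatever your site needs
    if not url.startswith('https://example.com'):
        logger.warning(f'{url} is not an example.com url, skipping')
        return {}, [], set()

    # ... fetch and parse the page here ...
    article = {
        'url': url,
        'title': 'parsed title',
        'author': 'parsed author',
        'publish_date': date.today(),
        # optional: 'screenshot': '/path/to/page.png',
    }
    infos = []        # extracted info items, if any
    new_urls = set()  # newly discovered urls for the work queue
    return article, infos, new_urls
```

Such a function would then be registered under its domain in customer_crawler_map, exactly like the mp_crawler example above.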
4 changes: 4 additions & 0 deletions core/custom_process/__init__.py
@@ -0,0 +1,4 @@
from .mp_process import mp_crawler

customer_crawler_map = {}
# customer_crawler_map = {'mp.weixin.qq.com': mp_crawler}
109 changes: 109 additions & 0 deletions core/custom_process/mp_process.py
@@ -0,0 +1,109 @@
# -*- coding: utf-8 -*-
# warning: mp_crawler will be deprecated in a future version; we are trying to use general_process to handle mp articles as well

from core.agents import get_info
import httpx
from bs4 import BeautifulSoup
from datetime import datetime, date
import re
import asyncio


header = {
    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/605.1.15 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/604.1 Edg/112.0.100.0'}


async def mp_crawler(url: str, logger) -> tuple[dict, list, set]:
    if not url.startswith('https://mp.weixin.qq.com') and not url.startswith('http://mp.weixin.qq.com'):
        logger.warning(f'{url} is not an mp url, you should not use this function')
        return {}, [], set()

    url = url.replace("http://", "https://", 1)

    async with httpx.AsyncClient() as client:
        for retry in range(2):
            try:
                response = await client.get(url, headers=header, timeout=30)
                response.raise_for_status()
                break
            except Exception as e:
                if retry < 1:
                    logger.info(f"{e}\nwaiting 1min")
                    await asyncio.sleep(60)
                else:
                    logger.warning(e)
                    return {}, [], set()

    soup = BeautifulSoup(response.text, 'html.parser')

    if url.startswith('https://mp.weixin.qq.com/mp/appmsgalbum'):
        # album page (a directory of articles): only collect the article links
        urls = {li.attrs['data-link'].replace("http://", "https://", 1) for li in soup.find_all('li', class_='album__list-item')}
        simple_urls = set()
        for url in urls:
            cut_off_point = url.find('chksm=')
            if cut_off_point != -1:
                url = url[:cut_off_point - 1]
            simple_urls.add(url)
        return {}, [], simple_urls

    # Get the original release date first
    pattern = r"var createTime = '(\d{4}-\d{2}-\d{2}) \d{2}:\d{2}'"
    match = re.search(pattern, response.text)

    if match:
        date_only = match.group(1)
        # keep publish_date as a date object, consistent with the fallback below
        publish_time = datetime.strptime(date_only, "%Y-%m-%d").date()
    else:
        publish_time = date.today()

    # Get the description content from the <meta> tag
    try:
        meta_description = soup.find('meta', attrs={'name': 'description'})
        summary = meta_description['content'].strip() if meta_description else ''
        # card_info = soup.find('div', id='img-content')
        # Parse the required content from the <div> tag
        rich_media_title = soup.find('h1', id='activity-name').text.strip() \
            if soup.find('h1', id='activity-name') \
            else soup.find('h1', class_='rich_media_title').text.strip()
        profile_nickname = soup.find('div', class_='wx_follow_nickname').text.strip()
    except Exception as e:
        logger.warning(f"not mp format: {url}\n{e}")
        # For mp.weixin.qq.com types, mp_crawler won't work, and most likely neither will the other two
        return {}, [], set()

    if not rich_media_title or not profile_nickname:
        logger.warning(f"failed to analyze {url}, no title or profile_nickname")
        return {}, [], set()

    # Parse text and image links within the content area
    # Todo: this scheme is compatible with picture-sharing mp articles, but the pictures in the content cannot be obtained,
    # because the structure of that part is completely different and would need a separate parsing scheme
    # (the proportion of this type of article is not high, though).
    texts = []
    content_area = soup.find('div', id='js_content')
    if content_area:
        # extract text from the top-level sections and paragraphs
        for section in content_area.find_all(['section', 'p'], recursive=False):
            text = section.get_text(separator=' ', strip=True)
            if text and text not in texts:
                texts.append(text)
        cleaned_texts = [t for t in texts if t.strip()]
        content = '\n'.join(cleaned_texts)
    else:
        logger.warning(f"failed to analyze contents of {url}")
        return {}, [], set()
    if content:
        content = f"[from {profile_nickname}]{content}"
    else:
        # If the content is empty but the summary is not, this is a picture-sharing type of mp article;
        # in that case, use the summary as the content.
        content = f"[from {profile_nickname}]{summary}"

    infos = get_info(content, logger)
    article = {'url': url,
               'title': rich_media_title,
               'author': profile_nickname,
               'publish_date': publish_time}

    return article, infos, set()
155 changes: 155 additions & 0 deletions core/general_process.py
@@ -0,0 +1,155 @@
# -*- coding: utf-8 -*-
from utils.pb_api import PbTalker
from utils.general_utils import get_logger
from agents.get_info import GeneralInfoExtractor
from bs4 import BeautifulSoup
import os
import json
import asyncio
from custom_process import customer_crawler_map
from urllib.parse import urlparse, urljoin
import hashlib
from crawlee.playwright_crawler import PlaywrightCrawler, PlaywrightCrawlingContext


project_dir = os.environ.get("PROJECT_DIR", "")
if project_dir:
    os.makedirs(project_dir, exist_ok=True)
os.environ['CRAWLEE_STORAGE_DIR'] = os.path.join(project_dir, 'crawlee_storage')
screenshot_dir = os.path.join(project_dir, 'crawlee_storage', 'screenshots')

wiseflow_logger = get_logger('general_process', f'{project_dir}/general_process.log')
pb = PbTalker(wiseflow_logger)
ie = GeneralInfoExtractor(pb, wiseflow_logger)

# Global variables
working_list = set()
existing_urls = {url['url'] for url in pb.read(collection_name='articles', fields=['url']) if url['url']}
lock = asyncio.Lock()


async def save_to_pb(article: dict, infos: list):
    # saving to pb process
    screenshot = article.pop('screenshot') if 'screenshot' in article else None
    article_id = pb.add(collection_name='articles', body=article)
    if not article_id:
        wiseflow_logger.error('add article failed, writing to cache_file')
        with open(os.path.join(project_dir, 'cache_articles.json'), 'a', encoding='utf-8') as f:
            json.dump(article, f, ensure_ascii=False, indent=4)
        return
    if screenshot:
        file = open(screenshot, 'rb')
        file_name = os.path.basename(screenshot)
        message = pb.upload('articles', article_id, 'screenshot', file_name, file)
        file.close()
        if not message:
            wiseflow_logger.warning(f'{article_id} upload screenshot failed, file location: {screenshot}')

    for info in infos:
        info['articles'] = [article_id]
        _ = pb.add(collection_name='agents', body=info)
        if not _:
            wiseflow_logger.error('add insight failed, writing to cache_file')
            with open(os.path.join(project_dir, 'cache_insights.json'), 'a', encoding='utf-8') as f:
                json.dump(info, f, ensure_ascii=False, indent=4)


async def pipeline(url: str):
    global working_list, existing_urls
    working_list.add(url)
    crawler = PlaywrightCrawler(
        # Limit the crawl to max requests. Remove or increase it for crawling all links.
        max_requests_per_crawl=100,
    )

    # Define the default request handler, which will be called for every request.
    @crawler.router.default_handler
    async def request_handler(context: PlaywrightCrawlingContext) -> None:
        context.log.info(f'Processing {context.request.url} ...')

        # Handle dialogs (alerts, confirms, prompts)
        async def handle_dialog(dialog):
            context.log.info(f'Closing dialog: {dialog.message}')
            await dialog.accept()

        context.page.on('dialog', handle_dialog)

        # Extract data from the page.
        # future work: try to use a visual-llm to do the whole job...
        text = await context.page.inner_text('body')
        wiseflow_logger.debug(f"got text: {text}")

        html = await context.page.inner_html('body')
        soup = BeautifulSoup(html, 'html.parser')
        links = soup.find_all('a', href=True)
        base_url = context.request.url
        link_dict = {}
        for a in links:
            new_url = a.get('href')
            text = a.text.strip()
            if new_url and text:
                absolute_url = urljoin(base_url, new_url)
                link_dict[text] = absolute_url
        wiseflow_logger.debug(f'found {len(link_dict)} more links')

        screenshot_file_name = f"{hashlib.sha256(context.request.url.encode()).hexdigest()}.png"
        await context.page.screenshot(path=os.path.join(screenshot_dir, screenshot_file_name), full_page=True)
        wiseflow_logger.debug(f'screenshot saved to {screenshot_file_name}')

        # get infos by llm
        infos, author, publish_date, related_urls = await ie(text, link_dict, base_url, wiseflow_logger)
        if infos:
            # get author and publish date by llm
            wiseflow_logger.debug(f'LLM result -- author: {author}, publish_date: {publish_date}')
            article = {
                'url': context.request.url,
                'title': await context.page.title(),
                'author': author,
                'publish_date': publish_date,
                'screenshot': os.path.join(screenshot_dir, screenshot_file_name),
                'tags': [info['name'] for info in infos]
            }
            await save_to_pb(article, infos)

        # related urls are already returned by the info extractor above
        wiseflow_logger.debug(f'got {len(related_urls)} more urls')
        if related_urls:
            async with lock:
                new_urls = related_urls - existing_urls
                working_list.update(new_urls)

        # todo: use llm to determine next action

    while working_list:
        async with lock:
            if not working_list:
                break
            url = working_list.pop()
            existing_urls.add(url)
        parsed_url = urlparse(url)
        domain = parsed_url.netloc
        if domain in customer_crawler_map:
            wiseflow_logger.debug(f'routed to customer process for {domain}')
            try:
                article, infos, related_urls = await customer_crawler_map[domain](url, wiseflow_logger)
            except Exception as e:
                wiseflow_logger.error(f'error occurred in crawling {url}: {e}')
                continue

            if infos and article:
                wiseflow_logger.debug("receiving new infos from customer crawler, saving to pb")
                article['tags'] = [info['name'] for info in infos]
                await save_to_pb(article, infos)
            if related_urls:
                wiseflow_logger.debug('receiving new related_urls from customer crawler, adding to working_list')
                new_urls = related_urls - existing_urls
                working_list.update(new_urls)
            continue
        try:
            await crawler.run([url])
        except Exception as e:
            wiseflow_logger.error(f'error occurred in crawling {url}: {e}')


if __name__ == '__main__':
    import sys

    # pipeline() requires a starting url; take it from the command line
    asyncio.run(pipeline(sys.argv[1]))