-
Notifications
You must be signed in to change notification settings - Fork 47
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
13 changed files
with
257 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
__pycache__/ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
import base64 | ||
import pickle | ||
import os | ||
from io import BytesIO | ||
from os import path | ||
|
||
import aiohttp | ||
from PIL import Image | ||
|
||
from .._res import Res as R | ||
from hoshino.service import Service | ||
from hoshino.typing import HoshinoBot, CQEvent | ||
from hoshino.util import DailyNumberLimiter, FreqLimiter | ||
from .._util import extract_url_from_event | ||
from .config import * | ||
from .data_source import detect_face, concat, KyaruHead, auto_head, gen_head | ||
|
||
conf_path = path.join(path.dirname(__file__), 'user_conf') | ||
sv = Service('接头霸王') | ||
_nlt = DailyNumberLimiter(DAILY_MAX_NUM) | ||
_flt = FreqLimiter(30) | ||
|
||
try: | ||
with open(conf_path, 'rb') as f: | ||
user_conf_dic = pickle.load(f) | ||
except FileNotFoundError: | ||
user_conf_dic = {} | ||
|
||
@sv.on_prefix(('接头霸王', '接头')) | ||
async def concat_head(bot: HoshinoBot, ev: CQEvent): | ||
uid = ev.user_id | ||
if not _nlt.check(uid): | ||
await bot.finish(ev, '今日已经到达上限!') | ||
|
||
if not _flt.check(uid): | ||
await bot.finish(ev, '太频繁了,请稍后再来') | ||
|
||
url = extract_url_from_event(ev) | ||
if not url: | ||
await bot.finish(ev, '请附带图片!') | ||
url = url[0] | ||
await bot.send(ev, '请稍等片刻~') | ||
|
||
_nlt.increase(uid) | ||
_flt.start_cd(uid, 30) | ||
|
||
# download picture and generate base64 str | ||
# b百度人脸识别api无法使用QQ图片服务器的图片,所以使用base64 | ||
async with aiohttp.ClientSession() as session: | ||
async with session.get(url) as resp: | ||
cont = await resp.read() | ||
b64 = (base64.b64encode(cont)).decode() | ||
img = Image.open(BytesIO(cont)) | ||
|
||
face_data_list = await detect_face(b64) | ||
#print(face_data_list) | ||
if not face_data_list: | ||
await bot.finish(ev, '未检测到人脸信息') | ||
|
||
uid = ev.user_id | ||
head_name = user_conf_dic.get(uid, 'auto') | ||
output = '' ###### | ||
head_gener = gen_head() | ||
for dat in face_data_list: | ||
if head_name == 'auto': | ||
#head = auto_head(dat) | ||
head = head_gener.__next__() | ||
else: | ||
head = KyaruHead.from_name(head_name) | ||
output = concat(img, head, dat) | ||
pic = R.image_from_memory(output) | ||
#print(pic) | ||
await bot.send(ev, pic) | ||
|
||
|
||
@sv.on_prefix('选头') | ||
async def choose_head(bot: HoshinoBot, ev: CQEvent): | ||
global user_conf_dic | ||
uid = ev.user_id | ||
head_name = ev.raw_message.strip(ev.prefix) | ||
if head_name == 'auto': | ||
user_conf_dic[uid] = 'auto' | ||
with open(conf_path, 'wb') as f: | ||
pickle.dump(user_conf_dic, f) | ||
await bot.finish(ev, '已切换为自动选头') | ||
|
||
user_conf_dic[uid] = head_name | ||
if not KyaruHead.exist_head(head_name): | ||
await bot.finish(ev, '没有这个头哦~') | ||
with open(conf_path, 'wb') as f: | ||
pickle.dump(user_conf_dic, f) | ||
head = KyaruHead.from_name(head_name) | ||
await bot.send(ev, f'猫猫头已经切换为{head.cqcode}') | ||
|
||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
CLIENT_ID = 'dkGndecuTLxEq7NvePAz0eQk' | ||
CLIENT_SECRET = 'MweFIf6DeqGANH7azo0gWAFjmtAONN1B' | ||
DAILY_MAX_NUM = 5 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,129 @@ | ||
import json | ||
import requests | ||
from datetime import datetime, timedelta | ||
from math import sqrt | ||
from os import path, listdir | ||
from random import choice, randint, shuffle | ||
|
||
import aiohttp | ||
from PIL import Image | ||
|
||
from .config import CLIENT_ID, CLIENT_SECRET | ||
from .._util import load_config | ||
from .._res import Res as R | ||
|
||
class KyaruHead: | ||
def __init__(self, head_name, angle, face_width, chin_tip_x, chin_tip_y, cqcode) -> None: | ||
self.head_name = head_name | ||
self.angle = angle | ||
self.face_width = face_width | ||
self.X = chin_tip_x | ||
self.Y = chin_tip_y | ||
self.cqcode = cqcode | ||
|
||
@property | ||
def img(self): | ||
head = path.join(path.dirname(__file__), 'head', self.head_name, f'{self.head_name}.png') | ||
return Image.open(head) | ||
|
||
@classmethod | ||
def from_name(cls, head_name): | ||
dat_path = path.join(path.dirname(__file__), 'head', head_name, 'dat.json') | ||
pic_path = path.join(path.dirname(__file__), 'head', head_name, f'{head_name}.png') | ||
dat = load_config(dat_path) | ||
cqcode = R.image(pic_path) | ||
return cls(head_name, dat['angle'], dat['face_width'], dat['chin_tip_x'], dat['chin_tip_y'], cqcode) | ||
|
||
@staticmethod | ||
def exist_head(head_name): | ||
if not path.exists(path.join(path.dirname(__file__), 'head', head_name)): | ||
return False | ||
if not path.exists(path.join(path.dirname(__file__), 'head', head_name, f'{head_name}.png')): | ||
return False | ||
return True | ||
|
||
@classmethod | ||
def rand_head(cls): | ||
heads = [] | ||
for i in listdir(path.join(path.dirname(__file__), 'head')): | ||
if path.isdir(path.join(path.dirname(__file__), 'head', i)): | ||
heads.append(i) | ||
return cls.from_name(choice(heads)) | ||
|
||
|
||
def auto_head(face_dat: dict) -> KyaruHead: | ||
# TODO | ||
return KyaruHead.from_name(str(randint(1, 3))) | ||
|
||
def gen_head(): | ||
heads = [] | ||
for i in listdir(path.join(path.dirname(__file__), 'head')): | ||
if path.isdir(path.join(path.dirname(__file__), 'head', i)): | ||
heads.append(i) | ||
shuffle(heads) | ||
for head in heads: | ||
yield KyaruHead.from_name(head) | ||
|
||
def get_token() -> str: | ||
grant_type = 'client_credentials' | ||
client_id = CLIENT_ID | ||
client_secret = CLIENT_SECRET | ||
url = 'https://aip.baidubce.com/oauth/2.0/token' | ||
params = {'grant_type':grant_type,'client_id':client_id,'client_secret':client_secret} | ||
with requests.get(url, params) as resp: | ||
data = resp.json() | ||
token = data['access_token'] | ||
expire_time = datetime.now() + timedelta(seconds=data['expires_in']) | ||
return token, expire_time | ||
|
||
token = get_token() | ||
|
||
async def detect_face(imgb64: str) -> dict: | ||
global token | ||
if not token or datetime.now() > token[1]: | ||
token = get_token() | ||
api_url = f'https://aip.baidubce.com/rest/2.0/face/v3/detect?access_token={token[0]}' | ||
data = { | ||
"image" : imgb64, | ||
"image_type" : "BASE64", | ||
"face_field" : "face_type,eye_status,landmark", | ||
"max_face_num" : 3 | ||
} | ||
async with aiohttp.ClientSession() as session: | ||
async with session.post(api_url, data = data) as resp: | ||
text = await resp.text() | ||
data = json.loads(text) | ||
if data['error_msg'] == 'SUCCESS': | ||
face_data_list = [] | ||
for face in data['result']['face_list']: | ||
left = face['location']['left'] | ||
top = face['location']['top'] | ||
location = (left, top, left + face['location']['width'], top + face['location']['height']) | ||
l_eye_pos = face['landmark'][0]['x'], face['landmark'][0]['y'] | ||
r_eye_pos = face['landmark'][1]['x'], face['landmark'][1]['y'] | ||
face_data_list.append({ | ||
"location" : location, | ||
"left_eye" : l_eye_pos, | ||
"right_eye" : r_eye_pos, | ||
"rotation" : face['location']['rotation'], | ||
"landmark72" : face['landmark72'] | ||
}) | ||
return face_data_list | ||
else: | ||
return None | ||
|
||
def distance_between_point(p1, p2): | ||
if isinstance(p1, dict): | ||
p1 = (p1['x'], p1['y']) | ||
if isinstance(p2, dict): | ||
p2 = (p2['x'], p2['y']) | ||
return sqrt((p1[0]-p2[0])**2 + (p2[1]-p2[1])**2) | ||
|
||
def concat(img: Image.Image, head: KyaruHead, face_dat) -> Image.Image: | ||
lm = face_dat['landmark72'] | ||
scale = distance_between_point(lm[0], lm[12])/head.face_width/0.97 # 设置缩放, 0.97为系数,无实际意义 | ||
w, h = head.img.size | ||
head_img = head.img.resize((int(scale*w), int(scale*h)), Image.BILINEAR) | ||
head_img = head_img.rotate(-face_dat['rotation'] - head.angle, center=(head.X*scale, head.Y*scale), resample=Image.BILINEAR) | ||
img.paste(head_img, (int(lm[6]['x']-head.X*scale), int(lm[6]['y']-head.Y*scale)), mask=head_img.split()[3]) | ||
return img |
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
{ | ||
"angle" : 7.7, | ||
"face_width" : 97, | ||
"chin_tip_x" : 430, | ||
"chin_tip_y" : 400 | ||
} |
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
{ | ||
"angle" : -9, | ||
"face_width" : 68, | ||
"chin_tip_x" : 415, | ||
"chin_tip_y" : 375 | ||
} |
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
{ | ||
"angle" : 0, | ||
"face_width" : 99, | ||
"chin_tip_x" : 425, | ||
"chin_tip_y" : 410 | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
安装说明: | ||
1. 注册百度云账号,并开通人脸识别应用 | ||
2. 将得到的CLIENT_ID CLIENT_SECRET填入config.py | ||
|
||
注意:百度的人脸识别api对二次元人物识别准确度欠佳,所以插件有时会检测不到人脸或者出现 | ||
生草接头(也算是一种乐趣吧) | ||
——by 倚栏待月 |
Empty file.