forked from JeongHyeon-01/cwal_m
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcwal_encar_v2.py
222 lines (197 loc) · 10.3 KB
/
cwal_encar_v2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
import os
import re
import json
import requests
import time
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urlencode
from db.pipeline import fetch_vehicle_data, upsert_vehicle_data
from db.pipeline import SessionLocal1
from db.total_query import get_maker
from db.pipeline import *
from db.models import CarsWithCategory
from utils.etc_filter import fuel_filter, color_filter
from utils.slack_utils import send_main_message, send_thread_messages
# Initial configuration.
# HTTP headers for requests to the Encar API host; the values present the
# client as a mobile Chrome browser coming from www.encar.com.
headers = {
    "Accept": "application/json, text/javascript, */*; q=0.01",
    "Accept-Encoding": "gzip, deflate",
    "Accept-Language": "ko,ko-KR;q=0.9,en-US;q=0.8,en;q=0.7",
    "Connection": "keep-alive",
    "Host": "api.encar.com",
    "Origin": "http://www.encar.com",
    "Referer": "http://www.encar.com/",
    "User-Agent": "Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Mobile Safari/537.36",
}

# General car-list search endpoint.
base_url = "https://api.encar.com/search/car/list/general"

# Upper bound for the mileage range filter (kept as a string because the
# disabled query builder below interpolates it directly into the query).
max_range = "120000"

# NOTE: despite its name, current_year holds *next* calendar year (now + 1),
# so min_year ends up being five years before the present year.
current_year = datetime.now().year + 1
min_year = str(current_year - 6)

# Start timestamp; the (currently disabled) summary at the end of the file
# subtracts it from the end time to report the total run duration.
start_time = time.time()

# Fetch maker data.
# The DB-driven maker list is disabled; a single hard-coded maker is used.
# makers = get_data_from_db_2(get_maker)
# maker_names = [data['name'] for data in makers]
maker_names = ["제네시스"]
total_data = fetch_vehicle_data(maker_names)

# Debug output: print the model name of every fetched category record
# (prints None for records without a 'model_name' key).
for category in total_data:
    print(category.get("model_name"))
# # 쿼리 생성 함수
# def generate_query_params_with_badge(base_url, maker_name, model_name, submodel_name, badge_group, badge, max_range, min_year, batch_size=200, start=0):
# domestic_makers = ['현대', '기아', '쉐보레(GM대우)', '르노코리아(삼성)', '제네시스']
# car_type = "C.CarType.Y" if maker_name in domestic_makers else "C.CarType.N"
# q = f"(And.Hidden.N._.SellType.일반._.Year.range({min_year}..)._.Mileage.range(..{max_range})._.({car_type}._.(C.Manufacturer.{maker_name}._.(C.ModelGroup.{model_name}._.(C.Model.{submodel_name}._.(C.BadgeGroup.{badge_group}._.Badge.{badge}.))))))"
# query = {
# "count": "true",
# "q": q,
# "sr": f"|MobileModifiedDate|{start}|{batch_size}",
# "inav": "|Metadata|Sort"
# }
# return f"{base_url}?{urlencode(query, safe='()')}"
# # API 호출 함수
# def fetch_api_data(url):
# try:
# response = requests.get(url, headers=headers)
# if response.status_code == 200:
# return response.json()
# else:
# print(f"Failed: {url}, Status Code: {response.status_code}")
# return None
# except Exception as e:
# print(f"Error fetching URL {url}: {e}")
# return None
# # 병렬 호출 함수
# def fetch_all_data(categories, max_workers=5):
# results = []
# with ThreadPoolExecutor(max_workers=max_workers) as executor:
# futures = []
# for category in categories:
# maker_name = category.get('maker_name', '').strip()
# model_name = category.get('model_name', '').strip()
# submodel_name = category.get('encar_sub_model_name', '')
# drive_fuel_type = category.get('drive_fuel_type', '')
# grade_name = category.get('encar_grade_name', '')
# batch_size = 200
# vehicle_total_count = category.get('vehicle_total_count', 0)
# pages = vehicle_total_count // batch_size + (1 if vehicle_total_count % batch_size else 0)
# for page in range(pages):
# start = page * batch_size
# url = generate_query_params_with_badge(base_url, maker_name, model_name, submodel_name, drive_fuel_type, grade_name, max_range, min_year, batch_size, start)
# futures.append(executor.submit(fetch_api_data, url))
# for future in as_completed(futures):
# result = future.result()
# if result:
# results.append(result)
# return results
# def fetch_details_from_urls(car_data):
# """
# 차량 데이터를 보완하기 위해 3개의 URL에서 상세 정보를 가져옵니다.
# """
# try:
# # URL 1: 차량 기본 상세 정보
# url1 = f"http://api.encar.com/v1/readside/vehicles/view?vehicleIds={car_data.encar_serial_number}"
# response1 = requests.get(url1, headers=headers)
# if response1.status_code == 200:
# details = response1.json()[0]
# # 사진 처리
# photos = details.get('photos', [])
# if photos:
# sorted_photos = sorted(photos, key=lambda x: int(x['code']))
# car_data.thumbnail = 'http://ci.encar.com' + sorted_photos[0]['path']
# car_data.color = color_filter(details.get('spec', {}).get('colorName', ''))
# car_data.transmission = details.get('spec', {}).get('transmissionName', '')
# car_data.displacement = f"{details.get('spec', {}).get('displacement', 0)}cc"
# # URL 2: 차량의 딜러 및 회사 정보
# url2 = f"http://api.encar.com/v1/readside/vehicle/{car_data.encar_serial_number}"
# response2 = requests.get(url2, headers=headers)
# if response2.status_code == 200:
# dealer_data = response2.json().get('partnership', {}).get('dealer', {})
# if dealer_data:
# car_data.encar_dealer_name = dealer_data.get('name', '')
# firm = dealer_data.get('firm', {})
# car_data.encar_company = firm.get('name', '') if firm else ''
# if firm and 'diagnosisCenters' in firm:
# diagnosis_center = firm['diagnosisCenters'][0]
# car_data.encar_complex = diagnosis_center.get('name', '')
# car_data.encar_complex_address = diagnosis_center.get('address', '')
# # URL 3: 차량 사고 기록 및 소유자 변경 정보
# url3 = f"https://api.encar.com/v1/readside/record/vehicle/{car_data.encar_serial_number}/open?vehicleNo={car_data.car_no}"
# response3 = requests.get(url3, headers=headers)
# if response3.status_code == 200:
# record_data = response3.json()
# if record_data:
# car_data.accident = str(record_data.get('myAccidentCnt', 0) + record_data.get('otherAccidentCnt', 0))
# car_data.change_owner = str(record_data.get('ownerChangeCnt', 0))
# except Exception as e:
# print(f"Error fetching details for {car_data.encar_serial_number}: {e}")
# # 데이터 처리
# def process_data(api_results, existing_car_no_set):
# """
# API 응답 데이터를 처리하고 3개의 URL에서 상세 조회를 수행한 후 데이터를 반환합니다.
# """
# car_data_list = []
# for data in api_results:
# search_data = data.get('SearchResults', [])
# for vehicle in search_data:
# try:
# # 일반 매물만 처리
# if vehicle.get('SellType') != '일반':
# continue
# car_no = vehicle.get('vehicleNo', '')
# if not car_no or car_no in existing_car_no_set:
# # 이미 존재하는 차량 번호는 무시
# continue
# # 차량 기본 데이터
# car_data = CarsWithCategory(
# encar_serial_number=vehicle.get('Id'), # 차량 고유 ID
# maker_name=vehicle.get('Manufacturer', ''), # 제조사
# model_name=vehicle.get('ModelGroup', ''), # 모델 그룹
# encar_submodel_name=vehicle.get('Model', ''), # 서브모델
# year=str(vehicle.get('Year', ''))[:4], # 연도
# month=str(vehicle.get('Year', ''))[-2:], # 월
# km=str(int(vehicle.get('Mileage', 0))), # 주행거리
# sell_price=str(int(vehicle.get('Price', 0))), # 판매 가격
# fuel=fuel_filter(vehicle.get('FuelType', '')), # 연료 타입
# region=vehicle.get('OfficeCityState', ''), # 지역
# # 추가 필드
# color=color_filter(vehicle.get('spec', {}).get('colorName', '')), # 색상
# transmission=vehicle.get('spec', {}).get('transmissionName', ''), # 변속기 타입
# displacement=f"{vehicle.get('spec', {}).get('displacement', 0)}cc", # 배기량
# accident="0", # 사고 여부 (기본값, 상세 조회에서 업데이트 가능)
# change_owner="0", # 소유자 변경 횟수 (기본값, 상세 조회에서 업데이트 가능)
# thumbnail=vehicle.get('photoUrl', ''), # 썸네일 이미지 URL
# car_type=vehicle.get('spec', {}).get('bodyName', ''), # 차량 유형
# options=[] # 옵션 리스트 (상세 조회에서 업데이트 가능)
# )
# # 상세 정보 조회 (3개 URL 호출)
# fetch_details_from_urls(car_data)
# # 중복 방지용 차량 번호 추가
# existing_car_no_set.add(car_no)
# # 처리된 데이터 추가
# car_data_list.append(car_data)
# except Exception as e:
# print(f"Error processing vehicle: {e}")
# continue
# return car_data_list
# # 크롤링 시작
# parent_ts = send_main_message(f"Encar Crawling Start | {datetime.now().strftime('%Y-%m-%d | %H:%M:%S')}")
# categories = total_data
# existing_car_no_set = set()
# api_results = fetch_all_data(categories, max_workers=10)
# processed_data = process_data(api_results, existing_car_no_set)
# # 데이터베이스 저장
# session = SessionLocal1()
# for car_data in processed_data:
# print(car_data)
# # upsert_result = upsert_vehicle_data(session, car_data)
# # if upsert_result == "created":
# # print(f"Created: {car_data.car_no}")
# # elif upsert_result == "updated":
# # print(f"Updated: {car_data.car_no}")
# # 크롤링 종료
# end_time = time.time()
# total_time = end_time - start_time
# hours, remainder = divmod(total_time, 3600)
# minutes, seconds = divmod(remainder, 60)
# print(f"총 소요 시간: {int(hours)}시간 {int(minutes)}분 {seconds:.2f}초")
# send_main_message(f"Encar Crawling End | {datetime.now().strftime('%Y-%m-%d | %H:%M:%S')} | 총 소요 시간: {int(hours)}시간 {int(minutes)}분 {seconds:.2f}초")