Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

merge #9

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,26 @@ etlpy是基于配置文件的数据采集和清洗工具。

写爬虫和数据清洗代码总是很烦人。因此,应该通过工具生成爬虫和数据清洗的代码! etlpy就是为了解决这个问题而生的。

通过可视化和图形化设计工具,快速生成爬虫和数据清洗流程,并保存为xml文件,并由etlpy引擎解析它,即可获得最终的数据结果。
通过可视化和图形化设计工具(Hawk),快速生成爬虫和数据清洗流程,并保存为xml文件,并由etlpy引擎解析它,即可获得最终的数据结果。

## 2. 使用
使用起来非常简单:
使用起来非常简单(执行main.py文件):
```
from etl import ETLTool
tool = ETLTool();
tool.LoadProject('project.xml', '数据清洗ETL-大众点评');
datas = tool.RefreshDatas();
for r in datas:
print(r)
from classInit import projectLoad
from classInit.projectExecutor import projExecute

path = 'xmlFile'
project = projectLoad.Project_LoadXml(path + '/demo.xml')
print(project.modules)
proj = projExecute(project)
t = proj.projectFunction()

```
RefreshDatas函数返回的是生成器,通过for循环,即可自动读取所有数据。

## 3. 基本原理
模块分为 生成,过滤,排序,转换,执行四种。

利用Python的生成器,可以将不同模块组织起来,定义一个流水线,数据(python的字典)会在流水线上被加工和消费。

图形化工具是用C#开发的,使用了类似Python生成器的Linq技术。其原始思路来自于Lisp的s-表达式。

## 4. 用途
爬虫,计算,清洗,任何符合一定计算范式的数据,都可以使用它来完成。
29 changes: 29 additions & 0 deletions classInit/ETLTask.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# -*- encoding: utf-8 -*-
"""
@File : ETLTask.py
@Time : 19/8/2019 08:57
@Author : liyang

数据清洗模块
"""


class ETLTask:
    """Sub-task of a SmartETLTool (data-cleaning) module."""

    def __init__(self):
        # ETL tools attached to this task; every instance starts with its
        # own empty list.
        self.AllETLTools = list()


class ETLTool:
    """Base class of the ETL tools; both hooks are no-ops here."""

    def __init__(self):
        # Tool starts enabled and bound to no column.
        self.Enabled = True
        self.Column = ''

    def process(self, data):
        """Hand *data* back untouched."""
        return data

    def init(self):
        """Initialisation hook; does nothing and returns None."""
        return None
91 changes: 91 additions & 0 deletions classInit/ETLTool/Executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# -*- encoding: utf-8 -*-
"""
@File : Executor.py
@Time : 19/8/2019 08:59
@Author : liyang

执行器
"""
import sys

sys.path.append('../')
from classInit.ETLTask import ETLTool


class Executor(ETLTool):
    """Executor group of ETL tools (Group="Executor" in the XML file)."""

    def execute(self, data):
        """Per-item hook; the base implementation does nothing."""
        return None

    def process(self, data):
        """Lazily call execute() on each item of *data*, then yield that item."""
        for record in data:
            self.execute(record)
            yield record


def create(item):
    """Instantiate the class named *item* from this module.

    The name is resolved through the module namespace rather than ``eval``:
    *item* is presumably read from a project file (verify against the
    caller), and evaluating it would let a crafted name run arbitrary code.

    :param item: name of the class to instantiate
    :return: a new instance of that class, constructed without arguments
    :raises NameError: if *item* does not name a class in this module
    """
    cls = globals().get(item)
    if not isinstance(cls, type):
        raise NameError('name %r is not a class defined in this module' % (item,))
    return cls()


class EtlEX(Executor):
    """Executor placeholder; currently adds nothing to Executor.

    NOTE(review): a disabled draft of execute() is kept below for reference.
    """
    # def execute(self, datas):
    #     subetl = self.__proj__.modules[self.ETLSelector]
    #     for data in datas:
    #         if spider.IsNone(self.NewColumn):
    #             doc = data.copy()
    #         else:
    #             doc = {}
    #             extends.MergeQuery(doc, data, self.NewColumn + " " + self.Column)
    #         result = (r for r in generate(subetl.AllETLTools, [doc]))
    #         count = 0
    #         for r in result:
    #             count += 1
    #             print(r)
    #         print(count)
    #         yield data


class TableEX(Executor):
    """Write to a data table, saving the data as EXCEL.

    That is the stated intent only: the class currently adds nothing to
    Executor. A disabled draft is kept below for reference.
    """
    # def __init__(self):
    #     super(TableEX, self).__init__()
    #     self.Table = 'Table'
    #
    # def execute(self, data):
    #     tables = self.__proj__.tables
    #     tname = self.Table
    #     if tname not in tables:
    #         tables[tname] = []
    #     for r in data:
    #         tables[tname].append(r)
    #         yield r


class SaveFileEX(Executor):
    """Executor carrying a ``SavePath`` setting; execute() is still a no-op."""

    def __init__(self):
        super(SaveFileEX, self).__init__()
        # Save path; empty by default.
        self.SavePath = ''

    def execute(self, data):
        """Do nothing for *data*; the disabled draft below is kept for reference."""
        return None
        # save_path = extends.Query(data, self.SavePath)
        # (folder, file) = os.path.split(save_path)
        # if not os.path.exists(folder):
        #     os.makedirs(folder)
        # urllib.request.urlretrieve(data[self.Column], save_path)


class DbEX(Executor):
    """Executor placeholder; adds nothing to Executor."""
97 changes: 97 additions & 0 deletions classInit/ETLTool/Filter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# -*- encoding: utf-8 -*-
"""
@File : Filter.py
@Time : 19/8/2019 08:59
@Author : liyang
"""
import sys

sys.path.append('../')
from classInit.ETLTask import ETLTool
import re


class Filter(ETLTool):
    """Filter group of ETL tools (Group="Filter" in the XML file)."""

    def __init__(self):
        super(Filter, self).__init__()
        # When True, process() keeps the rows that filter() rejects.
        self.Revert = False

    def filter(self, data):
        """Accept every value; subclasses override this."""
        return True

    def process(self, data):
        """Lazily yield the rows of *data* selected by filter() and Revert.

        The value tested is the row's entry under ``self.Column``. A row is
        yielded when filter() returns True and Revert is False, or when
        filter() returns False and Revert is True.
        """
        for row in data:
            value = row[self.Column] if self.Column in row else None
            # Missing/None values are skipped outright unless this is exactly
            # a NullFT, which gets to judge them itself.
            if value is None and self.__class__ != NullFT:
                continue
            verdict = self.filter(value)
            accepted = verdict == True and self.Revert == False
            if accepted or (verdict == False and self.Revert == True):
                yield row


def create(item):
    """Instantiate the class named *item* from this module.

    The name is resolved through the module namespace rather than ``eval``:
    *item* is presumably read from a project file (verify against the
    caller), and evaluating it would let a crafted name run arbitrary code.

    :param item: name of the class to instantiate
    :return: a new instance of that class, constructed without arguments
    :raises NameError: if *item* does not name a class in this module
    """
    cls = globals().get(item)
    if not isinstance(cls, type):
        raise NameError('name %r is not a class defined in this module' % (item,))
    return cls()


class RegexFT(Filter):
    """Filter that keeps values in which a regular expression matches."""

    def init(self):
        """Compile ``self.Script`` and set the required match count to 1."""
        self.Regex = re.compile(self.Script)
        self.Count = 1

    def filter(self, data):
        """Return True when the pattern matches *data* at least Count times.

        ``findall`` always returns a list (empty when nothing matches), so
        the list length alone decides; the former ``is None`` branch could
        never run and has been removed.
        """
        return self.Count <= len(self.Regex.findall(data))


class RangeFT(Filter):
    """Filter that keeps values lying between ``Min`` and ``Max`` inclusive."""

    def filter(self, item):
        """Convert *item* to float and test ``Min <= value <= Max``.

        ``Min`` and ``Max`` are read from the instance; this class does not
        set them.
        """
        number = float(item)
        return self.Min <= number and number <= self.Max


class RepeatFT(Filter):
    """Filter that lets each distinct value through only once."""

    def init(self):
        """Start with an empty record of seen values."""
        self.set = set()

    def filter(self, data):
        """Return True the first time *data* is seen, False afterwards."""
        first_time = data not in self.set
        if first_time:
            self.set.add(data)
        return first_time


class NullFT(Filter):
    """Null-object filter: rejects None and blank strings."""

    def filter(self, data):
        """Return False for None or a whitespace-only string, True otherwise."""
        if data is None:
            return False
        if not isinstance(data, str):
            return True
        return data.strip() != ''


class NumRangeFT(Filter):
    """Filter placeholder; adds nothing to Filter."""
122 changes: 122 additions & 0 deletions classInit/ETLTool/Generator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
# -*- encoding: utf-8 -*-
"""
@File : Generator.py
@Time : 19/8/2019 08:55
@Author : liyang

生成器
"""

import sys

sys.path.append('../')
from classInit.ETLTask import ETLTool


class Generator(ETLTool):
    """Generator group of ETL tools (Group="Generator" in the XML file).

    Base of the tools that generate data for cleaning, e.g. the RangeGE
    tool type.
    """

    def __init__(self):
        # Run the parent initialisation first.
        super(Generator, self).__init__()
        self.MergeType = 'Append'  # merge type defaults to 'Append'
        self.Position = 0          # position defaults to 0

    def generate(self, generator):
        """Generation hook; the base implementation does nothing."""
        return None

    def process(self, generator):
        """Currently a no-op; the disabled draft below is kept for reference."""
        return None
        # if generator is None:
        #     return self.generate(None)
        # else:
        #     if self.MergeType == 'Append':
        #         return extends.Append(generator, self.process(None))
        #     elif self.MergeType == 'Merge':
        #         return extends.Merge(generator, self.process(None))
        #     else:
        #         return extends.Cross(generator, self.generate)


def create(item):
    """Instantiate the class named *item* from this module.

    The name is resolved through the module namespace rather than ``eval``:
    *item* is presumably read from a project file (verify against the
    caller), and evaluating it would let a crafted name run arbitrary code.

    :param item: name of the class to instantiate
    :return: a new instance of that class, constructed without arguments
    :raises NameError: if *item* does not name a class in this module
    """
    cls = globals().get(item)
    if not isinstance(cls, type):
        raise NameError('name %r is not a class defined in this module' % (item,))
    return cls()


class RangeGE(Generator):
    """ETL tool of Type "RangeGE": generates a range of integers.

    The settings are kept as strings and converted with int() when
    generate() runs.
    """

    def __init__(self):
        super(RangeGE, self).__init__()
        self.Interval = '1'  # step between consecutive values
        self.MaxValue = '1'  # largest value (inclusive)
        self.MinValue = '1'  # smallest value

    def generate(self, generator=None):
        """Return one ``{Column: value}`` row per integer in the range.

        Values run from MinValue up to and including MaxValue in steps of
        Interval.

        :param generator: unused; accepted so the signature matches
            ``Generator.generate(generator)``
        :return: list of single-key dicts
        :raises ValueError: if a setting is not an integer string or
            Interval is 0
        """
        step = int(self.Interval)
        stop = int(self.MaxValue)
        start = int(self.MinValue)
        # "+ 1" makes the upper bound inclusive.
        return [{self.Column: value} for value in range(start, stop + 1, step)]


class EtlGE(Generator):
    """Sub-task generation.

    Currently adds nothing to Generator; a disabled draft is kept below for
    reference.
    """
    # def generate(self, data):
    #     subetl = self.__proj__.modules[self.ETLSelector]
    #     for r in generate(subetl.AllETLTools):
    #         yield r


class TextGE(Generator):
    """Generate rows from text, one row per line of ``Content``.

    Original author's note: URLs may be listed directly and must include
    'https://' or 'http://'. This class does not check that.
    """

    def __init__(self):
        super(TextGE, self).__init__()
        self.Content = ''

    def generate(self, generator=None):
        """Return one ``{Column: line}`` row per stripped line of Content.

        Lines before index ``Position`` are skipped. Position is coerced with
        int() and clamped at 0, because a negative start previously wrapped
        around and indexed from the end of the list before the real rows.

        :param generator: unused; accepted so the signature matches
            ``Generator.generate(generator)``
        :return: list of single-key dicts
        """
        self.arglists = [line.strip() for line in self.Content.split('\n')]
        start = max(int(self.Position), 0)
        return [{self.Column: text} for text in self.arglists[start:]]


class BfsGE(Generator):
    """Generator placeholder; adds nothing to Generator."""


class FolderGE(Generator):
    """Generator placeholder; adds nothing to Generator."""


class TableGE(Generator):
    """Generator placeholder; adds nothing to Generator."""
Loading