Add headers to prevent "<urlopen error [Errno 111] Connection refused>" and format code. #12

Open: wants to merge 1 commit into master
13 changes: 6 additions & 7 deletions tutorials/queuebaseddataingestion/data_producer.py
@@ -1,5 +1,5 @@
 '''
-Simple example for studying queue-based data ingestion
+Simple example for studying queue-based data ingestion.
 '''
 import os
 from rq import Queue
@@ -9,9 +9,8 @@
 import time
 import argparse
 from dotenv import load_dotenv
+
 load_dotenv()
-'''
-'''
 parser = argparse.ArgumentParser()
 parser.add_argument('--uri', help='the URI of the dataset')
 parser.add_argument('--queuename', help='a queue name')
@@ -26,13 +25,13 @@
 Make a simple task queue.
 You can try to design different names, priorities, etc.
 '''
-queue_name=args.queuename # make a simple queue name
-q = Queue(queue_name,connection=redis)
+queue_name = args.queuename # make a simple queue name
+q = Queue(queue_name, connection=redis)
 
 '''
-Just call a single job
+Just call a single job.
 You can try to study how to schedule the jobs, etc.
-If you call many jobs, you have to manage the queues and the way to receive the result
+If you call many jobs, you have to manage the queues and the way to receive the result.
 '''
 job = q.enqueue(ingest_csv_file, args.uri)
 '''
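
For context, the producer above follows the usual RQ pattern: open a Redis connection, wrap it in a named Queue, and enqueue the ingestion function by reference. A minimal sketch of that flow is below; the connection settings, queue name, and polling loop are assumptions for illustration, since the diff does not show how the redis object is created.

# Sketch of the RQ producer flow (connection settings and queue name are assumed).
import time

from redis import Redis
from rq import Queue

from task import ingest_csv_file

redis = Redis(host="localhost", port=6379)   # assumed Redis instance
q = Queue("ingest-queue", connection=redis)  # hypothetical queue name

# Enqueue the task by reference; an `rq worker ingest-queue` process executes it.
job = q.enqueue(
    ingest_csv_file,
    "https://raw.githubusercontent.com/rdsea/IoTCloudSamples/master/data/bts/alarm-2017-10-23-12-vn.csv")

# Poll until the worker reports a terminal state, then read the return value.
while job.get_status() not in ("finished", "failed"):
    time.sleep(1)
print(job.result)
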
32 changes: 22 additions & 10 deletions tutorials/queuebaseddataingestion/task.py
@@ -2,35 +2,47 @@
 import tempfile
 import csv
 import urllib.request
-#import urllib.request.urlretrieve
+import socket
+
+socket.setdefaulttimeout(5)
+
 '''
-this task assume that we get an CSV and store into a database.
+This task assumes that we get a CSV and store it in a database.
 
-However, we do not implement any database task. you can just do it.
+However, we do not implement a database task, you can just do it.
 
 Note that the assumption is that we must be able to get the file.
 '''
 
+
 def ingest_csv_file(url):
     '''
-    The assumpsion is to stored the file temporary in the same directory
+    It is assumed that the file is stored in a new temporary directory.
     '''
     try:
         basename = os.path.basename(url)
-        input_file=tempfile.mkdtemp()+'/'+basename
-        print("Log: ",url)
+        input_file = tempfile.mkdtemp() + '/' + basename
+        print("Log: ", url)
         print("Copy data to temp file")
-        urllib.request.urlretrieve(url,input_file)
+
+        '''
+        Add headers to prevent "<urlopen error [Errno 111] Connection refused>"
+        '''
+        opener = urllib.request.build_opener()
+        opener.addheaders = [('User-agent', 'Mozilla/5.0')]
+        urllib.request.install_opener(opener)
+        urllib.request.urlretrieve(url, input_file)
+
         with open(input_file, encoding='utf-8') as csv_file:
             print("Process csv data")
             '''
             No store operation has been implemented. You can implement the operation with common databases.
-            You can laso just store data into files and let other components to do the storing.
+            You can also just store data into files and let other components to do the storing.
             '''
             csvReader = csv.DictReader(csv_file)
             for row in csvReader:
                 print(row)
-            return {"url":url,"result":"OK"}
+            return {"url": url, "result": "OK"}
     except Exception as err:
         print(err)
-        return {"url":url,"error":str(err)}
+        return {"url": url, "error": str(err)}
9 changes: 6 additions & 3 deletions tutorials/queuebaseddataingestion/test_ingesttask.py
@@ -1,6 +1,9 @@
 from task import ingest_csv_file
 
+
 def test_ingestion():
-    msg=ingest_csv_file("https://raw.githubusercontent.com/rdsea/IoTCloudSamples/master/data/bts/alarm-2017-10-23-12-vn.csv")
-    assert msg['url'] == 'https://raw.githubusercontent.com/rdsea/IoTCloudSamples/master/data/bts/alarm-2017-10-23-12-vn.csv'
-    assert msg['result'] =='OK'
+    msg = ingest_csv_file(
+        "https://raw.githubusercontent.com/rdsea/IoTCloudSamples/master/data/bts/alarm-2017-10-23-12-vn.csv")
+    assert msg[
+        'url'] == 'https://raw.githubusercontent.com/rdsea/IoTCloudSamples/master/data/bts/alarm-2017-10-23-12-vn.csv'
+    assert msg['result'] == 'OK'
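
Note that the test above still hits the network and uses the same hard-coded GitHub URL as before. If an offline variant is wanted, urllib.request.urlretrieve can be stubbed with pytest's monkeypatch fixture so ingest_csv_file parses a locally written CSV; the fake file contents and URL below are made up for illustration.

# Sketch of an offline test: replace urlretrieve with a fake that writes a tiny CSV,
# so ingest_csv_file never opens a network connection.
import urllib.request

from task import ingest_csv_file


def test_ingestion_offline(monkeypatch):
    def fake_urlretrieve(url, filename):
        # Hypothetical two-row CSV standing in for the real alarm dataset.
        with open(filename, "w", encoding="utf-8") as f:
            f.write("station_id,alarm\n1174,PIN_ALARM\n")

    monkeypatch.setattr(urllib.request, "urlretrieve", fake_urlretrieve)

    msg = ingest_csv_file("https://example.com/alarm.csv")
    assert msg["url"] == "https://example.com/alarm.csv"
    assert msg["result"] == "OK"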