Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

hallo! #1

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 84 additions & 26 deletions amps/emotiv.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,38 +3,71 @@

from Crypto.Cipher import AES
import numpy as np
import sys,traceback

import usb.core
import usb.util
try:
import pywinusb.hid as hid
windows = True
except:
windows = False
import usb.core
import usb.util



from amplifier import Amplifier


VENDOR_ID = 0x1234
PRODUCT_ID = 0xed02

ENDPOINT_IN = usb.util.ENDPOINT_IN | 2 # second endpoint
if not windows:
ENDPOINT_IN = usb.util.ENDPOINT_IN | 2 # second endpoint



class Epoc(Amplifier):

def __init__(self):
# find amplifier
self.dev = usb.core.find(idVendor=VENDOR_ID, idProduct=PRODUCT_ID)

# the data is read by a data handler, and stored in this queue
if windows:
self.packets = []

# find amplifier
if(not windows):
self.dev = usb.core.find(idVendor=VENDOR_ID, idProduct=PRODUCT_ID)
else:
filter = hid.HidDeviceFilter(product_name='Emotiv RAW DATA')
devices = filter.get_devices()
assert len(devices) > 0
self.dev = devices[0]
self.dev.open()

def handle(data):
assert data[0] == 0
self.packets.append(''.join(map(chr, data[1:])))

self.dev.set_raw_data_handler(handle)
serial = self.dev.serial_number

if self.dev is None:
raise RuntimeError('Emotiv device is not connected.')
# pyusb docs say you *have* to call set_configuration, but it does not
# work unless i *don't* call it.
#dev.set_configuration()
# get the serial number
serial = usb.util.get_string(self.dev, 17, self.dev.iSerialNumber)
# claim the device and it's two interfaces
if self.dev.is_kernel_driver_active(0):
self.dev.detach_kernel_driver(0)
usb.util.claim_interface(self.dev, 0)
if self.dev.is_kernel_driver_active(1):
self.dev.detach_kernel_driver(1)
usb.util.claim_interface(self.dev, 1)

if(not windows):
# pyusb docs say you *have* to call set_configuration, but it does not
# work unless i *don't* call it.
#dev.set_configuration()
# get the serial number
serial = usb.util.get_string(self.dev, 17, self.dev.iSerialNumber)
# claim the device and it's two interfaces
if self.dev.is_kernel_driver_active(0):
self.dev.detach_kernel_driver(0)
usb.util.claim_interface(self.dev, 0)
if self.dev.is_kernel_driver_active(1):
self.dev.detach_kernel_driver(1)
usb.util.claim_interface(self.dev, 1)

# prepare AES
self.cipher = AES.new(self.generate_key(serial, True))
# internal states for battery and impedance we have to store since it
Expand All @@ -52,16 +85,31 @@ def __init__(self):
'Quality T8', 'Quality F8', 'Quality AF4',
'Quality FC6', 'Quality F4']

def get_quality(self):
return self._quality

def get_data(self):

raw = None

try:
raw = self.dev.read(ENDPOINT_IN, 32, 1, timeout=1000)
raw = self.decrypt(raw)
data = self.parse_raw(raw)
data = np.array(data)
if not windows:
raw = self.dev.read(ENDPOINT_IN, 32, 1, timeout=1000)
else:
if(len(self.packets)):
raw = self.packets.pop(0)

if raw is not None:
raw = self.decrypt(raw)
data = self.parse_raw(raw)
data = np.array(data)
return data.reshape(1, -1)

except Exception as e:
print e
data = np.array()
return data.reshape(1, -1)

return None

def generate_key(self, sn, research=True):
"""Generate the encryption key.
Expand All @@ -85,10 +133,12 @@ def generate_key(self, sn, research=True):
def decrypt(self, raw):
"""Decrypt a raw package."""
data = self.cipher.decrypt(raw[:16]) + self.cipher.decrypt(raw[16:])

tmp = 0
for i in range(32):
tmp = tmp << 8
tmp += ord(data[i])

return tmp

def parse_raw(self, raw):
Expand All @@ -100,18 +150,20 @@ def parse_raw(self, raw):
# otherwise the remaining bits are the battery
shift -= 8
tmp = (raw >> shift) & 0b11111111
#print("bat=%X" % tmp)
if tmp & 0b10000000:
# battery
counter = 128
self._battery = tmp & 0b01111111
self.curren_battery = tmp & 0b01111111
else:
# counter
counter = tmp & 0b01111111

data.append(counter)
data.append(self._battery)
# 7x data (14 bit)
for i in range(7):
shift -= 14
shift -= 14
data.append((raw >> shift) & 0b11111111111111)
# 1x quality (14 bit)
# we assume it is the contact quality for an electrode, the counter
Expand Down Expand Up @@ -148,7 +200,13 @@ def parse_raw(self, raw):
print 'Reading...'
while 1:
try:
print amp.get_data()
d = amp.get_data()
if d is not None:
print amp.get_quality()

except Exception as e:
print e
print e
print("-"*60)
traceback.print_exc(file=sys.stdout)
print("-"*60)
break