Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Switzerland's CSIF funds #69

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion beanprice/price.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from beancount.ops import find_prices

from beanprice import date_utils
from beanprice.source import MissingDate
import beanprice


Expand Down Expand Up @@ -589,7 +590,11 @@ def fetch_price(dprice: DatedPrice, swap_inverted: bool = False) -> Optional[dat
source = psource.module.Source()
except AttributeError:
continue
srcprice = fetch_cached_price(source, psource.symbol, dprice.date)
try:
srcprice = fetch_cached_price(source, psource.symbol, dprice.date)
except MissingDate:
logging.debug("Missing date {} for symbol {}".format(dprice.date, psource.symbol))
return None
if srcprice is not None:
break
else:
Expand Down
4 changes: 4 additions & 0 deletions beanprice/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@
('quote_currency', Optional[str])])


class MissingDate(BaseException):
"""An attempt to read a missing date, ignore and continue"""


class Source:
"""Interface to be implemented by all price sources.

Expand Down
134 changes: 134 additions & 0 deletions beanprice/sources/csif.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
"""A source fetching prices and exchangerates from https://amfunds.credit-suisse.com

Valid tickers for prices are in the form "IBAN", such as "CH0031341875".

Here is the API documentation:
https://www.alphavantage.co/documentation/

Example:
https://amfunds.credit-suisse.com/ch/de/institutional/fund/history/CH0031341875

Based on: https://github.com/buchen/portfolio/blob/effa5b7baf9a918e1b5fe83942ddc480e0fd48b9/name.abuchen.portfolio/src/name/abuchen/portfolio/online/impl/CSQuoteFeed.java

"""

from decimal import Decimal
from typing import Optional
from dateutil.tz import tz
from dateutil.parser import parse
import logging
import subprocess
from beanprice import source
from pathlib import Path


class CsifApiError(ValueError):
"""An error from the CSIF API."""


def _fetch_response(ticker):
# Download file, or was it cached?
filename = '/tmp/beanprice_csif_{}.html'.format(ticker)
path = Path(filename)
if not path.is_file():
logging.debug('Fetching data from server for ticker {}'.format(ticker))
# Fetch the HTML workbook, we have to use curl, see PortfolioPerformance documentation
link = 'https://amfunds.credit-suisse.com/ch/de/institutional/fund/history/{}?currency=CHF'.format(ticker)
try:
response = subprocess.check_output(['curl', '-s', link]).decode("utf-8")
except BaseException as e:
raise CsifApiError('Error connecting to server on URL {}'.format(link))

# Save to file for future access
with open(path, "w") as text_file:
text_file.write(response)
else:
# Read the response from text file
logging.debug('Retrieving cached data for ticker {}'.format(ticker))
with open(path, "r") as text_file:
response = text_file.read()

# Find first occurrence of HTML tag "<td>IBAN</td>"
pos = response.find('<td>{}</td>'.format(ticker))
if pos < 0:
raise CsifApiError('Ticker {} not fund'.format(ticker))
pos = pos + 4

# Next occurrence of "<td>": security number
pos = pos + response[pos:].find('<td>') + 4
end_pos = pos + response[pos:].find('</td>')
sec_number = response[pos:end_pos]

# Next occurrence of "<td>": currency
pos = pos + response[pos:].find('<td>') + 4
end_pos = pos + response[pos:].find('</td>')
currency = response[pos:end_pos]
logging.debug('Ticker {} data loaded: sec. number {}, currency {}'.format(
ticker,
sec_number,
currency
))

return response, currency, sec_number


class Source(source.Source):

def get_latest_price(self, ticker) -> Optional[source.SourcePrice]:
# Fetch data
response, currency, sec_number = _fetch_response(ticker)

# Find first occurrence of security number
pos = response.find('<td>{}</td>'.format(sec_number))
if pos < 0:
return None
pos = pos + 4

# Next two occurrences of HTML tags "<td>" and "</td>"
pos = pos + response[pos:].find('<td>') + 4
pos = pos + response[pos:].find('<td>') + 4
end_pos = pos + response[pos:].find('</td>')

# Parse date
date_str = response[pos:end_pos]
logging.debug('Date: {}'.format(date_str))
date = parse(date_str).replace(tzinfo=tz.gettz('Europe/Zurich'))

# Next occurrence of HTML tags "<td>" and "</td>"
pos = pos + response[pos:].find('<td>') + 4
end_pos = pos + response[pos:].find('</td>')

# Parse value
logging.debug('Price: {}'.format(response[pos:end_pos]))
price = Decimal(response[pos:end_pos])

logging.debug('Latest price: {} {}, {}'.format(price, currency, date_str))
return source.SourcePrice(price, date, currency)

def get_historical_price(self, ticker, time) -> Optional[source.SourcePrice]:
# Fetch data
response, currency, sec_number = _fetch_response(ticker)

# Find relevant date
date_str = time.strftime("%d.%m.%Y")
find_str = '<td>' + date_str + '</td>'
pos = response.find(find_str)

# Found?
if pos < 0:
# It can happen that a date is missing
raise source.MissingDate
pos = pos + 14

# Next occurrences of HTML tags "<td>" and "</td>"
pos = pos + response[pos:].find('<td>') + 4
end_pos = pos + response[pos:].find('</td>')

# Parse value
try:
price = Decimal(response[pos:end_pos])
except BaseException as e:
raise CsifApiError('Error parsing price {} for date {}'.format(response[pos:end_pos], date_str))

logging.debug('Historical price: {} {}, {}'.format(price, currency, date_str))
return source.SourcePrice(price, time, currency)
50 changes: 50 additions & 0 deletions beanprice/sources/csif_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import unittest
from unittest import mock
import datetime
from dateutil import tz
from dateutil.parser import parse

from decimal import Decimal
from beanprice.sources import csif
from beanprice.source import MissingDate, SourcePrice


def _fetch_response(contents):
"""Return a context manager to patch a JSON response."""
response = mock.Mock()
response.status_code = 404
response.text = ""
response.json.return_value = contents
return mock.patch('subprocess.check_output', return_value=response)


class CsifPriceFetcher(unittest.TestCase):
def test_error_invalid_ticker(self):
with self.assertRaises(csif.CsifApiError):
csif.Source().get_latest_price('INVALID')

def test_error_invalid_date(self):
with self.assertRaises(MissingDate):
csif.Source().get_historical_price('CH0030849712', parse("2050-01-01"))

def test_valid_response(self):
data = '' \
'<some gibber>' \
' <td>CH0030849712</td>' \
' <td>3084971</td>' \
' <td>USD</td> ' \
' <td>21.03.2022</td>' \
' <td>3037.75000</td>' \
'<more gibber>'

with _fetch_response(data):
srcprice: SourcePrice = csif.Source().get_latest_price('CH0030849712')
self.assertIsInstance(srcprice, SourcePrice)
self.assertEqual(Decimal('3037.75000'), srcprice.price)
self.assertEqual('USD', srcprice.quote_currency)
self.assertEqual(datetime.datetime(2022, 3, 21, 0, 0, 0, tzinfo=tz.gettz('Europe/Zurich')), srcprice.time)
pass


if __name__ == '__main__':
unittest.main()
99 changes: 99 additions & 0 deletions beanprice/sources/sfd.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
"""
A source fetching prices from https://www.swissfunddata.ch/sfdpub/investment-funds

"""
import datetime
from decimal import Decimal
from typing import Optional, Dict
from csv import DictReader
from dateutil.tz import tz
from dateutil.parser import parse
import logging
import subprocess
from beanprice import source
from pathlib import Path


class SfdApiError(ValueError):
"""An error from the SCIF API."""


def _fetch_prices(fund_id: str) -> (Dict[str, source.SourcePrice], source.SourcePrice):
# Download file, or was it cached?
filename = '/tmp/beanprice_sfd_{}.csv'.format(fund_id)
path = Path(filename)
if not path.is_file():
logging.debug('Fetching data from server for fund {}'.format(fund_id))
# Fetch the HTML workbook, we have to use curl, see PortfolioPerformance documentation
link = 'https://www.swissfunddata.ch/sfdpub/en/funds/excelData/{}'.format(fund_id)
try:
response = subprocess.check_output(['curl', '-s', link]).decode("utf-8")
except BaseException as e:
raise SfdApiError('Error connecting to server on URL {}'.format(link))

# Save to file for future access
with open(path, "w") as text_file:
text_file.write(response)

# Read CSV file
prices: Dict[str, source.SourcePrice] = dict()
latest_price = None

with open(filename, 'r', encoding='utf8') as csvfile:
reader = DictReader(
csvfile,
fieldnames=[
"date",
"ccy",
"price",
],
delimiter=";"
)

# This skips the first row of the CSV file.
next(reader)
next(reader)
next(reader)

for row in reader:
the_date = parse(row["date"]).replace(tzinfo=tz.gettz('Europe/Zurich'))
key = the_date.strftime("%Y%m%d")
latest_price = source.SourcePrice(
Decimal(row["price"].replace("'", '')),
the_date,
row["ccy"].strip()
)
prices[key] = latest_price

return prices, latest_price


class Source(source.Source):

def get_latest_price(self, ticker) -> Optional[source.SourcePrice]:
# Fetch data
_, latest_price = _fetch_prices(ticker)

logging.debug('Latest price: {} {}, {}'.format(
latest_price.price,
latest_price.time,
latest_price.quote_currency
))
return latest_price

def get_historical_price(self, ticker, time) -> Optional[source.SourcePrice]:
# Fetch data
prices, _ = _fetch_prices(ticker)

# Find relevant date
key = time.strftime("%Y%m%d")
if key not in prices:
return None
else:
the_price = prices[key]
logging.debug('Historical price: {} {}, {}'.format(
the_price.price,
the_price.time,
the_price.quote_currency
))
return the_price