# -*- coding: utf-8 -*-
# vi:si:et:sw=4:sts=4:ts=4
##
## Copyright (C) 2011 Async Open Source
##
## This program is free software; you can redistribute it and/or
## modify it under the terms of the GNU Lesser General Public License
## as published by the Free Software Foundation; either version 2
## of the License, or (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU Lesser General Public License for more details.
##
## You should have received a copy of the GNU Lesser General Public License
## along with this program; if not, write to the Free Software
## Foundation, Inc., or visit: http://www.gnu.org/.
##
##
## Author(s): Stoq Team <stoq-devel@async.com.br>
##
##
"""
OFX importing
"""
import datetime
import decimal
import logging
import sgmllib
from storm.expr import And
from stoqlib.database.expr import Trim
from stoqlib.domain.account import Account, AccountTransaction
from stoqlib.importers.importer import Importer
from stoqlib.lib.parameters import sysparam
log = logging.getLogger(__name__)
[docs]class OFXTagParser(sgmllib.SGMLParser):
def __init__(self):
sgmllib.SGMLParser.__init__(self)
self.transactions = []
self.fi = None
self.account_id = None
self.account_type = None
self._is_statement = False
self._is_fi = False
self._is_account_id = False
self._is_account_type = False
self._tag = None
self._tags = {}
[docs] def unknown_starttag(self, tag, attrs):
self._tag = tag
if tag == u'stmttrn':
self._is_statement = True
elif tag == u'fi':
self._is_fi = True
elif tag == u'acctid':
self._is_account_id = True
elif tag == u'accttype':
self._is_account_type = True
[docs] def unknown_endtag(self, tag):
if tag == u'stmttrn':
self._is_statement = False
self.transactions.append(self._tags)
self._tags = {}
if tag == u'fi':
self._is_fi = False
self.fi = {u'org': self._tags[u'org'],
u'fid': self._tags[u'fid']}
self._tags = {}
[docs] def handle_data(self, data):
data = data.strip()
if not data:
return
if self._tag == u'acctid':
self.account_id = data
elif self._tag == u'accttype':
self.account_type = data
else:
self._tags[self._tag] = data
[docs]class OFXImporter(Importer):
"""Class to assist the process of importing ofx files.
"""
def __init__(self):
Importer.__init__(self)
self._headers = {}
#
# Public API
#
[docs] def feed(self, fp, filename='<stdin>'):
data = fp.read()
if '\r' in data:
data = data.replace('\r\n', '\n')
data = data.replace('\r', '\n')
lines = data.split('\n')
for i, line in enumerate(lines):
if not line:
continue
if line.startswith('<OFX>'):
self._parse_tags(lines[i:])
break
else:
header, value = line.split(':', 1)
self._headers[header] = value
def _parse_tags(self, data):
self.tp = OFXTagParser()
self.tp.feed('\n'.join(data))
def _parse_number(self, data):
data = data.strip()
data = data.replace(',', '.')
try:
number = decimal.Decimal(data)
except decimal.InvalidOperation:
log.info("Couldn't parse number: %r" % (data, ))
number = 0
return number
def _parse_string(self, data):
return unicode(data, self._headers['CHARSET'])
def _parse_date(self, data):
# BB Juridica: 20110207
# BB Fisica: 20110401120000[-3:BRT]
data = data.strip()
for length, format in [(14, '%Y%m%d%H%M%S'),
(8, "%Y%m%d")]:
short = data[:length]
try:
return datetime.datetime.strptime(
short, format)
except ValueError:
continue
log.info("Couldn't parse date: %r" % (data, ))
return None
[docs] def before_start(self, store):
account = store.find(Account,
code=unicode(self.tp.account_id)).one()
if account is None:
account = Account(description=self.get_account_id(),
code=unicode(self.tp.account_id),
account_type=Account.TYPE_BANK,
parent=sysparam.get_object(store, 'BANKS_ACCOUNT'),
store=store)
self.account_id = account.id
self.source_account_id = sysparam.get_object_id('IMBALANCE_ACCOUNT')
self.skipped = 0
[docs] def get_n_items(self):
return len(self.tp.transactions)
[docs] def process_item(self, store, i):
t = self.tp.transactions[i]
date = self._parse_date(t['dtposted'])
# Do not import transactions with broken dates
if date is None:
self.skipped += 1
return False
value = self._parse_number(t['trnamt'])
description = self._parse_string(t['memo'])
if value == 0:
self.skipped += 1
# We can't import transactions without a value = 0, skip it.
return False
elif value > 0:
operation_type = AccountTransaction.TYPE_IN
source_account = store.get(Account, self.source_account_id)
account = store.get(Account, self.account_id)
elif value < 0:
# Only register absolute values - Indicating positive/negative values,
# using the operation type.
value = abs(value)
operation_type = AccountTransaction.TYPE_OUT
source_account = store.get(Account, self.account_id)
account = store.get(Account, self.source_account_id)
code = self._parse_string(t['checknum'])
if not store.find(AccountTransaction,
date=date, code=code,
value=value).is_empty():
# Skip already present transactions
self.skipped += 1
return False
# TODO: Check if value and code are enough to consider a match.
existing = list(store.find(
AccountTransaction,
And(AccountTransaction.value == value,
Trim(u'LEADING', u'0', AccountTransaction.code) == code.lstrip('0'))))
if len(existing) == 1:
t = existing[0]
t.description = description
t.date = date
# Categorize the transaction if it was still on imbalance
if sysparam.compare_object('IMBALANCE_ACCOUNT', t.source_account):
t.source_account = source_account
if sysparam.compare_object('IMBALANCE_ACCOUNT', t.account):
t.account = account
else:
t = AccountTransaction(
store=store,
source_account=source_account,
account=account,
description=description,
code=code,
value=value,
date=date,
operation_type=operation_type)
store.flush()
return True
[docs] def when_done(self, store):
log.info("Imported %d transactions" % (len(self.tp.transactions), ))
if self.skipped:
log.info("Couldn't parse %d transactions" % (self.skipped, ))
[docs] def get_account_id(self):
if self.tp.fi:
return u'%s - %s' % (self.tp.fi['org'],
self.tp.account_type)
return unicode(self.tp.account_type)
if __name__ == '__main__': # pragma nocover
import sys
ofx = OFXImporter()
ofx.feed(sys.argv[1])
ofx.process()