Source code for stoqlib.lib.sintegragenerator

# -*- coding: utf-8 -*-
# vi:si:et:sw=4:sts=4:ts=4

## Copyright (C) 2007 Async Open Source <>
## All rights reserved
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## GNU General Public License for more details.
## You should have received a copy of the GNU General Public License
## along with this program; if not, write to the Free Software
## Foundation, Inc., or visit:
## Author(s): Stoq Team <>

"""Generate a Sintegra archive from the Stoqlib domain classes"""

import operator

from stoqlib.database.queryexecuter import DateIntervalQueryState

from stoqlib.database.queryexecuter import QueryExecuter
from stoqlib.database.runtime import get_current_branch, get_default_store
from stoqlib.domain.devices import FiscalDayHistory
from stoqlib.domain.inventory import Inventory
from stoqlib.domain.person import (Company,
from stoqlib.domain.receiving import ReceivingOrder
from import Sale
from stoqlib.lib.sintegra import SintegraFile, SintegraError
from stoqlib.lib.translation import stoqlib_gettext

_ = stoqlib_gettext

[docs]class StoqlibSintegraGenerator(object): """This class is responsible for generating a sintegra file from the Stoq domain classes. """ def __init__(self, store, start, end): = store self.start = start self.end = end self.sintegra = SintegraFile() self._add_header() self.sintegra.close()
[docs] def write(self, filename): self.sintegra.write(filename)
def _date_query(self, search_spec, column): sfilter = object() executer = QueryExecuter( executer.set_filter_columns(sfilter, [column]) executer.set_search_spec(search_spec) state = DateIntervalQueryState(filter=sfilter, start=self.start, end=self.end) return[state]) def _add_header(self): branch = get_current_branch( manager = branch.manager.person company = address = branch.person.get_main_address() state_registry = company.get_state_registry_number() state = address.get_state() self.sintegra.add_header(company.get_cnpj_number(), str(state_registry) or 'ISENTO', branch.get_description(), address.get_city(), state, branch.person.get_fax_number_number(), self.start, self.end) self.sintegra.add_complement_header( # If we don't have a street number, use zero for sintegra address.street, address.streetnumber or 0, address.complement, address.district, address.get_postal_code_number(),, branch.person.get_phone_number_number()) self._add_registers(state) def _add_fiscal_coupons(self): for item in self._date_query(FiscalDayHistory, 'emission_date'): self.sintegra.add_fiscal_coupon( item.emission_date, item.serial, item.serial_id, item.coupon_start, item.coupon_end, item.cro, item.crz, item.period_total, for tax in item.taxes: if not tax.value: continue code = tax.code if tax.type == 'ISS': code = 'ISS' self.sintegra.add_fiscal_tax(item.emission_date, item.serial, code, tax.value) def _get_cnpj_or_cpf(self, receiving_order): person = receiving_order.supplier.person company = person.has_individual_or_company_facets() if isinstance(company, Company): if not company.cnpj: raise SintegraError( _("You need to have a CNPJ number set on Company %s") % ( cnpj = company.get_cnpj_number() elif isinstance(company, Individual): cnpj = company.get_cpf_number() else: raise AssertionError return cnpj def _get_state_registry(self, receiving_order): company = receiving_order.supplier.person.has_individual_or_company_facets() if isinstance(company, Company): if not company.state_registry: raise SintegraError( _("You need to have a State Registry set on Company %s") % ( return company.get_state_registry_number() elif isinstance(company, Individual): return "ISENTO" else: raise AssertionError def _add_registers(self, state): receiving_orders = self._date_query(ReceivingOrder, 'receival_date') # 1) Add orders (registry 50) for receiving_order in receiving_orders: self._add_receiving_order(state, receiving_order) # 2) Add order items (registry 54) and collect sellables sellables = set() for receiving_order in receiving_orders: self._add_receiving_order_items(receiving_order, sellables) self._add_receiving_order_item_special( receiving_order, 991, receiving_order.freight_total) self._add_receiving_order_item_special( receiving_order, 992, receiving_order.secure_value) self._add_receiving_order_item_special( receiving_order, 999, receiving_order.expense_value) # 3) Add fiscal coupons (registry 60) self._add_fiscal_coupons() # 4) Add sold products (registry 60R) self._add_sold_products() # 5) Add inventories (registry 74) for inventory in self._date_query(Inventory, 'close_date'): self._add_inventory(inventory, state) # 6) Add sellables (registry 75) for sellable in sorted(sellables, key=operator.attrgetter("code")): self._add_sellable(sellable) def _add_receiving_order(self, state, receiving_order): cnpj = self._get_cnpj_or_cpf(receiving_order) # Sintegra register 50 requires us to separate the receiving orders per # class of sales tax (aliquota), so first we have to check all the # items in our order and split them out per tax code sellable_per_constant = {} for item in receiving_order.get_items(): # Tax is stored as a number between 0 and 100 # We're going to use it as a percentage value, so # divide by 100, Perhaps this code should move to a method in # the SellableTaxConstant class tax_value = item.sellable.tax_constant.tax_value if tax_value: tax_value /= 100 else: tax_value = 0 tax_values = sellable_per_constant.setdefault(tax_value, []) tax_values.append(item) # There's no way to specify a global discount for the whole order, # instead we have to put the discount proportionally over all # tax constants in the order, calculate the percentage here items_total = receiving_order.products_total extra_percental = 1 + ((receiving_order.freight_total + receiving_order.secure_value + receiving_order.expense_value - receiving_order.discount_value) / items_total) state_registry = self._get_state_registry(receiving_order) for tax_value, items in sellable_per_constant.items(): item_total = sum(item.get_total() for item in items) item_total *= extra_percental total_ipi = sum(item.receiving_order.ipi_total for item in items) no_items = receiving_order.get_items().count() if tax_value: base_total = item_total else: # Lack of a tax_value means lack of a ICMS base to calculate # taxes from base_total = 0 self.sintegra.add_receiving_order( cnpj, state_registry, receiving_order.receival_date, state, 1, '1 ', receiving_order.invoice_number, receiving_order.cfop_code, 'T', item_total + (total_ipi / no_items), base_total, item_total * tax_value, 0, (receiving_order.expense_value + receiving_order.secure_value), tax_value * 100, 'N') def _add_sold_products(self): sales = self._date_query(Sale, 'confirm_date') sellables = {} for sale in sales: if sale.status != Sale.STATUS_CONFIRMED: continue for sale_item in sale.products: discount = / sale_item.quantity # XXX: Shouldn't this be sale_item.price?! cost = (sale_item.sellable.price * sale_item.quantity - discount) if not sale_item.sellable in sellables: sellables[sale_item.sellable] = 0, 0 old_quantity, old_cost = sellables[sale_item.sellable] quantity = old_quantity + sale_item.quantity cost += old_cost sellables[sale_item.sellable] = quantity, cost date = self.start.strftime("%m%Y") for sellable in sorted(sellables, key=operator.attrgetter('code')): quantity, cost = sellables[sellable] tax_value = sellable.tax_constant.tax_value or 0 self.sintegra.add_products_summarized( date=int(date), product_code=sellable.code, product_quantity=quantity, total_liquido_produto=cost, total_icms_base=cost, icms_aliquota=tax_value) def _add_receiving_order_items(self, receiving_order, sellables): cnpj = self._get_cnpj_or_cpf(receiving_order) items = receiving_order.get_items() no_items = items.count() items_total = receiving_order.products_total extra_percental = 1 + ((receiving_order.freight_total + receiving_order.secure_value + receiving_order.expense_value - receiving_order.discount_value) / items_total) for i, item in enumerate(items): tax_value = item.sellable.tax_constant.tax_value or 0 item_total = item.get_total() if tax_value: base_total = item_total base_total *= extra_percental else: base_total = 0 self.sintegra.add_receiving_order_item( cnpj, 1, '1 ', receiving_order.invoice_number, receiving_order.cfop_code, '000', i + 1, item.sellable.code, item.quantity, item_total, receiving_order.discount_value / no_items, base_total, 0, receiving_order.ipi_total / no_items, tax_value) sellables.add(item.sellable) def _add_receiving_order_item_special(self, receiving_order, code, value): if not value: return cnpj = self._get_cnpj_or_cpf(receiving_order) self.sintegra.add_receiving_order_item(cnpj, 1, '1 ', receiving_order.invoice_number, receiving_order.cfop_code, None, code, None, 0, 0, value, 0, 0, 0, 0) def _add_sellable(self, sellable): unit = 'un' if sellable.unit: unit = str(sellable.unit.description or 'un') self.sintegra.add_product(self.start, self.end, sellable.code, 0, sellable.get_description(), unit, 0, 0, 0, 0) def _add_inventory(self, inventory, state): for item in inventory.get_items(): sellable = item.product.sellable # Before bug #3708 the inventory items did not store the product's # cost, in this case, we use the current cost. if item.product_cost: total_product_value = item.get_total_cost() else: total_product_value = sellable.cost * item.actual_quantity self.sintegra.add_inventory_item( inventory.close_date, product_code=sellable.code, product_quantity=item.actual_quantity, total_product_value=total_product_value, # we are assuming that the main company owns all the products # see the link in bug #3708 for further details. owner_code=1, owner_cnpj=None, owner_state_registry=None, state=state)
[docs]def generate(filename, start, end): """Generate a sintegra file for all changes in the system between start and end dates. Start and end are normally the first and last day of a month :param filename: filename to save the sintegra file :param start: start date :type start: :param end: end date :type start: """ generator = StoqlibSintegraGenerator(get_default_store(), start, end) fp = open(filename, 'wb') for register in generator.sintegra.get_registers(): fp.write(register.get_string().replace('2007', '2006')) fp.close()