# -*- Mode: Python; coding: utf-8 -*-
# vi:si:et:sw=4:sts=4:ts=4
##
## Copyright (C) 2017 Async Open Source <http://www.async.com.br>
## All rights reserved
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU Lesser General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU Lesser General Public License for more details.
##
## You should have received a copy of the GNU Lesser General Public License
## along with this program; if not, write to the Free Software
## Foundation, Inc., or visit: http://www.gnu.org/.
##
## Author(s): Stoq Team <stoq-devel@async.com.br>
##
import contextlib
import hashlib
import os
import platform
import shutil
import subprocess
import tempfile
from OpenSSL import crypto
from stoqlib.domain.certificate import Certificate
from stoqlib.lib.osutils import get_application_dir
from stoqlib.lib.settings import get_settings
from stoqlib.lib.xmlutils import get_signer
_is_windows = platform.system() == 'Windows'
certdb_path = os.path.join(get_application_dir(), 'certdb')
pkcs12_cert_path = os.path.join(certdb_path, 'cert.pfx')
pkcs11_lib_path = os.path.join(certdb_path, 'cert.so')
cert_path = {
Certificate.TYPE_PKCS11: pkcs11_lib_path,
Certificate.TYPE_PKCS12: pkcs12_cert_path,
}
[docs]def check_certdb():
"""Check if the certdb exists."""
if not os.path.isdir(certdb_path):
return False
# FIXME: There's no nss on Windows
if _is_windows:
return True
return subprocess.call(['modutil', '-dbdir', certdb_path, '-list'],
stdout=subprocess.PIPE) == 0
[docs]def init_certdb():
"""Initialie the certdb, removing the old one if if exists."""
if os.path.exists(certdb_path):
shutil.rmtree(certdb_path)
os.makedirs(certdb_path)
# FIXME: There's no nss on Windows
if _is_windows:
return
with tempfile.NamedTemporaryFile(delete=False) as f:
subprocess.check_call(['certutil', '-N',
'-f', f.name, '-d', certdb_path])
[docs]def import_pkcs11(content):
"""Import a pkcs11 (A3) certificate.
:param content: The content of the certificate library
"""
with open(pkcs11_lib_path, 'wb') as f:
f.write(content)
# FIXME: There's no nss on Windows
if _is_windows:
return
subprocess.check_call(['modutil', '-add', 'ca_certs', '-force',
'-libfile', pkcs11_lib_path, '-dbdir', certdb_path])
[docs]def import_pkcs12(content, password):
"""Import a pkcs12 (A1) certificate.
:param content: The content of the certificate file
:param password: The certificate password
"""
with open(pkcs12_cert_path, 'wb') as f:
f.write(content)
# FIXME: There's no nss on Windows
if _is_windows:
return
subprocess.check_call(['pk12util', '-d', certdb_path,
'-i', pkcs12_cert_path, '-W', password])
[docs]class CertificateManager(object):
_instance = None
def __init__(self):
self.setup_done = False
self._certificate = None
self._cert_callback = None
self._password = None
self._pw_callback = None
self._cert_type = None
self._cert_name = None
#
# Public API
#
@classmethod
[docs] def get_instance(cls):
"""Get the singleton instance of this manager."""
if cls._instance is None:
cls._instance = cls()
return cls._instance
[docs] def setup_certificate(self, certificate,
password_callback=None, certificate_callback=None):
"""Setup the certificate in the manager.
Setup the certificate and make sure it is ready to be used for
signing and transmission.
:param certificate: The :class:`stoqnfe.domain.cert.NfeCertificate`
that is going to be configured
:param password_callback: A callable that will be used to ask
for the certificate password
:param certificate_callback: A callable that will be used to decide
which if the existing certificates (PKCS11 only) will be used
"""
self._cert_name = certificate.name
self._cert_type = certificate.type
self._password = certificate.password
self._pw_callback = password_callback
self._cert_callback = certificate_callback
md5sum_file = os.path.join(certdb_path, 'cert.md5sum')
data = (certificate.content +
(self._password.hashed_password or u'').encode('utf-8'))
md5sum = hashlib.md5(data).hexdigest()
if os.path.isfile(md5sum_file):
try:
with open(md5sum_file, 'r') as f:
force = md5sum != f.read()
except Exception:
force = True
else:
force = True
if not check_certdb() or force:
init_certdb()
if certificate.type == Certificate.TYPE_PKCS11:
import_pkcs11(certificate.content)
elif certificate.type == Certificate.TYPE_PKCS12:
import_pkcs12(certificate.content, self._password.password)
with open(md5sum_file, 'w') as f:
f.write(md5sum)
if not _is_windows:
# FIXME: There's no nss on Windows
from stoqlib.lib.session import nss_setup
nss_setup(certdb_path,
password_callback=self._password_callback,
certificate_callback=self._certificate_callback)
self.setup_done = True
@contextlib.contextmanager
[docs] def get_certs(self):
"""Get the certificate and its key in a temporary file.
This will separate the certificate and the key in temporary files.
The path to those files will be yielded in the context, and then
removed from the file system.
Note that this only works for PKCS12 since there's no way to extract
the private key from a PKCS11 token.
:returns: (cert_file_path, key_file_path)
"""
# This is not supported for PKCS11 yet (and maybe it will never be)
assert self._cert_type == Certificate.TYPE_PKCS12
with open(cert_path[self._cert_type], 'rb') as f:
pkcs12 = crypto.load_pkcs12(f.read(), self._get_password())
with tempfile.NamedTemporaryFile(delete=False) as cert_file:
cert_file.write(crypto.dump_certificate(crypto.FILETYPE_PEM,
pkcs12.get_certificate()))
with tempfile.NamedTemporaryFile(delete=False) as key_file:
key_file.write(crypto.dump_privatekey(crypto.FILETYPE_PEM,
pkcs12.get_privatekey()))
yield (cert_file.name, key_file.name)
os.unlink(cert_file.name)
os.unlink(key_file.name)
[docs] def sign(self, xml):
"""Sign the given xml."""
backend = get_signer(self._cert_type)
return backend.get_signature(xml, cert_path[self._cert_type],
self._get_password,
self._certificate_callback)
#
# Private
#
def _get_password(self, token_name=None, retry=False):
# Ask the password in the following conditions:
# * We are retrying (password was incorrect)
# * The password was not yet supplied
password = self._password and self._password.password
token_name = token_name or self._cert_name
if retry or password is None:
self._password = self._pw_callback(token_name, retry)
password = self._password.password
return password
#
# Callbacks
#
def _password_callback(self, slot, retry, old_password=None):
return self._get_password(slot.token_name, retry)
def _certificate_callback(self, certificates):
if self._certificate is not None and self._certificate in certificates:
return self._certificate
# If there's only one certificate, don't need to ask the user
if len(certificates) == 1:
self._certificate = certificates[0]
else:
settings = get_settings()
last_used = settings.get('nfe-certificate-last-used', None)
self._certificate = self._cert_callback(certificates,
last_used)
settings.set('nfe-certificate-last-used', self._certificate)
return self._certificate