Source code for stoqlib.lib.session

# -*- Mode: Python; coding: utf-8 -*-
# vi:si:et:sw=4:sts=4:ts=4

## Copyright (C) 2017 Async Open Source <>
## All rights reserved
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU Lesser General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## GNU Lesser General Public License for more details.
## You should have received a copy of the GNU Lesser General Public License
## along with this program; if not, write to the Free Software
## Foundation, Inc., or visit:
## Author(s): Stoq Team <>

import errno
import httplib
import logging
import os
import urlparse

from nss import io
from nss import nss
from nss import ssl
from nss.error import NSPRError

log = logging.getLogger(__name__)
_certdb = None
_password_callback = None
_certificate_callback = None

[docs]def nss_setup(certdb, password_callback=None, certificate_callback=None): global _certdb global _password_callback global _certificate_callback _certdb = certdb _password_callback = password_callback _certificate_callback = certificate_callback
class _NssHTTPConnection(httplib.HTTPConnection): default_port = 443 def __init__(self, host, port, strict=None, timeout=3, **kwargs): httplib.HTTPConnection.__init__( self, host, port, strict=strict, timeout=timeout, **kwargs)'%s init %s', self.__class__.__name__, host) self.sock = None self._timeout = timeout self._certdb = nss.get_default_certdb() def connect(self):"connect: host=%s port=%s",, self.port) try: addr_info = io.AddrInfo( except Exception as e: log.error("could not resolve host address '%s'", raise for net_addr in addr_info: net_addr.port = self.port self._create_socket( try:"try connect: %s", net_addr) self.sock.connect(net_addr, timeout=io.seconds_to_interval(self._timeout)) except Exception as e:"connect failed: %s (%s)", net_addr, e) else:"connected to: %s", net_addr) break else: raise IOError(errno.ENOTCONN, "Could not connect to %s at port %d" % (, self.port)) def _create_socket(self, family): self.sock = ssl.SSLSocket(family) self.sock.set_ssl_option(ssl.SSL_SECURITY, True) self.sock.set_ssl_option(ssl.SSL_HANDSHAKE_AS_CLIENT, True) self.sock.set_hostname( # Provide a callback to verify the servers certificate self.sock.set_auth_certificate_callback( self._auth_certificate_callback, self._certdb) self.sock.set_client_auth_data_callback( self._client_auth_data_callback, '', '', self._certdb) def _auth_certificate_callback(self, sock, check_sig, is_server, certdb): cert = sock.get_peer_certificate() pin_args = sock.get_pkcs11_pin_arg() or () intended_usage = nss.certificateUsageSSLServer try: # If the cert fails validation it will raise an exception, the errno attribute # will be set to the error code matching the reason why the validation failed # and the strerror attribute will contain a string describing the reason. approved_usage = cert.verify_now(certdb, check_sig, intended_usage, *pin_args) except Exception as e: # XXX: Why isn't the certificate valid?'cert validation failed for "%s" (%s)', cert.subject, e.strerror) approved_usage = intended_usage logging.debug("approved_usage = %s intended_usage = %s", ', '.join(nss.cert_usage_flags(approved_usage)), ', '.join(nss.cert_usage_flags(intended_usage))) if not bool(approved_usage & intended_usage): logging.debug('cert not valid for "%s"', cert.subject) return False # Certificate is OK. Since this is the client side of an SSL # connection, we need to verify that the name field in the cert # matches the desired hostname. This is our defense against # man-in-the-middle attacks. hostname = sock.get_hostname() try: # If the cert fails validation it will raise an exception cert_is_valid = cert.verify_hostname(hostname) except Exception, e: logging.error('failed verifying socket hostname "%s" matches cert subject "%s" (%s)', hostname, cert.subject, e.strerror) return False logging.debug('cert valid %s for "%s"', cert_is_valid, cert.subject) return cert_is_valid def _client_auth_data_callback(self, ca_names, chosen_nickname, password, nicknames): nickname = _certificate_callback( nss.get_cert_nicknames(self._certdb, nss.SEC_CERT_NICKNAMES_USER)) try: cert = nss.find_cert_from_nickname(nickname, password) priv_key = nss.find_key_by_any_cert(cert, password) except NSPRError: return False return cert, priv_key
[docs]class NssResponse(object): """Nss response object. This maps the nss response os a request to the same API that requests used, making it easier to exchange one for another """ def __init__(self, response): self._response = response self.status_code = response.status self.reason = response.reason @property def content(self): return
[docs]class NssSession(object): """Nss session to communicate with Sefaz using a certificate. When using this, make sure to :meth:`.init` it and :meth:`.shutdown` after. This is specially important for A3 certificates so it can free the token for the signature code to work. The easies way for doing that is by using a contextmanager like:: >> with NssSession() as s: ..'some_url') """ SCHEME_PORT_MAP = { 'http': 80, 'https': 443, } def __init__(self): # Reuse socks as much as we can. This dict will map # the netloc:port to an open _NssHTTPConnection to that location self._conns = {} def __enter__(self): self.init() return self def __exit__(self, *args): for conn in self._conns.values(): conn.close() self.shutdown()
[docs] def init(self): if nss.nss_is_initialized(): return if _password_callback is not None: nss.set_password_callback(_password_callback) nss.nss_init(_certdb) ssl.set_domestic_policy()
[docs] def shutdown(self): if not nss.nss_is_initialized(): return try: ssl.clear_session_cache() except Exception: pass try: nss.nss_shutdown() except Exception: pass
[docs] def get(self, url, headers=None): return self.request('GET', url, headers=headers)
[docs] def post(self, url, data=None, headers=None): return self.request('POST', url, data=data, headers=headers)
[docs] def request(self, method, url, data=None, headers=None): parsed = urlparse.urlparse(url) port = parsed.port if not port: port = self.SCHEME_PORT_MAP[parsed.scheme] key = (parsed.netloc, port) conn = self._conns.get(key, None) if conn is None: conn = self._conns.setdefault(key, _NssHTTPConnection(parsed.netloc, port)) conn.connect() # FIXME: python-nss stores password_callback on a per-thread dict # Since this object will be called from different threads, # some would not find it. It is not a big problem since setting the # password callback is a fast operation, but maybe there's # some better solution here? if _password_callback is not None: nss.set_password_callback(_password_callback) conn.request(method, parsed.path, body=data, headers=headers) return NssResponse(conn.getresponse())
if __name__ == '__main__': firefoxdir = os.path.join(os.environ['HOME'], '.mozilla', 'firefox') if not os.path.exists(firefoxdir): raise AssertionError import ConfigParser cfg = ConfigParser.ConfigParser(), 'profiles.ini')) nss_setup(os.path.join(firefoxdir, cfg.get('Profile0', 'Path'))) url = '' data = ('<soap:Envelope xmlns:soap="" ' 'xmlns:xsd="" ' 'xmlns:xsi="">' '<soap:Header>' '<nfeCabecMsg xmlns="">' '<versaoDados>3.10</versaoDados><cUF>43</cUF></nfeCabecMsg></soap:Header>' '<soap:Body>' '<nfeDadosMsg xmlns="">' '<consStatServ xmlns="" versao="3.10">' '<tpAmb>2</tpAmb><cUF>43</cUF><xServ>STATUS</xServ></consStatServ>' '</nfeDadosMsg></soap:Body></soap:Envelope>') headers = {'Content-type': u'application/soap+xml; charset=utf-8', 'Accept': u'application/soap+xml; charset=utf-8'} with NssSession() as s: res =, data, headers) print "status:", res.status_code print "reason:", res.reason print "text:", res.text