I solved it using :
requests.post(url,headers=hdr,json={"filterList":[{}]}, cert='myprivate.pem')
Answer from user5023028 on Stack Overflow: I solved it using:
requests.post(url,headers=hdr,json={"filterList":[{}]}, cert='myprivate.pem')
ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH, cafile='myCA.crt.pem')
ctx.load_cert_chain(certfile='myprivate.pem')
load_cert_chain loads the cert and private key for use as client certificate - which would be the cert argument with requests. cafile describes the CA it should use to verify the server certificate - which would be the verify argument for requests. Combined this would result in:
requests.post(..., cert='myprivate.pem', verify='myCA.crt.pem')
In order to make the requests library use a custom ssl context, you need to create a custom HTTPAdapter class and override the init_poolmanager method to pass in extra arguments to the base class's implementation.
See sample code here:
from requests import Session
from requests.adapters import HTTPAdapter
import ssl
class CustomHTTPAdapter(HTTPAdapter):
    """Transport adapter that supplies its own ``ssl.SSLContext``.

    Mount an instance on a ``requests`` session to have the HTTPS
    connections made through it use the context built in
    ``init_poolmanager``.
    """

    def init_poolmanager(self, *args, **kwargs):
        """Forward to the base implementation, adding an ``ssl_context``.

        All positional and keyword arguments are passed through untouched.
        A caller-supplied ``ssl_context`` keyword collides with the one
        added here and raises ``TypeError``.
        """
        # create_default_context() gives secure defaults: server
        # certificate verification is enabled and the system's default
        # CA certificates are used.
        #
        # A manually constructed context, e.g.
        # ssl.SSLContext(ssl.PROTOCOL_TLSv1), is the alternative, but it
        # does NOT enable server certificate verification.
        tls_context = ssl.create_default_context()
        super().init_poolmanager(*args, ssl_context=tls_context, **kwargs)
def main():
    """Create a session whose HTTPS requests go through CustomHTTPAdapter."""
    https_adapter = CustomHTTPAdapter()
    client_session = Session()
    client_session.mount("https://", https_adapter)
    # client_session is now ready to make requests, for example:
    # r = client_session.get("https://<web address>/rest/info?f=json")
Creating an ssl.SSLContext() on its own doesn't enable certificate verification or load CA certificates by default. This is why you're not seeing SSL errors. Using ssl.create_default_context() does enable verification by default.
So the issue here isn't with SSLContext or certifi, it's with the website's certificate and how you're constructing your SSLContext. Next step would be to look into why the certificate the website is presenting isn't valid.
In your callback, cb_context is the same context on which wrap_socket() was called, and the same as socket.context, so socket.context = cb_context sets the context to the same it was before.
Changing the certificate chain of a context does not affect the certificate used for the current wrap_socket() operation. The explanation for this lies in how openssl creates its underlying objects, in this case the underlying SSL structures have already been created and use copies of the chains:
NOTES
The chains associate with an SSL_CTX structure are copied to any SSL structures when SSL_new() is called. SSL structures will not be affected by any chains subsequently changed in the parent SSL_CTX.
When setting a new context, the SSL structures are updated, but that update is not performed when the new context is equal to the old one.
You need to set sock.context to a different context to make it work. You currently instantiate a new context on each new incoming connection, which is not needed. Instead you should instantiate your standard context only once and reuse that. Same goes for the dynamically loaded contexts, you could create them all on startup and put them in a dict so you can just do a lookup, e.g:
...
# Build every per-hostname SSLContext once at startup so the SNI callback
# only has to do a dictionary lookup. Each entry of the "ssl" directory is
# taken as a hostname and is expected to contain server.crt / server.key.
contexts = {}
for hostname in os.listdir("ssl"):
    print('Loading certs for {}'.format(hostname))
    server_cert = "ssl/{}/server".format(hostname)
    context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    context.load_cert_chain(
        certfile="{}.crt".format(server_cert),
        keyfile="{}.key".format(server_cert),
    )
    contexts[hostname] = context
def servername_callback(sock, req_hostname, cb_context, as_callback=True):
    """SNI callback: switch the socket to the context of the requested host.

    Args:
        sock: The SSL socket being negotiated.
        req_hostname: Server name sent by the client; looked up in
            ``contexts``.
        cb_context: Context the callback was registered on (unused).
        as_callback: Unused.

    The socket keeps its current context when the hostname is unknown.
    """
    matched = contexts.get(req_hostname)
    if matched is None:
        return  # handle unknown hostname case
    sock.context = matched
def run_server(hostname, port):
    """Serve TLS clients on ``(hostname, port)``, one connection at a time.

    A default certificate is loaded into the listening context; the SNI
    callback ``servername_callback`` may swap in a per-host context during
    the handshake. The loop runs until interrupted with Ctrl-C.

    Args:
        hostname: Address to bind the listening socket to.
        port: TCP port to listen on.
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind((hostname, port))
        s.listen(8)
        print("Serving on {}:{}".format(hostname, port))

        context = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
        context.set_servername_callback(servername_callback)
        default_cert = "ssl/3.1/server"
        context.load_cert_chain(certfile="{}.crt".format(default_cert),
                                keyfile="{}.key".format(default_cert))

        while True:
            (c, a) = s.accept()
            try:
                ssl_sock = context.wrap_socket(c, server_side=True)
            except (ssl.SSLError, OSError) as error:
                # One client failing its handshake must not stop the server.
                print("TLS handshake with {} failed: {}".format(a, error))
                # Harmless if the SSL layer already took over the socket.
                c.close()
                continue
            try:
                handle_client(ssl_sock, a)
            finally:
                # Close the wrapped socket: it is the object that carries the
                # connection after wrap_socket(), so closing only the raw
                # socket would leave the TLS connection open.
                ssl_sock.close()
    except KeyboardInterrupt:
        pass
    finally:
        # Release the listening socket however the loop ends.
        s.close()
So after looking at this post and a few others online, I put together a version of the code above, that worked for me perfectly... so I just thought I would share. In case it helps anyone else.
import sys
import ssl
import socket
import os
from pprint import pprint
# Maps a hostname to the ssl.SSLContext holding that domain's certificate;
# filled by setup_ssl_certs() and read by servername_callback().
DOMAIN_CONTEXTS = {}
# Folder containing one sub-directory per domain. The trailing slash matters:
# certificate paths are built from it by plain string formatting.
ssl_root_path = "c:/ssl/"
# ----------------------------------------------------------------------------------------------------------------------
#
# As an example create domains in the ssl root path...ie
#
# c:/ssl/example.com
# c:/ssl/johndoe.com
# c:/ssl/test.com
#
# And then create self signed ssl certificates for each domain to test... and put them in the corresponding domain
# directory... in this case the cert and key files are called cert.pem, and key.pem....
#
def setup_ssl_certs():
    """Create one SSLContext per entry of ``ssl_root_path``.

    Every entry name is used as a hostname. Its ``cert.pem`` and ``key.pem``
    are loaded into a fresh server-side context, which is stored in
    ``DOMAIN_CONTEXTS`` under that hostname.
    """
    for hostname in os.listdir(ssl_root_path):
        # Folder that holds this domain's certificate and key.
        domain_dir = '{rp}{hn}/'.format(rp=ssl_root_path, hn=hostname)

        domain_context = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
        domain_context.load_cert_chain(
            certfile="{}cert.pem".format(domain_dir),
            keyfile="{}key.pem".format(domain_dir),
        )

        # The dict is only mutated, never rebound, so no `global` is needed.
        DOMAIN_CONTEXTS[hostname] = domain_context
# ----------------------------------------------------------------------------------------------------------------------
def servername_callback(sock, req_hostname, cb_context, as_callback=True):
    """SNI callback that selects the certificate for the requested domain.

    Looks up ``req_hostname`` in ``DOMAIN_CONTEXTS``. On a match the socket
    is switched to that domain's context and the hostname is recorded on the
    socket as ``server_hostname``. Unknown hostnames leave the socket as is.

    Args:
        sock: The SSL socket being negotiated.
        req_hostname: Server name taken from the original request.
        cb_context: Context the callback was registered on (unused).
        as_callback: Unused.
    """
    domain_context = DOMAIN_CONTEXTS.get(req_hostname)
    if not domain_context:
        return  # handle unknown hostname case

    try:
        sock.context = domain_context
    except Exception as error:
        # Report the failure and carry on with the current context.
        print(error)
        return

    sock.server_hostname = req_hostname
def handle_client(conn, a):
    """Reply to one client with a minimal HTTP 200 greeting.

    Args:
        conn: Connection object providing ``server_hostname``, ``recv()``,
            ``getpeername()`` and ``write()``.
        a: Client address as passed by the caller; not used here.
    """
    sni_domain = conn.server_hostname
    conn.recv()  # read the request; its content is ignored
    peer_ip = conn.getpeername()[0]
    greeting = 'Hello {cip} welcome, from domain {d} !'.format(cip=peer_ip, d=sni_domain)
    conn.write(b'HTTP/1.1 200 OK\n\n' + greeting.encode())
def run_server(hostname, port):
    """Serve TLS clients on ``(hostname, port)``, one connection at a time.

    The listening context carries the certificate from the ``default``
    folder under ``ssl_root_path``; ``servername_callback`` may replace it
    per connection based on the name the client requests. The loop runs
    until interrupted with Ctrl-C.

    Args:
        hostname: Address to bind the listening socket to.
        port: TCP port to listen on.
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind((hostname, port))
        s.listen(8)

        context = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
        # Works on Python 3.4+. Python 3.7+ also offers the `sni_callback`
        # attribute (it is assigned, not called) as the newer equivalent.
        context.set_servername_callback(servername_callback)

        default_cert = "{rp}default/".format(rp=ssl_root_path)
        context.load_cert_chain(certfile="{}cert.pem".format(default_cert),
                                keyfile="{}key.pem".format(default_cert))
        context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1  # optional
        context.set_ciphers('EECDH+AESGCM:EDH+AESGCM:AES256+EECDH:AES256+EDH')

        while True:
            ssock, addr = s.accept()
            try:
                conn = context.wrap_socket(ssock, server_side=True)
            except Exception as error:
                # Deliberately broad: this loop is the server's top-level
                # boundary and one bad handshake must not stop it. SSL errors
                # are covered too, so no separate ssl.SSLError clause.
                print('!!! Error, {e}'.format(e=error))
                # Harmless if the SSL layer already took over the socket.
                ssock.close()
                continue
            try:
                handle_client(conn, addr)
            finally:
                # `conn` is always the connection accepted in this iteration,
                # and it is closed even when handle_client raises.
                conn.close()
    except KeyboardInterrupt:
        pass
    finally:
        # Release the listening socket however the loop ends.
        s.close()
# ----------------------------------------------------------------------------------------------------------------------
def main():
    """Load the per-domain certificates, then start the TLS server."""
    # The listen name must resolve locally, e.g. map example.com to
    # 127.0.0.1 in your static name resolution.
    listen_host, listen_port = 'example.com', 443
    setup_ssl_certs()
    run_server(listen_host, listen_port)
# ----------------------------------------------------------------------------------------------------------------------
# Start the server only when this file is run as a script, not on import.
if __name__ == '__main__':
    main()