Flask Client-Side TLS Authentication
I do not consider myself an expert in this area. The following information is provided without guarantees of any kind, including the implicit guarantee of being useful for its stated purpose. This is simply an organized collection of notes for the reference of myself and my friends about how I personally accomplished client-side certificate authentication with Flask and Gunicorn. Anyone is free to use it as a learning tool or to base their own code off of it, but I make no guarantees about its security.
As usual, please feel free to contact me personally with any comments, suggestions, or corrections that you discover are appropriate for this document. Given sufficient evidence that they are correct and authoritative, I will apply them with attribution.
Introduction
TLS (formerly SSL) is a cornerstone of security on the internet. If you are not using it for your web applications, you should be. Its what turns HTTP into HTTPS. Most major browsers now either mark, or will mark very soon, any website that doesn't use HTTPS as insecure. However, many people do not pay much attention to how TLS works and what its major features are. Even with HTTPS becoming more and more prevalent, many think of it as simply a secure connection mechanism. What many do not realize is that HTTPS can and should also be used for client side authentication. In many ways it is more effective, convenient, and secure, than password-based authentication (which is what most websites use).
The difficult part with client-side authentication is two-fold and one issue feeds into the other. Specifically, it is difficult to issue a certificate to everyone who visits your website (and hence difficult to explain to them how and why they should use it) and client-side authentication is often poorly documented and poorly supported by various programming libraries and frameworks. Thankfully, the latter is improving, slowly but surely.
Another interesting tool that has gained popularity recently, is Flask, a Python "micro-framework" for RESTful services. I will not go into detail about any of those terms, you can follow the links for more information, but suffice it to say that if you are writing microservices for your systems, you should probably be doing it in Flask. Micro-services and Flask have been synonymous in my mind for a while now and I have never encountered something so well suited to that purpose.
This post, is all about how to get client-side TLS authentication working with Flask and Gunicorn - the recommended web server to use with Flask.
The Application
The thing that started this project is that sometime in early 2018 some hackers dumped a LOT of passwords onto the internet as a huge set of files, totaling some 1.5 billion accounts, including email addresses and passwords. I of course downloaded this set, but when people started asking my whether they were in that list or not, I found the need to create a web app and allow them to check for themselves.
This was of course very dangerous because you never want this kind of information to go across the internet unencrypted. Similarly, we never want someone who is not explicitly authorized to access this information to use the service. So I decided to add client-side certificate authentication to it which would ensure that only people who I issued a certificate to would be able to access the information.
If you are interested in more detail, please check out the app source code here: https://github.com/Abraxos/passwords
Client-Side Authentication
The general mechanism by which we achieve client-side authentication for Flask is that we actually do it in Gunicorn by configuring it with relevant SSL settings and with a CustomSyncWorker
object to modify every single request that goes through with several, relevant, custom headers that can then be used for identifying the user throughout our application after they have been authenticated by Gunicorn.
Gunicorn TLS Settings
The first, and most important part, is configuring Gunicorn on launch to actually check for client-side certificates, require a client-side handshake, and enforce that the client-provided certificate was valid from the point of view of a given certificate authority.
In my application, this is accomplished by the ClientAuthApplication
object, which inherits from the gunicorn.app.base.BaseApplication
object.
import gunicorn.app.base
from pathlib import Path
from gunicorn.six import iteritems
from .utils import number_of_workers
class ClientAuthApplication(gunicorn.app.base.BaseApplication):
def __init__(self, app, ca_path: Path, cert_path: Path, key_path: Path,
hostname=None, port=None, num_workers=None, timeout=None):
hostname = 'localhost' if not hostname else hostname
port = 443 if not port else port
timeout = 30 if not timeout else timeout
num_workers = number_of_workers() if not num_workers else num_workers
self.options = {
'bind': '{}:{}'.format(hostname, port),
'workers': num_workers,
'worker_class': 'passwords.client_auth_worker.CustomSyncWorker',
'timeout': timeout,
'ca_certs': str(ca_path),
'certfile': str(cert_path),
'keyfile': str(key_path),
'cert_reqs': 2,
'do_handshake_on_connect': True
}
self.application = app
super().__init__()
def init(self, parser, opts, args):
return super().init(parser, opts, args)
def load_config(self):
config = dict([(key, value) for key, value in iteritems(self.options)
if key in self.cfg.settings and value is not None])
for key, value in iteritems(config):
self.cfg.set(key.lower(), value)
def load(self):
return self.application
The key element in the code above, is the self.options
dictionary:
self.options = {
'bind': '{}:{}'.format(hostname, port),
'workers': num_workers,
'worker_class': 'passwords.client_auth_worker.CustomSyncWorker',
'timeout': timeout,
'ca_certs': str(ca_path),
'certfile': str(cert_path),
'keyfile': str(key_path),
'cert_reqs': 2,
'do_handshake_on_connect': True
}
Note that is sets some things like the timeout, which is relatively benign, the number of workers, and then goes on to set the paths to the relevant TLS keys/certs. Aside from that, we use 'worker_class': 'passwords.client_auth_worker.CustomSyncWorker',
to point Gunicorn at the custom worker that will set the relevant headers on every request, and 'cert_reqs': 2, 'do_handshake_on_connect': True
which enforce the handshakes and client-side certificate requirements. You can set different settings for your webapp server using the following as a reference: http://docs.gunicorn.org/en/stable/settings.html
Custom Sync Worker
In order to get gunicorn to pass information into our application so that we may use it to access the proper user information, we need a custom worker to set the request headers on incoming HTTP requests after the client-side certificate information has been verified.
This is accomplished with a CustomSyncWorker
object which inherits from gunicorn.workers.sync.SyncWorker
. The functionality is fairly simple, we get the certificate information, and write it in as custom headers X-USER
, X-NOT_BEFORE
, X-NOT_AFTER
, and X-ISSUER
:
"""A worker for Gunicorn that sets the client's Common Name as the X-USER header
variable for every request after the client has been authenticated so that
a Flask application can contain the authorization logic.
Based on: https://gist.github.com/kgriffs/289206f07e23b9a30d29a2b23e28c41c"""
import ssl
from gunicorn.workers.sync import SyncWorker
class CustomSyncWorker(SyncWorker):
"""A custom worker for putting authentication information into the X-USER
header variable of each request."""
def handle_request(self, listener, req, client, addr):
"""Handles each incoming request after a client has been authenticated."""
subject = dict([i for subtuple in client.getpeercert().get('subject') for i in subtuple])
issuer = dict([i for subtuple in client.getpeercert().get('issuer') for i in subtuple])
headers = dict(req.headers)
headers['X-USER'] = subject.get('commonName')
not_before = client.getpeercert().get('notBefore')
not_after = client.getpeercert().get('notAfter')
headers['X-NOT_BEFORE'] = ssl.cert_time_to_seconds(not_before)
headers['X-NOT_AFTER'] = ssl.cert_time_to_seconds(not_after)
headers['X-ISSUER'] = issuer['commonName']
req.headers = list(headers.items())
super(CustomSyncWorker, self).handle_request(listener, req, client, addr)
Writing Your Application
Reference: https://github.com/Abraxos/passwords/blob/master/passwords/webapp.py
And now, you are free to write the actual logic of your application:
def api_v0_1(route):
return '/api/v0.1/{}'.format(route)
def with_metadata(data):
return {'metadata': {'timestamp': arrow.utcnow().timestamp},
'data': data}
@APP.route(api_v0_1('hello'), methods=['GET'])
def hello():
"""Receives a sudoku to solve and returns a solution"""
return jsonify(with_metadata("Hello, world!"))
@APP.route(api_v0_1('headers'))
def headers():
"""Basic function"""
return jsonify(with_metadata({'X-USER': request.headers['X-USER'],
'X-ISSUER': request.headers['X-ISSUER'],
'X-NOT_BEFORE': request.headers['X-NOT_BEFORE'],
'X-NOT_AFTER': request.headers['X-NOT_AFTER']}))
The examples above should demonstrate the basics of how the header information can be accessed inside your application.
Launching The Application
To launch the application, you need to run the ClientAuthApplication
applicatin that we wrote above. There are some elements of Flask that require the use of globals that I didn't illustrate here, but you can always take a look at how I managed them here: https://github.com/Abraxos/passwords/blob/master/passwords/webapp.py
@click.command()
@click.argument('config-file')
def run_app_server(config_file):
"""Launches Passwords REST API (without client authentication) in a gunicorn server"""
config_file = Path(config_file)
config = ConfigParser()
config.read(config_file)
ca_path = config_file.parent / config['webapp']['ca']
cert_path = config_file.parent / config['webapp']['cert']
key_path = config_file.parent / config['webapp']['key']
hostname = config['webapp']['hostname']
port = config['webapp']['port']
configure_app(APP, config)
ClientAuthApplication(APP, ca_path, cert_path, key_path, hostname, port).run()
if __name__ == '__main__':
run_app_server()
Testing
Testing is critical, I personally use py.test, but regardless of what you use, your testing code should be able to verify key features of your application and of the TLS security mechanisms.
Testing Client-Side Certs
Because the client-side certificate authentication is provided through Gunicorn, you will actually need to run a Gunicorn server instance in a separate process to test with. Luckily, Python context managers make this really easy in concert with the multiprocessing
module:
from passwords.webapp import api_v0_1, ClientAuthApplication, APP
from passwords.utils import package_path
import requests
import pytest
from time import sleep
from multiprocessing import Process
from contextlib import contextmanager
from pathlib import Path
@contextmanager
def local_server(key_path: Path, cert_path: Path, ca_path: Path, port=8080) -> None:
"""A context manager that spins up a client-auth gunicorn server as a
separate process and then closes it when needed. Configured for
testing with a low timeout."""
def run_server():
gunicorn_app = ClientAuthApplication(APP, ca_path, cert_path, key_path,
hostname='localhost', port=port,
num_workers=1, timeout=2)
gunicorn_app.run()
server_process = Process(target=run_server)
try:
server_process.start()
sleep(0.4)
yield 'https://localhost:{}'.format(port)
finally:
server_process.terminate()
Now you can test how a client with a valid certificate is authenticated:
def test_client_auth_with_valid_credentials():
"""If someone attempts to make a connection using valid client credentials, that connection
should succeed and the headers should be set appropriately."""
server_key_path = PACKAGE_PATH / 'passwords/test/resources/server/server.key'
ca_path = PACKAGE_PATH / 'passwords/test/resources/server/ca.crt'
server_cert_path = PACKAGE_PATH / 'passwords/test/resources/server/server.crt'
alice_key_path = PACKAGE_PATH / 'passwords/test/resources/alice/alice.key'
alice_cert_path = PACKAGE_PATH / 'passwords/test/resources/alice/alice.crt'
with local_server(server_key_path, server_cert_path, ca_path) as host:
result = requests.get(host + api_v0_1('hello'), verify=str(ca_path), cert=(alice_cert_path, alice_key_path))
assert result.status_code == 200
result = requests.get(host + api_v0_1('headers'), verify=str(ca_path), cert=(alice_cert_path, alice_key_path))
assert result.json()['X-ISSUER'] == 'Kovalev Systems CA'
assert result.json()['X-USER'] == 'Alice'
as well as scenarios where someone is trying to do something malicious:
@pytest.mark.xfail(raises=requests.exceptions.SSLError)
def test_malicious_ca_client():
"""If someone attempts to make a connection using a certificate with the same CA name, but incorrect signature
this should fail."""
server_key_path = PACKAGE_PATH / 'passwords/test/resources/server/server.key'
ca_path = PACKAGE_PATH / 'passwords/test/resources/server/ca.crt'
server_cert_path = PACKAGE_PATH / 'passwords/test/resources/server/server.crt'
mallory_key_path = PACKAGE_PATH / 'passwords/test/resources/mallory/mallory.key'
mallory_cert_path = PACKAGE_PATH / 'passwords/test/resources/mallory/mallory.crt'
with local_server(server_key_path, server_cert_path, ca_path) as host:
requests.get(host + api_v0_1('hello'), verify=str(ca_path), cert=(mallory_cert_path, mallory_key_path))
Conclusion
So yeah, there it is. Its really not particularly difficult, there are a lot of useful libraries to help you. Please do not hesitate to contact me either to recommend changes to this article or to ask questions about it.
I hope this helps you write your own microservices in a more secure manner.
Notable Recommendations
- Input validation is critical to a secure application. I personally recommend either voluptuous or cerberus for input validation.
- Launching your application from the command-line is also important for ease of deployment. I recommend click.