ssl-checker/ssl_checker.py
2020-12-09 18:05:54 +03:30

389 lines
16 KiB
Python
Executable File

#!/usr/bin/env python3
import socket
import sys
import json
from argparse import ArgumentParser, SUPPRESS
from datetime import datetime
from ssl import PROTOCOL_TLSv1
from time import sleep
from csv import DictWriter
try:
from OpenSSL import SSL
from json2html import *
except ImportError:
print('Please install required modules: pip install -r requirements.txt')
sys.exit(1)
class Clr:
"""Text colors."""
RST = '\033[39m'
RED = '\033[31m'
GREEN = '\033[32m'
YELLOW = '\033[33m'
class SSLChecker:
total_valid = 0
total_expired = 0
total_failed = 0
total_warning = 0
def get_cert(self, host, port, user_args):
"""Connection to the host."""
if user_args.socks:
import socks
if user_args.verbose:
print('{}Socks proxy enabled{}\n'.format(Clr.YELLOW, Clr.RST))
socks_host, socks_port = self.filter_hostname(user_args.socks)
socks.setdefaultproxy(socks.PROXY_TYPE_SOCKS5, socks_host, int(socks_port), True)
socket.socket = socks.socksocket
if user_args.verbose:
print('{}Connecting to socket{}\n'.format(Clr.YELLOW, Clr.RST))
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
osobj = SSL.Context(PROTOCOL_TLSv1)
sock.connect((host, int(port)))
oscon = SSL.Connection(osobj, sock)
oscon.set_tlsext_host_name(host.encode())
oscon.set_connect_state()
oscon.do_handshake()
cert = oscon.get_peer_certificate()
sock.close()
if user_args.verbose:
print('{}Closing socket{}\n'.format(Clr.YELLOW, Clr.RST))
return cert
def border_msg(self, message):
"""Print the message in the box."""
row = len(message)
h = ''.join(['+'] + ['-' * row] + ['+'])
result = h + '\n' "|" + message + "|"'\n' + h
print(result)
def analyze_ssl(self, host, context, user_args):
"""Analyze the security of the SSL certificate."""
try:
from urllib.request import urlopen
except ImportError:
from urllib2 import urlopen
api_url = 'https://api.ssllabs.com/api/v3/'
while True:
if user_args.verbose:
print('{}Requesting analyze to {}{}\n'.format(Clr.YELLOW, api_url, Clr.RST))
main_request = json.loads(urlopen(api_url + 'analyze?host={}'.format(host)).read().decode('utf-8'))
if main_request['status'] in ('DNS', 'IN_PROGRESS'):
if user_args.verbose:
print('{}Analyze waiting for reports to be finished (5 secs){}\n'.format(Clr.YELLOW, Clr.RST))
sleep(5)
continue
elif main_request['status'] == 'READY':
if user_args.verbose:
print('{}Analyze is ready{}\n'.format(Clr.YELLOW, Clr.RST))
break
endpoint_data = json.loads(urlopen(api_url + 'getEndpointData?host={}&s={}'.format(
host, main_request['endpoints'][0]['ipAddress'])).read().decode('utf-8'))
if user_args.verbose:
print('{}Analyze report message: {}{}\n'.format(Clr.YELLOW, endpoint_data['statusMessage'], Clr.RST))
# if the certificate is invalid
if endpoint_data['statusMessage'] == 'Certificate not valid for domain name':
return context
context[host]['grade'] = main_request['endpoints'][0]['grade']
context[host]['poodle_vuln'] = endpoint_data['details']['poodle']
context[host]['heartbleed_vuln'] = endpoint_data['details']['heartbleed']
context[host]['heartbeat_vuln'] = endpoint_data['details']['heartbeat']
context[host]['freak_vuln'] = endpoint_data['details']['freak']
context[host]['logjam_vuln'] = endpoint_data['details']['logjam']
context[host]['drownVulnerable'] = endpoint_data['details']['drownVulnerable']
return context
def get_cert_sans(self, x509cert):
"""
Get Subject Alt Names from Certificate. Shameless taken from stack overflow:
https://stackoverflow.com/users/4547691/anatolii-chmykhalo
"""
san = ''
ext_count = x509cert.get_extension_count()
for i in range(0, ext_count):
ext = x509cert.get_extension(i)
if 'subjectAltName' in str(ext.get_short_name()):
san = ext.__str__()
# replace commas to not break csv output
san = san.replace(',', ';')
return san
def get_cert_info(self, host, cert):
"""Get all the information about cert and create a JSON file."""
context = {}
cert_subject = cert.get_subject()
context['host'] = host
context['issued_to'] = cert_subject.CN
context['issued_o'] = cert_subject.O
context['issuer_c'] = cert.get_issuer().countryName
context['issuer_o'] = cert.get_issuer().organizationName
context['issuer_ou'] = cert.get_issuer().organizationalUnitName
context['issuer_cn'] = cert.get_issuer().commonName
context['cert_sn'] = str(cert.get_serial_number())
context['cert_sha1'] = cert.digest('sha1').decode()
context['cert_alg'] = cert.get_signature_algorithm().decode()
context['cert_ver'] = cert.get_version()
context['cert_sans'] = self.get_cert_sans(cert)
context['cert_exp'] = cert.has_expired()
context['cert_valid'] = False if cert.has_expired() else True
# Valid from
valid_from = datetime.strptime(cert.get_notBefore().decode('ascii'),
'%Y%m%d%H%M%SZ')
context['valid_from'] = valid_from.strftime('%Y-%m-%d')
# Valid till
valid_till = datetime.strptime(cert.get_notAfter().decode('ascii'),
'%Y%m%d%H%M%SZ')
context['valid_till'] = valid_till.strftime('%Y-%m-%d')
# Validity days
context['validity_days'] = (valid_till - valid_from).days
# Validity in days from now
now = datetime.now()
context['days_left'] = (valid_till - now).days
# Valid days left
context['valid_days_to_expire'] = (datetime.strptime(context['valid_till'],
'%Y-%m-%d') - datetime.now()).days
if cert.has_expired():
self.total_expired += 1
else:
self.total_valid += 1
# If the certificate has less than 15 days validity
if context['valid_days_to_expire'] <= 15:
self.total_warning += 1
return context
def print_status(self, host, context, analyze=False):
"""Print all the usefull info about host."""
print('\t{}[+]{} {}\n\t{}'.format(Clr.GREEN, Clr.RST, host, '-' * (len(host) + 5)))
print('\t\tIssued domain: {}'.format(context[host]['issued_to']))
print('\t\tIssued to: {}'.format(context[host]['issued_o']))
print('\t\tIssued by: {} ({})'.format(context[host]['issuer_o'], context[host]['issuer_c']))
print('\t\tValid from: {}'.format(context[host]['valid_from']))
print('\t\tValid to: {} ({} days left)'.format(context[host]['valid_till'], context[host]['valid_days_to_expire']))
print('\t\tValidity days: {}'.format(context[host]['validity_days']))
print('\t\tCertificate valid: {}'.format(context[host]['cert_valid']))
print('\t\tCertificate S/N: {}'.format(context[host]['cert_sn']))
print('\t\tCertificate SHA1 FP: {}'.format(context[host]['cert_sha1']))
print('\t\tCertificate version: {}'.format(context[host]['cert_ver']))
print('\t\tCertificate algorithm: {}'.format(context[host]['cert_alg']))
if analyze:
print('\t\tCertificate grade: {}'.format(context[host]['grade']))
print('\t\tPoodle vulnerability: {}'.format(context[host]['poodle_vuln']))
print('\t\tHeartbleed vulnerability: {}'.format(context[host]['heartbleed_vuln']))
print('\t\tHeartbeat vulnerability: {}'.format(context[host]['heartbeat_vuln']))
print('\t\tFreak vulnerability: {}'.format(context[host]['freak_vuln']))
print('\t\tLogjam vulnerability: {}'.format(context[host]['logjam_vuln']))
print('\t\tDrown vulnerability: {}'.format(context[host]['drownVulnerable']))
print('\t\tExpired: {}'.format(context[host]['cert_exp']))
print('\t\tCertificate SAN\'s: ')
for san in context[host]['cert_sans'].split(';'):
print('\t\t \\_ {}'.format(san.strip()))
print('\n')
def show_result(self, user_args):
"""Get the context."""
context = {}
start_time = datetime.now()
hosts = user_args.hosts
if not user_args.json_true and not user_args.summary_true:
self.border_msg(' Analyzing {} host(s) '.format(len(hosts)))
if not user_args.json_true and user_args.analyze:
print('{}Warning: -a/--analyze is enabled. It takes more time...{}\n'.format(Clr.YELLOW, Clr.RST))
for host in hosts:
if user_args.verbose:
print('{}Working on host: {}{}\n'.format(Clr.YELLOW, host, Clr.RST))
host, port = self.filter_hostname(host)
# Check duplication
if host in context.keys():
continue
try:
cert = self.get_cert(host, port, user_args)
context[host] = self.get_cert_info(host, cert)
context[host]['tcp_port'] = int(port)
# Analyze the certificate if enabled
if user_args.analyze:
context = self.analyze_ssl(host, context, user_args)
if not user_args.json_true and not user_args.summary_true:
self.print_status(host, context, user_args.analyze)
except SSL.SysCallError:
if not user_args.json_true:
print('\t{}[-]{} {:<20s} Failed: Misconfigured SSL/TLS\n'.format(Clr.RED, Clr.RST, host))
self.total_failed += 1
except Exception as error:
if not user_args.json_true:
print('\t{}[-]{} {:<20s} Failed: {}\n'.format(Clr.RED, Clr.RST, host, error))
self.total_failed += 1
except KeyboardInterrupt:
print('{}Canceling script...{}\n'.format(Clr.YELLOW, Clr.RST))
sys.exit(1)
if not user_args.json_true:
self.border_msg(' Successful: {} | Failed: {} | Valid: {} | Warning: {} | Expired: {} | Duration: {} '.format(
len(hosts) - self.total_failed, self.total_failed, self.total_valid,
self.total_warning, self.total_expired, datetime.now() - start_time))
if user_args.summary_true:
# Exit the script just
return
# CSV export if -c/--csv is specified
if user_args.csv_enabled:
self.export_csv(context, user_args.csv_enabled, user_args)
# HTML export if -x/--html is specified
if user_args.html_true:
self.export_html(context)
# While using the script as a module
if __name__ != '__main__':
return json.dumps(context)
# Enable JSON output if -j/--json argument specified
if user_args.json_true:
print(json.dumps(context))
if user_args.json_save_true:
for host in context.keys():
with open(host + '.json', 'w', encoding='UTF-8') as fp:
fp.write(json.dumps(context[host]))
def export_csv(self, context, filename, user_args):
"""Export all context results to CSV file."""
# prepend dict keys to write column headers
if user_args.verbose:
print('{}Generating CSV export{}\n'.format(Clr.YELLOW, Clr.RST))
with open(filename, 'w') as csv_file:
csv_writer = DictWriter(csv_file, list(context.items())[0][1].keys())
csv_writer.writeheader()
for host in context.keys():
csv_writer.writerow(context[host])
def export_html(self, context):
"""Export JSON to HTML."""
html = json2html.convert(json=context)
file_name = datetime.strftime(datetime.now(), '%Y_%m_%d_%H_%M_%S')
with open('{}.html'.format(file_name), 'w') as html_file:
html_file.write(html)
return
def filter_hostname(self, host):
"""Remove unused characters and split by address and port."""
host = host.replace('http://', '').replace('https://', '').replace('/', '')
port = 443
if ':' in host:
host, port = host.split(':')
return host, port
def get_args(self, json_args={}):
"""Set argparse options."""
parser = ArgumentParser(prog='ssl_checker.py', add_help=False,
description="""Collects useful information about given host's SSL certificates.""")
if len(json_args) > 0:
args = parser.parse_args()
setattr(args, 'json_true', True)
setattr(args, 'verbose', False)
setattr(args, 'csv_enabled', False)
setattr(args, 'html_true', False)
setattr(args, 'json_save_true', False)
setattr(args, 'socks', False)
setattr(args, 'analyze', False)
setattr(args, 'hosts', json_args['hosts'])
return args
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument('-H', '--host', dest='hosts', nargs='*',
required=False, help='Hosts as input separated by space')
group.add_argument('-f', '--host-file', dest='host_file',
required=False, help='Hosts as input from file')
parser.add_argument('-s', '--socks', dest='socks',
default=False, metavar='HOST:PORT',
help='Enable SOCKS proxy for connection')
parser.add_argument('-c', '--csv', dest='csv_enabled',
default=False, metavar='FILENAME.CSV',
help='Enable CSV file export')
parser.add_argument('-j', '--json', dest='json_true',
action='store_true', default=False,
help='Enable JSON in the output')
parser.add_argument('-S', '--summary', dest='summary_true',
action='store_true', default=False,
help='Enable summary output only')
parser.add_argument('-x', '--html', dest='html_true',
action='store_true', default=False,
help='Enable HTML file export')
parser.add_argument('-J', '--json-save', dest='json_save_true',
action='store_true', default=False,
help='Enable JSON export individually per host')
parser.add_argument('-a', '--analyze', dest='analyze',
default=False, action='store_true',
help='Enable SSL security analysis on the host')
parser.add_argument('-v', '--verbose', dest='verbose',
default=False, action='store_true',
help='Enable verbose to see what is going on')
parser.add_argument('-h', '--help', default=SUPPRESS,
action='help',
help='Show this help message and exit')
args = parser.parse_args()
# Get hosts from file if provided
if args.host_file:
with open(args.host_file) as f:
args.hosts = f.read().splitlines()
# Checks hosts list
if isinstance(args.hosts, list):
if len(args.hosts) == 0:
parser.print_help()
sys.exit(0)
return args
if __name__ == '__main__':
SSLCheckerObject = SSLChecker()
SSLCheckerObject.show_result(SSLCheckerObject.get_args(json_args={}))