From 76dc73114b10c29211c099b0a7184e75b0cd14aa Mon Sep 17 00:00:00 2001 From: kikolouro Date: Mon, 15 Mar 2021 11:57:11 +0000 Subject: [PATCH] Added support to database and support to notification via Email and Slack --- requirements.txt | 2 + ssl_checker.py | 139 +++++++++++++++++++++++++++++++++-------------- 2 files changed, 101 insertions(+), 40 deletions(-) diff --git a/requirements.txt b/requirements.txt index e7683b6..f5b9559 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,4 @@ pyopenssl json2html +requests +mysql-connector-python \ No newline at end of file diff --git a/ssl_checker.py b/ssl_checker.py index 58857e2..b54f845 100755 --- a/ssl_checker.py +++ b/ssl_checker.py @@ -1,8 +1,6 @@ -#!/usr/bin/env python3 import socket import sys import json - from argparse import ArgumentParser, SUPPRESS from datetime import datetime from ssl import PROTOCOL_TLSv1 @@ -12,18 +10,32 @@ from csv import DictWriter try: from OpenSSL import SSL from json2html import * + import requests except ImportError: print('Please install required modules: pip install -r requirements.txt') sys.exit(1) -class Clr: - """Text colors.""" +def SendEmail(host, valid_days_to_expire): + url = "URL_TO_API" + r = requests.post(url, data={ + "host": host, + "valid_days_to_expire": valid_days_to_expire + }) + + if valid_days_to_expire < 0: + text = "O certificado expirou no host: {}. Expirou a {} Dias".format(host, valid_days_to_expire) + else: + text = "O certificado está a expirar no host: {}. Falta {} Dias.".format(host, valid_days_to_expire) + + url = "https://slack.com/api/chat.postMessage" + data = {"channel": "CHANNEL_NAME","text": text} + + r = requests.post(url,headers={ + "Content-type": "application/json", + "Authorization": "Bearer TOKEN"}, + data=json.dumps(data)) - RST = '\033[39m' - RED = '\033[31m' - GREEN = '\033[32m' - YELLOW = '\033[33m' class SSLChecker: @@ -38,14 +50,15 @@ class SSLChecker: if user_args.socks: import socks if user_args.verbose: - print('{}Socks proxy enabled{}\n'.format(Clr.YELLOW, Clr.RST)) + print('Socks proxy enabled\n') socks_host, socks_port = self.filter_hostname(user_args.socks) - socks.setdefaultproxy(socks.PROXY_TYPE_SOCKS5, socks_host, int(socks_port), True) + socks.setdefaultproxy(socks.PROXY_TYPE_SOCKS5, + socks_host, int(socks_port), True) socket.socket = socks.socksocket if user_args.verbose: - print('{}Connecting to socket{}\n'.format(Clr.YELLOW, Clr.RST)) + print('Connecting to socket\n') sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) osobj = SSL.Context(PROTOCOL_TLSv1) @@ -57,7 +70,7 @@ class SSLChecker: cert = oscon.get_peer_certificate() sock.close() if user_args.verbose: - print('{}Closing socket{}\n'.format(Clr.YELLOW, Clr.RST)) + print('Closing socket\n') return cert @@ -78,18 +91,19 @@ class SSLChecker: api_url = 'https://api.ssllabs.com/api/v3/' while True: if user_args.verbose: - print('{}Requesting analyze to {}{}\n'.format(Clr.YELLOW, api_url, Clr.RST)) + print('Requesting analyze to\n') - main_request = json.loads(urlopen(api_url + 'analyze?host={}'.format(host)).read().decode('utf-8')) + main_request = json.loads( + urlopen(api_url + 'analyze?host={}'.format(host)).read().decode('utf-8')) if main_request['status'] in ('DNS', 'IN_PROGRESS'): if user_args.verbose: - print('{}Analyze waiting for reports to be finished (5 secs){}\n'.format(Clr.YELLOW, Clr.RST)) + print('Analyze waiting for reports to be finished (5 secs)\n') sleep(5) continue elif main_request['status'] == 'READY': if user_args.verbose: - print('{}Analyze is ready{}\n'.format(Clr.YELLOW, Clr.RST)) + print('Analyze is ready\n') break @@ -97,7 +111,7 @@ class SSLChecker: host, main_request['endpoints'][0]['ipAddress'])).read().decode('utf-8')) if user_args.verbose: - print('{}Analyze report message: {}{}\n'.format(Clr.YELLOW, endpoint_data['statusMessage'], Clr.RST)) + print('Analyze report message: \n') # if the certificate is invalid if endpoint_data['statusMessage'] == 'Certificate not valid for domain name': @@ -130,6 +144,7 @@ class SSLChecker: def get_cert_info(self, host, cert): """Get all the information about cert and create a JSON file.""" + notification_days = 30 context = {} cert_subject = cert.get_subject() @@ -168,7 +183,10 @@ class SSLChecker: # Valid days left context['valid_days_to_expire'] = (datetime.strptime(context['valid_till'], - '%Y-%m-%d') - datetime.now()).days + '%Y-%m-%d') - datetime.now()).days + + if context['valid_days_to_expire'] < notification_days: + SendEmail(context['host'], context['valid_days_to_expire'],) if cert.has_expired(): self.total_expired += 1 @@ -183,27 +201,36 @@ class SSLChecker: def print_status(self, host, context, analyze=False): """Print all the usefull info about host.""" - print('\t{}[+]{} {}\n\t{}'.format(Clr.GREEN, Clr.RST, host, '-' * (len(host) + 5))) + print('\t[+] {}\n\t{}'.format(host, '-' * (len(host) + 5))) print('\t\tIssued domain: {}'.format(context[host]['issued_to'])) print('\t\tIssued to: {}'.format(context[host]['issued_o'])) - print('\t\tIssued by: {} ({})'.format(context[host]['issuer_o'], context[host]['issuer_c'])) + print('\t\tIssued by: {} ({})'.format( + context[host]['issuer_o'], context[host]['issuer_c'])) print('\t\tValid from: {}'.format(context[host]['valid_from'])) - print('\t\tValid to: {} ({} days left)'.format(context[host]['valid_till'], context[host]['valid_days_to_expire'])) + print('\t\tValid to: {} ({} days left)'.format( + context[host]['valid_till'], context[host]['valid_days_to_expire'])) print('\t\tValidity days: {}'.format(context[host]['validity_days'])) print('\t\tCertificate valid: {}'.format(context[host]['cert_valid'])) print('\t\tCertificate S/N: {}'.format(context[host]['cert_sn'])) print('\t\tCertificate SHA1 FP: {}'.format(context[host]['cert_sha1'])) print('\t\tCertificate version: {}'.format(context[host]['cert_ver'])) - print('\t\tCertificate algorithm: {}'.format(context[host]['cert_alg'])) + print('\t\tCertificate algorithm: {}'.format( + context[host]['cert_alg'])) if analyze: print('\t\tCertificate grade: {}'.format(context[host]['grade'])) - print('\t\tPoodle vulnerability: {}'.format(context[host]['poodle_vuln'])) - print('\t\tHeartbleed vulnerability: {}'.format(context[host]['heartbleed_vuln'])) - print('\t\tHeartbeat vulnerability: {}'.format(context[host]['heartbeat_vuln'])) - print('\t\tFreak vulnerability: {}'.format(context[host]['freak_vuln'])) - print('\t\tLogjam vulnerability: {}'.format(context[host]['logjam_vuln'])) - print('\t\tDrown vulnerability: {}'.format(context[host]['drownVulnerable'])) + print('\t\tPoodle vulnerability: {}'.format( + context[host]['poodle_vuln'])) + print('\t\tHeartbleed vulnerability: {}'.format( + context[host]['heartbleed_vuln'])) + print('\t\tHeartbeat vulnerability: {}'.format( + context[host]['heartbeat_vuln'])) + print('\t\tFreak vulnerability: {}'.format( + context[host]['freak_vuln'])) + print('\t\tLogjam vulnerability: {}'.format( + context[host]['logjam_vuln'])) + print('\t\tDrown vulnerability: {}'.format( + context[host]['drownVulnerable'])) print('\t\tExpired: {}'.format(context[host]['cert_exp'])) print('\t\tCertificate SAN\'s: ') @@ -223,11 +250,12 @@ class SSLChecker: self.border_msg(' Analyzing {} host(s) '.format(len(hosts))) if not user_args.json_true and user_args.analyze: - print('{}Warning: -a/--analyze is enabled. It takes more time...{}\n'.format(Clr.YELLOW, Clr.RST)) + print( + 'Warning: -a/--analyze is enabled. It takes more time...\n') for host in hosts: if user_args.verbose: - print('{}Working on host: {}{}\n'.format(Clr.YELLOW, host, Clr.RST)) + print('Working on host:{}\n'.format(host)) host, port = self.filter_hostname(host) @@ -248,14 +276,16 @@ class SSLChecker: self.print_status(host, context, user_args.analyze) except SSL.SysCallError: if not user_args.json_true: - print('\t{}[-]{} {:<20s} Failed: Misconfigured SSL/TLS\n'.format(Clr.RED, Clr.RST, host)) + print( + '\t[-] {:<20s} Failed: Misconfigured SSL/TLS\n'.format(host)) self.total_failed += 1 except Exception as error: if not user_args.json_true: - print('\t{}[-]{} {:<20s} Failed: {}\n'.format(Clr.RED, Clr.RST, host, error)) + print( + '\t[-] {:<20s} Failed: {}\n'.format(host, error)) self.total_failed += 1 except KeyboardInterrupt: - print('{}Canceling script...{}\n'.format(Clr.YELLOW, Clr.RST)) + print('Canceling script...\n') sys.exit(1) if not user_args.json_true: @@ -291,10 +321,11 @@ class SSLChecker: """Export all context results to CSV file.""" # prepend dict keys to write column headers if user_args.verbose: - print('{}Generating CSV export{}\n'.format(Clr.YELLOW, Clr.RST)) + print('Generating CSV export\n') with open(filename, 'w') as csv_file: - csv_writer = DictWriter(csv_file, list(context.items())[0][1].keys()) + csv_writer = DictWriter( + csv_file, list(context.items())[0][1].keys()) csv_writer.writeheader() for host in context.keys(): csv_writer.writerow(context[host]) @@ -310,7 +341,8 @@ class SSLChecker: def filter_hostname(self, host): """Remove unused characters and split by address and port.""" - host = host.replace('http://', '').replace('https://', '').replace('/', '') + host = host.replace( + 'http://', '').replace('https://', '').replace('/', '') port = 443 if ':' in host: host, port = host.split(':') @@ -332,6 +364,7 @@ class SSLChecker: setattr(args, 'socks', False) setattr(args, 'analyze', False) setattr(args, 'hosts', json_args['hosts']) + setattr(args, 'db', False) return args group = parser.add_mutually_exclusive_group(required=True) @@ -339,6 +372,8 @@ class SSLChecker: required=False, help='Hosts as input separated by space') group.add_argument('-f', '--host-file', dest='host_file', required=False, help='Hosts as input from file') + group.add_argument('-db', '--database', dest='db', + default=False, action="store_true", help='Uses Certificados Database') parser.add_argument('-s', '--socks', dest='socks', default=False, metavar='HOST:PORT', help='Enable SOCKS proxy for connection') @@ -368,12 +403,36 @@ class SSLChecker: help='Show this help message and exit') args = parser.parse_args() + + if args.db: + pw = "PASSWORD" + import mysql.connector + mydb = mysql.connector.connect( + host="localhost", + user="USER", + password=pw, + database="DATABASE" + ) + mycursor = mydb.cursor() + sql = "SELECT host FROM hosts" + mycursor.execute(sql) + + myresult = mycursor.fetchall() + array = [] + for x in range(len(myresult)): + array.append(myresult[x][0]) + args.hosts = array + # Get hosts from file if provided - if args.host_file: - with open(args.host_file) as f: - args.hosts = f.read().splitlines() - + else: + if args.host_file: + with open(args.host_file) as f: + args.hosts = f.read().splitlines() + print(args.hosts) + exit() + + # Checks hosts list if isinstance(args.hosts, list): if len(args.hosts) == 0: