Added support to database and support to notification via Email and Slack

This commit is contained in:
kikolouro 2021-03-15 11:57:11 +00:00
parent 05bd3d5371
commit 76dc73114b
2 changed files with 101 additions and 40 deletions

View File

@ -1,2 +1,4 @@
pyopenssl pyopenssl
json2html json2html
requests
mysql-connector-python

View File

@ -1,8 +1,6 @@
#!/usr/bin/env python3
import socket import socket
import sys import sys
import json import json
from argparse import ArgumentParser, SUPPRESS from argparse import ArgumentParser, SUPPRESS
from datetime import datetime from datetime import datetime
from ssl import PROTOCOL_TLSv1 from ssl import PROTOCOL_TLSv1
@ -12,18 +10,32 @@ from csv import DictWriter
try: try:
from OpenSSL import SSL from OpenSSL import SSL
from json2html import * from json2html import *
import requests
except ImportError: except ImportError:
print('Please install required modules: pip install -r requirements.txt') print('Please install required modules: pip install -r requirements.txt')
sys.exit(1) sys.exit(1)
class Clr: def SendEmail(host, valid_days_to_expire):
"""Text colors.""" url = "URL_TO_API"
r = requests.post(url, data={
"host": host,
"valid_days_to_expire": valid_days_to_expire
})
if valid_days_to_expire < 0:
text = "O certificado expirou no host: {}. Expirou a {} Dias".format(host, valid_days_to_expire)
else:
text = "O certificado está a expirar no host: {}. Falta {} Dias.".format(host, valid_days_to_expire)
url = "https://slack.com/api/chat.postMessage"
data = {"channel": "CHANNEL_NAME","text": text}
r = requests.post(url,headers={
"Content-type": "application/json",
"Authorization": "Bearer TOKEN"},
data=json.dumps(data))
RST = '\033[39m'
RED = '\033[31m'
GREEN = '\033[32m'
YELLOW = '\033[33m'
class SSLChecker: class SSLChecker:
@ -38,14 +50,15 @@ class SSLChecker:
if user_args.socks: if user_args.socks:
import socks import socks
if user_args.verbose: if user_args.verbose:
print('{}Socks proxy enabled{}\n'.format(Clr.YELLOW, Clr.RST)) print('Socks proxy enabled\n')
socks_host, socks_port = self.filter_hostname(user_args.socks) socks_host, socks_port = self.filter_hostname(user_args.socks)
socks.setdefaultproxy(socks.PROXY_TYPE_SOCKS5, socks_host, int(socks_port), True) socks.setdefaultproxy(socks.PROXY_TYPE_SOCKS5,
socks_host, int(socks_port), True)
socket.socket = socks.socksocket socket.socket = socks.socksocket
if user_args.verbose: if user_args.verbose:
print('{}Connecting to socket{}\n'.format(Clr.YELLOW, Clr.RST)) print('Connecting to socket\n')
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
osobj = SSL.Context(PROTOCOL_TLSv1) osobj = SSL.Context(PROTOCOL_TLSv1)
@ -57,7 +70,7 @@ class SSLChecker:
cert = oscon.get_peer_certificate() cert = oscon.get_peer_certificate()
sock.close() sock.close()
if user_args.verbose: if user_args.verbose:
print('{}Closing socket{}\n'.format(Clr.YELLOW, Clr.RST)) print('Closing socket\n')
return cert return cert
@ -78,18 +91,19 @@ class SSLChecker:
api_url = 'https://api.ssllabs.com/api/v3/' api_url = 'https://api.ssllabs.com/api/v3/'
while True: while True:
if user_args.verbose: if user_args.verbose:
print('{}Requesting analyze to {}{}\n'.format(Clr.YELLOW, api_url, Clr.RST)) print('Requesting analyze to\n')
main_request = json.loads(urlopen(api_url + 'analyze?host={}'.format(host)).read().decode('utf-8')) main_request = json.loads(
urlopen(api_url + 'analyze?host={}'.format(host)).read().decode('utf-8'))
if main_request['status'] in ('DNS', 'IN_PROGRESS'): if main_request['status'] in ('DNS', 'IN_PROGRESS'):
if user_args.verbose: if user_args.verbose:
print('{}Analyze waiting for reports to be finished (5 secs){}\n'.format(Clr.YELLOW, Clr.RST)) print('Analyze waiting for reports to be finished (5 secs)\n')
sleep(5) sleep(5)
continue continue
elif main_request['status'] == 'READY': elif main_request['status'] == 'READY':
if user_args.verbose: if user_args.verbose:
print('{}Analyze is ready{}\n'.format(Clr.YELLOW, Clr.RST)) print('Analyze is ready\n')
break break
@ -97,7 +111,7 @@ class SSLChecker:
host, main_request['endpoints'][0]['ipAddress'])).read().decode('utf-8')) host, main_request['endpoints'][0]['ipAddress'])).read().decode('utf-8'))
if user_args.verbose: if user_args.verbose:
print('{}Analyze report message: {}{}\n'.format(Clr.YELLOW, endpoint_data['statusMessage'], Clr.RST)) print('Analyze report message: \n')
# if the certificate is invalid # if the certificate is invalid
if endpoint_data['statusMessage'] == 'Certificate not valid for domain name': if endpoint_data['statusMessage'] == 'Certificate not valid for domain name':
@ -130,6 +144,7 @@ class SSLChecker:
def get_cert_info(self, host, cert): def get_cert_info(self, host, cert):
"""Get all the information about cert and create a JSON file.""" """Get all the information about cert and create a JSON file."""
notification_days = 30
context = {} context = {}
cert_subject = cert.get_subject() cert_subject = cert.get_subject()
@ -168,7 +183,10 @@ class SSLChecker:
# Valid days left # Valid days left
context['valid_days_to_expire'] = (datetime.strptime(context['valid_till'], context['valid_days_to_expire'] = (datetime.strptime(context['valid_till'],
'%Y-%m-%d') - datetime.now()).days '%Y-%m-%d') - datetime.now()).days
if context['valid_days_to_expire'] < notification_days:
SendEmail(context['host'], context['valid_days_to_expire'],)
if cert.has_expired(): if cert.has_expired():
self.total_expired += 1 self.total_expired += 1
@ -183,27 +201,36 @@ class SSLChecker:
def print_status(self, host, context, analyze=False): def print_status(self, host, context, analyze=False):
"""Print all the usefull info about host.""" """Print all the usefull info about host."""
print('\t{}[+]{} {}\n\t{}'.format(Clr.GREEN, Clr.RST, host, '-' * (len(host) + 5))) print('\t[+] {}\n\t{}'.format(host, '-' * (len(host) + 5)))
print('\t\tIssued domain: {}'.format(context[host]['issued_to'])) print('\t\tIssued domain: {}'.format(context[host]['issued_to']))
print('\t\tIssued to: {}'.format(context[host]['issued_o'])) print('\t\tIssued to: {}'.format(context[host]['issued_o']))
print('\t\tIssued by: {} ({})'.format(context[host]['issuer_o'], context[host]['issuer_c'])) print('\t\tIssued by: {} ({})'.format(
context[host]['issuer_o'], context[host]['issuer_c']))
print('\t\tValid from: {}'.format(context[host]['valid_from'])) print('\t\tValid from: {}'.format(context[host]['valid_from']))
print('\t\tValid to: {} ({} days left)'.format(context[host]['valid_till'], context[host]['valid_days_to_expire'])) print('\t\tValid to: {} ({} days left)'.format(
context[host]['valid_till'], context[host]['valid_days_to_expire']))
print('\t\tValidity days: {}'.format(context[host]['validity_days'])) print('\t\tValidity days: {}'.format(context[host]['validity_days']))
print('\t\tCertificate valid: {}'.format(context[host]['cert_valid'])) print('\t\tCertificate valid: {}'.format(context[host]['cert_valid']))
print('\t\tCertificate S/N: {}'.format(context[host]['cert_sn'])) print('\t\tCertificate S/N: {}'.format(context[host]['cert_sn']))
print('\t\tCertificate SHA1 FP: {}'.format(context[host]['cert_sha1'])) print('\t\tCertificate SHA1 FP: {}'.format(context[host]['cert_sha1']))
print('\t\tCertificate version: {}'.format(context[host]['cert_ver'])) print('\t\tCertificate version: {}'.format(context[host]['cert_ver']))
print('\t\tCertificate algorithm: {}'.format(context[host]['cert_alg'])) print('\t\tCertificate algorithm: {}'.format(
context[host]['cert_alg']))
if analyze: if analyze:
print('\t\tCertificate grade: {}'.format(context[host]['grade'])) print('\t\tCertificate grade: {}'.format(context[host]['grade']))
print('\t\tPoodle vulnerability: {}'.format(context[host]['poodle_vuln'])) print('\t\tPoodle vulnerability: {}'.format(
print('\t\tHeartbleed vulnerability: {}'.format(context[host]['heartbleed_vuln'])) context[host]['poodle_vuln']))
print('\t\tHeartbeat vulnerability: {}'.format(context[host]['heartbeat_vuln'])) print('\t\tHeartbleed vulnerability: {}'.format(
print('\t\tFreak vulnerability: {}'.format(context[host]['freak_vuln'])) context[host]['heartbleed_vuln']))
print('\t\tLogjam vulnerability: {}'.format(context[host]['logjam_vuln'])) print('\t\tHeartbeat vulnerability: {}'.format(
print('\t\tDrown vulnerability: {}'.format(context[host]['drownVulnerable'])) context[host]['heartbeat_vuln']))
print('\t\tFreak vulnerability: {}'.format(
context[host]['freak_vuln']))
print('\t\tLogjam vulnerability: {}'.format(
context[host]['logjam_vuln']))
print('\t\tDrown vulnerability: {}'.format(
context[host]['drownVulnerable']))
print('\t\tExpired: {}'.format(context[host]['cert_exp'])) print('\t\tExpired: {}'.format(context[host]['cert_exp']))
print('\t\tCertificate SAN\'s: ') print('\t\tCertificate SAN\'s: ')
@ -223,11 +250,12 @@ class SSLChecker:
self.border_msg(' Analyzing {} host(s) '.format(len(hosts))) self.border_msg(' Analyzing {} host(s) '.format(len(hosts)))
if not user_args.json_true and user_args.analyze: if not user_args.json_true and user_args.analyze:
print('{}Warning: -a/--analyze is enabled. It takes more time...{}\n'.format(Clr.YELLOW, Clr.RST)) print(
'Warning: -a/--analyze is enabled. It takes more time...\n')
for host in hosts: for host in hosts:
if user_args.verbose: if user_args.verbose:
print('{}Working on host: {}{}\n'.format(Clr.YELLOW, host, Clr.RST)) print('Working on host:{}\n'.format(host))
host, port = self.filter_hostname(host) host, port = self.filter_hostname(host)
@ -248,14 +276,16 @@ class SSLChecker:
self.print_status(host, context, user_args.analyze) self.print_status(host, context, user_args.analyze)
except SSL.SysCallError: except SSL.SysCallError:
if not user_args.json_true: if not user_args.json_true:
print('\t{}[-]{} {:<20s} Failed: Misconfigured SSL/TLS\n'.format(Clr.RED, Clr.RST, host)) print(
'\t[-] {:<20s} Failed: Misconfigured SSL/TLS\n'.format(host))
self.total_failed += 1 self.total_failed += 1
except Exception as error: except Exception as error:
if not user_args.json_true: if not user_args.json_true:
print('\t{}[-]{} {:<20s} Failed: {}\n'.format(Clr.RED, Clr.RST, host, error)) print(
'\t[-] {:<20s} Failed: {}\n'.format(host, error))
self.total_failed += 1 self.total_failed += 1
except KeyboardInterrupt: except KeyboardInterrupt:
print('{}Canceling script...{}\n'.format(Clr.YELLOW, Clr.RST)) print('Canceling script...\n')
sys.exit(1) sys.exit(1)
if not user_args.json_true: if not user_args.json_true:
@ -291,10 +321,11 @@ class SSLChecker:
"""Export all context results to CSV file.""" """Export all context results to CSV file."""
# prepend dict keys to write column headers # prepend dict keys to write column headers
if user_args.verbose: if user_args.verbose:
print('{}Generating CSV export{}\n'.format(Clr.YELLOW, Clr.RST)) print('Generating CSV export\n')
with open(filename, 'w') as csv_file: with open(filename, 'w') as csv_file:
csv_writer = DictWriter(csv_file, list(context.items())[0][1].keys()) csv_writer = DictWriter(
csv_file, list(context.items())[0][1].keys())
csv_writer.writeheader() csv_writer.writeheader()
for host in context.keys(): for host in context.keys():
csv_writer.writerow(context[host]) csv_writer.writerow(context[host])
@ -310,7 +341,8 @@ class SSLChecker:
def filter_hostname(self, host): def filter_hostname(self, host):
"""Remove unused characters and split by address and port.""" """Remove unused characters and split by address and port."""
host = host.replace('http://', '').replace('https://', '').replace('/', '') host = host.replace(
'http://', '').replace('https://', '').replace('/', '')
port = 443 port = 443
if ':' in host: if ':' in host:
host, port = host.split(':') host, port = host.split(':')
@ -332,6 +364,7 @@ class SSLChecker:
setattr(args, 'socks', False) setattr(args, 'socks', False)
setattr(args, 'analyze', False) setattr(args, 'analyze', False)
setattr(args, 'hosts', json_args['hosts']) setattr(args, 'hosts', json_args['hosts'])
setattr(args, 'db', False)
return args return args
group = parser.add_mutually_exclusive_group(required=True) group = parser.add_mutually_exclusive_group(required=True)
@ -339,6 +372,8 @@ class SSLChecker:
required=False, help='Hosts as input separated by space') required=False, help='Hosts as input separated by space')
group.add_argument('-f', '--host-file', dest='host_file', group.add_argument('-f', '--host-file', dest='host_file',
required=False, help='Hosts as input from file') required=False, help='Hosts as input from file')
group.add_argument('-db', '--database', dest='db',
default=False, action="store_true", help='Uses Certificados Database')
parser.add_argument('-s', '--socks', dest='socks', parser.add_argument('-s', '--socks', dest='socks',
default=False, metavar='HOST:PORT', default=False, metavar='HOST:PORT',
help='Enable SOCKS proxy for connection') help='Enable SOCKS proxy for connection')
@ -369,10 +404,34 @@ class SSLChecker:
args = parser.parse_args() args = parser.parse_args()
if args.db:
pw = "PASSWORD"
import mysql.connector
mydb = mysql.connector.connect(
host="localhost",
user="USER",
password=pw,
database="DATABASE"
)
mycursor = mydb.cursor()
sql = "SELECT host FROM hosts"
mycursor.execute(sql)
myresult = mycursor.fetchall()
array = []
for x in range(len(myresult)):
array.append(myresult[x][0])
args.hosts = array
# Get hosts from file if provided # Get hosts from file if provided
if args.host_file: else:
with open(args.host_file) as f: if args.host_file:
args.hosts = f.read().splitlines() with open(args.host_file) as f:
args.hosts = f.read().splitlines()
print(args.hosts)
exit()
# Checks hosts list # Checks hosts list
if isinstance(args.hosts, list): if isinstance(args.hosts, list):