#!/var/ossec/framework/python/bin/python3 # Copyright (C) 2015-2022, Wazuh Inc. # Template by WhatDoesKmean import json import sys import time import os from socket import socket, AF_UNIX, SOCK_DGRAM try: import requests from requests.auth import HTTPBasicAuth except Exception as e: print("No module 'requests' found. Install: pip install requests") sys.exit(1) # Global vars debug_enabled = False pwd = os.path.dirname(os.path.dirname(os.path.realpath(__file__))) print(pwd) #exit() json_alert = {} now = time.strftime("%a %b %d %H:%M:%S %Z %Y") # Set paths log_file = '{0}/logs/integrations.log'.format(pwd) socket_addr = '{0}/queue/sockets/queue'.format(pwd) def main(args): debug("# Starting") # Read args alert_file_location = args[1] apikey = args[2] debug("# API Key") debug(apikey) debug("# File location") debug(alert_file_location) # Load alert. Parse JSON object. with open(alert_file_location) as alert_file: json_alert = json.load(alert_file) debug("# Processing alert") debug(json_alert) # Request whois info msg = request_whois_info(json_alert,apikey) # If positive match, send event to Wazuh Manager if msg: send_event(msg, json_alert["agent"]) def debug(msg): if debug_enabled: msg = "{0}: {1}\n".format(now, msg) print(msg) f = open(log_file,"a") f.write(str(msg)) f.close() def collect(data): domainName = data['domainName'] registrarName = data['registrarName'] contactEmail = data['contactEmail'] domainNameExt = data['domainNameExt'] estimatedDomainAge = data['estimatedDomainAge'] registryData = data['registryData']['strippedText'] return domainName,registrarName,contactEmail,domainNameExt,estimatedDomainAge,registryData def in_database(data, srcip): result = data['domainName'] if result == 0: return False return True def query_api(domainName, apikey): params = {"domainName": domainName, "apiKey": apikey, "outputFormat":"JSON"} url = "https://www.whoisxmlapi.com/whoisserver/WhoisService" response = requests.get(url, params=params) if response.status_code == 200: json_response = response.json()["WhoisRecord"] if "estimatedDomainAge" not in json_response: json_response["estimatedDomainAge"] = "NotFound" data = json_response return data else: alert_output = {} alert_output["whois"] = {} alert_output["integration"] = "custom-whois" json_response = response.json() debug("# Error: The whois encountered an error") alert_output["whois"]["error"] = response.status_code alert_output["whois"]["description"] = json_response["errors"][0]["detail"] send_event(alert_output) exit(0) def request_whois_info(alert, apikey): alert_output = {} # If there is no source ip address present in the alert. Exit. if not "srcip" in alert["data"]: return(0) # Request info using whois API data = query_api(alert["data"]["srcip"], apikey) # Create alert alert_output["whois"] = {} alert_output["integration"] = "custom-whois" alert_output["whois"]["found"] = 0 alert_output["whois"]["source"] = {} alert_output["whois"]["source"]["alert_id"] = alert["id"] alert_output["whois"]["source"]["rule"] = alert["rule"]["id"] alert_output["whois"]["source"]["description"] = alert["rule"]["description"] alert_output["whois"]["source"]["full_log"] = alert["full_log"] alert_output["whois"]["source"]["srcip"] = alert["data"]["srcip"] srcip = alert["data"]["srcip"] # Check if whois has any info about the srcip if in_database(data, srcip): alert_output["whois"]["found"] = 1 # Info about the IP found in whois if alert_output["whois"]["found"] == 1: domainName,registrarName,contactEmail,domainNameExt,estimatedDomainAge,registryData = collect(data) # Populate JSON Output object with whois request alert_output["whois"]["domainName"] = domainName alert_output["whois"]["registrarName"] = registrarName alert_output["whois"]["contactEmail"] = contactEmail alert_output["whois"]["domainNameExt"] = domainNameExt alert_output["whois"]["estimatedDomainAge"] = estimatedDomainAge alert_output["whois"]["registryData"] = registryData debug(alert_output) return(alert_output) def send_event(msg, agent = None): if not agent or agent["id"] == "000": string = '1:whois:{0}'.format(json.dumps(msg)) else: string = '1:[{0}] ({1}) {2}->whois:{3}'.format(agent["id"], agent["name"], agent["ip"] if "ip" in agent else "any", json.dumps(msg)) debug(string) sock = socket(AF_UNIX, SOCK_DGRAM) sock.connect(socket_addr) sock.send(string.encode()) sock.close() if __name__ == "__main__": try: # Read arguments bad_arguments = False if len(sys.argv) >= 4: msg = '{0} {1} {2} {3} {4}'.format(now, sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4] if len(sys.argv) > 4 else '') debug_enabled = (len(sys.argv) > 4 and sys.argv[4] == 'debug') else: msg = '{0} Wrong arguments'.format(now) bad_arguments = True # Logging the call f = open(log_file, 'a') f.write(str(msg) + '\n') f.close() if bad_arguments: debug("# Exiting: Bad arguments.") sys.exit(1) # Main function main(sys.argv) except Exception as e: debug(str(e)) raise