!/var/ossec/framework/python/bin/python3
Copyright (C) 2015-2022, Wazuh Inc.
Template by WhatDoesKmean
import json import sys import time import os from socket import socket, AF_UNIX, SOCK_DGRAM
try: import requests from requests.auth import HTTPBasicAuth except Exception as e: print("No module 'requests' found. Install: pip install requests") sys.exit(1)
Global vars
debug_enabled = False pwd = os.path.dirname(os.path.dirname(os.path.realpath(file)))
print(pwd)
exit()
json_alert = {} now = time.strftime("%a %b %d %H:%M:%S %Z %Y")
Set paths
log_file = '{0}/logs/integrations.log'.format(pwd) socket_addr = '{0}/queue/sockets/queue'.format(pwd)
def main(args): debug("# Starting")
Read args
alert_file_location = args[1]
apikey = args[2]
debug("# API Key")
debug(apikey)
debug("# File location")
debug(alert_file_location)
# Load alert. Parse JSON object.
with open(alert_file_location) as alert_file:
json_alert = json.load(alert_file)
debug("# Processing alert")
debug(json_alert)
# Request whois info
msg = request_whois_info(json_alert,apikey)
# If positive match, send event to Wazuh Manager
if msg:
send_event(msg, json_alert["agent"])
def debug(msg): if debug_enabled: msg = "{0}: {1}\n".format(now, msg) print(msg) f = open(log_file,"a") f.write(str(msg)) f.close()
def collect(data):
domainName = data['domainName']
registrarName = data['registrarName']
contactEmail = data['contactEmail']
domainNameExt = data['domainNameExt']
estimatedDomainAge = data['estimatedDomainAge']
registryData = data['registryData']['strippedText']
return domainName,registrarName,contactEmail,domainNameExt,estimatedDomainAge,registryData
def in_database(data, srcip): result = data['domainName'] if result == 0: return False return True
def query_api(domainName, apikey): params = {"domainName": domainName, "apiKey": apikey, "outputFormat":"JSON"} url = "https://www.whoisxmlapi.com/whoisserver/WhoisService" response = requests.get(url, params=params)
if response.status_code == 200: json_response = response.json()["WhoisRecord"] if "estimatedDomainAge" not in json_response: json_response["estimatedDomainAge"] = "NotFound"
data = json_response
return data
else: alert_output = {} alert_output["whois"] = {} alert_output["integration"] = "custom-whois" json_response = response.json() debug("# Error: The whois encountered an error") alert_output["whois"]["error"] = response.status_code alert_output["whois"]["description"] = json_response["errors"][0]["detail"] send_event(alert_output) exit(0)
def request_whois_info(alert, apikey): alert_output = {}
If there is no source ip address present in the alert. Exit.
if not "srcip" in alert["data"]:
return(0)
# Request info using whois API
data = query_api(alert["data"]["srcip"], apikey)
# Create alert
alert_output["whois"] = {}
alert_output["integration"] = "custom-whois"
alert_output["whois"]["found"] = 0
alert_output["whois"]["source"] = {}
alert_output["whois"]["source"]["alert_id"] = alert["id"]
alert_output["whois"]["source"]["rule"] = alert["rule"]["id"]
alert_output["whois"]["source"]["description"] = alert["rule"]["description"]
alert_output["whois"]["source"]["full_log"] = alert["full_log"]
alert_output["whois"]["source"]["srcip"] = alert["data"]["srcip"]
srcip = alert["data"]["srcip"]
# Check if whois has any info about the srcip
if in_database(data, srcip):
alert_output["whois"]["found"] = 1
# Info about the IP found in whois
if alert_output["whois"]["found"] == 1:
domainName,registrarName,contactEmail,domainNameExt,estimatedDomainAge,registryData = collect(data)
# Populate JSON Output object with whois request
alert_output["whois"]["domainName"] = domainName
alert_output["whois"]["registrarName"] = registrarName
alert_output["whois"]["contactEmail"] = contactEmail
alert_output["whois"]["domainNameExt"] = domainNameExt
alert_output["whois"]["estimatedDomainAge"] = estimatedDomainAge
alert_output["whois"]["registryData"] = registryData
debug(alert_output)
return(alert_output)
def send_event(msg, agent = None): if not agent or agent["id"] == "000": string = '1:whois:{0}'.format(json.dumps(msg)) else: string = '1:[{0}] ({1}) {2}->whois:{3}'.format(agent["id"], agent["name"], agent["ip"] if "ip" in agent else "any", json.dumps(msg))
debug(string)
sock = socket(AF_UNIX, SOCK_DGRAM)
sock.connect(socket_addr)
sock.send(string.encode())
sock.close()
if name == "main": try:
Read arguments
bad_arguments = False
if len(sys.argv) >= 4:
msg = '{0} {1} {2} {3} {4}'.format(now, sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4] if len(sys.argv) > 4 else '')
debug_enabled = (len(sys.argv) > 4 and sys.argv[4] == 'debug')
else:
msg = '{0} Wrong arguments'.format(now)
bad_arguments = True
# Logging the call
f = open(log_file, 'a')
f.write(str(msg) + '\n')
f.close()
if bad_arguments:
debug("# Exiting: Bad arguments.")
sys.exit(1)
# Main function
main(sys.argv)
except Exception as e:
debug(str(e))
raise