Websites today make extensive use of client-side JavaScript to provide a rich user experience. These scripts run on your users’ browsers and their activity isn’t logged anywhere by your server-side security stack. However, visibility into this client-side attack surface should be a critical component of any web application security operations playbook.
HUMAN Code Defender is a client-side application security solution that provides this missing visibility. Using behavioral analysis and advanced machine learning, it detects vulnerable scripts, suspicious PII access and data leakage from your users browsers. These prioritized incidents are a very important component of your security operations workflow for protecting your website against digital skimming, formjacking and Magecart attacks.
Code Defender surfaces these incidents within the web-based Dashboard. As a Code Defender user, you can also have these incidents generate alerts on your existing security workflow tools such as:
This blog post shows you how to send Code Defender alerts into a Security Orchestration, Automation and Response (SOAR) system. Once the alerts have been added to your SOAR, you can then create investigation workflows that can deliver fully automated verdicts or enrich the information to enable security analysts to make timely decisions, all within a single platform.
This blog post will require some knowledge of Python, a language many SOAR platforms support, but also provides some sample code to help get you started. We will break this down into the following steps so that you can tackle each part in increments.
Why Slack? Great question and why not? Slack is a common integration point between Code Defender and many SOAR platforms. If you are already using Slack within your organization, Slack provides alert visibility. Slack can also serve as a way to preserve alerts and will act as an alert message bus for the purposes of this integration. With so much focus on Slack, it is important that you have a Slack workspace and channel setup for the integration.
If you want to set up Slack and do not currently run Slack, then you will want to follow Slack’s Create a Slack workspace. Once you have a workspace, you will next need to create a channel to be used for alerts, create a Slack App, and create an API token for the App. I won’t spend too much time on these steps since there are a number of resources available on these topics. A few important notes regarding the App and API token though:
Make sure you provide your app with at least the following permissions to your Slack channel:
With your Slack all setup and ready to go, you will want to open a ticket with HUMAN Support so they can configure alerting to your Slack channel. HUMAN support will need the following information in order to configure alerting:
With all of the basic steps completed, it is now time to get your integration code up and running. I’ll start by admitting that Slack does provide its own SDK for a number of languages and you can always choose to use them if you like. I went with a generic approach on this integration since not all SOAR platforms will allow you to bring your own SDKs/libraries/modules into your custom modules. For this reason, I created the below code to account for an inability to bring in outside libraries and only use available Python libraries. The nice thing about the SOAR I used for my example, Siemplify, is that it does make the requests library available. Let’s walk through building out the code a piece at a time.
I started by first building out my own little class that I could use across other future scripts. This class is going to be at the core of my Slack integration.
import requests
CD_MSG_TITLE = 'Code Defender has detected a new incident'
class PerimeterXManagerException(Exception):
""" General Exception for PerimeterX manager """
pass
class PerimeterXManager(object):
def __init__(self, slack_channel=None, slack_api_key=None, connector_type=None, offset_in_seconds=0):
self.slack_channel = slack_channel
self.slack_api_key = slack_api_key
self.slack_offset = offset_in_seconds
self.connector_type = connector_type
self.slack_cursor = ''
self.paginated = False
self.messages = []
This starts pretty simple with importing the requests function since we will be making web related calls to the Slack API. Code Defender sends alerts to Slack with a title of “Code Defender has detected a new incident”. We will use this title to be able to locate the messages we’ll want to bring into the SOAR. The PerimeterXManagerException class is just a simple one that I can use to capture generic error messages later.
We start with the bulk of our code with the PerimeterXManager class and we define the various items we’ll want to be able to use as follows:
Object Property | Property Description |
slack_channel | The Slack channel where your Code Defender alerts are sent. |
slack_api_key | The Slack App API key you created for your integration. |
slack_offset | This is used so that we don’t constantly scan back over all of the conversations in the channel. By default, this will be 0, all messages, but once we’ve started running the integration, we should be able to keep track of the last time we received messages and come back to that point. |
connector_type | Unused at this time in case I wanted to develop additional integrations that weren’t just for Slack. |
slack_cursor | In order to support paginated results, we’ll store the cursor for the next results. |
paginated | This is used as a test to determine if we have received paginated results or not. |
messages | The messages we plan to return to the SOAR. |
With my basic class defined, we need our next function, getslackchannel_id:
def get_slack_channel_id(self):
response = requests.get(
'https://slack.com/api/conversations.list',
headers={'Authorization': 'Bearer ' + self.slack_api_key}
)
# curl -H 'Authorization: Bearer slack_api_key' https://slack.com/api/conversations.list
# foreach channels if name == slack_channel, then return id
if response.status_code != 200:
print('Failure')
return False
json_response = response.json()
# check to make sure we've got a channels array
if 'channels' not in json_response:
print('No Channels Identified')
return False
# check to make sure the channels is a list
if type(json_response['channels']) != list:
print('Not a valid list of channels')
return False
# step through the channels looking for the one we want
for x in json_response['channels']:
# if this is the channel we want then return the id
if x['name'] == self.slack_channel:
return x['id']
return False
This will connect to the Slack API’s conversations.list endpoint and search for the provided channel by name. If the channel is found by name, we return the id associated with that channel. In order to connect to Slack, we need to know the channel id and not just the channel name. With channel id in hand, we next grab all of the messages on the channel using getslackmessages:
def get_slack_messages(self):
channelId = self.get_slack_channel_id()
if channelId == False:
print('No Channel ID Given for get_slack_messages')
return False
response = requests.get(
'https://slack.com/api/conversations.history',
params={'channel': channelId, 'limit': 1, 'cursor': self.slack_cursor, 'oldest': self.slack_offset},
headers={'Authorization': 'Bearer ' + self.slack_api_key}
)
if response.status_code != 200:
print('Failure')
return False
json_response = response.json()
if json_response['has_more'] == True:
self.pagination = 1
self.slack_cursor = json_response['response_metadata']['next_cursor']
else:
self.pagination = 0
self.slack_cursor = ''
if 'messages' not in json_response:
return False
# Check to make sure we got some messages returned
if json_response['messages'] == False:
return False
# Check to make sure there's messages in the list
if len(json_response['messages']) < 1:
print('Empty messages')
return False
# walk through our retrieved messages to find CD related entries
for x in json_response['messages']:
# Check for a Code Defender specific message
if x['type'] == 'message' and 'attachments' in x and x['attachments'][0]['title'] == CD_MSG_TITLE:
self.messages.append(self.formatSlackMsg(x))
if self.pagination == 1:
self.get_slack_messages()
return self.messages
This is a recursive function that’ll handle pagination if we receive more alerts than a single API call will return. You’ll also see a helper function in here called formatSlackMsg. An example Code Defender alert as seen raw from the Slack API looks like this:
{
"type": "message",
"subtype": "bot_message",
"text": "",
"ts": "1600689944.000500",
"bot_id": "B01BSD0CHSL",
"attachments": [
{
"text": "Test from PerimeterX \n more details",
"title": "Code Defender has detected a new incident",
"id": 1,
"thumb_height": 200,
"thumb_width": 200,
"thumb_url": "",
"color": "FF4A55",
"fields": [
{
"title": "Script",
"value": "`www.somedomain.com/scripts/script.js`",
"short": false
},
{
"title": "Risk Level",
"value": "High",
"short": true
},
{
"title": "Host Domain",
"value": "<http://example.com|example.com>",
"short": true
}
],
"actions": [
{
"id": "1",
"text": "View in Console",
"type": "button",
"style": "primary",
"url": "https://console.perimeterx.com/codeDefender/analyzer/script/<some random hash>?from=slack"
}
]
}
]
}
The formatSlackMsg function parses this JSON into an object that we can use later with our connector.
With a set of functions built to handle connecting to Slack and grabbing messages, we need a way to leverage that and get those messages into Siemplify as alerts. This can be done by creating a new connector that leverages the PerimeterX Manager above. This script has quite a bit of code in it so I’m only going to focus on the snippet below:
...
def _fetch_alert(self, alert):
"""Returns an alert, which is an aggregation of basic events. (ie: Arcsight's correlation, QRadar's Offense)"""
self.logger.info("-------------- Started processing Alert {}".format(round(float(alert['ts'])*100000)), alert_id=round(float(alert['ts'])*100000))
alert_info = AlertInfo()
# ----------------------------- Alert Fields initialization START -----------------------------
alert_info.display_id = round(float(alert['ts'])*100000)
alert_info.ticket_id = round(float(alert['ts'])*100000)
alert_info.name = 'Code Defender ' + alert['severity'] + ' Alert'
alert_info.rule_generator = alert['title']
alert_info.start_time = round(float(alert['ts'])*1000)
alert_info.end_time = round(float(alert['ts'])*1000)+1 # Take the current time from Slack but we need to +1 ms since slack timestamps are finer and we could end up in a loop
alert_info.priority = self.VENDOR_RISK_MAP[alert['severity']] # Informative = -1,Low = 40,Medium = 60,High = 80,Critical = 100.
alert_info.device_vendor = VENDOR
alert_info.device_product = PRODUCT
# ----------------------------- Alert Fields initialization END -----------------------------
self.logger.info("---------- Events fetching started for alert {}".format(round(float(alert['ts'])*100000)))
# Gather the Event Details
event = {}
event["StartTime"] = round(float(alert['ts'])*1000)
event["EndTime"] = round(float(alert['ts'])*1000)
event["category"] = alert['text']
event["name"] = alert['title']
event["scriptName"] = alert['script']
event["hostDomain"] = alert['domain']
event["portalDeepLink"] = alert['deepLink']
event["fullURL"] = alert['script'][2:-2]
event["details"] = alert['fullText']
alert_info.events.append(event)
self.logger.info("-------------- Finished processing Alert {}".format(round(float(alert['ts'])*100000)), alert_id=round(float(alert['ts'])*100000))
return alert_info
...
This code is how we build the alert and related event from the formatted Slack message to something that can be consumed by Siemplify.
The complete details of this script can be found here in this GitHub repo.
This step will vary from organization to organization but let’s consider some potential investigation flows. Code Defender alerts contain the script that was discovered accessing content on a particular domain. While the alerts contain useful information, it might also be a good idea to vet the detected script. A flow to discover more details regarding a script could follow a path similar to the below:
There are a number of services both free and paid that can handle steps 2 and 3. I’ve only listed a few to get the juices flowing. Some other possible steps you can take for investigating the script could be:
Once you have your desired plan of action, you’ll want to build the playbook so that it can perform your desired steps for each alert type. The below screenshot shows a very simple playbook that can be built to process Code Defender Alerts.
The connector created in this example will export the alerts with a product name of Code Defender. This product name can be used as the trigger for your playbook. The table below captures some of the other useful items that are added to each alert. These items can be used in steps throughout a playbook:
Item Name | Item Description |
category | Basic categorization of the alert (ex: PII Sniffing, Vulnerable library, etc…) |
hostDomain | Top level domain where the script was discovered being served |
fullURL | The full URL to the script in question |
details | Additional details regarding the discovered script |
portalDeepLink | Deep link to the specific event in the portal |
priority | Mapping of PerimeterX severity level to Siemplify risk level |
Now that you have your connector and playbook created, it is time to start getting alerts into the system. You can enable and configure your connector and begin to wait for alerts to come through the system to process!