Chase up Datper’s Communication Logs with Splunk/Elastic Stack

The previous article introduced some features of Datper malware and a Python script for detecting its distinctive communication. Building on that, this article presents how to search proxy logs for Datper's communication using log management tools – Splunk and Elastic Stack (Elasticsearch, Logstash and Kibana).

<For Splunk>

To extract Datper's communication logs with Splunk, you first need to create a custom search command as described below. We have confirmed that the commands presented here work on Splunk 6.3 and later versions.

Create a custom search command and investigate logs

Create a custom search command by following the steps below. For more detailed instructions, please refer to the Splunk documentation [1].

  1. Set up Splunklib

Use “splunklib” included in the Splunk SDK (Software Development Kit) for Python. You can download it from the following website:

Splunk SDK for Python
http://dev.splunk.com/goto/sdk-python

Copy the “splunklib” folder from the Splunk SDK for Python to $SPLUNK_HOME/etc/apps/search/bin.
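
If you want to make sure the copy is in place, a quick import check like the one below can help. This is just an optional sanity check we are suggesting (not part of the original steps), and it assumes the default $SPLUNK_HOME of /opt/splunk; adjust the path to your installation.

# Optional sanity check: confirm that splunklib is importable from the app's bin directory.
# The path below assumes $SPLUNK_HOME=/opt/splunk; adjust it to your environment.
import sys
sys.path.insert(0, "/opt/splunk/etc/apps/search/bin")
import splunklib
print(splunklib.__file__)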

  2. Implement the script for the custom search command

Save the custom search command script “datper_splunk.py” in $SPLUNK_HOME/etc/apps/search/bin.

The script “datper_splunk.py” is available on GitHub. Please download it from the following website:

JPCERTCC/aa-tools · GitHub
https://github.com/JPCERTCC/aa-tools/blob/master/datper_splunk.py
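
For reference, a streaming custom search command built on splunklib generally follows the structure sketched below. This is only a minimal illustration of the mechanism, not the actual datper_splunk.py; the field name is_datper follows this article, while looks_like_datper() is a placeholder for the real detection logic.

#!/usr/bin/env python
# Minimal sketch of a splunklib streaming custom search command.
# NOT the actual datper_splunk.py; the detection check is a placeholder.
import sys
from splunklib.searchcommands import dispatch, StreamingCommand, Configuration

def looks_like_datper(url):
    # Placeholder: the real script implements Datper's communication pattern check.
    return False

@Configuration()
class DatperCommand(StreamingCommand):
    """Adds an is_datper field ("yes"/"no") to each event."""
    def stream(self, records):
        for record in records:
            url = record.get("url", "")
            record["is_datper"] = "yes" if looks_like_datper(url) else "no"
            yield record

dispatch(DatperCommand, sys.argv, sys.stdin, sys.stdout, __name__)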

  3. Configuration files

Create the following two config files in $SPLUNK_HOME/etc/apps/search/local with the contents below. If the files already exist, add the configuration lines shown.

commands.conf

[datper]
filename = datper_splunk.py
streaming = true
chunked = true
local = true

authorize.conf

[capability::run_script_datper]
[role_admin]
run_script_datper = enabled

The configuration creates a custom search command named “datper”.

  4. Restart Splunk

Restart Splunk to activate the new configuration. After the restart, you can use the newly added custom search command “datper”.

  5. Run

Now it is time to run the custom search command. When it is executed, a new field “is_datper” is created for each event (indexed log), and the result (whether the log matches Datper's communication pattern) is given as “yes” or “no”. Figure 1 shows an example of the command execution; it also retrieves the destination IP addresses and URLs that the malware communicated with. Note that the index name and the table fields should be adjusted to your environment.

index=proxy_log | datper | table _time,is_datper,dst_ip,url
Figure 1: Custom search command “datper” execution result

<For Elastic Stack>

This section provides instructions on how to configure an Elastic Stack environment to search for Datper's communication. In this test environment, proxy server logs are taken from Squid. The software versions used for this test were Elasticsearch 5.5.1, Logstash 5.5.1 and Kibana 5.5.1.

Configuration of Elastic Stack

1. Install the script

Download “datper_elk.py” from the following website and place it in a directory of your choice.

JPCERTCC/aa-tools · GitHub
https://github.com/JPCERTCC/aa-tools/blob/master/datper_elk.py

In the test environment, we installed it in /opt/bin.

2. Configure Logstash

Create a config file so that Logstash reads the proxy logs and adds a new field containing the result of the check against Datper's communication pattern. If Logstash is already configured to read proxy logs, you can skip step 2.1 and proceed to 2.2.

2.1 Configuration for input
input { 
  file {
    type => "squid"
    start_position => "beginning"
    path => ["/var/log/squid3/access.log"]
  }
}

By default, Logstash only processes newly added lines in the log. To import the entire log, you need to specify start_position => "beginning" explicitly, as above.

“path” defines the log file to import.

 2.2 Configuration for filter
filter {
  if [type] == "squid" {
    grok {
      match => [ "message", "%{NUMBER:timestamp}\s+%{NUMBER:response_time} %{IP:src_ip} %{WORD:squid_request_status}/%{NUMBER:http_status_code} %{NUMBER:reply_size_include_header} %{WORD:http_method} %{WORD:http_protocol}://%{HOSTNAME:dst_host}%{NOTSPACE:request_url} %{NOTSPACE:user} %{WORD:squid}/(?:-|%{IP:dst_ip}) %{NOTSPACE:content_type}" ]
      add_tag => ["squid"]
    }
    date {
      match => ["timestamp", "UNIX"]
    }
    ruby {
      code => 'require "open3"
               message = event.get("message")
               cmd = "/opt/bin/datper_elk.py \'#{message}\'"
               stdin, stdout, stderr = Open3.popen3(cmd)
               event.set("datper", stdout.read)
               '
    }
  }
}

This configuration extracts each field from the Squid logs using the “grok” filter and converts the UNIX timestamp into a readable date with the “date” filter. The above configuration applies to Squid's default log format. If the log format is “combined”, the fields need to be extracted with “COMBINEDAPACHELOG” (or a similar pattern) and the “date” filter pattern changed as shown below.

    grok {
      match => {
        "message" => "%{COMBINEDAPACHELOG} %{WORD:squid_request_status}:%{WORD:squid}"
      }
    }
    date {
      match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
    }

The ruby filter runs the script, and the result (whether the log matches Datper's communication pattern) is stored in the “datper” field. The script path in the “code” section of the ruby filter needs to point to the location chosen in “1. Install the script” (/opt/bin in this case). The above ruby filter code uses the Event API available in Logstash 5.0 and later. If you are using an older version, the code needs to be written as follows:

    code => 'require "open3"
             message = event["message"]
             cmd = "/opt/bin/datper_elk.py \'#{message}\'"
             stdin, stdout, stderr, status = Open3.popen3(cmd)
             event["datper"] = stdout.read
             '
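
As a side note, the way the ruby filter invokes the script implies a simple command-line contract: the raw log line is passed as the first argument and the result is written to standard output, which Logstash stores in the “datper” field. The sketch below only illustrates that contract; it is a hypothetical stand-in, not the actual datper_elk.py, and the placeholder check must be replaced with the real detection logic.

#!/usr/bin/env python
# Hypothetical sketch of the command-line contract assumed by the ruby filter.
# NOT the actual datper_elk.py; looks_like_datper() is a placeholder.
import re
import sys

def looks_like_datper(log_line):
    # Placeholder: the real script applies Datper's communication pattern.
    return bool(re.search(r"datper-placeholder-pattern", log_line))

if __name__ == "__main__":
    line = sys.argv[1] if len(sys.argv) > 1 else ""
    # Whatever is printed to stdout ends up in the "datper" field.
    sys.stdout.write("yes" if looks_like_datper(line) else "no")
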
2.3 Configuration for output
output {
   elasticsearch {
       hosts => "localhost:9200"
       index => "squid-access"
   }
}

Specify the Elasticsearch host name in “hosts” and an arbitrary index name in “index”.

When Logstash runs with the config files created in 2.1 to 2.3 and the log file is imported, a new index is created in Elasticsearch.
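
If you want to confirm that the index has actually been created, you can query Elasticsearch's _cat/indices API, for instance with a small Python snippet like the following (assuming Elasticsearch listens on http://localhost:9200 and the index name is “squid-access”):

# Quick check that the "squid-access" index exists (Elasticsearch on localhost:9200 assumed).
import json
import urllib.request

url = "http://localhost:9200/_cat/indices/squid-access?format=json"
with urllib.request.urlopen(url) as response:
    print(json.loads(response.read().decode("utf-8")))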

 3. Create an index pattern and search logs on Kibana

Access Kibana's Web UI, enter the index name and create an index pattern. As configured in “2. Configure Logstash”, we set the index name to “squid-access” and the time filter field to “timestamp”, as shown in Figure 2.

Figure 2: Create an index pattern

On the “Discover” page, you can see the result obtained by the script in the “datper” field, as shown in Figure 3.

Figure 3: “Discover” page

If you wish to add the “datper” field to an existing index, you can do so by running “datper_elk.py” with an Elasticsearch URL and an index name. If the Elasticsearch URL is http://localhost:9200 and the index name of the proxy log is squid-access, the command is as follows:

/opt/bin/datper_elk.py http://localhost:9200 squid-access
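
For reference, the sketch below shows one way such a backfill over an existing index could be put together with the official elasticsearch Python client (scan through the documents, compute the field, and apply partial updates in bulk). It only illustrates the approach and is not the actual datper_elk.py; looks_like_datper() is a placeholder for the real detection logic.

# Sketch of adding a "datper" field to every document in an existing index.
# NOT the actual datper_elk.py; looks_like_datper() is a placeholder.
from elasticsearch import Elasticsearch, helpers

def looks_like_datper(source):
    # Placeholder: the real script applies Datper's communication pattern.
    return False

def backfill(es_url="http://localhost:9200", index="squid-access"):
    es = Elasticsearch([es_url])
    actions = (
        {
            "_op_type": "update",
            "_index": hit["_index"],
            "_type": hit["_type"],
            "_id": hit["_id"],
            "doc": {"datper": "yes" if looks_like_datper(hit["_source"]) else "no"},
        }
        for hit in helpers.scan(es, index=index)
    )
    helpers.bulk(es, actions)

if __name__ == "__main__":
    backfill()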

Conclusion

One of the keys to effective incident response is early detection. Based on the scripts introduced here, you can also add customised functions, such as raising an alert upon detecting Datper's communication. By investigating past logs with these commands, you may be able to find attack activities that had previously been overlooked. In case you find any suspicious communication logs, we recommend that you conduct further analysis and report to info [at] jpcert.or.jp.

- Tomoaki Tani (for Splunk) & Hiroshi Soeda (for Elastic Stack)

(Translated by Yukako Uchida)

Reference

[1] How to create custom search commands using Splunk SDK for Python (Splunk Documentation)

http://dev.splunk.com/view/python-sdk/SP-CAAAEU2
