Writing a SLIPS Module

How to Create a New Slips Module

What is Slips and why modules are useful

Slips is a machine learning-based intrusion prevention system for Linux and MacOS, developed at the Stratosphere Laboratories from the Czech Technical University in Prague. Slips reads network traffic flows from several sources, applies multiple detections (including machine learning detections) and detects infected computers and attackers in the network. It is easy to extend the functionality of Slips by writing a new module. This blog shows how to create a new module for Slips from scratch.

Goal of this Blog

This blog creates an example module to detect when any private IP address communicates with another private IP address. We want to know if, for example, the IP 192.168.4.2, is communicating with the IP 192.168.4.87. This simple idea, but still useful, is going to be the purpose of our module. Also, it will generate an alert for Slips to consider this situation.

Our example module will be called ‘local_connection_detector’.


High-level View of how a Module Works in slips

A Slip’s module starts with the init() function for initialization of the different components, such as the database, setting up the outputqueue for printing and logging, subscribing to channels, etc.

The main function of each module is run(), this function should contain a ‘while True’ that keeps looping as long as Slips is running so that the module doesn’t terminate.

Each module has it’s own print() function that handles text printing and logging by passing everything to the OutputProcess.py for processing

Each Module also has it’s own shutdown_gracefully() function that handles cleaning up after the module is done processing. It handles, for example:

  • Saving a model before Slips stops

  • Saving alerts in a .txt file if the module’s job is to export alerts

  • Telling the main module (slips.py) that the module is done processing so slips.py can kill it,

etc.


Developing the local_connection_detector Module

1. Copying the template module

When Slips runs, it automatically loads all the modules inside the modules/ directory. Therefore, our new module should be placed there as a new directory and it will automatically run. Slips has a template module directory for users to copy, so we are going to copy it and modify for our purposes.

 cp -a modules/template modules/local_connection_detector  

After this command you should have a structure like this:

modules/  
├─ local_connection_detector/  
│   ├─ __init__.py 
│   ├─ local_connection_detector.py  

The __init__.py is to make sure the module is treated as a python package, don’t delete it.


2. Changing the Name of the ModulE

Each module in Slips should have a name, author and description. We should change the name inside the local_connection_detector.py file by finding the lines with the name and description in the class ‘Module’ and changing them:

name = 'local_connection_detector' 
description = 'detects connections to other devices in your local network'
authors = ['Your name']  

3. Setting the Redis Pub/Sub chanels

Now we need to tell our module which data it will receive. This is done by subscribing to the appropriate Redis channels. The channel we will use is called new_flow. It is used to receive every new flow seen in the network. See that you have this line in the __init_() function.

 self.c1 = __database__.subscribe('new_flow')  

So now every time Slips receives a new flow, the module will receive it in this channel. You don’t need to modify the module to do this, since it is already done in the line:

 message = self.c1.get_message(timeout=self.timeout)  

Which is run in the while True loop.

To verify that the message received is correct before using it, Slips uses the function utils.is_msg_intended_for. You don’t have to modify your module, since this is already done in the line:

 if utils.is_msg_intended_for(message, 'new_flow'):  

Now, you can access the content of the flow using the line:

 message = message['data']  

Thus far, we have the following code that gets a message every time Slips receives a new flow:

def __init__(self, outputqueue, config, redis_port):
        self.c1 = __database__.subscribe('new_flow')
def run(self):
        utils.drop_root_privs()
        while True:
            try:
                message = self.c1.get_message(timeout=self.timeout)
                if message and message['data'] == 'stop_process':
                    self.shutdown_gracefully()
                    return True

                if utils.is_msg_intended_for(message, 'new_flow'):
                    #TODO
                    pass

            except KeyboardInterrupt:
                self.shutdown_gracefully()
            except Exception as inst:
                exception_line = sys.exc_info()[2].tb_lineno
                self.print(str(inst), 0, 1)

4. Detecting connections to local devices

In order to implement our idea of detecting when a private IP contacts another private IP, we need to:

  • Extract the source IP

  • Extract the destination IP

  • Check if both of them are private

  • Generate an evidence

5. Extracting source and destination ips from the flow

Once you received your message, extracting IPs is done by the following code:

message = message['data']
message = json.loads(message)
flow = json.loads(message['flow'])
uid = next(iter(flow))
flow = json.loads(flow[uid])
saddr = flow['saddr']
daddr = flow['daddr']

As you can see, the message comes in JSON format. The line uid = next(iter(flow)) extracts the flow id, which is a random string generated by Zeek and used as key in the JSON.

6. Verify that the ips are private

Now we need to check if both IPs of them are private. For this we will import the ipaddress library.

import ipaddress
srcip_obj = ipaddress.ip_address(saddr)
dstip_obj = ipaddress.ip_address(daddr)
if srcip_obj.is_private and dstip_obj.is_private:
    #TODO
    pass

Now that we’re sure both IPs are private, we need to generate an evidence. An evidence in Slips is any detection or information that the module needs to tell Slips about. It is not an alert and it doesn’t mean that this will be blocked. The blocking decision is taken by Slips later.
For sending evidence, Slips requires the following info about the evidence:

# on a scale of 0 to 1, how confident you are of this evidence
confidence = 0.8
# how dangerous is this evidence? info, low, medium, high, critical?
threat_level = 'high'

# the name of your evidence, you can put any descriptive string here
type_evidence = 'ConnectionToLocalDevice'
# what is this evidence category according to IDEA categories 
category = 'Anomaly.Connection'
# which ip is the attacker here? the src or the dst?
type_detection = 'srcip'
# what is the ip of the attacker?
detection_info = saddr
# describe the evidence
description = f'Detected a connection to a local device '
timestamp = datetime.datetime.now().strftime('%Y/%m/%d-%H:%M:%S')

After the basic information about the evidence was filled, there are two more pieces of info that you don’t have to modify, the id of the profile, and the id of the timewindow when the evidence happened:

# the current profile is the source ip, this comes in 
# the msg received in the channel
profileid = message['profileid']

# Profiles are split into timewindows, each timewindow is 1h    
# this comes in the msg received in the channel 
twid = message['twid'] 

Then, we only need to send the evidence

__database__.setEvidence(
  type_evidence,
  type_detection,     
  detection_info,     
  threat_level,     
  confidence,     
  description,     
  timestamp,     
  category,     
  profileid=profileid,     
  twid=twid,
)

7. Testing the Module

The module is now ready to be used. You can copy/paste the complete code from here.

First we start Slips by using the following command:

 ./slips.py -i wlp3s0 -o local_conn_detector  

-o is to store the output in the local_conn_detector/ directory.

Then we make a connection to a private IP:

 ping 192.168.1.18  

And you should see the alerts file ./local_conn_detector/alerts.log. Check by using:

 cat local_conn_detector/alerts.log  

An example output would be:

Using develop - 9f5f9412a3c941b3146d92c8cb2f1f12aab3699e - 2022-06-02 16:51:43.989778

2022/06/02-16:51:57: Src IP 192.168.1.18 Detected Detected a connection to a local device 192.168.1.12
2022/06/02-16:51:57: Src IP 192.168.1.12 Detected Detected a connection to a local device 192.168.1.18

This is an animated gif of the module working:

Demo

Conclusion

Due to the high modularity of slips, adding a new slips module is as easy as modifying a few lines in the template module. Slips will handle running your module and integrating it for you.

This is the current list of Slips modules. You can enhance them, add detections, suggest new ideas using our Discord or by opening a PR. For more info about the threat levels, check the docs. A detailed explanation of IDEA categories is here. A detailed explanation of Slips profiles and timewindows is here. And here the Contributing guidelines.

Complete Code

Here is the whole local_connection_detector.py code for copy/paste.

# Must imports
from slips_files.common.abstracts import Module
import multiprocessing
from slips_files.core.database import __database__
from slips_files.common.slips_utils import utils
import platform
import sys

# Your imports
import datetime
import ipaddress
import json

class Module(Module, multiprocessing.Process):
    # Name: short name of the module. Do not use spaces
    name = 'local_connection_detector'
    description = 'detects connections to other devices in your local network'
    authors = ['Template Author']

    def __init__(self, outputqueue, config, redis_port):
        multiprocessing.Process.__init__(self)
        # All the printing output should be sent to the outputqueue.
        # The outputqueue is connected to another process called OutputProcess
        self.outputqueue = outputqueue
        # In case you need to read the slips.conf configuration file for
        # your own configurations
        self.config = config
        # Start the DB
        __database__.start(self.config, redis_port)
        # To which channels do you wnat to subscribe? When a message
        # arrives on the channel the module will wakeup
        # The options change, so the last list is on the
        # slips/core/database.py file. However common options are:
        # - new_ip
        # - tw_modified
        # - evidence_added
        # Remember to subscribe to this channel in database.py
        self.c1 = __database__.subscribe('new_flow')
        self.timeout = 0.0000001

    def print(self, text, verbose=1, debug=0):
        """
        Function to use to print text using the outputqueue of slips.
        Slips then decides how, when and where to print this text by taking all the processes into account
        :param verbose:
            0 - don't print
            1 - basic operation/proof of work
            2 - log I/O operations and filenames
            3 - log database/profile/timewindow changes
        :param debug:
            0 - don't print
            1 - print exceptions
            2 - unsupported and unhandled types (cases that may cause errors)
            3 - red warnings that needs examination - developer warnings
        :param text: text to print. Can include format like 'Test {}'.format('here')
        """

        levels = f''
        self.outputqueue.put(f'{levels}|{self.name}|')

    def shutdown_gracefully(self):
        # Confirm that the module is done processing
        __database__.publish('finished_modules', self.name)

    def run(self):
        utils.drop_root_privs()
        # Main loop function
        while True:
            try:
                message = self.c1.get_message(timeout=self.timeout)
                if message and message['data'] == 'stop_process':
                    self.shutdown_gracefully()
                    return True

                if utils.is_msg_intended_for(message, 'new_flow'):
                    message = message['data']
                    message = json.loads(message)
                    flow = json.loads(message['flow'])
                    uid = next(iter(flow))
                    flow = json.loads(flow[uid])
                    saddr = flow['saddr']
                    daddr = flow['daddr']
                    srcip_obj = ipaddress.ip_address(saddr)
                    dstip_obj = ipaddress.ip_address(daddr)
                    if srcip_obj.is_private and dstip_obj.is_private:
                        # on a scale of 0 to 1, how confident you are of this evidence
                        confidence = 0.8
                        # how dangerous is this evidence? info, low, medium, high, critical?
                        threat_level = 'high'
                        # the name of your evidence, you can put any descriptive string here
                        type_evidence = 'ConnectionToLocalDevice'
                        # what is this evidence category according to IDEA categories
                        category = 'Anomaly.Connection'
                        # which ip is the attacker here? the src or the dst?
                        type_detection = 'srcip'
                        # what is the ip of the attacker?
                        detection_info = saddr
                        # describe the evidence
                        description = f'Detected a connection to a local device '
                        timestamp = datetime.datetime.now().strftime('%Y/%m/%d-%H:%M:%S')
                        # the crrent profile is the source ip, this comes in the msg received in the channel
                        profileid = message['profileid']
                        # Profiles are split into timewindows, each timewindow is 1h, this comes in the msg received in the channel
                        twid = message['twid']

                        __database__.setEvidence(
                            type_evidence,
                            type_detection,
                            detection_info,
                            threat_level,
                            confidence,
                            description,
                            timestamp,
                            category,
                            profileid=profileid,
                            twid=twid,
                        )

            except KeyboardInterrupt:
                self.shutdown_gracefully()
                return True
            except Exception as inst:
                exception_line = sys.exc_info()[2].tb_lineno
                self.print(f'Problem on the run() line ', 0, 1)
                self.print(str(type(inst)), 0, 1)
                self.print(str(inst.args), 0, 1)
                self.print(str(inst), 0, 1)
                return True

Line by Line Explanation of the Module

This section is for more detailed explaination of what each line of the module does.

 self.outputqueue = outputqueue  

the outputqueue is used whenever the module wants to print something, each module has it’s own print() function that uses this queue. So in order to print you simply write

self.print(“some text”, 1, 0)

and the text will be sent to the outputqueue to process, log, and print to the terminal.

 self.config = config  

This line is necessary if you need to read the slips.conf configuration file for your own configurations

 __database__.start(self.config, redis_port)  

This line starts the redis database, Slips mainly depends on redis Pub/Sub system for modules communications, so if you need to listen on a specific channel after starting the db you can add the following line to init()

 self.timeout = 0.0000001  

Is used for listening on the redis channel, if your module will be using 1 channel, timeout=0 will work fine, but in order to listen on more than 1 channel, you need to set a timeout so that the module won’t be stck listening on the same channel forever.

Now here’s the run() function, this is the main function of each module, it’s the one that gets executed when the module starts. All the code in this function should be run in a loop or else the module will finish execution and terminate.

 utils.drop_root_privs()  

the above line is responsible for dropping root privileges, so if slips starts with sudo and the module doesn’t need the sudo permissions, we drop them.

 message = self.c1.get_message(timeout=self.timeout)  

The above line listen on the c1 channel (‘new ip’) that we subscribed to earlier. The messages recieved in the channel can either be stop_process or a message with data

 if message and message['data'] == 'stop_process':  

The stop_message is sent from the main slips.py to tell the module that slips is stopping and the module should finish all the processing it’s doing and shutdown.

So, for example if you’re training a ML model in your module, and you want to save it before the module stops. You should place the save_model() function right above the following line, or inside the function

self.shutdown_gracefully() 

inside shutdown_gracefully() we have the following line

__database__.publish('finished_modules', self.name)  

This is the module, responding to the stop_message, telling slips.py that it successfully finished processing and is terminating.