( ..)φカキカキ Pythonとiptablesでアクセス制御

httpのベーシック認証しているサーバに辞書アタック的なことをしてきたIPを弾こうと思ったので、pythonで書いてみました。
まあ、/var/log/httpd/error_logのログ上で連続しているという条件で見ているので、漏れる可能性は充分にあるけど、機械的に連続実行しているなら多分大丈夫なはず。あとはこれをcronのジョブにして定期的に実行します。

やっていることは大体こんな感じです。

  • 最後にチェックした日時をファイルに書いておいて、次回チェック時はその時刻以降のものをチェック対象にする
  • iptablesでリジェクトしたIPは、時刻とIPをファイルに書き出す
  • リジェクトするようにしてから30分経過したものはルールから削除
  • 3回連続でエラーログに"user (.*) not found"を残しているIPからのアクセスをiptablesでリジェクト(今はポート80番へのアクセスのみリジェクトで)

連続アクセスの条件をもうちょっとよくするとか、LogDataListは普通のリストで良いかもという点はあるけど、一応動く版はこちら。

#!/usr/bin/env python

from datetime import timedelta
from datetime import datetime
import os
import io
import re
import csv

class LogData:
    """One entry extracted from the error log.

    The four constructor arguments are stored unchanged as the attributes
    ``access_date``, ``level``, ``client_ip`` and ``detail``.
    """

    def __init__(self, access_date, level, client_ip, detail):
        # Plain record: values are kept as given, with no validation
        # or conversion.
        (self.access_date,
         self.level,
         self.client_ip,
         self.detail) = (access_date, level, client_ip, detail)
        
class LogDataList:
    """Append-only collection of LogData entries.

    The list can be iterated any number of times, including nested or
    partially consumed loops: every ``for`` loop gets its own iterator.
    The explicit ``next()`` method (with its shared cursor) is kept for
    callers that drive the object by hand.
    """

    def __init__(self):
        self.data = []
        # Cursor used only by next()/__next__(); ``for`` loops do not
        # touch it.
        self.index = 0

    def append(self, logData):
        """Add ``logData`` to the end of the collection."""
        self.data.append(logData)

    def __iter__(self):
        # Return a fresh iterator over the stored items instead of
        # ``self``. With a shared cursor, a loop that exits early would
        # make the next loop resume in the middle of the list.
        return iter(self.data)

    def next(self):
        """Return the next item using the shared cursor.

        Raises StopIteration once the end is reached and rewinds the
        cursor so the sequence can be walked again.
        """
        if self.index >= len(self.data):
            self.index = 0
            raise StopIteration
        ret = self.data[self.index]
        self.index += 1
        return ret

    # Python 3 spelling of the same method.
    __next__ = next

def get_log_file_name():
    """Return the path of the httpd error log that gets scanned."""
    log_path = "/var/log/httpd/error_log"
    return log_path

def get_last_timestamp_file_name():
    """Return the path of the file holding the time of the last check."""
    timestamp_path = "/tmp/last_timestamp.txt"
    return timestamp_path

def get_blocked_list_file_name():
    """Return the path of the CSV file listing currently blocked IPs."""
    blocked_path = "/tmp/blocked_ip_list.txt"
    return blocked_path

def get_datetime_from_string(s):
    """Parse a timestamp such as ``Sat Oct 10 12:34:56 2015``.

    Returns a naive ``datetime``. ``datetime.strptime`` raises
    ValueError when ``s`` does not match the expected layout.
    """
    layout = "%a %b %d %H:%M:%S %Y"
    parsed = datetime.strptime(s, layout)
    return parsed
    
def get_last_timestamp():
    """Return the time of the previous check as a naive ``datetime``.

    Falls back to 1970-01-01 when the timestamp file does not exist, or
    when its content cannot be parsed (for example an empty file left
    behind by an interrupted write). In both cases the whole log is
    treated as new.
    """
    name = get_last_timestamp_file_name()
    if not os.path.isfile(name):
        return datetime(1970, 1, 1)

    # Read through a context manager so the handle is closed promptly.
    with open(name) as f:
        text = f.read().strip()

    try:
        return get_datetime_from_string(text)
    except ValueError:
        # A damaged timestamp file must not abort every later run.
        print("Cannot parse last timestamp in %(name)s" % {'name': name})
        return datetime(1970, 1, 1)

def set_last_timestamp():
    """Overwrite the timestamp file with the current local time."""
    layout = "%a %b %d %H:%M:%S %Y"
    with open(get_last_timestamp_file_name(), 'w') as out:
        # Same text layout that get_datetime_from_string() parses.
        out.write(datetime.now().strftime(layout))

def read_log_file():
    """Collect failed-login entries logged since the last check.

    Scans the error log for lines containing ``user ... not found``
    whose timestamp is at or after the last checked time, and returns
    them as a LogDataList of LogData objects (date string, level,
    client IP, detail text).

    Lines that match the pattern but cannot be parsed (bad timestamp or
    unexpected field layout) are skipped, so one malformed line does
    not abort the run.
    """
    last_timestamp = get_last_timestamp()
    print("Last checked time is %(timestamp)s" % {'timestamp': last_timestamp})
    result = LogDataList()
    not_found = re.compile(r'user (.*) not found')

    with open(get_log_file_name(), 'r') as log_file:
        for line in log_file:
            # Cheap filter first: only failed-login lines matter.
            if not_found.search(line) is None:
                continue

            data = line.split(']')
            try:
                # The first bracketed field is the timestamp.
                target_date = get_datetime_from_string(data[0][1:])
            except ValueError:
                continue

            # Do I have new data?
            if target_date < last_timestamp:
                continue

            try:
                logData = LogData(data[0].split('[')[1],
                                  data[1].split('[')[1],
                                  data[2].split(' ')[2],
                                  data[3].strip())
            except IndexError:
                # Field layout differs from "[date] [level] [client ip] detail".
                continue
            result.append(logData)

    return result

def exec_iptables_cmd(cmd):
    """Run the command line ``cmd`` and report when it fails.

    The command is split into arguments with shell-like quoting rules
    and executed directly, without a shell. Characters such as ``;``,
    ``|`` or backticks that end up inside ``cmd`` (for instance via an
    IP string read from a file) are therefore passed as plain argument
    text and never interpreted.

    Prints ``cmd failed`` when the program exits non-zero, cannot be
    started, or ``cmd`` cannot be split. Returns None.
    """
    import shlex
    import subprocess

    print("Do %(cmd)s" % {'cmd': cmd})
    try:
        args = shlex.split(cmd)
        # An empty command counts as a failure rather than an error.
        ret = subprocess.call(args) if args else -1
    except (OSError, ValueError):
        # OSError: program missing or not executable.
        # ValueError: unbalanced quotes in cmd.
        ret = -1
    if ret != 0:
        print("cmd failed")

def add_iptables_rule(ip):
    """Insert an INPUT rule rejecting TCP port 80 traffic from ``ip``."""
    # Only HTTP (tcp/80) is rejected. Dropping "-p tcp --dport 80" from
    # the template would reject every packet from the address instead.
    template = "iptables -I INPUT -s %(ip)s -j REJECT -p tcp --dport 80"
    exec_iptables_cmd(template % {'ip': ip})

def del_iptables_rule(ip):
    """Delete the INPUT rule rejecting TCP port 80 traffic from ``ip``."""
    # The rule specification must mirror the one used when the rule was
    # inserted (tcp/80 only), otherwise there is nothing to delete.
    template = "iptables -D INPUT -s %(ip)s -j REJECT -p tcp --dport 80"
    exec_iptables_cmd(template % {'ip': ip})

def create_need_block_list(blocked_ips, suspect_list, threshold=3):
    """Pick the IPs that should be blocked now.

    Walks ``suspect_list`` in order and selects every IP that appears in
    at least ``threshold`` consecutive entries.

    Parameters:
        blocked_ips: mapping keyed by IPs that are already blocked.
            Those IPs are reported and skipped, so they do not get a
            second iptables rule or a second row in the blocked list.
        suspect_list: iterable of objects with ``client_ip`` and
            ``access_date`` attributes, in log order.
        threshold: number of consecutive entries from the same IP that
            triggers a block (default 3).

    Returns:
        dict mapping IP -> (access_date, IP), where access_date is taken
        from the latest entry of the qualifying run.
    """
    ret = {}
    cnt = 0
    last_ip = ""

    for data in suspect_list:
        ip = data.client_ip

        if ip in blocked_ips:
            print("IP %(ip)s is already in blocked list" % {'ip': ip})
            # An entry from a blocked IP still interrupts any other
            # IP's run of consecutive entries.
            cnt = 0
            last_ip = ""
            continue

        if ip == last_ip:
            cnt += 1
        else:
            # The first sighting counts as 1, so ``threshold`` means the
            # number of consecutive entries.
            cnt = 1
            last_ip = ip

        if cnt >= threshold:
            # I am going to block this IP.
            # Add it to the list
            ret[ip] = (data.access_date, ip)

    return ret

def block_ips(need_block_list):
    """Add a reject rule for every entry of ``need_block_list``.

    ``need_block_list`` maps a key to a (date, ip) pair; the IP is taken
    from the second element of each pair.
    """
    for entry in need_block_list.values():
        add_iptables_rule(entry[1])

def unblock_ips(blocked_list):
    """Lift blocks that are more than 30 minutes old.

    ``blocked_list`` maps a key to a (date string, ip) pair. For every
    entry whose date lies more than 30 minutes in the past, the iptables
    rule is deleted and the entry is removed from ``blocked_list``.

    The mapping is modified in place and also returned.
    """
    now = datetime.now()
    lifetime = timedelta(minutes=30)
    expired_keys = []

    for key in blocked_list:
        entry = blocked_list[key]
        blocked_at = get_datetime_from_string(entry[0])
        if blocked_at + lifetime < now:
            print("Unblock IP %(ip)s" % {'ip': entry[1]})
            del_iptables_rule(entry[1])
            expired_keys.append(key)

    # Entries are removed only after the loop, because a dict must not
    # change size while it is being iterated.
    for key in expired_keys:
        blocked_list.pop(key)

    return blocked_list

def get_blocked_ips():
    """Load the blocked-IP list from its CSV file.

    Each data row is ``date, ip``. Blank rows, rows whose first field
    starts with ``#`` and rows with fewer than two fields are skipped.
    When an IP occurs more than once, the first row wins.

    Returns:
        dict mapping IP -> (date string, IP); empty when the file does
        not exist.
    """
    ip_map = {}
    path = get_blocked_list_file_name()

    if not os.path.exists(path):
        print("File %(blocked_file)s is not found" % {'blocked_file': path})
        return ip_map

    # file format should be
    # date, ip
    with open(path, 'r') as f:
        for line in csv.reader(f):
            # Skip blank or short rows instead of stopping: entries
            # after a blank line would otherwise be lost and their
            # iptables rules never removed.
            if len(line) < 2:
                continue
            if line[0].startswith('#'):
                continue

            d = line[0]
            ip = line[1].strip()

            if ip not in ip_map:
                ip_map[ip] = (d, ip)

    return ip_map

def write_back_blocked_ips(need_block_list, blocked_ips):
    """Rewrite the blocked-IP file from both mappings.

    Writes one CSV row per entry: first the entries of
    ``need_block_list``, then those of ``blocked_ips``. The previous
    file content is replaced.
    """
    with open(get_blocked_list_file_name(), 'w') as out:
        rows = csv.writer(out)
        # Newly blocked entries go first, followed by the existing ones.
        for table in (need_block_list, blocked_ips):
            for key in table:
                rows.writerow(table[key])

def start():
    """Run one pass: expire old blocks, add new ones, save state."""
    # Load the blocked-IP file and drop the entries that unblock_ips()
    # lifts; what remains is the set of IPs that stay blocked.
    still_blocked = unblock_ips(get_blocked_ips())

    # Scan the error log for suspects and decide which IPs need a new
    # block.
    newly_blocked = create_need_block_list(still_blocked, read_log_file())

    # Install the new reject rules.
    block_ips(newly_blocked)

    # Save the combined blocked list, then the time of this check.
    write_back_blocked_ips(newly_blocked, still_blocked)
    set_last_timestamp()

# Run one access-control pass only when executed as a script,
# not when the module is imported.
if __name__ == "__main__":
    start()

gistはこちら