httpのベーシック認証しているサーバに辞書アタック的なことをしてきたIPを弾こうと思ったので、pythonで書いてみました。
まあ、/var/log/httpd/error_logのログ上で同じIPのエラーが連続しているという条件で見ているので、漏れる可能性は充分にあるけど、機械的に連続実行しているなら多分大丈夫なはず。あとはこれをcronのジョブにして定期実行すればOKです。
やっていることは大体こんな感じです。
- 最後にチェックした日時をファイルに書いておいて、次回チェック時はその時刻以降のものをチェック対象にする
- iptablesでリジェクトしたIPは、時刻とIPをファイルに書き出す
- リジェクトするようにしてから30分経過したものはルールから削除
- 3回連続でエラーログに"user (.*) not found"を残しているIPからのアクセスをiptablesでリジェクト(今はポート80番へのアクセスのみリジェクトで)
連続アクセスの条件をもうちょっとよくするとか、LogDataListは普通のリストで良いかもという点はあるけど、一応動く版はこちら。
#!/usr/bin/env python from datetime import timedelta from datetime import datetime import os import io import re import csv class LogData: def __init__(self, access_date, level, client_ip, detail): self.access_date = access_date self.level = level self.client_ip = client_ip self.detail = detail class LogDataList: def __init__(self): self.data = [] self.index = 0 def append(self, logData): self.data.append(logData) def __iter__(self): return self def next(self): if self.index >= len(self.data): self.index = 0 raise StopIteration ret = self.data[self.index] self.index += 1 return ret def get_log_file_name(): return "/var/log/httpd/error_log" def get_last_timestamp_file_name(): return "/tmp/last_timestamp.txt" def get_blocked_list_file_name(): return "/tmp/blocked_ip_list.txt" def get_datetime_from_string(s): return datetime.strptime(s, "%a %b %d %H:%M:%S %Y") def get_last_timestamp(): name = get_last_timestamp_file_name() if os.path.isfile(name) == False: return datetime(1970, 1, 1) return get_datetime_from_string(open(name).read().strip()) def set_last_timestamp(): with open(get_last_timestamp_file_name(), 'w') as f: s = datetime.now().strftime("%a %b %d %H:%M:%S %Y") f.write(s) def read_log_file(): last_timestamp = get_last_timestamp() print "Last checked time is %(timestamp)s" % {'timestamp':last_timestamp} result = LogDataList() for line in open(get_log_file_name(), 'r'): target_date = get_datetime_from_string(line.split(']')[0][1:]) # Do I have new data? 
if target_date >= last_timestamp: # Check detail string match = re.search(r'user (.*) not found', line) if match is not None: data = line.split(']') logData = LogData(data[0].split('[')[1], data[1].split('[')[1], data[2].split(' ')[2], data[3].strip()) result.append(logData) return result def exec_iptables_cmd(cmd): print "Do %(cmd)s" % {'cmd':cmd} ret = os.system(cmd) if ret != 0: print "cmd failed" def add_iptables_rule(ip): # Reject all packet or http port #cmd = "iptables -I INPUT -s %(ip)s -j REJECT" % {'ip':ip} cmd = "iptables -I INPUT -s %(ip)s -j REJECT -p tcp --dport 80" % {'ip':ip} exec_iptables_cmd(cmd) def del_iptables_rule(ip): # Reject all packet or http port #cmd = "iptables -D INPUT -s %(ip)s -j REJECT" % {'ip':ip} cmd = "iptables -D INPUT -s %(ip)s -j REJECT -p tcp --dport 80" % {'ip':ip} exec_iptables_cmd(cmd) def create_need_block_list(blocked_ips, suspect_list): ret = {} cnt = 0 last_ip = "" for data in suspect_list: if (data.client_ip in blocked_ips) == True: print "IP %(ip)s is already in blocked list" % {'ip':data.client_ip} if data.client_ip == last_ip: cnt += 1 else: cnt = 0 last_ip = data.client_ip if cnt >= 3: # I am going to block this IP. # Add it to the list ret[data.client_ip] = (data.access_date, data.client_ip) return ret def block_ips(need_block_list): for key in need_block_list: data = need_block_list[key] # Let's block! 
add_iptables_rule(data[1]) def unblock_ips(blocked_list): ret = {} curdatetime = datetime.now() del_list = [] for key in blocked_list: data = blocked_list[key] d = get_datetime_from_string(data[0]) if curdatetime > d + timedelta(minutes=30): print "Unblock IP %(ip)s" % {'ip':data[1]} del_iptables_rule(data[1]) del_list.append(key) # remove this ip from map for k in del_list: del blocked_list[k] return blocked_list def get_blocked_ips(): ip_map = {} if os.path.exists(get_blocked_list_file_name()) == False: print "File %(blocked_file)s is not found" % {'blocked_file':get_blocked_list_file_name()} return ip_map # file format should be # date, ip reader = csv.reader(file(get_blocked_list_file_name(), 'r')) for line in reader: if len(line) == 0: break if line[0][0] == '#': continue d = line[0] ip = line[1].strip() if (ip in ip_map) == False: ip_map[ip] = (d, ip) return ip_map def write_back_blocked_ips(need_block_list, blocked_ips): with open(get_blocked_list_file_name(), 'w') as f: writer = csv.writer(f) for key in need_block_list: writer.writerow(need_block_list[key]) for key in blocked_ips: writer.writerow(blocked_ips[key]) def start(): # Read blocked ip list file. blocked_ips = get_blocked_ips() # Unblock ip if this ip access date is 30 minutes before. # And re-create blocked ip list that does not contain unblocked ip. blocked_ips = unblock_ips(blocked_ips) # Read error_log file to craete suspect ip list. suspect_list = read_log_file() # Get IPs that need block need_block_list = create_need_block_list(blocked_ips, suspect_list) # Do it! block_ips(need_block_list) # Write blocked ip list write_back_blocked_ips(need_block_list, blocked_ips) # write timestampe set_last_timestamp() if __name__ == "__main__": start()
gistはこちら。