A simple web portal for Linux iptables
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

137 lines
4.0 KiB

#!/usr/bin/env python3
import iptc
import sqlite3
import time
import os
import re
script_path = os.path.dirname(os.path.abspath(__file__))
script_path += '/'
print(script_path)
sqlite_db = sqlite3.connect('/root/firewall.sqlite')
cursor = sqlite_db.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS iptables(mac TEXT PRIMARY KEY, duration TEXT)
''')
sqlite_db.commit()
def remove_mac_address(mac_addr):
rule = iptc.Rule()
match = iptc.Match(rule, "mac")
match.mac_source = mac_addr
rule.add_match(match)
rule.target = iptc.Target(rule, "ACCEPT")
chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), "FORWARD")
for r in chain.rules:
if r == rule:
chain.delete_rule(r)
cursor.execute('''
DELETE FROM iptables WHERE mac=\'{mac}\'
'''.format(mac=mac_addr))
sqlite_db.commit()
print('Found {0}, removing'.format(mac_addr))
def check_expired_sessions():
cursor.execute('''
SELECT mac, duration FROM iptables
''')
entries = cursor.fetchall()
for entry in entries:
mac = entry[0] # mac address
duration = entry[1]
if duration != 'never':
duration = int(entry[1]) # duration
now = int(time.time())
if now > duration:
remove_mac_address(mac)
print('{} is out of time, removing...'.format(mac))
# check if iptables entry is the same with database
def check_iptables_entry():
cursor.execute('''
SELECT mac FROM iptables
''')
macs = cursor.fetchall()
chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), "FORWARD")
for m in macs:
mac = m[0]
rule = iptc.Rule()
match = iptc.Match(rule, "mac")
match.mac_source = mac
rule.add_match(match)
rule.target = iptc.Target(rule, "ACCEPT")
for r in chain.rules:
if r == rule:
print('Found {}, ignoring'.format(mac))
break
else:
print('Inserting {}'.format(mac))
chain.insert_rule(rule)
# duration is in seconds
# mac_addr is a string object representing a mac address
def allow_mac_address(mac_addr, duration, comment):
rule = iptc.Rule()
match = iptc.Match(rule, "mac")
match.mac_source = mac_addr
rule.add_match(match)
rule.target = iptc.Target(rule, "ACCEPT")
chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), "FORWARD")
for r in chain.rules:
if r == rule:
break
else:
print('[LOGIC] allowing {}'.format(mac_addr))
chain.insert_rule(rule)
if duration != 'never':
duration = int(duration) + int(time.time())
cursor.execute('''
INSERT OR IGNORE INTO iptables(mac, duration, comment)
VALUES(\'{mac}\', \'{duration}\', \'{comm}\')
'''.format(mac=mac_addr, duration=duration,comm=comment))
sqlite_db.commit()
def get_unknown_macs():
cursor.execute('''
SELECT mac FROM iptables
''')
entries = cursor.fetchall()
macs = []
cmd = '''/sbin/arp -na | grep -v incomplete > {sc}/arp.list'''.format(sc=script_path)
os.system(cmd)
with open('{}arp.list'.format(script_path)) as f:
for line in f:
ip = line[line.find('(')+1:line.find(')')]
mac_regex = re.compile(r'(?:[0-9a-fA-F]:?){12}')
mac_addr = re.findall(mac_regex, line)
for e in entries:
if e[0].lower() == mac_addr[0].lower():
break
else:
macs.append((mac_addr[0].lower(), ip))
return macs
def get_known_macs():
cursor.execute('''
SELECT mac, comment FROM iptables
''')
macs = []
entries = cursor.fetchall()
for e in entries:
if e[1]:
com = e[1].lower()
else:
com = ''
macs.append((e[0].lower(), com))
return macs
def main():
check_iptables_entry()
check_expired_sessions()
if __name__ == '__main__':
main()