You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
137 lines
4.0 KiB
137 lines
4.0 KiB
#!/usr/bin/env python3

import os
import re
import sqlite3
import time

import iptc

# Directory holding this script, kept with a trailing slash because
# get_unknown_macs() concatenates file names directly onto it.
script_path = os.path.dirname(os.path.abspath(__file__))
script_path += '/'
print(script_path)

# One connection and one cursor are shared by every function in this module.
sqlite_db = sqlite3.connect('/root/firewall.sqlite')
cursor = sqlite_db.cursor()

# The `comment` column is written by allow_mac_address() and read by
# get_known_macs(), so it has to be part of the schema.
cursor.execute('''
CREATE TABLE IF NOT EXISTS iptables(mac TEXT PRIMARY KEY, duration TEXT, comment TEXT)
''')

# CREATE TABLE IF NOT EXISTS leaves an already existing table untouched, so a
# database created without `comment` gets the column added in place.
# PRAGMA table_info yields one row per column; index 1 is the column name.
cursor.execute('PRAGMA table_info(iptables)')
if 'comment' not in [column[1] for column in cursor.fetchall()]:
    cursor.execute('ALTER TABLE iptables ADD COLUMN comment TEXT')
sqlite_db.commit()
|
|
|
def remove_mac_address(mac_addr):
    """Delete the ACCEPT rule for *mac_addr* and its database row.

    Builds a rule with a ``mac`` match on the source address and an ACCEPT
    target, then deletes every equal rule found in the FORWARD chain of the
    filter table. For each rule deleted, the row with that MAC is removed
    from the ``iptables`` table and the change is committed.

    NOTE(review): indentation was lost in the copy of the file this was
    reviewed from; the database delete is assumed to belong inside the
    ``if r == rule`` branch (the 'Found ..., removing' message suggests so).
    Confirm against the original.

    :param mac_addr: MAC address string to revoke.
    """
    rule = iptc.Rule()
    match = iptc.Match(rule, "mac")
    match.mac_source = mac_addr
    rule.add_match(match)
    rule.target = iptc.Target(rule, "ACCEPT")

    chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), "FORWARD")
    for r in chain.rules:
        if r == rule:
            chain.delete_rule(r)
            # Bound parameter instead of string formatting: a quote inside
            # mac_addr can no longer break or alter the statement.
            cursor.execute('''
            DELETE FROM iptables WHERE mac=?
            ''', (mac_addr,))
            sqlite_db.commit()
            print('Found {0}, removing'.format(mac_addr))
|
|
|
def check_expired_sessions():
    """Revoke every MAC whose stored expiry time has passed.

    Reads all rows of the ``iptables`` table. Rows whose duration is the
    string 'never' are skipped. Any other duration is converted with
    ``int()`` and compared with the current Unix time; when the current time
    is greater, remove_mac_address() is called for that MAC.
    """
    cursor.execute('''
    SELECT mac, duration FROM iptables
    ''')
    for mac, stored in cursor.fetchall():
        if stored == 'never':
            continue
        deadline = int(stored)
        # The clock is read per row, after the conversion.
        if int(time.time()) > deadline:
            remove_mac_address(mac)
            print('{} is out of time, removing...'.format(mac))
|
|
|
# Make sure every MAC stored in the database has a matching iptables rule.
def check_iptables_entry():
    """Insert a FORWARD/ACCEPT rule for each database MAC that lacks one.

    For every row of the ``iptables`` table a rule with a ``mac`` match on
    the source address and an ACCEPT target is built and compared with the
    rules currently in the FORWARD chain of the filter table. If an equal
    rule exists it is left alone; otherwise the new rule is inserted.
    """
    cursor.execute('''
    SELECT mac FROM iptables
    ''')
    rows = cursor.fetchall()
    forward = iptc.Chain(iptc.Table(iptc.Table.FILTER), "FORWARD")

    for row in rows:
        mac = row[0]

        wanted = iptc.Rule()
        mac_match = iptc.Match(wanted, "mac")
        mac_match.mac_source = mac
        wanted.add_match(mac_match)
        wanted.target = iptc.Target(wanted, "ACCEPT")

        # any() stops at the first equal rule, like the loop-with-break form.
        if any(existing == wanted for existing in forward.rules):
            print('Found {}, ignoring'.format(mac))
        else:
            print('Inserting {}'.format(mac))
            forward.insert_rule(wanted)
|
|
|
# duration is in seconds, or the string 'never'
# mac_addr is a string object representing a mac address
def allow_mac_address(mac_addr, duration, comment):
    """Allow *mac_addr* through the FORWARD chain and record it in the database.

    A rule with a ``mac`` match on the source address and an ACCEPT target is
    inserted into the FORWARD chain of the filter table unless an equal rule
    is already there. The MAC is then stored with ``INSERT OR IGNORE``, so a
    row that already exists for this MAC is left unchanged.

    :param mac_addr: MAC address string to allow.
    :param duration: lifetime in seconds (anything ``int()`` accepts), or the
        string 'never'. A numeric value is stored as the absolute Unix time
        at which the entry expires.
    :param comment: free-text note stored next to the MAC.
    """
    rule = iptc.Rule()
    match = iptc.Match(rule, "mac")
    match.mac_source = mac_addr
    rule.add_match(match)
    rule.target = iptc.Target(rule, "ACCEPT")

    chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), "FORWARD")
    for r in chain.rules:
        if r == rule:
            break
    else:
        # Loop ended without break: no equal rule exists yet.
        print('[LOGIC] allowing {}'.format(mac_addr))
        chain.insert_rule(rule)

    if duration != 'never':
        duration = int(duration) + int(time.time())

    # Bound parameters instead of string formatting: quotes in the MAC or in
    # the free-text comment can no longer break or alter the statement.
    # str() keeps the stored values as the same text the formatted statement
    # used to produce.
    cursor.execute('''
    INSERT OR IGNORE INTO iptables(mac, duration, comment)
    VALUES(?, ?, ?)
    ''', (str(mac_addr), str(duration), str(comment)))
    sqlite_db.commit()
|
|
|
def get_unknown_macs():
    """Return ARP-table entries whose MAC is not in the database.

    Runs ``/sbin/arp -na`` through the shell, drops lines containing
    'incomplete', and writes the rest to ``arp.list`` next to this script.
    Each line of that file is then scanned for a MAC address and for the
    text between the first '(' and the first ')', which is taken as the IP.

    :return: list of ``(mac, ip)`` tuples, MAC lower-cased, for every line
        whose MAC is not present (case-insensitively) in the ``iptables``
        table. Lines containing no MAC address are skipped.
    """
    cursor.execute('''
    SELECT mac FROM iptables
    ''')
    # A set gives constant-time membership checks for each ARP line.
    known = {row[0].lower() for row in cursor.fetchall()}

    # script_path already ends with '/', so one path serves both the shell
    # redirect and open().
    arp_list = '{}arp.list'.format(script_path)
    cmd = '''/sbin/arp -na | grep -v incomplete > {path}'''.format(path=arp_list)
    os.system(cmd)

    # Compiled once rather than once per line.
    mac_regex = re.compile(r'(?:[0-9a-fA-F]:?){12}')

    macs = []
    with open(arp_list) as f:
        for line in f:
            found = mac_regex.search(line)
            if found is None:
                # No MAC on this line (blank or unexpected output); indexing
                # the match list here used to raise IndexError.
                continue
            mac = found.group(0).lower()
            if mac in known:
                continue
            ip = line[line.find('(') + 1:line.find(')')]
            macs.append((mac, ip))
    return macs
|
|
|
def get_known_macs():
    """Return every stored MAC together with its comment.

    :return: list of ``(mac, comment)`` tuples read from the ``iptables``
        table, both lower-cased. A comment that is falsy (for example NULL
        or an empty string) is returned as ''.
    """
    cursor.execute('''
    SELECT mac, comment FROM iptables
    ''')
    return [
        (row[0].lower(), row[1].lower() if row[1] else '')
        for row in cursor.fetchall()
    ]
|
|
|
def main():
    """Run the two maintenance steps in order.

    check_iptables_entry() runs first and check_expired_sessions() second.
    """
    steps = (check_iptables_entry, check_expired_sessions)
    for step in steps:
        step()


if __name__ == '__main__':
    main()
|
|
|