(Python3) Creates an sqlite3 database from Xonotic's game server database.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

176 lines
6.0 KiB

import re, argparse
import sqlite3 as sql
import logging
import logging.handlers
from os import listdir, mkdir
from os.path import isfile
from urllib.parse import unquote
from modules.readsvdb import readfile, cleanup
from modules.log import filename
def initl():
fn = filename("_logs/dbimport-%s.log")
fh = open(fn, mode='a', encoding='utf-8')
logging.basicConfig(stream=fh, level=logging.DEBUG)
#------------------------------------------------+
# Functions: Clean up.
#------------------------------------------------+
# Unlike other rows,
# the separator character, '/' is part of the value of the second column.
# so an ordinary match for '/' or '\' can not be done like the other types of rows.
# example from game server db:
# \/uid2name/Mnumg2Yh/yxNFDTqGI+YyhlM7QDI0fpEmAaBJ8cI5dU=\Tuxxy
# it should become:
# ["uid2name", "Mnumg2Yh/yxNFDTqGI+YyhlM7QDI0fpEmAaBJ8cI5dU=", "Tuxxy"]
def uid2namefix(row):
# quick fix
# replace first and last occurrence of backslash
# this results in [,/uid2name/cryptoid_fp, name]
e = re.sub(r'^([^\\]*)\\|\\(?=[^\\]*$)', ',', row)
# replace first two occurence of forward slash
# this results in [,,uid2name,cryptoid_fp, name]
ee = e.replace('/', ',', 2)
# split on comma
# but start from index 2 because the first commas are left over
# c is now a list of strings.
# ["uid2name", <crypto_idfp value>, <player name value>]
c = ee[2:].split(',')
c[2] = unquote(c[2])
c[2] = c[2].strip('\n')
return c
# O(n) and organize cts related data into list of rows.
def filters(db):
tt = []
tr = []
ti = []
rank_index = 2
for d in db:
if d.find("uid2name") != -1:
ti.append(uid2namefix(d))
else:
# regex:
# find substrings that do not contain backslash, forwardslash, or newline.
e = re.findall(r'[^\\/\n]+', d)
if d.find("cts100record/time") != -1:
e[rank_index] = int(e[rank_index].replace("time", ""))
tt.append(e)
if d.find("cts100record/crypto_idfp") != -1:
e[3] = unquote(e[3])
e[rank_index] = int(e[rank_index].replace("crypto_idfp", ""))
tr.append(e)
return tt, tr, ti
#------------------------------------------------+
# Functions: Database Creation
#------------------------------------------------+
def inserttodb(c, q, d):
for x in d:
# possible to do executemany
# but want to be able to catch the problematic rows
# as it is iterated through.
# and proceed with adding OK rows.
try:
c.execute(q, x)
except sql.ProgrammingError as e:
print(e)
print(x)
#------------------------------------------------+
# insert new data directly into new database file
def i(d, s):
con = sql.connect(d)
with con:
csr = con.cursor()
try:
times, ranks, ids = filters(cleanup(readfile(s)))
if times:
inserttodb(csr, "INSERT OR REPLACE INTO Cts_times VALUES(?, ?, ?, ?)", times)
logging.info('\n'.join(y for y in [str(x) for x in times]))
if ranks:
inserttodb(csr, "INSERT OR REPLACE INTO Cts_ranks VALUES(?, ?, ?, ?)", ranks)
logging.info('\n'.join(y for y in [str(x) for x in ranks]))
if ids:
inserttodb(csr, "INSERT OR REPLACE INTO Id2alias VALUES(?, ?, ?)", ids)
logging.info('\n'.join(y for y in [str(x) for x in ids]))
except sql.Error:
logging.exception("sql error encountered in function 'i'")
if con:
con.rollback()
# 'insert' new data into a file i.e sql query file
def f(d, s):
with open(d, 'w', encoding='utf-8') as h:
times, ranks, ids = filters(cleanup(readfile(s)))
for t in times:
h.write("INSERT OR REPLACE INTO Cts_times VALUES(%s, %s, %s, %s)\n" % tuple(t))
pass
for r in ranks:
h.write("INSERT OR REPLACE INTO Cts_ranks VALUES(%s, %s, %s, %s)\n" % tuple(r))
pass
for i in ids:
h.write("INSERT OR REPLACE INTO Id2aslias VALUES(%s, %s, %s)\n" % tuple(i))
pass
pass
pass
# Test whether repeat rows are added.
def duplicatestest(d, s):
c = sql.connect(d)
p = True
with c:
cs = c.cursor()
try:
logging.info("Inserting into database (1/2)")
i(d, s)
logging.info("Querying (1/2)")
cs.execute("SELECT * FROM Cts_times")
a = cs.fetchall()
cs.execute("SELECT * FROM Cts_ranks")
b = cs.fetchall()
cs.execute("SELECT * FROM Id2alias")
c = cs.fetchall()
logging.info("Inserting into database (2/2)")
i(d, s)
logging.info("Querying (2/2)")
cs.execute("SELECT * FROM Cts_times")
x = cs.fetchall()
cs.execute("SELECT * FROM Cts_ranks")
y = cs.fetchall()
cs.execute("SELECT * FROM Id2alias")
z = cs.fetchall()
if len(a) != len(x):
logging.error("Issue with Cts_times")
p = False
if len(b) != len(y):
logging.error("Issue with Cts_ranks")
p = False
if len(c) != len(z):
logging.error("Issue with Id2alias")
p = False
if p:
logging.info("Database ok - no repeat rows added.")
except sql.Error:
logging.exception("encountered sql error in function 'duplicate test'.")
if __name__ == "__main__":
ap = argparse.ArgumentParser()
ap.add_argument('db')
ap.add_argument('src')
ap.add_argument('-t', '--test', action='store_true')
ap.add_argument('-q', '--sql', action='store_true')
args = ap.parse_args()
initl()
if args.test:
duplicatestest(args.db, args.src)
if args.sql:
f(args.db, args.src)
else:
i(args.db, args.src)