Browse Source

Add tabu search

Signed-off-by: wmb <wmb@teknik.io>
master
wmb 7 months ago
parent
commit
52ea0cbda5
1 changed files with 200 additions and 0 deletions
  1. 200
    0
      tabu.py

+ 200
- 0
tabu.py View File

@@ -0,0 +1,200 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-

import json
import math
import random
import statistics
import sys
from collections import deque

# Path of the JSON benchmark file that main() passes to read_benchmark_data().
BENCHMARK_JSON = 'benchmark-data.json'
# Problem dimensions. They stay None until init_values() assigns them:
# N_CLUSTERS and N_ITERATIONS come from sys.argv[1] and sys.argv[2],
# N_VECTORS is len(data) and VECTOR_SIZE is len(data[0]).
N_CLUSTERS = None
N_VECTORS = None
VECTOR_SIZE = None
N_ITERATIONS = None
# Number of neighbor solutions main() generates in each iteration.
N_NEIGHBORS = 128
# Maximum length of the tabu deque created in main().
TABU_SIZE = 1024

def read_benchmark_data(filename):
    """Load the benchmark data set from a JSON file.

    Args:
        filename: Path of the UTF-8 encoded JSON file to read.

    Returns:
        The decoded JSON document, exactly as returned by ``json.load``.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    # Open the path supplied by the caller rather than a hard-coded module
    # constant, so the function can load any benchmark file.
    with open(filename, 'r', encoding="utf-8") as f:
        return json.load(f)


def init_values(data):
    """Initialise the module-level problem dimensions.

    ``N_VECTORS`` and ``VECTOR_SIZE`` are derived from ``data``;
    ``N_CLUSTERS`` and ``N_ITERATIONS`` are parsed from the first two
    command-line arguments.

    Args:
        data: Non-empty sequence of vectors; the length of the first vector
            is taken as the vector size.

    Raises:
        SystemExit: With status 1 when an argument is missing, is not an
            integer, or when N_CLUSTERS is outside ``1..N_VECTORS``.
        IndexError: If ``data`` is empty.
    """
    global N_VECTORS, VECTOR_SIZE, N_CLUSTERS, N_ITERATIONS
    N_VECTORS = len(data)
    VECTOR_SIZE = len(data[0])
    try:
        n_clusters = int(sys.argv[1])
        n_iterations = int(sys.argv[2])
    except (IndexError, ValueError):
        # A missing or malformed argument is a usage error: report it on
        # stderr and exit with a failing status.
        print("Usage: {} N_CLUSTERS N_ITERATION".format(sys.argv[0]),
              file=sys.stderr)
        sys.exit(1)

    # Every cluster must receive at least one vector, so more clusters than
    # vectors (or fewer than one cluster) can never yield a valid solution.
    if not 1 <= n_clusters <= N_VECTORS:
        print("N_CLUSTERS must be between 1 and {}".format(N_VECTORS),
              file=sys.stderr)
        sys.exit(1)

    N_CLUSTERS = n_clusters
    N_ITERATIONS = n_iterations


def distance(u, v):
    """Return the Euclidean distance between the vectors ``u`` and ``v``.

    Components are paired with ``zip``, so surplus components of the longer
    vector are ignored.
    """
    squared_diffs = [(a - b) ** 2 for a, b in zip(u, v)]
    return math.sqrt(sum(squared_diffs))


def is_solution_valid(s, tabu=()):
    """Tell whether ``s`` is an admissible solution.

    A solution is admissible when every cluster index in
    ``range(N_CLUSTERS)`` occurs at least once in ``s`` and ``s`` is not
    contained in ``tabu``.

    Args:
        s: Sequence of cluster indices.
        tabu: Container of forbidden solutions; empty by default. An
            immutable default is used so that no state is shared between
            calls.

    Returns:
        True if ``s`` uses every cluster and is not tabu, False otherwise.
    """
    used_clusters = set(s)
    every_cluster_used = all(c in used_clusters for c in range(N_CLUSTERS))
    return every_cluster_used and s not in tabu


def generate_initial_solution():
    """Draw random assignments until one passes ``is_solution_valid``.

    Returns:
        A list of ``N_VECTORS`` cluster indices, each drawn uniformly from
        ``range(N_CLUSTERS)``.
    """
    while True:
        candidate = [random.randrange(N_CLUSTERS) for _ in range(N_VECTORS)]
        if is_solution_valid(candidate):
            return candidate


def print_solution(s, name="Solution"):
    """Print the solution ``s`` as a bracketed list, 23 entries per line.

    Args:
        s: Sequence of cluster indices to print.
        name: Label printed before the opening bracket.
    """
    print(name + ": [")
    last = len(s) - 1
    for position, cluster in enumerate(s):
        if position == last:
            # The final entry closes the bracket.
            terminator = "]\n"
        elif (position + 1) % 23 == 0:
            # Wrap the line after every 23rd entry.
            terminator = ",\n"
        else:
            terminator = ", "
        print(cluster, end=terminator)


def print_vector(v, name="Vector"):
    """Print the vector ``v`` as a bracketed list, 7 components per line.

    Each component is formatted with five decimal places.

    Args:
        v: Sequence of numbers to print.
        name: Label printed before the opening bracket.
    """
    print(name + ": [")
    last = len(v) - 1
    for position, component in enumerate(v):
        if position == last:
            # The final component closes the bracket.
            terminator = "]\n"
        elif (position + 1) % 7 == 0:
            # Wrap the line after every 7th component.
            terminator = ",\n"
        else:
            terminator = ", "
        print("{:.5f}".format(component), end=terminator)


def neighbor_solution(s, n_changes=1):
    """Return a copy of ``s`` with ``n_changes`` entries moved to a new cluster.

    Args:
        s: Current solution (sequence of cluster indices); left unmodified.
        n_changes: Number of positions to reassign.

    Returns:
        A new solution that differs from ``s`` at the positions chosen by
        ``get_change_locations``.
    """
    neighbor = s[:]

    for position in get_change_locations(n_changes):
        current = s[position]
        # Draw from N_CLUSTERS - 1 values and skip over the current cluster,
        # so the replacement is always different from the current one.
        replacement = random.randrange(N_CLUSTERS - 1)
        if replacement >= current:
            replacement += 1
        neighbor[position] = replacement

    return neighbor


def get_change_locations(n):
    """Pick ``n`` distinct random positions in ``range(N_VECTORS)``.

    Args:
        n: Number of positions to pick. Values of zero or less yield an
            empty list.

    Returns:
        A list of ``n`` unique indices in random order.

    Raises:
        ValueError: If ``n`` is larger than ``N_VECTORS``, since that many
            distinct positions do not exist.
    """
    if n <= 0:
        return []
    # random.sample draws without replacement, so uniqueness is guaranteed
    # without retrying, and an impossible request fails instead of hanging.
    return random.sample(range(N_VECTORS), n)


def objective_function(data, s, global_center):
    """Evaluate the solution ``s`` on ``data``.

    Args:
        data: Sequence of vectors.
        s: Cluster index for each vector of ``data``.
        global_center: Center of the whole data set.

    Returns:
        A tuple ``(inter, intra, inter - intra)`` built from
        ``calculate_inter`` and ``calculate_intra``.
    """
    centers = calculate_centers(data, s, global_center)
    separation = calculate_inter(global_center, centers)
    compactness = calculate_intra(data, s, centers)
    score = separation - compactness

    return separation, compactness, score


def calculate_inter(global_center, centers):
    """Return the summed center-to-global-center distance over N_CLUSTERS.

    Args:
        global_center: Center of the whole data set.
        centers: One center vector per cluster.
    """
    total = sum(distance(global_center, center) for center in centers)
    return total / N_CLUSTERS


def calculate_intra(data, s, centers):
    """Return the mean, over clusters, of the average member-to-center distance.

    Args:
        data: Sequence of vectors.
        s: Cluster index for each vector of ``data``.
        centers: One center vector per cluster.

    Raises:
        ZeroDivisionError: If some cluster has no members.
    """
    distance_sums = [0.0] * N_CLUSTERS
    member_counts = [0] * N_CLUSTERS

    for vector, cluster in zip(data, s):
        member_counts[cluster] += 1
        distance_sums[cluster] += distance(centers[cluster], vector)

    # Average distance inside each cluster, then the mean of those averages.
    mean_distances = [
        total / count for total, count in zip(distance_sums, member_counts)
    ]
    return statistics.fmean(mean_distances)


def calculate_global_center(data):
    """Return the component-wise sum of ``data`` divided by ``N_VECTORS``.

    Args:
        data: Sequence of vectors with at most ``VECTOR_SIZE`` components.

    Returns:
        A list of ``VECTOR_SIZE`` floats.
    """
    totals = [0.0] * VECTOR_SIZE
    for vector in data:
        for idx, value in enumerate(vector):
            totals[idx] += value

    return [total / N_VECTORS for total in totals]


def calculate_centers(data, s, global_center):
    """Return the center (component-wise mean) of every cluster.

    Args:
        data: Sequence of vectors.
        s: Cluster index for each vector of ``data``.
        global_center: Accepted but not used by this function.

    Returns:
        A list of ``N_CLUSTERS`` center vectors.

    Raises:
        ZeroDivisionError: If some cluster has no members.
    """
    sums = [[0.0] * VECTOR_SIZE for _ in range(N_CLUSTERS)]
    member_counts = [0] * N_CLUSTERS

    for vector, cluster in zip(data, s):
        member_counts[cluster] += 1
        row = sums[cluster]
        for idx, value in enumerate(vector):
            row[idx] += value

    return [
        [total / count for total in row]
        for row, count in zip(sums, member_counts)
    ]


def main():
    """Run the tabu-search clustering experiment and print its progress.

    Loads the benchmark data, reads the run parameters from the command
    line, builds a random initial solution and then, for ``N_ITERATIONS``
    iterations, generates ``N_NEIGHBORS`` admissible neighbors of the
    current solution. The best neighbor replaces the current solution when
    it improves the objective, and is recorded in the tabu list either way.
    """
    data = read_benchmark_data(BENCHMARK_JSON)
    init_values(data)
    print("N_VECTORS", N_VECTORS)
    print("VECTOR_SIZE", VECTOR_SIZE)
    print("N_CLUSTERS", N_CLUSTERS)
    global_center = calculate_global_center(data)
    print_vector(global_center, "Global center")
    s0 = generate_initial_solution()
    print_solution(s0, "Initial solution")
    f0 = objective_function(data, s0, global_center)

    print("F = {:f} - {:f} = {:f}".format(*f0))

    # Bounded deque: the oldest entry is dropped once TABU_SIZE is reached.
    tabu = deque([], TABU_SIZE)

    for _ in range(N_ITERATIONS):
        # Start every iteration with an empty neighborhood. Keeping the
        # neighbors of earlier iterations would grow the list without bound
        # and could make a stale neighbor the "best" one recorded as tabu.
        neighbors = []

        # Generate N_NEIGHBORS neighbor solutions of s0
        # TODO: make sure they're all distinct
        for _ in range(N_NEIGHBORS):
            s = neighbor_solution(s0)
            while not is_solution_valid(s, tabu):
                s = neighbor_solution(s0)
            f = objective_function(data, s, global_center)
            neighbors.append((s, f))

        # Rank neighbors by the third objective component (inter - intra).
        s, f = max(neighbors, key=lambda neighbor: neighbor[1][2])
        if f[2] > f0[2]:
            # Avoid dividing by zero when the current objective is exactly 0.
            if f0[2]:
                improvement = math.fabs((f[2] - f0[2]) / f0[2] * 100)
            else:
                improvement = math.inf
            print_solution(s, "New optimal solution")
            print("F = {:f} - {:f} = {:f} ({:f}% improvement)"
                  .format(*f, improvement))
            s0 = s
            f0 = f

        tabu.append(s)

    print_solution(s0, "Final solution")
    print("F = {:f} - {:f} = {:f}".format(*f0))


# Run the search only when this file is executed as a script.
if __name__ == "__main__":
    main()

Loading…
Cancel
Save