Skip to content

Commit

Permalink
init
Browse files Browse the repository at this point in the history
  • Loading branch information
alexgiving committed May 25, 2024
1 parent 074e691 commit 050719c
Show file tree
Hide file tree
Showing 7 changed files with 188 additions and 304 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ experiments/
*__pycache__
.DS_Store
.vscode
logs
7 changes: 1 addition & 6 deletions lkmeans/examples/data/experiment_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,16 @@ def get_experiment_data(num_clusters: int, dimension: int) -> tuple[int, float,
Function for generation the synthetic data for experiments
by dimension and number of clusters
'''
n_clusters: int = 0
prob: float = 0.
mu_prefix: list[list[float | int]] = [[]]
sigma_list: list[float | int] = []

if num_clusters == 2:
print('Experiment with 2 clusters')
n_clusters = 2
sigma_list = [1, 1]
prob = 0.5
mu_prefix = [[-4, 0], [4, 0]]

elif num_clusters == 3:
print('Experiment with 3 clusters')
n_clusters = 3
sigma_list = [1, 1, 1]
prob = 1/3
mu_prefix = [[4, 0, 0], [0, 4, 0], [0, 0, 4]]
Expand All @@ -35,4 +30,4 @@ def get_experiment_data(num_clusters: int, dimension: int) -> tuple[int, float,

mu_list = [np.array([x + [0] * (dimension - len(x))]) for x in mu_prefix]
cov_matrix = [get_covariance_matrix(sigma, dimension) for sigma in sigma_list]
return n_clusters, prob, mu_list, cov_matrix
return num_clusters, prob, mu_list, cov_matrix
138 changes: 0 additions & 138 deletions lkmeans/examples/experiment.py

This file was deleted.

136 changes: 89 additions & 47 deletions lkmeans/examples/main.py
Original file line number Diff line number Diff line change
@@ -1,54 +1,96 @@
from argparse import ArgumentParser
from pathlib import Path
import time
from collections import defaultdict
from enum import Enum
from typing import Dict

import numpy as np
from numpy.typing import NDArray
from sklearn.metrics import accuracy_score, adjusted_mutual_info_score, adjusted_rand_score
from tap import Tap

from lkmeans.clustering import HardSSLKMeans, LKMeans, SoftSSLKMeans
from lkmeans.clustering.base import Clustering
from lkmeans.clustering.supervised.utils import select_supervisor_targets
from lkmeans.examples.data.experiment_data import get_experiment_data
from lkmeans.examples.experiment import run_experiment

parser = ArgumentParser()

parser.add_argument(
'--path',
type=Path,
default=Path('experiments'),
help='Path to save results'
)

parser.add_argument(
'--num-clusters',
type=int,
default=2
)


def main():
args = parser.parse_args()
experiments_path = args.path

minkowski_parameter = [0.2, 0.6, 1, 1.5, 2, 3, 5]
T_parameter = [0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]
repeats = 100
n_points = [100, 500, 1000]

dimension = 20
n_clusters, prob, mu_list, cov_matrices = get_experiment_data(
num_clusters=args.num_clusters, dimension=dimension)

for points in n_points:
experiment_name = f'Clusters:{n_clusters}, points:{points}'
output_path = experiments_path / f'exp_{args.num_clusters}_points_{points}'

run_experiment(
n_clusters=n_clusters,
distance_parameters=T_parameter,
minkowski_parameters=minkowski_parameter,
repeats=repeats,
n_points=points,
cov_matrices=cov_matrices,
prob=prob,
from lkmeans.examples.data.points_generator import generate_mix_distribution


class ClusteringAlgorithmType(Enum):
    '''
    Closed set of clustering algorithms selectable for an experiment.
    Each member's value is the string form of the algorithm name.
    '''
    # Plain (unsupervised) variant
    LKMEANS = 'lkmeans'
    # Semi-supervised variants
    SOFT_SS_LKMEANS = 'soft_ss_lkmeans'
    HARD_SS_LKMEANS = 'hard_ss_lkmeans'


class ExperimentArguments(Tap):
    '''Command-line arguments for one clustering experiment run.'''

    minkowski_parameter: float  # Minkowski parameter, passed as p to the clustering algorithm
    t_parameter: float  # Value passed as t to the synthetic data generator
    n_points: int  # Number of samples generated for each repeat
    clustering_algorithm: ClusteringAlgorithmType = ClusteringAlgorithmType.LKMEANS  # Algorithm to run

    num_clusters: int = 2  # Number of clusters to generate and to fit
    dimension: int = 20  # Dimension of the generated points
    repeats: int = 10  # Number of generate-and-fit repetitions to average over
    supervision_ratio: float = 0.0  # Ratio passed to supervisor target selection (unused for lkmeans)


def get_clustering_algorithm(clustering_type: ClusteringAlgorithmType) -> type[Clustering]:
    '''
    Look up the clustering class for the requested algorithm type.

    The class itself is returned, not an instance; the caller is expected
    to instantiate it with its own parameters.

    Raises KeyError if clustering_type has no entry in the map.
    '''
    # NOTE(review): the annotation assumes all three classes derive from
    # Clustering; their definitions are not visible here - confirm.
    clustering_map: Dict[ClusteringAlgorithmType, type[Clustering]] = {
        ClusteringAlgorithmType.LKMEANS: LKMeans,
        ClusteringAlgorithmType.SOFT_SS_LKMEANS: SoftSSLKMeans,
        ClusteringAlgorithmType.HARD_SS_LKMEANS: HardSSLKMeans,
    }
    return clustering_map[clustering_type]


def calculate_metrics(labels: NDArray, generated_labels: NDArray) -> Dict[str, float]:
    '''
    Score generated labels against the reference labels.

    Returns a dict with the keys 'ari', 'ami' and 'accuracy', holding the
    results of adjusted_rand_score, adjusted_mutual_info_score and
    accuracy_score respectively, each converted to a plain float.
    '''
    # (result key, scoring function) pairs; order defines the dict order
    scorers = (
        ('ari', adjusted_rand_score),
        ('ami', adjusted_mutual_info_score),
        ('accuracy', accuracy_score),
    )
    return {
        metric_name: float(scorer(labels, generated_labels))
        for metric_name, scorer in scorers
    }


def main() -> None:
    '''
    Run a repeated clustering experiment on synthetic data and print
    the per-metric averages.

    For each repeat a fresh sample is generated, a new clustering object is
    fitted, and the metrics, elapsed time and inertia are recorded. After
    all repeats the mean of every recorded quantity is printed as a dict.
    '''
    args = ExperimentArguments(underscores_to_dashes=True).parse_args()

    _, prob, mu_list, cov_matrices = get_experiment_data(args.num_clusters, args.dimension)

    clustering = get_clustering_algorithm(args.clustering_algorithm)
    # Every algorithm except plain LKMeans receives supervisor targets
    uses_supervision = args.clustering_algorithm is not ClusteringAlgorithmType.LKMEANS

    # key -> list of per-repeat values
    collected_results = defaultdict(list)

    for _ in range(args.repeats):
        clusters, labels, _ = generate_mix_distribution(
            probability=prob,
            mu_list=mu_list,
            cov_matrices=cov_matrices,
            n_samples=args.n_points,
            t=args.t_parameter
        )

        lkmeans = clustering(n_clusters=args.num_clusters, p=args.minkowski_parameter)

        # As in the original, the timed span covers supervisor target
        # selection for the semi-supervised algorithms.
        experiment_time = time.perf_counter()
        if uses_supervision:
            supervisor_targets = select_supervisor_targets(labels, args.supervision_ratio)
            generated_labels = lkmeans.fit_predict(clusters, supervisor_targets)
        else:
            generated_labels = lkmeans.fit_predict(clusters)
        experiment_time = time.perf_counter() - experiment_time

        metrics_dict = calculate_metrics(
            labels=labels,
            generated_labels=generated_labels,
        )
        result = {**metrics_dict, 'time': experiment_time, 'inertia': lkmeans.inertia_}
        for key, value in result.items():
            collected_results[key].append(value)

    # Average over the collected per-repeat lists. Iterating the last
    # repeat's `result` here would just echo that repeat's values and
    # would fail with NameError when repeats is 0.
    average_result = {
        key: float(np.mean(values)) for key, values in collected_results.items()
    }
    print(average_result)


if __name__ == '__main__':
Expand Down
Loading

0 comments on commit 050719c

Please sign in to comment.