Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce Semi Supervised Clustering #30

Merged
merged 17 commits into from
May 4, 2024
4 changes: 2 additions & 2 deletions config/.pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -274,10 +274,10 @@ exclude-too-few-public-methods=
ignored-parents=

# Maximum number of arguments for function / method.
max-args=5
max-args=7

# Maximum number of attributes for a class (see R0902).
max-attributes=7
max-attributes=15

# Maximum number of boolean expressions in an if statement (see R0916).
max-bool-expr=5
Expand Down
2 changes: 1 addition & 1 deletion lkmeans/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
from lkmeans.clustering import LKMeans
from lkmeans.clustering import HardSSLKMeans, LKMeans, SoftSSLKMeans
132 changes: 0 additions & 132 deletions lkmeans/clustering.py

This file was deleted.

2 changes: 2 additions & 0 deletions lkmeans/clustering/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from lkmeans.clustering.supervised import HardSSLKMeans, SoftSSLKMeans
from lkmeans.clustering.unsupervised import LKMeans
48 changes: 48 additions & 0 deletions lkmeans/clustering/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from abc import ABC
from typing import Union

import numpy as np
from numpy.typing import NDArray

from lkmeans.clustering.utils import assign_to_cluster, set_type
from lkmeans.distance import DistanceCalculator
from lkmeans.optimizers import get_optimizer


class Clustering(ABC):
    """Shared state and behaviour of the LKMeans clustering estimators.

    The constructor stores the hyper-parameters and builds the two helpers
    that depend on ``p``: a ``DistanceCalculator`` and the centroid optimizer
    returned by ``get_optimizer``. Subclasses are expected to fill
    ``_inertia`` and ``_cluster_centers`` when they fit.
    """

    def __init__(self, n_clusters: int, *, p: Union[float, int] = 2,
                 max_iter: int = 100, max_iter_with_no_progress: int = 15) -> None:
        """Store the hyper-parameters and create the ``p``-dependent helpers.

        Args:
            n_clusters: number of clusters (and centroids).
            p: value forwarded to ``DistanceCalculator`` and ``get_optimizer``
                (presumably the order of the L_p metric - confirm against
                ``lkmeans.distance``).
            max_iter: stored iteration budget for fitting.
            max_iter_with_no_progress: stored threshold of consecutive
                iterations without centroid change.
        """
        self._n_clusters = n_clusters
        self._max_iter = max_iter
        self._p = p
        self._max_iter_with_no_progress = max_iter_with_no_progress

        self._distance_calculator = DistanceCalculator(self._p)
        self._optimizer = get_optimizer(self._p)

        # Placeholders until a subclass fits the model.
        self._inertia = 0.
        self._cluster_centers = np.array([])

    @property
    def inertia_(self) -> float:
        """Inertia stored by the last fit (``0.`` before any fit)."""
        return self._inertia

    @property
    def cluster_centers_(self) -> NDArray:
        """Centroids stored by the last fit (an empty array before any fit)."""
        return self._cluster_centers

    @staticmethod
    def _validate_data(data: NDArray, n_clusters: int) -> None:
        """Reject data sets that have fewer samples than requested clusters.

        Raises:
            ValueError: if ``data.shape[0] < n_clusters``.
        """
        if data.shape[0] < n_clusters:
            raise ValueError(f'Clustering of {data.shape[0]} samples with {n_clusters} centers is not possible')

    def predict(self, X: NDArray) -> list[int]:
        """Return the cluster label assigned to every sample of ``X``.

        Raises:
            ValueError: if the model has no cluster centers yet, i.e. it was
                never fitted.
        """
        # Fail early with a clear message instead of handing the empty
        # placeholder centroid array to the assignment helper.
        if self._cluster_centers.size == 0:
            raise ValueError(f'{self.__class__.__name__} is not fitted yet; call fit before predict')

        X = set_type(X)
        _, labels = assign_to_cluster(X, self._cluster_centers, self._n_clusters, self._distance_calculator)
        return labels

    def _get_repr_params(self) -> str:
        """Return the parameter part of the textual representation."""
        return f'n_clusters={self._n_clusters}, p={self._p}'

    def __repr__(self) -> str:
        return f'{self.__class__.__name__} ({self._get_repr_params()})'
3 changes: 3 additions & 0 deletions lkmeans/clustering/supervised/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from lkmeans.clustering.supervised.hard_supervised_lkmeans import HardSSLKMeans
from lkmeans.clustering.supervised.soft_supervised_lkmeans import SoftSSLKMeans
from lkmeans.clustering.supervised.utils import assign_to_cluster_with_supervision, select_supervisor_targets
38 changes: 38 additions & 0 deletions lkmeans/clustering/supervised/hard_supervised_lkmeans.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from copy import deepcopy

import numpy as np
from numpy.typing import NDArray

from lkmeans.clustering.supervised.supervised_clustering import SupervisedClustering
from lkmeans.clustering.supervised.utils import assign_to_cluster_with_supervision
from lkmeans.clustering.utils import calculate_inertia


class HardSSLKMeans(SupervisedClustering):
    """Semi-supervised LKMeans that passes the targets to every assignment step.

    Samples are assigned with ``assign_to_cluster_with_supervision``, which
    receives ``targets`` on each iteration, so the supervision is applied
    throughout the whole fit and not only at initialisation.
    """

    def _fit(self, X: NDArray, targets: NDArray) -> None:
        """Fit centroids on ``X`` using ``targets`` (NaN marks an unlabelled sample).

        Stops after ``max_iter`` iterations, or earlier once the centroids
        stayed unchanged for ``max_iter_with_no_progress`` consecutive
        iterations. Stores the result in ``_inertia`` and ``_cluster_centers``.

        Raises:
            ValueError: if ``X`` has fewer samples than ``n_clusters``.
        """
        self._validate_data(X, self._n_clusters)

        centroids = self._init_supervised_centroids(X, self._n_clusters, targets)

        iter_with_no_progress = 0
        for _ in range(self._max_iter):
            if iter_with_no_progress >= self._max_iter_with_no_progress:
                break

            previous_centroids = deepcopy(centroids)
            clusters, _ = assign_to_cluster_with_supervision(X, centroids, self._n_clusters,
                                                             self._distance_calculator, targets)

            # update centroids using the specified optimizer
            for cluster_id, cluster in enumerate(clusters):
                # An empty cluster would become a 1-D array and break the
                # per-coordinate optimisation; keep its previous centroid.
                if len(cluster) == 0:
                    continue
                centroids[cluster_id] = self._optimize_centroid(np.array(cluster, copy=True))

            if np.array_equal(previous_centroids, centroids):
                iter_with_no_progress += 1
            else:
                iter_with_no_progress = 0

        self._inertia = calculate_inertia(X, centroids)
        self._cluster_centers = deepcopy(centroids)
36 changes: 36 additions & 0 deletions lkmeans/clustering/supervised/soft_supervised_lkmeans.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from copy import deepcopy

import numpy as np
from numpy.typing import NDArray

from lkmeans.clustering.supervised.supervised_clustering import SupervisedClustering
from lkmeans.clustering.utils import assign_to_cluster, calculate_inertia


class SoftSSLKMeans(SupervisedClustering):
    """Semi-supervised LKMeans where labels only seed the initial centroids.

    ``targets`` is used by ``_init_supervised_centroids``; afterwards every
    sample is assigned with the plain ``assign_to_cluster`` helper, which
    does not receive the targets.
    """

    def _fit(self, X: NDArray, targets: NDArray) -> None:
        """Fit centroids on ``X``; ``targets`` (NaN = unlabelled) seed the start.

        Stops after ``max_iter`` iterations, or earlier once the centroids
        stayed unchanged for ``max_iter_with_no_progress`` consecutive
        iterations. Stores the result in ``_inertia`` and ``_cluster_centers``.

        Raises:
            ValueError: if ``X`` has fewer samples than ``n_clusters``.
        """
        self._validate_data(X, self._n_clusters)

        centroids = self._init_supervised_centroids(X, self._n_clusters, targets)

        iter_with_no_progress = 0
        for _ in range(self._max_iter):
            if iter_with_no_progress >= self._max_iter_with_no_progress:
                break

            previous_centroids = deepcopy(centroids)
            clusters, _ = assign_to_cluster(X, centroids, self._n_clusters, self._distance_calculator)

            # update centroids using the specified optimizer
            for cluster_id, cluster in enumerate(clusters):
                # An empty cluster would become a 1-D array and break the
                # per-coordinate optimisation; keep its previous centroid.
                if len(cluster) == 0:
                    continue
                centroids[cluster_id] = self._optimize_centroid(np.array(cluster, copy=True))

            if np.array_equal(previous_centroids, centroids):
                iter_with_no_progress += 1
            else:
                iter_with_no_progress = 0

        self._inertia = calculate_inertia(X, centroids)
        self._cluster_centers = deepcopy(centroids)
53 changes: 53 additions & 0 deletions lkmeans/clustering/supervised/supervised_clustering.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from abc import abstractmethod

import numpy as np
from numpy.typing import NDArray

from lkmeans.clustering.base import Clustering
from lkmeans.clustering.unsupervised.lkmeans import init_centroids
from lkmeans.clustering.utils import set_type


class SupervisedClustering(Clustering):
    """Base class of the semi-supervised estimators.

    ``targets`` holds one value per sample; ``NaN`` marks a sample without a
    label. Subclasses implement ``_fit``; this class provides the centroid
    optimisation, the label-aware centroid initialisation and the public
    ``fit`` / ``fit_predict`` entry points.
    """

    def _optimize_centroid(self, cluster: NDArray) -> NDArray:
        """Return the centroid of ``cluster`` computed coordinate by coordinate.

        Args:
            cluster: 2-D array, one row per sample.

        Returns:
            1-D float array holding the optimizer result for every column.
        """
        coordinates = [self._optimizer(cluster[:, coordinate_id])
                       for coordinate_id in range(cluster.shape[1])]
        # Built in one go instead of growing an array with np.append.
        return np.asarray(coordinates, dtype=np.float64).ravel()

    def _init_supervised_centroids(self, data: NDArray, n_clusters: int, targets: NDArray) -> NDArray:
        """Build the initial centroids from the labelled samples.

        One centroid is computed per distinct label, in ascending label order.
        Missing centroids are drawn with ``init_centroids`` from the
        unlabelled samples; if nothing is labelled, all centroids come from
        ``init_centroids`` over the whole data set.

        Raises:
            ValueError: if there are more distinct labels than ``n_clusters``.
        """
        targets = np.asarray(targets)
        labelled_mask = ~np.isnan(targets)
        # np.unique is sorted, so the centroid order is deterministic
        # (iteration order of a set is not guaranteed).
        unique_targets = np.unique(targets[labelled_mask])

        if len(unique_targets) > n_clusters:
            raise ValueError(f'Got {len(unique_targets)} distinct targets for {n_clusters} clusters')
        if len(unique_targets) == 0:
            # No supervision at all: nothing to seed from.
            return init_centroids(data, n_clusters)

        output_centroids = np.stack(
            [self._optimize_centroid(data[targets == target_id]) for target_id in unique_targets],
            axis=0,
        )

        remain_centroids = n_clusters - len(unique_targets)
        if remain_centroids > 0:
            padding_centroids = init_centroids(data[~labelled_mask], remain_centroids)
            output_centroids = np.concatenate([output_centroids, padding_centroids], axis=0)
        return output_centroids

    @abstractmethod
    def _fit(self, X: NDArray, targets: NDArray) -> None:
        """Fit the model; must set ``_inertia`` and ``_cluster_centers``."""
        ...

    def fit(self, X: NDArray, targets: NDArray) -> None:
        """Fit the model on ``X`` with per-sample ``targets`` (NaN = unlabelled)."""
        X = set_type(X)
        # Accept any array-like; the fitting code indexes targets as an ndarray.
        self._fit(X, np.asarray(targets))

    def fit_predict(self, X: NDArray, targets: NDArray) -> list[int]:
        """Fit the model and return the predicted label of every sample of ``X``."""
        self.fit(X, targets)
        return self.predict(X)
36 changes: 36 additions & 0 deletions lkmeans/clustering/supervised/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from copy import deepcopy

import numpy as np
from numpy.typing import NDArray

from lkmeans.distance import DistanceCalculator


def select_supervisor_targets(targets: NDArray, selection_ratio: float) -> NDArray:
    """Keep a random fraction of ``targets`` and replace the rest with NaN.

    Args:
        targets: 1-D array-like of numeric labels; it is not modified.
        selection_ratio: fraction of labels to keep, within ``[0, 1]``.
            ``int(len(targets) * selection_ratio)`` labels are kept.

    Returns:
        A new ``float64`` array of the same length in which the labels that
        were not selected are ``np.nan``.

    Raises:
        ValueError: if ``selection_ratio`` is outside ``[0, 1]``.
    """
    if not 0 <= selection_ratio <= 1:
        raise ValueError(f'selection_ratio must be within [0, 1], got {selection_ratio}')

    # float64 instead of float16: half precision cannot represent every
    # integer above 2048, which silently changed large label values.
    # asarray may alias the input when it is already float64, hence the copy.
    output_targets = deepcopy(np.asarray(targets, dtype=np.float64))

    num_targets = len(output_targets)
    num_not_selected_targets = num_targets - int(num_targets * selection_ratio)
    not_selected_indices = np.random.choice(num_targets, num_not_selected_targets, replace=False)
    output_targets[not_selected_indices] = np.nan
    return output_targets


def assign_to_cluster_with_supervision(
    X: NDArray,
    centroids: NDArray,
    n_clusters: int,
    distance_calculator: DistanceCalculator,
    targets: NDArray,
) -> tuple[list[list[NDArray]], list[int]]:
    """Assign every sample to a cluster, honouring the known labels.

    A sample whose target is not NaN goes to the cluster whose index equals
    ``int(target)``. A sample whose target is NaN goes to the centroid with
    the smallest distance according to ``distance_calculator``.

    Args:
        X: samples, iterated row by row.
        centroids: current centroids, passed to ``get_pairwise_distance``.
        n_clusters: number of clusters; valid labels are ``0 .. n_clusters - 1``.
        distance_calculator: object providing ``get_pairwise_distance(point, centroids)``.
        targets: one value per sample, NaN for unlabelled samples.

    Returns:
        ``(clusters, labels)``: ``clusters[i]`` lists the samples assigned to
        cluster ``i`` and ``labels[j]`` is the cluster index of sample ``j``.

    Raises:
        IndexError: if a label is outside ``0 .. n_clusters - 1``.
    """
    clusters: list[list[NDArray]] = [[] for _ in range(n_clusters)]
    labels: list[int] = []

    for point, real_target in zip(X, targets):
        if np.isnan(real_target):
            distances_to_each_centroid = distance_calculator.get_pairwise_distance(point, centroids)
            cluster_id = int(np.argmin(distances_to_each_centroid))
        else:
            cluster_id = int(real_target)
            # A negative label would silently index from the end of the list.
            if not 0 <= cluster_id < n_clusters:
                raise IndexError(f'Target {real_target} is out of range for {n_clusters} clusters')
        clusters[cluster_id].append(point)
        labels.append(cluster_id)
    return clusters, labels
1 change: 1 addition & 0 deletions lkmeans/clustering/unsupervised/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from lkmeans.clustering.unsupervised.lkmeans import LKMeans
Loading
Loading