
Specifying a Custom Distance Function for scikit-learn K-Means Clustering

nanyue 2025-05-22

Technical Background

K-Means is one of the most widely used algorithms in cluster analysis. By default it measures the distance between data points and cluster centres with the Euclidean distance. In some scenarios, however, Euclidean distance is not the best choice, and users may prefer a custom distance function such as the Manhattan or cosine distance. scikit-learn's current K-Means implementation does not expose this option, so it is worth exploring whether, and how, a custom distance function can be used.
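As a quick illustration (a hedged sketch with made-up toy points, not part of the original article), scipy.spatial.distance.cdist already computes pairwise distances under many metrics, and it is the building block most of the workarounds below rely on:

```python
import numpy as np
from scipy.spatial.distance import cdist

# Toy points chosen so the distances are easy to verify by hand.
X = np.array([[0.0, 0.0], [3.0, 4.0]])
C = np.array([[0.0, 0.0]])

d_euc = cdist(X, C, metric="euclidean")   # sqrt(3**2 + 4**2) = 5 for the 2nd point
d_man = cdist(X, C, metric="cityblock")   # |3| + |4| = 7 for the 2nd point
d_cos = cdist(np.array([[0.0, 1.0]]), np.array([[1.0, 0.0]]), metric="cosine")  # orthogonal -> 1

print(d_euc[1, 0], d_man[1, 0], d_cos[0, 0])  # 5.0 7.0 1.0
```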

Implementation Steps

1. Check native scikit-learn support

The K-Means implementation in current scikit-learn releases only uses the Euclidean distance and offers no parameter for a custom distance function.

2. Workarounds

Option 1: A custom K-Means implementation

You can implement K-Means yourself so that it accepts any of the 20-odd distance metrics in scipy.spatial.distance, or a user-defined function. A sample implementation:

import random

import numpy as np
from scipy.sparse import issparse
from scipy.spatial.distance import cdist

def kmeans(X, centres, delta=.001, maxiter=10, metric="euclidean", p=2, verbose=1):
    """Cluster X around the given initial centres under any cdist metric.
    Returns (centres, assignments, distances)."""
    if not issparse(X):
        X = np.asanyarray(X)
    centres = centres.todense() if issparse(centres) else centres.copy()
    N, dim = X.shape
    k, cdim = centres.shape
    if dim != cdim:
        raise ValueError("kmeans: X %s and centres %s must have the same number of columns"
                         % (X.shape, centres.shape))
    if verbose:
        print("kmeans: X %s  centres %s  delta=%.2g  maxiter=%d  metric=%s"
              % (X.shape, centres.shape, delta, maxiter, metric))
    allx = np.arange(N)
    prevdist = 0
    kwargs = {"p": p} if metric == "minkowski" else {}  # p only applies to Minkowski
    for jiter in range(1, maxiter + 1):
        D = cdist_sparse(X, centres, metric=metric, **kwargs)  # N x k distance matrix
        xtoc = D.argmin(axis=1)        # assignment step: nearest centre
        distances = D[allx, xtoc]
        avdist = distances.mean()
        if verbose >= 2:
            print("kmeans: av |X - nearest centre| = %.4g" % avdist)
        if (1 - delta) * prevdist <= avdist <= prevdist or jiter == maxiter:
            break
        prevdist = avdist
        for jc in range(k):            # update step: mean of each cluster
            c = np.where(xtoc == jc)[0]
            if len(c) > 0:
                centres[jc] = X[c].mean(axis=0)
    if verbose:
        print("kmeans: %d iterations  cluster sizes:" % jiter, np.bincount(xtoc))
    if verbose >= 2:
        r50 = np.zeros(k)
        r90 = np.zeros(k)
        for j in range(k):
            dist = distances[xtoc == j]
            if len(dist) > 0:
                r50[j], r90[j] = np.percentile(dist, (50, 90))
        print("kmeans: cluster 50% radius", r50.astype(int))
        print("kmeans: cluster 90% radius", r90.astype(int))
    return centres, xtoc, distances

def kmeanssample(X, k, nsample=0, **kwargs):
    """Two-pass kmeans: cluster a random sample first, then all of X."""
    N, dim = X.shape
    if nsample == 0:
        nsample = max(2 * np.sqrt(N), 10 * k)
    Xsample = randomsample(X, int(nsample))
    pass1centres = randomsample(X, int(k))
    samplecentres = kmeans(Xsample, pass1centres, **kwargs)[0]
    return kmeans(X, samplecentres, **kwargs)

def cdist_sparse(X, Y, **kwargs):
    """cdist that also accepts scipy.sparse inputs (row by row, slower)."""
    sxy = 2 * issparse(X) + issparse(Y)
    if sxy == 0:
        return cdist(X, Y, **kwargs)
    d = np.empty((X.shape[0], Y.shape[0]), np.float64)
    if sxy == 2:        # only X is sparse
        for j, x in enumerate(X):
            d[j] = cdist(x.todense(), Y, **kwargs)[0]
    elif sxy == 1:      # only Y is sparse
        for k, y in enumerate(Y):
            d[:, k] = cdist(X, y.todense(), **kwargs)[0]
    else:               # both sparse
        for j, x in enumerate(X):
            for k, y in enumerate(Y):
                d[j, k] = cdist(x.todense(), y.todense(), **kwargs)[0]
    return d

def randomsample(X, n):
    """n random rows of X, sampled without replacement."""
    sampleix = random.sample(range(X.shape[0]), int(n))
    return X[sampleix]

def nearestcentres(X, centres, metric="euclidean", p=2):
    """Index of the nearest centre for each row of X."""
    kwargs = {"p": p} if metric == "minkowski" else {}
    D = cdist(X, centres, metric=metric, **kwargs)
    return D.argmin(axis=1)

def Lqmetric(x, y=None, q=.5):
    # Fractional Lq "distance", usable as a cdist callable
    return (np.abs(x - y) ** q).mean() if y is not None else (np.abs(x) ** q).mean()

class Kmeans:
    """Convenience wrapper: Kmeans(X, k=...) or Kmeans(X, centres=...)."""
    def __init__(self, X, k=0, centres=None, nsample=0, **kwargs):
        self.X = X
        if centres is None:
            self.centres, self.Xtocentre, self.distances = kmeanssample(
                X, k=k, nsample=nsample, **kwargs)
        else:
            self.centres, self.Xtocentre, self.distances = kmeans(
                X, centres, **kwargs)

    def __iter__(self):
        for jc in range(len(self.centres)):
            yield jc, (self.Xtocentre == jc)
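Because cdist accepts a plain Python callable as the metric, an implementation like the one above can cluster under a fully user-defined distance such as the Lqmetric helper. A minimal self-contained sketch of the assignment step (the toy data is made up for the example):

```python
import numpy as np
from scipy.spatial.distance import cdist

def lq(x, y, q=0.5):
    # Fractional Lq "distance": mean of |x - y| ** q, as in the Lqmetric helper
    return (np.abs(x - y) ** q).mean()

X = np.array([[0.0, 0.0], [1.0, 1.0], [4.0, 4.0]])
centres = np.array([[0.0, 0.0], [4.0, 4.0]])

D = cdist(X, centres, metric=lq)   # pairwise user-defined distances
labels = D.argmin(axis=1)          # nearest-centre assignment
print(labels)  # [0 0 1]
```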

Option 2: Use another library

  • nltk: nltk.cluster.kmeans.KMeansClusterer accepts a custom distance function. Example:
from nltk.cluster.kmeans import KMeansClusterer
from nltk.cluster.util import cosine_distance

NUM_CLUSTERS = 5
data = your_data.toarray()  # your_data: e.g. a sparse document-term matrix
kclusterer = KMeansClusterer(NUM_CLUSTERS, distance=cosine_distance, repeats=25)
assigned_clusters = kclusterer.cluster(data, assign_clusters=True)
  • pyclustering: a Python/C++ implementation that is comparatively fast and supports user-defined distance functions. Example:
from pyclustering.cluster.kmeans import kmeans
from pyclustering.utils.metric import type_metric, distance_metric

# sample: your data as a list of points (pyclustering also ships sample loaders)
user_function = lambda point1, point2: point1[0] + point2[0] + 2
metric = distance_metric(type_metric.USER_DEFINED, func=user_function)
start_centers = [[4.7, 5.9], [5.7, 6.5]]
kmeans_instance = kmeans(sample, start_centers, metric=metric)
kmeans_instance.process()
clusters = kmeans_instance.get_clusters()

Option 3: Patch scikit-learn's internal distance function

In scikit-learn 1.2.2 you can replace the _euclidean_distances function in sklearn.cluster._kmeans. Note that this relies on private internals, so it may stop working in other versions. Example:

import sklearn.cluster._kmeans as kmeans
from sklearn.metrics import pairwise_distances

def custom_distances(X, Y=None, Y_norm_squared=None, squared=False):
    # The squared flag is ignored: both code paths return the plain
    # Minkowski (p=1.5) distance matrix.
    return pairwise_distances(X, Y, metric='minkowski', p=1.5)

kmeans._euclidean_distances = custom_distances
kmeans.euclidean_distances = custom_distances
km = kmeans.KMeans(init="k-means++", n_clusters=clusters,  # clusters: desired number of clusters
                   n_init=4, random_state=0)

Option 4: Subclass KMeans

In scikit-learn 1.1.3 you can subclass sklearn.cluster.KMeans and override its _transform method, which computes the distances from samples to the cluster centres. (The core fitting loop still optimises Euclidean distance, so this mainly changes what transform() reports.) The example below uses an IoU measure for bounding-box anchors as the custom "distance":

import sklearn.cluster
import numpy as np

def anchor_iou(box_dims, centroid_box_dims):
    box_w, box_h = box_dims[..., 0], box_dims[..., 1]
    centroid_w, centroid_h = centroid_box_dims[..., 0], centroid_box_dims[..., 1]
    inter_w = np.minimum(box_w[..., np.newaxis], centroid_w[np.newaxis, ...])
    inter_h = np.minimum(box_h[..., np.newaxis], centroid_h[np.newaxis, ...])
    inter_area = inter_w * inter_h
    centroid_area = centroid_w * centroid_h
    box_area = box_w * box_h
    return inter_area / (
        centroid_area[np.newaxis, ...] + box_area[..., np.newaxis] - inter_area
    )

class IOUKMeans(sklearn.cluster.KMeans):
    def __init__(
        self,
        n_clusters=8,
        *,
        init="k-means++",
        n_init=10,
        max_iter=300,
        tol=1e-4,
        verbose=0,
        random_state=None,
        copy_x=True,
        algorithm="lloyd",
    ):
        super().__init__(
            n_clusters=n_clusters,
            init=init,
            n_init=n_init,
            max_iter=max_iter,
            tol=tol,
            verbose=verbose,
            random_state=random_state,
            copy_x=copy_x,
            algorithm=algorithm
        )

    def _transform(self, X):
        return anchor_iou(X, self.cluster_centers_)

rng = np.random.default_rng(12345)
num_boxes = 10
bboxes = rng.integers(low=0, high=100, size=(num_boxes, 2))
kmeans = IOUKMeans(num_boxes).fit(bboxes)

Best Practices

  • Data preprocessing: apply suitable preprocessing (normalisation, scaling, etc.) before using a custom distance function so that the distance computations are meaningful.
  • Choice of distance function: pick a distance that matches the data and the problem. For text data the cosine distance is often a better fit; for time-series data, dynamic time warping (DTW) may be more appropriate.
  • Validate the clustering: check the quality of the result with clustering metrics such as the silhouette coefficient or the Calinski-Harabasz index.
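For instance, the silhouette coefficient can be evaluated under a custom distance by passing a precomputed distance matrix (a hedged sketch on synthetic blobs; the data and threshold are illustrative only):

```python
import numpy as np
from scipy.spatial.distance import cdist
from sklearn.metrics import silhouette_score

rng = np.random.default_rng(0)
# Two well-separated synthetic blobs -> silhouette close to 1
X = np.vstack([rng.normal(0, 0.2, (20, 2)), rng.normal(5, 0.2, (20, 2))])
labels = np.array([0] * 20 + [1] * 20)

D = cdist(X, X, metric="cityblock")                    # custom (Manhattan) distances
score = silhouette_score(D, labels, metric="precomputed")
print(score > 0.8)  # True for well-separated clusters
```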

Common Issues

  • Convergence: K-Means is designed around the Euclidean distance, and with other distance functions the algorithm may fail to converge. In that case consider an algorithm such as K-Medoids instead.
  • Performance: some custom distance functions are expensive to compute and can slow the algorithm down considerably; optimise the code or switch to a more efficient library.
  • Centre computation: different distance functions call for different centre updates. With the Manhattan distance, for example, the cluster centre should be the coordinate-wise median rather than the mean.
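To sketch the last point (illustrative code, not a library API): under the Manhattan distance the centre minimising within-cluster distance is the coordinate-wise median, so the update step uses np.median instead of np.mean:

```python
import numpy as np
from scipy.spatial.distance import cdist

def kmedians(X, centres, maxiter=100):
    """Naive K-Medians: cityblock assignment + per-coordinate median update."""
    centres = centres.copy()
    for _ in range(maxiter):
        labels = cdist(X, centres, metric="cityblock").argmin(axis=1)
        new_centres = centres.copy()
        for j in range(len(centres)):
            members = X[labels == j]
            if len(members) > 0:
                new_centres[j] = np.median(members, axis=0)  # median, not mean
        if np.allclose(new_centres, centres):
            break
        centres = new_centres
    return centres, labels

X = np.array([[0.0, 0.0], [0.0, 1.0], [1.0, 0.0],
              [10.0, 10.0], [10.0, 11.0], [11.0, 10.0]])
init = np.array([[0.0, 0.0], [10.0, 10.0]])
centres, labels = kmedians(X, init)
print(labels)  # [0 0 0 1 1 1]
```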
