

Commit

Redraw the plots
luokn committed May 21, 2022
1 parent a4135c2 commit 1070bed
Showing 8 changed files with 167 additions and 143 deletions.
77 changes: 45 additions & 32 deletions src/adaboost.py
@@ -13,14 +13,17 @@ class AdaBoost:
     Adaptive Boosting
     """
 
-    def __init__(self, n_estimators: int):
+    def __init__(self, n_estimators: int, lr=0.01, eps=1e-5):
         """
         Args:
             n_estimators (int): number of weak classifiers
+            lr (float, optional): learning rate (threshold search step). Defaults to 0.01.
+            eps (float, optional): lower bound on the error; training stops early below it. Defaults to 1e-5.
         """
-        self.n_estimators, self.estimators = n_estimators, []  # weak classifiers
+        self.estimators = []  # weak classifiers
+        self.n_estimators, self.lr, self.eps = n_estimators, lr, eps
 
-    def fit(self, X: np.ndarray, Y: np.ndarray, lr=0.01, eps=1e-5):
+    def fit(self, X: np.ndarray, y: np.ndarray):
         """
         Args:
             X (np.ndarray): ...
Expand All @@ -30,62 +33,72 @@ def fit(self, X: np.ndarray, Y: np.ndarray, lr=0.01, eps=1e-5):
"""
weights = np.full(len(X), 1 / len(X)) # 样本权重
for _ in range(self.n_estimators):
estimator = WeakEstimator()
error = estimator.fit(X, Y, weights=weights, lr=lr) # 带权重训练弱分类器
if error < eps: # 误差达到下限,提前停止迭代
estimator = WeakEstimator(lr=self.lr)
error = estimator.fit(X, y, weights=weights) # 带权重训练弱分类器
if error < self.eps: # 误差达到下限,提前停止迭代
break
alpha = np.log((1 - error) / error) / 2 # 更新弱分类器权重
weights *= np.exp(-alpha * Y * estimator(X)) # 更新样本权重
weights *= np.exp(-alpha * y * estimator(X)) # 更新样本权重
weights /= np.sum(weights) # 除以规范化因子
self.estimators += [(alpha, estimator)] # 添加此弱分类器

def __call__(self, X: np.ndarray):
pred = sum((alpha * estimator(X) for alpha, estimator in self.estimators))
return np.where(pred > 0, 1, -1)
def __call__(self, X: np.ndarray) -> np.ndarray:
y_pred = sum((alpha * estimator(X) for alpha, estimator in self.estimators))
return np.where(y_pred > 0, 1, -1)


 class WeakEstimator:  # weak classifier: a decision stump (one-level decision tree)
-    def __init__(self):
-        self.feature, self.threshold, self.sign = None, None, None  # split feature, split threshold, sign in {-1, 1}
+    def __init__(self, lr: float):
+        self.lr, self.feature, self.threshold, self.sign = lr, None, None, None  # split feature, split threshold, sign in {-1, 1}
 
-    def fit(self, X: np.ndarray, Y: np.ndarray, weights: np.ndarray, lr: float):
+    def fit(self, X: np.ndarray, y: np.ndarray, weights: np.ndarray):
         min_error = float("inf")  # minimum weighted error
-        for feature, x in enumerate(X.T):
-            for threshold in np.arange(np.min(x) - lr, np.max(x) + lr, lr):
+        for f, x in enumerate(X.T):
+            for threshold in np.arange(np.min(x) - self.lr, np.max(x) + self.lr, self.lr):
                 for sign in [1, -1]:
-                    error = np.sum(weights[np.where(x > threshold, sign, -sign) != Y])  # sum the weights of the misclassified samples
+                    error = np.sum(weights[np.where(x > threshold, sign, -sign) != y])  # sum the weights of the misclassified samples
                     if error < min_error:
-                        self.feature, self.threshold, self.sign, min_error = feature, threshold, sign, error
+                        self.feature, self.threshold, self.sign, min_error = f, threshold, sign, error
         return min_error
 
-    def __call__(self, X: np.ndarray):
+    def __call__(self, X: np.ndarray) -> np.ndarray:
         return np.where(X[:, self.feature] > self.threshold, self.sign, -self.sign)


-def load_data():
-    x = np.stack([np.random.randn(500, 2) + np.array([2, 0]), np.random.randn(500, 2) + np.array([0, 2])])
-    y = np.stack([np.full([500], -1), np.full([500], 1)])
-    return x, y
+def load_data(n_samples_per_class=500):
+    X = np.concatenate(
+        [
+            np.random.randn(n_samples_per_class, 2) + np.array([2, 0]),
+            np.random.randn(n_samples_per_class, 2) + np.array([0, 2]),
+        ]
+    )
+    y = np.array([1] * n_samples_per_class + [-1] * n_samples_per_class)
+
+    training_set, test_set = np.split(np.random.permutation(len(X)), [int(len(X) * 0.6)])
+
+    return X, y, training_set, test_set


 if __name__ == "__main__":
-    x, y = load_data()
+    X, y, training_set, test_set = load_data()
 
+    X_neg, X_pos = X[y == -1], X[y == 1]
     plt.figure("AdaBoost", figsize=[12, 6])
     plt.subplot(1, 2, 1)
     plt.title("Ground Truth")
-    plt.scatter(x[0, :, 0], x[0, :, 1], color="r", marker=".")
-    plt.scatter(x[1, :, 0], x[1, :, 1], color="g", marker=".")
+    plt.scatter(X_neg[:, 0], X_neg[:, 1], marker=".")
+    plt.scatter(X_pos[:, 0], X_pos[:, 1], marker=".")
 
-    x, y = x.reshape(-1, 2), y.flatten()
     adaboost = AdaBoost(50)
-    adaboost.fit(x, y)
-    pred = adaboost(x)
-    acc = np.sum(pred == y) / len(pred)
+    adaboost.fit(X[training_set], y[training_set])
+    y_pred = adaboost(X)
+    acc = np.sum(y_pred[test_set] == y[test_set]) / len(test_set)
     print(f"Accuracy = {100 * acc:.2f}%")
 
-    x0, x1 = x[pred == -1], x[pred == 1]
+    X_neg, X_pos = X[y_pred == -1], X[y_pred == 1]
     plt.subplot(1, 2, 2)
     plt.title("Prediction")
-    plt.scatter(x0[:, 0], x0[:, 1], color="r", marker=".")
-    plt.scatter(x1[:, 0], x1[:, 1], color="g", marker=".")
+    plt.scatter(X_neg[:, 0], X_neg[:, 1], marker=".")
+    plt.scatter(X_pos[:, 0], X_pos[:, 1], marker=".")
 
     plt.show()
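For reference, the quantities computed in fit above are the standard AdaBoost updates. With \epsilon_m the weighted training error returned by the m-th weak classifier h_m, the code computes

    \alpha_m = \frac{1}{2} \ln \frac{1 - \epsilon_m}{\epsilon_m}, \qquad
    w_i \leftarrow \frac{w_i \exp(-\alpha_m \, y_i \, h_m(x_i))}{Z_m},

where Z_m is the normalization factor (np.sum(weights)), and the final prediction is \operatorname{sign}\big(\sum_m \alpha_m h_m(x)\big).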
75 changes: 41 additions & 34 deletions src/decision_tree.py
@@ -9,61 +9,68 @@

 class DecisionTree:
     """
-    Decision tree classifier(决策树分类器ID3生成算法)
+    Decision tree classifier(决策树分类器, ID3生成算法)
     """
 
     def __init__(self):
         self.rate, self.root = None, None
 
-    def fit(self, X: np.ndarray, Y: np.ndarray, rate: float = 0.95):
+    def fit(self, X: np.ndarray, y: np.ndarray, rate: float = 0.95):
         self.rate = rate
-        self.root = self._create_tree(X, Y, np.arange(X.shape[0]), np.arange(X.shape[1]))
+        self.root = self.build_tree(X, y, np.arange(X.shape[0]), np.arange(X.shape[1]))
 
     def __call__(self, X: np.ndarray):
-        return np.array([self._predict(self.root, x) for x in X])
+        return np.array([self.predict(self.root, x) for x in X])
 
-    def _predict(self, node, x: np.ndarray):
+    def predict(self, node, x: np.ndarray):
         if isinstance(node, dict):  # if the node is a subtree (a dict)
-            feature, trees = node["feature"], node["trees"]
-            return self._predict(trees[x[feature]], x)  # recurse into the subtree selected by the feature value
+            col, trees = node["col"], node["trees"]
+            # recurse into the subtree selected by the feature value
+            return self.predict(trees[x[col]], x)
         return node  # if the node is a leaf, return its class directly
 
-    def _create_tree(self, X: np.ndarray, Y: np.ndarray, indices: np.ndarray, features: np.ndarray):
-        cat_counter = np.bincount(Y[indices])  # count samples per class
-        if len(features) == 0 or np.max(cat_counter) / len(indices) > self.rate:  # no features left, or the node is pure enough
-            return np.argmax(cat_counter)  # return the majority class
-        k = np.argmax([self._calc_info_gain(X, Y, indices, f) for f in features])  # feature with the largest information gain
-        feature = features[k]
-        features = np.delete(features, k)  # drop the chosen feature
+    def build_tree(self, X: np.ndarray, y: np.ndarray, rows: np.ndarray, cols: np.ndarray):
+        cats = np.bincount(y[rows])
+
+        # no features left, or the node is pure enough
+        if len(cols) == 0 or np.max(cats) / len(rows) > self.rate:
+            return np.argmax(cats)  # return the majority class
+
+        # feature with the largest information gain
+        k = np.argmax([self.calc_info_gain(X, y, rows, f) for f in cols])
+        col = cols[k]
+        cols = np.delete(cols, k)  # drop the chosen feature
+
+        # build subtrees for the chosen feature
         trees = {
-            value: self._create_tree(X, Y, indices[X[indices, feature] == value], features)
-            for value in np.unique(X[indices, feature]).tolist()  # one subtree per value of the feature
+            value: self.build_tree(X, y, rows[X[rows, col] == value], cols)
+            for value in np.unique(X[rows, col]).tolist()  # one subtree per value of the feature
         }
-        return {"feature": feature, "trees": trees}
+        return {"col": col, "trees": trees}
 
     @staticmethod
-    def _calc_exp_ent(Y: np.ndarray, indices: np.ndarray):  # empirical entropy
-        prob = np.bincount(Y[indices]) / len(indices)
+    def calc_exp_ent(y: np.ndarray, rows: np.ndarray):  # empirical entropy
+        prob = np.bincount(y[rows]) / len(rows)
         prob = prob[prob.nonzero()]  # drop zero-probability entries
         return np.sum(-prob * np.log(prob))  # empirical entropy
 
     @classmethod
-    def _calc_cnd_ent(cls, X: np.ndarray, Y: np.ndarray, indices: np.ndarray, feature: int):  # conditional entropy
+    def calc_cnd_ent(cls, X: np.ndarray, y: np.ndarray, rows: np.ndarray, col: int):  # conditional entropy
         ent = 0  # empirical conditional entropy
-        for value in np.unique(X[indices, feature]):
-            indices_ = indices[X[indices, feature] == value]
-            ent += len(indices_) / len(indices) * cls._calc_exp_ent(Y, indices_)
+        for value in np.unique(X[rows, col]):
+            indices_ = rows[X[rows, col] == value]
+            ent += len(indices_) / len(rows) * cls.calc_exp_ent(y, indices_)
         return ent  # conditional entropy
 
     @classmethod
-    def _calc_info_gain(cls, X: np.ndarray, Y: np.ndarray, indices: np.ndarray, feature: int):  # information gain
-        exp_ent = cls._calc_exp_ent(Y, indices)  # empirical entropy
-        cnd_ent = cls._calc_cnd_ent(X, Y, indices, feature)  # empirical conditional entropy
+    def calc_info_gain(cls, X: np.ndarray, y: np.ndarray, rows: np.ndarray, col: int):  # information gain
+        exp_ent = cls.calc_exp_ent(y, rows)  # empirical entropy
+        cnd_ent = cls.calc_cnd_ent(X, y, rows, col)  # empirical conditional entropy
         return exp_ent - cnd_ent  # information gain
 
 
 def load_data():
-    x = np.array(
+    X = np.array(
         [
             [0, 0, 0],
             [0, 0, 0],
@@ -83,19 +90,19 @@ def load_data():
             [1, 1, 1],
         ]
     )
-    y = np.where(x.sum(axis=1) >= 2, 1, 0)
-    return x, y
+    y = np.where(X.sum(axis=1) >= 2, 1, 0)
+    return X, y
 
 
 if __name__ == "__main__":
-    x, y = load_data()
+    X, y = load_data()
     decision_tree = DecisionTree()
-    decision_tree.fit(x, y, rate=0.95)
-    pred = decision_tree(x)
+    decision_tree.fit(X, y, rate=0.95)
+    y_pred = decision_tree(X)
 
     print(decision_tree.root)
     print(y)
-    print(pred)
+    print(y_pred)
 
-    acc = np.sum(pred == y) / len(pred)
+    acc = np.sum(y_pred == y) / len(y_pred)
     print(f"Accuracy = {100 * acc:.2f}%")
42 changes: 21 additions & 21 deletions src/gmm.py
@@ -15,36 +15,36 @@ class GMM:
     Gaussian mixture model
     """
 
-    def __init__(self, n_components: int):
+    def __init__(self, n_components: int, iterations=100, cov_reg=1e-06):
         """
         Args:
             n_components (int): number of clusters
         """
-        self.n_components, self.cov_reg = n_components, None
+        self.n_components, self.iterations, self.cov_reg = n_components, iterations, cov_reg
         self.weights = np.full(self.n_components, 1 / self.n_components)
         self.means, self.covs = None, None
 
-    def fit(self, X: np.ndarray, iterations=100, cov_reg=1e-06):
+    def fit(self, X: np.ndarray):
         """
         Args:
             X (np.ndarray): input samples
-            iterations (int, optional): number of iterations. Defaults to 100.
-            cov_reg (float, optional): small term that keeps the covariance matrices from becoming singular. Defaults to 1e-06.
         """
-        self.cov_reg = cov_reg
         # randomly choose n_components points as the initial Gaussian centers
         self.means = np.array(X[random.sample(range(X.shape[0]), self.n_components)])
 
         # initialize every Gaussian covariance to the identity matrix
         self.covs = np.stack([np.eye(X.shape[1]) for _ in range(self.n_components)])
-        for _ in range(iterations):
-            G = self._expect(X)  # E-step
-            self._maximize(X, G)  # M-step
+        for _ in range(self.iterations):
+            G = self.expect(X)  # E-step
+            self.maximize(X, G)  # M-step
 
     def __call__(self, X: np.ndarray):
-        G = self._expect(X)
+        G = self.expect(X)
         return np.argmax(G, axis=1)
 
-    def _expect(self, X: np.ndarray):  # E-step
+    def expect(self, X: np.ndarray):  # E-step
         C = np.zeros([X.shape[0], self.n_components])
         for k, mean, cov in zip(range(self.n_components), self.means, self.covs):
             dist = multivariate_normal(mean=mean, cov=cov)
Expand All @@ -53,7 +53,7 @@ def _expect(self, X: np.ndarray): # E步
         S[S == 0] = self.n_components
         return C / S
 
-    def _maximize(self, X: np.ndarray, G: np.ndarray):  # M-step
+    def maximize(self, X: np.ndarray, G: np.ndarray):  # M-step
         N = np.sum(G, axis=0)
         for k in range(self.n_components):
             G_k = G[:, k].reshape(-1, 1)
@@ -65,31 +65,31 @@


 def load_data():
-    x = np.stack(
+    X = np.stack(
         [
             np.random.multivariate_normal(mean=[4, 0], cov=[[2, 0], [0, 2]], size=[1000]),
             np.random.multivariate_normal(mean=[0, 4], cov=[[2, 0], [0, 2]], size=[1000]),
         ]
     )
-    return x
+    return X
 
 
 if __name__ == "__main__":
-    x = load_data()
+    X = load_data()
     plt.figure(figsize=[12, 6])
     plt.subplot(1, 2, 1)
     plt.title("Ground Truth")
-    plt.scatter(x[0, :, 0], x[0, :, 1], color="r", marker=".")
-    plt.scatter(x[1, :, 0], x[1, :, 1], color="g", marker=".")
+    plt.scatter(X[0, :, 0], X[0, :, 1], marker=".")
+    plt.scatter(X[1, :, 0], X[1, :, 1], marker=".")
 
-    x = x.reshape(-1, 2)
+    X = X.reshape(-1, 2)
     gmm = GMM(2)
-    gmm.fit(x, iterations=1000)
-    pred = gmm(x)
+    gmm.fit(X)
+    y_pred = gmm(X)
 
-    x0, x1 = x[pred == 0], x[pred == 1]
+    x0, x1 = X[y_pred == 0], X[y_pred == 1]
     plt.subplot(1, 2, 2)
     plt.title("Prediction")
-    plt.scatter(x0[:, 0], x0[:, 1], color="r", marker=".")
-    plt.scatter(x1[:, 0], x1[:, 1], color="g", marker=".")
+    plt.scatter(x0[:, 0], x0[:, 1], marker=".")
+    plt.scatter(x1[:, 0], x1[:, 1], marker=".")
     plt.show()
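For reference, expect and maximize above are the two steps of expectation-maximization for a Gaussian mixture. The E-step computes the responsibilities

    \gamma_{ik} = \frac{w_k \, \mathcal{N}(x_i \mid \mu_k, \Sigma_k)}{\sum_j w_j \, \mathcal{N}(x_i \mid \mu_j, \Sigma_j)},

and the M-step re-estimates each component from them, with N_k = \sum_i \gamma_{ik}:

    w_k = \frac{N_k}{N}, \qquad
    \mu_k = \frac{1}{N_k} \sum_i \gamma_{ik} \, x_i, \qquad
    \Sigma_k = \frac{1}{N_k} \sum_i \gamma_{ik} (x_i - \mu_k)(x_i - \mu_k)^\top,

with cov_reg \cdot I added to each \Sigma_k (per the docstring) to keep the covariances non-singular.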