开源项目KafkaCenter 版本持续集成(CI)实践

开篇

本文简单介绍开源项目KafkaCenter 版本持续集成(CI)实践方案,主要解决了三个问题:

  1. 前后端项目编译
  2. 发布Github release包
  3. 制作docker镜像

希望能给你带来一点参考。

详细信息可以参考 https://github.com/xaecbd/KafkaCenter

正文

版本管理

KafkaCenter 后端服务是java,使用maven管理的,有多个module,为了做到版本一致,我们使用了${revision}。这个是maven3.5+ 才支持,主要是为了对CI友好。

例如:

父pom

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<?xml version="1.0" encoding="UTF-8"?>
<project >
<modelVersion>4.0.0</modelVersion>
<groupId>org.nesc.ec.bigdata</groupId>
<artifactId>KafkaCenter</artifactId>
<version>${revision}</version>
<packaging>pom</packaging>
<name>KafkaCenter</name>
<url>https://github.com/xaecbd/KafkaCenter</url>
<description>Kafka Center Platform</description>
...
<properties>
<revision>1.0.0-SNAPSHOT</revision>
</properties>

<modules>
<module>KafkaCenter-Base</module>
<module>KafkaCenter-Core</module>
</modules>
</project>

子module

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<?xml version="1.0"?>
<project>
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.nesc.ec.bigdata</groupId>
<artifactId>KafkaCenter</artifactId>
<version>${revision}</version>
</parent>
<artifactId>KafkaCenter-Base</artifactId>
<name>KafkaCenter-Base</name>
<url>https://github.com/xaecbd/KafkaCenter</url>

</project>

通过如下命令,可以在打包的时候指定版本号:

1
mvn -Drevision=2.1.0 -Dchangelist= clean package

Docker镜像

在项目根目录下新建docker文件夹,包含三个文件:

docker-build-release.sh build镜像及发布镜像的脚本

1
2
3
4
5
6
7
8
9
10
#!/usr/bin/env bash

DOCKER_IMAGE_NAME="xaecbd/kafka-center"
VERSION=${TRAVIS_TAG#v}
echo "KafkaCenter version: $VERSION"
echo "$DOCKER_PASSWORD" | docker login -u "$DOCKER_USERNAME" --password-stdin
cp $TRAVIS_BUILD_DIR/KafkaCenter-Core/target/*.jar $TRAVIS_BUILD_DIR/docker/
docker build -t $DOCKER_IMAGE_NAME:$VERSION $TRAVIS_BUILD_DIR/docker/
docker images
docker push $DOCKER_IMAGE_NAME:$VERSION

Dockerfile docker 镜像定义文件

1
2
3
4
5
6
7
8
9
10
FROM adoptopenjdk/openjdk8:jre8u252-b09-alpine
LABEL author="Turman.P.Du"
ENV PROJECT_BASE_DIR /opt/app/kafka-center/
WORKDIR ${PROJECT_BASE_DIR}

COPY *.jar ${PROJECT_BASE_DIR}/
COPY *.sh ${PROJECT_BASE_DIR}/

RUN chmod +x *.sh
ENTRYPOINT ["sh","start.sh"]

start.sh 应用启动脚本,非必须,只是我们习惯放这么个脚本,可以在应用启动前做一些工作。推荐在启动java应用前增加exec命令,这样可以让spring容器在docker容器停止运行前执行一些操作(可以用作应用停止前执行收尾工作,例如保存数据,停止不可中断的任务)。

1
2
3
4
5
6
7
8
9
10
11
12
13
#!/bin/sh
echo "PROJECT_BASE_DIR :"$PROJECT_BASE_DIR
#cd $APP_ROOT_DIR
cd $PROJECT_BASE_DIR

appName=`ls|grep .jar$`
echo start to run $appName

if [ -n "$JAVA_OPTIONS" ];then
exec java $JAVA_OPTIONS -jar $appName $@
else
exec java -jar $appName $@
fi

travis

github action已经很好了,我没有采用的原因是需要熟悉成本有些操作可能暂时做不到。而可以预见性的travis都能很好的做到。因此目前的方案是采用travis。

实现要求

通过新建tag(只能是tag触发),自动编译前后端代码,发布github release,构建docker镜像,发布镜像。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
language: java
jdk:
- openjdk8

cache:
directories:
- $HOME/.m2
- $HOME/.npm
- $TRAVIS_BUILD_DIR/

dist: trusty
jobs:
include:
- stage: ui_build
language: node_js
node_js: 10.15.2
script: cd KafkaCenter-Frontend && npm install && npm run build

- stage: GitHubRelease
install:
- echo GitHubRelease
script: mvn -Drevision=${TRAVIS_TAG#v} clean package -Dmaven.test.skip=true
before_deploy:
- mkdir -p $TRAVIS_BUILD_DIR/deploy
- cp $TRAVIS_BUILD_DIR/KafkaCenter-Core/target/*.gz $TRAVIS_BUILD_DIR/deploy
- rm -f $TRAVIS_BUILD_DIR/KafkaCenter-Core/target/*.gz
- ls -l $TRAVIS_BUILD_DIR/deploy
deploy:
provider: releases
api_key: $API_KEY
file_glob: true
skip_cleanup: true
file: deploy/*.tar.gz
on:
tags: true
after_deploy: rm -rf $TRAVIS_BUILD_DIR/deploy

- stage: BuildDockerImageforRelease
install:
- echo Build Docker Image for Release
before_script:
- chmod +x ./docker/docker-build-release.sh
script: ./docker/docker-build-release.sh

stages:
- name: ui_build
if: tag =~ /^v\d+\.\d+\.\d+.*$/
- name: GitHubRelease
if: tag =~ /^v\d+\.\d+\.\d+.*$/
- name: BuildDockerImageforRelease
if: tag =~ /^v\d+\.\d+\.\d+.*$/

notifications:
email: true

在travis管理页面需要配置API_KEYDOCKER_USERNAMEDOCKER_PASSWORD

参考

  1. Maven CI Friendly Versions
  2. spring-boot-docker
  3. docs.travis-ci.com

基于spring security oauth2 client最佳实践

开篇语

最近很少写文章,一个是确实是很忙,另外一个原因是没有什么深度的技术文章可写。之前写blog的原因是为了技术存档,便于自己某天需要的时候再去看看,另外是总结一下。这段时间不太想写种水文,这篇文章同样不是什么深度性的文章,不过确实困扰了我超过3天时间,网络上很多文章都没能解决我的问题,基本上大家是介绍整个oauth,体系很大,文章却写的不全,要么就是方案很复杂(有点追求,不想采用),对我几乎无帮助。

按说官网文档应该够全了,但是对于一个不熟悉spring security的人,想要快速入手,还是很难,文档我就看了很久也没有找到自己想要的。官方的demo局限于github,google。我想实现的是自定义的oauth2登录。

当我解决了以后,发现别的小伙伴也有类似的疑惑。索性就写下来,只是技巧,写最少的代码,最优雅的完成自己想要的功能。本篇文章不讲解oauth认证基本知识。

实践

1.引入相应的依赖包

1
2
3
4
5
6
7
8
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-security</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-oauth2-client</artifactId>
</dependency>

2.参数配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
spring.security.oauth2.client.provider.customer.authorization-uri=http://xxxxxxxx/oauth2/v1/authorize
spring.security.oauth2.client.provider.customer.token-uri=http://xxxxxxxx/oauth2/v1/token
spring.security.oauth2.client.provider.customer.user-info-uri=http://xxxxxxxx/oauth2/v1/user-info
spring.security.oauth2.client.provider.customer.user-info-authentication-method=header
spring.security.oauth2.client.provider.customer.user-name-attribute=name

spring.security.oauth2.client.registration.app.client-id=xxxxxxxxxxx
spring.security.oauth2.client.registration.app.client-secret=xxxxxxxxxxx
spring.security.oauth2.client.registration.app.client-name=Client for user scope
spring.security.oauth2.client.registration.app.provider=customer
spring.security.oauth2.client.registration.app.scope=user
spring.security.oauth2.client.registration.app.redirect-uri={baseUrl}/login/oauth2/code/{registrationId}
spring.security.oauth2.client.registration.app.client-authentication-method=basic
spring.security.oauth2.client.registration.app.authorization-grant-type=authorization_code

3.代码实现

在页面登录按钮,添加跳转地址/oauth2/authorization/app 这个是默认的地址,可以通过配置文件修改

新建Oauth2LoginSecurityConfig 实现如下功能既可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Configuration
public class Oauth2LoginSecurityConfig extends WebSecurityConfigurerAdapter {

@Override
protected void configure(HttpSecurity http) throws Exception {
http.csrf().disable()
.authorizeRequests(a -> a
.antMatchers("/**").permitAll().anyRequest().authenticated()
)
.exceptionHandling(e -> e
.authenticationEntryPoint(new HttpStatusEntryPoint(HttpStatus.UNAUTHORIZED))
)
.formLogin()
.loginPage("/#/user/login")
.permitAll()
.and()
.logout().permitAll()
.and()
.oauth2Login().userInfoEndpoint().and().successHandler(new AuthenticationSuccessHandler() {
@Override
public void onAuthenticationSuccess(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Authentication authentication) throws IOException, ServletException {
OAuth2User oAuth2User = (OAuth2User) authentication.getPrincipal();
Map<String, Object> attributes = oAuth2User.getAttributes();
// ....登录成功以后,既可获取用户信息

// ..跳转到成功页面
httpServletResponse.sendRedirect("/#/home");
}
});
}

}

上面核心代码为.oauth2Login().userInfoEndpoint().and().successHandler() 这个完成获取code,根据code获取token,根据token获取user信息。熟悉oauth2 code的认证流程,应该就能明白。希望能给你带来一点点帮助。

后话

上面没有什么说的,不过最近越来越发现在职场上需要一种能力,快速学习的能力,对未知事物有非方向上认知错误。也就是说当我们对一个技术架构,或者框架不熟的情况下,一些基本技术常识往往就发挥很大的作用。在开发的路上少追求技巧性的东西,多追究一些理论和本质。这样对快速学习一个技术会有很大的帮助。

前言

Spring Boot应用监控有很多方案,例如elastic APM,Prometheus等。各有特色,本次实践采用方案:Micrometer+Prometheus+Grafana

选择Micrometer最重要的原因是他的设计很灵活,并且和spring boot 2.x集成度很高。对于jvm的监控很容易集成,难度很小。本次实践包含jvm监控和业务性能指标监控。

环境准备

  1. 搭建promethues

    1
    2
    3
    4
    5
    docker run \
    -p 9090:9090 \
    --name prometheus
    -v /tmp/prometheus.yml:/etc/prometheus/prometheus.yml \
    prom/prometheus
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    global:
    scrape_interval: 15s # By default, scrape targets every 15 seconds.
    evaluation_interval: 15s # By default, scrape targets every 15 seconds.
    # scrape_timeout is set to the global default (10s).
    # Load rules once and periodically evaluate them according to the global 'evaluation_interval'.
    rule_files:
    # - "first.rules"
    # - "second.rules"

    # A scrape configuration containing exactly one endpoint to scrape:
    # Here it's Prometheus itself.
    scrape_configs:
    # The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
    - job_name: 'demo_platform'

    # Override the global default and scrape targets from this job every 5 seconds.
    scrape_interval: 5s

    metrics_path: '/actuator/prometheus'
    # scheme defaults to 'http'.

    static_configs:
    - targets: ['127.0.0.1:8080']
  2. 搭建grafana

    1
    docker run -d -p 3000:3000 --name grafana grafana/grafana:6.5.0

Micrometer简介

Micrometer(译:千分尺) Micrometer provides a simple facade over the instrumentation clients for the most popular monitoring systems. 翻译过来大概就它提供一个门面,类似SLF4j。支持将数据写入到很多监控系统,不过我谷歌下来,很多都是后端接入的是Prometheus.

Micrometer提供了与供应商无关的接口,包括 timers(计时器)gauges(量规)counters(计数器)distribution summaries(分布式摘要)long task timers(长任务定时器)。它具有维度数据模型,当与维度监视系统结合使用时,可以高效地访问特定的命名度量,并能够跨维度深入研究。

支持的监控系统:AppOptics , Azure Monitor , Netflix Atlas , CloudWatch , Datadog , Dynatrace , Elastic , Ganglia , Graphite , Humio , Influx/Telegraf , JMX , KairosDB , New Relic , Prometheus , SignalFx , Google Stackdriver , StatsD , Wavefront

Micrometer提供的度量类库

Meter是指一组用于收集应用中的度量数据的接口,Meter单词可以翻译为”米”或者”千分尺”,但是显然听起来都不是很合理,因此下文直接叫Meter,理解它为度量接口即可。Meter是由MeterRegistry创建和保存的,可以理解MeterRegistryMeter的工厂和缓存中心,一般而言每个JVM应用在使用Micrometer的时候必须创建一个MeterRegistry的具体实现。Micrometer中,Meter的具体类型包括:TimerCounterGaugeDistributionSummaryLongTaskTimerFunctionCounterFunctionTimerTimeGauge。一个Meter具体类型需要通过名字和Tag(这里指的是Micrometer提供的Tag接口)作为它的唯一标识,这样做的好处是可以使用名字进行标记,通过不同的Tag去区分多种维度进行数据统计。

Spring Boot集成

与spring boot 集成,这里的metric主要是由spring actuator 提供

安装

1
2
3
4
5
6
7
8
9
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>

配置

1
2
3
4
5
6
7
8
9
10
11
12
13
management:
endpoint:
health:
enabled: false
endpoints:
web:
exposure:
include: '*'
exclude: env,beans
metrics:
enable:
http: false
hikaricp: false

这里有几个注意的点management.endpoint.health.enabled只是为了禁用spring 默认的健康检查,非必须。exclude: env,beans也不需要配置,只是在我项目中为了减少导出的metric。同理management.metrics.enable也是为了减少收集的数据,使用方法为你定义指标的前缀。

只有management.endpoints.web.exposure.include为必须的,这里也只是为了导出/actuator/prometheus,通过该地址可以访问到响应的metric信息。

可视化

访问 http://localhost:8080/actuator/prometheus 即可看到响应的metric信息。

在grafana中中导入JVM (Micrometer)

即可看到如下效果:

自定义业务性能监控

因为系统遗留监控代码的原因,这里采用的是全局静态方法实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
protected static Iterable<Tag> tags(String service, String category, String method) {
return Tags.of("service", service, "category", category, "method", method);
}

protected static Iterable<Tag> tags(String service, String category) {
return Tags.of("service", service, "category", category);
}

public static void controllerMetric(String service, MonitorMetric.MonitorOperationType type, String method, long time) {
try {
Metrics.counter(Constants.HTTP_REQUESTS_TOTAL, tags(service, type.name(), method)).increment();
Metrics.timer(Constants.REQUESTS_LATENCY, tags(service, type.name())).record(Duration.ofMillis(time));
} catch (Exception e) {
e.printStackTrace();
}
}

解释一下,这里可以统计出请求数和请求延迟。

对于每秒请求数据量,可以使用increase(http_requests_total{job=~"$job",instance=~"$instance"}[1m])

对于平均请求延迟,可以使用rate(timer_sum[1m])/rate(timer_count[1m])

对于Throughput 可以使用rate(timer_count[1m])

使用中的困惑

问题

Percentile histogramsDistribution summaries性能损失还无法确定,不过查看PrometheusTimer,结合测试,还是有一定的性能损失,不过这里未深入研究。

全局使用一些开发建议

可以在定义静态方法类,初始化的时候做一点配置,registry可以使用spring 注入进来例如:

1
2
@Autowired 
MeterRegistry registry;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public MonitorMetric(MeterRegistry registry) {
registry.config().meterFilter(
new MeterFilter() {
@Override
public DistributionStatisticConfig configure(Meter.Id id, DistributionStatisticConfig config) {
if (id.getName().startsWith("requests_latency")) {
return DistributionStatisticConfig.builder()
.percentiles(0.5, 0.75, 0.9)
.sla(1)
.expiry(Duration.ofMinutes(1))
.minimumExpectedValue(1L)
.build()
.merge(config);
}
return config;
}
});
Metrics.addRegistry(registry);
}

参考

  1. 使用 Micrometer 记录 Java 应用性能指标
  2. Micrometer 快速入门
  3. JVM应用度量框架Micrometer实战
  4. Micrometer Prometheus

一站式Kafka平台KafkaCenter-开源啦

Important: https://github.com/xaecbd/KafkaCenter

前言

经过一年的不断打磨,在团队成员的共同努力下,终于能以真实的面貌呈现在大家的面前,很开心,很激动。开源软件,只是为了和大家交个朋友,喜欢的话,star,star,star,重要的事情说三遍!

之前做过Kafka 平台化的一点经验分享,以至于很多小伙伴问了,这个东西有没有开源,在团队成员的共同努力下,欢迎感兴趣的同学加入我们,做点感兴趣的事。

KafkaCenter是什么?

KafkaCenter是Kafka 集群管理和维护,生产/消费监控,生态组件使用的统一一站式平台。

KafkaCenter 解决了什么问题

在给大家说我们解决什么问题之前,先说说在没有KafkaCenter之前我们的面临的问题。

我们面临的问题

  • 创建topic,人工处理化
  • 相关kafka运维,监控孤岛化
  • 现有消费监控工具监控不准确
  • 无法拿到Kafka 集群的summay信息
  • 无法快速知晓集群健康状态
  • 无法知晓业务对team kafka使用情况
  • kafka管理,监控工具稀少,没有一个好的工具我们直接可以使用
  • 无法快速查询topic消息

Kafka Center解决了哪些问题

  • 统一: 一个平台,一站式包含自助,管理,监控,运维,使用一体化。
  • 流程化: 创建topic流程化,做到对topic使用全生命周期管理。
  • 复用: 平台支持接入多个集群,复用性很高。
  • 成本: 只用部署一套程序,节省机器资源。降低运维成本,高效运维。
  • 生态: 目前已经接入connect,ksql。
  • 便捷: 提供便捷工具,让无需有kafka使用经验的人,都可以方便生产、消费消息。
  • 全局: 可以站在不同的维度查看目前kafka使用情况
  • 权限: 完善的权限设计,减少风险漏洞。

功能模块介绍

  • Home->
    查看平台管理的Kafka Cluster集群信息及监控信息
  • Topic->
    用户可以在此模块查看自己的Topic,发起申请新建Topic,同时可以对Topic进行生产消费测试。
  • Monitor->
    用户可以在此模块中可以查看Topic的生产以及消费情况,同时可以针对消费延迟情况设置预警信息。
  • Kafka Connect->
    实现用户快速创建自己的Connect Job,并对自己的Connect进行维护。
  • KSQL->
    实现用户快速创建自己的KSQL Job,并对自己的Job进行维护。
  • Approve->
    此模块主要用于当普通用户申请创建Topic,管理员进行审批操作。
  • Setting->
    此模块主要功能为管理员维护User、Team以及kafka cluster信息
  • Kafka Manager->
    此模块用于管理员对集群的正常维护操作。

说了这么多,还是给大家看看主要系统截图吧!








成为一个更好的架构师

本文翻译于SoftwareArchitect,原创翻译,有删减,介意请查看原文,转载请联系我。

扫码关注我

多年前,有人问我:”如何成为软件架构师?”。我认为需要必要的技能,经验以及积累知识所需的时间和奉献精神。

1. 内容

  • 软件架构师的定义
  • 软件架构的级别
  • 软件架构师的常规工作内容
  • 软件架构师的重要技能
  • 架构师技术路线图

2. 软件架构师的定义

在开始之前,让我们先看看这个定义。

  • 软件架构师是一位软件专家,他可以进行高层设计选择并决定技术标准,包括软件编码标准,工具和平台。首席专家被称为首席架构师。(来源:Wikipedia: Software Architect)

3. 软件架构的级别

架构可以被抽象成很多个级别,不同的人会有不同的分发,这里我更喜欢分成三个级别:

  • 应用程序级别:最低的体系结构级别。专注于一个单一的应用程序。非常详细的底层设计。通常在一个开发团队中进行沟通
  • 解决方案级别:中级架构。专注于满足业务需求(业务解决方案)的一个或多个应用程序。有一些高层次,但主要是低层次的设计。多个开发团队之间的沟通。
  • 企业级别:最高级别的体系结构。专注于多种解决方案。高层次的抽象设计,需要解决方案或应用程序架构师对其进行详细说明。整个组织的沟通,更多详情见链接

4. 软件架构师的常规工作内容

要了解架构师所需的必要技能,我们首先需要了解常规工作内容。我认为以下(非最终)清单包含最重要的工作内容:

  • 定义和决定开发技术和平台
  • 定义开发标准,例如编码标准,工具,审查流程,测试方法等。
  • 支持识别和理解业务需求
  • 设计系统并根据需求做出决策
  • 记录并传达架构定义,设计和决策
  • 检查并回顾架构和代码,例如,检查定义的模式和编码标准是否正确实施
  • 与其他架构师和利益相关者合作
  • 指导并咨询开发人员
  • 将高级设计改进为低级设计并详细说明。注意:架构是一项连续的工作,尤其是在将其应用于敏捷软件开发中时。因此,这些工作一遍又一遍地进行。

5. 软件架构师的重要技能

为了支撑常规工作内容,需要特定技能。根据我的经验,阅读书籍和讨论,我们可以将其归结为每个软件架构师应具备的以下 10 种技能:

1
设计,决定,简化,编码,文档,沟通,估计,平衡,咨询,市场

(1)设计

什么是好的设计?这可能是最重要和最具挑战性的问题。我将在理论和实践之间进行区分。以我的经验,两者的结合是最有价值的。让我们从理论开始:

  • 了解基本的设计模式:模式是架构师开发可维护系统所需的最重要工具之一。使用模式,您可以重复使用设计,以通过可靠的解决方案解决常见问题。由 GoF 编写的《设计模式:可重用的面向对象软件的要素》一书是所有从事软件开发的人必读的书。尽管该模式已发布 20 多年,但它们仍然是现代软件体系结构的基础。例如,本书描述了模型-视图-控制器(MVC)模式,该模式在许多领域都得到了应用,或者是更新模式的基础,例如 MVVM。

  • 深入研究模式和反模式:如果您已经了解所有基本的 GoF 模式,则可以使用更多的软件设计模式来扩展您的知识。或更深入地研究您感兴趣的领域。我最喜欢的应用程序集成之一是 Gregor Hohpe 撰写的“ Enterprise Integration Patterns”一书。无论两个应用程序需要交换数据,无论是来自某些旧系统的旧式文件交换还是现代微服务体系结构的交换,这本书都适用于各个领域。

  • 了解质量度量:定义架构不是终点。它有定义,应用和控制指南和编码标准的原因。您出于质量和非功能性要求而这样做。您想要一个可维护,可靠,适应性强,安全,可测试,可伸缩,可用等的系统。要实现所有这些质量属性,一件事情就是应用良好的架构工作。您可以开始更多地了解维基百科上的质量度量。理论很重要。如果您不想成为象牙塔架构师,那么实践同样重要,甚至更为重要。

  • 尝试并了解不同的技术堆栈:如果您想成为一名更好的架构师,我认为这是最重要的活动。试用(新)技术堆栈,并了解它们的兴衰。不同或新技术具有不同的设计方面和模式。您很可能从翻阅抽象幻灯片中不会学到任何东西,而是自己尝试一下,并感到痛苦或缓解。架构师不仅应该具有广泛的知识,而且在某些领域还应具有深厚的知识。掌握所有技术堆栈并不重要,但要对您所在地区的最重要知识有深入的了解。另外,请尝试不使用您所处领域的技术,例如,如果您深入 SAP R / 3,则还应该尝试 JavaScript,反之亦然。尽管如此,双方仍会对 SAP S / 4 Hana 的最新进展感到惊讶。例如,您可以自己尝试,然后免费在 openSAP 上课程。好奇并尝试新事物。还可以尝试一些您几年前不喜欢的东西。

  • 分析和理解应用模式:查看任何当前框架,例如 Angular。您可以在实践中研究很多模式,例如“可观察物”。尝试了解它如何在框架中应用,为什么要这样做。而且,如果您真的很专心,请更深入地研究代码并了解其实现方式。

  • 充满好奇并作为一个用户

(2)决定

架构师需要能够做出决定并指导项目或整个组织朝正确的方向发展。

  • 知道什么是重要的:不要浪费时间进行无关紧要的决定或活动。了解重要的事情。据我所知,没有一本书包含这些信息(如果您知道的话,请告诉我)。我个人最喜欢的是这两个特征,我通常会在评估某些重要事项时考虑这些特征:
    1. 概念完整性:如果您决定以一种方式来做,那就坚持下去,即使有时最好以其他方式去做。通常,这会导致更简单的总体概念,简化可理解性并简化维护
    2. 一致性:例如你定义了应用的命名,不用关心是大写还是小写,但是要做到所有的地方都是一个标准,一个表达方式。
  • 优先排序:某些决定至关重要。如果不及早采取措施,就会需要很多的解决方法,这些措施通常不太可能在以后删除,并且是维护的噩梦,或者更糟的是,开发人员只能停止工作,直到做出决定。在这种情况下,有时最好做出“错误”的决定而不是没有做决定。但是在遇到这种情况之前,请考虑优先考虑即将做出的决定。有不同的方法可以这样做。我建议看一下在敏捷软件开发中广泛使用的加权最短作业优先(WSJF)模型。特别是时间紧迫性和风险降低措施对于评估架构决策的优先级至关重要。
  • 了解自己的能力:不要决定能力之外的事情。这很关键,因为如果不考虑的话,它可能会严重破坏您作为架构师的地位。为避免这种情况,请与您的同伴明确您要承担的责任以及角色的一部分。如果架构师不止一个,那么您应该尊重当前部署的架构级别。作为较低级别的架构师,您最好提出有关高层架构的建议,而不是决策。此外,我建议始终与同伴一起检查关键决策。
  • 评估多个选项:涉及决策时,请始终布置多个选项。在我参与的大多数情况下,都有不止一种可能的(好的)选择。仅选择一个选项在两个方面都是不利的:(1)您似乎没有正确地完成工作,(2)阻碍了做出正确的决定。通过定义度量,可以基于事实而不是直觉来比较选项,例如许可费用或期限。这通常会导致更好和更可持续的决策。此外,可以轻松地将决策出售给不同的利益相关者。此外,如果您没有正确评估选项,则在讨论中可能会遗漏参数。

(3)简化

请记住解决问题的原则 Occam’s Razor(它更偏爱简单)。 我将原理解释为:如果您对问题的假设太多而无法解决,则可能会出错或导致不必要的复杂解决方案。 应该减少(简化)假设以得出好的解决方案。

  • 摇动解决方案:为了简化解决方案,通常有助于“摇动”解决方案并从不同位置查看它们。尝试通过自上而下和自下而上的方式来塑造解决方案。如果您有数据流或流程,请首先从左到右,再从右到左思考。提出以下问题:“在完美的世界中您的解决方案会发生什么?”或:“ X 公司/人会做什么?”(X 可能不是您的竞争对手,而是 GAFA 公司之一。)这两个问题都迫使您减少 Occam’s Razor 建议的假设。
  • 退后一步:经过长时间的深入讨论,通常会得出高度复杂的涂鸦。您永远都不应将这些视为最终结果。退后一步:再次查看全局(抽象级别)。还是有意义吗?然后再次在抽象级别上进行遍历并进行重构。有时,它有助于停止讨论并在第二天继续。至少我的大脑需要一些时间来处理并提出更好,更优雅,更简单的解决方案。
  • 分而治之:通过将问题分成更小的部分来简化问题。然后独立解决它们。然后验证小块是否匹配。退后一步以查看总体情况。
  • 重构不是邪恶的:如果找不到更好的主意,那么从更复杂的解决方案开始是完全可以的。如果解决方案遇到麻烦,您可以稍后重新考虑解决方案并应用您的学习。重构不是邪恶的。但是在开始重构之前,请记住要进行以下工作:(1)进行足够的自动化测试,以确保系统的正确功能;(2)从利益相关者获取支持。要了解有关重构的更多信息,建议阅读“Refactoring. Improving the Design of Existing Code”,作者是 Martin Fowler。

(4)编码

即使作为企业架构师(最抽象的体系结构级别),您仍然应该知道开发人员的日常工作。而且,如果您不了解如何完成此操作,则可能会遇到两个主要问题:

  1. 开发人员不会接受您的说法。
  2. 您不了解开发人员的挑战和需求。
  • 有一个附带项目:此项目的目的是尝试新技术和工具,以了解当今和未来的开发方式。经验是观察,情感和假设的结合(Kurt Schneider 的“软件工程中的经验和知识管理”)。阅读教程或一些利弊是好的。但这仅仅是“书籍知识”。仅当您自己尝试事物时,您才能体验到情绪并建立关于事物好坏的假设。而且,您使用某项技术的时间越长,您的假设就会越好。这将帮助您在日常工作中做出更好的决定。当我开始编程时,我没有代码完成,只有一些实用程序库可以加快开发速度。显然,在这种背景下,我今天会做出错误的决定。今天,我们拥有大量的编程语言,框架,工具,过程和实践。只有您对主要趋势有一定的经验和粗略的了解,才能参与对话并引导开发朝正确的方向发展。
  • 找到正确的事物进行尝试:您无法尝试所有事物。这根本是不可能的。您需要一种更有条理的方法。我最近发现的一种来源是 ThoughtWorks 的 Technology Radar。他们将技术,工具,平台,语言和框架分为四类:采用,试用,评估和保留。 “采用”表示“强烈准备为企业使用做好准备”,“试用”表示“企业应该在一个可以处理风险的项目中进行尝试”,“评估”表示“研究它如何影响您的企业”,“持有”表示“谨慎处理”。通过这种分类,更容易获得新事物的概述及其准备情况,以更好地评估下一步要探索的趋势。

(5)文档

架构文档有时或多或少地很重要。 重要文档是例如体系结构决策或代码准则。 编码开始之前通常需要初始文档,并且需要不断完善。 其他文档可以自动生成,因为代码也可以是文档,例如 UML 类图。

  • 干净的代码:如果操作正确,代码是最好的文档。一个好的架构师应该能够区分好的代码和坏的代码。罗伯特·C·马丁(Robert C. Martin)所著的“清洁代码”一书是了解更多关于好坏代码的宝贵资源。
  • 在可能的情况下生成文档:系统日新月异,很难更新文档。无论是关于 API 还是以 CMDB 形式出现的系统格局:基础信息经常变化太快而无法手动更新相应的文档。示例:对于 API,如果您是模型驱动的,则可以基于定义文件自动生成文档,也可以直接从源代码生成文档。为此,存在许多工具,我认为 Swagger 和 RAML 是学习更多内容的一个很好的起点。
  • 尽可能多地,尽可能少地进行:无论您需要记录什么文档(例如决策文件),都一次尝试仅关注一件事,并且仅包含关于这件事的必要信息。大量的文档很难阅读和理解。附加信息应存储在附录中。特别是对于决策文件,讲一个有说服力的故事而不是仅仅发表大量论据,更为重要。此外,这为您和您的同事节省了很多时间,而后者需要阅读。看看您过去做过的一些文档(源代码,模型,决策文件等),然后问自己以下问题:“是否包含所有必要的信息才能理解它?”,“确实需要哪些信息,并且可以省略吗?”和“文档中是否有红线?”。
  • 了解有关架构框架的更多信息:该点也可以应用于所有其他“技术”点。我把它放在这里,是因为 TOGAF 或 Zachmann 之类的框架正在提供“工具”,这些工具在文档站点上感觉很沉重,尽管它们的附加值并不限于文档。在这样的框架中获得认证可以教会您更系统地解决体系结构。

(6)沟通

根据我的观察,这是最被低估的技能之一。如果您的设计精湛,却无法传达您的想法,那么您的想法可能会受到较小的影响,甚至无法成功。

  • 了解如何交表达您的想法:在董事会或活动挂图上进行协作时,必须了解如何正确使用它,以构筑您和您的同行的思想。我发现《 UZMO-用笔思考》是提高我在这一领域技能的好资源。作为架构师,您通常不仅会参加会议,而且通常需要主持会议并主持会议。
  • 与大群人进行演讲:向小群或大群人展示您的想法对您来说应该是可行的。如果您对此感到不舒服,请开始向您最好的朋友介绍。慢慢扩大小组。这是您只能通过做和离开自己的舒适区来学习的东西。请耐心等待,此过程可能需要一些时间。
  • 找到正确的沟通水平:不同的利益相关者有不同的兴趣和看法。需要在其级别上对它们进行单独处理。在进行交流之前,请退后一步并检查您要共享的信息是否具有正确的级别,有关抽象性,内容,目标,动机等。示例:开发人员通常对解决方案的很少细节感兴趣,而经理则对此感兴趣。宁愿知道哪个选项可以节省最多的钱。
  • 经常交流:如果没有人知道,一个出色的架构将毫无价值。定期并在每个(组织)级别上分发目标体系结构及其背后的思想。安排与开发人员,建筑师和管理人员的会议,以向他们展示所需或已定义的方式。
  • 保持透明:定期交流只能部分缓解缺少的透明度。您需要使决策背后的原因透明化。特别是,如果人们不参与决策过程,则很难理解和遵循其背后的决策和理由。
  • 随时准备做一个演讲:总是会有人提出问题,而您想立即给出正确的答案。尝试始终将最重要的幻灯片放在一个可以显示和解释的合并集中。它为您节省了大量时间,并为您提供安全保护。

(7)估计

  • 了解基本的项目管理原则:作为架构师或首席开发人员,经常会要求您提供估计以实现您的想法:多长时间,花费多少,多少人,哪些技能等?当然,如果您打算引入新的工具或框架,则需要为此类“管理”问题提供答案。最初,您应该能够进行粗略的估算,例如几天,几个月或几年。并且不要忘记,这不仅涉及实现,还有更多活动需要考虑,例如需求工程,测试和修复错误。因此,您应该了解所使用的软件开发过程的活动。您可以应用以获得更好的估计的一件事是使用过去的数据并从中得出您的预测。如果您没有过去的数据,也可以尝试使用 Barry W. Boehm 的 COCOMO 之类的方法。如果您部署在敏捷项目中,请学习如何进行估算和正确计划:Mike Cohn 撰写的《敏捷估算和规划》一书对此领域提供了扎实的概述。
  • 评估“未知”架构:作为架构师,您还应该能够评估架构在当前或将来上下文中的适用性。这不是一件容易的事,但是您可以通过准备一系列常见于每种架构的问题来为它做准备。它不仅涉及架构,还涉及系统的管理方式,因为这也使您了解质量。我建议始终准备一些问题并准备使用。一些一般性问题的想法:
    1. 设计实践:体系结构遵循哪些模式?因此,它们是否正确使用?设计遵循红线还是增长不受控制?是否有清晰的结构和关注点分离?
    2. 开发实践:制定并遵循了代码准则?代码如何版本化?部署实践?
    3. 质量保证:测试自动化范围?静态代码分析到位并取得良好结果?同行评论到位?
    4. 安全性:有哪些安全概念?内置安全性?渗透测试或自动安全分析工具是否到位并经常使用?

(8)平衡

  • 质量是有代价的:前面我谈到了质量和非功能性需求。如果您过度使用体系结构,则会增加成本并可能降低开发速度。您需要平衡架构和功能需求。应避免过度设计。
  • 解决矛盾的目标:矛盾的目标的一个典型示例是短期和长期目标。项目通常倾向于构建最简单的解决方案,而架构师则具有长远的眼光。通常,简单的解决方案不适合长期解决方案,并且有被以后抛弃的风险(降低成本)。为了避免实现错误的方向,需要考虑两点:
    1. 开发人员和企业需要了解长期愿景及其收益,以适应其解决方案
    2. 负责预算的经理需要参与以了解财务影响。不必直接将 100%的远景图放置在适当的位置,但是发达的图块应该适合其中。
  • 冲突管理:架构师通常是具有不同背景的多个小组之间的粘合剂。这可能会导致不同级别的通信发生冲突。为了找到一个能够反映长期战略目标的平衡解决方案,通常架构师的作用就是帮助克服冲突。关于传播理论的起点是舒尔茨·冯·图恩的“四耳模型”。基于此模型,可以显示并推论很多。但是,该理论需要一些实践,在交流研讨会上应该有经验。

(9)咨询

在咨询和辅导方面,积极主动可能是您最好的选择。 如果询问您,通常为时已晚。 而您想要避免在项目现场上进行清理。 您需要以某种方式预见接下来的几周,几个月甚至几年的时间,并为下一步做好准备。

  • 有远见:如果您部署一个项目,无论是传统的瀑布式方法还是敏捷方法,您始终需要对要实现的中长期目标有一个远见。这不是一个详细的概念,而是一个通往每个人都可以工作的路线图。由于您无法一次完成所有工作(这是一段旅程),因此我更喜欢使用成熟度模型。它们给出了易于使用的清晰结构,并且每次都给出了当前的进度状态。对于不同的方面,我使用不同的模型,例如开发实践或持续交付。成熟度模型中的每个级别都有明确的要求,这些要求遵循 SMART 准则,以便轻松衡量是否已达到要求。我发现一个很好的例子是持续交付。
  • 建立实践社区(CoP):在共同兴趣小组之间交流经验和知识有助于分发思想和标准化方法。例如,您可以每三个月左右将所有 JavaScript 开发人员和架构师聚集在一个房间中,讨论过去和当前的挑战以及如何解决它们或采用新的方法论和方法。架构师可以共享,讨论和调整其愿景,开发人员可以共享经验并向同行学习。这样的回合不仅可以为企业带来极大的好处,也可以为个人本身带来极大的好处,因为它有助于建立更强大的网络并传播思想。还可以查看 SAFe 框架中的文章实践社区,该文章在敏捷环境中解释了 CoP 概念。
  • 进行公开会议:误解或模棱两可的原因之一是缺乏沟通。阻止固定时间段,例如每周 30 分钟,用于与同行交流热门话题。本届会议没有任何议程可以讨论。尝试当场解决小事。安排对更复杂主题的后续行动。

(10)市场

您的想法很棒,您已经很好地传达了他们的想法,但仍然没人愿意遵循吗?那么您可能缺乏营销技巧。

  • 激励并说服:公司如何说服您购买产品?他们证明了它的价值和好处。但不仅仅是 5 点。他们包装得很好,并使其尽可能容易消化。

    1. 原型:显示您的想法的原型。有很多用于创建原型的工具。对于喜欢 SAP 的企业,请访问 build.me,在其中您可以快速轻松地创建外观漂亮且可单击的 UI5 应用程序。
    2. 使用视频:除了“无聊的幻灯片”,您还可以显示一个视频,该视频可以演示您的想法或至少是方向。但是请不要过度营销:从长远来看,内容为王。如果您的话不正确,从长远来看,这将损害您的声誉。
  • 为您的想法而奋斗并坚持不懈:人们有时不喜欢您的想法,或者他们懒得遵循。如果您真的对自己的想法深信不疑,则应不断追求并“奋斗”。有时这是必要的。具有长期目标的体系结构决策通常不是最容易的:开发人员不喜欢它们,因为它们的开发更加复杂。经理们不喜欢它们,因为它们在短期内更昂贵。这是您要坚持不懈并进行谈判的工作。

  • 寻找盟友:很难独自建立或执行您的想法,甚至是不可能的。尝试找到可以支持和说服他人的盟友。使用您的网络。如果还没有,请立即开始构建。您可以从与(思想开放的)同事讨论您的想法开始。如果他们喜欢它,或者至少喜欢它的一部分,那么如果别人提出来,他们很可能会支持您的想法(“ X 的想法很有趣。”)。如果他们不喜欢,问为什么:也许您错过了什么?还是您的故事不够令人信服?下一步是找到具有决定权的盟友。要求开放的讨论。如果您担心讨论,请记住,有时您需要离开舒适区。

  • 重复,相信它:“ […]研究表明,反复接触某个观点会使人们相信该观点更为普遍,即使该观点仅来自一个人。”(来源:《金融品牌》)经常发布一些消息,这可以帮助说服人们。但请注意:从我的角度来看,应该明智地使用这种策略,因为它可能适得其反,成为糟糕的营销技巧。

6. 架构师技术路线图

7. 参考

  1. SoftwareArchitect

Newegg Kafka 平台化的一点经验

本文基于IT技术圈(西安)10月份线下沙龙整理而来,略有删减。

扫码关注我

1. 前言

  • Newegg Kafka 使用规模
  • Newegg Kafka 使用场景
  • Newegg Kafka 平台化KafkaCenter
  • KafkaCenter 解决了什么问题
  • KafkaCenter 惊鸿一瞥
  • KafkaCenter 技术上的探索

2. Newegg Kafka 使用规模

我们是一家小公司,对Kafka的使用有限,这里我就放出我们系统的一个统计吧,数据截止到2019-10-30,仅统计目前已经接入Kafka平台管理的产线环境数据

每天指标如下:

MessagesIn BytesIn BytesOut
1.9b 2.26TB 12.23TB

3. Newegg Kafka 使用场景

3.1 Kafka 使用场景

  • 异步处理
  • 日常系统解耦
  • 削峰
  • 提速
  • 广播

3.2 Newegg Kafka 使用场景

  • 异构数据同步(redis/hbase/sqlserver/cassandra/solr/es)
  • 网站流量数据/日志数据
  • 流式处理

4. Newegg Kafka 平台化

这里最要介绍两个部分,一个是kafka的监控体系,一个是平台化门户KafkaCenter

4.1 集群监控告警体系

集群监控告警体系

图片3
图片4
图片5
图片6

4.2 KafkaCenter(面向用户+运维的)

Kafka Center是一个kafka治理平台,是EC Bigdata Team多年kafka使用经验的落地实践,整合集群管理,集群运维,生产监控,消费监控,周边生态等统一一站式解决方案。
图片7

kafkaCenter

5. KafkaCenter 解决了什么问题

5.1 我们面临的问题

  • 创建topic,人工处理化
  • 相关kafka运维,监控孤岛化
  • 现有消费监控工具监控不准确
  • 无法拿到Kafka 集群的summay信息
  • 无法快速知晓集群健康状态
  • 无法知晓业务对team kafka使用情况
  • kafka管理,监控工具稀少,没有一个好的工具我们直接可以使用
  • 无法快速查询topic消息

5.2 Kafka Center解决了哪些问题

  • 统一: 一个平台,一站式包含自助,管理,监控,运维,使用一体化。
  • 流程化: 创建topic流程化,做到对topic使用全生命周期管理。
  • 复用: 平台支持接入多个集群,复用性很高。
  • 成本: 只用部署一套程序,节省机器资源。降低运维成本,高效运维。
  • 生态: 目前已经接入connect,未来即将接入ksql。
  • 便捷: 提供便捷工具,让无需有kafka使用经验的人,都可以方便生产、消费消息。
  • 全局: 可以站在不同的维度查看目前kafka使用情况
  • 权限: 完善的权限设计,减少风险漏洞。

5. KafkaCenter 惊鸿一瞥

功能模块图
核心功能预览

图片10
图片11
图片12
图片13
图片14
图片15
图片16

6. KafkaCenter 技术上的探索

在实现功能的基础外,我们还做了更多工程与技术上的的探索,这里就做些删减,如果想了解更多的内容,可以私信我。

  • Kafka消费监控算法
  • 前后端技术栈完全分离
  • CI/CD持续集成与发布
  • 跨数据中心监控解决方案

通常使用的kafka的用户都关注与消费延迟,对于延迟Lag的计算,是很多用户关心的,这里就简单说一下如何计算Lag.

在计算Lag之前先普及几个基本常识

LEO(LogEndOffset): 这里说的和官网说的LEO有点区别,主要是指对consumer可见的offset.即HW(High Watermark)

CURRENT-OFFSET: consumer消费到的具体位移
知道以上信息后,可知Lag=LEO-CURRENT-OFFSET。计算出来的值即为消费延迟情况。

6.1 Kafka消费监控算法

6.1.1 broker消费方式 offset 获取

实现思路

  1. 根据topic 获取消费该topic的group
  2. 通过使用KafkaAdminClient的describeConsumerGroups读取broker上指定group和topic的消费情况,可以获取到clientId,CURRENT-OFFSET,patition,host等
  3. 通过consumer获取LogEndOffset(可见offset)
  4. 将2与3处信息合并,计算Lag

6.1.2 zk消费方式 offset 获取

实现思路

  1. 根据topic 获取消费该topic的group
  2. 读取zookeeper上指定group和topic的消费情况,可以获取到clientId,CURRENT-OFFSET,patition。
  3. 通过consumer获取LogEndOffset(可见offset)
  4. 将2与3处信息合并,计算Lag

6.2 前后端技术栈完全分离

  • 服务端Springboot
  • 前端icework(React完整解决方案)

6.3 CI/CD持续集成与发布

6.4 跨数据中心监控解决方案

部署架构

Window下Docker Desktop搭建 Kubernetes

前言

本节主要讲解如何启用Kubernetes,以及如何搭建Kubernetes Dashboard。如果排除掉网络原因,本文没有任何意思,因为众所周知的原因,谷歌资源被墙,所以才存在搭建问题,这也就是写本文的原因。

因为不了解Kubernetes能做什么,所以才想着先搭建一个环境,玩一玩,看看这个到底能做什么。

准备

Docker Desktop 版本:2.1.0.1

支持Kubernetes版本:v1.14.3

查看这个版本很重要,具体查看About Docker Desktop菜单即可知道支持哪个版本的k8s。

首先安装Docker Desktop

安装Docker Desktop步骤略….

安装好Docker Desktop先别启用k8s。

其次拉取镜像

先把需要的镜像拉取下来,可以写个docker-k8s-images.bat,放入以下内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
docker pull registry.cn-hangzhou.aliyuncs.com/google_containers/kube-proxy:v1.14.3
docker tag registry.cn-hangzhou.aliyuncs.com/google_containers/kube-proxy:v1.14.3 k8s.gcr.io/kube-proxy:v1.14.3
docker rmi registry.cn-hangzhou.aliyuncs.com/google_containers/kube-proxy:v1.14.3

docker pull registry.cn-hangzhou.aliyuncs.com/google_containers/kube-scheduler:v1.14.3
docker tag registry.cn-hangzhou.aliyuncs.com/google_containers/kube-scheduler:v1.14.3 k8s.gcr.io/kube-scheduler:v1.14.3
docker rmi registry.cn-hangzhou.aliyuncs.com/google_containers/kube-scheduler:v1.14.3

docker pull registry.cn-hangzhou.aliyuncs.com/google_containers/kube-controller-manager:v1.14.3
docker tag registry.cn-hangzhou.aliyuncs.com/google_containers/kube-controller-manager:v1.14.3 k8s.gcr.io/kube-controller-manager:v1.14.3
docker rmi registry.cn-hangzhou.aliyuncs.com/google_containers/kube-controller-manager:v1.14.3

docker pull registry.cn-hangzhou.aliyuncs.com/google_containers/kube-apiserver:v1.14.3
docker tag registry.cn-hangzhou.aliyuncs.com/google_containers/kube-apiserver:v1.14.3 k8s.gcr.io/kube-apiserver:v1.14.3
docker rmi registry.cn-hangzhou.aliyuncs.com/google_containers/kube-apiserver:v1.14.3

docker pull registry.cn-hangzhou.aliyuncs.com/google_containers/pause:3.1
docker tag registry.cn-hangzhou.aliyuncs.com/google_containers/pause:3.1 k8s.gcr.io/pause:3.1
docker rmi registry.cn-hangzhou.aliyuncs.com/google_containers/pause:3.1

docker pull registry.cn-hangzhou.aliyuncs.com/google_containers/etcd:3.3.10
docker tag registry.cn-hangzhou.aliyuncs.com/google_containers/etcd:3.3.10 k8s.gcr.io/etcd:3.3.10
docker rmi registry.cn-hangzhou.aliyuncs.com/google_containers/etcd:3.3.10

docker pull registry.cn-hangzhou.aliyuncs.com/google_containers/coredns:1.3.1
docker tag registry.cn-hangzhou.aliyuncs.com/google_containers/coredns:1.3.1 k8s.gcr.io/coredns:1.3.1
docker rmi registry.cn-hangzhou.aliyuncs.com/google_containers/coredns:1.3.1


docker pull registry.cn-hangzhou.aliyuncs.com/google_containers/kubernetes-dashboard-amd64:v1.10.1
docker tag registry.cn-hangzhou.aliyuncs.com/google_containers/kubernetes-dashboard-amd64:v1.10.1 k8s.gcr.io/kubernetes-dashboard-amd64:v1.10.1
docker rmi registry.cn-hangzhou.aliyuncs.com/google_containers/kubernetes-dashboard-amd64:v1.10.1

其中kubernetes-dashboard-amd64为Kubernetes Dashboard,不是必须镜像以外,其他都是k8s必须的镜像。

最后启动Kubernetes

在Kubernetes菜单选项里,勾选所有的选项。然后执行kubectl get pods --namespace kube-system查看k8s相关容器是否启动。当启动必须的7个容器以后,再查看Docker Desktop左下角Kubernetes状态即为绿色。

1
2
3
4
5
6
7
8
9
C:\Users\lab>kubectl get pods --namespace kube-system
NAME READY STATUS RESTARTS AGE
coredns-fb8b8dccf-4w2ht 1/1 Running 1 17m
coredns-fb8b8dccf-b5vdv 1/1 Running 1 17m
etcd-docker-desktop 1/1 Running 0 16m
kube-apiserver-docker-desktop 1/1 Running 0 16m
kube-controller-manager-docker-desktop 1/1 Running 0 16m
kube-proxy-7w9lw 1/1 Running 0 17m
kube-scheduler-docker-desktop 1/1 Running 0 16m

搭建Kubernetes Dashboard

步骤1

部署Dashboard ,执行以下命令:

kubectl apply -f https://raw.githubusercontent.com/kubernetes/dashboard/v1.10.1/src/deploy/recommended/kubernetes-dashboard.yaml

注意不同Dashboard选择不同的版本配置文件,这里的地址可以在kubernetes/dashboard/releases获取不同版本文件。

步骤2 Creating sample user

新建dashboard-adminuser.yaml文件,填写如下内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
apiVersion: v1
kind: ServiceAccount
metadata:
name: admin-user
namespace: kube-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: admin-user
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: cluster-admin
subjects:
- kind: ServiceAccount
name: admin-user
namespace: kube-system

步骤3 Bearer Token

步骤2完成,执行kubectl proxy既可以访问Dashboard,但是需要登录。执行如下命令:

kubectl -n kube-system describe secret $(kubectl -n kube-system get secret | grep admin-user | awk '{print $1}')

生成如下结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
Name:         admin-user-token-6gl6l
Namespace: kube-system
Labels: <none>
Annotations: kubernetes.io/service-account.name=admin-user
kubernetes.io/service-account.uid=b16afba9-dfec-11e7-bbb9-901b0e532516

Type: kubernetes.io/service-account-token

Data
====
ca.crt: 1025 bytes
namespace: 11 bytes
token: eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJrdWJlcm5ldGVzL3NlcnZpY2VhY2NvdW50Iiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9uYW1lc3BhY2UiOiJrdWJlLXN5c3RlbSIsImt1YmVybmV0ZXMuaW8vc2VydmljZWFjY291bnQvc2VjcmV0Lm5hbWUiOiJhZG1pbi11c2VyLXRva2VuLTZnbDZsIiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9zZXJ2aWNlLWFjY291bnQubmFtZSI6ImFkbWluLXVzZXIiLCJrdWJlcm5ldGVzLmlvL3NlcnZpY2VhY2NvdW50L3NlcnZpY2UtYWNjb3VudC51aWQiOiJiMTZhZmJhOS1kZmVjLTExZTctYmJiOS05MDFiMGU1MzI1MTYiLCJzdWIiOiJzeXN0ZW06c2VydmljZWFjY291bnQ6a3ViZS1zeXN0ZW06YWRtaW4tdXNlciJ9.M70CU3lbu3PP4OjhFms8PVL5pQKj-jj4RNSLA4YmQfTXpPUuxqXjiTf094_Rzr0fgN_IVX6gC4fiNUL5ynx9KU-lkPfk0HnX8scxfJNzypL039mpGt0bbe1IXKSIRaq_9VW59Xz-yBUhycYcKPO9RM2Qa1Ax29nqNVko4vLn1_1wPqJ6XSq3GYI8anTzV8Fku4jasUwjrws6Cn6_sPEGmL54sq5R4Z5afUtv-mItTmqZZdxnkRqcJLlg2Y8WbCPogErbsaCDJoABQ7ppaqHetwfM_0yMun6ABOQbIwwl8pspJhpplKwyo700OSpvTT9zlBsu-b35lzXGBRHzv5g_RA

现在访问:

1
http://localhost:8001/api/v1/namespaces/kube-system/services/https:kubernetes-dashboard:/proxy/

复制以上生成的token,填入token,即可显示如下页面:

至此k8s部署成功!Enjoy!

参考

  1. 如何成功启动 Docker 自带的 Kubernetes?
  2. kubernetes/dashboard
  3. Creating-sample-user

kafka幂等性和事务使用及实现原理

开篇

在开始这篇之前,先抛出问题,这章解决如下问题:

  1. 如何开启幂等性?
  2. 如何使用事务?
  3. 幂等性的原理
  4. 事务实现原理

正文

Producer 幂等性

Producer 的幂等性指的是当发送同一条消息时,数据在 Server 端只会被持久化一次,数据不丟不重,但是这里的幂等性是有条件的:

  • 只能保证 Producer 在单个会话内不丟不重,如果 Producer 出现意外挂掉再重启是无法保证的(幂等性情况下,是无法获取之前的状态信息,因此是无法做到跨会话级别的不丢不重);
  • 幂等性不能跨多个 Topic-Partition,只能保证单个 partition 内的幂等性,当涉及多个 Topic-Partition 时,这中间的状态并没有同步。

如果需要跨会话、跨多个 topic-partition 的情况,需要使用 Kafka 的事务性来实现。

使用方式:props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

当幂等性开启的时候acks即为all。如果显性的将acks设置为0,-1,那么将会报错Must set acks to all in order to use the idempotent producer. Otherwise we cannot guarantee idempotence.

示例:

1
2
3
4
5
6
7
8
9
10
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
kafkaProducer.send(new ProducerRecord<String, String>("truman_kafka_center", "1", "hello world.")).get();
kafkaProducer.close();

幂等性原理

幂等性是通过两个关键信息保证的,PID(Producer ID)和sequence numbers。

  • PID 用来标识每个producer client
  • sequence numbers 客户端发送的每条消息都会带相应的 sequence number,Server 端就是根据这个值来判断数据是否重复

producer初始化会由server端生成一个PID,然后发送每条信息都包含该PID和sequence number,在server端,是按照partition同样存放一个sequence numbers 信息,通过判断客户端发送过来的sequence number与server端number+1差值来决定数据是否重复或者漏掉。

通常情况下为了保证数据顺序性,我们可以通过max.in.flight.requests.per.connection=1来保证,这个也只是针对单实例。在kafka2.0+版本上,只要开启幂等性,不用设置这个参数也能保证发送数据的顺序性。

为什么要求 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 小于等于5

其实这里,要求 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 小于等于 5 的主要原因是:Server 端的 ProducerStateManager 实例会缓存每个 PID 在每个 Topic-Partition 上发送的最近 5 个batch 数据(这个 5 是写死的,至于为什么是 5,可能跟经验有关,当不设置幂等性时,当这个设置为 5 时,性能相对来说较高,社区是有一个相关测试文档),如果超过 5,ProducerStateManager 就会将最旧的 batch 数据清除。

假设应用将 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 设置为 6,假设发送的请求顺序是 1、2、3、4、5、6,这时候 server 端只能缓存 2、3、4、5、6 请求对应的 batch 数据,这时候假设请求 1 发送失败,需要重试,当重试的请求发送过来后,首先先检查是否为重复的 batch,这时候检查的结果是否,之后会开始 check 其 sequence number 值,这时候只会返回一个 OutOfOrderSequenceException 异常,client 在收到这个异常后,会再次进行重试,直到超过最大重试次数或者超时,这样不但会影响 Producer 性能,还可能给 Server 带来压力(相当于client 狂发错误请求)。

Kafka 事务性

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
//Producer
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactional_id-0");
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
kafkaProducer.initTransactions();
kafkaProducer.beginTransaction();
for (int i = 0; i < 10; i++) {
kafkaProducer.send(new ProducerRecord<String, String>("truman_kafka_center", "key"+i, "hello world.")).get();
}
kafkaProducer.commitTransaction();
kafkaProducer.close();
//Consumer
Properties config = new Properties();
config.put("group.id", "test11");
config.put("bootstrap.servers", "127.0.0.1:9092");
config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(config);
consumer.subscribe(Arrays.asList(TOPIC));
boolean isConsumer = true;
while (isConsumer) {
ConsumerRecords<String, String> records = (ConsumerRecords<String, String>) consumer
.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println("consumer message: key =" + record.key() + " value:" + record.value());
}
}
consumer.close();
}

事务实现原理

(1)查找TransactionCoordinator

通过transaction_id 找到TransactionCoordinator,具体算法是Utils.abs(transaction_id.hashCode %transactionTopicPartitionCount ),获取到partition,再找到该partition的leader,即为TransactionCoordinator。

(2)获取PID

凡是开启幂等性都是需要生成PID(Producer ID),只不过未开启事务的PID可以在任意broker生成,而开启事务只能在TransactionCoordinator节点生成。这里只讲开启事务的情况,Producer Client的initTransactions()方法会向TransactionCoordinator发起InitPidRequest ,这样就能获取PID。这里面还有一些细节问题,这里不探讨,例如transaction_id 之前的事务状态什么的。但需要说明的一点是这里会将 transaction_id 与相应的 TransactionMetadata 持久化到事务日志(_transaction_state)中。

(3)开启事务

Producer调用beginTransaction开始一个事务状态,这里只是在客户端将本地事务状态转移成 IN_TRANSACTION,只有在发送第一条信息后,TransactionCoordinator才会认为该事务已经开启。

(4)Consume-Porcess-Produce Loop

这里说的是一个典型的consume-process-produce场景:

1
2
3
4
5
6
7
8
9
10
11
12
while (true) {
ConsumerRecords records = consumer.poll(Duration.ofMillis(1000));
producer.beginTransaction();
//start
for (ConsumerRecord record : records){
producer.send(producerRecord(“outputTopic1”, record));
producer.send(producerRecord(“outputTopic2”, record));
}
producer.sendOffsetsToTransaction(currentOffsets(consumer), group);
//end
producer.commitTransaction();
}

该阶段主要经历以下几个步骤:

  1. AddPartitionsToTxnRequest
  2. ProduceRequest
  3. AddOffsetsToTxnRequest
  4. TxnOffsetsCommitRequest

关于这里的详细介绍可以查看参考链接,或者直接查看官网文档!

(5)提交或者中断事务

Producer 调用 commitTransaction() 或者 abortTransaction() 方法来 commit 或者 abort 这个事务操作。

基本上经历以下三个步骤,才真正结束事务。

  1. EndTxnRequest
  2. WriteTxnMarkerRquest
  3. Writing the Final Commit or Abort Message

其中EndTxnRequest是在Producer发起的请求,其他阶段都是在TransactionCoordinator端发起完成的。WriteTxnMarkerRquest是发送请求到partition的leader上写入事务结果信息(ControlBatch),第三步主要是在_transaction_state中标记事务的结束。

参考

1.Kafka 事务性之幂等性实现

2.Kafka Exactly-Once 之事务性实现

3KIP-98 - Exactly Once Delivery and Transactional Messaging

Kafka 消息生产及消费原理

开篇

关于客户端生产和消费不在本文中探讨,本文主要集中在Kafka服务器端对消息如何存储和如何读取消息。

本文主要探讨如下问题:

  1. 服务器端接收到消息后如何处理?
  2. 如果我指定了一个offset,Kafka怎么查找到对应的消息?
  3. 如果我指定了一个timestamp,Kafka怎么查找到对应的消息?

正文

服务器端接收到消息处理流程

Kafka Server接受消息处理流程

KafkaApis是Kafka server处理所有请求的入口,在 Kafka 中,每个副本(replica)都会跟日志实例(Log 对象)一一对应,一个副本会对应一个 Log 对象。副本由ReplicaManager管理,对于消息的写入操作在Log与LogSement中进行的。

Log append处理流程

真正的日志写入,还是在 LogSegment 的 append() 方法中完成的,LogSegment 会跟 Kafka 最底层的文件通道、mmap 打交道。数据并不是实时持久化的,mmap只是写入了页缓存,并没有flush进磁盘,当满足if (unflushedMessages >= config.flushInterval) 才会真正写入磁盘。

Consumer 获取消息

指定offset,Kafka怎么查找到对应的消息

  1. 通过文件名前缀数字x找到该绝对offset 对应消息所在文件(log/index)
  2. offset-x为在文件中的相对偏移
  3. 通过相对偏移在index文件找到最近的消息的位置(使用二分查找)
  4. 在log文件从最近位置开始逐条寻找

首先根据offset获取LogSegment,即var segmentEntry = segments.floorEntry(startOffset)segmentEntry 是个抽象的对象,包含log、index,timeindex等对象。

接下来在index中获取position(物理位置),即val startOffsetAndSize = translateOffset(startOffset)

源码为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): LogOffsetPosition = {
//二分查找index文件得到下标,再根据算法计算最近的position物理位置
val mapping = offsetIndex.lookup(offset)
//根据计算的起始位置开始遍历获取准确的position及size
log.searchForOffsetWithSize(offset, max(mapping.position, startingFilePosition))
}
public LogOffsetPosition searchForOffsetWithSize(long targetOffset, int startingPosition) {
for (FileChannelRecordBatch batch : batchesFrom(startingPosition)) {
long offset = batch.lastOffset();
if (offset >= targetOffset)
return new LogOffsetPosition(offset, batch.position(), batch.sizeInBytes());
}
return null;
}

这里说明一下index中根据下标计算偏移量地址与物理地址:物理地址=n*8+4,偏移量地址=n*8

1
2
3
4
5
6
7
private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * entrySize)

private def physical(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * entrySize + 4)

override def parseEntry(buffer: ByteBuffer, n: Int): IndexEntry = {
OffsetPosition(baseOffset + relativeOffset(buffer, n), physical(buffer, n))
}

最后根据position在log中截取相应的message,log.slice(startPosition, fetchSize)

1
2
3
4
5
6
7
8
public FileRecords slice(int position, int size) throws IOException {
//省略校验代码
int end = this.start + position + size;
// handle integer overflow or if end is beyond the end of the file
if (end < 0 || end >= start + sizeInBytes())
end = start + sizeInBytes();
return new FileRecords(file, channel, this.start + position, end, true);
}

看到searchForOffsetWithSize有个疑问,上面代码显示返回给客户端的records是批量的,假如提交的offset是这批次的中间一个,那么返回给Consumer的message是有已经被消费过的信息,我感觉不可能是这样的,查看了server端代码,再未发现删除已消费的message逻辑。

Kafka设计者真的这么蠢??

随后我查看了客户端consumer源码有发现到如下代码:if (record.offset() >= nextFetchOffset)有对大于指定offset消息抛弃的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private Record nextFetchedRecord() {
while (true) {
if (records == null || !records.hasNext()) {
//略
} else {
Record record = records.next();
// skip any records out of range
if (record.offset() >= nextFetchOffset) {
// we only do validation when the message should not be skipped.
maybeEnsureValid(record);

// control records are not returned to the user
if (!currentBatch.isControlBatch()) {
return record;
} else {
// Increment the next fetch offset when we skip a control batch.
nextFetchOffset = record.offset() + 1;
}
}
}
}
}

至此得出结论:为了提高server端的响应速度,没有对批量消息进行解压缩,然后精准返回指定信息,而是在客户端解压消息,然后再抛弃已处理过的message,这样就不会存在重复消费的问题。这个问题纠结了半天,不知是否正确,仅是自己的理解,如果有哪位同学对这里有研究,欢迎指出问题。

指定timestamp,Kafka怎么查找到对应的消息

类似根据offset获取消息,不过中间是从timeindex中获取position,然后遍历对比timestamp,获取相应的消息。

参考

1.Kafka 源码解析之 Server 端如何处理 Produce 请求(十二)

2.Kafka 源码解析之 Server 端如何处理 Fetch 请求(十三)

Kafka 逻辑架构设计

开篇

本文主要探讨如下问题;

  1. Kafka架构设计
  2. Kafka的日志目录结构

正文

Kafka架构设计

kafka为分布式消息系统,由多个broker组成。消息是通过topic来分类的,一个topic下存在多个partition,每个partition又由多个segment构成。

发布订阅者模式

kafka集群架构

主题逻辑结构

Kafka的日志目录结构

0%