JavaEE企业级实战项目 智牛股第四天 NACOS、ceph集群和Netty

JavaEE企业级实战项目 智牛股第四天 NACOS、ceph集群和Netty,第1张

交易平台 - Day 4 学习目标

目标1:Nacos背景与基本原理

目标2:Nacos的使用

目标3:Ceph分布式存储原理

目标4:Ceph部署与使用

目标5:Netty通讯机制

第1章 Nacos的介绍与使用 1. 目标
  • 了解Nacos的基本概念, 功能特性, 整体设计与工作处理机制。
  • 掌握Nacos的工程项目的整合使用
2. 分析
  • Nacos简介
  • Nacos的特性
  • Nacos与其他主流组件的横纵对比
  • Nacos的功能地图
  • Nacos的架构设计
  • 服务设计
  • 工程结构与代码实现
  • 服务启动与功能验证
3. 讲解 3.1 Nacos简介

Nacos是一款服务注册发现,配置和管理的开源组件。在2018年7月对外开源。

Nacos 提供了一组简单易用的特性集,快速实现动态服务发现、服务配置、服务元数据及流量管理。更容易地构建、交付和管理微服务平台。

Nacos官方文档地址

3.2 Nacos特性
  • 服务发现和服务健康监测

Nacos 支持基于 DNS 和基于 RPC 的服务发现。Nacos 提供对服务的实时的健康检查,阻止向不健康的主机或服务实例发送请求。Nacos 支持传输层 (PING 或 TCP)和应用层 (如 HTTP、MySQL、用户自定义)的健康检查。 对于复杂的云环境和网络拓扑环境中(如 VPC、边缘网络等)服务的健康检查,Nacos 提供了 agent 上报模式和服务端主动检测2种健康检查模式。

  • 动态配置服务

Nacos 提供了一个简洁易用的UI 管理所有的服务和应用的配置。提供包括配置版本跟踪、金丝雀发布、一键回滚配置以及客户端配置更新状态跟踪在内的一系列开箱即用的配置管理特性。

  • 动态 DNS 服务

动态 DNS 服务支持权重路由, 更灵活的路由策略、流量控制以及数据中心内网的简单DNS解析服务。动态DNS服务能更容易地实现以 DNS 协议为基础的服务发现,Nacos 提供了一些简单的 DNS APIs TODO 帮助管理服务的关联域名和可用的 IP:PORT 列表.

  • 服务及其元数据管理

Nacos 可视化管理所有服务及元数据,包括管理服务的描述、生命周期、服务的静态依赖分析、服务的健康状态、服务的流量管理、路由及安全策略、服务的 SLA 以及最首要的 metrics 统计数据。

3.3 Nacos横纵对比

3.4 Nacos功能地图

3.5 Nacos 架构设计
  1. 基本架构

主要分为Nacos Server 和 Nacos Console。 Nacos Server 主要包含:

  • ConfigService: 配置服务, 提供服务动态配置,元数据以及配置管理的服务的提供者。
  • Naming Service: 名称服务, 提供对象的名称与关联的元数据之间的映射管理服务,服务发现和DNS服务就是名称服务的主要应用场景, 比如ServiceName -> Endpoints Info; DNS Domain Name -> IP 。
  • 什么是元数据? 是指包括服务端点(endpoints)、服务标签、服务版本号、服务实例权重、路由规则、安全策略等描述服务的数据。
  1. 逻辑架构

采用领域模型, 模块化设计, 可插拔式插件管理; OpenAPI(标准Rest风格HTTP接口)统一接入,一致性协议与统一存储, 保障数据的一致性与分区容错性。

领域模型设计:

什么是领域设计? 业务领域内的人员, 比如专家、设计人员、开发人员, 以一种大家都能理解的语言作为相互交流的工具, 在交流过程中, 不断发现, 并抽象形成具体的概念, 将这些概念设计成一个领域模型, 由领域模型驱动软件设计,用代码来实现该领域模型, 这个就称为领域设计。

  • 数据模型: Nacos 数据模型 Key 由三元组唯一确定, Namespace默认是空串,公共命名空间(public),分组默认是 DEFAULT_GROUP。
  • 服务领域模型:该服务领域存在三个层级, 服务、集群与实例。服务包含健康检查、元数据、路由机制与阈值保护; 集群包含健康检查、元数据和同步机制; 实例则包含IP、端口、权重、运行状态、元数据和响应时间等。
  • 配置领域模型:主要两个实体, 一个是配置变更历史, 一个是服务标签, 通过ID关联 。
  1. UML类视图设计
3.6 服务设计
  • 配置服务

  • 注册服务

3.7 工程结构

  • nacos-demo : 父级工程, 管理依赖。

    POM文件依赖:

        
            
            
                org.springframework.boot
                spring-boot-starter-web
            
            
            
                com.alibaba.cloud
                spring-cloud-starter-alibaba-nacos-discovery
            
        
    
  • nacos-config-demo

    配置服务工程。

    NacosConfigDemoApplication类:

    @SpringBootApplication
    @RestController
    @RefreshScope
    public class NacosConfigDemoApplication {
        @Value(value = "${stockName:中国平安}")
        private String stockName;
        public static void main(String[] args) {
            SpringApplication.run(NacosConfigDemoApplication.class, args);
        }
    
        /**
         * 获取股票名称接口
         * @return
         */
        @RequestMapping("/getStock")
        public String getStock() {
            return "股票名称:" + stockName;
        }
    }
    

    @RefreshScope注解, 动态配置才能生效。

    bootstrap.properties配置文件:

    server.port=8080
    spring.application.name=nacos_config_demo
    spring.cloud.nacos.config.server-addr=127.0.0.1:8848
    spring.cloud.nacos.discovery.server-addr=127.0.0.1:8848
    # 指向配置中心的文件ID
    spring.cloud.nacos.config.shared-data-ids=nacos_config_demo.properties
    # 指定需要刷新的配置中心文件ID
    spring.cloud.nacos.config.refreshable-dataids=nacos_config_demo.properties
    

    要指定Nacos配置的data id, 注意需加上完整的扩展名, 动态配置生效, 需要加上refreshable-dataids,支持多个配置文件, 以逗号","分隔。

    一定要采用bootstrap.properties作文配置文件, 它比application.properties的启动优先级要高, 远程配置要先读取到配置中心的信息。

    POM依赖:

        
            
            
                org.springframework.cloud
                spring-cloud-alibaba-nacos-config
            
        
    
    
  • nacos-discovery-client

    调用股票价格服务的客户端工程。

    NacosDiscoveryClientApplication类:

    @SpringBootApplication
    @EnableDiscoveryClient
    @RestController
    public class NacosDiscoveryClientApplication {
        @Autowired
        private RestTemplate restTemplate;
    
        /**
         * 声明RestTemplate, 支持负载均衡
         * @return
         */
        @LoadBalanced
        @Bean
        public RestTemplate restTemplate() {
            return new RestTemplate();
        }
        public static void main(String[] args) {
            SpringApplication.run(NacosDiscoveryClientApplication.class, args);
        }
    
        /**
         * 客户端调用接口
         * @param name
         * @return
         */
        @RequestMapping("/client")
        public String client(@RequestParam String name) {
            return restTemplate.getForObject("http://nacos-discovery-server/getPrice?name=" + name, String.class);
        }
    }
    
    

    这里定义了HTTP请求接口, 通过RestTemplate方式调用股票价格服务。

    application.properties配置文件
    server.port=8092
    spring.application.name=nacos-discovery-client
    spring.cloud.nacos.discovery.server-addr=127.0.0.1:8848

    
    
  • nacos-discovery-server

    股票价格服务工程

    NacosDiscoveryServerApplication类:

    @SpringBootApplication
    @EnableDiscoveryClient
    @RestController
    public class NacosDiscoveryServerApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(NacosDiscoveryServerApplication.class, args);
        }
    
        /**
         * 获取股票价格接口
         * @param name
         * @return
         */
        @RequestMapping("/getPrice")
        public String getPrice(@RequestParam(defaultValue = "中国平安") String name) {
          return "股票名称:" + name + ", 股票价格:" + (new Random().nextInt(100-20)+20);
        }
    }
    
    

    application.properties配置文件:

    server.port=8091
    spring.application.name=nacos-discovery-server
    spring.cloud.nacos.discovery.server-addr=127.0.0.1:8848
    
    
3.8 启动验证
  1. 验证Nacos配置功能

    安装并启动好Nacos服务, Nacos地址

    访问地址: http://127.0.0.1:8080/getStock

    通过管理后台, 修改为”平安银行“

    后台日志已收到配置更新信息:

    再次访问地址, 正确更新:

  2. 验证Nacos的注册发现功能

    启动nacos-discovery-server与nacos-discovery-client两个服务。

    在nacos控制台可以看到两个服务已注册成功。

    访问客户端工程地址, http://127.0.0.1:8092/client?name=中国银行

    能够成功通过Nacos找到股票价格服务, 并进行访问。

4. 总结
  • 先了解Nacos基本概念, 功能与特性, 再与其他注册配置中心组件做对比, 加深理解。
  • 了解Nacos的功能地图, 知道Nacos具体能提供的服务作用。
  • 了解Nacos的架构设计, 工作机制, 如何处理实现。
  • 先做好服务规划设计, 体现Nacos的功能作用, 再搭建配置好工程, 学会如何在项目当中与Nacos整合, 最后再启动服务进行验证, 掌握Nacos在实际项目当中的运用。
第2章 Ceph介绍 1. 目标
  • 了解Ceph的基本概念, 功能与作用。
  • 了解Ceph的逻辑架构与工作机制
2. 分析
  • Ceph背景
  • Ceph特点
  • Ceph与主流存储系统对比
  • Ceph整体设计
  • Ceph核心组件与概念
  • Ceph的逻辑架构
  • Ceph的IO工作流程图
  • Ceph RBD IO流程图
  • Ceph 心跳机制
  • Ceph 通信机制
3. 讲解 3.1 背景

Ceph是一个去中心化的分布式存储系统, 提供较好的性能、可靠性和可扩展性。Ceph项目最早起源于Sage就读博士期间的工作(最早的成果于2004年发表),并随后贡献给开源社区。在经过了数年的发展之后,目前已得到众多云计算厂商的支持并被广泛应用。

3.2 特点

Ceph适合跨集群的小文件存储, 拥有以下特点:

  • 高性能

    采用CRUSH算法,数据分布均衡,并行度高,支持上千个存储节点, 支持TB及PB级数据。

  • 高可用性

    支持故障域分隔,数据强一致性; 没有单点故障,自动管理。

  • 高扩展性

    去中心化、灵活、随节点增加线性增长。

  • 场景丰富

    支持三种存储接口类型: 块存储、文件存储、对象存储。 同时支持自定义接口,C++为底层实现, 兼容多种语言。

    • 块存储: 将磁盘空间映射给主机使用, 适用 docker容器、虚拟机磁盘存储分配;日志存储, 文件存储。
    • 文件存储: 解决块存储无法共享问题, 在服务器架设FTP和NFS服务器,适用目录结构的存储、日志存储等。
    • 对象存储: 大容量硬盘, 安装存储管理软件, 对外提供读写访问能力, 具备块存储的高速读写能力, 也具备文件存储共享的特性; 适用图片存储或视频存储。
3.3 存储系统横纵对比
对比说明TFSFASTDFSMooseFSGlusterFSCEPH
开发语言C++CCCC++
数据存储方式文件/Trunk文件/块对象/文件/块
在线扩容支持支持支持支持支持
冗余备份支持支持支持支持支持
单点故障存在不存在存在不存在不存在
易用性安装复杂,官方文档少安装简单,社区相对活跃安装简单,官方文档多安装简单,官方文档专业化安装简单,官方文档专业化
适用场景跨集群的小文件单集群的中小文件单集群的大中文件跨集群云存储单集群的大中小文件
3.4 Ceph整体设计

  • 基础存储系统RADOS(Reliable, Autonomic,Distributed Object Store,即可靠的、自动化的、分布式的对象存储)

    这就是一个完整的对象存储系统,所有存储在Ceph系统中的用户数据事实上最终都是由这一层来存储的。而Ceph的高可靠、高可扩展、高性能、高自动化等等特性本质上也是由这一层所提供的。因此,理解RADOS是理解Ceph的基础与关键。物理上,RADOS由大量的存储设备节点组层,每个节点拥有自己的硬件资源(CPU、内存、硬盘、网络),并运行着 *** 作系统和文件系统。

  • 基础库librados

    这层的功能是对RADOS进行抽象和封装,并向上层提供API,以便直接基于RADOS(而不是整个Ceph)进行应用开发。特别要注意的是,RADOS是一个对象存储系统,因此,librados实现的API也只是针对对象存储功能的。RADOS采用C++开发,所提供的原生librados API包括C和C++两种。物理上,librados和基于其上开发的应用位于同一台机器,因而也被称为本地API。应用调用本机上的librados API,再由后者通过socket与RADOS集群中的节点通信并完成各种 *** 作。

  • 高层应用接口

    这层包括了三个部分:RADOS GW(RADOS Gateway)、 RBD(Reliable Block Device)和Ceph FS(Ceph File System),其作用是在librados库的基础上提供抽象层次更高、更便于应用或客户端使用的上层接口。其中,RADOS GW是一个提供与Amazon S3和Swift兼容的RESTful API的gateway,以供相应的对象存储应用开发使用。RADOS GW提供的API抽象层次更高,但功能则不如librados强大。因此,开发者应针对自己的需求选择使用。RBD则提供了一个标准的块设备接口,常用于在虚拟化的场景下为虚拟机创建volume。目前,Red Hat已经将RBD驱动集成在KVM/QEMU中,以提高虚拟机访问性能。Ceph FS是一个POSIX兼容的分布式文件系统( POSIX表示可移植 *** 作系统接口, 定义了 *** 作系统与应用程序交互的接口标准, linux和windows都要实现基本的posix标准, 程序在源代码级别就能实现可移植性)。由于还处在开发状态,因而Ceph官网并不推荐将其用于生产环境中。

  • 应用层

    这层是不同场景下对于Ceph各个应用接口的各种应用方式,例如基于librados直接开发的对象存储应用,基于RADOS GW开发的对象存储应用,基于RBD实现的云硬盘等等。在上文的介绍中,有一个地方可能容易引起困惑:RADOS自身既然已经是一个对象存储系统,并且也可以提供librados API,为何还要再单独开发一个RADOS GW?

    理解这个问题,事实上有助于理解RADOS的本质,因此有必要在此加以分析。粗看起来,librados和RADOS GW的区别在于,librados提供的是本地API,而RADOS GW提供的则是RESTfulAPI,二者的编程模型和实际性能不同。而更进一步说,则和这两个不同抽象层次的目标应用场景差异有关。换言之,虽然RADOS和S3、Swift同属分布式对象存储系统,但RADOS提供的功能更为基础和底层、 *** 作接口也更为丰富。这一点可以通过对比看出。

    由于Swift和S3支持的API功能近似,这里以Swift举例说明。Swift提供的API功能主要包括:

    • 用户管理 *** 作:用户认证、获取账户信息、列出容器列表等;
    • 容器管理 *** 作:创建/删除容器、读取容器信息、列出容器内对象列表等;
    • 对象管理 *** 作:对象的写入、读取、复制、更新、删除、访问许可设置、元数据读取或更新等。
3.5 核心组件与概念
  • Monitor 一个Ceph集群需要多个Monitor组成的小集群,它们通过Paxos同步数据,用来保存OSD的元数据。
  • OSD OSD全称Object Storage Device,也就是负责响应客户端请求返回具体数据的进程。一个Ceph集群一般都有很多个OSD。
  • MDS MDS全称Ceph Metadata Server,是CephFS服务依赖的元数据服务。
  • Object Ceph最底层的存储单元是Object对象,每个Object包含元数据和原始数据。
  • PG PG全称Placement Groups,是一个逻辑的概念,一个PG包含多个OSD。引入PG这一层其实是为了更好的分配数据和定位数据。
  • RADOS RADOS全称Reliable Autonomic Distributed Object Store,是Ceph集群的精华,用户实现数据分配、Failover等集群 *** 作。
  • Librados Librados是Rados提供库,因为RADOS是协议很难直接访问,因此上层的RBD、RGW和CephFS都是通过librados访问的,目前提供PHP、Ruby、Java、Python、C和C++支持。
  • CRUSH CRUSH是Ceph使用的数据分布算法,类似一致性哈希,让数据分配到预期的地方。
  • RBD RBD全称RADOS block device,是Ceph对外提供的块设备服务。
  • RGW RGW全称RADOS gateway,是Ceph对外提供的对象存储服务,接口与S3和Swift兼容。
  • CephFS CephFS全称Ceph File System,是Ceph对外提供的文件系统服务。
3.6 逻辑架构

Ceph Client 是 Ceph 文件系统的用户。 Ceph Metadata Daemon 提供了元数据服务器。 Ceph Object Storage Daemon 提供了实际存储(对数据和元数据两者)。 Ceph Monitor 提供了集群管理。

3.7 IO流程图

步骤:

  1. client连接monitor获取集群map信息。
  2. 同时新主osd1由于没有pg数据会主动上报monitor告知让osd2临时接替为主。
  3. 临时主osd2会把数据全量同步给新主osd1。
  4. client IO读写直接连接临时主osd2进行读写。
  5. osd2收到读写io,同时写入另外两副本节点。
  6. 等待osd2以及另外两副本写入成功。
  7. osd2三份数据都写入成功返回给client, 此时client io读写完毕。
  8. 如果osd1数据同步完毕,临时主osd2会交出主角色。
  9. osd1成为主节点,osd2变成副本。
3.8 Ceph RBD 块存储 IO流程图

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-jsOic4TM-1650684168165)(images/ceph rbd io flow.webp)]

osd写入过程:

  1. 采用的是librbd的形式,使用librbd创建一个块设备,向这个块设备中写入数据。
  2. 在客户端本地通过调用librados接口,然后经过pool,rbd,object、pg进行层层映射,在PG这一层中,可以知道数据是保存在哪三个OSD上,这三个OSD分别为主从的关系。
  3. 客户端与primary OSD建立SOCKET 通信,将要写入的数据传给primary OSD,由primary OSD再将数据发送给其他replica OSD数据节点。
3.9 Ceph 心跳和故障检测机制

问题:

故障检测时间和心跳报文带来的负载, 如何权衡降低压力?

  1. 心跳频率太高则过多的心跳报文会影响系统性能。
  2. 心跳频率过低则会延长发现故障节点的时间,从而影响系统的可用性。

故障检测策略应该能够做到:

及时性:节点发生异常如宕机或网络中断时,集群可以在可接受的时间范围内感知。

适当的压力:包括对节点的压力,和对网络的压力。

容忍网络抖动:网络偶尔延迟。

扩散机制:节点存活状态改变导致的元信息变化需要通过某种机制扩散到整个集群。

OSD节点会监听public、cluster、front和back四个端口

  • public端口:监听来自Monitor和Client的连接。
  • cluster端口:监听来自OSD Peer的连接。
  • front端口:客户端连接集群使用的网卡, 这里临时给集群内部之间进行心跳。
  • back端口:在集群内部使用的网卡。集群内部之间进行心跳。
  • hbclient:发送ping心跳的messenger(送信者)。

Ceph OSD之间相互心跳检测

  • 同一个PG内OSD互相心跳,他们互相发送PING/PONG信息。
  • 每隔6s检测一次(实际会在这个基础上加一个随机时间来避免峰值)。
  • 20s没有检测到心跳回复,加入failure队列。
3.10 Ceph 通信机制

网络通信框架三种不同的实现方式:

  • Simple线程模式
    • 特点:每一个网络链接,都会创建两个线程,一个用于接收,一个用于发送。
    • 缺点:大量的链接会产生大量的线程,会消耗CPU资源,影响性能。
  • Async事件的I/O多路复用模式
    • 特点:这种是目前网络通信中广泛采用的方式。新版默认已经使用Asnyc异步方式了。
  • XIO方式使用了开源的网络通信库accelio来实现
    • 特点:这种方式需要依赖第三方的库accelio稳定性,目前处于试验阶段。

消息的内容主要分为三部分:

  • header //消息头类型消息的信封
  • user data //需要发送的实际数据
    • payload // *** 作保存元数据
    • middle //预留字段
    • data //读写数据
  • footer //消息的结束标记

步骤:

  • Accepter监听peer的请求, 调用 SimpleMessenger::add_accept_pipe() 创建新的 Pipe, 给 SimpleMessenger::pipes 来处理该请求。

  • Pipe用于消息的读取和发送。该类主要有两个组件,Pipe::Reader,Pipe::Writer用来处理消息读取和发送。

  • Messenger作为消息的发布者, 各个 Dispatcher 子类作为消息的订阅者, Messenger 收到消息之后, 通过 Pipe 读取消息,然后转给 Dispatcher 处理。

  • Dispatcher是订阅者的基类,具体的订阅后端继承该类,初始化的时候通过 Messenger::add_dispatcher_tail/head 注册到 Messenger::dispatchers. 收到消息后,通知该类处理。

  • DispatchQueue该类用来缓存收到的消息, 然后唤醒 DispatchQueue::dispatch_thread 线程找到后端的 Dispatch 处理消息。

更多参考:Ceph介绍及原理架构分享

4. 总结
  • 从Ceph背景开始, 了解其特点, 与主流存储系统对比, 更好的了解其优势和适用场景;
  • 从Ceph整体设计分析, 了解其核心组件, 专业术语与概念, 加深理解。
  • 从逻辑架构图, 先了解Ceph的工作流程, 再了解其核心, IO处理流程以及RBD的处理流程, 从而了解其工作机制; 作为分布式文件存储系统, 了解其心跳机制和通信机制, 帮助我们理解Ceph在集群高可用方面的处理。
第3章 Ceph集群使用 1. 目标
  • 了解Ceph的基本用法, 与项目的集成使用(因为Ceph必须集群模式使用, 配置较为复杂,本章可以不要求掌握集群安装)
2. 步骤
  • Ceph的部署结构
  • Ceph安装准备工作, 系统配置修改
  • Ceph集群搭建配置
  • Ceph安装管理后台
  • 创建Cephfs与客户端连接验证
  • Ceph与项目的集成使用
3. 实现

集群安装较为复杂, 可先跳过,通过我提供安装好的虚拟机, 直接至3.7节学习如何 *** 作使用。

3.1 部署结构

虚拟机创建三台服务器,CENTOS版本为7.6, IP网端10.10.20.0/24。三台主机名称为:

  • CENTOS7-1: IP为10.10.20.11, 既做管理节点, 又做子节点。
  • CENTOS7-2: IP为10.10.20.12, 子节点。
  • CENTOS7-2: IP为10.10.20.13, 子节点。
3.2 系统配置

系统配置工作, 三台节点依次执行:

  1. 修改主机名称 [root@CENTOS7-1 ~]# vi /etc/hostname

     CENTOS7-1
    
  2. 编辑hosts文件

    10.10.20.11 CENTOS7-1
    
    10.10.20.12 CENTOS7-2
    
    10.10.20.13 CENTOS7-3
    

    注意, 这里面的主机名称要和节点名称保持一致, 否则安装的时候会出现问题

  3. 修改yum源 vi /etc/yum.repos.d/ceph.repo, 为避免网速过慢问题, 这里采用的是清华镜像源:

    [Ceph]
    
    name=Ceph packages for $basearch
    
    baseurl=https://mirrors.tuna.tsinghua.edu.cn/ceph/rpm-mimic/el7/x86_64/
    
    enabled=1
    
    gpgcheck=1
    
    type=rpm-md
    
    gpgkey=https://download.ceph.com/keys/release.asc
    
    
    
    [Ceph-noarch]
    
    name=Ceph noarch packages
    
    # 官方源
    
    #baseurl=http://download.ceph.com/rpm-mimic/el7/noarch
    
    # 清华源
    
    baseurl=https://mirrors.tuna.tsinghua.edu.cn/ceph/rpm-mimic/el7/noarch/
    
    enabled=1
    
    gpgcheck=1
    
    type=rpm-md
    
    gpgkey=https://download.ceph.com/keys/release.asc
    
    
    
    [ceph-source]
    
    name=Ceph source packages
    
    baseurl=https://mirrors.tuna.tsinghua.edu.cn/ceph/rpm-mimic/el7/SRPMS/
    
    enabled=1
    
    gpgcheck=1
    
    type=rpm-md
    
    gpgkey=https://download.ceph.com/keys/release.asc
    
    
    
    
    
  4. 安装ceph与ceph-deploy组件

    yum update && yum -y install ceph ceph-deploy 
    

    安装完成, 如果执行ceph-deploy出现ImportError: No module named pkg_resources

    安装python2-pip: yum -y install python2-pip

  5. 安装NTP时间同步工具

    yum install ntp ntpdate ntp-doc -y
    

    确保时区是正确, 设置开机启动:

    systemctl enable ntpd
    

    并将时间每隔1小时自动校准同步。编辑 vi /etc/rc.d/rc.local 追加:

    /usr/sbin/ntpdate ntp1.aliyun.com > /dev/null 2>&1; /sbin/hwclock -w
    

    配置定时任务, 执行crontab -e 加入:

    0 */1 * * * ntpdate ntp1.aliyun.com > /dev/null 2>&1; /sbin/hwclock -w
    
3.3 免密码SSH登陆
  1. 官方建议不用系统内置用户, 创建名为ceph_user用户, 密码也设为ceph_user:

    useradd -d /home/ceph_user -m ceph_user
    
    passwd ceph_user
    
  2. 设置sudo权限

    echo "ceph_user ALL = (root) NOPASSWD:ALL" | sudo tee /etc/sudoers.d/ceph_user
    
    sudo chmod 0440 /etc/sudoers.d/ceph_user
    

    1、2两个步骤依次在三台机器上执行。

    接下来在主节点, 继续执行:

  3. 生成密钥:

    切换用户: su ceph_user

    执行ssh-keygen,一直按默认提示点击生成RSA密钥信息。

  4. 分发密钥至各机器节点

    ssh-copy-id ceph_user@CENTOS7-1
    
    ssh-copy-id ceph_user@CENTOS7-2
    
    ssh-copy-id ceph_user@CENTOS7-3
    
  5. 修改管理节点上的 ~/.ssh/config 文件, 简化SSH远程连接时的输入信息:

    管理节点是会有root和ceph_user多个用户, ssh远程连接默认会以当前用户身份进行登陆,

    如果我们是root身份进行远程连接, 还是需要输入密码, 我们想简化, 该怎么处理?

    切换root身份,

    su root 
    

    修改~/.ssh/config 文件

    Host CENTOS7-1
    
       Hostname CENTOS7-1
    
       User ceph_user
    
    Host CENTOS7-2
    
       Hostname CENTOS7-2
    
       User ceph_user
    
    Host CENTOS7-3
    
       Hostname CENTOS7-3
    
       User ceph_user
    

    注意修改文件权限, 不能采用777最大权限:

    chmod 600 ~/.ssh/config
    

    进行ssh远程连接时, Host的主机名称是区分大小写的, 所以要注意配置文件的主机名称。

  6. 开放端口, 非生产环境, 可以直接禁用防火墙:

    systemctl stop firewalld.service
    
    systemctl disable firewalld.service
    
  7. SELINUX设置

    SELinux设为禁用:

    setenforce 0
    

    永久生效: 编辑 vi /etc/selinux/config修改:

    SELINUX=disabled
    
3.4 集群搭建配置

采用root身份进行安装

  1. 在管理节点创建集群配置目录,cd /usr/local:

    mkdir ceph-cluster
    
    cd ceph-cluster
    

    注意: 此目录作为ceph *** 作命令的基准目录, 会存储处理配置信息。

  2. 创建集群, 包含三台机器节点:

    ceph-deploy new CENTOS7-1  CENTOS7-2 CENTOS7-3
    

    创建成功后, 会生一个配置文件。

  3. 如果接下来集群的安装配置出现问题, 可以执行以下命令清除, 再重新安装:

    ceph-deploy purge CENTOS7-1  CENTOS7-2 CENTOS7-3
    
    ceph-deploy purgedata CENTOS7-1  CENTOS7-2 CENTOS7-3
    
    ceph-deploy forgetkeys
    

    将三台节点的mon信息也删除

    rm -rf /var/run/ceph/
    
  4. 修改配置文件, 有些配置后面需用到:

    vi /usr/local/ceph-cluster/ceph.conf
    

    加入:

    [global]
    
    public network = 10.10.20.0/24
    
    # 设置pool池默认分配数量
    
    osd pool default size = 2
    
    # 容忍更多的时钟误差
    
    mon clock drift allowed = 2
    
    mon clock drift warn backoff = 30
    
    # 允许删除pool
    
    mon_allow_pool_delete = true
    
    [mgr]
    
    # 开启WEB仪表盘
    
    mgr modules = dashboard
    

    第一项为副本数, 设为2份。

    第二项为对外IP访问网段,注意根据实际IP修改网段。

    第三、四项为允许一定时间的漂移误差。

  5. 执行安装:

    ceph-deploy install  CENTOS7-1  CENTOS7-2 CENTOS7-3
    

    如果出现错误:

    ceph_deploy][ERROR ] RuntimeError: Failed to execute command: ceph --version
    

    可以在各节点上单独进行安装:

    yum -y install ceph 
    

    如果没有仓库文件ceph.repo, 按上面的步骤手工创建。

  6. 初始monitor信息:

    ceph-deploy mon create-initial
    

    执行完成后, 会生成以下文件:

  7. 同步管理信息:

    下发配置文件和管理信息至各节点:

    ceph-deploy admin  CENTOS7-1  CENTOS7-2 CENTOS7-3
    
  8. 安装mgr(管理守护进程), 大于12.x版本需安装, 我们装的是最新版,需执行:

    ceph-deploy mgr create CENTOS7-1  CENTOS7-2 CENTOS7-3
    
  9. 安装OSD(对象存储设备)

    注意: 新版本的OSD没有prepare与activate命令。

    这里需要新的硬盘作为OSD存储设备, 关闭虚拟机, 增加一块硬盘, 不用格式化。

    重启, fdisk -l 查看新磁盘名称:

    执行创建OSD命令:

    ceph-deploy osd create --data /dev/sdb CENTOS7-1
    

    三台节点都需分别依次执行。

  10. 验证节点:

    输入ceph health 或 ceph -s查看, 出现HEALTH_OK代表正常。

通过虚拟机启动, 如果出现错误:

[root@CENTOS7-1 ~]# ceph -s
  cluster:
    id:     0ec99aa9-e97e-43d3-b5b9-90eb21c4abff
    health: HEALTH_WARN
            1 filesystem is degraded
            1 osds down
            1 host (1 osds) down
            Reduced data availability: 41 pgs inactive
            Degraded data redundancy: 134/268 objects degraded (50.000%), 22 pgs degraded, 87 pgs undersized
            39 slow ops, oldest one blocked for 2286 sec, daemons [osd.0,mon.CENTOS7-2,mon.CENTOS7-3] have slow ops.
            clock skew detected on mon.CENTOS7-2, mon.CENTOS7-3
 
  services:
    mon: 3 daemons, quorum CENTOS7-1,CENTOS7-2,CENTOS7-3
    mgr: centos7-1(active), standbys: centos7-3, centos7-2
    mds: fs_test-1/1/1 up  {0=centos7-1=up:replay}
    osd: 3 osds: 1 up, 2 in
 
  data:
    pools:   9 pools, 128 pgs
    objects: 134  objects, 64 KiB
    usage:   1.0 GiB used, 19 GiB / 20 GiB avail
    pgs:     32.031% pgs unknown
             134/268 objects degraded (50.000%)
             65 active+undersized
             41 unknown
             22 active+undersized+degraded

在各节点执行命令, 确保时间同步一致:

ntpdate ntp1.aliyun.com 
3.5 安装管理后台
  1. 开启dashboard模块

    ceph mgr module enable dashboard
    
  2. 生成签名

    ceph dashboard create-self-signed-cert
    
  3. 创建目录

    mkdir mgr-dashboard

    [root@CENTOS7-1 mgr-dashboard]# pwd
    
    /usr/local/ceph-cluster/mgr-dashboard
    
  4. 生成密钥对

    cd  /usr/local/ceph-cluster/mgr-dashboard
    
    openssl req -new -nodes -x509   -subj "/O=IT/CN=ceph-mgr-dashboard" -days 3650   -keyout dashboard.key -out dashboard.crt -extensions v3_ca
    
    [root@CENTOS7-1 mgr-dashboard]# ll
    
    total 8
    
    -rw-rw-r-- 1 ceph_user ceph_user 1155 Jul 14 02:26 dashboard.crt
    
    -rw-rw-r-- 1 ceph_user ceph_user 1704 Jul 14 02:26 dashboard.key
    
  5. 启动dashboard

    ceph mgr module disable dashboard
    
    ceph mgr module enable dashboard
    
  6. 设置IP与PORT

    ceph config set mgr mgr/dashboard/server_addr 10.10.20.11
    
    ceph config set mgr mgr/dashboard/server_port 18843
    
  7. 关闭HTTPS

    ceph config set mgr mgr/dashboard/ssl false
    
  8. 查看服务信息

    [root@CENTOS7-1 mgr-dashboard]# ceph mgr services
    
    {
    
        "dashboard": "https://10.10.20.11:8443/"
    
    }
    
  9. 设置管理用户与密码

    ceph dashboard set-login-credentials admin admin
    
  10. 访问

3.6 创建Cephfs

集群创建完后, 默认没有文件系统, 我们创建一个Cephfs可以支持对外访问的文件系统。

  1. 创建两个存储池, 执行两条命令:

    ceph osd pool create cephfs_data 128
    
    ceph osd pool create cephfs_metadata 64
    

    少于5个OSD可把pg_num设置为128

    OSD数量在5到10,可以设置pg_num为512

    OSD数量在10到50,可以设置pg_num为4096

    OSD数量大于50,需要计算pg_num的值

    通过下面命令可以列出当前创建的存储池:

    ceph osd lspools
    
  2. 创建fs, 名称为fs_test:

    ceph fs new fs_test cephfs_metadata cephfs_data
    
  3. 状态查看, 以下信息代表正常:

    [root@CENTOS7-1 mgr-dashboard]# ceph fs ls
    
    name: fs_test, metadata pool: cephfs_metadata, data pools: [cephfs_data ]
    
    [root@CENTOS7-1 mgr-dashboard]# ceph mds stat
    
    fs_test-1/1/1 up  {0=centos7-1=up:active}
    

    附: 如果创建错误, 需要删除, 执行:

    ceph fs rm fs_test --yes-i-really-mean-it
    
    ceph osd pool delete cephfs_data cephfs_data  --yes-i-really-really-mean-it
    

    确保在ceph.conf中开启以下配置:

    [mon]
    
    mon allow pool delete = true
    
  4. 采用fuse挂载

    先确定ceph-fuse命令能执行, 如果没有, 则安装:

     yum -y install ceph-fuse
    
  5. 创建挂载目录

    mkdir -p /usr/local/cephfs_directory
    
  6. 挂载cephfs

    [root@node3 ~]# ceph-fuse -k /etc/ceph/ceph.client.admin.keyring -m 10.10.20.11:6789 /usr/local/cephfs_directory
    
    ceph-fuse[6687]: starting ceph client
    
    2019-07-14 21:39:09.644181 7fa5be56e040 -1 init, newargv = 0x7fa5c940b500 newargc=9
    
    ceph-fuse[6687]: starting fuse
    
  7. 查看磁盘挂载信息

    [root@CENTOS7-1 mgr-dashboard]# df -h
    
    Filesystem               Size  Used Avail Use% Mounted on
    
    /dev/mapper/centos-root   38G  3.0G   35G   8% /
    
    devtmpfs                 1.9G     0  1.9G   0% /dev
    
    tmpfs                    1.9G     0  1.9G   0% /dev/shm
    
    tmpfs                    1.9G   20M  1.9G   2% /run
    
    tmpfs                    1.9G     0  1.9G   0% /sys/fs/cgroup
    
    /dev/sda1                197M  167M   31M  85% /boot
    
    tmpfs                    378M     0  378M   0% /run/user/0
    
    tmpfs                    1.9G   24K  1.9G   1% /var/lib/ceph/osd/ceph-0
    
    ceph-fuse                 27G     0   27G   0% /usr/local/cephfs_directory
    
    tmpfs                    378M     0  378M   0% /run/user/1000
    
    
    

    /usr/local/cephfs_directory目录已成功挂载。

3.7 客户端连接验证(Rados Java)
  1. 安装好JDK、GIT和MAVEN。

  2. 下载rados java客户端源码

    git clone https://github.com/ceph/rados-java.git

    下载目录位置:

    [root@CENTOS7-1 rados-java]# pwd
    
    /usr/local/sources/rados-java
    
  3. 执行MAVEN安装, 忽略测试用例:

    [root@CENTOS7-1 rados-java]# mvn install -Dmaven.test.skip=true
    

    生成jar包, rados-0.6.0-SNAPSHOT.jar

    [root@CENTOS7-1 target]# ll
    
    total 104
    
    drwxr-xr-x 3 root root     17 Jul 14 19:31 classes
    
    drwxr-xr-x 2 root root     27 Jul 14 19:31 dependencies
    
    drwxr-xr-x 3 root root     25 Jul 14 19:31 generated-sources
    
    drwxr-xr-x 2 root root     28 Jul 14 19:31 maven-archiver
    
    drwxr-xr-x 3 root root     35 Jul 14 19:31 maven-status
    
    -rw-r--r-- 1 root root 105570 Jul 14 19:31 rados-0.6.0-SNAPSHOT.jar
    
  4. 创建软链接, 加入CLASSPATH

    ln -s /usr/local/sources/rados-java/target/rados-0.6.0-SNAPSHOT.jar /usr/local/jdk1.8.0_181/jre/lib/ext/rados-0.6.0-SNAPSHOT.jar
    

    安装jna

    yum -y install jna
    

    创建软链接

    ln -s /usr/share/java/jna.jar /usr/local/jdk1.8.0_181/jre/lib/ext/jna.jar
    

    查看

    [root@CENTOS7-1 target]# ll /usr/local/jdk1.8.0_181/jre/lib/ext/jna.jar 
    
    lrwxrwxrwx 1 root root 23 Jul 14 10:23 /usr/local/jdk1.8.0_181/jre/lib/ext/jna.jar -> /usr/share/java/jna.jar
    
    [root@CENTOS7-1 target]# ll /usr/local/jdk1.8.0_181/jre/lib/ext/rados-0.6.0-SNAPSHOT.jar 
    
    lrwxrwxrwx 1 root root 40 Jul 14 10:25 /usr/local/jdk1.8.0_181/jre/lib/ext/rados-0.6.0-SNAPSHOT.jar -> /usr/share/java/rados-0.6.0-SNAPSHOT.jar
    
  5. 创建JAVA测试类

    CephClient类,注意, 最新版0.6的异常处理包位置已发生变化。

    import com.ceph.rados.Rados;
    
    import com.ceph.rados.exceptions.*;
    
    
    
    import java.io.File;
    
    
    
    public class CephClient {
    
            public static void main (String args[]){
    
    
    
                    try {
    
                            Rados cluster = new Rados("admin");
    
                            System.out.println("Created cluster handle.");
    
    
    
                            File f = new File("/etc/ceph/ceph.conf");
    
                            cluster.confReadFile(f);
    
                            System.out.println("Read the configuration file.");
    
    
    
                            cluster.connect();
    
                            System.out.println("Connected to the cluster.");
    
    
    
                    } catch (RadosException e) {
    
                            System.out.println(e.getMessage() + ": " + e.getReturnValue());
    
                    }
    
            }
    
    }
    
  6. 运行验证

    需要在linux环境下运行,且要在client节点。

    编译并运行:

    [root@CENTOS7-1 sources]# javac CephClient.java 
    
    [root@CENTOS7-1 sources]# java CephClient
    
    Created cluster handle.
    
    Read the configuration file.
    
    Connected to the cluster.
    

    成功与ceph建立连接。

3.8 Ceph与项目集成使用
  1. 工程设计

    演示经常使用的文件上传与下载功能, 看java是如何在项目中使用。

  2. 工程结构

    一个是启动类, 一个是ceph *** 作封装类。

  3. 工程实现

    CephDemoApplication类:

    @SpringBootApplication
    
    public class CephDemoApplication {
    
    
    
        public static void main(String[] args) {
    
    
    
            System.out.println("start....");
    
            String username = "admin";
    
            String monIp = "10.10.20.11:6789;10.10.20.12:6789;10.10.20.13:6789";
    
            String userKey = "AQBZBypdMchvBRAAbWVnIGyYNvxWQZ2UkuiYew==";
    
            String mountPath = "/";
    
            CephOperator cephOperate = null;
    
            try {
    
                String opt = (args == null || args.length < 1)? "" : args[0];
    
                cephOperate = new CephOperator(username, monIp, userKey, mountPath);
    
                if("upload".equals(opt)) {
    
                    cephOperate.uploadFileByPath("/temp_upload_fs", args[1]);
    
                }else if("download".equals(opt)) {
    
                    cephOperate.downloadFileByPath("/temp_download_fs", args[1]);
    
                }else {
    
                    System.out.println("Unrecognized Command! Usage  opt[upload|download] filename[path]!");
    
                }
    
            }catch(Exception e) {
    
                e.printStackTrace();
    
            }finally {
    
                if(null != cephOperate) {
    
                    cephOperate.umount();
    
                }
    
            }
    
            System.out.println("end....");
    
    
    
        }
    
    
    
    }
    

    monIp为ceph client节点连接地址与端口, 支持多个;

    userKey为密钥, 对应ceph.client.admin.keyring中的key值。

    启动接收两个参数, 一个是标识上传或下载, 另一个是标识文件名称。

    CephOperator类:

    
    
    public class CephOperator {
    
    
    
        private CephMount mount;
    
        private String username;
    
        private String monIp;
    
        private String userKey;
    
    
    
        public CephOperator(String username, String monIp, String userKey, String mountPath) {
    
            this.username = username;
    
            this.monIp = monIp;
    
            this.userKey = userKey;
    
            this.mount = new CephMount(username);
    
            this.mount.conf_set("mon_host", monIp);
    
            mount.conf_set("key", userKey);
    
            mount.mount(mountPath);
    
        }
    
    
    
        //查看目录列表
    
        public void listDir(String path) throws IOException {
    
            String[] dirs = mount.listdir(path);
    
            System.out.println("contents of the dir: " + Arrays.asList(dirs));
    
        }
    
    
    
        //新建目录
    
        public void mkDir(String path) throws IOException {
    
            mount.mkdirs(path, 0755);//0表示十进制
    
        }
    
    
    
        //删除目录
    
        public void delDir(String path) throws IOException {
    
            mount.rmdir(path);
    
        }
    
    
    
        //重命名目录or文件
    
        public void renameDir(String oldName, String newName) throws IOException {
    
            mount.rename(oldName, newName);
    
        }
    
    
    
        //删除文件
    
        public void delFile(String path) throws IOException {
    
            mount.unlink(path);
    
        }
    
    
    
    
    
        /**
    
         * 上传指定路径文件
    
         * @param filePath
    
         * @param fileName
    
         * @return
    
         */
    
        public Boolean uploadFileByPath(String filePath, String fileName) {
    
    
    
            // exit with null if not mount
    
            if (this.mount == null) {
    
                return null;
    
            }
    
            // file definition
    
            char pathChar = File.separatorChar;
    
            String fileFullName = "";
    
            Long fileLength = 0l;
    
            Long uploadedLength = 0l;
    
            File file = null;
    
    
    
            // Io
    
            FileInputStream fis = null;
    
    
    
            // get local file info
    
            fileFullName = filePath + pathChar + fileName;
    
            file = new File(fileFullName);
    
            if (!file.exists()) {
    
                return false;
    
            }
    
            fileLength = file.length();
    
    
    
            // get io from local file
    
            try {
    
                fis = new FileInputStream(file);
    
            } catch (FileNotFoundException e) {
    
                e.printStackTrace();
    
            }
    
    
    
            // if file exists or not
    
            String[] dirList = null;
    
            Boolean fileExist = false;
    
            try {
    
                dirList = this.mount.listdir("/");
    
                for (String fileInfo : dirList) {
    
                    if (fileInfo.equals(fileName)) {
    
                        fileExist = true;
    
                    }
    
                }
    
            } catch (FileNotFoundException e) {
    
                e.printStackTrace();
    
            }
    
    
    
            // transfer file by diff pattern
    
            if (!fileExist) {
    
                try {
    
                    // create file and set mode WRITE
    
                    this.mount.open(fileName, CephMount.O_CREAT, 0);
    
                    int fd = this.mount.open(fileName, CephMount.O_RDWR, 0);
    
    
    
                    // start transfer
    
                    int length = 0;
    
                    byte[] bytes = new byte[1024];
    
                    while ((length = fis.read(bytes, 0, bytes.length)) != -1) {
    
                        // write
    
                        this.mount.write(fd, bytes, length, uploadedLength);
    
    
    
                        // update length
    
                        uploadedLength += length;
    
    
    
                        // output transfer rate
    
                        float rate = (float) uploadedLength * 100 / (float) fileLength;
    
                        String rateValue = (int) rate + "%";
    
                        System.out.println(rateValue);
    
    
    
                        // complete flag
    
                        if (uploadedLength == fileLength) {
    
                            break;
    
                        }
    
                    }
    
                    System.out.println("文件传输成功!");
    
    
    
                    // chmod
    
                    this.mount.fchmod(fd, 0666);
    
    
    
                    // close
    
                    this.mount.close(fd);
    
                    if (fis != null) {
    
                        fis.close();
    
                    }
    
                    return true;
    
                } catch (Exception e) {
    
                    e.printStackTrace();
    
                }
    
            } else if (fileExist) {
    
                try {
    
                    // get file length
    
                    CephStat stat = new CephStat();
    
                    this.mount.stat(fileName, stat);
    
                    long lastLen= stat.size;
    
                    int fd = this.mount.open(fileName, CephMount.O_RDWR, 0);
    
    
    
                    // start transfer
    
                    int length = 0;
    
                    byte[] bytes = new byte[1024];
    
    //                fis.skip(uploadedLength);
    
                    long uploadActLen= 0;
    
                    while ((length = fis.read(bytes, 0, bytes.length)) != -1) {
    
                        // write
    
                        this.mount.write(fd, bytes, length, lastLen);
    
    
    
                        // update length
    
                        uploadActLen += length;
    
                        // output transfer rate
    
                        float rate = (float) uploadActLen * 100 / (float) fileLength;
    
                        String rateValue = (int) rate + "%";
    
                        System.out.println(rateValue);
    
    
    
                        // complete flag
    
                        if (uploadActLen == fileLength) {
    
                            break;
    
                        }
    
                    }
    
                    // 多次上传会进行追加
    
                    System.out.println("追加文件传输成功!");
    
    
    
                    // chmod
    
                    this.mount.fchmod(fd, 0666);
    
    
    
                    // close
    
                    this.mount.close(fd);
    
                    if (fis != null) {
    
                        fis.close();
    
                    }
    
                    return true;
    
                } catch (Exception e) {
    
                    e.printStackTrace();
    
                }
    
            } else {
    
                try {
    
                    if (fis != null) {
    
                        fis.close();
    
                    }
    
                } catch (Exception e) {
    
                    e.printStackTrace();
    
                }
    
                return false;
    
            }
    
    
    
            return false;
    
        }
    
    
    
        //set current dir (work dir)
    
        public void setWorkDir(String path) throws IOException {
    
            mount.chdir(path);
    
        }
    
    
    
    
    
        //外部获取mount
    
        public CephMount getMount() {
    
            return this.mount;
    
        }
    
    
    
        //umount
    
        public void umount() {
    
            mount.unmount();
    
        }
    
    
    
    
    
        /**
    
         * 下载文件到指定路径
    
         * @param filePath
    
         * @param fileName
    
         * @return
    
         */
    
        public Boolean downloadFileByPath(String filePath, String fileName) {
    
    
    
            // exit with null if not mount
    
            if (this.mount == null) {
    
                return null;
    
            }
    
    
    
            // file definition
    
            char pathChar = File.separatorChar;
    
            String fileFullName = "";
    
            Long fileLength = 0l;
    
            Long downloadedLength = 0l;
    
            File file = null;
    
    
    
            // IO
    
            FileOutputStream fos = null;
    
            RandomAccessFile raf = null;
    
    
    
            // new file object
    
            fileFullName = filePath + pathChar + fileName;
    
            file = new File(fileFullName);
    
    
    
            // get cephfs file size
    
            try {
    
                CephStat stat = new CephStat();
    
                this.mount.stat(fileName, stat);
    
                fileLength = stat.size;
    
            } catch (Exception e) {
    
                e.printStackTrace();
    
            }
    
    
    
            if (fileLength != 0) {
    
                if (!file.exists()) {
    
                    // download file
    
                    int length = 10240;
    
                    byte[] bytes = new byte[length];
    
                    try {
    
                        int fd = this.mount.open(fileName, CephMount.O_RDONLY, 0);
    
                        fos = new FileOutputStream(file);
    
                        float rate = 0;
    
                        String rateValue = "";
    
                        while ((fileLength - downloadedLength) >= length && (this.mount.read(fd, bytes, (long) length, downloadedLength)) != -1) {
    
                            fos.write(bytes, 0, length);
    
                            fos.flush();
    
                            downloadedLength += (long) length;
    
    
    
                            // output transfer rate
    
                            rate = (float) downloadedLength * 100 / (float) fileLength;
    
                            rateValue = (int) rate + "%";
    
                            System.out.println(rateValue);
    
    
    
                            if (downloadedLength == fileLength) {
    
                                break;
    
                            }
    
                        }
    
                        if (downloadedLength != fileLength) {
    
                            this.mount.read(fd, bytes, fileLength - downloadedLength, downloadedLength);
    
                            fos.write(bytes, 0, (int) (fileLength - downloadedLength));
    
                            fos.flush();
    
                            downloadedLength = fileLength;
    
    
    
                            // output transfer rate
    
                            rate = (float) downloadedLength * 100 / (float) fileLength;
    
                            rateValue = (int) rate + "%";
    
                            System.out.println(rateValue);
    
                        }
    
    
    
                        System.out.println("Download Success!");
    
                        fos.close();
    
                        this.mount.close(fd);
    
                        return true;
    
                    } catch (Exception e) {
    
                        e.printStackTrace();
    
                    }
    
                } else if (file.exists()) {
    
                    // download file
    
                    int length = 10240;
    
                    byte[] bytes = new byte[length];
    
                    Long filePoint = file.length();
    
                    try {
    
                        int fd = this.mount.open(fileName, CephMount.O_RDONLY, 0);
    
                        raf = new RandomAccessFile(file, "rw");
    
                        raf.seek(filePoint);
    
                        downloadedLength = filePoint;
    
                        float rate = 0;
    
                        String rateValue = "";
    
                        while ((fileLength - downloadedLength) >= length && (this.mount.read(fd, bytes, (long) length, downloadedLength)) != -1) {
    
                            raf.write(bytes, 0, length);
    
                            downloadedLength += (long) length;
    
    
    
                            // output transfer rate
    
                            rate = (float) downloadedLength * 100 / (float) fileLength;
    
                            rateValue = (int) rate + "%";
    
                            System.out.println(rateValue);
    
    
    
                            if (downloadedLength == fileLength) {
    
                                break;
    
                            }
    
                        }
    
                        if (downloadedLength != fileLength) {
    
                            this.mount.read(fd, bytes, fileLength - downloadedLength, downloadedLength);
    
                            raf.write(bytes, 0, (int) (fileLength - downloadedLength));
    
                            downloadedLength = fileLength;
    
    
    
                            // output transfer rate
    
                            rate = (float) downloadedLength * 100 / (float) fileLength;
    
                            rateValue = (int) rate + "%";
    
                            System.out.println(rateValue);
    
                        }
    
                        // 如果下载中断, 会从上一次下载结束位置进行上传
    
                        System.out.println("Cut Point Download Success!");
    
                        raf.close();
    
                        this.mount.close(fd);
    
                        return true;
    
                    } catch (Exception e) {
    
                        e.printStackTrace();
    
                    }
    
                } else {
    
                    return false;
    
                }
    
            }else {
    
                System.out.println(" the file is empty!");
    
            }
    
    
    
            return true;
    
    
    
        }
    
    
    
    }
    

    POM文件配置:

    
    
    
    
        
    
        
    
            org.springframework.boot
    
            spring-boot-starter-web
    
        
    
    
    
        
    
        
    
            com.ceph
    
            rados
    
            0.6.0
    
        
    
        
    
        
    
            com.ceph
    
            libcephfs
    
            0.80.5
    
        
    
    
    
    
    
    
    
    
    
    
    
    
    
        
    
            
    
                org.springframework.boot
    
                spring-boot-maven-plugin
    
            
    
        
    
    
    
  4. 测试验证

    通过maven 命令 clean install 打包生成jar文件。

    • rz命令上传至client node服务器

      [root@CENTOS7-1 sources]# ll ceph-demo.jar 
      
      -rw-r--r-- 1 root root 16915296 Jul 14  2019 ceph-demo.jar
      
    • 代码中的上传目录为/temp_upload_fs,创建名为upload.txt的文件,内容为abc123

      [root@CENTOS7-1 sources]# cat /temp_upload_fs/upload.txt
      
      abc123
      
    • 上传至cephfs服务

      [root@CENTOS7-1 sources]# java -jar ceph-demo.jar upload upload.txt
      
      start....
      
      100%
      
      文件传输成功!
      
      end....
      
    • 下载cephfs文件

      文件名称为上传时创建的upload.txt

      [root@CENTOS7-1 sources]# java -jar ceph-demo.jar download upload.txt
      
      start....
      
      100%
      
      Download Success!
      
      end....
      
    • 查看下载内容

      [root@CENTOS7-1 sources]# cat /temp_download_fs/upload.txt 
      
      abc123
      

      通过演示, 可以看到能够通过java client 在项目中成功 *** 作ceph。

  5. FAQ问题

    1. 实践过程当中会有许多问题, 如果运行过程当中出现jni找不到动态库, 需要安装相关依赖: yum -y install libcephfs2 libcephfs_jni-devel 并检查相应的软链接:
    [root@CENTOS7-1 sources]# ll /usr/lib/libcephfs_jni.so.2 
    
    lrwxrwxrwx 1 root root 25 Jul 14 11:34 /usr/lib/libcephfs_jni.so.2 -> /usr/lib64/libcephfs.so.2
    
    1. 如果rados版本0.6.0依赖, 需要手工上传至MAVEN仓库,命令:
    mvn deploy:deploy-file -DgroupId=com.ceph -DartifactId=rados -Dversion=2.0.1 -Dpackaging=jar -Dfile=d:/TestCode/rados-0.6.0-SNAPSHOT.jar -url=http://192.168.19.102:8082/repository/maven-releases/ -DrepositoryId=nexus-releases
    
4. 总结
  • 掌握Ceph的用法,以及与项目的集成使用; CEPH集群安装配置较为复杂, 如果不能掌握, 可以放到最后面的集群专讲章节来学习搭建, 这里只要掌握CEPH的集成使用即可, 给后面项目运用做铺垫, 在后面课程中我们还会对其使用做详细讲解, 后面还会再深入讲解Swift Api封装以及在项目当中的集成使用。
第4章 Netty介绍 1. 目标
  • 了解Netty的基本概念
  • 了解NIO模型与作用
  • 理解Netty的工作机制, Reactor线程模型。
  • 了解Netty的工作原理, 底层相关知识。
2. 分析
  • Netty简介
  • TCP/IP五层模型
  • 什么是NIO
  • Netty特点
  • Netty功能设计
  • I/O复用模型
  • 引进Buffer, 改善I/O数据流
  • 事件驱动模型
  • Reactor线程模型
  • Netty工作线程模型
3. 讲解 3.1 Netty简介

Netty是由JBOSS提供的一个java开源框架。是一个基于NIO的客户、服务器端编程框架,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

3.2 TCP/IP五层模型

什么是Socket套接字编程接口? 是指在 TCP/IP 协议族中,应用层进入传输层的接口, 用套接字编写使用 TCP或UDP 的网络应用程序。

3.2 什么是NIO?
  • 单线程阻塞模式
  • NIO模型

NIO模型中的Selector是Java的非阻塞I/O实现的关键,它使用了事件通知API以确定在一组非阻塞套接字中有哪些已经就绪能够进行I/O相关的 *** 作。因为可以在任何的时间检查任意的读 *** 作或者写 *** 作的完成状态,一个单一的线程便可以处理多个并发的连接。

3.3 Netty特点

JDK 原生也有一套网络应用程序 API,但使用繁杂, 不够完善, 需要消耗开发者较多精力去完善整个NIO场景。Netty对JDK的API进行封装,解决上述主要问题, 主要特点:

  • 设计优雅

    用于各种传输类型的统一 API 阻塞和非阻塞 Socket;基于灵活且可扩展的事件模型,可以清晰地分离关注点;高度可定制的线程模型 - 单线程,一个或多个线程池;真正的无连接数据报套接字支持(自 3.1 版本起)。

  • 高性能,吞吐量更高,使用方便

    延迟更低;减少资源消耗;最小化不必要的内存复制。详细记录的 Javadoc,用户指南和示例, 没有其他过多依赖。

  • 安全,社区活跃,不断更新

    社区活跃,版本迭代周期短,发现的 Bug 可以被及时修复,同时,更多的新功能会被加入。

3.4 Netty功能设计

Netty 功能特性如下:

  • **传输服务,**支持 BIO 和 NIO。
  • **容器集成,**支持 OSGI(开放服务网关协议)、JBossMC、Spring、Guice 容器。
  • **协议支持,**HTTP、Protobuf、二进制、文本、WebSocket 等一系列常见协议都支持。还支持通过实行编码解码逻辑来实现自定义协议。
  • **Core 核心,**可扩展事件模型、通用通信 API、支持零拷贝的 ByteBuf 缓冲对象。
3.5 I/O复用模型

Netty 的非阻塞 I/O 的实现关键是基于 I/O 复用模型 在 I/O 复用模型中,会用到 Select,这个函数也会使进程阻塞,但是和阻塞 I/O 所不同的是这两个函数可以同时对多个 I/O *** 作。而且可以同时对多个读 *** 作,多个写 *** 作的 I/O 函数进行检测,直到有数据可读或可写时,才真正调用 I/O *** 作函数。

3.6 引进Buffer

传统的 I/O 是面向字节流或字符流的,以流式的方式顺序地从一个 Stream 中读取一个或多个字节, 因此也就不能随意改变读取指针的位置。

在 NIO 中,抛弃了传统的 I/O 流,而是引入了 Channel 和 Buffer 的概念。ByteBuffer是除了Selector、Channel之外的另一个很重要的组件;在 NIO 中,只能从 Channel 中读取数据到 Buffer 中或将数据从 Buffer 中写入到 Channel。基于 Buffer *** 作不像传统 IO 的顺序 *** 作,NIO 中可以随意地读取任意位置的数据。

3.7 事件驱动模型

何时创建连接?何时读取数据,什么时候开始对数据包做解码与编码?如何触发调用业务处理逻辑? 各环节如何协调处理, 是采用轮询方式还是事件驱动方式更为高效?

事件驱动模型流程图

主要包含4个组件:

  • **事件队列(event queue):**接收事件的入口,存储待处理事件。
  • **分发器(event mediator):**将不同的事件分发到不同的业务逻辑单元。
  • **事件通道(event channel):**分发器与处理器之间的联系渠道。
  • **事件处理器(event processor):**实现业务逻辑,处理完成后会发出事件,触发下一步 *** 作。

事件驱动的优点:

  • **可扩展性好,**分布式的异步架构,事件处理器之间高度解耦,可以方便扩展事件处理逻辑。
  • **高性能,**基于队列暂存事件,能方便并行异步处理事件。
3.8 Reactor 线程模型

Reactor是一个执行 while (true) { selector.select(); …} 循环的线程,会源源不断的产生新的事件,类似反应堆。 Reactor模型是指通过一个或多个输入同时传递给服务处理器的服务请求的事件驱动处理模式。服务端程序处理传入多路请求,并将它们同步分派给请求对应的处理线程,Reactor 模式也叫 Dispatcher 模式,即 I/O 多了复用统一监听事件,收到事件后分发(Dispatch 给某进程),是编写高性能网络服务器的必备技术之一。

Reactor 模型中有 2 个关键组成:

  • **Reactor,**Reactor 在一个单独的线程中运行,负责监听和分发事件,分发给适当的处理程序来对 IO 事件做出反应。它就像公司的电话接线员,它接听来自客户的电话并将线路转移到适当的联系人。
  • **Handlers,**处理程序执行 I/O 事件要完成的实际事件,类似于客户想要与之交谈的公司中的实际职员。Reactor 通过调度适当的处理程序来响应 I/O 事件,处理程序执行非阻塞 *** 作。
3.9 Netty 工作线程模型

Netty 主要基于主从 Reactors 多线程模型(如下图)做了一定的修改,其中主从 Reactor 多线程模型有多个 Reactor:

  • MainReactor 负责客户端的连接请求,并将请求转交给 SubReactor。
  • SubReactor 负责相应通道的 IO 读写请求。
  • 非 IO 请求(具体逻辑处理)的任务则会直接写入队列,等待 worker threads 进行处理。

更多请参考:https://juejin.im/post/5be00763e51d453d4a5cf289 (作者:JavaDoop)

4. 总结
  • Netty是一款比较优秀的NIO开源框架, 这里做了简单介绍, 了解其作用与特点, 同时可以延伸了解NIO机制, 对Netty有更为清晰的认知与了解。
  • 从Netty的功能设计, 了解其核心组件与功能特性;I/O复用模型, 理解其NIO非阻塞式处理机制, 可以再拓展延伸了解selector、poll与epoll模型的差别。
  • 了解Netty的事件驱动模型, 有助于开发运用; 理解其Reactor线程与Netty工作模型, 加深对Netty的处理机制理解, 了解其底层, 有助于我们更好的运用Netty框架,更好的排查实际使用中的问题。

以及与项目的集成使用; CEPH集群安装配置较为复杂, 如果不能掌握, 可以放到最后面的集群专讲章节来学习搭建, 这里只要掌握CEPH的集成使用即可, 给后面项目运用做铺垫, 在后面课程中我们还会对其使用做详细讲解, 后面还会再深入讲解Swift Api封装以及在项目当中的集成使用。

第4章 Netty介绍 1. 目标
  • 了解Netty的基本概念
  • 了解NIO模型与作用
  • 理解Netty的工作机制, Reactor线程模型。
  • 了解Netty的工作原理, 底层相关知识。
2. 分析
  • Netty简介
  • TCP/IP五层模型
  • 什么是NIO
  • Netty特点
  • Netty功能设计
  • I/O复用模型
  • 引进Buffer, 改善I/O数据流
  • 事件驱动模型
  • Reactor线程模型
  • Netty工作线程模型
3. 讲解 3.1 Netty简介

Netty是由JBOSS提供的一个java开源框架。是一个基于NIO的客户、服务器端编程框架,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

3.2 TCP/IP五层模型

[外链图片转存中…(img-zhjwYoqC-1650684168181)]

什么是Socket套接字编程接口? 是指在 TCP/IP 协议族中,应用层进入传输层的接口, 用套接字编写使用 TCP或UDP 的网络应用程序。

3.2 什么是NIO?
  • 单线程阻塞模式 [外链图片转存中…(img-IKSlixLk-1650684168182)]
  • NIO模型 [外链图片转存中…(img-q3o3BabN-1650684168183)]

NIO模型中的Selector是Java的非阻塞I/O实现的关键,它使用了事件通知API以确定在一组非阻塞套接字中有哪些已经就绪能够进行I/O相关的 *** 作。因为可以在任何的时间检查任意的读 *** 作或者写 *** 作的完成状态,一个单一的线程便可以处理多个并发的连接。

3.3 Netty特点

JDK 原生也有一套网络应用程序 API,但使用繁杂, 不够完善, 需要消耗开发者较多精力去完善整个NIO场景。Netty对JDK的API进行封装,解决上述主要问题, 主要特点:

  • 设计优雅

    用于各种传输类型的统一 API 阻塞和非阻塞 Socket;基于灵活且可扩展的事件模型,可以清晰地分离关注点;高度可定制的线程模型 - 单线程,一个或多个线程池;真正的无连接数据报套接字支持(自 3.1 版本起)。

  • 高性能,吞吐量更高,使用方便

    延迟更低;减少资源消耗;最小化不必要的内存复制。详细记录的 Javadoc,用户指南和示例, 没有其他过多依赖。

  • 安全,社区活跃,不断更新

    社区活跃,版本迭代周期短,发现的 Bug 可以被及时修复,同时,更多的新功能会被加入。

3.4 Netty功能设计

[外链图片转存中…(img-zShM391S-1650684168184)]

Netty 功能特性如下:

  • **传输服务,**支持 BIO 和 NIO。
  • **容器集成,**支持 OSGI(开放服务网关协议)、JBossMC、Spring、Guice 容器。
  • **协议支持,**HTTP、Protobuf、二进制、文本、WebSocket 等一系列常见协议都支持。还支持通过实行编码解码逻辑来实现自定义协议。
  • **Core 核心,**可扩展事件模型、通用通信 API、支持零拷贝的 ByteBuf 缓冲对象。
3.5 I/O复用模型

[外链图片转存中…(img-NEbpEM7p-1650684168184)] Netty 的非阻塞 I/O 的实现关键是基于 I/O 复用模型 [外链图片转存中…(img-YmIVqDxB-1650684168185)] 在 I/O 复用模型中,会用到 Select,这个函数也会使进程阻塞,但是和阻塞 I/O 所不同的是这两个函数可以同时对多个 I/O *** 作。而且可以同时对多个读 *** 作,多个写 *** 作的 I/O 函数进行检测,直到有数据可读或可写时,才真正调用 I/O *** 作函数。

3.6 引进Buffer

传统的 I/O 是面向字节流或字符流的,以流式的方式顺序地从一个 Stream 中读取一个或多个字节, 因此也就不能随意改变读取指针的位置。

在 NIO 中,抛弃了传统的 I/O 流,而是引入了 Channel 和 Buffer 的概念。ByteBuffer是除了Selector、Channel之外的另一个很重要的组件;在 NIO 中,只能从 Channel 中读取数据到 Buffer 中或将数据从 Buffer 中写入到 Channel。基于 Buffer *** 作不像传统 IO 的顺序 *** 作,NIO 中可以随意地读取任意位置的数据。

3.7 事件驱动模型

何时创建连接?何时读取数据,什么时候开始对数据包做解码与编码?如何触发调用业务处理逻辑? 各环节如何协调处理, 是采用轮询方式还是事件驱动方式更为高效?

事件驱动模型流程图

[外链图片转存中…(img-CQ5UGCcI-1650684168187)]

主要包含4个组件:

  • **事件队列(event queue):**接收事件的入口,存储待处理事件。
  • **分发器(event mediator):**将不同的事件分发到不同的业务逻辑单元。
  • **事件通道(event channel):**分发器与处理器之间的联系渠道。
  • **事件处理器(event processor):**实现业务逻辑,处理完成后会发出事件,触发下一步 *** 作。

事件驱动的优点:

  • **可扩展性好,**分布式的异步架构,事件处理器之间高度解耦,可以方便扩展事件处理逻辑。
  • **高性能,**基于队列暂存事件,能方便并行异步处理事件。
3.8 Reactor 线程模型

[外链图片转存中…(img-8v7iwB0Q-1650684168188)]

Reactor是一个执行 while (true) { selector.select(); …} 循环的线程,会源源不断的产生新的事件,类似反应堆。 Reactor模型是指通过一个或多个输入同时传递给服务处理器的服务请求的事件驱动处理模式。服务端程序处理传入多路请求,并将它们同步分派给请求对应的处理线程,Reactor 模式也叫 Dispatcher 模式,即 I/O 多了复用统一监听事件,收到事件后分发(Dispatch 给某进程),是编写高性能网络服务器的必备技术之一。

Reactor 模型中有 2 个关键组成:

  • **Reactor,**Reactor 在一个单独的线程中运行,负责监听和分发事件,分发给适当的处理程序来对 IO 事件做出反应。它就像公司的电话接线员,它接听来自客户的电话并将线路转移到适当的联系人。
  • **Handlers,**处理程序执行 I/O 事件要完成的实际事件,类似于客户想要与之交谈的公司中的实际职员。Reactor 通过调度适当的处理程序来响应 I/O 事件,处理程序执行非阻塞 *** 作。
3.9 Netty 工作线程模型

[外链图片转存中…(img-nMCocdI8-1650684168189)]

Netty 主要基于主从 Reactors 多线程模型(如下图)做了一定的修改,其中主从 Reactor 多线程模型有多个 Reactor:

  • MainReactor 负责客户端的连接请求,并将请求转交给 SubReactor。
  • SubReactor 负责相应通道的 IO 读写请求。
  • 非 IO 请求(具体逻辑处理)的任务则会直接写入队列,等待 worker threads 进行处理。

更多请参考:https://juejin.im/post/5be00763e51d453d4a5cf289 (作者:JavaDoop)

4. 总结
  • Netty是一款比较优秀的NIO开源框架, 这里做了简单介绍, 了解其作用与特点, 同时可以延伸了解NIO机制, 对Netty有更为清晰的认知与了解。
  • 从Netty的功能设计, 了解其核心组件与功能特性;I/O复用模型, 理解其NIO非阻塞式处理机制, 可以再拓展延伸了解selector、poll与epoll模型的差别。
  • 了解Netty的事件驱动模型, 有助于开发运用; 理解其Reactor线程与Netty工作模型, 加深对Netty的处理机制理解, 了解其底层, 有助于我们更好的运用Netty框架,更好的排查实际使用中的问题。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/727324.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-26
下一篇 2022-04-26

发表评论

登录后才能评论

评论列表(0条)

保存