2. 步骤
本文重点接收的是持久化到MySQL中的配置方式:
2.1 添加MySQL驱动
首先需要把MySql的驱动放到ActiveMQ的Lib目录下,我用的文件名字是:mysql-connector-java-5.1.30-bin.jar
2.2 修改配置文件activemq.xml
将其中的这段配置:
修改为下面这段内容:
3、另外需要在节点的下面定义id为mysql-ds的bean,如下:
4、 新建数据库
从配置中可以看出数据库的名称是activemq,需要手动在MySql中新建一个activemq的空数据库。
此时,重新启动MQ,就会发现activemq库中多了三张表:activemq_acks,activemq_lock,activemq_msgs,OK,说明已经持久化成功啦!
首先明确说明它不是数据库,它没有schema,也没有表,更没有索引。它仅仅是生产消息流、消费消息流而已。从这个角度来说Kafka的确不像数据库,至少不像我们熟知的关系型数据库。那么到底什么是数据库呢?或者说什么特性使得一个系统可以被称为数据库?经典的教科书是这么说的:数据库是提供 ACID 特性的,即atomicity、consistency、isolation和durability。好了,现在问题演变成了Apache Kafka支持ACID吗?如果它支持,Kafka又是怎么支持的呢?要回答这些问题,我们依次讨论下ACID。
1、持久性(durability)
我们先从最容易的持久性开始说起,因为持久性最容易理解。在80年代持久性指的是把数据写入到磁带中,这是一种很古老的存储设备,现在应该已经绝迹了。目前实现持久性更常见的做法是将数据写入到物理磁盘上,而这也只能实现单机的持久性。当演进到分布式系统时代后,持久性指的是将数据通过备份机制拷贝到多台机器的磁盘上。很多数据库厂商都有自己的分布式系统解决方案,如GreenPlum和Oracle RAC。它们都提供了这种多机备份的持久性。和它们类似,Apache Kafka天然也是支持这种持久性的,它提供的副本机制在实现原理上几乎和数据库厂商的方案是一样的。
2、原子性(atomicity)
数据库中的原子性和多线程领域内的原子性不是一回事。我们知道在Java中有AtomicInteger这样的类能够提供线程安全的整数 *** 作服务,这里的atomicity关心的是在多个线程并发的情况下如何保证正确性的问题。而在数据库领域,原子性关心的是如何应对错误或异常情况,特别是对于事务的处理。如果服务发生故障,之前提交的事务要保证已经持久化,而当前运行的事务要终止(abort),它执行的所有 *** 作都要回滚,最终的状态就好像该事务从未运行过那样。举个实际的例子,
第三个方法是采用基于日志结构的消息队列来实现,比如使用Kafka来做,如下图所示:
在这个架构中app仅仅是向Kafka写入消息,而下面的数据库、cache和index作为独立的consumer消费这个日志——Kafka分区的顺序性保证了app端更新 *** 作的顺序性。如果某个consumer消费速度慢于其他consumer也没关系,毕竟消息依然在Kafka中保存着。总而言之,有了Kafka所有的异质系统都能以相同的顺序应用app端的更新 *** 作,从而实现了数据的最终一致性。这种方法有个专属的名字,叫capture data change,也称CDC。
3、隔离性(isolation)
在传统的关系型数据库中最强的隔离级别通常是指serializability,国内一般翻译成可串行化或串行化。表达的思想就是连接数据库的每个客户端在执行各自的事务时数据库会给它们一个假象:仿佛每个客户端的事务都顺序执行的,即执行完一个事务之后再开始执行下一个事务。其实数据库端同时会处理多个事务,但serializability保证了它们就像单独执行一样。举个例子,在一个论坛系统中,每个新用户都需要注册一个唯一的用户名。一个简单的app实现逻辑大概是这样的:
4、一致性(consistency)
最后说说一致性。按照Kelppmann大神的原话,这是一个很奇怪的属性:在所有ACID特性中,其他三项特性的确属于数据库层面需要实现或保证的,但只有一致性是由用户来保证的。严格来说,它不属于数据库的特性,而应该属于使用数据库的一种方式。坦率说第一次听到这句话时我本人还是有点震惊的,因为从没有往这个方面考虑过,但仔细想想还真是这么回事。比如刚才的注册用户名的例子中我们要求每个用户名是唯一的。这种一致性约束是由我们用户做出的,而不是数据库本身。数据库本身并不关心或并不知道用户名是否应该是唯一的。针对Kafka而言,这种一致性又意味着什么呢?Kelppmann没有具体展开,但我个人认为他应该指的是linearizability、消息顺序之间的一致性以及分布式事务。幸运的是,Kafka的备份机制实现了linearizability和total order broadcast,而且在Kafka 0.11开始也支持分布式事务了。
1.读取表格
import java.io.Fileimport java.io.FileInputStream
import java.io.FileNotFoundException
import java.io.IOException
import java.util.Iterator
import org.apache.poi.hssf.usermodel.HSSFWorkbook
import org.apache.poi.ss.usermodel.Cell
import org.apache.poi.ss.usermodel.Row
import org.apache.poi.ss.usermodel.Sheet
import org.apache.poi.ss.usermodel.Workbook
import org.apache.poi.xssf.usermodel.XSSFWorkbook
public class Read {
public Read(){
}
public static void start(String path) throws FileNotFoundException, IOException{
Workbook book = getWorkBook(path)
Sheet sheet = getSheets(book)
SheetIterator(sheet)
}
private static void SheetIterator(Sheet sheet){
Iterator<Row> iterator = sheet.iterator()
while(iterator.hasNext()){
Row nextRow = iterator.next()
if(nextRow.getRowNum()<3){
continue
}
Iterator<Cell> cellIterator = nextRow.cellIterator()
while (cellIterator.hasNext()) {
Cell cell = cellIterator.next()
switch (cell.getCellType()) {
case Cell.CELL_TYPE_STRING:
System.out.print(cell.getStringCellValue())
break
case Cell.CELL_TYPE_BOOLEAN:
System.out.print(cell.getBooleanCellValue())
break
case Cell.CELL_TYPE_NUMERIC:
System.out.print(cell.getNumericCellValue())
break
}
System.out.print(" ")
}
System.out.println(" ")
}
System.out.println(" ")
}
private static Sheet getSheets(Workbook book) {
return(Sheet) book.getSheetAt(0)
}
public static Workbook getWorkBook(String path) throws FileNotFoundException, IOException{
return path.endsWith(".xls")?(new HSSFWorkbook(new FileInputStream(new File(path)))):(path.endsWith(".xlsx")?(new XSSFWorkbook(new FileInputStream(new File(path)))):(null))
}
}
model
public class Course implements Serializable {/**
*
*/
private static final long serialVersionUID = -993930170402063910L
private Integer serial
private String dept
private String major
private String course_name
private String credit
private String natrue
private String test_way
private String teacher_in_charge
public Course(){
super()
}
public Integer getSerial() {
return serial
}
public void setSerial(Integer serial) {
this.serial = serial
}
public String getDept() {
return dept
}
public void setDept(String dept) {
this.dept = dept
}
public String getMajor() {
return major
}
public void setMajor(String major) {
this.major = major
}
public String getCourse_name() {
return course_name
}
public void setCourse_name(String course_name) {
this.course_name = course_name
}
public String getCredit() {
return credit
}
public void setCredit(String credit) {
this.credit = credit
}
public String getNatrue() {
return natrue
}
public void setNatrue(String natrue) {
this.natrue = natrue
}
public String getTest_way() {
return test_way
}
public void setTest_way(String test_way) {
this.test_way = test_way
}
public String getTeacher_in_charge() {
return teacher_in_charge
}
public void setTeacher_in_charge(String teacher_in_charge) {
this.teacher_in_charge = teacher_in_charge
}
}
2.保存到数据库:
import java.io.Fileimport java.io.FileInputStream
import java.io.FileNotFoundException
import java.io.IOException
import java.util.Iterator
import org.apache.poi.hssf.usermodel.HSSFWorkbook
import org.apache.poi.ss.usermodel.Cell
import org.apache.poi.ss.usermodel.Row
import org.apache.poi.ss.usermodel.Sheet
import org.apache.poi.ss.usermodel.Workbook
import org.apache.poi.xssf.usermodel.XSSFWorkbook
import org.lee.dao.Imp.BookImp
import org.lee.dao.Imp.CourseImp
import org.lee.model.Book
import org.lee.model.Course
public class Insert {
public Insert() {
}
public static void start(String path) throws FileNotFoundException, IOException {
Workbook book = getWorkBook(path)
Sheet sheet = getSheets(book)
System.out.println("sheet name:" + sheet.getSheetName())
SheetIterator(sheet)
}
private static void SheetIterator(Sheet sheet) {
Iterator<Row> iterator = sheet.iterator()
while (iterator.hasNext()) {
Row nextRow = iterator.next()
if (nextRow.getRowNum() < 3) {
continue
}
Iterator<Cell> cellIterator = nextRow.cellIterator()
StringBuffer course_str = new StringBuffer("")
StringBuffer main_book_str = new StringBuffer("")
StringBuffer sub_book_str = new StringBuffer("")
while (cellIterator.hasNext()) {
Cell cell = cellIterator.next()
nextRow.getCell(cell.getColumnIndex()).setCellType(Cell.CELL_TYPE_STRING)
if (cell.getColumnIndex() < 8) {
Object course_obj = cell.getStringCellValue()
course_str.append(course_obj + "-")
} else if (cell.getColumnIndex() >= 8 && cell.getColumnIndex() < 16) {
Object main_book_obj = cell.getStringCellValue()
main_book_str.append(main_book_obj + ":")
} else if (cell.getColumnIndex() >= 16 && cell.getColumnIndex() <= 22) {
Object sub_book_obj = cell.getStringCellValue()
sub_book_str.append(sub_book_obj + ":")
}
}
Object[] course_obj = new Object[20]
Object[] main_book_obj = new Object[20]
Object[] sub_book_obj = new Object[20]
String c_str = new String(course_str)
System.out.println(c_str)
String m_str = new String(main_book_str)
String s_str = new String(sub_book_str)
course_obj = c_str.split("-")
main_book_obj = m_str.split(":")
sub_book_obj = s_str.split(":")
System.out.println(course_str)
System.out.println(main_book_str)
System.out.println(sub_book_str)
Course course = new Course()
course.setSerial(Integer.parseInt(course_obj[0].toString()))
course.setDept(course_obj[1].toString())
course.setMajor(course_obj[2].toString())
course.setCourse_name(course_obj[3].toString())
course.setCredit(course_obj[4].toString())
course.setNatrue(course_obj[5].toString())
course.setTest_way(course_obj[6].toString())
course.setTeacher_in_charge(course_obj[7].toString())
Book main_book = new Book()
main_book.setBook_num(main_book_obj[0].toString())
main_book.setBook_name(main_book_obj[1].toString())
main_book.setPublishing(main_book_obj[2].toString())
main_book.setAuthor(main_book_obj[3].toString())
main_book.setPublish_date(main_book_obj[4].toString())
main_book.setIsGaozhi(main_book_obj[5].toString())
main_book.setIsSanlei(main_book_obj[6].toString())
main_book.setT_book_numbers(main_book_obj[7].toString())
Book sub_book = new Book()
sub_book.setBook_num(sub_book_obj[0].toString())
sub_book.setBook_name(sub_book_obj[1].toString())
sub_book.setPublishing(sub_book_obj[2].toString())
sub_book.setAuthor(sub_book_obj[3].toString())
sub_book.setPublish_date(sub_book_obj[4].toString())
sub_book.setIsGaozhi(sub_book_obj[5].toString())
sub_book.setIsSanlei(sub_book_obj[6].toString())
CourseImp c = new CourseImp()
BookImp b = new BookImp()
try {
c.save(course)
b.save(main_book)
b.save(sub_book)
} catch (Exception e) {
e.printStackTrace()
}
System.out.println(" ")
}
System.out.println(" ")
}
private static Sheet getSheets(Workbook book) {
return (Sheet) book.getSheetAt(0)
}
public static Workbook getWorkBook(String path) throws FileNotFoundException, IOException {
return path.endsWith(".xls") ? (new HSSFWorkbook(new FileInputStream(new File(path))))
: (path.endsWith(".xlsx") ? (new XSSFWorkbook(new FileInputStream(new File(path)))) : (null))
}
}
3.数据库连接
import java.io.IOExceptionimport java.io.InputStream
import java.sql.Connection
import java.sql.DriverManager
import java.sql.PreparedStatement
import java.sql.ResultSet
import java.sql.SQLException
import java.util.Properties
public class DBUtil {
static String driver
static String username
static String pwd
static String url
static {
try {
ClassLoader classLoader = DBUtil.class.getClassLoader()
InputStream is = classLoader.getResourceAsStream("config/props/db.properties")
// System.out.println(is.available())
Properties props = new Properties()
props.load(is)
url = props.getProperty("url")
username = props.getProperty("user")
pwd = props.getProperty("password")
driver = props.getProperty("driver")
Class.forName(driver)
} catch (IOException e) {
e.printStackTrace()
} catch (ClassNotFoundException e) {
e.printStackTrace()
}
}
public static Connection getConnection() throws SQLException {
Connection conn = (Connection) DriverManager.getConnection(url, username, pwd)
if (conn == null) {
System.out.println("Failed to connect database...")
} else {
System.out.println("database connected successful...")
}
return conn
}
public static void release(ResultSet rs, PreparedStatement sta, Connection conn) {
if (rs != null) {
try {
rs.close()
} catch (SQLException e) {
e.printStackTrace()
}
}
if (sta != null) {
try {
sta.close()
} catch (SQLException e) {
e.printStackTrace()
}
}
if (conn != null) {
try {
conn.close()
} catch (SQLException e) {
e.printStackTrace()
}
}
System.out.println("Resource release successful...")
}
public static void release(PreparedStatement sta, Connection conn) {
if (sta != null) {
try {
sta.close()
} catch (SQLException e) {
e.printStackTrace()
}
}
if (conn != null) {
try {
conn.close()
} catch (SQLException e) {
e.printStackTrace()
}
}
System.out.println("Resource release successful...")
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)