HTTP、FTP 都可以;直接做网络映射(挂载对方的共享目录)也行,但需要对方的 Linux 服务器支持。
所以一般还是用 HTTP/FTP 方便很多。
-----------
update:只要是两边都支持的文件传输协议就行了。
主要代码:pom.xml
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.2.0</version>
</dependency>
<!--ssh2 -->
<dependency>
<groupId>ch.ethz.ganymed</groupId>
<artifactId>ganymed-ssh2</artifactId>
<version>build210</version>
</dependency>
主类:
/**
* Project Name:kafkademo
* File Name:TailLogToKafka.java
* Package Name:cmm.kafkademo
* Date:2018年12月7日下午2:40:46
* Copyright (c) 2018, c2206190880@163.com All Rights Reserved.
*
*/
package cmm.TailLogToKafka
import java.io.BufferedInputStream
import java.io.BufferedReader
import java.io.File
import java.io.FileInputStream
import java.io.IOException
import java.io.InputStream
import java.io.InputStreamReader
import java.util.Properties
import ch.ethz.ssh2.Connection
import ch.ethz.ssh2.Session
import ch.ethz.ssh2.StreamGobbler
import cmm.util.PathUtil
/**
* ClassName:TailLogToKafka <br/>
* Function: TODO ADD FUNCTION. <br/>
* Reason: TODO ADD REASON. <br/>
* Date: 2018年12月7日 下午2:40:46 <br/>
* @author mmchen
* @version
* @sinceJDK 1.7
* @see
*/
public class TailLogToKafka {
private static Connection conn
private static int threadNum=0
private static String cmd = "echo no commond"
private static String topic = "cmmtest"
public static void main(String[] args) {
Properties properties = new Properties()
try {
InputStream in = new BufferedInputStream(new FileInputStream(new File(PathUtil.getProjectParentPath()+"/remote.properties")))
properties.load(in)
} catch (IOException e1) {
System.out.println("远程连接配置读取失败!!!")
e1.printStackTrace()
}
//远程连接linux服务器
String ip = properties.getProperty("ip")
String usr = properties.getProperty("user")
String psword = properties.getProperty("password")
cmd = properties.getProperty("shell")
topic = properties.getProperty("topic")
//创建远程连接,默认连接端口为22,如果不使用默认,可以使用方法
//new Connection(ip, port)创建对象
conn = new Connection(ip)
try {
//连接远程服务器
conn.connect()
//使用用户名和密码登录
conn.authenticateWithPassword(usr, psword)
} catch (IOException e) {
System.err.printf("用户%s密码%s登录服务器%s失败!", usr, psword, ip)
e.printStackTrace()
}
//创建线程,执行shell命令,获取实时数据流,写入kafka
threadNum=1
new Thread(new Runnable() {
@Override
public void run() {
try {
Session session = conn.openSession()
session.execCommand(cmd)
InputStream out = new StreamGobbler(session.getStdout())
BufferedReader outBufferedReader = new BufferedReader(new InputStreamReader(out))
myProducer producerDemo = new myProducer()
while (true) {
String line = outBufferedReader.readLine()
if (line == null) {
threadNum=0
outBufferedReader.close()
session.close()
conn.close()
break
}
System.out.println(line)
//数据写入kafka
producerDemo.produce(topic,line)
}
} catch (IOException e) {
System.out.println("open session fail")
e.printStackTrace()
}
}
}).start()
while (threadNum>0) {
Thread.yield()
}
}
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)