遇到的问题:
对一个多节点的集群的某个节点重启后,该节点无法正常加入ZK集群。
具体现象bin/zkServer.sh status
JMX enabled by default
......
Error contacting service. It is probably not running.
该节点的 zk进程处于运行状态,日志持续出现此类信息:
“Have smaller server identifier, so dropping the connections: (3,2)”;
“Notification timeout: 60000”
临时解决方案重启其他节点的zk进程,则该节点能正常加入zk集群。
根本原因zookeeper 某版本的bug:https://issues.apache.org/jira/browse/ZOOKEEPER-2186
Zookeeper启动时,会创建一个Listener子线程,该子线程创建绑定端口为选举端口的SocketServer,该SocketServer监听ZK集群其他peer节点socket连接,接收并进行处理。
public class Listener extends Thread {
volatile ServerSocket ss = null;
/**
* Sleeps on accept().
*/
@Override
public void run() {
int numRetries = 0;
InetSocketAddress addr;
while((!shutdown) && (numRetries < 3)){
try {
ss = new ServerSocket();
ss.setReuseAddress(true);
if (self.getQuorumListenOnAllIPs()) {
int port = self.quorumPeers.get(self.getId()).electionAddr.getPort();
addr = new InetSocketAddress(port);
} else {
addr = self.quorumPeers.get(self.getId()).electionAddr;
}
LOG.info("My election bind port: " + addr.toString());
setName(self.quorumPeers.get(self.getId()).electionAddr
.toString());
ss.bind(addr);
while (!shutdown) {
Socket client = ss.accept();
setSockOpts(client);
LOG.info("Received connection request "
+ client.getRemoteSocketAddress());
receiveConnection(client);
numRetries = 0;
}
} catch (IOException e) {
LOG.error("Exception while listening", e);
numRetries++;
try {
ss.close();
Thread.sleep(1000);
} catch (IOException ie) {
LOG.error("Error closing server socket", ie);
} catch (InterruptedException ie) {
LOG.error("Interrupted while sleeping. " +
"Ignoring exception", ie);
}
}
}
LOG.info("Leaving listener");
if (!shutdown) {
LOG.error("As I'm leaving the listener thread, "
+ "I won't be able to participate in leader "
+ "election any longer: "
+ self.quorumPeers.get(self.getId()).electionAddr);
}
}
/**
* Halts this listener thread.
*/
void halt(){
try{
LOG.debug("Trying to close listener: " + ss);
if(ss != null) {
LOG.debug("Closing listener: " + self.getId());
ss.close();
}
} catch (IOException e){
LOG.warn("Exception when shutting down listener: " + e);
}
}
}
当Listener线程接收到其他peer节点发送socket连接时,日志会打印“Received connection request xxxx”信息,但此时集群所有节点均没有该日志信息。异常节点的日志持续输出“Have smaller server identifier, so dropping the connections: (3,2)”,“Notification timeout: 60000”。这些日志表明异常节点在持续发送选举notification信息至节点3。正常情况下,节点3接收到server id比它小的异常节点的socket连接时,会关闭socket,并主动发起socket连接至异常节点。但此时,节点3没有日志显示接收到异常节点发来的socket连接。异常节点的日志也没有显示连接失败Exception信息。尽管Listener线程crash后,其主线程依然在运行,相应的选举端口依然处于listen状态,客户端虽然能建立socket连接,但Listener线程已无法处理请求,导致选举失败。通过重启其他节点 zk服务,让Listener恢复至正常监听状态,异常节点即可选举成功。
public boolean receiveConnection(Socket sock) {
Long sid = null;
try {
// Read server id
DataInputStream din = new DataInputStream(sock.getInputStream());
sid = din.readLong();
if (sid < 0) { // this is not a server id but a protocol version (see ZOOKEEPER-1633)
sid = din.readLong();
// next comes the #bytes in the remainder of the message
int num_remaining_bytes = din.readInt();
byte[] b = new byte[num_remaining_bytes];
// remove the remainder of the message from din
int num_read = din.read(b);
if (num_read != num_remaining_bytes) {
LOG.error("Read only " + num_read + " bytes out of " + num_remaining_bytes + " sent by server " + sid);
}
}
if (sid == QuorumPeer.OBSERVER_ID) {
/*
* Choose identifier at random. We need a value to identify
* the connection.
*/
sid = observerCounter--;
LOG.info("Setting arbitrary identifier to observer: " + sid);
}
} catch (IOException e) {
closeSocket(sock);
LOG.warn("Exception reading or writing challenge: " + e.toString());
return false;
}
//If wins the challenge, then close the new connection.
if (sid < self.getId()) {
/*
* This replica might still believe that the connection to sid is
* up, so we have to shut down the workers before trying to open a
* new connection.
*/
SendWorker sw = senderWorkerMap.get(sid);
if (sw != null) {
sw.finish();
}
/*
* Now we start a new connection
*/
LOG.debug("Create new connection to server: " + sid);
closeSocket(sock);
connectOne(sid);
// Otherwise start worker threads to receive data.
} else {
SendWorker sw = new SendWorker(sock, sid);
RecvWorker rw = new RecvWorker(sock, sid, sw);
sw.setRecv(rw);
SendWorker vsw = senderWorkerMap.get(sid);
if(vsw != null)
vsw.finish();
senderWorkerMap.put(sid, sw);
if (!queueSendMap.containsKey(sid)) {
queueSendMap.put(sid, new ArrayBlockingQueue(
SEND_CAPACITY));
}
sw.start();
rw.start();
return true;
}
return false;
}
对于Listener子线程crash掉的情况下,为什么Socket客户端还能正常与ServerSocket建立连接,且没有抛出连接异常信息?我做一个演示。先创建一个主线程,并在主线程下创建ServerSocket子线程。最后通过Socket客户端测试该问题。
主线程类MainThread,在该主线程类中,创建一个子线程SimpleSocketServer,监听端口3888,接收到的socket请求,调用receiveConnection方法进行处理:
public class MainThread extends Thread {
@Override
public void run() {
int port = 3888;
System.out.println("Start server on port: " + port);
SimpleSocketServer simpleSocketServer = new SimpleSocketServer(port);
simpleSocketServer.start();
while (true) {
System.out.println("MainThread running...");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class SimpleSocketServer extends Thread {
private ServerSocket serverSocket;
private int port;
private boolean running = false;
public SimpleSocketServer(int port) {
this.port = port;
}
public void stopServer() {
running = false;
this.interrupt();
}
@Override
public void run() {
try {
serverSocket = new ServerSocket(port);
} catch (IOException e) {
e.printStackTrace();
}
running = true;
while (running) {
try {
System.out.println("Listening for a connection");
// 调用accept()方法阻塞在这里接收下一个连接
Socket socket = serverSocket.accept();
System.out.println("Received a connection");
receiveConnection(socket);
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void receiveConnection(Socket sock) {
Long sid = null;
try {
// Read server id
DataInputStream din = new DataInputStream(sock.getInputStream());
sid = din.readLong();
System.out.println("1 sid: " + sid);
if (sid < 0) { // this is not a server id but a protocol version (see ZOOKEEPER-1633)
sid = din.readLong();
System.out.println("2 sid: " + sid);
// next comes the #bytes in the remainder of the message
int num_remaining_bytes = din.readInt();
System.out.println("num_remaining_bytes: " + num_remaining_bytes);
byte[] b = new byte[num_remaining_bytes];
// remove the remainder of the message from din
int num_read = din.read(b);
System.out.println("num_read: " + num_read);
if (num_read != num_remaining_bytes) {
System.out.println("Read only " + num_read + " bytes out of " + num_remaining_bytes + " sent by server " + sid);
}
}
} catch (IOException e) {
closeSocket(sock);
System.out.format("Exception reading or writing challenge: \n");
e.printStackTrace();
return;
}
}
private void closeSocket(Socket sock) {
try {
sock.close();
} catch (IOException ie) {
System.out.format("Exception while closing %s \n", ie.toString());
}
}
}
Main类:
public class Main {
public static void main(String[] args) {
MainThread mainThread = new MainThread();
mainThread.start();
}
}
把主线程运行起来,console输出信息:
Start server on port: 3888
MainThread running...
Listening for a connection
Socket客户端类,该类与SocketServer建立连接后,发送特定字符串:
public class SocketClient {
public static void main(String[] args) {
try {
Socket socket = new Socket("localhost", 3888);
System.out.println("Connected!");
DataOutputStream out = new DataOutputStream(socket.getOutputStream());
// protocol version - a negative number
out.writeLong(0xffff0000);
// server id
out.writeLong(new Long(2));
// addr
String addr = "1.2.3.4:3888";
byte[] addrBytes = addr.getBytes();
out.writeInt(addrBytes.length);
out.write(addrBytes);
out.flush();
System.out.println("Sent!");
} catch (Exception e) {
System.out.println("捕获Exception");
e.printStackTrace();
}
}
}
运行正常的SocketClient,SimpleSocketServer线程接收到请求,进行正常处理,控制台输出信息:
Received a connection
1 sid: -65536
2 sid: 2
num_remaining_bytes: 12
num_read: 12
这时,修改SocketClient的代码,往SimpleSocketServer发送一个很大的int数值,
out.writeInt(2147483647);
public class SocketClient {
public static void main(String[] args) {
try {
Socket socket = new Socket("localhost", 3888);
System.out.println("Connected!");
DataOutputStream out = new DataOutputStream(socket.getOutputStream());
// protocol version - a negative number
out.writeLong(0xffff0000);
// server id
out.writeLong(new Long(2));
// addr
String addr = "1.2.3.4:3888";
byte[] addrBytes = addr.getBytes();
// out.writeInt(addrBytes.length);
// 发送一个很大的int数值
out.writeInt(2147483647);
out.write(addrBytes);
out.flush();
System.out.println("Sent!");
} catch (Exception e) {
System.out.println("捕获Exception");
e.printStackTrace();
}
}
}
SimpleSocketServer抛出异常信息:
Received a connection
1 sid: -65536
2 sid: 2
num_remaining_bytes: 2147483647
Exception in thread "Thread-1" java.lang.OutOfMemoryError: Requested array size exceeds VM limit
at com.demo.SimpleSocketServer.receiveConnection(SimpleSocketServer.java:108)
at com.demo.SimpleSocketServer.run(SimpleSocketServer.java:48)
SocketClient尝试再次连接SimpleSocketServer,连接无显示任何异常信息,且telnet localhost 3888也能通过,但服务端再也接收不到客户端所发来的任何信息!
回到本文的bug,在新版本的zookeeper得到了修复,修复代码为:
// next comes the #bytes in the remainder of the message
// note that 0 bytes is fine (old servers)
int num_remaining_bytes = din.readInt();
// 如果num_remaining_bytes超出最大值,就抛出ERROR日志信息,并返回
if (num_remaining_bytes < 0 || num_remaining_bytes > maxBuffer) {
LOG.error("Unreasonable buffer length: {}", num_remaining_bytes);
closeSocket(sock);
return;
}
byte[] b = new byte[num_remaining_bytes];
// remove the remainder of the message from din
// 执行这一步的时候,b字节组则为一个合理长度的字节数组,最后把din的字节内容读入至b中,并返回读取到内容的长度
int num_read = din.read(b);
if (num_read != num_remaining_bytes) {
LOG.error("Read only " + num_read + " bytes out of " + num_remaining_bytes + " sent by server " + sid);
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)