Hadoop RPC进程通信应用

Hadoop RPC进程通信应用,第1张

Hadoop的RPC是基于socket的半双工通信机制,两边如果需要互相向对方传数据,则需要建立两个通道;还要注意同一时间只能存在一个通道,否则双方的通信都会进入阻塞状态。

package com.linux.rpc2;


import org.apache.hadoop.io.Text;

/**
 * RPC contract shared by the teacher ({@code StudentCheck}) and the student
 * servers in this demo. Every participant both serves and calls this protocol.
 */
public interface RPCProtocol {

    /** Protocol version passed to {@code RPC.getProxy} by every caller in this demo. */
    long versionID = 999L;

    /**
     * Hands a number to the receiver to work on.
     *
     * @param inputData the number to process
     */
    void testProgram(int inputData);

    /**
     * Delivers a textual result together with the name of the party sending it.
     *
     * @param res  the result text
     * @param name the sender's name
     */
    void receiveResult(Text res, Text name);

}
package com.linux.rpc2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.util.Time;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Scanner;

public class StudentCheck implements RPCProtocol {

//    private static ArrayList students;
    private static HashMap students;
    private static int testInputData;
    private static String trueRes;
    private static RPC.Server server;
    private static Text teacherRes;
    private static Text teacherName;
    private static HashMap results;
//    private static RPCProtocol student02;

    public static void init() throws IOException {
        teacherRes = new Text();
        teacherName = new Text();
        teacherName.set("Teacher");
        students = new HashMap<>();
        results = new HashMap<>();
//        students = new ArrayList<>();

        server = new RPC.Builder(new Configuration())
                .setBindAddress("localhost")
                .setPort(9870)
                .setProtocol(RPCProtocol.class)
                .setInstance(new StudentCheck())
                .build();
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        init();
        Scanner scanner = new Scanner(System.in);

        server.start();
        System.out.println("StudentCheck main thread is running");
//        Thread.sleep(10000);
        RPCProtocol studentCheck = RPC.getProxy(RPCProtocol.class, RPCProtocol.versionID, new InetSocketAddress("localhost", 9870), new Configuration());
        RPCProtocol student01 = RPC.getProxy(RPCProtocol.class, RPCProtocol.versionID, new InetSocketAddress("localhost", 9871), new Configuration());
        RPCProtocol student02 = RPC.getProxy(RPCProtocol.class, RPCProtocol.versionID, new InetSocketAddress("localhost", 9872), new Configuration());
        RPCProtocol student03 = RPC.getProxy(RPCProtocol.class, RPCProtocol.versionID, new InetSocketAddress("localhost", 9873), new Configuration());

        System.out.println("Please input your problem data");
        testInputData = scanner.nextInt();
        studentCheck.testProgram(testInputData);
//        students.add(studentCheck);  // 必须最前面
//        students.add(student01);
//        students.add(student02);
//        students.add(student03);

        students.put("Student01", student01);
        students.put("Student02", student02);
        students.put("Student03", student03);

//        for (RPCProtocol student : students) {
//            student.testProgram(testInputData);
//            Thread.sleep(100);
//        }
        for (RPCProtocol student : students.values()) {
            Thread.sleep(100);
            student.testProgram(testInputData);
        }

        Thread.sleep(500);

        for (String student : results.keySet()) {
            if (results.get(student)) {
                teacherRes.set("right");
            } else {
                teacherRes.set("wrong");
            }
            students.get(student).receiveResult(teacherRes, teacherName);
        }
    }

    @Override
    public void testProgram(int inputData) {
        trueRes = Integer.toBinaryString(inputData);
    }

    @Override
    public void receiveResult(Text res, Text name) {
        String studentName = name.toString();
//        System.out.println(studentName);
        if (trueRes.equals(res.toString())) {
            System.out.println(new Date(Time.now()) + "\t" + studentName + "'s result is right");;
            results.put(studentName, true);
        } else {
            System.out.println(new Date(Time.now()) + "\t" + studentName + "'s result is wrong");
            results.put(studentName, false);
        }
//        student02.receiveResult(res, teacherName);
//        try {
//            RPCProtocol student02 = RPC.getProxy(RPCProtocol.class, RPCProtocol.versionID, new InetSocketAddress("localhost", 9872), new Configuration());
//            student02.receiveResult(res, name);
//        } catch (IOException e) {
//            e.printStackTrace();
//        }

    }
}
package com.linux.rpc2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.util.Time;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Date;

/**
 * Student side of the RPC demo, serving {@link RPCProtocol} on localhost:9871.
 *
 * <p>When asked to process a number it converts it to a binary string and reports
 * that string, under the name "Student01", to the server on localhost:9870.
 */
public class Student01 implements RPCProtocol {

    /** Proxy to the server on port 9870; recreated on every {@link #testProgram} call. */
    private static RPCProtocol studentClient;
    private static Text name;
    /** Reusable holder for the answer sent back. */
    private static Text resOutput;

    /**
     * Initialises the shared state and starts the RPC server.
     *
     * @param args unused
     * @throws IOException          if the server cannot be built
     * @throws InterruptedException declared for callers; not thrown by the current body
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        name = new Text();
        name.set("Student01");
        resOutput = new Text();

        RPC.Server server = new RPC.Builder(new Configuration())
                .setBindAddress("localhost")
                .setPort(9871)
                .setProtocol(RPCProtocol.class)
                .setInstance(new Student01())
                .build();

        server.start();
        System.out.println("Student01 main thread is running");

    }

    /**
     * Converts {@code inputData} to binary and reports it to the server on port 9870.
     * If the proxy cannot be created the failure is printed and nothing is reported.
     *
     * @param inputData the number to convert
     */
    @Override
    public void testProgram(int inputData) {
        try {
            studentClient = RPC.getProxy(RPCProtocol.class, RPCProtocol.versionID, new InetSocketAddress("localhost", 9870), new Configuration());
        } catch (IOException e) {
            e.printStackTrace();
            // Without a fresh proxy there is nobody to report to; continuing would
            // dereference a null (or stale) proxy.
            return;
        }

        resOutput.set(toBinary(inputData));
        studentClient.receiveResult(resOutput, name);
    }

    /**
     * Builds the binary representation of {@code value}, most significant digit first.
     * Zero yields "0" and negative values yield their 32-bit two's-complement pattern,
     * i.e. the same text as {@link Integer#toBinaryString(int)}.
     */
    private static String toBinary(int value) {
        StringBuilder bits = new StringBuilder();
        do {
            bits.append(value & 1);
            value >>>= 1; // unsigned shift so negative inputs terminate
        } while (value != 0);
        // Digits were collected least significant first.
        return bits.reverse().toString();
    }

    /**
     * Prints a result message received from another party.
     *
     * @param res  the result text
     * @param name the sender's name
     */
    @Override
    public void receiveResult(Text res, Text name) {
        System.out.println(new Date(Time.now()) + "\t" + name.toString() + " say the answer is " + res.toString());
    }
}

package com.linux.rpc2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.util.Time;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Date;

/**
 * Student side of the RPC demo, serving {@link RPCProtocol} on localhost:9872.
 *
 * <p>When asked to process a number it converts it to a binary string and reports
 * that string, under the name "Student02", to the server on localhost:9870.
 */
public class Student02 implements RPCProtocol {

    /** Proxy to the server on port 9870; recreated on every {@link #testProgram} call. */
    private static RPCProtocol studentClient;
    private static Text name;
    /** Reusable holder for the answer sent back. */
    private static Text resOutput;

    /**
     * Initialises the shared state and starts the RPC server.
     *
     * @param args unused
     * @throws IOException          if the server cannot be built
     * @throws InterruptedException declared for callers; not thrown by the current body
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        name = new Text();
        name.set("Student02");
        resOutput = new Text();

        RPC.Server server = new RPC.Builder(new Configuration())
                .setBindAddress("localhost")
                .setPort(9872)
                .setProtocol(RPCProtocol.class)
                .setInstance(new Student02())
                .build();

        server.start();
        System.out.println("Student02 main thread is running");

    }

    /**
     * Converts {@code inputData} to binary and reports it to the server on port 9870.
     * If the proxy cannot be created the failure is printed and nothing is reported.
     *
     * @param inputData the number to convert
     */
    @Override
    public void testProgram(int inputData) {
        try {
            studentClient = RPC.getProxy(RPCProtocol.class, RPCProtocol.versionID, new InetSocketAddress("localhost", 9870), new Configuration());
        } catch (IOException e) {
            e.printStackTrace();
            // Without a fresh proxy there is nobody to report to; continuing would
            // dereference a null (or stale) proxy.
            return;
        }

        resOutput.set(toBinary(inputData));
        studentClient.receiveResult(resOutput, name);
    }

    /**
     * Builds the binary representation of {@code value}, most significant digit first.
     * Zero yields "0" and negative values yield their 32-bit two's-complement pattern,
     * i.e. the same text as {@link Integer#toBinaryString(int)}.
     */
    private static String toBinary(int value) {
        StringBuilder bits = new StringBuilder();
        do {
            bits.append(value & 1);
            value >>>= 1; // unsigned shift so negative inputs terminate
        } while (value != 0);
        // Digits were collected least significant first.
        return bits.reverse().toString();
    }

    /**
     * Prints a result message received from another party.
     *
     * @param res  the result text
     * @param name the sender's name
     */
    @Override
    public void receiveResult(Text res, Text name) {
        System.out.println(new Date(Time.now()) + "\t" + name.toString() + " say the answer is " + res.toString());
    }
}

package com.linux.rpc2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.util.Time;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Date;

/**
 * Student side of the RPC demo, serving {@link RPCProtocol} on localhost:9873.
 *
 * <p>When asked to process a number it produces a string of binary digits and
 * reports that string, under the name "Student03", to the server on localhost:9870.
 */
public class Student03 implements RPCProtocol {

    /** Proxy to the server on port 9870; recreated on every {@link #testProgram} call. */
    private static RPCProtocol studentClient;
    private static Text name;
    /** Reusable holder for the answer sent back. */
    private static Text resOutput;

    /**
     * Initialises the shared state and starts the RPC server.
     *
     * @param args unused
     * @throws IOException          if the server cannot be built
     * @throws InterruptedException declared for callers; not thrown by the current body
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        name = new Text();
        name.set("Student03");
        resOutput = new Text();

        RPC.Server server = new RPC.Builder(new Configuration())
                .setBindAddress("localhost")
                .setPort(9873)
                .setProtocol(RPCProtocol.class)
                .setInstance(new Student03())
                .build();

        server.start();
        System.out.println("Student03 main thread is running");

    }

    /**
     * Converts {@code inputData} to a digit string and reports it to the server on
     * port 9870. If the proxy cannot be created the failure is printed and nothing
     * is reported.
     *
     * @param inputData the number to convert
     */
    @Override
    public void testProgram(int inputData) {
        try {
            studentClient = RPC.getProxy(RPCProtocol.class, RPCProtocol.versionID, new InetSocketAddress("localhost", 9870), new Configuration());
        } catch (IOException e) {
            e.printStackTrace();
            // Without a fresh proxy there is nobody to report to; continuing would
            // dereference a null (or stale) proxy.
            return;
        }

        // NOTE(review): the digits are emitted least significant first, so the answer
        // is the reverse of the usual binary form for non-palindromic values. This
        // looks like a deliberately wrong answer for the demo - confirm before changing.
        // Zero and negative inputs produce an empty string.
        StringBuilder digits = new StringBuilder();
        while (inputData > 0) {
            digits.append(inputData % 2);
            inputData /= 2;
        }

        resOutput.set(digits.toString());
        studentClient.receiveResult(resOutput, name);
    }

    /**
     * Prints a result message received from another party.
     *
     * @param res  the result text
     * @param name the sender's name
     */
    @Override
    public void receiveResult(Text res, Text name) {
        System.out.println(new Date(Time.now()) + "\t" + name.toString() + " say the answer is " + res.toString());
    }
}

 

 

 

 

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/871990.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-13
下一篇 2022-05-13

发表评论

登录后才能评论

评论列表(0条)

保存