Storm Source Code Analysis (10)


Contents
    • NoneGrouping
    • AllGrouping
    • PartialKeyGrouping

2021SC@SDUSC

NoneGrouping

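Use this grouping when you do not care how the stream is distributed across tasks. At present it is equivalent to Shuffle Grouping. The Storm documentation adds that, eventually, Storm will push bolts with none groupings down to execute in the same thread as the bolt or spout they subscribe from, where possible.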

public static class NoneGrouper implements CustomStreamGrouping {

    private final Random random;
    private List<Integer> targetTasks;
    private int numTasks;

    public NoneGrouper() {
        random = new Random();
    }

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        this.targetTasks = targetTasks;
        this.numTasks = targetTasks.size();
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        // Pick one target task uniformly at random.
        int index = random.nextInt(numTasks);
        return Collections.singletonList(targetTasks.get(index));
    }
}
 

Here the target task is picked at random with random.nextInt(numTasks).
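
To see what this random choice means for load distribution, here is a minimal standalone sketch that does not depend on Storm. The class RandomPickDemo, its constructor seed, and the simulate method are hypothetical names introduced for illustration; only the random.nextInt(...) selection mirrors the code above.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical standalone demo; not part of Storm.
public class RandomPickDemo {
    private final Random random;
    private final List<Integer> targetTasks;

    public RandomPickDemo(List<Integer> targetTasks, long seed) {
        this.targetTasks = targetTasks;
        this.random = new Random(seed); // fixed seed only to make runs repeatable
    }

    // Same selection rule as above: one uniformly random target task per tuple.
    public List<Integer> chooseTasks() {
        int index = random.nextInt(targetTasks.size());
        return Collections.singletonList(targetTasks.get(index));
    }

    // Counts how many of the simulated tuples each task receives.
    public Map<Integer, Integer> simulate(int tuples) {
        Map<Integer, Integer> counts = new HashMap<>();
        for (int i = 0; i < tuples; i++) {
            counts.merge(chooseTasks().get(0), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        RandomPickDemo demo = new RandomPickDemo(Arrays.asList(3, 4, 5, 6), 42L);
        System.out.println(demo.simulate(100_000));
    }
}
```

With many tuples the per-task counts end up close to each other, but nothing ties a given key to a given task, which is why this grouping only suits bolts that do not keep per-key state.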

AllGrouping

Broadcast: every tuple is sent to all tasks of the subscribing bolt.

public static class AllGrouping implements CustomStreamGrouping {

    private List<Integer> targetTasks;

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        this.targetTasks = targetTasks;
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        // Broadcast: every target task receives the tuple.
        return targetTasks;
    }
}
 

The whole targetTasks list is returned, so every target task receives the tuple.
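
One practical consequence is that the number of deliveries grows with the number of target tasks. The following standalone sketch illustrates this; BroadcastDemo and its deliveries method are hypothetical names, not Storm classes.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical standalone demo; not part of Storm.
public class BroadcastDemo {
    // Mirrors chooseTasks above: every tuple goes to every target task.
    public static List<Integer> chooseTasks(List<Integer> targetTasks) {
        return targetTasks;
    }

    // Total number of tuple deliveries when `tuples` tuples are broadcast.
    public static long deliveries(List<Integer> targetTasks, int tuples) {
        long total = 0;
        for (int i = 0; i < tuples; i++) {
            total += chooseTasks(targetTasks).size();
        }
        return total;
    }

    public static void main(String[] args) {
        // 3 target tasks x 1000 tuples
        System.out.println(deliveries(Arrays.asList(7, 8, 9), 1000));
    }
}
```

Broadcasting 1000 tuples to 3 tasks therefore costs 3000 deliveries, so this grouping is best kept for low-volume streams such as control or configuration signals.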

PartialKeyGrouping
public class PartialKeyGrouping implements CustomStreamGrouping, Serializable {
    private static final long serialVersionUID = -1672360572274911808L;
    private List<Integer> targetTasks;
    private Fields fields = null;
    private Fields outFields = null;

    private AssignmentCreator assignmentCreator;
    private TargetSelector targetSelector;

    public PartialKeyGrouping() {
        this(null);
    }

    public PartialKeyGrouping(Fields fields) {
        this(fields, new RandomTwoTaskAssignmentCreator(), new BalancedTargetSelector());
    }

    public PartialKeyGrouping(Fields fields, AssignmentCreator assignmentCreator) {
        this(fields, assignmentCreator, new BalancedTargetSelector());
    }

    public PartialKeyGrouping(Fields fields, AssignmentCreator assignmentCreator, TargetSelector targetSelector) {
        this.fields = fields;
        this.assignmentCreator = assignmentCreator;
        this.targetSelector = targetSelector;
    }

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        this.targetTasks = targetTasks;
        if (this.fields != null) {
            this.outFields = context.getComponentOutputFields(stream);
        }
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        List<Integer> boltIds = new ArrayList<>(1);
        if (values.size() > 0) {
            final byte[] rawKeyBytes = getKeyBytes(values);

            final int[] taskAssignmentForKey = assignmentCreator.createAssignment(this.targetTasks, rawKeyBytes);
            final int selectedTask = targetSelector.chooseTask(taskAssignmentForKey);

            boltIds.add(selectedTask);
        }
        return boltIds;
    }

    private byte[] getKeyBytes(List<Object> values) {
        byte[] raw;
        if (fields != null) {
            List<Object> selectedFields = outFields.select(fields, values);
            ByteBuffer out = ByteBuffer.allocate(selectedFields.size() * 4);
            for (Object o : selectedFields) {
                if (o instanceof List) {
                    out.putInt(Arrays.deepHashCode(((List) o).toArray()));
                } else if (o instanceof Object[]) {
                    out.putInt(Arrays.deepHashCode((Object[]) o));
                } else if (o instanceof byte[]) {
                    out.putInt(Arrays.hashCode((byte[]) o));
                } else if (o instanceof short[]) {
                    out.putInt(Arrays.hashCode((short[]) o));
                } else if (o instanceof int[]) {
                    out.putInt(Arrays.hashCode((int[]) o));
                } else if (o instanceof long[]) {
                    out.putInt(Arrays.hashCode((long[]) o));
                } else if (o instanceof char[]) {
                    out.putInt(Arrays.hashCode((char[]) o));
                } else if (o instanceof float[]) {
                    out.putInt(Arrays.hashCode((float[]) o));
                } else if (o instanceof double[]) {
                    out.putInt(Arrays.hashCode((double[]) o));
                } else if (o instanceof boolean[]) {
                    out.putInt(Arrays.hashCode((boolean[]) o));
                } else if (o != null) {
                    out.putInt(o.hashCode());
                } else {
                    out.putInt(0);
                }
            }
            raw = out.array();
        } else {
            raw = values.get(0).toString().getBytes(); // assume key is the first field
        }
        return raw;
    }

    

    
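    // Maps a key to the candidate tasks it may be sent to.
    public interface AssignmentCreator extends Serializable {
        int[] createAssignment(List<Integer> targetTasks, byte[] key);
    }

    // Picks one task out of the candidates assigned to a key.
    public interface TargetSelector extends Serializable {
        Integer chooseTask(int[] assignedTasks);
    }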

    

    
    public static class RandomTwoTaskAssignmentCreator implements AssignmentCreator {
        
        @Override
        public int[] createAssignment(List<Integer> tasks, byte[] key) {
            // The assignment must be deterministic for a given key,
            // so the Random is seeded with the hash of the key bytes.
            final long seedForRandom = Arrays.hashCode(key);
            final Random random = new Random(seedForRandom);
            final int choice1 = random.nextInt(tasks.size());
            int choice2 = random.nextInt(tasks.size());
            // Make sure choice1 and choice2 are not the same task.
            choice2 = choice1 == choice2 ? (choice2 + 1) % tasks.size() : choice2;
            return new int[]{ tasks.get(choice1), tasks.get(choice2) };
        }
    }

    
    public static class BalancedTargetSelector implements TargetSelector {
        private Map<Integer, Long> targetTaskStats = Maps.newHashMap();

        
        @Override
        public Integer chooseTask(int[] assignedTasks) {
            Integer taskIdWithMinLoad = null;
            Long minTaskLoad = Long.MAX_VALUE;

            for (Integer currentTaskId : assignedTasks) {
                final Long currentTaskLoad = targetTaskStats.getOrDefault(currentTaskId, 0L);
                if (currentTaskLoad < minTaskLoad) {
                    minTaskLoad = currentTaskLoad;
                    taskIdWithMinLoad = currentTaskId;
                }
            }

            targetTaskStats.put(taskIdWithMinLoad, targetTaskStats.getOrDefault(taskIdWithMinLoad, 0L) + 1);
            return taskIdWithMinLoad;
        }
    }
}
 

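prepare() only stores targetTasks and, if fields were specified, resolves outFields. The per-task counters live in the targetTaskStats map of BalancedTargetSelector, which starts out empty.
If no fields are specified, PartialKeyGrouping uses the first value of the tuple as the key.
getKeyBytes() extracts the key bytes from the values of the incoming tuple.
RandomTwoTaskAssignmentCreator picks two candidate taskIds for the key; because the Random is seeded with the key's hash, the same key always gets the same two candidates.
BalancedTargetSelector looks up the candidates in targetTaskStats, returns the one that has been chosen fewer times (taskIdWithMinLoad), and increments its counter.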
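
The interplay of the two strategies can be tried out without a Storm cluster. The sketch below is a simplified standalone re-creation under that assumption: TwoChoicesDemo, candidates, choose, and loadOf are hypothetical names, and the logic is adapted from RandomTwoTaskAssignmentCreator and BalancedTargetSelector above rather than copied from Storm.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical standalone demo; not part of Storm.
public class TwoChoicesDemo {
    // Number of tuples sent to each task so far.
    private final Map<Integer, Long> load = new HashMap<>();

    // Same idea as RandomTwoTaskAssignmentCreator: a key-seeded Random
    // always yields the same two candidate tasks for the same key.
    public static int[] candidates(List<Integer> tasks, byte[] key) {
        Random random = new Random(Arrays.hashCode(key));
        int c1 = random.nextInt(tasks.size());
        int c2 = random.nextInt(tasks.size());
        if (c1 == c2) {
            c2 = (c2 + 1) % tasks.size();
        }
        return new int[] { tasks.get(c1), tasks.get(c2) };
    }

    // Same idea as BalancedTargetSelector: pick the less-used candidate
    // (ties go to the first one) and record the choice.
    public int choose(int[] assigned) {
        int best = assigned[0];
        for (int task : assigned) {
            if (load.getOrDefault(task, 0L) < load.getOrDefault(best, 0L)) {
                best = task;
            }
        }
        load.merge(best, 1L, Long::sum);
        return best;
    }

    public long loadOf(int task) {
        return load.getOrDefault(task, 0L);
    }

    public static void main(String[] args) {
        List<Integer> tasks = Arrays.asList(10, 11, 12, 13);
        byte[] hotKey = "hot".getBytes(StandardCharsets.UTF_8);
        int[] pair = candidates(tasks, hotKey);
        TwoChoicesDemo demo = new TwoChoicesDemo();
        for (int i = 0; i < 1000; i++) {
            demo.choose(pair);
        }
        System.out.println(Arrays.toString(pair) + " -> "
                + demo.loadOf(pair[0]) + " / " + demo.loadOf(pair[1]));
    }
}
```

A single hot key sent 1000 times is split evenly, 500 tuples to each of its two candidate tasks, whereas a plain fields grouping would send all 1000 to one task. The trade-off is that each key's state is spread over two tasks, so a downstream step has to merge the two partial results.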

Sharing is welcome; when reposting, please credit the source: 内存溢出

Original article: https://outofmemory.cn/zaji/5619131.html
