storm源码分析(十)
文章目录
NoneGrouping AllGrouping PartialKeyGrouping
2021SC@SDUSC
NoneGrouping
不关心并行处理的负载均衡策略时使用该方式,目前等同于Shuffle Grouping;另外,Storm最终会(在可能的情况下)把使用该分组的bolt任务和它上游提供数据的任务安排在同一个线程中执行。
public static class NoneGrouping implements CustomStreamGrouping {
private final Random random;
private List targetTasks;
private int numTasks;
public NoneGrouper() {
random = new Random();
}
@Override
public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List targetTasks) {
this.targetTasks = targetTasks;
this.numTasks = targetTasks.size();
}
@Override
public List chooseTasks(int taskId, List values) {
int index = random.nextInt(numTasks);
return Collections.singletonList(targetTasks.get(index));
}
}
这里通过random.nextInt(numTasks)随机取task
AllGrouping
广播发送:对于每一个tuple,目标bolt的所有task都会收到。
/**
 * Stream grouping that broadcasts: every tuple is sent to all target tasks.
 */
public static class AllGrouping implements CustomStreamGrouping {
    private List<Integer> targetTasks;

    /**
     * Stores the target tasks that every tuple will be sent to.
     *
     * @param context     worker topology context (not used by this grouping)
     * @param stream      the stream being grouped (not used by this grouping)
     * @param targetTasks ids of the tasks receiving the stream
     */
    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        this.targetTasks = targetTasks;
    }

    /**
     * Returns all target tasks, regardless of the tuple.
     *
     * @param taskId id of the emitting task (ignored)
     * @param values the tuple values (ignored)
     * @return the same list instance that was passed to {@link #prepare}; it is not copied
     */
    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        return targetTasks;
    }
}
这里直接返回所有的targetTasks。
PartialKeyGrouping
/**
 * Stream grouping that maps each key to a small set of candidate tasks and sends the
 * tuple to one of them.
 *
 * <p>The candidate set is produced by an {@link AssignmentCreator} and the final pick is
 * made by a {@link TargetSelector}. With the defaults, a key is deterministically mapped
 * to two tasks and the one this instance has chosen less often so far receives the tuple.
 */
public class PartialKeyGrouping implements CustomStreamGrouping, Serializable {
    private static final long serialVersionUID = -1672360572274911808L;

    private List<Integer> targetTasks;
    private Fields fields = null;
    private Fields outFields = null;
    private AssignmentCreator assignmentCreator;
    private TargetSelector targetSelector;

    /** Creates a grouping keyed on the first value of each tuple, using the default strategies. */
    public PartialKeyGrouping() {
        this(null);
    }

    /**
     * Creates a grouping keyed on the given fields, using the default strategies.
     *
     * @param fields fields that form the key, or {@code null} to key on the first tuple value
     */
    public PartialKeyGrouping(Fields fields) {
        this(fields, new RandomTwoTaskAssignmentCreator(), new BalancedTargetSelector());
    }

    /**
     * Creates a grouping with a custom assignment strategy and the default balanced selector.
     *
     * @param fields            fields that form the key, or {@code null} to key on the first tuple value
     * @param assignmentCreator strategy that maps a key to its candidate tasks
     */
    public PartialKeyGrouping(Fields fields, AssignmentCreator assignmentCreator) {
        this(fields, assignmentCreator, new BalancedTargetSelector());
    }

    /**
     * Creates a grouping with custom assignment and selection strategies.
     *
     * @param fields            fields that form the key, or {@code null} to key on the first tuple value
     * @param assignmentCreator strategy that maps a key to its candidate tasks
     * @param targetSelector    strategy that picks one task out of the candidates
     */
    public PartialKeyGrouping(Fields fields, AssignmentCreator assignmentCreator, TargetSelector targetSelector) {
        this.fields = fields;
        this.assignmentCreator = assignmentCreator;
        this.targetSelector = targetSelector;
    }

    /**
     * Stores the target tasks and, when key fields were configured, looks up the output
     * fields of the stream so the key values can be selected from each tuple later.
     *
     * @param context     worker topology context, queried for the stream's output fields
     * @param stream      the stream being grouped
     * @param targetTasks ids of the tasks a tuple may be sent to
     */
    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
        this.targetTasks = targetTasks;
        if (this.fields != null) {
            this.outFields = context.getComponentOutputFields(stream);
        }
    }

    /**
     * Chooses the task for one tuple.
     *
     * @param taskId id of the emitting task (ignored)
     * @param values the tuple values
     * @return a list with the single selected task id, or an empty list when the tuple has no values
     */
    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        List<Integer> boltIds = new ArrayList<>(1);
        if (!values.isEmpty()) {
            final byte[] rawKeyBytes = getKeyBytes(values);
            final int[] taskAssignmentForKey = assignmentCreator.createAssignment(this.targetTasks, rawKeyBytes);
            final int selectedTask = targetSelector.chooseTask(taskAssignmentForKey);
            boltIds.add(selectedTask);
        }
        return boltIds;
    }

    /**
     * Extracts the raw key bytes from a tuple.
     *
     * <p>With key fields configured, the key is the concatenation of one 4-byte hash per
     * selected field value. Otherwise the key is the string form of the first tuple value.
     */
    private byte[] getKeyBytes(List<Object> values) {
        if (fields == null) {
            // Assume the key is the first field. The charset is pinned so the same key yields
            // the same bytes on every JVM, independent of the platform default encoding.
            return values.get(0).toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);
        }
        List<Object> selectedFields = outFields.select(fields, values);
        // One int (4 bytes) per selected field.
        ByteBuffer out = ByteBuffer.allocate(selectedFields.size() * 4);
        for (Object o : selectedFields) {
            out.putInt(fieldHash(o));
        }
        return out.array();
    }

    /**
     * Hashes one key field by content: lists and arrays are hashed element-wise, any other
     * non-null object by its own {@code hashCode()}, and {@code null} as 0.
     */
    private static int fieldHash(Object o) {
        if (o instanceof List) {
            return Arrays.deepHashCode(((List<?>) o).toArray());
        } else if (o instanceof Object[]) {
            return Arrays.deepHashCode((Object[]) o);
        } else if (o instanceof byte[]) {
            return Arrays.hashCode((byte[]) o);
        } else if (o instanceof short[]) {
            return Arrays.hashCode((short[]) o);
        } else if (o instanceof int[]) {
            return Arrays.hashCode((int[]) o);
        } else if (o instanceof long[]) {
            return Arrays.hashCode((long[]) o);
        } else if (o instanceof char[]) {
            return Arrays.hashCode((char[]) o);
        } else if (o instanceof float[]) {
            return Arrays.hashCode((float[]) o);
        } else if (o instanceof double[]) {
            return Arrays.hashCode((double[]) o);
        } else if (o instanceof boolean[]) {
            return Arrays.hashCode((boolean[]) o);
        } else if (o != null) {
            return o.hashCode();
        } else {
            return 0;
        }
    }

    /** Strategy that maps a key to the candidate tasks it may be routed to. */
    public interface AssignmentCreator extends Serializable {
        /**
         * @param targetTasks ids of all tasks receiving the stream
         * @param key         raw key bytes of the tuple
         * @return ids of the candidate tasks for this key
         */
        int[] createAssignment(List<Integer> targetTasks, byte[] key);
    }

    /** Strategy that picks one task out of a key's candidate tasks. */
    public interface TargetSelector extends Serializable {
        /**
         * @param assignedTasks ids of the candidate tasks
         * @return id of the task to send the tuple to
         */
        Integer chooseTask(int[] assignedTasks);
    }

    /** Maps each key to two tasks picked by a random generator seeded from the key. */
    public static class RandomTwoTaskAssignmentCreator implements AssignmentCreator {
        /**
         * @param tasks ids of all tasks receiving the stream
         * @param key   raw key bytes of the tuple
         * @return two task ids; they are the same id only when {@code tasks} has a single element
         */
        @Override
        public int[] createAssignment(List<Integer> tasks, byte[] key) {
            // This must produce a deterministic assignment based on the key,
            // so the generator is seeded from the key's hash.
            final long seedForRandom = Arrays.hashCode(key);
            final Random random = new Random(seedForRandom);
            final int choice1 = random.nextInt(tasks.size());
            int choice2 = random.nextInt(tasks.size());
            // Ensure choice1 and choice2 are not the same task: on a collision, step to the
            // next index, wrapping around at the end of the list.
            choice2 = choice1 == choice2 ? (choice2 + 1) % tasks.size() : choice2;
            return new int[]{ tasks.get(choice1), tasks.get(choice2) };
        }
    }

    /** Picks the candidate task that this selector instance has chosen the fewest times so far. */
    public static class BalancedTargetSelector implements TargetSelector {
        // Number of times each task id has been returned by chooseTask.
        private Map<Integer, Long> targetTaskStats = Maps.newHashMap();

        /**
         * Returns the candidate with the lowest recorded count and increments its count.
         * On a tie the earlier candidate in {@code assignedTasks} wins.
         *
         * @param assignedTasks ids of the candidate tasks
         * @return id of the least-used candidate, or {@code null} if {@code assignedTasks} is empty
         */
        @Override
        public Integer chooseTask(int[] assignedTasks) {
            Integer taskIdWithMinLoad = null;
            long minTaskLoad = Long.MAX_VALUE;
            for (int currentTaskId : assignedTasks) {
                final long currentTaskLoad = targetTaskStats.getOrDefault(currentTaskId, 0L);
                if (currentTaskLoad < minTaskLoad) {
                    minTaskLoad = currentTaskLoad;
                    taskIdWithMinLoad = currentTaskId;
                }
            }
            targetTaskStats.merge(taskIdWithMinLoad, 1L, Long::sum);
            return taskIdWithMinLoad;
        }
    }
}
BalancedTargetSelector内部用一个Map(targetTaskStats)统计每个task被选中的次数;该Map在对象创建时初始化,而不是在prepare中初始化。PartialKeyGrouping如果没有指定fields,则默认按tuple的第一个字段值来计算key。 这里先通过RandomTwoTaskAssignmentCreator根据key确定性地选中两个taskId,然后BalancedTargetSelector根据targetTaskStats计算出其中使用次数较小的那个(taskIdWithMinLoad)并返回。 getKeyBytes()方法从输入的Tuple中提取键。
评论列表(0条)