import com.alibaba.fastjson.JSONObject; import com.poizon.bigdata.flink.job.baseFlinkJob; import com.poizon.bigdata.flink.utils.TimeUtil; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.state.MapState; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Map; @Slf4j public class UnifyStateJobUV4 extends baseFlinkJob { private static Logger logger = LoggerFactory.getLogger(UnifyStateJobUV4.class); public static void main(String[] args) throws Exception { ParameterTool parameters = ParameterTool.fromArgs(args); MapkvMap = parameters.toMap(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.enableCheckpointing(30000L); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointTimeout(60000L); env.getCheckpointConfig().setMaxConcurrentCheckpoints(2); 
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000); env.setStateBackend(new FsStateBackend("file:///Users/wangpengpeng/dewu/checkpoint")); DataStreamSource stringDataStreamSource = env.addSource(MyKafkaUtil.getKafkaConsumer("test", "wppp")); //TODO 3.将每行数据转换为JSON对象 OutputTag outputTag = new OutputTag ("Dirty") {}; SingleOutputStreamOperator mapStream = stringDataStreamSource.map(new RichMapFunction () { @Override public EventBean map(String s) throws Exception { EventBean eventBean= JSONObject.parseObject(s, EventBean.class); return eventBean; } }).setParallelism(1); mapStream.map(new RichMapFunction () { @Override public void open(Configuration parameters) throws Exception { } @Override public EventBean map(EventBean eventBean) throws Exception { return eventBean; } }).setParallelism(1); SingleOutputStreamOperator process = mapStream .keyBy(row-> row.getUid()) .process(new KeyedProcessFunction () { MapState mapState; ValueState valueState; @Override public void open(Configuration configuration){ MapStateDescriptor mapStateDescriptor = new MapStateDescriptor ("uv", String.class, Integer.class); mapState = getRuntimeContext().getMapState(mapStateDescriptor); valueState = getRuntimeContext().getState(new ValueStateDescriptor ("uv_cnt", Long.class)); } @Override public void processElement(EventBean eventBean, Context context, Collector collector) throws Exception { try { String dateTime = TimeUtil.getShortDateFormat(eventBean.getTime()); String key = dateTime + eventBean.getUid(); Integer value = mapState.get(key); logger.info(dateTime +",value:" +value+"," +key); if (value == null ) { Long currentCnt = valueState.value() == null ? 
0L: valueState.value(); logger.info("currentCnt" +currentCnt +", valueState.value()" + valueState.value()); valueState.update(currentCnt+ 1L); }else { valueState.update(valueState.value()); } mapState.put(key, 1); collector.collect(dateTime + "的uv:" + valueState.value()); } catch (Exception e) { System.out.println(e.getMessage()); } } }).setParallelism(1); process.print("正常数据"); env.execute("ddd"); } }
import lombok.Data;
// NOTE(review): lombok.Getter is imported but not used in this class.
import lombok.Getter;

/**
 * Event record deserialized from the incoming JSON payload.
 *
 * <p>Lombok's {@code @Data} generates getters/setters ({@code getUid()}, {@code getTime()}, ...),
 * {@code equals}/{@code hashCode}, {@code toString} and a no-argument constructor.
 *
 * <p>NOTE(review): the fields are also {@code public}, which duplicates the generated accessors;
 * they are left public in case other code reads them directly — confirm before making them private.
 */
@Data
public class EventBean {
    /** User identifier; presumably the entity counted for UV — confirm against producers. */
    public String uid;
    /** Event timestamp; units are not visible here (presumably epoch millis — TODO confirm). */
    public Long time;
}
// 欢迎分享,转载请注明来源:内存溢出 (Welcome to share; when reposting, please credit the source: 内存溢出)
// 评论列表(0条) (Comments: 0)