Architecture design
Desktop data analytics for the IPTV industry, analyzing user behavior data at a volume of up to 30 million records per day. The system computes page views (PV), unique visitors (UV), video play counts (VV), daily/weekly/monthly active users (DAU/WAU/MAU), the monthly set-top-box boot rate, click counts, ranking data, and similar metrics.
The overall data flow is:
1. The client reports data, which is stored in an in-memory cache.
2. The cached character stream is periodically flushed to a file.
3. The file is uploaded to HDFS, and a notification is sent to the server side through an MQ client.
4. The MQ server side receives the notification and processes the file on HDFS (deserializing the byte arrays into entity beans).
5. The entity beans are written into MongoDB (sharded by database and table).
6. Queries are run with MongoDB's aggregate() aggregation function.

Environment deployment
Hadoop environment setup: [Hadoop environment setup guide]
RabbitMQ environment setup: [RabbitMQ environment setup guide]
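The producer and consumer code below assumes an already opened RabbitMQ channel and a declared queue named by SystemConstant.NEW_FILE_QUEUE. A minimal setup sketch with the standard RabbitMQ Java client follows; the host and queue name here are assumed values, not taken from the original.

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class MqChannelProvider {
    // Assumed host/queue values; substitute the real environment settings.
    private static final String MQ_HOST = "127.0.0.1";
    public static final String NEW_FILE_QUEUE = "NEW_FILE_QUEUE";

    public static Channel openChannel() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(MQ_HOST);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Durable queue so pending "new file" notifications survive a broker restart.
        channel.queueDeclare(NEW_FILE_QUEUE, true, false, false, null);
        return channel;
    }
}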
The client reports data, which the server receives and appends to the in-memory cache:

@RequestMapping(value = "/singleData/{policyId}")
public ResponseData postSingleData(@PathVariable("policyId") String policyId, HttpServletRequest request) {
    ResponseData response = getResponseData();
    try {
        ServletInputStream inputStream = request.getInputStream();
        StringBuffer sb = new StringBuffer();
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, "utf-8"));
        String data = null;
        while ((data = bufferedReader.readLine()) != null) {
            sb.append(data);
            sb.append("\n");
        }
        if (sb.length() <= 0) {
            throw new Exception("Please provide the data to submit");
        }
        DataStr s = new DataStr(policyId, sb);
        service.receiveNewData(s);
    } catch (Exception e) {
        response.setCode(0);
        response.setMsg(e.getMessage());
    }
    return response;
}
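The controller relies on DataStr and DataPond, which are not shown in the original. A plausible minimal sketch, with field names and collection types assumed:

import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// One reported payload together with the policy it belongs to.
class DataStr {
    private final String policyId;
    private final StringBuffer data;

    DataStr(String policyId, StringBuffer data) {
        this.policyId = policyId;
        this.data = data;
    }

    String getPolicyId() { return policyId; }
    StringBuffer getData() { return data; }
}

// Shared in-memory caches: a queue of raw reports and a per-policy file buffer.
class DataPond {
    private static final Queue<DataStr> DATA_CACHE = new ConcurrentLinkedQueue<>();
    private static final Map<String, FileStruct> FILE_CACHE = new ConcurrentHashMap<>();

    static Queue<DataStr> getDataCache() { return DATA_CACHE; }
    static Map<String, FileStruct> getFileCache() { return FILE_CACHE; }
}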
Periodically flush the cached character stream to a file:

private void flushData() throws Exception {
    Queue<DataStr> dataCache = DataPond.getDataCache();
    DataStr dataIte = null;
    Integer size = dataCache.size();
    logger.info("There are [" + size + "] data entries in the queue");
    while (size > 0 && (dataIte = dataCache.poll()) != null) {
        String policyId = dataIte.getPolicyId();
        Map<String, FileStruct> fileCache = DataPond.getFileCache();
        FileStruct fileStruct = fileCache.get(policyId);
        if (fileStruct == null) {
            fileStruct = new FileStruct(policyId);
            fileCache.put(policyId, fileStruct);
        }
        fileStruct.write(dataIte.getData().toString());
        fileStruct.write("\n");
        size--;
    }
}
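FileStruct, used above and in the upload step below, is also not included. A rough sketch under the assumption that it buffers one policy's data in a local temporary file; everything here beyond the method names actually called is a guess:

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Serializable;

// Buffers one policy's data locally until the file is big enough, or idle long enough, to ship.
class FileStruct implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String policyId;
    private final String localPath;
    private long lastUpdateTime = System.currentTimeMillis();
    // transient: only the metadata needs to cross MQ, not the open writer
    private transient BufferedWriter writer;

    FileStruct(String policyId) throws IOException {
        this.policyId = policyId;
        this.localPath = System.getProperty("java.io.tmpdir") + File.separator + policyId + ".dat";
        this.writer = new BufferedWriter(new FileWriter(localPath, true));
    }

    void write(String text) throws IOException {
        writer.write(text);
        lastUpdateTime = System.currentTimeMillis();
    }

    void flush() throws IOException { writer.flush(); }

    long getFielSize() { return new File(localPath).length(); } // name kept as called above

    long getLastUpdateTime() { return lastUpdateTime; }

    String getPolicyId() { return policyId; }

    String getLocalPath() { return localPath; }

    void destroy() throws IOException {
        writer.close();
        new File(localPath).delete();
    }
}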
Upload the file to HDFS and send a notification through MQ:

private void uploadFilesAndSendToMQ() throws Exception {
    Map<String, FileStruct> fileCache = DataPond.getFileCache();
    Set<String> keySet = fileCache.keySet();
    for (String key : keySet) {
        FileStruct fs = fileCache.get(key);
        Boolean shallBeFlush = false;
        if (fs.getFielSize() >= SystemChangeableConstant.MAX_FILESIZE) {
            shallBeFlush = true;
        }
        if (System.currentTimeMillis() - fs.getLastUpdateTime() >= SystemChangeableConstant.MAX_FILE_NOACTION_TIMESPAN) {
            shallBeFlush = true;
        }
        if (shallBeFlush) {
            if (!hdfsUtil.isOpened()) {
                // TODO temporary: point hadoop.home.dir at the local Hadoop installation
                System.setProperty("hadoop.home.dir", "D:\\hadoop-2.6.5");
                hdfsUtil.open();
            }
            logger.info("File of policy [" + key + "] is full and will be sent out");
            fs.flush();
            try {
                transferFileToDfs(fs);
                logger.info("File of policy [" + key + "] sent to hdfs successfully");
            } catch (Exception e) {
                logger.error("File of policy [" + key + "] failed to send to hdfs: " + e.getMessage());
            }
            try {
                sendToMq(fs);
                logger.info("File of policy [" + key + "] sent to MQ successfully");
            } catch (Exception e) {
                logger.error("File of policy [" + key + "] failed to send to MQ: " + e.getMessage());
            }
            fileCache.remove(key);
            fs.destroy();
        }
    }
    hdfsUtil.close();
}
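transferFileToDfs() and sendToMq() are not included in the original either. A sketch under the assumption that they use the Hadoop FileSystem API and the same RabbitMQ channel as the consumer; the HDFS address, target path, SerializingUtil.serialize(), and the FileStruct getters are assumptions:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Copy the flushed local file into HDFS; the target directory layout is assumed.
private void transferFileToDfs(FileStruct fs) throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://namenode:9000"); // assumed NameNode address
    try (FileSystem dfs = FileSystem.get(conf)) {
        dfs.copyFromLocalFile(new Path(fs.getLocalPath()),
                new Path("/iptv/raw/" + fs.getPolicyId() + "/" + System.currentTimeMillis()));
    }
}

// Publish the serialized FileStruct so the analysis side knows a new file is ready.
private void sendToMq(FileStruct fs) throws Exception {
    byte[] body = SerializingUtil.serialize(fs);
    channel.basicPublish("", SystemConstant.NEW_FILE_QUEUE, null, body);
}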
The MQ server listens for the notification and processes the file on HDFS (deserializing the byte array back into an entity bean):

Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        logger.info("Router [" + SystemConstant.NEW_FILE_QUEUE + "] received new file");
        FileStruct fileStruct = (FileStruct) SerializingUtil.deserialize(body);
        factory.createNewTask(fileStruct);
    }
};
channel.basicConsume(SystemConstant.NEW_FILE_QUEUE, true, consumer);
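SerializingUtil is not shown; it presumably wraps standard Java object serialization, roughly like this:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

public class SerializingUtil {

    // Serialize any Serializable object into a byte array for the MQ message body.
    public static byte[] serialize(Object obj) throws Exception {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
            oos.flush();
            return bos.toByteArray();
        }
    }

    // Restore the object on the consuming side.
    public static Object deserialize(byte[] body) throws Exception {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(body))) {
            return ois.readObject();
        }
    }
}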
Write the parsed data into MongoDB:

for (HashMap aMap : parsedDatas) {
    // preset default time column
    aMap.put(DataTable.DEFAUL_TTIMECOLUMN, now);
    dataTable.genId(aMap);
    String tableName = dataTable.getSplitTableName(aMap);
    Document doc = new Document(aMap);
    if (dataTable.hasIdentify()) {
        mongoConnector.insertSingle(tableName, doc);
    } else {
        List<Document> list = tableToDatasMap.get(tableName);
        if (list == null) {
            list = new ArrayList<>(parsedDatas.size() / 2);
            tableToDatasMap.put(tableName, list);
        }
        list.add(doc);
    }
}
for (String key : tableToDatasMap.keySet()) {
    mongoConnector.insert(key, tableToDatasMap.get(key));
}

Aggregation analysis
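The aggregation code itself is truncated in the source. Purely as an illustration of the aggregate()-based analysis mentioned at the top, a hypothetical daily PV/UV query with the MongoDB Java driver could look like this; the collection, field names, and sharded table naming are invented:

import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Accumulators;
import com.mongodb.client.model.Aggregates;
import org.bson.Document;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical example: per-day PV (record count) and UV (distinct users)
// over a sharded table such as "play_log_201801".
List<Document> dailyPvUv(MongoCollection<Document> col) {
    List<Document> result = new ArrayList<>();
    col.aggregate(Arrays.asList(
            Aggregates.group("$day",                            // assumed date field
                    Accumulators.sum("pv", 1),
                    Accumulators.addToSet("users", "$userId")), // assumed user field
            Aggregates.project(new Document("pv", 1)
                    .append("uv", new Document("$size", "$users")))
    )).into(result);
    return result;
}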
Presentation