【Tinkerpop整理】以Spark为引擎全量检索图库数据流程源码解析-part1
【Tinkerpop整理】以Spark为引擎全量检索图库数据流程源码解析-part2
【Tinkerpop整理】以Spark为引擎全量检索图库数据流程源码解析-part3
//静态块,构建JanusGraphVertex反序列化对象,构建JanusGraphHadoopSetupImpl对象时,会打开图 static { refCounter = new RefCountedCloseable<>((conf) -> new JanusGraphVertexDeserializer(new JanusGraphHadoopSetupImpl(conf))); } //构建HadoopRecordReader对象 @Override public RecordReader7,JanusGraphHadoopSetupImplcreateRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { //创建该对象时,inputFormat对象为:配置文件中配置的HbaseBinaryInputFormat对象; return new HadoopRecordReader(refCounter, inputFormat.createRecordReader(split, context)); }
public JanusGraphHadoopSetupImpl(final Configuration config) { scanConf = ModifiableHadoopConfiguration.of(JanusGraphHadoopConfiguration.MAPRED_NS, config); //获取图配置信息 BasicConfiguration bc = scanConf.getJanusGraphConf(); //打开图对象 graph = (StandardJanusGraph) JanusGraphFactory.open(bc); //开始事务 tx = (StandardJanusGraphTx)graph.buildTransaction().readOnly().vertexCacheSize(200).start(); }8, JanusGraphFactory
public static JanusGraph open(ReadConfiguration configuration, String backupName) { final ModifiableConfiguration config = new ModifiableConfiguration(ROOT_NS, (WriteConfiguration) configuration, BasicConfiguration.Restriction.NONE); final String graphName = config.has(GRAPH_NAME) ? config.get(GRAPH_NAME) : backupName; final JanusGraphManager jgm = JanusGraphManagerUtility.getInstance(); if (null != graphName) { Preconditions.checkNotNull(jgm, JANUS_GRAPH_MANAGER_EXPECTED_STATE_MSG); return (JanusGraph) jgm.openGraph(graphName, gName -> new StandardJanusGraph(new GraphDatabaseConfigurationBuilder().build(configuration))); } else { if (jgm != null) { log.warn("You should supply "graph.graphname" in your .properties file configuration if you are opening " + "a graph that has not already been opened at server start, i.e. it was " + "defined in your YAML file. This will ensure the graph is tracked by the JanusGraphManager, " + "which will enable autocommit and rollback functionality upon all gremlin script executions. " + "Note that JanusGraphFactory#open(String === shortcut notation) does not support consuming the property " + ""graph.graphname" so these graphs should be accessed dynamically by supplying a .properties file here " + "or by using the ConfiguredGraphFactory."); } //构建图对象 return new StandardJanusGraph(new GraphDatabaseConfigurationBuilder().build(configuration)); } }9,GraphDatabaseConfigurationBuilder
构建 graph 存储库的配置信息(例如:后端存储为 HBase 时,即 HBase 库的配置信息)
public GraphDatabaseConfiguration build(ReadConfiguration localConfig){ Preconditions.checkNotNull(localConfig); BasicConfiguration localBasicConfiguration = new BasicConfiguration(ROOT_NS,localConfig, BasicConfiguration.Restriction.NONE); ModifiableConfiguration overwrite = new ModifiableConfiguration(ROOT_NS,new CommonsConfiguration(), BasicConfiguration.Restriction.NONE); //此处获取的对象是:HbaseStoreManeger,数据存储管理对象 final KeyColumnValueStoreManager storeManager = Backend.getStorageManager(localBasicConfiguration); final StoreFeatures storeFeatures = storeManager.getFeatures(); //获取全局配置信息: final ReadConfiguration globalConfig = new ReadConfigurationBuilder().buildGlobalConfiguration( localConfig, localBasicConfiguration, overwrite, storeManager, new ModifiableConfigurationBuilder(), new KCVSConfigurationBuilder()); //Copy over local config options //获取局部本地配置信息;即创建人物时,传入参数信息 ModifiableConfiguration localConfiguration = new ModifiableConfiguration(ROOT_NS, new CommonsConfiguration(), BasicConfiguration.Restriction.LOCAL); localConfiguration.setAll(getLocalSubset(localBasicConfiguration.getAll())); Configuration combinedConfig = new MixedConfiguration(ROOT_NS,globalConfig,localConfig); //Compute unique instance id String uniqueGraphId = UniqueInstanceIdRetriever.getInstance().getOrGenerateUniqueInstanceId(combinedConfig); overwrite.set(UNIQUE_INSTANCE_ID, uniqueGraphId); checkAndOverwriteTransactionLogConfiguration(combinedConfig, overwrite, storeFeatures); checkAndOverwriteSystemManagementLogConfiguration(combinedConfig, overwrite); MergedConfiguration configuration = new MergedConfiguration(overwrite,combinedConfig); return new GraphDatabaseConfiguration(localConfig, localConfiguration, uniqueGraphId, configuration); }10 StandardJanusGraph
静态代码块,注册相关策略 static { TraversalStrategies graphStrategies = TraversalStrategies.GlobalCache.getStrategies(Graph.class) .clone() .addStrategies(AdjacentVertexFilterOptimizerStrategy.instance(), AdjacentVertexHasIdOptimizerStrategy.instance(), AdjacentVertexIsOptimizerStrategy.instance(), JanusGraphLocalQueryOptimizerStrategy.instance(), JanusGraphStepStrategy.instance(), JanusGraphIoRegistrationStrategy.instance()); //Register with cache TraversalStrategies.GlobalCache.registerStrategies(StandardJanusGraph.class, graphStrategies); TraversalStrategies.GlobalCache.registerStrategies(StandardJanusGraphTx.class, graphStrategies); } private final GraphDatabaseConfiguration config; private final Backend backend; private final IDManager idManager; private final VertexIDAssigner idAssigner; private final TimestampProvider times; //Serializers protected final IndexSerializer indexSerializer; protected final EdgeSerializer edgeSerializer; protected final Serializer serializer; //Caches public final SliceQuery vertexExistenceQuery; private final RelationQueryCache queryCache; private final SchemaCache schemaCache; //Log private final ManagementLogger managementLogger; //Shutdown hook private volatile ShutdownThread shutdownHook; private volatile boolean isOpen; private final AtomicLong txCounter; private final SetopenTransactions; private final String name; //构造函数,配置相关属性 public StandardJanusGraph(GraphDatabaseConfiguration configuration) { this.config = configuration; //获取存储对象 this.backend = configuration.getBackend(); this.name = configuration.getGraphName(); this.idAssigner = config.getIDAssigner(backend); this.idManager = idAssigner.getIDManager(); this.serializer = config.getSerializer(); StoreFeatures storeFeatures = backend.getStoreFeatures(); this.indexSerializer = new IndexSerializer(configuration.getConfiguration(), this.serializer, this.backend.getIndexInformation(), storeFeatures.isDistributed() && storeFeatures.isKeyOrdered()); this.edgeSerializer = new 
EdgeSerializer(this.serializer); this.vertexExistenceQuery = edgeSerializer.getQuery(baseKey.VertexExists, Direction.OUT, new EdgeSerializer.TypedInterval[0]).setLimit(1); this.queryCache = new RelationQueryCache(this.edgeSerializer); this.schemaCache = configuration.getTypeCache(typeCacheRetrieval); this.times = configuration.getTimestampProvider(); isOpen = true; txCounter = new AtomicLong(0); openTransactions = Collections.newSetFromMap(new ConcurrentHashMap (100, 0.75f, 1)); //Register instance and ensure uniqueness String uniqueInstanceId = configuration.getUniqueGraphId(); ModifiableConfiguration globalConfig = getGlobalSystemConfig(backend); final boolean instanceExists = globalConfig.has(REGISTRATION_TIME, uniqueInstanceId); final boolean replaceExistingInstance = configuration.getConfiguration().get(REPLACE_INSTANCE_IF_EXISTS); if (instanceExists && !replaceExistingInstance) { throw new JanusGraphException(String.format("A JanusGraph graph with the same instance id [%s] is already open. Might required forced shutdown.", uniqueInstanceId)); } else if (instanceExists && replaceExistingInstance) { log.debug(String.format("Instance [%s] already exists. Opening the graph per " + REPLACE_INSTANCE_IF_EXISTS.getName() + " configuration.", uniqueInstanceId)); } globalConfig.set(REGISTRATION_TIME, times.getTime(), uniqueInstanceId); Log managementLog = backend.getSystemMgmtLog(); managementLogger = new ManagementLogger(this, managementLog, schemaCache, this.times); managementLog.registerReader(ReadMarker.fromNow(), managementLogger); shutdownHook = new ShutdownThread(this); Runtime.getRuntime().addShutdownHook(shutdownHook); log.debug("Installed shutdown hook {}", shutdownHook, new Throwable("Hook creation trace")); }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)