

// use zap logging cli options
	//set the controller-runtime logger 
	opts := zap.Options{}
	//add the zap flagset to the operator’s command line flags
	//定义 监控暴露的端口
	flag.StringVar(&metricsAddr, "metrics-addr", ":8080", "The address the metric endpoint binds to.")
	//定义 feature gate 开关
	flag.StringVar(&featureGatesString, "feature-gates", "", "Feature gate to enable, format is a command separated list enabling features, for instance RunAsNonRoot=false")
	//是否开启leader 选举
	flag.BoolVar(&enableLeaderElection, "enable-leader-election", false,
		"Enable leader election for controller manager. Enabling this will ensure there is only one active controller manager.")
	flag.BoolVar(&skipWebhookConfig, "skip-webhook-config", true,
		"When set, don't setup webhook TLS certificates. Useful in OpenShift where this step is handled already.")

	// create logger using zap cli options
	// for instance --zap-log-level=debug
	logger := zap.New(zap.UseFlagOptions(&opts))


watchNamespace := os.Getenv(watchNamespaceEnvVar)
	//Options are the arguments for creating a new Manager
	mgrOpts := ctrl.Options{
		Scheme:             scheme,
		Namespace:          watchNamespace,
		MetricsBindAddress: metricsAddr,
		LeaderElection:     enableLeaderElection,
		Port:               9443,
		CertDir:            certDir,

mgrOpts 作为实例化manager的参数

	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), mgrOpts)
	if err != nil {
		setupLog.Error(err, "unable to create manager")
// NewManager returns a new Manager for creating Controllers.
	NewManager = manager.New
// New returns a new Manager for creating Controllers.
func New(config *rest.Config, options Options) (Manager, error) {
	// Set default values for options fields
	options = setOptionsDefaults(options)

	cluster, err := cluster.New(config, func(clusterOptions *cluster.Options) {
		clusterOptions.Scheme = options.Scheme
		clusterOptions.MapperProvider = options.MapperProvider
		clusterOptions.Logger = options.Logger
		clusterOptions.SyncPeriod = options.SyncPeriod
		clusterOptions.Namespace = options.Namespace
		clusterOptions.NewCache = options.NewCache
		clusterOptions.ClientBuilder = options.ClientBuilder
		clusterOptions.ClientDisableCacheFor = options.ClientDisableCacheFor
		clusterOptions.DryRunClient = options.DryRunClient
		clusterOptions.EventBroadcaster = options.EventBroadcaster


// New constructs a brand new cluster
func New(config *rest.Config, opts ...Option) (Cluster, error) {
	if config == nil {
		return nil, errors.New("must specify Config")

	options := Options{}
	for _, opt := range opts {
	options = setOptionsDefaults(options)

	// Create the mapper provider
	mapper, err := options.MapperProvider(config)
	if err != nil {
		options.Logger.Error(err, "Failed to get API Group-Resources")
		return nil, err

	// Create the cache for the cached read client and registering informers
	cache, err := options.NewCache(config, cache.Options{Scheme: options.Scheme, Mapper: mapper, Resync: options.SyncPeriod, Namespace: options.Namespace})
	if err != nil {
		return nil, err

	clientOptions := client.Options{Scheme: options.Scheme, Mapper: mapper}

MapperProvider返回RESTMapper,RESTMapper作为GVK到GVR的映射, GVK可以从资源对象json或yaml文件中提取, 通过GVK可以得到GVR, 通过GVR可以拼接出资源的http api请求路径。主要作用是在listWatch时, 根据Schema定义的类型GVK解析出GVR, 向apiserver发起http请求获取资源, 然后watch。 watch基于http的Chunk实现,维护长连接。 mapper主要是在ListAndWatch中使用到创建cache: 使用默认的cache.New创建, cache中可以储存原生api对象也可以存储crd对象
	cache, err := options.NewCache(config, cache.Options{Scheme: options.Scheme, Mapper: mapper, Resync: options.SyncPeriod, Namespace: options.Namespace})
	// Allow newCache to be mocked
	if options.NewCache == nil {
		options.NewCache = cache.New
// New initializes and returns a new Cache.
func New(config *rest.Config, opts Options) (Cache, error) {
	opts, err := defaultOpts(config, opts)
	if err != nil {
		return nil, err
	im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace)
	return &informerCache{InformersMap: im}, nil
创建client:client 创建了两个一个用于读,一个用于写,用于读的会直接使用上面的 cache,用于写的才会直接和 APIServer 进行交互
	clientOptions := client.Options{Scheme: options.Scheme, Mapper: mapper}

	apiReader, err := client.New(config, clientOptions)
	if err != nil {
		return nil, err

	writeObj, err := options.ClientBuilder.
		Build(cache, config, clientOptions)
	if err != nil {
		return nil, err
	if err = (&controllers.MemcachedReconciler{
		Client: mgr.GetClient(),
		Scheme: mgr.GetScheme(),
	}).SetupWithManager(mgr); err != nil {
		setupLog.Error(err, "unable to create controller", "controller", "Memcached")
main.go 的方法里面主要是初始化了 Controller 的结构体,然后调用了 SetupWithManager方法
// SetupWithManager sets up the controller with the Manager.
func (r *MemcachedReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
	setupLog.Info("starting manager")
	if err := mgr.Start(ctx); err != nil {
		setupLog.Error(err, "problem running manager")
func (cm *controllerManager) Start(ctx context.Context) (err error) {
	if err := cm.Add(cm.cluster); err != nil {
		return fmt.Errorf("failed to add cluster to runnables: %w", err)
	cm.internalCtx, cm.internalCancel = context.WithCancel(ctx)

	// This chan indicates that stop is complete, in other words all runnables have returned or timeout on stop request
	stopComplete := make(chan struct{})
	defer close(stopComplete)
	// This must be deferred after closing stopComplete, otherwise we deadlock.
	defer func() {
		stopErr := cm.engageStopProcedure(stopComplete)
		if stopErr != nil {
			if err != nil {
				// Utilerrors.Aggregate allows to use errors.Is for all contained errors
				// whereas fmt.Errorf allows wrapping at most one error which means the
				// other one can not be found anymore.
				err = utilerrors.NewAggregate([]error{err, stopErr})
			} else {
				err = stopErr

	// initialize this here so that we reset the signal channel state on every start
	// Everything that might write into this channel must be started in a new goroutine,
	// because otherwise we might block this routine trying to write into the full channel
	// and will not be able to enter the deferred cm.engageStopProcedure() which drains
	// it.
	cm.errChan = make(chan error)

	// Metrics should be served whether the controller is leader or not.
	// (If we don't serve metrics for non-leaders, prometheus will still scrape
	// the pod but will get a connection refused)
	if cm.metricsListener != nil {
		go cm.serveMetrics()

	// Serve health probes
	if cm.healthProbeListener != nil {
		go cm.serveHealthProbes()

	go cm.startNonLeaderElectionRunnables()

	go func() {
		if cm.resourceLock != nil {
			err := cm.startLeaderElection()
			if err != nil {
				cm.errChan <- err
		} else {
			// Treat not having leader election enabled the same as being elected.

	select {
	case <-ctx.Done():
		// We are done
		return nil
	case err := <-cm.errChan:
		// Error starting or running a runnable
		return err
func (cm *controllerManager) startNonLeaderElectionRunnables() {
	defer cm.mu.Unlock()


	// Start the non-leaderelection Runnables after the cache has synced
	for _, c := range cm.nonLeaderElectionRunnables {
		// Controllers block, but we want to return an error if any have an error starting.
		// Write any Start errors to a channel so we can return them
func (cm *controllerManager) startRunnable(r Runnable) {
	go func() {
		defer cm.waitForRunnable.Done()
		if err := r.Start(cm.internalCtx); err != nil {
			cm.errChan <- err


// Start implements controller.Controller
func (c *Controller) Start(ctx context.Context) error {
	// use an IIFE to get proper lock handling
	// but lock outside to get proper handling of the queue shutdown
	if c.Started {
		return errors.New("controller was started more than once. This is likely to be caused by being added to a manager multiple times")


	// Set the internal context.
	c.ctx = ctx

	c.Queue = c.MakeQueue()
	defer c.Queue.ShutDown() // needs to be outside the iife so that we shutdown after the stop channel is closed

	err := func() error {
		defer c.mu.Unlock()

		// TODO(pwittrock): Reconsider HandleCrash
		defer utilruntime.HandleCrash()

		// NB(directxman12): launch the sources *before* trying to wait for the
		// caches to sync so that they have a chance to register their intendeded
		// caches.
		for _, watch := range c.startWatches {
			c.Log.Info("Starting EventSource", "source", watch.src)

			if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
				return err

		// Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches
		c.Log.Info("Starting Controller")

		for _, watch := range c.startWatches {
			syncingSource, ok := watch.src.(source.SyncingSource)
			if !ok {

			if err := func() error {
				// use a context with timeout for launching sources and syncing caches.
				sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout)
				defer cancel()

				// WaitForSync waits for a definitive timeout, and returns if there
				// is an error or a timeout
				if err := syncingSource.WaitForSync(sourceStartCtx); err != nil {
					err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err)
					c.Log.Error(err, "Could not wait for Cache to sync")
					return err

				return nil
			}(); err != nil {
				return err

		// All the watches have been started, we can reset the local slice.
		// We should never hold watches more than necessary, each watch source can hold a backing cache,
		// which won't be garbage collected if we hold a reference to it.
		c.startWatches = nil

		if c.JitterPeriod == 0 {
			c.JitterPeriod = 1 * time.Second

		// Launch workers to process resources
		c.Log.Info("Starting workers", "worker count", c.MaxConcurrentReconciles)
		for i := 0; i < c.MaxConcurrentReconciles; i++ {
			go wait.UntilWithContext(ctx, func(ctx context.Context) {
				// Run a worker thread that just dequeues items, processes them, and marks them done.
				// It enforces that the reconcileHandler is never invoked concurrently with the same object.
				for c.processNextWorkItem(ctx) {
			}, c.JitterPeriod)

		c.Started = true
		return nil
	if err != nil {
		return err

	c.Log.Info("Stopping workers")
	return nil

Reconcile 方法的触发是通过 Cache 中的 Informer 获取到资源的变更事件,然后再通过生产者消费者的模式触发我们自己实现的 Reconcile 方法的。


原文地址: http://outofmemory.cn/langs/990826.html

打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-21
下一篇 2022-05-21



