// use zap logging CLI options
// to set the controller-runtime logger
opts := zap.Options{}
// add the zap flag set to the operator's command-line flags
opts.BindFlags(flag.CommandLine)
// address the metrics endpoint binds to
flag.StringVar(&metricsAddr, "metrics-addr", ":8080", "The address the metric endpoint binds to.")
// feature gate switches
flag.StringVar(&featureGatesString, "feature-gates", "", "Feature gate to enable, format is a comma separated list enabling features, for instance RunAsNonRoot=false")
// whether to enable leader election
flag.BoolVar(&enableLeaderElection, "enable-leader-election", false,
    "Enable leader election for controller manager. Enabling this will ensure there is only one active controller manager.")
// whether to skip webhook TLS setup
flag.BoolVar(&skipWebhookConfig, "skip-webhook-config", true,
    "When set, don't setup webhook TLS certificates. Useful in OpenShift where this step is handled already.")
flag.Parse()
// create the logger from the zap CLI options,
// for instance --zap-log-level=debug
logger := zap.New(zap.UseFlagOptions(&opts))
ctrl.SetLogger(logger)
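The setupLog used in the error handling further down is a named child of the logger we just installed. It is not shown in this snippet, so here is a hedged sketch of how the operator-sdk scaffold usually declares it:

    // setupLog is derived from the global logger set via ctrl.SetLogger above;
    // everything it prints is tagged with the name "setup".
    var setupLog = ctrl.Log.WithName("setup")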
2. Initialize the manager
watchNamespace := os.Getenv(watchNamespaceEnvVar)
// Options are the arguments for creating a new Manager;
// an empty Namespace means the manager watches all namespaces.
mgrOpts := ctrl.Options{
    Scheme:             scheme,
    Namespace:          watchNamespace,
    MetricsBindAddress: metricsAddr,
    LeaderElection:     enableLeaderElection,
    Port:               9443,
    CertDir:            certDir,
}
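One detail worth calling out: an empty Namespace (no watch-namespace env var set) makes the manager cache and watch all namespaces. If you need a fixed set of namespaces instead, controller-runtime versions of this vintage expose cache.MultiNamespacedCacheBuilder. A hedged sketch, assuming the env var may carry a comma-separated list; it would go before the ctrl.NewManager call below:

    // needs: "strings" and "sigs.k8s.io/controller-runtime/pkg/cache"
    // If the env var holds a list such as "ns1,ns2", swap the
    // single-namespace cache for a multi-namespace one.
    if strings.Contains(watchNamespace, ",") {
        mgrOpts.Namespace = ""
        mgrOpts.NewCache = cache.MultiNamespacedCacheBuilder(strings.Split(watchNamespace, ","))
    }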
mgrOpts is then passed as the argument when instantiating the manager:
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), mgrOpts)
if err != nil {
    setupLog.Error(err, "unable to create manager")
    os.Exit(1)
}
// NewManager returns a new Manager for creating Controllers.
NewManager = manager.New
// New returns a new Manager for creating Controllers.
func New(config *rest.Config, options Options) (Manager, error) {
    // Set default values for options fields
    options = setOptionsDefaults(options)
    cluster, err := cluster.New(config, func(clusterOptions *cluster.Options) {
        clusterOptions.Scheme = options.Scheme
        clusterOptions.MapperProvider = options.MapperProvider
        clusterOptions.Logger = options.Logger
        clusterOptions.SyncPeriod = options.SyncPeriod
        clusterOptions.Namespace = options.Namespace
        clusterOptions.NewCache = options.NewCache
        clusterOptions.ClientBuilder = options.ClientBuilder
        clusterOptions.ClientDisableCacheFor = options.ClientDisableCacheFor
        clusterOptions.DryRunClient = options.DryRunClient
        clusterOptions.EventBroadcaster = options.EventBroadcaster
    })
The cluster.New function initializes the MapperProvider, the cache, and the clients:
// New constructs a brand new cluster
func New(config *rest.Config, opts ...Option) (Cluster, error) {
    if config == nil {
        return nil, errors.New("must specify Config")
    }
    options := Options{}
    for _, opt := range opts {
        opt(&options)
    }
    options = setOptionsDefaults(options)
    // Create the mapper provider
    mapper, err := options.MapperProvider(config)
    if err != nil {
        options.Logger.Error(err, "Failed to get API Group-Resources")
        return nil, err
    }
    // Create the cache for the cached read client and registering informers
    cache, err := options.NewCache(config, cache.Options{Scheme: options.Scheme, Mapper: mapper, Resync: options.SyncPeriod, Namespace: options.Namespace})
    if err != nil {
        return nil, err
    }
    clientOptions := client.Options{Scheme: options.Scheme, Mapper: mapper}
The MapperProvider returns a RESTMapper, which maps GVKs (Group/Version/Kind) to GVRs (Group/Version/Resource). A GVK can be read from a resource object's JSON or YAML manifest; from the GVK the mapper resolves the GVR, and from the GVR the HTTP API request path of the resource can be assembled. Its main job is during list-watch: the GVK registered in the Scheme is resolved to a GVR, an HTTP request is sent to the apiserver to list the resources, and they are then watched. Watch is implemented on top of HTTP chunked transfer encoding, keeping a long-lived connection open, and the mapper is mainly used inside ListAndWatch. A minimal sketch of the GVK-to-GVR lookup follows.
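To make the GVK-to-GVR step concrete, here is a hedged sketch that resolves the GVR of a Deployment using the same dynamic RESTMapper that the default MapperProvider of this controller-runtime vintage builds (apiutil.NewDynamicRESTMapper); it assumes a cluster reachable via kubeconfig:

    package main

    import (
        "fmt"
        "os"

        "k8s.io/apimachinery/pkg/runtime/schema"
        ctrl "sigs.k8s.io/controller-runtime"
        "sigs.k8s.io/controller-runtime/pkg/client/apiutil"
    )

    func main() {
        // Load the kubeconfig the same way the manager does.
        cfg := ctrl.GetConfigOrDie()

        // The default MapperProvider builds a dynamic RESTMapper like this.
        mapper, err := apiutil.NewDynamicRESTMapper(cfg)
        if err != nil {
            fmt.Fprintln(os.Stderr, err)
            os.Exit(1)
        }

        // Resolve the GVK of Deployments into a GVR plus scope.
        gvk := schema.GroupVersionKind{Group: "apps", Version: "v1", Kind: "Deployment"}
        mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
        if err != nil {
            fmt.Fprintln(os.Stderr, err)
            os.Exit(1)
        }

        // The GVR is what gets spliced into the request path,
        // e.g. /apis/apps/v1/namespaces/<ns>/deployments.
        fmt.Println("GVR:", mapping.Resource)
    }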
Creating the cache: the default cache.New constructor is used (unless a custom NewCache was injected), and the resulting cache can hold built-in API objects as well as CRD objects:
cache, err := options.NewCache(config, cache.Options{Scheme: options.Scheme, Mapper: mapper, Resync: options.SyncPeriod, Namespace: options.Namespace})
// Allow newCache to be mocked
if options.NewCache == nil {
    options.NewCache = cache.New
}
// New initializes and returns a new Cache.
func New(config *rest.Config, opts Options) (Cache, error) {
    opts, err := defaultOpts(config, opts)
    if err != nil {
        return nil, err
    }
    im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace)
    return &informerCache{InformersMap: im}, nil
}
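Since informerCache also implements client.Reader, cached reads look just like ordinary client reads once the manager has started and the informers have synced. A hedged illustration (the function name and namespace are made up for the example):

    import (
        "context"

        corev1 "k8s.io/api/core/v1"
        ctrl "sigs.k8s.io/controller-runtime"
        "sigs.k8s.io/controller-runtime/pkg/client"
    )

    // listPodsFromCache reads Pods from the manager's informer cache.
    // It only returns data after mgr.Start has run and the cache has synced.
    func listPodsFromCache(ctx context.Context, mgr ctrl.Manager) ([]corev1.Pod, error) {
        var pods corev1.PodList
        if err := mgr.GetCache().List(ctx, &pods, client.InNamespace("default")); err != nil {
            return nil, err
        }
        return pods.Items, nil
    }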
Creating the clients: two clients are created. apiReader always reads straight from the API server (uncached), while writeObj is the delegating client handed to controllers: its reads are served from the cache above (except for types excluded via ClientDisableCacheFor), and its writes go directly to the API server.
clientOptions := client.Options{Scheme: options.Scheme, Mapper: mapper}
apiReader, err := client.New(config, clientOptions)
if err != nil {
    return nil, err
}
writeObj, err := options.ClientBuilder.
    WithUncached(options.ClientDisableCacheFor...).
    Build(cache, config, clientOptions)
if err != nil {
    return nil, err
}
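From the user's side this split surfaces as mgr.GetClient() (the delegating writeObj) versus mgr.GetAPIReader() (the uncached apiReader). A hedged sketch; the object type and key are only illustrative:

    import (
        "context"

        corev1 "k8s.io/api/core/v1"
        ctrl "sigs.k8s.io/controller-runtime"
        "sigs.k8s.io/controller-runtime/pkg/client"
    )

    func readBothWays(ctx context.Context, mgr ctrl.Manager) error {
        key := client.ObjectKey{Namespace: "default", Name: "example"}

        // Cached read: served from the informer cache.
        var fromCache corev1.ConfigMap
        if err := mgr.GetClient().Get(ctx, key, &fromCache); err != nil {
            return err
        }

        // Direct read: always hits the API server.
        var fromAPI corev1.ConfigMap
        if err := mgr.GetAPIReader().Get(ctx, key, &fromAPI); err != nil {
            return err
        }

        // Writes always go straight to the API server, whichever reader you use.
        return mgr.GetClient().Update(ctx, &fromCache)
    }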
3. Start the controller
if err = (&controllers.MemcachedReconciler{
    Client: mgr.GetClient(),
    Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
    setupLog.Error(err, "unable to create controller", "controller", "Memcached")
    os.Exit(1)
}
In main.go this step only builds the MemcachedReconciler struct and then calls its SetupWithManager method:
// SetupWithManager sets up the controller with the Manager.
func (r *MemcachedReconciler) SetupWithManager(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&cachev1alpha1.Memcached{}).
        Complete(r)
}
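The builder accepts more than For(). A hedged sketch of common extensions, assuming (this is not in the original sample) that the Memcached controller also owns Deployments:

    import (
        appsv1 "k8s.io/api/apps/v1"
        ctrl "sigs.k8s.io/controller-runtime"
        "sigs.k8s.io/controller-runtime/pkg/controller"
        "sigs.k8s.io/controller-runtime/pkg/predicate"
    )

    // Also watch Deployments owned by a Memcached, skip update events that
    // don't bump metadata.generation (e.g. status-only updates), and allow
    // two reconciles to run in parallel.
    func (r *MemcachedReconciler) SetupWithManager(mgr ctrl.Manager) error {
        return ctrl.NewControllerManagedBy(mgr).
            For(&cachev1alpha1.Memcached{}).
            Owns(&appsv1.Deployment{}).
            WithEventFilter(predicate.GenerationChangedPredicate{}).
            WithOptions(controller.Options{MaxConcurrentReconciles: 2}).
            Complete(r)
    }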
setupLog.Info("starting manager")
if err := mgr.Start(ctx); err != nil {
    setupLog.Error(err, "problem running manager")
    os.Exit(1)
}
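The ctx handed to mgr.Start is normally the signal-handler context from the scaffold, so that SIGTERM/SIGINT cancel it and the manager shuts down cleanly. A one-line hedged sketch:

    // ctrl.SetupSignalHandler returns a context cancelled on SIGTERM/SIGINT,
    // which is what makes mgr.Start return for a graceful shutdown.
    ctx := ctrl.SetupSignalHandler()

Inside the library, mgr.Start is implemented by controllerManager.Start: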
func (cm *controllerManager) Start(ctx context.Context) (err error) {
    if err := cm.Add(cm.cluster); err != nil {
        return fmt.Errorf("failed to add cluster to runnables: %w", err)
    }
    cm.internalCtx, cm.internalCancel = context.WithCancel(ctx)

    // This chan indicates that stop is complete, in other words all runnables have returned or timeout on stop request
    stopComplete := make(chan struct{})
    defer close(stopComplete)
    // This must be deferred after closing stopComplete, otherwise we deadlock.
    defer func() {
        stopErr := cm.engageStopProcedure(stopComplete)
        if stopErr != nil {
            if err != nil {
                // Utilerrors.Aggregate allows to use errors.Is for all contained errors
                // whereas fmt.Errorf allows wrapping at most one error which means the
                // other one can not be found anymore.
                err = utilerrors.NewAggregate([]error{err, stopErr})
            } else {
                err = stopErr
            }
        }
    }()

    // initialize this here so that we reset the signal channel state on every start
    // Everything that might write into this channel must be started in a new goroutine,
    // because otherwise we might block this routine trying to write into the full channel
    // and will not be able to enter the deferred cm.engageStopProcedure() which drains
    // it.
    cm.errChan = make(chan error)

    // Metrics should be served whether the controller is leader or not.
    // (If we don't serve metrics for non-leaders, prometheus will still scrape
    // the pod but will get a connection refused)
    if cm.metricsListener != nil {
        go cm.serveMetrics()
    }

    // Serve health probes
    if cm.healthProbeListener != nil {
        go cm.serveHealthProbes()
    }

    go cm.startNonLeaderElectionRunnables()

    go func() {
        if cm.resourceLock != nil {
            err := cm.startLeaderElection()
            if err != nil {
                cm.errChan <- err
            }
        } else {
            // Treat not having leader election enabled the same as being elected.
            cm.startLeaderElectionRunnables()
            close(cm.elected)
        }
    }()

    select {
    case <-ctx.Done():
        // We are done
        return nil
    case err := <-cm.errChan:
        // Error starting or running a runnable
        return err
    }
}
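Note that Start only serves health probes when a healthProbeListener was configured, which in main.go means setting HealthProbeBindAddress in ctrl.Options and registering checks before starting. A hedged sketch using the stock healthz.Ping checker:

    import "sigs.k8s.io/controller-runtime/pkg/healthz"

    // In ctrl.Options: HealthProbeBindAddress: ":8081"
    // Then, before mgr.Start:
    if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
        setupLog.Error(err, "unable to set up health check")
        os.Exit(1)
    }
    if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
        setupLog.Error(err, "unable to set up ready check")
        os.Exit(1)
    }

Back in Start, the runnables are launched through startNonLeaderElectionRunnables and startRunnable: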
func (cm *controllerManager) startNonLeaderElectionRunnables() {
    cm.mu.Lock()
    defer cm.mu.Unlock()

    cm.waitForCache(cm.internalCtx)

    // Start the non-leaderelection Runnables after the cache has synced
    for _, c := range cm.nonLeaderElectionRunnables {
        // Controllers block, but we want to return an error if any have an error starting.
        // Write any Start errors to a channel so we can return them
        cm.startRunnable(c)
    }
}
func (cm *controllerManager) startRunnable(r Runnable) {
    cm.waitForRunnable.Add(1)
    go func() {
        defer cm.waitForRunnable.Done()
        if err := r.Start(cm.internalCtx); err != nil {
            cm.errChan <- err
        }
    }()
}
This eventually calls the controller's Start method:
// Start implements controller.Controller
func (c *Controller) Start(ctx context.Context) error {
    // use an IIFE to get proper lock handling
    // but lock outside to get proper handling of the queue shutdown
    c.mu.Lock()
    if c.Started {
        return errors.New("controller was started more than once. This is likely to be caused by being added to a manager multiple times")
    }

    c.initMetrics()

    // Set the internal context.
    c.ctx = ctx

    c.Queue = c.MakeQueue()
    defer c.Queue.ShutDown() // needs to be outside the iife so that we shutdown after the stop channel is closed

    err := func() error {
        defer c.mu.Unlock()

        // TODO(pwittrock): Reconsider HandleCrash
        defer utilruntime.HandleCrash()

        // NB(directxman12): launch the sources *before* trying to wait for the
        // caches to sync so that they have a chance to register their intended
        // caches.
        for _, watch := range c.startWatches {
            c.Log.Info("Starting EventSource", "source", watch.src)
            if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
                return err
            }
        }

        // Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches
        c.Log.Info("Starting Controller")
        for _, watch := range c.startWatches {
            syncingSource, ok := watch.src.(source.SyncingSource)
            if !ok {
                continue
            }
            if err := func() error {
                // use a context with timeout for launching sources and syncing caches.
                sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout)
                defer cancel()

                // WaitForSync waits for a definitive timeout, and returns if there
                // is an error or a timeout
                if err := syncingSource.WaitForSync(sourceStartCtx); err != nil {
                    err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err)
                    c.Log.Error(err, "Could not wait for Cache to sync")
                    return err
                }
                return nil
            }(); err != nil {
                return err
            }
        }

        // All the watches have been started, we can reset the local slice.
        //
        // We should never hold watches more than necessary, each watch source can hold a backing cache,
        // which won't be garbage collected if we hold a reference to it.
        c.startWatches = nil

        if c.JitterPeriod == 0 {
            c.JitterPeriod = 1 * time.Second
        }

        // Launch workers to process resources
        c.Log.Info("Starting workers", "worker count", c.MaxConcurrentReconciles)
        for i := 0; i < c.MaxConcurrentReconciles; i++ {
            go wait.UntilWithContext(ctx, func(ctx context.Context) {
                // Run a worker thread that just dequeues items, processes them, and marks them done.
                // It enforces that the reconcileHandler is never invoked concurrently with the same object.
                for c.processNextWorkItem(ctx) {
                }
            }, c.JitterPeriod)
        }

        c.Started = true
        return nil
    }()
    if err != nil {
        return err
    }

    <-ctx.Done()
    c.Log.Info("Stopping workers")
    return nil
}
The Reconcile method is triggered like this: the informers in the cache pick up resource change events and enqueue them, and the worker goroutines started above consume the queue in a producer-consumer pattern, invoking the Reconcile method we implemented for each item.
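For completeness, here is a hedged sketch of the consumer end. It is not the full operator-sdk Memcached sample, just the shape of a Reconcile implementation (the import path of cachev1alpha1 is illustrative):

    import (
        "context"

        "k8s.io/apimachinery/pkg/api/errors"
        ctrl "sigs.k8s.io/controller-runtime"

        cachev1alpha1 "example.com/memcached-operator/api/v1alpha1" // illustrative module path
    )

    // Reconcile is called once per item dequeued by the workers started in Controller.Start.
    func (r *MemcachedReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
        // Fetch the object the event referred to; it may already be gone.
        memcached := &cachev1alpha1.Memcached{}
        if err := r.Client.Get(ctx, req.NamespacedName, memcached); err != nil {
            if errors.IsNotFound(err) {
                // Deleted after the event was queued; nothing to do.
                return ctrl.Result{}, nil
            }
            return ctrl.Result{}, err
        }

        // ... drive the cluster toward the desired state here ...

        return ctrl.Result{}, nil
    }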