eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) // TODO: remove the wrapper when every clients have moved to use the clientset. eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(client.CoreV1().RESTClient()).Events("")})
dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: dc.addDeployment, UpdateFunc: dc.updateDeployment, // This will enter the sync loop and no-op, because the deployment has been deleted from the store. DeleteFunc: dc.deleteDeployment, }) rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: dc.addReplicaSet, UpdateFunc: dc.updateReplicaSet, DeleteFunc: dc.deleteReplicaSet, }) podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ DeleteFunc: dc.deletePod, })
// NewDeploymentController creates a new DeploymentController. funcNewDeploymentController(dInformer extensionsinformers.DeploymentInformer, rsInformer extensionsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface)(*DeploymentController, error) { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) // TODO: remove the wrapper when every clients have moved to use the clientset. eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(client.CoreV1().RESTClient()).Events("")})
dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: dc.addDeployment, UpdateFunc: dc.updateDeployment, // This will enter the sync loop and no-op, because the deployment has been deleted from the store. DeleteFunc: dc.deleteDeployment, }) rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: dc.addReplicaSet, UpdateFunc: dc.updateReplicaSet, DeleteFunc: dc.deleteReplicaSet, }) podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ DeleteFunc: dc.deletePod, })
// WaitForCacheSync is a wrapper around cache.WaitForCacheSync that generates log messages // indicating that the controller identified by controllerName is waiting for syncs, followed by // either a successful or failed sync. funcWaitForCacheSync(controllerName string, stopCh <-chanstruct{}, cacheSyncs ...cache.InformerSynced)bool { glog.Infof("Waiting for caches to sync for %s controller", controllerName)
if !cache.WaitForCacheSync(stopCh, cacheSyncs...) { utilruntime.HandleError(fmt.Errorf("Unable to sync caches for %s controller", controllerName)) returnfalse }
glog.Infof("Caches are synced for %s controller", controllerName) returntrue }
// worker runs a worker thread that just dequeues items, processes them, and marks them done. // It enforces that the syncHandler is never invoked concurrently with the same key. func(dc *DeploymentController)worker() { for dc.processNextWorkItem() { } }
// syncDeployment will sync the deployment with the given key. // This function is not meant to be invoked concurrently with the same key. func(dc *DeploymentController)syncDeployment(key string)error { startTime := time.Now() glog.V(4).Infof("Started syncing deployment %q (%v)", key, startTime) deferfunc() { glog.V(4).Infof("Finished syncing deployment %q (%v)", key, time.Since(startTime)) }()
namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { return err } deployment, err := dc.dLister.Deployments(namespace).Get(name) if errors.IsNotFound(err) { glog.V(2).Infof("Deployment %v has been deleted", key) returnnil } if err != nil { return err }
// Deep-copy otherwise we are mutating our cache. // TODO: Deep-copy only when needed. d := deployment.DeepCopy()
everything := metav1.LabelSelector{} if reflect.DeepEqual(d.Spec.Selector, &everything) { dc.eventRecorder.Eventf(d, v1.EventTypeWarning, "SelectingAll", "This deployment is selecting all pods. A non-empty selector is required.") if d.Status.ObservedGeneration < d.Generation { d.Status.ObservedGeneration = d.Generation dc.client.ExtensionsV1beta1().Deployments(d.Namespace).UpdateStatus(d) } returnnil }
// List ReplicaSets owned by this Deployment, while reconciling ControllerRef // through adoption/orphaning. rsList, err := dc.getReplicaSetsForDeployment(d) if err != nil { return err } // List all Pods owned by this Deployment, grouped by their ReplicaSet. // Current uses of the podMap are: // // * check if a Pod is labeled correctly with the pod-template-hash label. // * check that no old Pods are running in the middle of Recreate Deployments. podMap, err := dc.getPodMapForDeployment(d, rsList) if err != nil { return err }
if d.DeletionTimestamp != nil { return dc.syncStatusOnly(d, rsList, podMap) }
// Update deployment conditions with an Unknown condition when pausing/resuming // a deployment. In this way, we can be sure that we won't timeout when a user // resumes a Deployment with a set progressDeadlineSeconds. if err = dc.checkPausedConditions(d); err != nil { return err }
if d.Spec.Paused { return dc.sync(d, rsList, podMap) }
// rollback is not re-entrant in case the underlying replica sets are updated with a new // revision so we should ensure that we won't proceed to update replica sets until we // make sure that the deployment has cleaned up its rollback spec in subsequent enqueues. if d.Spec.RollbackTo != nil { return dc.rollback(d, rsList, podMap) }
// get namespace and deployment name namespace, name, err := cache.SplitMetaNamespaceKey(key) // get deployment by name deployment, err := dc.dLister.Deployments(namespace).Get(name)
4.2. getReplicaSetsForDeployment
1 2 3
// List ReplicaSets owned by this Deployment, while reconciling ControllerRef // through adoption/orphaning. rsList, err := dc.getReplicaSetsForDeployment(d)
// getReplicaSetsForDeployment uses ControllerRefManager to reconcile // ControllerRef by adopting and orphaning. // It returns the list of ReplicaSets that this Deployment should manage. func(dc *DeploymentController)getReplicaSetsForDeployment(d *apps.Deployment)([]*apps.ReplicaSet, error) { // List all ReplicaSets to find those we own but that no longer match our // selector. They will be orphaned by ClaimReplicaSets(). rsList, err := dc.rsLister.ReplicaSets(d.Namespace).List(labels.Everything()) if err != nil { returnnil, err } deploymentSelector, err := metav1.LabelSelectorAsSelector(d.Spec.Selector) if err != nil { returnnil, fmt.Errorf("deployment %s/%s has invalid label selector: %v", d.Namespace, d.Name, err) } // If any adoptions are attempted, we should first recheck for deletion with // an uncached quorum read sometime after listing ReplicaSets (see #42639). canAdoptFunc := controller.RecheckDeletionTimestamp(func()(metav1.Object, error) { fresh, err := dc.client.AppsV1().Deployments(d.Namespace).Get(d.Name, metav1.GetOptions{}) if err != nil { returnnil, err } if fresh.UID != d.UID { returnnil, fmt.Errorf("original Deployment %v/%v is gone: got uid %v, wanted %v", d.Namespace, d.Name, fresh.UID, d.UID) } return fresh, nil }) cm := controller.NewReplicaSetControllerRefManager(dc.rsControl, d, deploymentSelector, controllerKind, canAdoptFunc) return cm.ClaimReplicaSets(rsList) }
4.3. getPodMapForDeployment
1 2 3 4 5 6
// List all Pods owned by this Deployment, grouped by their ReplicaSet. // Current uses of the podMap are: // // * check if a Pod is labeled correctly with the pod-template-hash label. // * check that no old Pods are running in the middle of Recreate Deployments. podMap, err := dc.getPodMapForDeployment(d, rsList)
// getPodMapForDeployment returns the Pods managed by a Deployment. // // It returns a map from ReplicaSet UID to a list of Pods controlled by that RS, // according to the Pod's ControllerRef. func(dc *DeploymentController)getPodMapForDeployment(d *apps.Deployment, rsList []*apps.ReplicaSet)(map[types.UID]*v1.PodList, error) { // Get all Pods that potentially belong to this Deployment. selector, err := metav1.LabelSelectorAsSelector(d.Spec.Selector) if err != nil { returnnil, err } pods, err := dc.podLister.Pods(d.Namespace).List(selector) if err != nil { returnnil, err } // Group Pods by their controller (if it's in rsList). podMap := make(map[types.UID]*v1.PodList, len(rsList)) for _, rs := range rsList { podMap[rs.UID] = &v1.PodList{} } for _, pod := range pods { // Do not ignore inactive Pods because Recreate Deployments need to verify that no // Pods from older versions are running before spinning up new Pods. controllerRef := metav1.GetControllerOf(pod) if controllerRef == nil { continue } // Only append if we care about this UID. if podList, ok := podMap[controllerRef.UID]; ok { podList.Items = append(podList.Items, *pod) } } return podMap, nil }
4.4. checkPausedConditions
1 2 3 4 5 6 7 8 9 10
// Update deployment conditions with an Unknown condition when pausing/resuming // a deployment. In this way, we can be sure that we won't timeout when a user // resumes a Deployment with a set progressDeadlineSeconds. if err = dc.checkPausedConditions(d); err != nil { return err }
// checkPausedConditions checks if the given deployment is paused or not and adds an appropriate condition. // These conditions are needed so that we won't accidentally report lack of progress for resumed deployments // that were paused for longer than progressDeadlineSeconds. func(dc *DeploymentController)checkPausedConditions(d *apps.Deployment)error { if !deploymentutil.HasProgressDeadline(d) { returnnil } cond := deploymentutil.GetDeploymentCondition(d.Status, apps.DeploymentProgressing) if cond != nil && cond.Reason == deploymentutil.TimedOutReason { // If we have reported lack of progress, do not overwrite it with a paused condition. returnnil } pausedCondExists := cond != nil && cond.Reason == deploymentutil.PausedDeployReason
// isScalingEvent checks whether the provided deployment has been updated with a scaling event // by looking at the desired-replicas annotation in the active replica sets of the deployment. // // rsList should come from getReplicaSetsForDeployment(d). // podMap should come from getPodMapForDeployment(d, rsList). func(dc *DeploymentController)isScalingEvent(d *apps.Deployment, rsList []*apps.ReplicaSet)(bool, error) { newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, false) if err != nil { returnfalse, err } allRSs := append(oldRSs, newRS) for _, rs := range controller.FilterActiveReplicaSets(allRSs) { desired, ok := deploymentutil.GetDesiredReplicasAnnotation(rs) if !ok { continue } if desired != *(d.Spec.Replicas) { returntrue, nil } } returnfalse, nil }
4.6. rolloutRecreate
1 2 3
switch d.Spec.Strategy.Type { case apps.RecreateDeploymentStrategyType: return dc.rolloutRecreate(d, rsList, podMap)
// rolloutRecreate implements the logic for recreating a replica set. func(dc *DeploymentController)rolloutRecreate(d *apps.Deployment, rsList []*apps.ReplicaSet, podMap map[types.UID]*v1.PodList)error { // Don't create a new RS if not already existed, so that we avoid scaling up before scaling down. newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(d, rsList, false) if err != nil { return err } allRSs := append(oldRSs, newRS) activeOldRSs := controller.FilterActiveReplicaSets(oldRSs)
// scale down old replica sets. scaledDown, err := dc.scaleDownOldReplicaSetsForRecreate(activeOldRSs, d) if err != nil { return err } if scaledDown { // Update DeploymentStatus. return dc.syncRolloutStatus(allRSs, newRS, d) }
// Do not process a deployment when it has old pods running. if oldPodsRunning(newRS, oldRSs, podMap) { return dc.syncRolloutStatus(allRSs, newRS, d) }
// If we need to create a new RS, create it now. if newRS == nil { newRS, oldRSs, err = dc.getAllReplicaSetsAndSyncRevision(d, rsList, true) if err != nil { return err } allRSs = append(oldRSs, newRS) }
// scale up new replica set. if _, err := dc.scaleUpNewReplicaSetForRecreate(newRS, d); err != nil { return err }
if util.DeploymentComplete(d, &d.Status) { if err := dc.cleanupDeployment(oldRSs, d); err != nil { return err } }