class OpenShift::Runtime::Utils::Cgroups::Throttler

Attributes

interval[R]
running_apps[R]
threshold[R]
uuids[R]
wanted_keys[R]

Public Class Methods

new() click to toggle source
# File lib/openshift-origin-node/utils/cgroups/throttler.rb, line 18
# Builds a throttler from the 'cg_template_throttled' group of the cgroups
# config file (@@conf_file), opens syslog if needed, and starts the
# collector thread.
#
# 'apply_period' is stored in @interval and 'apply_threshold' in @threshold,
# both converted with to_i.
#
# Raises ArgumentError when either setting is absent or cannot be read.
def initialize
  # Keys for information we want from cgroups
  @wanted_keys = %w(usage throttled_time nr_periods cfs_quota_us).map(&:to_sym)
  # Allow us to synchronize destructive operations to @running_apps
  @mutex = Mutex.new
  @uuids = []

  throttler_config = ::OpenShift::Runtime::Utils::Cgroups::Config.new(@@conf_file).get_group('cg_template_throttled')

  # nil.to_i is 0, so the value must be checked for nil BEFORE conversion;
  # otherwise an absent key silently becomes 0 and the checks below can
  # never fire. Any error while reading is still treated as "not set".
  # NOTE(review): assumes `get` returns nil for an absent key -- confirm.
  read_setting = lambda do |key|
    begin
      raw = throttler_config.get(key)
      raw.nil? ? nil : raw.to_i
    rescue StandardError
      nil
    end
  end

  # Set the interval to save
  @interval = read_setting.call('apply_period')
  # The threshold to query against
  @threshold = read_setting.call('apply_threshold')

  raise ArgumentError, "#{@@conf_file} requires 'apply_period' in '[cg_template_throttled]' group" if @interval.nil?
  raise ArgumentError, "#{@@conf_file} requires 'apply_threshold' in '[cg_template_throttled]' group" if @threshold.nil?

  MonitoredGear.intervals = [@interval]

  # Allow us to lazy initialize MonitoredGears
  @running_apps = Hash.new do |h,k|
    h[k] = MonitoredGear.new(k)
  end

  Syslog.open(File.basename($0), Syslog::LOG_PID, Syslog::LOG_DAEMON) unless Syslog.opened?
  # The message is a Syslog format string, hence the doubled '%%'.
  Syslog.info("Starting throttler => threshold: #{@threshold.to_f}%%/#{@interval}s, check_interval: #{MonitoredGear.delay}")

  # Start our collector thread
  start
end

Public Instance Methods

apply_action(hash, cur_util) click to toggle source
# File lib/openshift-origin-node/utils/cgroups/throttler.rb, line 160
# Applies an action to each group of gears and logs every gear touched.
#
# hash     - maps an action name (or nil) to a { uuid => gear wrapper }
#            collection. A non-nil action is sent to wrapper.gear; a nil
#            action only logs the gear as "over_threshold".
# cur_util - maps uuid => { period => values }; the :usage_percent of the
#            first period entry is what gets logged ('???' when absent).
#
# Returns whatever hash.each returns.
def apply_action(hash, cur_util)
  # Reduce the utilization data to a single loggable figure per uuid.
  percent_for = cur_util.each_with_object({}) do |(uuid, vals), summary|
    percents = vals.map { |_period, stats| stats[:usage_percent] }
    summary[uuid] = percents.first || '???'
  end

  hash.each do |action, gears|
    label = action || "over_threshold"
    gears.each do |uuid, monitored|
      if action
        monitored.gear.send(action)
      end
      log_action(label, uuid, percent_for[uuid])
    end
  end
end
find(options) click to toggle source
# File lib/openshift-origin-node/utils/cgroups/throttler.rb, line 113
# Selects running apps by utilization and/or gear profile.
#
# options - Hash with any of:
#   :usage  - keep only apps whose :usage_percent for the chosen period is
#             at or above this value
#   :period - period key to look up (defaults to the first
#             MonitoredGear interval)
#   :state  - keep only apps whose gear.profile equals this value
#
# Works on a clone of running_apps, so the caller's collection is not
# filtered in place.
#
# Returns [apps, cur_util]: the filtered apps and the utilization data
# that was used, so the caller can log it.
def find(options)
  apps = running_apps.clone
  cur_util = utilization

  if (usage = options[:usage])
    period = options[:period] || MonitoredGear.intervals.first
    # Collect the uuids over the threshold into a hash so the membership
    # test below is a key lookup rather than an array scan per app.
    over_usage = cur_util.each_with_object({}) do |(uuid, by_period), acc|
      _, vals = by_period.find { |span, _| span == period }
      # A sample without :usage_percent is treated as "not over threshold"
      # instead of raising NoMethodError on nil.
      percent = vals && vals[:usage_percent]
      acc[uuid] = true if percent && percent >= usage
    end
    apps.select! { |uuid, _| over_usage.key?(uuid) }
  end

  if (state = options[:state])
    apps.select! { |_, app| app.gear.profile == state }
  end

  # Return current utilization with the apps for logging
  [apps, cur_util]
end
log_action(action, uuid, value) click to toggle source
# File lib/openshift-origin-node/utils/cgroups/throttler.rb, line 175
# Writes one throttler event to syslog at info level.
#
# action - label for what was done (e.g. an action name)
# uuid   - identifier of the gear concerned
# value  - utilization figure to report alongside it
#
# Syslog.info treats its first argument as a format string, so the values
# are passed as arguments to a fixed format instead of being interpolated
# into it; a '%' inside any value is then logged literally rather than
# being parsed as a format directive.
def log_action(action, uuid, value)
  Syslog.info("Throttler: %s => %s (%s)", action, uuid, value)
end
start() click to toggle source

Loop through all lines from grep and construct a hash. TODO: Should this be moved into libcgroup?

# File lib/openshift-origin-node/utils/cgroups/throttler.rb, line 53
# Spawns the collector thread: it calls tick, sleeps for
# MonitoredGear.delay, and repeats.
#
# Returns the Thread that was created.
def start
  # One polling cycle: collect, then wait for the configured delay.
  cycle = lambda do
    tick
    sleep(MonitoredGear.delay)
  end

  Thread.new { loop { cycle.call } }
end
throttle(args) click to toggle source
# File lib/openshift-origin-node/utils/cgroups/throttler.rb, line 140
# Runs one throttling pass.
#
# args - options forwarded to find to select the gears currently
#        considered bad.
#
# Gears remembered from earlier passes that are no longer bad are handed to
# apply_action under :restore, newly bad gears under :throttle, and gears
# that were already remembered and are still bad under nil.
#
# Returns the updated set of remembered bad gears.
def throttle(args)
  over_limit, cur_util = find(args)
  # If this is our first run, make sure we find any previously throttled gears
  # NOTE: There is a corner case where we won't find non-running throttled applications
  @old_bad_gears ||= find(state: :throttled).first

  # Remembered gears: those still bad stay remembered, the rest have recovered.
  still_bad, recovered = @old_bad_gears.partition { |uuid, _| over_limit.has_key?(uuid) }
  @old_bad_gears = Hash[still_bad]
  good_gears = Hash[recovered]

  # Currently bad gears: already-remembered ones are not throttled again.
  known, fresh = over_limit.partition { |uuid, _| @old_bad_gears.has_key?(uuid) }
  @old_bad_gears = Hash[known]
  bad_gears = Hash[fresh]

  actions = {
    :restore => good_gears,
    :throttle => bad_gears,
    nil => @old_bad_gears
  }
  apply_action(actions, cur_util)

  @old_bad_gears.merge!(bad_gears)
end
tick() click to toggle source
# File lib/openshift-origin-node/utils/cgroups/throttler.rb, line 62
# Takes one sample: reads Libcgroup.usage, keeps only the entries whose key
# is listed in wanted_keys for each uuid, and passes the result to update.
#
# Returns whatever update returns.
def tick
  trimmed = Libcgroup.usage.each_with_object({}) do |(uuid, stats), samples|
    samples[uuid] = stats.select { |key, _value| wanted_keys.include?(key) }
  end

  update(trimmed)
end
update(vals) click to toggle source

Update our MonitoredGears based on new data

# File lib/openshift-origin-node/utils/cgroups/throttler.rb, line 70
# Feeds fresh samples to the monitored gears.
#
# vals - maps uuid => sample data. Entries whose uuid is not in the current
#        uuids list are skipped.
#
# Each remaining gear is updated in its own thread while @mutex is held,
# and all threads are joined before the lock is released.
#
# Returns the list of joined threads.
def update(vals)
  # Synchronize this in case uuids are deleted
  @mutex.synchronize do
    known = uuids
    tracked = vals.select { |uuid, _data| known.include?(uuid) }

    workers = tracked.map do |uuid, data|
      Thread.new(uuid, data) do |id, sample|
        begin
          @running_apps[id].update(sample)
        rescue ArgumentError
          # Sometimes we enter a race condition with app creation/deletion
          # This will cause the MonitoredGear object to not be created
          # We can ignore this error and retry it next time
        end
      end
    end

    workers.each(&:join)
  end
end
utilization(apps = running_apps) click to toggle source
# File lib/openshift-origin-node/utils/cgroups/throttler.rb, line 100
# Collects the utilization of every app.
#
# apps - maps uuid => object responding to utilization (defaults to
#        running_apps).
#
# Each app is queried in its own thread while @mutex is held.
#
# Returns a Hash of uuid => utilization, built from the sorted entries.
def utilization(apps = running_apps)
  collected = {}

  @mutex.synchronize do
    workers = apps.map do |uuid, app|
      Thread.new { collected[uuid] = app.utilization }
    end
    workers.each(&:join)
  end

  Hash[collected.sort]
end
uuids=(new_uuids) click to toggle source

Update the list of uuids. Synchronize this in case we remove applications in the middle of an update.

# File lib/openshift-origin-node/utils/cgroups/throttler.rb, line 91
# Replaces the list of tracked uuids and drops every entry of
# @running_apps whose key is no longer in that list.
#
# new_uuids - the new collection of uuids (must respond to include?).
#
# Both steps happen under @mutex so they cannot interleave with other
# work that holds the same lock.
def uuids=(new_uuids)
  @mutex.synchronize do
    # Set the uuids of running gears
    @uuids = new_uuids
    # Delete any missing gears to free up memory
    @running_apps.reject! { |uuid, _app| !uuids.include?(uuid) }
  end
end