HEX
Server: Apache/2.4.6 () PHP/7.4.33
System: Linux chile-dev-app-1 5.4.17-2136.315.5.el7uek.x86_64 #2 SMP Wed Dec 21 19:57:57 PST 2022 x86_64
User: apache (48)
PHP: 7.4.33
Disabled: NONE
Upload Files
File: //lib/python2.7/site-packages/cloudinit/event.py
# This file is part of cloud-init. See LICENSE file for license information.

"""Classes and functions related to event handling."""

import copy

from cloudinit import log as logging
from cloudinit import util


LOG = logging.getLogger(__name__)


# Event types which can generate maintenance requests for cloud-init.
class EventType(object):
    """Namespace of human-readable labels, one per supported event type."""

    # First boot of a newly created instance.
    BOOT_NEW_INSTANCE = 'New instance first boot'
    # Any system boot.
    BOOT = 'System boot'
    # A hotplug "add" event.
    HOTPLUG = 'Hotplug add event'

    # TODO: Cloud-init will grow support for the following event types:
    # METADATA_CHANGE
    # USER_REQUEST


# Maps the event names used in 'when' lists of the updates config to the
# corresponding EventType value.
EventTypeMap = {
    'boot': EventType.BOOT,
    'boot-new-instance': EventType.BOOT_NEW_INSTANCE,
    'hotplug': EventType.HOTPLUG,
}


def iteritems(d):
    """Return the (key, value) pairs of mapping *d*.

    Uses ``d.iteritems()`` when the mapping provides it and falls back to
    ``d.items()`` otherwise.
    """
    pairs = getattr(d, 'iteritems', None) or d.items
    return pairs()


# inverted mapping: EventType value -> config event name
EventNameMap = dict((etype, name) for name, etype in iteritems(EventTypeMap))


def get_allowed_events(sys_events, ds_events):
    '''Merge datasource capabilities with system config to determine which
       update events are allowed.

    @param sys_events: dict of system config keyed by scope; only scopes
        whose value is a dict containing a 'when' key are considered.
        None is treated as an empty config.
    @param ds_events: dict of datasource capabilities in the same layout.
        None, a missing scope, or a non-dict scope entry means the
        datasource allows no events for that scope.
    @return: dict mapping each considered scope to the set of EventType
        values that are named in the system config, known to
        EventTypeMap and also listed by the datasource for that scope.
    '''

    # Example of the config layout handled here:
    # updates:
    #   policy-version: 1
    #   network:
    #     when: [boot-new-instance, boot, hotplug]
    #   storage:
    #     when: [boot-new-instance, hotplug]
    #     watch: http://169.254.169.254/metadata/storage_config/

    # Log the raw inputs before any normalization.
    LOG.debug('updates: system   cfg: %s', sys_events)
    LOG.debug('updates: datasrc caps: %s', ds_events)

    if sys_events is None:
        sys_events = {}
    if ds_events is None:
        ds_events = {}

    events = {}
    for etype, sys_cfg in sys_events.items():
        # Entries such as 'policy-version: 1' are not scopes; skip them.
        if not isinstance(sys_cfg, dict) or 'when' not in sys_cfg:
            continue

        ds_cfg = ds_events.get(etype)
        ds_ecap = ds_cfg.get('when') if isinstance(ds_cfg, dict) else None
        if ds_ecap is None:
            ds_ecap = []
        LOG.debug('updates: datasrc %s cap: %s', etype, ds_ecap)

        # A bare 'when:' key yields None; treat it as no events requested.
        sys_when = sys_cfg['when']
        if sys_when is None:
            sys_when = []

        events[etype] = set(
            EventTypeMap[evt] for evt in sys_when
            if evt in EventTypeMap and evt in ds_ecap)

    LOG.debug('updates: allowed events: %s', events)
    return events


def get_update_events_config(update_events):
    '''Build an updates config dictionary from per-scope event collections.

    @param update_events: dict mapping a scope name to an iterable of
        EventType values.
    @return: dict holding 'policy-version': 1 plus, for every scope, a
        {'when': [...]} entry listing the config names of its events.
    @raises KeyError: if an event has no entry in EventNameMap.
    '''
    config = {'policy-version': 1}
    config.update(
        (scope, {'when': [EventNameMap[evt] for evt in scope_events]})
        for scope, scope_events in update_events.items())
    return config

# vi: ts=4 expandtab