AnsibleがPlaybookを処理する流れを追ってみた

最近、サーバの構成管理ツールAnsibleのコードを読む機会があった。忘れる前にメモしたことをまとめておく。

このエントリでまとめること

このエントリでは、Ansibleのplaybookを処理する部分の実行の流れをまとめる。playbookを一度でも書いたことのある人なら、大体分かる内容だと思う。

読んだコードは、2014年10月30日時点の開発版。次のURLを参照してソースを取得後、コミットID 7cd5b13e34270dd5be79269a0b88c8c408c18663 でチェックアウトする。ちなみにソースからの実行方法も同じURLに書いてある。(コードを貼るときはgithubへのリンクを付けるので、そちらを参照していただいてもOKです)

http://docs.ansible.com/intro_installation.html#running-from-source

もしおかしな点や間違いがありましたら、ご指摘をお願いします。

ansible-playbookコマンドの実行の流れ

playbookを処理するときは、binディレクトリにあるansible-playbookコマンドをつかう。そこで、ansible-playbookコマンドを切り口に見ていくことにする(ただし、オプションの処理や実行結果の表示など、あまり重要でない部分は省略)。

1. Inventoryクラスのインスタンスを作成

まずは、ansible-playbookのiオプションで与えられたinventoryをもとに、Inventoryクラスのインスタンスを作成する。

bin/ansible-playbook#L160

    inventory = ansible.inventory.Inventory(options.inventory, vault_password=vault_pass)

Inventoryクラスは、inventoryに記述されたホストやグループなどの情報を保持する。

lib/ansible/inventory/init.py#L34-L41

class Inventory(object):
    """
    Host inventory for ansible.
    """

    __slots__ = [ 'host_list', 'groups', '_restriction', '_also_restriction', '_subset', 
                  'parser', '_vars_per_host', '_vars_per_group', '_hosts_cache', '_groups_list',
                  '_pattern_cache', '_vault_password', '_vars_plugins', '_playbook_basedir']

Inventoryクラスの__init__関数では、inventoryファイルのパースをする。

lib/ansible/inventory/init.py#L97-L135

                # check to see if the specified file starts with a
                # shebang (#!/), so if an error is raised by the parser
                # class we can show a more apropos error
                shebang_present = False
                try:
                    inv_file = open(host_list)
                    first_line = inv_file.readlines()[0]
                    inv_file.close()
                    if first_line.startswith('#!'):
                        shebang_present = True
                except:
                    pass

                if utils.is_executable(host_list):
                    try:
                        self.parser = InventoryScript(filename=host_list)
                        self.groups = self.parser.groups.values()
                    except:
                        if not shebang_present:
                            raise errors.AnsibleError("The file %s is marked as executable, but failed to execute correctly. " % host_list + \
                                                      "If this is not supposed to be an executable script, correct this with `chmod -x %s`." % host_list)
                        else:
                            raise
                else:
                    try:
                        self.parser = InventoryParser(filename=host_list)
                        self.groups = self.parser.groups.values()
                    except:
                        if shebang_present:
                            raise errors.AnsibleError("The file %s looks like it should be an executable inventory script, but is not marked executable. " % host_list + \
                                                      "Perhaps you want to correct this with `chmod +x %s`?" % host_list)
                        else:
                            raise

ファイルの実行権限を確認し、権限があればファイルを実行し、その結果をパースする。実行権限がなければ、そのままパースする。shebangの確認もしているが、ここで実行するかどうかを決めているわけではないようだ。

2. Playbookクラスのインスタンスを作成

次に、ansible-playbookコマンドの引数で与えられたplaybookファイルをもとに、Playbookクラスのインスタンスを作成する。playbookファイルだけでなく、inventoryインスタンス、ユーザ情報、変数情報など種々の情報を渡している。

bin/ansible-playbook#L176-L202

        pb = ansible.playbook.PlayBook(
            playbook=playbook,
            module_path=options.module_path,
            inventory=inventory,
            forks=options.forks,
            remote_user=options.remote_user,
            remote_pass=sshpass,
            callbacks=playbook_cb,
            runner_callbacks=runner_cb,
            stats=stats,
            timeout=options.timeout,
            transport=options.connection,
            sudo=options.sudo,
            sudo_user=options.sudo_user,
            sudo_pass=sudopass,
            extra_vars=extra_vars,
            private_key_file=options.private_key_file,
            only_tags=only_tags,
            skip_tags=skip_tags,
            check=options.check,
            diff=options.diff,
            su=options.su,
            su_pass=su_pass,
            su_user=options.su_user,
            vault_password=vault_pass,
            force_handlers=options.force_handlers
        )

Playbookクラスの__init__関数では、引数で与えられた変数をインスタンス変数に設定した後、_load_playbook_from_file関数を呼ぶ。この関数は、playbookの内容をplayのリストにする。ちなみにplayはタスクの集合で、通常はplayごとにタスクの実行対象ホストを指定する。playbookのトップレベルのデータ構造はリストだが、そのリストの個々の要素がplayに相当する。

lib/ansible/playbook/init.py#L255-L300

    def _load_playbook_from_file(self, path, vars={}, vars_files=[]):
        '''
        run top level error checking on playbooks and allow them to include other playbooks.
        '''

        playbook_data = utils.parse_yaml_from_file(path, vault_password=self.vault_password)
        accumulated_plays = []
        play_basedirs = []

        if type(playbook_data) != list:
            raise errors.AnsibleError("parse error: playbooks must be formatted as a YAML list, got %s" % type(playbook_data))

        basedir = os.path.dirname(path) or '.'
        utils.plugins.push_basedir(basedir)
        for play in playbook_data:
            if type(play) != dict:
                raise errors.AnsibleError("parse error: each play in a playbook must be a YAML dictionary (hash), received: %s" % play)

            if 'include' in play:
                # a playbook (list of plays) decided to include some other list of plays
                # from another file.  The result is a flat list of plays in the end.

                play_vars = self._get_playbook_vars(play, vars)
                play_vars_files = self._get_playbook_vars_files(play, vars_files)
                inc_vars, inc_path = self._get_include_info(play, basedir, play_vars)
                play_vars.update(inc_vars)

                included_path = utils.path_dwim(basedir, template(basedir, inc_path, play_vars))
                (plays, basedirs) = self._load_playbook_from_file(included_path, vars=play_vars, vars_files=play_vars_files)
                for p in plays:
                    # support for parameterized play includes works by passing
                    # those variables along to the subservient play
                    p['vars'] = self._extend_play_vars(p, play_vars)
                    # now add in the vars_files
                    p['vars_files'] = utils.list_union(p.get('vars_files', []), play_vars_files)

                accumulated_plays.extend(plays)
                play_basedirs.extend(basedirs)

            else:

                # this is a normal (non-included play)
                accumulated_plays.append(play)
                play_basedirs.append(basedir)

        return (accumulated_plays, play_basedirs)

playbookの内容をパースして、トップレベルのデータ構造がリストであることを確認した後、playのリスト(accumulated_plays)を作成している。

if 'include' in play:の箇所は、playレベルでplaybookをincludeしている場合の処理をしている。まずは、includeされたplaybook内のvarsまたはvars_filesキーワードの値を、include元にマージ。それから、playbook内のtaskのリストも、include元にマージしている。

3. Playbookの実行

最後に、Playbookを実行する。

bin/ansible-playbook#L264

            pb.run()

Playbookの実行は、PlayクラスとTaskクラスのインスタンス作成 → PlayクラスとTaskクラスの実行、という流れで進む。

3-1. PlayクラスとTaskクラスのインスタンス作成

run関数では、まず、Playbookクラスの保持するplayのリストを、Playクラスのリストにする。Playbookクラスがもつplayのデータは、まだYAMLをロードしただけの状態なので、ここでPlayクラスのインスタンスのリストにする。

lib/ansible/playbook/init.py#L304-L328

    def run(self):
        ''' run all patterns in the playbook '''
        plays = []
        matched_tags_all = set()
        unmatched_tags_all = set()

        # loop through all patterns and run them
        self.callbacks.on_start()
        for (play_ds, play_basedir) in zip(self.playbook, self.play_basedirs):
            play = Play(self, play_ds, play_basedir, vault_password=self.vault_password)
            assert play is not None

            matched_tags, unmatched_tags = play.compare_tags(self.only_tags)
            matched_tags_all = matched_tags_all | matched_tags
            unmatched_tags_all = unmatched_tags_all | unmatched_tags

            # Remove tasks we wish to skip
            matched_tags = matched_tags - set(self.skip_tags)

            # if we have matched_tags, the play must be run.
            # if the play contains no tasks, assume we just want to gather facts
            # in this case there are actually 3 meta tasks (handler flushes) not 0
            # tasks, so that's why there's a check against 3
            if (len(matched_tags) > 0 or len(play.tasks()) == 3):
                plays.append(play)

tags周りの処理(matched_tagsとか扱っているところ)は、実行するtaskをtagsで絞るコマンドオプションに対応している。オプションで与えられたtagsと、play内のtasksのtagsが1つでもマッチすれば、playをPlayクラスのリストに追加する。

Playクラスは、playの中で定義されたhostsやtasksなどの情報を保持する。

lib/ansible/playbook/play.py#L33-L42

class Play(object):

    __slots__ = [
       'hosts', 'name', 'vars', 'default_vars', 'vars_prompt', 'vars_files',
       'handlers', 'remote_user', 'remote_port', 'included_roles', 'accelerate',
       'accelerate_port', 'accelerate_ipv6', 'sudo', 'sudo_user', 'transport', 'playbook',
       'tags', 'gather_facts', 'serial', '_ds', '_handlers', '_tasks',
       'basedir', 'any_errors_fatal', 'roles', 'max_fail_pct', '_play_hosts', 'su', 'su_user',
       'vault_password', 'no_log',
    ]

Playクラスの__init__関数の中で、play内のtaskのリストは、Taskクラスのインスタンスのリストに変換される。Taskクラスは、モジュール名やモジュールの引数などの情報を保持する。

lib/ansible/playbook/task.py#L25-L36

class Task(object):

    __slots__ = [
        'name', 'meta', 'action', 'when', 'async_seconds', 'async_poll_interval',
        'notify', 'module_name', 'module_args', 'module_vars', 'default_vars',
        'play', 'notified_by', 'tags', 'register', 'role_name',
        'delegate_to', 'first_available_file', 'ignore_errors',
        'local_action', 'transport', 'sudo', 'remote_user', 'sudo_user', 'sudo_pass',
        'items_lookup_plugin', 'items_lookup_terms', 'environment', 'args',
        'any_errors_fatal', 'changed_when', 'failed_when', 'always_run', 'delay', 'retries', 'until',
        'su', 'su_user', 'su_pass', 'no_log', 'run_once',
    ]

ちなみにplay内のロールは、Playクラスのインスタンスを作成する段階でtaskのリストにされ、play内のtaskのリストにマージされる。Roleクラスがあるわけではない。

3-2. PlayクラスとTaskクラスの実行

続いてrun関数は、playbook内のPlayクラスのリストを順番に実行する。

lib/ansible/playbook/init.py#L344-L348

        for play in plays:
            ansible.callbacks.set_play(self.callbacks, play)
            ansible.callbacks.set_play(self.runner_callbacks, play)
            if not self._run_play(play):
                break

_run_play関数では、まず、playのhostsキーワードの値と、inventoryに記述したホストあるいはグループとのマッチングが行われ、設定対象ホストのリストが作られる。

lib/ansible/playbook/init.py#L655-L664

    def _run_play(self, play):
        ''' run a list of tasks for a given pattern, in order '''

        self.callbacks.on_play_start(play.name)
        # Get the hosts for this play
        play._play_hosts = self.inventory.list_hosts(play.hosts)
        # if no hosts matches this play, drop out
        if not play._play_hosts:
            self.callbacks.on_no_hosts_matched()
            return True

lib/ansible/inventory/init.py#L489-L496

    def list_hosts(self, pattern="all"):

        """ return a list of hostnames for a pattern """

        result = [ h.name for h in self.get_hosts(pattern) ]
        if len(result) == 0 and pattern in ["localhost", "127.0.0.1"]:
            result = [pattern]
        return result

run_play関数は、次に、Playクラスに紐付いたTaskクラスのリストを順番に実行する。handlersを強制的に実行すべきかの判定、taskのtagsとコマンドのオプションで与えられたtagsをマッチングしてtaskを実行すべきか判定した後、taskを実行すべき場合は、run_task関数で実行する。

lib/ansible/playbook/init.py#L707-L737

            for task in play.tasks():

                if task.meta is not None:
                    # meta tasks can force handlers to run mid-play
                    if task.meta == 'flush_handlers':
                        self.run_handlers(play)

                    # skip calling the handler till the play is finished
                    continue

                # only run the task if the requested tags match
                should_run = False
                for x in self.only_tags:

                    for y in task.tags:
                        if x == y:
                            should_run = True
                            break

                # Check for tags that we need to skip
                if should_run:
                    if any(x in task.tags for x in self.skip_tags):
                        should_run = False

                if should_run:

                    if not self._run_task(play, task, False):
                        # whether no hosts matched is fatal or not depends if it was on the initial step.
                        # if we got exactly no hosts on the first step (setup!) then the host group
                        # just didn't match anything and that's ok
                        return False

_run_task関数は、taskを実行していいかを、上述した条件とは別の条件で確認し、taskを実行する。

lib/ansible/playbook/init.py#L459-L481

    def _run_task(self, play, task, is_handler):
        ''' run a single task in the playbook and recursively run any subtasks.  '''

        ansible.callbacks.set_task(self.callbacks, task)
        ansible.callbacks.set_task(self.runner_callbacks, task)

        if task.role_name:
            name = '%s | %s' % (task.role_name, task.name)
        else:
            name = task.name

        self.callbacks.on_task_start(template(play.basedir, name, task.module_vars, lookup_fatal=False, filter_fatal=False), is_handler)
        if hasattr(self.callbacks, 'skip_task') and self.callbacks.skip_task:
            ansible.callbacks.set_task(self.callbacks, None)
            ansible.callbacks.set_task(self.runner_callbacks, None)
            return True

        # template ignore_errors
        cond = template(play.basedir, task.ignore_errors, task.module_vars, expand_lists=False)
        task.ignore_errors =  utils.check_conditional(cond, play.basedir, task.module_vars, fail_on_undefined=C.DEFAULT_UNDEFINED_VAR_BEHAVIOR)

        # load up an appropriate ansible runner to run the task in parallel
        results = self._run_task_internal(task)

_run_task関数はまず、PlaybookCallbacksクラスのインスタンスとPlaybookRunnerCallbacksクラスのインスタンスに、taskを設定している。これらのクラスは、それぞれPlaybookの実行と後述するRunnerの実行の要所々々で呼ばれるコールバックをまとめたもの。

次に、早速PlaybookCallbacksクラスのon_task_startコールバックを呼ぶ。このコールバック中では、ansible-playbookの--stepオプションおよび--start-at-taskオプションに基づいて、taskを実行すべきかを判定している。もし実行すべきでないと判定された場合、skip_taskフラグが立つので、if hasattr(self.callbacks, 'skip_task') and self.callbacks.skip_task:の条件文によって実行がスキップされる。

それから、_run_task_internal関数を呼ぶ。

lib/ansible/playbook/init.py#L384-L445

    def _run_task_internal(self, task):
        ''' run a particular module step in a playbook '''

        hosts = self._trim_unavailable_hosts(self.inventory.list_hosts(task.play._play_hosts))
        self.inventory.restrict_to(hosts)

        runner = ansible.runner.Runner(
            pattern=task.play.hosts,
            inventory=self.inventory,
            module_name=task.module_name,
            module_args=task.module_args,
            forks=self.forks,
            remote_pass=self.remote_pass,
            module_path=self.module_path,
            timeout=self.timeout,
            remote_user=task.remote_user,
            remote_port=task.play.remote_port,
            module_vars=task.module_vars,
            default_vars=task.default_vars,
            extra_vars=self.extra_vars,
            private_key_file=self.private_key_file,
            setup_cache=self.SETUP_CACHE,
            vars_cache=self.VARS_CACHE,
            basedir=task.play.basedir,
            conditional=task.when,
            callbacks=self.runner_callbacks,
            sudo=task.sudo,
            sudo_user=task.sudo_user,
            transport=task.transport,
            sudo_pass=task.sudo_pass,
            is_playbook=True,
            check=self.check,
            diff=self.diff,
            environment=task.environment,
            complex_args=task.args,
            accelerate=task.play.accelerate,
            accelerate_port=task.play.accelerate_port,
            accelerate_ipv6=task.play.accelerate_ipv6,
            error_on_undefined_vars=C.DEFAULT_UNDEFINED_VAR_BEHAVIOR,
            su=task.su,
            su_user=task.su_user,
            su_pass=task.su_pass,
            vault_pass = self.vault_password,
            run_hosts=hosts,
            no_log=task.no_log,
            run_once=task.run_once,
        )

        runner.module_vars.update({'play_hosts': hosts})
        runner.module_vars.update({'ansible_version': self._ansible_version})

        if task.async_seconds == 0:
            results = runner.run()
        else:
            results, poller = runner.run_async(task.async_seconds)
            self.stats.compute(results)
            if task.async_poll_interval > 0:
                # if not polling, playbook requested fire and forget, so don't poll
                results = self._async_poll(poller, task.async_seconds, task.async_poll_interval)
            else:
                for (host, res) in results.get('contacted', {}).iteritems():
                    self.runner_callbacks.on_async_ok(host, res, poller.runner.vars_cache[host]['ansible_job_id'])

_run_task_internal関数では、Runnerクラスの作成と実行がされている。特に実行部分は、moduleをホストに転送し、ホスト上で実行する処理をしている。

task.async_secondsの値によって、taskを同期実行するか非同期実行するかを切り替えている。task.async_secondsの値は、taskのasyncキーワードの値が反映される。taskの非同期実行については、次のURL参照。

http://docs.ansible.com/playbooks_async.html

Runnerクラスの実行の流れもまとめたいところだが、大分長くなってしまったので、今日はここまでにする。

追記:Runnerクラスについてもまとめました → AnsibleがPlaybookを処理する流れを追ってみた(2) - ks888の日記