AnsibleがPlaybookを処理する流れを追ってみた(2)

前回のエントリで、Ansibleがplaybookを処理する部分の実行の流れをまとめた。今回は、前回まとめきれなかったRunnerクラス周りの処理をまとめる。

このエントリでまとめること

今回はRunnerクラスの実行の流れをまとめる。Runnerクラスは、moduleを設定対象ホスト上で実行する部分を担当しており、Ansibleの動作を理解するには欠かせない部分。

読んだコードは前回と同様、2014年10月30日時点の開発版。ソースの取得・実行方法については、前回のエントリを参照して下さい。

もしおかしな点や間違いがありましたら、ご指摘下さい。

Runnerクラスの実行の流れ

前回は、taskを実行する中で、Runnerクラスのrun関数が呼ばれるまでをみた。できればrun関数の実行の流れから始めたいのだが、run関数からmodule実行部分(今回特にまとめたい箇所)までは、run関数 → executor関数 → executor_internal関数 → executor_internal_inter関数 という呼び出し関係があり、辿り着くまでに1エントリ消費してしまう。なので、executor_internal_inter関数の実行の流れから始めることにする。

(ちなみに飛ばした箇所でも、multiprocessingライブラリによるタスクの並列実行、各所に散らばった変数の集約など、結構重要な処理をしています)

1. handlerのロード

Ansibleにはaction_pluginsというプラグインの仕組みがある。これを使うと、設定対象ホストでmoduleを実行する前に、自ホストで下準備を行える。_executor_internal_inter関数では、モジュールごとに最適なactionプラグインをロードし、handlerという変数に保持している。

lib/ansible/runner/init.py#L804-L823

    def _executor_internal_inner(self, host, module_name, module_args, inject, port, is_chained=False, complex_args=None):
        ''' decides how to invoke a module '''

        # late processing of parameterized sudo_user (with_items,..)
        if self.sudo_user_var is not None:
            self.sudo_user = template.template(self.basedir, self.sudo_user_var, inject)
        if self.su_user_var is not None:
            self.su_user = template.template(self.basedir, self.su_user_var, inject)

        # module_name may be dynamic (but cannot contain {{ ansible_ssh_user }})
        module_name  = template.template(self.basedir, module_name, inject)

        if module_name in utils.plugins.action_loader:
            if self.background != 0:
                raise errors.AnsibleError("async mode is not supported with the %s module" % module_name)
            handler = utils.plugins.action_loader.get(module_name, self)
        elif self.background == 0:
            handler = utils.plugins.action_loader.get('normal', self)
        else:
            handler = utils.plugins.action_loader.get('async', self)

actionプラグインmodule_nameによって決まるが、大抵はデフォルトのnormalプラグインが選ばれる。normalプラグインは特に何もしないが、例えばtemplateモジュールのプラグインはtemplateの適用をこのプラグイン内でしている。

2. 実行条件の確認

次に_executor_internal_inter関数は、実行条件の確認を行う。

lib/ansible/runner/init.py#L825-L833

        if type(self.conditional) != list:
            self.conditional = [ self.conditional ]

        for cond in self.conditional:

            if not utils.check_conditional(cond, self.basedir, inject, fail_on_undefined=self.error_on_undefined_vars):
                result = utils.jsonify(dict(changed=False, skipped=True))
                self.callbacks.on_skipped(host, inject.get('item',None))
                return ReturnData(host=host, result=result)

taskにwhenキーワードが与えられていた場合に、その条件を満たしているか確認している。条件を満たさない場合は、taskの実行をskipする。

ちなみに、条件のリストを順番に確認する処理になっているので、whenキーワードには条件のリストを与えてもいいことがわかる。

3. 接続前の準備

次に、ホストに接続するためのSSH周りの準備をする。具体的には、ホスト名、ポート番号、ユーザ名などを設定する。コードは、長い割に単純な処理しかしていないので、割愛。

4. ホストに接続

次に、設定対象ホストに接続する。

lib/ansible/runner/init.py#L914-L934

        try:
            conn = self.connector.connect(actual_host, actual_port, actual_user, actual_pass, actual_transport, actual_private_key_file)
            if self.delegate_to or host != actual_host:
                conn.delegate = host

            default_shell = getattr(conn, 'default_shell', '')
            shell_type = inject.get('ansible_shell_type')
            if not shell_type:
                if default_shell:
                    shell_type = default_shell
                else:
                    shell_type = os.path.basename(C.DEFAULT_EXECUTABLE)

            shell_plugin = utils.plugins.shell_loader.get(shell_type)
            if shell_plugin is None:
                shell_plugin = utils.plugins.shell_loader.get('sh')
            conn.shell = shell_plugin

        except errors.AnsibleConnectionFailed, e:
            result = dict(failed=True, msg="FAILED: %s" % str(e))
            return ReturnData(host=host, comm_ok=False, result=result)

Ansibleにはconnection_pluginsというプラグインの仕組みがあり、ホストへの接続方法をプラグインで切り替えられる。ここでは、次に示すself.connector.connect関数が、connectionプラグインをロードし、そのプラグインのconnect関数を呼んでいる。

lib/ansible/runner/connection.py#L33-L45

    def connect(self, host, port, user, password, transport, private_key_file):
        conn = utils.plugins.connection_loader.get(transport, self.runner, host, port, user=user, password=password, private_key_file=private_key_file)
        if conn is None:
            raise AnsibleError("unsupported connection type: %s" % transport)
        if private_key_file:
            # If private key is readable by user other than owner, flag an error
            st = os.stat(private_key_file)
            if st.st_mode & (stat.S_IRGRP | stat.S_IROTH):
                raise AnsibleError("private_key_file (%s) is group-readable or world-readable and thus insecure - "
                                   "you will probably get an SSH failure"
                                   % (private_key_file,))
        self.active = conn.connect()
        return self.active

ただ、よく使われるsshプラグインのconnect関数を見てみると、この段階ではssh接続のためのコマンドラインを準備しただけで、まだ接続はしていないようだ。

ちなみにデフォルトの接続方法は"smart"だが、これはRunnerインスタンスの初期化処理の中で"ssh"か"paramiko"に変更されるため、connectionプラグインとしては存在しない。

connect関数に続いて、shell_pluginsの設定をしている。Ansibleにはshell_pluginsというプラグインの仕組みがあり、設定対象ホストに合わせたプラグインを選択することで、ホスト間のシェルコマンドの差異を吸収できる。例えばファイルを削除したい場合、shプラグインを選べばrmが、powershellプラグインを選べばRemove-Itemが得られる。

5. handlerの実行

次に、moduleの転送と実行をする。もし実行結果が指定した条件を満たさない場合、一定の間隔をおいてリトライする。このリトライ機能については次のURL参照。

http://docs.ansible.com/playbooks_loops.html#do-until-loops

lib/ansible/runner/init.py#L985-L1015

        result = handler.run(conn, tmp, module_name, module_args, inject, complex_args)
        # Code for do until feature
        until = self.module_vars.get('until', None)
        if until is not None and result.comm_ok:
            inject[self.module_vars.get('register')] = result.result

            cond = template.template(self.basedir, until, inject, expand_lists=False)
            if not utils.check_conditional(cond,  self.basedir, inject, fail_on_undefined=self.error_on_undefined_vars):
                retries = self.module_vars.get('retries')
                delay   = self.module_vars.get('delay')
                for x in range(1, int(retries) + 1):
                    # template the delay, cast to float and sleep
                    delay = template.template(self.basedir, delay, inject, expand_lists=False)
                    delay = float(delay)
                    time.sleep(delay)
                    tmp = ''
                    if self._early_needs_tmp_path(module_name, handler):
                        tmp = self._make_tmp_path(conn)
                    result = handler.run(conn, tmp, module_name, module_args, inject, complex_args)
                    result.result['attempts'] = x
                    vv("Result from run %i is: %s" % (x, result.result))
                    inject[self.module_vars.get('register')] = result.result
                    cond = template.template(self.basedir, until, inject, expand_lists=False)
                    if utils.check_conditional(cond, self.basedir, inject, fail_on_undefined=self.error_on_undefined_vars):
                        break
                if result.result['attempts'] == retries and not utils.check_conditional(cond, self.basedir, inject, fail_on_undefined=self.error_on_undefined_vars):
                    result.result['failed'] = True
                    result.result['msg'] = "Task failed as maximum retries was encountered"
            else:
                result.result['attempts'] = 0
        conn.close()

handler.run関数内の処理はactionプラグインによるが、デフォルトのnormalプラグインの場合、Runnerクラスの_execute_module()関数を呼んでいるだけ。

_execute_module()関数は、実際にmoduleの転送と実行をしている。その処理を5-1から5-4にまとめる。

5-1. moduleのロードと書き換え

_execute_module()関数はまず、moduleのロードと書き換えをする。

lib/ansible/runner/init.py#L445-L460

    def _execute_module(self, conn, tmp, module_name, args,
        async_jid=None, async_module=None, async_limit=None, inject=None, persist_files=False, complex_args=None, delete_remote_tmp=True):

        ''' transfer and run a module along with its arguments on the remote side'''

        # hack to support fireball mode
        if module_name == 'fireball':
            args = "%s password=%s" % (args, base64.b64encode(str(utils.key_for_hostname(conn.host))))
            if 'port' not in args:
                args += " port=%s" % C.ZEROMQ_PORT

        (
        module_style,
        shebang,
        module_data
        ) = self._configure_module(conn, module_name, args, inject, complex_args)

_configure_module関数は次のような実装になっている。

lib/ansible/runner/init.py#L1247-L1266

    def _configure_module(self, conn, module_name, module_args, inject, complex_args=None):
        ''' find module and configure it '''

        # Search module path(s) for named module.
        module_suffixes = getattr(conn, 'default_suffixes', None)
        module_path = utils.plugins.module_finder.find_plugin(module_name, module_suffixes, transport=self.transport)
        if module_path is None:
            module_path2 = utils.plugins.module_finder.find_plugin('ping', module_suffixes)
            if module_path2 is not None:
                raise errors.AnsibleFileNotFound("module %s not found in configured module paths" % (module_name))
            else:
                raise errors.AnsibleFileNotFound("module %s not found in configured module paths.  Additionally, core modules are missing. If this is a checkout, run 'git submodule update --init --recursive' to correct this problem." % (module_name))


        # insert shared code and arguments into the module
        (module_data, module_style, module_shebang) = module_replacer.modify_module(
            module_path, complex_args, module_args, inject
        )

        return (module_style, module_shebang, module_data)

まず、utils.plugins.module_finder.find_plugin関数でmoduleをロードしている。ロードには、プラグインをロードするときと仕組みを使っている。

次に、module_replacer.modify_module関数でmoduleを書き換えている。

lib/ansible/module_common.py#L142-L170

    def modify_module(self, module_path, complex_args, module_args, inject):

        with open(module_path) as f:

            # read in the module source
            module_data = f.read()

            (module_data, module_style) = self._find_snippet_imports(module_data, module_path)

            complex_args_json = utils.jsonify(complex_args)
            # We force conversion of module_args to str because module_common calls shlex.split,
            # a standard library function that incorrectly handles Unicode input before Python 2.7.3.
            # Note: it would be better to do all this conversion at the border
            # (when the data is originally parsed into data structures) but
            # it's currently coming from too many sources to make that
            # effective.
            try:
                encoded_args = repr(module_args.encode('utf-8'))
            except UnicodeDecodeError:
                encoded_args = repr(module_args)
            try:
                encoded_complex = repr(complex_args_json.encode('utf-8'))
            except UnicodeDecodeError:
                encoded_complex = repr(complex_args_json.encode('utf-8'))

            # these strings should be part of the 'basic' snippet which is required to be included
            module_data = module_data.replace(REPLACER_VERSION, repr(__version__))
            module_data = module_data.replace(REPLACER_ARGS, encoded_args)
            module_data = module_data.replace(REPLACER_COMPLEX, encoded_complex)

この関数では、転送の効率化のために、moduleに書かれたansible.module_utilsパッケージのimport文を、import先のスクリプトで差し替える処理をしている(self._find_snippet_imports関数の中)。この差し替えにより、ホストに転送するファイル数を減らしている。

さらに、moduleの引数をmoduleに埋め込む処理をしている。引数にはmodule_argscomplex_argsがある。module_argsには、単純なkey=valueで表現できる引数が入る。complex_argsには、リストや辞書をつかった複雑な引数がJSONで入る。ここではその両方を、moduleに埋め込んでいる。

5-2. 一時ディレクトリの作成

次に_execute_module()関数は、moduleの転送先となる一時ディレクトリを作成する。

lib/ansible/runner/init.py#L462-L466

        # a remote tmp path may be necessary and not already created
        if self._late_needs_tmp_path(conn, tmp, module_style):
            tmp = self._make_tmp_path(conn)

        remote_module_path = conn.shell.join_path(tmp, module_name)

lib/ansible/runner/init.py#L1182-L1194

    def _make_tmp_path(self, conn):
        ''' make and return a temporary path on a remote box '''
        basefile = 'ansible-tmp-%s-%s' % (time.time(), random.randint(0, 2**48))
        use_system_tmp = False
        if (self.sudo and self.sudo_user != 'root') or (self.su and self.su_user != 'root'):
            use_system_tmp = True

        tmp_mode = None
        if self.remote_user != 'root' or ((self.sudo and self.sudo_user != 'root') or (self.su and self.su_user != 'root')):
            tmp_mode = 'a+rx'

        cmd = conn.shell.mkdtemp(basefile, use_system_tmp, tmp_mode)
        result = self._low_level_exec_command(conn, cmd, None, sudoable=False)

conn.shell.mkdtemp関数でディレクトリを作るためのシェルコマンドを作成した後、low_level_exec_command関数でコマンドを実行している。low_level_exec_command関数は、(sshによる接続が選ばれていれば)sshプラグインのexec_command関数を呼び出す。exec_command関数は、sshコマンドの末尾に実行したいコマンドをappendして、sshを実行する。

ちなみに、ansible-playbookにvvvオプションをつけて実行すると、大抵は 1.EXECから始まる行 2.PUTから始まる行 3.EXECから始まる行 が出力される。このうち、1のログは、一時ディレクトリの作成に伴うもの。

5-3. moduleの転送

次に、_execute_module()関数は、作成した一時ディレクトリに対して、実行するmoduleをホストに転送する。

lib/ansible/runner/init.py#L468-L474

        if (module_style != 'new'
           or async_jid is not None
           or not conn.has_pipelining
           or not C.ANSIBLE_SSH_PIPELINING
           or C.DEFAULT_KEEP_REMOTE_FILES
           or self.su):
            self._transfer_str(conn, tmp, module_name, module_data)

lib/ansible/runner/init.py#L279-L304

    def _transfer_str(self, conn, tmp, name, data):
        ''' transfer string to remote file '''

        if type(data) == dict:
            data = utils.jsonify(data)

        afd, afile = tempfile.mkstemp()
        afo = os.fdopen(afd, 'w')
        try:
            if not isinstance(data, unicode):
                #ensure the data is valid UTF-8
                data.decode('utf-8')
            else:
                data = data.encode('utf-8')
            afo.write(data)
        except:
            raise errors.AnsibleError("failure encoding into utf-8")
        afo.flush()
        afo.close()

        remote = conn.shell.join_path(tmp, name)
        try:
            conn.put_file(afile, remote)
        finally:
            os.unlink(afile)
        return remote

moduleを一度 自ホストの一時ファイルに書き込んだ後、conn.put_file関数で転送している。デフォルトだとsftpで転送するようだ。

ちなみに上述したvvvオプションのログのうち、PUTから始まる行のログは、ここの処理に伴うもの。

5-4. moduleの実行

転送したmoduleを実行する。

lib/ansible/runner/init.py#L532-L544

        cmd = conn.shell.build_module_command(environment_string, shebang, cmd, rm_tmp)
        cmd = cmd.strip()

        sudoable = True
        if module_name == "accelerate":
            # always run the accelerate module as the user
            # specified in the play, not the sudo_user
            sudoable = False

        if self.su:
            res = self._low_level_exec_command(conn, cmd, tmp, su=True, in_data=in_data)
        else:
            res = self._low_level_exec_command(conn, cmd, tmp, sudoable=sudoable, in_data=in_data)

lib/ansible/runner/shell_plugins/sh.py#L82-L87

    def build_module_command(self, env_string, shebang, cmd, rm_tmp=None):
        cmd_parts = [env_string.strip(), shebang.replace("#!", "").strip(), cmd]
        new_cmd = " ".join(cmd_parts)
        if rm_tmp:
            new_cmd = '%s; rm -rf %s >/dev/null 2>&1' % (new_cmd, rm_tmp)
        return new_cmd

まず、conn.shell.build_module_command関数で、実行したいコマンドの前後に環境変数、sudo周りの処理やmoduleの削除処理を加える。次に、一時ディレクトリの作成でもでてきた_low_level_exec_command関数で、コマンドを実行している。一時ディレクトリの時と同様に、sshコマンドの末尾に実行したいコマンドがappendされ、sshが実行される。

6. 実行結果の確認

最後に、_executor_internal_inner関数は実行結果を確認する。雑多な処理を含むのでコードは割愛するが、大体次のような処理をしている。

  • 通信エラー(あるいはパースエラー)の確認。該当するエラーがあった場合、on_unreachable関数がコールバックとして呼ばれる。

  • failed_whenの確認。Ansibleは、タスクをfailedにする条件を自分で指定できる。その条件を満たしているかを確認する。

  • 実行の成否の確認。実行が失敗していた場合、on_failed関数がコールバックとして呼ばれる。成功していた場合は、on_ok関数。

まとめと感想

このエントリでは、Runnerクラスの処理のうち、特に_executor_internal_inter関数周りをまとめた。個人的にはプラグインの仕組みを複数用意して、接続方法や環境の差異をうまく吸収していたのが面白かった。他にも面白い箇所はあったが、今回はまとめきれなかった。そのあたりは、またいつか。