AnsibleがPlaybookを処理する流れを追ってみた(2)
前回のエントリで、Ansibleがplaybookを処理する部分の実行の流れをまとめた。今回は、前回まとめきれなかったRunnerクラス周りの処理をまとめる。
このエントリでまとめること
今回はRunnerクラスの実行の流れをまとめる。Runnerクラスは、moduleを設定対象ホスト上で実行する部分を担当しており、Ansibleの動作を理解するには欠かせない部分。
読んだコードは前回と同様、2014年10月30日時点の開発版。ソースの取得・実行方法については、前回のエントリを参照して下さい。
もしおかしな点や間違いがありましたら、ご指摘下さい。
Runnerクラスの実行の流れ
前回は、taskを実行する中で、Runnerクラスのrun関数が呼ばれるまでをみた。できればrun関数の実行の流れから始めたいのだが、run関数からmodule実行部分(今回特にまとめたい箇所)までは、run関数 → executor関数 → executor_internal関数 → executor_internal_inter関数 という呼び出し関係があり、辿り着くまでに1エントリ消費してしまう。なので、executor_internal_inter関数の実行の流れから始めることにする。
(ちなみに飛ばした箇所でも、multiprocessingライブラリによるタスクの並列実行、各所に散らばった変数の集約など、結構重要な処理をしています)
1. handlerのロード
Ansibleにはaction_pluginsというプラグインの仕組みがある。これを使うと、設定対象ホストでmoduleを実行する前に、自ホストで下準備を行える。_executor_internal_inter関数では、モジュールごとに最適なactionプラグインをロードし、handlerという変数に保持している。
lib/ansible/runner/init.py#L804-L823
def _executor_internal_inner(self, host, module_name, module_args, inject, port, is_chained=False, complex_args=None): ''' decides how to invoke a module ''' # late processing of parameterized sudo_user (with_items,..) if self.sudo_user_var is not None: self.sudo_user = template.template(self.basedir, self.sudo_user_var, inject) if self.su_user_var is not None: self.su_user = template.template(self.basedir, self.su_user_var, inject) # module_name may be dynamic (but cannot contain {{ ansible_ssh_user }}) module_name = template.template(self.basedir, module_name, inject) if module_name in utils.plugins.action_loader: if self.background != 0: raise errors.AnsibleError("async mode is not supported with the %s module" % module_name) handler = utils.plugins.action_loader.get(module_name, self) elif self.background == 0: handler = utils.plugins.action_loader.get('normal', self) else: handler = utils.plugins.action_loader.get('async', self)
actionプラグインはmodule_name
によって決まるが、大抵はデフォルトのnormalプラグインが選ばれる。normalプラグインは特に何もしないが、例えばtemplateモジュールのプラグインはtemplateの適用をこのプラグイン内でしている。
2. 実行条件の確認
次に_executor_internal_inter関数は、実行条件の確認を行う。
lib/ansible/runner/init.py#L825-L833
if type(self.conditional) != list: self.conditional = [ self.conditional ] for cond in self.conditional: if not utils.check_conditional(cond, self.basedir, inject, fail_on_undefined=self.error_on_undefined_vars): result = utils.jsonify(dict(changed=False, skipped=True)) self.callbacks.on_skipped(host, inject.get('item',None)) return ReturnData(host=host, result=result)
taskにwhenキーワードが与えられていた場合に、その条件を満たしているか確認している。条件を満たさない場合は、taskの実行をskipする。
ちなみに、条件のリストを順番に確認する処理になっているので、whenキーワードには条件のリストを与えてもいいことがわかる。
3. 接続前の準備
次に、ホストに接続するためのSSH周りの準備をする。具体的には、ホスト名、ポート番号、ユーザ名などを設定する。コードは、長い割に単純な処理しかしていないので、割愛。
4. ホストに接続
次に、設定対象ホストに接続する。
lib/ansible/runner/init.py#L914-L934
try: conn = self.connector.connect(actual_host, actual_port, actual_user, actual_pass, actual_transport, actual_private_key_file) if self.delegate_to or host != actual_host: conn.delegate = host default_shell = getattr(conn, 'default_shell', '') shell_type = inject.get('ansible_shell_type') if not shell_type: if default_shell: shell_type = default_shell else: shell_type = os.path.basename(C.DEFAULT_EXECUTABLE) shell_plugin = utils.plugins.shell_loader.get(shell_type) if shell_plugin is None: shell_plugin = utils.plugins.shell_loader.get('sh') conn.shell = shell_plugin except errors.AnsibleConnectionFailed, e: result = dict(failed=True, msg="FAILED: %s" % str(e)) return ReturnData(host=host, comm_ok=False, result=result)
Ansibleにはconnection_pluginsというプラグインの仕組みがあり、ホストへの接続方法をプラグインで切り替えられる。ここでは、次に示すself.connector.connect関数が、connectionプラグインをロードし、そのプラグインのconnect関数を呼んでいる。
lib/ansible/runner/connection.py#L33-L45
def connect(self, host, port, user, password, transport, private_key_file): conn = utils.plugins.connection_loader.get(transport, self.runner, host, port, user=user, password=password, private_key_file=private_key_file) if conn is None: raise AnsibleError("unsupported connection type: %s" % transport) if private_key_file: # If private key is readable by user other than owner, flag an error st = os.stat(private_key_file) if st.st_mode & (stat.S_IRGRP | stat.S_IROTH): raise AnsibleError("private_key_file (%s) is group-readable or world-readable and thus insecure - " "you will probably get an SSH failure" % (private_key_file,)) self.active = conn.connect() return self.active
ただ、よく使われるsshプラグインのconnect関数を見てみると、この段階ではssh接続のためのコマンドラインを準備しただけで、まだ接続はしていないようだ。
ちなみにデフォルトの接続方法は"smart"だが、これはRunnerインスタンスの初期化処理の中で"ssh"か"paramiko"に変更されるため、connectionプラグインとしては存在しない。
connect関数に続いて、shell_pluginsの設定をしている。Ansibleにはshell_pluginsというプラグインの仕組みがあり、設定対象ホストに合わせたプラグインを選択することで、ホスト間のシェルコマンドの差異を吸収できる。例えばファイルを削除したい場合、shプラグインを選べばrmが、powershellプラグインを選べばRemove-Itemが得られる。
5. handlerの実行
次に、moduleの転送と実行をする。もし実行結果が指定した条件を満たさない場合、一定の間隔をおいてリトライする。このリトライ機能については次のURL参照。
http://docs.ansible.com/playbooks_loops.html#do-until-loops
lib/ansible/runner/init.py#L985-L1015
result = handler.run(conn, tmp, module_name, module_args, inject, complex_args) # Code for do until feature until = self.module_vars.get('until', None) if until is not None and result.comm_ok: inject[self.module_vars.get('register')] = result.result cond = template.template(self.basedir, until, inject, expand_lists=False) if not utils.check_conditional(cond, self.basedir, inject, fail_on_undefined=self.error_on_undefined_vars): retries = self.module_vars.get('retries') delay = self.module_vars.get('delay') for x in range(1, int(retries) + 1): # template the delay, cast to float and sleep delay = template.template(self.basedir, delay, inject, expand_lists=False) delay = float(delay) time.sleep(delay) tmp = '' if self._early_needs_tmp_path(module_name, handler): tmp = self._make_tmp_path(conn) result = handler.run(conn, tmp, module_name, module_args, inject, complex_args) result.result['attempts'] = x vv("Result from run %i is: %s" % (x, result.result)) inject[self.module_vars.get('register')] = result.result cond = template.template(self.basedir, until, inject, expand_lists=False) if utils.check_conditional(cond, self.basedir, inject, fail_on_undefined=self.error_on_undefined_vars): break if result.result['attempts'] == retries and not utils.check_conditional(cond, self.basedir, inject, fail_on_undefined=self.error_on_undefined_vars): result.result['failed'] = True result.result['msg'] = "Task failed as maximum retries was encountered" else: result.result['attempts'] = 0 conn.close()
handler.run関数内の処理はactionプラグインによるが、デフォルトのnormalプラグインの場合、Runnerクラスの_execute_module()関数を呼んでいるだけ。
_execute_module()関数は、実際にmoduleの転送と実行をしている。その処理を5-1から5-4にまとめる。
5-1. moduleのロードと書き換え
_execute_module()関数はまず、moduleのロードと書き換えをする。
lib/ansible/runner/init.py#L445-L460
def _execute_module(self, conn, tmp, module_name, args, async_jid=None, async_module=None, async_limit=None, inject=None, persist_files=False, complex_args=None, delete_remote_tmp=True): ''' transfer and run a module along with its arguments on the remote side''' # hack to support fireball mode if module_name == 'fireball': args = "%s password=%s" % (args, base64.b64encode(str(utils.key_for_hostname(conn.host)))) if 'port' not in args: args += " port=%s" % C.ZEROMQ_PORT ( module_style, shebang, module_data ) = self._configure_module(conn, module_name, args, inject, complex_args)
_configure_module関数は次のような実装になっている。
lib/ansible/runner/init.py#L1247-L1266
def _configure_module(self, conn, module_name, module_args, inject, complex_args=None): ''' find module and configure it ''' # Search module path(s) for named module. module_suffixes = getattr(conn, 'default_suffixes', None) module_path = utils.plugins.module_finder.find_plugin(module_name, module_suffixes, transport=self.transport) if module_path is None: module_path2 = utils.plugins.module_finder.find_plugin('ping', module_suffixes) if module_path2 is not None: raise errors.AnsibleFileNotFound("module %s not found in configured module paths" % (module_name)) else: raise errors.AnsibleFileNotFound("module %s not found in configured module paths. Additionally, core modules are missing. If this is a checkout, run 'git submodule update --init --recursive' to correct this problem." % (module_name)) # insert shared code and arguments into the module (module_data, module_style, module_shebang) = module_replacer.modify_module( module_path, complex_args, module_args, inject ) return (module_style, module_shebang, module_data)
まず、utils.plugins.module_finder.find_plugin関数でmoduleをロードしている。ロードには、プラグインをロードするときと仕組みを使っている。
次に、module_replacer.modify_module関数でmoduleを書き換えている。
lib/ansible/module_common.py#L142-L170
def modify_module(self, module_path, complex_args, module_args, inject): with open(module_path) as f: # read in the module source module_data = f.read() (module_data, module_style) = self._find_snippet_imports(module_data, module_path) complex_args_json = utils.jsonify(complex_args) # We force conversion of module_args to str because module_common calls shlex.split, # a standard library function that incorrectly handles Unicode input before Python 2.7.3. # Note: it would be better to do all this conversion at the border # (when the data is originally parsed into data structures) but # it's currently coming from too many sources to make that # effective. try: encoded_args = repr(module_args.encode('utf-8')) except UnicodeDecodeError: encoded_args = repr(module_args) try: encoded_complex = repr(complex_args_json.encode('utf-8')) except UnicodeDecodeError: encoded_complex = repr(complex_args_json.encode('utf-8')) # these strings should be part of the 'basic' snippet which is required to be included module_data = module_data.replace(REPLACER_VERSION, repr(__version__)) module_data = module_data.replace(REPLACER_ARGS, encoded_args) module_data = module_data.replace(REPLACER_COMPLEX, encoded_complex)
この関数では、転送の効率化のために、moduleに書かれたansible.module_utilsパッケージのimport文を、import先のスクリプトで差し替える処理をしている(self._find_snippet_imports関数の中)。この差し替えにより、ホストに転送するファイル数を減らしている。
さらに、moduleの引数をmoduleに埋め込む処理をしている。引数にはmodule_args
とcomplex_args
がある。module_args
には、単純なkey=valueで表現できる引数が入る。complex_args
には、リストや辞書をつかった複雑な引数がJSONで入る。ここではその両方を、moduleに埋め込んでいる。
5-2. 一時ディレクトリの作成
次に_execute_module()関数は、moduleの転送先となる一時ディレクトリを作成する。
lib/ansible/runner/init.py#L462-L466
# a remote tmp path may be necessary and not already created if self._late_needs_tmp_path(conn, tmp, module_style): tmp = self._make_tmp_path(conn) remote_module_path = conn.shell.join_path(tmp, module_name)
lib/ansible/runner/init.py#L1182-L1194
def _make_tmp_path(self, conn): ''' make and return a temporary path on a remote box ''' basefile = 'ansible-tmp-%s-%s' % (time.time(), random.randint(0, 2**48)) use_system_tmp = False if (self.sudo and self.sudo_user != 'root') or (self.su and self.su_user != 'root'): use_system_tmp = True tmp_mode = None if self.remote_user != 'root' or ((self.sudo and self.sudo_user != 'root') or (self.su and self.su_user != 'root')): tmp_mode = 'a+rx' cmd = conn.shell.mkdtemp(basefile, use_system_tmp, tmp_mode) result = self._low_level_exec_command(conn, cmd, None, sudoable=False)
conn.shell.mkdtemp関数でディレクトリを作るためのシェルコマンドを作成した後、low_level_exec_command関数でコマンドを実行している。low_level_exec_command関数は、(sshによる接続が選ばれていれば)sshプラグインのexec_command関数を呼び出す。exec_command関数は、sshコマンドの末尾に実行したいコマンドをappendして、sshを実行する。
ちなみに、ansible-playbookにvvvオプションをつけて実行すると、大抵は 1.EXECから始まる行 2.PUTから始まる行 3.EXECから始まる行 が出力される。このうち、1のログは、一時ディレクトリの作成に伴うもの。
5-3. moduleの転送
次に、_execute_module()関数は、作成した一時ディレクトリに対して、実行するmoduleをホストに転送する。
lib/ansible/runner/init.py#L468-L474
if (module_style != 'new' or async_jid is not None or not conn.has_pipelining or not C.ANSIBLE_SSH_PIPELINING or C.DEFAULT_KEEP_REMOTE_FILES or self.su): self._transfer_str(conn, tmp, module_name, module_data)
lib/ansible/runner/init.py#L279-L304
def _transfer_str(self, conn, tmp, name, data): ''' transfer string to remote file ''' if type(data) == dict: data = utils.jsonify(data) afd, afile = tempfile.mkstemp() afo = os.fdopen(afd, 'w') try: if not isinstance(data, unicode): #ensure the data is valid UTF-8 data.decode('utf-8') else: data = data.encode('utf-8') afo.write(data) except: raise errors.AnsibleError("failure encoding into utf-8") afo.flush() afo.close() remote = conn.shell.join_path(tmp, name) try: conn.put_file(afile, remote) finally: os.unlink(afile) return remote
moduleを一度 自ホストの一時ファイルに書き込んだ後、conn.put_file関数で転送している。デフォルトだとsftpで転送するようだ。
ちなみに上述したvvvオプションのログのうち、PUTから始まる行のログは、ここの処理に伴うもの。
5-4. moduleの実行
転送したmoduleを実行する。
lib/ansible/runner/init.py#L532-L544
cmd = conn.shell.build_module_command(environment_string, shebang, cmd, rm_tmp) cmd = cmd.strip() sudoable = True if module_name == "accelerate": # always run the accelerate module as the user # specified in the play, not the sudo_user sudoable = False if self.su: res = self._low_level_exec_command(conn, cmd, tmp, su=True, in_data=in_data) else: res = self._low_level_exec_command(conn, cmd, tmp, sudoable=sudoable, in_data=in_data)
lib/ansible/runner/shell_plugins/sh.py#L82-L87
def build_module_command(self, env_string, shebang, cmd, rm_tmp=None): cmd_parts = [env_string.strip(), shebang.replace("#!", "").strip(), cmd] new_cmd = " ".join(cmd_parts) if rm_tmp: new_cmd = '%s; rm -rf %s >/dev/null 2>&1' % (new_cmd, rm_tmp) return new_cmd
まず、conn.shell.build_module_command関数で、実行したいコマンドの前後に環境変数、sudo周りの処理やmoduleの削除処理を加える。次に、一時ディレクトリの作成でもでてきた_low_level_exec_command関数で、コマンドを実行している。一時ディレクトリの時と同様に、sshコマンドの末尾に実行したいコマンドがappendされ、sshが実行される。
6. 実行結果の確認
最後に、_executor_internal_inner関数は実行結果を確認する。雑多な処理を含むのでコードは割愛するが、大体次のような処理をしている。
通信エラー(あるいはパースエラー)の確認。該当するエラーがあった場合、on_unreachable関数がコールバックとして呼ばれる。
failed_whenの確認。Ansibleは、タスクをfailedにする条件を自分で指定できる。その条件を満たしているかを確認する。
実行の成否の確認。実行が失敗していた場合、on_failed関数がコールバックとして呼ばれる。成功していた場合は、on_ok関数。
まとめと感想
このエントリでは、Runnerクラスの処理のうち、特に_executor_internal_inter関数周りをまとめた。個人的にはプラグインの仕組みを複数用意して、接続方法や環境の差異をうまく吸収していたのが面白かった。他にも面白い箇所はあったが、今回はまとめきれなかった。そのあたりは、またいつか。