Ansible ships with thousands of modules and dozens of plugin types. Before writing custom code, search hard — there's almost certainly a module that already does what you need. But for genuinely organisation-specific logic (your internal CMDB, your secrets store, your audit-logging requirements), a small Python file in the right folder gives you a first-class extension to Ansible.
A module is a Python script that Ansible ships to the managed node and executes
there, collecting the JSON result. Drop one in library/ next to your playbook and
Ansible auto-discovers it.
#!/usr/bin/env python3
"""
Custom Ansible module — manage Oracle DB users idempotently.
Usage in a task:
  - manage_oracle_user:
      name: appuser
      password: "{{ vault_appuser_password }}"
      tablespace: USERS
      state: present
"""
from ansible.module_utils.basic import AnsibleModule
def get_user_state(module, name):
    """Look up an Oracle account in ``dba_users``.

    Args:
        module: The running ``AnsibleModule``; used to invoke ``sqlplus`` and
            to report failures.
        name: Oracle user name to look up (compared via ``UPPER(...)``).

    Returns:
        ``{"username": ..., "tablespace": ...}`` when the query returns a row,
        or ``None`` when it returns nothing.

    Fails the task through ``module.fail_json`` when *name* is not a plain
    identifier, when ``sqlplus`` exits non-zero, or when its output does not
    contain both columns.
    """
    import re  # local import keeps this block self-contained

    # The name is spliced into SQL text that runs as SYSDBA, so accept only an
    # unquoted Oracle identifier; anything else could inject arbitrary SQL.
    if not re.fullmatch(r"[A-Za-z][A-Za-z0-9_$#]*", name or ""):
        module.fail_json(msg=f"invalid Oracle user name: {name!r}")

    # SQL*Plus commands (WHENEVER, SET) are line-oriented, so each one gets its
    # own line instead of sharing a line with the SELECT.
    script = "\n".join([
        "WHENEVER SQLERROR EXIT FAILURE",
        "SET PAGESIZE 0 FEEDBACK OFF",
        "SELECT username, default_tablespace FROM dba_users "
        f"WHERE username = UPPER('{name}');",
        "",
    ])
    # No script argument: sqlplus reads the commands from stdin (``data``).
    rc, stdout, stderr = module.run_command(
        ["sqlplus", "-s", "/", "as", "sysdba"],
        data=script,
    )
    if rc != 0:
        module.fail_json(msg=f"sqlplus failed: {stderr or stdout}")

    fields = stdout.split()
    if not fields:
        return None
    if len(fields) < 2:
        module.fail_json(msg=f"unexpected sqlplus output: {stdout!r}")
    return {"username": fields[0], "tablespace": fields[1]}
def _is_plain_identifier(value):
    """Return True when *value* is an unquoted Oracle identifier, safe to splice into SQL."""
    import re  # local import keeps this block self-contained

    return re.fullmatch(r"[A-Za-z][A-Za-z0-9_$#]*", value or "") is not None


def _run_sql(module, statement):
    """Feed one SQL statement to sqlplus on stdin and fail the task if it errors."""
    # WHENEVER SQLERROR makes sqlplus exit non-zero on an ORA- error, so the
    # return-code check below actually detects a failed statement.
    script = f"WHENEVER SQLERROR EXIT FAILURE\n{statement}\n"
    rc, stdout, stderr = module.run_command(
        ["sqlplus", "-s", "/", "as", "sysdba"],
        data=script,
    )
    if rc != 0:
        module.fail_json(msg=f"sqlplus failed: {stderr or stdout}")


def main():
    """Module entry point: create or drop an Oracle user to match ``state``.

    Reads ``name``, ``password``, ``tablespace`` and ``state`` from the task
    arguments, compares them with what ``get_user_state`` reports, and issues
    CREATE USER / DROP USER only when the account's existence differs from the
    requested state. In check mode the change is reported but no SQL is run.
    Exits through ``module.exit_json`` or ``module.fail_json``.
    """
    module = AnsibleModule(
        argument_spec=dict(
            name=dict(type="str", required=True),
            password=dict(type="str", required=False, no_log=True),
            tablespace=dict(type="str", default="USERS"),
            state=dict(type="str", default="present", choices=["present", "absent"]),
        ),
        supports_check_mode=True,
    )
    name = module.params["name"]
    state = module.params["state"]
    password = module.params["password"]
    tablespace = module.params["tablespace"]

    # Every value below ends up inside SQL text executed as SYSDBA.
    if not _is_plain_identifier(name):
        module.fail_json(msg=f"invalid Oracle user name: {name!r}")

    current = get_user_state(module, name)
    changed = False

    if state == "present" and current is None:
        # The password is only needed when the account has to be created.
        if not password:
            module.fail_json(msg="password is required to create a user")
        if any(ch in password for ch in '"\r\n'):
            module.fail_json(msg="password must not contain double quotes or line breaks")
        if not _is_plain_identifier(tablespace):
            module.fail_json(msg=f"invalid tablespace name: {tablespace!r}")
        if not module.check_mode:
            _run_sql(
                module,
                f'CREATE USER {name} IDENTIFIED BY "{password}" '
                f"DEFAULT TABLESPACE {tablespace};",
            )
        changed = True
    elif state == "absent" and current is not None:
        if not module.check_mode:
            _run_sql(module, f"DROP USER {name} CASCADE;")
        changed = True

    module.exit_json(changed=changed, user=name, state=state)


if __name__ == "__main__":
    main()
The pattern is always the same: read module.params, do the work, then call module.exit_json(changed=...) on success or module.fail_json(msg=...) on failure. Declaring supports_check_mode=True and guarding every change with module.check_mode makes the module dry-run safe.
"""
Custom Jinja2 filters for network calculations.
Usage in a template:
  {{ "10.0.0.0/24" | network_size }} → 256
  {{ "10.0.0.5" | next_ip }} → "10.0.0.6"
"""
import ipaddress
def network_size(cidr):
    """Return how many addresses the CIDR block *cidr* contains.

    Host bits in *cidr* are tolerated (``strict=False``), so ``10.0.0.5/24``
    is treated as ``10.0.0.0/24``.
    """
    network = ipaddress.ip_network(cidr, strict=False)
    return network.num_addresses
def next_ip(addr):
    """Return, as a string, the address that immediately follows *addr*."""
    current = ipaddress.ip_address(addr)
    successor = current + 1
    return str(successor)
def hosts_in_subnet(cidr, exclude_first=2, exclude_last=1):
    """List the addresses of *cidr* minus reserved ones at either end.

    Both counts are measured from the edges of the block. The defaults skip
    the network address and the gateway (first two) and the broadcast address
    (last one), so ``10.0.1.0/24`` yields ``10.0.1.2`` through ``10.0.1.254``.

    Args:
        cidr: Network in CIDR notation; host bits are tolerated.
        exclude_first: Number of addresses to drop from the start of the block.
        exclude_last: Number of addresses to drop from the end of the block.

    Returns:
        The remaining addresses as strings, in ascending order. Empty when the
        exclusions cover the whole block.

    Raises:
        ValueError: If either count is negative, or *cidr* is not a network.
    """
    if exclude_first < 0 or exclude_last < 0:
        raise ValueError("exclude_first and exclude_last must be non-negative")
    net = ipaddress.ip_network(cidr, strict=False)
    # Index the network directly rather than slicing list(net.hosts()):
    # hosts() already omits network/broadcast, which made the old slice drop
    # one usable address too many, and a "[:-0]" slice returned nothing at all.
    return [
        str(net[offset])
        for offset in range(exclude_first, net.num_addresses - exclude_last)
    ]
class FilterModule:
    """Filter plugin entry point; the class name is required as-is."""

    def filters(self):
        """Return the mapping of Jinja2 filter names to the functions implementing them."""
        # A plain dict: the doubled braces previously built a set containing a
        # dict, which raises TypeError (dicts are unhashable).
        return {
            "network_size": network_size,
            "next_ip": next_ip,
            "hosts_in_subnet": hosts_in_subnet,
        }
---
# Demonstrates the custom network filters from filter_plugins/.
- hosts: localhost
  vars:
    db_subnet: "10.0.1.0/24"
  tasks:
    - name: Show subnet size
      ansible.builtin.debug:
        msg: "Subnet has {{ db_subnet | network_size }} addresses"
    - name: List usable host IPs
      ansible.builtin.debug:
        msg: "{{ db_subnet | hosts_in_subnet }}"
Lookups run on the control node and return data into a variable. Built-in
examples: file, env, password, vars. A custom one is great
for fetching secrets from your internal vault:
"""
Custom lookup — fetch a value from our internal KV store.
Usage:
  {{ lookup('internal_kv', 'database/mysql/root_password') }}
"""
from ansible.errors import AnsibleError
from ansible.plugins.lookup import LookupBase
import requests
class LookupModule(LookupBase):
    """Lookup plugin that resolves each term against the internal KV store."""

    def run(self, terms, variables=None, **kwargs):
        """Fetch the stored value for every term.

        Args:
            terms: Key paths to look up, e.g. ``database/mysql/root_password``.
            variables: Variables available to the lookup; ``kv_token`` is read
                from here and sent as the ``X-Token`` header. May be ``None``.
            **kwargs: Accepted for interface compatibility; not used.

        Returns:
            A list holding the ``value`` field of each response, in the same
            order as *terms*.

        Raises:
            AnsibleError: If a request fails, returns an error status, or the
                response body has no ``value`` field.
        """
        # NOTE(review): if kv_token is itself defined as a Jinja2 expression,
        # the value seen here may still be untemplated; confirm whether it
        # needs to go through the templar first.
        token = (variables or {}).get("kv_token", "")
        headers = {"X-Token": token}

        ret = []
        for term in terms:
            try:
                resp = requests.get(
                    f"https://kv.internal.example.com/v1/{term}",
                    headers=headers,
                    timeout=5,
                )
                resp.raise_for_status()
                ret.append(resp.json()["value"])
            except (requests.RequestException, KeyError, TypeError, ValueError) as e:
                # Chain the cause so the original traceback survives at -vvv.
                raise AnsibleError(f"KV lookup failed for {term}: {e}") from e
        return ret
---
# Demonstrates the custom internal_kv lookup from lookup_plugins/.
- hosts: databases
  vars:
    kv_token: "{{ lookup('env', 'INTERNAL_KV_TOKEN') }}"
    mysql_root_password: "{{ lookup('internal_kv', 'database/mysql/root_password') }}"
  tasks:
    - name: Connect to MySQL with vault-fetched password
      community.mysql.mysql_query:
        login_user: root
        login_password: "{{ mysql_root_password }}"
        query: "SELECT VERSION()"
An action plugin runs on the control node before the module ships to the
remote. Useful for pre-processing, conditional skipping, or chaining multiple modules
into one logical task. Most built-in modules have a corresponding action plugin
(copy, template, fetch) that handles the file transfer.
"""
Action plugin: deploy a release atomically.
Combines fetch → unpack → symlink swap into one task.
"""
from ansible.plugins.action import ActionBase
class ActionModule(ActionBase):
    """Deploy a release by chaining get_url, unarchive and file (symlink) in one task."""

    def run(self, tmp=None, task_vars=None):
        """Download, unpack and activate the requested release.

        Task arguments:
            version: Release to deploy (required); names the archive and the
                directory the ``current`` symlink points at.
            app_dir: Application root, default ``/opt/app``.

        Returns:
            The failing step's result if any step fails; otherwise the symlink
            step's result plus ``version_deployed``, with ``changed`` set when
            at least one step reported a change.
        """
        if task_vars is None:
            task_vars = {}
        result = super().run(tmp, task_vars)

        version = self._task.args.get("version")
        app_dir = self._task.args.get("app_dir", "/opt/app")
        if not version:
            # Without this guard the task would fetch and link "None.tgz".
            result["failed"] = True
            result["msg"] = "the 'version' argument is required"
            return result

        archive = f"/tmp/{version}.tgz"
        steps = [
            # 1. Fetch the release
            (
                "ansible.builtin.get_url",
                {
                    "url": f"https://artifacts.example.com/{version}.tgz",
                    "dest": archive,
                },
            ),
            # 2. Unpack
            (
                "ansible.builtin.unarchive",
                {
                    "src": archive,
                    "dest": f"{app_dir}/releases",
                    "remote_src": True,
                },
            ),
            # 3. Symlink swap
            (
                "ansible.builtin.file",
                {
                    "src": f"{app_dir}/releases/{version}",
                    "dest": f"{app_dir}/current",
                    "state": "link",
                    "force": True,
                },
            ),
        ]

        changed = False
        step_result = {}
        for module_name, module_args in steps:
            step_result = self._execute_module(
                module_name=module_name,
                module_args=module_args,
                task_vars=task_vars,
            )
            # Stop at the first failure, including a failed symlink swap.
            if step_result.get("failed"):
                return step_result
            changed = changed or bool(step_result.get("changed"))

        result.update(step_result)
        # Report a change only if a step actually changed something, so a
        # re-run of an already-deployed version stays "ok".
        result["changed"] = changed
        result["version_deployed"] = version
        return result
"""
Callback plugin: send Slack notifications on play_recap.
Enable in ansible.cfg:
[defaults]
callbacks_enabled = slack_notify
"""
from ansible.plugins.callback import CallbackBase
import json
import os
import urllib.request
class CallbackModule(CallbackBase):
    """Post a per-host recap to a Slack webhook when the playbook finishes."""

    CALLBACK_VERSION = 2.0
    CALLBACK_TYPE = "notification"
    CALLBACK_NAME = "slack_notify"

    def __init__(self):
        super().__init__()
        # With SLACK_WEBHOOK unset the plugin does nothing.
        self.webhook = os.environ.get("SLACK_WEBHOOK")
        self.stats = {}

    def v2_playbook_on_stats(self, stats):
        """Build one line per processed host and send them to Slack.

        Hosts with failures or marked unreachable get ``:x:``, hosts with
        changes get ``:warning:``, all others ``:white_check_mark:``. A failed
        HTTP post is reported as a warning rather than raised.
        """
        if not self.webhook:
            return

        summary = []
        for host in sorted(stats.processed):
            s = stats.summarize(host)
            unreachable = s.get("unreachable", 0)
            line = (
                f"{host}: ok={s['ok']} changed={s['changed']} "
                f"failed={s['failures']} unreachable={unreachable}"
            )
            if s["failures"] > 0 or unreachable > 0:
                line = f":x: {line}"
            elif s["changed"] > 0:
                line = f":warning: {line}"
            else:
                line = f":white_check_mark: {line}"
            summary.append(line)

        # Real newlines: an escaped "\\n" shows up in Slack as literal text.
        payload = {"text": "Playbook recap:\n" + "\n".join(summary)}
        try:
            req = urllib.request.Request(
                self.webhook,
                data=json.dumps(payload).encode(),
                headers={"Content-Type": "application/json"},
            )
            # The context manager closes the HTTP response.
            with urllib.request.urlopen(req, timeout=5):
                pass
        except (OSError, ValueError) as exc:
            # URLError is an OSError; ValueError covers a malformed webhook URL.
            self._display.warning(f"slack_notify: could not post recap: {exc}")
| Scope | Location |
|---|---|
| Single project | Folders next to playbook (library/, filter_plugins/, etc) |
| Single role | Same folders inside the role: roles/myrole/library/ |
| Org-wide | Bundle into a Collection — plugins/modules/, plugins/filter/, etc. |
| System-wide | ~/.ansible/plugins/ or /usr/share/ansible/plugins/ |