Automate AWS Tag Management: Scan, Identify Untagged Resources, and Apply Tags at Scale

The article presents an end‑to‑end automated solution for AWS resource tagging that scans all accounts, discovers services, identifies resources missing specified tags, and batch‑applies them, while offering configurable workflows, performance optimizations, permission requirements, and optional AI‑driven MCP integration for smarter governance.

Amazon Cloud Developers

Background

As AWS usage grows from dozens to tens of thousands of resources, consistent tag management becomes essential for cost allocation, security, operational efficiency, and automation.

Challenges of Manual Tagging

Manual addition via console is time‑consuming and error‑prone.

Inconsistent naming conventions cause tag chaos.

Legacy resources often lack tags, especially after large migrations.

Solution Overview

The solution is a three-stage modular tool:

Service Configuration Generation – Scans the account, discovers enabled services, and creates aws_service_config.json that maps each service’s resource types, API calls, ARN templates, and ID‑extraction logic. Implemented in generate_service_config.py.

Resource Scanning – Uses the generated configuration to enumerate resources across all regions and services, identifies those missing a specified tag key, and aggregates results. Multithreading and the AWS Resource Groups Tagging API are used for efficient batch queries.

Tag Application – Applies the predefined tag to the identified resources via the Tagging API, producing a report with success, failure, and skipped counts.

Technical Implementation

Service Configuration Generation

A Python script discovers active services and builds a JSON configuration. Example excerpt:

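#!/usr/bin/env python3
import json
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed

import boto3

# Module-level logger used by the functions below.
logger = logging.getLogger(__name__)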

SERVICE_TEMPLATES = {
    'ec2': {
        'resource_types': [
            {
                'type': 'instance',
                'list_method': 'describe_instances',
                'arn_template': 'arn:aws:ec2:{region}:{account_id}:instance/{id}',
                'extract_ids': 'lambda data: [i["InstanceId"] for r in data.get("Reservations", []) for i in r.get("Instances", [])]'
            },
            {
                'type': 'volume',
                'list_method': 'describe_volumes',
                'arn_template': 'arn:aws:ec2:{region}:{account_id}:volume/{id}',
                'extract_ids': 'lambda data: [v["VolumeId"] for v in data.get("Volumes", [])]'
            }
            # ... more resource types
        ]
    }
    # ... more service configurations
}

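# POTENTIAL_SERVICES is not shown in the original excerpt; as an assumption,
# probe only the services that have a template defined above.
POTENTIAL_SERVICES = list(SERVICE_TEMPLATES)

def discover_active_services(region):
    """Discover active AWS services in a region"""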
    active_services = {}
    for service_name in POTENTIAL_SERVICES:
        try:
            client = boto3.client(service_name, region_name=region)
            if service_name in SERVICE_TEMPLATES:
                for resource_type in SERVICE_TEMPLATES[service_name]['resource_types']:
                    try:
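                        list_method = resource_type['list_method']
                        # Only the first page of results is inspected; that is enough to detect activity.
                        response = getattr(client, list_method)()
                        # eval() is tolerable here only because the expression comes from the
                        # trusted SERVICE_TEMPLATES above; never eval untrusted configuration.
                        extract_ids = eval(resource_type['extract_ids'])
                        resource_ids = extract_ids(response)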
                        if resource_ids:
                            active_services[service_name] = SERVICE_TEMPLATES[service_name]
                            logger.info(f"Discovered active service: {service_name} ({len(resource_ids)} resources)")
                            break
                    except Exception as e:
                        logger.debug(f"Error checking service {service_name}: {str(e)}")
                        continue
        except Exception as e:
            logger.debug(f"Service {service_name} unavailable: {str(e)}")
            continue
    return active_services
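
The excerpt shows how resource IDs are extracted but not how they are combined with arn_template. The following is a minimal sketch of that step, not the original script's code: build_arns and extract_volume_ids are hypothetical names, the account ID and volume IDs are made up, and the extractor is passed in as a plain function instead of an eval'd string.

```python
def build_arns(resource_type, response, region, account_id, extractor):
    """Turn one list-API response into full ARNs using the entry's template.

    Hypothetical helper, not part of the original script. `extractor` is a
    callable that pulls resource IDs out of the raw API response.
    """
    template = resource_type["arn_template"]
    return [
        template.format(region=region, account_id=account_id, id=resource_id)
        for resource_id in extractor(response)
    ]


def extract_volume_ids(response):
    # Same logic as the 'volume' lambda in SERVICE_TEMPLATES, written as a
    # named function so that no eval() is needed.
    return [v["VolumeId"] for v in response.get("Volumes", [])]


volume_type = {
    "type": "volume",
    "list_method": "describe_volumes",
    "arn_template": "arn:aws:ec2:{region}:{account_id}:volume/{id}",
}

# Hand-written sample shaped like a describe_volumes response (made-up IDs).
sample_response = {"Volumes": [{"VolumeId": "vol-0abc"}, {"VolumeId": "vol-0def"}]}

arns = build_arns(volume_type, sample_response, "us-east-1", "123456789012", extract_volume_ids)
print(arns)
```

Named extractor functions cannot be stored in a JSON file the way lambda strings can, so a real implementation would need a lookup from a name in the config to a function in code.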

Resource Scanning

Parallel region processing retrieves untagged resources. Core function:

def get_untagged_resources(account_id, regions=None, services=None, tag_key='xxx-xxxx', max_workers=10):
    """Retrieve resources missing a specific tag"""
    if regions is None:
        regions = get_available_regions()
    untagged_resources = {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_region = {executor.submit(process_region, account_id, region, services, tag_key): region for region in regions}
        for future in as_completed(future_to_region):
            region = future_to_region[future]
            try:
                region_resources = future.result()
                if region_resources:
                    untagged_resources[region] = region_resources
            except Exception as e:
                logger.error(f"Error processing region {region}: {str(e)}")
    return untagged_resources
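
process_region is not shown in the article. One way the per-region check could work, as a sketch under assumptions: take the ARNs discovered through the service APIs as the baseline and subtract those that the Tagging API reports as carrying the key. The helper names arns_missing_tag and iter_tag_mappings are hypothetical, and the sample ARNs are made up.

```python
def arns_missing_tag(discovered_arns, tag_mappings, tag_key):
    """Return the discovered ARNs that do not carry `tag_key`.

    `tag_mappings` is an iterable of dicts shaped like the entries of
    `ResourceTagMappingList` in a Tagging API `get_resources` response.
    """
    tagged = {
        mapping["ResourceARN"]
        for mapping in tag_mappings
        if any(tag["Key"] == tag_key for tag in mapping.get("Tags", []))
    }
    # Preserve discovery order; anything not known to carry the key is reported.
    return [arn for arn in discovered_arns if arn not in tagged]


def iter_tag_mappings(tagging_client):
    """Yield every resource/tag mapping, following the API's pagination.

    `tagging_client` is expected to behave like
    boto3.client("resourcegroupstaggingapi", region_name=...).
    """
    paginator = tagging_client.get_paginator("get_resources")
    for page in paginator.paginate():
        yield from page.get("ResourceTagMappingList", [])


# Made-up sample data so the sketch runs without AWS access.
discovered = [
    "arn:aws:ec2:us-east-1:123456789012:instance/i-0aaa",
    "arn:aws:ec2:us-east-1:123456789012:instance/i-0bbb",
    "arn:aws:ec2:us-east-1:123456789012:volume/vol-0ccc",
]
mappings = [
    {"ResourceARN": discovered[0], "Tags": [{"Key": "xxx-xxxx", "Value": "a"}]},
    {"ResourceARN": discovered[1], "Tags": [{"Key": "Name", "Value": "web"}]},
]

missing = arns_missing_tag(discovered, mappings, "xxx-xxxx")
print(missing)
```

Using the discovered ARNs as the baseline matters because get_resources is documented as returning tagged or previously tagged resources, so a resource that never had a tag may not appear in its output at all.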

Tag Application

Resources are batched (the Tagging API accepts at most 20 ARNs per request) and tagged via tag_resources. The result includes the total number of resources processed and the success rate.

def _apply_tags(self, params):
    """Apply tags to resources"""
    from add_service_tag import tag_resources
    resources_by_region = {}
    for arn in params['resource_arns']:
        region = self._extract_region_from_arn(arn)
        resources_by_region.setdefault(region, {'batch': {'resources': []}})['batch']['resources'].append(arn)
    results = {}
    for region, region_data in resources_by_region.items():
        region_results = tag_resources(
            resources={region: region_data},
            # Only the first key/value pair in params['tags'] is applied per call.
            tag_key=next(iter(params['tags'])),
            tag_value=next(iter(params['tags'].values())),
            dry_run=params.get('dry_run', True)
        )
        results[region] = region_results
    return {
        'operation': 'tag_application',
        'dry_run': params.get('dry_run', True),
        'results': results,
        'summary': {
            'total_processed': len(params['resource_arns']),
            'success_rate': self._calculate_success_rate(results)
        }
    }
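
The tag_resources helper imported from add_service_tag is not shown. The batching it is described as performing could look roughly like the sketch below, which assumes a client with the same tag_resources call and FailedResourcesMap response field as the boto3 resourcegroupstaggingapi client. The names chunked, tag_in_batches, and RecordingClient are hypothetical, and RecordingClient is only a stand-in so the example runs offline.

```python
def chunked(items, size):
    """Yield consecutive slices of `items` with at most `size` elements."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def tag_in_batches(tagging_client, arns, tags, batch_size=20, dry_run=True):
    """Tag `arns` in batches and collect success, failure, and skipped counts."""
    summary = {"success": 0, "failed": {}, "skipped": 0}
    for batch in chunked(list(arns), batch_size):
        if dry_run:
            # Nothing is sent to AWS in dry-run mode.
            summary["skipped"] += len(batch)
            continue
        response = tagging_client.tag_resources(ResourceARNList=batch, Tags=tags)
        # Per-ARN errors are reported in FailedResourcesMap, keyed by ARN.
        failed = response.get("FailedResourcesMap", {})
        summary["failed"].update(failed)
        summary["success"] += len(batch) - len(failed)
    return summary


class RecordingClient:
    """Stand-in for a boto3 client so the sketch runs without AWS access."""

    def __init__(self, failing_arn=None):
        self.batch_sizes = []
        self.failing_arn = failing_arn

    def tag_resources(self, ResourceARNList, Tags):
        self.batch_sizes.append(len(ResourceARNList))
        if self.failing_arn in ResourceARNList:
            return {"FailedResourcesMap": {self.failing_arn: {
                "StatusCode": 400,
                "ErrorCode": "InvalidParameterException",
                "ErrorMessage": "simulated failure",
            }}}
        return {"FailedResourcesMap": {}}


arns = [f"arn:aws:ec2:us-east-1:123456789012:volume/vol-{n:04d}" for n in range(45)]
client = RecordingClient(failing_arn=arns[7])
report = tag_in_batches(client, arns, {"xxx-xxxx": "xxxx-xx-xxx"}, dry_run=False)
print(client.batch_sizes, report["success"], len(report["failed"]))
```

Keeping dry_run=True as the default mirrors the article's _apply_tags, so a caller has to opt in before anything is modified.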

Workflow

Execute the three steps sequentially or inspect intermediate output:

Generate Service Config

python generate_service_config.py --regions us-east-1 us-west-2 --output aws_service_config.json --verbose

Scan Untagged Resources

python add_service_tag.py --tag-key xxx-xxxx --output untagged_resources.json --regions us-east-1 us-west-2

Apply Tags (add --apply for real execution)

python add_service_tag.py --input untagged_resources.json --tag-value xxxx-xx-xxx --apply
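
The commands above imply a command-line interface that stays in dry-run mode unless --apply is passed. The article does not show the argument parser, so the following is only a sketch of how those flags could be declared with argparse. The flag names are taken from the commands in this article; build_parser and the help strings are assumptions.

```python
import argparse


def build_parser():
    """Declare the flags used in the workflow commands (hypothetical layout)."""
    parser = argparse.ArgumentParser(description="Scan for or apply a missing tag")
    parser.add_argument("--tag-key", help="tag key to look for or apply")
    parser.add_argument("--tag-value", help="tag value to apply")
    parser.add_argument("--regions", nargs="+", help="limit the scan to these regions")
    parser.add_argument("--services", nargs="+", help="limit the scan to these services")
    parser.add_argument("--input", help="JSON file produced by a previous scan")
    parser.add_argument("--output", help="where to write scan results")
    # store_true makes the flag default to False, so dry run is the default.
    parser.add_argument("--apply", action="store_true", help="actually write tags")
    return parser


args = build_parser().parse_args(
    ["--input", "untagged_resources.json", "--tag-value", "xxxx-xx-xxx"]
)
dry_run = not args.apply
print(f"dry_run={dry_run}")
```

Making the destructive path an explicit opt-in keeps an accidental invocation from modifying resources.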

Permissions

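The IAM role must include at least tag:GetResources, tag:TagResources, ec2:DescribeRegions, and the specific Describe* permissions required by each service (e.g., ec2:DescribeInstances). Tagging through the Resource Groups Tagging API also requires the tagging permission of the service that owns each resource (e.g., ec2:CreateTags for EC2 instances).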
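
As an illustration, the permissions listed above can be assembled into an IAM policy document. This is a sketch, not a policy shipped with the tool: build_tagging_policy and the Sid are hypothetical, the EC2 actions are examples for one service only, ec2:CreateTags is included because tagging EC2 resources through the Tagging API also needs the service's own tagging permission, and Resource "*" is a simplification that should be narrowed in practice.

```python
import json


def build_tagging_policy(extra_actions=()):
    """Build a policy document from the baseline actions plus per-service ones."""
    actions = [
        "tag:GetResources",
        "tag:TagResources",
        "ec2:DescribeRegions",
        *extra_actions,
    ]
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "TagAutomation",  # hypothetical statement ID
                "Effect": "Allow",
                "Action": sorted(set(actions)),
                "Resource": "*",  # simplification; scope down where possible
            }
        ],
    }


# Example for a scan limited to EC2 instances and volumes.
policy = build_tagging_policy(
    ["ec2:DescribeInstances", "ec2:DescribeVolumes", "ec2:CreateTags"]
)
print(json.dumps(policy, indent=2))
```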

Performance Optimizations

Limit scan scope with --regions and --services.

Adjust max_workers based on system capacity and API rate limits.

Process regions or services in separate batches.

Remove unnecessary services from the generated config to reduce workload.
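
The last tip, trimming the generated configuration, can be scripted instead of edited by hand. The sketch below assumes that aws_service_config.json has the same top-level shape as SERVICE_TEMPLATES (service name mapped to its entry); prune_service_config is a hypothetical helper, and the in-memory dictionary stands in for the real file.

```python
import json


def prune_service_config(config, keep_services):
    """Return a copy of `config` containing only the services to keep."""
    keep = set(keep_services)
    return {name: entry for name, entry in config.items() if name in keep}


# Stand-in for json.load(open("aws_service_config.json")); contents are made up.
generated_config = {
    "ec2": {"resource_types": [{"type": "instance"}, {"type": "volume"}]},
    "s3": {"resource_types": [{"type": "bucket"}]},
    "rds": {"resource_types": [{"type": "db"}]},
}

pruned = prune_service_config(generated_config, ["ec2", "rds"])
print(json.dumps(sorted(pruned)))
```

Writing the pruned dictionary back with json.dump would then give the scanner a smaller configuration to walk.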

MCP Integration (AI‑Driven Governance)

The tool can be wrapped as a Model Context Protocol (MCP) service that exposes two operations: resource discovery and tag application. Example MCP definition:

{
  "aws-resource-discovery": {
    "name": "aws-resource-discovery",
    "description": "Discover and analyze AWS resource tag state",
    "parameters": {
      "type": "object",
      "properties": {
        "regions": {"type": "array", "items": {"type": "string"}, "default": ["us-east-1"]},
        "services": {"type": "array", "items": {"type": "string"}, "default": ["ec2", "s3", "rds"]},
        "tag_key": {"type": "string", "default": "Environment"},
        "analysis_type": {"type": "string", "enum": ["untagged", "inconsistent", "cost-analysis"], "default": "untagged"}
      }
    }
  },
  "aws-tag-application": {
    "name": "aws-tag-application",
    "description": "Batch apply tags to AWS resources",
    "parameters": {
      "type": "object",
      "properties": {
        "resource_arns": {"type": "array", "items": {"type": "string"}},
        "tags": {"type": "object", "additionalProperties": {"type": "string"}},
        "dry_run": {"type": "boolean", "default": true}
      },
      "required": ["resource_arns", "tags"]
    }
  }
}
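
Before a request reaches the scanning or tagging code, the handler has to check required parameters and fill in the defaults declared in the definition. The following sketch shows that step in plain Python without any MCP SDK. resolve_parameters is a hypothetical helper, the ARN is made up, and a real server would normally rely on its framework's schema validation instead.

```python
import copy


def resolve_parameters(tool_definition, supplied):
    """Check required parameters and fill in declared defaults.

    Parameters that are not declared in the definition are dropped.
    """
    schema = tool_definition["parameters"]
    missing = [name for name in schema.get("required", []) if name not in supplied]
    if missing:
        raise ValueError(f"missing required parameters: {', '.join(missing)}")
    resolved = {}
    for name, spec in schema["properties"].items():
        if name in supplied:
            resolved[name] = supplied[name]
        elif "default" in spec:
            # Copy so that list defaults are not shared between calls.
            resolved[name] = copy.deepcopy(spec["default"])
    return resolved


# The aws-tag-application definition from above, as a Python dict.
TAG_APPLICATION = {
    "name": "aws-tag-application",
    "parameters": {
        "type": "object",
        "properties": {
            "resource_arns": {"type": "array", "items": {"type": "string"}},
            "tags": {"type": "object", "additionalProperties": {"type": "string"}},
            "dry_run": {"type": "boolean", "default": True},
        },
        "required": ["resource_arns", "tags"],
    },
}

params = resolve_parameters(
    TAG_APPLICATION,
    {
        "resource_arns": ["arn:aws:ec2:us-east-1:123456789012:instance/i-0abc"],
        "tags": {"Environment": "dev"},
    },
)
print(params["dry_run"])
```

Because dry_run defaults to true in the definition, a request that omits it reaches the tagging code in dry-run mode.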

Sample AI interaction:

User: "Tag all EC2 instances in us-east-1 that have been idle for over 30 days with 'candidate-for-termination'"
AI: Scanning us-east-1 EC2 instances...
AI: Found 12 idle instances
AI: Applying tag...
AI: Completed: 12 instances tagged successfully

Future Outlook

Integrating MCP enables natural‑language driven governance, intelligent cost‑optimization suggestions, and automated compliance checks, extending the tool beyond pure scripting to an interactive AI‑assisted operations platform.

Conclusion

Automated tag management reduces manual errors, enforces consistent metadata, and improves operational efficiency in large AWS environments. The solution combines service discovery, high-performance scanning, and bulk tag application, and can be applied to migration projects, cost allocation, security enforcement, and continuous compliance.
