From: Michael Brown
Date: Fri, 17 Apr 2026 12:54:49 +0000 (+0100)
Subject: [cloud] Do not rely on ECS instances to import images to Alibaba Cloud
X-Git-Url: http://git.ipfire.org/index.cgi?a=commitdiff_plain;h=HEAD;p=thirdparty%2Fipxe.git

[cloud] Do not rely on ECS instances to import images to Alibaba Cloud

Spinning up ECS instances is supported in all ECS regions (unlike
Function Compute), but turns out to be unacceptably unreliable since
Alibaba Cloud has a very irritating tendency to fail to launch ECS
instances for a variety of spurious and unpredictable reasons.

Rewrite the censorship bypass mechanism to use the (extremely slow)
CopyImage API call to copy an imported image from an uncensored region
to a censored region.

Signed-off-by: Michael Brown
---

diff --git a/contrib/cloud/ali-import b/contrib/cloud/ali-import
index 30d2ba763..d51a4e159 100755
--- a/contrib/cloud/ali-import
+++ b/contrib/cloud/ali-import
@@ -1,10 +1,10 @@
 #!/usr/bin/env python3
 
 import argparse
-import base64
 from collections import namedtuple
 from concurrent.futures import ThreadPoolExecutor, as_completed
 import datetime
+from itertools import cycle
 import logging
 from pathlib import Path
 import subprocess
@@ -25,9 +25,6 @@ import alibabacloud_tea_openapi.models
 import alibabacloud_tea_util as util
 import alibabacloud_tea_util.client
 import alibabacloud_tea_util.models
-import alibabacloud_vpc20160428 as vpc
-import alibabacloud_vpc20160428.client
-import alibabacloud_vpc20160428.models
 
 # For regions in mainland China, the Chinese state censorship laws
 # prohibit direct access to OSS bucket contents.
@@ -40,20 +37,16 @@ logger = logging.getLogger('ali-import') ECS_ENDPOINT = 'ecs.aliyuncs.com' -OSS_FORBIDDEN_REGION_CODE = 'ForbidCreateNewBucket' OSS_BUCKET_NAME_LEN = 63 IPXE_STORAGE_PREFIX = 'ipxe-upload-temp-' IPXE_STORAGE_TAG = 'ipxe-upload-temp' -IPXE_STORAGE_DISK_CATEGORY = 'cloud_essd' -IPXE_SG_TAG = 'ipxe-default-sg' -IPXE_VSWITCH_TAG = 'ipxe-default-vswitch' +Clients = namedtuple('Clients', ['region', 'ecs', 'oss']) +Image = namedtuple('Image', + ['path', 'family', 'name', 'arch', 'mode', 'public']) -Clients = namedtuple('Clients', ['region', 'ecs', 'oss', 'vpc']) -Image = namedtuple('Image', ['path', 'family', 'name', 'arch', 'mode']) - -def image(filename, basefamily, basename): +def image(filename, basefamily, basename, public): """Construct image description""" with tempfile.NamedTemporaryFile(mode='w+t') as mtoolsrc: mtoolsrc.writelines([ @@ -76,7 +69,7 @@ def image(filename, basefamily, basename): name = '%s%s' % (basename, suffix) arch = uefi[0] if len(uefi) == 1 else None if uefi else 'x86_64' mode = 'UEFI' if uefi else 'BIOS' - return Image(path, family, name, arch, mode) + return Image(path, family, name, arch, mode, public) def all_regions(): """Get list of all regions""" @@ -94,183 +87,47 @@ def all_clients(region): ecsconf = openapi.models.Config(credential=cred, region_id=region) osscred = oss.credentials.EnvironmentVariableCredentialsProvider() ossconf = oss.config.Config(credentials_provider=osscred, region=region) - vpcconf = openapi.models.Config(credential=cred, region_id=region) clients = Clients( region=region, ecs=ecs.client.Client(ecsconf), oss=oss.client.Client(ossconf), - vpc=vpc.client.Client(vpcconf), ) return clients -def delete_temp_instance(clients, instance, retry=False): - """Remove temporary instance""" - logger.info("%s: deleting %s" % (clients.region, instance)) - while True: - req = ecs.models.DeleteInstanceRequest( - instance_id=instance, - force=True, - force_stop=True, - ) - try: - rsp = 
clients.ecs.delete_instance(req) - except openapi.exceptions.ClientException: - # Very recently created instances often cannot be - # terminated until some undocumented part of the control - # plane decides that enough time has elapsed - if retry: - time.sleep(1) - continue - raise - break - -def run_temp_instance_command(clients, instance, command): - """Run command on temporary instance""" - command_content=' '.join(command) - logger.info("%s: running %s" % (clients.region, command_content)) - req = ecs.models.RunCommandRequest( - region_id=clients.region, - instance_id=[instance], - type='RunShellScript', - command_content=command_content, - ) - rsp = clients.ecs.run_command(req) - invocation = rsp.body.invoke_id - while True: - time.sleep(1) - req = ecs.models.DescribeInvocationResultsRequest( - region_id=clients.region, - invoke_id=invocation, - ) - rsp = clients.ecs.describe_invocation_results(req) - result = rsp.body.invocation.invocation_results.invocation_result[0] - if result.invoke_record_status not in ('Pending', 'Running'): - break - output = base64.b64decode(result.output).decode() - if result.invocation_status != 'Success': - raise RuntimeError(output if output else result.invocation_status) - return result - -def create_temp_instance(clients, family, machine, role): - """Create temporary instance (and remove any stale temporary instances)""" - tag = ecs.models.DescribeInstancesRequestTag( - key=IPXE_STORAGE_TAG, - value=IPXE_STORAGE_TAG, - ) - req = ecs.models.DescribeInstancesRequest( - region_id=clients.region, - tag=[tag], - ) - rsp = clients.ecs.describe_instances(req) - for instance in rsp.body.instances.instance or []: - delete_temp_instance(clients, instance.instance_id) - req = ecs.models.DescribeAvailableResourceRequest( - region_id=clients.region, - destination_resource='Zone', - instance_type=machine, - ) - rsp = clients.ecs.describe_available_resource(req) - if rsp.body.available_zones is None: - # Cannot create instances in this region 
- logger.warning("%s: no zones support %s" % (clients.region, machine)) - return None - zone_id = next(x.zone_id - for x in rsp.body.available_zones.available_zone or [] - if x.status == 'Available') - logger.info("%s: creating %s in %s" % (clients.region, machine, zone_id)) - tag = ecs.models.DescribeSecurityGroupsRequestTag( - key=IPXE_SG_TAG, - value=IPXE_SG_TAG, - ) - req = ecs.models.DescribeSecurityGroupsRequest( - region_id=clients.region, - tag=[tag], - ) - rsp = clients.ecs.describe_security_groups(req) - sgs = rsp.body.security_groups.security_group or [] - sg_id = sgs[0].security_group_id - vpc_id = sgs[0].vpc_id - tag = vpc.models.DescribeVSwitchesRequestTag( - key=IPXE_VSWITCH_TAG, - value=IPXE_VSWITCH_TAG, - ) - req = vpc.models.DescribeVSwitchesRequest( - region_id=clients.region, - vpc_id=vpc_id, - zone_id=zone_id, - tag=[tag], - ) - rsp = clients.vpc.describe_vswitches(req) - vswitches = rsp.body.v_switches.v_switch or [] - vswitch_id = vswitches[0].v_switch_id - name = '%s%s' % (IPXE_STORAGE_PREFIX, uuid4()) - sysdisk = ecs.models.RunInstancesRequestSystemDisk( - category=IPXE_STORAGE_DISK_CATEGORY, - ) - now = datetime.datetime.now(datetime.UTC) - lifetime = datetime.timedelta(hours=1) - release = (now + lifetime).strftime('%Y-%m-%dT%H:%M:%SZ') - tag = ecs.models.RunInstancesRequestTag( - key=IPXE_STORAGE_TAG, - value=IPXE_STORAGE_TAG, - ) - req = ecs.models.RunInstancesRequest( - region_id=clients.region, - image_family=family, - instance_type=machine, - instance_name=name, - auto_release_time=release, - ram_role_name=role, - system_disk=sysdisk, - security_group_ids=[sg_id], - v_switch_id=vswitch_id, - internet_charge_type='PayByTraffic', - internet_max_bandwidth_out=100, - tag=[tag], - ) - try: - rsp = clients.ecs.run_instances(req) - except openapi.exceptions.ClientException as exc: - if exc.code in ('RegionUnauthorized', - 'InvalidPeriod.RegionDiscontinued', - 'InvalidInstanceType.ValueNotSupported'): - logger.warning("%s: ECS lied about 
availability" % clients.region) - return None - raise - instance_id = rsp.body.instance_id_sets.instance_id_set[0] - logger.info("%s: created %s" % (clients.region, instance_id)) - command = ['aliyun', 'configure', 'set', '--mode', 'EcsRamRole', - '--region', clients.region] - run_temp_instance_command(clients, instance_id, command) - return instance_id - -def delete_temp_bucket(clients, instance, bucket): +def delete_temp_bucket(clients, bucket): """Remove temporary bucket""" logger.info("%s: deleting %s" % (clients.region, bucket)) assert bucket.startswith(IPXE_STORAGE_PREFIX) - command = ['aliyun', 'oss', 'rm', 'oss://%s' % bucket, - '--bucket', '--recursive', '--force', - '--endpoint', ('oss-%s-internal.aliyuncs.com' % clients.region)] - run_temp_instance_command(clients, instance, command) + req = oss.models.ListObjectsV2Request( + bucket=bucket, + prefix=IPXE_STORAGE_PREFIX, + ) + rsp = clients.oss.list_objects_v2(req) + delete = [x.key for x in rsp.contents or ()] + if delete: + req = oss.models.DeleteMultipleObjectsRequest( + bucket=bucket, + objects=[oss.models.DeleteObject(x) for x in delete], + ) + rsp = clients.oss.delete_multiple_objects(req) + req = oss.models.DeleteBucketRequest(bucket=bucket) + rsp = clients.oss.delete_bucket(req) -def create_temp_bucket(clients, instance): +def create_temp_bucket(clients): """Create temporary bucket (and remove any stale temporary buckets)""" + if clients.region.startswith('cn-'): + # Object storage is non-functional in Chinese mainland regions + # due to censorship restrictions + return None prefix = '%s%s-' % (IPXE_STORAGE_PREFIX, clients.region) req = oss.models.ListBucketsRequest(prefix=prefix) rsp = clients.oss.list_buckets(req) buckets = [x.name for x in rsp.buckets or ()] for bucket in buckets: - delete_temp_bucket(clients, instance, bucket) + delete_temp_bucket(clients, bucket) bucket = ('%s%s' % (prefix, uuid4()))[:OSS_BUCKET_NAME_LEN] req = oss.models.PutBucketRequest(bucket=bucket) - try: - rsp = 
clients.oss.put_bucket(req) - except oss.exceptions.OperationError as exc: - # AliCloud provides no other way to detect non-functional regions - if exc.unwrap().code == OSS_FORBIDDEN_REGION_CODE: - logger.warning("%s: non-functional OSS" % clients.region) - return None - raise exc + rsp = clients.oss.put_bucket(req) logger.info("%s: created %s" % (clients.region, bucket)) return bucket @@ -282,20 +139,8 @@ def upload_image(clients, bucket, image): rsp = clients.oss.put_object_from_file(req, image.path) return key -def copy_images(clients, instance, bucket, source): - """Copy disk images to bucket from uncensored bucket""" - logger.info("%s: syncing from %s" % (clients.region, source['bucket'])) - command = ['aliyun', 'oss', 'sync', - 'oss://%s' % source['bucket'], 'syncdir', - '--endpoint', ('oss-%s.aliyuncs.com' % source['region'])] - run_temp_instance_command(clients, instance, command) - logger.info("%s: syncing to %s" % (clients.region, bucket)) - command = ['aliyun', 'oss', 'sync', 'syncdir', 'oss://%s' % bucket, - '--endpoint', ('oss-%s-internal.aliyuncs.com' % clients.region)] - run_temp_instance_command(clients, instance, command) - -def delete_images(clients, name): - """Remove existing images""" +def delete_image(clients, name): + """Remove existing image (if applicable)""" req = ecs.models.DescribeImagesRequest( region_id=clients.region, image_name=name, @@ -318,10 +163,43 @@ def delete_images(clients, name): ) rsp = clients.ecs.delete_image(req) -def import_image(clients, image, bucket, key, public, overwrite): +def wait_for_task(clients, task_id): + """Wait for task to complete""" + while True: + time.sleep(5) + req = ecs.models.DescribeTasksRequest( + region_id=clients.region, + task_ids=task_id, + ) + rsp = clients.ecs.describe_tasks(req) + assert len(rsp.body.task_set.task) == 1 + assert rsp.body.task_set.task[0].task_id == task_id + status = rsp.body.task_set.task[0].task_status + if status not in ('Waiting', 'Processing'): + break + if status != 
'Finished': + raise RuntimeError(status) + +def wait_for_image(clients, image_id): + """Wait for image to become available""" + while True: + time.sleep(5) + req = ecs.models.DescribeImagesRequest( + region_id=clients.region, + image_id=image_id, + ) + rsp = clients.ecs.describe_images(req) + if len(rsp.body.images.image): + assert len(rsp.body.images.image) == 1 + assert rsp.body.images.image[0].image_id == image_id + status = rsp.body.images.image[0].status + if status != 'Creating': + break + if status != 'Available': + raise RuntimeError(status) + +def import_image(clients, image, bucket, key): """Import image""" - if overwrite: - delete_images(clients, image.name) logger.info("%s: importing %s" % (clients.region, image.name)) disk = ecs.models.ImportImageRequestDiskDeviceMapping( disk_image_size = 1, @@ -339,34 +217,46 @@ def import_image(clients, image, bucket, key, public, overwrite): rsp = clients.ecs.import_image(req) image_id = rsp.body.image_id task_id = rsp.body.task_id - while True: - time.sleep(5) - req = ecs.models.DescribeTasksRequest( - region_id=clients.region, - task_ids=task_id, - ) - rsp = clients.ecs.describe_tasks(req) - status = rsp.body.task_set.task[0].task_status - if status not in ('Waiting', 'Processing'): - break - if status != 'Finished': - raise RuntimeError(status) + wait_for_task(clients, task_id) + wait_for_image(clients, image_id) + logger.info("%s: imported %s (%s)" % + (clients.region, image.name, image_id)) + return image_id + +def copy_image(clients, image, image_id, censored): + """Copy imported image to censored region""" + logger.info("%s: copying %s (%s) to %s" % + (clients.region, image.name, image_id, censored.region)) + req = ecs.models.CopyImageRequest( + region_id=clients.region, + image_id=image_id, + destination_region_id=censored.region, + destination_image_name=image.name, + ) + rsp = clients.ecs.copy_image(req) + copy_id = rsp.body.image_id + wait_for_image(censored, copy_id) + logger.info("%s: copied %s (%s) to 
%s" % + (clients.region, image.name, copy_id, censored.region)) + return copy_id + +def finalise_image(clients, image, image_id): + """Finalise image attributes and permissions""" + logger.info("%s: finalising %s (%s)" % + (clients.region, image.name, image_id)) req = ecs.models.ModifyImageAttributeRequest( region_id=clients.region, image_id=image_id, image_family=image.family, ) rsp = clients.ecs.modify_image_attribute(req) - if public: + if image.public: req = ecs.models.ModifyImageSharePermissionRequest( region_id=clients.region, image_id=image_id, is_public=True, ) rsp = clients.ecs.modify_image_share_permission(req) - logger.info("%s: imported %s (%s)" % - (clients.region, image.name, image_id)) - return image_id # Parse command-line arguments parser = argparse.ArgumentParser(description="Import Alibaba Cloud image") @@ -376,18 +266,11 @@ parser.add_argument('--name', '-n', parser.add_argument('--family', '-f', default='ipxe', help="Base family name") parser.add_argument('--public', '-p', action='store_true', - help="Make image public") + help="Make image(s) public") parser.add_argument('--overwrite', action='store_true', help="Overwrite any existing image with same name") parser.add_argument('--region', '-r', action='append', help="AliCloud region(s)") -parser.add_argument('--role', '-R', default="iPXECensorshipBypassRole", - help="AliCloud OSS censorship bypass role") -parser.add_argument('--helper-family', - default="acs:alibaba_cloud_linux_4_lts_x64", - help="Helper OS image family") -parser.add_argument('--helper-machine', default="ecs.e-c1m1.large", - help="Helper machine type") parser.add_argument('image', nargs='+', help="iPXE disk image") args = parser.parse_args() @@ -401,101 +284,93 @@ if not args.name: args.name = '%s-%s' % (args.family, datetime.date.today().strftime('%Y%m%d')) -# Construct image list -images = [image(x, args.family, args.name) for x in args.image] - # Use all regions if none specified -if not args.region: - args.region = 
all_regions() +regions = args.region or all_regions() -# Construct per-region clients -clients = {region: all_clients(region) for region in args.region} +# Construct image list +images = [image(x, args.family, args.name, args.public) for x in args.image] +imports = [(region, image) for region in regions for image in images] -# Create temporary instances in each region -with ThreadPoolExecutor(max_workers=len(args.region)) as executor: - futures = {executor.submit(create_temp_instance, - clients=clients[region], - family=args.helper_family, - machine=args.helper_machine, - role=args.role): region - for region in args.region} - instances = {futures[x]: x.result() for x in as_completed(futures)} - -# Create temporary buckets in each region (requires instance to exist) -with ThreadPoolExecutor(max_workers=len(args.region)) as executor: +# Construct per-region clients +clients = {region: all_clients(region) for region in regions} + +# Delete existing images from all regions, if applicable +if args.overwrite: + with ThreadPoolExecutor(max_workers=len(imports)) as executor: + futures = {executor.submit(delete_image, + clients=clients[region], + name=image.name): (region, image) + for region, image in imports} + done = {futures[x]: x.result() for x in as_completed(futures)} + +# Create temporary buckets in all uncensored regions +with ThreadPoolExecutor(max_workers=len(regions)) as executor: futures = {executor.submit(create_temp_bucket, - clients=clients[region], - instance=instances[region]): region - for region in args.region - if instances[region] is not None} + clients=clients[region]): region + for region in regions} buckets = {futures[x]: x.result() for x in as_completed(futures)} +if not any(buckets.values()): + parser.error("At least one non-Chinese region is required") -# Select an uncensored region with functioning object storage -uncensored = next((k for k, v in buckets.items() - if v is not None and not k.startswith('cn-')), None) -if uncensored is None: - 
parser.error("At least one available uncensored region is required") - -# Upload images directly to chosen uncensored region -with ThreadPoolExecutor(max_workers=len(images)) as executor: +# Upload images directly to uncensored regions +with ThreadPoolExecutor(max_workers=len(imports)) as executor: futures = {executor.submit(upload_image, - clients=clients[uncensored], - bucket=buckets[uncensored], - image=image): image - for image in images} - keys = {futures[x]: x.result() for x in as_completed(futures)} - -# Copy images to all other regions -with ThreadPoolExecutor(max_workers=len(args.region)) as executor: - source = {'region': uncensored, 'bucket': buckets[uncensored]} - futures = {executor.submit(copy_images, clients=clients[region], - instance=instances[region], bucket=buckets[region], - source=source): region - for region in args.region - if instances[region] is not None and buckets[region] is not None - and region != uncensored} - done = {futures[x]: x.result() for x in as_completed(futures)} + image=image): (region, image) + for region, image in imports if buckets[region]} + keys = {futures[x]: x.result() for x in as_completed(futures)} -# Import all images -imports = [(region, image) for region in args.region for image in images] +# Import images to uncensored regions with ThreadPoolExecutor(max_workers=len(imports)) as executor: futures = {executor.submit(import_image, clients=clients[region], image=image, bucket=buckets[region], - key=keys[image], - public=args.public, - overwrite=args.overwrite): (region, image) - for region, image in imports - if instances[region] is not None and buckets[region] is not None} + key=keys[(region, image)]): (region, image) + for region, image in imports if buckets[region]} results = {futures[x]: x.result() for x in as_completed(futures)} -# Remove temporary buckets -with ThreadPoolExecutor(max_workers=len(args.region)) as executor: - futures = {executor.submit(delete_temp_bucket, +# Select source uncensored region for 
each copy +# +# Copies are rate-limited by source region, so spread the copies +# across all available uncensored regions. +# +copies = [(region, censored, image) for region, (censored, image) in zip( + cycle(region for region in regions if buckets[region]), + ((region, image) for region, image in imports if not buckets[region]), +)] + +# Copy images to censored regions +with ThreadPoolExecutor(max_workers=len(imports)) as executor: + futures = {executor.submit(copy_image, clients=clients[region], - instance=instances[region], - bucket=buckets[region]): region - for region in args.region - if instances[region] is not None and buckets[region] is not None} + censored=clients[censored], + image=image, + image_id=results[(region, image)]): + (censored, image) + for region, censored, image in copies} + results.update({futures[x]: x.result() for x in as_completed(futures)}) + +# Finalise images +with ThreadPoolExecutor(max_workers=len(imports)) as executor: + futures = {executor.submit(finalise_image, + clients=clients[region], + image=image, + image_id=results[(region, image)]): + (region, image) + for region, image in imports} done = {futures[x]: x.result() for x in as_completed(futures)} -# Remove temporary instances -with ThreadPoolExecutor(max_workers=len(args.region)) as executor: - futures = {executor.submit(delete_temp_instance, +# Remove temporary buckets +with ThreadPoolExecutor(max_workers=len(regions)) as executor: + futures = {executor.submit(delete_temp_bucket, clients=clients[region], - instance=instances[region], - retry=True): region - for region in args.region - if instances[region] is not None} + bucket=buckets[region]): region + for region in regions if buckets[region]} done = {futures[x]: x.result() for x in as_completed(futures)} # Show created images for region, image in imports: - mark = "(*)" if region == uncensored else "" - result = ("[no ECS]" if instances[region] is None else - "[no OSS]" if buckets[region] is None else - results[(region, 
image)]) - print("%s%s %s (%s) %s" % (region, mark, image.name, image.family, result)) + image_id = results[(region, image)] + print("%s %s (%s) %s" % (region, image.name, image.family, image_id))