节点运行ECS机器上的脚本

更新时间: 2023-11-16 16:01:52

本实践通过在节点中使用python三方模块Paramiko,实现远程连接ECS并调用ECS上指定路径的Shell脚本执行,并提供两类节点登录ECS的代码示例。

注意事项

  • 通过DataWorks节点运行ECS机器上的脚本,当DataWorks侧对应节点终止运行时,已经在机器上执行的文件仍然会继续执行。

  • 通过DataWorks节点运行ECS机器上脚本的方案仅建议在数据迁移场景下使用,不建议在日常生产中使用。

功能概述

环境准备

  1. 准备独享调度资源组,并配置独享调度资源组的网络连通。

    • 访问内网环境,网络连通配置参考网络连通方案,ECS安全组入网规则配置独享调度资源组绑定的交换机网段。

    • 访问公网环境,可直接访问,若ECS存在安全组配置,ECS安全组入网规则配置独享调度资源组EIP。

    说明

    获取独享调度资源组相关IP的方式参考:使用独享调度资源组执行任务需要添加的IP白名单

  2. 在独享调度资源组安装python第三方包。

    通过独享调度资源组的运维助手功能安装本案例需要使用的三方包Paramiko,命令如下。

    /home/tops/bin/pip3 install --upgradepip -i https://pypi.tuna.tsinghua.edu.cn/simple
    /home/tops/bin/pip3 install paramiko==2.11.0 -i https://pypi.tuna.tsinghua.edu.cn/simple

    pyodps使用第三方详情请参见:pyodps使用开源的三方包

方式一:PyODPS节点通过用户名和密码登录ECS

在PYODPS节点中通过用户名密码登录ECS,本示例使用的节点为PYODPS3,代码示例如下:


# 参考:在PyODPS节点中调用第三方包 https://help.aliyun.com/document_detail/94159.html#section-f47-6lb-txv
# /home/tops/bin/pip3 install paramiko==2.11.0 -i https://pypi.tuna.tsinghua.edu.cn/simple
# /home/tops/bin/pip3 install --upgrade pip -i https://pypi.tuna.tsinghua.edu.cn/simple


from paramiko import SSHClient
import paramiko
import sys
client = SSHClient()

client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

client.connect('172.16.0.0', username='root', password='****')


stdin, stdout, stderr = client.exec_command('sh /root/upload_mc_tpcds_1000_warehouse.sh')
stdout.channel.set_combine_stderr(True)
# print(type(stdin))  # <class 'paramiko.channel.ChannelStdinFile'>
# print(type(stdout))  # <class 'paramiko.channel.ChannelFile'>
# print(type(stderr))  # <class 'paramiko.channel.ChannelStderrFile'>

# Print output of command. Will wait for command to finish.
print(f'STDOUT: {stdout.read().decode("utf8")}')
# print(f'STDERR: {stderr.read().decode("utf8")}')

# Get return code from command (0 is default for success)
print(f'Return code: {stdout.channel.recv_exit_status()}')
return_code = stdout.channel.recv_exit_status()

stdin.close()
stdout.close()
stderr.close()
client.close()

if return_code == 0:
    print(f'Over...')
    sys.exit(0)

print(f'Remote shell is failed...')
sys.exit(-1)
                

方式二:PyODPS节点通过用户名和私钥登录ECS并使用工具类

关于paramiko私钥使用的相关说明请参见:如何使用paramiko.set_missing_host_key_policyey.from_private_key()?

  1. 定义工具类。

    通过DataWoks python资源定义工具类,关于新建python资源请参考创建并使用MaxCompute资源,本案例中该工具类为RemoteShell.py。代码示例参考如下:

    # -*- coding: utf-8 -*-
    import sys
    
    from paramiko import SSHClient
    import paramiko
    
    class RemoteShell:
    
        def __init__(self, hostname, username, key_filename):
            self.hostname = hostname
            self.username = username
            self.key_filename = key_filename
    
        def run_remote_shell(self, cmd):
            client = SSHClient()
    
            client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            client.connect(hostname=self.hostname, username=self.username, key_filename=self.key_filename)
    
            stdin, stdout, stderr = client.exec_command(cmd)
            stdout.channel.set_combine_stderr(True)
            # Print output of command. Will wait for command to finish.
            print(f'STDOUT: {stdout.read().decode("utf8")}')
            # print(f'STDERR: {stderr.read().decode("utf8")}')
    
            # Get return code from command (0 is default for success)
            print(f'Return code: {stdout.channel.recv_exit_status()}')
            return_code = stdout.channel.recv_exit_status()
    
            stdin.close()
            stdout.close()
            stderr.close()
            client.close()
    
            if return_code == 0:
                print(f'Over...')
                sys.exit(0)
    
            print(f'Remote shell is failed...')
            sys.exit(-1)
    
                            
  2. 创建密钥文件run_remote_shell_user.pem。

    将私钥文件作为File资源上传,资源上传方式请参考创建并使用MaxCompute资源

  3. 新建pyodps节点,使用工具类与资源。

    1. 本案例中新建的PODPS3节点名为run_remote_shell_private_key_template,在该节点中引用工具类RemoteShell.py,以及密钥文件run_remote_shell_user.pem,资源引用成功后将会在节点run_remote_shell_private_key_template代码中产生一条注释。

    2. 在节点run_remote_shell_private_key_template中定义变量,并为其赋值调度参数,pyodps节点中使用调度参数的方式,详情请参见pyodps使用调度参数

      ##@resource_reference{"run_remote_shell_user.pem"}
      ##@resource_reference{"RemoteShell.py"}
      
      # 参考:在PyODPS节点中调用第三方包 https://help.aliyun.com/document_detail/94159.html#section-f47-6lb-txv
      # /home/tops/bin/pip3 install paramiko==2.11.0 -i https://pypi.tuna.tsinghua.edu.cn/simple
      # /home/tops/bin/pip3 install --upgrade pip -i https://pypi.tuna.tsinghua.edu.cn/simple
      # PYODPS参入参数: https://help.aliyun.com/document_detail/417492.htm#section-uv0-uvh-oau
      # 依赖普通的Python脚本 https://help.aliyun.com/document_detail/94159.html
      
      import sys
      import os
      sys.path.append(os.path.dirname(os.path.abspath('RemoteShell.py'))) #引入资源至当前环境。
      from RemoteShell import RemoteShell #引用资源,资源名需要删除后缀.py。
      
      remoteShell = RemoteShell(hostname='172.16.0.0', username='****', key_filename='run_remote_shell_user.pem')
      
      dp = args['dp']
      cmd = f'sh /root/dw-emr-shell.sh {dp}'
      print(f'Remote shell is : {cmd}')
      remoteShell.run_remote_shell(cmd)
      
      
      print("Remote Shell is finished.")
                                      

方式三:EMR Shell节点通过用户名和密码登录ECS并使用工具类

  1. 定义工具类。

    将python文件作为EMR资源文件上传,关于python资源请参考创建EMR资源,本案例中定义的python资源名为ecs.py。

    from paramiko import SSHClient
    import paramiko
    import sys, getopt
    
    username = ''
    password = ''
    ip = ''
    cmd = ''
    try:
        opts, args = getopt.getopt(sys.argv[1:], "u:p:i:c:", ["user=", "password=", "ip=", "cmd="])
    except getopt.GetoptError:
        print('error get inputs')
        sys.exit(2)
    for opt, arg in opts:
    
        if opt in ("-u", "--user"):
            username = arg
            print('username: ' + username)
        elif opt in ("-p", "--password"):
            password = arg
            print('password: ' + password)
        elif opt in ("-i", "--ip"):
            ip = arg
            print('ip: ' + ip)
        elif opt in ("-c", "--cmd"):
            cmd = arg
            print('cmd: ' + cmd)
    
    print(username, password, ip, cmd)
    
    client = SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    client.connect(ip, username=username, password=password)
    
    stdin, stdout, stderr = client.exec_command(cmd)
    stdout.channel.set_combine_stderr(True)
    # Print output of command. Will wait for command to finish.
    print(f'STDOUT: {stdout.read().decode("utf8")}')
    # print(f'STDERR: {stderr.read().decode("utf8")}')
    
    # Get return code from command (0 is default for success)
    return_code = stdout.channel.recv_exit_status()
    print(f'Return code: {return_code}')
    
    stdin.close()
    stdout.close()
    stderr.close()
    client.close()
    
    if return_code == 0:
        print(f'Over...')
        sys.exit(0)
    
    print(f'Remote shell is failed...')
    sys.exit(-1)
  2. 新建EMR shell节点,使用工具类。

    本案例中定义EMR shell节点名为run_remote_shell_EMR,在节点run_remote_shell_EMR中引用工具类ecs.py,引用成功后将会产生一条注释。

    ##@resource_reference{"ecs.py"}
    /home/tops/bin/python3 ecs.py  -u root -p 'password' -i '172.0.X.X' -c 'sh /abc.sh'
    说明

    其中参数含义如下:-u: 登录ECS的用户名;-p: 密码;-i: ecs 内网IP地址;-c: 需要执行的命令。

阿里云首页 大数据开发治理平台 DataWorks 相关技术圈