节点运行ECS机器上的脚本

本实践通过在节点中使用Python第三方模块Paramiko,实现远程连接ECS并调用ECS上指定路径的Shell脚本执行,同时,提供两类节点登录ECS的代码示例。

注意事项

  • 通过DataWorks节点运行ECS机器上的脚本,当DataWorks侧对应节点终止运行时,已经在机器上执行的文件仍然会继续执行。

  • 通过DataWorks节点运行ECS机器上脚本的方案仅建议在数据迁移场景下使用,不建议在日常生产中使用。

功能概述

本文提供了PyODPS、EMR Shell两类节点登录ECS的案例,具体如下:

说明

DataWorks有SSH节点,支持在节点内访问ECS并调用执行ECS指定路径脚本。

环境准备

  1. 准备Serverless资源组,并配置Serverless资源组的网络连通。

    网络连通配置请参考网络连通方案

    说明

    获取Serverless资源组相关IP的方式请参考资源组IP网段获取

  2. 在Serverless资源组安装Python第三方包。

    通过Serverless资源组的自定义镜像安装第三方包功能,安装本案例需要使用的第三方包Paramiko。详情请参见镜像管理

方式一:PyODPS节点通过用户名和密码登录ECS

在PyODPS节点中通过用户名密码登录ECS,本示例使用的节点为PyODPS3,代码示例如下:


# 参考:在PyODPS节点中调用第三方包 https://help.aliyun.com/document_detail/94159.html#section-f47-6lb-txv
# /home/tops/bin/pip3 install paramiko==2.11.0 -i https://pypi.tuna.tsinghua.edu.cn/simple
# /home/tops/bin/pip3 install --upgrade pip -i https://pypi.tuna.tsinghua.edu.cn/simple


from paramiko import SSHClient
import paramiko
import sys
client = SSHClient()

client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

client.connect('172.16.0.0', username='root', password='****')


stdin, stdout, stderr = client.exec_command('sh /root/upload_mc_tpcds_1000_warehouse.sh')
stdout.channel.set_combine_stderr(True)
# print(type(stdin))  # <class 'paramiko.channel.ChannelStdinFile'>
# print(type(stdout))  # <class 'paramiko.channel.ChannelFile'>
# print(type(stderr))  # <class 'paramiko.channel.ChannelStderrFile'>

# Print output of command. Will wait for command to finish.
print(f'STDOUT: {stdout.read().decode("utf8")}')
# print(f'STDERR: {stderr.read().decode("utf8")}')

# Get return code from command (0 is default for success)
print(f'Return code: {stdout.channel.recv_exit_status()}')
return_code = stdout.channel.recv_exit_status()

stdin.close()
stdout.close()
stderr.close()
client.close()

if return_code == 0:
    print(f'Over...')
    sys.exit(0)

print(f'Remote shell is failed...')

方式二:PyODPS节点通过用户名和私钥登录ECS并使用工具类

关于Paramiko私钥使用的相关说明请参见python - How do use paramiko.RSAKey.from_private_key()? - Stack Overflo

  1. 定义工具类。

    通过DataWorks Python资源定义工具类,新建Python资源请参考创建并使用MaxCompute资源,本案例中该工具类为RemoteShell.py。代码示例参考如下:

    # -*- coding: utf-8 -*-
    import sys
    
    from paramiko import SSHClient
    import paramiko
    
    class RemoteShell:
    
        def __init__(self, hostname, username, key_filename):
            self.hostname = hostname
            self.username = username
            self.key_filename = key_filename
    
        def run_remote_shell(self, cmd):
            client = SSHClient()
    
            client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            client.connect(hostname=self.hostname, username=self.username, key_filename=self.key_filename)
    
            stdin, stdout, stderr = client.exec_command(cmd)
            stdout.channel.set_combine_stderr(True)
            # Print output of command. Will wait for command to finish.
            print(f'STDOUT: {stdout.read().decode("utf8")}')
            # print(f'STDERR: {stderr.read().decode("utf8")}')
    
            # Get return code from command (0 is default for success)
            print(f'Return code: {stdout.channel.recv_exit_status()}')
            return_code = stdout.channel.recv_exit_status()
    
            stdin.close()
            stdout.close()
            stderr.close()
            client.close()
    
            if return_code == 0:
                print(f'Over...')
                sys.exit(0)
    
            print(f'Remote shell is failed...')
        
  2. 创建密钥文件run_remote_shell_user.pem

    将私钥文件作为File资源上传,资源上传方式请参考创建并使用MaxCompute资源

  3. 新建PyODPS节点,使用工具类与资源。

    1. 本案例中新建的PODPS3节点名为run_remote_shell_private_key_template,在该节点中引用工具类RemoteShell.py,以及密钥文件run_remote_shell_user.pem,资源引用成功后将会在节点run_remote_shell_private_key_template代码中产生一条注释。

    2. 在节点run_remote_shell_private_key_template中定义变量,并为其赋值调度参数,PyODPS节点中使用调度参数的方式,详情请参见PyODPS使用调度参数

      ##@resource_reference{"run_remote_shell_user.pem"}
      ##@resource_reference{"RemoteShell.py"}
      # 参考:在PyODPS节点中调用第三方包 https://help.aliyun.com/document_detail/94159.html#section-f47-6lb-txv
      # /home/tops/bin/pip3 install paramiko==2.11.0 -i https://pypi.tuna.tsinghua.edu.cn/simple
      # /home/tops/bin/pip3 install --upgrade pip -i https://pypi.tuna.tsinghua.edu.cn/simple
      # PYODPS参入参数: https://help.aliyun.com/document_detail/417492.htm#section-uv0-uvh-oau
      # 依赖普通的Python脚本 https://help.aliyun.com/document_detail/94159.html
      import sys
      import os
      sys.path.append(os.path.dirname(os.path.abspath('RemoteShell.py'))) #引入资源至当前环境。
      from RemoteShell import RemoteShell #引用资源,资源名需要删除后缀.py。
      
      remoteShell = RemoteShell(hostname='172.16.0.0', username='****', key_filename='run_remote_shell_user.pem')
      
      dp = args['dp']
      cmd = f'sh /root/dw-emr-shell.sh {dp}'
      print(f'Remote shell is : {cmd}')
      remoteShell.run_remote_shell(cmd)
      
      print("Remote Shell is finished.")                      

方式三:EMR Shell节点通过用户名和密码登录ECS并使用工具类

  1. 定义工具类。

    将Python文件作为EMR资源文件上传,关于Python资源请参考创建EMR资源,本案例中定义的Python资源名为ecs.py

    from paramiko import SSHClient
    import paramiko
    import sys, getopt
    
    username = ''
    password = ''
    ip = ''
    cmd = ''
    try:
        opts, args = getopt.getopt(sys.argv[1:], "u:p:i:c:", ["user=", "password=", "ip=", "cmd="])
    except getopt.GetoptError:
        print('error get inputs')
        sys.exit(2)
    for opt, arg in opts:
    
        if opt in ("-u", "--user"):
            username = arg
            print('username: ' + username)
        elif opt in ("-p", "--password"):
            password = arg
            print('password: ' + password)
        elif opt in ("-i", "--ip"):
            ip = arg
            print('ip: ' + ip)
        elif opt in ("-c", "--cmd"):
            cmd = arg
            print('cmd: ' + cmd)
    
    print(username, password, ip, cmd)
    
    client = SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    client.connect(ip, username=username, password=password)
    
    stdin, stdout, stderr = client.exec_command(cmd)
    stdout.channel.set_combine_stderr(True)
    # Print output of command. Will wait for command to finish.
    print(f'STDOUT: {stdout.read().decode("utf8")}')
    # print(f'STDERR: {stderr.read().decode("utf8")}')
    
    # Get return code from command (0 is default for success)
    return_code = stdout.channel.recv_exit_status()
    print(f'Return code: {return_code}')
    
    stdin.close()
    stdout.close()
    stderr.close()
    client.close()
    
    if return_code == 0:
        print(f'Over...')
        sys.exit(0)
    
    print(f'Remote shell is failed...')
  2. 新建EMR Shell节点,使用工具类。

    本案例中定义EMR Shell节点名为run_remote_shell_EMR,在节点run_remote_shell_EMR中引用工具类ecs.py,引用成功后将会产生一条注释。

    ##@resource_reference{"ecs.py"}
    /home/tops/bin/python3 ecs.py  -u root -p 'password' -i '172.0.X.X' -c 'sh /abc.sh'
    说明

    其中参数含义如下:-u: 登录ECS的用户名;-p: 密码;-i: ecs 内网IP地址;-c: 需要执行的命令。

方式四:EMR Shell节点通过用户名和私钥登录ECS并使用工具类

关于Paramiko私钥使用的相关说明请参见python - How do use paramiko.RSAKey.from_private_key()? - Stack Overflo

  1. 上传私钥。

    将登录ECS的私钥上传为EMR的File资源文件,操作详情请参考创建EMR资源。本案例中使用的私钥为ssh_pair_yunlin_beijing.pem

  2. 定义工具类。

    通过将Python文件上传为EMR的资源文件定义工具类,上传Python资源请参考创建EMR资源。本案例中定义的Python资源名为run_remote_shell.py。示例代码如下。

    说明

    上传Python文件前,请根据实际情况修改相关参数。

    • username:对应的用户名称。

    • ip:连接ECS的IP。

    • private_key:步骤一上传的EMR资源名称(即私钥)。

    from paramiko import SSHClient
    import paramiko
    import sys, getopt
    
    username = 'emr-user'
    # private_key名称替换成上传的私钥的资源的名称。
    private_key = 'ssh_pair_yunlin_beijing.pem'
    password = ''
    ip = '172.16.8.188'
    cmd = sys.argv[1]
    
    print(f'params - ip: {ip}, username: {username}, ip: {ip}, cmd: {cmd}')
    
    
    client = SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    client.connect(ip, username=username, key_filename=private_key)
    
    stdin, stdout, stderr = client.exec_command(cmd)
    stdout.channel.set_combine_stderr(True)
    # Print output of command. Will wait for command to finish.
    print(f'STDOUT: {stdout.read().decode("utf8")}')
    
    # Get return code from command (0 is default for success)
    return_code = stdout.channel.recv_exit_status()
    print(f'Return code: {return_code}')
    
    stdin.close()
    stdout.close()
    stderr.close()
    client.close()
    
    if return_code == 0:
        print(f'Over...')
        sys.exit(0)
    
    print(f'Remote shell is failed...')
  3. 新建EMR Shell节点,使用工具类。

    本案例中定义的EMR Shell节点名为run_remote_shell,在该节点中引用工具类run_remote_shell.py及私钥ssh_pair_yunlin_beijing.pem,引用成功后将会产生两条注释。新建EMR Shell节点并引用资源,详情请参见创建EMR Shell节点

    ##@resource_reference{"run_remote_shell.py"}
    ##@resource_reference{"ssh_pair_yunlin_beijing.pem"}
    /home/tops/bin/python3 run_remote_shell.py "ls /tmp/" 
    
    说明
    • 实际使用时,可修改resource_reference中的资源为您所上传的资源名称。

    • run_remote_shell.py后的命令即为需要在ECS执行的命令。