本实践通过在节点中使用Python第三方模块Paramiko,实现远程连接ECS并调用ECS上指定路径的Shell脚本执行,同时,提供两类节点登录ECS的代码示例。
注意事项
通过DataWorks节点运行ECS机器上的脚本,当DataWorks侧对应节点终止运行时,已经在机器上执行的文件仍然会继续执行。
通过DataWorks节点运行ECS机器上脚本的方案仅建议在数据迁移场景下使用,不建议在日常生产中使用。
功能概述
本文提供了PyODPS、EMR Shell两类节点登录ECS的案例,具体如下:
DataWorks有SSH节点,支持在节点内访问ECS并调用执行ECS指定路径脚本。
环境准备
方式一:PyODPS节点通过用户名和密码登录ECS
在PyODPS节点中通过用户名密码登录ECS,本示例使用的节点为PyODPS3,代码示例如下:
# 参考:在PyODPS节点中调用第三方包 https://help.aliyun.com/document_detail/94159.html#section-f47-6lb-txv
# /home/tops/bin/pip3 install paramiko==2.11.0 -i https://pypi.tuna.tsinghua.edu.cn/simple
# /home/tops/bin/pip3 install --upgrade pip -i https://pypi.tuna.tsinghua.edu.cn/simple
from paramiko import SSHClient
import paramiko
import sys
client = SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect('172.16.0.0', username='root', password='****')
stdin, stdout, stderr = client.exec_command('sh /root/upload_mc_tpcds_1000_warehouse.sh')
stdout.channel.set_combine_stderr(True)
# print(type(stdin)) # <class 'paramiko.channel.ChannelStdinFile'>
# print(type(stdout)) # <class 'paramiko.channel.ChannelFile'>
# print(type(stderr)) # <class 'paramiko.channel.ChannelStderrFile'>
# Print output of command. Will wait for command to finish.
print(f'STDOUT: {stdout.read().decode("utf8")}')
# print(f'STDERR: {stderr.read().decode("utf8")}')
# Get return code from command (0 is default for success)
print(f'Return code: {stdout.channel.recv_exit_status()}')
return_code = stdout.channel.recv_exit_status()
stdin.close()
stdout.close()
stderr.close()
client.close()
if return_code == 0:
print(f'Over...')
sys.exit(0)
print(f'Remote shell is failed...')
方式二:PyODPS节点通过用户名和私钥登录ECS并使用工具类
关于Paramiko私钥使用的相关说明请参见python - How do use paramiko.RSAKey.from_private_key()? - Stack Overflo。
定义工具类。
通过DataWorks Python资源定义工具类,新建Python资源请参考创建并使用MaxCompute资源,本案例中该工具类为
RemoteShell.py
。代码示例参考如下:# -*- coding: utf-8 -*- import sys from paramiko import SSHClient import paramiko class RemoteShell: def __init__(self, hostname, username, key_filename): self.hostname = hostname self.username = username self.key_filename = key_filename def run_remote_shell(self, cmd): client = SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) client.connect(hostname=self.hostname, username=self.username, key_filename=self.key_filename) stdin, stdout, stderr = client.exec_command(cmd) stdout.channel.set_combine_stderr(True) # Print output of command. Will wait for command to finish. print(f'STDOUT: {stdout.read().decode("utf8")}') # print(f'STDERR: {stderr.read().decode("utf8")}') # Get return code from command (0 is default for success) print(f'Return code: {stdout.channel.recv_exit_status()}') return_code = stdout.channel.recv_exit_status() stdin.close() stdout.close() stderr.close() client.close() if return_code == 0: print(f'Over...') sys.exit(0) print(f'Remote shell is failed...')
创建密钥文件
run_remote_shell_user.pem
。将私钥文件作为File资源上传,资源上传方式请参考创建并使用MaxCompute资源。
新建PyODPS节点,使用工具类与资源。
本案例中新建的PODPS3节点名为run_remote_shell_private_key_template,在该节点中引用工具类
RemoteShell.py
,以及密钥文件run_remote_shell_user.pem
,资源引用成功后将会在节点run_remote_shell_private_key_template代码中产生一条注释。在节点run_remote_shell_private_key_template中定义变量,并为其赋值调度参数,PyODPS节点中使用调度参数的方式,详情请参见PyODPS使用调度参数。
##@resource_reference{"run_remote_shell_user.pem"} ##@resource_reference{"RemoteShell.py"} # 参考:在PyODPS节点中调用第三方包 https://help.aliyun.com/document_detail/94159.html#section-f47-6lb-txv # /home/tops/bin/pip3 install paramiko==2.11.0 -i https://pypi.tuna.tsinghua.edu.cn/simple # /home/tops/bin/pip3 install --upgrade pip -i https://pypi.tuna.tsinghua.edu.cn/simple # PYODPS参入参数: https://help.aliyun.com/document_detail/417492.htm#section-uv0-uvh-oau # 依赖普通的Python脚本 https://help.aliyun.com/document_detail/94159.html import sys import os sys.path.append(os.path.dirname(os.path.abspath('RemoteShell.py'))) #引入资源至当前环境。 from RemoteShell import RemoteShell #引用资源,资源名需要删除后缀.py。 remoteShell = RemoteShell(hostname='172.16.0.0', username='****', key_filename='run_remote_shell_user.pem') dp = args['dp'] cmd = f'sh /root/dw-emr-shell.sh {dp}' print(f'Remote shell is : {cmd}') remoteShell.run_remote_shell(cmd) print("Remote Shell is finished.")
方式三:EMR Shell节点通过用户名和密码登录ECS并使用工具类
定义工具类。
将Python文件作为EMR资源文件上传,关于Python资源请参考创建EMR资源,本案例中定义的Python资源名为
ecs.py
。from paramiko import SSHClient import paramiko import sys, getopt username = '' password = '' ip = '' cmd = '' try: opts, args = getopt.getopt(sys.argv[1:], "u:p:i:c:", ["user=", "password=", "ip=", "cmd="]) except getopt.GetoptError: print('error get inputs') sys.exit(2) for opt, arg in opts: if opt in ("-u", "--user"): username = arg print('username: ' + username) elif opt in ("-p", "--password"): password = arg print('password: ' + password) elif opt in ("-i", "--ip"): ip = arg print('ip: ' + ip) elif opt in ("-c", "--cmd"): cmd = arg print('cmd: ' + cmd) print(username, password, ip, cmd) client = SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) client.connect(ip, username=username, password=password) stdin, stdout, stderr = client.exec_command(cmd) stdout.channel.set_combine_stderr(True) # Print output of command. Will wait for command to finish. print(f'STDOUT: {stdout.read().decode("utf8")}') # print(f'STDERR: {stderr.read().decode("utf8")}') # Get return code from command (0 is default for success) return_code = stdout.channel.recv_exit_status() print(f'Return code: {return_code}') stdin.close() stdout.close() stderr.close() client.close() if return_code == 0: print(f'Over...') sys.exit(0) print(f'Remote shell is failed...')
新建EMR Shell节点,使用工具类。
本案例中定义EMR Shell节点名为run_remote_shell_EMR,在节点run_remote_shell_EMR中引用工具类
ecs.py
,引用成功后将会产生一条注释。##@resource_reference{"ecs.py"} /home/tops/bin/python3 ecs.py -u root -p 'password' -i '172.0.X.X' -c 'sh /abc.sh'
说明其中参数含义如下:
-u
: 登录ECS的用户名;-p
: 密码;-i
: ecs 内网IP地址;-c
: 需要执行的命令。
方式四:EMR Shell节点通过用户名和私钥登录ECS并使用工具类
关于Paramiko私钥使用的相关说明请参见python - How do use paramiko.RSAKey.from_private_key()? - Stack Overflo。
上传私钥。
将登录ECS的私钥上传为EMR的File资源文件,操作详情请参考创建EMR资源。本案例中使用的私钥为
ssh_pair_yunlin_beijing.pem
。定义工具类。
通过将Python文件上传为EMR的资源文件定义工具类,上传Python资源请参考创建EMR资源。本案例中定义的Python资源名为
run_remote_shell.py
。示例代码如下。说明上传Python文件前,请根据实际情况修改相关参数。
username
:对应的用户名称。ip
:连接ECS的IP。private_key
:步骤一上传的EMR资源名称(即私钥)。
from paramiko import SSHClient import paramiko import sys, getopt username = 'emr-user' # private_key名称替换成上传的私钥的资源的名称。 private_key = 'ssh_pair_yunlin_beijing.pem' password = '' ip = '172.16.8.188' cmd = sys.argv[1] print(f'params - ip: {ip}, username: {username}, ip: {ip}, cmd: {cmd}') client = SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) client.connect(ip, username=username, key_filename=private_key) stdin, stdout, stderr = client.exec_command(cmd) stdout.channel.set_combine_stderr(True) # Print output of command. Will wait for command to finish. print(f'STDOUT: {stdout.read().decode("utf8")}') # Get return code from command (0 is default for success) return_code = stdout.channel.recv_exit_status() print(f'Return code: {return_code}') stdin.close() stdout.close() stderr.close() client.close() if return_code == 0: print(f'Over...') sys.exit(0) print(f'Remote shell is failed...')
新建EMR Shell节点,使用工具类。
本案例中定义的EMR Shell节点名为
run_remote_shell
,在该节点中引用工具类run_remote_shell.py
及私钥ssh_pair_yunlin_beijing.pem
,引用成功后将会产生两条注释。新建EMR Shell节点并引用资源,详情请参见创建EMR Shell节点。##@resource_reference{"run_remote_shell.py"} ##@resource_reference{"ssh_pair_yunlin_beijing.pem"} /home/tops/bin/python3 run_remote_shell.py "ls /tmp/"
说明实际使用时,可修改resource_reference中的资源为您所上传的资源名称。
run_remote_shell.py
后的命令即为需要在ECS执行的命令。