自动上传APRS天气信息的python脚本
一、python脚本说明
1.python代码由ba7ib编写,转载请注明。
2.脚本可以在OpenWrt设备上运行,目前使用华为HG225D路由器运行正常。
3.脚本中拉取的温湿度、气压、风、雨、PM2.5、甲醛、空气质量、日出日落等天气数据来自和风天气。
4.运行脚本只需修改代码中的Callsign,Passcode,Key,City几个参数即可。
Key到http://www.heweather.com/注册申请,选择免费套餐。
City代码:https://where.heweather.com/index.html
二、上传python脚本
1.保存以下脚本为weather.py
2.WinSCP上传至路由器root目录
三、路由器配置
1.安装依赖环境(视不同路由器按需安装)
2.设置启动项
- python2 /etc/config/weather.py & (此处路径须与weather.py实际上传到路由器的位置一致,如上传位置不同请相应修改)
3.设置运行时间
①putty运行:
- crontab -e
②进入编辑状态,最后一行添加:
- */15 * * * * python2 -u weather.py (##15分钟运行一次)
- 注意:脚本自身是无限循环(每300秒上传一次,不会自行退出),cron每次触发都会再启动一个新进程;通常只需使用启动项或手动启动其中一种方式,如使用cron请先确认没有旧进程在运行。
四、Putty启动脚本
- python2 weather.py &
五、附:Python 脚本
- #!/usr/bin/python
- import os
- import json, urllib2, string
- from datetime import datetime
- import socket
- import sys
- from time import sleep
- import re, ssl
- ssl._create_default_https_context = ssl._create_unverified_context  # make default HTTPS contexts skip certificate verification
- Callsign = '呼号+SSID'  # station callsign with SSID; used in the APRS-IS login line and as the frame source
- Passcode = '验证码'  # APRS-IS passcode sent in the login line
- Server = 's.aprs.cn:14580'  # APRS-IS server as "host:port"; split on ':' when connecting
- Protocal = 'any'  # not referenced anywhere else in the visible script
- City = '填写你所在的城市CID'  # HeWeather city ID, interpolated into the weather request URL
- Key = '填写你注册申请得到的key'  # HeWeather API key, interpolated into the weather request URL
def send_aprsframe(aprs_frame):
    """Log in to the APRS-IS server and upload one TNC2 frame.

    Connects to ``Server`` ("host:port"), sends the login line built from
    ``Callsign`` and ``Passcode`` and, once the server answers with a
    ``#`` comment line, sends ``aprs_frame``.  The attempt is repeated
    until the frame has been sent.

    aprs_frame -- the already encoded TNC2 line, without line terminator.

    Socket errors are not caught here; they propagate to the caller.
    """
    host, _, port = Server.partition(':')
    # APRS-IS is a line protocol: every line must be terminated by CRLF.
    login = 'user %s pass %s vers BA7IB Scripts (Python)\r\n' % (Callsign, Passcode)
    sended = False
    while not sended:
        # A socket can only be connected once, so each attempt gets a
        # fresh one; the finally clause closes it even when a call raises.
        Aprs_Sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            Aprs_Sock.connect((host, int(port)))
            Aprs_Sock.sendall(login)
            sReturn = Aprs_Sock.recv(4096)
            if sReturn.startswith("#"):
                print("Succesfully logged to APRS-IS! %s" % sReturn)
                Aprs_Sock.sendall('%s\r\n' % aprs_frame)
                sended = True
            # Give the server a moment before the connection is torn down.
            sleep(1)
            Aprs_Sock.shutdown(0)
        finally:
            Aprs_Sock.close()
class APRSFrame:
    """One APRS packet in TNC2 text form: ``SOURCE>DEST,PATH:payload``."""

    # Longest TNC2 line export() emits; anything beyond is cut off.
    MAX_LENGTH = 510

    def __init__(self):
        self.source = None   # sender callsign
        self.dest = None     # destination field
        self.path = []       # path elements appended after the destination
        self.payload = u''   # information field

    def export(self, encode=True):
        """Return the frame as a TNC2 line.

        encode -- when true (default) the line is returned encoded as
                  ISO-8859-1; otherwise the text is returned as built.

        The line is truncated to MAX_LENGTH characters before encoding.
        """
        # Join destination and path together so an empty path does not
        # leave a dangling comma ("SRC>DEST,:payload") in the header.
        route = ["%s" % self.dest]
        route.extend(self.path or [])
        tnc2 = "%s>%s:%s" % (self.source, ','.join(route), self.payload)
        if len(tnc2) > self.MAX_LENGTH:
            tnc2 = tnc2[:self.MAX_LENGTH]
        if encode:
            tnc2 = tnc2.encode('ISO-8859-1')
        return tnc2
def bc():
    """Beacon loop: fetch the weather, upload it, wait 300 seconds, repeat.

    The request URL is built once from ``City`` and ``Key``.  A cycle in
    which no frame could be built is skipped without uploading anything.
    The loop has no exit condition of its own.
    """
    station = Callsign
    weather_url = 'https://free-api.heweather.com/v5/weather?lang=en&city=%s&key=%s' % (City, Key)
    while True:
        weather_frame = get_weather_frame(callsign=station, weather=weather_url)
        if weather_frame:
            send_aprsframe(weather_frame)
        sleep(300)
def process_ambiguity(pos, ambiguity):
    """Blank out the least significant digits of an encoded coordinate.

    pos       -- encoded latitude/longitude text ending in ``MM.mm`` plus
                 a hemisphere letter, e.g. "2307.50N" or "11319.25E".
    ambiguity -- number of digits to replace by spaces, counted from the
                 right.  Values above 4 are treated as 4 so that only the
                 minutes digits are ever touched; values <= 0 change
                 nothing.

    Returns the text with the selected digits replaced by spaces.
    """
    # Offsets from the end of the string: -1 is the hemisphere letter and
    # -4 the decimal point, so both are skipped.
    digit_offsets = (2, 3, 5, 6)
    chars = list(pos)
    for offset in digit_offsets[:max(0, ambiguity)]:
        chars[-offset] = " "
    return "".join(chars)
def encode_lat(lat):
    """Encode a latitude in decimal degrees as ``DDMM.mmH``.

    lat -- latitude in decimal degrees; values > 0 are tagged 'N', all
           others 'S'.

    Returns an 8 character string such as "2307.50N".
    """
    lat_dir = 'N' if lat > 0 else 'S'
    # Round once, in hundredths of a minute, and only then split into
    # degrees and minutes.  Rounding the minutes on their own can print
    # "60.00" (e.g. 23.99999 -> "2360.00N") instead of carrying over.
    hundredths = int(round(abs(lat) * 6000))
    lat_deg, rest = divmod(hundredths, 6000)
    lat_min, lat_frac = divmod(rest, 100)
    return "%02d%02d.%02d%s" % (lat_deg, lat_min, lat_frac, lat_dir)
def encode_lng(lng):
    """Encode a longitude in decimal degrees as ``DDDMM.mmH``.

    lng -- longitude in decimal degrees; values > 0 are tagged 'E', all
           others 'W'.

    Returns a 9 character string such as "11315.00E".
    """
    lng_dir = 'E' if lng > 0 else 'W'
    # Round once, in hundredths of a minute, and only then split into
    # degrees and minutes.  Rounding the minutes on their own can print
    # "60.00" (e.g. 113.99999 -> "11360.00E") instead of carrying over.
    hundredths = int(round(abs(lng) * 6000))
    lng_deg, rest = divmod(hundredths, 6000)
    lng_min, lng_frac = divmod(rest, 100)
    return "%03d%02d.%02d%s" % (lng_deg, lng_min, lng_frac, lng_dir)
def mkframe(callsign, payload):
    """Wrap *payload* in a frame from *callsign* and return it exported.

    The destination is fixed to ``APRS`` and the path to ``TCPIP*``; the
    result is whatever ``APRSFrame.export()`` returns with its defaults.
    """
    aprs = APRSFrame()
    aprs.path = [u'TCPIP*']
    aprs.dest = u'APRS'
    aprs.source, aprs.payload = callsign, payload
    return aprs.export()
def _build_weather_payload(w):
    """Encode one HeWeather v5 city record as an APRS weather payload."""
    basic = w['basic']
    now = w['now']

    enc_lat = process_ambiguity(encode_lat(float(basic['lat'])), 0)
    enc_lng = process_ambiguity(encode_lng(float(basic['lon'])), 0)
    fields = ["%s/%s" % (enc_lat, enc_lng)]

    # Wind direction / speed / gust; dots mark a value that is missing.
    # NOTE(review): APRS expects the speed in mph -- confirm the unit the
    # API reports for 'spd' and convert here if it is km/h.
    wind = now.get('wind', {})
    if 'deg' in wind:
        fields.append("_%03d" % int(wind['deg']))
    else:
        fields.append("_...")
    if 'spd' in wind:
        fields.append("/%03d" % int(wind['spd']))
    else:
        fields.append("/...")
    fields.append("g%03d" % 0)

    # Temperature: degrees Celsius -> Fahrenheit.
    fields.append("t%03d" % round(float(now['tmp']) * 9.0 / 5 + 32))

    # Rain is reported in hundredths of an inch (mm / 25.4 * 100), capped
    # to the three digits the field can hold.
    # NOTE(review): 'pcpn' is placed in the 24h field as before -- confirm
    # which period the API value actually covers.
    fields.append("r%03d" % 0)
    fields.append("p%03d" % min(999, round(float(now['pcpn']) / 25.4 * 100)))

    # Humidity is two digits, with "00" standing for 100 %.
    if 'hum' in now:
        hum = min(100, max(1, int(now['hum'])))
        fields.append("h%02d" % (hum % 100))
    else:
        fields.append("h..")

    # Pressure is five digits, in tenths of a millibar.
    if 'pres' in now:
        fields.append("b%05d" % round(float(now['pres']) * 10))
    else:
        fields.append("b.....")

    aqi = None
    if 'aqi' in w:
        aqi = w['aqi']['city']
        fields.append(",...,...,...,000,%03d,%03d" % (int(aqi['pm10']), int(aqi['pm25'])))
        # NOTE(review): the original indentation was lost; the trailing
        # ",." is assumed to belong to this air-quality extension.
        fields.append(",.")

    if 'daily_forecast' in w:
        astro = w['daily_forecast'][0]['astro']
        # Only mention air quality when the record actually carries it.
        air = " Air:%s" % aqi['qlty'] if aqi and 'qlty' in aqi else ""
        ext_info = "%s SunR/S:%s/%s MoonR/S:%s/%s" % (
            air, astro['sr'], astro['ss'], astro['mr'], astro['ms'])
    else:
        ext_info = ""

    return "=%s%s Info@UTC%s" % ("".join(fields), ext_info, basic['update']['utc'])


def get_weather_frame(callsign, weather):
    """Download the current weather and return it as an encoded APRS frame.

    callsign -- source callsign for the frame.
    weather  -- full URL of the HeWeather v5 weather request.

    Returns the frame produced by ``mkframe``, or None when the download,
    the JSON decoding or the encoding fails (the error is printed).
    """
    try:
        wea_str = urllib2.urlopen(urllib2.Request(weather)).read()
        if not wea_str:
            print("Sorry that I can't find the weather info of this city now, please check or retry. weather string = %s " % wea_str)
            return None
        w = json.loads(wea_str)["HeWeather5"][0]
        return mkframe(callsign, _build_weather_payload(w))
    except Exception as err:
        # Report what went wrong instead of hiding it; SystemExit and
        # KeyboardInterrupt are deliberately no longer swallowed.
        print("Weather decode error")
        print(repr(err))
        return None
- bc()  # start the beacon loop as soon as the script is executed