如下所示为python版的小米推送server端sdk,仅实现了topic透传。
标签:python
[Python] 工作日查询脚本
智能家居中,经常要判断当天是否工作日。今天发现工作日api返回的数据不正常,才知道api提供商已经开始收费(或者提高免费使用门槛),故自己写了一个,如下所示:
注意,这个脚本不能直接判断给定的日期是否工作日。它会根据几个全局变量的设置生成一个“20**_date.txt”的文件。文件中每一行代表每一天的假日类型,2代表节假日(含连休调休等),1代表普通周六日,0代表工作日。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
... 20160928 0 20160929 0 20160930 0 20161001 2 20161002 2 20161003 2 20161004 2 20161005 2 20161006 2 20161007 2 20161008 0 20161009 0 20161010 0 20161011 0 20161012 0 ... |
数据生成以后,可以自己解析它来实现工作日判断。
List directory tree structure using Python
写了一个小脚本来输出目录树:
效果如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
-[ java ] -[ com ] -[ nostra13 ] -[ universalimageloader ] -[ cache ] -[ disc ] |- DiskCache.java | -[ impl ] |- BaseDiskCache.java |- LimitedAgeDiskCache.java |- UnlimitedDiskCache.java | -[ ext ] |- DiskLruCache.java |- LruDiskCache.java |- StrictLineReader.java |- Util.java | -[ naming ] |- FileNameGenerator.java |- HashCodeFileNameGenerator.java |- Md5FileNameGenerator.java | -[ memory ] |- BaseMemoryCache.java |- LimitedMemoryCache.java |- MemoryCache.java | -[ impl ] |- FIFOLimitedMemoryCache.java |- FuzzyKeyMemoryCache.java |- LargestLimitedMemoryCache.java |- LimitedAgeMemoryCache.java |- LRULimitedMemoryCache.java |- LruMemoryCache.java |- UsingFreqLimitedMemoryCache.java |- WeakMemoryCache.java | -[ core ] |- DefaultConfigurationFactory.java |- DisplayBitmapTask.java |- DisplayImageOptions.java |- ImageLoader.java |- ImageLoaderConfiguration.java |- ImageLoaderEngine.java |- ImageLoadingInfo.java |- LoadAndDisplayImageTask.java |- ProcessAndDisplayImageTask.java | -[ assist ] |- ContentLengthInputStream.java |- FailReason.java |- FlushedInputStream.java |- ImageScaleType.java |- ImageSize.java |- LoadedFrom.java |- QueueProcessingType.java |- ViewScaleType.java | -[ deque ] |- BlockingDeque.java |- Deque.java |- LIFOLinkedBlockingDeque.java |- LinkedBlockingDeque.java | -[ decode ] |- BaseImageDecoder.java |- ImageDecoder.java |- ImageDecodingInfo.java | -[ display ] |- BitmapDisplayer.java |- CircleBitmapDisplayer.java |- FadeInBitmapDisplayer.java |- RoundedBitmapDisplayer.java |- RoundedVignetteBitmapDisplayer.java |- SimpleBitmapDisplayer.java | -[ download ] |- BaseImageDownloader.java |- ImageDownloader.java | -[ imageaware ] |- ImageAware.java |- ImageViewAware.java |- NonViewAware.java |- ViewAware.java | -[ listener ] |- ImageLoadingListener.java |- ImageLoadingProgressListener.java |- PauseOnScrollListener.java |- SimpleImageLoadingListener.java | -[ process ] |- BitmapProcessor.java | -[ utils ] |- DiskCacheUtils.java |- ImageSizeUtils.java |- IoUtils.java |- L.java |- MemoryCacheUtils.java |- StorageUtils.java |
增强蓝牙dumpstate log 状态码可读性的python脚本
在logcat
中,我们经常看到这样的蓝牙log:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
01-01 09:48:37.479 1326 1377 D BluetoothManagerService: Sending off request. 01-01 09:48:37.479 6111 16098 D BluetoothAdapterService(312951812): disable() called... 01-01 09:48:37.479 6111 6138 D BluetoothAdapterState: CURRENT_STATE=ON, MSG = USER_TURN_OFF 01-01 09:48:37.479 6111 6138 D BluetoothAdapterProperties: Setting state to 13 01-01 09:48:37.479 6111 6138 I BluetoothAdapterState: Bluetooth adapter state changed: 12-> 13 01-01 09:48:37.479 6111 6138 D BluetoothAdapterService: Bluetooth PBAP supproted is true 01-01 09:48:37.479 6111 6138 D BluetoothAdapterService: updateAdapterState state is 13 01-01 09:48:37.479 6111 6138 D BluetoothAdapterService(312951812): updateAdapterState() - Broadcasting state to 1 receivers. 01-01 09:48:37.479 6111 6111 I BluetoothPbapService: action: android.bluetooth.adapter.action.STATE_CHANGED, state: 13 01-01 09:48:37.489 6111 6138 D BluetoothAdapterService: Autoconnection is available 01-01 09:48:37.489 6111 6138 D BluetoothAdapterService: updateAdapterState prevState = 12newState = 13 01-01 09:48:37.489 6111 6138 D BluetoothAdapterService: terminating service from this receiver |
其中的状态码12
13
等可读性十分差,经常忘记其代表的含义。
于是我写了一个python程序,将这些状态码转换成可读性强的表述,如下所示:
原始log:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
HeadsetStateMachine: Connected process message: 9 HeadsetStateMachine: getDeviceForMessage HeadsetStateMachine: getDeviceForMessage: returning 78:F7:BE:01:AA:84 HeadsetStateMachine: mNumActive: 0 mNumHeld: 0 mCallState: 2 HeadsetStateMachine: mNumber: mType: 128 HeadsetStateMachine: terminateScoUsingVirtualVoiceCall: Received HeadsetStateMachine: terminateScoUsingVirtualVoiceCall:No present call to terminate HeadsetStateMachine: PreferredDevice : 78:F7:BE:01:AA:84 HeadsetStateMachine: setAudioParameters HeadsetStateMachine: Set NREC: 1(ON) for device:78:F7:BE:01:AA:84 HeadsetStateMachine: Set SampleRate: 8000(NB) for device:78:F7:BE:01:AA:84 HeadsetStateMachine: Connected process message: 12 HeadsetStateMachine: getDeviceForMessage HeadsetStateMachine: getDeviceForMessage(CLCC): No matching device for 104. Returning null HeadsetStateMachine: checkBlacklistWithId(78:F7:BE:01:AA:84) : false, id : 2 HeadsetStateMachine: processSendClccResponse: Failed to get device for CLCC response. Drop response HeadsetStateMachine: Connected process message: 12 HeadsetStateMachine: getDeviceForMessage HeadsetStateMachine: getDeviceForMessage(CLCC): No matching device for 104. Returning null HeadsetStateMachine: checkBlacklistWithId(78:F7:BE:01:AA:84) : false, id : 2 HeadsetStateMachine: processSendClccResponse: Failed to get device for CLCC response. Drop response HeadsetStateMachine: Connected process message: 101 HeadsetStateMachine: event type: 17 HeadsetStateMachine: EVENT_TYPE_WBS codec is 1 |
处理后:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
HeadsetStateMachine: Connected process message: 9 (CALL_STATE_CHANGED) HeadsetStateMachine: getDeviceForMessage HeadsetStateMachine: getDeviceForMessage: returning 78:F7:BE:01:AA:84 HeadsetStateMachine: mNumActive: 0 mNumHeld: 0 mCallState: 2 (CALL_STATE_DIALING) HeadsetStateMachine: mNumber: mType: 128 HeadsetStateMachine: terminateScoUsingVirtualVoiceCall: Received HeadsetStateMachine: terminateScoUsingVirtualVoiceCall:No present call to terminate HeadsetStateMachine: PreferredDevice : 78:F7:BE:01:AA:84 HeadsetStateMachine: setAudioParameters HeadsetStateMachine: Set NREC: 1(ON) for device:78:F7:BE:01:AA:84 HeadsetStateMachine: Set SampleRate: 8000(NB) for device:78:F7:BE:01:AA:84 HeadsetStateMachine: Connected process message: 12 (SEND_CCLC_RESPONSE) HeadsetStateMachine: getDeviceForMessage HeadsetStateMachine: getDeviceForMessage(CLCC): No matching device for 104. Returning null HeadsetStateMachine: checkBlacklistWithId(78:F7:BE:01:AA:84) : false, id : 2 HeadsetStateMachine: processSendClccResponse: Failed to get device for CLCC response. Drop response HeadsetStateMachine: Connected process message: 12 (SEND_CCLC_RESPONSE) HeadsetStateMachine: getDeviceForMessage HeadsetStateMachine: getDeviceForMessage(CLCC): No matching device for 104. Returning null HeadsetStateMachine: checkBlacklistWithId(78:F7:BE:01:AA:84) : false, id : 2 HeadsetStateMachine: processSendClccResponse: Failed to get device for CLCC response. Drop response HeadsetStateMachine: Connected process message: 101 (STACK_EVENT) HeadsetStateMachine: event type: 17 (EVENT_TYPE_WBS) HeadsetStateMachine: EVENT_TYPE_WBS codec is 1 |
脚本地址:https://gist.github.com/legendmohe/7c49224da62565d05ca4
定义文件地址:https://gist.github.com/legendmohe/9682ff8d97ae27be8113
使用时,输入以下命令即可:
1 2 |
python readable_state.py [target_file] |
LEHome–完整的开源智能家居方案
1 2 3 4 5 6 7 8 |
_/ _/_/_/_/ _/ _/ _/_/ _/ _/ _/_/_/_/ _/ _/ _/ _/ _/ _/ _/_/ _/_/ _/ _/ _/_/_/ _/_/_/_/ _/ _/ _/ _/ _/ _/_/_/ _/ _/ _/ _/ _/ _/ _/ _/ _/ _/_/_/_/ _/_/_/_/ _/ _/ _/_/ _/ _/ _/_/_/_/ ================================================================ |
LEHome 是一套完整的开源智能家居方案。LEHome拥有以下特性:
- 简单的控制命令编程
- ibeacon室内定位
- 高度模块设计
- 视频采集、红外控制、开关控制、传感器采集
- android,web app,微信版客户端
项目地址:https://github.com/legendmohe/LEHome
UPDATE
2015.5.26 添加定位功能
example:
1 2 |
定位某某 |
服务器接受到上述命令后,即会向所有绑定的客户端以“某某”为参数发送一个定位请求。客户端收到请求后,如果“某某”与自己设定的名字是一致的,即会调用SDK发起定位,然后将结果回传给服务器。服务器再将结果发送给所有绑定的客户端。这时候客户端便可以看到“某某”的具体位置了。如下图所示:
2015.1.13 添加触发器功能
example:
1 2 |
当打开电灯的时候播放QQ电台 |
当上述命令生效的时候,只要有打开电灯
发生,播放QQ电台
即可被触发。
注意,触发器是一次性的,如果要持续触发,请将其放在一个循环里。另外,触发的内容最
好用#
包围,以免引起命令解析错误。例如:
1 2 |
循环每工作日早上7点30分内容是当#打开电灯#的时候播放qq电台 |
部署
软件
服务端
LEHome 服务端基于Python,需要安装以下依赖:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
- tornado - pyzmq - numpy - BeautifulSoup - scipy - audioop - alsaaudio - pyaudio - fysom - mplayer - sox - redis |
down下来后,配置init.json(后面会说明如何配置),然后在根目录下运行./start.sh即可(先用chmod添加执行权限)。
客户端
目前LEHome实现了Android,web app,微信版客户端,如有需要可与我联系legendmohe@foxmail.com。
硬件
要完整地运行本项目,需要准备以下硬件:
- reco WIFI插座 * n
- 蓝牙4.0适配器 * 2
- ibeacon模块 * n
- 蓝牙音箱 * 1
- 红外模块 * 1
- zigbee传感器 * 2
- UVC 摄像头 * 1
reco WIFI插座
淘宝大概99一个,体积略大,好在控制协议是开放的,可以很方便地整合进LEHome。
买回来后,用reco的手机客户端配置一下插座使其正常工作,然后打开路由器的管理页面,将插座的ip记下来备用。
你也可以通过更改SwitchHelper.py使系统兼容自己的wifi插座。
蓝牙4.0适配器
由于要使用ibeacon进行室内定位,故需要4.0以上的BT适配器。需要两个是因为一个负责连蓝牙音箱,一个负责接受ibeacon数据包。如果直接使用音频线来连音箱,则只需一个适配器即可
蓝牙音箱
可以用普通音箱代替 🙂
红外模块
淘宝有售,选择一个开源控制协议的即可。为了避免广告嫌疑,这里不提供链接,有需要的可以私下联系。
zigbee传感器
淘宝有许多zigbee开发板出售,选择其中之一即可。为了避免广告嫌疑,这里不提供链接,有需要的可以私下联系。
注:要根据实际采用的红外模块和zigbee传感器模块来调整LEHome的源码(RILHelper.py和sensor_server.py)。
UVC 摄像头
如果使用截图功能,需要一个UVC摄像头,很容易就可以买到。我使用的是罗技C270。
系统功能
本系统最大的特点是能支持简单的命令编程。
你可以输入:
1 2 |
打开电灯 |
可以输入:
1 2 |
打开电灯然后打开风扇 |
可以更复杂一点:
1 2 |
循环每工作日晚上7点30分内容是打开风扇然后打开电灯 |
或者更更复杂一点:
1 2 |
循环每工作日晚上7点30分内容是如果我在家里那么延时10分钟打开电灯然后如果当前温度大于数值26那么打开风扇然后播放语音#你好# |
如何查看系统支持的命令
打开usr/init.json,可以看到在”command”项下,有许多预定义的命令。
系统检测到命令词出现的时候,会调用相应的callback,所有业务逻辑都在callback里面完成。
命令格式
命令由基本命令和控制语句组成。准许以下规则:
- 基本命令 = delay + action + target + message
- 基本命令 = 基本命令 + 控制语句
- 命令 = trigger + 基本命令 + finish/stop
例如:
1 2 3 4 5 |
打开风扇 -- 打开[action]风扇[target] 延时10分钟打开电灯 -- 延时10分钟[delay]打开[action]电灯[target] 查询公交车8路 -- 查询[action]公交车[target]8路[message] 如果我在家里那么打开电灯 -- 如果[控制语句if]我在家里[基本命令]那么[控制语句then]打开电灯[基本命令] |
以上命令不能直接被系统识别,需要用trigger和finish/stop包围
例如:
1 2 |
你好打开风扇谢谢 -- 你好[trigger]打开[action]风扇[target]谢谢[finish] |
*添加trigger和finish的原因是系统支持连续语音识别命令,需要考虑断句的情况,所以要添加两个标志位来截取命令。
命令callback
所有命令对应的callback.py都保存在usr/callbacks/目录下。
在init.json文件中,可以通过:
1 2 3 4 5 6 7 8 |
"callback":{ "whiles":{ "循环":"whiles.while_callback", "重复":"whiles.while_callback" }, ... } |
这样来指定。
callback主要如下所示:
1 2 3 4 5 6 |
from lib.model import Callback class timer_callback(Callback.Callback): def callback(self, cmd, action, target, msg): ... |
当命令词被触发时,相应callback的callback()方法会被调用,传入的参数由callback函数的定义决定。
应用架构
如下图所示:
联系方式
本项目断断续续做了一年,代码风格,逻辑实现等比较幼稚,加上本README写得极简,基本不可作为开发参考使用,故如有任何疑问,可联系legendmohe@foxmail.com。
用python编写1024游戏(4)
续上一篇blog,我们来用python实现ai版1024游戏。
如前文所述,关键在于 获得最优滑动方向 这一步。整理一下思路:
1 2 3 4 5 6 |
def 获得最优滑动方向: for 方向 in 当前有意义的滑动方向: 执行滑动 方向.评估值 = 评估当前局面 return 最大评估值的方向 |
此方法会调用一个用递归实现的深度优先搜索:
1 2 3 4 5 6 7 8 9 10 11 |
def 评估当前局面: if 无需继续搜索: return 当前局面的评估函数值 累计值 = 0 for 随机数字 in 当前剩余的空格: # 注意循环嵌套的顺序 for 方向 in 当前有意义的滑动方向: 执行滑动,局面改变 累计值 += 评估当前局面 return 累计值 |
直接将其翻译成python代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
def evaluate(board, depth): if depth <= 0 \ or (board.num_of_empty == 0 and not combinable(board)): return evaluation_func(board) sum_of_ev = 0 for x, y in get_left_empty(board): for p in [2, 4]: board.set(x, y, p) for swap_dir in get_vaild_dirs(board): board_copy = board.copy() _, moved = apply_swap(board_copy, swap_dir) if moved: ev = evaluate(board_copy, depth - 1) if p == 2: sum_of_ev += ev * 0.95 else: sum_of_ev += ev * 0.05 board.set(x, y, 0) return sum_of_ev def best_dir(board): max_value = -sys.maxint best = SwapDir.DOWN for swap_dir in get_vaild_dirs(board): board_copy = board.copy() _, moved = apply_swap(board_copy, swap_dir) if moved: cur_value = evaluate(board_copy, depth=2) if cur_value >= max_value: max_value = cur_value best = swap_dir return best |
其中 evaluation_func 是关键。如何定义这个函数呢?前文提到:
1 2 |
H(x) = aA(x) + bB(x) + cC(x) + ... |
其中 H(x) 是评估函数。我们要思考究竟是什么因素影响了当前的局面,什么才算对自己有利。
- 最大值尽量靠在边角
- 尽可能地沿着某个方向呈等比数列排列
于是我们可以这样设计:
1 2 3 4 5 6 7 8 |
def evaluation_func(board): o = order_weight(board) c = corner(board)*10 m = monotonicity(board) e = empty_weight(board)*10 x = max_value_weight(board) return o + m + c + e + x |
各种因子的实现为:
1 2 3 4 5 6 |
def corner(board): # 最大值是否在角落 if board.get(0, 0) == board.max_value: return board.get(0, 0) else: return 0 |
1 2 3 4 5 6 7 8 9 10 11 12 13 |
POS_WEIGHT_TEMPLATE = [[4,3,2,1], # 位置权重 [3,2,1,1], [2,1,1,1], [1,1,1,1]] def order_weight(board): # 根据权重矩阵计算当前局面的“方向性” global POS_WEIGHT_TEMPLATE m = 0 for i in range(board.length): for j in range(board.length): if board.get(i, j) != 0: m += board.get(i,j)*POS_WEIGHT_TEMPLATE[i][j] return m |
1 2 3 4 5 6 7 8 9 |
def monotonicity(board): # 尽量使第一行按顺序排列 value = 0 for i in range(board.length - 1): if not board.get(0, i) > board.get(0, i + 1): return 0 else: value += (board.length - i)*board.get(0, i) return value + board.get(0, board.length -1) |
1 2 3 4 5 6 |
def empty_weight(board): # 空格数 return board.num_of_empty def max_value_weight(board): # 尽量凑最大值 return board.max_value |
递归深度为2时,运行一次大概要4-5min。当我设置递归深度为1时,进行了n次实验(n>50),胜率大概在50%左右。跟这里提及的算法效果比起来简直一个天上一个地下。
如何设计更好的评估函数呢?如何制定更完善的搜索策略呢?这有点超出我目前的能力范围了。。。
完整代码我放到Gist上,如下所示:
用python编写1024游戏(3)
继这篇blog后,我们来尝试实现ai版的1024游戏。
背景
先来认识两个概念:树状搜索(Tree-Searching)和评价函数(Evaluate Function)
树状搜索
以象棋为例子、下棋的过程可以看作是棋局变化的过程。假设某时刻某个棋局,接下来可以移动棋子的方式有k种,每种移动方式会导致另外一种棋局。故整个下棋的过程可以用一棵n叉树来描述。目前的局面称为“根局面”(Root Position)或“根结点”(Root Node),后续分支称为“后续局面”(Successor Position)或“后续结点”(Successor Nodes)。
理论上可以从最初的局面开始生成n叉树,直至所有叶子节点满足游戏结束的条件为止,但随着生成层次增加,计算的次数迅速上升,往往会超出当前应用场景的接受范围。所以在一些复杂的游戏当中,会对搜索层次进行限制。同时,可以对某些特定算法进行“剪枝”优化,根据某些条件终止当前路径的搜索以减少分支数。
评估函数
假设当前局面对应的n叉树已经生成,如何选择对自己最有利的下一步呢?在此我们引入一个评估函数,该函数输入一个局面,返回对该局面的评估值,表示当前路径下可以获得的优势。我们对下一步所有可能出现的情况进行评估,然后根据评估值最大的局面来走棋。
注意到某局面的评估值依赖于下一步的所有可能出现的局面的评估值(是它们的和),这些可能出现的局面又依赖于在下一步可能出现的局面的评估值。所以我们需要从叶子节点开始计算评估值,然后回溯到顶层的同时更新父节点的评估值。
思路
首先,很明显,玩1024的过程可以用一棵n叉树来描述。游戏状态的变化按照规则:
1 2 3 4 5 6 7 |
if 棋盘上存在格子的值为1024: 游戏状态=胜利 elif 棋盘上存在剩余空格 或者 棋盘上依然存在可以合并的元素: 游戏状态=未结束 else: 游戏状态=失败 |
设棋盘大小为4*4, 每一回合的平均空格数为4*4/2,大部分回合都可以朝4个方向滑动,那么随着搜索层数的增长,节点的个数如下增长:
1 2 3 4 5 6 7 |
层数 节点个数 0 1 1 32 2 1024 3 32768 4 1048576 |
仅四层节点个数便达到百万级别(我没算错的话),所以我们必须采用启发式搜索。
接下来我们大致写出整个ai游戏过程:
1 2 |
游戏开始-> ai下棋-> 游戏结束 |
具体其中的 ai下棋 过程:
1 2 |
初始化-> 随机出现数字-> 评估当前局面-> 执行滑动,局面改变-> 随即出现数字-> ... -> 游戏结束 |
用循环和判断语句描述一下:
1 2 3 4 5 |
while 未结束: 随机出现一个数字 获得最优滑动方向 执行滑动,局面改变 |
关键在 获得最优滑动方向 这一步。具体化一下:
1 2 3 4 5 6 |
def 获得最优滑动方向: for 方向 in 当前有意义的滑动方向: 执行滑动 方向.累计值 = 评估当前局面 return 最大累计值的方向 |
其中 评估当前局面 思路如下所示:
1 2 3 4 5 6 7 8 9 10 11 |
def 评估当前局面: if 无需继续搜索: return 当前局面的评估函数值 累计值 = 0 for 随机数字 in 当前剩余的空格: # 注意循环嵌套的顺序 for 方向 in 当前有意义的滑动方向: 执行滑动,局面改变 累计值 += 评估当前局面 return 累计值 |
这是一个深度优先搜索,可以用递归或者栈来实现,后续文章会有实例代码。
注意到 当前有意义的滑动方向 这一步。如何获得这个方向序列呢?我们来思考一下。“局面改变”是指:
1 2 |
存在空单元格被占 or 存在元素被合并 |
于是我们可以获得两个判断“可移动”的条件
1 2 3 |
当前方向上存在空单元格 当前方向上存在可合并单元格 |
接下来观察 计算评估函数值部分。设评估值为H, 影响因子(决定评估值得因素)的值为A, B, C…,它们的权重为a, b, c, …,则有:
1 2 |
H = aA + bB + cC + ... |
可以写成:
1 2 |
H(x) = aA(x) + bB(x) + cC(x) + ... |
其中x代表当前局面。
我们暂时简单地设计一些影响因子函数,例如“当前局面的剩余空格数”,“当前累计的总分”,“形成可组合的对数”等等。然后根据实验来调整权重。
好了,说完思路,接下来将采用python实现。