EV3 直接命令 - 第一課 無為的藝術
LEGO 的 EV3 是一個極好的遊戲工具。它的標準程式設計方式是 LEGO 的圖形化程式設計工具。你可以編寫程式,把它們傳到你的 EV3 brick 上,然後啟動它們。但還有另外一種與你的 EV3 互動的方式。把它看作一個伺服器並給它傳送命令,命令將以資料和/或行為來應答。在這種情形下,你的程式所執行的機器是客戶端。這打開了迷人的新視角。如果程式執行在你的智慧手機上,你將獲得很好的互動性和便利性。如果你的客戶端是一個 PC 或筆記本電腦,你將獲得舒服的鍵盤和顯示器。另一種新選項是在一個機器人中結合多個 EV3。一個客戶端與多個伺服器通訊,這允許機器人具有無限多個馬達和感測器。或者把你的 EV3 當做一臺機器,生產資料的那種。客戶端可以持續地從 EV3 的感測器接收資料,這也將開啟新的機會之門。如果你想進入這個新世界,你不得不使用 EV3 的直接命令,這需要你的一些工作。如果你準備投資它,則繼續閱讀,否則,開心地玩你的 EV3 並等待其它人做出很酷的新應用吧。
在我們開始討論正式的內容之前,我們先瞥一眼 EV3 的通訊協議和 LEGO Direct Commands 的特性。你的 EV3 提供了三種通訊型別,藍芽,WiFI 和 USB。藍芽和 USB 無需其它額外的裝置即可使用,通過 WiFi 通訊需要一個介面卡。所有三種都允許你開發應用程式,執行在任何計算機上與你的 EV3 brick 通訊。也許你瞭解 EV3 的前身,NXT 及其通訊協議。它提供了大約 20 個系統命令,類似於函式呼叫。LEGO 改變了它的理念,EV3 以機器碼提供了命令語法。一方面,這意味著實現真正演算法的自由,但另一方面,它變得更難入門。
官方文件,我發現,是純粹的技術文件。他們缺乏動力和教育方面。我希望這份文件能成為你深入你的 EV3 的適當入口。如果你已經是一個程式設計師,或試圖稱為一個程式設計師,你將沉浸在位和位元組的世界。如果不是,它將變得困難,但唯一的選擇是,保持雙手分開。試一試,在與樂高 EV3 的有效溝通之下,你將學到很多關於機器程式碼,所有計算機的基礎的知識。
要知道的文件
LEGO 已經發布了很好的詳細的文件,你可以在這裡下載:ofollow,noindex">http://www.lego.com/en-gb/mindstorms/downloads 。出於我們的目的,你絕對需要檢視文件,你可以在標題為Advanced Users - Developer Kits(PC / MAC) 的標題下找到。 EV3 Firmware Developer Kit 是 LEGO EV3 直接命令的參考書。我希望你能深入閱讀它。
有一個 C# 的通訊庫,它使用直接命令與 LEGO EV3 通訊。如果你喜歡使用開箱即用的軟體,如果你喜歡 C#,這可能是你的選擇:http://www.monobrick.dk。
我最初的想法是不釋出任何原始碼。當功能逐步增長時,程式設計更有趣。Bugs 是遊戲的一部分,搜尋 bugs 很難,但它是每個程式設計人員的日常生活。簡短的結論,程式碼增長到一定的規模和複雜性,最初的想法變得不現實。我已經在 Github 上釋出了程式碼,你可以在這裡下載:ev3-python3 。
第一課 無為的藝術
你做到了,你真的想介入,那沒很好!這一課是關於非常基本的通訊的。我們將實現第一個呼叫和應答迴圈。通過 WiFi,藍芽或 USB 給你的 EV3 傳送資訊,你將獲得一個定義良好的應答。不要屏住呼吸,我們不會從一個令人驚奇的應用程式開始。相反,它什麼都不做。這聽起來不像是,如果你設法做到這一點,向後傾斜並感到快樂,你就在路上。
位元和位元組命名的小遊覽
也許你已經知道了如何編寫二進位制和十六進位制數字,大小尾端的含義等等。如果你真的可以把值 156 寫為一個小尾端的 4 位元組整數表示格式,則你可以跳過這一節。如果不能,你需要閱讀它,因為你真的需要直到它。
讓我們從基礎開始!幾乎所有現代計算機都將 8 位組合為 1 個位元組,並且在記憶體中按位元組定址(你的 EV3 是一臺現代計算機,因而也是這樣)。在下文中我們使用如下表示法表示二進位制數字:0b 1001 1100
。
前導的0b
告訴我們,後面是數字的二進位制表示法,每個位元組的 8 個數字被分為 4 和 4 的兩個半個位元組。它是數字 156 的二進位制表示法,你可以把它讀作:156 = 1128 + 0
64 + 032 + 1
16 + 18 + 1
4 + 02 + 0
1。可以對相同位元組進行替代解釋。它可以被讀做 8 個標記的序列,或它可以是符號 £ 的
ASCII 碼。解釋依賴於上下文。目前我們專注於數字。
二進位制表示法非常長,因此常見的習慣是把半個位元組寫為 16 進位制數,其中字母 A 到 F 表示數字 10 到 15。十六進位制表示法是緊湊的,它與二進位制表示法的轉換很容易。這是因為一個十六進位制數字表示半個位元組。十六進位制數字(這裡是值 156)表示是: 0x 9C。你可以把它讀作:156 = 916 + 12 1。前導的 0x 告訴我們,後面是十六進位制表示。因為它的緊湊型,我們可以編寫和讀更大的數字。作為一個 4 位元組整數,值 156 被寫為:0x 00 00 00 9C。
我們將用冒號 “:” 或豎條 “|” 把位元組分開。我們使用豎線表示高階分離,使用冒號表示低階。我們將把值為 156 的 2 位元組整數寫作:0x|00:9C|。現在我們可以在一行中儲存值列表。256(無符號 1 位元組整數),156(2 位元組整數)和 65536(4 位元組整數)的序列可以寫為:0x|FF|00:9C|00:01:00:00|。
那負數呢?大多數計算機語言區分有符號和無符號整數。如果整數是有符號的,則它們的第一位是負號標誌,整數是另一個範圍的。有符號 1 位元組整數的範圍是 -128 到 127,有符號 2 位元組整數的範圍是 -32,768 到 32,767 等等。負數的值的計算方法為最小的值(-128,-32,768 等)加上其餘的值。有符號 1 位元組整數的最小值,-128 寫為 0b 1000 0000 或 0x|80|,有符號 2
位元組整數值 -1 (-32,768 + 32,767) 為:0b 1111 1111 1111 1111 或 0x|FF:FF|
那什麼是小尾端 呢?OK,我不再保守這個祕密了。小尾端格式反轉位元組的位置(你常常使用的,被稱作大尾端 )。2 位元組整數值 156 以小尾端格式則被寫作:0x|9C:00|。
也許這聽起來像個糟糕的玩笑,但非常抱歉,EV3 直接命令讀和寫所有數字都是以小尾端進行的,那不是我的錯。但我可以給你一些安慰。首先,本課程使用數字。其次,存在管理小端數的好工具。在 Python 中,你可以使用struct
模組,在 Java 中,ByteBuffer
可能是你選擇的物件。
什麼都不做的直接命令
第一個例子展示所有可能的直接命令中最簡單的那個。你將向你的 EV3 傳送一條訊息,並期待它將有所應答。讓我們看一下要傳送的訊息,它由如下 8 個位元組組成:
------------------------- \len \cnt \ty\hd\op\ ------------------------- 0x|06:00|2A:00|00|00:00|01| ------------------------- \6\42\Re\0,0 \N \ \\\ \\o \ \\\ \\p \ -------------------------
訊息本身是以 0x 開頭的那一行。在訊息的頂部,你會看到有關訊息部分型別的一些註釋。底部顯示有關其值的註釋。訊息的 8 個位元組由如下的部分組成:
-
訊息的長度(位元組 0,1):開頭的兩個位元組不是直接命令本身的組成部分。它們是通訊協議的一部分,在 EV3 的情況下可以是 Wifi,藍芽或 USB。長度被編碼為小尾端格式的 2 位元組無符號整數,因此
0x|06:00|
表示值 6。 -
訊息計數器(位元組 2,3):接下來的兩個位元組是這個直接命令的指紋。訊息計數器將包含在應答中,並可以用來匹配直接命令和它的應答。這也是一個小尾端格式的 2 位元組無符號整數。在我們的例子中把訊息計數器設定為
0x|2A:00|
,其值為 42。 -
訊息型別(位元組 4):它可以是如下的兩個值之一:
- DIRECT_COMMAND_REPLY = 0x|00|
-
DIRECT_COMMAND_NO_REPLY = 0x|80|
在我們的例子中我們希望 EV3 應答訊息。
-
頭部(位元組 5,6):接下來的兩個位元組,第一個操作之前最後的部分是頭部。它包含了兩個數字,它們定義了直接命令的記憶體大小(是的,它是複數,我們有兩個記憶體,一個區域性的和一個全域性的)。我們將很快回到這個記憶體大小的具體細節。此時我們很幸運,我們的命令不需要任何記憶體,因而我們把頭部設定為
0x|00:00|
。 -
操作(從位元組 7 開始):在我們的例子中是單個位元組,它表示:
opNOP
=0x|01|
,什麼也不做,EV3 的 idle 操作。
給 EV3 傳送訊息
我們的任務是,傳送上面描述的訊息給 EV3。如何做到呢?你可以在三種通訊協議中選擇,藍芽,Wifi 和 USB,且你可以選擇支援至少一種通訊協議的任何程式語言。下面我展示了 Python 和 Java 的例子。如果沒有你喜愛的語言,將程式翻譯成你最喜愛的計算機語言並將其傳送給我會很棒。它們將被髮布在這裡。
藍芽
你需要訪問開啟了藍芽的計算機,且你需要開啟你的 EV3 上的藍芽。接下來你需要為兩個裝置做配對。這可以從 EV3 或者你的計算機發起。EV3 的使用者指南中對此過程有所描述。如果需要幫助,你可以在網上找到教程,這裡有 LEGO 頁面的連結:http://www.lego.com/en-gb/mindstorms/support/ 。配對過程將向你展示你的 EV3 的 MAC 地址。你需要注意它。此外,你也可以在你的 EV3 的顯示器中,在Brick Info /ID 下面讀取 MAC 地址。
python
你需要完成如下的步驟:
- 把程式碼複製到名為EV3_do_nothing_bluetooth.py 的檔案中。
-
把 MAC 地址由
00:16:53:42:2B:99
變為你的 EV3 的值。 - 開啟一個終端並切換到你的程式的目錄。
-
通過鍵入
python3 EV3_do_nothing_bluetooth.py
執行它。
#!/usr/bin/env python3 import socket import struct class EV3(): def __init__(self, host: str): self._socket = socket.socket( socket.AF_BLUETOOTH, socket.SOCK_STREAM, socket.BTPROTO_RFCOMM ) self._socket.connect((host, 1)) def __del__(self): if isinstance(self._socket, socket.socket): self._socket.close() def send_direct_cmd(self, ops: bytes, local_mem: int=0, global_mem: int=0)-> bytes: cmd = b''.join([ struct.pack('<h', len(ops) + 5), struct.pack('<h', 42), DIRECT_COMMAND_REPLY, struct.pack('<h', local_mem*1024 + global_mem), ops ]) self._socket.send(cmd) print_hex('Sent', cmd) reply = self._socket.recv(5 + global_mem) print_hex('Recv', reply) return reply def print_hex(desc: str, data: bytes)->None: print(desc + ' 0x|' + ':'.join('{:02X}'.format(byte) for byte in data) + '|') DIRECT_COMMAND_REPLY = b'\x00' opNop = b'\x01' my_ev3 = EV3('00:16:53:42:2B:99') ops_nothing = opNop my_ev3.send_direct_cmd(ops_nothing)
socket
的實現依賴於你的計算機的作業系統。如果AF_BLUETOOTH
不被支援(你將看到一個錯誤訊息如AttributeError: module ‘socket’ has no attribute ‘AF_BLUETOOTH’
)。你可以使用pybluez
,那意味著你需要匯入bluetooth
而不是socket
。在我的情況下那是說:
-
用 pip3 安裝pybluez :
sudo pip3 install pybluez
安裝 pybluez 有兩個前提條件:一是系統中配置的當前預設 Python 是 Python 3,這可以通過$ sudo update-alternatives --config python
完成;二是已經安裝了藍芽開發包 libbluetooth-dev,這可以通過執行命令$ sudo apt-get install libbluetooth-dev
完成,否則在安裝 pybluez 時將報出如下錯誤:creating build/lib.linux-x86_64-3.5 creating build/lib.linux-x86_64-3.5/bluetooth copying bluetooth/osx.py -> build/lib.linux-x86_64-3.5/bluetooth copying bluetooth/__init__.py -> build/lib.linux-x86_64-3.5/bluetooth copying bluetooth/btcommon.py -> build/lib.linux-x86_64-3.5/bluetooth copying bluetooth/msbt.py -> build/lib.linux-x86_64-3.5/bluetooth copying bluetooth/widcomm.py -> build/lib.linux-x86_64-3.5/bluetooth copying bluetooth/ble.py -> build/lib.linux-x86_64-3.5/bluetooth copying bluetooth/bluez.py -> build/lib.linux-x86_64-3.5/bluetooth running build_ext building 'bluetooth._bluetooth' extension creating build/temp.linux-x86_64-3.5 creating build/temp.linux-x86_64-3.5/bluez x86_64-linux-gnu-gcc -pthread -DNDEBUG -g -fwrapv -O2 -Wall -Wstrict-prototypes -g -fstack-protector-strong -Wformat -Werror=format-security -Wdate-time -D_FORTIFY_SOURCE=2 -fPIC -I./port3 -I/usr/include/python3.5m -c bluez/btmodule.c -o build/temp.linux-x86_64-3.5/bluez/btmodule.o In file included from bluez/btmodule.c:20:0: bluez/btmodule.h:5:33: fatal error: bluetooth/bluetooth.h: 沒有那個檔案或目錄 compilation terminated. error: command 'x86_64-linux-gnu-gcc' failed with exit status 1 ---------------------------------------- Command "/usr/bin/python -u -c "import setuptools, tokenize;__file__='/tmp/pip-install-mk3f5p_f/pybluez/setup.py';f=getattr(tokenize, 'open', open)(__file__);code=f.read().replace('\r\n', '\n');f.close();exec(compile(code, __file__, 'exec'))" install --record /tmp/pip-record-gpk9_xhc/install-record.txt --single-version-externally-managed --compile" failed with error code 1 in /tmp/pip-install-mk3f5p_f/pybluez/
-
修改程式:
#!/usr/bin/env python3 import bluetooth import struct class EV3(): def __init__(self, host: str): self._socket = bluetooth.BluetoothSocket(bluetooth.RFCOMM) self._socket.connect((host, 1)) def __del__(self): if isinstance(self._socket, bluetooth.BluetoothSocket): self._socket.close() ...
-
執行程式
Java
與藍芽裝置通訊,我的選擇是bluecove 。下載了 Java 包bluecove-2.1.0.jar (在 Unix 上也可以是bluecove-gpl-2.1.0.jar ) 之後,你可以把它們新增到你的 classpath 上。在我的 Unix 機器上,這通過這個命令來完成:
export CLASSPATH=$CLASSPATH:./bluecove-2.1.0.jar:./bluecove-gpl-2.1.0.jar
bluecove-2.1.0.jar 的下載地址為https://mvnrepository.com/artifact/net.sf.bluecove/bluecove/2.1.0,*bluecove-gpl-2.1.0.jar* 的下載地址為https://mvnrepository.com/artifact/net.sf.bluecove/bluecove-gpl/2.1.0。
然後,執行下面的步驟:
- 把下面的程式碼拷貝到名為EV3_do_nothing_bluetooth.java 的檔案中。
-
把 MAC 地址由
001653422B99
修改為你的 EV3 的值。 - 開啟一個終端並切換到你的程式的目錄。
-
鍵入
javac EV3_do_nothing_bluetooth.java
編譯它。 -
鍵入
java EV3_do_nothing_bluetooth
執行它。
import java.io.*; import javax.microedition.io.*; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.io.*; public class EV3_do_nothing_bluetooth { static final String mac_addr = "001653422B99"; static final byteopNop= (byte)0x01; static final byteDIRECT_COMMAND_REPLY= (byte)0x00; static InputStream in; static OutputStream out; public static void connectBluetooth () throws IOException { String s = "btspp://" + mac_addr + ":1"; StreamConnection c = (StreamConnection) Connector.open(s); in = c.openInputStream(); out = c.openOutputStream(); } public static ByteBuffer sendDirectCmd (ByteBuffer operations, int local_mem, int global_mem) throws IOException { ByteBuffer buffer = ByteBuffer.allocateDirect(operations.position() + 7); buffer.order(ByteOrder.LITTLE_ENDIAN); buffer.putShort((short) (operations.position() + 5));// length buffer.putShort((short) 42);// counter buffer.put(DIRECT_COMMAND_REPLY);// type buffer.putShort((short) (local_mem*1024 + global_mem)); // header for (int i=0; i < operations.position(); i++) {// operations buffer.put(operations.get(i)); } byte[] cmd = new byte [buffer.position()]; for (int i=0; i<buffer.position(); i++) cmd[i] = buffer.get(i); out.write(cmd); printHex("Sent", buffer); byte[] reply = new byte[global_mem + 5]; in.read(reply); buffer = ByteBuffer.wrap(reply); buffer.position(reply.length); printHex("Recv", buffer); return buffer; } public static void printHex(String desc, ByteBuffer buffer) { System.out.print(desc + " 0x|"); for (int i= 0; i < buffer.position() - 1; i++) { System.out.printf("%02X:", buffer.get(i)); } System.out.printf("%02X|", buffer.get(buffer.position() - 1)); System.out.println(); } public static void main (String args[] ) { try { connectBluetooth(); ByteBuffer operations = ByteBuffer.allocateDirect(1); operations.put(opNop); ByteBuffer reply = sendDirectCmd(operations, 0, 0); } catch (Exception e) { e.printStackTrace(System.err); } } }
Wifi
你需要一個 Wifi 介面卡將你的 EV3 連線到你的本地網路。下面文件的第一部分描述了這個過程:http://www.monobrick.dk/guides/how-to-establish-a-wifi-connection-with-the-ev3-brick/ 。現在你的 EV3 是本地網路的一部分,且具有一個網路地址了。從網路中的所有機器你都可以與之通訊。如上面提到的文件所描述的那樣,它需要如下的步驟來與 EV3 建立 TCP/IP 連線:
- 在埠 3015 監聽來自於 EV3 的 UDP 廣播。
- 向 EV3 發回一個 UDP 訊息以使它接受一個 TCP/IP 連線。
- 在埠 5555 上建立一個 TCP/IP 連線。
- 通過 TCP/IP 給 EV3 傳送一條解鎖訊息。
Python
你需要完成如下的步驟:
- 把程式碼複製到名為EV3_do_nothing_wifi.py 的檔案中。
- 開啟一個終端,並切換到你的程式的目錄。
-
鍵入
python3 EV3_do_nothing_wifi.py
執行它。
#!/usr/bin/env python3 import socket import struct import re class EV3(): def __init__(self, host: str): # listen on port 3015 for a UDP broadcast from the EV3 UDPSock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) UDPSock.bind(('', 3015)) data, addr = UDPSock.recvfrom(67) # pick serial number, port, name and protocol # from the broadcast message matcher = re.search('Serial-Number: (\w*)\s\n' + 'Port: (\d{4,4})\s\n' + 'Name: (\w+)\s\n' + 'Protocol: (\w+)\s\n', data.decode('utf-8')) serial_number = matcher.group(1) port = matcher.group(2) name = matcher.group(3) protocol = matcher.group(4) if serial_number.upper() != host.replace(':', '').upper(): self._socket = None raise ValueError('found ev3 but not ' + host) # Send an UDP message back to the EV3 # to make it accept a TCP/IP connection UDPSock.sendto(' '.encode('utf-8'), (addr[0], int(port))) UDPSock.close() # Establish a TCP/IP connection with EV3s address and port self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._socket.connect((addr[0], int(port))) # Send an unlock message to the EV3 over TCP/IP msg = ''.join([ 'GET /target?sn=' + serial_number + 'VMTP1.0\n', 'Protocol: ' + protocol ]) self._socket.send(msg.encode('utf-8')) reply = self._socket.recv(16).decode('utf-8') if not reply.startswith('Accept:EV340'): raise IOError('No wifi connection to ' + name + ' established') def __del__(self): if isinstance(self._socket, socket.socket): self._socket.close() def send_direct_cmd(self, ops: bytes, local_mem: int=0, global_mem: int=0)-> bytes: cmd = b''.join([ struct.pack('<h', len(ops) + 5), struct.pack('<h', 42), DIRECT_COMMAND_REPLY, struct.pack('<h', local_mem*1024 + global_mem), ops ]) self._socket.send(cmd) print_hex('Sent', cmd) reply = self._socket.recv(5 + global_mem) print_hex('Recv', reply) return reply def print_hex(desc: str, data: bytes)->None: print(desc + ' 0x|' + ':'.join('{:02X}'.format(byte) for byte in data) + '|') DIRECT_COMMAND_REPLY = b'\x00' opNop = b'\x01' my_ev3 = EV3('00:16:53:42:2B:99') ops_nothing = opNop my_ev3.send_direct_cmd(ops_nothing)
Java
你需要完成如下的步驟:
- 把程式碼複製到名為EV3_do_nothing_wifi.java 的檔案中。
- 開啟一個終端,並切換到你的程式的目錄。
-
鍵入
javac EV3_do_nothing_wifi.java
編譯它。 -
鍵入
java EV3_do_nothing_wifi
執行它。
import java.net.Socket; import java.net.SocketException; import java.net.ServerSocket; import java.net.DatagramSocket; import java.net.DatagramPacket; import java.net.InetAddress; import java.nio.ByteBuffer; import java.nio.IntBuffer; import java.nio.ByteOrder; import java.io.*; import java.util.regex.*; public class EV3_do_nothing_wifi { static final byteopNop= (byte)0x01; static final byteDIRECT_COMMAND_REPLY= (byte)0x00; static InputStream in; static OutputStream out; public static void connectWifi () throws IOException, SocketException { // listen for a UDP broadcast from the EV3 on port 3015 DatagramSocket listener = new DatagramSocket(3015); DatagramPacket packet_r = new DatagramPacket(new byte[67], 67); listener.receive(packet_r); // receive the broadcast message String broadcast_message = new String(packet_r.getData()); /* pick serial number, port, name and protocol from the broadcast message */ Pattern broadcast_pattern = Pattern.compile("Serial-Number: (\\w*)\\s\\n" + "Port:\\s(\\d{4,4})\\s\\n" + "Name:\\s(\\w+)\\s\\n" + "Protocol:\\s(\\w+)\\s\\n"); Matcher matcher = broadcast_pattern.matcher(broadcast_message); String serial_number, name, protocol; int port; if(matcher.matches()) { serial_number = matcher.group(1); port = Integer.valueOf(matcher.group(2)); name = matcher.group(3); protocol = matcher.group(4); } else { throw new IOException("Unexpected Broadcast message: " + broadcast_message); } InetAddress adr = packet_r.getAddress(); // connect the EV3 with its address and port listener.connect(adr, port); /* Send an UDP message back to the EV3 to make it accept a TCP/IP connection */ listener.send(new DatagramPacket(new byte[1], 1)); // close the UDP connection listener.close(); // Establish a TCP/IP connection with EV3s address and port Socket socket = new Socket(adr, port); in= socket.getInputStream(); out= socket.getOutputStream(); // Send an unlock message to the EV3 over TCP/IP String unlock_message = "GET /target?sn=" + serial_number + "VMTP1.0\n" + "Protocol: " + protocol; out.write(unlock_message.getBytes()); byte[] reply = new byte[16];// read reply in.read(reply); if (! (new String(reply)).startsWith("Accept:EV340")) { throw new IOException("No wifi connection established " + name); } } public static ByteBuffer sendDirectCmd (ByteBuffer operations, int local_mem, int global_mem) throws IOException { ByteBuffer buffer = ByteBuffer.allocateDirect(operations.position() + 7); buffer.order(ByteOrder.LITTLE_ENDIAN); buffer.putShort((short) (operations.position() + 5));// length buffer.putShort((short) 42);// counter buffer.put(DIRECT_COMMAND_REPLY);// type buffer.putShort((short) (local_mem*1024 + global_mem)); // header for (int i=0; i<operations.position(); i++) {// operations buffer.put(operations.get(i)); } byte[] cmd = new byte [buffer.position()]; for (int i=0; i<buffer.position(); i++) cmd[i] = buffer.get(i); out.write(cmd); printHex("Sent", buffer); byte[] reply = new byte[global_mem + 5]; in.read(reply); buffer = ByteBuffer.wrap(reply); buffer.position(reply.length); printHex("Recv", buffer); return buffer; } public static void printHex(String desc, ByteBuffer buffer) { System.out.print(desc + " 0x|"); for (int i= 0; i<buffer.position() - 1; i++) { System.out.printf("%02X:", buffer.get(i)); } System.out.printf("%02X|", buffer.get(buffer.position() - 1)); System.out.println(); } public static void main (String args[] ) { try { connectWifi(); ByteBuffer operations = ByteBuffer.allocateDirect(1); operations.put(opNop); ByteBuffer reply = sendDirectCmd(operations, 0, 0); } catch (Exception e) { e.printStackTrace(System.err); } } }
USB
通用序列匯流排是一個連線電子裝置的工業標準。你的 EV3 具有一個 2.0 Mini -B 插口(標有 PC 字樣)。這是效能最好的通訊協議,但它需要一條線。在你執行你的程式的電腦上,你需要與 LEGO EV3 通訊的許可權。在我的情況中,我需要新增如下的 udev 規則(我的作業系統是 Unix):
ATTRS{idVendor}=="0694",ATTRS{idProduct}=="0005",MODE="0666",GROUP=<group>
這通過它們的 vendor-id 0x0694 和 product-id 0x0005 標識 EV3 裝置。模式 0666 允許屬於<group>
的所有使用者具有讀和寫許可權(把<group>
修改為你所屬的那個) 。EV3-USB 裝置描述符顯示了一個具有一個介面和兩個端點的配置,0x01 用於給 EV3 傳送資料,0x81 用於從 EV3 接收資料。資料以 1024 位元組的包傳送和接收。
Python
也許你需要安裝pyusb
。對於我的系統來說這通過如下命令完成:
sudo pip3 install --pre pyusb
如果已經安裝了pyusb
,你需要完成如下的步驟:
- 把程式碼複製到名為EV3_do_nothing_usb.py 的檔案中。
- 開啟一個終端,並切換到你的程式的目錄。
-
鍵入
python3 EV3_do_nothing_usb.py
執行它。
#!/usr/bin/env python3 import usb.core import struct class EV3(): def __init__(self, host: str): self._device = usb.core.find(idVendor=ID_VENDOR_LEGO, idProduct=ID_PRODUCT_EV3) if self._device is None: raise RuntimeError("No Lego EV3 found") serial_number = usb.util.get_string(self._device, self._device.iSerialNumber) if serial_number.upper() != host.replace(':', '').upper(): raise ValueError('found ev3 but not ' + host) if self._device.is_kernel_driver_active(0) is True: self._device.detach_kernel_driver(0) self._device.set_configuration() self._device.read(EP_IN, 1024, 100) def __del__(self): pass def send_direct_cmd(self, ops: bytes, local_mem: int=0, global_mem: int=0)-> bytes: cmd = b''.join([ struct.pack('<h', len(ops) + 5), struct.pack('<h', 42), DIRECT_COMMAND_REPLY, struct.pack('<h', local_mem*1024 + global_mem), ops ]) self._device.write(EP_OUT, cmd, 100) print_hex('Sent', cmd) reply = self._device.read(EP_IN, 1024, 100)[0:5+global_mem] print_hex('Recv', reply) return reply def print_hex(desc: str, data: bytes)->None: print(desc + ' 0x|' + ':'.join('{:02X}'.format(byte) for byte in data) + '|') ID_VENDOR_LEGO = 0x0694 ID_PRODUCT_EV3 = 0x0005 EP_IN= 0x81 EP_OUT = 0x01 DIRECT_COMMAND_REPLY = b'\x00' opNop = b'\x01' my_ev3 = EV3('00:16:53:42:2B:99') ops_nothing = opNop my_ev3.send_direct_cmd(ops_nothing)
Java
我選擇的與 USB 裝置通訊的是 usb4java。下載了 Java 包之後,你可以把它們新增到你的 classpath 中。在我的 Unix 機器是,通過如下命令完成:
export CLASSPATH=$CLASSPATH:./usb4java-1.2.0.jar:./libusb4java-1.2.0-linux-x86_64.jar
然後,執行下面的步驟:
- 把下面的程式碼拷貝到名為EV3_do_nothing_usb.java 的檔案中。
- 開啟一個終端並切換到你的程式的目錄。
-
鍵入
javac EV3_do_nothing_usb.java
編譯它。 -
鍵入
java EV3_do_nothing_usb
執行它。
import org.usb4java.Device; import org.usb4java.DeviceDescriptor; import org.usb4java.DeviceHandle; import org.usb4java.DeviceList; import org.usb4java.LibUsb; import org.usb4java.LibUsbException; import java.nio.ByteBuffer; import java.nio.IntBuffer; import java.nio.ByteOrder; public class EV3_do_nothing_usb { static final short ID_VENDOR_LEGO = (short) 0x0694; static final short ID_PRODUCT_EV3 = (short) 0x0005; static final byteEP_IN= (byte)0x81; static final byteEP_OUT= (byte)0x01; static final byteopNop= (byte)0x01; static final byteDIRECT_COMMAND_REPLY= (byte)0x00; static DeviceHandle handle; public static void connectUsb () { int result = LibUsb.init(null); Device device = null; DeviceList list = new DeviceList(); result = LibUsb.getDeviceList(null, list); if (result < 0){ throw new RuntimeException("Unable to get device list. Result=" + result); } boolean found = false; for (Device dev: list) { DeviceDescriptor descriptor = new DeviceDescriptor(); result = LibUsb.getDeviceDescriptor(dev, descriptor); if (result != LibUsb.SUCCESS) { throw new LibUsbException("Unable to read device descriptor", result); } if (descriptor.idVendor()== ID_VENDOR_LEGO || descriptor.idProduct() == ID_PRODUCT_EV3) { device = dev; found = true; break; } } LibUsb.freeDeviceList(list, true); if (! found) throw new RuntimeException("Lego EV3 device not found."); handle = new DeviceHandle(); result = LibUsb.open(device, handle); if (result != LibUsb.SUCCESS) { throw new LibUsbException("Unable to open USB device", result); } boolean detach = LibUsb.kernelDriverActive(handle, 0) != 0; if (detach) result = LibUsb.detachKernelDriver(handle, 0); if (result != LibUsb.SUCCESS) { throw new LibUsbException("Unable to detach kernel driver", result); } result = LibUsb.claimInterface(handle, 0); if (result != LibUsb.SUCCESS) { throw new LibUsbException("Unable to claim interface", result); } } public static ByteBuffer sendDirectCmd (ByteBuffer operations, int local_mem, int global_mem) { ByteBuffer buffer = ByteBuffer.allocateDirect(operations.position() + 7); buffer.order(ByteOrder.LITTLE_ENDIAN); buffer.putShort((short) (operations.position() + 5));// length buffer.putShort((short) 42);// counter buffer.put(DIRECT_COMMAND_REPLY);// type buffer.putShort((short) (local_mem*1024 + global_mem)); // header for (int i=0; i < operations.position(); i++) {// operations buffer.put(operations.get(i)); } IntBuffer transferred = IntBuffer.allocate(1); int result = LibUsb.bulkTransfer(handle, EP_OUT, buffer, transferred, 100); if (result != LibUsb.SUCCESS) { throw new LibUsbException("Unable to write data", transferred.get(0)); } printHex("Sent", buffer); buffer = ByteBuffer.allocateDirect(1024); transferred = IntBuffer.allocate(1); result = LibUsb.bulkTransfer(handle, EP_IN, buffer, transferred, 100); if (result != LibUsb.SUCCESS) { throw new LibUsbException("Unable to read data", result); } buffer.position(global_mem + 5); printHex("Recv", buffer); return buffer; } public static void printHex(String desc, ByteBuffer buffer) { System.out.print(desc + " 0x|"); for (int i= 0; i < buffer.position() - 1; i++) { System.out.printf("%02X:", buffer.get(i)); } System.out.printf("%02X|", buffer.get(buffer.position() - 1)); System.out.println(); } public static void main (String args[] ) { try { connectUsb(); ByteBuffer operations = ByteBuffer.allocateDirect(1); operations.put(opNop); ByteBuffer reply = sendDirectCmd(operations, 0, 0); LibUsb.releaseInterface(handle, 0); LibUsb.close(handle); } catch (Exception e) { e.printStackTrace(System.err); } } }
應答訊息
如果你使用上面方案中的其中一個方案成功了,則會獲得以下輸出,即直接命令的回覆訊息:
---------------- \len \cnt \rs\ –--------------- 0x|03:00|2A:00|02| –--------------- \3\42\ok\ ----------------
前兩個位元組是眾所周知的,它是回覆的小尾端訊息長度。在我們情況中,恢復訊息是 3 位元組長的。接下來的兩個位元組是訊息計數器,也是廣為人知的,你傳送的訊息的指紋,是 42。
最後一個位元組是返回狀態,其有 2 個可能值:
-
DIRECT_REPLY
=0x|02|
:直接命令操作成功 -
DIRECT_REPLY_ERROR
=0x|04|
:直接命令以失敗結束
如果你真的收到了這條回覆資訊,那麼你就入門了。恭喜!
頭部的細節
上面我們跳過了頭部細節的描述。提到了它,頭部包含兩個數,它們定義了記憶體的大小。
第一個數是區域性記憶體的大小,其是你可以在其中儲存中間資訊的地址空間。第二個數描述了全域性記憶體的大小,其是輸出的地址空間。在DIRECT_COMMAND_REPLY
的情形中,全域性記憶體將作為迴應的一部分發回。
區域性記憶體具有最大 63 個位元組,全域性記憶體具有最大 1019 位元組。那意味著,區域性記憶體的大小需要 6 位,全域性記憶體的大小需要 10 位。所有的組合在一起可以包含在兩個位元組中,如果一個位元組共用的話。確實這樣做了。如果以相反的順序寫入頭位元組,這是熟悉的大尾端,並且以二進位制表示法寫半位元組組,則得到:0b LLLL LLGG GGGG GGGG
。開頭的 6 位是區域性記憶體大小,其範圍為 0-63。尾部的 10 位是全域性記憶體的大小,其範圍為 0-1020。小尾端下是:0b GGGG GGGG LLLL LLGG
。比如如果你的全域性記憶體具有 6 位元組大小,你的區域性記憶體需要 16 位元組大小,則你的頭部是0b 0000 0110 0100 0000
或以十六進位制表示是0x 06 40
。
這是描述性版本,現在是宣告式方式的第二種方法。如果local_mem
是本地記憶體的大小而global_mem
是全域性記憶體的大小,則計算:header
=local_mem
* 1024 +global_mem
。把頭部以小尾端 2 位元組整數寫入你將得到兩個頭部位元組。如果你還有疑問,請等待接下來的課程,你將看到大量的頭部並從例子中學習,這將有望解答你的疑問。
什麼都不做的變體
在離開我們的第一個例子並關閉第一章之前,我們將測試兩個頭部的變體。第一個嘗試是直接命令具有 6 位元組的全域性記憶體空間:
\len \cnt \ty\hd\op\ ------------------------- 0x|06:00|2A:00|00|06:00|01| ------------------------- \6\42\Re\0,6 \N \ \\\ \\o \ \\\ \\p \ -------------------------
我們期望獲得 6 位元組低值輸出的回覆。 因此,你必須將答案的長度從 5 增加到 11。如果這樣做,你將得到:
---------------------------------- \len \cnt \rs\Output\ –--------------------------------- 0x|09:00|2A:00|02|00:00:00:00:00:00| –--------------------------------- \9\42\ok\\ ----------------------------------
我們新增 16 位元組的區域性記憶體空間,並將直接命令更改為以下內容:
------------------------- \len \cnt \ty\hd\op\ ------------------------- 0x|06:00|2A:00|00|06:40|01| ------------------------- \6\42\Re\16,6 \N \ \\\ \\o \ \\\ \\p \ -------------------------
我們希望得到與上述相同的回覆,實際上:
---------------------------------- \len \cnt \rs\Output\ –--------------------------------- 0x|09:00|2A:00|02|00:00:00:00:00:00| –--------------------------------- \9\42\ok\\ ----------------------------------
你的家庭作業
在進行第 2 課之前,你應該完成如下的家庭作業:
- 把一個小程式轉換為你喜歡的程式語言並把它整合進你喜歡的開發環境。
-
準備一些工具,因為一遍又一遍的從頭開始可不是一件讓人愉快的事情。我想到了以下設計:
- EV3 是一個類。
- BLUETOOTH,USB,WIFI,STD,ASYNC,SYNC 和 opNop 是公共常量。
-
連線 EV3 是 EV3 物件初始化的一部分,即協議的選擇通過以特定的引數呼叫 EV3 物件的建構函式完成。EV3 物件需要記住它的協議的型別。
socket
和device
是 EV3 物件的 private 或 protected 變數。 -
傳送資料給 EV3 通過 EV3 的方法
send_direct_cmd
完成。你可以把示例的函式當作藍圖,但在內部你一定要區分協議。 -
為了從 EV3 接收資料,我們使用方法
wait_for_reply
。你必須把函式send_direct_cmd
的程式碼拆分為兩個新的方法send_direct_cmd
和wait_for_reply
。 -
新增一個屬性
verbosity
,它規定了已傳送的直接命令和收到的回覆的列印。 -
新增一個屬性
sync_mode
,它通過如下值規定了通訊的行為:-
SYNC
:總是使用型別DIRECT_COMMAND_REPLY
並等待響應。 -
ASYNC
:從不等待響應,設定DIRECT_COMMAND_NO_REPLY
,當不使用全域性記憶體時,其它情況設定DIRECT_COMMAND_REPLY
。 -
STD
:像ASYNC
那樣設定DIRECT_COMMAND_NO_REPLY
或DIRECT_COMMAND_REPLY
,但在DIRECT_COMMAND_REPLY
的情況下等待響應。
-
-
msg_cnt
是 EV3 物件的私有變數,每次呼叫send_direct_cmd
這個值都會增加。使用它來設定訊息計數器。 -
如例子中那樣,訊息長度,訊息計數器,訊息型別和頭部都是在
send_direct_cmd
內部自動新增的。因此send_direct_cmd
方法的引數ops
真正地持有操作而沒有其它東西。
-
做一些效能測試,並比較三種通訊協議(你將看到,USB 最快,藍芽最慢,Wifi 居於中間,但你可能會賭三個協議之間的絕對值和因素)。
-
以
DIRECT_COMMAND_REPLY
重複傳送opNop
並計算一個傳送接收迴圈的平均時間。 - 把連線的時間從傳送和接收的時間中分離出來。你將只連線一次,但傳送和接收迴圈的效能將限制你的應用程式。
-
以
結論
你開始編寫一個類 EV3,用於使用直接命令與 LEGO EV3 通訊。這個類允許自由地選擇通訊協議,並提供藍芽,USB 和 Wifi。我選擇的程式語言是 Python3。我使用 pydoc3 來展示我們的工程的實際狀態。我希望,你可以簡單地把它轉為你喜歡的語言。此刻,我們的類 EV3 具有如下的 API:
Help on module ev3: NAME ev3 - LEGO EV3 direct commands CLASSES builtins.object EV3 class EV3(builtins.object) |object to communicate with a LEGO EV3 using direct commands | |Methods defined here: | |__del__(self) |closes the connection to the LEGO EV3 | |__init__(self, protocol:str, host:str) |Establish a connection to a LEGO EV3 device | |Arguments: |protocol: 'Bluetooth', 'Usb' or 'Wifi' |host: mac-address of the LEGO EV3 (f.i. '00:16:53:42:2B:99') | |send_direct_cmd(self, ops:bytes, local_mem:int=0, global_mem:int=0) -> bytes |Send a direct command to the LEGO EV3 | |Arguments: |ops: holds netto data only (operations), the following fields are added: |length: 2 bytes, little endian |counter: 2 bytes, little endian |type: 1 byte, DIRECT_COMMAND_REPLY or DIRECT_COMMAND_NO_REPLY |header: 2 bytes, holds sizes of local and global memory | |Keyword Arguments: |local_mem: size of the local memory |global_mem: size of the global memory | |Returns: |sync_mode is STD: reply (if global_mem > 0) or message counter |sync_mode is ASYNC: message counter |sync_mode is SYNC: reply of the LEGO EV3 | |wait_for_reply(self, counter:bytes) -> bytes |Ask the LEGO EV3 for a reply and wait until it is received | |Arguments: |counter: is the message counter of the corresponding send_direct_cmd | |Returns: |reply to the direct command | |---------------------------------------------------------------------- |Data descriptors defined here: | |__dict__ |dictionary for instance variables (if defined) | |__weakref__ |list of weak references to the object (if defined) | |sync_mode |sync mode (standard, asynchronous, synchronous) | |STD:Use DIRECT_COMMAND_REPLY if global_mem > 0, |wait for reply if there is one. |ASYNC: Use DIRECT_COMMAND_REPLY if global_mem > 0, |never wait for reply (it's the task of the calling program). |SYNC:Always use DIRECT_COMMAND_REPLY and wait for reply. | |The general idea is: |ASYNC: Interruption or EV3 device queues direct commands, |control directly comes back. |SYNC:EV3 device is blocked until direct command is finished, |control comes back, when direct command is finished. |STD:NO_REPLY like ASYNC with interruption or EV3 queuing, |REPLY like SYNC, synchronicity of program and EV3 device. | |verbosity |level of verbosity (prints on stdout). DATA BLUETOOTH = 'Bluetooth' USB = 'Usb' WIFI = 'Wifi' STD = 'STD' ASYNC = 'ASYSNC' SYNC = 'SYNC' opNop = b'\x01'
我的類 EV3 是模組ev3
的一部分,檔名是ev3.py
。我使用如下的程式來測試我的類 EV3:
#!/usr/bin/env python3 import ev3 my_ev3 = ev3.EV3(protocol=ev3.BLUETOOTH, host='00:16:53:42:2B:99') my_ev3.verbosity = 1 ops = ev3.opNop print("*** SYNC ***") my_ev3.sync_mode = ev3.SYNC my_ev3.send_direct_cmd(ops) print("*** ASYNC (no reply) ***") my_ev3.sync_mode = ev3.ASYNC my_ev3.send_direct_cmd(ops) print("*** ASYNC (reply) ***") counter_1 = my_ev3.send_direct_cmd(ops, global_mem=1) counter_2 = my_ev3.send_direct_cmd(ops, global_mem=1) my_ev3.wait_for_reply(counter_1) my_ev3.wait_for_reply(counter_2) print("*** STD (no reply) ***") my_ev3.sync_mode = ev3.STD my_ev3.send_direct_cmd(ops) print("*** STD (reply) ***") my_ev3.send_direct_cmd(ops, global_mem=5) my_ev3.send_direct_cmd(ops, global_mem=5)
得到輸出:
*** SYNC *** 15:15:05.084275 Sent 0x|06:00|2A:00|00|00:00|01| 15:15:05.168023 Recv 0x|03:00|2A:00|02| *** ASYNC (no reply) *** 15:15:05.168548 Sent 0x|06:00|2B:00|80|00:00|01| *** ASYNC (reply) *** 15:15:05.168976 Sent 0x|06:00|2C:00|00|01:00|01| 15:15:05.169315 Sent 0x|06:00|2D:00|00|01:00|01| 15:15:05.212077 Recv 0x|04:00|2C:00|02|00| 15:15:05.212708 Recv 0x|04:00|2D:00|02|00| *** STD (no reply) *** 15:15:05.213034 Sent 0x|06:00|2E:00|80|00:00|01| *** STD (reply) *** 15:15:05.213411 Sent 0x|06:00|2F:00|00|05:00|01| 15:15:05.254032 Recv 0x|08:00|2F:00|02|00:00:00:00:00| 15:15:05.254633 Sent 0x|06:00|30:00|00|05:00|01| 15:15:05.313027 Recv 0x|08:00|30:00|02|00:00:00:00:00|
一些備註:
-
sync_mode
=SYNC
設定type
=DIRECT_COMMAND_REPLY
並自動地等待響應,ok。 -
sync_mode
=ASYNC
設定type
=DIRECT_COMMAND_NO_REPLY
,不等待,ok。 -
全域性記憶體設定
type
=DIRECT_COMMAND_REPLY
時sync_mode
=ASYNC
,不等待。顯式地呼叫wait_for_reply
方法獲得響應。 - 請特別尊重此變體。我們傳送兩個直接命令,它們都被執行,且 EV3 裝置儲存響應。
- 當我們稍後詢問響應時,我們首先讀取第一條命令的響應。它似乎就像 EV3 是一個 FIFO(先進先出)。
- 但它也可以並行執行,但它也可以按照完成執行的順序執行並行和重複。我們將回到這一點。
-
模式
ASYNC
需要一些規律。如果你忘記了讀取響應,當你等待另一件事時,它會來。 我們使用訊息計數器來揭示這種情況! - 請小心 USB 協議!協議USB請小心! 如果像我一樣直接傳送非同步命令,這可能會太快。
-
sync_mode
=STD
,沒有全域性記憶體設定type
=DIRECT_COMMAND_NO_REPLY
,並且不等待回覆,ok。 -
sync_mode
=STD
,全域性記憶體設定type
=DIRECT_COMMAND_REPLY
,每個直接命令等待響應,ok。 - 訊息計數器隨直接命令遞增,ok。
- 頭部正確儲存全域性記憶體的大小,ok。
如果你完成了作業,那麼你已經為新的冒險做好了充分的準備。 我希望在下一課中再次見到你。