Build REST API using Python and Flask – Part4

We are in part 5 of tutorial series of Building Raspberry Pi Line Follower Robot! If you want to jump back and forth or missed any of my previous tutorials – here is the consolidated list.

All Tutorials to Build IOT Raspberry Pi Robot

  1. Build Raspberry Pi Robot controlled by Internet – IOT Robot
  2. Setup Static IP Address on Raspberry Pi
  3. Installing Web Server on Raspberry Pi
  4. Controlling Raspberry Pi LED via internet
  5. Raspberry Pi LED controlled by Internet
  6. Build REST API using Python and Flask
  7. Raspberry Pi Robot Logitech Camera Server
  8. Create Web Dashboard for IOT Robot Control

Entire project Video Tutorial


 

Now, we have basic IOT LED ready but our main goal is to control Robot that we have built using Raspberry Pi. Let’s get started!

REST (REpresentational State Transfer) has developed as the standard engineering outline for Web APIs. In this article I will demonstrate to you to make a RESTful web api utilizing Python and the Flask microframework.

What is REST?

The characteristics of a REST system are defined by six design rules:

  • Client-Server: There should be a separation between the server that offers a service, and the client that consumes it.
  • Stateless: Each request from a client must contain all the information required by the server to carry out the request. In other words, the server cannot store information provided by the client in one request and use it in another request.
  • Cacheable: The server must indicate to the client if requests can be cached or not.
  • Layered System: Communication between a client and a server should be standardized in such a way that allows intermediaries to respond to requests instead of the end server, without the client having to do anything different.
  • Uniform Interface: The method of communication between a client and a server must be uniform.
  • Code on demand: Servers can provide executable code or scripts for clients to execute in their context. This constraint is the only one that is optional.

Taking Employee as an example – The HTTP request methods are typically designed to affect a given resource in standard ways:

HTTP Method Action Examples
GET Obtain information about a resource http://example.com/api/employee
(retrieve employees list)
GET Obtain information about a resource http://example.com/api/employee/123
(retrieve employee #123)
POST Create a new resource http://example.com/api/employee
(create a new employee, from data provided with the request)
PUT Update a resource http://example.com/api/employee/123
(update employee#123, from data provided with the request)
DELETE Delete a resource http://example.com/api/employee/123
(delete employee #123)

Flask Microframework

Flask is a simple, yet very powerful Python web framework. Hence, we can use it for creating RESTful Web APIs.

Install Flask on Pi

sudo apt-get install flask

Start with very simple application and testing Flask framework:

from flask import Flask

app = Flask(__name__)

@app.route('/')
def index():
    return "Hello World!"

if __name__ == '__main__':
    app.run(debug=True)

Save the file as firstflask.py. Now to run the application just type sudo python firstflask.py in terminal.

This will run the application on port 5000 by default and when you type the URL: http://192.168.1.100:5000 you will see the output “Hello World!” in browser”

**Note: I used http://192.168.1.100 as I have configured static IP in Pi. You can change in your case as per your configuration.

Normally, we want to return JSON object. Hence, modifying our code to return json.

from flask import Flask, jsonify

app = Flask(__name__)

employees = [
    {
        'id': 1,
        'name': 'Vishal'
    },
    {
        'id': 2,
        'title': 'Amisha'
    }
]

@app.route('/todo/api/v1.0/tasks', methods=['GET'])
def get_tasks():
    return jsonify({'tasks': tasks})

@app.route('/')
def index():
    return "Hello World!"

@app.route('/api/employees', methods=['GET'])
def get_employees():
    return jsonify({'employees': employees})


if __name__ == '__main__':
    app.run(debug=True)

Now, if you type URL: http://192.168.1.100:5000/api/employees you will get all the users in json format.

{
  "employees": [
    {
      "id": 1, 
      "name": "Vishal"
    }, 
    {
      "id": 2, 
      "title": "Amisha"
    }
  ]
}

It is so simple to create Web Api that returns the JSON document. We also want to add basic security now to our code so that we can fully secure it. We are going to use Basic Authentication for securing Web Api’s.

Securing Web Service

First we will have to install Flask-HttpAuth. We do it by executing below lines in terminal:

pip install Flask-HTTPAuth

Very basic authentication example.

from flask import Flask
from flask_httpauth import HTTPBasicAuth

app = Flask(__name__)
auth = HTTPBasicAuth()

users = {
    "john": "hello",
    "susan": "bye"
}

@auth.get_password
def get_pw(username):
    if username in users:
        return users.get(username)
    return None

@app.route('/')
@auth.login_required
def index():
    return "Hello, %s!" % auth.username()

if __name__ == '__main__':
    app.run()

Code Explanation:

  • We import namespace from flask_httpauth import HTTPBasicAuth 
  • We define auth variable as auth = HTTPBasicAuth() 
  • Next we define object of users. We have username and password combination. We are going to eventually store it in secure storage.
  • We decorate the method as @auth.get_password. So this method will be called when we want to get password for a particular user. We pass username here. It is done by below function:
    @auth.get_password
    def get_pw(username):
        if username in users:
            return users.get(username)
        return None
    
  • We just need to decorate the methods that require the authentication as @auth.login_required
  • You might have also noticed routes which are decorated as @app.route(‘/’). The requested will be routed to this methods.

That’s all the basics. Now below code will be self explanatory. I have merged all above code with needed calls to control robot. Below APIs does the job:

API Control Method
/api/login Login POST
/api/forward Forward GET
/api/lighton Light On on PORT 37 GET
/api/lightoff Light Off on PORT 37 GET
/api/reverse Reverse GET
/api/left Left GET
/api/right Right GET
/api/brake Brake GET
/api/frontsensor FrontSensor Reading GET
/api/detailsensorreadings Detail Sensor Reading GET

I have modified code to run on port 9000 by this line:

app.run(debug=True, host='0.0.0.0', port=9000)

Here, I have also used a code to allow cross origin requests by this code. We just need to decorate the method.

@cross_origin()

Here is my entire code:

import RPi.GPIO as GPIO  # calling for header file which helps in using GPIOs of PI
import time
from flask import Flask, jsonify
from flask_cors import CORS, cross_origin
from flask_httpauth import HTTPBasicAuth

app = Flask(__name__)
cors = CORS(app)
auth = HTTPBasicAuth()
users = {
    "vishal": "Password",
    "amisha": "Password",
    "neeta": "Password",
    "nishita": "Password",
}
employees = [
    {
        'id': 1,
        'name': 'Vishal'
    },
    {
        'id': 2,
        'title': 'Amisha'
    }
]

app.config['CORS_HEADERS'] = 'Content-Type'

TRIG = 18
ECHO = 22
servo_pin = 40
duty_cycle = 7.5

GPIO.setmode(GPIO.BOARD)
GPIO.setup(TRIG, GPIO.OUT)
GPIO.setup(ECHO, GPIO.IN)
GPIO.setup(servo_pin, GPIO.OUT)
GPIO.setup(11, GPIO.OUT)
GPIO.setup(13, GPIO.OUT)
GPIO.setup(15, GPIO.OUT)
GPIO.setup(16, GPIO.OUT)
GPIO.setup(37, GPIO.OUT)
GPIO.setwarnings(False)

@auth.get_password
def GetPassword(username):
    if(username in users):
        return  users.get(username)
    return None

def ResetServoToFront():
    GPIO.output(TRIG, False)
    pwm_servo = GPIO.PWM(servo_pin, 50)
    pwm_servo.start(7.5)
    pwm_servo.ChangeDutyCycle(7.5)
    time.sleep(0.1)

def ReadSensorReading():
    GPIO.output(TRIG, True)
    time.sleep(0.00001)
    GPIO.output(TRIG, False)

    while GPIO.input(ECHO) == 0:
        pulse_start = time.time()

    while GPIO.input(ECHO) == 1:
        pulse_end = time.time()

    pulse_duration = pulse_end - pulse_start
    distance = pulse_duration * 17150
    distance = round(distance, 2)
    return distance

def GetSensorReadingsWithServo():
    GPIO.output(TRIG, False)
    pwm_servo = GPIO.PWM(servo_pin, 50)
    pwm_servo.start(duty_cycle)
    pwm_servo.ChangeDutyCycle(7.5)
    time.sleep(0.1)

    pwm_servo.ChangeDutyCycle(13)
    time.sleep(0.1)
    distincmsleft = ReadSensorReading()
    print "distincmsleft " + str(distincmsleft)

    time.sleep(0.1)
    pwm_servo.ChangeDutyCycle(10)
    time.sleep(0.1)
    distincmsleft1 = ReadSensorReading()
    print "distincmsleft1 " + str(distincmsleft1)

    time.sleep(0.1)
    pwm_servo.ChangeDutyCycle(7.5)
    time.sleep(0.1)
    distincmsstraight = ReadSensorReading()
    print "distincmsstraight " + str(distincmsstraight)

    time.sleep(0.1)
    pwm_servo.ChangeDutyCycle(5)
    time.sleep(0.1)
    distincmsright1 = ReadSensorReading()
    print "distincmsright1 " + str(distincmsright1)

    time.sleep(0.1)
    pwm_servo.ChangeDutyCycle(3)
    time.sleep(0.1)
    distincmsright = ReadSensorReading()
    print "distincmsright " + str(distincmsright)
    time.sleep(0.1)

    pwm_servo.start(duty_cycle)
    time.sleep(0.1)

    return distincmsleft, distincmsleft1, distincmsstraight, distincmsright1, distincmsright

def Forward():
    GPIO.output(11, 1)
    GPIO.output(13, 0)
    GPIO.output(15, 1)
    GPIO.output(16, 0)
    ResetServoToFront()

def LightOn():
    GPIO.output(37, 1)

def LightOff():
    GPIO.output(37, 0)

def Reverse():
    GPIO.output(11, 0)
    GPIO.output(13, 1)
    GPIO.output(15, 0)
    GPIO.output(16, 1)
    ResetServoToFront()

def Right():
    GPIO.output(11, 1)
    GPIO.output(13, 0)
    GPIO.output(15, 0)
    GPIO.output(16, 1)
    ResetServoToFront()

def Left():
    GPIO.output(11, 0)
    GPIO.output(13, 1)
    GPIO.output(15, 1)
    GPIO.output(16, 0)
    ResetServoToFront()

def Brake():
    GPIO.output(11, 1)
    GPIO.output(13, 1)
    GPIO.output(15, 1)
    GPIO.output(16, 1)
    ResetServoToFront()

@app.route('/api/login', methods=['POST'])
@cross_origin()
@auth.login_required
def index():
    return jsonify({'Login':'Success'})

@app.route('/api/employees', methods=['GET'])
def get_users():
    return jsonify({'employees': employees})

@app.route('/api/forward', methods=['GET'])
@cross_origin()
@auth.login_required
def forward():
    try:
        Forward()
        return jsonify({'forward': "1"})
    except:
        return jsonify({'forward': "0"})

@app.route('/api/lighton', methods=['GET'])
@cross_origin()
@auth.login_required
def lighton():
    try:
        LightOn()
        return jsonify({'light': "1"})
    except:
        return jsonify({'light': "-1"})


@app.route('/api/lightoff', methods=['GET'])
@cross_origin()
@auth.login_required
def lightoff():
    try:
        LightOff()
        return jsonify({'light': "0"})
    except:
        return jsonify({'light': "-1"})

@app.route('/api/reverse', methods=['GET'])
@cross_origin()
@auth.login_required
def reverse():
    try:
        Reverse()
        return jsonify({'reverse': "1"})
    except:
        return jsonify({'reverse': "0"})

@app.route('/api/left', methods=['GET'])
@cross_origin()
@auth.login_required
def left():
    try:
        for x in range(1, 3, 1):
            Left()
            time.sleep(0.1)
        Brake()
        return jsonify({'left': "1"})
    except:
        return jsonify({'left': "0"})

@app.route('/api/right', methods=['GET'])
@cross_origin()
@auth.login_required
def right():
    try:
        for x in range(1, 3, 1):
            Right()
            time.sleep(0.1)
        Brake()
        return jsonify({'right': "1"})
    except:
        return jsonify({'right': "0"})

@app.route('/api/brake', methods=['GET'])
@cross_origin()
@auth.login_required
def brake():
    try:
        Brake()
        return jsonify({'brake': "1"})
    except:
        return jsonify({'brake': "0"})

@app.route('/api/frontsensor', methods=['GET'])
@cross_origin()
@auth.login_required
def frontsensor():
    try:
        return jsonify({'reading': ReadSensorReading()})
    except:
        return jsonify({'reading': "-1"})

@app.route('/api/detailsensorreadings', methods=['GET'])
@cross_origin()
@auth.login_required
def detailsensorreadings():
    try:
        return jsonify({'detailsensorreadings': GetSensorReadingsWithServo()})
    except:
        return jsonify({'detailsensorreadings': "-1"})

if __name__ == '__main__':
    try:
        app.run(debug=True, host='0.0.0.0', port=9000)
    finally:
        print("Cleaning Up!")
        GPIO.cleanup()
Please follow and like us:

Leave a Reply

Your email address will not be published. Required fields are marked *