Deploying a Sequencing Pipeline¶
Introduction¶
Thus far, we have demonstrated several useful features of actors:
Deployment, execution, and handling using Tapis CLI
Logging to 3rd party services (Slack in our example)
Inter-Actor messaging
In this section of the tutorial, we will demonstrate this toolbox in a realistic example: a basic pipeline for validating DNA sequencing data.
We will leverage a few additional Actor capabilities beyond what we have covered previously:
- Actors can interact with TACC filesystems such as
/work
using the Tapis API We won’t cover this in depth today, but if you’re interested in Tapis Files, please consult the Tapis documentation
- Actors can interact with TACC filesystems such as
Actors can submit HPC jobs, by submitting to Tapis Applications
Reusing as much of our previous work as possible, pipeline deployment will consist of the following steps:
Deploy a Tapis App
eho-fastqc-0.11.9
that runs the FastQC tool as HPC jobDeploy new actor
fastqc-launcher
Change
upstream-messenger
so it uploads a fastq data file from the web to TACC Stockyard filesystemRe-deploy
upstream-messenger
so it sends the path to data file to thefastqc-launcher
Since deploying Tapis Apps is outside the scope of this tutorial, I have already completed the first step. If you are interested in learning how to deploy Tapis Applications, I encourage you to consult one of our in-depth Tapis tutorials .
Deploy a New Actor fastqc-launcher
¶
Here, we will create an Actor that launches an submits an HPC job to a Tapis App. Recall that Tapis Apps are wrappers around computationally intensive processes. They are similar to Actors in that they can be managed using the Tapis API, but unlike Actors, they are HPC jobs under the hood.
Similar to the upstream-messenger
, this new Actor (which we will call fastqc-launcher
) leverages the active Tapis client
to interact with the Tapis ecosystem. In this case, we will submit to a
Tapis Application instead of messaging another Actor. We instantiate a new
Actor as before:
tapis actors init --template default --actor-name fastqc-launcher
cd fastqc_launcher
echo '{}' > secrets.json
We edit the Actor source code in default.py
so it resembles:
import os
from agavepy.actors import get_context, get_client
def main():
context = get_context()
fastq_uri = context['raw_message']
print("Actor received message: {}".format(fastq_uri))
# Usually, one would perform some input validation before submitting
# a job to a Tapis App. Here, we simply validate that the path looks
# like a Tapis/Agave URI
assert fastq_uri.startswith('agave://')
# Get an active Tapis client
client = get_client()
# Using our Tapis client, submit a job to Tapis App eho-fastqc-0.11.9
body = {
"name": "fastqc-test",
"appId": "eho-fastqc-0.11.9",
"archive": False,
"inputs": {
"fastq": "agave://eho.work.storage/{}".format(os.path.basename(fastq_uri))
}
}
response = client.jobs.submit(body=body)
print("Successfully submitted job {} to Tapis App {}".format(response['id'], response['appId']))
if __name__ == '__main__':
main()
We can deploy this new Actor as usual, by building, pushing, and registering the custom Docker image as a new Actor:
$ docker build -t taccuser/fastqc-launcher:0.0.1 .
$ docker push taccuser/fastqc-launcher:0.0.1
$ tapis actors create --repo taccuser/fastqc-launcher:0.0.1 \
-n fastqc-launcher \
-d "Submits job to FastQC Tapis App"
Edit upstream-messenger
Source¶
Using your favorite text editor, edit the default.py
for upstream-messenger
so it looks like:
import os
from agavepy.actors import get_context, get_client
import requests
def main():
"""Main entrypoint"""
context = get_context()
m = context['raw_message']
print("Actor received message: {}".format(m))
# Get an active Tapis client
client = get_client()
# Pull in the downstream Actor ID from the environment
downstream_actor_id = context['DOWNSTREAM_ACTOR_ID']
# alternatively:
# downstream_actor_id = os.environ['DOWNSTREAM_ACTOR_ID']
# Using our Tapis client,
# upload our fastq file to TACC Stockyard using
url = "https://raw.githubusercontent.com/eho-tacc/fastqc_app/main/tests/data_R1_001.fastq"
systemId = 'eho.work.storage'
files_resp = client.files.importData(
fileName='example.fastq',
filePath='/',
systemId=systemId, urlToIngest=url)
# Using our Tapis client, send the message containing file path
# to the downstream Actor
message = "agave://{}/{}".format(systemId, files_resp['path'])
print("Sending message '{}' to {}".format(message, downstream_actor_id))
response = client.actors.sendMessage(actorId=downstream_actor_id, body={"message": message})
print("Successfully triggered execution '{}' on actor '{}'".format(response['executionId'], downstream_actor_id))
if __name__ == '__main__':
main()
Re-deploy Actor upstream-messenger
¶
Our Actor upstream-messenger
is still configured to send messages to hello-world-actor
.
We would instead like it to send messages to our new actor fastqc-launcher
, so we must
update it with a new DOWNSTREAM_ACTOR_ID
. Instead of deleting and deploying a new
Actor, we can instead:
Build and push an updated Docker image
Update the
DOWNSTREAM_ACTOR_ID
variable usingtapis actors update
$ docker build -t enho/upstream-messenger:0.0.2 .
$ docker push enho/upstream-messenger:0.0.2
$ tapis actors update --repo taccuser/upstream-messenger:0.0.2 \
-e DOWNSTREAM_ACTOR_ID=$FASTQC_LAUNCHER_ID \
MDfoobar7AOwx
+----------------+-----------------------------------+
| Field | Value |
+----------------+-----------------------------------+
| id | MDfoobar7AOwx |
| name | upstream-messenger |
| owner | taccuser |
| image | taccuser/upstream-messenger:0.0.2 |
| lastUpdateTime | 2021-08-26T20:33:20.320620 |
| status | SUBMITTED |
| cronOn | False |
+----------------+-----------------------------------+
Test the Pipeline¶
Send Message to upstream-messenger
Using CLI¶
Once the upsteam_messenger
Actor is READY, we can trigger a new
execution by sending it a message:
$ tapis actors submit -m 'hello, FastQC pipeline!' MDfoobar7AOwx
+-------------+----------------------------+
| Field | Value |
+-------------+----------------------------+
| executionId | MDanexec7AOwx |
| msg | hello, FastQC pipeline! |
+-------------+----------------------------+
As usual, we check the status of the execution, and show the logs when it finishes:
$ tapis actors execs show MDfoobar7AOwx MDanexec7AOwx
+-----------+-----------------------------+
| Field | Value |
+-----------+-----------------------------+
| actorId | MDfoobar7AOwx |
| apiServer | https://api.tacc.utexas.edu |
| id | MDanexec7AOwx |
| status | COMPLETE |
| workerId | wZvworker1KmQ |
+-----------+-----------------------------+
$ tapis actors execs logs MDfoobar7AOwx MDanexec7AOwx
Actor received message: hello, FastQC pipeline!
Sending message 'greetings, hello-world-actor!' to MqqbarbazBB8x
Successfully triggered execution '5P7foobarrrA6' on actor 'MqqbarbazBB8x'
Check File Upload¶
Using the Tapis CLI, we can check that the upstream-messenger
created the expected file:
$ tapis files show agave://eho.work.storage/example.fastq
+--------------+-------------------------------+
| Field | Value |
+--------------+-------------------------------+
| name | example.fastq |
| path | /work/06634/eho/example.fastq |
| lastModified | 20 seconds ago |
| length | 64431 |
| permissions | READ_WRITE |
| mimeType | application/octet-stream |
| type | file |
+--------------+-------------------------------+
Check Execution of Downstream fastqc-launcher
¶
Let’s check the status of the execution and inspect the logs:
$ tapis actors execs logs MqqbarbazBB8x wKoAJD5NykAKN
Logs for execution wKoAJD5NykAKN
Actor received message: agave://eho.work.storage/example.fastq
Successfully submitted job 6c9d5842-XXXX-XXXX-XXXX-f07b1f73b948-007 to Tapis App eho-fastqc-0.11.9
Check Job Submitted to FastQC Tapis App¶
Finally, let’s check that our test job using the FastQC Tapis App successfully finished. We can check the status of this job using the CLI:
$ tapis jobs show 6c9d5842-XXXX-XXXX-XXXX-f07b1f73b948-007
+--------------------+---------------------------------------------------------------------------------+
| Field | Value |
+--------------------+---------------------------------------------------------------------------------+
| accepted | 2021-09-22T21:39:06.147Z |
| appId | eho-fastqc-0.11.9 |
| appUuid | 4765625137153839596-XXXXXXXX-0001-005 |
| archive | False |
| archivePath | eho/archive/jobs/job-XXXXXXXX-XXXX-XXXX-afbf-f07b1f73b948-007 |
| archiveSystem | None |
| blockedCount | 0 |
| created | 2021-09-22T21:39:06.152Z |
| ended | 4 minutes ago |
| failedStatusChecks | 0 |
| id | 6c9d5842-3066-XXXX-XXXX-f07b1f73b948-007 |
| lastStatusCheck | 4 minutes ago |
| lastStatusMessage | Transitioning from status CLEANING_UP to FINISHED in phase ARCHIVING. |
| lastUpdated | 2021-09-22T21:40:52.194Z |
| maxHours | 0.5 |
| memoryPerNode | 1.0 |
| name | fastqc-test |
| nodeCount | 1 |
| owner | eho |
| processorsPerNode | 1 |
| remoteEnded | 4 minutes ago |
| remoteJobId | 8493758 |
| remoteOutcome | FINISHED |
| remoteQueue | normal |
| remoteStarted | 2021-09-22T21:39:42.702Z |
| remoteStatusChecks | 2 |
| remoteSubmitted | 5 minutes ago |
| schedulerJobId | None |
| status | FINISHED |
| submitRetries | 0 |
| systemId | eho.stampede2.execution |
| tenantId | tacc.prod |
| tenantQueue | aloe.jobq.tacc.prod.submit.DefaultQueue |
| visible | True |
| workPath | /scratch/06634/eho/eho/job-XXXXXXXX-XXXX-XXXX-XXXX-f07b1f73b948-007-fastqc-test |
+--------------------+---------------------------------------------------------------------------------+
We can also download and inspect the job outputs:
$ tapis jobs outputs download 6c9d5842-XXXX-XXXX-XXXX-f07b1f73b948-007
+-------------+-------+
| Field | Value |
+-------------+-------+
| downloaded | 11 |
| skipped | 0 |
| messages | 5 |
| elapsed_sec | 59 |
+-------------+-------+
$ cd 6c9d5842-3066-XXXX-XXXX-f07b1f73b948-007
$ cat ./*.err
# ...
Started analysis of example.fastq
$ cat ./*.out
Analysis complete for example.fastq