Part 1: Create and Sync Embeddings (AWS Bedrock with Cohere and Postgres with pgvector)

We will start by deploying two Pulsar Functions (CreateEmbedding1 and PGUpsert1, shown in green) to create embeddings and upsert them into our Postgres database.

The video to the right gives a high-level overview of deploying and testing the Pulsar Functions.

Deploy CreateEmbedding1

Code for this example can be found in the CreateCohereEmbedding class in streamnativerag1.

To deploy:

  1. In a terminal, navigate to the folder containing streamnativerag1.zip.
  2. Run the following pulsarctl command. Replace the tenant (summitstudent1) with the tenant where you want to deploy the Pulsar Function; it appears in three places in the command (--inputs, --output, and --tenant).
pulsarctl functions create \
  --classname streamnativerag1.CreateCohereEmbedding \
  --py ./streamnativerag1.zip \
  --inputs summitstudent1/developer/topicA \
  --output summitstudent1/developer/embedding1 \
  --tenant summitstudent1 \
  --namespace developer \
  --name CreateEmbedding1 \
  --secrets '{"BEDROCKSECRET1": {"path": "bedrocksecret", "key": "accesskey"}, "BEDROCKSECRET2": {"path": "bedrocksecret", "key": "secretaccesskey"}}'

If the Pulsar Function starts deploying, you should see:

Created CreateEmbedding1 successfully

It may take a minute or two for CreateEmbedding1 to deploy. Once it is fully deployed, the Functions page for your tenant and namespace should show CreateEmbedding1 with a Status of Running. If you see any System Exceptions, refer to Troubleshooting Pulsar Functions.

The Pulsar Function calls AWS Bedrock to create a Cohere embedding of type search_document. We will create a search_query embedding in a later example.
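The Bedrock call itself is not shown in this section. As a rough sketch only, a Cohere embedding request on Bedrock can be made through boto3's bedrock-runtime client. The model ID cohere.embed-english-v3 is an assumption (it produces 1024-dimension vectors, matching the VECTOR(1024) column used later), and the helper names are hypothetical; the real CreateCohereEmbedding class may differ. The client is passed in as a parameter so the sketch can be exercised without AWS credentials.

```python
import json

# Assumption: Cohere Embed English v3 on Bedrock, which returns 1024-dimension vectors.
MODEL_ID = "cohere.embed-english-v3"


def build_embed_body(text, input_type="search_document"):
    """Build the JSON request body for a Cohere embed call on Bedrock."""
    # "search_document" for text being stored, "search_query" for search queries
    return json.dumps({"texts": [text], "input_type": input_type})


def create_embedding(client, text, input_type="search_document"):
    """Return the embedding of one text as a list of floats.

    `client` is expected to be a boto3 "bedrock-runtime" client (hypothetical wiring).
    """
    response = client.invoke_model(
        modelId=MODEL_ID,
        body=build_embed_body(text, input_type),
        contentType="application/json",
        accept="application/json",
    )
    # The response body is a stream of JSON containing an "embeddings" list
    payload = json.loads(response["body"].read())
    return payload["embeddings"][0]
```

Inside a Pulsar Function, the client could be created with boto3.client("bedrock-runtime", region_name=..., aws_access_key_id=context.get_secret("BEDROCKSECRET1"), aws_secret_access_key=context.get_secret("BEDROCKSECRET2")); the region is not stated in this section, so choose the one where your Bedrock model access is enabled.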

Test CreateEmbedding1

To test the Python Pulsar Function, produce a string message to the topic specified in the --inputs flag of the pulsarctl functions create command. In the example above, this is summitstudent1/developer/topicA. The easiest way to do this is with the REST API; a walkthrough is available here.

We start by publishing a message describing an animal. Once we’ve confirmed CreateEmbedding1 and PGUpsert1 are working, we can create embeddings about different animals.

curl -X POST https://<SERVER ENDPOINT>/admin/rest/topics/v1/persistent/summitstudent1/developer/topicA/message \
  --header 'Authorization: Bearer <JWT TOKEN>' \
  --header 'Accept: application/json' \
  --header 'Content-Type: application/octet-stream' \
  --data-binary 'Cats can be many different colors.'
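If you prefer Python to curl, the same request can be built with the standard library's urllib. This is a minimal sketch that mirrors the curl command above; the helper name build_publish_request is hypothetical, and <SERVER ENDPOINT> and <JWT TOKEN> remain placeholders you must fill in.

```python
import urllib.request


def build_publish_request(server, token, tenant, namespace, topic, text):
    """Build the same POST request as the curl example, without sending it."""
    url = (
        f"https://{server}/admin/rest/topics/v1/persistent/"
        f"{tenant}/{namespace}/{topic}/message"
    )
    return urllib.request.Request(
        url,
        data=text.encode("utf-8"),  # raw message bytes, like --data-binary
        headers={
            "Authorization": f"Bearer {token}",
            "Accept": "application/json",
            "Content-Type": "application/octet-stream",
        },
        method="POST",
    )
```

To actually publish, pass the request to urllib.request.urlopen(req) and check the response status.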

If the Pulsar Function was triggered, you should see 1 in the Messages column for CreateEmbedding1.

If you see any System Exceptions or User Exceptions, refer to Troubleshooting Pulsar Functions.

We will use the UI to check the results in the output topic. Navigate to the correct tenant and namespace where you deployed the Pulsar Function. Select the embedding1 topic.

Before trying to peek at the messages, make sure the Storage Size is greater than zero bytes. Messages written to a topic without a subscription are eligible for deletion by default, but they are not normally deleted right away. If the storage size is still zero right after triggering the Pulsar Function, the function may not have written its results to the output topic. See Troubleshooting Pulsar Functions for more information.

To peek at the messages, open the Details tab and create a subscription at the bottom of the page. In this example, the subscription is called mysubscription, and there is one message in the backlog.

To view the message, open the Messages tab, select the partition embedding1-partition-0 and the subscription mysubscription, and choose how many messages to peek. You should see the message in JSON format with primary_key, original_text, and vector fields.
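The peeked payload is long, so if you copy it out of the UI, a small helper can summarize it. This is a sketch under assumptions: the payload is the JSON object described above, with vector stored as a JSON array of numbers, and the function name is hypothetical.

```python
import json


def summarize_embedding_message(raw):
    """Parse a peeked embedding1 message and return a short summary dict."""
    data = json.loads(raw)
    vector = data["vector"]
    return {
        "primary_key": data["primary_key"],
        "original_text": data["original_text"],
        "dimension": len(vector),  # expect 1024 for the myitems table
        "preview": vector[:3],     # first few values only
    }
```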

In the next section we will upsert this vector to AWS RDS Postgres.

Deploy PGUpsert1

Code for this example can be found in the UpsertEmbedding class in streamnativerag1.

To deploy:

  1. In a terminal, navigate to the folder containing streamnativerag1.zip.
  2. Run the following pulsarctl command. Replace the tenant (summitstudent1) with the tenant where you want to deploy the Pulsar Function; it appears in three places in the command (--inputs, --output, and --tenant). Set postgres_endpoint in --user-config to your own Postgres endpoint. The code assumes you are using the default postgres user, but this is a quick change in the code. The postgres user's password must be stored as the postgrespassword secret for the command to work; this should have already been completed here.
pulsarctl functions create \
  --classname streamnativerag1.UpsertEmbedding \
  --py ./streamnativerag1.zip \
  --inputs summitstudent1/developer/embedding1 \
  --output summitstudent1/developer/upsertcomplete \
  --tenant summitstudent1 \
  --namespace developer \
  --name PGUpsert1 \
  --secrets '{"PGSECRET": {"path": "postgrespassword", "key": "mypassword"}}' \
  --user-config '{"postgres_endpoint":"database-1.cluster-cjkgkqwq8ku7.us-east-1.rds.amazonaws.com"}'

If the Pulsar Function starts deploying, you should see:

Created PGUpsert1 successfully

It may take a minute or two for PGUpsert1 to deploy. Once it is fully deployed, its Status should be Running. If you see any System Exceptions, refer to Troubleshooting Pulsar Functions.

The code creates a Postgres connection, creates a table called myitems if it doesn't already exist (a 1024-dimension vector column and original text of up to 2000 characters), and then upserts the vector embedding from the triggering message along with its id and original text.

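import json

import psycopg2
from pulsar import Function


class UpsertEmbedding(Function):
    def __init__(self):
        self.pg = None  # Postgres connection, opened on the first message

    def process(self, record, context):
        # Open the connection once and reuse it for later messages
        if self.pg is None:
            self.pg = psycopg2.connect(
                host=context.get_user_config_value("postgres_endpoint"),
                port=5432,
                database="postgres",
                user="postgres",  # default user; change here to use another
                password=context.get_secret("PGSECRET"),
                connect_timeout=10,
            )
            with self.pg.cursor() as cursor:
                # The VECTOR type requires the pgvector extension
                cursor.execute("CREATE EXTENSION IF NOT EXISTS vector;")
                cursor.execute("""
                    CREATE TABLE IF NOT EXISTS myitems (
                        id VARCHAR(50) PRIMARY KEY,
                        embedding VECTOR(1024),
                        original_text VARCHAR(2000)
                    );
                """)
            self.pg.commit()

        data = json.loads(record)

        with self.pg.cursor() as cursor:
            # Upsert: insert a new row, or overwrite the row with the same id
            cursor.execute(
                """
                INSERT INTO myitems (id, original_text, embedding)
                VALUES (%s, %s, %s)
                ON CONFLICT (id) DO UPDATE
                SET original_text = EXCLUDED.original_text,
                    embedding = EXCLUDED.embedding;
                """,
                (data["primary_key"], data["original_text"], data["vector"]),
            )
        self.pg.commit()

        # The return value is published to the --output topic (upsertcomplete)
        return data["primary_key"]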
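The myitems schema imposes limits that a message could violate: id is VARCHAR(50), original_text is VARCHAR(2000), and embedding must have exactly 1024 values. The sketch below is a hypothetical helper, not part of streamnativerag1. It checks those limits before the INSERT and formats the vector as a pgvector text literal such as [0.5,-1.0], the same form the query output later in this section shows. Passing this string is an alternative to relying on psycopg2 adapting a Python list to a Postgres array.

```python
MAX_ID_LENGTH = 50      # id VARCHAR(50)
MAX_TEXT_LENGTH = 2000  # original_text VARCHAR(2000)
DIMENSION = 1024        # embedding VECTOR(1024)


def to_vector_literal(values):
    """Format a list of numbers as a pgvector text literal, e.g. '[0.5,-1.0]'."""
    return "[" + ",".join(repr(float(v)) for v in values) + "]"


def prepare_row(data):
    """Validate a decoded embedding1 message against the myitems schema.

    Returns an (id, original_text, embedding) tuple for the INSERT parameters.
    """
    if len(data["primary_key"]) > MAX_ID_LENGTH:
        raise ValueError("primary_key is longer than 50 characters")
    if len(data["original_text"]) > MAX_TEXT_LENGTH:
        raise ValueError("original_text is longer than 2000 characters")
    if len(data["vector"]) != DIMENSION:
        raise ValueError(f"expected {DIMENSION} values, got {len(data['vector'])}")
    return (
        data["primary_key"],
        data["original_text"],
        to_vector_literal(data["vector"]),
    )
```

With this helper, the parameters of the INSERT would be prepare_row(data) instead of the raw fields, so a malformed message fails with a clear error before it reaches Postgres.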

Test PGUpsert1

To test PGUpsert1, publish another message to topicA using the REST API. This triggers CreateEmbedding1, which writes the vector to embedding1, which in turn triggers PGUpsert1.

curl -X POST https://<SERVER ENDPOINT>/admin/rest/topics/v1/persistent/summitstudent1/developer/topicA/message \
  --header 'Authorization: Bearer <JWT TOKEN>' \
  --header 'Accept: application/json' \
  --header 'Content-Type: application/octet-stream' \
  --data-binary 'Cats can be many different colors.'

If PGUpsert1 was triggered, you should see 1 in its Messages column.

Before checking the output of PGUpsert1, let’s look at the subscriptions on the input topic, embedding1. You should see a Shared subscription for PGUpsert1.

PGUpsert1 processed one message and the subscription backlog is 0, so the message was most likely processed successfully. In addition to upserting the data into Postgres, the function returns the primary_key, and Pulsar publishes that return value to the output topic, upsertcomplete, where we can view it.
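To tie a primary_key from upsertcomplete back to the database, you can look it up directly. A minimal sketch, assuming an open psycopg2 connection such as conn in the script in the next section; the function name is hypothetical.

```python
def row_exists(conn, primary_key):
    """Return True if myitems contains a row with the given id."""
    with conn.cursor() as cursor:
        # Parameterized query; psycopg2 substitutes %s safely
        cursor.execute("SELECT 1 FROM myitems WHERE id = %s;", (primary_key,))
        return cursor.fetchone() is not None
```

For example, row_exists(conn, "8641feb8-df24-4c92-9ced-c621c9835eb0") should return True after the upsert shown in this section.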

Querying the Postgres Database Using Python

After installing the required dependencies, you can run the following Python code from your machine to view all rows in the myitems table.

import psycopg2

# Replace the placeholders with your own endpoint and password
DB_HOST = '<POSTGRES ENDPOINT>'
DB_PORT = 5432
DB_NAME = 'postgres'
DB_USER = 'postgres'
DB_PASSWORD = '<POSTGRES PASSWORD>'

conn = psycopg2.connect(
    host=DB_HOST,
    port=DB_PORT,
    database=DB_NAME,
    user=DB_USER,
    password=DB_PASSWORD,
    connect_timeout=10
)

cursor = conn.cursor()

# Every row as (id, embedding, original_text)
cursor.execute("SELECT * FROM myitems;")
results = cursor.fetchall()
print(results)

# Total number of rows in the table
cursor.execute("SELECT COUNT(*) FROM myitems;")
results = cursor.fetchall()
print(results)

cursor.close()
conn.close()

The script prints every row (id, embedding, and original text), followed by the number of rows in the table.
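Because the query does not register pgvector's type adapter, the embedding column comes back as a string such as '[0.018692017,...]'. A small sketch (hypothetical helper name) turns it back into floats with json.loads, which also accepts exponents such as -9.590387e-05. Alternatively, calling pgvector's register_vector(conn) makes psycopg2 return NumPy arrays instead of strings.

```python
import json


def parse_vector(text):
    """Convert a pgvector text value such as '[0.1,-0.2]' into a list of floats."""
    return [float(v) for v in json.loads(text)]
```

For a row from the SELECT * query, len(parse_vector(row[1])) should be 1024.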


[('8641feb8-df24-4c92-9ced-c621c9835eb0', '[0.018692017,0.015792847,0.0016384125,-0.026489258,-0.005897522, ... ,-0.021347046,-0.026535034,-0.0072784424]', 'cats can be many different colors.')]
[(1,)]

Use the REST API to publish a few more messages to topicA, each describing a different animal. This adds more embeddings to Postgres. In the next exercise we will run a similarity search to find the most relevant data for a specific animal.